Projet

Général

Profil

0001-add-tests-13612.patch

Benjamin Dauvergne, 15 janvier 2019 19:23

Télécharger (10,3 ko)

Voir les différences:

Subject: [PATCH 1/2] add tests (#13612)

 get_wcs.sh        |   4 +
 tests/conftest.py | 241 ++++++++++++++++++++++++++++++++++++++++++++++
 tests/test_wcs.py |  54 +++++++++++
 tox.ini           |   9 +-
 wcs_olap/cmd.py   |   4 +
 5 files changed, 311 insertions(+), 1 deletion(-)
 create mode 100755 get_wcs.sh
 create mode 100644 tests/conftest.py
 create mode 100644 tests/test_wcs.py
get_wcs.sh
1
#!/bin/sh -xue
2

  
3
test -d wcs || git clone http://git.entrouvert.org/wcs.git
4
(cd wcs && git pull)
tests/conftest.py
1
# -*- coding: utf-8 -*-
2

  
3
import sys
4
import subprocess
5
import time
6
import os
7
import shutil
8
import random
9
import socket
10
from contextlib import closing
11
from collections import namedtuple
12

  
13
import psycopg2
14

  
15
import pytest
16

  
17
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
18

  
19

  
20
class Database(object):
21
    def __init__(self):
22
        self.db_name = 'db%s' % random.getrandbits(20)
23
        self.dsn = 'dbname=%s' % self.db_name
24
        with closing(psycopg2.connect('')) as conn:
25
            conn.set_isolation_level(0)
26
            with conn.cursor() as cursor:
27
                cursor.execute('CREATE DATABASE %s' % self.db_name)
28

  
29
    def conn(self):
30
        return closing(psycopg2.connect(self.dsn))
31

  
32
    def delete(self):
33
        with closing(psycopg2.connect('')) as conn:
34
            conn.set_isolation_level(0)
35
            with conn.cursor() as cursor:
36
                cursor.execute('DROP DATABASE IF EXISTS %s' % self.db_name)
37

  
38

  
39
@pytest.fixture
40
def postgres_db():
41
    db = Database()
42
    try:
43
        yield db
44
    finally:
45
        db.delete()
46

  
47

  
48
WCS_SCRIPTS = {
49
    'setup-auth': u"""
50
from quixote import get_publisher
51

  
52
get_publisher().cfg['identification'] = {'methods': ['password']}
53
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
54
get_publisher().write_cfg()
55
""",
56
    'create-user': u"""
57
from quixote import get_publisher
58
from qommon.ident.password_accounts import PasswordAccount
59

  
60
user = get_publisher().user_class()
61
user.name = 'foo bar'
62
user.email = 'foo@example.net'
63
user.store()
64
account = PasswordAccount(id='user')
65
account.set_password('user')
66
account.user_id = user.id
67
account.store()
68
""",
69
    'create-data': u"""
70
import datetime
71
import random
72
from quixote import get_publisher
73

  
74
from wcs.categories import Category
75
from wcs.formdef import FormDef
76
from wcs.roles import Role
77
from wcs import fields
78

  
79
cats = []
80
for i in range(1, 10):
81
   cat = Category()
82
   cat.name = 'Test %d' % i
83
   cat.description = 'Hello world'
84
   cat.store()
85
   cats.append(cat)
86

  
87
formdef = FormDef()
88
formdef.name = 'form title'
89
formdef.category_id = cat.id
90
formdef.fields = [
91
    fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='field_string'),
92
    fields.ItemField(id='2', label='2nd field', type='item',
93
                     items=['foo', 'bar', 'baz'], varname='field_item'),
94
]
95
formdef.store()
96

  
97
user = get_publisher().user_class.select()[0]
98

  
99
for i in range(50):
100
    formdata = formdef.data_class()()
101
    formdata.just_created()
102
    formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
103
    formdata.data = {'1': 'FOO BAR %d' % i}
104
    if i%4 == 0:
105
        formdata.data['2'] = 'foo'
106
        formdata.data['2_display'] = 'foo'
107
    elif i%4 == 1:
108
        formdata.data['2'] = 'bar'
109
        formdata.data['2_display'] = 'bar'
110
    else:
111
        formdata.data['2'] = 'baz'
112
        formdata.data['2_display'] = 'baz'
113
    if i%3 == 0:
114
        formdata.jump_status('new')
115
    else:
116
        formdata.jump_status('finished')
117
    if i%7 == 0:
118
        formdata.user_id = user.id
119
    formdata.store()
120

  
121
# another formdef in same category
122
formdef = FormDef()
123
formdef.name = 'a second form title'
124
formdef.category_id = cat.id
125
formdef.fields = []
126
formdef.store()
127

  
128
# a formdef in another category
129
formdef = FormDef()
130
formdef.name = 'third form title'
131
formdef.category_id = cats[2].id
132
formdef.fields = []
133
formdef.enable_tracking_codes = True
134
formdef.store()
135

  
136
# a draft of that formdef
137
formdata = formdef.data_class()()
138
formdata.data = {}
139
formdata.page_no = 1
140
formdata.status = 'draft'
141
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
142
formdata.user_id = user.id
143
formdata.store()
144

  
145
if '127.0.0.2' in get_publisher().get_frontoffice_url():
146
    # create a tracking code on second website only
147
    code = get_publisher().tracking_code_class()
148
    code.id = 'CNPHNTFB'
149
    code.formdata = formdata
150

  
151
# a private formdef
152
role = Role(name='Blah')
153
role.store()
154

  
155
formdef = FormDef()
156
formdef.name = 'a private form'
157
formdef.category_id = cats[2].id
158
formdef.roles = [role.id]
159
formdef.backoffice_submission_roles = [role.id]
160
formdef.fields = []
161
formdef.store()
162

  
163
user2 = get_publisher().user_class()  # agent
164
user2.name = 'foo2 bar2'
165
user2.email = 'foo2@example.net'
166
user2.roles = [role.id]
167
user2.store()
168
""",
169

  
170
}
171

  
172

  
173
@pytest.mark.skipif(
174
    'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']),
175
    reason='WCSCTL not defined in environment')
176
@pytest.fixture(scope='session')
177
def wcs(tmp_path_factory):
178
    WCSCTL = os.environ.get('WCSCTL')
179
    WCS_DIR = tmp_path_factory.mktemp('wcs')
180

  
181
    WCS_PID = None
182

  
183
    def run_wcs_script(script, hostname):
184
        script_path = WCS_DIR / (script + '.py')
185
        with script_path.open('w') as fd:
186
            fd.write(WCS_SCRIPTS[script])
187

  
188
        subprocess.check_call(
189
            [WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname,
190
             str(script_path)])
191

  
192
    HOSTNAME = '127.0.0.1'
193
    tenant_dir = WCS_DIR / HOSTNAME
194
    tenant_dir.mkdir()
195

  
196
    run_wcs_script('setup-auth', HOSTNAME)
197
    run_wcs_script('create-user', HOSTNAME)
198
    run_wcs_script('create-data', HOSTNAME)
199

  
200
    with (tenant_dir / 'site-options.cfg').open('w') as fd:
201
        fd.write(u'''[api-secrets]
202
olap = olap
203
''')
204

  
205
    with (WCS_DIR / 'wcs.cfg').open('w') as fd:
206
        fd.write(u'''[main]
207
app_dir = %s\n''' % WCS_DIR)
208

  
209
    with (WCS_DIR / 'local_settings.py').open('w') as fd:
210
        fd.write(u'''
211
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
212
THEMES_DIRECTORY = '/'
213
ALLOWED_HOSTS = ['%s']
214
''' % (WCS_DIR, HOSTNAME))
215

  
216
    PORT = 8899
217
    ADDRESS = '0.0.0.0'
218
    WCS_PID = os.fork()
219
    if not WCS_PID:
220
        os.chdir(os.path.dirname(WCSCTL))
221
        os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
222
        os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
223
        os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
224
        sys.exit(0)
225
    # verify django is launched
226
    s = socket.socket()
227
    i = 0
228
    while True:
229
        i += 1
230
        try:
231
            s.connect((ADDRESS, PORT))
232
        except Exception:
233
            time.sleep(0.1)
234
        else:
235
            s.close()
236
            break
237
        assert i < 50, 'no connection found after 5 seconds'
238

  
239
    yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
240
    os.kill(WCS_PID, 9)
241
    shutil.rmtree(str(WCS_DIR))
tests/test_wcs.py
1
import subprocess
2

  
3

  
4
def test_wcs_fixture(wcs, postgres_db, tmpdir):
5
    config_ini = tmpdir / 'config.ini'
6
    model_dir = tmpdir / 'model_dir'
7
    model_dir.mkdir()
8
    with config_ini.open('w') as fd:
9
        fd.write(u'''
10
[wcs-olap]
11
cubes_model_dirs = {model_dir}
12
pg_dsn = {dsn}
13

  
14
[loggers]
15
keys = root
16

  
17
[formatters]
18
keys = console
19

  
20
[formatter_console]
21
format = %(asctime)s %(levelname)s %(message)s
22

  
23
[handlers]
24
keys = console
25

  
26
[handler_console]
27
class = StreamHandler
28
level = NOTSET
29
args = (sys.stderr,)
30
formatter = console
31

  
32
[logger_root]
33
level = DEBUG
34
handlers = console
35

  
36
[{wcs.url}]
37
orig = olap
38
key = olap
39
schema = public
40
'''.format(wcs=wcs, model_dir=model_dir, dsn=postgres_db.dsn))
41

  
42
    from wcs_olap import cmd
43
    import sys
44

  
45
    sys.argv = ['', '--no-log-errors', str(config_ini)]
46
    cmd.main2()
47

  
48
    with postgres_db.conn() as conn:
49
        with conn.cursor() as c:
50
            c.execute('SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = \'public\' ORDER BY table_name, ordinal_position')
51
            for row in c.fetchall():
52
                print row
53
            import pdb
54
            pdb.set_trace()
tox.ini
9 9
[testenv]
10 10
usedevelop = true
11 11
setenv =
12
	WCSCTL=wcs/wcsctl.py
12 13
	coverage: COVERAGE=--junit-xml=junit.xml --cov=src --cov-report xml 
13 14
deps =
14 15
	coverage
15 16
	pytest
16 17
	pytest-cov
17 18
	pytest-random
19
	quixote<3.0
20
	psycopg2-binary
21
	vobject
22
	gadjo
23
	django>=1.11,<1.12
18 24
commands =
19
	py.test {env:COVERAGE:} {posargs:--random tests}
25
	./get_wcs.sh
26
	py.test {env:COVERAGE:} {posargs:--random-group tests}
wcs_olap/cmd.py
49 49
    group = parser.add_mutually_exclusive_group()
50 50
    parser.add_argument('--no-feed', dest='feed', help='only produce the model',
51 51
                        action='store_false', default=True)
52
    parser.add_argument('--no-log-errors', dest='no_log_errors',
53
                        action='store_true', default=False)
52 54
    parser.add_argument('--fake', action='store_true', default=False)
53 55
    group.add_argument("-a", "--all", help="synchronize all wcs", action='store_true',
54 56
                       default=False)
......
111 113
                logger.info('finished')
112 114
                feed_result = False
113 115
            except:
116
                if args.no_log_errors:
117
                    raise
114 118
                feed_result = True
115 119
                logger.exception('failed to synchronize with %s', url)
116 120
            failure = failure or feed_result
117
-