|
1 |
# -*- coding: utf-8 -*-
|
|
2 |
|
|
3 |
import sys
|
|
4 |
import subprocess
|
|
5 |
import time
|
|
6 |
import os
|
|
7 |
import shutil
|
|
8 |
import random
|
|
9 |
import socket
|
|
10 |
from contextlib import closing
|
|
11 |
from collections import namedtuple
|
|
12 |
|
|
13 |
import psycopg2
|
|
14 |
|
|
15 |
import pytest
|
|
16 |
|
|
17 |
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
|
|
18 |
|
|
19 |
|
|
20 |
class Database(object):
|
|
21 |
def __init__(self):
|
|
22 |
self.db_name = 'db%s' % random.getrandbits(20)
|
|
23 |
self.dsn = 'dbname=%s' % self.db_name
|
|
24 |
with closing(psycopg2.connect('')) as conn:
|
|
25 |
conn.set_isolation_level(0)
|
|
26 |
with conn.cursor() as cursor:
|
|
27 |
cursor.execute('CREATE DATABASE %s' % self.db_name)
|
|
28 |
|
|
29 |
def conn(self):
|
|
30 |
return closing(psycopg2.connect(self.dsn))
|
|
31 |
|
|
32 |
def delete(self):
|
|
33 |
with closing(psycopg2.connect('')) as conn:
|
|
34 |
conn.set_isolation_level(0)
|
|
35 |
with conn.cursor() as cursor:
|
|
36 |
cursor.execute('DROP DATABASE IF EXISTS %s' % self.db_name)
|
|
37 |
|
|
38 |
|
|
39 |
@pytest.fixture
|
|
40 |
def postgres_db():
|
|
41 |
db = Database()
|
|
42 |
try:
|
|
43 |
yield db
|
|
44 |
finally:
|
|
45 |
db.delete()
|
|
46 |
|
|
47 |
|
|
48 |
WCS_SCRIPTS = {
|
|
49 |
'setup-auth': u"""
|
|
50 |
from quixote import get_publisher
|
|
51 |
|
|
52 |
get_publisher().cfg['identification'] = {'methods': ['password']}
|
|
53 |
get_publisher().cfg['debug'] = {'display_exceptions': 'text'}
|
|
54 |
get_publisher().write_cfg()
|
|
55 |
""",
|
|
56 |
'create-user': u"""
|
|
57 |
from quixote import get_publisher
|
|
58 |
from qommon.ident.password_accounts import PasswordAccount
|
|
59 |
|
|
60 |
user = get_publisher().user_class()
|
|
61 |
user.name = 'foo bar'
|
|
62 |
user.email = 'foo@example.net'
|
|
63 |
user.store()
|
|
64 |
account = PasswordAccount(id='user')
|
|
65 |
account.set_password('user')
|
|
66 |
account.user_id = user.id
|
|
67 |
account.store()
|
|
68 |
""",
|
|
69 |
'create-data': u"""
|
|
70 |
import datetime
|
|
71 |
import random
|
|
72 |
from quixote import get_publisher
|
|
73 |
|
|
74 |
from wcs.categories import Category
|
|
75 |
from wcs.formdef import FormDef
|
|
76 |
from wcs.roles import Role
|
|
77 |
from wcs import fields
|
|
78 |
|
|
79 |
cats = []
|
|
80 |
for i in range(1, 10):
|
|
81 |
cat = Category()
|
|
82 |
cat.name = 'Test %d' % i
|
|
83 |
cat.description = 'Hello world'
|
|
84 |
cat.store()
|
|
85 |
cats.append(cat)
|
|
86 |
|
|
87 |
formdef = FormDef()
|
|
88 |
formdef.name = 'form title'
|
|
89 |
formdef.category_id = cat.id
|
|
90 |
formdef.fields = [
|
|
91 |
fields.StringField(id='1', label='1st field', type='string', anonymise=False, varname='field_string'),
|
|
92 |
fields.ItemField(id='2', label='2nd field', type='item',
|
|
93 |
items=['foo', 'bar', 'baz'], varname='field_item'),
|
|
94 |
]
|
|
95 |
formdef.store()
|
|
96 |
|
|
97 |
user = get_publisher().user_class.select()[0]
|
|
98 |
|
|
99 |
for i in range(50):
|
|
100 |
formdata = formdef.data_class()()
|
|
101 |
formdata.just_created()
|
|
102 |
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
|
|
103 |
formdata.data = {'1': 'FOO BAR %d' % i}
|
|
104 |
if i%4 == 0:
|
|
105 |
formdata.data['2'] = 'foo'
|
|
106 |
formdata.data['2_display'] = 'foo'
|
|
107 |
elif i%4 == 1:
|
|
108 |
formdata.data['2'] = 'bar'
|
|
109 |
formdata.data['2_display'] = 'bar'
|
|
110 |
else:
|
|
111 |
formdata.data['2'] = 'baz'
|
|
112 |
formdata.data['2_display'] = 'baz'
|
|
113 |
if i%3 == 0:
|
|
114 |
formdata.jump_status('new')
|
|
115 |
else:
|
|
116 |
formdata.jump_status('finished')
|
|
117 |
if i%7 == 0:
|
|
118 |
formdata.user_id = user.id
|
|
119 |
formdata.store()
|
|
120 |
|
|
121 |
# another formdef in same category
|
|
122 |
formdef = FormDef()
|
|
123 |
formdef.name = 'a second form title'
|
|
124 |
formdef.category_id = cat.id
|
|
125 |
formdef.fields = []
|
|
126 |
formdef.store()
|
|
127 |
|
|
128 |
# a formdef in another category
|
|
129 |
formdef = FormDef()
|
|
130 |
formdef.name = 'third form title'
|
|
131 |
formdef.category_id = cats[2].id
|
|
132 |
formdef.fields = []
|
|
133 |
formdef.enable_tracking_codes = True
|
|
134 |
formdef.store()
|
|
135 |
|
|
136 |
# a draft of that formdef
|
|
137 |
formdata = formdef.data_class()()
|
|
138 |
formdata.data = {}
|
|
139 |
formdata.page_no = 1
|
|
140 |
formdata.status = 'draft'
|
|
141 |
formdata.receipt_time = datetime.datetime(2015, 1, 1).timetuple()
|
|
142 |
formdata.user_id = user.id
|
|
143 |
formdata.store()
|
|
144 |
|
|
145 |
if '127.0.0.2' in get_publisher().get_frontoffice_url():
|
|
146 |
# create a tracking code on second website only
|
|
147 |
code = get_publisher().tracking_code_class()
|
|
148 |
code.id = 'CNPHNTFB'
|
|
149 |
code.formdata = formdata
|
|
150 |
|
|
151 |
# a private formdef
|
|
152 |
role = Role(name='Blah')
|
|
153 |
role.store()
|
|
154 |
|
|
155 |
formdef = FormDef()
|
|
156 |
formdef.name = 'a private form'
|
|
157 |
formdef.category_id = cats[2].id
|
|
158 |
formdef.roles = [role.id]
|
|
159 |
formdef.backoffice_submission_roles = [role.id]
|
|
160 |
formdef.fields = []
|
|
161 |
formdef.store()
|
|
162 |
|
|
163 |
user2 = get_publisher().user_class() # agent
|
|
164 |
user2.name = 'foo2 bar2'
|
|
165 |
user2.email = 'foo2@example.net'
|
|
166 |
user2.roles = [role.id]
|
|
167 |
user2.store()
|
|
168 |
""",
|
|
169 |
|
|
170 |
}
|
|
171 |
|
|
172 |
|
|
173 |
@pytest.mark.skipif(
|
|
174 |
'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']),
|
|
175 |
reason='WCSCTL not defined in environment')
|
|
176 |
@pytest.fixture(scope='session')
|
|
177 |
def wcs(tmp_path_factory):
|
|
178 |
WCSCTL = os.environ.get('WCSCTL')
|
|
179 |
WCS_DIR = tmp_path_factory.mktemp('wcs')
|
|
180 |
|
|
181 |
WCS_PID = None
|
|
182 |
|
|
183 |
def run_wcs_script(script, hostname):
|
|
184 |
script_path = WCS_DIR / (script + '.py')
|
|
185 |
with script_path.open('w') as fd:
|
|
186 |
fd.write(WCS_SCRIPTS[script])
|
|
187 |
|
|
188 |
subprocess.check_call(
|
|
189 |
[WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname,
|
|
190 |
str(script_path)])
|
|
191 |
|
|
192 |
HOSTNAME = '127.0.0.1'
|
|
193 |
tenant_dir = WCS_DIR / HOSTNAME
|
|
194 |
tenant_dir.mkdir()
|
|
195 |
|
|
196 |
run_wcs_script('setup-auth', HOSTNAME)
|
|
197 |
run_wcs_script('create-user', HOSTNAME)
|
|
198 |
run_wcs_script('create-data', HOSTNAME)
|
|
199 |
|
|
200 |
with (tenant_dir / 'site-options.cfg').open('w') as fd:
|
|
201 |
fd.write(u'''[api-secrets]
|
|
202 |
olap = olap
|
|
203 |
''')
|
|
204 |
|
|
205 |
with (WCS_DIR / 'wcs.cfg').open('w') as fd:
|
|
206 |
fd.write(u'''[main]
|
|
207 |
app_dir = %s\n''' % WCS_DIR)
|
|
208 |
|
|
209 |
with (WCS_DIR / 'local_settings.py').open('w') as fd:
|
|
210 |
fd.write(u'''
|
|
211 |
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
|
|
212 |
THEMES_DIRECTORY = '/'
|
|
213 |
ALLOWED_HOSTS = ['%s']
|
|
214 |
''' % (WCS_DIR, HOSTNAME))
|
|
215 |
|
|
216 |
PORT = 8899
|
|
217 |
ADDRESS = '0.0.0.0'
|
|
218 |
WCS_PID = os.fork()
|
|
219 |
if not WCS_PID:
|
|
220 |
os.chdir(os.path.dirname(WCSCTL))
|
|
221 |
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
|
|
222 |
os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
|
|
223 |
os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
|
|
224 |
sys.exit(0)
|
|
225 |
# verify django is launched
|
|
226 |
s = socket.socket()
|
|
227 |
i = 0
|
|
228 |
while True:
|
|
229 |
i += 1
|
|
230 |
try:
|
|
231 |
s.connect((ADDRESS, PORT))
|
|
232 |
except Exception:
|
|
233 |
time.sleep(0.1)
|
|
234 |
else:
|
|
235 |
s.close()
|
|
236 |
break
|
|
237 |
assert i < 50, 'no connection found after 5 seconds'
|
|
238 |
|
|
239 |
yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
|
|
240 |
os.kill(WCS_PID, 9)
|
|
241 |
shutil.rmtree(str(WCS_DIR))
|