0001-misc-fix-pylint-consider-using-with-53406.patch
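This patch drops the consider-using-with entry from the pylint disable list and fixes every occurrence the check then reports, replacing manual open()/close() pairs with context managers. As a minimal sketch of the rewrite applied throughout (illustrative file name, not code from this patch):

    # before: the file stays open if write() raises
    fd = open('example.cfg', 'w')
    fd.write('hello')
    fd.close()

    # after: the file is closed even if the body raises
    with open('example.cfg', 'w') as fd:
        fd.write('hello')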
pylint.rc

 consider-using-dict-comprehension,
 consider-using-max-builtin,
 consider-using-set-comprehension,
-consider-using-with,
 cyclic-import,
 duplicate-code,
 fixme,
tests/admin_pages/test_all.py

     resp = get_app(pub).get('/backoffice/settings/', status=200)

     # check it doesn't work with a non-empty ADMIN_FOR_ALL file
-    fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
-    fd.write('x.x.x.x')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
+        fd.write('x.x.x.x')
     resp = get_app(pub).get('/backoffice/settings/', status=302)

     # check it works if the file contains the user IP address
-    fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
-    fd.write('127.0.0.1')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
+        fd.write('127.0.0.1')
     resp = get_app(pub).get('/backoffice/settings/', status=200)

     # check it's also ok if the user is logged in but doesn't have the
tests/admin_pages/test_datasource.py


     # check json
     json_file_path = os.path.join(pub.app_dir, 'test.json')
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)

     data_source.data_source = {'type': 'json', 'value': 'file://%s' % json_file_path}
     data_source.store()
...
     assert 'foo' in resp.text

     # with other attributes
-    json_file = open(json_file_path, 'w')
-    json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)

     data_source.data_attribute = 'results'
     data_source.id_attribute = 'pk'
...

     # check geojson
     geojson_file_path = os.path.join(pub.app_dir, 'test.geojson')
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
-                {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
+                    {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     data_source.data_source = {'type': 'geojson', 'value': 'file://%s' % geojson_file_path}
     data_source.store()
     with HttpRequestsMocking():
tests/admin_pages/test_form.py

     resp = resp.click(href='archive')
     resp = resp.form.submit('submit')
     assert resp.content_type == 'application/x-wcs-archive'
-    tf = tarfile.open(fileobj=io.BytesIO(resp.body))
-    assert 'formdef' in [x.name for x in tf.getmembers()]
-    assert len(tf.getmembers()) == 8  # 7 formdata + 1 formdef
+    with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
+        assert 'formdef' in [x.name for x in tf.getmembers()]
+        assert len(tf.getmembers()) == 8  # 7 formdata + 1 formdef

     # second archive, it shouldn't get anything (but the formdef)
     resp = app.get('/backoffice/forms/1/')
     resp = resp.click(href='archive')
     resp = resp.form.submit('submit')
     assert resp.content_type == 'application/x-wcs-archive'
-    tf = tarfile.open(fileobj=io.BytesIO(resp.body))
-    assert 'formdef' in [x.name for x in tf.getmembers()]
-    assert len(tf.getmembers()) == 1  # 0 formdata + 1 formdef
+    with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
+        assert 'formdef' in [x.name for x in tf.getmembers()]
+        assert len(tf.getmembers()) == 1  # 0 formdata + 1 formdef


 def test_form_overwrite(pub):
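Note: tarfile.open() returns a TarFile, which supports the context-manager protocol, so the `with` form above is equivalent to an explicit try/finally close. A minimal sketch of reading an in-memory archive the way these tests do (archive_bytes is a hypothetical tar payload, not from this patch):

    import io
    import tarfile

    buf = io.BytesIO(archive_bytes)  # archive_bytes: bytes of a tar archive
    with tarfile.open(fileobj=buf) as tf:
        names = [m.name for m in tf.getmembers()]
    # tf is closed here; buf itself stays usable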
tests/admin_pages/test_settings.py

     assert 'completed' in resp.text
     resp = resp.click('Download Export')
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
     assert len(filelist) == 0

     # check afterjob ajax call
...
     resp = resp.follow()
     resp = resp.click('Download Export')
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
     assert 'formdefs/1' not in filelist
     assert 'formdefs_xml/1' in filelist
     assert 'carddefs/1' not in filelist
...
     resp = resp.follow()
     resp = resp.click('Download Export')
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
     assert 'formdefs_xml/%s' % formdef.id in filelist
     assert 'workflows_xml/%s' % workflow.id in filelist
     assert 'roles_xml/%s' % role.id not in filelist
...
     resp = resp.follow()
     resp = resp.click('Download Export')
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
     assert len([x for x in filelist if 'roles_xml/' in x]) == 0

     # check an error is displayed if such an import is then used and roles are
...
     # create mock theme
     os.mkdir(os.path.join(pub.app_dir, 'themes'))
     os.mkdir(os.path.join(pub.app_dir, 'themes', 'test'))
-    fd = open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w')
-    fd.write(
-        '<?xml version="1.0"?>' '<theme name="test" version="1.0">' ' <label>Test Theme</label>' '</theme>'
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w') as fd:
+        fd.write(
+            '<?xml version="1.0"?>'
+            '<theme name="test" version="1.0">'
+            ' <label>Test Theme</label>'
+            '</theme>'
+        )

     resp = app.get('/backoffice/settings/themes')
     assert 'biglist themes' in resp.text
...
     resp = app.get('/backoffice/settings/themes')
     resp = resp.click('download', index=0)
     assert resp.headers['content-type'] == 'application/zip'
+
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
-    assert 'alto/icon.png' in filelist
-    assert 'alto/desc.xml' in filelist
-    assert 'alto/template.ezt' in filelist
-    assert 'alto/wcs.css' in filelist
-
-    # modify it
-    zipf.writestr('alto/foobar.txt', 'XXX')
-    zipf.close()
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
+        assert 'alto/icon.png' in filelist
+        assert 'alto/desc.xml' in filelist
+        assert 'alto/template.ezt' in filelist
+        assert 'alto/wcs.css' in filelist
+
+        # modify it
+        zipf.writestr('alto/foobar.txt', 'XXX')

     # upload it
     resp = app.get('/backoffice/settings/themes')
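Note: zipfile.ZipFile has supported the context-manager protocol since Python 2.7/3.2, and in 'a' (append) mode closing the handle is also what writes the updated central directory back to the underlying buffer, so leaving the `with` block before reusing the modified archive matters. A small sketch under that assumption (names are illustrative):

    import io
    import zipfile

    buf = io.BytesIO(downloaded_zip)  # downloaded_zip: bytes of a zip file
    with zipfile.ZipFile(buf, 'a') as zipf:
        zipf.writestr('extra.txt', 'XXX')  # appended entry
    data = buf.getvalue()  # complete archive; directory flushed on close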
tests/api/test_custom_view.py


     # check it now gets the data
     resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
-    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
-    ods_sheet = ET.parse(zipf.open('content.xml'))
+    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+        ods_sheet = ET.parse(zipf.open('content.xml'))
     assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11

     pub.custom_view_class.wipe()
...
     custom_view.store()

     resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
-    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
-    ods_sheet = ET.parse(zipf.open('content.xml'))
+    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+        ods_sheet = ET.parse(zipf.open('content.xml'))
     assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21

tests/api/test_formdata.py

     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()

-    open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
-        '''\
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''\
 [api-secrets]
 coucou = 1234
 '''
-    )
+        )

     return pub

...

     resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
     assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
-    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
-    ods_sheet = ET.parse(zipf.open('content.xml'))
+    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+        ods_sheet = ET.parse(zipf.open('content.xml'))
     assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311

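The first hunk above also fixes a subtler variant: open(...).write(...) never calls close() at all and only works because CPython happens to close the file when the handle is garbage-collected. The `with` form makes the flush and close explicit and portable:

    # relies on refcounting GC to flush and close (CPython-specific)
    open('site-options.cfg', 'w').write('[api-secrets]\n')

    # deterministic on any Python implementation
    with open('site-options.cfg', 'w') as fd:
        fd.write('[api-secrets]\n')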
tests/backoffice_pages/test_all.py

     pub.cfg['identification'] = {'methods': ['password']}
     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write(
-        '''
-[api-secrets]
-coucou = 1234
-'''
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
+[api-secrets]
+coucou = 1234
+'''
+        )

     return pub

...
     if not pub.site_options.has_section('options'):
         pub.site_options.add_section('options')
     pub.site_options.set('options', 'default-sort-order', '-last_update_time')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     resp = app.get('/backoffice/management/form-title/')
     assert resp.text.count('data-link') == 17
...
     if not pub.site_options.has_section('variables'):
         pub.site_options.add_section('variables')
     pub.site_options.set('variables', 'welco_url', 'xxx')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     create_superuser(pub)
     create_environment(pub)
...
     pub.site_options.add_section('options')

     pub.site_options.set('options', 'map-tile-urltemplate', 'https://{s}.tile.example.net/{z}/{x}/{y}.png')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     resp = app.get('/backoffice/management/form-title/')
     resp = resp.click('Plot on a Map')
...
     resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
     resp = resp.click('Download all files as .zip')
     zip_content = io.BytesIO(resp.body)
-    zipf = zipfile.ZipFile(zip_content, 'a')
-    filelist = zipf.namelist()
-    assert set(filelist) == {'1_bar', '2_bar'}
-    for zipinfo in zipf.infolist():
-        content = zipf.read(zipinfo)
-        if zipinfo.filename == '1_bar':
-            assert content == b'hello world'
-        elif zipinfo.filename == '2_bar':
-            assert content == b'hello world2'
-        else:
-            assert False  # unknown zip part
+    with zipfile.ZipFile(zip_content, 'a') as zipf:
+        filelist = zipf.namelist()
+        assert set(filelist) == {'1_bar', '2_bar'}
+        for zipinfo in zipf.infolist():
+            content = zipf.read(zipinfo)
+            if zipinfo.filename == '1_bar':
+                assert content == b'hello world'
+            elif zipinfo.filename == '2_bar':
+                assert content == b'hello world2'
+            else:
+                assert False  # unknown zip part


 def test_backoffice_sidebar_user_template(pub):
...
     if not pub.site_options.has_section('variables'):
         pub.site_options.add_section('variables')
     pub.site_options.set('variables', 'welco_url', 'xxx')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     resp = app.get('/backoffice/management/listing?limit=500')
     formdata = formdef.data_class().select(lambda x: x.status == 'wf-new')[0]
...
     if not pub.site_options.has_section('options'):
         pub.site_options.add_section('options')
     pub.site_options.set('options', 'per-user-view', 'true')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     resp = app.get('/backoffice/management/').follow()
     assert 'Per User View' in resp.text
tests/backoffice_pages/test_carddata.py

     pub.cfg['identification'] = {'methods': ['password']}
     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write(
-        '''
-[api-secrets]
-coucou = 1234
-'''
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
+[api-secrets]
+coucou = 1234
+'''
+        )

     return pub

tests/backoffice_pages/test_custom_view.py

     pub.cfg['identification'] = {'methods': ['password']}
     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write(
-        '''
-[api-secrets]
-coucou = 1234
-'''
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
+[api-secrets]
+coucou = 1234
+'''
+        )

     return pub

tests/backoffice_pages/test_export.py

     pub.cfg['identification'] = {'methods': ['password']}
     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write(
-        '''
-[api-secrets]
-coucou = 1234
-'''
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
+[api-secrets]
+coucou = 1234
+'''
+        )

     return pub

...
     if not pub.site_options.has_section('variables'):
         pub.site_options.add_section('variables')
     pub.site_options.set('variables', 'welco_url', 'xxx')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     create_superuser(pub)

...


 def test_backoffice_csv_export_anonymised(pub):
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)

     create_superuser(pub)

...
     assert 'filename=form-title.ods' in resp.headers['content-disposition']
     assert resp.body[:2] == b'PK'  # ods has a zip container

-    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
-    ods_sheet = ET.parse(zipf.open('content.xml'))
+    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
+        ods_sheet = ET.parse(zipf.open('content.xml'))
     # check the ods contains a link to the document
     elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
     assert (
tests/backoffice_pages/test_submission.py

     pub.cfg['identification'] = {'methods': ['password']}
     pub.cfg['language'] = {'language': 'en'}
     pub.write_cfg()
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write(
-        '''
-[api-secrets]
-coucou = 1234
-'''
-    )
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
+[api-secrets]
+coucou = 1234
+'''
+        )

     return pub

tests/form_pages/test_all.py


 def assert_equal_zip(stream1, stream2):
-    z1 = zipfile.ZipFile(stream1)
-    z2 = zipfile.ZipFile(stream2)
-    assert set(z1.namelist()) == set(z2.namelist())
-    for name in z1.namelist():
-        if name == 'styles.xml':
-            continue
-        if name in ['content.xml', 'meta.xml']:
-            t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
-            try:
-                # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
-                t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
-            except AttributeError:
-                pass
-        else:
-            t1, t2 = z1.read(name), z2.read(name)
-        assert t1 == t2, 'file "%s" differs' % name
+    with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
+        assert set(z1.namelist()) == set(z2.namelist())
+        for name in z1.namelist():
+            if name == 'styles.xml':
+                continue
+            if name in ['content.xml', 'meta.xml']:
+                t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
+                try:
+                    # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
+                    t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
+                except AttributeError:
+                    pass
+            else:
+                t1, t2 = z1.read(name), z2.read(name)
+            assert t1 == t2, 'file "%s" differs' % name


 def pytest_generate_tests(metafunc):
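As the assert_equal_zip hunk shows, a single `with` statement can manage several context managers at once; both archives are closed when the block exits, in reverse order of opening. The equivalent nested form, for reference:

    with zipfile.ZipFile(stream1) as z1:
        with zipfile.ZipFile(stream2) as z2:
            assert set(z1.namelist()) == set(z2.namelist())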
tests/form_pages/test_formdata.py


 def assert_equal_zip(stream1, stream2):
-    z1 = zipfile.ZipFile(stream1)
-    z2 = zipfile.ZipFile(stream2)
-    assert set(z1.namelist()) == set(z2.namelist())
-    for name in z1.namelist():
-        if name == 'styles.xml':
-            continue
-        if name in ['content.xml', 'meta.xml']:
-            t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
-            try:
-                # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
-                t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
-            except AttributeError:
-                pass
-        else:
-            t1, t2 = z1.read(name), z2.read(name)
-        assert t1 == t2, 'file "%s" differs' % name
+    with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
+        assert set(z1.namelist()) == set(z2.namelist())
+        for name in z1.namelist():
+            if name == 'styles.xml':
+                continue
+            if name in ['content.xml', 'meta.xml']:
+                t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
+                try:
+                    # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
+                    t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
+                except AttributeError:
+                    pass
+            else:
+                t1, t2 = z1.read(name), z2.read(name)
+            assert t1 == t2, 'file "%s" differs' % name


 def test_formdata_attachment_download(pub):
tests/test_categories.py

     test.description = 'Hello world'

     os.mkdir(os.path.join(pub.app_dir, 'categories'))
-    fd = open(os.path.join(pub.app_dir, 'categories', '1'), 'wb')
-    pickle.dump(test, fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'categories', '1'), 'wb') as fd:
+        pickle.dump(test, fd)

     test2 = Category.get(1)
     assert test.id == test2.id
tests/test_datasource.py

     req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
     pub.set_app_dir(req)

-    open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
-        '''
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write(
+            '''
 [wscall-secrets]
 api.example.com = 1234
 '''
-    )
+        )

     pub.load_site_options()

...

     # invalid json file
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'wb')
-    json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
-    json_file.close()
+    with open(json_file_path, 'wb') as json_file:
+        json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
     assert data_sources.get_items(datasource) == []

     # empty json file
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({}, json_file)
     assert data_sources.get_items(datasource) == []

     # unrelated json file
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump('foobar', json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump('foobar', json_file)
     assert data_sources.get_items(datasource) == []

     # another unrelated json file
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': 'foobar'}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': 'foobar'}, json_file)
     assert data_sources.get_items(datasource) == []

     # json file not using dictionaries
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
     assert data_sources.get_items(datasource) == []

     # a good json file
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
     assert data_sources.get_items(datasource) == [
         ('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
         ('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
...

     # a json file with additional keys
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump(
-        {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
-        json_file,
-    )
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump(
+            {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
+            json_file,
+        )
     assert data_sources.get_items(datasource) == [
         ('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
         ('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'}),
...

     # a json file with integer as 'id'
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
     assert data_sources.get_items(datasource) == [
         ('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
         ('2', 'bar', '2', {'id': 2, 'text': 'bar'}),
...

     # a json file with empty or no text values
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
     assert data_sources.get_items(datasource) == [
         ('1', '', '1', {'id': '1', 'text': ''}),
         ('2', '2', '2', {'id': '2', 'text': '2'}),
...

     # a json file with empty or no id
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
     assert data_sources.get_items(datasource) == []
     assert data_sources.get_structured_items(datasource) == []

     # specify data_attribute
     datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'results'}
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': 'foo'},
         {'id': '2', 'text': 'bar'},
...
     # specify id_attribute
     datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'pk'}
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': 'foo', 'pk': '1'},
         {'id': '2', 'text': 'bar', 'pk': '2'},
...
     # specify text_attribute
     datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'label'}
     get_request().datasources_cache = {}
-    json_file = open(json_file_path, 'w')
-    json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
-    json_file.close()
+    with open(json_file_path, 'w') as json_file:
+        json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': 'foo', 'label': 'foo'},
         {'id': '2', 'text': 'bar', 'label': 'bar'},
...

     # invalid geojson file
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'wb')
-    geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
-    geojson_file.close()
+    with open(geojson_file_path, 'wb') as geojson_file:
+        geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
     assert data_sources.get_items(datasource) == []

     # empty geojson file
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump({}, geojson_file)
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump({}, geojson_file)
     assert data_sources.get_items(datasource) == []

     # unrelated geojson file
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump('foobar', geojson_file)
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump('foobar', geojson_file)
     assert data_sources.get_items(datasource) == []

     # another unrelated geojson file
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump({'features': 'foobar'}, geojson_file)
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump({'features': 'foobar'}, geojson_file)
     assert data_sources.get_items(datasource) == []

     # a good geojson file
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '1', 'text': 'foo'}},
-                {'properties': {'id': '2', 'text': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '1', 'text': 'foo'}},
+                    {'properties': {'id': '2', 'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_items(datasource) == [
         ('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}}),
         ('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}}),
...

     # a geojson file with additional keys
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
-                {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
+                    {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_items(datasource) == [
         (
             '1',
...

     # a geojson file with integer as 'id'
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {'features': [{'properties': {'id': 1, 'text': 'foo'}}, {'properties': {'id': 2, 'text': 'bar'}}]},
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': 1, 'text': 'foo'}},
+                    {'properties': {'id': 2, 'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_items(datasource) == [
         ('1', 'foo', '1', {'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}}),
         ('2', 'bar', '2', {'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}}),
...

     # a geojson file with empty or no text values
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
+        )
     assert data_sources.get_items(datasource) == [
         ('1', '1', '1', {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}}),
         ('2', '2', '2', {'id': '2', 'text': '2', 'properties': {'id': '2'}}),
...

     # a geojson file with empty or no id
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '', 'text': 'foo'}},
-                {'properties': {'text': 'bar'}},
-                {'properties': {'id': None}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '', 'text': 'foo'}},
+                    {'properties': {'text': 'bar'}},
+                    {'properties': {'id': None}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_items(datasource) == []
     assert data_sources.get_structured_items(datasource) == []

     # specify id_property
     datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'gid'}
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'gid': '1', 'text': 'foo'}},
-                {'properties': {'gid': '2', 'text': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'gid': '1', 'text': 'foo'}},
+                    {'properties': {'gid': '2', 'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': 'foo', 'properties': {'gid': '1', 'text': 'foo'}},
         {'id': '2', 'text': 'bar', 'properties': {'gid': '2', 'text': 'bar'}},
...

     # check with feature IDs
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'id': '1', 'properties': {'text': 'foo'}},
-                {'id': '2', 'properties': {'text': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'id': '1', 'properties': {'text': 'foo'}},
+                    {'id': '2', 'properties': {'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': 'foo', 'properties': {'text': 'foo'}},
         {'id': '2', 'text': 'bar', 'properties': {'text': 'bar'}},
...
         'label_template_property': '{{ id }}: {{ text }}',
     }
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '1', 'text': 'foo'}},
-                {'properties': {'id': '2', 'text': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '1', 'text': 'foo'}},
+                    {'properties': {'id': '2', 'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': '1: foo', 'properties': {'id': '1', 'text': 'foo'}},
         {'id': '2', 'text': '2: bar', 'properties': {'id': '2', 'text': 'bar'}},
...
     # unknown property or empty value
     datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ label }}'}
     get_request().datasources_cache = {}
-    geojson_file = open(geojson_file_path, 'w')
-    json.dump(
-        {
-            'features': [
-                {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
-                {'properties': {'id': '2', 'text': 'bar'}},
-            ]
-        },
-        geojson_file,
-    )
-    geojson_file.close()
+    with open(geojson_file_path, 'w') as geojson_file:
+        json.dump(
+            {
+                'features': [
+                    {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
+                    {'properties': {'id': '2', 'text': 'bar'}},
+                ]
+            },
+            geojson_file,
+        )
     assert data_sources.get_structured_items(datasource) == [
         {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': 'foo', 'label': ''}},
         {'id': '2', 'text': '2', 'properties': {'id': '2', 'text': 'bar'}},
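A note on this file's pattern: each fixture rewrite opens the same path with 'w' (truncating the previous content), and the end of the `with` block flushes the dump before data_sources.get_items() re-reads the file, which is what the explicit close() used to guarantee. The minimal shape of each stanza, for reference (payload and expected are hypothetical placeholders):

    get_request().datasources_cache = {}  # drop the cached parse
    with open(json_file_path, 'w') as json_file:
        json.dump(payload, json_file)     # payload: the case under test
    assert data_sources.get_items(datasource) == expected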
tests/test_ezt.py


 def test_ezt_script(pub):
     os.mkdir(os.path.join(pub.app_dir, 'scripts'))
-    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
-    fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+        fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')

     vars = {'script': ScriptsSubstitutionProxy()}
     template = Template()
tests/test_hobo.py

 def test_deploy():
     cleanup()
     WcsPublisher.APP_DIR = alt_tempdir
-    fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
-    hobo_json = copy.deepcopy(HOBO_JSON)
-    del hobo_json['services'][1]  # authentic
-    fd.write(json.dumps(HOBO_JSON))
-    fd.close()
+    with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
+        hobo_json = copy.deepcopy(HOBO_JSON)
+        del hobo_json['services'][1]  # authentic
+        fd.write(json.dumps(HOBO_JSON))
     hobo_cmd = CmdCheckHobos()
     base_options = {}
     sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
...
 def test_configure_postgresql():
     cleanup()
     WcsPublisher.APP_DIR = alt_tempdir
-    fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
-    hobo_json = copy.deepcopy(HOBO_JSON)
-    del hobo_json['services'][1]  # authentic
-    fd.write(json.dumps(HOBO_JSON))
-    fd.close()
+    with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
+        hobo_json = copy.deepcopy(HOBO_JSON)
+        del hobo_json['services'][1]  # authentic
+        fd.write(json.dumps(HOBO_JSON))

     service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]

...
     )
     assert os.path.exists(os.path.join(alt_tempdir, 'wcs.example.net'))

-    fd = open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w')
-    fd.write('[options]\n')
-    fd.write('postgresql = true\n')
-    fd.close()
+    with open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w') as fd:
+        fd.write('[options]\n')
+        fd.write('postgresql = true\n')

     cleanup()

tests/test_misc.py

     assert variables['script'].hello_world()

     os.mkdir(os.path.join(pub.app_dir, 'scripts'))
-    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
-    fd.write('"""docstring"""\nresult = "hello world"')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+        fd.write('"""docstring"""\nresult = "hello world"')
     assert variables['script'].hello_world() == 'hello world'

     assert Script('hello_world').__doc__ == 'docstring'

     os.mkdir(os.path.join(pub.APP_DIR, 'scripts'))
-    fd = open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w')
-    fd.write('result = "hello global world"')
-    fd.close()
+    with open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w') as fd:
+        fd.write('result = "hello global world"')
     assert variables['script'].hello_world() == 'hello world'

     os.unlink(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'))
     assert variables['script'].hello_world() == 'hello global world'

-    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
-    fd.write('result = site_url')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
+        fd.write('result = site_url')
     assert variables['script'].hello_world() == 'http://example.net'


...
     if not pub.site_options.has_section('options'):
         pub.site_options.add_section('options')
     pub.site_options.set('options', 'default-page-size', '500')
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    pub.site_options.write(fd)
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        pub.site_options.write(fd)
     assert '(1-101/1000)' in get_texts(pagination_links(0, 101, 1000))
     assert '(1-500/1000)' in get_texts(pagination_links(0, 500, 1000))
     assert '(1-500/1000)' in get_texts(pagination_links(0, 501, 1000))  # 500 is the max
...
 def test_find_vc_version():
     import wcs.qommon.admin.menu

-    with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
-
-        def mocked_os_path_exists(path):
-            return bool(not path.endswith('setup.py'))
-
-        os_path_exists.side_effect = mocked_os_path_exists
-
-        def mocked_popen(*args, **kwargs):
-            class Process:
-                returncode = 0
+    def mocked_popen(*args, **kwargs):
+        class Process:
+            returncode = 0

-                def communicate(self):
-                    return (
-                        b'''Desired=Unknown/Install/Remove/Purge/Hold
+            def communicate(self):
+                return (
+                    b'''Desired=Unknown/Install/Remove/Purge/Hold
 | Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
 |/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
 ||/ Name Version Architecture Description
 +++-==============-===============-============-=================================================
 ii wcs 5.71-1~eob100+1 all web application to design and set up online forms
 ''',
-                        '',
-                    )
+                    '',
+                )

-            return Process()
+        return Process()
+
+    with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
+
+        def mocked_os_path_exists(path):
+            return bool(not path.endswith('setup.py'))
+
+        os_path_exists.side_effect = mocked_os_path_exists

-        popen.side_effect = mocked_popen
+        handle = mock.MagicMock()
+        handle.__enter__.side_effect = mocked_popen
+        popen.return_value = handle

         version = wcs.qommon.admin.menu._find_vc_version()
         assert version == 'wcs 5.71-1~eob100+1 (Debian)'
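The test_find_vc_version hunk is not purely mechanical: the code under test now appears to use subprocess.Popen as a context manager, so the patched Popen must return an object whose __enter__ yields the fake process, which is what the MagicMock handle provides. A minimal sketch of the pattern (FakeProcess is a hypothetical stand-in):

    from unittest import mock

    class FakeProcess:
        returncode = 0

        def communicate(self):
            return (b'output', '')

    with mock.patch('subprocess.Popen') as popen:
        handle = mock.MagicMock()
        handle.__enter__.return_value = FakeProcess()
        popen.return_value = handle
        # code doing `with subprocess.Popen(...) as p:` now gets FakeProcess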
tests/test_publisher.py

     pub.write_cfg()

     c = io.BytesIO()
-    z = zipfile.ZipFile(c, 'w')
-    z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
-    z.close()
+    with zipfile.ZipFile(c, 'w') as z:
+        z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
     c.seek(0)

     pub.import_zip(c)
...
     assert pub.cfg['sp'] == {'what': 'ever'}

     c = io.BytesIO()
-    z = zipfile.ZipFile(c, 'w')
-    z.writestr(
-        'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
-    )
-    z.close()
+    with zipfile.ZipFile(c, 'w') as z:
+        z.writestr(
+            'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
+        )
     c.seek(0)

     pub.import_zip(c)
tests/test_saml_auth.py

         'role': lasso.PROVIDER_ROLE_IDP,
     }
     filename = pub.cfg['idp'][base_id]['metadata']
-    fd = open(os.path.join(pub.app_dir, filename), 'w')
-    fd.write(metadata)
-    fd.close()
+    with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+        fd.write(metadata)

     filename = pub.cfg['idp'][base_id]['publickey']
-    fd = open(os.path.join(pub.app_dir, filename), 'w')
-    fd.write(idp_publickey)
-    fd.close()
+    with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+        fd.write(idp_publickey)

     filename = pub.cfg['idp'][base_id]['publickey'].replace('public', 'private')
-    fd = open(os.path.join(pub.app_dir, filename), 'w')
-    fd.write(idp_privatekey)
-    fd.close()
+    with open(os.path.join(pub.app_dir, filename), 'w') as fd:
+        fd.write(idp_privatekey)

     pub.write_cfg()

tests/test_workflows.py

     item.perform(formdata)

     assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
-    zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
-    zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
+    with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
+        zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
     # check the image has been replaced by the one from the formdata
     assert zinfo.file_size == len(image_data)

...

     item.perform(formdata)

-    zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
-    zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
+    with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
+        zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
     # check the original image has been left
     assert zinfo.file_size == 580

...
     fbo1 = formdata.data['bo1']
     assert fbo1.base_filename == 'template.odt'
     assert fbo1.content_type == 'application/octet-stream'
-    zfile = zipfile.ZipFile(fbo1.get_file())
-    assert b'foo-export-to-bofile' in zfile.read('content.xml')
+    with zipfile.ZipFile(fbo1.get_file()) as zfile:
+        assert b'foo-export-to-bofile' in zfile.read('content.xml')

     # no more 'bo1' backoffice field: do nothing
     formdata = formdef.data_class()()
tests/utilities.py

         created = True

     # always reset site-options.cfg
-    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
-    fd.write('[wscall-secrets]\n')
-    fd.write('idp.example.net = BAR\n')
-    fd.write('\n')
-    fd.write('[options]\n')
-    fd.write('formdef-captcha-option = true\n')
-    fd.write('formdef-appearance-keywords = true\n')
-    fd.write('workflow-resubmit-action = true\n')
-    if lazy_mode:
-        fd.write('force-lazy-mode = true\n')
-    if sql_mode:
-        fd.write('postgresql = true\n')
-    fd.close()
+    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
+        fd.write('[wscall-secrets]\n')
+        fd.write('idp.example.net = BAR\n')
+        fd.write('\n')
+        fd.write('[options]\n')
+        fd.write('formdef-captcha-option = true\n')
+        fd.write('formdef-appearance-keywords = true\n')
+        fd.write('workflow-resubmit-action = true\n')
+        if lazy_mode:
+            fd.write('force-lazy-mode = true\n')
+        if sql_mode:
+            fd.write('postgresql = true\n')

     # make sure site options are not cached
     pub.site_options = None
wcs/admin/forms.py

         all_forms = [x for x in all_forms if x.last_update_time < date]

         self.fd = io.BytesIO()
-        t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
-        t.add(self.formdef.get_object_filename(), 'formdef')
-        for formdata in all_forms:
-            t.add(formdata.get_object_filename(), '%s/%s' % (self.formdef.url_name, str(formdata.id)))
-        t.close()
+        with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
+            t.add(self.formdef.get_object_filename(), 'formdef')
+            for formdata in all_forms:
+                t.add(
+                    formdata.get_object_filename(),
+                    '%s/%s' % (self.formdef.url_name, str(formdata.id)),
+                )

         if form.get_widget('keep').parse() is False:
             for f in all_forms:
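Note: for a 'w:gz' archive written into a BytesIO, closing the TarFile is what writes the end-of-archive blocks and the gzip trailer, so the `with` block must end before self.fd is rewound and served. A sketch of the idea (illustrative names and payload):

    import io
    import tarfile

    fd = io.BytesIO()
    with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=fd) as t:
        info = tarfile.TarInfo('formdef')  # entry name inside the archive
        payload = b'<formdef/>'            # illustrative content
        info.size = len(payload)
        t.addfile(info, io.BytesIO(payload))
    data = fd.getvalue()  # complete .tar.gz only after the block exits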
wcs/admin/settings.py


         parent_theme_directory = os.path.dirname(theme_directory)
         c = io.BytesIO()
-        z = zipfile.ZipFile(c, 'w')
-        for base, dummy, filenames in os.walk(theme_directory):
-            basetheme = base[len(parent_theme_directory) + 1 :]
-            for filename in filenames:
-                z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
-        z.close()
+        with zipfile.ZipFile(c, 'w') as z:
+            for base, dummy, filenames in os.walk(theme_directory):
+                basetheme = base[len(parent_theme_directory) + 1 :]
+                for filename in filenames:
+                    z.write(os.path.join(base, filename), os.path.join(basetheme, filename))

         response = get_response()
         response.set_content_type('application/zip')
...

     def install_theme_from_file(self, fp):
         try:
-            z = zipfile.ZipFile(fp, 'r')
+            with zipfile.ZipFile(fp, 'r') as z:
+                theme_dir = os.path.join(get_publisher().app_dir, 'themes')
+                filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
+                if len(filename_list) == 0:
+                    get_session().message = ('error', _('Empty theme file.'))
+                    return redirect('themes')
+                theme_name = filename_list[0].split('/')[0]
+                if ('%s/desc.xml' % theme_name) not in filename_list:
+                    get_session().message = ('error', _('Theme is missing a desc.xml file.'))
+                    return redirect('themes')
+                desc_xml = z.read('%s/desc.xml' % theme_name)
+                theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
+                if theme_dict.get('name') != theme_name:
+                    get_session().message = ('error', _('desc.xml is missing a name attribute.'))
+                    return redirect('themes')
+                if os.path.exists(os.path.join(theme_dir, theme_name)):
+                    shutil.rmtree(os.path.join(theme_dir, theme_name))
+                for f in z.namelist():
+                    if f[-1] == '/':
+                        continue
+                    path = os.path.join(theme_dir, f)
+                    data = z.read(f)
+                    if not os.path.exists(os.path.dirname(path)):
+                        os.makedirs(os.path.dirname(path))
+                    with open(path, 'wb') as _f:
+                        _f.write(data)
+                return redirect('themes')
         except Exception as e:
             get_session().message = ('error', _('Failed to read theme file. (%s)') % str(e))
             return redirect('themes')
-        theme_dir = os.path.join(get_publisher().app_dir, 'themes')
-        filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
-        if len(filename_list) == 0:
-            get_session().message = ('error', _('Empty theme file.'))
-            return redirect('themes')
-        theme_name = filename_list[0].split('/')[0]
-        if ('%s/desc.xml' % theme_name) not in filename_list:
-            get_session().message = ('error', _('Theme is missing a desc.xml file.'))
-            return redirect('themes')
-        desc_xml = z.read('%s/desc.xml' % theme_name)
-        theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
-        if theme_dict.get('name') != theme_name:
-            get_session().message = ('error', _('desc.xml is missing a name attribute.'))
-            return redirect('themes')
-        if os.path.exists(os.path.join(theme_dir, theme_name)):
-            shutil.rmtree(os.path.join(theme_dir, theme_name))
-        for f in z.namelist():
-            if f[-1] == '/':
-                continue
-            path = os.path.join(theme_dir, f)
-            data = z.read(f)
-            if not os.path.exists(os.path.dirname(path)):
-                os.makedirs(os.path.dirname(path))
-            open(path, 'wb').write(data)
-        z.close()
-        return redirect('themes')

     def install_theme_from_url(self, url):
         try:
... | ... | |
1028 | 1027 | |
1029 | 1028 |
def export(self, job): |
1030 | 1029 |
c = io.BytesIO() |
1031 |
z = zipfile.ZipFile(c, 'w') |
|
1032 |
for d in self.dirs: |
|
1033 |
if d not in ( |
|
1034 |
'categories', |
|
1035 |
'carddef_categories', |
|
1036 |
'wscalls', |
|
1037 |
'mail-templates', |
|
1038 |
'apiaccess', |
|
1039 |
): |
|
1040 |
continue |
|
1041 |
path = os.path.join(self.app_dir, d) |
|
1042 |
if not os.path.exists(path): |
|
1043 |
continue |
|
1044 |
for f in os.listdir(path): |
|
1045 |
if f in ('.indexes', '.max_id'): |
|
1030 |
with zipfile.ZipFile(c, 'w') as z: |
|
1031 |
for d in self.dirs: |
|
1032 |
if d not in ( |
|
1033 |
'categories', |
|
1034 |
'carddef_categories', |
|
1035 |
'wscalls', |
|
1036 |
'mail-templates', |
|
1037 |
'apiaccess', |
|
1038 |
): |
|
1046 | 1039 |
continue |
1047 |
z.write(os.path.join(path, f), os.path.join(d, f)) |
|
1048 |
if 'datasources' in self.dirs: |
|
1049 |
for ds in NamedDataSource.select(): |
|
1050 |
if ds.external == 'agenda': |
|
1040 |
path = os.path.join(self.app_dir, d) |
|
1041 |
if not os.path.exists(path): |
|
1051 | 1042 |
continue |
1052 |
node = ds.export_to_xml(include_id=True) |
|
1053 |
misc.indent_xml(node) |
|
1054 |
z.writestr( |
|
1055 |
os.path.join('datasources', str(ds.id)), |
|
1056 |
ET.tostring(node), |
|
1057 |
) |
|
1058 |
if 'formdefs' in self.dirs: |
|
1059 |
for formdef in FormDef.select(): |
|
1060 |
node = formdef.export_to_xml(include_id=True) |
|
1061 |
misc.indent_xml(node) |
|
1062 |
z.writestr( |
|
1063 |
os.path.join('formdefs_xml', str(formdef.id)), |
|
1064 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1065 |
) |
|
1066 |
if 'carddefs' in self.dirs: |
|
1067 |
for formdef in CardDef.select(): |
|
1068 |
node = formdef.export_to_xml(include_id=True) |
|
1069 |
misc.indent_xml(node) |
|
1070 |
z.writestr( |
|
1071 |
os.path.join('carddefs_xml', str(formdef.id)), |
|
1072 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1073 |
) |
|
1074 |
if 'workflows' in self.dirs: |
|
1075 |
for workflow in Workflow.select(): |
|
1076 |
node = workflow.export_to_xml(include_id=True) |
|
1077 |
misc.indent_xml(node) |
|
1078 |
z.writestr( |
|
1079 |
os.path.join('workflows_xml', str(workflow.id)), |
|
1080 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1081 |
) |
|
1082 |
if 'blockdefs' in self.dirs: |
|
1083 |
for blockdef in BlockDef.select(): |
|
1084 |
node = blockdef.export_to_xml(include_id=True) |
|
1085 |
misc.indent_xml(node) |
|
1086 |
z.writestr( |
|
1087 |
os.path.join('blockdefs_xml', str(blockdef.id)), |
|
1088 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1089 |
) |
|
1090 |
if 'roles' in self.dirs: |
|
1091 |
for role in get_publisher().role_class.select(): |
|
1092 |
node = role.export_to_xml(include_id=True) |
|
1093 |
misc.indent_xml(node) |
|
1094 |
z.writestr( |
|
1095 |
os.path.join('roles_xml', str(role.id)), |
|
1096 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1097 |
) |
|
1098 | ||
1099 |
if self.settings: |
|
1100 |
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck') |
|
1101 |
for f in os.listdir(self.app_dir): |
|
1102 |
if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'): |
|
1103 |
z.write(os.path.join(self.app_dir, f), f) |
|
1104 |
if os.path.exists(os.path.join(self.app_dir, 'config')): |
|
1105 |
for f in os.listdir(os.path.join(self.app_dir, 'config')): |
|
1106 |
z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f)) |
|
1107 |
z.close() |
|
1043 |
for f in os.listdir(path): |
|
1044 |
if f in ('.indexes', '.max_id'): |
|
1045 |
continue |
|
1046 |
z.write(os.path.join(path, f), os.path.join(d, f)) |
|
1047 |
if 'datasources' in self.dirs: |
|
1048 |
for ds in NamedDataSource.select(): |
|
1049 |
if ds.external == 'agenda': |
|
1050 |
continue |
|
1051 |
node = ds.export_to_xml(include_id=True) |
|
1052 |
misc.indent_xml(node) |
|
1053 |
z.writestr( |
|
1054 |
os.path.join('datasources', str(ds.id)), |
|
1055 |
ET.tostring(node), |
|
1056 |
) |
|
1057 |
if 'formdefs' in self.dirs: |
|
1058 |
for formdef in FormDef.select(): |
|
1059 |
node = formdef.export_to_xml(include_id=True) |
|
1060 |
misc.indent_xml(node) |
|
1061 |
z.writestr( |
|
1062 |
os.path.join('formdefs_xml', str(formdef.id)), |
|
1063 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1064 |
) |
|
1065 |
if 'carddefs' in self.dirs: |
|
1066 |
for formdef in CardDef.select(): |
|
1067 |
node = formdef.export_to_xml(include_id=True) |
|
1068 |
misc.indent_xml(node) |
|
1069 |
z.writestr( |
|
1070 |
os.path.join('carddefs_xml', str(formdef.id)), |
|
1071 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1072 |
) |
|
1073 |
if 'workflows' in self.dirs: |
|
1074 |
for workflow in Workflow.select(): |
|
1075 |
node = workflow.export_to_xml(include_id=True) |
|
1076 |
misc.indent_xml(node) |
|
1077 |
z.writestr( |
|
1078 |
os.path.join('workflows_xml', str(workflow.id)), |
|
1079 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1080 |
) |
|
1081 |
if 'blockdefs' in self.dirs: |
|
1082 |
for blockdef in BlockDef.select(): |
|
1083 |
node = blockdef.export_to_xml(include_id=True) |
|
1084 |
misc.indent_xml(node) |
|
1085 |
z.writestr( |
|
1086 |
os.path.join('blockdefs_xml', str(blockdef.id)), |
|
1087 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1088 |
) |
|
1089 |
if 'roles' in self.dirs: |
|
1090 |
for role in get_publisher().role_class.select(): |
|
1091 |
node = role.export_to_xml(include_id=True) |
|
1092 |
misc.indent_xml(node) |
|
1093 |
z.writestr( |
|
1094 |
os.path.join('roles_xml', str(role.id)), |
|
1095 |
b'<?xml version="1.0"?>\n' + ET.tostring(node), |
|
1096 |
) |
|
1097 | ||
1098 |
if self.settings: |
|
1099 |
z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck') |
|
1100 |
for f in os.listdir(self.app_dir): |
|
1101 |
if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'): |
|
1102 |
z.write(os.path.join(self.app_dir, f), f) |
|
1103 |
if os.path.exists(os.path.join(self.app_dir, 'config')): |
|
1104 |
for f in os.listdir(os.path.join(self.app_dir, 'config')): |
|
1105 |
z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f)) |
|
1108 | 1106 | |
1109 | 1107 |
job.file_content = c.getvalue() |
1110 | 1108 |
job.store() |
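
The settings.py hunks apply the same treatment to zipfile.ZipFile: the export and theme-install code now runs inside a with block, which guarantees the zip central directory is written (or the reader closed) on every exit path, including the except branch around install_theme_from_file(). A self-contained sketch of the export shape, with a hypothetical directory argument:

    import io
    import os
    import zipfile

    def zip_tree(root_dir):
        c = io.BytesIO()
        # ZipFile is a context manager; closing writes the central directory
        with zipfile.ZipFile(c, 'w') as z:
            for base, _dirs, filenames in os.walk(root_dir):
                for filename in filenames:
                    full = os.path.join(base, filename)
                    z.write(full, os.path.relpath(full, root_dir))
        return c.getvalue()
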
wcs/admin/workflows.py | ||
---|---|---|
275 | 275 |
out = out.getvalue() |
276 | 276 |
if svg: |
277 | 277 |
try: |
278 |
process = Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) |
|
279 |
out = process.communicate(force_bytes(out))[0] |
|
280 |
if process.returncode != 0: |
|
281 |
return '' |
|
278 |
with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process: |
|
279 |
out = process.communicate(force_bytes(out))[0] |
|
280 |
if process.returncode != 0: |
|
281 |
return '' |
|
282 | 282 |
except OSError: |
283 | 283 |
return '' |
284 | 284 |
out = graphviz_post_treatment(out, revert_colours, include=include) |
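
subprocess.Popen has been a context manager since Python 3.2: leaving the block closes the pipes and waits for the child, which is what makes the workflows.py rewrite equivalent to the old code plus proper cleanup. A minimal sketch mirroring the patched call (the OSError guard covers a missing dot binary, as in the original):

    from subprocess import PIPE, Popen

    def render_svg(dot_source: bytes) -> bytes:
        try:
            # on exit, the pipes are closed and the child is waited on
            with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
                out = process.communicate(dot_source)[0]
                if process.returncode != 0:
                    return b''
        except OSError:  # dot not installed
            return b''
        return out
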
wcs/backoffice/management.py | ||
---|---|---|
2997 | 2997 |
def download_as_zip(self): |
2998 | 2998 |
formdata = self.filled |
2999 | 2999 |
zip_content = io.BytesIO() |
3000 |
zip_file = zipfile.ZipFile(zip_content, 'w') |
|
3001 | 3000 |
counter = {'value': 0} |
3002 | 3001 | |
3003 |
def add_zip_file(upload): |
|
3002 |
def add_zip_file(upload, zip_file): |
|
3004 | 3003 |
counter['value'] += 1 |
3005 | 3004 |
filename = '%s_%s' % (counter['value'], upload.base_filename) |
3006 | 3005 |
zip_file.writestr(filename, upload.get_content()) |
3007 | 3006 | |
3008 |
for value in formdata.data.values(): |
|
3009 |
if isinstance(value, PicklableUpload): |
|
3010 |
add_zip_file(value) |
|
3011 |
if isinstance(value, dict) and isinstance(value.get('data'), list): |
|
3012 |
for subvalue in value.get('data'): |
|
3013 |
for subvalue_elem in subvalue.values(): |
|
3014 |
if isinstance(subvalue_elem, PicklableUpload): |
|
3015 |
add_zip_file(subvalue_elem) |
|
3016 | ||
3017 |
zip_file.close() |
|
3007 |
with zipfile.ZipFile(zip_content, 'w') as zip_file: |
|
3008 |
for value in formdata.data.values(): |
|
3009 |
if isinstance(value, PicklableUpload): |
|
3010 |
add_zip_file(value, zip_file) |
|
3011 |
if isinstance(value, dict) and isinstance(value.get('data'), list): |
|
3012 |
for subvalue in value.get('data'): |
|
3013 |
for subvalue_elem in subvalue.values(): |
|
3014 |
if isinstance(subvalue_elem, PicklableUpload): |
|
3015 |
add_zip_file(subvalue_elem, zip_file) |
|
3018 | 3016 | |
3019 | 3017 |
response = get_response() |
3020 | 3018 |
response.set_content_type('application/zip') |
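
Note that the management.py hunk also changes add_zip_file() from a closure over zip_file to a function taking the archive as a parameter; the helper can then be defined before the with block while the ZipFile itself only exists inside it. A reduced sketch of that shape, with invented payloads:

    import io
    import zipfile

    def add_zip_file(content: bytes, name: str, zip_file: zipfile.ZipFile) -> None:
        # the archive is passed in rather than captured from an outer scope
        zip_file.writestr(name, content)

    zip_content = io.BytesIO()
    with zipfile.ZipFile(zip_content, 'w') as zip_file:
        for i, blob in enumerate([b'first', b'second']):
            add_zip_file(blob, '%s.bin' % i, zip_file)
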
wcs/ctl/backup.py | ||
---|---|---|
48 | 48 |
os.mkdir(backup_dir) |
49 | 49 |
backup_filepath = os.path.join(backup_dir, 'backup-%s%s%s-%s%s%s.tar.gz' % time.localtime()[:6]) |
50 | 50 | |
51 |
backup = tarfile.open(backup_filepath, mode='w:gz') |
|
52 |
for basename, dirnames, filenames in os.walk(pub.app_dir): |
|
53 |
if 'backups' in dirnames: # do not recurse in backup directory |
|
54 |
idx = dirnames.index('backups') |
|
55 |
dirnames[idx : idx + 1] = [] |
|
56 |
for filename in filenames: |
|
57 |
backup.add( |
|
58 |
os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :] |
|
59 |
) |
|
60 | ||
61 |
backup.close() |
|
51 |
with tarfile.open(backup_filepath, mode='w:gz') as backup: |
|
52 |
for basename, dirnames, filenames in os.walk(pub.app_dir): |
|
53 |
if 'backups' in dirnames: # do not recurse in backup directory |
|
54 |
idx = dirnames.index('backups') |
|
55 |
dirnames[idx : idx + 1] = [] |
|
56 |
for filename in filenames: |
|
57 |
backup.add( |
|
58 |
os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :] |
|
59 |
) |
|
62 | 60 | |
63 | 61 | |
64 | 62 |
CmdBackup.register() |
wcs/ctl/management/commands/convert_to_sql.py | ||
---|---|---|
100 | 100 |
sql.SqlUser.fix_sequences() |
101 | 101 | |
102 | 102 |
if errors: |
103 |
error_log = open('error_user.log', 'w') |
|
104 |
for user, trace in errors: |
|
105 |
error_log.write('user_id %s\n' % user.id) |
|
106 |
error_log.write(trace) |
|
107 |
error_log.write('-' * 80) |
|
108 |
error_log.write('\n\n') |
|
109 |
error_log.close() |
|
103 |
with open('error_user.log', 'w') as error_log: |
|
104 |
for user, trace in errors: |
|
105 |
error_log.write('user_id %s\n' % user.id) |
|
106 |
error_log.write(trace) |
|
107 |
error_log.write('-' * 80) |
|
108 |
error_log.write('\n\n') |
|
110 | 109 |
print('There were some errors, see error_user.log for details.') |
111 | 110 | |
112 | 111 |
def store_forms(self): |
... | ... | |
138 | 137 |
sql_data_class.fix_sequences() |
139 | 138 | |
140 | 139 |
if errors: |
141 |
error_log = open('error_formdata.log', 'w') |
|
142 |
for formdata, trace in errors: |
|
143 |
error_log.write( |
|
144 |
'%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time)) |
|
145 |
) |
|
146 |
error_log.write(trace) |
|
147 |
error_log.write('-' * 80) |
|
148 |
error_log.write('\n\n') |
|
149 |
error_log.close() |
|
140 |
with open('error_formdata.log', 'w') as error_log: |
|
141 |
for formdata, trace in errors: |
|
142 |
error_log.write( |
|
143 |
'%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time)) |
|
144 |
) |
|
145 |
error_log.write(trace) |
|
146 |
error_log.write('-' * 80) |
|
147 |
error_log.write('\n\n') |
|
150 | 148 |
print('There were some errors, see error_formdata.log.') |
151 | 149 | |
152 | 150 |
def update_progress(self, progress, num_columns=120): |
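
The convert_to_sql.py hunks are the plain-file variant of the fix: with open(...) as error_log flushes and closes the log even if a write raises halfway through. A tiny sketch with invented error tuples:

    errors = [('user_1', 'Traceback ...'), ('user_2', 'Traceback ...')]
    with open('error_user.log', 'w') as error_log:
        for user_id, trace in errors:
            error_log.write('user_id %s\n' % user_id)
            error_log.write(trace)
            error_log.write('-' * 80)
            error_log.write('\n\n')
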
wcs/ctl/restore.py | ||
---|---|---|
50 | 50 |
return 1 |
51 | 51 |
backup_filepath = sub_options.filename |
52 | 52 | |
53 |
backup = tarfile.open(backup_filepath, mode='r:*') |
|
54 |
for tarinfo in backup: |
|
55 |
if os.path.normpath(tarinfo.name).startswith('..'): |
|
56 |
continue |
|
57 |
backup.extract(tarinfo, pub.app_dir) |
|
58 |
backup.close() |
|
53 |
with tarfile.open(backup_filepath, mode='r:*') as backup: |
|
54 |
for tarinfo in backup: |
|
55 |
if os.path.normpath(tarinfo.name).startswith('..'): |
|
56 |
continue |
|
57 |
backup.extract(tarinfo, pub.app_dir) |
|
59 | 58 | |
60 | 59 | |
61 | 60 |
CmdRestore.register() |
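
For reading, as in restore.py, the with statement mostly prevents a leaked file descriptor, but the shape is the same; the pre-existing '..' check against path traversal stays inside the block. Standalone sketch:

    import os
    import tarfile

    def restore(backup_filepath, app_dir):
        with tarfile.open(backup_filepath, mode='r:*') as backup:
            for tarinfo in backup:
                # skip members that would escape the target directory
                if os.path.normpath(tarinfo.name).startswith('..'):
                    continue
                backup.extract(tarinfo, app_dir)
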
wcs/publisher.py | ||
---|---|---|
162 | 162 |
self.set_session_manager(self.session_manager_class(session_class=self.session_class)) |
163 | 163 | |
164 | 164 |
def import_zip(self, fd): |
165 |
z = zipfile.ZipFile(fd) |
|
166 | 165 |
results = { |
167 | 166 |
'formdefs': 0, |
168 | 167 |
'carddefs': 0, |
... | ... | |
203 | 202 |
rv[key] = value |
204 | 203 |
return rv |
205 | 204 | |
206 |
for f in z.namelist(): |
|
207 |
if f in ('.indexes', '.max_id'): |
|
208 |
continue |
|
209 |
if os.path.dirname(f) in ( |
|
210 |
'formdefs_xml', |
|
211 |
'carddefs_xml', |
|
212 |
'workflows_xml', |
|
213 |
'blockdefs_xml', |
|
214 |
'roles_xml', |
|
215 |
): |
|
216 |
continue |
|
217 |
path = os.path.join(self.app_dir, f) |
|
218 |
if not os.path.exists(os.path.dirname(path)): |
|
219 |
os.mkdir(os.path.dirname(path)) |
|
220 |
if not os.path.basename(f): |
|
221 |
# skip directories |
|
222 |
continue |
|
223 |
data = z.read(f) |
|
224 |
if f in ('config.pck', 'config.json'): |
|
225 |
results['settings'] = 1 |
|
226 |
if f == 'config.pck': |
|
227 |
d = pickle.loads(data) |
|
228 |
else: |
|
229 |
d = json.loads(force_text(data), object_hook=_decode_dict) |
|
230 |
if 'sp' in self.cfg: |
|
231 |
current_sp = self.cfg['sp'] |
|
232 |
else: |
|
233 |
current_sp = None |
|
234 |
self.cfg = d |
|
235 |
if current_sp: |
|
236 |
self.cfg['sp'] = current_sp |
|
237 |
elif 'sp' in self.cfg: |
|
238 |
del self.cfg['sp'] |
|
239 |
self.write_cfg() |
|
240 |
continue |
|
241 |
open(path, 'wb').write(data) |
|
242 |
if os.path.split(f)[0] in results: |
|
243 |
results[os.path.split(f)[0]] += 1 |
|
244 | ||
245 |
# second pass, fields blocks |
|
246 |
from wcs.blocks import BlockDef |
|
247 | ||
248 |
for f in z.namelist(): |
|
249 |
if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f): |
|
250 |
blockdef = BlockDef.import_from_xml(z.open(f), include_id=True) |
|
251 |
blockdef.store() |
|
252 |
results['blockdefs'] += 1 |
|
253 | ||
254 |
# third pass, workflows |
|
255 |
from wcs.workflows import Workflow |
|
256 | ||
257 |
for f in z.namelist(): |
|
258 |
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f): |
|
259 |
workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
260 |
workflow.store() |
|
261 |
results['workflows'] += 1 |
|
262 | ||
263 |
# fourth pass, forms and cards |
|
264 |
from wcs.carddef import CardDef |
|
265 |
from wcs.formdef import FormDef |
|
266 | ||
267 |
formdefs = [] |
|
268 |
carddefs = [] |
|
269 |
for f in z.namelist(): |
|
270 |
if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f): |
|
271 |
formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
272 |
formdef.store() |
|
273 |
formdefs.append(formdef) |
|
274 |
results['formdefs'] += 1 |
|
275 |
if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f): |
|
276 |
carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
277 |
carddef.store() |
|
278 |
carddefs.append(carddef) |
|
279 |
results['carddefs'] += 1 |
|
280 | ||
281 |
# sixth pass, roles |
|
282 |
roles = [] |
|
283 |
for f in z.namelist(): |
|
284 |
if os.path.dirname(f) == 'roles_xml' and os.path.basename(f): |
|
285 |
role = self.role_class.import_from_xml(z.open(f), include_id=True) |
|
286 |
role.store() |
|
287 |
roles.append(role) |
|
288 |
results['roles'] += 1 |
|
289 | ||
290 |
# rebuild indexes for imported objects |
|
291 |
for k, v in results.items(): |
|
292 |
if k == 'settings': |
|
293 |
continue |
|
294 |
if v == 0: |
|
295 |
continue |
|
296 |
klass = None |
|
297 |
if k == 'formdefs': |
|
298 |
from .formdef import FormDef |
|
299 | ||
300 |
klass = FormDef |
|
301 |
elif k == 'carddefs': |
|
302 |
from .carddef import CardDef |
|
303 | ||
304 |
klass = CardDef |
|
305 |
elif k == 'blockdefs': |
|
306 |
klass = BlockDef |
|
307 |
elif k == 'categories': |
|
308 |
from .categories import Category |
|
309 | ||
310 |
klass = Category |
|
311 |
elif k == 'roles': |
|
312 |
klass = self.role_class |
|
313 |
elif k == 'workflows': |
|
314 |
klass = Workflow |
|
315 |
if klass: |
|
316 |
klass.rebuild_indexes() |
|
317 | ||
318 |
if k == 'formdefs': |
|
319 |
# in case of formdefs, we store them anew in case SQL changes |
|
320 |
# are required. |
|
321 |
for formdef in formdefs or FormDef.select(): |
|
205 |
with zipfile.ZipFile(fd) as z: |
|
206 |
for f in z.namelist(): |
|
207 |
if f in ('.indexes', '.max_id'): |
|
208 |
continue |
|
209 |
if os.path.dirname(f) in ( |
|
210 |
'formdefs_xml', |
|
211 |
'carddefs_xml', |
|
212 |
'workflows_xml', |
|
213 |
'blockdefs_xml', |
|
214 |
'roles_xml', |
|
215 |
): |
|
216 |
continue |
|
217 |
path = os.path.join(self.app_dir, f) |
|
218 |
if not os.path.exists(os.path.dirname(path)): |
|
219 |
os.mkdir(os.path.dirname(path)) |
|
220 |
if not os.path.basename(f): |
|
221 |
# skip directories |
|
222 |
continue |
|
223 |
data = z.read(f) |
|
224 |
if f in ('config.pck', 'config.json'): |
|
225 |
results['settings'] = 1 |
|
226 |
if f == 'config.pck': |
|
227 |
d = pickle.loads(data) |
|
228 |
else: |
|
229 |
d = json.loads(force_text(data), object_hook=_decode_dict) |
|
230 |
if 'sp' in self.cfg: |
|
231 |
current_sp = self.cfg['sp'] |
|
232 |
else: |
|
233 |
current_sp = None |
|
234 |
self.cfg = d |
|
235 |
if current_sp: |
|
236 |
self.cfg['sp'] = current_sp |
|
237 |
elif 'sp' in self.cfg: |
|
238 |
del self.cfg['sp'] |
|
239 |
self.write_cfg() |
|
240 |
continue |
|
241 |
open(path, 'wb').write(data) |
|
242 |
if os.path.split(f)[0] in results: |
|
243 |
results[os.path.split(f)[0]] += 1 |
|
244 | ||
245 |
# second pass, fields blocks |
|
246 |
from wcs.blocks import BlockDef |
|
247 | ||
248 |
for f in z.namelist(): |
|
249 |
if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f): |
|
250 |
blockdef = BlockDef.import_from_xml(z.open(f), include_id=True) |
|
251 |
blockdef.store() |
|
252 |
results['blockdefs'] += 1 |
|
253 | ||
254 |
# third pass, workflows |
|
255 |
from wcs.workflows import Workflow |
|
256 | ||
257 |
for f in z.namelist(): |
|
258 |
if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f): |
|
259 |
workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
260 |
workflow.store() |
|
261 |
results['workflows'] += 1 |
|
262 | ||
263 |
# fourth pass, forms and cards |
|
264 |
from wcs.carddef import CardDef |
|
265 |
from wcs.formdef import FormDef |
|
266 | ||
267 |
formdefs = [] |
|
268 |
carddefs = [] |
|
269 |
for f in z.namelist(): |
|
270 |
if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f): |
|
271 |
formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
322 | 272 |
formdef.store() |
323 |
elif k == 'carddefs': |
|
324 |
# ditto for cards |
|
325 |
for carddef in carddefs or CardDef.select(): |
|
273 |
formdefs.append(formdef) |
|
274 |
results['formdefs'] += 1 |
|
275 |
if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f): |
|
276 |
carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False) |
|
326 | 277 |
carddef.store() |
278 |
carddefs.append(carddef) |
|
279 |
results['carddefs'] += 1 |
|
280 | ||
281 |
# sixth pass, roles |
|
282 |
roles = [] |
|
283 |
for f in z.namelist(): |
|
284 |
if os.path.dirname(f) == 'roles_xml' and os.path.basename(f): |
|
285 |
role = self.role_class.import_from_xml(z.open(f), include_id=True) |
|
286 |
role.store() |
|
287 |
roles.append(role) |
|
288 |
results['roles'] += 1 |
|
289 | ||
290 |
# rebuild indexes for imported objects |
|
291 |
for k, v in results.items(): |
|
292 |
if k == 'settings': |
|
293 |
continue |
|
294 |
if v == 0: |
|
295 |
continue |
|
296 |
klass = None |
|
297 |
if k == 'formdefs': |
|
298 |
from .formdef import FormDef |
|
299 | ||
300 |
klass = FormDef |
|
301 |
elif k == 'carddefs': |
|
302 |
from .carddef import CardDef |
|
303 | ||
304 |
klass = CardDef |
|
305 |
elif k == 'blockdefs': |
|
306 |
klass = BlockDef |
|
307 |
elif k == 'categories': |
|
308 |
from .categories import Category |
|
309 | ||
310 |
klass = Category |
|
311 |
elif k == 'roles': |
|
312 |
klass = self.role_class |
|
313 |
elif k == 'workflows': |
|
314 |
klass = Workflow |
|
315 |
if klass: |
|
316 |
klass.rebuild_indexes() |
|
317 | ||
318 |
if k == 'formdefs': |
|
319 |
# in case of formdefs, we store them anew in case SQL changes |
|
320 |
# are required. |
|
321 |
for formdef in formdefs or FormDef.select(): |
|
322 |
formdef.store() |
|
323 |
elif k == 'carddefs': |
|
324 |
# ditto for cards |
|
325 |
for carddef in carddefs or CardDef.select(): |
|
326 |
carddef.store() |
|
327 | 327 | |
328 |
z.close() |
|
329 | 328 |
return results |
330 | 329 | |
331 | 330 |
def initialize_sql(self): |
wcs/qommon/admin/menu.py | ||
---|---|---|
45 | 45 |
if os.path.exists('/etc/debian_version'): |
46 | 46 |
# debian case |
47 | 47 |
try: |
48 |
process = subprocess.Popen( |
|
48 |
with subprocess.Popen( |
|
49 | 49 |
['dpkg', '-l', package], stdout=subprocess.PIPE, stderr=subprocess.STDOUT |
50 |
) |
|
51 |
version = process.communicate()[0].splitlines()[-1].split()[2] |
|
52 |
if process.returncode == 0: |
|
53 |
return "%s %s (Debian)" % (package, version.decode()) |
|
50 |
) as process: |
|
51 |
version = process.communicate()[0].splitlines()[-1].split()[2] |
|
52 |
if process.returncode == 0: |
|
53 |
return "%s %s (Debian)" % (package, version.decode())
|
|
54 | 54 |
except Exception: |
55 | 55 |
pass |
56 | 56 |
return None |
... | ... | |
66 | 66 | |
67 | 67 |
if os.path.exists(os.path.join(srcdir, '.git')): |
68 | 68 |
try: |
69 |
process = subprocess.Popen( |
|
69 |
with subprocess.Popen( |
|
70 | 70 |
['git', 'log', '--pretty=oneline', '-1'], stdout=subprocess.PIPE, cwd=srcdir |
71 |
) |
|
72 |
output = process.communicate()[0] |
|
71 |
) as process: |
|
72 |
output = process.communicate()[0] |
|
73 | 73 |
rev = str(output.split()[0].decode('ascii')) |
74 |
process = subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir) |
|
75 |
output = process.communicate()[0] |
|
74 |
with subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir) as process: |
|
75 |
output = process.communicate()[0] |
|
76 | 76 |
starred_line = [x for x in output.splitlines() if x.startswith(b'*')][0] |
77 | 77 |
branch = str(starred_line.split()[1].decode('ascii')) |
78 | 78 |
url = "https://repos.entrouvert.org/%s.git/commit/?id=%s" % (package, rev) |
wcs/qommon/form.py | ||
---|---|---|
660 | 660 |
file_path = self.build_file_path() |
661 | 661 |
if not os.path.exists(self.dir_path()): |
662 | 662 |
os.mkdir(self.dir_path()) |
663 |
open(file_path, 'wb').write(content) |
|
663 |
with open(file_path, 'wb') as f: |
|
664 |
f.write(content) |
|
664 | 665 | |
665 | 666 |
def dir_path(self): |
666 | 667 |
return os.path.join(get_publisher().app_dir, self.directory) |
... | ... | |
669 | 670 |
return os.path.join(get_publisher().app_dir, self.directory, self.filename) |
670 | 671 | |
671 | 672 |
def get_file(self): |
672 |
return open(self.build_file_path(), 'rb') |
|
673 |
return open(self.build_file_path(), 'rb') # pylint: disable=consider-using-with |
|
673 | 674 | |
674 | 675 |
def get_content(self): |
675 | 676 |
return self.get_file().read() |
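
get_file() shows the other half of the cleanup: when a function's contract is to return an open file object, wrapping it in a with block would close it before the caller can read, so the patch silences the checker on that line instead. Sketch of the caller-owns-the-lifetime pattern (path is illustrative):

    def get_file(path):
        # intentionally left open: the caller reads and closes it
        return open(path, 'rb')  # pylint: disable=consider-using-with

    # the caller decides when the file is closed
    with get_file('/etc/hostname') as f:
        data = f.read()
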
wcs/qommon/misc.py | ||
---|---|---|
679 | 679 |
except subprocess.CalledProcessError: |
680 | 680 |
raise ThumbnailError() |
681 | 681 |
else: |
682 |
fp = open(filepath, 'rb') |
|
683 | ||
682 |
fp = open(filepath, 'rb') # pylint: disable=consider-using-with |
|
684 | 683 |
try: |
685 | 684 |
image = Image.open(fp) |
686 | 685 |
try: |
wcs/qommon/ods.py | ||
---|---|---|
157 | 157 |
return ET.tostring(self.get_content_node(), 'utf-8') |
158 | 158 | |
159 | 159 |
def save(self, output): |
160 |
z = zipfile.ZipFile(output, 'w') |
|
161 |
z.writestr('content.xml', self.get_content()) |
|
162 |
z.writestr('styles.xml', self.get_styles()) |
|
163 |
z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet') |
|
164 |
z.writestr( |
|
165 |
'META-INF/manifest.xml', |
|
166 |
'''<?xml version="1.0" encoding="UTF-8"?> |
|
167 |
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0"> |
|
168 |
<manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/> |
|
169 |
<manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/> |
|
170 |
<manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/> |
|
171 |
<manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/> |
|
172 |
<manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/> |
|
173 |
</manifest:manifest>''', |
|
174 |
) |
|
175 |
z.close() |
|
160 |
with zipfile.ZipFile(output, 'w') as z: |
|
161 |
z.writestr('content.xml', self.get_content()) |
|
162 |
z.writestr('styles.xml', self.get_styles()) |
|
163 |
z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet') |
|
164 |
z.writestr( |
|
165 |
'META-INF/manifest.xml', |
|
166 |
'''<?xml version="1.0" encoding="UTF-8"?> |
|
167 |
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0"> |
|
168 |
<manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/> |
|
169 |
<manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/> |
|
170 |
<manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/> |
|
171 |
<manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/> |
|
172 |
<manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/> |
|
173 |
</manifest:manifest>''', |
|
174 |
) |
|
176 | 175 | |
177 | 176 | |
178 | 177 |
class WorkSheet: |
wcs/qommon/storage.py | ||
---|---|---|
501 | 501 |
def get_filename(cls, filename, ignore_errors=False, ignore_migration=False, **kwargs): |
502 | 502 |
fd = None |
503 | 503 |
try: |
504 |
fd = open(force_bytes(filename, 'utf-8'), 'rb') |
|
504 |
fd = open(force_bytes(filename, 'utf-8'), 'rb') # pylint: disable=consider-using-with |
|
505 | 505 |
o = cls.storage_load(fd, **kwargs) |
506 | 506 |
except IOError: |
507 | 507 |
if ignore_errors: |
wcs/qommon/upload_storage.py | ||
---|---|---|
42 | 42 |
return self.__dict__.get('fp') |
43 | 43 |
elif getattr(self, 'qfilename', None): |
44 | 44 |
basedir = os.path.join(get_publisher().app_dir, 'uploads') |
45 |
self.fp = open(os.path.join(basedir, self.qfilename), 'rb') |
|
45 |
self.fp = open(os.path.join(basedir, self.qfilename), 'rb') # pylint: disable=consider-using-with |
|
46 | 46 |
return self.fp |
47 | 47 |
return None |
48 | 48 | |
... | ... | |
125 | 125 |
upload.__class__ = PicklableUpload |
126 | 126 |
dirname = os.path.join(get_publisher().app_dir, 'tempfiles') |
127 | 127 |
filename = os.path.join(dirname, upload.token) |
128 |
fd = open(filename, 'wb') |
|
129 |
upload.get_file_pointer().seek(0) |
|
130 |
fd.write(upload.get_file_pointer().read()) |
|
131 |
upload.size = fd.tell() |
|
132 |
fd.close() |
|
128 |
with open(filename, 'wb') as fd: |
|
129 |
upload.get_file_pointer().seek(0) |
|
130 |
fd.write(upload.get_file_pointer().read()) |
|
131 |
upload.size = fd.tell() |
|
133 | 132 | |
134 | 133 |
def get_tempfile(self, temp_data): |
135 | 134 |
value = PicklableUpload(temp_data['orig_filename'], temp_data['content_type'], temp_data['charset']) |
... | ... | |
138 | 137 |
filename = os.path.join(dirname, temp_data['unsigned_token']) |
139 | 138 |
value.token = temp_data['token'] |
140 | 139 |
value.file_size = os.path.getsize(filename) |
141 |
value.fp = open(filename, 'rb') |
|
140 |
value.fp = open(filename, 'rb') # pylint: disable=consider-using-with |
|
142 | 141 |
return value |
143 | 142 | |
144 | 143 |
def save(self, upload): |
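
In the tempfile hunk above it matters that upload.size = fd.tell() stays inside the with block: once the block exits the descriptor is closed and tell() raises ValueError. Reduced sketch (path is illustrative):

    with open('/tmp/example.bin', 'wb') as fd:
        fd.write(b'payload')
        size = fd.tell()  # must be read before the file is closed
    # calling fd.tell() here would raise ValueError (closed file)
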
wcs/qommon/vendor/locket.py | ||
---|---|---|
65 | 65 | |
66 | 66 | |
67 | 67 |
def lock_file(path, **kwargs): |
68 |
_locks_lock.acquire() |
|
69 |
try: |
|
68 |
with _locks_lock: |
|
70 | 69 |
lock = _locks.get(path) |
71 | 70 |
if lock is None: |
72 | 71 |
lock = _create_lock_file(path, **kwargs) |
73 | 72 |
_locks[path] = lock |
74 | 73 |
return lock |
75 |
finally: |
|
76 |
_locks_lock.release() |
|
77 | 74 | |
78 | 75 | |
79 | 76 |
def _create_lock_file(path, **kwargs): |
... | ... | |
139 | 136 | |
140 | 137 |
def acquire(self): |
141 | 138 |
if self._timeout is None: |
142 |
self._lock.acquire() |
|
139 |
self._lock.acquire() # pylint: disable=consider-using-with |
|
143 | 140 |
else: |
144 | 141 |
_acquire_non_blocking( |
145 |
acquire=lambda: self._lock.acquire(False), |
|
142 |
acquire=lambda: self._lock.acquire(False), # pylint: disable=consider-using-with |
|
146 | 143 |
timeout=self._timeout, |
147 | 144 |
retry_period=self._retry_period, |
148 | 145 |
path=self._path, |
... | ... | |
162 | 159 | |
163 | 160 |
def acquire(self): |
164 | 161 |
if self._file is None: |
165 |
self._file = open(self._path, "w") |
|
162 |
self._file = open(self._path, "w") # pylint: disable=consider-using-with |
|
166 | 163 |
if self._timeout is None: |
167 | 164 |
_lock_file_blocking(self._file) |
168 | 165 |
else: |
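
locket.py mixes both strategies: the acquire()/try/finally/release() dance around _locks_lock collapses into a with statement, since threading locks are context managers, while the bare acquire() calls that implement the lock object itself keep pylint disables. An equivalent sketch, with a stand-in for _create_lock_file():

    import threading

    _locks_lock = threading.Lock()
    _locks = {}

    def lock_file(path):
        # acquire/release handled by the context manager, even on error
        with _locks_lock:
            lock = _locks.get(path)
            if lock is None:
                lock = object()  # stand-in for _create_lock_file(path)
                _locks[path] = lock
            return lock
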
wcs/qommon/x509utils.py | ||
---|---|---|
45 | 45 |
Return a tuple made of the return code and the stdout output |
46 | 46 |
""" |
47 | 47 |
try: |
48 |
process = subprocess.Popen(args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
|
49 |
output = process.communicate()[0] |
|
50 |
return process.returncode, output |
|
48 |
with subprocess.Popen( |
|
49 |
args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT |
|
50 |
) as process: |
|
51 |
output = process.communicate()[0] |
|
52 |
return process.returncode, output |
|
51 | 53 |
except OSError: |
52 | 54 |
return 1, None |
53 | 55 |
wcs/wf/export_to_model.py | ||
---|---|---|
135 | 135 |
format, parse context.xml with element tree and apply process to its root |
136 | 136 |
node. |
137 | 137 |
""" |
138 |
zin = zipfile.ZipFile(instream, mode='r') |
|
139 |
zout = zipfile.ZipFile(outstream, mode='w') |
|
140 |
new_images = {} |
|
141 |
assert 'content.xml' in zin.namelist() |
|
142 |
for filename in zin.namelist(): |
|
143 |
# first pass to process meta.xml, content.xml and styles.xml |
|
144 |
if filename not in ('meta.xml', 'content.xml', 'styles.xml'): |
|
145 |
continue |
|
146 |
content = zin.read(filename) |
|
147 |
root = ET.fromstring(content) |
|
148 |
process(root, new_images) |
|
149 |
content = ET.tostring(root) |
|
150 |
zout.writestr(filename, content) |
|
151 | ||
152 |
for filename in zin.namelist(): |
|
153 |
# second pass to copy/replace other files |
|
154 |
if filename in ('meta.xml', 'content.xml', 'styles.xml'): |
|
155 |
continue |
|
156 |
if filename in new_images: |
|
157 |
content = new_images[filename].get_content() |
|
158 |
else: |
|
138 |
with zipfile.ZipFile(instream, mode='r') as zin, zipfile.ZipFile(outstream, mode='w') as zout: |
|
139 |
new_images = {} |
|
140 |
assert 'content.xml' in zin.namelist() |
|
141 |
for filename in zin.namelist(): |
|
142 |
# first pass to process meta.xml, content.xml and styles.xml |
|
143 |
if filename not in ('meta.xml', 'content.xml', 'styles.xml'): |
|
144 |
continue |
|
159 | 145 |
content = zin.read(filename) |
160 |
zout.writestr(filename, content) |
|
161 |
zout.close() |
|
146 |
root = ET.fromstring(content) |
|
147 |
process(root, new_images) |
|
148 |
content = ET.tostring(root) |
|
149 |
zout.writestr(filename, content) |
|
150 | ||
151 |
for filename in zin.namelist(): |
|
152 |
# second pass to copy/replace other files |
|
153 |
if filename in ('meta.xml', 'content.xml', 'styles.xml'): |
|
154 |
continue |
|
155 |
if filename in new_images: |
|
156 |
content = new_images[filename].get_content() |
|
157 |
else: |
|
158 |
content = zin.read(filename) |
|
159 |
zout.writestr(filename, content) |
|
162 | 160 | |
163 | 161 | |
164 | 162 |
def is_opendocument(stream): |
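
export_to_model.py uses the multi-manager form: the input and output archives are opened in a single with statement and closed in reverse order of entry whatever happens in between. A minimal sketch that copies one in-memory zip into another:

    import io
    import zipfile

    src, dst = io.BytesIO(), io.BytesIO()
    with zipfile.ZipFile(src, 'w') as z:
        z.writestr('content.xml', '<root/>')

    # zout is entered last, so it is closed first on exit
    with zipfile.ZipFile(src, mode='r') as zin, zipfile.ZipFile(dst, mode='w') as zout:
        for name in zin.namelist():
            zout.writestr(name, zin.read(name))
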
wcs/workflows.py | ||
---|---|---|
229 | 229 |
def get_file_pointer(self): |
230 | 230 |
if self.filename.startswith('uuid-'): |
231 | 231 |
return None |
232 |
return open(self.filename, 'rb') |
|
232 |
return open(self.filename, 'rb') # pylint: disable=consider-using-with |
|
233 | 233 | |
234 | 234 |
def __getstate__(self): |
235 | 235 |
odict = self.__dict__.copy() |
236 |
- |