Projet

Général

Profil

0001-misc-fix-pylint-consider-using-with-53406.patch

Lauréline Guérin, 30 avril 2021 10:56

Télécharger (99,4 ko)

Voir les différences:

Subject: [PATCH] misc: fix pylint consider-using-with (#53406)

 pylint.rc                                     |   1 -
 tests/admin_pages/test_all.py                 |  10 +-
 tests/admin_pages/test_datasource.py          |  31 +-
 tests/admin_pages/test_form.py                |  12 +-
 tests/admin_pages/test_settings.py            |  48 ++--
 tests/api/test_custom_view.py                 |   8 +-
 tests/api/test_formdata.py                    |  11 +-
 tests/backoffice_pages/test_all.py            |  62 ++--
 tests/backoffice_pages/test_carddata.py       |  15 +-
 tests/backoffice_pages/test_custom_view.py    |  15 +-
 tests/backoffice_pages/test_export.py         |  29 +-
 tests/backoffice_pages/test_submission.py     |  15 +-
 tests/form_pages/test_all.py                  |  31 +-
 tests/form_pages/test_formdata.py             |  31 +-
 tests/test_categories.py                      |   5 +-
 tests/test_datasource.py                      | 272 ++++++++----------
 tests/test_ezt.py                             |   5 +-
 tests/test_hobo.py                            |  25 +-
 tests/test_misc.py                            |  56 ++--
 tests/test_publisher.py                       |  14 +-
 tests/test_saml_auth.py                       |  15 +-
 tests/test_workflows.py                       |  12 +-
 tests/utilities.py                            |  25 +-
 wcs/admin/forms.py                            |  12 +-
 wcs/admin/settings.py                         | 214 +++++++-------
 wcs/admin/workflows.py                        |   8 +-
 wcs/backoffice/management.py                  |  22 +-
 wcs/ctl/backup.py                             |  20 +-
 wcs/ctl/management/commands/convert_to_sql.py |  30 +-
 wcs/ctl/restore.py                            |  11 +-
 wcs/publisher.py                              | 241 ++++++++--------
 wcs/qommon/admin/menu.py                      |  20 +-
 wcs/qommon/form.py                            |   5 +-
 wcs/qommon/misc.py                            |   3 +-
 wcs/qommon/ods.py                             |  31 +-
 wcs/qommon/storage.py                         |   2 +-
 wcs/qommon/upload_storage.py                  |  13 +-
 wcs/qommon/vendor/locket.py                   |  11 +-
 wcs/qommon/x509utils.py                       |   8 +-
 wcs/wf/export_to_model.py                     |  44 ++-
 wcs/workflows.py                              |   2 +-
 41 files changed, 691 insertions(+), 754 deletions(-)
pylint.rc
13 13
    consider-using-dict-comprehension,
14 14
    consider-using-max-builtin,
15 15
    consider-using-set-comprehension,
16
    consider-using-with,
17 16
    cyclic-import,
18 17
    duplicate-code,
19 18
    fixme,
tests/admin_pages/test_all.py
115 115
        resp = get_app(pub).get('/backoffice/settings/', status=200)
116 116

  
117 117
        # check it doesn't work with a non-empty ADMIN_FOR_ALL file
118
        fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
119
        fd.write('x.x.x.x')
120
        fd.close()
118
        with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
119
            fd.write('x.x.x.x')
121 120
        resp = get_app(pub).get('/backoffice/settings/', status=302)
122 121

  
123 122
        # check it works if the file contains the user IP address
124
        fd = open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w')
125
        fd.write('127.0.0.1')
126
        fd.close()
123
        with open(os.path.join(pub.app_dir, 'ADMIN_FOR_ALL'), 'w') as fd:
124
            fd.write('127.0.0.1')
127 125
        resp = get_app(pub).get('/backoffice/settings/', status=200)
128 126

  
129 127
        # check it's also ok if the user is logged in but doesn't have the
tests/admin_pages/test_datasource.py
251 251

  
252 252
    # check json
253 253
    json_file_path = os.path.join(pub.app_dir, 'test.json')
254
    json_file = open(json_file_path, 'w')
255
    json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
256
    json_file.close()
254
    with open(json_file_path, 'w') as json_file:
255
        json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
257 256

  
258 257
    data_source.data_source = {'type': 'json', 'value': 'file://%s' % json_file_path}
259 258
    data_source.store()
......
263 262
    assert 'foo' in resp.text
264 263

  
265 264
    # with other attributes
266
    json_file = open(json_file_path, 'w')
267
    json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
268
    json_file.close()
265
    with open(json_file_path, 'w') as json_file:
266
        json.dump({'results': [{'pk': '1', 'label': 'foo'}, {'pk': '2'}]}, json_file)
269 267

  
270 268
    data_source.data_attribute = 'results'
271 269
    data_source.id_attribute = 'pk'
......
288 286

  
289 287
    # check geojson
290 288
    geojson_file_path = os.path.join(pub.app_dir, 'test.geojson')
291
    geojson_file = open(geojson_file_path, 'w')
292
    json.dump(
293
        {
294
            'features': [
295
                {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
296
                {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
297
            ]
298
        },
299
        geojson_file,
300
    )
301
    geojson_file.close()
289
    with open(geojson_file_path, 'w') as geojson_file:
290
        json.dump(
291
            {
292
                'features': [
293
                    {'properties': {'id': '1', 'text': 'foo', 'label': 'foo'}},
294
                    {'properties': {'id': '2', 'text': 'bar', 'label': 'bar'}},
295
                ]
296
            },
297
            geojson_file,
298
        )
302 299
    data_source.data_source = {'type': 'geojson', 'value': 'file://%s' % geojson_file_path}
303 300
    data_source.store()
304 301
    with HttpRequestsMocking():
tests/admin_pages/test_form.py
2439 2439
    resp = resp.click(href='archive')
2440 2440
    resp = resp.form.submit('submit')
2441 2441
    assert resp.content_type == 'application/x-wcs-archive'
2442
    tf = tarfile.open(fileobj=io.BytesIO(resp.body))
2443
    assert 'formdef' in [x.name for x in tf.getmembers()]
2444
    assert len(tf.getmembers()) == 8  # 7 formdata + 1 formdef
2442
    with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
2443
        assert 'formdef' in [x.name for x in tf.getmembers()]
2444
        assert len(tf.getmembers()) == 8  # 7 formdata + 1 formdef
2445 2445

  
2446 2446
    # second archive, it shouldn't get anything (but the formdef)
2447 2447
    resp = app.get('/backoffice/forms/1/')
2448 2448
    resp = resp.click(href='archive')
2449 2449
    resp = resp.form.submit('submit')
2450 2450
    assert resp.content_type == 'application/x-wcs-archive'
2451
    tf = tarfile.open(fileobj=io.BytesIO(resp.body))
2452
    assert 'formdef' in [x.name for x in tf.getmembers()]
2453
    assert len(tf.getmembers()) == 1  # 0 formdata + 1 formdef
2451
    with tarfile.open(fileobj=io.BytesIO(resp.body)) as tf:
2452
        assert 'formdef' in [x.name for x in tf.getmembers()]
2453
        assert len(tf.getmembers()) == 1  # 0 formdata + 1 formdef
2454 2454

  
2455 2455

  
2456 2456
def test_form_overwrite(pub):
tests/admin_pages/test_settings.py
115 115
    assert 'completed' in resp.text
116 116
    resp = resp.click('Download Export')
117 117
    zip_content = io.BytesIO(resp.body)
118
    zipf = zipfile.ZipFile(zip_content, 'a')
119
    filelist = zipf.namelist()
118
    with zipfile.ZipFile(zip_content, 'a') as zipf:
119
        filelist = zipf.namelist()
120 120
    assert len(filelist) == 0
121 121

  
122 122
    # check afterjob ajax call
......
166 166
    resp = resp.follow()
167 167
    resp = resp.click('Download Export')
168 168
    zip_content = io.BytesIO(resp.body)
169
    zipf = zipfile.ZipFile(zip_content, 'a')
170
    filelist = zipf.namelist()
169
    with zipfile.ZipFile(zip_content, 'a') as zipf:
170
        filelist = zipf.namelist()
171 171
    assert 'formdefs/1' not in filelist
172 172
    assert 'formdefs_xml/1' in filelist
173 173
    assert 'carddefs/1' not in filelist
......
249 249
    resp = resp.follow()
250 250
    resp = resp.click('Download Export')
251 251
    zip_content = io.BytesIO(resp.body)
252
    zipf = zipfile.ZipFile(zip_content, 'a')
253
    filelist = zipf.namelist()
252
    with zipfile.ZipFile(zip_content, 'a') as zipf:
253
        filelist = zipf.namelist()
254 254
    assert 'formdefs_xml/%s' % formdef.id in filelist
255 255
    assert 'workflows_xml/%s' % workflow.id in filelist
256 256
    assert 'roles_xml/%s' % role.id not in filelist
......
283 283
    resp = resp.follow()
284 284
    resp = resp.click('Download Export')
285 285
    zip_content = io.BytesIO(resp.body)
286
    zipf = zipfile.ZipFile(zip_content, 'a')
287
    filelist = zipf.namelist()
286
    with zipfile.ZipFile(zip_content, 'a') as zipf:
287
        filelist = zipf.namelist()
288 288
    assert len([x for x in filelist if 'roles_xml/' in x]) == 0
289 289

  
290 290
    # check an error is displayed if such an import is then used and roles are
......
305 305
    # create mock theme
306 306
    os.mkdir(os.path.join(pub.app_dir, 'themes'))
307 307
    os.mkdir(os.path.join(pub.app_dir, 'themes', 'test'))
308
    fd = open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w')
309
    fd.write(
310
        '<?xml version="1.0"?>' '<theme name="test" version="1.0">' '  <label>Test Theme</label>' '</theme>'
311
    )
312
    fd.close()
308
    with open(os.path.join(pub.app_dir, 'themes', 'test', 'desc.xml'), 'w') as fd:
309
        fd.write(
310
            '<?xml version="1.0"?>'
311
            '<theme name="test" version="1.0">'
312
            '  <label>Test Theme</label>'
313
            '</theme>'
314
        )
313 315

  
314 316
    resp = app.get('/backoffice/settings/themes')
315 317
    assert 'biglist themes' in resp.text
......
831 833
    resp = app.get('/backoffice/settings/themes')
832 834
    resp = resp.click('download', index=0)
833 835
    assert resp.headers['content-type'] == 'application/zip'
836

  
834 837
    zip_content = io.BytesIO(resp.body)
835
    zipf = zipfile.ZipFile(zip_content, 'a')
836
    filelist = zipf.namelist()
837
    assert 'alto/icon.png' in filelist
838
    assert 'alto/desc.xml' in filelist
839
    assert 'alto/template.ezt' in filelist
840
    assert 'alto/wcs.css' in filelist
841

  
842
    # modify it
843
    zipf.writestr('alto/foobar.txt', 'XXX')
844
    zipf.close()
838
    with zipfile.ZipFile(zip_content, 'a') as zipf:
839
        filelist = zipf.namelist()
840
        assert 'alto/icon.png' in filelist
841
        assert 'alto/desc.xml' in filelist
842
        assert 'alto/template.ezt' in filelist
843
        assert 'alto/wcs.css' in filelist
844

  
845
        # modify it
846
        zipf.writestr('alto/foobar.txt', 'XXX')
845 847

  
846 848
    # upload it
847 849
    resp = app.get('/backoffice/settings/themes')
tests/api/test_custom_view.py
244 244

  
245 245
    # check it now gets the data
246 246
    resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
247
    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
248
    ods_sheet = ET.parse(zipf.open('content.xml'))
247
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
248
        ods_sheet = ET.parse(zipf.open('content.xml'))
249 249
    assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 11
250 250

  
251 251
    pub.custom_view_class.wipe()
......
258 258
    custom_view.store()
259 259

  
260 260
    resp = get_app(pub).get(sign_uri('/api/forms/test/ods/custom-view', user=local_user))
261
    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
262
    ods_sheet = ET.parse(zipf.open('content.xml'))
261
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
262
        ods_sheet = ET.parse(zipf.open('content.xml'))
263 263
    assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 21
264 264

  
265 265

  
tests/api/test_formdata.py
45 45
    pub.cfg['language'] = {'language': 'en'}
46 46
    pub.write_cfg()
47 47

  
48
    open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
49
        '''\
48
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
49
        fd.write(
50
            '''\
50 51
[api-secrets]
51 52
coucou = 1234
52 53
'''
53
    )
54
        )
54 55

  
55 56
    return pub
56 57

  
......
965 966

  
966 967
    resp = get_app(pub).get(sign_uri('/api/forms/test/ods', user=local_user))
967 968
    assert resp.content_type == 'application/vnd.oasis.opendocument.spreadsheet'
968
    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
969
    ods_sheet = ET.parse(zipf.open('content.xml'))
969
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
970
        ods_sheet = ET.parse(zipf.open('content.xml'))
970 971
    assert len(ods_sheet.findall('.//{%s}table-row' % ods.NS['table'])) == 311
971 972

  
972 973

  
tests/backoffice_pages/test_all.py
66 66
    pub.cfg['identification'] = {'methods': ['password']}
67 67
    pub.cfg['language'] = {'language': 'en'}
68 68
    pub.write_cfg()
69
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
70
    fd.write(
71
        '''
72
[api-secrets]
73
coucou = 1234
74
'''
75
    )
76
    fd.close()
69
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
70
        fd.write(
71
            '''
72
    [api-secrets]
73
    coucou = 1234
74
    '''
75
        )
77 76

  
78 77
    return pub
79 78

  
......
528 527
    if not pub.site_options.has_section('options'):
529 528
        pub.site_options.add_section('options')
530 529
        pub.site_options.set('options', 'default-sort-order', '-last_update_time')
531
        fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
532
        pub.site_options.write(fd)
533
        fd.close()
530
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
531
            pub.site_options.write(fd)
534 532

  
535 533
    resp = app.get('/backoffice/management/form-title/')
536 534
    assert resp.text.count('data-link') == 17
......
605 603
    if not pub.site_options.has_section('variables'):
606 604
        pub.site_options.add_section('variables')
607 605
    pub.site_options.set('variables', 'welco_url', 'xxx')
608
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
609
    pub.site_options.write(fd)
610
    fd.close()
606
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
607
        pub.site_options.write(fd)
611 608

  
612 609
    create_superuser(pub)
613 610
    create_environment(pub)
......
1885 1882
        pub.site_options.add_section('options')
1886 1883

  
1887 1884
    pub.site_options.set('options', 'map-tile-urltemplate', 'https://{s}.tile.example.net/{z}/{x}/{y}.png')
1888
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
1889
    pub.site_options.write(fd)
1890
    fd.close()
1885
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
1886
        pub.site_options.write(fd)
1891 1887

  
1892 1888
    resp = app.get('/backoffice/management/form-title/')
1893 1889
    resp = resp.click('Plot on a Map')
......
2085 2081
    resp = app.get('/backoffice/management/form-title/%s/' % number31.id)
2086 2082
    resp = resp.click('Download all files as .zip')
2087 2083
    zip_content = io.BytesIO(resp.body)
2088
    zipf = zipfile.ZipFile(zip_content, 'a')
2089
    filelist = zipf.namelist()
2090
    assert set(filelist) == {'1_bar', '2_bar'}
2091
    for zipinfo in zipf.infolist():
2092
        content = zipf.read(zipinfo)
2093
        if zipinfo.filename == '1_bar':
2094
            assert content == b'hello world'
2095
        elif zipinfo.filename == '2_bar':
2096
            assert content == b'hello world2'
2097
        else:
2098
            assert False  # unknown zip part
2084
    with zipfile.ZipFile(zip_content, 'a') as zipf:
2085
        filelist = zipf.namelist()
2086
        assert set(filelist) == {'1_bar', '2_bar'}
2087
        for zipinfo in zipf.infolist():
2088
            content = zipf.read(zipinfo)
2089
            if zipinfo.filename == '1_bar':
2090
                assert content == b'hello world'
2091
            elif zipinfo.filename == '2_bar':
2092
                assert content == b'hello world2'
2093
            else:
2094
                assert False  # unknown zip part
2099 2095

  
2100 2096

  
2101 2097
def test_backoffice_sidebar_user_template(pub):
......
3011 3007
    if not pub.site_options.has_section('variables'):
3012 3008
        pub.site_options.add_section('variables')
3013 3009
    pub.site_options.set('variables', 'welco_url', 'xxx')
3014
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
3015
    pub.site_options.write(fd)
3016
    fd.close()
3010
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
3011
        pub.site_options.write(fd)
3017 3012

  
3018 3013
    resp = app.get('/backoffice/management/listing?limit=500')
3019 3014
    formdata = formdef.data_class().select(lambda x: x.status == 'wf-new')[0]
......
3611 3606
    if not pub.site_options.has_section('options'):
3612 3607
        pub.site_options.add_section('options')
3613 3608
        pub.site_options.set('options', 'per-user-view', 'true')
3614
        fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
3615
        pub.site_options.write(fd)
3616
        fd.close()
3609
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
3610
            pub.site_options.write(fd)
3617 3611

  
3618 3612
    resp = app.get('/backoffice/management/').follow()
3619 3613
    assert 'Per User View' in resp.text
tests/backoffice_pages/test_carddata.py
35 35
    pub.cfg['identification'] = {'methods': ['password']}
36 36
    pub.cfg['language'] = {'language': 'en'}
37 37
    pub.write_cfg()
38
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
39
    fd.write(
40
        '''
41
[api-secrets]
42
coucou = 1234
43
'''
44
    )
45
    fd.close()
38
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
39
        fd.write(
40
            '''
41
    [api-secrets]
42
    coucou = 1234
43
    '''
44
        )
46 45

  
47 46
    return pub
48 47

  
tests/backoffice_pages/test_custom_view.py
31 31
    pub.cfg['identification'] = {'methods': ['password']}
32 32
    pub.cfg['language'] = {'language': 'en'}
33 33
    pub.write_cfg()
34
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
35
    fd.write(
36
        '''
37
[api-secrets]
38
coucou = 1234
39
'''
40
    )
41
    fd.close()
34
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
35
        fd.write(
36
            '''
37
    [api-secrets]
38
    coucou = 1234
39
    '''
40
        )
42 41

  
43 42
    return pub
44 43

  
tests/backoffice_pages/test_export.py
37 37
    pub.cfg['identification'] = {'methods': ['password']}
38 38
    pub.cfg['language'] = {'language': 'en'}
39 39
    pub.write_cfg()
40
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
41
    fd.write(
42
        '''
43
[api-secrets]
44
coucou = 1234
45
'''
46
    )
47
    fd.close()
40
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
41
        fd.write(
42
            '''
43
    [api-secrets]
44
    coucou = 1234
45
    '''
46
        )
48 47

  
49 48
    return pub
50 49

  
......
236 235
    if not pub.site_options.has_section('variables'):
237 236
        pub.site_options.add_section('variables')
238 237
    pub.site_options.set('variables', 'welco_url', 'xxx')
239
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
240
    pub.site_options.write(fd)
241
    fd.close()
238
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
239
        pub.site_options.write(fd)
242 240

  
243 241
    create_superuser(pub)
244 242

  
......
272 270

  
273 271

  
274 272
def test_backoffice_csv_export_anonymised(pub):
275
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
276
    pub.site_options.write(fd)
277
    fd.close()
273
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
274
        pub.site_options.write(fd)
278 275

  
279 276
    create_superuser(pub)
280 277

  
......
483 480
    assert 'filename=form-title.ods' in resp.headers['content-disposition']
484 481
    assert resp.body[:2] == b'PK'  # ods has a zip container
485 482

  
486
    zipf = zipfile.ZipFile(io.BytesIO(resp.body))
487
    ods_sheet = ET.parse(zipf.open('content.xml'))
483
    with zipfile.ZipFile(io.BytesIO(resp.body)) as zipf:
484
        ods_sheet = ET.parse(zipf.open('content.xml'))
488 485
    # check the ods contains a link to the document
489 486
    elem = ods_sheet.findall('.//{%s}a' % ods.NS['text'])[0]
490 487
    assert (
tests/backoffice_pages/test_submission.py
37 37
    pub.cfg['identification'] = {'methods': ['password']}
38 38
    pub.cfg['language'] = {'language': 'en'}
39 39
    pub.write_cfg()
40
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
41
    fd.write(
42
        '''
43
[api-secrets]
44
coucou = 1234
45
'''
46
    )
47
    fd.close()
40
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
41
        fd.write(
42
            '''
43
    [api-secrets]
44
    coucou = 1234
45
    '''
46
        )
48 47

  
49 48
    return pub
50 49

  
tests/form_pages/test_all.py
56 56

  
57 57

  
58 58
def assert_equal_zip(stream1, stream2):
59
    z1 = zipfile.ZipFile(stream1)
60
    z2 = zipfile.ZipFile(stream2)
61
    assert set(z1.namelist()) == set(z2.namelist())
62
    for name in z1.namelist():
63
        if name == 'styles.xml':
64
            continue
65
        if name in ['content.xml', 'meta.xml']:
66
            t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
67
            try:
68
                # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
69
                t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
70
            except AttributeError:
71
                pass
72
        else:
73
            t1, t2 = z1.read(name), z2.read(name)
74
        assert t1 == t2, 'file "%s" differs' % name
59
    with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
60
        assert set(z1.namelist()) == set(z2.namelist())
61
        for name in z1.namelist():
62
            if name == 'styles.xml':
63
                continue
64
            if name in ['content.xml', 'meta.xml']:
65
                t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
66
                try:
67
                    # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
68
                    t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
69
                except AttributeError:
70
                    pass
71
            else:
72
                t1, t2 = z1.read(name), z2.read(name)
73
            assert t1 == t2, 'file "%s" differs' % name
75 74

  
76 75

  
77 76
def pytest_generate_tests(metafunc):
tests/form_pages/test_formdata.py
60 60

  
61 61

  
62 62
def assert_equal_zip(stream1, stream2):
63
    z1 = zipfile.ZipFile(stream1)
64
    z2 = zipfile.ZipFile(stream2)
65
    assert set(z1.namelist()) == set(z2.namelist())
66
    for name in z1.namelist():
67
        if name == 'styles.xml':
68
            continue
69
        if name in ['content.xml', 'meta.xml']:
70
            t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
71
            try:
72
                # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
73
                t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
74
            except AttributeError:
75
                pass
76
        else:
77
            t1, t2 = z1.read(name), z2.read(name)
78
        assert t1 == t2, 'file "%s" differs' % name
63
    with zipfile.ZipFile(stream1) as z1, zipfile.ZipFile(stream2) as z2:
64
        assert set(z1.namelist()) == set(z2.namelist())
65
        for name in z1.namelist():
66
            if name == 'styles.xml':
67
                continue
68
            if name in ['content.xml', 'meta.xml']:
69
                t1, t2 = ET.tostring(ET.XML(z1.read(name))), ET.tostring(ET.XML(z2.read(name)))
70
                try:
71
                    # >= python 3.8: tostring preserves attribute order; use canonicalize to sort them
72
                    t1, t2 = ET.canonicalize(t1), ET.canonicalize(t2)
73
                except AttributeError:
74
                    pass
75
            else:
76
                t1, t2 = z1.read(name), z2.read(name)
77
            assert t1 == t2, 'file "%s" differs' % name
79 78

  
80 79

  
81 80
def test_formdata_attachment_download(pub):
tests/test_categories.py
123 123
    test.description = 'Hello world'
124 124

  
125 125
    os.mkdir(os.path.join(pub.app_dir, 'categories'))
126
    fd = open(os.path.join(pub.app_dir, 'categories', '1'), 'wb')
127
    pickle.dump(test, fd)
128
    fd.close()
126
    with open(os.path.join(pub.app_dir, 'categories', '1'), 'wb') as fd:
127
        pickle.dump(test, fd)
129 128

  
130 129
    test2 = Category.get(1)
131 130
    assert test.id == test2.id
tests/test_datasource.py
29 29
    req = HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'})
30 30
    pub.set_app_dir(req)
31 31

  
32
    open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w').write(
33
        '''
32
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
33
        fd.write(
34
            '''
34 35
[wscall-secrets]
35 36
api.example.com = 1234
36 37
'''
37
    )
38
        )
38 39

  
39 40
    pub.load_site_options()
40 41

  
......
192 193

  
193 194
    # invalid json file
194 195
    get_request().datasources_cache = {}
195
    json_file = open(json_file_path, 'wb')
196
    json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
197
    json_file.close()
196
    with open(json_file_path, 'wb') as json_file:
197
        json_file.write(codecs.encode(b'foobar', 'zlib_codec'))
198 198
    assert data_sources.get_items(datasource) == []
199 199

  
200 200
    # empty json file
201 201
    get_request().datasources_cache = {}
202
    json_file = open(json_file_path, 'w')
203
    json.dump({}, json_file)
204
    json_file.close()
202
    with open(json_file_path, 'w') as json_file:
203
        json.dump({}, json_file)
205 204
    assert data_sources.get_items(datasource) == []
206 205

  
207 206
    # unrelated json file
208 207
    get_request().datasources_cache = {}
209
    json_file = open(json_file_path, 'w')
210
    json.dump('foobar', json_file)
211
    json_file.close()
208
    with open(json_file_path, 'w') as json_file:
209
        json.dump('foobar', json_file)
212 210
    assert data_sources.get_items(datasource) == []
213 211

  
214 212
    # another unrelated json file
215 213
    get_request().datasources_cache = {}
216
    json_file = open(json_file_path, 'w')
217
    json.dump({'data': 'foobar'}, json_file)
218
    json_file.close()
214
    with open(json_file_path, 'w') as json_file:
215
        json.dump({'data': 'foobar'}, json_file)
219 216
    assert data_sources.get_items(datasource) == []
220 217

  
221 218
    # json file not using dictionaries
222 219
    get_request().datasources_cache = {}
223
    json_file = open(json_file_path, 'w')
224
    json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
225
    json_file.close()
220
    with open(json_file_path, 'w') as json_file:
221
        json.dump({'data': [['1', 'foo'], ['2', 'bar']]}, json_file)
226 222
    assert data_sources.get_items(datasource) == []
227 223

  
228 224
    # a good json file
229 225
    get_request().datasources_cache = {}
230
    json_file = open(json_file_path, 'w')
231
    json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
232
    json_file.close()
226
    with open(json_file_path, 'w') as json_file:
227
        json.dump({'data': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
233 228
    assert data_sources.get_items(datasource) == [
234 229
        ('1', 'foo', '1', {'id': '1', 'text': 'foo'}),
235 230
        ('2', 'bar', '2', {'id': '2', 'text': 'bar'}),
......
241 236

  
242 237
    # a json file with additional keys
243 238
    get_request().datasources_cache = {}
244
    json_file = open(json_file_path, 'w')
245
    json.dump(
246
        {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
247
        json_file,
248
    )
249
    json_file.close()
239
    with open(json_file_path, 'w') as json_file:
240
        json.dump(
241
            {'data': [{'id': '1', 'text': 'foo', 'more': 'xxx'}, {'id': '2', 'text': 'bar', 'more': 'yyy'}]},
242
            json_file,
243
        )
250 244
    assert data_sources.get_items(datasource) == [
251 245
        ('1', 'foo', '1', {'id': '1', 'text': 'foo', 'more': 'xxx'}),
252 246
        ('2', 'bar', '2', {'id': '2', 'text': 'bar', 'more': 'yyy'}),
......
299 293

  
300 294
    # a json file with integer as 'id'
301 295
    get_request().datasources_cache = {}
302
    json_file = open(json_file_path, 'w')
303
    json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
304
    json_file.close()
296
    with open(json_file_path, 'w') as json_file:
297
        json.dump({'data': [{'id': 1, 'text': 'foo'}, {'id': 2, 'text': 'bar'}]}, json_file)
305 298
    assert data_sources.get_items(datasource) == [
306 299
        ('1', 'foo', '1', {'id': 1, 'text': 'foo'}),
307 300
        ('2', 'bar', '2', {'id': 2, 'text': 'bar'}),
......
313 306

  
314 307
    # a json file with empty or no text values
315 308
    get_request().datasources_cache = {}
316
    json_file = open(json_file_path, 'w')
317
    json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
318
    json_file.close()
309
    with open(json_file_path, 'w') as json_file:
310
        json.dump({'data': [{'id': '1', 'text': ''}, {'id': '2'}]}, json_file)
319 311
    assert data_sources.get_items(datasource) == [
320 312
        ('1', '', '1', {'id': '1', 'text': ''}),
321 313
        ('2', '2', '2', {'id': '2', 'text': '2'}),
......
327 319

  
328 320
    # a json file with empty or no id
329 321
    get_request().datasources_cache = {}
330
    json_file = open(json_file_path, 'w')
331
    json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
332
    json_file.close()
322
    with open(json_file_path, 'w') as json_file:
323
        json.dump({'data': [{'id': '', 'text': 'foo'}, {'text': 'bar'}, {'id': None}]}, json_file)
333 324
    assert data_sources.get_items(datasource) == []
334 325
    assert data_sources.get_structured_items(datasource) == []
335 326

  
336 327
    # specify data_attribute
337 328
    datasource = {'type': 'json', 'value': ' {{ json_url }}', 'data_attribute': 'results'}
338 329
    get_request().datasources_cache = {}
339
    json_file = open(json_file_path, 'w')
340
    json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
341
    json_file.close()
330
    with open(json_file_path, 'w') as json_file:
331
        json.dump({'results': [{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]}, json_file)
342 332
    assert data_sources.get_structured_items(datasource) == [
343 333
        {'id': '1', 'text': 'foo'},
344 334
        {'id': '2', 'text': 'bar'},
......
351 341
    # specify id_attribute
352 342
    datasource = {'type': 'json', 'value': ' {{ json_url }}', 'id_attribute': 'pk'}
353 343
    get_request().datasources_cache = {}
354
    json_file = open(json_file_path, 'w')
355
    json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
356
    json_file.close()
344
    with open(json_file_path, 'w') as json_file:
345
        json.dump({'data': [{'pk': '1', 'text': 'foo'}, {'pk': '2', 'text': 'bar'}]}, json_file)
357 346
    assert data_sources.get_structured_items(datasource) == [
358 347
        {'id': '1', 'text': 'foo', 'pk': '1'},
359 348
        {'id': '2', 'text': 'bar', 'pk': '2'},
......
366 355
    # specify text_attribute
367 356
    datasource = {'type': 'json', 'value': ' {{ json_url }}', 'text_attribute': 'label'}
368 357
    get_request().datasources_cache = {}
369
    json_file = open(json_file_path, 'w')
370
    json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
371
    json_file.close()
358
    with open(json_file_path, 'w') as json_file:
359
        json.dump({'data': [{'id': '1', 'label': 'foo'}, {'id': '2', 'label': 'bar'}]}, json_file)
372 360
    assert data_sources.get_structured_items(datasource) == [
373 361
        {'id': '1', 'text': 'foo', 'label': 'foo'},
374 362
        {'id': '2', 'text': 'bar', 'label': 'bar'},
......
508 496

  
509 497
    # invalid geojson file
510 498
    get_request().datasources_cache = {}
511
    geojson_file = open(geojson_file_path, 'wb')
512
    geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
513
    geojson_file.close()
499
    with open(geojson_file_path, 'wb') as geojson_file:
500
        geojson_file.write(codecs.encode(b'foobar', 'zlib_codec'))
514 501
    assert data_sources.get_items(datasource) == []
515 502

  
516 503
    # empty geojson file
517 504
    get_request().datasources_cache = {}
518
    geojson_file = open(geojson_file_path, 'w')
519
    json.dump({}, geojson_file)
520
    geojson_file.close()
505
    with open(geojson_file_path, 'w') as geojson_file:
506
        json.dump({}, geojson_file)
521 507
    assert data_sources.get_items(datasource) == []
522 508

  
523 509
    # unrelated geojson file
524 510
    get_request().datasources_cache = {}
525
    geojson_file = open(geojson_file_path, 'w')
526
    json.dump('foobar', geojson_file)
527
    geojson_file.close()
511
    with open(geojson_file_path, 'w') as geojson_file:
512
        json.dump('foobar', geojson_file)
528 513
    assert data_sources.get_items(datasource) == []
529 514

  
530 515
    # another unrelated geojson file
531 516
    get_request().datasources_cache = {}
532
    geojson_file = open(geojson_file_path, 'w')
533
    json.dump({'features': 'foobar'}, geojson_file)
534
    geojson_file.close()
517
    with open(geojson_file_path, 'w') as geojson_file:
518
        json.dump({'features': 'foobar'}, geojson_file)
535 519
    assert data_sources.get_items(datasource) == []
536 520

  
537 521
    # a good geojson file
538 522
    get_request().datasources_cache = {}
539
    geojson_file = open(geojson_file_path, 'w')
540
    json.dump(
541
        {
542
            'features': [
543
                {'properties': {'id': '1', 'text': 'foo'}},
544
                {'properties': {'id': '2', 'text': 'bar'}},
545
            ]
546
        },
547
        geojson_file,
548
    )
549
    geojson_file.close()
523
    with open(geojson_file_path, 'w') as geojson_file:
524
        json.dump(
525
            {
526
                'features': [
527
                    {'properties': {'id': '1', 'text': 'foo'}},
528
                    {'properties': {'id': '2', 'text': 'bar'}},
529
                ]
530
            },
531
            geojson_file,
532
        )
550 533
    assert data_sources.get_items(datasource) == [
551 534
        ('1', 'foo', '1', {'id': '1', 'text': 'foo', 'properties': {'id': '1', 'text': 'foo'}}),
552 535
        ('2', 'bar', '2', {'id': '2', 'text': 'bar', 'properties': {'id': '2', 'text': 'bar'}}),
......
558 541

  
559 542
    # a geojson file with additional keys
560 543
    get_request().datasources_cache = {}
561
    geojson_file = open(geojson_file_path, 'w')
562
    json.dump(
563
        {
564
            'features': [
565
                {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
566
                {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
567
            ]
568
        },
569
        geojson_file,
570
    )
571
    geojson_file.close()
544
    with open(geojson_file_path, 'w') as geojson_file:
545
        json.dump(
546
            {
547
                'features': [
548
                    {'properties': {'id': '1', 'text': 'foo', 'more': 'xxx'}},
549
                    {'properties': {'id': '2', 'text': 'bar', 'more': 'yyy'}},
550
                ]
551
            },
552
            geojson_file,
553
        )
572 554
    assert data_sources.get_items(datasource) == [
573 555
        (
574 556
            '1',
......
671 653

  
672 654
    # a geojson file with integer as 'id'
673 655
    get_request().datasources_cache = {}
674
    geojson_file = open(geojson_file_path, 'w')
675
    json.dump(
676
        {'features': [{'properties': {'id': 1, 'text': 'foo'}}, {'properties': {'id': 2, 'text': 'bar'}}]},
677
        geojson_file,
678
    )
679
    geojson_file.close()
656
    with open(geojson_file_path, 'w') as geojson_file:
657
        json.dump(
658
            {
659
                'features': [
660
                    {'properties': {'id': 1, 'text': 'foo'}},
661
                    {'properties': {'id': 2, 'text': 'bar'}},
662
                ]
663
            },
664
            geojson_file,
665
        )
680 666
    assert data_sources.get_items(datasource) == [
681 667
        ('1', 'foo', '1', {'id': 1, 'text': 'foo', 'properties': {'id': 1, 'text': 'foo'}}),
682 668
        ('2', 'bar', '2', {'id': 2, 'text': 'bar', 'properties': {'id': 2, 'text': 'bar'}}),
......
688 674

  
689 675
    # a geojson file with empty or no text values
690 676
    get_request().datasources_cache = {}
691
    geojson_file = open(geojson_file_path, 'w')
692
    json.dump(
693
        {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
694
    )
695
    geojson_file.close()
677
    with open(geojson_file_path, 'w') as geojson_file:
678
        json.dump(
679
            {'features': [{'properties': {'id': '1', 'text': ''}}, {'properties': {'id': '2'}}]}, geojson_file
680
        )
696 681
    assert data_sources.get_items(datasource) == [
697 682
        ('1', '1', '1', {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': ''}}),
698 683
        ('2', '2', '2', {'id': '2', 'text': '2', 'properties': {'id': '2'}}),
......
704 689

  
705 690
    # a geojson file with empty or no id
706 691
    get_request().datasources_cache = {}
707
    geojson_file = open(geojson_file_path, 'w')
708
    json.dump(
709
        {
710
            'features': [
711
                {'properties': {'id': '', 'text': 'foo'}},
712
                {'properties': {'text': 'bar'}},
713
                {'properties': {'id': None}},
714
            ]
715
        },
716
        geojson_file,
717
    )
718
    geojson_file.close()
692
    with open(geojson_file_path, 'w') as geojson_file:
693
        json.dump(
694
            {
695
                'features': [
696
                    {'properties': {'id': '', 'text': 'foo'}},
697
                    {'properties': {'text': 'bar'}},
698
                    {'properties': {'id': None}},
699
                ]
700
            },
701
            geojson_file,
702
        )
719 703
    assert data_sources.get_items(datasource) == []
720 704
    assert data_sources.get_structured_items(datasource) == []
721 705

  
722 706
    # specify id_property
723 707
    datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'id_property': 'gid'}
724 708
    get_request().datasources_cache = {}
725
    geojson_file = open(geojson_file_path, 'w')
726
    json.dump(
727
        {
728
            'features': [
729
                {'properties': {'gid': '1', 'text': 'foo'}},
730
                {'properties': {'gid': '2', 'text': 'bar'}},
731
            ]
732
        },
733
        geojson_file,
734
    )
735
    geojson_file.close()
709
    with open(geojson_file_path, 'w') as geojson_file:
710
        json.dump(
711
            {
712
                'features': [
713
                    {'properties': {'gid': '1', 'text': 'foo'}},
714
                    {'properties': {'gid': '2', 'text': 'bar'}},
715
                ]
716
            },
717
            geojson_file,
718
        )
736 719
    assert data_sources.get_structured_items(datasource) == [
737 720
        {'id': '1', 'text': 'foo', 'properties': {'gid': '1', 'text': 'foo'}},
738 721
        {'id': '2', 'text': 'bar', 'properties': {'gid': '2', 'text': 'bar'}},
......
745 728

  
746 729
    # check with feature IDs
747 730
    get_request().datasources_cache = {}
748
    geojson_file = open(geojson_file_path, 'w')
749
    json.dump(
750
        {
751
            'features': [
752
                {'id': '1', 'properties': {'text': 'foo'}},
753
                {'id': '2', 'properties': {'text': 'bar'}},
754
            ]
755
        },
756
        geojson_file,
757
    )
758
    geojson_file.close()
731
    with open(geojson_file_path, 'w') as geojson_file:
732
        json.dump(
733
            {
734
                'features': [
735
                    {'id': '1', 'properties': {'text': 'foo'}},
736
                    {'id': '2', 'properties': {'text': 'bar'}},
737
                ]
738
            },
739
            geojson_file,
740
        )
759 741
    assert data_sources.get_structured_items(datasource) == [
760 742
        {'id': '1', 'text': 'foo', 'properties': {'text': 'foo'}},
761 743
        {'id': '2', 'text': 'bar', 'properties': {'text': 'bar'}},
......
768 750
        'label_template_property': '{{ id }}: {{ text }}',
769 751
    }
770 752
    get_request().datasources_cache = {}
771
    geojson_file = open(geojson_file_path, 'w')
772
    json.dump(
773
        {
774
            'features': [
775
                {'properties': {'id': '1', 'text': 'foo'}},
776
                {'properties': {'id': '2', 'text': 'bar'}},
777
            ]
778
        },
779
        geojson_file,
780
    )
781
    geojson_file.close()
753
    with open(geojson_file_path, 'w') as geojson_file:
754
        json.dump(
755
            {
756
                'features': [
757
                    {'properties': {'id': '1', 'text': 'foo'}},
758
                    {'properties': {'id': '2', 'text': 'bar'}},
759
                ]
760
            },
761
            geojson_file,
762
        )
782 763
    assert data_sources.get_structured_items(datasource) == [
783 764
        {'id': '1', 'text': '1: foo', 'properties': {'id': '1', 'text': 'foo'}},
784 765
        {'id': '2', 'text': '2: bar', 'properties': {'id': '2', 'text': 'bar'}},
......
801 782
    # unknown property or empty value
802 783
    datasource = {'type': 'geojson', 'value': ' {{ geojson_url }}', 'label_template_property': '{{ label }}'}
803 784
    get_request().datasources_cache = {}
804
    geojson_file = open(geojson_file_path, 'w')
805
    json.dump(
806
        {
807
            'features': [
808
                {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
809
                {'properties': {'id': '2', 'text': 'bar'}},
810
            ]
811
        },
812
        geojson_file,
813
    )
814
    geojson_file.close()
785
    with open(geojson_file_path, 'w') as geojson_file:
786
        json.dump(
787
            {
788
                'features': [
789
                    {'properties': {'id': '1', 'text': 'foo', 'label': ''}},
790
                    {'properties': {'id': '2', 'text': 'bar'}},
791
                ]
792
            },
793
            geojson_file,
794
        )
815 795
    assert data_sources.get_structured_items(datasource) == [
816 796
        {'id': '1', 'text': '1', 'properties': {'id': '1', 'text': 'foo', 'label': ''}},
817 797
        {'id': '2', 'text': '2', 'properties': {'id': '2', 'text': 'bar'}},
tests/test_ezt.py
217 217

  
218 218
def test_ezt_script(pub):
219 219
    os.mkdir(os.path.join(pub.app_dir, 'scripts'))
220
    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
221
    fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
222
    fd.close()
220
    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
221
        fd.write('''result = "Hello %s" % ("world" if not args else args[0])''')
223 222

  
224 223
    vars = {'script': ScriptsSubstitutionProxy()}
225 224
    template = Template()
tests/test_hobo.py
375 375
def test_deploy():
376 376
    cleanup()
377 377
    WcsPublisher.APP_DIR = alt_tempdir
378
    fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
379
    hobo_json = copy.deepcopy(HOBO_JSON)
380
    del hobo_json['services'][1]  # authentic
381
    fd.write(json.dumps(HOBO_JSON))
382
    fd.close()
378
    with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
379
        hobo_json = copy.deepcopy(HOBO_JSON)
380
        del hobo_json['services'][1]  # authentic
381
        fd.write(json.dumps(HOBO_JSON))
383 382
    hobo_cmd = CmdCheckHobos()
384 383
    base_options = {}
385 384
    sub_options_class = collections.namedtuple('Options', ['ignore_timestamp', 'redeploy', 'extra'])
......
406 405
def test_configure_postgresql():
407 406
    cleanup()
408 407
    WcsPublisher.APP_DIR = alt_tempdir
409
    fd = open(os.path.join(alt_tempdir, 'hobo.json'), 'w')
410
    hobo_json = copy.deepcopy(HOBO_JSON)
411
    del hobo_json['services'][1]  # authentic
412
    fd.write(json.dumps(HOBO_JSON))
413
    fd.close()
408
    with open(os.path.join(alt_tempdir, 'hobo.json'), 'w') as fd:
409
        hobo_json = copy.deepcopy(HOBO_JSON)
410
        del hobo_json['services'][1]  # authentic
411
        fd.write(json.dumps(HOBO_JSON))
414 412

  
415 413
    service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
416 414

  
......
424 422
    )
425 423
    assert os.path.exists(os.path.join(alt_tempdir, 'wcs.example.net'))
426 424

  
427
    fd = open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w')
428
    fd.write('[options]\n')
429
    fd.write('postgresql = true\n')
430
    fd.close()
425
    with open(os.path.join(alt_tempdir, 'wcs.example.net', 'site-options.cfg'), 'w') as fd:
426
        fd.write('[options]\n')
427
        fd.write('postgresql = true\n')
431 428

  
432 429
    cleanup()
433 430

  
tests/test_misc.py
209 209
        assert variables['script'].hello_world()
210 210

  
211 211
    os.mkdir(os.path.join(pub.app_dir, 'scripts'))
212
    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
213
    fd.write('"""docstring"""\nresult = "hello world"')
214
    fd.close()
212
    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
213
        fd.write('"""docstring"""\nresult = "hello world"')
215 214
    assert variables['script'].hello_world() == 'hello world'
216 215

  
217 216
    assert Script('hello_world').__doc__ == 'docstring'
218 217

  
219 218
    os.mkdir(os.path.join(pub.APP_DIR, 'scripts'))
220
    fd = open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w')
221
    fd.write('result = "hello global world"')
222
    fd.close()
219
    with open(os.path.join(pub.APP_DIR, 'scripts', 'hello_world.py'), 'w') as fd:
220
        fd.write('result = "hello global world"')
223 221
    assert variables['script'].hello_world() == 'hello world'
224 222

  
225 223
    os.unlink(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'))
226 224
    assert variables['script'].hello_world() == 'hello global world'
227 225

  
228
    fd = open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w')
229
    fd.write('result = site_url')
230
    fd.close()
226
    with open(os.path.join(pub.app_dir, 'scripts', 'hello_world.py'), 'w') as fd:
227
        fd.write('result = site_url')
231 228
    assert variables['script'].hello_world() == 'http://example.net'
232 229

  
233 230

  
......
404 401
    if not pub.site_options.has_section('options'):
405 402
        pub.site_options.add_section('options')
406 403
    pub.site_options.set('options', 'default-page-size', '500')
407
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
408
    pub.site_options.write(fd)
409
    fd.close()
404
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
405
        pub.site_options.write(fd)
410 406
    assert '(1-101/1000)' in get_texts(pagination_links(0, 101, 1000))
411 407
    assert '(1-500/1000)' in get_texts(pagination_links(0, 500, 1000))
412 408
    assert '(1-500/1000)' in get_texts(pagination_links(0, 501, 1000))  # 500 is the max
......
705 701
def test_find_vc_version():
706 702
    import wcs.qommon.admin.menu
707 703

  
708
    with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
709

  
710
        def mocked_os_path_exists(path):
711
            return bool(not path.endswith('setup.py'))
712

  
713
        os_path_exists.side_effect = mocked_os_path_exists
714

  
715
        def mocked_popen(*args, **kwargs):
716
            class Process:
717
                returncode = 0
704
    def mocked_popen(*args, **kwargs):
705
        class Process:
706
            returncode = 0
718 707

  
719
                def communicate(self):
720
                    return (
721
                        b'''Desired=Unknown/Install/Remove/Purge/Hold
708
            def communicate(self):
709
                return (
710
                    b'''Desired=Unknown/Install/Remove/Purge/Hold
722 711
| Status=Not/Inst/Conf-files/Unpacked/halF-conf/Half-inst/trig-aWait/Trig-pend
723 712
|/ Err?=(none)/Reinst-required (Status,Err: uppercase=bad)
724 713
||/ Name           Version         Architecture Description
725 714
+++-==============-===============-============-=================================================
726 715
ii  wcs            5.71-1~eob100+1 all          web application to design and set up online forms
727 716
''',
728
                        '',
729
                    )
717
                    '',
718
                )
730 719

  
731
            return Process()
720
        return Process()
721

  
722
    with mock.patch('os.path.exists') as os_path_exists, mock.patch('subprocess.Popen') as popen:
723

  
724
        def mocked_os_path_exists(path):
725
            return bool(not path.endswith('setup.py'))
726

  
727
        os_path_exists.side_effect = mocked_os_path_exists
732 728

  
733
        popen.side_effect = mocked_popen
729
        handle = mock.MagicMock()
730
        handle.__enter__.side_effect = mocked_popen
731
        popen.return_value = handle
734 732

  
735 733
        version = wcs.qommon.admin.menu._find_vc_version()
736 734
        assert version == 'wcs 5.71-1~eob100+1 (Debian)'
tests/test_publisher.py
177 177
    pub.write_cfg()
178 178

  
179 179
    c = io.BytesIO()
180
    z = zipfile.ZipFile(c, 'w')
181
    z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
182
    z.close()
180
    with zipfile.ZipFile(c, 'w') as z:
181
        z.writestr('config.pck', pickle.dumps({'language': {'language': 'fr'}, 'whatever': ['a', 'b', 'c']}))
183 182
    c.seek(0)
184 183

  
185 184
    pub.import_zip(c)
......
188 187
    assert pub.cfg['sp'] == {'what': 'ever'}
189 188

  
190 189
    c = io.BytesIO()
191
    z = zipfile.ZipFile(c, 'w')
192
    z.writestr(
193
        'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
194
    )
195
    z.close()
190
    with zipfile.ZipFile(c, 'w') as z:
191
        z.writestr(
192
            'config.json', json.dumps({'language': {'language': 'en'}, 'whatever2': ['a', 'b', {'c': 'd'}]})
193
        )
196 194
    c.seek(0)
197 195

  
198 196
    pub.import_zip(c)
tests/test_saml_auth.py
81 81
            'role': lasso.PROVIDER_ROLE_IDP,
82 82
        }
83 83
        filename = pub.cfg['idp'][base_id]['metadata']
84
        fd = open(os.path.join(pub.app_dir, filename), 'w')
85
        fd.write(metadata)
86
        fd.close()
84
        with open(os.path.join(pub.app_dir, filename), 'w') as fd:
85
            fd.write(metadata)
87 86

  
88 87
        filename = pub.cfg['idp'][base_id]['publickey']
89
        fd = open(os.path.join(pub.app_dir, filename), 'w')
90
        fd.write(idp_publickey)
91
        fd.close()
88
        with open(os.path.join(pub.app_dir, filename), 'w') as fd:
89
            fd.write(idp_publickey)
92 90

  
93 91
        filename = pub.cfg['idp'][base_id]['publickey'].replace('public', 'private')
94
        fd = open(os.path.join(pub.app_dir, filename), 'w')
95
        fd.write(idp_privatekey)
96
        fd.close()
92
        with open(os.path.join(pub.app_dir, filename), 'w') as fd:
93
            fd.write(idp_privatekey)
97 94

  
98 95
    pub.write_cfg()
99 96

  
tests/test_workflows.py
3506 3506
    item.perform(formdata)
3507 3507

  
3508 3508
    assert formdata.evolution[-1].parts[-1].base_filename == 'template.odt'
3509
    zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
3510
    zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
3509
    with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
3510
        zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
3511 3511
    # check the image has been replaced by the one from the formdata
3512 3512
    assert zinfo.file_size == len(image_data)
3513 3513

  
......
3521 3521

  
3522 3522
        item.perform(formdata)
3523 3523

  
3524
        zfile = zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r')
3525
        zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
3524
        with zipfile.ZipFile(formdata.evolution[-1].parts[0].filename, mode='r') as zfile:
3525
            zinfo = zfile.getinfo('Pictures/10000000000000320000003276E9D46581B55C88.jpg')
3526 3526
        # check the original image has been left
3527 3527
        assert zinfo.file_size == 580
3528 3528

  
......
3574 3574
    fbo1 = formdata.data['bo1']
3575 3575
    assert fbo1.base_filename == 'template.odt'
3576 3576
    assert fbo1.content_type == 'application/octet-stream'
3577
    zfile = zipfile.ZipFile(fbo1.get_file())
3578
    assert b'foo-export-to-bofile' in zfile.read('content.xml')
3577
    with zipfile.ZipFile(fbo1.get_file()) as zfile:
3578
        assert b'foo-export-to-bofile' in zfile.read('content.xml')
3579 3579

  
3580 3580
    # no more 'bo1' backoffice field: do nothing
3581 3581
    formdata = formdef.data_class()()
tests/utilities.py
108 108
        created = True
109 109

  
110 110
    # always reset site-options.cfg
111
    fd = open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w')
112
    fd.write('[wscall-secrets]\n')
113
    fd.write('idp.example.net = BAR\n')
114
    fd.write('\n')
115
    fd.write('[options]\n')
116
    fd.write('formdef-captcha-option = true\n')
117
    fd.write('formdef-appearance-keywords = true\n')
118
    fd.write('workflow-resubmit-action = true\n')
119
    if lazy_mode:
120
        fd.write('force-lazy-mode = true\n')
121
    if sql_mode:
122
        fd.write('postgresql = true\n')
123
    fd.close()
111
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
112
        fd.write('[wscall-secrets]\n')
113
        fd.write('idp.example.net = BAR\n')
114
        fd.write('\n')
115
        fd.write('[options]\n')
116
        fd.write('formdef-captcha-option = true\n')
117
        fd.write('formdef-appearance-keywords = true\n')
118
        fd.write('workflow-resubmit-action = true\n')
119
        if lazy_mode:
120
            fd.write('force-lazy-mode = true\n')
121
        if sql_mode:
122
            fd.write('postgresql = true\n')
124 123

  
125 124
    # make sure site options are not cached
126 125
    pub.site_options = None
wcs/admin/forms.py
1519 1519
                    all_forms = [x for x in all_forms if x.last_update_time < date]
1520 1520

  
1521 1521
                self.fd = io.BytesIO()
1522
                t = tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd)
1523
                t.add(self.formdef.get_object_filename(), 'formdef')
1524
                for formdata in all_forms:
1525
                    t.add(formdata.get_object_filename(), '%s/%s' % (self.formdef.url_name, str(formdata.id)))
1526
                t.close()
1522
                with tarfile.open('wcs.tar.gz', 'w:gz', fileobj=self.fd) as t:
1523
                    t.add(self.formdef.get_object_filename(), 'formdef')
1524
                    for formdata in all_forms:
1525
                        t.add(
1526
                            formdata.get_object_filename(),
1527
                            '%s/%s' % (self.formdef.url_name, str(formdata.id)),
1528
                        )
1527 1529

  
1528 1530
                if form.get_widget('keep').parse() is False:
1529 1531
                    for f in all_forms:
wcs/admin/settings.py
848 848

  
849 849
        parent_theme_directory = os.path.dirname(theme_directory)
850 850
        c = io.BytesIO()
851
        z = zipfile.ZipFile(c, 'w')
852
        for base, dummy, filenames in os.walk(theme_directory):
853
            basetheme = base[len(parent_theme_directory) + 1 :]
854
            for filename in filenames:
855
                z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
856
        z.close()
851
        with zipfile.ZipFile(c, 'w') as z:
852
            for base, dummy, filenames in os.walk(theme_directory):
853
                basetheme = base[len(parent_theme_directory) + 1 :]
854
                for filename in filenames:
855
                    z.write(os.path.join(base, filename), os.path.join(basetheme, filename))
857 856

  
858 857
        response = get_response()
859 858
        response.set_content_type('application/zip')
......
896 895

  
897 896
    def install_theme_from_file(self, fp):
898 897
        try:
899
            z = zipfile.ZipFile(fp, 'r')
898
            with zipfile.ZipFile(fp, 'r') as z:
899
                theme_dir = os.path.join(get_publisher().app_dir, 'themes')
900
                filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
901
                if len(filename_list) == 0:
902
                    get_session().message = ('error', _('Empty theme file.'))
903
                    return redirect('themes')
904
                theme_name = filename_list[0].split('/')[0]
905
                if ('%s/desc.xml' % theme_name) not in filename_list:
906
                    get_session().message = ('error', _('Theme is missing a desc.xml file.'))
907
                    return redirect('themes')
908
                desc_xml = z.read('%s/desc.xml' % theme_name)
909
                theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
910
                if theme_dict.get('name') != theme_name:
911
                    get_session().message = ('error', _('desc.xml is missing a name attribute.'))
912
                    return redirect('themes')
913
                if os.path.exists(os.path.join(theme_dir, theme_name)):
914
                    shutil.rmtree(os.path.join(theme_dir, theme_name))
915
                for f in z.namelist():
916
                    if f[-1] == '/':
917
                        continue
918
                    path = os.path.join(theme_dir, f)
919
                    data = z.read(f)
920
                    if not os.path.exists(os.path.dirname(path)):
921
                        os.makedirs(os.path.dirname(path))
922
                    with open(path, 'wb') as _f:
923
                        _f.write(data)
924
                return redirect('themes')
900 925
        except Exception as e:
901 926
            get_session().message = ('error', _('Failed to read theme file.  (%s)') % str(e))
902 927
            return redirect('themes')
903
        theme_dir = os.path.join(get_publisher().app_dir, 'themes')
904
        filename_list = [x for x in z.namelist() if x[0] != '/' and x[-1] != '/']
905
        if len(filename_list) == 0:
906
            get_session().message = ('error', _('Empty theme file.'))
907
            return redirect('themes')
908
        theme_name = filename_list[0].split('/')[0]
909
        if ('%s/desc.xml' % theme_name) not in filename_list:
910
            get_session().message = ('error', _('Theme is missing a desc.xml file.'))
911
            return redirect('themes')
912
        desc_xml = z.read('%s/desc.xml' % theme_name)
913
        theme_dict = template.get_theme_dict(io.StringIO(force_text(desc_xml)))
914
        if theme_dict.get('name') != theme_name:
915
            get_session().message = ('error', _('desc.xml is missing a name attribute.'))
916
            return redirect('themes')
917
        if os.path.exists(os.path.join(theme_dir, theme_name)):
918
            shutil.rmtree(os.path.join(theme_dir, theme_name))
919
        for f in z.namelist():
920
            if f[-1] == '/':
921
                continue
922
            path = os.path.join(theme_dir, f)
923
            data = z.read(f)
924
            if not os.path.exists(os.path.dirname(path)):
925
                os.makedirs(os.path.dirname(path))
926
            open(path, 'wb').write(data)
927
        z.close()
928
        return redirect('themes')
929 928

  
930 929
    def install_theme_from_url(self, url):
931 930
        try:
......
1028 1027

  
1029 1028
            def export(self, job):
1030 1029
                c = io.BytesIO()
1031
                z = zipfile.ZipFile(c, 'w')
1032
                for d in self.dirs:
1033
                    if d not in (
1034
                        'categories',
1035
                        'carddef_categories',
1036
                        'wscalls',
1037
                        'mail-templates',
1038
                        'apiaccess',
1039
                    ):
1040
                        continue
1041
                    path = os.path.join(self.app_dir, d)
1042
                    if not os.path.exists(path):
1043
                        continue
1044
                    for f in os.listdir(path):
1045
                        if f in ('.indexes', '.max_id'):
1030
                with zipfile.ZipFile(c, 'w') as z:
1031
                    for d in self.dirs:
1032
                        if d not in (
1033
                            'categories',
1034
                            'carddef_categories',
1035
                            'wscalls',
1036
                            'mail-templates',
1037
                            'apiaccess',
1038
                        ):
1046 1039
                            continue
1047
                        z.write(os.path.join(path, f), os.path.join(d, f))
1048
                if 'datasources' in self.dirs:
1049
                    for ds in NamedDataSource.select():
1050
                        if ds.external == 'agenda':
1040
                        path = os.path.join(self.app_dir, d)
1041
                        if not os.path.exists(path):
1051 1042
                            continue
1052
                        node = ds.export_to_xml(include_id=True)
1053
                        misc.indent_xml(node)
1054
                        z.writestr(
1055
                            os.path.join('datasources', str(ds.id)),
1056
                            ET.tostring(node),
1057
                        )
1058
                if 'formdefs' in self.dirs:
1059
                    for formdef in FormDef.select():
1060
                        node = formdef.export_to_xml(include_id=True)
1061
                        misc.indent_xml(node)
1062
                        z.writestr(
1063
                            os.path.join('formdefs_xml', str(formdef.id)),
1064
                            b'<?xml version="1.0"?>\n' + ET.tostring(node),
1065
                        )
1066
                if 'carddefs' in self.dirs:
1067
                    for formdef in CardDef.select():
1068
                        node = formdef.export_to_xml(include_id=True)
1069
                        misc.indent_xml(node)
1070
                        z.writestr(
1071
                            os.path.join('carddefs_xml', str(formdef.id)),
1072
                            b'<?xml version="1.0"?>\n' + ET.tostring(node),
1073
                        )
1074
                if 'workflows' in self.dirs:
1075
                    for workflow in Workflow.select():
1076
                        node = workflow.export_to_xml(include_id=True)
1077
                        misc.indent_xml(node)
1078
                        z.writestr(
1079
                            os.path.join('workflows_xml', str(workflow.id)),
1080
                            b'<?xml version="1.0"?>\n' + ET.tostring(node),
1081
                        )
1082
                if 'blockdefs' in self.dirs:
1083
                    for blockdef in BlockDef.select():
1084
                        node = blockdef.export_to_xml(include_id=True)
1085
                        misc.indent_xml(node)
1086
                        z.writestr(
1087
                            os.path.join('blockdefs_xml', str(blockdef.id)),
1088
                            b'<?xml version="1.0"?>\n' + ET.tostring(node),
1089
                        )
1090
                if 'roles' in self.dirs:
1091
                    for role in get_publisher().role_class.select():
1092
                        node = role.export_to_xml(include_id=True)
1093
                        misc.indent_xml(node)
1094
                        z.writestr(
1095
                            os.path.join('roles_xml', str(role.id)),
1096
                            b'<?xml version="1.0"?>\n' + ET.tostring(node),
1097
                        )
1098

  
1099
                if self.settings:
1100
                    z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
1101
                    for f in os.listdir(self.app_dir):
1102
                        if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
1103
                            z.write(os.path.join(self.app_dir, f), f)
1104
                    if os.path.exists(os.path.join(self.app_dir, 'config')):
1105
                        for f in os.listdir(os.path.join(self.app_dir, 'config')):
1106
                            z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
1107
                z.close()
1043
                        for f in os.listdir(path):
1044
                            if f in ('.indexes', '.max_id'):
1045
                                continue
1046
                            z.write(os.path.join(path, f), os.path.join(d, f))
1047
                    if 'datasources' in self.dirs:
1048
                        for ds in NamedDataSource.select():
1049
                            if ds.external == 'agenda':
1050
                                continue
1051
                            node = ds.export_to_xml(include_id=True)
1052
                            misc.indent_xml(node)
1053
                            z.writestr(
1054
                                os.path.join('datasources', str(ds.id)),
1055
                                ET.tostring(node),
1056
                            )
1057
                    if 'formdefs' in self.dirs:
1058
                        for formdef in FormDef.select():
1059
                            node = formdef.export_to_xml(include_id=True)
1060
                            misc.indent_xml(node)
1061
                            z.writestr(
1062
                                os.path.join('formdefs_xml', str(formdef.id)),
1063
                                b'<?xml version="1.0"?>\n' + ET.tostring(node),
1064
                            )
1065
                    if 'carddefs' in self.dirs:
1066
                        for formdef in CardDef.select():
1067
                            node = formdef.export_to_xml(include_id=True)
1068
                            misc.indent_xml(node)
1069
                            z.writestr(
1070
                                os.path.join('carddefs_xml', str(formdef.id)),
1071
                                b'<?xml version="1.0"?>\n' + ET.tostring(node),
1072
                            )
1073
                    if 'workflows' in self.dirs:
1074
                        for workflow in Workflow.select():
1075
                            node = workflow.export_to_xml(include_id=True)
1076
                            misc.indent_xml(node)
1077
                            z.writestr(
1078
                                os.path.join('workflows_xml', str(workflow.id)),
1079
                                b'<?xml version="1.0"?>\n' + ET.tostring(node),
1080
                            )
1081
                    if 'blockdefs' in self.dirs:
1082
                        for blockdef in BlockDef.select():
1083
                            node = blockdef.export_to_xml(include_id=True)
1084
                            misc.indent_xml(node)
1085
                            z.writestr(
1086
                                os.path.join('blockdefs_xml', str(blockdef.id)),
1087
                                b'<?xml version="1.0"?>\n' + ET.tostring(node),
1088
                            )
1089
                    if 'roles' in self.dirs:
1090
                        for role in get_publisher().role_class.select():
1091
                            node = role.export_to_xml(include_id=True)
1092
                            misc.indent_xml(node)
1093
                            z.writestr(
1094
                                os.path.join('roles_xml', str(role.id)),
1095
                                b'<?xml version="1.0"?>\n' + ET.tostring(node),
1096
                            )
1097

  
1098
                    if self.settings:
1099
                        z.write(os.path.join(self.app_dir, 'config.pck'), 'config.pck')
1100
                        for f in os.listdir(self.app_dir):
1101
                            if f.startswith('idp-') and os.path.splitext(f)[-1] in ('.pem', '.xml'):
1102
                                z.write(os.path.join(self.app_dir, f), f)
1103
                        if os.path.exists(os.path.join(self.app_dir, 'config')):
1104
                            for f in os.listdir(os.path.join(self.app_dir, 'config')):
1105
                                z.write(os.path.join(self.app_dir, 'config', f), os.path.join('config', f))
1108 1106

  
1109 1107
                job.file_content = c.getvalue()
1110 1108
                job.store()
wcs/admin/workflows.py
275 275
    out = out.getvalue()
276 276
    if svg:
277 277
        try:
278
            process = Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE)
279
            out = process.communicate(force_bytes(out))[0]
280
            if process.returncode != 0:
281
                return ''
278
            with Popen(['dot', '-Tsvg'], stdin=PIPE, stdout=PIPE) as process:
279
                out = process.communicate(force_bytes(out))[0]
280
                if process.returncode != 0:
281
                    return ''
282 282
        except OSError:
283 283
            return ''
284 284
        out = graphviz_post_treatment(out, revert_colours, include=include)
wcs/backoffice/management.py
2997 2997
    def download_as_zip(self):
2998 2998
        formdata = self.filled
2999 2999
        zip_content = io.BytesIO()
3000
        zip_file = zipfile.ZipFile(zip_content, 'w')
3001 3000
        counter = {'value': 0}
3002 3001

  
3003
        def add_zip_file(upload):
3002
        def add_zip_file(upload, zip_file):
3004 3003
            counter['value'] += 1
3005 3004
            filename = '%s_%s' % (counter['value'], upload.base_filename)
3006 3005
            zip_file.writestr(filename, upload.get_content())
3007 3006

  
3008
        for value in formdata.data.values():
3009
            if isinstance(value, PicklableUpload):
3010
                add_zip_file(value)
3011
            if isinstance(value, dict) and isinstance(value.get('data'), list):
3012
                for subvalue in value.get('data'):
3013
                    for subvalue_elem in subvalue.values():
3014
                        if isinstance(subvalue_elem, PicklableUpload):
3015
                            add_zip_file(subvalue_elem)
3016

  
3017
        zip_file.close()
3007
        with zipfile.ZipFile(zip_content, 'w') as zip_file:
3008
            for value in formdata.data.values():
3009
                if isinstance(value, PicklableUpload):
3010
                    add_zip_file(value, zip_file)
3011
                if isinstance(value, dict) and isinstance(value.get('data'), list):
3012
                    for subvalue in value.get('data'):
3013
                        for subvalue_elem in subvalue.values():
3014
                            if isinstance(subvalue_elem, PicklableUpload):
3015
                                add_zip_file(subvalue_elem, zip_file)
3018 3016

  
3019 3017
        response = get_response()
3020 3018
        response.set_content_type('application/zip')
wcs/ctl/backup.py
48 48
                os.mkdir(backup_dir)
49 49
            backup_filepath = os.path.join(backup_dir, 'backup-%s%s%s-%s%s%s.tar.gz' % time.localtime()[:6])
50 50

  
51
        backup = tarfile.open(backup_filepath, mode='w:gz')
52
        for basename, dirnames, filenames in os.walk(pub.app_dir):
53
            if 'backups' in dirnames:  # do not recurse in backup directory
54
                idx = dirnames.index('backups')
55
                dirnames[idx : idx + 1] = []
56
            for filename in filenames:
57
                backup.add(
58
                    os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
59
                )
60

  
61
        backup.close()
51
        with tarfile.open(backup_filepath, mode='w:gz') as backup:
52
            for basename, dirnames, filenames in os.walk(pub.app_dir):
53
                if 'backups' in dirnames:  # do not recurse in backup directory
54
                    idx = dirnames.index('backups')
55
                    dirnames[idx : idx + 1] = []
56
                for filename in filenames:
57
                    backup.add(
58
                        os.path.join(basename, filename), os.path.join(basename, filename)[len(pub.app_dir) :]
59
                    )
62 60

  
63 61

  
64 62
CmdBackup.register()
wcs/ctl/management/commands/convert_to_sql.py
100 100
        sql.SqlUser.fix_sequences()
101 101

  
102 102
        if errors:
103
            error_log = open('error_user.log', 'w')
104
            for user, trace in errors:
105
                error_log.write('user_id %s\n' % user.id)
106
                error_log.write(trace)
107
                error_log.write('-' * 80)
108
                error_log.write('\n\n')
109
            error_log.close()
103
            with open('error_user.log', 'w') as error_log:
104
                for user, trace in errors:
105
                    error_log.write('user_id %s\n' % user.id)
106
                    error_log.write(trace)
107
                    error_log.write('-' * 80)
108
                    error_log.write('\n\n')
110 109
            print('There were some errors, see error_user.log for details.')
111 110

  
112 111
    def store_forms(self):
......
138 137
            sql_data_class.fix_sequences()
139 138

  
140 139
        if errors:
141
            error_log = open('error_formdata.log', 'w')
142
            for formdata, trace in errors:
143
                error_log.write(
144
                    '%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
145
                )
146
                error_log.write(trace)
147
                error_log.write('-' * 80)
148
                error_log.write('\n\n')
149
            error_log.close()
140
            with open('error_formdata.log', 'w') as error_log:
141
                for formdata, trace in errors:
142
                    error_log.write(
143
                        '%s %s - %s\n' % (formdata.formdef, formdata.id, localstrftime(formdata.receipt_time))
144
                    )
145
                    error_log.write(trace)
146
                    error_log.write('-' * 80)
147
                    error_log.write('\n\n')
150 148
            print('There were some errors, see error_formdata.log.')
151 149

  
152 150
    def update_progress(self, progress, num_columns=120):
wcs/ctl/restore.py
50 50
            return 1
51 51
        backup_filepath = sub_options.filename
52 52

  
53
        backup = tarfile.open(backup_filepath, mode='r:*')
54
        for tarinfo in backup:
55
            if os.path.normpath(tarinfo.name).startswith('..'):
56
                continue
57
            backup.extract(tarinfo, pub.app_dir)
58
        backup.close()
53
        with tarfile.open(backup_filepath, mode='r:*') as backup:
54
            for tarinfo in backup:
55
                if os.path.normpath(tarinfo.name).startswith('..'):
56
                    continue
57
                backup.extract(tarinfo, pub.app_dir)
59 58

  
60 59

  
61 60
CmdRestore.register()
wcs/publisher.py
162 162
        self.set_session_manager(self.session_manager_class(session_class=self.session_class))
163 163

  
164 164
    def import_zip(self, fd):
165
        z = zipfile.ZipFile(fd)
166 165
        results = {
167 166
            'formdefs': 0,
168 167
            'carddefs': 0,
......
203 202
                rv[key] = value
204 203
            return rv
205 204

  
206
        for f in z.namelist():
207
            if f in ('.indexes', '.max_id'):
208
                continue
209
            if os.path.dirname(f) in (
210
                'formdefs_xml',
211
                'carddefs_xml',
212
                'workflows_xml',
213
                'blockdefs_xml',
214
                'roles_xml',
215
            ):
216
                continue
217
            path = os.path.join(self.app_dir, f)
218
            if not os.path.exists(os.path.dirname(path)):
219
                os.mkdir(os.path.dirname(path))
220
            if not os.path.basename(f):
221
                # skip directories
222
                continue
223
            data = z.read(f)
224
            if f in ('config.pck', 'config.json'):
225
                results['settings'] = 1
226
                if f == 'config.pck':
227
                    d = pickle.loads(data)
228
                else:
229
                    d = json.loads(force_text(data), object_hook=_decode_dict)
230
                if 'sp' in self.cfg:
231
                    current_sp = self.cfg['sp']
232
                else:
233
                    current_sp = None
234
                self.cfg = d
235
                if current_sp:
236
                    self.cfg['sp'] = current_sp
237
                elif 'sp' in self.cfg:
238
                    del self.cfg['sp']
239
                self.write_cfg()
240
                continue
241
            open(path, 'wb').write(data)
242
            if os.path.split(f)[0] in results:
243
                results[os.path.split(f)[0]] += 1
244

  
245
        # second pass, fields blocks
246
        from wcs.blocks import BlockDef
247

  
248
        for f in z.namelist():
249
            if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
250
                blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
251
                blockdef.store()
252
                results['blockdefs'] += 1
253

  
254
        # third pass, workflows
255
        from wcs.workflows import Workflow
256

  
257
        for f in z.namelist():
258
            if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
259
                workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
260
                workflow.store()
261
                results['workflows'] += 1
262

  
263
        # fourth pass, forms and cards
264
        from wcs.carddef import CardDef
265
        from wcs.formdef import FormDef
266

  
267
        formdefs = []
268
        carddefs = []
269
        for f in z.namelist():
270
            if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
271
                formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
272
                formdef.store()
273
                formdefs.append(formdef)
274
                results['formdefs'] += 1
275
            if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
276
                carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
277
                carddef.store()
278
                carddefs.append(carddef)
279
                results['carddefs'] += 1
280

  
281
        # sixth pass, roles
282
        roles = []
283
        for f in z.namelist():
284
            if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
285
                role = self.role_class.import_from_xml(z.open(f), include_id=True)
286
                role.store()
287
                roles.append(role)
288
                results['roles'] += 1
289

  
290
        # rebuild indexes for imported objects
291
        for k, v in results.items():
292
            if k == 'settings':
293
                continue
294
            if v == 0:
295
                continue
296
            klass = None
297
            if k == 'formdefs':
298
                from .formdef import FormDef
299

  
300
                klass = FormDef
301
            elif k == 'carddefs':
302
                from .carddef import CardDef
303

  
304
                klass = CardDef
305
            elif k == 'blockdefs':
306
                klass = BlockDef
307
            elif k == 'categories':
308
                from .categories import Category
309

  
310
                klass = Category
311
            elif k == 'roles':
312
                klass = self.role_class
313
            elif k == 'workflows':
314
                klass = Workflow
315
            if klass:
316
                klass.rebuild_indexes()
317

  
318
            if k == 'formdefs':
319
                # in case of formdefs, we store them anew in case SQL changes
320
                # are required.
321
                for formdef in formdefs or FormDef.select():
205
        with zipfile.ZipFile(fd) as z:
206
            for f in z.namelist():
207
                if f in ('.indexes', '.max_id'):
208
                    continue
209
                if os.path.dirname(f) in (
210
                    'formdefs_xml',
211
                    'carddefs_xml',
212
                    'workflows_xml',
213
                    'blockdefs_xml',
214
                    'roles_xml',
215
                ):
216
                    continue
217
                path = os.path.join(self.app_dir, f)
218
                if not os.path.exists(os.path.dirname(path)):
219
                    os.mkdir(os.path.dirname(path))
220
                if not os.path.basename(f):
221
                    # skip directories
222
                    continue
223
                data = z.read(f)
224
                if f in ('config.pck', 'config.json'):
225
                    results['settings'] = 1
226
                    if f == 'config.pck':
227
                        d = pickle.loads(data)
228
                    else:
229
                        d = json.loads(force_text(data), object_hook=_decode_dict)
230
                    if 'sp' in self.cfg:
231
                        current_sp = self.cfg['sp']
232
                    else:
233
                        current_sp = None
234
                    self.cfg = d
235
                    if current_sp:
236
                        self.cfg['sp'] = current_sp
237
                    elif 'sp' in self.cfg:
238
                        del self.cfg['sp']
239
                    self.write_cfg()
240
                    continue
241
                open(path, 'wb').write(data)
242
                if os.path.split(f)[0] in results:
243
                    results[os.path.split(f)[0]] += 1
244

  
245
            # second pass, fields blocks
246
            from wcs.blocks import BlockDef
247

  
248
            for f in z.namelist():
249
                if os.path.dirname(f) == 'blockdefs_xml' and os.path.basename(f):
250
                    blockdef = BlockDef.import_from_xml(z.open(f), include_id=True)
251
                    blockdef.store()
252
                    results['blockdefs'] += 1
253

  
254
            # third pass, workflows
255
            from wcs.workflows import Workflow
256

  
257
            for f in z.namelist():
258
                if os.path.dirname(f) == 'workflows_xml' and os.path.basename(f):
259
                    workflow = Workflow.import_from_xml(z.open(f), include_id=True, check_datasources=False)
260
                    workflow.store()
261
                    results['workflows'] += 1
262

  
263
            # fourth pass, forms and cards
264
            from wcs.carddef import CardDef
265
            from wcs.formdef import FormDef
266

  
267
            formdefs = []
268
            carddefs = []
269
            for f in z.namelist():
270
                if os.path.dirname(f) == 'formdefs_xml' and os.path.basename(f):
271
                    formdef = FormDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
322 272
                    formdef.store()
323
            elif k == 'carddefs':
324
                # ditto for cards
325
                for carddef in carddefs or CardDef.select():
273
                    formdefs.append(formdef)
274
                    results['formdefs'] += 1
275
                if os.path.dirname(f) == 'carddefs_xml' and os.path.basename(f):
276
                    carddef = CardDef.import_from_xml(z.open(f), include_id=True, check_datasources=False)
326 277
                    carddef.store()
278
                    carddefs.append(carddef)
279
                    results['carddefs'] += 1
280

  
281
            # sixth pass, roles
282
            roles = []
283
            for f in z.namelist():
284
                if os.path.dirname(f) == 'roles_xml' and os.path.basename(f):
285
                    role = self.role_class.import_from_xml(z.open(f), include_id=True)
286
                    role.store()
287
                    roles.append(role)
288
                    results['roles'] += 1
289

  
290
            # rebuild indexes for imported objects
291
            for k, v in results.items():
292
                if k == 'settings':
293
                    continue
294
                if v == 0:
295
                    continue
296
                klass = None
297
                if k == 'formdefs':
298
                    from .formdef import FormDef
299

  
300
                    klass = FormDef
301
                elif k == 'carddefs':
302
                    from .carddef import CardDef
303

  
304
                    klass = CardDef
305
                elif k == 'blockdefs':
306
                    klass = BlockDef
307
                elif k == 'categories':
308
                    from .categories import Category
309

  
310
                    klass = Category
311
                elif k == 'roles':
312
                    klass = self.role_class
313
                elif k == 'workflows':
314
                    klass = Workflow
315
                if klass:
316
                    klass.rebuild_indexes()
317

  
318
                if k == 'formdefs':
319
                    # in case of formdefs, we store them anew in case SQL changes
320
                    # are required.
321
                    for formdef in formdefs or FormDef.select():
322
                        formdef.store()
323
                elif k == 'carddefs':
324
                    # ditto for cards
325
                    for carddef in carddefs or CardDef.select():
326
                        carddef.store()
327 327

  
328
        z.close()
329 328
        return results
330 329

  
331 330
    def initialize_sql(self):
wcs/qommon/admin/menu.py
45 45
        if os.path.exists('/etc/debian_version'):
46 46
            # debian case
47 47
            try:
48
                process = subprocess.Popen(
48
                with subprocess.Popen(
49 49
                    ['dpkg', '-l', package], stdout=subprocess.PIPE, stderr=subprocess.STDOUT
50
                )
51
                version = process.communicate()[0].splitlines()[-1].split()[2]
52
                if process.returncode == 0:
53
                    return "%s %s (Debian)" % (package, version.decode())
50
                ) as process:
51
                    version = process.communicate()[0].splitlines()[-1].split()[2]
52
                    if process.returncode == 0:
53
                        return "%s %s (Debian)" % (package, version.decode())
54 54
            except Exception:
55 55
                pass
56 56
        return None
......
66 66

  
67 67
    if os.path.exists(os.path.join(srcdir, '.git')):
68 68
        try:
69
            process = subprocess.Popen(
69
            with subprocess.Popen(
70 70
                ['git', 'log', '--pretty=oneline', '-1'], stdout=subprocess.PIPE, cwd=srcdir
71
            )
72
            output = process.communicate()[0]
71
            ) as process:
72
                output = process.communicate()[0]
73 73
            rev = str(output.split()[0].decode('ascii'))
74
            process = subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir)
75
            output = process.communicate()[0]
74
            with subprocess.Popen(['git', 'branch'], stdout=subprocess.PIPE, cwd=srcdir) as process:
75
                output = process.communicate()[0]
76 76
            starred_line = [x for x in output.splitlines() if x.startswith(b'*')][0]
77 77
            branch = str(starred_line.split()[1].decode('ascii'))
78 78
            url = "https://repos.entrouvert.org/%s.git/commit/?id=%s" % (package, rev)
wcs/qommon/form.py
660 660
        file_path = self.build_file_path()
661 661
        if not os.path.exists(self.dir_path()):
662 662
            os.mkdir(self.dir_path())
663
        open(file_path, 'wb').write(content)
663
        with open(file_path, 'wb') as f:
664
            f.write(content)
664 665

  
665 666
    def dir_path(self):
666 667
        return os.path.join(get_publisher().app_dir, self.directory)
......
669 670
        return os.path.join(get_publisher().app_dir, self.directory, self.filename)
670 671

  
671 672
    def get_file(self):
672
        return open(self.build_file_path(), 'rb')
673
        return open(self.build_file_path(), 'rb')  # pylint: disable=consider-using-with
673 674

  
674 675
    def get_content(self):
675 676
        return self.get_file().read()
wcs/qommon/misc.py
679 679
        except subprocess.CalledProcessError:
680 680
            raise ThumbnailError()
681 681
    else:
682
        fp = open(filepath, 'rb')
683

  
682
        fp = open(filepath, 'rb')  # pylint: disable=consider-using-with
684 683
    try:
685 684
        image = Image.open(fp)
686 685
        try:
wcs/qommon/ods.py
157 157
        return ET.tostring(self.get_content_node(), 'utf-8')
158 158

  
159 159
    def save(self, output):
160
        z = zipfile.ZipFile(output, 'w')
161
        z.writestr('content.xml', self.get_content())
162
        z.writestr('styles.xml', self.get_styles())
163
        z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
164
        z.writestr(
165
            'META-INF/manifest.xml',
166
            '''<?xml version="1.0" encoding="UTF-8"?>
167
<manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0">
168
 <manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/>
169
 <manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/>
170
 <manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/>
171
 <manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/>
172
 <manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/>
173
</manifest:manifest>''',
174
        )
175
        z.close()
160
        with zipfile.ZipFile(output, 'w') as z:
161
            z.writestr('content.xml', self.get_content())
162
            z.writestr('styles.xml', self.get_styles())
163
            z.writestr('mimetype', 'application/vnd.oasis.opendocument.spreadsheet')
164
            z.writestr(
165
                'META-INF/manifest.xml',
166
                '''<?xml version="1.0" encoding="UTF-8"?>
167
    <manifest:manifest xmlns:manifest="urn:oasis:names:tc:opendocument:xmlns:manifest:1.0">
168
     <manifest:file-entry manifest:full-path="/" manifest:media-type="application/vnd.oasis.opendocument.spreadsheet"/>
169
     <manifest:file-entry manifest:full-path="styles.xml" manifest:media-type="text/xml"/>
170
     <manifest:file-entry manifest:full-path="content.xml" manifest:media-type="text/xml"/>
171
     <manifest:file-entry manifest:full-path="META-INF/manifest.xml" manifest:media-type="text/xml"/>
172
     <manifest:file-entry manifest:full-path="mimetype" manifest:media-type="text/plain"/>
173
    </manifest:manifest>''',
174
            )
176 175

  
177 176

  
178 177
class WorkSheet:
wcs/qommon/storage.py
501 501
    def get_filename(cls, filename, ignore_errors=False, ignore_migration=False, **kwargs):
502 502
        fd = None
503 503
        try:
504
            fd = open(force_bytes(filename, 'utf-8'), 'rb')
504
            fd = open(force_bytes(filename, 'utf-8'), 'rb')  # pylint: disable=consider-using-with
505 505
            o = cls.storage_load(fd, **kwargs)
506 506
        except IOError:
507 507
            if ignore_errors:
wcs/qommon/upload_storage.py
42 42
            return self.__dict__.get('fp')
43 43
        elif getattr(self, 'qfilename', None):
44 44
            basedir = os.path.join(get_publisher().app_dir, 'uploads')
45
            self.fp = open(os.path.join(basedir, self.qfilename), 'rb')
45
            self.fp = open(os.path.join(basedir, self.qfilename), 'rb')  # pylint: disable=consider-using-with
46 46
            return self.fp
47 47
        return None
48 48

  
......
125 125
        upload.__class__ = PicklableUpload
126 126
        dirname = os.path.join(get_publisher().app_dir, 'tempfiles')
127 127
        filename = os.path.join(dirname, upload.token)
128
        fd = open(filename, 'wb')
129
        upload.get_file_pointer().seek(0)
130
        fd.write(upload.get_file_pointer().read())
131
        upload.size = fd.tell()
132
        fd.close()
128
        with open(filename, 'wb') as fd:
129
            upload.get_file_pointer().seek(0)
130
            fd.write(upload.get_file_pointer().read())
131
            upload.size = fd.tell()
133 132

  
134 133
    def get_tempfile(self, temp_data):
135 134
        value = PicklableUpload(temp_data['orig_filename'], temp_data['content_type'], temp_data['charset'])
......
138 137
        filename = os.path.join(dirname, temp_data['unsigned_token'])
139 138
        value.token = temp_data['token']
140 139
        value.file_size = os.path.getsize(filename)
141
        value.fp = open(filename, 'rb')
140
        value.fp = open(filename, 'rb')  # pylint: disable=consider-using-with
142 141
        return value
143 142

  
144 143
    def save(self, upload):
wcs/qommon/vendor/locket.py
65 65

  
66 66

  
67 67
def lock_file(path, **kwargs):
68
    _locks_lock.acquire()
69
    try:
68
    with _locks_lock:
70 69
        lock = _locks.get(path)
71 70
        if lock is None:
72 71
            lock = _create_lock_file(path, **kwargs)
73 72
            _locks[path] = lock
74 73
        return lock
75
    finally:
76
        _locks_lock.release()
77 74

  
78 75

  
79 76
def _create_lock_file(path, **kwargs):
......
139 136

  
140 137
    def acquire(self):
141 138
        if self._timeout is None:
142
            self._lock.acquire()
139
            self._lock.acquire()  # pylint: disable=consider-using-with
143 140
        else:
144 141
            _acquire_non_blocking(
145
                acquire=lambda: self._lock.acquire(False),
142
                acquire=lambda: self._lock.acquire(False),  # pylint: disable=consider-using-with
146 143
                timeout=self._timeout,
147 144
                retry_period=self._retry_period,
148 145
                path=self._path,
......
162 159

  
163 160
    def acquire(self):
164 161
        if self._file is None:
165
            self._file = open(self._path, "w")
162
            self._file = open(self._path, "w")  # pylint: disable=consider-using-with
166 163
        if self._timeout is None:
167 164
            _lock_file_blocking(self._file)
168 165
        else:
wcs/qommon/x509utils.py
45 45
    Return a tuple made of the return code and the stdout output
46 46
    """
47 47
    try:
48
        process = subprocess.Popen(args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
49
        output = process.communicate()[0]
50
        return process.returncode, output
48
        with subprocess.Popen(
49
            args=[_openssl] + args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
50
        ) as process:
51
            output = process.communicate()[0]
52
            return process.returncode, output
51 53
    except OSError:
52 54
        return 1, None
53 55

  
wcs/wf/export_to_model.py
135 135
    format, parse context.xml with element tree and apply process to its root
136 136
    node.
137 137
    """
138
    zin = zipfile.ZipFile(instream, mode='r')
139
    zout = zipfile.ZipFile(outstream, mode='w')
140
    new_images = {}
141
    assert 'content.xml' in zin.namelist()
142
    for filename in zin.namelist():
143
        # first pass to process meta.xml, content.xml and styles.xml
144
        if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
145
            continue
146
        content = zin.read(filename)
147
        root = ET.fromstring(content)
148
        process(root, new_images)
149
        content = ET.tostring(root)
150
        zout.writestr(filename, content)
151

  
152
    for filename in zin.namelist():
153
        # second pass to copy/replace other files
154
        if filename in ('meta.xml', 'content.xml', 'styles.xml'):
155
            continue
156
        if filename in new_images:
157
            content = new_images[filename].get_content()
158
        else:
138
    with zipfile.ZipFile(instream, mode='r') as zin, zipfile.ZipFile(outstream, mode='w') as zout:
139
        new_images = {}
140
        assert 'content.xml' in zin.namelist()
141
        for filename in zin.namelist():
142
            # first pass to process meta.xml, content.xml and styles.xml
143
            if filename not in ('meta.xml', 'content.xml', 'styles.xml'):
144
                continue
159 145
            content = zin.read(filename)
160
        zout.writestr(filename, content)
161
    zout.close()
146
            root = ET.fromstring(content)
147
            process(root, new_images)
148
            content = ET.tostring(root)
149
            zout.writestr(filename, content)
150

  
151
        for filename in zin.namelist():
152
            # second pass to copy/replace other files
153
            if filename in ('meta.xml', 'content.xml', 'styles.xml'):
154
                continue
155
            if filename in new_images:
156
                content = new_images[filename].get_content()
157
            else:
158
                content = zin.read(filename)
159
            zout.writestr(filename, content)
162 160

  
163 161

  
164 162
def is_opendocument(stream):
wcs/workflows.py
229 229
    def get_file_pointer(self):
230 230
        if self.filename.startswith('uuid-'):
231 231
            return None
232
        return open(self.filename, 'rb')
232
        return open(self.filename, 'rb')  # pylint: disable=consider-using-with
233 233

  
234 234
    def __getstate__(self):
235 235
        odict = self.__dict__.copy()
236
-