0009-misc-fix-consider-using-with-pylint-error-56288.patch
combo/apps/assets/utils.py | ||
---|---|---|
66 | 66 | |
67 | 67 | |
68 | 68 |
def import_assets(fd, overwrite=False): |
69 |
tar = tarfile.open(mode='r', fileobj=fd)
|
|
70 |
data = untar_assets_files(tar, overwrite=overwrite) |
|
69 |
with tarfile.open(mode='r', fileobj=fd) as tar:
|
|
70 |
data = untar_assets_files(tar, overwrite=overwrite)
|
|
71 | 71 |
Asset.load_serialized_objects(data.get('assets') or []) |
72 |
tar.close() |
|
73 | 72 | |
74 | 73 | |
75 | 74 |
def export_assets(fd): |
76 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
77 |
tar_assets_files(tar) |
|
78 |
tar.close() |
|
75 |
with tarfile.open(mode='w', fileobj=fd) as tar: |
|
76 |
tar_assets_files(tar) |
combo/data/management/commands/export_site.py | ||
---|---|---|
37 | 37 |
def handle(self, *args, **options): |
38 | 38 |
if options['format_json']: |
39 | 39 |
if options['output'] and options['output'] != '-': |
40 |
output = open(options['output'], 'w') |
|
40 |
with open(options['output'], 'w') as output: |
|
41 |
json.dump(export_site(), output, indent=2) |
|
41 | 42 |
else: |
42 |
output = sys.stdout |
|
43 |
json.dump(export_site(), output, indent=2) |
|
44 |
else: |
|
45 |
if options['output'] and options['output'] != '-': |
|
46 |
try: |
|
47 |
output = open(options['output'], 'wb') |
|
48 |
except OSError as e: |
|
49 |
raise CommandError(e) |
|
50 |
export_site_tar(output) |
|
51 |
output.close() |
|
52 |
else: |
|
53 |
raise CommandError(_('TAR format require output filename parameter')) |
|
43 |
json.dump(export_site(), sys.stdout, indent=2) |
|
44 |
return |
|
45 | ||
46 |
if options['output'] and options['output'] != '-': |
|
47 |
try: |
|
48 |
with open(options['output'], 'wb') as output: |
|
49 |
export_site_tar(output) |
|
50 |
except OSError as e: |
|
51 |
raise CommandError(e) |
|
52 |
return |
|
53 | ||
54 |
raise CommandError(_('TAR format require output filename parameter')) |
combo/data/management/commands/import_site.py | ||
---|---|---|
36 | 36 |
parser.add_argument('--overwrite', action='store_true', default=False, help='Overwrite asset files') |
37 | 37 | |
38 | 38 |
def handle(self, filename, *args, **options): |
39 |
def _import_site_json(fd): |
|
40 |
import_site(json.load(fd), if_empty=options['if_empty'], clean=options['clean']) |
|
41 | ||
42 |
def _import_site_tar(fd): |
|
43 |
import_site_tar( |
|
44 |
fd, |
|
45 |
if_empty=options['if_empty'], |
|
46 |
clean=options['clean'], |
|
47 |
overwrite=options['overwrite'], |
|
48 |
) |
|
49 | ||
39 | 50 |
with contextlib.ExitStack() as stack: |
40 |
if filename == '-': |
|
41 |
format = 'json' |
|
42 |
fd = sys.stdin |
|
43 |
else: |
|
51 |
try: |
|
52 |
if filename == '-': |
|
53 |
_import_site_json(sys.stdin) |
|
54 |
return |
|
55 | ||
44 | 56 |
try: |
45 |
fd = stack.enter_context(open(filename, 'rb')) |
|
57 |
with open(filename, 'rb') as _f: |
|
58 |
fd = stack.enter_context(_f) |
|
59 |
try: |
|
60 |
with tarfile.open(mode='r', fileobj=fd): |
|
61 |
pass |
|
62 |
except tarfile.TarError: |
|
63 |
with open(filename) as json_fd: |
|
64 |
_import_site_json(json_fd) |
|
65 |
else: |
|
66 |
with open(filename, 'rb') as tar_fd: |
|
67 |
_import_site_tar(tar_fd) |
|
46 | 68 |
except OSError as e: |
47 | 69 |
raise CommandError(e) |
48 |
try: |
|
49 |
tarfile.open(mode='r', fileobj=fd) |
|
50 |
except tarfile.TarError as e: |
|
51 |
format = 'json' |
|
52 |
fd = open(filename) |
|
53 |
else: |
|
54 |
format = 'tar' |
|
55 |
fd = open(filename, 'rb') |
|
56 |
try: |
|
57 |
if format == 'json': |
|
58 |
import_site(json.load(fd), if_empty=options['if_empty'], clean=options['clean']) |
|
59 |
else: |
|
60 |
import_site_tar( |
|
61 |
fd, |
|
62 |
if_empty=options['if_empty'], |
|
63 |
clean=options['clean'], |
|
64 |
overwrite=options['overwrite'], |
|
65 |
) |
|
66 | 70 |
except ImportSiteError as e: |
67 | 71 |
raise CommandError(e) |
combo/data/utils.py | ||
---|---|---|
139 | 139 | |
140 | 140 | |
141 | 141 |
def export_site_tar(fd, export_kwargs=None): |
142 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
143 |
data = export_site(**(export_kwargs or {})) |
|
144 |
del data['assets'] |
|
145 |
add_tar_content(tar, '_site.json', json.dumps(data, indent=2)) |
|
146 |
tar_assets_files(tar) |
|
147 |
tar.close() |
|
142 |
with tarfile.open(mode='w', fileobj=fd) as tar: |
|
143 |
data = export_site(**(export_kwargs or {})) |
|
144 |
del data['assets'] |
|
145 |
add_tar_content(tar, '_site.json', json.dumps(data, indent=2)) |
|
146 |
tar_assets_files(tar) |
|
148 | 147 | |
149 | 148 | |
150 | 149 |
def import_site_tar(fd, if_empty=False, clean=False, overwrite=False, request=None): |
151 |
tar = tarfile.open(mode='r', fileobj=fd)
|
|
152 |
try: |
|
153 |
tarinfo = tar.getmember('_site.json') |
|
154 |
except KeyError: |
|
155 |
raise ImportSiteError(_('TAR file should provide _site.json file')) |
|
150 |
with tarfile.open(mode='r', fileobj=fd) as tar:
|
|
151 |
try:
|
|
152 |
tarinfo = tar.getmember('_site.json')
|
|
153 |
except KeyError:
|
|
154 |
raise ImportSiteError(_('TAR file should provide _site.json file'))
|
|
156 | 155 | |
157 |
if if_empty and (Page.objects.count() or MapLayer.objects.count()): |
|
158 |
return |
|
156 |
if if_empty and (Page.objects.count() or MapLayer.objects.count()):
|
|
157 |
return
|
|
159 | 158 | |
160 |
if clean: |
|
161 |
clean_assets_files() |
|
159 |
if clean:
|
|
160 |
clean_assets_files()
|
|
162 | 161 | |
163 |
json_site = tar.extractfile(tarinfo).read() |
|
164 |
data = json.loads(json_site.decode('utf-8')) |
|
165 |
data.update(untar_assets_files(tar, overwrite=overwrite)) |
|
166 |
pages = import_site(data, if_empty=if_empty, clean=clean, request=request) |
|
167 |
tar.close() |
|
162 |
json_site = tar.extractfile(tarinfo).read() |
|
163 |
data = json.loads(json_site.decode('utf-8')) |
|
164 |
data.update(untar_assets_files(tar, overwrite=overwrite)) |
|
165 |
pages = import_site(data, if_empty=if_empty, clean=clean, request=request) |
|
168 | 166 |
return pages |
combo/manager/views.py | ||
---|---|---|
113 | 113 |
def form_valid(self, form): |
114 | 114 |
fd = self.request.FILES['site_file'].file |
115 | 115 |
try: |
116 |
tarfile.open(mode='r', fileobj=fd) |
|
116 |
with tarfile.open(mode='r', fileobj=fd): |
|
117 |
pass |
|
117 | 118 |
except tarfile.TarError: |
118 | 119 |
try: |
119 | 120 |
fd.seek(0) |
combo/settings.py | ||
---|---|---|
381 | 381 |
'COMBO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py') |
382 | 382 |
) |
383 | 383 |
if os.path.exists(local_settings_file): |
384 |
exec(open(local_settings_file).read()) |
|
384 |
with open(local_settings_file) as fd: |
|
385 |
exec(fd.read()) |
tests/test_assets.py | ||
---|---|---|
88 | 88 | |
89 | 89 |
def test_add_tar_content(tmpdir): |
90 | 90 |
filename = os.path.join(str(tmpdir), 'file.tar') |
91 |
tar = tarfile.open(filename, 'w') |
|
92 |
add_tar_content(tar, 'foo.txt', 'bar') |
|
93 |
tar.close() |
|
91 |
with tarfile.open(filename, 'w') as tar: |
|
92 |
add_tar_content(tar, 'foo.txt', 'bar') |
|
94 | 93 | |
95 |
tar = tarfile.open(filename, 'r')
|
|
96 |
tarinfo = tar.getmember('foo.txt') |
|
97 |
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar' |
|
94 |
with tarfile.open(filename, 'r') as tar:
|
|
95 |
tarinfo = tar.getmember('foo.txt')
|
|
96 |
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar'
|
|
98 | 97 | |
99 | 98 | |
100 | 99 |
def test_tar_untar_assets(some_assets): |
... | ... | |
102 | 101 |
assert count_asset_files() == 2 |
103 | 102 |
fd = BytesIO() |
104 | 103 | |
105 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
106 |
tar_assets_files(tar) |
|
107 |
tar_bytes = fd.getvalue() |
|
108 |
tar.close() |
|
104 |
with tarfile.open(mode='w', fileobj=fd) as tar: |
|
105 |
tar_assets_files(tar) |
|
106 |
tar_bytes = fd.getvalue() |
|
109 | 107 | |
110 | 108 |
path = default_storage.path('') |
111 | 109 |
os.remove('%s/assets/test.png' % path) |
112 |
open('%s/assets/test2.png' % path, 'w').write('foo') |
|
110 |
with open('%s/assets/test2.png' % path, 'w') as fd: |
|
111 |
fd.write('foo') |
|
113 | 112 |
assert count_asset_files() == 1 |
114 | 113 |
Asset.objects.all().delete() |
115 | 114 |
assert Asset.objects.count() == 0 |
116 | 115 |
fd = BytesIO(tar_bytes) |
117 | 116 | |
118 |
tar = tarfile.open(mode='r', fileobj=fd)
|
|
119 |
data = untar_assets_files(tar) |
|
117 |
with tarfile.open(mode='r', fileobj=fd) as tar:
|
|
118 |
data = untar_assets_files(tar)
|
|
120 | 119 |
assert [x['fields']['key'] for x in data['assets']] == ['banner', 'favicon'] |
121 | 120 |
assert count_asset_files() == 2 |
122 |
assert open('%s/assets/test.png' % path).read() == 'test' |
|
123 |
assert open('%s/assets/test2.png' % path).read() == 'foo' |
|
121 |
with open('%s/assets/test.png' % path) as fd: |
|
122 |
assert fd.read() == 'test' |
|
123 |
with open('%s/assets/test2.png' % path) as fd: |
|
124 |
assert fd.read() == 'foo' |
|
124 | 125 |
clean_assets_files() |
125 | 126 | |
126 | 127 | |
... | ... | |
128 | 129 |
filename = os.path.join(str(tmpdir), 'file.tar') |
129 | 130 |
assert Asset.objects.count() == 2 |
130 | 131 |
assert count_asset_files() == 2 |
131 |
fd = open(filename, 'wb')
|
|
132 |
export_assets(fd) |
|
132 |
with open(filename, 'wb') as fd:
|
|
133 |
export_assets(fd)
|
|
133 | 134 | |
134 | 135 |
path = default_storage.path('') |
135 | 136 |
os.remove('%s/assets/test.png' % path) |
136 |
open('%s/assets/test2.png' % path, 'w').write('foo') |
|
137 |
with open('%s/assets/test2.png' % path, 'w') as fd: |
|
138 |
fd.write('foo') |
|
137 | 139 |
assert count_asset_files() == 1 |
138 | 140 |
Asset.objects.all().delete() |
139 | 141 |
assert Asset.objects.count() == 0 |
140 | 142 | |
141 |
fd = open(filename, 'rb')
|
|
142 |
import_assets(fd, overwrite=True) |
|
143 |
with open(filename, 'rb') as fd:
|
|
144 |
import_assets(fd, overwrite=True)
|
|
143 | 145 |
assert count_asset_files() == 2 |
144 |
assert open('%s/assets/test.png' % path).read() == 'test' |
|
145 |
assert open('%s/assets/test2.png' % path).read() == 'test2' |
|
146 |
with open('%s/assets/test.png' % path) as fd: |
|
147 |
assert fd.read() == 'test' |
|
148 |
with open('%s/assets/test2.png' % path) as fd: |
|
149 |
assert fd.read() == 'test2' |
|
146 | 150 |
clean_assets_files() |
147 | 151 |
assert count_asset_files() == 0 |
148 | 152 |
clean_assets_files() |
tests/test_import_export.py | ||
---|---|---|
389 | 389 |
assert Page.objects.count() == 1 |
390 | 390 |
assert Asset.objects.count() == 3 |
391 | 391 |
Asset.objects.get(key='banner').asset.name == 'assets/test.png' |
392 |
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'original content' |
|
392 |
with open('%s/assets/test.png' % default_storage.path('')) as fd: |
|
393 |
assert fd.read() == 'original content' |
|
393 | 394 | |
394 | 395 |
populate_site() |
395 | 396 |
call_command('import_site', filename, '--overwrite') |
396 | 397 |
assert Page.objects.count() == 1 |
397 | 398 |
assert Asset.objects.count() == 3 |
398 | 399 |
Asset.objects.get(key='banner').asset.name == 'assets/test.png' |
399 |
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'test' |
|
400 |
with open('%s/assets/test.png' % default_storage.path('')) as fd: |
|
401 |
assert fd.read() == 'test' |
|
400 | 402 | |
401 | 403 |
populate_site() |
402 | 404 |
call_command('import_site', filename, '--if-empty') |
403 | 405 |
assert Page.objects.count() == 1 |
404 | 406 |
assert Asset.objects.count() == 2 |
405 | 407 |
Asset.objects.get(key='banner').asset.name == 'assets/test3.png' |
406 |
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'original content' |
|
408 |
with open('%s/assets/test.png' % default_storage.path('')) as fd: |
|
409 |
assert fd.read() == 'original content' |
|
407 | 410 |
Asset.objects.get(key='logo').asset.name == 'assets/logo.png' |
408 | 411 |
assert os.path.isfile('%s/assets/logo.png' % default_storage.path('')) |
409 | 412 | |
... | ... | |
412 | 415 |
assert Page.objects.count() == 0 |
413 | 416 |
assert Asset.objects.count() == 2 |
414 | 417 |
Asset.objects.get(key='banner').asset.name == 'assets/test.png' |
415 |
assert open('%s/assets/test.png' % default_storage.path('')).read() == 'test' |
|
418 |
with open('%s/assets/test.png' % default_storage.path('')) as fd: |
|
419 |
assert fd.read() == 'test' |
|
416 | 420 |
assert not Asset.objects.filter(key='logo') |
417 | 421 |
assert not os.path.isfile('%s/assets/logo.png' % default_storage.path('')) |
418 | 422 | |
... | ... | |
426 | 430 |
with pytest.raises(CommandError, match=r'No such file or directory'): |
427 | 431 |
call_command('import_site', '%s/noway/foo.tar' % tmpdir) |
428 | 432 | |
429 |
tarfile.open(filename, 'w').close() # empty tar file |
|
433 |
with tarfile.open(filename, 'w'): |
|
434 |
# empty tar file |
|
435 |
pass |
|
430 | 436 |
with pytest.raises(CommandError, match=r'TAR file should provide _site.json file'): |
431 | 437 |
call_command('import_site', filename) |
432 | 438 |
tests/test_manager.py | ||
---|---|---|
513 | 513 |
resp = app.get('/manage/pages/%s/' % page.id) |
514 | 514 |
resp = resp.click(href='.*/picture/') |
515 | 515 | |
516 |
resp.form['picture'] = Upload( |
|
517 |
'black.jpeg', open(os.path.join(TESTS_DATA_DIR, 'black.jpeg'), mode='rb').read(), 'image/jpeg' |
|
518 |
) |
|
516 |
with open(os.path.join(TESTS_DATA_DIR, 'black.jpeg'), mode='rb') as fd: |
|
517 |
resp.form['picture'] = Upload('black.jpeg', fd.read(), 'image/jpeg') |
|
519 | 518 |
resp = resp.form.submit() |
520 | 519 |
assert resp.location.endswith('/manage/pages/%s/' % page.id) |
521 | 520 |
resp = resp.follow() |
... | ... | |
932 | 931 |
cell.save() |
933 | 932 |
Asset(key='collectivity:banner', asset=File(BytesIO(b'test'), 'test.png')).save() |
934 | 933 |
path = default_storage.path('') |
935 |
assert open('%s/assets/test.png' % path).read() == 'test' |
|
934 |
with open('%s/assets/test.png' % path) as fd: |
|
935 |
assert fd.read() == 'test' |
|
936 | 936 | |
937 | 937 |
app = login(app) |
938 | 938 |
resp = app.get('/manage/') |
... | ... | |
947 | 947 |
assert Page.objects.count() == 0 |
948 | 948 |
assert TextCell.objects.count() == 0 |
949 | 949 |
assert Asset.objects.filter(key='collectivity:banner').count() == 0 |
950 |
open('%s/assets/test.png' % path, 'w').write('foo') |
|
950 |
with open('%s/assets/test.png' % path, 'w') as fd: |
|
951 |
fd.write('foo') |
|
951 | 952 |
app = login(app) |
952 | 953 |
resp = app.get('/manage/') |
953 | 954 |
resp = resp.click('Import Site') |
... | ... | |
957 | 958 |
assert PageSnapshot.objects.all().count() == 1 |
958 | 959 |
assert TextCell.objects.count() == 1 |
959 | 960 |
assert Asset.objects.filter(key='collectivity:banner').count() == 1 |
960 |
assert open('%s/assets/test.png' % path).read() == 'foo' |
|
961 |
with open('%s/assets/test.png' % path) as fd: |
|
962 |
assert fd.read() == 'foo' |
|
961 | 963 | |
962 | 964 |
os.remove('%s/assets/test.png' % path) |
963 | 965 |
app = login(app) |
... | ... | |
965 | 967 |
resp = resp.click('Import Site') |
966 | 968 |
resp.form['site_file'] = Upload('site-export.json', site_export, 'application/json') |
967 | 969 |
resp = resp.form.submit() |
968 |
assert open('%s/assets/test.png' % path).read() == 'test' |
|
970 |
with open('%s/assets/test.png' % path) as fd: |
|
971 |
assert fd.read() == 'test' |
|
969 | 972 | |
970 | 973 | |
971 | 974 |
def test_site_export_import_missing_group(app, admin_user): |
... | ... | |
1610 | 1613 |
thumbnail_filename = re.findall('src="/media/(.*thumb.*)"', resp.text)[0] |
1611 | 1614 |
thumbnail_path = default_storage.path(thumbnail_filename) |
1612 | 1615 |
assert os.path.exists(thumbnail_path) |
1613 |
thumbnail_contents = open(thumbnail_path, mode='rb').read() |
|
1616 |
with open(thumbnail_path, mode='rb') as fd: |
|
1617 |
thumbnail_contents = fd.read() |
|
1614 | 1618 | |
1615 | 1619 |
# check overwriting |
1616 | 1620 |
resp = resp.click('Overwrite') |
... | ... | |
1627 | 1631 |
resp.click('test.png') |
1628 | 1632 |
assert re.findall('src="/media/(.*thumb.*)"', resp.text)[0] == thumbnail_filename |
1629 | 1633 |
assert os.path.exists(thumbnail_path) |
1630 |
thumbnail_contents_new = open(thumbnail_path, mode='rb').read() |
|
1634 |
with open(thumbnail_path, mode='rb') as fd: |
|
1635 |
thumbnail_contents_new = fd.read() |
|
1631 | 1636 |
assert thumbnail_contents_new != thumbnail_contents |
1632 | 1637 | |
1633 | 1638 |
# try to overwrite with a different mimetype |
tests/test_pages.py | ||
---|---|---|
352 | 352 |
try: |
353 | 353 |
sys.stdout = StringIO() |
354 | 354 |
cmd.handle(output='-', format_json=True) |
355 |
assert sys.stdout.getvalue() == open(export_filename).read() |
|
355 |
with open(export_filename) as fd: |
|
356 |
assert sys.stdout.getvalue() == fd.read() |
|
356 | 357 |
finally: |
357 | 358 |
sys.stdout = stdout |
358 | 359 | |
359 |
- |