0002-assets-factorize-import-export-code-39425.patch
combo/apps/assets/utils.py | ||
---|---|---|
1 |
# combo - content management system |
|
2 |
# Copyright (C) 2020 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 |
import os |
|
19 |
import tarfile |
|
20 | ||
21 |
from django.core.files.storage import default_storage |
|
22 |
from django.utils.six import BytesIO |
|
23 | ||
24 |
from .models import Asset |
|
25 | ||
26 | ||
27 |
def clean_assets_files(): |
|
28 |
media_prefix = default_storage.path('') |
|
29 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
30 |
for filename in filenames: |
|
31 |
os.remove('%s/%s' % (basedir, filename)) |
|
32 | ||
33 |
def add_tar_content(tar, filename, content): |
|
34 |
file = tarfile.TarInfo(filename) |
|
35 |
fd = BytesIO() |
|
36 |
fd.write(content.encode('utf-8')) |
|
37 |
file.size = fd.tell() |
|
38 |
fd.seek(0) |
|
39 |
tar.addfile(file, fileobj=fd) |
|
40 |
fd.close() |
|
41 | ||
42 |
def untar_assets_files(tar, overwrite=False): |
|
43 |
media_prefix = default_storage.path('') |
|
44 |
for tarinfo in tar.getmembers(): |
|
45 |
filepath = default_storage.path(tarinfo.name) |
|
46 |
if not overwrite and os.path.exists(filepath): |
|
47 |
continue |
|
48 |
if tarinfo.name == '_assets.json': |
|
49 |
json_assets = tar.extractfile(tarinfo).read() |
|
50 |
data = json.loads(json_assets.decode('utf-8')) |
|
51 |
elif tarinfo.name != '_site.json': |
|
52 |
tar.extract(tarinfo, path=media_prefix) |
|
53 |
return data |
|
54 | ||
55 |
def tar_assets_files(tar): |
|
56 |
media_prefix = default_storage.path('') |
|
57 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
58 |
for filename in filenames: |
|
59 |
tar.add(os.path.join(basedir, filename), |
|
60 |
os.path.join(basedir, filename)[len(media_prefix):]) |
|
61 |
export = {'assets': Asset.export_all_for_json()} |
|
62 |
add_tar_content(tar, '_assets.json', json.dumps(export, indent=2)) |
|
63 | ||
64 |
def import_assets(fd, overwrite=False): |
|
65 |
tar = tarfile.open(mode='r', fileobj=fd) |
|
66 |
data = untar_assets_files(tar, overwrite=overwrite) |
|
67 |
Asset.load_serialized_objects(data.get('assets') or []) |
|
68 |
tar.close() |
|
69 | ||
70 |
def export_assets(fd): |
|
71 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
72 |
tar_assets_files(tar) |
|
73 |
tar.close() |
combo/apps/assets/views.py | ||
---|---|---|
9 | 9 |
# This program is distributed in the hope that it will be useful, |
10 | 10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import json |
|
18 | 17 |
import tarfile |
19 | 18 |
import os |
20 | 19 | |
21 | 20 |
from django.conf import settings |
22 | 21 |
from django.contrib import messages |
23 | 22 |
from django.core.exceptions import PermissionDenied |
24 | 23 |
from django.core.files.storage import default_storage |
25 | 24 |
from django.core.urlresolvers import reverse, reverse_lazy |
... | ... | |
29 | 28 |
from django.utils.six import BytesIO |
30 | 29 |
from django.utils.translation import ugettext_lazy as _ |
31 | 30 |
from django.views.generic import TemplateView, ListView, FormView |
32 | 31 | |
33 | 32 |
import ckeditor |
34 | 33 |
from sorl.thumbnail.shortcuts import get_thumbnail |
35 | 34 | |
36 | 35 |
from combo.data.models import CellBase |
37 |
from combo.data.utils import import_site
|
|
36 |
from combo.apps.assets.utils import import_assets, export_assets
|
|
38 | 37 | |
39 | 38 |
from .forms import AssetUploadForm, AssetsImportForm |
40 | 39 |
from .models import Asset |
41 | 40 | |
42 | 41 | |
43 | 42 |
class CkEditorAsset(object): |
44 | 43 |
def __init__(self, filepath): |
45 | 44 |
self.filepath = filepath |
... | ... | |
264 | 263 |
class AssetsImport(FormView): |
265 | 264 |
form_class = AssetsImportForm |
266 | 265 |
template_name = 'combo/manager_assets_import.html' |
267 | 266 |
success_url = reverse_lazy('combo-manager-assets') |
268 | 267 | |
269 | 268 |
def form_valid(self, form): |
270 | 269 |
overwrite = form.cleaned_data.get('overwrite') |
271 | 270 |
try: |
272 |
assets = tarfile.open(fileobj=form.cleaned_data['assets_file'])
|
|
271 |
import_assets(form.cleaned_data['assets_file'], overwrite)
|
|
273 | 272 |
except tarfile.TarError: |
274 | 273 |
messages.error(self.request, _('The assets file is not valid.')) |
275 | 274 |
return super(AssetsImport, self).form_valid(form) |
276 |
media_prefix = default_storage.path('') |
|
277 |
for tarinfo in assets.getmembers(): |
|
278 |
filepath = default_storage.path(tarinfo.name) |
|
279 |
if not overwrite and os.path.exists(filepath): |
|
280 |
continue |
|
281 |
if tarinfo.name == '_assets.json': |
|
282 |
json_assets = assets.extractfile(tarinfo).read() |
|
283 |
import_site(json.loads(json_assets.decode('utf-8'))) |
|
284 |
else: |
|
285 |
assets.extract(tarinfo, path=media_prefix) |
|
286 | 275 |
messages.success(self.request, _('The assets file has been imported.')) |
287 | 276 |
return super(AssetsImport, self).form_valid(form) |
288 | 277 | |
289 | 278 |
assets_import = AssetsImport.as_view() |
290 | 279 | |
291 | 280 | |
292 | 281 |
def assets_export(request, *args, **kwargs): |
293 | 282 |
fd = BytesIO() |
294 |
assets_file = tarfile.open('assets.tar', 'w', fileobj=fd) |
|
295 |
media_prefix = default_storage.path('') |
|
296 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
297 |
for filename in filenames: |
|
298 |
assets_file.add( |
|
299 |
os.path.join(basedir, filename), |
|
300 |
os.path.join(basedir, filename)[len(media_prefix):]) |
|
301 |
if Asset.objects.exists(): |
|
302 |
json_file = tarfile.TarInfo('_assets.json') |
|
303 |
json_fd = BytesIO() |
|
304 |
export = {'assets': Asset.export_all_for_json(),} |
|
305 |
json_fd.write(json.dumps(export).encode('utf-8')) |
|
306 |
json_file.size = json_fd.tell() |
|
307 |
json_fd.seek(0) |
|
308 |
assets_file.addfile(json_file, fileobj=json_fd) |
|
309 |
assets_file.close() |
|
283 |
export_assets(fd) |
|
310 | 284 |
return HttpResponse(fd.getvalue(), content_type='application/x-tar') |
311 | 285 | |
312 | 286 | |
313 | 287 |
def serve_asset(request, key): |
314 | 288 |
asset = get_object_or_404(Asset, key=key) |
315 | 289 | |
316 | 290 |
if not os.path.exists(asset.asset.path): |
317 | 291 |
raise Http404() |
tests/test_assets.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import base64 |
4 |
import os |
|
5 |
import tarfile |
|
4 | 6 | |
5 | 7 |
from django.core.urlresolvers import reverse |
8 |
from django.core.files.storage import default_storage |
|
9 |
from django.core.files import File |
|
10 |
from django.utils.six import BytesIO |
|
6 | 11 | |
7 | 12 |
import pytest |
8 | 13 | |
9 | 14 |
from combo.apps.assets.models import Asset |
15 |
from combo.apps.assets.utils import (add_tar_content, clean_assets_files, |
|
16 |
export_assets, import_assets, untar_assets_files, tar_assets_files) |
|
10 | 17 | |
11 | 18 |
pytestmark = pytest.mark.django_db |
12 | 19 | |
20 |
@pytest.fixture |
|
21 |
def some_assets(): |
|
22 |
Asset(key='banner', asset=File(BytesIO(b'test'), 'test.png')).save() |
|
23 |
Asset(key='favicon', asset=File(BytesIO(b'test2'), 'test2.png')).save() |
|
24 | ||
25 |
def count_asset_files(): |
|
26 |
nb_assets = 0 |
|
27 |
media_prefix = default_storage.path('') |
|
28 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
29 |
nb_assets += len(filenames) |
|
30 |
return nb_assets |
|
31 | ||
13 | 32 |
def test_asset_set_api(app, john_doe): |
14 | 33 |
app.authorization = ('Basic', (john_doe.username, john_doe.username)) |
15 | 34 |
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={ |
16 | 35 |
'asset': { |
17 | 36 |
'content': base64.encodebytes(b'plop').decode('ascii'), |
18 | 37 |
'content_type': 'text/plain', |
19 | 38 |
'filename': 'plop.txt', |
20 | 39 |
} |
... | ... | |
37 | 56 |
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={ |
38 | 57 |
'asset': { |
39 | 58 |
'content': invalid_value, |
40 | 59 |
'content_type': 'text/plain', |
41 | 60 |
'filename': 'plop.txt', |
42 | 61 |
} |
43 | 62 |
}, status=400) |
44 | 63 |
assert resp.json.get('err') == 1 |
64 |
clean_assets_files() |
|
65 | ||
66 |
def test_clean_assets_files(some_assets): |
|
67 |
assert count_asset_files() == 2 |
|
68 |
clean_assets_files() |
|
69 |
assert count_asset_files() == 0 |
|
70 | ||
71 |
def test_add_tar_content(tmpdir): |
|
72 |
filename = os.path.join(str(tmpdir), 'file.tar') |
|
73 |
tar = tarfile.open(filename, 'w') |
|
74 |
add_tar_content(tar, 'foo.txt', 'bar') |
|
75 |
tar.close() |
|
76 | ||
77 |
tar = tarfile.open(filename, 'r') |
|
78 |
tarinfo = tar.getmember('foo.txt') |
|
79 |
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar' |
|
80 | ||
81 |
def test_tar_untar_assets(some_assets): |
|
82 |
assert Asset.objects.count() == 2 |
|
83 |
assert count_asset_files() == 2 |
|
84 |
fd = BytesIO() |
|
85 | ||
86 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
87 |
tar_assets_files(tar) |
|
88 |
tar_bytes = fd.getvalue() |
|
89 |
tar.close() |
|
90 | ||
91 |
path = default_storage.path('') |
|
92 |
os.remove('%s/assets/test.png' % path) |
|
93 |
open('%s/assets/test2.png' % path, 'w').write('foo') |
|
94 |
assert count_asset_files() == 1 |
|
95 |
Asset.objects.all().delete() |
|
96 |
assert Asset.objects.count() == 0 |
|
97 |
fd = BytesIO(tar_bytes) |
|
98 | ||
99 |
tar = tarfile.open(mode='r', fileobj=fd) |
|
100 |
data = untar_assets_files(tar) |
|
101 |
assert [x['fields']['key'] for x in data['assets']] == ['banner', 'favicon'] |
|
102 |
assert count_asset_files() == 2 |
|
103 |
assert open('%s/assets/test.png' % path, 'r').read() == 'test' |
|
104 |
assert open('%s/assets/test2.png' % path, 'r').read() == 'foo' |
|
105 |
clean_assets_files() |
|
106 | ||
107 |
def test_import_export_assets(some_assets, tmpdir): |
|
108 |
filename = os.path.join(str(tmpdir), 'file.tar') |
|
109 |
assert Asset.objects.count() == 2 |
|
110 |
assert count_asset_files() == 2 |
|
111 |
fd = open(filename, 'wb') |
|
112 |
export_assets(fd) |
|
113 | ||
114 |
path = default_storage.path('') |
|
115 |
os.remove('%s/assets/test.png' % path) |
|
116 |
open('%s/assets/test2.png' % path, 'w').write('foo') |
|
117 |
assert count_asset_files() == 1 |
|
118 |
Asset.objects.all().delete() |
|
119 |
assert Asset.objects.count() == 0 |
|
120 | ||
121 |
fd = open(filename, 'rb') |
|
122 |
import_assets(fd, overwrite=True) |
|
123 |
assert count_asset_files() == 2 |
|
124 |
assert open('%s/assets/test.png' % path, 'r').read() == 'test' |
|
125 |
assert open('%s/assets/test2.png' % path, 'r').read() == 'test2' |
|
126 |
clean_assets_files() |
|
127 |
assert count_asset_files() == 0 |
|
128 |
clean_assets_files() |
|
45 |
- |