0001-data-merge-assets-and-site-export-file-39425.patch
combo/apps/assets/utils.py | ||
---|---|---|
1 |
# combo - content management system |
|
2 |
# Copyright (C) 2020 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 |
import os |
|
19 |
import tarfile |
|
20 | ||
21 |
from django.core.files.storage import default_storage |
|
22 |
from django.utils.six import BytesIO |
|
23 | ||
24 |
from .models import Asset |
|
25 | ||
26 | ||
27 |
def clean_assets(): |
|
28 |
media_prefix = default_storage.path('') |
|
29 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
30 |
for filename in filenames: |
|
31 |
os.remove('%s/%s' % (basedir, filename)) |
|
32 |
Asset.objects.all().delete() |
|
33 | ||
34 |
def add_tar_content(tar, filename, content): |
|
35 |
file = tarfile.TarInfo(filename) |
|
36 |
fd = BytesIO() |
|
37 |
fd.write(content.encode('utf-8')) |
|
38 |
file.size = fd.tell() |
|
39 |
fd.seek(0) |
|
40 |
tar.addfile(file, fileobj=fd) |
|
41 |
fd.close() |
|
42 | ||
43 |
def import_assets(tar, overwrite=False): |
|
44 |
media_prefix = default_storage.path('') |
|
45 |
for tarinfo in tar.getmembers(): |
|
46 |
filepath = default_storage.path(tarinfo.name) |
|
47 |
if not overwrite and os.path.exists(filepath): |
|
48 |
continue |
|
49 |
if tarinfo.name == '_assets.json': |
|
50 |
json_assets = tar.extractfile(tarinfo).read() |
|
51 |
data = json.loads(json_assets.decode('utf-8')) |
|
52 |
Asset.load_serialized_objects(data.get('assets') or []) |
|
53 |
elif tarinfo.name != '_site.json': |
|
54 |
tar.extract(tarinfo, path=media_prefix) |
|
55 | ||
56 |
def export_assets(tar): |
|
57 |
media_prefix = default_storage.path('') |
|
58 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
59 |
for filename in filenames: |
|
60 |
tar.add( |
|
61 |
os.path.join(basedir, filename), |
|
62 |
os.path.join(basedir, filename)[len(media_prefix):]) |
|
63 |
export = {'assets': Asset.export_all_for_json()} |
|
64 |
add_tar_content(tar, '_assets.json', json.dumps(export, indent=2)) |
combo/apps/assets/views.py | ||
---|---|---|
30 | 30 |
from django.utils.translation import ugettext_lazy as _ |
31 | 31 |
from django.views.generic import TemplateView, ListView, FormView |
32 | 32 | |
33 | 33 |
import ckeditor |
34 | 34 |
from sorl.thumbnail.shortcuts import get_thumbnail |
35 | 35 | |
36 | 36 |
from combo.data.models import CellBase |
37 | 37 |
from combo.data.utils import import_site |
38 |
from combo.apps.assets.utils import import_assets, export_assets |
|
38 | 39 | |
39 | 40 |
from .forms import AssetUploadForm, AssetsImportForm |
40 | 41 |
from .models import Asset |
41 | 42 | |
42 | 43 | |
43 | 44 |
class CkEditorAsset(object): |
44 | 45 |
def __init__(self, filepath): |
45 | 46 |
self.filepath = filepath |
... | ... | |
264 | 265 |
class AssetsImport(FormView): |
265 | 266 |
form_class = AssetsImportForm |
266 | 267 |
template_name = 'combo/manager_assets_import.html' |
267 | 268 |
success_url = reverse_lazy('combo-manager-assets') |
268 | 269 | |
269 | 270 |
def form_valid(self, form): |
270 | 271 |
overwrite = form.cleaned_data.get('overwrite') |
271 | 272 |
try: |
272 |
assets = tarfile.open(fileobj=form.cleaned_data['assets_file'])
|
|
273 |
tar = tarfile.open(fileobj=form.cleaned_data['assets_file'])
|
|
273 | 274 |
except tarfile.TarError: |
274 | 275 |
messages.error(self.request, _('The assets file is not valid.')) |
275 | 276 |
return super(AssetsImport, self).form_valid(form) |
276 |
media_prefix = default_storage.path('') |
|
277 |
for tarinfo in assets.getmembers(): |
|
278 |
filepath = default_storage.path(tarinfo.name) |
|
279 |
if not overwrite and os.path.exists(filepath): |
|
280 |
continue |
|
281 |
if tarinfo.name == '_assets.json': |
|
282 |
json_assets = assets.extractfile(tarinfo).read() |
|
283 |
import_site(json.loads(json_assets.decode('utf-8'))) |
|
284 |
else: |
|
285 |
assets.extract(tarinfo, path=media_prefix) |
|
277 |
import_assets(tar, overwrite) |
|
278 |
tar.close() |
|
286 | 279 |
messages.success(self.request, _('The assets file has been imported.')) |
287 | 280 |
return super(AssetsImport, self).form_valid(form) |
288 | 281 | |
289 | 282 |
assets_import = AssetsImport.as_view() |
290 | 283 | |
291 | 284 | |
292 | 285 |
def assets_export(request, *args, **kwargs): |
293 | 286 |
fd = BytesIO() |
294 |
assets_file = tarfile.open('assets.tar', 'w', fileobj=fd) |
|
295 |
media_prefix = default_storage.path('') |
|
296 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
297 |
for filename in filenames: |
|
298 |
assets_file.add( |
|
299 |
os.path.join(basedir, filename), |
|
300 |
os.path.join(basedir, filename)[len(media_prefix):]) |
|
301 |
if Asset.objects.exists(): |
|
302 |
json_file = tarfile.TarInfo('_assets.json') |
|
303 |
json_fd = BytesIO() |
|
304 |
export = {'assets': Asset.export_all_for_json(),} |
|
305 |
json_fd.write(json.dumps(export).encode('utf-8')) |
|
306 |
json_file.size = json_fd.tell() |
|
307 |
json_fd.seek(0) |
|
308 |
assets_file.addfile(json_file, fileobj=json_fd) |
|
309 |
assets_file.close() |
|
287 |
tar = tarfile.open('assets.tar', 'w', fileobj=fd) |
|
288 |
export_assets(tar) |
|
289 |
tar.close() |
|
310 | 290 |
return HttpResponse(fd.getvalue(), content_type='application/x-tar') |
311 | 291 | |
312 | 292 | |
313 | 293 |
def serve_asset(request, key): |
314 | 294 |
asset = get_object_or_404(Asset, key=key) |
315 | 295 | |
316 | 296 |
if not os.path.exists(asset.asset.path): |
317 | 297 |
raise Http404() |
combo/data/management/commands/export_site.py | ||
---|---|---|
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import json |
18 | 18 |
import sys |
19 |
import tarfile |
|
19 | 20 | |
20 |
from django.core.management.base import BaseCommand |
|
21 |
from django.core.management.base import BaseCommand, CommandError
|
|
21 | 22 | |
22 | 23 |
from combo.data.utils import export_site |
23 | 24 | |
24 | 25 |
class Command(BaseCommand): |
25 | 26 |
help = 'Export the site' |
26 | 27 | |
27 | 28 |
def add_arguments(self, parser): |
28 | 29 |
parser.add_argument( |
29 | 30 |
'--output', metavar='FILE', default=None, |
30 | 31 |
help='name of a file to write output to') |
31 | 32 | |
32 | 33 |
def handle(self, *args, **options): |
33 | 34 |
if options['output'] and options['output'] != '-': |
34 |
output = open(options['output'], 'w') |
|
35 |
try: |
|
36 |
tar = tarfile.open(options['output'], 'w') |
|
37 |
except IOError as e: |
|
38 |
raise CommandError(e) |
|
35 | 39 |
else: |
36 |
output = sys.stdout |
|
37 |
json.dump(export_site(), output, indent=2) |
|
40 |
tar = tarfile.open(mode='w|', fileobj=sys.stdout.buffer) |
|
41 |
export_site(tar) |
|
42 |
tar.close() |
combo/data/management/commands/import_site.py | ||
---|---|---|
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import json |
18 | 18 |
import sys |
19 |
import tarfile |
|
19 | 20 | |
20 | 21 |
from django.core.management.base import BaseCommand, CommandError |
21 | 22 |
from django.utils.encoding import force_text |
23 |
from django.utils.six import BytesIO |
|
22 | 24 | |
23 |
from combo.data.utils import import_site, MissingGroups
|
|
25 |
from combo.data.utils import import_site, ImportSiteError
|
|
24 | 26 | |
25 | 27 |
class Command(BaseCommand): |
26 | 28 |
help = 'Import an exported site' |
27 | 29 | |
28 | 30 |
def add_arguments(self, parser): |
29 | 31 |
parser.add_argument('filename', metavar='FILENAME', type=str, |
30 | 32 |
help='name of file to import') |
31 | 33 |
parser.add_argument( |
32 | 34 |
'--clean', action='store_true', default=False, |
33 | 35 |
help='Clean site before importing') |
34 | 36 |
parser.add_argument( |
35 | 37 |
'--if-empty', action='store_true', default=False, |
36 | 38 |
help='Import only if site is empty') |
37 | 39 | |
38 | 40 |
def handle(self, filename, *args, **options): |
39 | 41 |
if filename == '-': |
40 |
fd = sys.stdin
|
|
42 |
fd = BytesIO(sys.stdin.buffer.read())
|
|
41 | 43 |
else: |
42 |
fd = open(filename) |
|
44 |
try: |
|
45 |
fd = open(filename, 'rb') |
|
46 |
except IOError as e: |
|
47 |
raise CommandError(e) |
|
48 | ||
49 |
tar = None |
|
50 |
json_data = None |
|
51 |
if fd != sys.stdin: |
|
52 |
try: |
|
53 |
tar = tarfile.open(fileobj=fd) |
|
54 |
except tarfile.TarError as e: |
|
55 |
pass |
|
56 |
if not tar: |
|
57 |
try: |
|
58 |
json_data = json.loads(force_text(fd.read())) |
|
59 |
except ValueError as e: |
|
60 |
raise CommandError('%s: %s' % (e.__class__.__name__, str(e))) |
|
43 | 61 |
try: |
44 |
import_site(json.load(fd), |
|
62 |
import_site(data=json_data, |
|
63 |
tar=tar, |
|
45 | 64 |
if_empty=options['if_empty'], |
46 | 65 |
clean=options['clean']) |
47 |
except MissingGroups as e:
|
|
66 |
except ImportSiteError as e:
|
|
48 | 67 |
raise CommandError(e) |
68 |
if tar: |
|
69 |
tar.close() |
combo/data/utils.py | ||
---|---|---|
9 | 9 |
# This program is distributed in the hope that it will be useful, |
10 | 10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import json |
|
18 | ||
17 | 19 |
from django.contrib.auth.models import Group |
18 | 20 |
from django.db import transaction |
19 | 21 |
from django.utils import six |
20 | 22 |
from django.utils.encoding import python_2_unicode_compatible |
21 | 23 |
from django.utils.translation import ugettext_lazy as _ |
22 | 24 | |
23 | 25 |
from combo.apps.assets.models import Asset |
26 |
from combo.apps.assets.utils import add_tar_content, clean_assets, import_assets, export_assets |
|
24 | 27 |
from combo.apps.maps.models import MapLayer |
25 | 28 |
from combo.apps.pwa.models import PwaSettings, PwaNavigationEntry |
26 | 29 |
from .models import Page |
27 | 30 | |
28 | 31 | |
32 |
class ImportSiteError(Exception): |
|
33 |
pass |
|
34 | ||
29 | 35 |
@python_2_unicode_compatible |
30 |
class MissingGroups(Exception):
|
|
36 |
class MissingGroups(ImportSiteError):
|
|
31 | 37 |
def __init__(self, names): |
32 | 38 |
self.names = names |
33 | 39 | |
34 | 40 |
def __str__(self): |
35 | 41 |
return _('Missing groups: %s') % ', '.join(self.names) |
36 | 42 | |
37 | 43 | |
38 |
def export_site(): |
|
39 |
'''Dump site objects to JSON-dumpable dictionnary''' |
|
40 |
return {'pages': Page.export_all_for_json(), |
|
41 |
'map-layers': MapLayer.export_all_for_json(), |
|
42 |
'assets': Asset.export_all_for_json(), |
|
43 |
'pwa': { |
|
44 |
'settings': PwaSettings.export_for_json(), |
|
45 |
'navigation': PwaNavigationEntry.export_all_for_json(), |
|
46 |
} |
|
47 |
} |
|
44 |
def export_site(tar): |
|
45 |
data = { |
|
46 |
'pages': Page.export_all_for_json(), |
|
47 |
'map-layers': MapLayer.export_all_for_json(), |
|
48 |
'pwa': { |
|
49 |
'settings': PwaSettings.export_for_json(), |
|
50 |
'navigation': PwaNavigationEntry.export_all_for_json(), |
|
51 |
} |
|
52 |
} |
|
53 |
add_tar_content(tar, '_site.json', json.dumps(data, indent=2)) |
|
54 |
export_assets(tar) |
|
48 | 55 | |
49 | ||
50 |
def import_site(data, if_empty=False, clean=False): |
|
56 |
def import_site(data=None, tar=None, if_empty=False, clean=False): |
|
51 | 57 |
if isinstance(data, list): |
52 | 58 |
# old export form with a list of pages, convert it to new dictionary |
53 | 59 |
# format. |
54 | 60 |
data = {'pages': data} |
55 | 61 | |
56 | 62 |
if if_empty and (Page.objects.count() or MapLayer.objects.count()): |
57 | 63 |
return |
58 | 64 | |
65 |
if tar: |
|
66 |
try: |
|
67 |
tarinfo = tar.getmember('_site.json') |
|
68 |
except KeyError: |
|
69 |
raise ImportSiteError(_('TAR file should provide _site.json file')) |
|
70 |
json_data = tar.extractfile(tarinfo).read() |
|
71 |
data = json.loads(json_data.decode('utf-8')) |
|
72 | ||
59 | 73 |
# check groups used in access control are all available. |
60 | 74 |
groups = set() |
61 | 75 |
for page in data.get('pages') or []: |
62 | 76 |
for group in page['fields']['groups']: |
63 | 77 |
groups.add(group if isinstance(group, six.string_types) else group[0]) |
64 | 78 |
for cell in page['cells']: |
65 | 79 |
for group in cell['fields']['groups']: |
66 | 80 |
groups.add(group if isinstance(group, six.string_types) else group[0]) |
... | ... | |
68 | 82 |
existing_groups = set([x.name for x in Group.objects.filter(name__in=groups)]) |
69 | 83 |
missing_groups = groups - existing_groups |
70 | 84 |
if missing_groups: |
71 | 85 |
raise MissingGroups(names=sorted([x for x in missing_groups])) |
72 | 86 | |
73 | 87 |
with transaction.atomic(): |
74 | 88 |
if clean: |
75 | 89 |
MapLayer.objects.all().delete() |
90 |
clean_assets() |
|
76 | 91 |
Asset.objects.all().delete() |
77 | 92 |
Page.objects.all().delete() |
78 | 93 |
PwaSettings.objects.all().delete() |
79 | 94 |
PwaNavigationEntry.objects.all().delete() |
80 | 95 | |
81 | 96 |
MapLayer.load_serialized_objects(data.get('map-layers') or []) |
82 |
Asset.load_serialized_objects(data.get('assets') or []) |
|
97 |
if not tar: |
|
98 |
Asset.load_serialized_objects(data.get('assets') or []) |
|
99 |
else: |
|
100 |
import_assets(tar, overwrite=not if_empty) |
|
83 | 101 |
Page.load_serialized_pages(data.get('pages') or []) |
84 | 102 | |
85 | 103 |
if data.get('pwa'): |
86 | 104 |
PwaSettings.load_serialized_settings(data['pwa'].get('settings')) |
87 | 105 |
PwaNavigationEntry.load_serialized_objects(data['pwa'].get('navigation')) |
combo/manager/views.py | ||
---|---|---|
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import hashlib |
18 | 18 |
import json |
19 |
import os
|
|
19 |
import tarfile
|
|
20 | 20 | |
21 | 21 |
from django.conf import settings |
22 | 22 |
from django.contrib import messages |
23 |
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
|
|
23 |
from django.core.exceptions import ObjectDoesNotExist |
|
24 | 24 |
from django.core.urlresolvers import reverse, reverse_lazy |
25 | 25 |
from django.http import HttpResponse, HttpResponseRedirect, Http404 |
26 | 26 |
from django.shortcuts import redirect |
27 | 27 |
from django.shortcuts import get_object_or_404 |
28 |
from django.utils.six import BytesIO |
|
28 | 29 |
from django.utils.translation import ugettext_lazy as _ |
29 | 30 |
from django.utils.encoding import force_text, force_bytes |
30 | 31 |
from django.utils.formats import date_format |
31 | 32 |
from django.utils.timezone import localtime |
32 | 33 |
from django.views.decorators.csrf import requires_csrf_token |
33 | 34 |
from django.views.generic import (RedirectView, DetailView, |
34 | 35 |
CreateView, UpdateView, ListView, DeleteView, FormView) |
35 | 36 | |
36 | 37 |
from combo.data.models import Page, CellBase, ParentContentCell, PageSnapshot, LinkListCell |
37 | 38 |
from combo.data.library import get_cell_class |
38 |
from combo.data.utils import export_site, import_site, MissingGroups
|
|
39 |
from combo.data.utils import export_site, import_site, ImportSiteError
|
|
39 | 40 |
from combo import plugins |
40 | 41 | |
41 | 42 |
from .forms import (PageEditTitleForm, PageVisibilityForm, SiteImportForm, |
42 | 43 |
PageEditRedirectionForm, PageSelectTemplateForm, PageEditSlugForm, |
43 | 44 |
PageEditPictureForm, PageEditIncludeInNavigationForm, |
44 | 45 |
PageEditDescriptionForm, CellVisibilityForm) |
45 | 46 | |
46 | 47 | |
... | ... | |
56 | 57 | |
57 | 58 |
homepage = HomepageView.as_view() |
58 | 59 | |
59 | 60 | |
60 | 61 |
class SiteExportView(ListView): |
61 | 62 |
model = Page |
62 | 63 | |
63 | 64 |
def render_to_response(self, context, **response_kwargs): |
64 |
response = HttpResponse(content_type='application/json') |
|
65 |
json.dump(export_site(), response, indent=2) |
|
66 |
return response |
|
65 |
fd = BytesIO() |
|
66 |
tar = tarfile.open(mode='w', fileobj=fd) |
|
67 |
export_site(tar) |
|
68 |
tar.close() |
|
69 |
return HttpResponse(fd.getvalue(), content_type='application/x-tar') |
|
67 | 70 | |
68 | 71 |
site_export = SiteExportView.as_view() |
69 | 72 | |
70 | 73 | |
71 | 74 |
class SiteImportView(FormView): |
72 | 75 |
form_class = SiteImportForm |
73 | 76 |
template_name = 'combo/site_import.html' |
74 | 77 |
success_url = reverse_lazy('combo-manager-homepage') |
75 | 78 | |
76 | 79 |
def form_valid(self, form): |
80 |
content = self.request.FILES['site_json'].read() |
|
81 |
fd = BytesIO(content) |
|
82 |
tar = None |
|
83 |
json_data = None |
|
77 | 84 |
try: |
78 |
json_site = json.loads(force_text(self.request.FILES['site_json'].read())) |
|
79 |
except ValueError: |
|
80 |
form.add_error('site_json', _('File is not in the expected JSON format.')) |
|
81 |
return self.form_invalid(form) |
|
82 | ||
85 |
tar = tarfile.open(fileobj=fd) |
|
86 |
except tarfile.TarError as e: |
|
87 |
try: |
|
88 |
json_data = json.loads(force_text(content)) |
|
89 |
except ValueError: |
|
90 |
form.add_error('site_json', _('File is not in the expected JSON format.')) |
|
91 |
return self.form_invalid(form) |
|
83 | 92 |
try: |
84 |
import_site(json_site)
|
|
85 |
except MissingGroups as e:
|
|
93 |
import_site(data=json_data, tar=tar)
|
|
94 |
except ImportSiteError as e:
|
|
86 | 95 |
form.add_error('site_json', force_text(e)) |
87 | 96 |
return self.form_invalid(form) |
97 |
if tar: |
|
98 |
tar.close() |
|
88 | 99 | |
89 | 100 |
return super(SiteImportView, self).form_valid(form) |
90 | 101 | |
91 | 102 |
site_import = SiteImportView.as_view() |
92 | 103 | |
93 | 104 | |
94 | 105 |
class PageAddView(CreateView): |
95 | 106 |
model = Page |
tests/test_assets.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import base64 |
4 |
import os |
|
5 |
import tarfile |
|
4 | 6 | |
5 | 7 |
from django.core.urlresolvers import reverse |
8 |
from django.core.files.storage import default_storage |
|
9 |
from django.core.files import File |
|
10 |
from django.utils.six import BytesIO |
|
6 | 11 | |
7 | 12 |
import pytest |
8 | 13 | |
9 | 14 |
from combo.apps.assets.models import Asset |
15 |
from combo.apps.assets.utils import add_tar_content, clean_assets, export_assets, import_assets |
|
10 | 16 | |
11 | 17 |
pytestmark = pytest.mark.django_db |
12 | 18 | |
19 |
@pytest.fixture |
|
20 |
def some_assets(): |
|
21 |
Asset(key='banner', asset=File(BytesIO(b'test'), 'test.png')).save() |
|
22 |
Asset(key='favicon', asset=File(BytesIO(b'test2'), 'test2.png')).save() |
|
23 | ||
24 |
def count_asset_files(): |
|
25 |
nb_assets = 0 |
|
26 |
media_prefix = default_storage.path('') |
|
27 |
for basedir, dirnames, filenames in os.walk(media_prefix): |
|
28 |
nb_assets += len(filenames) |
|
29 |
return nb_assets |
|
30 | ||
13 | 31 |
def test_asset_set_api(app, john_doe): |
14 | 32 |
app.authorization = ('Basic', (john_doe.username, john_doe.username)) |
15 | 33 |
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={ |
16 | 34 |
'asset': { |
17 | 35 |
'content': base64.encodebytes(b'plop').decode('ascii'), |
18 | 36 |
'content_type': 'text/plain', |
19 | 37 |
'filename': 'plop.txt', |
20 | 38 |
} |
... | ... | |
37 | 55 |
resp = app.post_json(reverse('api-assets-set', kwargs={'key': 'plop'}), params={ |
38 | 56 |
'asset': { |
39 | 57 |
'content': invalid_value, |
40 | 58 |
'content_type': 'text/plain', |
41 | 59 |
'filename': 'plop.txt', |
42 | 60 |
} |
43 | 61 |
}, status=400) |
44 | 62 |
assert resp.json.get('err') == 1 |
63 | ||
64 |
def test_add_tar_content(tmpdir): |
|
65 |
filename = os.path.join(str(tmpdir), 'file.tar') |
|
66 |
tar = tarfile.open(filename, 'w') |
|
67 |
add_tar_content(tar, 'foo.txt', 'bar') |
|
68 |
tar.close() |
|
69 | ||
70 |
tar = tarfile.open(filename, 'r') |
|
71 |
tarinfo = tar.getmember('foo.txt') |
|
72 |
assert tar.extractfile(tarinfo).read().decode('utf-8') == 'bar' |
|
73 | ||
74 |
def test_import_export_assets(app, some_assets): |
|
75 |
assert Asset.objects.count() == 2 |
|
76 |
assert count_asset_files() == 2 |
|
77 |
fd = BytesIO() |
|
78 |
assets_file = tarfile.open(mode='w', fileobj=fd) |
|
79 |
export_assets(assets_file) |
|
80 |
tar_bytes = fd.getvalue() |
|
81 |
path = default_storage.path('') |
|
82 | ||
83 |
Asset.objects.all().delete() |
|
84 |
assert Asset.objects.count() == 0 |
|
85 |
os.remove('%s/assets/test.png' % path) |
|
86 |
open('%s/assets/test2.png' % path, 'w').write('foo') |
|
87 |
fd = BytesIO(tar_bytes) |
|
88 |
assert count_asset_files() == 1 |
|
89 |
assets_file = tarfile.open(fileobj=fd) |
|
90 |
import_assets(assets_file) |
|
91 |
assert Asset.objects.count() == 2 |
|
92 |
assert count_asset_files() == 2 |
|
93 | ||
94 |
assert open('%s/assets/test.png' % path, 'r').read() == 'test' |
|
95 |
assert open('%s/assets/test2.png' % path, 'r').read() == 'foo' |
|
96 |
fd = BytesIO(tar_bytes) |
|
97 |
assets_file = tarfile.open(fileobj=fd) |
|
98 |
import_assets(assets_file, True) |
|
99 |
assert open('%s/assets/test2.png' % path, 'r').read() == 'test2' |
|
100 |
clean_assets() |
|
101 |
assert count_asset_files() == 0 |
|
102 |
assert Asset.objects.count() == 0 |
tests/test_import_export.py | ||
---|---|---|
1 | 1 |
import base64 |
2 |
import datetime
|
|
2 |
import io
|
|
3 | 3 |
import json |
4 | 4 |
import os |
5 | 5 |
import shutil |
6 | 6 |
import sys |
7 |
import tarfile |
|
7 | 8 |
import tempfile |
8 | 9 | |
9 | 10 |
import pytest |
10 | 11 |
from django.contrib.auth.models import Group |
11 | 12 |
from django.core.files import File |
12 | 13 |
from django.core.management import call_command |
13 | 14 |
from django.core.management.base import CommandError |
14 | 15 |
from django.utils.encoding import force_bytes, force_text |
15 | 16 |
from django.utils.six import BytesIO, StringIO |
16 | 17 | |
17 | 18 |
from combo.apps.assets.models import Asset |
19 |
from combo.apps.assets.utils import export_assets |
|
18 | 20 |
from combo.apps.gallery.models import Image, GalleryCell |
19 | 21 |
from combo.apps.maps.models import MapLayer, Map |
20 | 22 |
from combo.apps.pwa.models import PwaSettings, PwaNavigationEntry |
21 | 23 |
from combo.data.models import Page, TextCell |
22 |
from combo.data.utils import export_site, import_site, MissingGroups
|
|
24 |
from combo.data.utils import import_site, ImportSiteError
|
|
23 | 25 | |
24 | 26 |
pytestmark = pytest.mark.django_db |
25 | 27 | |
26 | 28 | |
27 | 29 |
@pytest.fixture |
28 | 30 |
def some_data(): |
29 | 31 |
page = Page(title='One', slug='one') |
30 | 32 |
page.save() |
... | ... | |
40 | 42 |
MapLayer(label='Foo', slug='foo', geojson_url='http://example.net/foo/').save() |
41 | 43 |
MapLayer(label='Bar', slug='bar', geojson_url='http://example.net/bar/').save() |
42 | 44 | |
43 | 45 |
@pytest.fixture |
44 | 46 |
def some_assets(): |
45 | 47 |
Asset(key='banner', asset=File(BytesIO(b'test'), 'test.png')).save() |
46 | 48 |
Asset(key='favicon', asset=File(BytesIO(b'test2'), 'test2.png')).save() |
47 | 49 | |
48 |
def get_output_of_command(command, *args, **kwargs): |
|
49 |
old_stdout = sys.stdout |
|
50 |
output = sys.stdout = StringIO() |
|
50 |
def get_output_of_command(capsysbinary, command, *args, **kwargs): |
|
51 |
captured = capsysbinary.readouterr() |
|
51 | 52 |
call_command(command, *args, **kwargs) |
52 |
sys.stdout = old_stdout |
|
53 |
return output.getvalue() |
|
54 | ||
55 |
def test_import_export(app, some_data): |
|
56 |
output = get_output_of_command('export_site') |
|
57 |
assert len(json.loads(output)['pages']) == 3 |
|
58 |
import_site(data={}, clean=True) |
|
59 |
assert Page.objects.all().count() == 0 |
|
60 |
assert TextCell.objects.all().count() == 0 |
|
61 |
empty_output = get_output_of_command('export_site') |
|
62 |
assert len(json.loads(empty_output)['pages']) == 0 |
|
63 | ||
64 |
Page(title='test', slug='test').save() |
|
65 |
old_stdin = sys.stdin |
|
66 |
sys.stdin = StringIO(json.dumps({})) |
|
67 |
assert Page.objects.count() == 1 |
|
68 |
try: |
|
53 |
captured = capsysbinary.readouterr() |
|
54 |
return captured.out |
|
55 | ||
56 |
def get_embedded_json(tar_as_bytes, filename): |
|
57 |
fd = BytesIO(tar_as_bytes) |
|
58 |
tar = tarfile.open(fileobj=fd) |
|
59 |
tarinfo = tar.getmember(filename) |
|
60 |
data = tar.extractfile(tarinfo).read() |
|
61 |
tar.close() |
|
62 |
return json.loads(data.decode('utf-8')) |
|
63 | ||
64 |
def test_import_export(app, some_data, capsysbinary, monkeypatch): |
|
65 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
66 |
with capsysbinary.disabled(): |
|
67 |
assert len(get_embedded_json(output, '_site.json')['pages']) == 3 |
|
68 |
import_site(data={}, clean=True) |
|
69 |
assert Page.objects.all().count() == 0 |
|
70 |
assert TextCell.objects.all().count() == 0 |
|
71 |
empty_output = get_output_of_command(capsysbinary, 'export_site') |
|
72 |
with capsysbinary.disabled(): |
|
73 |
assert len(get_embedded_json(empty_output, '_site.json')['pages']) == 0 |
|
74 | ||
75 |
Page(title='test', slug='test').save() |
|
76 |
sys.stdin = StringIO(json.dumps({})) |
|
77 |
assert Page.objects.count() == 1 |
|
78 |
monkeypatch.setattr('sys.stdin', io.TextIOWrapper(BytesIO(empty_output))) |
|
69 | 79 |
call_command('import_site', '-', clean=True) |
70 |
finally: |
|
71 |
sys.stdin = old_stdin |
|
72 |
assert Page.objects.count() == 0 |
|
80 |
assert Page.objects.count() == 0 |
|
73 | 81 | |
74 |
with tempfile.NamedTemporaryFile() as f: |
|
75 |
f.write(force_bytes(output)) |
|
76 |
f.flush() |
|
77 |
call_command('import_site', f.name) |
|
78 | ||
79 |
assert Page.objects.count() == 3 |
|
80 |
assert TextCell.objects.all().count() == 1 |
|
81 | ||
82 |
import_site(data={}, if_empty=True) |
|
83 |
assert Page.objects.count() == 3 |
|
84 |
assert TextCell.objects.all().count() == 1 |
|
85 | ||
86 |
import_site(data=[], clean=True) |
|
87 |
tempdir = tempfile.mkdtemp('chrono-test') |
|
88 |
empty_output = get_output_of_command('export_site', output=os.path.join(tempdir, 't.json')) |
|
89 |
assert os.path.exists(os.path.join(tempdir, 't.json')) |
|
90 |
shutil.rmtree(tempdir) |
|
82 |
with tempfile.NamedTemporaryFile() as f: |
|
83 |
f.write(force_bytes(output)) |
|
84 |
f.flush() |
|
85 |
call_command('import_site', f.name) |
|
86 | ||
87 |
assert Page.objects.count() == 3 |
|
88 |
assert TextCell.objects.all().count() == 1 |
|
89 | ||
90 |
import_site(data={}, if_empty=True) |
|
91 |
assert Page.objects.count() == 3 |
|
92 |
assert TextCell.objects.all().count() == 1 |
|
93 | ||
94 |
import_site(data=[], clean=True) |
|
95 |
tempdir = tempfile.mkdtemp('chrono-test') |
|
96 |
empty_output = get_output_of_command( |
|
97 |
capsysbinary, 'export_site', output=os.path.join(tempdir, 't.json')) |
|
98 |
with capsysbinary.disabled(): |
|
99 |
assert os.path.exists(os.path.join(tempdir, 't.json')) |
|
100 |
shutil.rmtree(tempdir) |
|
101 | ||
102 |
def test_tar_import_format_error(app, tmpdir): |
|
103 |
filename = os.path.join(str(tmpdir), 'file.tar') |
|
104 |
tar = tarfile.open(filename, 'w') |
|
105 |
export_assets(tar) |
|
106 |
tar.close() |
|
107 | ||
108 |
with pytest.raises(ImportSiteError, match='should provide _site.json file'): |
|
109 |
import_site(tar=tarfile.open(filename)) |
|
110 |
with pytest.raises(CommandError, match='should provide _site.json file'): |
|
111 |
call_command('import_site', filename) |
|
112 | ||
113 |
with pytest.raises(CommandError, match=r'No such file or directory'): |
|
114 |
call_command('export_site', '--output', '%s/noway/foo.tar' % tmpdir) |
|
115 |
with pytest.raises(CommandError, match=r'No such file or directory'): |
|
116 |
call_command('import_site', '%s/noway/foo.tar' % tmpdir) |
|
117 |
open(filename, 'w').write('foo') |
|
118 |
with pytest.raises(CommandError, match=r'JSON'): |
|
119 |
call_command('import_site', filename) |
|
91 | 120 | |
92 | 121 |
def test_backward_compatibility_import(app, some_data): |
93 | 122 |
old_export = Page.export_all_for_json() |
94 | 123 |
Page.objects.all().delete() |
95 | 124 |
import_site(data=old_export) |
96 | 125 |
assert Page.objects.count() == 3 |
97 | 126 | |
98 |
def test_import_export_map_layers(app, some_map_layers): |
|
99 |
output = get_output_of_command('export_site') |
|
100 |
assert len(json.loads(output)['map-layers']) == 2 |
|
101 |
import_site(data={}, clean=True) |
|
102 |
assert MapLayer.objects.all().count() == 0 |
|
103 |
empty_output = get_output_of_command('export_site') |
|
104 |
assert len(json.loads(empty_output)['map-layers']) == 0 |
|
105 | ||
106 |
MapLayer(label='Baz', slug='baz', geojson_url='http://example.net/baz/').save() |
|
107 |
old_stdin = sys.stdin |
|
108 |
sys.stdin = StringIO(json.dumps({})) |
|
109 |
assert MapLayer.objects.count() == 1 |
|
110 |
try: |
|
111 |
call_command('import_site', '-', clean=True) |
|
112 |
finally: |
|
113 |
sys.stdin = old_stdin |
|
114 |
assert MapLayer.objects.count() == 0 |
|
115 | ||
116 |
with tempfile.NamedTemporaryFile() as f: |
|
117 |
f.write(force_bytes(output)) |
|
118 |
f.flush() |
|
119 |
call_command('import_site', f.name) |
|
120 | ||
121 |
assert MapLayer.objects.count() == 2 |
|
122 | ||
123 |
import_site(data={}, if_empty=True) |
|
124 |
assert MapLayer.objects.count() == 2 |
|
125 | ||
126 |
def test_import_export_map_cells(app, some_data, some_map_layers): |
|
127 |
page = Page.objects.get(slug='one') |
|
128 |
cell = Map(page=page, order=0, placeholder='content') |
|
129 |
cell.save() |
|
130 |
cell.layers.add(MapLayer.objects.get(slug='foo')) |
|
131 |
cell.save() |
|
132 |
site_export = get_output_of_command('export_site') |
|
133 |
import_site(data={}, clean=True) |
|
134 |
assert Map.objects.count() == 0 |
|
135 | ||
136 |
import_site(data=json.loads(site_export), clean=True) |
|
137 |
assert Map.objects.count() == 1 |
|
138 |
assert Map.objects.all()[0].layers.all()[0].slug == 'foo' |
|
139 | ||
140 |
def test_group_restrictions_import_export(app, some_data): |
|
141 |
group = Group(name='A Group') |
|
142 |
group.save() |
|
143 | ||
144 |
page = Page.objects.get(slug='one') |
|
145 |
page.groups.set([group]) |
|
146 |
page.save() |
|
147 | ||
148 |
cell = TextCell.objects.get(order=0) |
|
149 |
cell.groups.set([group]) |
|
150 |
cell.save() |
|
151 | ||
152 |
output = get_output_of_command('export_site') |
|
153 |
assert len(json.loads(output)['pages']) == 3 |
|
154 |
import_site(data={}, clean=True) |
|
155 |
assert Page.objects.all().count() == 0 |
|
156 |
assert TextCell.objects.all().count() == 0 |
|
157 | ||
158 |
Group.objects.all().delete() |
|
127 |
def test_import_export_map_layers(app, some_map_layers, capsysbinary, monkeypatch): |
|
128 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
129 |
with capsysbinary.disabled(): |
|
130 |
assert len(get_embedded_json(output, '_site.json')['map-layers']) == 2 |
|
131 |
import_site(data={}, clean=True) |
|
132 |
assert MapLayer.objects.all().count() == 0 |
|
133 |
empty_output = get_output_of_command(capsysbinary, 'export_site') |
|
134 |
with capsysbinary.disabled(): |
|
135 |
assert len(get_embedded_json(empty_output, '_site.json')['map-layers']) == 0 |
|
159 | 136 | |
160 |
with pytest.raises(MissingGroups) as excinfo:
|
|
161 |
import_site(json.loads(output), clean=True)
|
|
137 |
MapLayer(label='Baz', slug='baz', geojson_url='http://example.net/baz/').save()
|
|
138 |
assert MapLayer.objects.count() == 1
|
|
162 | 139 | |
163 |
assert excinfo.value.names == ['A Group'] |
|
140 |
monkeypatch.setattr('sys.stdin', io.TextIOWrapper(BytesIO(json.dumps({}).encode("utf-8")))) |
|
141 |
call_command('import_site', '-', clean=True) |
|
142 |
assert MapLayer.objects.count() == 0 |
|
164 | 143 | |
165 |
with pytest.raises(CommandError, match='Missing groups: A Group'): |
|
166 | 144 |
with tempfile.NamedTemporaryFile() as f: |
167 | 145 |
f.write(force_bytes(output)) |
168 | 146 |
f.flush() |
169 |
call_command('import_site', f.name, clean=True) |
|
170 |
assert Page.objects.count() == 0 |
|
171 | ||
172 |
group = Group(name='A Group') |
|
173 |
group.save() |
|
174 | ||
175 |
import_site(json.loads(output), clean=True) |
|
176 |
assert Page.objects.all().count() == 3 |
|
177 |
assert TextCell.objects.all().count() == 1 |
|
178 | ||
179 |
page = Page.objects.get(slug='one') |
|
180 |
assert [x.name for x in page.groups.all()] == ['A Group'] |
|
181 | ||
182 |
cell = TextCell.objects.get(order=0) |
|
183 |
assert [x.name for x in cell.groups.all()] == ['A Group'] |
|
184 | ||
185 |
def test_import_export_assets(app, some_assets): |
|
186 |
output = get_output_of_command('export_site') |
|
187 |
assert len(json.loads(output)['assets']) == 2 |
|
188 |
import_site(data={}, clean=True) |
|
189 |
assert Asset.objects.all().count() == 0 |
|
190 |
empty_output = get_output_of_command('export_site') |
|
191 |
assert len(json.loads(empty_output)['assets']) == 0 |
|
192 | ||
193 |
Asset(key='footer', asset=File(StringIO('test3'), 'test3.png')).save() |
|
194 |
old_stdin = sys.stdin |
|
195 |
sys.stdin = StringIO(json.dumps({})) |
|
196 |
assert Asset.objects.count() == 1 |
|
197 |
try: |
|
147 |
call_command('import_site', f.name) |
|
148 | ||
149 |
assert MapLayer.objects.count() == 2 |
|
150 | ||
151 |
import_site(data={}, if_empty=True) |
|
152 |
assert MapLayer.objects.count() == 2 |
|
153 | ||
154 |
def test_import_export_map_cells(app, some_data, some_map_layers, capsysbinary): |
|
155 |
with capsysbinary.disabled(): |
|
156 |
page = Page.objects.get(slug='one') |
|
157 |
cell = Map(page=page, order=0, placeholder='content') |
|
158 |
cell.save() |
|
159 |
cell.layers.add(MapLayer.objects.get(slug='foo')) |
|
160 |
cell.save() |
|
161 |
site_export = get_output_of_command(capsysbinary, 'export_site') |
|
162 |
with capsysbinary.disabled(): |
|
163 |
import_site(data={}, clean=True) |
|
164 |
assert Map.objects.count() == 0 |
|
165 | ||
166 |
import_site(tar=tarfile.open(fileobj=BytesIO(site_export)), clean=True) |
|
167 |
assert Map.objects.count() == 1 |
|
168 |
assert Map.objects.all()[0].layers.all()[0].slug == 'foo' |
|
169 | ||
170 |
def test_group_restrictions_import_export(app, some_data, capsysbinary): |
|
171 |
with capsysbinary.disabled(): |
|
172 |
group = Group(name='A Group') |
|
173 |
group.save() |
|
174 | ||
175 |
page = Page.objects.get(slug='one') |
|
176 |
page.groups.set([group]) |
|
177 |
page.save() |
|
178 | ||
179 |
cell = TextCell.objects.get(order=0) |
|
180 |
cell.groups.set([group]) |
|
181 |
cell.save() |
|
182 | ||
183 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
184 |
with capsysbinary.disabled(): |
|
185 |
assert len(get_embedded_json(output, '_site.json')['pages']) == 3 |
|
186 |
import_site(data={}, clean=True) |
|
187 |
assert Page.objects.all().count() == 0 |
|
188 |
assert TextCell.objects.all().count() == 0 |
|
189 | ||
190 |
Group.objects.all().delete() |
|
191 | ||
192 |
with pytest.raises(ImportSiteError) as excinfo: |
|
193 |
import_site(tar=tarfile.open(fileobj=BytesIO(output)), clean=True) |
|
194 | ||
195 |
assert excinfo.value.names == ['A Group'] |
|
196 | ||
197 |
with pytest.raises(CommandError, match='Missing groups: A Group'): |
|
198 |
with tempfile.NamedTemporaryFile() as f: |
|
199 |
f.write(force_bytes(output)) |
|
200 |
f.flush() |
|
201 |
call_command('import_site', f.name, clean=True) |
|
202 |
assert Page.objects.count() == 0 |
|
203 | ||
204 |
group = Group(name='A Group') |
|
205 |
group.save() |
|
206 | ||
207 |
import_site(tar=tarfile.open(fileobj=BytesIO(output)), clean=True) |
|
208 |
assert Page.objects.all().count() == 3 |
|
209 |
assert TextCell.objects.all().count() == 1 |
|
210 | ||
211 |
page = Page.objects.get(slug='one') |
|
212 |
assert [x.name for x in page.groups.all()] == ['A Group'] |
|
213 | ||
214 |
cell = TextCell.objects.get(order=0) |
|
215 |
assert [x.name for x in cell.groups.all()] == ['A Group'] |
|
216 | ||
217 |
def test_import_export_assets(app, some_assets, capsysbinary, monkeypatch): |
|
218 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
219 |
with capsysbinary.disabled(): |
|
220 |
assert len(get_embedded_json(output, '_assets.json')['assets']) == 2 |
|
221 |
import_site(data={}, clean=True) |
|
222 |
assert Asset.objects.all().count() == 0 |
|
223 |
empty_output = get_output_of_command(capsysbinary, 'export_site') |
|
224 |
with capsysbinary.disabled(): |
|
225 |
assert len(get_embedded_json(empty_output, '_assets.json')['assets']) == 0 |
|
226 | ||
227 |
Asset(key='footer', asset=File(StringIO('test3'), 'test3.png')).save() |
|
228 |
monkeypatch.setattr('sys.stdin', io.TextIOWrapper(BytesIO(json.dumps({}).encode("utf-8")))) |
|
229 |
assert Asset.objects.count() == 1 |
|
198 | 230 |
call_command('import_site', '-', clean=True) |
199 |
finally: |
|
200 |
sys.stdin = old_stdin |
|
201 |
assert Asset.objects.count() == 0 |
|
202 | ||
203 |
with tempfile.NamedTemporaryFile() as f: |
|
204 |
f.write(force_bytes(output)) |
|
205 |
f.flush() |
|
206 |
call_command('import_site', f.name) |
|
207 | ||
208 |
assert Asset.objects.count() == 2 |
|
209 | ||
210 |
import_site(data={}, if_empty=True) |
|
211 |
assert Asset.objects.count() == 2 |
|
212 | ||
213 |
def test_import_export_pwa_settings(app): |
|
214 |
output = get_output_of_command('export_site') |
|
215 |
pwa_settings = PwaSettings.singleton() |
|
216 |
pwa_settings.offline_text = 'Hello world' |
|
217 |
pwa_settings.offline_retry_button = False |
|
218 |
pwa_settings.save() |
|
219 |
output = get_output_of_command('export_site') |
|
220 |
import_site(data={}, clean=True) |
|
221 |
assert PwaSettings.objects.all().count() == 0 |
|
222 | ||
223 |
import_site(data=json.loads(output)) |
|
224 |
assert PwaSettings.singleton().offline_retry_button is False |
|
225 |
assert PwaSettings.singleton().offline_text == 'Hello world' |
|
226 | ||
227 |
def test_import_export_pwa_navigation(app, some_data): |
|
228 |
page = Page.objects.get(slug='one') |
|
229 |
entry1 = PwaNavigationEntry(label='a', url='/', order=0) |
|
230 |
entry2 = PwaNavigationEntry(link_page=page, order=1, icon=File(BytesIO(b'te\30st'), 'test.png')) |
|
231 |
entry1.save() |
|
232 |
entry2.save() |
|
233 |
output = get_output_of_command('export_site') |
|
234 |
import_site(data={}, clean=True) |
|
235 |
assert PwaNavigationEntry.objects.all().count() == 0 |
|
236 | ||
237 |
import_site(data=json.loads(output)) |
|
238 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
239 |
# check identical file was not touched |
|
240 |
assert os.path.basename(PwaNavigationEntry.objects.get(order=1).icon.file.name) == 'test.png' |
|
241 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'te\30st' |
|
242 | ||
243 |
# check a second import doesn't create additional entries |
|
244 |
import_site(data=json.loads(output)) |
|
245 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
246 | ||
247 |
# check with a change in icon file content |
|
248 |
data = json.loads(output) |
|
249 |
data['pwa']['navigation'][1]['icon:base64'] = force_text(base64.encodebytes(b'TEST')) |
|
250 |
import_site(data=data) |
|
251 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
252 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'TEST' |
|
253 | ||
254 |
# check with a change in icon file name |
|
255 |
data = json.loads(output) |
|
256 |
data['pwa']['navigation'][1]['fields']['icon'] = 'pwa/test2.png' |
|
257 |
data['pwa']['navigation'][1]['icon:base64'] = force_text(base64.encodebytes(b'TEST2')) |
|
258 |
import_site(data=data) |
|
259 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
260 |
assert os.path.basename(PwaNavigationEntry.objects.get(order=1).icon.file.name) == 'test2.png' |
|
261 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'TEST2' |
|
262 | ||
263 |
def test_import_export_gallery_images(app, some_data): |
|
264 |
page = Page.objects.get(slug='one') |
|
265 |
gallery = GalleryCell(page=page, order=2, placeholder='images') |
|
266 |
gallery.save() |
|
267 |
image1 = Image(gallery=gallery, image='path/foo.jpg', title='foo', order=1) |
|
268 |
image2 = Image(gallery=gallery, image='path/bar.jpg', title='bar', order=2) |
|
269 |
image1.save() |
|
270 |
image2.save() |
|
271 |
output = get_output_of_command('export_site') |
|
272 |
import_site(data={}, clean=True) |
|
273 |
assert Image.objects.all().count() == 0 |
|
274 | ||
275 |
import_site(data=json.loads(output)) |
|
276 |
assert Image.objects.all().count() == 2 |
|
277 |
assert image1.title == 'foo' |
|
278 |
assert image1.image == 'path/foo.jpg' |
|
231 |
assert Asset.objects.count() == 0 |
|
232 | ||
233 |
with tempfile.NamedTemporaryFile() as f: |
|
234 |
f.write(force_bytes(output)) |
|
235 |
f.flush() |
|
236 |
call_command('import_site', f.name) |
|
237 | ||
238 |
assert Asset.objects.count() == 2 |
|
239 | ||
240 |
import_site(data={}, if_empty=True) |
|
241 |
assert Asset.objects.count() == 2 |
|
242 | ||
243 |
def test_import_export_pwa_settings(app, capsysbinary): |
|
244 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
245 |
with capsysbinary.disabled(): |
|
246 |
pwa_settings = PwaSettings.singleton() |
|
247 |
pwa_settings.offline_text = 'Hello world' |
|
248 |
pwa_settings.offline_retry_button = False |
|
249 |
pwa_settings.save() |
|
250 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
251 |
with capsysbinary.disabled(): |
|
252 |
import_site(data={}, clean=True) |
|
253 |
assert PwaSettings.objects.all().count() == 0 |
|
254 | ||
255 |
import_site(tar=tarfile.open(fileobj=BytesIO(output))) |
|
256 |
assert PwaSettings.singleton().offline_retry_button is False |
|
257 |
assert PwaSettings.singleton().offline_text == 'Hello world' |
|
258 | ||
259 |
def test_import_export_pwa_navigation(app, some_data, capsysbinary): |
|
260 |
with capsysbinary.disabled(): |
|
261 |
page = Page.objects.get(slug='one') |
|
262 |
entry1 = PwaNavigationEntry(label='a', url='/', order=0) |
|
263 |
entry2 = PwaNavigationEntry(link_page=page, order=1, icon=File(BytesIO(b'te\30st'), 'test.png')) |
|
264 |
entry1.save() |
|
265 |
entry2.save() |
|
266 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
267 |
with capsysbinary.disabled(): |
|
268 |
import_site(data={}, clean=True) |
|
269 |
assert PwaNavigationEntry.objects.all().count() == 0 |
|
270 | ||
271 |
import_site(tar=tarfile.open(fileobj=BytesIO(output))) |
|
272 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
273 |
# check identical file was not touched |
|
274 |
assert os.path.basename(PwaNavigationEntry.objects.get(order=1).icon.file.name) == 'test.png' |
|
275 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'te\30st' |
|
276 | ||
277 |
# check a second import doesn't create additional entries |
|
278 |
import_site(tar=tarfile.open(fileobj=BytesIO(output))) |
|
279 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
280 | ||
281 |
# check with a change in icon file content |
|
282 |
data = get_embedded_json(output, '_site.json') |
|
283 |
data['pwa']['navigation'][1]['icon:base64'] = force_text(base64.encodebytes(b'TEST')) |
|
284 |
import_site(data=data) |
|
285 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
286 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'TEST' |
|
287 | ||
288 |
# check with a change in icon file name |
|
289 |
data = get_embedded_json(output, '_site.json') |
|
290 |
data['pwa']['navigation'][1]['fields']['icon'] = 'pwa/test2.png' |
|
291 |
data['pwa']['navigation'][1]['icon:base64'] = force_text(base64.encodebytes(b'TEST2')) |
|
292 |
import_site(data=data) |
|
293 |
assert PwaNavigationEntry.objects.all().count() == 2 |
|
294 |
assert os.path.basename(PwaNavigationEntry.objects.get(order=1).icon.file.name) == 'test2.png' |
|
295 |
assert PwaNavigationEntry.objects.get(order=1).icon.read() == b'TEST2' |
|
296 | ||
297 |
def test_import_export_gallery_images(app, some_data, capsysbinary): |
|
298 |
with capsysbinary.disabled(): |
|
299 |
page = Page.objects.get(slug='one') |
|
300 |
gallery = GalleryCell(page=page, order=2, placeholder='images') |
|
301 |
gallery.save() |
|
302 |
image1 = Image(gallery=gallery, image='path/foo.jpg', title='foo', order=1) |
|
303 |
image2 = Image(gallery=gallery, image='path/bar.jpg', title='bar', order=2) |
|
304 |
image1.save() |
|
305 |
image2.save() |
|
306 |
output = get_output_of_command(capsysbinary, 'export_site') |
|
307 |
with capsysbinary.disabled(): |
|
308 |
import_site(data={}, clean=True) |
|
309 |
assert Image.objects.all().count() == 0 |
|
310 | ||
311 |
import_site(tar=tarfile.open(fileobj=BytesIO(output))) |
|
312 |
assert Image.objects.all().count() == 2 |
|
313 |
assert image1.title == 'foo' |
|
314 |
assert image1.image == 'path/foo.jpg' |
tests/test_manager.py | ||
---|---|---|
1 | 1 |
import base64 |
2 |
import json |
|
3 | 2 |
import os |
4 | 3 |
import re |
5 | 4 |
import shutil |
6 | 5 | |
7 | 6 |
import mock |
8 | 7 | |
9 | 8 |
from django.core.files.storage import default_storage |
10 | 9 |
from django.core.urlresolvers import reverse |
11 | 10 |
from django.conf import settings |
12 |
from django.contrib.auth.models import User, Group
|
|
11 |
from django.contrib.auth.models import Group |
|
13 | 12 |
from django.template import TemplateSyntaxError |
14 | 13 |
from django.test import override_settings |
15 | 14 |
from django.utils.http import urlencode |
16 | 15 |
from django.utils.six import BytesIO |
17 | 16 |
from django.utils.six.moves.urllib import parse as urlparse |
18 | 17 | |
19 | 18 |
import pytest |
20 | 19 |
from webtest import TestApp |
... | ... | |
429 | 428 |
resp = app.get('/manage/pages/order', params={ |
430 | 429 |
'moved-page-id': page2.id, |
431 | 430 |
'moved-page-new-parent': page4.id, |
432 | 431 |
'new-order': ','.join([str(x) for x in [page1.id, page3.id, page4.id, page2.id]])}) |
433 | 432 |
assert Page.objects.get(id=page1.id).parent_id is None |
434 | 433 |
assert Page.objects.get(id=page3.id).parent_id == page1.id |
435 | 434 | |
436 | 435 | |
437 |
def test_export_page(app, admin_user):
|
|
436 |
def test_page_export_import(app, admin_user):
|
|
438 | 437 |
Page.objects.all().delete() |
439 | 438 |
page = Page(title='One', slug='one', template_name='standard') |
440 | 439 |
page.save() |
441 | 440 |
app = login(app) |
442 | 441 |
resp = app.get('/manage/pages/%s/' % page.id) |
443 | 442 |
resp = resp.click('Export') |
444 | 443 |
assert resp.headers['content-type'] == 'application/json' |
445 | 444 |
assert resp.json['pages'][0].get('fields').get('slug') == 'one' |
445 |
page_export = resp.body |
|
446 | ||
447 |
Page.objects.all().delete() |
|
448 |
assert Page.objects.count() == 0 |
|
449 |
app = login(app) |
|
450 |
resp = app.get('/manage/') |
|
451 |
resp = resp.click('Import Site') |
|
452 |
resp.form['site_json'] = Upload('page-export.json', page_export, 'application/json') |
|
453 |
resp = resp.form.submit() |
|
454 |
assert Page.objects.count() == 1 |
|
446 | 455 | |
447 | 456 |
def test_export_page_order(): |
448 | 457 |
Page.objects.all().delete() |
449 | 458 |
page1 = Page(title='One', slug='one', template_name='standard') |
450 | 459 |
page2 = Page(title='Two', slug='two', parent=page1, template_name='standard') |
451 | 460 |
page3 = Page(title='Three', slug='three', parent=page2, template_name='standard') |
452 | 461 |
page4 = Page(title='Four', slug='four', parent=page1, template_name='standard') |
453 | 462 |
random_list = [page3, page4, page1, page2] |
... | ... | |
475 | 484 |
cell.save() |
476 | 485 | |
477 | 486 |
cell = LinkCell(page=page2, placeholder='content', link_page=page1, order=0) |
478 | 487 |
cell.save() |
479 | 488 | |
480 | 489 |
app = login(app) |
481 | 490 |
resp = app.get('/manage/') |
482 | 491 |
resp = resp.click('Export Site') |
483 |
assert resp.headers['content-type'] == 'application/json'
|
|
492 |
assert resp.headers['content-type'] == 'application/x-tar'
|
|
484 | 493 |
site_export = resp.body |
485 | 494 | |
486 | 495 |
Page.objects.all().delete() |
487 | 496 |
assert LinkCell.objects.count() == 0 |
488 | 497 |
app = login(app) |
489 | 498 |
resp = app.get('/manage/') |
490 | 499 |
resp = resp.click('Import Site') |
491 |
resp.form['site_json'] = Upload('site-export.json', site_export, 'application/json')
|
|
500 |
resp.form['site_json'] = Upload('site-export.tar', site_export, 'application/x-tar')
|
|
492 | 501 |
resp = resp.form.submit() |
493 | 502 |
assert Page.objects.count() == 4 |
494 | 503 |
assert LinkCell.objects.count() == 2 |
495 | 504 |
assert LinkCell.objects.get(page__slug='one').link_page.slug == 'two' |
496 | 505 |
assert LinkCell.objects.get(page__slug='two').link_page.slug == 'one' |
497 | 506 | |
498 | 507 |
# check with invalid file |
499 | 508 |
resp = app.get('/manage/') |
500 | 509 |
resp = resp.click('Import Site') |
501 |
resp.form['site_json'] = Upload('site-export.json', b'invalid content', 'application/json')
|
|
510 |
resp.form['site_json'] = Upload('site-export.tar', b'invalid content', 'application/x-tar')
|
|
502 | 511 |
resp = resp.form.submit() |
503 | 512 |
assert 'File is not in the expected JSON format.' in resp.text |
504 | 513 | |
505 | 514 |
def test_site_export_import_missing_group(app, admin_user): |
506 | 515 |
Page.objects.all().delete() |
507 | 516 |
group = Group.objects.create(name='foobar') |
508 | 517 |
page1 = Page(title='One', slug='one', template_name='standard') |
509 | 518 |
page1.save() |
510 | 519 |
page1.groups.set([group]) |
511 | 520 | |
512 | 521 |
app = login(app) |
513 | 522 |
resp = app.get('/manage/') |
514 | 523 |
resp = resp.click('Export Site') |
515 |
assert resp.headers['content-type'] == 'application/json'
|
|
524 |
assert resp.headers['content-type'] == 'application/x-tar'
|
|
516 | 525 |
site_export = resp.body |
517 | 526 | |
518 | 527 |
Page.objects.all().delete() |
519 | 528 |
group.delete() |
520 | 529 | |
521 | 530 |
app = login(app) |
522 | 531 |
resp = app.get('/manage/') |
523 | 532 |
resp = resp.click('Import Site') |
524 |
resp.form['site_json'] = Upload('site-export.json', site_export, 'application/json')
|
|
533 |
resp.form['site_json'] = Upload('site-export.json', site_export, 'application/x-tar')
|
|
525 | 534 |
resp = resp.form.submit() |
526 | 535 |
assert 'Missing groups: foobar' in resp.text |
527 | 536 | |
528 | 537 | |
529 | 538 |
def test_duplicate_page(app, admin_user): |
530 | 539 |
page = Page.objects.create(title='One', slug='one', template_name='standard', exclude_from_navigation=False) |
531 | 540 |
TextCell.objects.create(page=page, placeholder='content', text='Foobar', order=0) |
532 | 541 |
tests/test_pages.py | ||
---|---|---|
1 |
import io |
|
1 | 2 |
import os |
2 | 3 |
import pytest |
3 |
import sys |
|
4 | 4 | |
5 | 5 |
from django.conf import settings |
6 | 6 |
from django.contrib.auth.models import User, Group |
7 | 7 |
from django.test import override_settings |
8 | 8 |
from django.test.client import RequestFactory |
9 |
from django.utils.six import StringIO
|
|
9 |
from django.utils.six import BytesIO
|
|
10 | 10 |
from combo.data.models import Page, PageSnapshot, CellBase, TextCell, LinkCell, LinkListCell |
11 | 11 |
from combo.data.management.commands.import_site import Command as ImportSiteCommand |
12 | 12 |
from combo.data.management.commands.export_site import Command as ExportSiteCommand |
13 | 13 |
from combo.manager.forms import PageVisibilityForm |
14 | 14 | |
15 | 15 |
pytestmark = pytest.mark.django_db |
16 | 16 | |
17 | 17 |
def test_page_order(): |
... | ... | |
306 | 306 |
page3.save() |
307 | 307 |
assert page.get_next_page(None) is None |
308 | 308 | |
309 | 309 |
assert page.get_next_page(check_visibility=False).id == page2.pk |
310 | 310 |
assert page2.get_next_page(check_visibility=False).id == page3.pk |
311 | 311 |
assert page3.get_previous_page(check_visibility=False).id == page2.pk |
312 | 312 | |
313 | 313 | |
314 |
def test_import_export_management_commands(): |
|
315 |
page = Page(title=u'foo', slug='foo', order=0) |
|
316 |
page.save() |
|
314 |
def test_import_export_management_commands(capsysbinary, monkeypatch): |
|
315 |
with capsysbinary.disabled(): |
|
316 |
page = Page(title=u'foo', slug='foo', order=0) |
|
317 |
page.save() |
|
317 | 318 | |
318 |
cell = TextCell(page=page, text='foo', order=0, placeholder='content') |
|
319 |
cell.save() |
|
319 |
cell = TextCell(page=page, text='foo', order=0, placeholder='content')
|
|
320 |
cell.save()
|
|
320 | 321 | |
321 |
page2 = Page(title=u'bar', slug='bar', order=1, parent=page) |
|
322 |
page2.save() |
|
322 |
page2 = Page(title=u'bar', slug='bar', order=1, parent=page)
|
|
323 |
page2.save()
|
|
323 | 324 | |
324 |
cell = TextCell(page=page2, text='bar', order=0, placeholder='content') |
|
325 |
cell.save() |
|
325 |
cell = TextCell(page=page2, text='bar', order=0, placeholder='content')
|
|
326 |
cell.save()
|
|
326 | 327 | |
327 |
export_filename = os.path.join(settings.MEDIA_ROOT, 'site-export.json') |
|
328 |
if os.path.exists(export_filename): |
|
329 |
os.unlink(export_filename) |
|
328 |
export_filename = os.path.join(settings.MEDIA_ROOT, 'site-export.json')
|
|
329 |
if os.path.exists(export_filename):
|
|
330 |
os.unlink(export_filename)
|
|
330 | 331 | |
331 |
cmd = ExportSiteCommand() |
|
332 |
cmd.handle(output=export_filename) |
|
333 |
assert os.path.exists(export_filename) |
|
332 |
cmd = ExportSiteCommand()
|
|
333 |
cmd.handle(output=export_filename)
|
|
334 |
assert os.path.exists(export_filename)
|
|
334 | 335 | |
335 |
stdout = sys.stdout |
|
336 |
try: |
|
337 |
sys.stdout = StringIO() |
|
338 |
cmd.handle(output='-') |
|
339 |
assert sys.stdout.getvalue() == open(export_filename).read() |
|
340 |
finally: |
|
341 |
sys.stdout = stdout |
|
336 |
cmd.handle(output='-') |
|
337 |
captured = capsysbinary.readouterr() |
|
338 |
with capsysbinary.disabled(): |
|
339 |
assert captured.out.decode("utf-8")[:1023] == open(export_filename).read()[:1023] |
|
342 | 340 | |
343 |
Page.objects.all().delete() |
|
341 |
Page.objects.all().delete()
|
|
344 | 342 | |
345 |
cmd = ImportSiteCommand() |
|
346 |
cmd.handle(filename=export_filename, if_empty=False, clean=False) |
|
343 |
cmd = ImportSiteCommand()
|
|
344 |
cmd.handle(filename=export_filename, if_empty=False, clean=False)
|
|
347 | 345 | |
348 |
new_page_1 = Page.objects.all().order_by('order')[0] |
|
349 |
new_page_2 = Page.objects.all().order_by('order')[1] |
|
350 |
assert new_page_1.title == 'foo' |
|
351 |
assert new_page_2.title == 'bar' |
|
352 |
assert len(CellBase.get_cells(page_id=new_page_1.id)) == 1 |
|
353 |
assert isinstance(CellBase.get_cells(page_id=new_page_1.id)[0], TextCell) |
|
354 |
assert CellBase.get_cells(page_id=new_page_1.id)[0].text == 'foo' |
|
346 |
new_page_1 = Page.objects.all().order_by('order')[0] |
|
347 |
new_page_2 = Page.objects.all().order_by('order')[1] |
|
348 |
assert new_page_1.title == 'foo' |
|
349 |
assert new_page_2.title == 'bar' |
|
350 |
assert len(CellBase.get_cells(page_id=new_page_1.id)) == 1 |
|
351 |
assert isinstance(CellBase.get_cells(page_id=new_page_1.id)[0], TextCell) |
|
352 |
assert CellBase.get_cells(page_id=new_page_1.id)[0].text == 'foo' |
|
353 | ||
354 |
Page.objects.all().delete() |
|
355 | ||
356 |
monkeypatch.setattr('sys.stdin', io.TextIOWrapper(BytesIO(captured.out))) |
|
357 |
cmd.handle(filename='-', if_empty=False, clean=False) |
|
358 | ||
359 |
new_page_1 = Page.objects.all().order_by('order')[0] |
|
360 |
new_page_2 = Page.objects.all().order_by('order')[1] |
|
361 |
assert new_page_1.title == 'foo' |
|
362 |
assert new_page_2.title == 'bar' |
|
363 |
assert len(CellBase.get_cells(page_id=new_page_1.id)) == 1 |
|
364 |
assert isinstance(CellBase.get_cells(page_id=new_page_1.id)[0], TextCell) |
|
365 |
assert CellBase.get_cells(page_id=new_page_1.id)[0].text == 'foo' |
|
355 | 366 | |
356 | 367 |
def test_get_placeholders(): |
357 | 368 |
page = Page(title=u'foo', slug='foo', template_name='standard-sidebar', order=0) |
358 | 369 |
request = RequestFactory().get('/') |
359 | 370 |
placeholders = page.get_placeholders(request=request) |
360 | 371 |
assert [x.key for x in placeholders] == ['content', 'sidebar', 'footer'] |
361 | 372 |
assert placeholders[0].acquired is False |
362 | 373 |
assert placeholders[-1].acquired is True |
363 |
- |