From 811fb044791e263b6fba2ddf59701b4574c5b899 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 11 Oct 2019 13:54:41 +0200 Subject: [PATCH 1/2] utils: add zip package for templated zip files (#36848) --- passerelle/utils/zip.py | 140 ++++++++++++++++++++++++++++++++++++++++ tests/test_utils_zip.py | 109 +++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 passerelle/utils/zip.py create mode 100644 tests/test_utils_zip.py diff --git a/passerelle/utils/zip.py b/passerelle/utils/zip.py new file mode 100644 index 00000000..548e5f5e --- /dev/null +++ b/passerelle/utils/zip.py @@ -0,0 +1,140 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +import io +import os.path +import json +import zipfile + +from jsonschema import validate, ValidationError + +from django.template import Template, Context, TemplateDoesNotExist, TemplateSyntaxError, engines +from django.utils.functional import cached_property +from django.template.loader import get_template + +from passerelle.utils.files import atomic_write + + +SCHEMA = { + 'type': 'object', + 'required': ['name_template'], + 'properties': { + 'name_template': { + 'type': 'string', + }, + 'part_templates': { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': ['name_template', 'template_path'], + 'properties': { + 'name_template': { + 'type': 'string', + }, + 'template_path': { + 'type': 'string', + }, + }, + }, + }, + }, +} + + +class ZipTemplateError(Exception): + pass + + +class ZipTemplateDoesNotExist(ZipTemplateError): + pass + + +class ZipTemplateSyntaxError(ZipTemplateError): + pass + + +class ZipTemplate(object): + def __init__(self, manifest, ctx=None): + path = None + for engine in engines.all(): + for loader in engine.engine.template_loaders: + for origin in loader.get_template_sources(manifest): + if os.path.exists(origin.name): + path = origin.name + break + if path: + break + if path: + break + if not path: + raise ZipTemplateDoesNotExist('manifest %s not found' % manifest) + self.base_path = os.path.dirname(manifest) + self.manifest_path = path + try: + manifest = self.manifest + except ValueError as e: + raise ZipTemplateError('invalid manifest file %s' % path, e) + try: + validate(self.manifest, SCHEMA) + except ValidationError as e: + raise ZipTemplateError('invalid manifest file %s' % path, e) + self.ctx = ctx or {} + + @cached_property + def manifest(self): + with open(self.manifest_path) as fd: + return json.load(fd) + + def __render_template(self, template_content, origin): + try: + return Template(template_content).render(Context(self.ctx, use_l10n=False)) + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError(e) + + @property + def name(self): + return self.__render_template(self.manifest['name_template'], '') + + @property + def parts(self): + for part_template in self.manifest.get('part_templates', []): + name = self.__render_template(part_template['name_template'], part_template) + template_path = os.path.join(self.base_path, part_template['template_path']) + try: + template = get_template(template_path) + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e) + except TemplateDoesNotExist as e: + raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e) + content = template.render(self.ctx) + yield name, content + + def render_to_bytes(self): + with io.BytesIO() as buf: + self.render_to_file(buf) + return buf.getvalue() + + def render_to_file(self, filelike): + with zipfile.ZipFile(filelike, 'w') as zi: + for name, content in self.parts: + zi.writestr(name, content) + + def render_to_path(self, path, tmp_dir=None): + full_path = os.path.join(str(path), self.name) + with atomic_write(full_path, dir=tmp_dir) as fd: + self.render_to_file(fd) diff --git a/tests/test_utils_zip.py b/tests/test_utils_zip.py new file mode 100644 index 00000000..c51ca4c3 --- /dev/null +++ b/tests/test_utils_zip.py @@ -0,0 +1,109 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +import json +import uuid +import zipfile + +import pytest + +from passerelle.utils.zip import ZipTemplate, ZipTemplateDoesNotExist, ZipTemplateSyntaxError + + +@pytest.fixture +def templates_path(tmpdir, settings): + path = tmpdir.mkdir('templates') + settings.TEMPLATES = settings.TEMPLATES[:] + settings.TEMPLATES[0] = settings.TEMPLATES[0].copy() + settings.TEMPLATES[0].setdefault('DIRS', []) + settings.TEMPLATES[0]['DIRS'].insert(0, str(path)) + zip_templates_path = path.mkdir('zip_templates') + return zip_templates_path + + +@pytest.fixture +def tpl_builder(templates_path): + def make(name_template, *parts): + manifest_name = '%s.json' % uuid.uuid4().get_hex() + manifest_path = templates_path / manifest_name + d = { + 'name_template': name_template, + } + if parts: + d['part_templates'] = [] + for name_template, content in parts: + name = '%s.xml' % uuid.uuid4().get_hex() + with (templates_path / name).open('w') as fd: + fd.write(content) + d['part_templates'].append({ + 'name_template': name_template, + 'template_path': name, + }) + with manifest_path.open('w') as fd: + json.dump(d, fd) + return '%s/%s' % (templates_path.basename, manifest_name) + return make + + +@pytest.fixture +def dest(tmpdir): + return tmpdir.mkdir('dest') + + +def test_missing(templates_path): + with pytest.raises(ZipTemplateDoesNotExist): + ZipTemplate('zip_templates/manifest1.json') + + +def test_syntax_error(tpl_builder, dest): + zip_template = ZipTemplate(tpl_builder('{{ name -{{ counter }}.zip'), ctx={'name': 'coucou', 'counter': 10}) + with pytest.raises(ZipTemplateSyntaxError): + zip_template.render_to_path(dest) + + zip_template = ZipTemplate( + tpl_builder( + '{{ name }}-{{ counter }}.zip', + ('part1.xml', '{{ name {{ }}')), + ctx={'name': 'coucou', 'counter': 10}) + with pytest.raises(ZipTemplateSyntaxError): + zip_template.render_to_path(dest) + + +def test_no_parts(tpl_builder, dest): + ZipTemplate(tpl_builder('{{ name }}-{{ counter }}.zip'), + ctx={'name': 'coucou', 'counter': 10}).render_to_path(dest) + + full_path = dest / 'coucou-10.zip' + with full_path.open() as fd: + with zipfile.ZipFile(fd) as zi: + assert zi.namelist() == [] + + +def test_with_parts(tpl_builder, dest): + ZipTemplate( + tpl_builder( + '{{ name }}-{{ counter }}.zip', + ('{{ name }}-{{ counter }}-part1.xml', '{{ body }}'), + ), + ctx={'name': 'coucou', 'counter': 10, 'body': 'blabla'}).render_to_path(dest) + + full_path = dest / 'coucou-10.zip' + with full_path.open() as fd: + with zipfile.ZipFile(fd) as zi: + assert zi.namelist() == ['coucou-10-part1.xml'] + assert zi.open('coucou-10-part1.xml').read() == 'blabla' -- 2.23.0