From b716109aee6d83bcf3fcc829d9216d3d76b3dee4 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 11 Oct 2019 13:54:41 +0200 Subject: [PATCH 05/11] utils: add zip package for templated zip files (#36848) --- passerelle/utils/zip.py | 241 ++++++++++++++++++++++++++++++++++++++++ tests/test_utils_zip.py | 156 ++++++++++++++++++++++++++ 2 files changed, 397 insertions(+) create mode 100644 passerelle/utils/zip.py create mode 100644 tests/test_utils_zip.py diff --git a/passerelle/utils/zip.py b/passerelle/utils/zip.py new file mode 100644 index 00000000..a4dd60b9 --- /dev/null +++ b/passerelle/utils/zip.py @@ -0,0 +1,241 @@ +# passerelle - uniform access to multiple data sources and services +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>.
+ +from __future__ import unicode_literals, absolute_import + +import io +import os.path +import json +import re +import xml.etree.ElementTree as ET +import zipfile + +from jsonschema import validate, ValidationError + +from django.template import Template, Context, TemplateDoesNotExist, TemplateSyntaxError, engines +from django.utils.functional import cached_property +from django.utils.encoding import force_str +from django.utils.six import python_2_unicode_compatible +from django.template.loader import get_template + +from passerelle.utils.files import atomic_write + + +SCHEMA = { + 'type': 'object', + 'required': ['name_template'], + 'properties': { + 'name_template': { + 'type': 'string', + }, + 'part_templates': { + 'type': 'array', + 'items': { + 'oneOf': [ + { + 'type': 'object', + 'required': ['name_template', 'template_path'], + 'additionalProperties': False, + 'properties': { + 'name_template': { + 'type': 'string', + }, + 'template_path': { + 'type': 'string', + }, + }, + }, + { + 'type': 'object', + 'required': ['name_template', 'content_expression'], + 'additionalProperties': False, + 'properties': { + 'name_template': { + 'type': 'string', + }, + 'content_expression': { + 'type': 'string', + }, + }, + } + ] + }, + }, + }, +} + + +class ZipTemplateError(Exception): + pass + + +class ZipTemplateDoesNotExist(ZipTemplateError): + pass + + +class ZipTemplateSyntaxError(ZipTemplateError): + pass + +VARIABLE_RE = re.compile(r'{{ *(\w*)') + + +@python_2_unicode_compatible +class ZipPart(object): + def __init__(self, zip_template, name_template, template_path=None, content_expression=None): + self.zip_template = zip_template + self._name_template = name_template + self.template_path = template_path + self.content_expression = content_expression + assert bool(self.template_path) ^ bool(self.content_expression), '\ + template_path and content_expression are mutually excluded' + + @property + def ctx(self): + return self.zip_template.ctx + + @property + def 
base_path(self): + return self.zip_template.base_path + + @property + def template(self): + assert self.name_template, 'not a template_path part' + + template_path = os.path.join(self.base_path, self.template_path) + if template_path.startswith('/'): + if not os.path.exists(template_path): + raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e) + try: + with open(template_path) as fd: + return Template(fd.read()) + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e) + else: + try: + return get_template(template_path).template + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e) + except TemplateDoesNotExist as e: + raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e) + + @property + def name_template(self): + try: + return Template(self._name_template) + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError('syntax error in part\'s name template %s' % self, e) + + def _render(self, template): + return template.render(Context(self.ctx, use_l10n=False)) + + @property + def content(self): + if self.template_path: + return self._render(self.template) + else: + return self.ctx[self.content_expression] + + @property + def name(self): + return self._render(self.name_template) + + def __str__(self): + s = '<{0.__class__.__name__} name_template={0._name_template}' + if self.template_path: + s += ' template_path={0.template_path!r}' + else: + s += ' content_expression={0.content_expression!r}' + s += '>' + return s.format(self) + + +class ZipTemplate(object): + def __init__(self, manifest, ctx=None): + if manifest.startswith('/'): + path = manifest + else: + path = None + for engine in engines.all(): + for loader in engine.engine.template_loaders: + for origin in loader.get_template_sources(manifest): + if os.path.exists(origin.name): + path = origin.name + break + if path: + break 
+ if path: + break + if not path: + raise ZipTemplateDoesNotExist('manifest %s not found' % manifest) + self.base_path = os.path.dirname(manifest) + self.manifest_path = path + try: + manifest = self.manifest + except ValueError as e: + raise ZipTemplateError('invalid manifest file %s' % path, e) + try: + validate(self.manifest, SCHEMA) + except ValidationError as e: + raise ZipTemplateError('invalid manifest file %s' % path, e) + self.ctx = ctx or {} + + @cached_property + def manifest(self): + with open(self.manifest_path) as fd: + return json.load(fd) + + @property + def name_template(self): + try: + return Template(self.manifest['name_template']) + except TemplateSyntaxError as e: + raise ZipTemplateSyntaxError('syntax error in zip name_template', e) + + @property + def name(self): + return self.name_template.render(Context(self.ctx)) + + @property + def parts(self): + for part_template in self.manifest.get('part_templates', []): + yield ZipPart(zip_template=self, **part_template) + + @property + def rendered_parts(self): + for zip_part in self.parts: + name = zip_part.name + content = zip_part.content + if name.endswith('.xml'): + try: + ET.fromstring(force_str(content)) + except ET.ParseError as e: + raise ZipTemplateSyntaxError('XML syntax error in part template %s' % zip_part, e) + yield name, zip_part.content + + def render_to_bytes(self): + with io.BytesIO() as buf: + self.render_to_file(buf) + return buf.getvalue() + + def render_to_file(self, filelike): + with zipfile.ZipFile(filelike, 'w') as zi: + for name, content in self.rendered_parts: + zi.writestr(name, force_str(content)) + + def render_to_path(self, path, tmp_dir=None): + full_path = os.path.join(str(path), self.name) + with atomic_write(full_path, dir=tmp_dir) as fd: + self.render_to_file(fd) diff --git a/tests/test_utils_zip.py b/tests/test_utils_zip.py new file mode 100644 index 00000000..bcdd19db --- /dev/null +++ b/tests/test_utils_zip.py @@ -0,0 +1,156 @@ +# coding: utf-8 +# passerelle - 
uniform access to multiple data sources and services +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +import io +import json +import uuid +import zipfile + +import pytest + +from passerelle.utils.zip import ZipTemplate, ZipTemplateDoesNotExist, ZipTemplateSyntaxError, ZipTemplateError + + +@pytest.fixture +def templates_path(tmpdir, settings): + path = tmpdir.mkdir('templates') + settings.TEMPLATES = settings.TEMPLATES[:] + settings.TEMPLATES[0] = settings.TEMPLATES[0].copy() + settings.TEMPLATES[0].setdefault('DIRS', []) + settings.TEMPLATES[0]['DIRS'].insert(0, str(path)) + zip_templates_path = path.mkdir('zip_templates') + return zip_templates_path + + +@pytest.fixture +def tpl_builder(templates_path): + def make(name_template, template_parts=(), content_parts=()): + manifest_name = '%s.json' % uuid.uuid4().get_hex() + manifest_path = templates_path / manifest_name + d = { + 'name_template': name_template, + } + if template_parts or content_parts: + d['part_templates'] = [] + for name_template, content in template_parts: + name = '%s.xml' % uuid.uuid4().get_hex() + with (templates_path / name).open('w') as fd: + fd.write(content) + d['part_templates'].append({ + 'name_template': name_template, + 'template_path': name, + }) + for name_template, content in content_parts: + d['part_templates'].append({ + 
'name_template': name_template, + 'content_expression': content, + }) + with manifest_path.open('w') as fd: + json.dump(d, fd) + return '%s/%s' % (templates_path.basename, manifest_name) + return make + + +@pytest.fixture +def dest(tmpdir): + return tmpdir.mkdir('dest') + + +def test_missing(): + with pytest.raises(ZipTemplateDoesNotExist): + ZipTemplate('zip_templates/manifest1.json') + + +def test_invalid(templates_path): + path = templates_path / 'invalid-manifest.json' + with path.open(mode='w') as fd: + fd.write('{') + with pytest.raises(ZipTemplateError) as exc_info: + ZipTemplate(str(path)) + assert 'invalid' in exc_info.value.args[0] + + with path.open(mode='w') as fd: + fd.write('{}') + with pytest.raises(ZipTemplateError): + ZipTemplate(str(path)) + assert 'invalid' in exc_info.value.args[0] + + +def test_syntax_error(tpl_builder, dest): + zip_template = ZipTemplate(tpl_builder('{{ name -{{ counter }}.zip'), ctx={'name': 'coucou', 'counter': 10}) + with pytest.raises(ZipTemplateSyntaxError): + zip_template.render_to_path(dest) + + zip_template = ZipTemplate( + tpl_builder( + '{{ name }}-{{ counter }}.zip', + template_parts=[('part1.xml', '{{ name {{ }}')]), + ctx={'name': 'coucou', 'counter': 10}) + with pytest.raises(ZipTemplateSyntaxError): + zip_template.render_to_path(dest) + + +def test_no_parts(tpl_builder, dest): + z = ZipTemplate(tpl_builder('{{ name }}-{{ counter }}.zip'), + ctx={'name': 'coucou', 'counter': 10}) + z.render_to_path(dest) + + full_path = dest / 'coucou-10.zip' + with full_path.open() as fd: + with zipfile.ZipFile(fd) as zi: + assert zi.namelist() == [] + + +def test_with_parts(tpl_builder, dest): + z = ZipTemplate( + tpl_builder( + '{{ name }}-{{ counter }}.zip', + template_parts=[('{{ name }}-{{ counter }}-part1.xml', + '{{ bo_dy|lower }}')], + content_parts=[('{{ name }}-{{ counter }}-dôc.xml', 'doc-content')], + ), + ctx={'name': 'coucou', 'counter': 10, 'bo_dy': 'blabla', 'doc-content': 'Héllo World!'}) + 
z.render_to_path(dest) + for part in z.parts: + str(part) + + full_path = dest / 'coucou-10.zip' + with full_path.open() as fd: + with zipfile.ZipFile(fd) as zi: + assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] + assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == 'blabla' + assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == 'Héllo World!' + + with io.BytesIO(z.render_to_bytes()) as fd: + with zipfile.ZipFile(fd) as zi: + assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml'] + assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == 'blabla' + assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == 'Héllo World!' + + +def test_xml_error(tpl_builder, dest): + z = ZipTemplate( + tpl_builder( + 'rien.zip', + content_parts=[('rien.xml', 'doc-content')], + ), + ctx={'doc-content': 'Héllo World!'}) + with pytest.raises(ZipTemplateSyntaxError) as exc_info: + z.render_to_bytes() + assert 'XML syntax error' in exc_info.value.args[0] -- 2.23.0