Project

General

Profile

0002-utils-add-zip-package-for-templated-zip-files-36848.patch

Benjamin Dauvergne, 21 Oct 2019 10:25 AM

Download (14.3 KB)

View differences:

Subject: [PATCH 2/8] utils: add zip package for templated zip files (#36848)

 passerelle/utils/zip.py | 229 ++++++++++++++++++++++++++++++++++++++++
 tests/test_utils_zip.py | 156 +++++++++++++++++++++++++++
 2 files changed, 385 insertions(+)
 create mode 100644 passerelle/utils/zip.py
 create mode 100644 tests/test_utils_zip.py
passerelle/utils/zip.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2019  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals, absolute_import
18

  
19
import io
20
import os.path
21
import json
22
import re
23
import xml.etree.ElementTree as ET
24
import zipfile
25

  
26
from jsonschema import validate, ValidationError
27

  
28
from django.template import Template, Context, TemplateDoesNotExist, TemplateSyntaxError, engines
29
from django.utils.functional import cached_property
30
from django.utils.encoding import force_str
31
from django.utils.six import python_2_unicode_compatible
32
from django.template.loader import get_template
33

  
34
from passerelle.utils.files import atomic_write
35

  
36

  
37
SCHEMA = {
38
    'type': 'object',
39
    'required': ['name_template'],
40
    'properties': {
41
        'name_template': {
42
            'type': 'string',
43
        },
44
        'part_templates': {
45
            'type': 'array',
46
            'items': {
47
                'oneOf': [
48
                    {
49
                        'type': 'object',
50
                        'required': ['name_template', 'template_path'],
51
                        'additionalProperties': False,
52
                        'properties': {
53
                            'name_template': {
54
                                'type': 'string',
55
                            },
56
                            'template_path': {
57
                                'type': 'string',
58
                            },
59
                        },
60
                    },
61
                    {
62
                        'type': 'object',
63
                        'required': ['name_template', 'content_expression'],
64
                        'additionalProperties': False,
65
                        'properties': {
66
                            'name_template': {
67
                                'type': 'string',
68
                            },
69
                            'content_expression': {
70
                                'type': 'string',
71
                            },
72
                        },
73
                    }
74
                ]
75
            },
76
        },
77
    },
78
}
79

  
80

  
81
class ZipTemplateError(Exception):
82
    pass
83

  
84

  
85
class ZipTemplateDoesNotExist(ZipTemplateError):
86
    pass
87

  
88

  
89
class ZipTemplateSyntaxError(ZipTemplateError):
90
    pass
91

  
92
VARIABLE_RE = re.compile(r'{{ *(\w*)')
93

  
94

  
95
@python_2_unicode_compatible
96
class ZipPart(object):
97
    def __init__(self, zip_template, name_template, template_path=None, content_expression=None):
98
        self.zip_template = zip_template
99
        self._name_template = name_template
100
        self.template_path = template_path
101
        self.content_expression = content_expression
102
        assert bool(self.template_path) ^ bool(self.content_expression), '\
103
            template_path and content_expression are mutually excluded'
104

  
105
    @property
106
    def ctx(self):
107
        return self.zip_template.ctx
108

  
109
    @property
110
    def base_path(self):
111
        return self.zip_template.base_path
112

  
113
    @property
114
    def template(self):
115
        assert self.name_template, 'not a template_path part'
116

  
117
        template_path = os.path.join(self.base_path, self.template_path)
118
        try:
119
            return get_template(template_path)
120
        except TemplateSyntaxError as e:
121
            raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e)
122
        except TemplateDoesNotExist as e:
123
            raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e)
124

  
125
    @property
126
    def name_template(self):
127
        try:
128
            return Template(self._name_template)
129
        except TemplateSyntaxError as e:
130
            raise ZipTemplateSyntaxError('syntax error in part\'s name template %s' % self, e)
131

  
132
    def _render(self, template):
133
        return template.render(Context(self.ctx, use_l10n=False))
134

  
135
    @property
136
    def content(self):
137
        if self.template_path:
138
            return self.template.render(self.ctx)
139
        else:
140
            return self.ctx[self.content_expression]
141

  
142
    @property
143
    def name(self):
144
        return self._render(self.name_template)
145

  
146
    def __str__(self):
147
        s = '<{0.__class__.__name__} name_template={0._name_template}'
148
        if self.template_path:
149
            s += ' template_path={0.template_path!r}'
150
        else:
151
            s += ' content_expression={0.content_expression!r}'
152
        s += '>'
153
        return s.format(self)
154

  
155

  
156
class ZipTemplate(object):
157
    def __init__(self, manifest, ctx=None):
158
        path = None
159
        for engine in engines.all():
160
            for loader in engine.engine.template_loaders:
161
                for origin in loader.get_template_sources(manifest):
162
                    if os.path.exists(origin.name):
163
                        path = origin.name
164
                        break
165
                if path:
166
                    break
167
            if path:
168
                break
169
        if not path:
170
            raise ZipTemplateDoesNotExist('manifest %s not found' % manifest)
171
        self.base_path = os.path.dirname(manifest)
172
        self.manifest_path = path
173
        try:
174
            manifest = self.manifest
175
        except ValueError as e:
176
            raise ZipTemplateError('invalid manifest file %s' % path, e)
177
        try:
178
            validate(self.manifest, SCHEMA)
179
        except ValidationError as e:
180
            raise ZipTemplateError('invalid manifest file %s' % path, e)
181
        self.ctx = ctx or {}
182

  
183
    @cached_property
184
    def manifest(self):
185
        with open(self.manifest_path) as fd:
186
            return json.load(fd)
187

  
188
    @property
189
    def name_template(self):
190
        try:
191
            return Template(self.manifest['name_template'])
192
        except TemplateSyntaxError as e:
193
            raise ZipTemplateSyntaxError('syntax error in zip name_template', e)
194

  
195
    @property
196
    def name(self):
197
        return self.name_template.render(Context(self.ctx))
198

  
199
    @property
200
    def parts(self):
201
        for part_template in self.manifest.get('part_templates', []):
202
            yield ZipPart(zip_template=self, **part_template)
203

  
204
    @property
205
    def rendered_parts(self):
206
        for zip_part in self.parts:
207
            name = zip_part.name
208
            content = zip_part.content
209
            if name.endswith('.xml'):
210
                try:
211
                    ET.fromstring(force_str(content))
212
                except ET.ParseError as e:
213
                    raise ZipTemplateSyntaxError('XML syntax error in part template %s' % zip_part, e)
214
            yield name, zip_part.content
215

  
216
    def render_to_bytes(self):
217
        with io.BytesIO() as buf:
218
            self.render_to_file(buf)
219
            return buf.getvalue()
220

  
221
    def render_to_file(self, filelike):
222
        with zipfile.ZipFile(filelike, 'w') as zi:
223
            for name, content in self.rendered_parts:
224
                zi.writestr(name, force_str(content))
225

  
226
    def render_to_path(self, path, tmp_dir=None):
227
        full_path = os.path.join(str(path), self.name)
228
        with atomic_write(full_path, dir=tmp_dir) as fd:
229
            self.render_to_file(fd)
tests/test_utils_zip.py
1
# coding: utf-8
2
# passerelle - uniform access to multiple data sources and services
3
# Copyright (C) 2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import io
21
import json
22
import uuid
23
import zipfile
24

  
25
import pytest
26

  
27
from passerelle.utils.zip import ZipTemplate, ZipTemplateDoesNotExist, ZipTemplateSyntaxError, ZipTemplateError
28

  
29

  
30
@pytest.fixture
31
def templates_path(tmpdir, settings):
32
    path = tmpdir.mkdir('templates')
33
    settings.TEMPLATES = settings.TEMPLATES[:]
34
    settings.TEMPLATES[0] = settings.TEMPLATES[0].copy()
35
    settings.TEMPLATES[0].setdefault('DIRS', [])
36
    settings.TEMPLATES[0]['DIRS'].insert(0, str(path))
37
    zip_templates_path = path.mkdir('zip_templates')
38
    return zip_templates_path
39

  
40

  
41
@pytest.fixture
42
def tpl_builder(templates_path):
43
    def make(name_template, template_parts=(), content_parts=()):
44
        manifest_name = '%s.json' % uuid.uuid4().get_hex()
45
        manifest_path = templates_path / manifest_name
46
        d = {
47
            'name_template': name_template,
48
        }
49
        if template_parts or content_parts:
50
            d['part_templates'] = []
51
            for name_template, content in template_parts:
52
                name = '%s.xml' % uuid.uuid4().get_hex()
53
                with (templates_path / name).open('w') as fd:
54
                    fd.write(content)
55
                d['part_templates'].append({
56
                    'name_template': name_template,
57
                    'template_path': name,
58
                })
59
            for name_template, content in content_parts:
60
                d['part_templates'].append({
61
                    'name_template': name_template,
62
                    'content_expression': content,
63
                })
64
        with manifest_path.open('w') as fd:
65
            json.dump(d, fd)
66
        return '%s/%s' % (templates_path.basename, manifest_name)
67
    return make
68

  
69

  
70
@pytest.fixture
71
def dest(tmpdir):
72
    return tmpdir.mkdir('dest')
73

  
74

  
75
def test_missing():
76
    with pytest.raises(ZipTemplateDoesNotExist):
77
        ZipTemplate('zip_templates/manifest1.json')
78

  
79

  
80
def test_invalid(templates_path):
81
    path = templates_path / 'invalid-manifest.json'
82
    with path.open(mode='w') as fd:
83
        fd.write('{')
84
    with pytest.raises(ZipTemplateError) as exc_info:
85
        ZipTemplate(str(path))
86
    assert 'invalid' in exc_info.value.args[0]
87

  
88
    with path.open(mode='w') as fd:
89
        fd.write('{}')
90
    with pytest.raises(ZipTemplateError):
91
        ZipTemplate(str(path))
92
    assert 'invalid' in exc_info.value.args[0]
93

  
94

  
95
def test_syntax_error(tpl_builder, dest):
96
    zip_template = ZipTemplate(tpl_builder('{{ name -{{ counter }}.zip'), ctx={'name': 'coucou', 'counter': 10})
97
    with pytest.raises(ZipTemplateSyntaxError):
98
        zip_template.render_to_path(dest)
99

  
100
    zip_template = ZipTemplate(
101
        tpl_builder(
102
            '{{ name }}-{{ counter }}.zip',
103
            template_parts=[('part1.xml', '{{ name {{ }}')]),
104
        ctx={'name': 'coucou', 'counter': 10})
105
    with pytest.raises(ZipTemplateSyntaxError):
106
        zip_template.render_to_path(dest)
107

  
108

  
109
def test_no_parts(tpl_builder, dest):
110
    z = ZipTemplate(tpl_builder('{{ name }}-{{ counter }}.zip'),
111
                    ctx={'name': 'coucou', 'counter': 10})
112
    z.render_to_path(dest)
113

  
114
    full_path = dest / 'coucou-10.zip'
115
    with full_path.open() as fd:
116
        with zipfile.ZipFile(fd) as zi:
117
            assert zi.namelist() == []
118

  
119

  
120
def test_with_parts(tpl_builder, dest):
121
    z = ZipTemplate(
122
        tpl_builder(
123
            '{{ name }}-{{ counter }}.zip',
124
            template_parts=[('{{ name }}-{{ counter }}-part1.xml',
125
                             '<?xml version="1.0"?><body>{{ bo_dy|lower }}</body>')],
126
            content_parts=[('{{ name }}-{{ counter }}-dôc.xml', 'doc-content')],
127
        ),
128
        ctx={'name': 'coucou', 'counter': 10, 'bo_dy': 'blabla', 'doc-content': '<a>Héllo World!</a>'})
129
    z.render_to_path(dest)
130
    for part in z.parts:
131
        str(part)
132

  
133
    full_path = dest / 'coucou-10.zip'
134
    with full_path.open() as fd:
135
        with zipfile.ZipFile(fd) as zi:
136
            assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml']
137
            assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>'
138
            assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>'
139

  
140
    with io.BytesIO(z.render_to_bytes()) as fd:
141
        with zipfile.ZipFile(fd) as zi:
142
            assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml']
143
            assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>'
144
            assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>'
145

  
146

  
147
def test_xml_error(tpl_builder, dest):
148
    z = ZipTemplate(
149
        tpl_builder(
150
            'rien.zip',
151
            content_parts=[('rien.xml', 'doc-content')],
152
        ),
153
        ctx={'doc-content': '<a>Héllo World!'})
154
    with pytest.raises(ZipTemplateSyntaxError) as exc_info:
155
        z.render_to_bytes()
156
    assert 'XML syntax error' in exc_info.value.args[0]
0
-