Projet

Général

Profil

0001-utils-add-zip-package-for-templated-zip-files-36848.patch

Benjamin Dauvergne, 25 octobre 2019 10:48

Télécharger (14,9 ko)

Voir les différences:

Subject: [PATCH 1/9] utils: add zip package for templated zip files (#36848)

 passerelle/utils/zip.py | 241 ++++++++++++++++++++++++++++++++++++++++
 tests/test_utils_zip.py | 156 ++++++++++++++++++++++++++
 2 files changed, 397 insertions(+)
 create mode 100644 passerelle/utils/zip.py
 create mode 100644 tests/test_utils_zip.py
passerelle/utils/zip.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2019  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals, absolute_import
18

  
19
import io
20
import os.path
21
import json
22
import re
23
import xml.etree.ElementTree as ET
24
import zipfile
25

  
26
from jsonschema import validate, ValidationError
27

  
28
from django.template import Template, Context, TemplateDoesNotExist, TemplateSyntaxError, engines
29
from django.utils.functional import cached_property
30
from django.utils.encoding import force_str
31
from django.utils.six import python_2_unicode_compatible
32
from django.template.loader import get_template
33

  
34
from passerelle.utils.files import atomic_write
35

  
36

  
37
SCHEMA = {
38
    'type': 'object',
39
    'required': ['name_template'],
40
    'properties': {
41
        'name_template': {
42
            'type': 'string',
43
        },
44
        'part_templates': {
45
            'type': 'array',
46
            'items': {
47
                'oneOf': [
48
                    {
49
                        'type': 'object',
50
                        'required': ['name_template', 'template_path'],
51
                        'additionalProperties': False,
52
                        'properties': {
53
                            'name_template': {
54
                                'type': 'string',
55
                            },
56
                            'template_path': {
57
                                'type': 'string',
58
                            },
59
                        },
60
                    },
61
                    {
62
                        'type': 'object',
63
                        'required': ['name_template', 'content_expression'],
64
                        'additionalProperties': False,
65
                        'properties': {
66
                            'name_template': {
67
                                'type': 'string',
68
                            },
69
                            'content_expression': {
70
                                'type': 'string',
71
                            },
72
                        },
73
                    }
74
                ]
75
            },
76
        },
77
    },
78
}
79

  
80

  
81
class ZipTemplateError(Exception):
82
    pass
83

  
84

  
85
class ZipTemplateDoesNotExist(ZipTemplateError):
86
    pass
87

  
88

  
89
class ZipTemplateSyntaxError(ZipTemplateError):
90
    pass
91

  
92
VARIABLE_RE = re.compile(r'{{ *(\w*)')
93

  
94

  
95
@python_2_unicode_compatible
96
class ZipPart(object):
97
    def __init__(self, zip_template, name_template, template_path=None, content_expression=None):
98
        self.zip_template = zip_template
99
        self._name_template = name_template
100
        self.template_path = template_path
101
        self.content_expression = content_expression
102
        assert bool(self.template_path) ^ bool(self.content_expression), '\
103
            template_path and content_expression are mutually excluded'
104

  
105
    @property
106
    def ctx(self):
107
        return self.zip_template.ctx
108

  
109
    @property
110
    def base_path(self):
111
        return self.zip_template.base_path
112

  
113
    @property
114
    def template(self):
115
        assert self.name_template, 'not a template_path part'
116

  
117
        template_path = os.path.join(self.base_path, self.template_path)
118
        if template_path.startswith('/'):
119
            if not os.path.exists(template_path):
120
                raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e)
121
            try:
122
                with open(template_path) as fd:
123
                    return Template(fd.read())
124
            except TemplateSyntaxError as e:
125
                raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e)
126
        else:
127
            try:
128
                return get_template(template_path).template
129
            except TemplateSyntaxError as e:
130
                raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e)
131
            except TemplateDoesNotExist as e:
132
                raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e)
133

  
134
    @property
135
    def name_template(self):
136
        try:
137
            return Template(self._name_template)
138
        except TemplateSyntaxError as e:
139
            raise ZipTemplateSyntaxError('syntax error in part\'s name template %s' % self, e)
140

  
141
    def _render(self, template):
142
        return template.render(Context(self.ctx, use_l10n=False))
143

  
144
    @property
145
    def content(self):
146
        if self.template_path:
147
            return self._render(self.template)
148
        else:
149
            return self.ctx[self.content_expression]
150

  
151
    @property
152
    def name(self):
153
        return self._render(self.name_template)
154

  
155
    def __str__(self):
156
        s = '<{0.__class__.__name__} name_template={0._name_template}'
157
        if self.template_path:
158
            s += ' template_path={0.template_path!r}'
159
        else:
160
            s += ' content_expression={0.content_expression!r}'
161
        s += '>'
162
        return s.format(self)
163

  
164

  
165
class ZipTemplate(object):
166
    def __init__(self, manifest, ctx=None):
167
        if manifest.startswith('/'):
168
            path = manifest
169
        else:
170
            path = None
171
            for engine in engines.all():
172
                for loader in engine.engine.template_loaders:
173
                    for origin in loader.get_template_sources(manifest):
174
                        if os.path.exists(origin.name):
175
                            path = origin.name
176
                            break
177
                    if path:
178
                        break
179
                if path:
180
                    break
181
        if not path:
182
            raise ZipTemplateDoesNotExist('manifest %s not found' % manifest)
183
        self.base_path = os.path.dirname(manifest)
184
        self.manifest_path = path
185
        try:
186
            manifest = self.manifest
187
        except ValueError as e:
188
            raise ZipTemplateError('invalid manifest file %s' % path, e)
189
        try:
190
            validate(self.manifest, SCHEMA)
191
        except ValidationError as e:
192
            raise ZipTemplateError('invalid manifest file %s' % path, e)
193
        self.ctx = ctx or {}
194

  
195
    @cached_property
196
    def manifest(self):
197
        with open(self.manifest_path) as fd:
198
            return json.load(fd)
199

  
200
    @property
201
    def name_template(self):
202
        try:
203
            return Template(self.manifest['name_template'])
204
        except TemplateSyntaxError as e:
205
            raise ZipTemplateSyntaxError('syntax error in zip name_template', e)
206

  
207
    @property
208
    def name(self):
209
        return self.name_template.render(Context(self.ctx))
210

  
211
    @property
212
    def parts(self):
213
        for part_template in self.manifest.get('part_templates', []):
214
            yield ZipPart(zip_template=self, **part_template)
215

  
216
    @property
217
    def rendered_parts(self):
218
        for zip_part in self.parts:
219
            name = zip_part.name
220
            content = zip_part.content
221
            if name.endswith('.xml'):
222
                try:
223
                    ET.fromstring(force_str(content))
224
                except ET.ParseError as e:
225
                    raise ZipTemplateSyntaxError('XML syntax error in part template %s' % zip_part, e)
226
            yield name, zip_part.content
227

  
228
    def render_to_bytes(self):
229
        with io.BytesIO() as buf:
230
            self.render_to_file(buf)
231
            return buf.getvalue()
232

  
233
    def render_to_file(self, filelike):
234
        with zipfile.ZipFile(filelike, 'w') as zi:
235
            for name, content in self.rendered_parts:
236
                zi.writestr(name, force_str(content))
237

  
238
    def render_to_path(self, path, tmp_dir=None):
239
        full_path = os.path.join(str(path), self.name)
240
        with atomic_write(full_path, dir=tmp_dir) as fd:
241
            self.render_to_file(fd)
tests/test_utils_zip.py
1
# coding: utf-8
2
# passerelle - uniform access to multiple data sources and services
3
# Copyright (C) 2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import io
21
import json
22
import uuid
23
import zipfile
24

  
25
import pytest
26

  
27
from passerelle.utils.zip import ZipTemplate, ZipTemplateDoesNotExist, ZipTemplateSyntaxError, ZipTemplateError
28

  
29

  
30
@pytest.fixture
31
def templates_path(tmpdir, settings):
32
    path = tmpdir.mkdir('templates')
33
    settings.TEMPLATES = settings.TEMPLATES[:]
34
    settings.TEMPLATES[0] = settings.TEMPLATES[0].copy()
35
    settings.TEMPLATES[0].setdefault('DIRS', [])
36
    settings.TEMPLATES[0]['DIRS'].insert(0, str(path))
37
    zip_templates_path = path.mkdir('zip_templates')
38
    return zip_templates_path
39

  
40

  
41
@pytest.fixture
42
def tpl_builder(templates_path):
43
    def make(name_template, template_parts=(), content_parts=()):
44
        manifest_name = '%s.json' % uuid.uuid4().get_hex()
45
        manifest_path = templates_path / manifest_name
46
        d = {
47
            'name_template': name_template,
48
        }
49
        if template_parts or content_parts:
50
            d['part_templates'] = []
51
            for name_template, content in template_parts:
52
                name = '%s.xml' % uuid.uuid4().get_hex()
53
                with (templates_path / name).open('w') as fd:
54
                    fd.write(content)
55
                d['part_templates'].append({
56
                    'name_template': name_template,
57
                    'template_path': name,
58
                })
59
            for name_template, content in content_parts:
60
                d['part_templates'].append({
61
                    'name_template': name_template,
62
                    'content_expression': content,
63
                })
64
        with manifest_path.open('w') as fd:
65
            json.dump(d, fd)
66
        return '%s/%s' % (templates_path.basename, manifest_name)
67
    return make
68

  
69

  
70
@pytest.fixture
71
def dest(tmpdir):
72
    return tmpdir.mkdir('dest')
73

  
74

  
75
def test_missing():
76
    with pytest.raises(ZipTemplateDoesNotExist):
77
        ZipTemplate('zip_templates/manifest1.json')
78

  
79

  
80
def test_invalid(templates_path):
81
    path = templates_path / 'invalid-manifest.json'
82
    with path.open(mode='w') as fd:
83
        fd.write('{')
84
    with pytest.raises(ZipTemplateError) as exc_info:
85
        ZipTemplate(str(path))
86
    assert 'invalid' in exc_info.value.args[0]
87

  
88
    with path.open(mode='w') as fd:
89
        fd.write('{}')
90
    with pytest.raises(ZipTemplateError):
91
        ZipTemplate(str(path))
92
    assert 'invalid' in exc_info.value.args[0]
93

  
94

  
95
def test_syntax_error(tpl_builder, dest):
96
    zip_template = ZipTemplate(tpl_builder('{{ name -{{ counter }}.zip'), ctx={'name': 'coucou', 'counter': 10})
97
    with pytest.raises(ZipTemplateSyntaxError):
98
        zip_template.render_to_path(dest)
99

  
100
    zip_template = ZipTemplate(
101
        tpl_builder(
102
            '{{ name }}-{{ counter }}.zip',
103
            template_parts=[('part1.xml', '{{ name {{ }}')]),
104
        ctx={'name': 'coucou', 'counter': 10})
105
    with pytest.raises(ZipTemplateSyntaxError):
106
        zip_template.render_to_path(dest)
107

  
108

  
109
def test_no_parts(tpl_builder, dest):
110
    z = ZipTemplate(tpl_builder('{{ name }}-{{ counter }}.zip'),
111
                    ctx={'name': 'coucou', 'counter': 10})
112
    z.render_to_path(dest)
113

  
114
    full_path = dest / 'coucou-10.zip'
115
    with full_path.open() as fd:
116
        with zipfile.ZipFile(fd) as zi:
117
            assert zi.namelist() == []
118

  
119

  
120
def test_with_parts(tpl_builder, dest):
121
    z = ZipTemplate(
122
        tpl_builder(
123
            '{{ name }}-{{ counter }}.zip',
124
            template_parts=[('{{ name }}-{{ counter }}-part1.xml',
125
                             '<?xml version="1.0"?><body>{{ bo_dy|lower }}</body>')],
126
            content_parts=[('{{ name }}-{{ counter }}-dôc.xml', 'doc-content')],
127
        ),
128
        ctx={'name': 'coucou', 'counter': 10, 'bo_dy': 'blabla', 'doc-content': '<a>Héllo World!</a>'})
129
    z.render_to_path(dest)
130
    for part in z.parts:
131
        str(part)
132

  
133
    full_path = dest / 'coucou-10.zip'
134
    with full_path.open() as fd:
135
        with zipfile.ZipFile(fd) as zi:
136
            assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml']
137
            assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>'
138
            assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>'
139

  
140
    with io.BytesIO(z.render_to_bytes()) as fd:
141
        with zipfile.ZipFile(fd) as zi:
142
            assert zi.namelist() == ['coucou-10-part1.xml', 'coucou-10-dôc.xml']
143
            assert zi.open('coucou-10-part1.xml').read().decode('utf-8') == '<?xml version="1.0"?><body>blabla</body>'
144
            assert zi.open('coucou-10-dôc.xml').read().decode('utf-8') == '<a>Héllo World!</a>'
145

  
146

  
147
def test_xml_error(tpl_builder, dest):
148
    z = ZipTemplate(
149
        tpl_builder(
150
            'rien.zip',
151
            content_parts=[('rien.xml', 'doc-content')],
152
        ),
153
        ctx={'doc-content': '<a>Héllo World!'})
154
    with pytest.raises(ZipTemplateSyntaxError) as exc_info:
155
        z.render_to_bytes()
156
    assert 'XML syntax error' in exc_info.value.args[0]
0
-