0001-utils-add-zip-package-for-templated-zip-files-36848.patch
passerelle/utils/zip.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from __future__ import unicode_literals |
|
18 | ||
19 |
import io |
|
20 |
import os.path |
|
21 |
import json |
|
22 |
import zipfile |
|
23 | ||
24 |
from jsonschema import validate, ValidationError |
|
25 | ||
26 |
from django.template import Template, Context, TemplateDoesNotExist, TemplateSyntaxError, engines |
|
27 |
from django.utils.functional import cached_property |
|
28 |
from django.template.loader import get_template |
|
29 | ||
30 |
from passerelle.utils.files import atomic_write |
|
31 | ||
32 | ||
33 |
# JSON schema of a ZIP template manifest: a mandatory 'name_template' string
# and an optional 'part_templates' array whose items must each provide a
# 'name_template' and a 'template_path' string.
SCHEMA = {
    'type': 'object',
    'required': ['name_template'],
    'properties': {
        'name_template': {'type': 'string'},
        'part_templates': {
            'type': 'array',
            'items': {
                'type': 'object',
                'required': ['name_template', 'template_path'],
                'properties': {
                    'name_template': {'type': 'string'},
                    'template_path': {'type': 'string'},
                },
            },
        },
    },
}
|
57 | ||
58 | ||
59 |
class ZipTemplateError(Exception):
    """Base error of the ZIP template machinery (e.g. an invalid manifest)."""
|
61 | ||
62 | ||
63 |
class ZipTemplateDoesNotExist(ZipTemplateError):
    """A manifest or a part template could not be found."""
|
65 | ||
66 | ||
67 |
class ZipTemplateSyntaxError(ZipTemplateError):
    """A name template or a part template contains a template syntax error."""
|
69 | ||
70 | ||
71 |
class ZipTemplate(object):
    """Build a ZIP archive from a JSON manifest of Django templates.

    The manifest must match SCHEMA: a ``name_template`` string, rendered to
    obtain the archive file name, and an optional ``part_templates`` list
    whose entries each give a ``name_template`` (rendered to obtain the member
    name) and a ``template_path`` (template producing the member content,
    joined to the directory part of the manifest name).
    """

    def __init__(self, manifest, ctx=None):
        """Locate, load and validate the manifest.

        :param manifest: template name of the manifest, searched through the
            loaders of every configured template engine.
        :param ctx: optional dict used as the rendering context; defaults to
            an empty dict.
        :raises ZipTemplateDoesNotExist: no loader source for ``manifest``
            exists on disk.
        :raises ZipTemplateError: the manifest cannot be decoded as JSON or
            does not validate against SCHEMA.
        """
        path = self._find_manifest(manifest)
        if not path:
            raise ZipTemplateDoesNotExist('manifest %s not found' % manifest)
        # part template paths are expressed relative to the manifest *name*
        # (not its filesystem path) since they go through get_template()
        self.base_path = os.path.dirname(manifest)
        self.manifest_path = path
        try:
            content = self.manifest
        except ValueError as e:
            raise ZipTemplateError('invalid manifest file %s' % path, e)
        try:
            validate(content, SCHEMA)
        except ValidationError as e:
            raise ZipTemplateError('invalid manifest file %s' % path, e)
        self.ctx = ctx or {}

    @staticmethod
    def _find_manifest(manifest):
        """Return the path of the first existing loader source, or None."""
        for engine in engines.all():
            # NOTE(review): ``engine.engine`` assumes Django template
            # backends -- confirm no other backend is configured.
            for loader in engine.engine.template_loaders:
                for origin in loader.get_template_sources(manifest):
                    if os.path.exists(origin.name):
                        return origin.name
        return None

    @cached_property
    def manifest(self):
        """Parsed content of the manifest file (loaded once, then cached)."""
        # Decode explicitly as UTF-8 instead of relying on the locale's
        # default encoding; decoding errors are ValueError subclasses and are
        # thus reported like any other invalid manifest by __init__.
        with io.open(self.manifest_path, encoding='utf-8') as fd:
            return json.load(fd)

    def __render_template(self, template_content, origin):
        """Render a template given as a string with the instance context.

        ``origin`` is accepted for the callers' convenience but is currently
        unused.

        :raises ZipTemplateSyntaxError: the template has a syntax error.
        """
        try:
            return Template(template_content).render(Context(self.ctx, use_l10n=False))
        except TemplateSyntaxError as e:
            raise ZipTemplateSyntaxError(e)

    @property
    def name(self):
        """File name of the archive, rendered from the manifest's name_template."""
        return self.__render_template(self.manifest['name_template'], '<name_template>')

    @property
    def parts(self):
        """Yield a ``(name, content)`` pair for each part of the manifest.

        :raises ZipTemplateSyntaxError: a name or part template has a syntax
            error.
        :raises ZipTemplateDoesNotExist: a part template cannot be found.
        """
        for part_template in self.manifest.get('part_templates', []):
            name = self.__render_template(part_template['name_template'], part_template)
            template_path = os.path.join(self.base_path, part_template['template_path'])
            try:
                template = get_template(template_path)
            except TemplateSyntaxError as e:
                raise ZipTemplateSyntaxError('syntax error in part template %s' % template_path, e)
            except TemplateDoesNotExist as e:
                raise ZipTemplateDoesNotExist('part template %s not found' % template_path, e)
            content = template.render(self.ctx)
            yield name, content

    def render_to_bytes(self):
        """Return the rendered archive as a byte string."""
        with io.BytesIO() as buf:
            self.render_to_file(buf)
            return buf.getvalue()

    def render_to_file(self, filelike):
        """Write the rendered archive to ``filelike``, one member per part."""
        with zipfile.ZipFile(filelike, 'w') as zi:
            for name, content in self.parts:
                zi.writestr(name, content)

    def render_to_path(self, path, tmp_dir=None):
        """Write the archive into directory ``path``, named after ``self.name``.

        :param path: destination directory (converted with ``str()``).
        :param tmp_dir: forwarded to ``atomic_write`` as ``dir`` --
            presumably the directory of its temporary file; confirm against
            passerelle.utils.files.atomic_write.
        """
        full_path = os.path.join(str(path), self.name)
        with atomic_write(full_path, dir=tmp_dir) as fd:
            self.render_to_file(fd)
tests/test_utils_zip.py | ||
---|---|---|
1 |
# passerelle - uniform access to multiple data sources and services |
|
2 |
# Copyright (C) 2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from __future__ import unicode_literals |
|
18 | ||
19 |
import json |
|
20 |
import uuid |
|
21 |
import zipfile |
|
22 | ||
23 |
import pytest |
|
24 | ||
25 |
from passerelle.utils.zip import ZipTemplate, ZipTemplateDoesNotExist, ZipTemplateSyntaxError |
|
26 | ||
27 | ||
28 |
@pytest.fixture
def templates_path(tmpdir, settings):
    """Register a temporary template directory and return its zip_templates subdir.

    A 'templates' directory is created under ``tmpdir`` and placed first in
    the 'DIRS' of the first TEMPLATES entry; the returned path is its
    'zip_templates' sub-directory.
    """
    path = tmpdir.mkdir('templates')
    # Build a fresh configuration rather than editing a shallow copy: the
    # copy would still share its 'DIRS' list with the original settings, so
    # an in-place insert would leak this temporary directory into them.
    first = dict(settings.TEMPLATES[0])
    first['DIRS'] = [str(path)] + list(first.get('DIRS', []))
    # single assignment, so the override is complete as soon as it is visible
    settings.TEMPLATES = [first] + list(settings.TEMPLATES[1:])
    return path.mkdir('zip_templates')
|
37 | ||
38 | ||
39 |
@pytest.fixture
def tpl_builder(templates_path):
    """Return a factory writing a manifest and its part templates to disk.

    ``make(name_template, *parts)`` takes the archive name template and any
    number of ``(part_name_template, content)`` pairs; it writes each content
    to a randomly named '.xml' file and the manifest to a randomly named
    '.json' file in ``templates_path``, then returns the manifest template
    name ('<templates_path basename>/<manifest file name>').
    """
    def make(name_template, *parts):
        # UUID.hex exists on Python 2 and 3, unlike the Python 2 only get_hex()
        manifest_name = '%s.json' % uuid.uuid4().hex
        manifest_path = templates_path / manifest_name
        d = {
            'name_template': name_template,
        }
        if parts:
            d['part_templates'] = []
            # distinct loop name: do not shadow the name_template parameter
            for part_name_template, content in parts:
                name = '%s.xml' % uuid.uuid4().hex
                with (templates_path / name).open('w') as fd:
                    fd.write(content)
                d['part_templates'].append({
                    'name_template': part_name_template,
                    'template_path': name,
                })
        with manifest_path.open('w') as fd:
            json.dump(d, fd)
        return '%s/%s' % (templates_path.basename, manifest_name)
    return make
|
61 | ||
62 | ||
63 |
@pytest.fixture
def dest(tmpdir):
    """Provide a 'dest' directory created under the test's tmpdir."""
    destination = tmpdir.mkdir('dest')
    return destination
|
66 | ||
67 | ||
68 |
def test_missing(templates_path):
    """Instantiating with a manifest that was never written must fail."""
    missing_manifest = 'zip_templates/manifest1.json'
    with pytest.raises(ZipTemplateDoesNotExist):
        ZipTemplate(missing_manifest)
|
71 | ||
72 | ||
73 |
def test_syntax_error(tpl_builder, dest):
    """Syntax errors in name or part templates surface from render_to_path()."""
    # case 1: the archive name template is malformed
    broken_name = tpl_builder('{{ name -{{ counter }}.zip')
    zip_template = ZipTemplate(broken_name, ctx={'name': 'coucou', 'counter': 10})
    with pytest.raises(ZipTemplateSyntaxError):
        zip_template.render_to_path(dest)

    # case 2: the name template is fine but the part content is malformed
    broken_part = tpl_builder(
        '{{ name }}-{{ counter }}.zip',
        ('part1.xml', '{{ name {{ }}'))
    zip_template = ZipTemplate(broken_part, ctx={'name': 'coucou', 'counter': 10})
    with pytest.raises(ZipTemplateSyntaxError):
        zip_template.render_to_path(dest)
|
85 | ||
86 | ||
87 |
def test_no_parts(tpl_builder, dest):
    """A manifest without parts renders to an empty, correctly named archive."""
    ZipTemplate(tpl_builder('{{ name }}-{{ counter }}.zip'),
                ctx={'name': 'coucou', 'counter': 10}).render_to_path(dest)

    full_path = dest / 'coucou-10.zip'
    # binary mode is required: zipfile cannot read a text-mode stream on Python 3
    with full_path.open('rb') as fd:
        with zipfile.ZipFile(fd) as zi:
            assert zi.namelist() == []
|
95 | ||
96 | ||
97 |
def test_with_parts(tpl_builder, dest):
    """Part names and contents are both rendered with the context."""
    ZipTemplate(
        tpl_builder(
            '{{ name }}-{{ counter }}.zip',
            ('{{ name }}-{{ counter }}-part1.xml', '<?xml?><body>{{ body }}</body>'),
        ),
        ctx={'name': 'coucou', 'counter': 10, 'body': 'blabla'}).render_to_path(dest)

    full_path = dest / 'coucou-10.zip'
    # binary mode is required: zipfile cannot read a text-mode stream on Python 3
    with full_path.open('rb') as fd:
        with zipfile.ZipFile(fd) as zi:
            assert zi.namelist() == ['coucou-10-part1.xml']
            # archive members are read back as bytes, hence the bytes literal
            assert zi.open('coucou-10-part1.xml').read() == b'<?xml?><body>blabla</body>'
|
0 |
- |