0003-wf-export_to_model-add-opendocument-support-4292.patch
tests/test_form_pages.py | ||
---|---|---|
3 | 3 |
import os |
4 | 4 |
import re |
5 | 5 |
import StringIO |
6 |
import zipfile |
|
6 | 7 |
from webtest import Upload |
7 | 8 | |
8 | 9 |
from quixote.http_request import Upload as QuixoteUpload |
... | ... | |
23 | 24 | |
24 | 25 |
from utilities import get_app, login, create_temporary_pub, clean_temporary_pub, emails |
25 | 26 | |
27 | ||
28 |
def assert_equal_zip(stream1, stream2): |
|
29 |
z1 = zipfile.ZipFile(stream1) |
|
30 |
z2 = zipfile.ZipFile(stream2) |
|
31 |
assert set(z1.namelist()) == set(z2.namelist()) |
|
32 |
for name in z1.namelist(): |
|
33 |
assert z1.read(name) == z2.read(name), 'file "%s" differs' % name |
|
34 | ||
35 | ||
26 | 36 |
def pytest_generate_tests(metafunc): |
27 | 37 |
if 'pub' in metafunc.fixturenames: |
28 | 38 |
metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True) |
... | ... | |
1135 | 1145 |
st1 = wf.add_status('Status1', 'st1') |
1136 | 1146 |
export_to = ExportToModel() |
1137 | 1147 |
export_to.label = 'create doc' |
1138 |
upload = QuixoteUpload('/foo/test.rtf', content_type='text/rtf')
|
|
1148 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
|
1139 | 1149 |
upload.fp = StringIO.StringIO() |
1140 | 1150 |
upload.fp.write('HELLO WORLD') |
1141 | 1151 |
upload.fp.seek(0) |
... | ... | |
1178 | 1188 |
resp = resp.click('test.rtf') |
1179 | 1189 |
assert resp.location.endswith('/test.rtf') |
1180 | 1190 |
resp = resp.follow() |
1181 |
assert resp.content_type == 'text/rtf'
|
|
1191 |
assert resp.content_type == 'application/rtf'
|
|
1182 | 1192 |
assert resp.body == 'HELLO WORLD' |
1183 | 1193 | |
1184 | 1194 |
# change file content, same name |
1185 |
upload = QuixoteUpload('/foo/test.rtf', content_type='text/rtf')
|
|
1195 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf')
|
|
1186 | 1196 |
upload.fp = StringIO.StringIO() |
1187 | 1197 |
upload.fp.write('HELLO NEW WORLD') |
1188 | 1198 |
upload.fp.seek(0) |
... | ... | |
1197 | 1207 |
assert resp.click('test.rtf', index=0).follow().body == 'HELLO WORLD' |
1198 | 1208 |
assert resp.click('test.rtf', index=1).follow().body == 'HELLO NEW WORLD' |
1199 | 1209 | |
1210 |
def test_formdata_generated_document_odt_download(pub): |
|
1211 |
create_user(pub) |
|
1212 |
wf = Workflow(name='status') |
|
1213 |
st1 = wf.add_status('Status1', 'st1') |
|
1214 |
export_to = ExportToModel() |
|
1215 |
export_to.label = 'create doc' |
|
1216 |
template_filename = os.path.join(os.path.dirname(__file__), 'template.odt') |
|
1217 |
template = open(template_filename).read() |
|
1218 |
upload = QuixoteUpload('/foo/template.odt', content_type='application/octet-stream') |
|
1219 |
upload.fp = StringIO.StringIO() |
|
1220 |
upload.fp.write(template) |
|
1221 |
upload.fp.seek(0) |
|
1222 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
1223 |
export_to.id = '_export_to' |
|
1224 |
export_to.by = ['_submitter'] |
|
1225 |
st1.items.append(export_to) |
|
1226 |
export_to.parent = st1 |
|
1227 |
wf.store() |
|
1228 | ||
1229 |
formdef = create_formdef() |
|
1230 |
formdef.workflow_id = wf.id |
|
1231 |
formdef.fields = [] |
|
1232 |
formdef.store() |
|
1233 |
formdef.data_class().wipe() |
|
1234 | ||
1235 |
resp = login(get_app(pub), username='foo', password='foo').get('/test/') |
|
1236 |
resp = resp.forms[0].submit('submit') |
|
1237 |
assert 'Check values then click submit.' in resp.body |
|
1238 |
resp = resp.forms[0].submit('submit') |
|
1239 |
assert resp.status_int == 302 |
|
1240 |
form_location = resp.location |
|
1241 |
resp = resp.follow() |
|
1242 |
assert 'The form has been recorded' in resp.body |
|
1243 | ||
1244 |
resp = resp.form.submit('button_export_to') |
|
1245 | ||
1246 |
resp = resp.follow() # $form/$id/create_doc |
|
1247 |
resp = resp.follow() # $form/$id/create_doc/ |
|
1248 |
with open(os.path.join(os.path.dirname(__file__), 'template-out.odt')) as f: |
|
1249 |
assert_equal_zip(StringIO.StringIO(resp.body), f) |
|
1250 | ||
1251 |
export_to.attach_to_history = True |
|
1252 |
wf.store() |
|
1253 | ||
1254 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
1255 |
resp = resp.form.submit('button_export_to') |
|
1256 |
assert resp.location == form_location |
|
1257 |
resp = resp.follow() # back to form page |
|
1258 | ||
1259 |
resp = resp.click('template.odt') |
|
1260 |
assert resp.location.endswith('/template.odt') |
|
1261 |
resp = resp.follow() |
|
1262 |
assert resp.content_type == 'application/octet-stream' |
|
1263 |
with open(os.path.join(os.path.dirname(__file__), 'template-out.odt')) as f: |
|
1264 |
assert_equal_zip(StringIO.StringIO(resp.body), f) |
|
1265 | ||
1266 |
# change file content, same name |
|
1267 |
upload = QuixoteUpload('/foo/test.rtf', content_type='application/rtf') |
|
1268 |
upload.fp = StringIO.StringIO() |
|
1269 |
upload.fp.write('HELLO NEW WORLD') |
|
1270 |
upload.fp.seek(0) |
|
1271 |
export_to.model_file = UploadedFile(pub.app_dir, None, upload) |
|
1272 |
wf.store() |
|
1273 | ||
1274 |
resp = login(get_app(pub), username='foo', password='foo').get(form_location) |
|
1275 |
resp = resp.form.submit('button_export_to') |
|
1276 |
assert resp.location == form_location |
|
1277 |
resp = resp.follow() # back to form page |
|
1278 | ||
1279 |
with open(os.path.join(os.path.dirname(__file__), 'template-out.odt')) as f: |
|
1280 |
body = resp.click('template.odt', index=0).follow().body |
|
1281 |
assert_equal_zip(StringIO.StringIO(body), f) |
|
1282 |
assert resp.click('test.rtf', index=0).follow().body == 'HELLO NEW WORLD' |
|
1283 | ||
1200 | 1284 |
def test_formdata_form_file_download(pub): |
1201 | 1285 |
create_user(pub) |
1202 | 1286 |
wf = Workflow(name='status') |
wcs/wf/export_to_model.py | ||
---|---|---|
16 | 16 | |
17 | 17 |
from StringIO import StringIO |
18 | 18 |
from xml.etree import ElementTree as ET |
19 |
import zipfile |
|
19 | 20 | |
20 | 21 |
from quixote import get_response, get_request, get_publisher |
21 | 22 |
from quixote.directory import Directory |
... | ... | |
33 | 34 |
template_on_formdata, register_item_class) |
34 | 35 | |
35 | 36 | |
37 |
def transform_opendocument(instream, outstream, process): |
|
38 |
'''Take a file-like object containing an ODT, ODS, or any open-office |
|
39 |
format, parse context.xml with element tree and apply process to its root |
|
40 |
node. |
|
41 |
''' |
|
42 |
zin = zipfile.ZipFile(instream, mode='r') |
|
43 |
zout = zipfile.ZipFile(outstream, mode='w') |
|
44 |
assert 'content.xml' in zin.namelist() |
|
45 |
for filename in zin.namelist(): |
|
46 |
content = zin.read(filename) |
|
47 |
if filename in ('meta.xml', 'content.xml'): |
|
48 |
root = ET.fromstring(content) |
|
49 |
process(root) |
|
50 |
content = ET.tostring(root) |
|
51 |
zout.writestr(filename, content) |
|
52 |
zout.close() |
|
53 | ||
54 | ||
55 |
def is_opendocument(stream): |
|
56 |
try: |
|
57 |
with zipfile.ZipFile(stream) as z: |
|
58 |
if 'mimetype' in z.namelist(): |
|
59 |
return z.read('mimetype').startswith('application/vnd.oasis.opendocument.') |
|
60 |
except zipfile.BadZipfile: |
|
61 |
return False |
|
62 |
finally: |
|
63 |
stream.seek(0) |
|
64 | ||
65 | ||
36 | 66 |
class TemplatingError(PublishError): |
37 | 67 |
def __init__(self, description): |
38 | 68 |
self.title = _('Templating Error') |
... | ... | |
151 | 181 |
return base_url + self.get_directory_name() |
152 | 182 | |
153 | 183 |
def model_file_validation(self, upload): |
184 |
if hasattr(upload, 'fp'): |
|
185 |
fp = upload.fp |
|
186 |
elif hasattr(upload, 'get_file'): |
|
187 |
fp = upload.get_file() |
|
188 |
else: |
|
189 |
raise Exception('unknown upload %r' % upload) |
|
154 | 190 |
if upload.content_type and upload.content_type == 'application/rtf': |
155 |
return True, ''
|
|
191 |
return 'rtf', ''
|
|
156 | 192 |
if (upload.content_type and upload.content_type == 'application/octet-stream') or \ |
157 | 193 |
upload.content_type is None: |
158 | 194 |
if upload.base_filename and upload.base_filename.endswith('.rtf'): |
159 |
return True, '' |
|
160 |
upload.fp.seek(0) |
|
161 |
if upload.read(10).startswith('{\\rtf'): |
|
162 |
upload.fp.seek(0) |
|
163 |
return True, '' |
|
164 |
return False, _('Only RTF files can be used') |
|
195 |
return 'rtf', '' |
|
196 |
if fp.read(10).startswith('{\\rtf'): |
|
197 |
fp.seek(0) |
|
198 |
return 'rtf', '' |
|
199 |
fp.seek(0) |
|
200 |
if upload.content_type and upload.content_type.startswith('application/vnd.oasis.opendocument.'): |
|
201 |
return 'opendocument', '' |
|
202 |
if (upload.content_type and upload.content_type == 'application/octet-stream') or \ |
|
203 |
upload.content_type is None: |
|
204 |
if upload.base_filename and upload.base_filename.rsplit('.', 1) in ('odt', 'ods', 'odc', 'odb'): |
|
205 |
return 'opendocument', '' |
|
206 |
if is_opendocument(fp): |
|
207 |
return 'opendocument', '' |
|
208 |
return False, _('Only RTF and OpenDocument files can be used') |
|
165 | 209 | |
166 | 210 |
def add_parameters_widgets(self, form, parameters, prefix='', |
167 | 211 |
formdef=None): |
... | ... | |
222 | 266 |
directory_name = property(get_directory_name) |
223 | 267 | |
224 | 268 |
def apply_template_to_formdata(self, formdata): |
269 |
assert self.model_file |
|
270 |
kind, msg = self.model_file_validation(self.model_file) |
|
271 |
if kind == 'rtf': |
|
272 |
return self.apply_rtf_template_to_formdata(formdata) |
|
273 |
elif kind == 'opendocument': |
|
274 |
return self.apply_od_template_to_formdata(formdata) |
|
275 |
else: |
|
276 |
raise Exception('unsupported model kind %r' % kind) |
|
277 | ||
278 |
def apply_rtf_template_to_formdata(self, formdata): |
|
225 | 279 |
process = None |
226 | 280 |
if self.model_file.base_filename.endswith('.rtf'): |
227 | 281 |
process = rtf_process |
... | ... | |
242 | 296 |
raise TemplatingError(_('Unknown error in the ' |
243 | 297 |
'template: %s') % str(e)) |
244 | 298 | |
299 | ||
300 |
def apply_od_template_to_formdata(self, formdata): |
|
301 |
def process_root(root): |
|
302 |
def process_text(t): |
|
303 |
if isinstance(t, unicode): |
|
304 |
t = t.encode(get_publisher().site_charset) |
|
305 |
t = template_on_formdata(formdata, t) |
|
306 |
return unicode(t, get_publisher().site_charset) |
|
307 |
for node in root.iter(): |
|
308 |
if node.text: |
|
309 |
node.text = process_text(node.text) |
|
310 |
if node.tail: |
|
311 |
node.tail = process_text(node.tail) |
|
312 |
outstream = StringIO() |
|
313 |
transform_opendocument(self.model_file.get_file(), outstream, |
|
314 |
process_root) |
|
315 |
return outstream.getvalue() |
|
316 | ||
317 | ||
245 | 318 |
def get_parameters(self): |
246 | 319 |
return ('by', 'label', 'model_file', 'attach_to_history', |
247 | 320 |
'backoffice_info_text') |
248 |
- |