0001-wscall-allow-storing-in-backoffice-file-without-varn.patch
tests/test_workflows.py | ||
---|---|---|
1736 | 1736 |
item.parent = st1 |
1737 | 1737 |
item.backoffice_filefield_id = 'bo1' |
1738 | 1738 |
item.url = 'http://remote.example.net/xml' |
1739 |
item.varname = 'xxx' |
|
1740 | 1739 |
item.response_type = 'attachment' |
1741 | 1740 |
item.record_errors = True |
1742 | 1741 |
item.perform(formdata) |
1743 | 1742 | |
1743 |
assert 'bo1' in formdata.data |
|
1744 |
fbo1 = formdata.data['bo1'] |
|
1745 |
assert fbo1.base_filename == 'file-bo1.xml' |
|
1746 |
assert fbo1.content_type == 'text/xml' |
|
1747 |
assert fbo1.get_content().startswith('<?xml') |
|
1748 |
# nothing else is stored |
|
1749 |
assert formdata.workflow_data is None |
|
1750 |
assert not formdata.evolution[-1].parts |
|
1751 | ||
1752 |
# store in backoffice file field + varname |
|
1753 |
formdata = formdef.data_class()() |
|
1754 |
formdata.data = {} |
|
1755 |
formdata.just_created() |
|
1756 |
formdata.store() |
|
1757 |
item.varname = 'xxx' |
|
1758 |
item.perform(formdata) |
|
1759 |
# backoffice file field |
|
1744 | 1760 |
assert 'bo1' in formdata.data |
1745 | 1761 |
fbo1 = formdata.data['bo1'] |
1746 | 1762 |
assert fbo1.base_filename == 'xxx.xml' |
1747 | 1763 |
assert fbo1.content_type == 'text/xml' |
1748 | 1764 |
assert fbo1.get_content().startswith('<?xml') |
1765 |
# varname => workflow_data and AttachmentEvolutionPart |
|
1766 |
assert formdata.workflow_data.get('xxx_status') == 200 |
|
1767 |
assert formdata.workflow_data.get('xxx_content_type') == 'text/xml' |
|
1768 |
attachment = formdata.evolution[-1].parts[-1] |
|
1769 |
assert isinstance(attachment, AttachmentEvolutionPart) |
|
1770 |
assert attachment.base_filename == 'xxx.xml' |
|
1771 |
assert attachment.content_type == 'text/xml' |
|
1772 |
attachment.fp.seek(0) |
|
1773 |
assert attachment.fp.read(5) == '<?xml' |
|
1749 | 1774 | |
1750 | 1775 |
# no more 'bo1' backoffice field: do nothing |
1751 | 1776 |
formdata = formdef.data_class()() |
... | ... | |
1768 | 1793 |
item.perform(formdata) |
1769 | 1794 |
assert formdata.data == {} |
1770 | 1795 | |
1796 | ||
1771 | 1797 |
def test_webservice_target_status(pub): |
1772 | 1798 |
wf = Workflow(name='boo') |
1773 | 1799 |
status1 = wf.add_status('Status1', 'st1') |
wcs/wf/wscall.py | ||
---|---|---|
322 | 322 |
formdata.update_workflow_data(workflow_data) |
323 | 323 |
formdata.store() |
324 | 324 | |
325 |
if self.backoffice_filefield_id: |
|
326 |
if (status // 100) == 2 and app_error_code == 0 and status not in (204, 205): |
|
327 |
self.store_backoffice_filefield(formdata, response, data) |
|
328 | ||
325 | 329 |
if app_error_code != 0: |
326 | 330 |
self.action_on_error(self.action_on_app_error, formdata, response, data=data) |
327 | 331 |
if (status // 100) == 4: |
... | ... | |
329 | 333 |
if (status // 100) == 5: |
330 | 334 |
self.action_on_error(self.action_on_5xx, formdata, response, data=data) |
331 | 335 | |
336 |
def get_attachment_data(self, response): |
|
337 |
content_type = response.headers.get('content-type') or '' |
|
338 |
if content_type: |
|
339 |
content_type = content_type.split(';')[0].strip().lower() |
|
340 |
extension = mimetypes.guess_extension(content_type, strict=False) or '' |
|
341 |
if self.varname: |
|
342 |
filename = '%s%s' % (self.varname, extension) |
|
343 |
elif self.backoffice_filefield_id: |
|
344 |
filename = 'file-%s%s' % (self.backoffice_filefield_id, extension) |
|
345 |
else: |
|
346 |
filename = 'file%s' % extension |
|
347 |
return filename, content_type |
|
348 | ||
332 | 349 |
def store_response(self, formdata, response, data, workflow_data): |
333 | 350 |
if self.response_type == 'json': |
334 | 351 |
try: |
... | ... | |
346 | 363 |
elif d.get('display_id'): |
347 | 364 |
formdata.id_display = d.get('display_id') |
348 | 365 |
else: # store result as attachment |
349 |
content_type = response.headers.get('content-type') or '' |
|
350 |
if content_type: |
|
351 |
content_type = content_type.split(';')[0].strip().lower() |
|
366 |
filename, content_type = self.get_attachment_data(response) |
|
352 | 367 |
workflow_data['%s_content_type' % self.varname] = content_type |
353 | 368 |
workflow_data['%s_length' % self.varname] = len(data) |
354 |
extension = mimetypes.guess_extension(content_type, strict=False) or '' |
|
355 |
filename = '%s%s' % (self.varname, extension) |
|
356 | 369 |
fp_content = StringIO(data) |
357 | 370 |
attachment = AttachmentEvolutionPart(filename, fp_content, |
358 | 371 |
content_type=content_type, |
359 | 372 |
varname=self.varname) |
360 | 373 |
formdata.evolution[-1].add_part(attachment) |
361 |
if self.backoffice_filefield_id: |
|
362 |
self.store_in_backoffice_filefield( |
|
363 |
formdata, |
|
364 |
self.backoffice_filefield_id, |
|
365 |
filename, |
|
366 |
content_type, |
|
367 |
data) |
|
374 | ||
375 |
def store_backoffice_filefield(self, formdata, response, data): |
|
376 |
filename, content_type = self.get_attachment_data(response) |
|
377 |
self.store_in_backoffice_filefield( |
|
378 |
formdata, self.backoffice_filefield_id, |
|
379 |
filename, content_type, data) |
|
368 | 380 | |
369 | 381 |
def action_on_error(self, action, formdata, response=None, data=None, exc_info=None): |
370 | 382 |
if action in (':pass', ':stop') and (self.notify_on_errors or self.record_errors): |
371 |
- |