0001-workflows-allow-setting-files-to-assembled-dictionar.patch
tests/test_workflows.py | ||
---|---|---|
1733 | 1733 | |
1734 | 1734 |
formdata = formdef.data_class().get(formdata.id) |
1735 | 1735 |
assert formdata.data['bo1'].base_filename == 'xxx.xml' |
1736 | ||
1737 |
# check storing a file from an assembled dictionary |
|
1738 |
item = SetBackofficeFieldsWorkflowStatusItem() |
|
1739 |
item.fields = [{'field_id': 'bo1', |
|
1740 |
'value': '={"content": "hello world", "filename": "hello.txt"}'}] |
|
1741 |
item.perform(formdata) |
|
1742 | ||
1743 |
formdata = formdef.data_class().get(formdata.id) |
|
1744 |
assert formdata.data['bo1'].base_filename == 'hello.txt' |
|
1745 |
assert formdata.data['bo1'].get_content() == 'hello world' |
|
1746 | ||
1747 |
item = SetBackofficeFieldsWorkflowStatusItem() |
|
1748 |
item.fields = [{'field_id': 'bo1', |
|
1749 |
'value': '={"b64_content": "SEVMTE8gV09STEQ=", "filename": "hello.txt"}'}] |
|
1750 |
item.perform(formdata) |
|
1751 | ||
1752 |
formdata = formdef.data_class().get(formdata.id) |
|
1753 |
assert formdata.data['bo1'].base_filename == 'hello.txt' |
|
1754 |
assert formdata.data['bo1'].get_content() == 'HELLO WORLD' |
|
1755 | ||
1756 |
# check wrong value |
|
1757 |
del formdata.data['bo1'] |
|
1758 |
formdata.store() |
|
1759 |
assert not 'bo1' in formdata.data |
|
1760 | ||
1761 |
item = SetBackofficeFieldsWorkflowStatusItem() |
|
1762 |
item.fields = [{'field_id': 'bo1', 'value': '="HELLO"'}] |
|
1763 |
item.perform(formdata) |
|
1764 | ||
1765 |
formdata = formdef.data_class().get(formdata.id) |
|
1766 |
assert not 'bo1' in formdata.data |
wcs/wf/backoffice_fields.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import base64 |
|
17 | 18 |
import sys |
18 | 19 |
import xml.etree.ElementTree as ET |
19 | 20 | |
... | ... | |
83 | 84 |
title=_('Fields Update'), value=self.fields, |
84 | 85 |
workflow=self.parent.parent) |
85 | 86 | |
87 |
def get_file_value(self, new_value):
    """Convert a computed value into a PicklableUpload for a file field.

    Accepted values:

    * a PicklableUpload, returned unchanged;
    * a NamedAttachmentsSubstitutionProxy, copied into a new upload;
    * a dictionary with a "filename" key, a "content" or "b64_content"
      (base64-encoded) key, and an optional "content_type" key; when
      the content type is missing or empty, application/octet-stream
      is used.

    Raises ValueError for any other value.  Errors raised while
    decoding a malformed "b64_content" are left to propagate.
    """
    if isinstance(new_value, PicklableUpload):
        return new_value

    if isinstance(new_value, NamedAttachmentsSubstitutionProxy):
        upload = PicklableUpload(new_value.filename, new_value.content_type)
        upload.receive([new_value.content])
        return upload

    if isinstance(new_value, dict):
        # if value is a dictionary we expect it to have a content or
        # b64_content key, a filename key and an optional content_type
        # key; b64_content takes precedence when both are given.
        has_content = False
        content = None
        if 'b64_content' in new_value:
            # decode into a local variable so the caller's dictionary
            # is not modified as a side effect.
            content = base64.b64decode(new_value['b64_content'])
            has_content = True
        elif 'content' in new_value:
            content = new_value['content']
            has_content = True

        if has_content and 'filename' in new_value:
            upload = PicklableUpload(
                new_value['filename'],
                new_value.get('content_type') or 'application/octet-stream')
            # pass a list of chunks, like the attachment branch above,
            # instead of letting receive() iterate over the raw content.
            upload.receive([content])
            return upload

    # wrap in a tuple so a tuple value is reported with %r instead of
    # being unpacked as formatting arguments.
    raise ValueError('invalid data for file type (%r)' % (new_value,))
|
109 | ||
86 | 110 |
def perform(self, formdata): |
87 | 111 |
if not self.fields: |
88 | 112 |
return |
89 | 113 |
for field in self.fields: |
90 | 114 |
try: |
115 |
formdef_field = [x for x in formdata.formdef.fields |
|
116 |
if 'bo%s' % x.id == field['field_id']][0] |
|
117 |
except IndexError: |
|
118 |
continue |
|
119 | ||
120 |
try: |
|
91 | 121 |
new_value = self.compute(field['value'], raises=True) |
122 |
if formdef_field.type == 'file': |
|
123 |
new_value = self.get_file_value(new_value) |
|
92 | 124 |
except: |
93 | 125 |
get_publisher().notify_of_exception(sys.exc_info()) |
94 |
if isinstance(new_value, NamedAttachmentsSubstitutionProxy): |
|
95 |
upload = PicklableUpload(new_value.filename, new_value.content_type) |
|
96 |
upload.receive([new_value.content]) |
|
97 |
new_value = upload |
|
126 |
continue |
|
127 | ||
98 | 128 |
formdata.data['%s' % field['field_id']] = new_value |
99 | 129 |
formdata.store() |
100 | 130 | |
101 |
- |