0002-workflows-hide-attachment-not-matching-target-roles-.patch
tests/api/test_formdata.py | ||
---|---|---|
7 | 7 |
import re |
8 | 8 |
import time |
9 | 9 |
import xml.etree.ElementTree as ET |
10 | 10 |
import zipfile |
11 | 11 | |
12 | 12 |
import pytest |
13 | 13 |
from django.utils.encoding import force_bytes |
14 | 14 |
from quixote import get_publisher |
15 |
from webtest import Upload |
|
15 | 16 | |
16 | 17 |
from wcs import fields |
17 | 18 |
from wcs.api_access import ApiAccess |
18 | 19 |
from wcs.blocks import BlockDef |
19 | 20 |
from wcs.categories import Category |
20 | 21 |
from wcs.data_sources import NamedDataSource |
21 | 22 |
from wcs.formdata import Evolution |
22 | 23 |
from wcs.formdef import FormDef |
23 | 24 |
from wcs.qommon import ods |
24 | 25 |
from wcs.qommon.http_request import HTTPRequest |
25 | 26 |
from wcs.qommon.ident.password_accounts import PasswordAccount |
26 | 27 |
from wcs.qommon.upload_storage import PicklableUpload |
28 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
|
27 | 29 |
from wcs.workflows import ( |
28 | 30 |
AttachmentEvolutionPart, |
29 | 31 |
EditableWorkflowStatusItem, |
30 | 32 |
Workflow, |
31 | 33 |
WorkflowBackofficeFieldsFormDef, |
32 | 34 |
) |
33 | 35 | |
34 | 36 |
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login |
... | ... | |
503 | 505 | |
504 | 506 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user)) |
505 | 507 |
assert len(resp.json['evolution']) == 1 |
506 | 508 |
assert len(resp.json['evolution'][0]['parts']) == 1 |
507 | 509 |
part = resp.json['evolution'][0]['parts'][0] |
508 | 510 |
assert part['filename'] == 'hello.txt' |
509 | 511 |
assert part['content_type'] == 'text/plain' |
510 | 512 |
assert 'content' in part |
513 |
assert 'to' in part |
|
511 | 514 |
assert base64.decodebytes(force_bytes(part['content'])) == b'test' |
512 | 515 | |
513 | 516 |
resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise' % formdata.id, user=local_user)) |
514 | 517 |
assert len(resp.json['evolution']) == 1 |
515 | 518 |
assert 'parts' not in resp.json['evolution'][0] |
516 | 519 | |
517 | 520 |
# check this doesn't get into list of forms API |
518 | 521 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user)) |
519 | 522 |
assert 'hello.txt' not in resp.text |
520 | 523 | |
521 | 524 | |
525 |
def test_formdata_with_evolution_part_attachment_to(pub, local_user):
    """Check the API export of an attachment registered with target roles.

    A register-comment action restricted to one role attaches the uploaded
    file; the single-formdata API must expose the attachment with its ``to``
    list, while the anonymised export and the list API must only keep the
    workflow comment.
    """
    pub.role_class.wipe()
    target_role = pub.role_class(name='test')
    target_role.id = '123'
    target_role.store()

    local_user.roles = [target_role.id]
    local_user.store()

    wf = Workflow(name='test')
    status = wf.add_status('Status1', 'st1')

    journal_item = RegisterCommenterWorkflowStatusItem()
    journal_item.id = '_add_to_journal'
    journal_item.comment = 'HELLO WORLD'
    journal_item.attachments = ['form_var_file_raw']
    journal_item.to = [target_role.id]
    status.items.append(journal_item)

    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_id = wf.id
    formdef.workflow_roles = {'_receiver': target_role.id}
    formdef.fields = [fields.FileField(id='1', label='File1', type='file', varname='file')]
    formdef.store()
    formdef.data_class().wipe()

    # submit the form with a file, going through the validation page
    page = get_app(pub).get('/test/')
    page.forms[0]['f1$file'] = Upload('hello.txt', b'foobar', 'text/plain')
    page = page.forms[0].submit('submit')
    assert 'Check values then click submit.' in page.text
    page = page.forms[0].submit('submit')
    assert page.status_int == 302
    page = page.follow()
    assert 'The form has been recorded' in page.text

    formdata = formdef.data_class().select()[0]

    # full export: attachment first, then the comment
    api_resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
    evolution = api_resp.json['evolution']
    assert len(evolution) == 1
    parts = evolution[0]['parts']
    assert len(parts) == 2
    assert parts[1]['type'] == 'workflow-comment'
    attachment = parts[0]
    assert attachment['type'] == 'workflow-attachment'
    assert attachment['filename'] == 'hello.txt'
    assert attachment['content_type'] == 'text/plain'
    assert attachment['to'] == ['123']
    assert 'content' in attachment
    assert base64.decodebytes(force_bytes(attachment['content'])) == b'foobar'

    # anonymised export: the attachment is dropped
    api_resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise' % formdata.id, user=local_user))
    evolution = api_resp.json['evolution']
    assert len(evolution) == 1
    assert len(evolution[0]['parts']) == 1
    assert evolution[0]['parts'][0]['type'] == 'workflow-comment'

    # check this doesn't get into list of forms API
    api_resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
    evolution = api_resp.json[0]['evolution']
    assert len(evolution) == 1
    assert len(evolution[0]['parts']) == 1
    assert evolution[0]['parts'][0]['type'] == 'workflow-comment'
|
587 | ||
588 | ||
522 | 589 |
def test_api_list_formdata(pub, local_user): |
523 | 590 |
pub.role_class.wipe() |
524 | 591 |
role = pub.role_class(name='test') |
525 | 592 |
role.store() |
526 | 593 | |
527 | 594 |
FormDef.wipe() |
528 | 595 |
formdef = FormDef() |
529 | 596 |
formdef.name = 'test' |
tests/form_pages/test_formdata.py | ||
---|---|---|
1635 | 1635 |
assert 'The form has been recorded' in resp.text |
1636 | 1636 | |
1637 | 1637 |
formdata = formdef.data_class().select()[0] |
1638 | 1638 |
assert formdata.evolution[0].parts[0].content == 'Hello World' |
1639 | 1639 |
assert formdata.evolution[0].parts[0].to == [role2.id] |
1640 | 1640 |
resp = app.get('/test/%s/' % formdata.id) |
1641 | 1641 |
resp.status_int = 200 |
1642 | 1642 |
assert resp.html.find('div', {'id': 'evolution-log'}).find('p').text == 'Hello World' |
1643 | ||
1644 | ||
1645 |
def test_formdata_evolution_registercommenter_to_with_attachment(pub):
    """Check that attachments follow the target roles of their comment.

    Three register-comment actions each attach one uploaded file: one with no
    restriction, one restricted to a role the user lacks, one restricted to a
    role the user holds.  All parts are stored on the formdata, but the form
    page must only list the attachments the user is allowed to see.
    """
    user = create_user(pub)

    pub.role_class.wipe()
    role1 = pub.role_class(name='role the user does not have')
    role1.store()
    role2 = pub.role_class(name='role the user does have')
    role2.store()
    user.roles = [role2.id]
    user.store()

    wf = Workflow(name='status')
    st1 = wf.add_status('Status1', 'st1')

    # (item id, comment text, attachment expression, target roles)
    for item_id, text, attachment, to in (
        ('1', 'Hello all', 'form_var_file1_raw', None),
        ('2', 'Hello role1', 'form_var_file2_raw', [role1.id]),
        ('3', 'Hello role2', 'form_var_file3_raw', [role2.id]),
    ):
        comment = RegisterCommenterWorkflowStatusItem()
        comment.id = item_id
        comment.comment = text
        comment.attachments = [attachment]
        comment.to = to
        st1.items.append(comment)
        comment.parent = st1

    wf.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_id = wf.id
    formdef.fields = [
        fields.FileField(id='1', label='File1', type='file', varname='file1'),
        fields.FileField(id='2', label='File2', type='file', varname='file2'),
        fields.FileField(id='3', label='File3', type='file', varname='file3'),
    ]
    formdef.store()
    formdef.data_class().wipe()

    app = login(get_app(pub), username='foo', password='foo')
    resp = app.get('/test/')
    resp.forms[0]['f1$file'] = Upload('to-all.txt', b'foobar', 'text/plain')
    resp.forms[0]['f2$file'] = Upload('to-role1.txt', b'foobar', 'text/plain')
    resp.forms[0]['f3$file'] = Upload('to-role2.txt', b'foobar', 'text/plain')
    resp = resp.forms[0].submit('submit')
    assert 'Check values then click submit.' in resp.text
    resp = resp.forms[0].submit('submit')
    assert resp.status_int == 302
    resp = resp.follow()
    assert 'The form has been recorded' in resp.text

    # every action stored its attachment and its comment (3 x 2 parts)
    formdata = formdef.data_class().select()[0]
    assert len(formdata.evolution[0].parts) == 6

    resp = app.get('/test/%s/' % formdata.id)
    # actually check the status; a plain assignment here would verify nothing
    assert resp.status_int == 200
    assert [x.a.text for x in resp.html.find_all('p', {'class': 'wf-attachment'})] == [
        'to-all.txt',
        'to-role2.txt',
    ]
tests/test_workflows.py | ||
---|---|---|
1289 | 1289 |
register_commenter2.to = [role.id, '_submitter'] |
1290 | 1290 |
user.roles = [role.id, role2.id] |
1291 | 1291 |
register_commenter2.perform(formdata) |
1292 | 1292 |
assert len(formdata.evolution[-1].parts) == 6 |
1293 | 1293 |
assert '<p>d1</p>' in [str(x) for x in display_parts()] |
1294 | 1294 |
assert '<p>d2</p>' in [str(x) for x in display_parts()] |
1295 | 1295 | |
1296 | 1296 | |
1297 |
def test_register_comment_to_with_attachment(pub):
    """Check which attachments are displayed depending on the viewer.

    Four comments, each with one attachment, are registered with different
    ``to`` values (nobody in particular, a role, the submitter, role or
    submitter).  The displayed parts are then checked for a viewer with no
    role, with the role, as submitter, and as submitter holding the role.
    """
    wf = Workflow(name='register comment to with attachment')
    status = wf.add_status('Status1', 'st1')

    role = pub.role_class(name='foorole')
    role.store()
    other_role = pub.role_class(name='no-one-role')
    other_role.store()
    user = pub.user_class(name='baruser')
    user.roles = []
    user.store()

    # one uploaded file per form field, keyed by field id
    uploads = {}
    for field_id, filename in (
        ('1', 'all.txt'),
        ('2', 'to-role.txt'),
        ('3', 'to-submitter.txt'),
        ('4', 'to-role-or-submitter.txt'),
    ):
        upload = PicklableUpload(filename, 'text/plain')
        upload.receive([b'barfoo'])
        uploads[field_id] = upload

    FormDef.wipe()
    formdef = FormDef()
    formdef.url_name = 'foobar'
    formdef.fields = [
        FileField(id='1', label='File1', type='file', varname='file1'),
        FileField(id='2', label='File2', type='file', varname='file2'),
        FileField(id='3', label='File3', type='file', varname='file3'),
        FileField(id='4', label='File4', type='file', varname='file4'),
    ]
    formdef._workflow = wf
    formdef.store()

    formdata = formdef.data_class()()
    formdata.data = uploads
    formdata.just_created()
    assert formdata.status == 'wf-st1'
    pub.substitutions.feed(formdata)

    commenter = RegisterCommenterWorkflowStatusItem()
    commenter.parent = status
    status.items.append(commenter)

    def rendered_parts():
        formdata.evolution[-1]._display_parts = None  # invalidate cache
        return [str(part) for part in formdata.evolution[-1].display_parts()]

    # the same item is reconfigured and performed once per scenario
    for text, attachment, recipients in (
        ('all', 'form_var_file1_raw', None),
        ('to-role', 'form_var_file2_raw', [role.id]),
        ('to-submitter', 'form_var_file3_raw', ['_submitter']),
        ('to-role-or-submitter', 'form_var_file4_raw', [role.id, '_submitter']),
    ):
        commenter.comment = text
        commenter.attachments = [attachment]
        commenter.to = recipients
        commenter.perform(formdata)

    assert len(formdata.evolution[-1].parts) == 8

    # before the user is set on the request: unrestricted parts only
    assert user.roles == []
    shown = rendered_parts()
    assert len(shown) == 2
    assert 'all.txt' in shown[0]
    assert shown[1] == '<p>all</p>'

    # viewer holding the role
    pub._request._user = user
    user.roles = [role.id]
    shown = rendered_parts()
    assert len(shown) == 6
    assert 'all.txt' in shown[0]
    assert 'to-role.txt' in shown[2]
    assert 'to-role-or-submitter.txt' in shown[4]

    # viewer is the submitter, without the role
    user.roles = []
    formdata.user_id = user.id
    shown = rendered_parts()
    assert len(shown) == 6
    assert 'all.txt' in shown[0]
    assert 'to-submitter.txt' in shown[2]
    assert 'to-role-or-submitter.txt' in shown[4]

    # viewer is the submitter and holds the role
    user.roles = [role.id]
    shown = rendered_parts()
    assert len(shown) == 8
    assert 'all.txt' in shown[0]
    assert 'to-role.txt' in shown[2]
    assert 'to-submitter.txt' in shown[4]
    assert 'to-role-or-submitter.txt' in shown[6]
|
1391 | ||
1392 | ||
1297 | 1393 |
def test_email(pub, emails): |
1298 | 1394 |
pub.substitutions.feed(MockSubstitutionVariables()) |
1299 | 1395 | |
1300 | 1396 |
formdef = FormDef() |
1301 | 1397 |
formdef.name = 'baz' |
1302 | 1398 |
formdef.fields = [] |
1303 | 1399 |
formdef.store() |
1304 | 1400 |
wcs/wf/register_comment.py | ||
---|---|---|
107 | 107 |
'render_br': False, |
108 | 108 |
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False), |
109 | 109 |
}, |
110 | 110 |
) |
111 | 111 | |
112 | 112 |
def get_parameters(self):
    """Return the fixed tuple of parameter names for this item."""
    parameter_names = ('comment', 'to', 'attachments', 'condition')
    return parameter_names
114 | 114 | |
115 |
def attach_uploads_to_formdata(self, formdata, uploads, to=None):
    """Add each upload as an attachment part on the latest evolution.

    :param formdata: form data whose last evolution receives the parts.
    :param uploads: iterable of upload objects to attach.
    :param to: value forwarded to ``AttachmentEvolutionPart.from_upload`` as
        the part's ``to``; defaults to ``None`` so callers written before
        this parameter existed keep working.

    A failure on one upload is reported through the publisher and does not
    prevent the remaining uploads from being attached.
    """
    if not formdata.evolution[-1].parts:
        formdata.evolution[-1].parts = []
    for upload in uploads:
        try:
            # useless but required to restore upload.fp from serialized state,
            # needed by AttachmentEvolutionPart.from_upload()
            upload.get_file_pointer()
            formdata.evolution[-1].add_part(AttachmentEvolutionPart.from_upload(upload, to=to))
        except Exception:
            # best effort: notify and move on to the next upload
            get_publisher().notify_of_exception(sys.exc_info(), context='[comment/attachments]')
127 | 127 | |
128 | 128 |
def perform(self, formdata): |
129 | 129 |
if not formdata.evolution: |
130 | 130 |
return |
131 | 131 | |
132 | 132 |
# process attachments first, they might be used in the comment |
133 | 133 |
# (with substitution vars) |
134 | 134 |
if self.attachments: |
135 | 135 |
uploads = self.convert_attachments_to_uploads() |
136 |
self.attach_uploads_to_formdata(formdata, uploads) |
|
136 |
self.attach_uploads_to_formdata(formdata, uploads, self.to)
|
|
137 | 137 |
formdata.store() # store and invalidate cache, so references can be used in the comment message. |
138 | 138 | |
139 | 139 |
# the comment can use attachments done above |
140 | 140 |
try: |
141 | 141 |
formdata.evolution[-1].add_part(JournalEvolutionPart(formdata, self.comment, self.to)) |
142 | 142 |
formdata.store() |
143 | 143 |
except TemplateError as e: |
144 | 144 |
url = formdata.get_url() |
wcs/workflows.py | ||
---|---|---|
210 | 210 |
base_filename, |
211 | 211 |
fp, |
212 | 212 |
orig_filename=None, |
213 | 213 |
content_type=None, |
214 | 214 |
charset=None, |
215 | 215 |
varname=None, |
216 | 216 |
storage=None, |
217 | 217 |
storage_attrs=None, |
218 |
to=None, |
|
218 | 219 |
): |
219 | 220 |
self.base_filename = base_filename |
220 | 221 |
self.orig_filename = orig_filename or base_filename |
221 | 222 |
self.content_type = content_type |
222 | 223 |
self.charset = charset |
223 | 224 |
self.fp = fp |
224 | 225 |
self.varname = varname |
225 | 226 |
self.storage = storage |
226 | 227 |
self.storage_attrs = storage_attrs |
228 |
self.to = to |
|
227 | 229 | |
228 | 230 |
@classmethod
def from_upload(cls, upload, varname=None, to=None):
    """Build an AttachmentEvolutionPart from an upload object.

    ``base_filename``, ``orig_filename``, ``content_type`` and ``charset``
    are read directly from *upload*; ``fp``, ``storage`` and
    ``storage_attrs`` are optional and default to ``None`` when the upload
    does not have them.  *varname* and *to* are passed through unchanged.
    """
    base_filename = upload.base_filename
    file_pointer = getattr(upload, 'fp', None)
    orig_filename = upload.orig_filename
    content_type = upload.content_type
    charset = upload.charset
    upload_storage = getattr(upload, 'storage', None)
    upload_storage_attrs = getattr(upload, 'storage_attrs', None)
    return AttachmentEvolutionPart(
        base_filename,
        file_pointer,
        orig_filename,
        content_type,
        charset,
        varname=varname,
        storage=upload_storage,
        storage_attrs=upload_storage_attrs,
        to=to,
    )
240 | 243 | |
241 | 244 |
def get_file_pointer(self):
    """Return the attachment opened for binary reading, or None.

    No file is opened when ``self.filename`` starts with ``'uuid-'``
    (presumably such names do not refer to a local file; confirm against
    the storage code).  The caller is responsible for closing the result.
    """
    has_uuid_name = self.filename.startswith('uuid-')
    if not has_uuid_name:
        return open(self.filename, 'rb')  # pylint: disable=consider-using-with
    return None
245 | 248 | |
246 | 249 |
def __getstate__(self): |
... | ... | |
285 | 288 | |
286 | 289 |
def get_json_export_dict(self, anonymise=False, include_files=True):
    """Return the JSON-export dictionary for this attachment, or None.

    :param anonymise: when true, nothing is exported.
    :param include_files: when false, nothing is exported.
    :returns: ``None`` when the attachment must not be exported, otherwise a
        dict with ``type``, ``content_type``, ``filename`` and ``to`` keys,
        plus a base64-encoded ``content`` key when a file pointer is
        available.
    """
    if anonymise or not include_files:
        return None
    d = {
        'type': 'workflow-attachment',
        'content_type': self.content_type,
        'filename': self.base_filename,
        # NOTE(review): instances restored from data stored before "to" was
        # introduced may lack the attribute; fall back to None for them.
        'to': getattr(self, 'to', None),
    }
    fd = self.get_file_pointer()
    if fd:
        try:
            d['content'] = base64.encodebytes(fd.read())
        finally:
            # always release the file, even if reading fails
            fd.close()
    return d
299 | 303 | |
300 | 304 |
@classmethod |
301 |
- |