Projet

Général

Profil

0002-workflows-hide-attachment-not-matching-target-roles-.patch

Nicolas Roche, 01 juin 2021 22:09

Télécharger (16,8 ko)

Voir les différences:

Subject: [PATCH 2/2] workflows: hide attachment not matching target roles in commenter workflow item (#54081)

 tests/api/test_formdata.py        | 67 +++++++++++++++++++++
 tests/form_pages/test_formdata.py | 75 ++++++++++++++++++++++++
 tests/test_workflows.py           | 96 +++++++++++++++++++++++++++++++
 wcs/wf/register_comment.py        |  6 +-
 wcs/workflows.py                  |  6 +-
 5 files changed, 246 insertions(+), 4 deletions(-)
tests/api/test_formdata.py
7 7
import re
8 8
import time
9 9
import xml.etree.ElementTree as ET
10 10
import zipfile
11 11

  
12 12
import pytest
13 13
from django.utils.encoding import force_bytes
14 14
from quixote import get_publisher
15
from webtest import Hidden, Upload
15 16

  
16 17
from wcs import fields
17 18
from wcs.api_access import ApiAccess
18 19
from wcs.blocks import BlockDef
19 20
from wcs.categories import Category
20 21
from wcs.data_sources import NamedDataSource
21 22
from wcs.formdata import Evolution
22 23
from wcs.formdef import FormDef
23 24
from wcs.qommon import ods
24 25
from wcs.qommon.http_request import HTTPRequest
25 26
from wcs.qommon.ident.password_accounts import PasswordAccount
26 27
from wcs.qommon.upload_storage import PicklableUpload
28
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
27 29
from wcs.workflows import (
28 30
    AttachmentEvolutionPart,
29 31
    EditableWorkflowStatusItem,
30 32
    Workflow,
31 33
    WorkflowBackofficeFieldsFormDef,
32 34
)
33 35

  
34 36
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login
......
503 505

  
504 506
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
505 507
    assert len(resp.json['evolution']) == 1
506 508
    assert len(resp.json['evolution'][0]['parts']) == 1
507 509
    part = resp.json['evolution'][0]['parts'][0]
508 510
    assert part['filename'] == 'hello.txt'
509 511
    assert part['content_type'] == 'text/plain'
510 512
    assert 'content' in part
513
    assert 'to' in part
511 514
    assert base64.decodebytes(force_bytes(part['content'])) == b'test'
512 515

  
513 516
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise' % formdata.id, user=local_user))
514 517
    assert len(resp.json['evolution']) == 1
515 518
    assert 'parts' not in resp.json['evolution'][0]
516 519

  
517 520
    # check this doesn't get into list of forms API
518 521
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
519 522
    assert 'hello.txt' not in resp.text
520 523

  
521 524

  
525
def test_formdata_with_evolution_part_attachment_to(pub, local_user):
526
    pub.role_class.wipe()
527
    role = pub.role_class(name='test')
528
    role.id = '123'
529
    role.store()
530

  
531
    local_user.roles = [role.id]
532
    local_user.store()
533

  
534
    workflow = Workflow(name='test')
535
    st1 = workflow.add_status('Status1', 'st1')
536

  
537
    add_to_journal = RegisterCommenterWorkflowStatusItem()
538
    add_to_journal.id = '_add_to_journal'
539
    add_to_journal.comment = 'HELLO WORLD'
540
    add_to_journal.attachments = ['form_var_file_raw']
541
    add_to_journal.to = [role.id]
542
    st1.items.append(add_to_journal)
543

  
544
    workflow.store()
545

  
546
    FormDef.wipe()
547
    formdef = FormDef()
548
    formdef.name = 'test'
549
    formdef.workflow_id = workflow.id
550
    formdef.workflow_roles = {'_receiver': role.id}
551
    formdef.fields = [fields.FileField(id='1', label='File1', type='file', varname='file')]
552
    formdef.store()
553
    formdef.data_class().wipe()
554

  
555
    resp = get_app(pub).get('/test/')
556
    resp.forms[0]['f1$file'] = Upload('hello.txt', b'foobar', 'text/plain')
557
    resp = resp.forms[0].submit('submit')
558
    assert 'Check values then click submit.' in resp.text
559
    resp = resp.forms[0].submit('submit')
560
    assert resp.status_int == 302
561
    resp = resp.follow()
562
    assert 'The form has been recorded' in resp.text
563

  
564
    formdata = formdef.data_class().select()[0]
565
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/' % formdata.id, user=local_user))
566
    assert len(resp.json['evolution']) == 1
567
    assert len(resp.json['evolution'][0]['parts']) == 2
568
    assert resp.json['evolution'][0]['parts'][1]['type'] == 'workflow-comment'
569
    part = resp.json['evolution'][0]['parts'][0]
570
    assert part['type'] == 'workflow-attachment'
571
    assert part['filename'] == 'hello.txt'
572
    assert part['content_type'] == 'text/plain'
573
    assert part['to'] == ['123']
574
    assert 'content' in part
575
    assert base64.decodebytes(force_bytes(part['content'])) == b'foobar'
576

  
577
    resp = get_app(pub).get(sign_uri('/api/forms/test/%s/?anonymise' % formdata.id, user=local_user))
578
    assert len(resp.json['evolution']) == 1
579
    assert len(resp.json['evolution'][0]['parts']) == 1
580
    assert resp.json['evolution'][0]['parts'][0]['type'] == 'workflow-comment'
581

  
582
    # check this doesn't get into list of forms API
583
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?full=on', user=local_user))
584
    assert len(resp.json[0]['evolution']) == 1
585
    assert len(resp.json[0]['evolution'][0]['parts']) == 1
586
    assert resp.json[0]['evolution'][0]['parts'][0]['type'] == 'workflow-comment'
587

  
588

  
522 589
def test_api_list_formdata(pub, local_user):
523 590
    pub.role_class.wipe()
524 591
    role = pub.role_class(name='test')
525 592
    role.store()
526 593

  
527 594
    FormDef.wipe()
528 595
    formdef = FormDef()
529 596
    formdef.name = 'test'
tests/form_pages/test_formdata.py
1635 1635
    assert 'The form has been recorded' in resp.text
1636 1636

  
1637 1637
    formdata = formdef.data_class().select()[0]
1638 1638
    assert formdata.evolution[0].parts[0].content == 'Hello World'
1639 1639
    assert formdata.evolution[0].parts[0].to == [role2.id]
1640 1640
    resp = app.get('/test/%s/' % formdata.id)
1641 1641
    resp.status_int = 200
1642 1642
    assert resp.html.find('div', {'id': 'evolution-log'}).find('p').text == 'Hello World'
1643

  
1644

  
1645
def test_formdata_evolution_registercommenter_to_with_attachment(pub):
1646
    user = create_user(pub)
1647

  
1648
    pub.role_class.wipe()
1649
    role1 = pub.role_class(name='role the user does not have')
1650
    role1.store()
1651
    role2 = pub.role_class(name='role the user does have')
1652
    role2.store()
1653
    user.roles = [role2.id]
1654
    user.store()
1655

  
1656
    wf = Workflow(name='status')
1657
    st1 = wf.add_status('Status1', 'st1')
1658

  
1659
    comment = RegisterCommenterWorkflowStatusItem()
1660
    comment.id = '1'
1661
    comment.comment = 'Hello all'
1662
    comment.attachments = ['form_var_file1_raw']
1663
    comment.to = None
1664
    st1.items.append(comment)
1665
    comment.parent = st1
1666

  
1667
    comment = RegisterCommenterWorkflowStatusItem()
1668
    comment.id = '2'
1669
    comment.comment = 'Hello role1'
1670
    comment.attachments = ['form_var_file2_raw']
1671
    comment.to = [role1.id]
1672
    st1.items.append(comment)
1673
    comment.parent = st1
1674

  
1675
    comment = RegisterCommenterWorkflowStatusItem()
1676
    comment.id = '3'
1677
    comment.comment = 'Hello role2'
1678
    comment.attachments = ['form_var_file3_raw']
1679
    comment.to = [role2.id]
1680
    st1.items.append(comment)
1681
    comment.parent = st1
1682

  
1683
    wf.store()
1684

  
1685
    FormDef.wipe()
1686
    formdef = FormDef()
1687
    formdef.name = 'test'
1688
    formdef.workflow_id = wf.id
1689
    formdef.fields = [
1690
        fields.FileField(id='1', label='File1', type='file', varname='file1'),
1691
        fields.FileField(id='2', label='File2', type='file', varname='file2'),
1692
        fields.FileField(id='3', label='File3', type='file', varname='file3'),
1693
    ]
1694
    formdef.store()
1695
    formdef.data_class().wipe()
1696

  
1697
    app = login(get_app(pub), username='foo', password='foo')
1698
    resp = app.get('/test/')
1699
    resp.forms[0]['f1$file'] = Upload('to-all.txt', b'foobar', 'text/plain')
1700
    resp.forms[0]['f2$file'] = Upload('to-role1.txt', b'foobar', 'text/plain')
1701
    resp.forms[0]['f3$file'] = Upload('to-role2.txt', b'foobar', 'text/plain')
1702
    resp = resp.forms[0].submit('submit')
1703
    assert 'Check values then click submit.' in resp.text
1704
    resp = resp.forms[0].submit('submit')
1705
    assert resp.status_int == 302
1706
    resp = resp.follow()
1707
    assert 'The form has been recorded' in resp.text
1708

  
1709
    formdata = formdef.data_class().select()[0]
1710
    assert len(formdata.evolution[0].parts) == 6
1711

  
1712
    resp = app.get('/test/%s/' % formdata.id)
1713
    resp.status_int = 200
1714
    assert [x.a.text for x in resp.html.find_all('p', {'class': 'wf-attachment'})] == [
1715
        'to-all.txt',
1716
        'to-role2.txt',
1717
    ]
tests/test_workflows.py
1289 1289
    register_commenter2.to = [role.id, '_submitter']
1290 1290
    user.roles = [role.id, role2.id]
1291 1291
    register_commenter2.perform(formdata)
1292 1292
    assert len(formdata.evolution[-1].parts) == 6
1293 1293
    assert '<p>d1</p>' in [str(x) for x in display_parts()]
1294 1294
    assert '<p>d2</p>' in [str(x) for x in display_parts()]
1295 1295

  
1296 1296

  
1297
def test_register_comment_to_with_attachment(pub):
1298
    workflow = Workflow(name='register comment to with attachment')
1299
    st1 = workflow.add_status('Status1', 'st1')
1300

  
1301
    role = pub.role_class(name='foorole')
1302
    role.store()
1303
    role2 = pub.role_class(name='no-one-role')
1304
    role2.store()
1305
    user = pub.user_class(name='baruser')
1306
    user.roles = []
1307
    user.store()
1308

  
1309
    upload1 = PicklableUpload('all.txt', 'text/plain')
1310
    upload1.receive([b'barfoo'])
1311
    upload2 = PicklableUpload('to-role.txt', 'text/plain')
1312
    upload2.receive([b'barfoo'])
1313
    upload3 = PicklableUpload('to-submitter.txt', 'text/plain')
1314
    upload3.receive([b'barfoo'])
1315
    upload4 = PicklableUpload('to-role-or-submitter.txt', 'text/plain')
1316
    upload4.receive([b'barfoo'])
1317

  
1318
    FormDef.wipe()
1319
    formdef = FormDef()
1320
    formdef.url_name = 'foobar'
1321
    formdef.fields = [
1322
        FileField(id='1', label='File1', type='file', varname='file1'),
1323
        FileField(id='2', label='File2', type='file', varname='file2'),
1324
        FileField(id='3', label='File3', type='file', varname='file3'),
1325
        FileField(id='4', label='File4', type='file', varname='file4'),
1326
    ]
1327
    formdef._workflow = workflow
1328
    formdef.store()
1329

  
1330
    formdata = formdef.data_class()()
1331
    formdata.data = {'1': upload1, '2': upload2, '3': upload3, '4': upload4}
1332
    formdata.just_created()
1333
    assert formdata.status == 'wf-st1'
1334
    pub.substitutions.feed(formdata)
1335

  
1336
    register_commenter = RegisterCommenterWorkflowStatusItem()
1337
    register_commenter.parent = st1
1338
    st1.items.append(register_commenter)
1339

  
1340
    def display_parts():
1341
        formdata.evolution[-1]._display_parts = None  # invalidate cache
1342
        return [str(x) for x in formdata.evolution[-1].display_parts()]
1343

  
1344
    register_commenter.comment = 'all'
1345
    register_commenter.attachments = ['form_var_file1_raw']
1346
    register_commenter.to = None
1347
    register_commenter.perform(formdata)
1348

  
1349
    register_commenter.comment = 'to-role'
1350
    register_commenter.attachments = ['form_var_file2_raw']
1351
    register_commenter.to = [role.id]
1352
    register_commenter.perform(formdata)
1353

  
1354
    register_commenter.comment = 'to-submitter'
1355
    register_commenter.attachments = ['form_var_file3_raw']
1356
    register_commenter.to = ['_submitter']
1357
    register_commenter.perform(formdata)
1358

  
1359
    register_commenter.comment = 'to-role-or-submitter'
1360
    register_commenter.attachments = ['form_var_file4_raw']
1361
    register_commenter.to = [role.id, '_submitter']
1362
    register_commenter.perform(formdata)
1363

  
1364
    assert len(formdata.evolution[-1].parts) == 8
1365

  
1366
    assert user.roles == []
1367
    assert len(display_parts()) == 2
1368
    assert 'all.txt' in display_parts()[0]
1369
    assert display_parts()[1] == '<p>all</p>'
1370

  
1371
    pub._request._user = user
1372
    user.roles = [role.id]
1373
    assert len(display_parts()) == 6
1374
    assert 'all.txt' in display_parts()[0]
1375
    assert 'to-role.txt' in display_parts()[2]
1376
    assert 'to-role-or-submitter.txt' in display_parts()[4]
1377

  
1378
    user.roles = []
1379
    formdata.user_id = user.id
1380
    assert len(display_parts()) == 6
1381
    assert 'all.txt' in display_parts()[0]
1382
    assert 'to-submitter.txt' in display_parts()[2]
1383
    assert 'to-role-or-submitter.txt' in display_parts()[4]
1384

  
1385
    user.roles = [role.id]
1386
    assert len(display_parts()) == 8
1387
    assert 'all.txt' in display_parts()[0]
1388
    assert 'to-role.txt' in display_parts()[2]
1389
    assert 'to-submitter.txt' in display_parts()[4]
1390
    assert 'to-role-or-submitter.txt' in display_parts()[6]
1391

  
1392

  
1297 1393
def test_email(pub, emails):
1298 1394
    pub.substitutions.feed(MockSubstitutionVariables())
1299 1395

  
1300 1396
    formdef = FormDef()
1301 1397
    formdef.name = 'baz'
1302 1398
    formdef.fields = []
1303 1399
    formdef.store()
1304 1400

  
wcs/wf/register_comment.py
107 107
                    'render_br': False,
108 108
                    'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False),
109 109
                },
110 110
            )
111 111

  
112 112
    def get_parameters(self):
113 113
        return ('comment', 'to', 'attachments', 'condition')
114 114

  
115
    def attach_uploads_to_formdata(self, formdata, uploads):
115
    def attach_uploads_to_formdata(self, formdata, uploads, to):
116 116
        if not formdata.evolution[-1].parts:
117 117
            formdata.evolution[-1].parts = []
118 118
        for upload in uploads:
119 119
            try:
120 120
                # useless but required to restore upload.fp from serialized state,
121 121
                # needed by AttachmentEvolutionPart.from_upload()
122 122
                upload.get_file_pointer()
123
                formdata.evolution[-1].add_part(AttachmentEvolutionPart.from_upload(upload))
123
                formdata.evolution[-1].add_part(AttachmentEvolutionPart.from_upload(upload, to=to))
124 124
            except Exception:
125 125
                get_publisher().notify_of_exception(sys.exc_info(), context='[comment/attachments]')
126 126
            continue
127 127

  
128 128
    def perform(self, formdata):
129 129
        if not formdata.evolution:
130 130
            return
131 131

  
132 132
        # process attachments first, they might be used in the comment
133 133
        # (with substitution vars)
134 134
        if self.attachments:
135 135
            uploads = self.convert_attachments_to_uploads()
136
            self.attach_uploads_to_formdata(formdata, uploads)
136
            self.attach_uploads_to_formdata(formdata, uploads, self.to)
137 137
            formdata.store()  # store and invalidate cache, so references can be used in the comment message.
138 138

  
139 139
        # the comment can use attachments done above
140 140
        try:
141 141
            formdata.evolution[-1].add_part(JournalEvolutionPart(formdata, self.comment, self.to))
142 142
            formdata.store()
143 143
        except TemplateError as e:
144 144
            url = formdata.get_url()
wcs/workflows.py
210 210
        base_filename,
211 211
        fp,
212 212
        orig_filename=None,
213 213
        content_type=None,
214 214
        charset=None,
215 215
        varname=None,
216 216
        storage=None,
217 217
        storage_attrs=None,
218
        to=None,
218 219
    ):
219 220
        self.base_filename = base_filename
220 221
        self.orig_filename = orig_filename or base_filename
221 222
        self.content_type = content_type
222 223
        self.charset = charset
223 224
        self.fp = fp
224 225
        self.varname = varname
225 226
        self.storage = storage
226 227
        self.storage_attrs = storage_attrs
228
        self.to = to
227 229

  
228 230
    @classmethod
229
    def from_upload(cls, upload, varname=None):
231
    def from_upload(cls, upload, varname=None, to=None):
230 232
        return AttachmentEvolutionPart(
231 233
            upload.base_filename,
232 234
            getattr(upload, 'fp', None),
233 235
            upload.orig_filename,
234 236
            upload.content_type,
235 237
            upload.charset,
236 238
            varname=varname,
237 239
            storage=getattr(upload, 'storage', None),
238 240
            storage_attrs=getattr(upload, 'storage_attrs', None),
241
            to=to,
239 242
        )
240 243

  
241 244
    def get_file_pointer(self):
242 245
        if self.filename.startswith('uuid-'):
243 246
            return None
244 247
        return open(self.filename, 'rb')  # pylint: disable=consider-using-with
245 248

  
246 249
    def __getstate__(self):
......
285 288

  
286 289
    def get_json_export_dict(self, anonymise=False, include_files=True):
287 290
        if not include_files or anonymise:
288 291
            return None
289 292
        d = {
290 293
            'type': 'workflow-attachment',
291 294
            'content_type': self.content_type,
292 295
            'filename': self.base_filename,
296
            'to': self.to,
293 297
        }
294 298
        fd = self.get_file_pointer()
295 299
        if fd:
296 300
            d['content'] = base64.encodebytes(fd.read())
297 301
            fd.close()
298 302
        return d
299 303

  
300 304
    @classmethod
301
-