Projet

Général

Profil

0001-workflows-add-possibility-to-add-remove-a-computed-r.patch

Frédéric Péters, 14 octobre 2018 10:03

Télécharger (9,25 ko)

Voir les différences:

Subject: [PATCH] workflows: add possibility to add/remove a computed role
 (#27313)

 tests/test_workflows.py | 48 ++++++++++++++++++++++++++++++++++++++++-
 wcs/wf/dispatch.py      | 19 +++-------------
 wcs/wf/roles.py         | 44 ++++++++++++++++++++-----------------
 wcs/workflows.py        | 14 ++++++++++++
 4 files changed, 88 insertions(+), 37 deletions(-)
tests/test_workflows.py
508 508
    item.role_id = '="foobar"'
509 509
    item.perform(formdata)
510 510
    assert not formdata.workflow_roles
511
    assert caplog.records[-1].message == 'error in dispatch, missing role foobar (="foobar")'
511
    assert caplog.records[-1].message == 'error in dispatch, missing role (="foobar")'
512 512

  
513 513
def test_roles(pub):
514 514
    user = pub.user_class()
......
551 551
    item.perform(formdata)
552 552
    assert pub.user_class.get(user.id).roles == ['2']
553 553

  
554
def test_add_remove_computed_roles(pub):
555
    user = pub.user_class()
556
    user.store()
557

  
558
    formdef = FormDef()
559
    formdef.name = 'baz'
560
    formdef.store()
561

  
562
    formdata = formdef.data_class()()
563
    formdata.user_id = user.id
564

  
565
    role = Role(name='plop')
566
    role.store()
567
    role2 = Role(name='xxx')
568
    role2.store()
569

  
570
    item = AddRoleWorkflowStatusItem()
571

  
572
    item.perform(formdata)
573
    assert not pub.user_class.get(user.id).roles
574

  
575
    item.role_id = role.name
576
    item.perform(formdata)
577
    assert pub.user_class.get(user.id).roles == [role.id]
578

  
579
    user.roles = None
580
    user.store()
581
    item = RemoveRoleWorkflowStatusItem()
582

  
583
    item.perform(formdata)
584
    assert not pub.user_class.get(user.id).roles
585

  
586
    item.role_id = role.name
587
    item.perform(formdata)
588
    assert not pub.user_class.get(user.id).roles
589

  
590
    user.roles = [role.id]
591
    user.store()
592
    item.perform(formdata)
593
    assert not pub.user_class.get(user.id).roles
594

  
595
    user.roles = [role2.id, role.id]
596
    user.store()
597
    item.perform(formdata)
598
    assert pub.user_class.get(user.id).roles == [role2.id]
599

  
554 600
def test_roles_idp(pub):
555 601
    pub.cfg['sp'] = {'idp-manage-user-attributes': True}
556 602
    pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}}
wcs/wf/dispatch.py
185 185
        if self.dispatch_type == 'manual' or not self.dispatch_type:
186 186
            if not (self.role_id and self.role_key):
187 187
                return
188
            new_role_id = self.compute(self.role_id)
189
            if not Role.has_key(new_role_id):
190
                # computed value, not an id, try to get role by slug
191
                new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
192
                if not new_role:
193
                    # fallback to role label
194
                    for role in Role.select():
195
                        if role.name == new_role_id:
196
                            new_role = role
197
                            break
198
                if new_role:
199
                    new_role_id = new_role.id
200
                else:
201
                    get_logger().error('error in dispatch, missing role %s (%s)' % (
202
                        new_role_id, self.role_id))
203
                    new_role_id = None
188
            new_role_id = self.get_computed_role_id(self.role_id)
189
            if not new_role_id:
190
                get_logger().error('error in dispatch, missing role (%s)' % self.role_id)
204 191
        elif self.dispatch_type == 'automatic':
205 192
            if not (self.role_key and self.variable and self.rules):
206 193
                return
wcs/wf/roles.py
59 59
        super(AddRoleWorkflowStatusItem, self).add_parameters_widgets(
60 60
                form, parameters, prefix=prefix, formdef=formdef)
61 61
        if 'role_id' in parameters:
62
            form.add(SingleSelectWidget, '%srole_id' % prefix,
63
                     title=_('Role to Add'), value=str(self.role_id),
62
            form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
63
                     title=_('Role to Add'), value=str(self.role_id) if self.role_id else None,
64 64
                     options=[(None, '----', None)] + get_user_roles())
65 65

  
66 66
    def role_id_export_to_xml(self, item, charset, include_id=False):
......
74 74
    def perform(self, formdata):
75 75
        if not self.role_id:
76 76
            return
77
        self.role_id = str(self.role_id)
77
        role_id = self.get_computed_role_id(self.role_id)
78
        if not role_id:
79
            return
78 80
        if not formdata.user_id:
79 81
            # we can't work on anonymous forms
80 82
            return
81 83
        user = get_publisher().user_class.get(formdata.user_id)
82
        self.perform_local(user, formdata)
84
        self.perform_local(user, formdata, role_id)
83 85
        if user.name_identifiers and is_idp_managing_user_attributes():
84
            self.perform_idp(user, formdata)
86
            self.perform_idp(user, formdata, role_id)
85 87

  
86
    def perform_local(self, user, formdata):
88
    def perform_local(self, user, formdata, role_id):
87 89
        if not user.roles:
88 90
            user.roles = []
89
        if not self.role_id in user.roles:
90
            user.roles.append(self.role_id)
91
        if not role_id in user.roles:
92
            user.roles.append(role_id)
91 93
        user.store()
92 94
        request = get_request()
93 95
        if request and request.user and request.user.id == user.id:
......
95 97
            # changes.
96 98
            request.user = user
97 99

  
98
    def perform_idp(self, user, formdata):
99
        role = Role.get(self.role_id)
100
    def perform_idp(self, user, formdata, role_id):
101
        role = Role.get(role_id)
100 102
        role_uuid = role.uuid or role.slug
101 103
        user_uuid = user.name_identifiers[0]
102 104
        try:
......
135 137
        super(RemoveRoleWorkflowStatusItem, self).add_parameters_widgets(
136 138
                form, parameters, prefix=prefix, formdef=formdef)
137 139
        if 'role_id' in parameters:
138
            form.add(SingleSelectWidget, '%srole_id' % prefix,
139
                     title=_('Role to Remove'), value=self.role_id,
140
            form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
141
                     title=_('Role to Remove'), value=str(self.role_id) if self.role_id else None,
140 142
                     options=[(None, '----', None)] + get_user_roles())
141 143

  
142 144
    def perform(self, formdata):
143 145
        if not self.role_id:
144 146
            return
145
        self.role_id = str(self.role_id)
147
        role_id = self.get_computed_role_id(self.role_id)
148
        if not role_id:
149
            return
146 150
        if not formdata.user_id:
147 151
            # we can't work on anonymous forms
148 152
            return
149 153
        user = get_publisher().user_class.get(formdata.user_id)
150
        self.perform_local(user, formdata)
154
        self.perform_local(user, formdata, role_id)
151 155
        if user.name_identifiers and is_idp_managing_user_attributes():
152
            self.perform_idp(user, formdata)
156
            self.perform_idp(user, formdata, role_id)
153 157

  
154
    def perform_local(self, user, formdata):
155
        if user.roles and self.role_id in user.roles:
156
            user.roles.remove(self.role_id)
158
    def perform_local(self, user, formdata, role_id):
159
        if user.roles and role_id in user.roles:
160
            user.roles.remove(role_id)
157 161
            user.store()
158 162
            request = get_request()
159 163
            if request and request.user and request.user.id == user.id:
......
161 165
                # with the changes.
162 166
                request.user = user
163 167

  
164
    def perform_idp(self, user, formdata):
165
        role = Role.get(self.role_id)
168
    def perform_idp(self, user, formdata, role_id):
169
        role = Role.get(role_id)
166 170
        role_uuid = role.uuid or role.slug
167 171
        user_uuid = user.name_identifiers[0]
168 172
        try:
wcs/workflows.py
1729 1729
                raise
1730 1730
            return var
1731 1731

  
1732
    def get_computed_role_id(self, role_id):
1733
        new_role_id = self.compute(str(role_id))
1734
        if Role.has_key(new_role_id):
1735
            return new_role_id
1736
        # computed value, not an id, try to get role by slug
1737
        new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True)
1738
        if new_role:
1739
            return new_role.id
1740
        # fallback to role label
1741
        for role in Role.select():
1742
            if role.name == new_role_id:
1743
                return role.id
1744
        return None
1745

  
1732 1746
    def get_substitution_variables(self, formdata):
1733 1747
        return {}
1734 1748

  
1735
-