0001-workflows-add-possibility-to-add-remove-a-computed-r.patch
tests/test_workflows.py | ||
---|---|---|
508 | 508 |
item.role_id = '="foobar"' |
509 | 509 |
item.perform(formdata) |
510 | 510 |
assert not formdata.workflow_roles |
511 |
assert caplog.records[-1].message == 'error in dispatch, missing role foobar (="foobar")'
|
|
511 |
assert caplog.records[-1].message == 'error in dispatch, missing role (="foobar")' |
|
512 | 512 | |
513 | 513 |
def test_roles(pub): |
514 | 514 |
user = pub.user_class() |
... | ... | |
551 | 551 |
item.perform(formdata) |
552 | 552 |
assert pub.user_class.get(user.id).roles == ['2'] |
553 | 553 | |
554 |
def test_add_remove_computed_roles(pub): |
|
555 |
user = pub.user_class() |
|
556 |
user.store() |
|
557 | ||
558 |
formdef = FormDef() |
|
559 |
formdef.name = 'baz' |
|
560 |
formdef.store() |
|
561 | ||
562 |
formdata = formdef.data_class()() |
|
563 |
formdata.user_id = user.id |
|
564 | ||
565 |
role = Role(name='plop') |
|
566 |
role.store() |
|
567 |
role2 = Role(name='xxx') |
|
568 |
role2.store() |
|
569 | ||
570 |
item = AddRoleWorkflowStatusItem() |
|
571 | ||
572 |
item.perform(formdata) |
|
573 |
assert not pub.user_class.get(user.id).roles |
|
574 | ||
575 |
item.role_id = role.name |
|
576 |
item.perform(formdata) |
|
577 |
assert pub.user_class.get(user.id).roles == [role.id] |
|
578 | ||
579 |
user.roles = None |
|
580 |
user.store() |
|
581 |
item = RemoveRoleWorkflowStatusItem() |
|
582 | ||
583 |
item.perform(formdata) |
|
584 |
assert not pub.user_class.get(user.id).roles |
|
585 | ||
586 |
item.role_id = role.name |
|
587 |
item.perform(formdata) |
|
588 |
assert not pub.user_class.get(user.id).roles |
|
589 | ||
590 |
user.roles = [role.id] |
|
591 |
user.store() |
|
592 |
item.perform(formdata) |
|
593 |
assert not pub.user_class.get(user.id).roles |
|
594 | ||
595 |
user.roles = [role2.id, role.id] |
|
596 |
user.store() |
|
597 |
item.perform(formdata) |
|
598 |
assert pub.user_class.get(user.id).roles == [role2.id] |
|
599 | ||
554 | 600 |
def test_roles_idp(pub): |
555 | 601 |
pub.cfg['sp'] = {'idp-manage-user-attributes': True} |
556 | 602 |
pub.cfg['idp'] = {'xxx': {'metadata_url': 'http://idp.example.net/idp/saml2/metadata'}} |
wcs/wf/dispatch.py | ||
---|---|---|
185 | 185 |
if self.dispatch_type == 'manual' or not self.dispatch_type: |
186 | 186 |
if not (self.role_id and self.role_key): |
187 | 187 |
return |
188 |
new_role_id = self.compute(self.role_id) |
|
189 |
if not Role.has_key(new_role_id): |
|
190 |
# computed value, not an id, try to get role by slug |
|
191 |
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True) |
|
192 |
if not new_role: |
|
193 |
# fallback to role label |
|
194 |
for role in Role.select(): |
|
195 |
if role.name == new_role_id: |
|
196 |
new_role = role |
|
197 |
break |
|
198 |
if new_role: |
|
199 |
new_role_id = new_role.id |
|
200 |
else: |
|
201 |
get_logger().error('error in dispatch, missing role %s (%s)' % ( |
|
202 |
new_role_id, self.role_id)) |
|
203 |
new_role_id = None |
|
188 |
new_role_id = self.get_computed_role_id(self.role_id) |
|
189 |
if not new_role_id: |
|
190 |
get_logger().error('error in dispatch, missing role (%s)' % self.role_id) |
|
204 | 191 |
elif self.dispatch_type == 'automatic': |
205 | 192 |
if not (self.role_key and self.variable and self.rules): |
206 | 193 |
return |
wcs/wf/roles.py | ||
---|---|---|
59 | 59 |
super(AddRoleWorkflowStatusItem, self).add_parameters_widgets( |
60 | 60 |
form, parameters, prefix=prefix, formdef=formdef) |
61 | 61 |
if 'role_id' in parameters: |
62 |
form.add(SingleSelectWidget, '%srole_id' % prefix, |
|
63 |
title=_('Role to Add'), value=str(self.role_id), |
|
62 |
form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
|
|
63 |
title=_('Role to Add'), value=str(self.role_id) if self.role_id else None,
|
|
64 | 64 |
options=[(None, '----', None)] + get_user_roles()) |
65 | 65 | |
66 | 66 |
def role_id_export_to_xml(self, item, charset, include_id=False): |
... | ... | |
74 | 74 |
def perform(self, formdata): |
75 | 75 |
if not self.role_id: |
76 | 76 |
return |
77 |
self.role_id = str(self.role_id) |
|
77 |
role_id = self.get_computed_role_id(self.role_id) |
|
78 |
if not role_id: |
|
79 |
return |
|
78 | 80 |
if not formdata.user_id: |
79 | 81 |
# we can't work on anonymous forms |
80 | 82 |
return |
81 | 83 |
user = get_publisher().user_class.get(formdata.user_id) |
82 |
self.perform_local(user, formdata) |
|
84 |
self.perform_local(user, formdata, role_id)
|
|
83 | 85 |
if user.name_identifiers and is_idp_managing_user_attributes(): |
84 |
self.perform_idp(user, formdata) |
|
86 |
self.perform_idp(user, formdata, role_id)
|
|
85 | 87 | |
86 |
def perform_local(self, user, formdata): |
|
88 |
def perform_local(self, user, formdata, role_id):
|
|
87 | 89 |
if not user.roles: |
88 | 90 |
user.roles = [] |
89 |
if not self.role_id in user.roles:
|
|
90 |
user.roles.append(self.role_id)
|
|
91 |
if not role_id in user.roles: |
|
92 |
user.roles.append(role_id) |
|
91 | 93 |
user.store() |
92 | 94 |
request = get_request() |
93 | 95 |
if request and request.user and request.user.id == user.id: |
... | ... | |
95 | 97 |
# changes. |
96 | 98 |
request.user = user |
97 | 99 | |
98 |
def perform_idp(self, user, formdata): |
|
99 |
role = Role.get(self.role_id)
|
|
100 |
def perform_idp(self, user, formdata, role_id):
|
|
101 |
role = Role.get(role_id) |
|
100 | 102 |
role_uuid = role.uuid or role.slug |
101 | 103 |
user_uuid = user.name_identifiers[0] |
102 | 104 |
try: |
... | ... | |
135 | 137 |
super(RemoveRoleWorkflowStatusItem, self).add_parameters_widgets( |
136 | 138 |
form, parameters, prefix=prefix, formdef=formdef) |
137 | 139 |
if 'role_id' in parameters: |
138 |
form.add(SingleSelectWidget, '%srole_id' % prefix, |
|
139 |
title=_('Role to Remove'), value=self.role_id,
|
|
140 |
form.add(SingleSelectWidgetWithOther, '%srole_id' % prefix,
|
|
141 |
title=_('Role to Remove'), value=str(self.role_id) if self.role_id else None,
|
|
140 | 142 |
options=[(None, '----', None)] + get_user_roles()) |
141 | 143 | |
142 | 144 |
def perform(self, formdata): |
143 | 145 |
if not self.role_id: |
144 | 146 |
return |
145 |
self.role_id = str(self.role_id) |
|
147 |
role_id = self.get_computed_role_id(self.role_id) |
|
148 |
if not role_id: |
|
149 |
return |
|
146 | 150 |
if not formdata.user_id: |
147 | 151 |
# we can't work on anonymous forms |
148 | 152 |
return |
149 | 153 |
user = get_publisher().user_class.get(formdata.user_id) |
150 |
self.perform_local(user, formdata) |
|
154 |
self.perform_local(user, formdata, role_id)
|
|
151 | 155 |
if user.name_identifiers and is_idp_managing_user_attributes(): |
152 |
self.perform_idp(user, formdata) |
|
156 |
self.perform_idp(user, formdata, role_id)
|
|
153 | 157 | |
154 |
def perform_local(self, user, formdata): |
|
155 |
if user.roles and self.role_id in user.roles:
|
|
156 |
user.roles.remove(self.role_id)
|
|
158 |
def perform_local(self, user, formdata, role_id):
|
|
159 |
if user.roles and role_id in user.roles: |
|
160 |
user.roles.remove(role_id) |
|
157 | 161 |
user.store() |
158 | 162 |
request = get_request() |
159 | 163 |
if request and request.user and request.user.id == user.id: |
... | ... | |
161 | 165 |
# with the changes. |
162 | 166 |
request.user = user |
163 | 167 | |
164 |
def perform_idp(self, user, formdata): |
|
165 |
role = Role.get(self.role_id)
|
|
168 |
def perform_idp(self, user, formdata, role_id):
|
|
169 |
role = Role.get(role_id) |
|
166 | 170 |
role_uuid = role.uuid or role.slug |
167 | 171 |
user_uuid = user.name_identifiers[0] |
168 | 172 |
try: |
wcs/workflows.py | ||
---|---|---|
1729 | 1729 |
raise |
1730 | 1730 |
return var |
1731 | 1731 | |
1732 |
def get_computed_role_id(self, role_id): |
|
1733 |
new_role_id = self.compute(str(role_id)) |
|
1734 |
if Role.has_key(new_role_id): |
|
1735 |
return new_role_id |
|
1736 |
# computed value, not an id, try to get role by slug |
|
1737 |
new_role = Role.get_on_index(new_role_id, 'slug', ignore_errors=True) |
|
1738 |
if new_role: |
|
1739 |
return new_role.id |
|
1740 |
# fallback to role label |
|
1741 |
for role in Role.select(): |
|
1742 |
if role.name == new_role_id: |
|
1743 |
return role.id |
|
1744 |
return None |
|
1745 | ||
1732 | 1746 |
def get_substitution_variables(self, formdata): |
1733 | 1747 |
return {} |
1734 | 1748 | |
1735 |
- |