0001-a2_rbac-do-not-break-unicity-when-get-or-creating-ad.patch
src/authentic2/a2_rbac/management.py | ||
---|---|---|
138 | 138 |
continue |
139 | 139 |
ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name, |
140 | 140 |
slug=slug, |
141 |
update_name=True) |
|
141 |
update_name=True, |
|
142 |
update_slug=True, |
|
143 |
create=True) |
|
142 | 144 |
if MANAGED_CT[ct_tuple].get('must_view_user'): |
143 | 145 |
ct_admin_role.permissions.add(view_user_perm) |
144 | 146 |
ct_admin_role.permissions.add(search_ou_perm) |
... | ... | |
155 | 157 |
new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug) |
156 | 158 |
admin_role.delete() |
157 | 159 |
role.admin_scope_id = new_perm.pk |
158 |
role.save() |
|
160 |
role.save(update_fields=['admin_scope_id'])
|
|
159 | 161 |
role.permissions.remove(old_perm) |
160 | 162 |
role.permissions.add(new_perm) |
161 |
assert role.pk == administered_role.get_admin_role().pk |
|
163 |
assert role.pk == administered_role.get_admin_role(create=False).pk |
src/authentic2/a2_rbac/managers.py | ||
---|---|---|
29 | 29 | |
30 | 30 |
class RoleManager(BaseRoleManager): |
31 | 31 |
def get_admin_role(self, instance, name, slug, ou=None, operation=ADMIN_OP, |
32 |
update_name=False, update_slug=False, permissions=(),
|
|
33 |
self_administered=False, create=True): |
|
32 |
update_name=False, update_slug=False, update_ou=False,
|
|
33 |
permissions=(), self_administered=False, create=True):
|
|
34 | 34 |
'''Get or create the role of manager's of this object instance''' |
35 | 35 |
kwargs = {} |
36 | 36 |
if ou or getattr(instance, 'ou', None): |
... | ... | |
41 | 41 |
op = get_operation(operation) |
42 | 42 |
Permission = rbac_utils.get_permission_model() |
43 | 43 |
if create: |
44 |
perm, created = Permission.objects.get_or_create(
|
|
44 |
perm, _ = Permission.objects.get_or_create(
|
|
45 | 45 |
operation=op, |
46 | 46 |
target_ct=ContentType.objects.get_for_model(instance), |
47 | 47 |
target_id=instance.pk, |
... | ... | |
55 | 55 |
**kwargs) |
56 | 56 |
except Permission.DoesNotExist: |
57 | 57 |
return None |
58 |
created = False |
|
59 | 58 | |
60 | 59 |
admin_role = self.get_mirror_role(perm, name, slug, ou=ou, |
61 | 60 |
update_name=update_name, |
62 | 61 |
update_slug=update_slug, |
62 |
update_ou=update_ou, |
|
63 | 63 |
create=create) |
64 | 64 | |
65 | 65 |
if not admin_role: |
... | ... | |
75 | 75 |
return admin_role |
76 | 76 | |
77 | 77 |
def get_mirror_role(self, instance, name, slug, ou=None, |
78 |
update_name=False, update_slug=False, create=True): |
|
79 |
'''Get or create a role which mirror another model, for example a |
|
78 |
update_name=False, update_slug=False, update_ou=False, |
|
79 |
create=True): |
|
80 |
'''Get or create a role which mirrors another model, for example a |
|
80 | 81 |
permission. |
81 | 82 |
''' |
82 | 83 |
ct = ContentType.objects.get_for_model(instance) |
84 |
update_fields = {} |
|
83 | 85 |
kwargs = {} |
84 |
if ou or getattr(instance, 'ou', None): |
|
85 |
kwargs['ou'] = ou or instance.ou |
|
86 |
ou = ou or getattr(instance, 'ou', None) |
|
87 |
if ou: |
|
88 |
if update_ou: |
|
89 |
update_fields['ou'] = ou |
|
90 |
else: |
|
91 |
kwargs['ou'] = ou |
|
86 | 92 |
else: |
87 | 93 |
kwargs['ou__isnull'] = True |
94 |
if update_name: |
|
95 |
update_fields['name'] = name |
|
96 |
if update_slug: |
|
97 |
update_fields['slug'] = slug |
|
98 | ||
88 | 99 |
if create: |
89 |
role, created = self.prefetch_related('permissions').get_or_create(
|
|
100 |
role, _ = self.prefetch_related('permissions').update_or_create(
|
|
90 | 101 |
admin_scope_ct=ct, |
91 | 102 |
admin_scope_id=instance.pk, |
92 |
defaults={ |
|
93 |
'name': name, |
|
94 |
'slug': slug, |
|
95 |
}, |
|
103 |
defaults=update_fields, |
|
96 | 104 |
**kwargs) |
97 | 105 |
else: |
98 | 106 |
try: |
... | ... | |
102 | 110 |
**kwargs) |
103 | 111 |
except self.model.DoesNotExist: |
104 | 112 |
return None |
105 |
created = False |
|
106 | ||
107 |
if update_name and not created and role.name != name: |
|
108 |
role.name = name |
|
109 |
role.save() |
|
110 |
if update_slug and not created and role.slug != slug: |
|
111 |
role.slug = slug |
|
112 |
role.save() |
|
113 |
for field, value in update_fields.items(): |
|
114 |
setattr(role, field, value) |
|
115 |
role.save(update_fields=update_fields) |
|
113 | 116 |
return role |
114 | 117 | |
115 | 118 |
def get_by_natural_key(self, slug, ou_natural_key, service_natural_key): |
src/authentic2/a2_rbac/migrations/0011_auto_20160209_1511.py | ||
---|---|---|
13 | 13 |
operations = [ |
14 | 14 |
migrations.AlterUniqueTogether( |
15 | 15 |
name='role', |
16 |
unique_together=set([('admin_scope_ct', 'admin_scope_id')]), |
|
16 |
unique_together=set([('admin_scope_ct', 'admin_scope_id', 'ou')]),
|
|
17 | 17 |
), |
18 | 18 |
] |
src/authentic2/a2_rbac/models.py | ||
---|---|---|
151 | 151 |
slug = '_a2-managers-of-{ou.slug}'.format(ou=self) |
152 | 152 |
return Role.objects.get_admin_role( |
153 | 153 |
instance=self, name=name, slug=slug, operation=VIEW_OP, |
154 |
update_name=True, update_slug=True) |
|
154 |
update_name=True, update_slug=True, create=True)
|
|
155 | 155 | |
156 | 156 |
def delete(self, *args, **kwargs): |
157 | 157 |
Permission.objects.filter(ou=self).delete() |
... | ... | |
245 | 245 |
self_administered=True, |
246 | 246 |
update_name=True, |
247 | 247 |
update_slug=True, |
248 |
update_ou=True, |
|
248 | 249 |
create=create, |
249 | 250 |
operation=MANAGE_MEMBERS_OP) |
250 | 251 |
return admin_role |
... | ... | |
312 | 313 |
verbose_name_plural = _('roles') |
313 | 314 |
ordering = ('ou', 'service', 'name',) |
314 | 315 |
unique_together = ( |
315 |
('admin_scope_ct', 'admin_scope_id'), |
|
316 |
('admin_scope_ct', 'admin_scope_id', 'ou'),
|
|
316 | 317 |
) |
317 | 318 | |
318 | 319 |
def natural_key(self): |
tests/test_a2_rbac.py | ||
---|---|---|
108 | 108 |
user = User.objects.create(username='john.doe') |
109 | 109 |
name1 = 'Can manage john.doe' |
110 | 110 |
slug1 = 'can-manage-john-doe' |
111 |
admin_role1 = Role.objects.get_admin_role(user, name1, slug1) |
|
111 |
admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
|
|
112 | 112 |
assert admin_role1.name == name1 |
113 | 113 |
assert admin_role1.slug == slug1 |
114 | 114 |
name2 = 'Should manage john.doe' |
... | ... | |
403 | 403 |
assert ar1.slug == '_a2-managers-of-role-r1ter' |
404 | 404 | |
405 | 405 | |
406 |
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1): |
|
406 |
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
|
|
407 | 407 |
role_ou1.get_admin_role().members.add(simple_user) |
408 | 408 | |
409 | 409 |
# Default: all users are visible |
... | ... | |
499 | 499 |
@pytest.mark.parametrize( |
500 | 500 |
'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)] |
501 | 501 |
) |
502 |
def test_unused_account_settings_validation(ou1, alert, deletion): |
|
502 |
def test_unused_account_settings_validation(db, ou1, alert, deletion):
|
|
503 | 503 |
ou1.clean_unused_accounts_alert = alert |
504 | 504 |
ou1.clean_unused_accounts_deletion = deletion |
505 | 505 |
with pytest.raises(ValidationError): |
506 |
- |