0001-a2_rbac-do-not-break-unicity-when-get-or-creating-ad.patch
src/authentic2/a2_rbac/management.py | ||
---|---|---|
20 | 20 |
from django.contrib.contenttypes.models import ContentType |
21 | 21 | |
22 | 22 |
from django_rbac.models import ADMIN_OP |
23 |
from django_rbac.utils import get_role_model, get_ou_model |
|
23 |
from django_rbac.utils import get_ou_model |
|
24 |
from django_rbac.utils import get_permission_model |
|
25 |
from django_rbac.utils import get_role_model |
|
24 | 26 | |
25 | 27 |
from ..utils import get_fk_model |
26 | 28 |
from . import utils, app_settings |
... | ... | |
140 | 142 |
continue |
141 | 143 |
ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name, |
142 | 144 |
slug=slug, |
143 |
update_name=True) |
|
145 |
update_name=True, |
|
146 |
update_slug=True, |
|
147 |
create=True) |
|
144 | 148 |
if MANAGED_CT[ct_tuple].get('must_view_user'): |
145 | 149 |
ct_admin_role.permissions.add(view_user_perm) |
146 | 150 |
if MANAGED_CT[ct_tuple].get('must_manage_authorizations_user'): |
... | ... | |
150 | 154 | |
151 | 155 | |
152 | 156 |
def update_user_admin_roles_permission(): |
153 |
roles = get_role_model().objects.filter(slug__startswith='_a2-managers-of-role', |
|
154 |
permissions__operation__slug=ADMIN_OP.slug) |
|
155 |
for role in roles: |
|
156 |
old_perm = role.permissions.get(operation__slug=ADMIN_OP.slug) |
|
157 |
administered_role = old_perm.target |
|
157 |
role_ct = ContentType.objects.get_for_model(get_role_model()) |
|
158 |
permissions = get_permission_model().objects.filter(target_ct=role_ct, operation__slug=ADMIN_OP.slug) |
|
159 |
for perm in permissions: |
|
160 |
administered_role = perm.target |
|
158 | 161 |
admin_role = administered_role.get_admin_role() |
159 |
new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug, |
|
160 |
target_id=administered_role.pk) |
|
162 |
assert admin_role.slug.startswith('_a2-managers-of-role') |
|
163 |
new_perm = admin_role.permissions.get( |
|
164 |
operation__slug=MANAGE_MEMBERS_OP.slug, |
|
165 |
target_ct=role_ct, |
|
166 |
target_id=administered_role.id) |
|
167 |
assert new_perm.ou is None |
|
161 | 168 |
admin_role.delete() |
169 |
assert len(perm.roles.all()) == 1 |
|
170 |
role = perm.roles.first() |
|
162 | 171 |
role.admin_scope_id = new_perm.pk |
163 |
role.save() |
|
164 |
role.permissions.remove(old_perm)
|
|
172 |
role.save(update_fields=['admin_scope_id'])
|
|
173 |
role.permissions.remove(perm) |
|
165 | 174 |
role.permissions.add(new_perm) |
166 |
assert role.pk == administered_role.get_admin_role().pk |
|
175 |
assert role.pk == administered_role.get_admin_role(create=False).pk |
src/authentic2/a2_rbac/managers.py | ||
---|---|---|
33 | 33 |
self_administered=False, create=True): |
34 | 34 |
'''Get or create the role of managers of this object instance''' |
35 | 35 |
kwargs = {} |
36 |
if ou or getattr(instance, 'ou', None): |
|
37 |
ou = kwargs['ou'] = ou or instance.ou |
|
38 |
else: |
|
39 |
kwargs['ou__isnull'] = True |
|
36 |
assert not ou or isinstance(instance, ContentType), ( |
|
37 |
'get_admin_role(ou=...) can only be used with ContentType instances: %s %s %s' % (name, ou, instance) |
|
38 |
) |
|
39 | ||
40 |
# Does the permission need to be scoped by ou? Yes, if the target is a |
|
41 |
# ContentType and ou is given. It's a general administration right upon |
|
42 |
# all instances of a ContentType, optionally scoped to the given ou. |
|
43 |
defaults = {} |
|
44 |
if isinstance(instance, ContentType): |
|
45 |
if ou: |
|
46 |
kwargs['ou'] = ou |
|
47 |
else: |
|
48 |
kwargs['ou__isnull'] = True |
|
49 |
else: # for non ContentType instances, OU must be set to NULL, always. |
|
50 |
defaults['ou'] = None |
|
40 | 51 |
# find an operation matching the template |
41 | 52 |
op = get_operation(operation) |
42 | 53 |
Permission = rbac_utils.get_permission_model() |
43 | 54 |
if create: |
44 |
perm, created = Permission.objects.get_or_create(
|
|
55 |
perm, _ = Permission.objects.update_or_create(
|
|
45 | 56 |
operation=op, |
46 | 57 |
target_ct=ContentType.objects.get_for_model(instance), |
47 | 58 |
target_id=instance.pk, |
59 |
defaults=defaults, |
|
48 | 60 |
**kwargs) |
49 | 61 |
else: |
50 | 62 |
try: |
... | ... | |
55 | 67 |
**kwargs) |
56 | 68 |
except Permission.DoesNotExist: |
57 | 69 |
return None |
58 |
created = False |
|
59 | 70 | |
60 |
admin_role = self.get_mirror_role(perm, name, slug, ou=ou, |
|
71 |
# in which ou do we put the role ? |
|
72 |
if ou: |
|
73 |
mirror_role_ou = ou |
|
74 |
elif getattr(instance, 'ou', None): |
|
75 |
mirror_role_ou = instance.ou |
|
76 |
else: |
|
77 |
mirror_role_ou = None |
|
78 |
admin_role = self.get_mirror_role(perm, name, slug, ou=mirror_role_ou, |
|
61 | 79 |
update_name=update_name, |
62 | 80 |
update_slug=update_slug, |
63 | 81 |
create=create) |
... | ... | |
76 | 94 | |
77 | 95 |
def get_mirror_role(self, instance, name, slug, ou=None, |
78 | 96 |
update_name=False, update_slug=False, create=True): |
79 |
'''Get or create a role which mirror another model, for example a |
|
97 |
'''Get or create a role which mirrors another model, for example a
|
|
80 | 98 |
permission. |
81 | 99 |
''' |
82 | 100 |
ct = ContentType.objects.get_for_model(instance) |
101 |
update_fields = {} |
|
83 | 102 |
kwargs = {} |
84 |
if ou or getattr(instance, 'ou', None):
|
|
85 |
kwargs['ou'] = ou or instance.ou
|
|
103 |
if ou: |
|
104 |
update_fields['ou'] = ou
|
|
86 | 105 |
else: |
87 |
kwargs['ou__isnull'] = True |
|
106 |
update_fields['ou'] = None |
|
107 |
if update_name: |
|
108 |
update_fields['name'] = name |
|
109 |
if update_slug: |
|
110 |
update_fields['slug'] = slug |
|
111 | ||
88 | 112 |
if create: |
89 |
role, created = self.prefetch_related('permissions').get_or_create(
|
|
113 |
role, _ = self.prefetch_related('permissions').update_or_create(
|
|
90 | 114 |
admin_scope_ct=ct, |
91 | 115 |
admin_scope_id=instance.pk, |
92 |
defaults={ |
|
93 |
'name': name, |
|
94 |
'slug': slug, |
|
95 |
}, |
|
116 |
defaults=update_fields, |
|
96 | 117 |
**kwargs) |
97 | 118 |
else: |
98 | 119 |
try: |
... | ... | |
102 | 123 |
**kwargs) |
103 | 124 |
except self.model.DoesNotExist: |
104 | 125 |
return None |
105 |
created = False |
|
106 | ||
107 |
if update_name and not created and role.name != name: |
|
108 |
role.name = name |
|
109 |
role.save() |
|
110 |
if update_slug and not created and role.slug != slug: |
|
111 |
role.slug = slug |
|
112 |
role.save() |
|
126 |
for field, value in update_fields.items(): |
|
127 |
setattr(role, field, value) |
|
128 |
role.save(update_fields=update_fields) |
|
113 | 129 |
return role |
114 | 130 | |
115 | 131 |
def get_by_natural_key(self, slug, ou_natural_key, service_natural_key): |
src/authentic2/a2_rbac/models.py | ||
---|---|---|
150 | 150 |
slug = '_a2-managers-of-{ou.slug}'.format(ou=self) |
151 | 151 |
return Role.objects.get_admin_role( |
152 | 152 |
instance=self, name=name, slug=slug, operation=VIEW_OP, |
153 |
update_name=True, update_slug=True) |
|
153 |
update_name=True, update_slug=True, create=True)
|
|
154 | 154 | |
155 | 155 |
def delete(self, *args, **kwargs): |
156 | 156 |
Permission.objects.filter(ou=self).delete() |
... | ... | |
239 | 239 |
view_user_perm = utils.get_view_user_perm() |
240 | 240 | |
241 | 241 |
admin_role = self.__class__.objects.get_admin_role( |
242 |
self, ou=self.ou,
|
|
242 |
self, |
|
243 | 243 |
name=_('Managers of role "{role}"').format( |
244 | 244 |
role=six.text_type(self)), |
245 | 245 |
slug='_a2-managers-of-role-{role}'.format( |
tests/test_a2_rbac.py | ||
---|---|---|
110 | 110 |
user = User.objects.create(username='john.doe') |
111 | 111 |
name1 = 'Can manage john.doe' |
112 | 112 |
slug1 = 'can-manage-john-doe' |
113 |
admin_role1 = Role.objects.get_admin_role(user, name1, slug1) |
|
113 |
admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
|
|
114 | 114 |
assert admin_role1.name == name1 |
115 | 115 |
assert admin_role1.slug == slug1 |
116 | 116 |
name2 = 'Should manage john.doe' |
... | ... | |
405 | 405 |
assert ar1.slug == '_a2-managers-of-role-r1ter' |
406 | 406 | |
407 | 407 | |
408 |
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1): |
|
408 |
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
|
|
409 | 409 |
role_ou1.get_admin_role().members.add(simple_user) |
410 | 410 | |
411 | 411 |
# Default: all users are visible |
... | ... | |
495 | 495 |
@pytest.mark.parametrize( |
496 | 496 |
'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)] |
497 | 497 |
) |
498 |
def test_unused_account_settings_validation(ou1, alert, deletion): |
|
498 |
def test_unused_account_settings_validation(db, ou1, alert, deletion):
|
|
499 | 499 |
ou1.clean_unused_accounts_alert = alert |
500 | 500 |
ou1.clean_unused_accounts_deletion = deletion |
501 | 501 |
with pytest.raises(ValidationError): |
502 |
- |