From d4bc3e03fd6ef2b557cc0c8c7eb2a2af1aa6c322 Mon Sep 17 00:00:00 2001 From: Paul Marillonnet Date: Tue, 28 Apr 2020 14:01:19 +0200 Subject: [PATCH 1/3] a2_rbac: do not break unicity when get-or-creating admin role (#42178) --- src/authentic2/a2_rbac/management.py | 33 ++++++++------ src/authentic2/a2_rbac/managers.py | 64 +++++++++++++++++----------- src/authentic2/a2_rbac/models.py | 4 +- tests/test_a2_rbac.py | 6 +-- 4 files changed, 66 insertions(+), 41 deletions(-) diff --git a/src/authentic2/a2_rbac/management.py b/src/authentic2/a2_rbac/management.py index 88d887fc..959a6806 100644 --- a/src/authentic2/a2_rbac/management.py +++ b/src/authentic2/a2_rbac/management.py @@ -20,7 +20,9 @@ from django.utils.text import slugify from django.contrib.contenttypes.models import ContentType from django_rbac.models import ADMIN_OP -from django_rbac.utils import get_role_model, get_ou_model +from django_rbac.utils import get_ou_model +from django_rbac.utils import get_permission_model +from django_rbac.utils import get_role_model from ..utils import get_fk_model from . import utils, app_settings @@ -140,7 +142,9 @@ def update_content_types_roles(): continue ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name, slug=slug, - update_name=True) + update_name=True, + update_slug=True, + create=True) if MANAGED_CT[ct_tuple].get('must_view_user'): ct_admin_role.permissions.add(view_user_perm) if MANAGED_CT[ct_tuple].get('must_manage_authorizations_user'): @@ -150,17 +154,22 @@ def update_content_types_roles(): def update_user_admin_roles_permission(): - roles = get_role_model().objects.filter(slug__startswith='_a2-managers-of-role', - permissions__operation__slug=ADMIN_OP.slug) - for role in roles: - old_perm = role.permissions.get(operation__slug=ADMIN_OP.slug) - administered_role = old_perm.target + role_ct = ContentType.objects.get_for_model(get_role_model()) + permissions = get_permission_model().objects.filter(target_ct=role_ct, operation__slug=ADMIN_OP.slug) + for perm in permissions: + administered_role = perm.target admin_role = administered_role.get_admin_role() - new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug, - target_id=administered_role.pk) + assert admin_role.slug.startswith('_a2-managers-of-role') + new_perm = admin_role.permissions.get( + operation__slug=MANAGE_MEMBERS_OP.slug, + target_ct=role_ct, + target_id=administered_role.id) + assert new_perm.ou is None admin_role.delete() + assert len(perm.roles.all()) == 1 + role = perm.roles.first() role.admin_scope_id = new_perm.pk - role.save() - role.permissions.remove(old_perm) + role.save(update_fields=['admin_scope_id']) + role.permissions.remove(perm) role.permissions.add(new_perm) - assert role.pk == administered_role.get_admin_role().pk + assert role.pk == administered_role.get_admin_role(create=False).pk diff --git a/src/authentic2/a2_rbac/managers.py b/src/authentic2/a2_rbac/managers.py index 0ef6a8cc..6aea627a 100644 --- a/src/authentic2/a2_rbac/managers.py +++ b/src/authentic2/a2_rbac/managers.py @@ -33,18 +33,30 @@ class RoleManager(BaseRoleManager): self_administered=False, create=True): '''Get or create the role of manager's of this object instance''' kwargs = {} - if ou or getattr(instance, 'ou', None): - ou = kwargs['ou'] = ou or instance.ou - else: - kwargs['ou__isnull'] = True + assert not ou or isinstance(instance, ContentType), ( + 'get_admin_role(ou=...) can only be used with ContentType instances: %s %s %s' % (name, ou, instance) + ) + + # Does the permission need to be scoped by ou ? Yes if the target is a + # ContentType and ou is given. It's a general administration right upon + # all instance of a ContentType, eventually scoped to the given ou. + defaults = {} + if isinstance(instance, ContentType): + if ou: + kwargs['ou'] = ou + else: + kwargs['ou__isnull'] = True + else: # for non ContentType instances, OU must be set to NULL, always. + defaults['ou'] = None # find an operation matching the template op = get_operation(operation) Permission = rbac_utils.get_permission_model() if create: - perm, created = Permission.objects.get_or_create( + perm, _ = Permission.objects.update_or_create( operation=op, target_ct=ContentType.objects.get_for_model(instance), target_id=instance.pk, + defaults=defaults, **kwargs) else: try: @@ -55,9 +67,15 @@ class RoleManager(BaseRoleManager): **kwargs) except Permission.DoesNotExist: return None - created = False - admin_role = self.get_mirror_role(perm, name, slug, ou=ou, + # in which ou do we put the role ? + if ou: + mirror_role_ou = ou + elif getattr(instance, 'ou', None): + mirror_role_ou = instance.ou + else: + mirror_role_ou = None + admin_role = self.get_mirror_role(perm, name, slug, ou=mirror_role_ou, update_name=update_name, update_slug=update_slug, create=create) @@ -76,23 +94,26 @@ class RoleManager(BaseRoleManager): def get_mirror_role(self, instance, name, slug, ou=None, update_name=False, update_slug=False, create=True): - '''Get or create a role which mirror another model, for example a + '''Get or create a role which mirrors another model, for example a permission. ''' ct = ContentType.objects.get_for_model(instance) + update_fields = {} kwargs = {} - if ou or getattr(instance, 'ou', None): - kwargs['ou'] = ou or instance.ou + if ou: + update_fields['ou'] = ou else: - kwargs['ou__isnull'] = True + update_fields['ou'] = None + if update_name: + update_fields['name'] = name + if update_slug: + update_fields['slug'] = slug + if create: - role, created = self.prefetch_related('permissions').get_or_create( + role, _ = self.prefetch_related('permissions').update_or_create( admin_scope_ct=ct, admin_scope_id=instance.pk, - defaults={ - 'name': name, - 'slug': slug, - }, + defaults=update_fields, **kwargs) else: try: @@ -102,14 +123,9 @@ class RoleManager(BaseRoleManager): **kwargs) except self.model.DoesNotExist: return None - created = False - - if update_name and not created and role.name != name: - role.name = name - role.save() - if update_slug and not created and role.slug != slug: - role.slug = slug - role.save() + for field, value in update_fields.items(): + setattr(role, field, value) + role.save(update_fields=update_fields) return role def get_by_natural_key(self, slug, ou_natural_key, service_natural_key): diff --git a/src/authentic2/a2_rbac/models.py b/src/authentic2/a2_rbac/models.py index 4c743812..0eaa9c97 100644 --- a/src/authentic2/a2_rbac/models.py +++ b/src/authentic2/a2_rbac/models.py @@ -150,7 +150,7 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase): slug = '_a2-managers-of-{ou.slug}'.format(ou=self) return Role.objects.get_admin_role( instance=self, name=name, slug=slug, operation=VIEW_OP, - update_name=True, update_slug=True) + update_name=True, update_slug=True, create=True) def delete(self, *args, **kwargs): Permission.objects.filter(ou=self).delete() @@ -239,7 +239,7 @@ class Role(RoleAbstractBase): view_user_perm = utils.get_view_user_perm() admin_role = self.__class__.objects.get_admin_role( - self, ou=self.ou, + self, name=_('Managers of role "{role}"').format( role=six.text_type(self)), slug='_a2-managers-of-role-{role}'.format( diff --git a/tests/test_a2_rbac.py b/tests/test_a2_rbac.py index 07b83f48..86d969f5 100644 --- a/tests/test_a2_rbac.py +++ b/tests/test_a2_rbac.py @@ -110,7 +110,7 @@ def test_admin_roles_update_slug(db): user = User.objects.create(username='john.doe') name1 = 'Can manage john.doe' slug1 = 'can-manage-john-doe' - admin_role1 = Role.objects.get_admin_role(user, name1, slug1) + admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True) assert admin_role1.name == name1 assert admin_role1.slug == slug1 name2 = 'Should manage john.doe' @@ -405,7 +405,7 @@ def test_role_rename(db): assert ar1.slug == '_a2-managers-of-role-r1ter' -def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1): +def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1): role_ou1.get_admin_role().members.add(simple_user) # Default: all users are visible @@ -495,7 +495,7 @@ def test_manager_roles_multi_ou(db, ou1): @pytest.mark.parametrize( 'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)] ) -def test_unused_account_settings_validation(ou1, alert, deletion): +def test_unused_account_settings_validation(db, ou1, alert, deletion): ou1.clean_unused_accounts_alert = alert ou1.clean_unused_accounts_deletion = deletion with pytest.raises(ValidationError): -- 2.28.0