From 6558bfd809a5e0e9fb56a8fd65140463342628aa Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 12 Jul 2019 10:03:06 +0200 Subject: [PATCH 3/3] a2_rbac: rename role's admin role on role's rename (#34774) --- src/authentic2/a2_rbac/managers.py | 60 ++++++++++++++++++++++-------- src/authentic2/a2_rbac/models.py | 11 ++++-- tests/test_a2_rbac.py | 60 ++++++++++++++++++++++++++++-- 3 files changed, 109 insertions(+), 22 deletions(-) diff --git a/src/authentic2/a2_rbac/managers.py b/src/authentic2/a2_rbac/managers.py index 6fdce9cb..e94edcd2 100644 --- a/src/authentic2/a2_rbac/managers.py +++ b/src/authentic2/a2_rbac/managers.py @@ -30,7 +30,7 @@ class OrganizationalUnitManager(AbstractBaseManager): class RoleManager(BaseRoleManager): def get_admin_role(self, instance, name, slug, ou=None, operation=ADMIN_OP, update_name=False, update_slug=False, permissions=(), - self_administered=False): + self_administered=False, create=True): '''Get or create the role of manager's of this object instance''' kwargs = {} if ou or getattr(instance, 'ou', None): @@ -40,14 +40,31 @@ class RoleManager(BaseRoleManager): # find an operation matching the template op = get_operation(operation) Permission = rbac_utils.get_permission_model() - perm, created = Permission.objects.get_or_create( - operation=op, - target_ct=ContentType.objects.get_for_model(instance), - target_id=instance.pk, - **kwargs) + if create: + perm, created = Permission.objects.get_or_create( + operation=op, + target_ct=ContentType.objects.get_for_model(instance), + target_id=instance.pk, + **kwargs) + else: + try: + perm = Permission.objects.get( + operation=op, + target_ct=ContentType.objects.get_for_model(instance), + target_id=instance.pk, + **kwargs) + except Permission.DoesNotExist: + return None + created = False + admin_role = self.get_mirror_role(perm, name, slug, ou=ou, update_name=update_name, - update_slug=update_slug) + update_slug=update_slug, + create=create) + + if not admin_role: + return None + permissions = set(permissions) permissions.add(perm) if self_administered: @@ -60,7 +77,7 @@ class RoleManager(BaseRoleManager): return admin_role def get_mirror_role(self, instance, name, slug, ou=None, - update_name=False, update_slug=False): + update_name=False, update_slug=False, create=True): '''Get or create a role which mirror another model, for example a permission. ''' @@ -70,14 +87,25 @@ class RoleManager(BaseRoleManager): kwargs['ou'] = ou or instance.ou else: kwargs['ou__isnull'] = True - role, created = self.prefetch_related('permissions').get_or_create( - admin_scope_ct=ct, - admin_scope_id=instance.pk, - defaults={ - 'name': name, - 'slug': slug, - }, - **kwargs) + if create: + role, created = self.prefetch_related('permissions').get_or_create( + admin_scope_ct=ct, + admin_scope_id=instance.pk, + defaults={ + 'name': name, + 'slug': slug, + }, + **kwargs) + else: + try: + role = self.prefetch_related('permissions').get( + admin_scope_ct=ct, + admin_scope_id=instance.pk, + **kwargs) + except self.model.DoesNotExist: + return None + created = False + if update_name and not created and role.name != name: role.name = name role.save() diff --git a/src/authentic2/a2_rbac/models.py b/src/authentic2/a2_rbac/models.py index 8b435e94..3c035a30 100644 --- a/src/authentic2/a2_rbac/models.py +++ b/src/authentic2/a2_rbac/models.py @@ -203,7 +203,7 @@ class Role(RoleAbstractBase): content_type_field='target_ct', object_id_field='target_id') - def get_admin_role(self, ou=None): + def get_admin_role(self, ou=None, create=True): from . import utils admin_role = self.__class__.objects.get_admin_role( self, ou=self.ou, @@ -212,7 +212,10 @@ class Role(RoleAbstractBase): slug='_a2-managers-of-role-{role}'.format( role=slugify(six.text_type(self))), permissions=(utils.get_view_user_perm(),), - self_administered=True) + self_administered=True, + update_name=True, + update_slug=True, + create=create) return admin_role def validate_unique(self, exclude=None): @@ -237,7 +240,9 @@ class Role(RoleAbstractBase): # Service roles can only be part of the same ou as the service if self.service: self.ou = self.service.ou - return super(Role, self).save(*args, **kwargs) + result = super(Role, self).save(*args, **kwargs) + self.get_admin_role(create=False) + return result def has_self_administration(self, op=CHANGE_OP): Permission = rbac_utils.get_permission_model() diff --git a/tests/test_a2_rbac.py b/tests/test_a2_rbac.py index cb6110da..f8e74682 100644 --- a/tests/test_a2_rbac.py +++ b/tests/test_a2_rbac.py @@ -69,14 +69,14 @@ def test_role_with_ou_export_json(db): ou = OU.objects.create(name='ou', slug='ou') role = Role.objects.create(name='some role', ou=ou) role_dict = role.export_json() - assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name} + assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name} def test_role_with_service_export_json(db): service = Service.objects.create(name='service name', slug='service-name') role = Role.objects.create(name='some role', service=service) role_dict = role.export_json() - assert role_dict['service'] == {'slug': service.slug, 'ou': None} + assert role_dict['service'] == {'slug': service.slug, 'ou': None} def test_role_with_service_with_ou_export_json(db): @@ -85,7 +85,7 @@ def test_role_with_service_with_ou_export_json(db): role = Role.objects.create(name='some role', service=service) role_dict = role.export_json() assert role_dict['service'] == { - 'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}} + 'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}} def test_role_with_attributes_export_json(db): @@ -218,6 +218,20 @@ def test_admin_cleanup(db): assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 +def test_admin_cleanup_bulk_delete(db): + r1 = Role.objects.create(name='r1') + + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 + + r1.get_admin_role() + + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 1 + + Role.objects.filter(name='r1').delete() + + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 + + def test_admin_cleanup_failure(db): Role.objects.create( name='manager of r1', @@ -242,3 +256,43 @@ def test_admin_cleanup_command(db): call_command('cleanupauthentic') assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 + + +def test_role_rename(db): + r1 = Role.objects.create(name='r1') + assert r1.slug == 'r1' + + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 + + assert not r1.get_admin_role(create=False) + + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 0 + + ar1 = r1.get_admin_role() + + assert ar1 + assert ar1.name == 'Managers of role "r1"' + assert ar1.slug == '_a2-managers-of-role-r1' + assert Role.objects.filter(name__contains='r1', admin_scope_ct_id__isnull=False).count() == 1 + + assert ar1.name == 'Managers of role "r1"' + + Role.objects.filter(pk=r1.pk).update(name='r1bis') + + r1.refresh_from_db() + ar1.refresh_from_db() + + assert r1.name == 'r1bis' + assert ar1.name == 'Managers of role "r1"' + + ar1 = r1.get_admin_role(create=False) + + assert ar1.name == 'Managers of role "r1bis"' + assert ar1.slug == '_a2-managers-of-role-r1bis' + + r1.name = 'r1ter' + r1.save() + ar1.refresh_from_db() + + assert ar1.name == 'Managers of role "r1ter"' + assert ar1.slug == '_a2-managers-of-role-r1ter' -- 2.20.1