Projet

Général

Profil

0001-a2_rbac-do-not-break-unicity-when-get-or-creating-ad.patch

Benjamin Dauvergne, 15 octobre 2020 14:49

Télécharger (10,4 ko)

Voir les différences:

Subject: [PATCH 1/3] a2_rbac: do not break unicity when get-or-creating admin
 role (#42178)

 src/authentic2/a2_rbac/management.py | 33 ++++++++------
 src/authentic2/a2_rbac/managers.py   | 64 +++++++++++++++++-----------
 src/authentic2/a2_rbac/models.py     |  4 +-
 tests/test_a2_rbac.py                |  6 +--
 4 files changed, 66 insertions(+), 41 deletions(-)
src/authentic2/a2_rbac/management.py
20 20
from django.contrib.contenttypes.models import ContentType
21 21

  
22 22
from django_rbac.models import ADMIN_OP
23
from django_rbac.utils import get_role_model, get_ou_model
23
from django_rbac.utils import get_ou_model
24
from django_rbac.utils import get_permission_model
25
from django_rbac.utils import get_role_model
24 26

  
25 27
from ..utils import get_fk_model
26 28
from . import utils, app_settings
......
140 142
            continue
141 143
        ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name,
142 144
                                                    slug=slug,
143
                                                    update_name=True)
145
                                                    update_name=True,
146
                                                    update_slug=True,
147
                                                    create=True)
144 148
        if MANAGED_CT[ct_tuple].get('must_view_user'):
145 149
            ct_admin_role.permissions.add(view_user_perm)
146 150
        if MANAGED_CT[ct_tuple].get('must_manage_authorizations_user'):
......
150 154

  
151 155

  
152 156
def update_user_admin_roles_permission():
153
    roles = get_role_model().objects.filter(slug__startswith='_a2-managers-of-role',
154
                                            permissions__operation__slug=ADMIN_OP.slug)
155
    for role in roles:
156
        old_perm = role.permissions.get(operation__slug=ADMIN_OP.slug)
157
        administered_role = old_perm.target
157
    role_ct = ContentType.objects.get_for_model(get_role_model())
158
    permissions = get_permission_model().objects.filter(target_ct=role_ct, operation__slug=ADMIN_OP.slug)
159
    for perm in permissions:
160
        administered_role = perm.target
158 161
        admin_role = administered_role.get_admin_role()
159
        new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug,
160
                                              target_id=administered_role.pk)
162
        assert admin_role.slug.startswith('_a2-managers-of-role')
163
        new_perm = admin_role.permissions.get(
164
            operation__slug=MANAGE_MEMBERS_OP.slug,
165
            target_ct=role_ct,
166
            target_id=administered_role.id)
167
        assert new_perm.ou is None
161 168
        admin_role.delete()
169
        assert len(perm.roles.all()) == 1
170
        role = perm.roles.first()
162 171
        role.admin_scope_id = new_perm.pk
163
        role.save()
164
        role.permissions.remove(old_perm)
172
        role.save(update_fields=['admin_scope_id'])
173
        role.permissions.remove(perm)
165 174
        role.permissions.add(new_perm)
166
        assert role.pk == administered_role.get_admin_role().pk
175
        assert role.pk == administered_role.get_admin_role(create=False).pk
src/authentic2/a2_rbac/managers.py
33 33
                       self_administered=False, create=True):
34 34
        '''Get or create the role of manager's of this object instance'''
35 35
        kwargs = {}
36
        if ou or getattr(instance, 'ou', None):
37
            ou = kwargs['ou'] = ou or instance.ou
38
        else:
39
            kwargs['ou__isnull'] = True
36
        assert not ou or isinstance(instance, ContentType), (
37
            'get_admin_role(ou=...) can only be used with ContentType instances: %s %s %s' % (name, ou, instance)
38
        )
39

  
40
        # Does the permission need to be scoped by ou ? Yes if the target is a
41
        # ContentType and ou is given. It's a general administration right upon
42
        # all instance of a ContentType, eventually scoped to the given ou.
43
        defaults = {}
44
        if isinstance(instance, ContentType):
45
            if ou:
46
                kwargs['ou'] = ou
47
            else:
48
                kwargs['ou__isnull'] = True
49
        else:  # for non ContentType instances, OU must be set to NULL, always.
50
            defaults['ou'] = None
40 51
        # find an operation matching the template
41 52
        op = get_operation(operation)
42 53
        Permission = rbac_utils.get_permission_model()
43 54
        if create:
44
            perm, created = Permission.objects.get_or_create(
55
            perm, _ = Permission.objects.update_or_create(
45 56
                operation=op,
46 57
                target_ct=ContentType.objects.get_for_model(instance),
47 58
                target_id=instance.pk,
59
                defaults=defaults,
48 60
                **kwargs)
49 61
        else:
50 62
            try:
......
55 67
                    **kwargs)
56 68
            except Permission.DoesNotExist:
57 69
                return None
58
            created = False
59 70

  
60
        admin_role = self.get_mirror_role(perm, name, slug, ou=ou,
71
        # in which ou do we put the role ?
72
        if ou:
73
            mirror_role_ou = ou
74
        elif getattr(instance, 'ou', None):
75
            mirror_role_ou = instance.ou
76
        else:
77
            mirror_role_ou = None
78
        admin_role = self.get_mirror_role(perm, name, slug, ou=mirror_role_ou,
61 79
                                          update_name=update_name,
62 80
                                          update_slug=update_slug,
63 81
                                          create=create)
......
76 94

  
77 95
    def get_mirror_role(self, instance, name, slug, ou=None,
78 96
                        update_name=False, update_slug=False, create=True):
79
        '''Get or create a role which mirror another model, for example a
97
        '''Get or create a role which mirrors another model, for example a
80 98
           permission.
81 99
        '''
82 100
        ct = ContentType.objects.get_for_model(instance)
101
        update_fields = {}
83 102
        kwargs = {}
84
        if ou or getattr(instance, 'ou', None):
85
            kwargs['ou'] = ou or instance.ou
103
        if ou:
104
            update_fields['ou'] = ou
86 105
        else:
87
            kwargs['ou__isnull'] = True
106
            update_fields['ou'] = None
107
        if update_name:
108
            update_fields['name'] = name
109
        if update_slug:
110
            update_fields['slug'] = slug
111

  
88 112
        if create:
89
            role, created = self.prefetch_related('permissions').get_or_create(
113
            role, _ = self.prefetch_related('permissions').update_or_create(
90 114
                admin_scope_ct=ct,
91 115
                admin_scope_id=instance.pk,
92
                defaults={
93
                    'name': name,
94
                    'slug': slug,
95
                },
116
                defaults=update_fields,
96 117
                **kwargs)
97 118
        else:
98 119
            try:
......
102 123
                    **kwargs)
103 124
            except self.model.DoesNotExist:
104 125
                return None
105
            created = False
106

  
107
        if update_name and not created and role.name != name:
108
            role.name = name
109
            role.save()
110
        if update_slug and not created and role.slug != slug:
111
            role.slug = slug
112
            role.save()
126
            for field, value in update_fields.items():
127
                setattr(role, field, value)
128
            role.save(update_fields=update_fields)
113 129
        return role
114 130

  
115 131
    def get_by_natural_key(self, slug, ou_natural_key, service_natural_key):
src/authentic2/a2_rbac/models.py
150 150
        slug = '_a2-managers-of-{ou.slug}'.format(ou=self)
151 151
        return Role.objects.get_admin_role(
152 152
            instance=self, name=name, slug=slug, operation=VIEW_OP,
153
            update_name=True, update_slug=True)
153
            update_name=True, update_slug=True, create=True)
154 154

  
155 155
    def delete(self, *args, **kwargs):
156 156
        Permission.objects.filter(ou=self).delete()
......
239 239
            view_user_perm = utils.get_view_user_perm()
240 240

  
241 241
        admin_role = self.__class__.objects.get_admin_role(
242
            self, ou=self.ou,
242
            self,
243 243
            name=_('Managers of role "{role}"').format(
244 244
                role=six.text_type(self)),
245 245
            slug='_a2-managers-of-role-{role}'.format(
tests/test_a2_rbac.py
110 110
    user = User.objects.create(username='john.doe')
111 111
    name1 = 'Can manage john.doe'
112 112
    slug1 = 'can-manage-john-doe'
113
    admin_role1 = Role.objects.get_admin_role(user, name1, slug1)
113
    admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
114 114
    assert admin_role1.name == name1
115 115
    assert admin_role1.slug == slug1
116 116
    name2 = 'Should manage john.doe'
......
405 405
    assert ar1.slug == '_a2-managers-of-role-r1ter'
406 406

  
407 407

  
408
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
408
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
409 409
    role_ou1.get_admin_role().members.add(simple_user)
410 410

  
411 411
    # Default: all users are visible
......
495 495
@pytest.mark.parametrize(
496 496
    'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)]
497 497
)
498
def test_unused_account_settings_validation(ou1, alert, deletion):
498
def test_unused_account_settings_validation(db, ou1, alert, deletion):
499 499
    ou1.clean_unused_accounts_alert = alert
500 500
    ou1.clean_unused_accounts_deletion = deletion
501 501
    with pytest.raises(ValidationError):
502
-