Projet

Général

Profil

0001-a2_rbac-do-not-break-unicity-when-get-or-creating-ad.patch

Paul Marillonnet, 12 mai 2020 08:53

Télécharger (9,22 ko)

Voir les différences:

Subject: [PATCH] a2_rbac: do not break unicity when get-or-creating admin role
 (#42178)

 src/authentic2/a2_rbac/management.py          |  8 ++--
 src/authentic2/a2_rbac/managers.py            | 45 ++++++++++---------
 .../migrations/0011_auto_20160209_1511.py     |  2 +-
 src/authentic2/a2_rbac/models.py              |  5 ++-
 tests/test_a2_rbac.py                         |  6 +--
 5 files changed, 36 insertions(+), 30 deletions(-)
src/authentic2/a2_rbac/management.py
138 138
            continue
139 139
        ct_admin_role = Role.objects.get_admin_role(instance=ct, name=name,
140 140
                                                    slug=slug,
141
                                                    update_name=True)
141
                                                    update_name=True,
142
                                                    update_slug=True,
143
                                                    create=True)
142 144
        if MANAGED_CT[ct_tuple].get('must_view_user'):
143 145
            ct_admin_role.permissions.add(view_user_perm)
144 146
        ct_admin_role.permissions.add(search_ou_perm)
......
155 157
        new_perm = admin_role.permissions.get(operation__slug=MANAGE_MEMBERS_OP.slug)
156 158
        admin_role.delete()
157 159
        role.admin_scope_id = new_perm.pk
158
        role.save()
160
        role.save(update_fields=['admin_scope_id'])
159 161
        role.permissions.remove(old_perm)
160 162
        role.permissions.add(new_perm)
161
        assert role.pk == administered_role.get_admin_role().pk
163
        assert role.pk == administered_role.get_admin_role(create=False).pk
src/authentic2/a2_rbac/managers.py
29 29

  
30 30
class RoleManager(BaseRoleManager):
31 31
    def get_admin_role(self, instance, name, slug, ou=None, operation=ADMIN_OP,
32
                       update_name=False, update_slug=False, permissions=(),
33
                       self_administered=False, create=True):
32
                       update_name=False, update_slug=False, update_ou=False,
33
                       permissions=(), self_administered=False, create=True):
34 34
        '''Get or create the role of manager's of this object instance'''
35 35
        kwargs = {}
36 36
        if ou or getattr(instance, 'ou', None):
......
41 41
        op = get_operation(operation)
42 42
        Permission = rbac_utils.get_permission_model()
43 43
        if create:
44
            perm, created = Permission.objects.get_or_create(
44
            perm, _ = Permission.objects.get_or_create(
45 45
                operation=op,
46 46
                target_ct=ContentType.objects.get_for_model(instance),
47 47
                target_id=instance.pk,
......
55 55
                    **kwargs)
56 56
            except Permission.DoesNotExist:
57 57
                return None
58
            created = False
59 58

  
60 59
        admin_role = self.get_mirror_role(perm, name, slug, ou=ou,
61 60
                                          update_name=update_name,
62 61
                                          update_slug=update_slug,
62
                                          update_ou=update_ou,
63 63
                                          create=create)
64 64

  
65 65
        if not admin_role:
......
75 75
        return admin_role
76 76

  
77 77
    def get_mirror_role(self, instance, name, slug, ou=None,
78
                        update_name=False, update_slug=False, create=True):
79
        '''Get or create a role which mirror another model, for example a
78
                        update_name=False, update_slug=False, update_ou=False,
79
                        create=True):
80
        '''Get or create a role which mirrors another model, for example a
80 81
           permission.
81 82
        '''
82 83
        ct = ContentType.objects.get_for_model(instance)
84
        update_fields = {}
83 85
        kwargs = {}
84
        if ou or getattr(instance, 'ou', None):
85
            kwargs['ou'] = ou or instance.ou
86
        ou = ou or getattr(instance, 'ou', None)
87
        if ou:
88
            if update_ou:
89
                update_fields['ou'] = ou
90
            else:
91
                kwargs['ou'] = ou
86 92
        else:
87 93
            kwargs['ou__isnull'] = True
94
        if update_name:
95
            update_fields['name'] = name
96
        if update_slug:
97
            update_fields['slug'] = slug
98

  
88 99
        if create:
89
            role, created = self.prefetch_related('permissions').get_or_create(
100
            role, _ = self.prefetch_related('permissions').update_or_create(
90 101
                admin_scope_ct=ct,
91 102
                admin_scope_id=instance.pk,
92
                defaults={
93
                    'name': name,
94
                    'slug': slug,
95
                },
103
                defaults=update_fields,
96 104
                **kwargs)
97 105
        else:
98 106
            try:
......
102 110
                    **kwargs)
103 111
            except self.model.DoesNotExist:
104 112
                return None
105
            created = False
106

  
107
        if update_name and not created and role.name != name:
108
            role.name = name
109
            role.save()
110
        if update_slug and not created and role.slug != slug:
111
            role.slug = slug
112
            role.save()
113
            for field, value in update_fields.items():
114
                setattr(role, field, value)
115
            role.save(update_fields=update_fields)
113 116
        return role
114 117

  
115 118
    def get_by_natural_key(self, slug, ou_natural_key, service_natural_key):
src/authentic2/a2_rbac/migrations/0011_auto_20160209_1511.py
13 13
    operations = [
14 14
        migrations.AlterUniqueTogether(
15 15
            name='role',
16
            unique_together=set([('admin_scope_ct', 'admin_scope_id')]),
16
            unique_together=set([('admin_scope_ct', 'admin_scope_id', 'ou')]),
17 17
        ),
18 18
    ]
src/authentic2/a2_rbac/models.py
151 151
        slug = '_a2-managers-of-{ou.slug}'.format(ou=self)
152 152
        return Role.objects.get_admin_role(
153 153
            instance=self, name=name, slug=slug, operation=VIEW_OP,
154
            update_name=True, update_slug=True)
154
            update_name=True, update_slug=True, create=True)
155 155

  
156 156
    def delete(self, *args, **kwargs):
157 157
        Permission.objects.filter(ou=self).delete()
......
245 245
            self_administered=True,
246 246
            update_name=True,
247 247
            update_slug=True,
248
            update_ou=True,
248 249
            create=create,
249 250
            operation=MANAGE_MEMBERS_OP)
250 251
        return admin_role
......
312 313
        verbose_name_plural = _('roles')
313 314
        ordering = ('ou', 'service', 'name',)
314 315
        unique_together = (
315
            ('admin_scope_ct', 'admin_scope_id'),
316
            ('admin_scope_ct', 'admin_scope_id', 'ou'),
316 317
        )
317 318

  
318 319
    def natural_key(self):
tests/test_a2_rbac.py
108 108
    user = User.objects.create(username='john.doe')
109 109
    name1 = 'Can manage john.doe'
110 110
    slug1 = 'can-manage-john-doe'
111
    admin_role1 = Role.objects.get_admin_role(user, name1, slug1)
111
    admin_role1 = Role.objects.get_admin_role(user, name1, slug1, update_name=True, update_slug=True)
112 112
    assert admin_role1.name == name1
113 113
    assert admin_role1.slug == slug1
114 114
    name2 = 'Should manage john.doe'
......
403 403
    assert ar1.slug == '_a2-managers-of-role-r1ter'
404 404

  
405 405

  
406
def test_admin_role_user_view(settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
406
def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_ou1, role_ou1):
407 407
    role_ou1.get_admin_role().members.add(simple_user)
408 408

  
409 409
    # Default: all users are visible
......
499 499
@pytest.mark.parametrize(
500 500
    'alert,deletion', [(-1, 31), (31, -1), (0, 31), (31, 0), (None, 31), (31, None), (32, 31)]
501 501
)
502
def test_unused_account_settings_validation(ou1, alert, deletion):
502
def test_unused_account_settings_validation(db, ou1, alert, deletion):
503 503
    ou1.clean_unused_accounts_alert = alert
504 504
    ou1.clean_unused_accounts_deletion = deletion
505 505
    with pytest.raises(ValidationError):
506
-