0004-django_rbac-remove-utils-70894.patch
src/authentic2/a2_rbac/apps.py | ||
---|---|---|
25 | 25 |
from django.db.models.signals import post_delete, post_migrate, post_save |
26 | 26 | |
27 | 27 |
from authentic2.models import Service |
28 |
from django_rbac import utils |
|
29 | 28 | |
30 | 29 |
from . import models, signal_handlers, signals |
31 | 30 | |
32 | 31 |
# update role parenting when new role parenting is created |
33 |
post_save.connect(signal_handlers.role_parenting_post_save, sender=utils.get_role_parenting_model())
|
|
32 |
post_save.connect(signal_handlers.role_parenting_post_save, sender=models.RoleParenting)
|
|
34 | 33 |
# update role parenting when role parenting is deleted |
35 |
post_delete.connect( |
|
36 |
signal_handlers.role_parenting_post_delete, sender=utils.get_role_parenting_model() |
|
37 |
) |
|
34 |
post_delete.connect(signal_handlers.role_parenting_post_delete, sender=models.RoleParenting) |
|
38 | 35 |
# or soft-created |
39 | 36 |
signals.post_soft_create.connect( |
40 |
signal_handlers.role_parenting_post_soft_delete, sender=utils.get_role_parenting_model()
|
|
37 |
signal_handlers.role_parenting_post_soft_delete, sender=models.RoleParenting
|
|
41 | 38 |
) |
42 | 39 |
# or soft-deleted |
43 | 40 |
signals.post_soft_delete.connect( |
44 |
signal_handlers.role_parenting_post_soft_delete, sender=utils.get_role_parenting_model()
|
|
41 |
signal_handlers.role_parenting_post_soft_delete, sender=models.RoleParenting
|
|
45 | 42 |
) |
46 | 43 |
# create CRUD operations and admin |
47 | 44 |
post_migrate.connect(signal_handlers.create_base_operations, sender=self) |
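Review bot: `signals.post_soft_create.connect(` is still wired to `signal_handlers.role_parenting_post_soft_delete`, the same handler as the `post_soft_delete` connection right below it, and the rewritten `sender=models.RoleParenting` line carries that over unchanged. If signal_handlers.py (not visible here) has a dedicated soft-create handler, this looks like a copy-paste slip and should be checked, because role parenting determines which permissions a user inherits. If one handler deliberately serves both signals, a comment such as `# soft create and soft delete share one handler: both only need the parenting closure refreshed` would make that explicit. The enclosing method starts outside the visible lines.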
src/authentic2/a2_rbac/managers.py | ||
---|---|---|
25 | 25 |
from django.db.models.query import Prefetch, Q |
26 | 26 |
from django.db.transaction import atomic |
27 | 27 | |
28 |
from django_rbac import utils |
|
29 |
from django_rbac.utils import get_operation |
|
30 | ||
31 | 28 |
from . import models as a2_models |
32 | 29 |
from . import signals |
30 |
from .utils import get_operation |
|
33 | 31 | |
34 | 32 | |
35 | 33 |
class AbstractBaseManager(models.Manager): |
... | ... | |
55 | 53 |
target_query = query.Q(target_ct=ContentType.objects.get_for_model(ContentType), target_id=ct.pk) |
56 | 54 |
if isinstance(object_or_model, models.Model): |
57 | 55 |
target_query |= query.Q(target_ct=ct, target_id=object.pk) |
58 |
Permission = utils.get_permission_model() |
|
59 |
qs = Permission.objects.for_user(user) |
|
56 |
qs = a2_models.Permission.objects.for_user(user) |
|
60 | 57 |
qs = qs.filter(operation__slug=operation_slug) |
61 | 58 |
qs = qs.filter(ou_query & target_query) |
62 | 59 |
return qs.exists() |
... | ... | |
66 | 63 |
def get_by_natural_key(self, operation_slug, ou_nk, target_ct, target_nk): |
67 | 64 |
qs = self.filter(operation__slug=operation_slug) |
68 | 65 |
if ou_nk: |
69 |
OrganizationalUnit = utils.get_ou_model() |
|
70 | 66 |
try: |
71 |
ou = OrganizationalUnit.objects.get_by_natural_key(*ou_nk) |
|
72 |
except OrganizationalUnit.DoesNotExist: |
|
67 |
ou = a2_models.OrganizationalUnit.objects.get_by_natural_key(*ou_nk)
|
|
68 |
except a2_models.OrganizationalUnit.DoesNotExist:
|
|
73 | 69 |
raise self.model.DoesNotExist |
74 | 70 |
qs = qs.filter(ou=ou) |
75 | 71 |
else: |
... | ... | |
102 | 98 |
"""Retrieve all permissions hold by an user through its role and |
103 | 99 |
inherited roles. |
104 | 100 |
""" |
105 |
Role = utils.get_role_model() |
|
106 |
roles = Role.objects.for_user(user=user) |
|
101 |
roles = a2_models.Role.objects.for_user(user=user) |
|
107 | 102 |
return self.filter(roles__in=roles) |
108 | 103 | |
109 | 104 |
def cleanup(self): |
... | ... | |
207 | 202 |
tls = Local() |
208 | 203 | |
209 | 204 |
def get_by_natural_key(self, parent_nk, child_nk, direct): |
210 |
Role = utils.get_role_model() |
|
211 | 205 |
try: |
212 |
parent = Role.objects.get_by_natural_key(*parent_nk) |
|
213 |
except Role.DoesNotExist: |
|
206 |
parent = a2_models.Role.objects.get_by_natural_key(*parent_nk)
|
|
207 |
except a2_models.Role.DoesNotExist:
|
|
214 | 208 |
raise self.model.DoesNotExist |
215 | 209 |
try: |
216 |
child = Role.objects.get_by_natural_key(*child_nk) |
|
217 |
except Role.DoesNotExist: |
|
210 |
child = a2_models.Role.objects.get_by_natural_key(*child_nk)
|
|
211 |
except a2_models.Role.DoesNotExist:
|
|
218 | 212 |
raise self.model.DoesNotExist |
219 | 213 |
return self.get(parent=parent, child=child, direct=direct) |
220 | 214 | |
... | ... | |
298 | 292 | |
299 | 293 |
@contextlib.contextmanager |
300 | 294 |
def defer_update_transitive_closure(): |
301 |
from . import utils |
|
302 | ||
303 | 295 |
RoleParentingManager.tls.DO_UPDATE_CLOSURE = False |
304 | 296 |
try: |
305 | 297 |
yield |
306 | 298 |
if RoleParentingManager.tls.CLOSURE_UPDATED: |
307 |
utils.get_role_parenting_model().objects.update_transitive_closure()
|
|
299 |
a2_models.RoleParenting.objects.update_transitive_closure()
|
|
308 | 300 |
finally: |
309 | 301 |
RoleParentingManager.tls.DO_UPDATE_CLOSURE = True |
310 | 302 |
RoleParentingManager.tls.CLOSURE_UPDATED = False |
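Review bot: In the permission check next to the rewritten `qs = a2_models.Permission.objects.for_user(user)` line, the unchanged line `target_query |= query.Q(target_ct=ct, target_id=object.pk)` reads `pk` from the builtin `object`, unless a local of that name is bound above the visible lines. The builtin has no `pk`, so the instance branch would raise AttributeError instead of matching per-object permissions. The guard on the line before tests `object_or_model`, so the intended line is probably `target_query |= query.Q(target_ct=ct, target_id=object_or_model.pk)`. The method signature is outside the hunk, so confirm the parameter name before changing it.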
src/authentic2/a2_rbac/migrations/0021_auto_20200317_1514.py | ||
---|---|---|
2 | 2 | |
3 | 3 |
from django.db import migrations, models |
4 | 4 | |
5 |
import django_rbac.utils
|
|
5 |
import authentic2.a2_rbac.utils
|
|
6 | 6 | |
7 | 7 | |
8 | 8 |
class Migration(migrations.Migration): |
... | ... | |
16 | 16 |
model_name='organizationalunit', |
17 | 17 |
name='uuid', |
18 | 18 |
field=models.CharField( |
19 |
default=django_rbac.utils.get_hex_uuid, max_length=32, unique=True, verbose_name='uuid'
|
|
19 |
default=authentic2.a2_rbac.utils.get_hex_uuid, max_length=32, unique=True, verbose_name='uuid'
|
|
20 | 20 |
), |
21 | 21 |
), |
22 | 22 |
migrations.AlterField( |
23 | 23 |
model_name='role', |
24 | 24 |
name='uuid', |
25 | 25 |
field=models.CharField( |
26 |
default=django_rbac.utils.get_hex_uuid, max_length=32, unique=True, verbose_name='uuid'
|
|
26 |
default=authentic2.a2_rbac.utils.get_hex_uuid, max_length=32, unique=True, verbose_name='uuid'
|
|
27 | 27 |
), |
28 | 28 |
), |
29 | 29 |
] |
src/authentic2/a2_rbac/models.py | ||
---|---|---|
37 | 37 |
from authentic2.decorators import errorcollector |
38 | 38 |
from authentic2.utils.cache import GlobalCache |
39 | 39 |
from authentic2.validators import HexaColourValidator |
40 |
from django_rbac import utils as rbac_utils |
|
41 | 40 | |
42 |
from . import app_settings, fields, managers |
|
41 |
from . import app_settings, fields, managers, utils
|
|
43 | 42 | |
44 | 43 | |
45 | 44 |
class AbstractBase(models.Model): |
... | ... | |
47 | 46 |
slug |
48 | 47 |
""" |
49 | 48 | |
50 |
uuid = models.CharField( |
|
51 |
max_length=32, verbose_name=_('uuid'), unique=True, default=rbac_utils.get_hex_uuid |
|
52 |
) |
|
49 |
uuid = models.CharField(max_length=32, verbose_name=_('uuid'), unique=True, default=utils.get_hex_uuid) |
|
53 | 50 |
name = models.CharField(max_length=256, verbose_name=_('name')) |
54 | 51 |
slug = models.SlugField(max_length=256, verbose_name=_('slug')) |
55 | 52 |
description = models.TextField(verbose_name=_('description'), blank=True) |
... | ... | |
65 | 62 |
def save(self, *args, **kwargs): |
66 | 63 |
# truncate slug and add a hash if it's too long |
67 | 64 |
if not self.slug: |
68 |
self.slug = rbac_utils.generate_slug(self.name)
|
|
65 |
self.slug = utils.generate_slug(self.name) |
|
69 | 66 |
if len(self.slug) > 256: |
70 | 67 |
self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4] |
71 | 68 |
if not self.uuid: |
72 |
self.uuid = rbac_utils.get_hex_uuid()
|
|
69 |
self.uuid = utils.get_hex_uuid() |
|
73 | 70 |
return super().save(*args, **kwargs) |
74 | 71 | |
75 | 72 |
def natural_key(self): |
... | ... | |
269 | 266 |
to='a2_rbac.Operation', verbose_name=_('operation'), on_delete=models.CASCADE |
270 | 267 |
) |
271 | 268 |
ou = models.ForeignKey( |
272 |
to=rbac_utils.get_ou_model_name(),
|
|
269 |
to=OrganizationalUnit,
|
|
273 | 270 |
verbose_name=_('organizational unit'), |
274 | 271 |
related_name='scoped_permission', |
275 | 272 |
null=True, |
... | ... | |
370 | 367 | |
371 | 368 |
class Role(AbstractBase): |
372 | 369 |
ou = models.ForeignKey( |
373 |
to=rbac_utils.get_ou_model_name(),
|
|
370 |
to=OrganizationalUnit,
|
|
374 | 371 |
verbose_name=_('organizational unit'), |
375 | 372 |
swappable=True, |
376 | 373 |
blank=True, |
... | ... | |
380 | 377 |
members = models.ManyToManyField( |
381 | 378 |
to=settings.AUTH_USER_MODEL, swappable=True, blank=True, related_name='roles' |
382 | 379 |
) |
383 |
permissions = models.ManyToManyField( |
|
384 |
to=rbac_utils.get_permission_model_name(), related_name='roles', blank=True |
|
385 |
) |
|
380 |
permissions = models.ManyToManyField(to=Permission, related_name='roles', blank=True) |
|
386 | 381 |
name = models.TextField(verbose_name=_('name')) |
387 | 382 |
admin_scope_ct = models.ForeignKey( |
388 | 383 |
to='contenttypes.ContentType', |
... | ... | |
414 | 409 |
objects = managers.RoleQuerySet.as_manager() |
415 | 410 | |
416 | 411 |
def add_child(self, child): |
417 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
418 | 412 |
RoleParenting.objects.soft_create(self, child) |
419 | 413 | |
420 | 414 |
def remove_child(self, child): |
421 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
422 | 415 |
RoleParenting.objects.soft_delete(self, child) |
423 | 416 | |
424 | 417 |
def add_parent(self, parent): |
425 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
426 | 418 |
RoleParenting.objects.soft_create(parent, self) |
427 | 419 | |
428 | 420 |
def remove_parent(self, parent): |
429 |
RoleParenting = rbac_utils.get_role_parenting_model() |
|
430 | 421 |
RoleParenting.objects.soft_delete(parent, self) |
431 | 422 | |
432 | 423 |
def parents(self, include_self=True, annotate=False, direct=None): |
... | ... | |
518 | 509 |
def has_self_administration(self, op=None): |
519 | 510 |
if not op: |
520 | 511 |
op = MANAGE_MEMBERS_OP |
521 |
operation = rbac_utils.get_operation(op)
|
|
512 |
operation = utils.get_operation(op) |
|
522 | 513 |
self_perm, dummy = Permission.objects.get_or_create( |
523 | 514 |
operation=operation, |
524 | 515 |
target_ct=ContentType.objects.get_for_model(self), |
... | ... | |
531 | 522 |
'Add permission to role so that it is self-administered' |
532 | 523 |
if not op: |
533 | 524 |
op = MANAGE_MEMBERS_OP |
534 |
operation = rbac_utils.get_operation(op)
|
|
525 |
operation = utils.get_operation(op) |
|
535 | 526 |
self_perm, dummy = Permission.objects.get_or_create( |
536 | 527 |
operation=operation, target_ct=ContentType.objects.get_for_model(self), target_id=self.pk |
537 | 528 |
) |
... | ... | |
553 | 544 |
if isinstance(operation_tpl, str): |
554 | 545 |
operation = Operation.objects.get(slug=operation_tpl) |
555 | 546 |
else: |
556 |
operation = rbac_utils.get_operation(operation_tpl)
|
|
547 |
operation = utils.get_operation(operation_tpl) |
|
557 | 548 |
permission, _ = Permission.objects.get_or_create( |
558 | 549 |
operation=operation, target_ct=target_ct, target_id=target_id, ou=ou |
559 | 550 |
) |
... | ... | |
571 | 562 |
if isinstance(operation_tpl, str): |
572 | 563 |
operation = Operation.objects.get(slug=operation_tpl) |
573 | 564 |
else: |
574 |
operation = rbac_utils.get_operation(operation_tpl)
|
|
565 |
operation = utils.get_operation(operation_tpl) |
|
575 | 566 |
qs = Permission.objects.filter(target_ct=target_ct, target_id=target_id, operation=operation) |
576 | 567 |
if ou: |
577 | 568 |
qs = qs.filter(ou=ou) |
... | ... | |
704 | 695 | |
705 | 696 |
class RoleParenting(models.Model): |
706 | 697 |
parent = models.ForeignKey( |
707 |
to=rbac_utils.get_role_model_name(),
|
|
698 |
to=Role,
|
|
708 | 699 |
swappable=True, |
709 | 700 |
related_name='child_relation', |
710 | 701 |
on_delete=models.CASCADE, |
711 | 702 |
) |
712 | 703 |
child = models.ForeignKey( |
713 |
to=rbac_utils.get_role_model_name(),
|
|
704 |
to=Role,
|
|
714 | 705 |
swappable=True, |
715 | 706 |
related_name='parent_relation', |
716 | 707 |
on_delete=models.CASCADE, |
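Review bot: In the `save` method, just below the rewritten `utils.generate_slug` call, `hashlib.md5(self.slug).hexdigest()` is handed a `str`. On Python 3, which the `super().save(...)` call implies, hashlib accepts only bytes, so saving an object whose slug is longer than 256 characters raises TypeError instead of truncating it. `self.slug = self.slug[:252] + hashlib.md5(self.slug.encode()).hexdigest()[:4]` keeps the intended behaviour. The digest is only a disambiguating suffix here, not a security control, so the choice of MD5 is not the concern.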
src/authentic2/a2_rbac/signal_handlers.py | ||
---|---|---|
20 | 20 |
from django.utils.translation import gettext as _ |
21 | 21 |
from django.utils.translation import override |
22 | 22 | |
23 |
from authentic2.a2_rbac.models import OrganizationalUnit, Role |
|
23 |
from authentic2.a2_rbac.models import OrganizationalUnit, Role, RoleParenting
|
|
24 | 24 |
from authentic2.utils.misc import get_fk_model |
25 |
from django_rbac.utils import get_operation, get_role_parenting_model |
|
26 | 25 | |
27 | 26 |
from .managers import defer_update_transitive_closure |
27 |
from .utils import get_operation |
|
28 | 28 | |
29 | 29 | |
30 | 30 |
def create_default_ou(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, **kwargs): |
... | ... | |
144 | 144 | |
145 | 145 |
def fix_role_parenting_closure(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, **kwargs): |
146 | 146 |
'''Close the role parenting relation after migrations''' |
147 |
if not router.allow_migrate(using, get_role_parenting_model()):
|
|
147 |
if not router.allow_migrate(using, RoleParenting):
|
|
148 | 148 |
return |
149 |
get_role_parenting_model().objects.update_transitive_closure() |
|
149 |
RoleParenting.objects.update_transitive_closure() |
src/authentic2/a2_rbac/utils.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import uuid |
|
18 | ||
17 | 19 |
from django.contrib.auth import get_user_model |
18 | 20 |
from django.contrib.contenttypes.models import ContentType |
19 | 21 |
from django.utils.text import slugify |
20 | 22 | |
21 |
from django_rbac import utils as rbac_utils |
|
22 | ||
23 | 23 |
from . import models |
24 | 24 | |
25 | 25 | |
26 |
def get_hex_uuid(): |
|
27 |
return uuid.uuid4().hex |
|
28 | ||
29 | ||
30 |
def get_operation(operation_tpl): |
|
31 |
operation, dummy = models.Operation.objects.get_or_create(slug=operation_tpl.slug) |
|
32 |
return operation |
|
33 | ||
34 | ||
26 | 35 |
def get_default_ou(): |
27 | 36 |
try: |
28 | 37 |
return models.OrganizationalUnit.objects.get(default=True) |
... | ... | |
37 | 46 |
def get_view_user_perm(ou=None): |
38 | 47 |
User = get_user_model() |
39 | 48 |
view_user_perm, dummy = models.Permission.objects.get_or_create( |
40 |
operation=rbac_utils.get_operation(models.VIEW_OP),
|
|
49 |
operation=get_operation(models.VIEW_OP), |
|
41 | 50 |
target_ct=ContentType.objects.get_for_model(ContentType), |
42 | 51 |
target_id=ContentType.objects.get_for_model(User).pk, |
43 | 52 |
ou__isnull=ou is None, |
... | ... | |
49 | 58 |
def get_search_ou_perm(ou=None): |
50 | 59 |
if ou: |
51 | 60 |
view_ou_perm, dummy = models.Permission.objects.get_or_create( |
52 |
operation=rbac_utils.get_operation(models.SEARCH_OP),
|
|
61 |
operation=get_operation(models.SEARCH_OP), |
|
53 | 62 |
target_ct=ContentType.objects.get_for_model(ou), |
54 | 63 |
target_id=ou.pk, |
55 | 64 |
ou__isnull=True, |
56 | 65 |
) |
57 | 66 |
else: |
58 | 67 |
view_ou_perm, dummy = models.Permission.objects.get_or_create( |
59 |
operation=rbac_utils.get_operation(models.SEARCH_OP),
|
|
68 |
operation=get_operation(models.SEARCH_OP), |
|
60 | 69 |
target_ct=ContentType.objects.get_for_model(ContentType), |
61 | 70 |
target_id=ContentType.objects.get_for_model(models.OrganizationalUnit).pk, |
62 | 71 |
ou__isnull=True, |
... | ... | |
67 | 76 |
def get_manage_authorizations_user_perm(ou=None): |
68 | 77 |
User = get_user_model() |
69 | 78 |
manage_authorizations_user_perm, dummy = models.Permission.objects.get_or_create( |
70 |
operation=rbac_utils.get_operation(models.MANAGE_AUTHORIZATIONS_OP),
|
|
79 |
operation=get_operation(models.MANAGE_AUTHORIZATIONS_OP), |
|
71 | 80 |
target_ct=ContentType.objects.get_for_model(ContentType), |
72 | 81 |
target_id=ContentType.objects.get_for_model(User).pk, |
73 | 82 |
ou__isnull=ou is None, |
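Review bot: `get_hex_uuid` and `get_operation` are defined below the module-level `from . import models`, while this commit makes models.py import `utils` and managers.py run `from .utils import get_operation` at import time. If `authentic2.a2_rbac.utils` is ever the first of the three to be imported, the chain utils → models → managers re-enters a partially initialised `utils` where neither helper exists yet, which would fail at import. Whether that can happen depends on import order elsewhere in the project. The removed `django_rbac.utils.get_operation` avoided the cycle with a function-level import of the models. At minimum, add a comment on the import such as `# models must already be imported: models.py and managers.py import names from this module at load time`; moving the two helpers to a module that does not import models would remove the constraint.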
src/authentic2/custom_user/backends.py | ||
---|---|---|
8 | 8 |
from django.db import models |
9 | 9 |
from django.db.models.query import Q |
10 | 10 | |
11 |
from django_rbac import utils |
|
11 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
|
12 |
from authentic2.a2_rbac.models import Permission |
|
12 | 13 | |
13 | 14 | |
14 | 15 |
def get_fk_model(model, fieldname): |
... | ... | |
83 | 84 |
""" |
84 | 85 |
if not hasattr(user_obj, '_rbac_perms_cache'): |
85 | 86 |
perms_cache = {} |
86 |
Permission = utils.get_permission_model() |
|
87 | 87 |
qs = Permission.objects.for_user(user_obj) |
88 | 88 |
ct_ct = ContentType.objects.get_for_model(ContentType) |
89 | 89 |
qs = qs.select_related('operation') |
... | ... | |
236 | 236 |
perm_or_perms = set(perm_or_perms) |
237 | 237 |
cache = self.get_permission_cache(user_obj) |
238 | 238 |
model = qs.model |
239 |
OU = utils.get_ou_model() |
|
240 | 239 |
has_ou_field = get_fk_model(model, 'ou') == OU |
241 | 240 |
if perm_or_perms & cache.get('__all__', set()): |
242 | 241 |
return True |
... | ... | |
282 | 281 |
return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ()) |
283 | 282 | |
284 | 283 |
def ous_with_perm(self, user_obj, perm, queryset=None): |
285 |
OU = utils.get_ou_model() |
|
286 | 284 |
qs = queryset or OU.objects.all() |
287 | 285 | |
288 | 286 |
if user_obj.is_anonymous: |
src/authentic2/management/commands/check-and-repair.py | ||
---|---|---|
32 | 32 |
from authentic2.a2_rbac.models import ADMIN_OP |
33 | 33 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
34 | 34 |
from authentic2.a2_rbac.models import Permission, Role |
35 |
from authentic2.a2_rbac.utils import get_operation |
|
35 | 36 |
from authentic2.custom_user.models import User |
36 |
from django_rbac.utils import get_operation |
|
37 | 37 | |
38 | 38 |
try: |
39 | 39 |
from authentic2.a2_rbac.models import MANAGE_MEMBERS_OP # pylint: disable=C0412 |
src/django_rbac/migrations/0002_organizationalunit_permission_role_roleparenting.py | ||
---|---|---|
1 | 1 |
from django.conf import settings |
2 | 2 |
from django.db import migrations, models |
3 | 3 | |
4 |
import django_rbac
|
|
4 |
import authentic2.a2_rbac
|
|
5 | 5 | |
6 | 6 | |
7 | 7 |
class Migration(migrations.Migration): |
... | ... | |
27 | 27 |
( |
28 | 28 |
'uuid', |
29 | 29 |
models.CharField( |
30 |
default=django_rbac.utils.get_hex_uuid,
|
|
30 |
default=authentic2.a2_rbac.utils.get_hex_uuid,
|
|
31 | 31 |
unique=True, |
32 | 32 |
max_length=32, |
33 | 33 |
verbose_name='uuid', |
... | ... | |
92 | 92 |
( |
93 | 93 |
'uuid', |
94 | 94 |
models.CharField( |
95 |
default=django_rbac.utils.get_hex_uuid,
|
|
95 |
default=authentic2.a2_rbac.utils.get_hex_uuid,
|
|
96 | 96 |
unique=True, |
97 | 97 |
max_length=32, |
98 | 98 |
verbose_name='uuid', |
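Review bot: `import authentic2.a2_rbac` binds only the package, so `default=authentic2.a2_rbac.utils.get_hex_uuid` resolves only if some other import has already loaded the `utils` submodule. The 0021 migration changed in this same commit imports the submodule explicitly, and this file should match it with `import authentic2.a2_rbac.utils`. The Migration class continues outside the hunks shown.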
src/django_rbac/utils.py | ||
---|---|---|
1 |
import uuid |
|
2 | ||
3 |
from django.apps import apps |
|
4 |
from django.conf import settings |
|
5 |
from django.utils.text import slugify |
|
6 | ||
7 |
from . import constants |
|
8 | ||
9 |
DEFAULT_MODELS = { |
|
10 |
constants.RBAC_OU_MODEL_SETTING: 'django_rbac.OrganizationalUnit', |
|
11 |
constants.RBAC_ROLE_PARENTING_MODEL_SETTING: 'django_rbac.RoleParenting', |
|
12 |
constants.RBAC_ROLE_MODEL_SETTING: 'django_rbac.Role', |
|
13 |
constants.RBAC_PERMISSION_MODEL_SETTING: 'django_rbac.Permission', |
|
14 |
} |
|
15 | ||
16 | ||
17 |
def get_hex_uuid(): |
|
18 |
return uuid.uuid4().hex |
|
19 | ||
20 | ||
21 |
def get_swapped_model_name(setting): |
|
22 |
"""Return a model qualified name given a setting name containing the |
|
23 |
qualified name of the model, useful to retrieve swappable models |
|
24 |
name. |
|
25 |
""" |
|
26 |
if not hasattr(settings, setting): |
|
27 |
setattr(settings, setting, DEFAULT_MODELS[setting]) |
|
28 |
return getattr(settings, setting) |
|
29 | ||
30 | ||
31 |
def get_swapped_model(setting): |
|
32 |
"""Return a model given a setting name containing the qualified name |
|
33 |
of the model, useful to retrieve swappable models. |
|
34 |
""" |
|
35 |
app, model_name = get_swapped_model_name(setting).rsplit('.', 1) |
|
36 |
return apps.get_model(app, model_name) |
|
37 | ||
38 | ||
39 |
def get_role_model_name(): |
|
40 |
'''Returns the currently configured role model''' |
|
41 |
return get_swapped_model_name(constants.RBAC_ROLE_MODEL_SETTING) |
|
42 | ||
43 | ||
44 |
def get_ou_model_name(): |
|
45 |
'''Returns the currently configured organizational unit model''' |
|
46 |
return get_swapped_model_name(constants.RBAC_OU_MODEL_SETTING) |
|
47 | ||
48 | ||
49 |
def get_role_parenting_model_name(): |
|
50 |
'''Returns the currently configured role parenting model''' |
|
51 |
return get_swapped_model_name(constants.RBAC_ROLE_PARENTING_MODEL_SETTING) |
|
52 | ||
53 | ||
54 |
def get_permission_model_name(): |
|
55 |
'''Returns the currently configured permission model''' |
|
56 |
return get_swapped_model_name(constants.RBAC_PERMISSION_MODEL_SETTING) |
|
57 | ||
58 | ||
59 |
def get_role_model(): |
|
60 |
'''Returns the currently configured role model''' |
|
61 |
return get_swapped_model(constants.RBAC_ROLE_MODEL_SETTING) |
|
62 | ||
63 | ||
64 |
def get_ou_model(): |
|
65 |
'''Returns the currently configured organizational unit model''' |
|
66 |
return get_swapped_model(constants.RBAC_OU_MODEL_SETTING) |
|
67 | ||
68 | ||
69 |
def get_role_parenting_model(): |
|
70 |
'''Returns the currently configured role parenting model''' |
|
71 |
return get_swapped_model(constants.RBAC_ROLE_PARENTING_MODEL_SETTING) |
|
72 | ||
73 | ||
74 |
def get_permission_model(): |
|
75 |
'''Returns the currently configured permission model''' |
|
76 |
return get_swapped_model(constants.RBAC_PERMISSION_MODEL_SETTING) |
|
77 | ||
78 | ||
79 |
def get_operation(operation_tpl): |
|
80 |
from authentic2.a2_rbac import models |
|
81 | ||
82 |
operation, dummy = models.Operation.objects.get_or_create(slug=operation_tpl.slug) |
|
83 |
return operation |
|
84 | ||
85 | ||
86 |
def generate_slug(name, seen_slugs=None): |
|
87 |
slug = base_slug = slugify(name).lstrip('_') |
|
88 |
if seen_slugs: |
|
89 |
i = 1 |
|
90 |
while slug in seen_slugs: |
|
91 |
slug = '%s-%s' % (base_slug, i) |
|
92 |
return slug |
tests/test_commands.py | ||
---|---|---|
36 | 36 |
Permission, |
37 | 37 |
Role, |
38 | 38 |
) |
39 |
from authentic2.a2_rbac.utils import get_default_ou |
|
39 |
from authentic2.a2_rbac.utils import get_default_ou, get_operation
|
|
40 | 40 |
from authentic2.apps.journal.models import Event |
41 | 41 |
from authentic2.custom_user.models import DeletedUser |
42 | 42 |
from authentic2.models import UserExternalId |
43 | 43 |
from authentic2_auth_oidc.models import OIDCAccount, OIDCProvider |
44 |
from django_rbac.utils import get_operation |
|
45 | 44 | |
46 | 45 |
from .utils import call_command, login |
47 | 46 |
tests/test_manager.py | ||
---|---|---|
31 | 31 |
from authentic2.a2_rbac.models import MANAGE_MEMBERS_OP, VIEW_OP |
32 | 32 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
33 | 33 |
from authentic2.a2_rbac.models import Permission, Role |
34 |
from authentic2.a2_rbac.utils import get_default_ou |
|
34 |
from authentic2.a2_rbac.utils import get_default_ou, get_operation
|
|
35 | 35 |
from authentic2.apps.journal.models import Event |
36 | 36 |
from authentic2.models import Service |
37 | 37 |
from authentic2.validators import EmailValidator |
38 |
from django_rbac.utils import get_operation |
|
39 | 38 | |
40 | 39 |
from .utils import assert_event, get_link_from_mail, login, request_select2, text_content |
41 | 40 |
tests/test_rbac.py | ||
---|---|---|
21 | 21 |
from django.db.models import Q |
22 | 22 |
from django.test.utils import CaptureQueriesContext |
23 | 23 | |
24 |
from authentic2.a2_rbac import models
|
|
24 |
from authentic2.a2_rbac.models import Operation, OrganizationalUnit, Permission, Role, RoleParenting
|
|
25 | 25 |
from authentic2.custom_user import backends |
26 |
from django_rbac import utils |
|
27 | 26 | |
28 |
OU = OrganizationalUnit = utils.get_ou_model() |
|
29 |
Permission = utils.get_permission_model() |
|
30 |
RoleParenting = utils.get_role_parenting_model() |
|
31 |
Role = utils.get_role_model() |
|
32 | 27 |
User = get_user_model() |
33 | 28 | |
34 | 29 | |
... | ... | |
95 | 90 | |
96 | 91 | |
97 | 92 |
def test_role_parenting_soft_delete_children(db): |
98 |
OrganizationalUnit = utils.get_ou_model() |
|
99 |
Role = utils.get_role_model() |
|
100 |
RoleParenting = utils.get_role_parenting_model() |
|
101 | ||
102 | 93 |
ou = OrganizationalUnit.objects.create(name='ou') |
103 | 94 |
roles = [] |
104 | 95 |
for i in range(10): |
... | ... | |
121 | 112 | |
122 | 113 | |
123 | 114 |
def test_role_parenting_soft_delete_parents(db): |
124 |
OrganizationalUnit = utils.get_ou_model() |
|
125 |
Role = utils.get_role_model() |
|
126 |
RoleParenting = utils.get_role_parenting_model() |
|
127 | ||
128 | 115 |
ou = OrganizationalUnit.objects.create(name='ou') |
129 | 116 |
roles = [] |
130 | 117 |
for i in range(10): |
... | ... | |
167 | 154 |
relations.append(RoleParenting(parent=roles[i], child=roles[(i - 1) // SPAN])) |
168 | 155 |
RoleParenting.objects.bulk_create(relations) |
169 | 156 |
RoleParenting.objects.update_transitive_closure() |
170 |
operation, _ = models.Operation.objects.get_or_create(slug='admin')
|
|
157 |
operation, _ = Operation.objects.get_or_create(slug='admin') |
|
171 | 158 |
perm, _ = Permission.objects.get_or_create( |
172 | 159 |
operation=operation, |
173 | 160 |
target_ct=ContentType.objects.get_for_model(ContentType), |
... | ... | |
176 | 163 |
roles[0].members.add(user) |
177 | 164 |
Role.objects.get(pk=roles[-1].pk).permissions.add(perm) |
178 | 165 |
for i in range(SIZE): |
179 |
assert models.Operation.objects.has_perm(user, 'admin', User)
|
|
166 |
assert Operation.objects.has_perm(user, 'admin', User) |
|
180 | 167 |
for i in range(SIZE): |
181 | 168 |
assert list(Role.objects.for_user(user).order_by('pk')) == list(Role.objects.order_by('pk')) |
182 | 169 | |
183 | 170 | |
184 | 171 |
def test_rbac_backend(db): |
185 |
ou1 = OU.objects.create(name='ou1', slug='ou1')
|
|
186 |
ou2 = OU.objects.create(name='ou2', slug='ou2')
|
|
172 |
ou1 = OrganizationalUnit.objects.create(name='ou1', slug='ou1')
|
|
173 |
ou2 = OrganizationalUnit.objects.create(name='ou2', slug='ou2')
|
|
187 | 174 |
user1 = User.objects.create(username='john.doe') |
188 | 175 |
ct_ct = ContentType.objects.get_for_model(ContentType) |
189 | 176 |
role_ct = ContentType.objects.get_for_model(Role) |
190 |
change_op = models.Operation.objects.get(slug='change')
|
|
191 |
view_op = models.Operation.objects.get(slug='view')
|
|
192 |
delete_op = models.Operation.objects.get(slug='delete')
|
|
193 |
add_op = models.Operation.objects.get(slug='add')
|
|
194 |
admin_op = models.Operation.objects.get(slug='admin')
|
|
177 |
change_op = Operation.objects.get(slug='change') |
|
178 |
view_op = Operation.objects.get(slug='view') |
|
179 |
delete_op = Operation.objects.get(slug='delete') |
|
180 |
add_op = Operation.objects.get(slug='add') |
|
181 |
admin_op = Operation.objects.get(slug='admin') |
|
195 | 182 |
perm1 = Permission.objects.create(operation=change_op, target_ct=ct_ct, target_id=role_ct.pk) |
196 | 183 |
perm2 = Permission.objects.create(operation=view_op, target_ct=ct_ct, target_id=role_ct.pk) |
197 | 184 |
Role.objects.all().delete() |
tests/test_user_manager.py | ||
---|---|---|
30 | 30 |
from authentic2.a2_rbac.models import VIEW_OP |
31 | 31 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
32 | 32 |
from authentic2.a2_rbac.models import Permission, Role |
33 |
from authentic2.a2_rbac.utils import get_default_ou, get_view_user_perm |
|
33 |
from authentic2.a2_rbac.utils import get_default_ou, get_operation, get_view_user_perm
|
|
34 | 34 |
from authentic2.apps.journal.models import Event |
35 | 35 |
from authentic2.custom_user.models import User |
36 | 36 |
from authentic2.manager import user_import |
37 | 37 |
from authentic2.models import Attribute, AttributeValue |
38 | 38 |
from authentic2_idp_oidc.models import OIDCAuthorization, OIDCClient |
39 |
from django_rbac.utils import get_operation |
|
40 | 39 | |
41 | 40 |
from .utils import get_link_from_mail, login, logout |
42 | 41 | |
43 |
- |