From b857b880ebd18afea05b2e6bc1c9ab26d6336cb3 Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Tue, 4 Oct 2022 12:44:03 +0200 Subject: [PATCH 4/4] a2_rbac: move abstract model code from django_rbac (#58696) --- src/authentic2/a2_rbac/models.py | 199 +++++++++++++- .../migrations/0009_auto_20221004_1343.py | 53 ++++ src/django_rbac/models.py | 244 +----------------- 3 files changed, 240 insertions(+), 256 deletions(-) create mode 100644 src/django_rbac/migrations/0009_auto_20221004_1343.py diff --git a/src/authentic2/a2_rbac/models.py b/src/authentic2/a2_rbac/models.py index bf3657ce8..3721466c1 100644 --- a/src/authentic2/a2_rbac/models.py +++ b/src/authentic2/a2_rbac/models.py @@ -14,36 +14,73 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import hashlib import os from collections import namedtuple +from django.conf import settings +from django.contrib.auth import get_user_model from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator from django.db import models +from django.db.models.query import Prefetch, Q from django.urls import reverse from django.utils.text import slugify +from django.utils.translation import gettext from django.utils.translation import gettext_lazy as _ from django.utils.translation import pgettext_lazy +from model_utils.managers import QueryManager from authentic2.decorators import errorcollector from authentic2.utils.cache import GlobalCache from authentic2.validators import HexaColourValidator +from django_rbac import managers as rbac_managers from django_rbac import utils as rbac_utils -from django_rbac.models import ( - VIEW_OP, - Operation, - OrganizationalUnitAbstractBase, - PermissionAbstractBase, - RoleAbstractBase, - RoleParentingAbstractBase, -) +from django_rbac.models import VIEW_OP, Operation from . import app_settings, fields, managers -class OrganizationalUnit(OrganizationalUnitAbstractBase): +class AbstractBase(models.Model): + """Abstract base model for all models having a name and uuid and a + slug + """ + + uuid = models.CharField( + max_length=32, verbose_name=_('uuid'), unique=True, default=rbac_utils.get_hex_uuid + ) + name = models.CharField(max_length=256, verbose_name=_('name')) + slug = models.SlugField(max_length=256, verbose_name=_('slug')) + description = models.TextField(verbose_name=_('description'), blank=True) + + objects = rbac_managers.AbstractBaseManager() + + def __str__(self): + return str(self.name) + + def __repr__(self): + return f'<{self.__class__.__name__} {repr(self.slug)} {repr(self.name)}>' + + def save(self, *args, **kwargs): + # truncate slug and add a hash if it's too long + if not self.slug: + self.slug = rbac_utils.generate_slug(self.name) + if len(self.slug) > 256: + self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4] + if not self.uuid: + self.uuid = rbac_utils.get_hex_uuid() + return super().save(*args, **kwargs) + + def natural_key(self): + return [self.uuid] + + class Meta: + abstract = True + + +class OrganizationalUnit(AbstractBase): RESET_LINK_POLICY = 0 MANUAL_PASSWORD_POLICY = 1 @@ -131,6 +168,9 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase): ('slug',), ) + def as_scope(self): + return self + def clean(self): # if we set this ou as the default one, we must unset the other one if # there is @@ -176,7 +216,7 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase): os.unlink(self.logo.path) Permission.objects.filter(ou=self).delete() - return super(OrganizationalUnitAbstractBase, self).delete(*args, **kwargs) + return super().delete(*args, **kwargs) def natural_key(self): return [self.slug] @@ -208,7 +248,21 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase): OrganizationalUnit._meta.natural_key = [['uuid'], ['slug'], ['name']] -class Permission(PermissionAbstractBase): +class Permission(models.Model): + operation = models.ForeignKey(to=Operation, verbose_name=_('operation'), on_delete=models.CASCADE) + ou = models.ForeignKey( + to=rbac_utils.get_ou_model_name(), + verbose_name=_('organizational unit'), + related_name='scoped_permission', + null=True, + on_delete=models.CASCADE, + ) + target_ct = models.ForeignKey(to='contenttypes.ContentType', related_name='+', on_delete=models.CASCADE) + target_id = models.PositiveIntegerField() + target = GenericForeignKey('target_ct', 'target_id') + + objects = rbac_managers.PermissionManager() + class Meta: verbose_name = _('permission') verbose_name_plural = _('permissions') @@ -226,6 +280,34 @@ class Permission(PermissionAbstractBase): object_id_field='admin_scope_id', ) + def natural_key(self): + return [ + self.operation.slug, + self.ou and self.ou.natural_key(), + self.target and self.target_ct.natural_key(), + self.target and self.target.natural_key(), + ] + + def export_json(self): + return { + "operation": self.operation.natural_key_json(), + "ou": self.ou and self.ou.natural_key_json(), + 'target_ct': self.target_ct.natural_key_json(), + "target": self.target.natural_key_json(), + } + + def __str__(self): + ct = ContentType.objects.get_for_id(self.target_ct_id) + ct_ct = ContentType.objects.get_for_model(ContentType) + if ct == ct_ct: + target = ContentType.objects.get_for_id(self.target_id) + s = f'{self.operation} / {target}' + else: + s = f'{self.operation} / {ct.name} / {self.target}' + if self.ou: + s += gettext(' (scope "{0}")').format(self.ou) + return s + Permission._meta.natural_key = [ ['operation', 'ou', 'target'], @@ -233,7 +315,21 @@ Permission._meta.natural_key = [ ] -class Role(RoleAbstractBase): +class Role(AbstractBase): + ou = models.ForeignKey( + to=rbac_utils.get_ou_model_name(), + verbose_name=_('organizational unit'), + swappable=True, + blank=True, + null=True, + on_delete=models.CASCADE, + ) + members = models.ManyToManyField( + to=settings.AUTH_USER_MODEL, swappable=True, blank=True, related_name='roles' + ) + permissions = models.ManyToManyField( + to=rbac_utils.get_permission_model_name(), related_name='roles', blank=True + ) name = models.TextField(verbose_name=_('name')) admin_scope_ct = models.ForeignKey( to='contenttypes.ContentType', @@ -262,6 +358,56 @@ class Role(RoleAbstractBase): default=True, verbose_name=_('Allow adding or deleting role members') ) + objects = rbac_managers.RoleQuerySet.as_manager() + + def add_child(self, child): + RoleParenting = rbac_utils.get_role_parenting_model() + RoleParenting.objects.soft_create(self, child) + + def remove_child(self, child): + RoleParenting = rbac_utils.get_role_parenting_model() + RoleParenting.objects.soft_delete(self, child) + + def add_parent(self, parent): + RoleParenting = rbac_utils.get_role_parenting_model() + RoleParenting.objects.soft_create(parent, self) + + def remove_parent(self, parent): + RoleParenting = rbac_utils.get_role_parenting_model() + RoleParenting.objects.soft_delete(parent, self) + + def parents(self, include_self=True, annotate=False, direct=None): + return self.__class__.objects.filter(pk=self.pk).parents( + include_self=include_self, annotate=annotate, direct=direct + ) + + def children(self, include_self=True, annotate=False, direct=None): + return self.__class__.objects.filter(pk=self.pk).children( + include_self=include_self, + annotate=annotate, + direct=direct, + ) + + def all_members(self): + User = get_user_model() + prefetch = Prefetch('roles', queryset=self.__class__.objects.filter(pk=self.pk), to_attr='direct') + + return ( + User.objects.filter( + Q(roles=self) + | Q(roles__parent_relation__parent=self) & Q(roles__parent_relation__deleted__isnull=True) + ) + .distinct() + .prefetch_related(prefetch) + ) + + def is_direct(self): + if hasattr(self, 'direct'): + if self.direct is None: + return True + return bool(self.direct) + return None + def get_admin_role(self, create=True): from . import utils @@ -503,10 +649,35 @@ Role._meta.natural_key = [ ] -class RoleParenting(RoleParentingAbstractBase): - class Meta(RoleParentingAbstractBase.Meta): +class RoleParenting(models.Model): + parent = models.ForeignKey( + to=rbac_utils.get_role_model_name(), + swappable=True, + related_name='child_relation', + on_delete=models.CASCADE, + ) + child = models.ForeignKey( + to=rbac_utils.get_role_model_name(), + swappable=True, + related_name='parent_relation', + on_delete=models.CASCADE, + ) + direct = models.BooleanField(default=True, blank=True) + created = models.DateTimeField(verbose_name=_('Creation date'), auto_now_add=True) + deleted = models.DateTimeField(verbose_name=_('Deletion date'), null=True) + + objects = rbac_managers.RoleParentingManager() + alive = QueryManager(deleted__isnull=True) + + def natural_key(self): + return [self.parent.natural_key(), self.child.natural_key(), self.direct] + + class Meta: verbose_name = _('role parenting relation') verbose_name_plural = _('role parenting relations') + unique_together = (('parent', 'child', 'direct'),) + # covering indexes + index_together = (('child', 'parent', 'direct'),) def __str__(self): return '{} {}> {}'.format(self.parent.name, '-' if self.direct else '~', self.child.name) diff --git a/src/django_rbac/migrations/0009_auto_20221004_1343.py b/src/django_rbac/migrations/0009_auto_20221004_1343.py new file mode 100644 index 000000000..222ffbb86 --- /dev/null +++ b/src/django_rbac/migrations/0009_auto_20221004_1343.py @@ -0,0 +1,53 @@ +# Generated by Django 2.2.26 on 2022-10-04 11:43 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('django_rbac', '0008_add_roleparenting_soft_delete'), + ] + + operations = [ + migrations.DeleteModel( + name='OrganizationalUnit', + ), + migrations.RemoveField( + model_name='role', + name='members', + ), + migrations.RemoveField( + model_name='role', + name='ou', + ), + migrations.RemoveField( + model_name='role', + name='permissions', + ), + migrations.AlterUniqueTogether( + name='roleparenting', + unique_together=None, + ), + migrations.AlterIndexTogether( + name='roleparenting', + index_together=None, + ), + migrations.RemoveField( + model_name='roleparenting', + name='child', + ), + migrations.RemoveField( + model_name='roleparenting', + name='parent', + ), + migrations.DeleteModel( + name='Permission', + ), + migrations.DeleteModel( + name='Role', + ), + migrations.DeleteModel( + name='RoleParenting', + ), + ] diff --git a/src/django_rbac/models.py b/src/django_rbac/models.py index 77eea32c3..76d968118 100644 --- a/src/django_rbac/models.py +++ b/src/django_rbac/models.py @@ -1,10 +1,7 @@ import functools -import hashlib import operator -from django.conf import settings from django.contrib import auth -from django.contrib.auth import get_user_model from django.contrib.auth.models import Group from django.contrib.auth.models import Permission as AuthPermission from django.contrib.auth.models import _user_has_module_perms, _user_has_perm @@ -18,86 +15,11 @@ except ImportError: return _user_get_permissions(user, obj, 'all') -from django.contrib.contenttypes.fields import GenericForeignKey -from django.contrib.contenttypes.models import ContentType from django.db import models -from django.db.models.query import Prefetch, Q -from django.utils.translation import gettext, pgettext_lazy +from django.utils.translation import pgettext_lazy from django.utils.translation import ugettext_lazy as _ -from model_utils.managers import QueryManager -from . import backends, constants, managers, utils - - -class AbstractBase(models.Model): - """Abstract base model for all models having a name and uuid and a - slug - """ - - uuid = models.CharField(max_length=32, verbose_name=_('uuid'), unique=True, default=utils.get_hex_uuid) - name = models.CharField(max_length=256, verbose_name=_('name')) - slug = models.SlugField(max_length=256, verbose_name=_('slug')) - description = models.TextField(verbose_name=_('description'), blank=True) - - objects = managers.AbstractBaseManager() - - def __str__(self): - return str(self.name) - - def __repr__(self): - return f'<{self.__class__.__name__} {repr(self.slug)} {repr(self.name)}>' - - def save(self, *args, **kwargs): - # truncate slug and add a hash if it's too long - if not self.slug: - self.slug = utils.generate_slug(self.name) - if len(self.slug) > 256: - self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4] - if not self.uuid: - self.uuid = utils.get_hex_uuid() - return super().save(*args, **kwargs) - - def natural_key(self): - return [self.uuid] - - class Meta: - abstract = True - - -class AbstractOrganizationalUnitScopedBase(models.Model): - '''Base abstract model class for model needing to be scoped by ou''' - - ou = models.ForeignKey( - to=utils.get_ou_model_name(), - verbose_name=_('organizational unit'), - swappable=True, - blank=True, - null=True, - on_delete=models.CASCADE, - ) - - class Meta: - abstract = True - - -class OrganizationalUnitAbstractBase(AbstractBase): - class Meta: - abstract = True - - def as_scope(self): - """When used as scope to find permissions. Can return a queryset - in a swapped model if for example your OU are hierarchical. - - Must return an OrganizationalUnit or a queryset. - """ - return self - - -class OrganizationalUnit(OrganizationalUnitAbstractBase): - class Meta(OrganizationalUnitAbstractBase.Meta): - verbose_name = _('organizational unit') - verbose_name_plural = _('organizational units') - swappable = constants.RBAC_OU_MODEL_SETTING +from . import backends, managers class Operation(models.Model): @@ -129,168 +51,6 @@ class Operation(models.Model): Operation._meta.natural_key = ['slug'] -class PermissionAbstractBase(models.Model): - operation = models.ForeignKey(to=Operation, verbose_name=_('operation'), on_delete=models.CASCADE) - ou = models.ForeignKey( - to=utils.get_ou_model_name(), - verbose_name=_('organizational unit'), - related_name='scoped_permission', - null=True, - on_delete=models.CASCADE, - ) - target_ct = models.ForeignKey(to='contenttypes.ContentType', related_name='+', on_delete=models.CASCADE) - target_id = models.PositiveIntegerField() - target = GenericForeignKey('target_ct', 'target_id') - - objects = managers.PermissionManager() - - def natural_key(self): - return [ - self.operation.slug, - self.ou and self.ou.natural_key(), - self.target and self.target_ct.natural_key(), - self.target and self.target.natural_key(), - ] - - def export_json(self): - return { - "operation": self.operation.natural_key_json(), - "ou": self.ou and self.ou.natural_key_json(), - 'target_ct': self.target_ct.natural_key_json(), - "target": self.target.natural_key_json(), - } - - def __str__(self): - ct = ContentType.objects.get_for_id(self.target_ct_id) - ct_ct = ContentType.objects.get_for_model(ContentType) - if ct == ct_ct: - target = ContentType.objects.get_for_id(self.target_id) - s = f'{self.operation} / {target}' - else: - s = f'{self.operation} / {ct.name} / {self.target}' - if self.ou: - s += gettext(' (scope "{0}")').format(self.ou) - return s - - class Meta: - abstract = True - # FIXME: it's still allow non-unique permission with ou=null - unique_together = (('operation', 'ou', 'target_ct', 'target_id'),) - - -class Permission(PermissionAbstractBase): - class Meta(PermissionAbstractBase.Meta): - swappable = constants.RBAC_PERMISSION_MODEL_SETTING - verbose_name = _('permission') - verbose_name_plural = _('permissions') - - -class RoleAbstractBase(AbstractOrganizationalUnitScopedBase, AbstractBase): - members = models.ManyToManyField( - to=settings.AUTH_USER_MODEL, swappable=True, blank=True, related_name='roles' - ) - permissions = models.ManyToManyField( - to=utils.get_permission_model_name(), related_name='roles', blank=True - ) - - objects = managers.RoleQuerySet.as_manager() - - def add_child(self, child): - RoleParenting = utils.get_role_parenting_model() - RoleParenting.objects.soft_create(self, child) - - def remove_child(self, child): - RoleParenting = utils.get_role_parenting_model() - RoleParenting.objects.soft_delete(self, child) - - def add_parent(self, parent): - RoleParenting = utils.get_role_parenting_model() - RoleParenting.objects.soft_create(parent, self) - - def remove_parent(self, parent): - RoleParenting = utils.get_role_parenting_model() - RoleParenting.objects.soft_delete(parent, self) - - def parents(self, include_self=True, annotate=False, direct=None): - return self.__class__.objects.filter(pk=self.pk).parents( - include_self=include_self, annotate=annotate, direct=direct - ) - - def children(self, include_self=True, annotate=False, direct=None): - return self.__class__.objects.filter(pk=self.pk).children( - include_self=include_self, - annotate=annotate, - direct=direct, - ) - - def all_members(self): - User = get_user_model() - prefetch = Prefetch('roles', queryset=self.__class__.objects.filter(pk=self.pk), to_attr='direct') - - return ( - User.objects.filter( - Q(roles=self) - | Q(roles__parent_relation__parent=self) & Q(roles__parent_relation__deleted__isnull=True) - ) - .distinct() - .prefetch_related(prefetch) - ) - - def is_direct(self): - if hasattr(self, 'direct'): - if self.direct is None: - return True - return bool(self.direct) - return None - - class Meta: - abstract = True - - -class Role(RoleAbstractBase): - class Meta(RoleAbstractBase.Meta): - verbose_name = _('role') - verbose_name_plural = _('roles') - swappable = constants.RBAC_ROLE_MODEL_SETTING - - -class RoleParentingAbstractBase(models.Model): - parent = models.ForeignKey( - to=utils.get_role_model_name(), - swappable=True, - related_name='child_relation', - on_delete=models.CASCADE, - ) - child = models.ForeignKey( - to=utils.get_role_model_name(), - swappable=True, - related_name='parent_relation', - on_delete=models.CASCADE, - ) - direct = models.BooleanField(default=True, blank=True) - created = models.DateTimeField(verbose_name=_('Creation date'), auto_now_add=True) - deleted = models.DateTimeField(verbose_name=_('Deletion date'), null=True) - - objects = managers.RoleParentingManager() - alive = QueryManager(deleted__isnull=True) - - def natural_key(self): - return [self.parent.natural_key(), self.child.natural_key(), self.direct] - - class Meta: - abstract = True - unique_together = (('parent', 'child', 'direct'),) - # covering indexes - index_together = (('child', 'parent', 'direct'),) - - -class RoleParenting(RoleParentingAbstractBase): - class Meta(RoleParentingAbstractBase.Meta): - verbose_name = _('role parenting relation') - verbose_name_plural = _('role parenting relations') - swappable = constants.RBAC_ROLE_PARENTING_MODEL_SETTING - - class PermissionMixin(models.Model): """ A mixin class that adds the fields and methods necessary to support -- 2.35.1