From c2bfa0d6f7a191fbaefea8108d7f703d87bc1902 Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Thu, 4 Apr 2019 12:05:37 +0200 Subject: [PATCH 04/13] django_rbac: check authentication level along with permissions --- src/django_rbac/backends.py | 133 ++++++++++++++++++++++++++-------- src/django_rbac/exceptions.py | 4 + 2 files changed, 105 insertions(+), 32 deletions(-) create mode 100644 src/django_rbac/exceptions.py diff --git a/src/django_rbac/backends.py b/src/django_rbac/backends.py index eb792f72..5243d52a 100644 --- a/src/django_rbac/backends.py +++ b/src/django_rbac/backends.py @@ -5,6 +5,8 @@ from django.contrib.contenttypes.models import ContentType from django.db.models.query import Q from django.utils import six +from .exceptions import InsufficientAuthLevel + try: from django.core.exceptions import FieldDoesNotExist except ImportError: @@ -59,6 +61,7 @@ class DjangoRBACBackend(object): qs = qs.select_related('operation') for permission in qs: target_ct = ContentType.objects.get_for_id(permission.target_ct_id) + auth_level = permission.auth_level if target_ct == ct_ct: target = ContentType.objects.get_for_id(permission.target_id) app_label = target.app_label @@ -72,13 +75,13 @@ class DjangoRBACBackend(object): model = target_ct.model key = '%s.%s' % (permission.target_ct_id, permission.target_id) slug = permission.operation.slug - perms = [str('%s.%s_%s' % (app_label, slug, model))] + perms = {str('%s.%s_%s' % (app_label, slug, model)): auth_level} perm_hierarchy = getattr(settings, 'DJANGO_RBAC_PERMISSIONS_HIERARCHY', self._DEFAULT_DJANGO_RBAC_PERMISSIONS_HIERARCHY) if slug in perm_hierarchy: for other_perm in perm_hierarchy[slug]: - perms.append(str('%s.%s_%s' % (app_label, other_perm, model))) - permissions = perms_cache.setdefault(key, set()) + perms[str('%s.%s_%s' % (app_label, other_perm, model))] = auth_level + permissions = perms_cache.setdefault(key, {}) permissions.update(perms) # optimization for has_module_perms perms_cache[app_label] = True @@ -90,25 +93,41 @@ class DjangoRBACBackend(object): return () perms_cache = self.get_permission_cache(user_obj) if obj: - permissions = set() + permissions = {} ct = ContentType.objects.get_for_model(obj) key = '%s.%s' % (ct.id, obj.pk) if key in perms_cache: permissions.update(perms_cache[key]) - for permission in perms_cache.get('__all__', set([])): + perms_cache_all = perms_cache.get('__all__', set([])) + for permission in perms_cache_all: if (permission.startswith('%s.' % ct.app_label) and permission.endswith('_%s' % ct.model)): - permissions.add(permission) + permissions[permission] = perms_cache_all[permission] if hasattr(obj, 'ou_id') and obj.ou_id: key = 'ou.%s' % obj.ou_id - for permission in perms_cache.get(key, ()): + perms_cache_key = perms_cache.get(key, ()) + for permission in perms_cache_key: if (permission.startswith('%s.' % ct.app_label) and permission.endswith('_%s' % ct.model)): - permissions.add(permission) + permissions[permission] = perms_cache_key[permission] return permissions else: return perms_cache.get('__all__', []) + @staticmethod + def user_has_sufficient_auth_level(user_obj, perm_auth_level): + try: + user_auth_level = user_obj.auth_level + except AttributeError: + # If an app doesn't care about auth level, everything will still work + # in a transparent way. However, if it cares and forgot to annotate + # the User object, the check will fail as expected when permission level + # is too high. + user_auth_level = 1 + if perm_auth_level <= user_auth_level: + return True + raise InsufficientAuthLevel(perm_auth_level) + def has_perm(self, user_obj, perm, obj=None): if user_obj.is_anonymous(): return False @@ -116,15 +135,20 @@ class DjangoRBACBackend(object): return False if user_obj.is_superuser: return True - return perm in self.get_all_permissions(user_obj, obj=obj) + all_perms = self.get_all_permissions(user_obj, obj=obj) + if perm in all_perms: + return self.user_has_sufficient_auth_level(user_obj, all_perms[perm]) + return False def has_perms(self, user_obj, perm_list, obj=None): if user_obj.is_anonymous(): return False if not user_obj.is_active: return False - all_permissions = self.get_all_permissions(user_obj, obj=obj) - return all(perm in all_permissions for perm in perm_list) + for perm in perm_list: + if not self.has_perm(user_obj, perm, obj): + return False + return True def has_module_perms(self, user_obj, package_name): if user_obj.is_anonymous(): @@ -133,7 +157,29 @@ class DjangoRBACBackend(object): return False if user_obj.is_superuser: return True - return package_name in self.get_permission_cache(user_obj) + cache = self.get_permission_cache(user_obj) + if package_name in cache: + return self.user_has_sufficient_auth_level(user_obj, cache[package_name]) + return False + + def user_perms_any_auth_level(self, user_obj, user_perms, perms_levels, min_auth_level=False): + """Return True if user has one of the perms listed in user_perms with a + sufficient auth level. Otherwise return the minimum auth level that it should + get for being granted access (False if no permissions were found at all). + """ + for perm in perms_levels: + if not perm in user_perms: + continue + try: + # If there is at least one common permission, return true provided + # auth level check passes + return self.user_has_sufficient_auth_level(user_obj, perms_levels[perm]) + except InsufficientAuthLevel: + if not min_auth_level: + min_auth_level = perms_levels[perm] + else: + min_auth_level = min(perms_levels[perm], min_auth_level) + return min_auth_level def has_perm_any(self, user_obj, perm_or_perms): '''Return True if user has any perm on any object''' @@ -143,20 +189,25 @@ class DjangoRBACBackend(object): return False if user_obj.is_superuser: return True - if isinstance(perm_or_perms, six.string_types): - perm_or_perms = [perm_or_perms] - perm_or_perms = set(perm_or_perms) + perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \ + else set(perm_or_perms) cache = self.get_permission_cache(user_obj) - if perm_or_perms & cache.get('__all__', set()): + auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {})) + if auth_level is True: return True for key, value in cache.items(): if isinstance(value, bool): continue elif key == '__all__': continue - elif perm_or_perms & set(value): - return True - return False + else: + auth_level = self.user_perms_any_auth_level(user_obj, perms, value, + min_auth_level=auth_level) + if auth_level is True: + return True + if auth_level is False: + return False + raise InsufficientAuthLevel(auth_level) def filter_by_perm_query(self, user_obj, perm_or_perms, qs): '''Create a filter for a queryset for the objects on which the user has @@ -169,14 +220,14 @@ class DjangoRBACBackend(object): return False if user_obj.is_superuser: return True - if isinstance(perm_or_perms, six.string_types): - perm_or_perms = [perm_or_perms] - perm_or_perms = set(perm_or_perms) + perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \ + else set(perm_or_perms) cache = self.get_permission_cache(user_obj) model = qs.model OU = utils.get_ou_model() has_ou_field = get_fk_model(model, 'ou') == OU - if perm_or_perms & cache.get('__all__', set()): + auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {})) + if auth_level is True: return True q = [] for key, value in cache.items(): @@ -184,13 +235,26 @@ class DjangoRBACBackend(object): continue elif key == '__all__': continue - elif key.startswith('ou.'): - if has_ou_field and perm_or_perms & value: + elif key.startswith('ou.') and has_ou_field: + new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value, + min_auth_level=auth_level) + if new_auth_level is True: q.append(Q(ou_id=int(key[3:]))) - continue - elif perm_or_perms & value: - ct_id, fk = key.split('.') - q.append(Q(pk=int(fk))) + else: + auth_level = new_auth_level + else: + new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value, + min_auth_level=auth_level) + if auth_level is True: + ct_id, fk = key.split('.') + q.append(Q(pk=int(fk))) + else: + auth_level = new_auth_level + # TODO cette methode est plus compliquee que les autres, il faut surement rajouter + # de la logique en plus pour savoir si on doit raise en fonction de ce qu'il y a + # dans q + if auth_level > 1: + raise InsufficientAuthLevel(auth_level) if q: return six.moves.reduce(Q.__or__, q) return False @@ -217,7 +281,10 @@ class DjangoRBACBackend(object): return True if self.has_perm(user_obj, perm): return True - return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ()) + perms = self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ()) + if perm in perms: + return self.user_has_sufficient_auth_level(user_obj, perms[perm]) + return False def ous_with_perm(self, user_obj, perm, queryset=None): OU = utils.get_ou_model() @@ -233,7 +300,9 @@ class DjangoRBACBackend(object): ou_ids = [] for key in cache: if key == '__all__' and perm in cache[key]: - return qs + if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]): + return qs if key.startswith('ou.') and perm in cache[key]: - ou_ids.append(int(key.split('.')[1])) + if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]): + ou_ids.append(int(key.split('.')[1])) return qs.filter(id__in=ou_ids) diff --git a/src/django_rbac/exceptions.py b/src/django_rbac/exceptions.py new file mode 100644 index 00000000..b9750a20 --- /dev/null +++ b/src/django_rbac/exceptions.py @@ -0,0 +1,4 @@ +class InsufficientAuthLevel(Exception): + + def __init__(self, level): + self.required_level = level -- 2.20.1