Project

General

Profile

0004-django_rbac-check-authentication-level-along-with-pe.patch

Valentin Deniaud, 04 April 2019 05:07 PM

Download (12.2 KB)

View differences:

Subject: [PATCH 04/13] django_rbac: check authentication level along with
 permissions

 src/django_rbac/backends.py   | 133 ++++++++++++++++++++++++++--------
 src/django_rbac/exceptions.py |   4 +
 2 files changed, 105 insertions(+), 32 deletions(-)
 create mode 100644 src/django_rbac/exceptions.py
src/django_rbac/backends.py
from django.db.models.query import Q
from django.utils import six
from .exceptions import InsufficientAuthLevel
try:
from django.core.exceptions import FieldDoesNotExist
except ImportError:
......
qs = qs.select_related('operation')
for permission in qs:
target_ct = ContentType.objects.get_for_id(permission.target_ct_id)
auth_level = permission.auth_level
if target_ct == ct_ct:
target = ContentType.objects.get_for_id(permission.target_id)
app_label = target.app_label
......
model = target_ct.model
key = '%s.%s' % (permission.target_ct_id, permission.target_id)
slug = permission.operation.slug
perms = [str('%s.%s_%s' % (app_label, slug, model))]
perms = {str('%s.%s_%s' % (app_label, slug, model)): auth_level}
perm_hierarchy = getattr(settings, 'DJANGO_RBAC_PERMISSIONS_HIERARCHY',
self._DEFAULT_DJANGO_RBAC_PERMISSIONS_HIERARCHY)
if slug in perm_hierarchy:
for other_perm in perm_hierarchy[slug]:
perms.append(str('%s.%s_%s' % (app_label, other_perm, model)))
permissions = perms_cache.setdefault(key, set())
perms[str('%s.%s_%s' % (app_label, other_perm, model))] = auth_level
permissions = perms_cache.setdefault(key, {})
permissions.update(perms)
# optimization for has_module_perms
perms_cache[app_label] = True
......
return ()
perms_cache = self.get_permission_cache(user_obj)
if obj:
permissions = set()
permissions = {}
ct = ContentType.objects.get_for_model(obj)
key = '%s.%s' % (ct.id, obj.pk)
if key in perms_cache:
permissions.update(perms_cache[key])
for permission in perms_cache.get('__all__', set([])):
perms_cache_all = perms_cache.get('__all__', set([]))
for permission in perms_cache_all:
if (permission.startswith('%s.' % ct.app_label)
and permission.endswith('_%s' % ct.model)):
permissions.add(permission)
permissions[permission] = perms_cache_all[permission]
if hasattr(obj, 'ou_id') and obj.ou_id:
key = 'ou.%s' % obj.ou_id
for permission in perms_cache.get(key, ()):
perms_cache_key = perms_cache.get(key, ())
for permission in perms_cache_key:
if (permission.startswith('%s.' % ct.app_label)
and permission.endswith('_%s' % ct.model)):
permissions.add(permission)
permissions[permission] = perms_cache_key[permission]
return permissions
else:
return perms_cache.get('__all__', [])
@staticmethod
def user_has_sufficient_auth_level(user_obj, perm_auth_level):
try:
user_auth_level = user_obj.auth_level
except AttributeError:
# If an app doesn't care about auth level, everything will still work
# in a transparent way. However, if it cares and forgot to annotate
# the User object, the check will fail as expected when permission level
# is too high.
user_auth_level = 1
if perm_auth_level <= user_auth_level:
return True
raise InsufficientAuthLevel(perm_auth_level)
def has_perm(self, user_obj, perm, obj=None):
if user_obj.is_anonymous():
return False
......
return False
if user_obj.is_superuser:
return True
return perm in self.get_all_permissions(user_obj, obj=obj)
all_perms = self.get_all_permissions(user_obj, obj=obj)
if perm in all_perms:
return self.user_has_sufficient_auth_level(user_obj, all_perms[perm])
return False
def has_perms(self, user_obj, perm_list, obj=None):
if user_obj.is_anonymous():
return False
if not user_obj.is_active:
return False
all_permissions = self.get_all_permissions(user_obj, obj=obj)
return all(perm in all_permissions for perm in perm_list)
for perm in perm_list:
if not self.has_perm(user_obj, perm, obj):
return False
return True
def has_module_perms(self, user_obj, package_name):
if user_obj.is_anonymous():
......
return False
if user_obj.is_superuser:
return True
return package_name in self.get_permission_cache(user_obj)
cache = self.get_permission_cache(user_obj)
if package_name in cache:
return self.user_has_sufficient_auth_level(user_obj, cache[package_name])
return False
def user_perms_any_auth_level(self, user_obj, user_perms, perms_levels, min_auth_level=False):
"""Return True if user has one of the perms listed in user_perms with a
sufficient auth level. Otherwise return the minimum auth level that it should
get for being granted access (False if no permissions were found at all).
"""
for perm in perms_levels:
if not perm in user_perms:
continue
try:
# If there is at least one common permission, return true provided
# auth level check passes
return self.user_has_sufficient_auth_level(user_obj, perms_levels[perm])
except InsufficientAuthLevel:
if not min_auth_level:
min_auth_level = perms_levels[perm]
else:
min_auth_level = min(perms_levels[perm], min_auth_level)
return min_auth_level
def has_perm_any(self, user_obj, perm_or_perms):
'''Return True if user has any perm on any object'''
......
return False
if user_obj.is_superuser:
return True
if isinstance(perm_or_perms, six.string_types):
perm_or_perms = [perm_or_perms]
perm_or_perms = set(perm_or_perms)
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
else set(perm_or_perms)
cache = self.get_permission_cache(user_obj)
if perm_or_perms & cache.get('__all__', set()):
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
if auth_level is True:
return True
for key, value in cache.items():
if isinstance(value, bool):
continue
elif key == '__all__':
continue
elif perm_or_perms & set(value):
return True
return False
else:
auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
min_auth_level=auth_level)
if auth_level is True:
return True
if auth_level is False:
return False
raise InsufficientAuthLevel(auth_level)
def filter_by_perm_query(self, user_obj, perm_or_perms, qs):
'''Create a filter for a queryset for the objects on which the user has
......
return False
if user_obj.is_superuser:
return True
if isinstance(perm_or_perms, six.string_types):
perm_or_perms = [perm_or_perms]
perm_or_perms = set(perm_or_perms)
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
else set(perm_or_perms)
cache = self.get_permission_cache(user_obj)
model = qs.model
OU = utils.get_ou_model()
has_ou_field = get_fk_model(model, 'ou') == OU
if perm_or_perms & cache.get('__all__', set()):
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
if auth_level is True:
return True
q = []
for key, value in cache.items():
......
continue
elif key == '__all__':
continue
elif key.startswith('ou.'):
if has_ou_field and perm_or_perms & value:
elif key.startswith('ou.') and has_ou_field:
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
min_auth_level=auth_level)
if new_auth_level is True:
q.append(Q(ou_id=int(key[3:])))
continue
elif perm_or_perms & value:
ct_id, fk = key.split('.')
q.append(Q(pk=int(fk)))
else:
auth_level = new_auth_level
else:
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
min_auth_level=auth_level)
if auth_level is True:
ct_id, fk = key.split('.')
q.append(Q(pk=int(fk)))
else:
auth_level = new_auth_level
# TODO cette methode est plus compliquee que les autres, il faut surement rajouter
# de la logique en plus pour savoir si on doit raise en fonction de ce qu'il y a
# dans q
if auth_level > 1:
raise InsufficientAuthLevel(auth_level)
if q:
return six.moves.reduce(Q.__or__, q)
return False
......
return True
if self.has_perm(user_obj, perm):
return True
return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
perms = self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
if perm in perms:
return self.user_has_sufficient_auth_level(user_obj, perms[perm])
return False
def ous_with_perm(self, user_obj, perm, queryset=None):
OU = utils.get_ou_model()
......
ou_ids = []
for key in cache:
if key == '__all__' and perm in cache[key]:
return qs
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
return qs
if key.startswith('ou.') and perm in cache[key]:
ou_ids.append(int(key.split('.')[1]))
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
ou_ids.append(int(key.split('.')[1]))
return qs.filter(id__in=ou_ids)
src/django_rbac/exceptions.py
class InsufficientAuthLevel(Exception):
def __init__(self, level):
self.required_level = level