0004-django_rbac-check-authentication-level-along-with-pe.patch
| src/django_rbac/backends.py | ||
|---|---|---|
|
from django.db.models.query import Q
|
||
|
from django.utils import six
|
||
|
from .exceptions import InsufficientAuthLevel
|
||
|
try:
|
||
|
from django.core.exceptions import FieldDoesNotExist
|
||
|
except ImportError:
|
||
| ... | ... | |
|
qs = qs.select_related('operation')
|
||
|
for permission in qs:
|
||
|
target_ct = ContentType.objects.get_for_id(permission.target_ct_id)
|
||
|
auth_level = permission.auth_level
|
||
|
if target_ct == ct_ct:
|
||
|
target = ContentType.objects.get_for_id(permission.target_id)
|
||
|
app_label = target.app_label
|
||
| ... | ... | |
|
model = target_ct.model
|
||
|
key = '%s.%s' % (permission.target_ct_id, permission.target_id)
|
||
|
slug = permission.operation.slug
|
||
|
perms = [str('%s.%s_%s' % (app_label, slug, model))]
|
||
|
perms = {str('%s.%s_%s' % (app_label, slug, model)): auth_level}
|
||
|
perm_hierarchy = getattr(settings, 'DJANGO_RBAC_PERMISSIONS_HIERARCHY',
|
||
|
self._DEFAULT_DJANGO_RBAC_PERMISSIONS_HIERARCHY)
|
||
|
if slug in perm_hierarchy:
|
||
|
for other_perm in perm_hierarchy[slug]:
|
||
|
perms.append(str('%s.%s_%s' % (app_label, other_perm, model)))
|
||
|
permissions = perms_cache.setdefault(key, set())
|
||
|
perms[str('%s.%s_%s' % (app_label, other_perm, model))] = auth_level
|
||
|
permissions = perms_cache.setdefault(key, {})
|
||
|
permissions.update(perms)
|
||
|
# optimization for has_module_perms
|
||
|
perms_cache[app_label] = True
|
||
| ... | ... | |
|
return ()
|
||
|
perms_cache = self.get_permission_cache(user_obj)
|
||
|
if obj:
|
||
|
permissions = set()
|
||
|
permissions = {}
|
||
|
ct = ContentType.objects.get_for_model(obj)
|
||
|
key = '%s.%s' % (ct.id, obj.pk)
|
||
|
if key in perms_cache:
|
||
|
permissions.update(perms_cache[key])
|
||
|
for permission in perms_cache.get('__all__', set([])):
|
||
|
perms_cache_all = perms_cache.get('__all__', set([]))
|
||
|
for permission in perms_cache_all:
|
||
|
if (permission.startswith('%s.' % ct.app_label)
|
||
|
and permission.endswith('_%s' % ct.model)):
|
||
|
permissions.add(permission)
|
||
|
permissions[permission] = perms_cache_all[permission]
|
||
|
if hasattr(obj, 'ou_id') and obj.ou_id:
|
||
|
key = 'ou.%s' % obj.ou_id
|
||
|
for permission in perms_cache.get(key, ()):
|
||
|
perms_cache_key = perms_cache.get(key, ())
|
||
|
for permission in perms_cache_key:
|
||
|
if (permission.startswith('%s.' % ct.app_label)
|
||
|
and permission.endswith('_%s' % ct.model)):
|
||
|
permissions.add(permission)
|
||
|
permissions[permission] = perms_cache_key[permission]
|
||
|
return permissions
|
||
|
else:
|
||
|
return perms_cache.get('__all__', [])
|
||
|
@staticmethod
def user_has_sufficient_auth_level(user_obj, perm_auth_level):
    """Check that the user's authentication level covers a permission.

    :param user_obj: user object; its optional ``auth_level`` attribute is
        the level the user is currently authenticated at.
    :param perm_auth_level: authentication level required by the permission.
    :returns: ``True`` when ``perm_auth_level`` is lower than or equal to the
        user's level.
    :raises InsufficientAuthLevel: when the permission requires a higher
        level than the user's; the exception carries ``perm_auth_level``.
    """
    # If an app doesn't care about auth level, everything will still work
    # in a transparent way: a user object without an ``auth_level``
    # attribute is treated as level 1. However, if the app cares and forgot
    # to annotate the user object, the check fails as expected whenever the
    # permission level is higher than 1.
    user_auth_level = getattr(user_obj, 'auth_level', 1)
    if perm_auth_level <= user_auth_level:
        return True
    raise InsufficientAuthLevel(perm_auth_level)
|
||
|
def has_perm(self, user_obj, perm, obj=None):
|
||
|
if user_obj.is_anonymous():
|
||
|
return False
|
||
| ... | ... | |
|
return False
|
||
|
if user_obj.is_superuser:
|
||
|
return True
|
||
|
return perm in self.get_all_permissions(user_obj, obj=obj)
|
||
|
all_perms = self.get_all_permissions(user_obj, obj=obj)
|
||
|
if perm in all_perms:
|
||
|
return self.user_has_sufficient_auth_level(user_obj, all_perms[perm])
|
||
|
return False
|
||
|
def has_perms(self, user_obj, perm_list, obj=None):
    """Return True only if the user holds every permission in ``perm_list``.

    Anonymous and inactive users are always refused. Each permission is
    delegated to ``has_perm`` so that a single code path decides whether a
    permission is granted; anything ``has_perm`` raises propagates to the
    caller.

    :param user_obj: the user whose permissions are checked.
    :param perm_list: iterable of permission names.
    :param obj: optional object the permissions apply to.
    :returns: ``True`` if all permissions are granted (also for an empty
        ``perm_list``), ``False`` otherwise.
    """
    if user_obj.is_anonymous():
        return False
    if not user_obj.is_active:
        return False
    # all() short-circuits on the first refused permission, like the
    # explicit loop it replaces.
    return all(self.has_perm(user_obj, perm, obj) for perm in perm_list)
|
||
|
def has_module_perms(self, user_obj, package_name):
|
||
|
if user_obj.is_anonymous():
|
||
| ... | ... | |
|
return False
|
||
|
if user_obj.is_superuser:
|
||
|
return True
|
||
|
return package_name in self.get_permission_cache(user_obj)
|
||
|
cache = self.get_permission_cache(user_obj)
|
||
|
if package_name in cache:
|
||
|
return self.user_has_sufficient_auth_level(user_obj, cache[package_name])
|
||
|
return False
|
||
|
def user_perms_any_auth_level(self, user_obj, user_perms, perms_levels, min_auth_level=False):
    """Return True if user has one of the perms listed in user_perms with a
    sufficient auth level. Otherwise return the minimum auth level that it should
    get for being granted access (False if no permissions were found at all).

    :param user_obj: the user whose authentication level is checked.
    :param user_perms: container of the permission names being looked for.
    :param perms_levels: mapping of permission name to the authentication
        level required for it.
    :param min_auth_level: lowest insufficient level found so far (``False``
        when none); lets callers accumulate the minimum over several calls.
    :returns: ``True``, or the updated ``min_auth_level``.
    """
    for perm, required_level in perms_levels.items():
        if perm not in user_perms:
            continue
        try:
            # If there is at least one common permission, return true provided
            # auth level check passes
            return self.user_has_sufficient_auth_level(user_obj, required_level)
        except InsufficientAuthLevel:
            # Remember the lowest level that would have granted access.
            if not min_auth_level:
                min_auth_level = required_level
            else:
                min_auth_level = min(required_level, min_auth_level)
    return min_auth_level
|
||
|
def has_perm_any(self, user_obj, perm_or_perms):
|
||
|
'''Return True if user has any perm on any object'''
|
||
| ... | ... | |
|
return False
|
||
|
if user_obj.is_superuser:
|
||
|
return True
|
||
|
if isinstance(perm_or_perms, six.string_types):
|
||
|
perm_or_perms = [perm_or_perms]
|
||
|
perm_or_perms = set(perm_or_perms)
|
||
|
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
|
||
|
else set(perm_or_perms)
|
||
|
cache = self.get_permission_cache(user_obj)
|
||
|
if perm_or_perms & cache.get('__all__', set()):
|
||
|
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
|
||
|
if auth_level is True:
|
||
|
return True
|
||
|
for key, value in cache.items():
|
||
|
if isinstance(value, bool):
|
||
|
continue
|
||
|
elif key == '__all__':
|
||
|
continue
|
||
|
elif perm_or_perms & set(value):
|
||
|
return True
|
||
|
return False
|
||
|
else:
|
||
|
auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
||
|
min_auth_level=auth_level)
|
||
|
if auth_level is True:
|
||
|
return True
|
||
|
if auth_level is False:
|
||
|
return False
|
||
|
raise InsufficientAuthLevel(auth_level)
|
||
|
def filter_by_perm_query(self, user_obj, perm_or_perms, qs):
|
||
|
'''Create a filter for a queryset for the objects on which the user has
|
||
| ... | ... | |
|
return False
|
||
|
if user_obj.is_superuser:
|
||
|
return True
|
||
|
if isinstance(perm_or_perms, six.string_types):
|
||
|
perm_or_perms = [perm_or_perms]
|
||
|
perm_or_perms = set(perm_or_perms)
|
||
|
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
|
||
|
else set(perm_or_perms)
|
||
|
cache = self.get_permission_cache(user_obj)
|
||
|
model = qs.model
|
||
|
OU = utils.get_ou_model()
|
||
|
has_ou_field = get_fk_model(model, 'ou') == OU
|
||
|
if perm_or_perms & cache.get('__all__', set()):
|
||
|
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
|
||
|
if auth_level is True:
|
||
|
return True
|
||
|
q = []
|
||
|
for key, value in cache.items():
|
||
| ... | ... | |
|
continue
|
||
|
elif key == '__all__':
|
||
|
continue
|
||
|
elif key.startswith('ou.'):
|
||
|
if has_ou_field and perm_or_perms & value:
|
||
|
elif key.startswith('ou.') and has_ou_field:
|
||
|
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
||
|
min_auth_level=auth_level)
|
||
|
if new_auth_level is True:
|
||
|
q.append(Q(ou_id=int(key[3:])))
|
||
|
continue
|
||
|
elif perm_or_perms & value:
|
||
|
ct_id, fk = key.split('.')
|
||
|
q.append(Q(pk=int(fk)))
|
||
|
else:
|
||
|
auth_level = new_auth_level
|
||
|
else:
|
||
|
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
||
|
min_auth_level=auth_level)
|
||
|
if auth_level is True:
|
||
|
ct_id, fk = key.split('.')
|
||
|
q.append(Q(pk=int(fk)))
|
||
|
else:
|
||
|
auth_level = new_auth_level
|
||
|
# TODO: this method is more complicated than the others; extra logic is probably needed
|
||
|
# to decide whether to raise, depending on what ends up
|
||
|
# in q
|
||
|
if auth_level > 1:
|
||
|
raise InsufficientAuthLevel(auth_level)
|
||
|
if q:
|
||
|
return six.moves.reduce(Q.__or__, q)
|
||
|
return False
|
||
| ... | ... | |
|
return True
|
||
|
if self.has_perm(user_obj, perm):
|
||
|
return True
|
||
|
return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
|
||
|
perms = self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
|
||
|
if perm in perms:
|
||
|
return self.user_has_sufficient_auth_level(user_obj, perms[perm])
|
||
|
return False
|
||
|
def ous_with_perm(self, user_obj, perm, queryset=None):
|
||
|
OU = utils.get_ou_model()
|
||
| ... | ... | |
|
ou_ids = []
|
||
|
for key in cache:
|
||
|
if key == '__all__' and perm in cache[key]:
|
||
|
return qs
|
||
|
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
|
||
|
return qs
|
||
|
if key.startswith('ou.') and perm in cache[key]:
|
||
|
ou_ids.append(int(key.split('.')[1]))
|
||
|
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
|
||
|
ou_ids.append(int(key.split('.')[1]))
|
||
|
return qs.filter(id__in=ou_ids)
|
||
| src/django_rbac/exceptions.py | ||
|---|---|---|
|
class InsufficientAuthLevel(Exception):
    """Raised when a permission requires a higher authentication level.

    The required level is available as ``required_level`` and as the single
    element of ``args``.
    """

    def __init__(self, level):
        # Initialise the base class explicitly so ``args`` and ``str()``
        # carry the level.
        super(InsufficientAuthLevel, self).__init__(level)
        self.required_level = level
|
||