5 |
5 |
from django.db.models.query import Q
|
6 |
6 |
from django.utils import six
|
7 |
7 |
|
|
8 |
from .exceptions import InsufficientAuthLevel
|
|
9 |
|
8 |
10 |
try:
|
9 |
11 |
from django.core.exceptions import FieldDoesNotExist
|
10 |
12 |
except ImportError:
|
... | ... | |
59 |
61 |
qs = qs.select_related('operation')
|
60 |
62 |
for permission in qs:
|
61 |
63 |
target_ct = ContentType.objects.get_for_id(permission.target_ct_id)
|
|
64 |
auth_level = permission.auth_level
|
62 |
65 |
if target_ct == ct_ct:
|
63 |
66 |
target = ContentType.objects.get_for_id(permission.target_id)
|
64 |
67 |
app_label = target.app_label
|
... | ... | |
72 |
75 |
model = target_ct.model
|
73 |
76 |
key = '%s.%s' % (permission.target_ct_id, permission.target_id)
|
74 |
77 |
slug = permission.operation.slug
|
75 |
|
perms = [str('%s.%s_%s' % (app_label, slug, model))]
|
|
78 |
perms = {str('%s.%s_%s' % (app_label, slug, model)): auth_level}
|
76 |
79 |
perm_hierarchy = getattr(settings, 'DJANGO_RBAC_PERMISSIONS_HIERARCHY',
|
77 |
80 |
self._DEFAULT_DJANGO_RBAC_PERMISSIONS_HIERARCHY)
|
78 |
81 |
if slug in perm_hierarchy:
|
79 |
82 |
for other_perm in perm_hierarchy[slug]:
|
80 |
|
perms.append(str('%s.%s_%s' % (app_label, other_perm, model)))
|
81 |
|
permissions = perms_cache.setdefault(key, set())
|
|
83 |
perms[str('%s.%s_%s' % (app_label, other_perm, model))] = auth_level
|
|
84 |
permissions = perms_cache.setdefault(key, {})
|
82 |
85 |
permissions.update(perms)
|
83 |
86 |
# optimization for has_module_perms
|
84 |
87 |
perms_cache[app_label] = True
|
... | ... | |
90 |
93 |
return ()
|
91 |
94 |
perms_cache = self.get_permission_cache(user_obj)
|
92 |
95 |
if obj:
|
93 |
|
permissions = set()
|
|
96 |
permissions = {}
|
94 |
97 |
ct = ContentType.objects.get_for_model(obj)
|
95 |
98 |
key = '%s.%s' % (ct.id, obj.pk)
|
96 |
99 |
if key in perms_cache:
|
97 |
100 |
permissions.update(perms_cache[key])
|
98 |
|
for permission in perms_cache.get('__all__', set([])):
|
|
101 |
perms_cache_all = perms_cache.get('__all__', set([]))
|
|
102 |
for permission in perms_cache_all:
|
99 |
103 |
if (permission.startswith('%s.' % ct.app_label)
|
100 |
104 |
and permission.endswith('_%s' % ct.model)):
|
101 |
|
permissions.add(permission)
|
|
105 |
permissions[permission] = perms_cache_all[permission]
|
102 |
106 |
if hasattr(obj, 'ou_id') and obj.ou_id:
|
103 |
107 |
key = 'ou.%s' % obj.ou_id
|
104 |
|
for permission in perms_cache.get(key, ()):
|
|
108 |
perms_cache_key = perms_cache.get(key, ())
|
|
109 |
for permission in perms_cache_key:
|
105 |
110 |
if (permission.startswith('%s.' % ct.app_label)
|
106 |
111 |
and permission.endswith('_%s' % ct.model)):
|
107 |
|
permissions.add(permission)
|
|
112 |
permissions[permission] = perms_cache_key[permission]
|
108 |
113 |
return permissions
|
109 |
114 |
else:
|
110 |
115 |
return perms_cache.get('__all__', [])
|
111 |
116 |
|
|
117 |
@staticmethod
|
|
118 |
def user_has_sufficient_auth_level(user_obj, perm_auth_level):
|
|
119 |
try:
|
|
120 |
user_auth_level = user_obj.auth_level
|
|
121 |
except AttributeError:
|
|
122 |
# If an app doesn't care about auth level, everything will still work
|
|
123 |
# in a transparent way. However, if it cares and forgot to annotate
|
|
124 |
# the User object, the check will fail as expected when permission level
|
|
125 |
# is too high.
|
|
126 |
user_auth_level = 1
|
|
127 |
if perm_auth_level <= user_auth_level:
|
|
128 |
return True
|
|
129 |
raise InsufficientAuthLevel(perm_auth_level)
|
|
130 |
|
112 |
131 |
def has_perm(self, user_obj, perm, obj=None):
|
113 |
132 |
if user_obj.is_anonymous():
|
114 |
133 |
return False
|
... | ... | |
116 |
135 |
return False
|
117 |
136 |
if user_obj.is_superuser:
|
118 |
137 |
return True
|
119 |
|
return perm in self.get_all_permissions(user_obj, obj=obj)
|
|
138 |
all_perms = self.get_all_permissions(user_obj, obj=obj)
|
|
139 |
if perm in all_perms:
|
|
140 |
return self.user_has_sufficient_auth_level(user_obj, all_perms[perm])
|
|
141 |
return False
|
120 |
142 |
|
121 |
143 |
def has_perms(self, user_obj, perm_list, obj=None):
    """Return True if the user holds every permission in ``perm_list``.

    Each permission is delegated to ``self.has_perm`` so that the per
    permission auth-level check is applied; a plain membership test on the
    permission mapping would ignore the required auth level.

    Args:
        user_obj: the user being checked.
        perm_list: iterable of permission names.
        obj: optional object the permissions apply to.

    Returns:
        False for anonymous or inactive users, or as soon as one permission
        is refused; True otherwise (including for an empty ``perm_list``).
        Any exception raised by ``self.has_perm`` propagates to the caller.
    """
    if user_obj.is_anonymous():
        return False
    if not user_obj.is_active:
        return False
    # all() stops at the first refused permission, like the explicit loop.
    return all(self.has_perm(user_obj, perm, obj) for perm in perm_list)
|
128 |
152 |
|
129 |
153 |
def has_module_perms(self, user_obj, package_name):
|
130 |
154 |
if user_obj.is_anonymous():
|
... | ... | |
133 |
157 |
return False
|
134 |
158 |
if user_obj.is_superuser:
|
135 |
159 |
return True
|
136 |
|
return package_name in self.get_permission_cache(user_obj)
|
|
160 |
cache = self.get_permission_cache(user_obj)
|
|
161 |
if package_name in cache:
|
|
162 |
return self.user_has_sufficient_auth_level(user_obj, cache[package_name])
|
|
163 |
return False
|
|
164 |
|
|
165 |
def user_perms_any_auth_level(self, user_obj, user_perms, perms_levels, min_auth_level=False):
    """Look for a wanted permission the user holds at a sufficient auth level.

    Args:
        user_obj: the user being checked.
        user_perms: container of the permission names being looked for.
        perms_levels: mapping of permission name to required auth level; an
            empty container is accepted and simply yields no match.
        min_auth_level: lowest insufficient auth level found by previous
            calls, or False when there is none yet.

    Returns:
        True as soon as one permission of ``perms_levels`` that is also in
        ``user_perms`` passes the auth-level check. Otherwise the minimum
        auth level the user should reach to be granted access, i.e.
        ``min_auth_level`` lowered by the levels refused here (so it is
        returned unchanged, False by default, when nothing matched).
    """
    for perm in perms_levels:
        if perm not in user_perms:
            continue
        required_level = perms_levels[perm]
        try:
            # If there is at least one common permission, return true provided
            # auth level check passes
            return self.user_has_sufficient_auth_level(user_obj, required_level)
        except InsufficientAuthLevel:
            # Remember the cheapest level that would have granted access.
            if not min_auth_level:
                min_auth_level = required_level
            else:
                min_auth_level = min(required_level, min_auth_level)
    return min_auth_level
|
137 |
183 |
|
138 |
184 |
def has_perm_any(self, user_obj, perm_or_perms):
|
139 |
185 |
'''Return True if user has any perm on any object'''
|
... | ... | |
143 |
189 |
return False
|
144 |
190 |
if user_obj.is_superuser:
|
145 |
191 |
return True
|
146 |
|
if isinstance(perm_or_perms, six.string_types):
|
147 |
|
perm_or_perms = [perm_or_perms]
|
148 |
|
perm_or_perms = set(perm_or_perms)
|
|
192 |
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
|
|
193 |
else set(perm_or_perms)
|
149 |
194 |
cache = self.get_permission_cache(user_obj)
|
150 |
|
if perm_or_perms & cache.get('__all__', set()):
|
|
195 |
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
|
|
196 |
if auth_level is True:
|
151 |
197 |
return True
|
152 |
198 |
for key, value in cache.items():
|
153 |
199 |
if isinstance(value, bool):
|
154 |
200 |
continue
|
155 |
201 |
elif key == '__all__':
|
156 |
202 |
continue
|
157 |
|
elif perm_or_perms & set(value):
|
158 |
|
return True
|
159 |
|
return False
|
|
203 |
else:
|
|
204 |
auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
|
205 |
min_auth_level=auth_level)
|
|
206 |
if auth_level is True:
|
|
207 |
return True
|
|
208 |
if auth_level is False:
|
|
209 |
return False
|
|
210 |
raise InsufficientAuthLevel(auth_level)
|
160 |
211 |
|
161 |
212 |
def filter_by_perm_query(self, user_obj, perm_or_perms, qs):
|
162 |
213 |
'''Create a filter for a queryset for the objects on which the user has
|
... | ... | |
169 |
220 |
return False
|
170 |
221 |
if user_obj.is_superuser:
|
171 |
222 |
return True
|
172 |
|
if isinstance(perm_or_perms, six.string_types):
|
173 |
|
perm_or_perms = [perm_or_perms]
|
174 |
|
perm_or_perms = set(perm_or_perms)
|
|
223 |
perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
|
|
224 |
else set(perm_or_perms)
|
175 |
225 |
cache = self.get_permission_cache(user_obj)
|
176 |
226 |
model = qs.model
|
177 |
227 |
OU = utils.get_ou_model()
|
178 |
228 |
has_ou_field = get_fk_model(model, 'ou') == OU
|
179 |
|
if perm_or_perms & cache.get('__all__', set()):
|
|
229 |
auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
|
|
230 |
if auth_level is True:
|
180 |
231 |
return True
|
181 |
232 |
q = []
|
182 |
233 |
for key, value in cache.items():
|
... | ... | |
184 |
235 |
continue
|
185 |
236 |
elif key == '__all__':
|
186 |
237 |
continue
|
187 |
|
elif key.startswith('ou.'):
|
188 |
|
if has_ou_field and perm_or_perms & value:
|
|
238 |
elif key.startswith('ou.') and has_ou_field:
|
|
239 |
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
|
240 |
min_auth_level=auth_level)
|
|
241 |
if new_auth_level is True:
|
189 |
242 |
q.append(Q(ou_id=int(key[3:])))
|
190 |
|
continue
|
191 |
|
elif perm_or_perms & value:
|
192 |
|
ct_id, fk = key.split('.')
|
193 |
|
q.append(Q(pk=int(fk)))
|
|
243 |
else:
|
|
244 |
auth_level = new_auth_level
|
|
245 |
else:
|
|
246 |
new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
|
|
247 |
min_auth_level=auth_level)
|
|
248 |
if auth_level is True:
|
|
249 |
ct_id, fk = key.split('.')
|
|
250 |
q.append(Q(pk=int(fk)))
|
|
251 |
else:
|
|
252 |
auth_level = new_auth_level
|
|
253 |
# TODO this method is more complicated than the others; extra logic surely needs to be added
|
|
254 |
# to decide whether we must raise depending on what is
|
|
255 |
# in q
|
|
256 |
if auth_level > 1:
|
|
257 |
raise InsufficientAuthLevel(auth_level)
|
194 |
258 |
if q:
|
195 |
259 |
return six.moves.reduce(Q.__or__, q)
|
196 |
260 |
return False
|
... | ... | |
217 |
281 |
return True
|
218 |
282 |
if self.has_perm(user_obj, perm):
|
219 |
283 |
return True
|
220 |
|
return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
|
|
284 |
perms = self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
|
|
285 |
if perm in perms:
|
|
286 |
return self.user_has_sufficient_auth_level(user_obj, perms[perm])
|
|
287 |
return False
|
221 |
288 |
|
222 |
289 |
def ous_with_perm(self, user_obj, perm, queryset=None):
|
223 |
290 |
OU = utils.get_ou_model()
|
... | ... | |
233 |
300 |
ou_ids = []
|
234 |
301 |
for key in cache:
|
235 |
302 |
if key == '__all__' and perm in cache[key]:
|
236 |
|
return qs
|
|
303 |
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
|
|
304 |
return qs
|
237 |
305 |
if key.startswith('ou.') and perm in cache[key]:
|
238 |
|
ou_ids.append(int(key.split('.')[1]))
|
|
306 |
if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
|
|
307 |
ou_ids.append(int(key.split('.')[1]))
|
239 |
308 |
return qs.filter(id__in=ou_ids)
|