Projet

Général

Profil

0004-django_rbac-check-authentication-level-along-with-pe.patch

Valentin Deniaud, 04 avril 2019 17:07

Télécharger (12,2 ko)

Voir les différences:

Subject: [PATCH 04/13] django_rbac: check authentication level along with
 permissions

 src/django_rbac/backends.py   | 133 ++++++++++++++++++++++++++--------
 src/django_rbac/exceptions.py |   4 +
 2 files changed, 105 insertions(+), 32 deletions(-)
 create mode 100644 src/django_rbac/exceptions.py
src/django_rbac/backends.py
5 5
from django.db.models.query import Q
6 6
from django.utils import six
7 7

  
8
from .exceptions import InsufficientAuthLevel
9

  
8 10
try:
9 11
    from django.core.exceptions import FieldDoesNotExist
10 12
except ImportError:
......
59 61
            qs = qs.select_related('operation')
60 62
            for permission in qs:
61 63
                target_ct = ContentType.objects.get_for_id(permission.target_ct_id)
64
                auth_level = permission.auth_level
62 65
                if target_ct == ct_ct:
63 66
                    target = ContentType.objects.get_for_id(permission.target_id)
64 67
                    app_label = target.app_label
......
72 75
                    model = target_ct.model
73 76
                    key = '%s.%s' % (permission.target_ct_id, permission.target_id)
74 77
                slug = permission.operation.slug
75
                perms = [str('%s.%s_%s' % (app_label, slug, model))]
78
                perms = {str('%s.%s_%s' % (app_label, slug, model)): auth_level}
76 79
                perm_hierarchy = getattr(settings, 'DJANGO_RBAC_PERMISSIONS_HIERARCHY',
77 80
                                         self._DEFAULT_DJANGO_RBAC_PERMISSIONS_HIERARCHY)
78 81
                if slug in perm_hierarchy:
79 82
                    for other_perm in perm_hierarchy[slug]:
80
                        perms.append(str('%s.%s_%s' % (app_label, other_perm, model)))
81
                permissions = perms_cache.setdefault(key, set())
83
                        perms[str('%s.%s_%s' % (app_label, other_perm, model))] = auth_level
84
                permissions = perms_cache.setdefault(key, {})
82 85
                permissions.update(perms)
83 86
                # optimization for has_module_perms
84 87
                perms_cache[app_label] = True
......
90 93
            return ()
91 94
        perms_cache = self.get_permission_cache(user_obj)
92 95
        if obj:
93
            permissions = set()
96
            permissions = {}
94 97
            ct = ContentType.objects.get_for_model(obj)
95 98
            key = '%s.%s' % (ct.id, obj.pk)
96 99
            if key in perms_cache:
97 100
                permissions.update(perms_cache[key])
98
            for permission in perms_cache.get('__all__', set([])):
101
            perms_cache_all = perms_cache.get('__all__', set([]))
102
            for permission in perms_cache_all:
99 103
                if (permission.startswith('%s.' % ct.app_label)
100 104
                        and permission.endswith('_%s' % ct.model)):
101
                    permissions.add(permission)
105
                    permissions[permission] = perms_cache_all[permission]
102 106
            if hasattr(obj, 'ou_id') and obj.ou_id:
103 107
                key = 'ou.%s' % obj.ou_id
104
                for permission in perms_cache.get(key, ()):
108
                perms_cache_key = perms_cache.get(key, ())
109
                for permission in perms_cache_key:
105 110
                    if (permission.startswith('%s.' % ct.app_label)
106 111
                            and permission.endswith('_%s' % ct.model)):
107
                        permissions.add(permission)
112
                        permissions[permission] = perms_cache_key[permission]
108 113
            return permissions
109 114
        else:
110 115
            return perms_cache.get('__all__', [])
111 116

  
117
    @staticmethod
118
    def user_has_sufficient_auth_level(user_obj, perm_auth_level):
119
        try:
120
            user_auth_level = user_obj.auth_level
121
        except AttributeError:
122
            # If an app doesn't care about auth level, everything will still work
123
            # in a transparent way. However, if it cares and forgot to annotate
124
            # the User object, the check will fail as expected when permission level
125
            # is too high.
126
            user_auth_level = 1
127
        if perm_auth_level <= user_auth_level:
128
            return True
129
        raise InsufficientAuthLevel(perm_auth_level)
130

  
112 131
    def has_perm(self, user_obj, perm, obj=None):
113 132
        if user_obj.is_anonymous():
114 133
            return False
......
116 135
            return False
117 136
        if user_obj.is_superuser:
118 137
            return True
119
        return perm in self.get_all_permissions(user_obj, obj=obj)
138
        all_perms = self.get_all_permissions(user_obj, obj=obj)
139
        if perm in all_perms:
140
            return self.user_has_sufficient_auth_level(user_obj, all_perms[perm])
141
        return False
120 142

  
121 143
    def has_perms(self, user_obj, perm_list, obj=None):
122 144
        if user_obj.is_anonymous():
123 145
            return False
124 146
        if not user_obj.is_active:
125 147
            return False
126
        all_permissions = self.get_all_permissions(user_obj, obj=obj)
127
        return all(perm in all_permissions for perm in perm_list)
148
        for perm in perm_list:
149
            if not self.has_perm(user_obj, perm, obj):
150
                return False
151
        return True
128 152

  
129 153
    def has_module_perms(self, user_obj, package_name):
130 154
        if user_obj.is_anonymous():
......
133 157
            return False
134 158
        if user_obj.is_superuser:
135 159
            return True
136
        return package_name in self.get_permission_cache(user_obj)
160
        cache = self.get_permission_cache(user_obj)
161
        if package_name in cache:
162
            return self.user_has_sufficient_auth_level(user_obj, cache[package_name])
163
        return False
164

  
165
    def user_perms_any_auth_level(self, user_obj, user_perms, perms_levels, min_auth_level=False):
166
        """Return True if user has one of the perms listed in user_perms with a
167
        sufficient auth level. Otherwise return the minimum auth level that it should
168
        get for being granted access (False if no permissions were found at all).
169
        """
170
        for perm in perms_levels:
171
            if not perm in user_perms:
172
                continue
173
            try:
174
                # If there is at least one common permission, return true provided
175
                # auth level check passes
176
                return self.user_has_sufficient_auth_level(user_obj, perms_levels[perm])
177
            except InsufficientAuthLevel:
178
                if not min_auth_level:
179
                    min_auth_level = perms_levels[perm]
180
                else:
181
                    min_auth_level = min(perms_levels[perm], min_auth_level)
182
        return min_auth_level
137 183

  
138 184
    def has_perm_any(self, user_obj, perm_or_perms):
139 185
        '''Return True if user has any perm on any object'''
......
143 189
            return False
144 190
        if user_obj.is_superuser:
145 191
            return True
146
        if isinstance(perm_or_perms, six.string_types):
147
            perm_or_perms = [perm_or_perms]
148
        perm_or_perms = set(perm_or_perms)
192
        perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
193
                else set(perm_or_perms)
149 194
        cache = self.get_permission_cache(user_obj)
150
        if perm_or_perms & cache.get('__all__', set()):
195
        auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
196
        if auth_level is True:
151 197
            return True
152 198
        for key, value in cache.items():
153 199
            if isinstance(value, bool):
154 200
                continue
155 201
            elif key == '__all__':
156 202
                continue
157
            elif perm_or_perms & set(value):
158
                return True
159
        return False
203
            else:
204
                auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
205
                                                            min_auth_level=auth_level)
206
                if auth_level is True:
207
                    return True
208
        if auth_level is False:
209
            return False
210
        raise InsufficientAuthLevel(auth_level)
160 211

  
161 212
    def filter_by_perm_query(self, user_obj, perm_or_perms, qs):
162 213
        '''Create a filter for a queryset for the objects on which the user has
......
169 220
            return False
170 221
        if user_obj.is_superuser:
171 222
            return True
172
        if isinstance(perm_or_perms, six.string_types):
173
            perm_or_perms = [perm_or_perms]
174
        perm_or_perms = set(perm_or_perms)
223
        perms = {perm_or_perms} if isinstance(perm_or_perms, six.string_types) \
224
                else set(perm_or_perms)
175 225
        cache = self.get_permission_cache(user_obj)
176 226
        model = qs.model
177 227
        OU = utils.get_ou_model()
178 228
        has_ou_field = get_fk_model(model, 'ou') == OU
179
        if perm_or_perms & cache.get('__all__', set()):
229
        auth_level = self.user_perms_any_auth_level(user_obj, perms, cache.get('__all__', {}))
230
        if auth_level is True:
180 231
            return True
181 232
        q = []
182 233
        for key, value in cache.items():
......
184 235
                continue
185 236
            elif key == '__all__':
186 237
                continue
187
            elif key.startswith('ou.'):
188
                if has_ou_field and perm_or_perms & value:
238
            elif key.startswith('ou.') and has_ou_field:
239
                new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
240
                                                                min_auth_level=auth_level)
241
                if new_auth_level is True:
189 242
                    q.append(Q(ou_id=int(key[3:])))
190
                    continue
191
            elif perm_or_perms & value:
192
                ct_id, fk = key.split('.')
193
                q.append(Q(pk=int(fk)))
243
                else:
244
                    auth_level = new_auth_level
245
            else:
246
                new_auth_level = self.user_perms_any_auth_level(user_obj, perms, value,
247
                                                                min_auth_level=auth_level)
248
                if auth_level is True:
249
                    ct_id, fk = key.split('.')
250
                    q.append(Q(pk=int(fk)))
251
                else:
252
                    auth_level = new_auth_level
253
        # TODO cette methode est plus compliquee que les autres, il faut surement rajouter
254
        # de la logique en plus pour savoir si on doit raise en fonction de ce qu'il y a
255
        # dans q
256
        if auth_level > 1:
257
            raise InsufficientAuthLevel(auth_level)
194 258
        if q:
195 259
            return six.moves.reduce(Q.__or__, q)
196 260
        return False
......
217 281
            return True
218 282
        if self.has_perm(user_obj, perm):
219 283
            return True
220
        return perm in self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
284
        perms = self.get_permission_cache(user_obj).get('ou.%s' % ou.pk, ())
285
        if perm in perms:
286
            return self.user_has_sufficient_auth_level(user_obj, perms[perm])
287
        return False
221 288

  
222 289
    def ous_with_perm(self, user_obj, perm, queryset=None):
223 290
        OU = utils.get_ou_model()
......
233 300
        ou_ids = []
234 301
        for key in cache:
235 302
            if key == '__all__' and perm in cache[key]:
236
                return qs
303
                if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
304
                    return qs
237 305
            if key.startswith('ou.') and perm in cache[key]:
238
                ou_ids.append(int(key.split('.')[1]))
306
                if self.user_has_sufficient_auth_level(user_obj, cache[key][perm]):
307
                    ou_ids.append(int(key.split('.')[1]))
239 308
        return qs.filter(id__in=ou_ids)
src/django_rbac/exceptions.py
1
class InsufficientAuthLevel(Exception):
2

  
3
    def __init__(self, level):
4
        self.required_level = level
0
-