0006-utils-add-DjangoRBACPermission-DRF-s-permission-clas.patch
src/authentic2/utils/api.py | ||
---|---|---|
16 | 16 | |
17 | 17 | |
18 | 18 |
from django.db import models |
19 |
from rest_framework import exceptions, serializers |
|
19 |
from rest_framework import exceptions, permissions, serializers
|
|
20 | 20 | |
21 | 21 | |
22 | 22 |
class NaturalKeyRelatedField(serializers.RelatedField): |
... | ... | |
77 | 77 |
except model.MultipleObjectsReturned: |
78 | 78 |
raise exceptions.ValidationError('multiple objects returned') |
79 | 79 |
raise exceptions.ValidationError('object not found') |
80 | ||
81 | ||
82 |
class DjangoRBACPermission(permissions.BasePermission): |
|
83 |
perms_map = { |
|
84 |
'GET': [], |
|
85 |
'OPTIONS': [], |
|
86 |
'HEAD': [], |
|
87 |
'POST': ['add'], |
|
88 |
'PUT': ['change'], |
|
89 |
'PATCH': ['change'], |
|
90 |
'DELETE': ['delete'], |
|
91 |
} |
|
92 |
object_perms_map = { |
|
93 |
'GET': ['view'], |
|
94 |
} |
|
95 | ||
96 |
def __init__(self, perms_map=None, object_perms_map=None): |
|
97 |
self.perms_map = perms_map or dict(self.perms_map) |
|
98 |
if object_perms_map: |
|
99 |
self.object_perms_map = object_perms_map |
|
100 |
else: |
|
101 |
self.object_perms_map = dict(self.object_perms_map) |
|
102 |
for k, v in self.perms_map.items(): |
|
103 |
if v: |
|
104 |
self.object_perms_map[k] = v |
|
105 | ||
106 |
def _get_queryset(self, view): |
|
107 |
assert hasattr(view, 'get_queryset') or getattr(view, 'queryset', None) is not None, ( |
|
108 |
'Cannot apply {} on a view that does not set ' '`.queryset` or have a `.get_queryset()` method.' |
|
109 |
).format(self.__class__.__name__) |
|
110 | ||
111 |
if hasattr(view, 'get_queryset'): |
|
112 |
queryset = view.get_queryset() |
|
113 |
assert queryset is not None, f'{view.__class__.__name__}.get_queryset() returned None' |
|
114 |
return queryset |
|
115 |
return view.queryset |
|
116 | ||
117 |
def _get_required_permissions(self, method, model_cls, perms_map): |
|
118 |
""" |
|
119 |
Given a model and an HTTP method, return the list of permission |
|
120 |
codes that the user is required to have. |
|
121 |
""" |
|
122 |
app_label = model_cls._meta.app_label |
|
123 |
model_name = model_cls._meta.model_name |
|
124 | ||
125 |
if method not in perms_map: |
|
126 |
raise exceptions.MethodNotAllowed(method) |
|
127 | ||
128 |
return [f'{app_label}.{perm}_{model_name}' if '.' not in perm else perm for perm in perms_map[method]] |
|
129 | ||
130 |
def has_permission(self, request, view): |
|
131 |
if not request.user or not request.user.is_authenticated: |
|
132 |
return False |
|
133 | ||
134 |
queryset = self._get_queryset(view) |
|
135 |
perms = self._get_required_permissions(request.method, queryset.model, self.perms_map) |
|
136 | ||
137 |
return request.user.has_perms(perms) |
|
138 | ||
139 |
def has_object_permission(self, request, view, obj): |
|
140 |
if not request.user or not request.user.is_authenticated: |
|
141 |
return False |
|
142 | ||
143 |
queryset = self._get_queryset(view) |
|
144 |
perms = self._get_required_permissions(request.method, queryset.model, self.object_perms_map) |
|
145 | ||
146 |
return request.user.has_perms(perms, obj=obj) |
|
147 | ||
148 |
def __call__(self): |
|
149 |
return self |
|
150 | ||
151 |
def __repr__(self): |
|
152 |
return f'<DjangoRBACPermission perms_map={self.perms_map} object_perms_map={self.object_perms_map}>' |
tests/test_utils_api.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from unittest import mock |
|
18 | ||
17 | 19 |
import pytest |
18 |
from rest_framework.exceptions import ValidationError |
|
20 |
from rest_framework.exceptions import MethodNotAllowed, ValidationError
|
|
19 | 21 | |
20 | 22 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU |
21 | 23 |
from authentic2.a2_rbac.models import Role |
22 | 24 |
from authentic2.models import Service |
23 |
from authentic2.utils.api import NaturalKeyRelatedField |
|
25 |
from authentic2.utils.api import DjangoRBACPermission, NaturalKeyRelatedField
|
|
24 | 26 |
from tests.utils import scoped_db_fixture |
25 | 27 | |
26 | 28 | |
... | ... | |
132 | 134 |
assert ( |
133 | 135 |
NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role |
134 | 136 |
) |
137 | ||
138 | ||
139 |
class TestDjangoRBACPermission: |
|
140 |
@pytest.fixture |
|
141 |
def permission(self): |
|
142 |
return DjangoRBACPermission( |
|
143 |
perms_map={ |
|
144 |
'GET': [], |
|
145 |
'POST': ['create'], |
|
146 |
'DELETE': [], |
|
147 |
}, |
|
148 |
object_perms_map={ |
|
149 |
'GET': [], |
|
150 |
'DELETE': ['delete'], |
|
151 |
}, |
|
152 |
) |
|
153 | ||
154 |
@pytest.fixture |
|
155 |
def view(self): |
|
156 |
view = mock.Mock() |
|
157 |
view.get_queryset.return_value = Role.objects.all() |
|
158 |
return view |
|
159 | ||
160 |
class TestHasPermission: |
|
161 |
def test_user_must_be_authenticated(self, permission): |
|
162 |
request = mock.Mock() |
|
163 |
request.user.is_authenticated = False |
|
164 |
assert not permission.has_permission(request=request, view=mock.Mock()) |
|
165 | ||
166 |
def test_method_is_not_allowed(self, rf, permission): |
|
167 |
request = mock.Mock() |
|
168 |
request.method = 'PATCH' |
|
169 |
request.user.is_authenticated = True |
|
170 | ||
171 |
with pytest.raises(MethodNotAllowed): |
|
172 |
permission.has_permission(request=request, view=mock.Mock()) |
|
173 | ||
174 |
def test_method_post(self, permission, view): |
|
175 |
request = mock.Mock() |
|
176 |
request.method = 'POST' |
|
177 |
request.user.is_authenticated = True |
|
178 |
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'} |
|
179 |
assert permission.has_permission(request=request, view=view) |
|
180 | ||
181 |
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= set() |
|
182 |
assert not permission.has_permission(request=request, view=view) |
|
183 | ||
184 |
def test_method_get(self, permission, view): |
|
185 |
request = mock.Mock() |
|
186 |
request.user.is_authenticated = True |
|
187 |
request.method = 'GET' |
|
188 |
request.user.has_perms = lambda perms, obj=None: not obj and (not perms or set(perms) <= set()) |
|
189 |
assert permission.has_permission(request=request, view=view) |
|
190 | ||
191 |
class TestHasObjectPermission: |
|
192 |
def test_user_must_be_authenticated(self, permission): |
|
193 |
request = mock.Mock() |
|
194 |
request.user.is_authenticated = False |
|
195 |
assert not permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock()) |
|
196 | ||
197 |
def test_method_is_not_allowed(self, rf, permission): |
|
198 |
request = mock.Mock() |
|
199 |
request.method = 'PATCH' |
|
200 |
request.user.is_authenticated = True |
|
201 | ||
202 |
with pytest.raises(MethodNotAllowed): |
|
203 |
permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock()) |
|
204 | ||
205 |
def test_method_delete(self, permission, view): |
|
206 |
request = mock.Mock() |
|
207 |
request.method = 'DELETE' |
|
208 |
request.user.is_authenticated = True |
|
209 |
mock_obj = mock.Mock() |
|
210 |
request.user.has_perms = ( |
|
211 |
lambda perms, obj=None: set(perms) <= {'a2_rbac.delete_role'} and obj is mock_obj |
|
212 |
) |
|
213 |
assert permission.has_object_permission(request=request, view=view, obj=mock_obj) |
|
214 | ||
215 |
request.user.has_perms = mock.Mock(return_value=False) |
|
216 |
assert not permission.has_object_permission(request=request, view=view, obj=mock_obj) |
|
217 |
assert request.user.has_perms.call_args[1]['obj'] is mock_obj |
|
218 | ||
219 |
request.method = 'GET' |
|
220 |
request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'} |
|
221 |
assert permission.has_permission(request=request, view=view) |
|
135 |
- |