From 4caea6efd5a243872f003ae3be6037b0953f8e01 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Mon, 9 May 2022 16:03:20 +0200 Subject: [PATCH 6/9] utils: add DjangoRBACPermission DRF's permission class (#62013) --- src/authentic2/utils/api.py | 75 +++++++++++++++++++++++++++++- tests/test_utils_api.py | 91 ++++++++++++++++++++++++++++++++++++- 2 files changed, 163 insertions(+), 3 deletions(-) diff --git a/src/authentic2/utils/api.py b/src/authentic2/utils/api.py index 9506613f..6bfea281 100644 --- a/src/authentic2/utils/api.py +++ b/src/authentic2/utils/api.py @@ -16,7 +16,7 @@ from django.db import models -from rest_framework import exceptions, serializers +from rest_framework import exceptions, permissions, serializers class NaturalKeyRelatedField(serializers.RelatedField): @@ -77,3 +77,76 @@ class NaturalKeyRelatedField(serializers.RelatedField): except model.MultipleObjectsReturned: raise exceptions.ValidationError('multiple objects returned') raise exceptions.ValidationError('object not found') + + +class DjangoRBACPermission(permissions.BasePermission): + perms_map = { + 'GET': [], + 'OPTIONS': [], + 'HEAD': [], + 'POST': ['add'], + 'PUT': ['change'], + 'PATCH': ['change'], + 'DELETE': ['delete'], + } + object_perms_map = { + 'GET': ['view'], + } + + def __init__(self, perms_map=None, object_perms_map=None): + self.perms_map = perms_map or dict(self.perms_map) + if object_perms_map: + self.object_perms_map = object_perms_map + else: + self.object_perms_map = dict(self.object_perms_map) + for k, v in self.perms_map.items(): + if v: + self.object_perms_map[k] = v + + def _get_queryset(self, view): + assert hasattr(view, 'get_queryset') or getattr(view, 'queryset', None) is not None, ( + 'Cannot apply {} on a view that does not set ' '`.queryset` or have a `.get_queryset()` method.' + ).format(self.__class__.__name__) + + if hasattr(view, 'get_queryset'): + queryset = view.get_queryset() + assert queryset is not None, f'{view.__class__.__name__}.get_queryset() returned None' + return queryset + return view.queryset + + def _get_required_permissions(self, method, model_cls, perms_map): + """ + Given a model and an HTTP method, return the list of permission + codes that the user is required to have. + """ + app_label = model_cls._meta.app_label + model_name = model_cls._meta.model_name + + if method not in perms_map: + raise exceptions.MethodNotAllowed(method) + + return [f'{app_label}.{perm}_{model_name}' if '.' not in perm else perm for perm in perms_map[method]] + + def has_permission(self, request, view): + if not request.user or not request.user.is_authenticated: + return False + + queryset = self._get_queryset(view) + perms = self._get_required_permissions(request.method, queryset.model, self.perms_map) + + return request.user.has_perms(perms) + + def has_object_permission(self, request, view, obj): + if not request.user or not request.user.is_authenticated: + return False + + queryset = self._get_queryset(view) + perms = self._get_required_permissions(request.method, queryset.model, self.object_perms_map) + + return request.user.has_perms(perms, obj=obj) + + def __call__(self): + return self + + def __repr__(self): + return f'' diff --git a/tests/test_utils_api.py b/tests/test_utils_api.py index 70c259e3..2e6f104f 100644 --- a/tests/test_utils_api.py +++ b/tests/test_utils_api.py @@ -14,13 +14,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from unittest import mock + import pytest -from rest_framework.exceptions import ValidationError +from rest_framework.exceptions import MethodNotAllowed, ValidationError from authentic2.a2_rbac.models import OrganizationalUnit as OU from authentic2.a2_rbac.models import Role from authentic2.models import Service -from authentic2.utils.api import NaturalKeyRelatedField +from authentic2.utils.api import DjangoRBACPermission, NaturalKeyRelatedField from tests.utils import scoped_db_fixture @@ -132,3 +134,88 @@ class TestNaturalKeyRelatedField: assert ( NaturalKeyRelatedField(queryset=Role.objects.all()).to_internal_value(value) == fixture.role ) + + +class TestDjangoRBACPermission: + @pytest.fixture + def permission(self): + return DjangoRBACPermission( + perms_map={ + 'GET': [], + 'POST': ['create'], + 'DELETE': [], + }, + object_perms_map={ + 'GET': [], + 'DELETE': ['delete'], + }, + ) + + @pytest.fixture + def view(self): + view = mock.Mock() + view.get_queryset.return_value = Role.objects.all() + return view + + class TestHasPermission: + def test_user_must_be_authenticated(self, permission): + request = mock.Mock() + request.user.is_authenticated = False + assert not permission.has_permission(request=request, view=mock.Mock()) + + def test_method_is_not_allowed(self, rf, permission): + request = mock.Mock() + request.method = 'PATCH' + request.user.is_authenticated = True + + with pytest.raises(MethodNotAllowed): + permission.has_permission(request=request, view=mock.Mock()) + + def test_method_post(self, permission, view): + request = mock.Mock() + request.method = 'POST' + request.user.is_authenticated = True + request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'} + assert permission.has_permission(request=request, view=view) + + request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= set() + assert not permission.has_permission(request=request, view=view) + + def test_method_get(self, permission, view): + request = mock.Mock() + request.user.is_authenticated = True + request.method = 'GET' + request.user.has_perms = lambda perms, obj=None: not obj and (not perms or set(perms) <= set()) + assert permission.has_permission(request=request, view=view) + + class TestHasObjectPermission: + def test_user_must_be_authenticated(self, permission): + request = mock.Mock() + request.user.is_authenticated = False + assert not permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock()) + + def test_method_is_not_allowed(self, rf, permission): + request = mock.Mock() + request.method = 'PATCH' + request.user.is_authenticated = True + + with pytest.raises(MethodNotAllowed): + permission.has_object_permission(request=request, view=mock.Mock(), obj=mock.Mock()) + + def test_method_delete(self, permission, view): + request = mock.Mock() + request.method = 'DELETE' + request.user.is_authenticated = True + mock_obj = mock.Mock() + request.user.has_perms = ( + lambda perms, obj=None: set(perms) <= {'a2_rbac.delete_role'} and obj is mock_obj + ) + assert permission.has_object_permission(request=request, view=view, obj=mock_obj) + + request.user.has_perms = mock.Mock(return_value=False) + assert not permission.has_object_permission(request=request, view=view, obj=mock_obj) + assert request.user.has_perms.call_args[1]['obj'] is mock_obj + + request.method = 'GET' + request.user.has_perms = lambda perms, obj=None: not obj and set(perms) <= {'a2_rbac.create_role'} + assert permission.has_permission(request=request, view=view) -- 2.35.1