From 879b8930272272d42b5ec1a24ee4cf5baf7062e0 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 6 May 2022 11:35:23 +0200 Subject: [PATCH 9/9] api: add endpoints to manage role inheritance (#62013) --- src/authentic2/api_urls.py | 10 ++ src/authentic2/api_views.py | 137 ++++++++++++++++- tests/api/test_roles.py | 297 ++++++++++++++++++++++++++++++++++++ 3 files changed, 440 insertions(+), 4 deletions(-) create mode 100644 tests/api/test_roles.py diff --git a/src/authentic2/api_urls.py b/src/authentic2/api_urls.py index 17102f05..64b1b727 100644 --- a/src/authentic2/api_urls.py +++ b/src/authentic2/api_urls.py @@ -42,6 +42,16 @@ urlpatterns = [ api_views.role_memberships, name='a2-api-role-members', ), + url( + r'^roles/(?P[0-9a-z]{32})/parents/$', + api_views.roles_parents, + name='a2-api-role-parents', + ), + url( + r'^roles/(?P[\w+]*)/relationships/parents/$', + api_views.roles_parents_relationships, + name='a2-api-role-parents-relationships', + ), url(r'^check-password/$', api_views.check_password, name='a2-api-check-password'), url(r'^validate-password/$', api_views.validate_password, name='a2-api-validate-password'), url(r'^address-autocomplete/$', api_views.address_autocomplete, name='a2-api-address-autocomplete'), diff --git a/src/authentic2/api_views.py b/src/authentic2/api_views.py index d76d5336..f983f60d 100644 --- a/src/authentic2/api_views.py +++ b/src/authentic2/api_views.py @@ -40,7 +40,7 @@ from pytz.exceptions import AmbiguousTimeError, NonExistentTimeError from requests.exceptions import RequestException from rest_framework import authentication, pagination, permissions, serializers, status from rest_framework.authentication import SessionAuthentication -from rest_framework.exceptions import AuthenticationFailed, PermissionDenied, ValidationError +from rest_framework.exceptions import AuthenticationFailed, ErrorDetail, PermissionDenied, ValidationError from rest_framework.fields import CreateOnlyDefault from rest_framework.filters import BaseFilterBackend from rest_framework.generics import GenericAPIView @@ -54,13 +54,14 @@ from rest_framework.viewsets import ModelViewSet, ViewSet from authentic2.compat.drf import action from . import api_mixins, app_settings, decorators, hooks -from .a2_rbac.models import OrganizationalUnit, Role +from .a2_rbac.models import OrganizationalUnit, Role, RoleParenting from .a2_rbac.utils import get_default_ou from .custom_user.models import Profile, ProfileType, User from .journal_event_types import UserLogin, UserRegistration from .models import Attribute, PasswordReset, Service from .passwords import get_password_checker from .utils import misc as utils_misc +from .utils.api import DjangoRBACPermission, NaturalKeyRelatedField from .utils.lookups import Unaccent # Retro-compatibility with older Django versions @@ -918,9 +919,11 @@ class RolesAPI(api_mixins.GetOrCreateMixinView, ExceptionHandlerMixin, ModelView filter_backends = api_settings.DEFAULT_FILTER_BACKENDS filterset_class = RolesFilter lookup_field = 'uuid' + queryset = Role.objects.all() - def get_queryset(self): - return self.request.user.filter_by_perm('a2_rbac.view_role', Role.objects.all()) + def filter_queryset(self, queryset): + queryset = super().filter_queryset(queryset) + return self.request.user.filter_by_perm('a2_rbac.view_role', queryset) def perform_destroy(self, instance): if not self.request.user.has_perm(perm='a2_rbac.delete_role', obj=instance): @@ -1196,6 +1199,132 @@ class RoleMembershipsAPI(ExceptionHandlerMixin, APIView): role_memberships = RoleMembershipsAPI.as_view() +class PublikMixin: + def finalize_response(self, request, response, *args, **kwargs): + '''Adapt error response to Publik schema''' + response = super().finalize_response(request, response, *args, **kwargs) + if isinstance(response.data, dict) and 'err' not in response.data: + if list(response.data.keys()) == ['detail'] and isinstance(response.data['detail'], ErrorDetail): + response.data = { + 'err': 1, + 'err_class': response.data['detail'].code, + 'err_desc': str(response.data['detail']), + } + elif 'errors' in response.data: + response.data['err'] = 1 + response.data.pop('result', None) + response.data['err_desc'] = response.data.pop('errors') + return response + + +class RoleParentSerializer(RoleSerializer): + direct = serializers.BooleanField(read_only=True) + + class Meta(RoleSerializer.Meta): + fields = RoleSerializer.Meta.fields + ('direct',) + + +class RolesParentsAPI(PublikMixin, GenericAPIView): + permission_classes = [ + DjangoRBACPermission( + perms_map={ + 'GET': [], + }, + object_perms_map={ + 'GET': ['a2_rbac.view_role'], + }, + ) + ] + serializer_class = RoleParentSerializer + queryset = Role.objects.all() + + def get(self, request, *, role_uuid, **kwargs): + direct = None if 'all' in self.request.GET else True + role = get_object_or_404(Role, uuid=role_uuid) + self.check_object_permissions(self.request, role) + qs = self.get_queryset() + qs = self.queryset.filter(pk=role.pk) + qs = qs.parents(include_self=False, annotate=True, direct=direct) + qs = request.user.filter_by_perm('a2_rbac.search_role', qs) + qs = qs.order_by('id') + serializer = self.get_serializer(qs, many=True) + return Response({'err': 0, 'data': serializer.data}) + + +roles_parents = RolesParentsAPI.as_view() + + +class RoleParentingSerializer(serializers.ModelSerializer): + parent = NaturalKeyRelatedField(queryset=Role.objects.all()) + direct = serializers.BooleanField(read_only=True) + + class Meta: + model = RoleParenting + fields = [ + 'parent', + 'direct', + ] + + +class RolesParentsRelationshipsAPI(PublikMixin, GenericAPIView): + permission_classes = [ + DjangoRBACPermission( + perms_map={ + 'GET': [], + 'POST': [], + 'DELETE': [], + }, + object_perms_map={ + 'GET': ['a2_rbac.view_role'], + 'POST': ['a2_rbac.manage_members_role'], + 'DELETE': ['a2_rbac.manage_members_role'], + }, + ) + ] + serializer_class = RoleParentingSerializer + queryset = RoleParenting.alive.all() + + def filter_queryset(self, queryset): + if 'all' in self.request.GET: + qs = queryset.filter(child__uuid=self.kwargs['role_uuid']) + else: + qs = queryset.filter(child__uuid=self.kwargs['role_uuid'], direct=True) + qs = qs.filter(parent__in=self.request.user.filter_by_perm('a2_rbac.view_role', Role.objects.all())) + qs = qs.order_by('id') + return qs + + def get(self, request, *, role_uuid, **kwargs): + role = get_object_or_404(Role, uuid=role_uuid) + self.check_object_permissions(self.request, role) + return self.list() + + def list(self): + queryset = self.filter_queryset(self.get_queryset()) + serializer = self.get_serializer(queryset, many=True) + return Response({'err': 0, 'data': serializer.data}) + + def post(self, request, *, role_uuid, **kwargs): + serializer = self.get_serializer(data=request.data) + serializer.is_valid(raise_exception=True) + parent = serializer.validated_data['parent'] + self.check_object_permissions(self.request, parent) + child = get_object_or_404(Role.objects.all(), uuid=role_uuid) + child.add_parent(parent) + return self.list() + + def delete(self, request, *, role_uuid, **kwargs): + serializer = self.get_serializer(data=request.data) + serializer.is_valid(raise_exception=True) + parent = serializer.validated_data['parent'] + self.check_object_permissions(self.request, parent) + role = get_object_or_404(Role, uuid=role_uuid) + role.remove_parent(parent) + return self.list() + + +roles_parents_relationships = RolesParentsRelationshipsAPI.as_view() + + class BaseOrganizationalUnitSerializer(serializers.ModelSerializer): slug = serializers.SlugField( required=False, diff --git a/tests/api/test_roles.py b/tests/api/test_roles.py new file mode 100644 index 00000000..3e62abcc --- /dev/null +++ b/tests/api/test_roles.py @@ -0,0 +1,297 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import pytest + +from authentic2.a2_rbac.models import Role +from authentic2.api_views import OrganizationalUnit as OU +from authentic2.api_views import RoleParentingSerializer, RoleParentSerializer +from tests.utils import scoped_db_fixture + + +@scoped_db_fixture(scope='module', autouse=True) +def roles(): + class Namespace: + parent = Role.objects.create(name='parent', uuid='a' * 32) + child = Role.objects.create(name='child', uuid='1' * 32) + parent.add_child(child) + grandchild = Role.objects.create(name='grandchild', uuid='2' * 32) + child.add_child(grandchild) + + return Namespace + + +class TestSerializer: + def test_role_parent(self): + ou = OU(name='OU', slug='ou', uuid='2' * 32) + role = Role(uuid='1' * 32, name='Role', slug='role', ou=ou) + role.direct = True + assert RoleParentSerializer(role).data == { + 'uuid': '1' * 32, + 'name': 'Role', + 'slug': 'role', + 'direct': True, + 'ou': 'ou', + } + + def test_role_parenting(self, db, roles): + assert RoleParentingSerializer(roles.parent.child_relation.first()).data == { + 'parent': {'service': None, 'slug': 'parent', 'ou': None, 'name': 'parent', 'uuid': 'a' * 32}, + 'direct': True, + } + + +class TestViews: + class TestParents: + def test_not_authenticated(self, db, app, admin, roles): + app.get('/api/roles/%s/parents/' % roles.grandchild.uuid, status=401) + + class TestAuthenticated: + @pytest.fixture + def app(self, app, admin): + app.authorization = ('Basic', (admin.username, admin.username)) + return app + + def test_default(self, app, roles): + resp = app.get('/api/roles/%s/parents/' % roles.grandchild.uuid) + assert resp.json == { + 'err': 0, + 'data': [ + { + 'uuid': '1' * 32, + 'name': 'child', + 'slug': 'child', + 'ou': None, + 'direct': True, + } + ], + } + + def test_all(self, app, roles): + resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid) + assert resp.json == { + 'err': 0, + 'data': [ + { + 'uuid': 'a' * 32, + 'name': 'parent', + 'slug': 'parent', + 'ou': None, + 'direct': False, + }, + { + 'uuid': '1' * 32, + 'name': 'child', + 'slug': 'child', + 'ou': None, + 'direct': True, + }, + ], + } + + def test_permission(self, app, simple_user, roles): + role = Role.objects.create(name='admin') + role.members.add(simple_user) + app.authorization = ('Basic', (simple_user.username, simple_user.username)) + app.get('/api/roles/%s/parents/' % roles.grandchild.uuid, status=403) + role.add_permission(roles.grandchild, 'view') + resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200) + assert not resp.json['data'] + role.add_permission(roles.child, 'view') + resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200) + assert len(resp.json['data']) == 1 + role.add_permission(roles.parent, 'view') + resp = app.get('/api/roles/%s/parents/?all' % roles.grandchild.uuid, status=200) + assert len(resp.json['data']) == 2 + + class TestParentsRelationships: + def test_not_authenticated(self, db, app, admin, roles): + app.get('/api/roles/%s/relationships/parents/' % roles.parent.uuid, status=401) + + class TestAuthenticated: + @pytest.fixture + def app(self, app, admin): + app.authorization = ('Basic', (admin.username, admin.username)) + return app + + def test_default(self, app, roles): + resp = app.get('/api/roles/%s/relationships/parents/' % roles.grandchild.uuid) + assert resp.json == { + 'err': 0, + 'data': [ + { + 'parent': { + 'uuid': '1' * 32, + 'name': 'child', + 'slug': 'child', + 'ou': None, + 'service': None, + }, + 'direct': True, + } + ], + } + + def test_all(self, app, roles): + resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid) + assert resp.json == { + 'err': 0, + 'data': [ + { + 'parent': { + 'uuid': '1' * 32, + 'name': 'child', + 'slug': 'child', + 'ou': None, + 'service': None, + }, + 'direct': True, + }, + { + 'parent': { + 'uuid': 'a' * 32, + 'name': 'parent', + 'slug': 'parent', + 'ou': None, + 'service': None, + }, + 'direct': False, + }, + ], + } + + @pytest.mark.parametrize( + 'role_description', + [{'uuid': 'a' * 32}, {'slug': 'parent'}, {'name': 'parent'}], + ids=['by_uuid', 'by_slug', 'by_name'], + ) + def test_create(self, role_description, app, roles): + assert set(roles.parent.children(include_self=False, direct=True)) == {roles.child} + resp = app.post_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': role_description}, + ) + assert resp.json == { + 'err': 0, + 'data': [ + { + 'parent': { + 'uuid': '1' * 32, + 'name': 'child', + 'slug': 'child', + 'ou': None, + 'service': None, + }, + 'direct': True, + }, + { + 'parent': { + 'uuid': 'a' * 32, + 'name': 'parent', + 'slug': 'parent', + 'ou': None, + 'service': None, + }, + 'direct': True, + }, + ], + } + assert set(roles.parent.children(include_self=False, direct=True)) == { + roles.child, + roles.grandchild, + } + + def test_delete(self, app, roles): + assert set(roles.parent.children(include_self=False, direct=True)) == {roles.child} + resp = app.delete_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.child.uuid}}, + ) + assert resp.json == {'err': 0, 'data': []} + assert not set(roles.grandchild.children(include_self=False, direct=True)) + + class TestPermission: + @pytest.fixture + def user(self, simple_user): + return simple_user + + @pytest.fixture + def admin_role(self, user): + role = Role.objects.create(name='admin') + role.members.add(user) + return role + + @pytest.fixture + def app(self, app, user): + app.authorization = ('Basic', (user.username, user.username)) + return app + + def test_list(self, app, admin_role, roles): + resp = app.get('/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, status=403) + assert resp.json == { + 'err': 1, + 'err_class': 'permission_denied', + 'err_desc': 'You do not have permission to perform this action.', + } + + admin_role.add_permission(roles.grandchild, 'view') + resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200) + assert not resp.json['data'] + admin_role.add_permission(roles.child, 'view') + resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200) + assert len(resp.json['data']) == 1 + admin_role.add_permission(roles.parent, 'view') + resp = app.get('/api/roles/%s/relationships/parents/?all' % roles.grandchild.uuid, status=200) + assert len(resp.json['data']) == 2 + + def test_create(self, app, admin_role, roles): + app.post_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.parent.uuid}}, + status=403, + ) + admin_role.add_permission(roles.grandchild, 'manage_members') + app.post_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.parent.uuid}}, + status=403, + ) + admin_role.add_permission(roles.parent, 'manage_members') + app.post_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.parent.uuid}}, + status=200, + ) + + def test_delete(self, app, admin_role, roles): + app.delete_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.child.uuid}}, + status=403, + ) + admin_role.add_permission(roles.grandchild, 'manage_members') + app.delete_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.child.uuid}}, + status=403, + ) + admin_role.add_permission(roles.child, 'manage_members') + app.delete_json( + '/api/roles/%s/relationships/parents/' % roles.grandchild.uuid, + params={'parent': {'uuid': roles.child.uuid}}, + status=200, + ) + assert not set(roles.grandchild.parents(include_self=False, direct=True)) -- 2.35.1