From 0e6df9629eb131a0452df6befc22f630922859b3 Mon Sep 17 00:00:00 2001 From: Paul Marillonnet Date: Fri, 12 Jan 2018 11:27:12 +0100 Subject: [PATCH] WIP add role-creation API (#20706) --- src/authentic2/api_views.py | 62 ++++++++++++++++++++++++++++++++++++- tests/test_api.py | 74 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 1 deletion(-) diff --git a/src/authentic2/api_views.py b/src/authentic2/api_views.py index 95fb99ee..195a1ade 100644 --- a/src/authentic2/api_views.py +++ b/src/authentic2/api_views.py @@ -4,7 +4,7 @@ import smtplib from django.db import models from django.contrib.auth import get_user_model -from django.core.exceptions import MultipleObjectsReturned +from django.core.exceptions import MultipleObjectsReturned, DoesNotExist from django.utils.translation import ugettext as _ from django.views.decorators.vary import vary_on_headers from django.views.decorators.cache import cache_control @@ -444,6 +444,31 @@ class BaseUserSerializer(serializers.ModelSerializer): exclude = ('date_joined', 'user_permissions', 'groups', 'last_login') +class RoleSerializer(serializers.ModelSerializer): + ou = serializers.SlugRelatedField( + many=False, + required=False, + default=get_default_ou, + queryset=get_ou_model().objects.all(), + slug_field='slug') + + def check_perm(self, perm, ou=None): + self.context['view'].check_perm(perm, ou) + + def create(self, validated_data): + if self.check_perm('a2_rbac.add_role', validated_data.get('ou')): + return super(RoleSerializer, self).create(validated_data) + + def update(self, instance, validated_data): + if self.check_perm('a2_rbac.change_role', validated_data.get('ou')): + return super(RoleSerializer, self).update(instance, validated_data) + + class Meta: + model = get_role_model() + exclude = ('service', 'admin_scope_id', 'admin_scope_ct',) + extra_kwargs = {'uuid': {'read_only': True}} + + class UsersFilter(FilterSet): class Meta: model = get_user_model() @@ -576,6 +601,40 @@ class UsersAPI(HookMixin, ExceptionHandlerMixin, ModelViewSet): return Response({'result': 1}) +class RolesAPI(ExceptionHandlerMixin, ModelViewSet): + permission_classes = (permissions.IsAuthenticated,) + serializer_class = RoleSerializer + lookup_field = 'slug' + queryset = get_role_model().objects.all() + + def check_perm(self, perm, ou=None): + if ou: + if not self.request.user.has_ou_perm(perm, ou): + raise PermissionDenied(u'User %s does not have permission %s in %s' % (user, perm, ou)) + else: + if not self.request.user.has_perm(perm): + raise PermissionDenied(u'User %s do not have permission %s' % (user, perm)) + + def _role_from_request(self, request, *args, **kwargs): + role_slug = kwargs.get('slug') + if role_slug: + try: + role = self.queryset.get(slug=role_slug) + except (DoesNotExist, MultipleObjectsReturned): + pass + else: + return role + + def retrieve(self, request, *args, **kwargs): + role = self._role_from_request(self, request, *args, **kwargs) + if role and self.check_perm('a2_rbac.view_role', getattr(role, 'ou', None)): + return super(RolesAPI, self).retrieve(request, *args, **kwars) + + def destroy(self, request, *args, **kwargs): + role = self._role_from_request(self, request, *args, **kwargs) + if role and self.check_perm('a2_rbac.delete_role', getattr(role, 'ou', None)): + return super(RolesAPI, self).destroy(request, *args, **kwars) + class RoleMembershipsAPI(ExceptionHandlerMixin, APIView): permission_classes = (permissions.IsAuthenticated,) @@ -620,6 +679,7 @@ class OrganizationalUnitAPI(ExceptionHandlerMixin, ModelViewSet): router = SimpleRouter() router.register(r'users', UsersAPI, base_name='a2-api-users') router.register(r'ous', OrganizationalUnitAPI, base_name='a2-api-ous') +router.register(r'roles', RolesAPI, base_name='a2-api-roles') class CheckPasswordSerializer(serializers.Serializer): diff --git a/tests/test_api.py b/tests/test_api.py index 96085590..b7a2adf4 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -30,6 +30,80 @@ def test_api_user_simple(logged_app): assert 'username' in resp.json +def test_api_delete_role_authorized(app, superuser, simple_role): + app.authorization = ('Basic', (superuser.username, superuser.username)) + app.delete('/api/roles/simple_role/') + + +def test_api_delete_role_unauthorized(app, simple_user, simple_role): + app.authorization = ('Basic', (simple_user.username, simple_user.username)) + app.delete('/api/roles/simple_role/') + + +def test_api_put_role_authorized(app, superuser, simple_role): + app.authorization = ('Basic', (superuser.username, superuser.username)) + + role_data = { + 'name': 'updated-role', + } + resp = app.put_json('/api/roles/', params=role_data) + + +def test_api_put_role_unauthorized(app, simple_user, simple_role): + app.authorization = ('Basic', (simple_user.username, simple_user.username)) + + role_data = { + 'name': 'updated-role', + } + resp = app.put_json('/api/roles/', params=role_data) + + +def test_api_post_role_simple(app, superuser): + app.authorization = ('Basic', (superuser.username, superuser.username)) + + role_data = { + 'slug': 'coffee-manager', + 'name': 'Coffee Manager', + 'ou': 'ou1' + } + + resp = app.post_json('/api/roles/', params=role_data) + assert isinstance(resp.json, dict) + Role = get_role_model() + + # Check attribute values against the server's response: + for key, value in role_data.items(): + assert key in resp.json.keys() + assert value in resp.json.values() + + # Check attributes values against the DB: + posted_role = Role.objects.get(slug='coffee-manager') + assert posted_role.slug == role_data['slug'] + assert posted_role.name == role_data['name'] + assert posted_role.ou.slug == 'ou1' + + +def test_api_get_role_description(app, superuser): + app.authorization = ('Basic', (superuser.username, superuser.username)) + resp = app.get('/api/roles/rando/') + + assert resp.json['slug'] == 'rando' + assert resp.json['ou'] == 'ou_rando' + + +def test_api_get_role_list(app, user): + app.authorization = ('Basic', (user.username, user.username)) + resp = app.get('/api/roles/') + + role_fields = ['slug', 'uuid', 'name', 'ou'] + + assert len(resp.json['results']) + + for role_dict in resp.json['results']: + for field in role_fields: + assert field in role_dict + + def test_api_user(client): # create an user, an ou role, a service and a service role ou = get_default_ou() -- 2.11.0