0001-WIP-add-role-creation-API-20706.patch
src/authentic2/api_views.py | ||
---|---|---|
4 | 4 | |
5 | 5 |
from django.db import models |
6 | 6 |
from django.contrib.auth import get_user_model |
7 |
from django.core.exceptions import MultipleObjectsReturned |
|
7 |
from django.core.exceptions import MultipleObjectsReturned, DoesNotExist
|
|
8 | 8 |
from django.utils.translation import ugettext as _ |
9 | 9 |
from django.views.decorators.vary import vary_on_headers |
10 | 10 |
from django.views.decorators.cache import cache_control |
... | ... | |
444 | 444 |
exclude = ('date_joined', 'user_permissions', 'groups', 'last_login') |
445 | 445 | |
446 | 446 | |
447 |
class RoleSerializer(serializers.ModelSerializer): |
|
448 |
ou = serializers.SlugRelatedField( |
|
449 |
many=False, |
|
450 |
required=False, |
|
451 |
default=get_default_ou, |
|
452 |
queryset=get_ou_model().objects.all(), |
|
453 |
slug_field='slug') |
|
454 | ||
455 |
def check_perm(self, perm, ou=None): |
|
456 |
self.context['view'].check_perm(perm, ou) |
|
457 | ||
458 |
def create(self, validated_data): |
|
459 |
if self.check_perm('a2_rbac.add_role', validated_data.get('ou')): |
|
460 |
return super(RoleSerializer, self).create(validated_data) |
|
461 | ||
462 |
def update(self, instance, validated_data): |
|
463 |
if self.check_perm('a2_rbac.change_role', validated_data.get('ou')): |
|
464 |
return super(RoleSerializer, self).update(instance, validated_data) |
|
465 | ||
466 |
class Meta: |
|
467 |
model = get_role_model() |
|
468 |
exclude = ('service', 'admin_scope_id', 'admin_scope_ct',) |
|
469 |
extra_kwargs = {'uuid': {'read_only': True}} |
|
470 | ||
471 | ||
447 | 472 |
class UsersFilter(FilterSet): |
448 | 473 |
class Meta: |
449 | 474 |
model = get_user_model() |
... | ... | |
576 | 601 |
return Response({'result': 1}) |
577 | 602 | |
578 | 603 | |
604 |
class RolesAPI(ExceptionHandlerMixin, ModelViewSet): |
|
605 |
permission_classes = (permissions.IsAuthenticated,) |
|
606 |
serializer_class = RoleSerializer |
|
607 |
lookup_field = 'slug' |
|
608 |
queryset = get_role_model().objects.all() |
|
609 | ||
610 |
def check_perm(self, perm, ou=None): |
|
611 |
if ou: |
|
612 |
if not self.request.user.has_ou_perm(perm, ou): |
|
613 |
raise PermissionDenied(u'User %s does not have permission %s in %s' % (user, perm, ou)) |
|
614 |
else: |
|
615 |
if not self.request.user.has_perm(perm): |
|
616 |
raise PermissionDenied(u'User %s do not have permission %s' % (user, perm)) |
|
617 | ||
618 |
def _role_from_request(self, request, *args, **kwargs): |
|
619 |
role_slug = kwargs.get('slug') |
|
620 |
if role_slug: |
|
621 |
try: |
|
622 |
role = self.queryset.get(slug=role_slug) |
|
623 |
except (DoesNotExist, MultipleObjectsReturned): |
|
624 |
pass |
|
625 |
else: |
|
626 |
return role |
|
627 | ||
628 |
def retrieve(self, request, *args, **kwargs): |
|
629 |
role = self._role_from_request(self, request, *args, **kwargs) |
|
630 |
if role and self.check_perm('a2_rbac.view_role', getattr(role, 'ou', None)): |
|
631 |
return super(RolesAPI, self).retrieve(request, *args, **kwars) |
|
632 | ||
633 |
def destroy(self, request, *args, **kwargs): |
|
634 |
role = self._role_from_request(self, request, *args, **kwargs) |
|
635 |
if role and self.check_perm('a2_rbac.delete_role', getattr(role, 'ou', None)): |
|
636 |
return super(RolesAPI, self).destroy(request, *args, **kwars) |
|
637 | ||
579 | 638 |
class RoleMembershipsAPI(ExceptionHandlerMixin, APIView): |
580 | 639 |
permission_classes = (permissions.IsAuthenticated,) |
581 | 640 | |
... | ... | |
620 | 679 |
router = SimpleRouter() |
621 | 680 |
router.register(r'users', UsersAPI, base_name='a2-api-users') |
622 | 681 |
router.register(r'ous', OrganizationalUnitAPI, base_name='a2-api-ous') |
682 |
router.register(r'roles', RolesAPI, base_name='a2-api-roles') |
|
623 | 683 | |
624 | 684 | |
625 | 685 |
class CheckPasswordSerializer(serializers.Serializer): |
tests/test_api.py | ||
---|---|---|
30 | 30 |
assert 'username' in resp.json |
31 | 31 | |
32 | 32 | |
33 |
def test_api_delete_role_authorized(app, superuser, simple_role): |
|
34 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
|
35 |
app.delete('/api/roles/simple_role/') |
|
36 | ||
37 | ||
38 |
def test_api_delete_role_unauthorized(app, simple_user, simple_role): |
|
39 |
app.authorization = ('Basic', (simple_user.username, simple_user.username)) |
|
40 |
app.delete('/api/roles/simple_role/') |
|
41 | ||
42 | ||
43 |
def test_api_put_role_authorized(app, superuser, simple_role): |
|
44 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
|
45 | ||
46 |
role_data = { |
|
47 |
'name': 'updated-role', |
|
48 |
} |
|
49 |
resp = app.put_json('/api/roles/', params=role_data) |
|
50 | ||
51 | ||
52 |
def test_api_put_role_unauthorized(app, simple_user, simple_role): |
|
53 |
app.authorization = ('Basic', (simple_user.username, simple_user.username)) |
|
54 | ||
55 |
role_data = { |
|
56 |
'name': 'updated-role', |
|
57 |
} |
|
58 |
resp = app.put_json('/api/roles/', params=role_data) |
|
59 | ||
60 | ||
61 |
def test_api_post_role_simple(app, superuser): |
|
62 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
|
63 | ||
64 |
role_data = { |
|
65 |
'slug': 'coffee-manager', |
|
66 |
'name': 'Coffee Manager', |
|
67 |
'ou': 'ou1' |
|
68 |
} |
|
69 | ||
70 |
resp = app.post_json('/api/roles/', params=role_data) |
|
71 |
assert isinstance(resp.json, dict) |
|
72 |
Role = get_role_model() |
|
73 | ||
74 |
# Check attribute values against the server's response: |
|
75 |
for key, value in role_data.items(): |
|
76 |
assert key in resp.json.keys() |
|
77 |
assert value in resp.json.values() |
|
78 | ||
79 |
# Check attributes values against the DB: |
|
80 |
posted_role = Role.objects.get(slug='coffee-manager') |
|
81 |
assert posted_role.slug == role_data['slug'] |
|
82 |
assert posted_role.name == role_data['name'] |
|
83 |
assert posted_role.ou.slug == 'ou1' |
|
84 | ||
85 | ||
86 |
def test_api_get_role_description(app, superuser): |
|
87 |
app.authorization = ('Basic', (superuser.username, superuser.username)) |
|
88 |
resp = app.get('/api/roles/rando/') |
|
89 | ||
90 |
assert resp.json['slug'] == 'rando' |
|
91 |
assert resp.json['ou'] == 'ou_rando' |
|
92 | ||
93 | ||
94 |
def test_api_get_role_list(app, user): |
|
95 |
app.authorization = ('Basic', (user.username, user.username)) |
|
96 |
resp = app.get('/api/roles/') |
|
97 | ||
98 |
role_fields = ['slug', 'uuid', 'name', 'ou'] |
|
99 | ||
100 |
assert len(resp.json['results']) |
|
101 | ||
102 |
for role_dict in resp.json['results']: |
|
103 |
for field in role_fields: |
|
104 |
assert field in role_dict |
|
105 | ||
106 | ||
33 | 107 |
def test_api_user(client): |
34 | 108 |
# create an user, an ou role, a service and a service role |
35 | 109 |
ou = get_default_ou() |
36 |
- |