0001-api-filter-synchronization-queryset-by-ou-permission.patch
src/authentic2/api_views.py | ||
---|---|---|
769 | 769 |
queryset = queryset.none() |
770 | 770 |
return queryset |
771 | 771 | |
772 |
def filter_queryset_by_ou_perm(self, perm): |
|
773 |
queryset = User.objects |
|
774 |
allowed_ous = [] |
|
775 | ||
776 |
if self.request.user.has_perm(perm): |
|
777 |
return queryset |
|
778 | ||
779 |
for ou in OrganizationalUnit.objects.all(): |
|
780 |
if self.request.user.has_ou_perm(perm, ou): |
|
781 |
allowed_ous.append(ou) |
|
782 |
if not allowed_ous: |
|
783 |
raise PermissionDenied("You do not have permission to perform this action.") |
|
784 | ||
785 |
queryset = queryset.filter(ou__in=allowed_ous) |
|
786 |
return queryset |
|
787 | ||
772 | 788 |
def update(self, request, *args, **kwargs): |
773 | 789 |
kwargs['partial'] = True |
774 | 790 |
return super().update(request, *args, **kwargs) |
... | ... | |
824 | 840 |
modified_users_uuids.add(users_pks[instance_pk].uuid) |
825 | 841 |
return modified_users_uuids |
826 | 842 | |
827 |
@action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
|
|
843 |
@action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
|
|
828 | 844 |
def synchronization(self, request): |
829 | 845 |
serializer = self.SynchronizationSerializer(data=request.data) |
846 |
users = self.filter_queryset_by_ou_perm('custom_user.search_user') |
|
847 | ||
830 | 848 |
if not serializer.is_valid(): |
831 | 849 |
response = {'result': 0, 'errors': serializer.errors} |
832 | 850 |
return Response(response, status.HTTP_400_BAD_REQUEST) |
833 | 851 |
hooks.call_hooks('api_modify_serializer_after_validation', self, serializer) |
834 | 852 |
remote_uuids = serializer.validated_data.get('known_uuids', []) |
835 |
users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
|
853 |
users = users.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
|
836 | 854 |
unknown_uuids = self.check_unknown_uuids(remote_uuids, users) |
837 | 855 |
data = { |
838 | 856 |
'result': 1, |
tests/test_api_client.py | ||
---|---|---|
7 | 7 |
from django.urls import reverse |
8 | 8 | |
9 | 9 |
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role |
10 |
from authentic2.a2_rbac.utils import get_default_ou |
|
10 | 11 |
from authentic2.models import APIClient |
11 | 12 | |
12 | 13 |
User = get_user_model() |
... | ... | |
52 | 53 |
assert len(resp.json['results']) == 1 |
53 | 54 | |
54 | 55 | |
56 |
def test_api_users_list_ou(app, api_client, ou1): |
|
57 |
user = User.objects.create(username='user1') |
|
58 |
api_client.ou = ou1 |
|
59 |
api_client.save() |
|
60 | ||
61 |
app.authorization = ('Basic', ('foo', 'bar')) |
|
62 |
resp = app.get('/api/users/', status=401) |
|
63 | ||
64 |
app.authorization = ('Basic', (api_client.identifier, api_client.password)) |
|
65 |
resp = app.get('/api/users/') |
|
66 |
assert len(resp.json['results']) == 0 |
|
67 | ||
68 |
# give permissions |
|
69 |
r = Role.objects.get_admin_role( |
|
70 |
ContentType.objects.get_for_model(User), |
|
71 |
name='role', |
|
72 |
slug='role', |
|
73 |
ou=ou1, |
|
74 |
operation=VIEW_OP, |
|
75 |
) |
|
76 |
api_client.apiclient_roles.add(r) |
|
77 |
resp = app.get('/api/users/') |
|
78 |
assert len(resp.json['results']) == 0 |
|
79 | ||
80 |
user.ou = ou1 |
|
81 |
user.save() |
|
82 | ||
83 |
resp = app.get('/api/users/') |
|
84 |
assert len(resp.json['results']) == 1 |
|
85 | ||
86 | ||
55 | 87 |
def test_api_user_synchronization(app, api_client): |
56 | 88 |
uuids = [] |
57 | 89 |
for _ in range(100): |
... | ... | |
80 | 112 |
assert set(response.json['unknown_uuids']) == set(unknown_uuids) |
81 | 113 | |
82 | 114 | |
115 |
def test_api_user_synchronization_ou(app, api_client, ou1): |
|
116 |
uuids = [] |
|
117 |
authorized_uuids = [] |
|
118 |
for index in range(100): |
|
119 |
ou = ou1 if index % 2 else get_default_ou() |
|
120 |
user = User.objects.create(first_name='ben', last_name='dauve', ou=ou) |
|
121 |
uuids.append(user.uuid) |
|
122 |
if index % 2: |
|
123 |
authorized_uuids.append(user.uuid) |
|
124 |
unknown_uuids = [uuid.uuid4().hex for i in range(100)] |
|
125 |
url = reverse('a2-api-users-synchronization') |
|
126 |
content = { |
|
127 |
'known_uuids': uuids + unknown_uuids, |
|
128 |
} |
|
129 |
random.shuffle(content['known_uuids']) |
|
130 | ||
131 |
app.authorization = ('Basic', ('foo', 'bar')) |
|
132 |
response = app.post_json(url, params=content, status=401) |
|
133 | ||
134 |
app.authorization = ('Basic', (api_client.identifier, api_client.password)) |
|
135 |
response = app.post_json(url, params=content, status=403) |
|
136 | ||
137 |
# give custom_user.search_user permission to user |
|
138 |
r = Role.objects.get_admin_role( |
|
139 |
ContentType.objects.get_for_model(User), |
|
140 |
name='role', |
|
141 |
slug='role', |
|
142 |
ou=ou1, |
|
143 |
operation=SEARCH_OP, |
|
144 |
) |
|
145 |
api_client.apiclient_roles.add(r) |
|
146 |
response = app.post_json(url, params=content) |
|
147 |
assert response.json['result'] == 1 |
|
148 |
assert set(response.json['unknown_uuids']) != set(unknown_uuids) |
|
149 |
assert set(unknown_uuids).issubset(set(response.json['unknown_uuids'])) |
|
150 | ||
151 | ||
83 | 152 |
def test_api_users_create(app, api_client): |
84 | 153 |
payload = { |
85 | 154 |
'username': 'janedoe', |
86 |
- |