0001-api-make-sync-endpoint-adapt-to-permissions-by-OU-71.patch
src/authentic2/api_views.py | ||
---|---|---|
769 | 769 |
queryset = queryset.none() |
770 | 770 |
return queryset |
771 | 771 | |
772 |
def filter_queryset_by_ou_perm(self, perm): |
|
773 |
queryset = User.objects |
|
774 |
allowed_ous = [] |
|
775 | ||
776 |
if self.request.user.has_perm(perm): |
|
777 |
return queryset |
|
778 | ||
779 |
for ou in OrganizationalUnit.objects.all(): |
|
780 |
if self.request.user.has_ou_perm(perm, ou): |
|
781 |
allowed_ous.append(ou) |
|
782 |
if not allowed_ous: |
|
783 |
raise PermissionDenied("You do not have permission to perform this action.") |
|
784 | ||
785 |
queryset = queryset.filter(ou__in=allowed_ous) |
|
786 |
return queryset |
|
787 | ||
772 | 788 |
def update(self, request, *args, **kwargs): |
773 | 789 |
kwargs['partial'] = True |
774 | 790 |
return super().update(request, *args, **kwargs) |
... | ... | |
824 | 840 |
modified_users_uuids.add(users_pks[instance_pk].uuid) |
825 | 841 |
return modified_users_uuids |
826 | 842 | |
827 |
@action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
|
|
843 |
@action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
|
|
828 | 844 |
def synchronization(self, request): |
829 | 845 |
serializer = self.SynchronizationSerializer(data=request.data) |
846 |
queryset = self.filter_queryset_by_ou_perm('custom_user.search_user') |
|
847 | ||
830 | 848 |
if not serializer.is_valid(): |
831 | 849 |
response = {'result': 0, 'errors': serializer.errors} |
832 | 850 |
return Response(response, status.HTTP_400_BAD_REQUEST) |
833 | 851 |
hooks.call_hooks('api_modify_serializer_after_validation', self, serializer) |
834 | 852 |
remote_uuids = serializer.validated_data.get('known_uuids', []) |
835 |
users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
|
853 |
users = queryset.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
|
836 | 854 |
unknown_uuids = self.check_unknown_uuids(remote_uuids, users) |
837 | 855 |
data = { |
838 | 856 |
'result': 1, |
tests/api/test_user_synchronization.py | ||
---|---|---|
72 | 72 |
app.post_json(URL, status=403) |
73 | 73 | |
74 | 74 | |
75 |
def test_ou_permission_sufficient(app, user, ou1, users): |
|
76 |
user.roles.clear() |
|
77 |
r = Role.objects.get_admin_role( |
|
78 |
ContentType.objects.get_for_model(User), |
|
79 |
name='role', |
|
80 |
slug='role', |
|
81 |
ou=ou1, |
|
82 |
operation=SEARCH_OP, |
|
83 |
) |
|
84 |
user.roles.add(r) |
|
85 | ||
86 |
users = users[:6] |
|
87 |
now = datetime.datetime.now() |
|
88 |
content = { |
|
89 |
'known_uuids': [user.uuid for user in users], |
|
90 |
'timestamp': (now - datetime.timedelta(days=3)).isoformat(), |
|
91 |
} |
|
92 |
resp = app.post_json(URL, params=content, status=200) |
|
93 |
assert len(resp.json['unknown_uuids']) == 6 |
|
94 | ||
95 |
for user in users[:4]: |
|
96 |
user.ou = ou1 |
|
97 |
user.save() |
|
98 | ||
99 |
resp = app.post_json(URL, params=content, status=200) |
|
100 |
assert len(resp.json['unknown_uuids']) == 6 - 4 |
|
101 | ||
102 | ||
75 | 103 |
@pytest.fixture(scope='session') |
76 | 104 |
def unknown_uuids(): |
77 | 105 |
return [uuid.uuid4().hex for i in range(10)] |
tests/test_api_client.py | ||
---|---|---|
7 | 7 |
from django.urls import reverse |
8 | 8 | |
9 | 9 |
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role |
10 |
from authentic2.a2_rbac.utils import get_default_ou |
|
10 | 11 |
from authentic2.models import APIClient |
11 | 12 | |
12 | 13 |
User = get_user_model() |
... | ... | |
80 | 81 |
assert set(response.json['unknown_uuids']) == set(unknown_uuids) |
81 | 82 | |
82 | 83 | |
84 |
def test_api_user_synchronization_ou(app, api_client, ou1): |
|
85 |
uuids = [] |
|
86 |
authorized_uuids = [] |
|
87 |
for index in range(100): |
|
88 |
ou = ou1 if index % 2 else get_default_ou() |
|
89 |
user = User.objects.create(first_name='ben', last_name='dauve', ou=ou) |
|
90 |
uuids.append(user.uuid) |
|
91 |
if index % 2: |
|
92 |
authorized_uuids.append(user.uuid) |
|
93 |
unknown_uuids = [uuid.uuid4().hex for i in range(100)] |
|
94 |
url = reverse('a2-api-users-synchronization') |
|
95 |
content = { |
|
96 |
'known_uuids': uuids + unknown_uuids, |
|
97 |
} |
|
98 |
random.shuffle(content['known_uuids']) |
|
99 | ||
100 |
app.authorization = ('Basic', ('foo', 'bar')) |
|
101 |
response = app.post_json(url, params=content, status=401) |
|
102 | ||
103 |
app.authorization = ('Basic', (api_client.identifier, api_client.password)) |
|
104 |
response = app.post_json(url, params=content, status=403) |
|
105 | ||
106 |
# give custom_user.search_user permission to user |
|
107 |
r = Role.objects.get_admin_role( |
|
108 |
ContentType.objects.get_for_model(User), |
|
109 |
name='role', |
|
110 |
slug='role', |
|
111 |
ou=ou1, |
|
112 |
operation=SEARCH_OP, |
|
113 |
) |
|
114 |
api_client.apiclient_roles.add(r) |
|
115 |
response = app.post_json(url, params=content) |
|
116 |
assert response.json['result'] == 1 |
|
117 |
assert set(response.json['unknown_uuids']) != set(unknown_uuids) |
|
118 |
assert set(unknown_uuids).issubset(set(response.json['unknown_uuids'])) |
|
119 | ||
120 | ||
83 | 121 |
def test_api_users_create(app, api_client): |
84 | 122 |
payload = { |
85 | 123 |
'username': 'janedoe', |
86 |
- |