Project

General

Profile

0001-api-filter-synchronization-queryset-by-ou-permission.patch

Paul Marillonnet (retour le 04/05), 07 December 2022 11:52 AM

Download (5.69 KB)

View differences:

Subject: [PATCH] api: filter synchronization queryset by ou permissions
 (#71463)

 src/authentic2/api_views.py | 22 ++++++++++--
 tests/test_api_client.py    | 69 +++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)
src/authentic2/api_views.py
queryset = queryset.none()
return queryset
def filter_queryset_by_ou_perm(self, perm):
    """Return the users reachable by the requester under permission *perm*.

    When the requesting user holds *perm* globally, ``User.objects`` is
    returned unfiltered. Otherwise the result is restricted to users whose
    ``ou`` is one of the organizational units on which the requester holds
    *perm*.

    Raises:
        PermissionDenied: the requester holds *perm* neither globally nor
            on any organizational unit.
    """
    requester = self.request.user
    if requester.has_perm(perm):
        return User.objects

    # Keep only the organizational units on which the permission is granted.
    permitted_ous = [
        unit for unit in OrganizationalUnit.objects.all() if requester.has_ou_perm(perm, unit)
    ]
    if permitted_ous:
        return User.objects.filter(ou__in=permitted_ous)
    raise PermissionDenied("You do not have permission to perform this action.")
def update(self, request, *args, **kwargs):
    """Delegate to the parent ``update`` with ``partial`` forced to True."""
    # Overrides any caller-supplied ``partial`` value.
    kwargs.update(partial=True)
    return super().update(request, *args, **kwargs)
......
modified_users_uuids.add(users_pks[instance_pk].uuid)
return modified_users_uuids
@action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
@action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
def synchronization(self, request):
serializer = self.SynchronizationSerializer(data=request.data)
users = self.filter_queryset_by_ou_perm('custom_user.search_user')
if not serializer.is_valid():
response = {'result': 0, 'errors': serializer.errors}
return Response(response, status.HTTP_400_BAD_REQUEST)
hooks.call_hooks('api_modify_serializer_after_validation', self, serializer)
remote_uuids = serializer.validated_data.get('known_uuids', [])
users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
users = users.filter(uuid__in=remote_uuids).only('id', 'uuid')
unknown_uuids = self.check_unknown_uuids(remote_uuids, users)
data = {
'result': 1,
tests/test_api_client.py
from django.urls import reverse
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import APIClient
User = get_user_model()
......
assert len(resp.json['results']) == 1
def test_api_users_list_ou(app, api_client, ou1):
    """Check the user listing seen by an API client attached to ``ou1``.

    The listing must stay empty until the client holds a view role on
    ``ou1`` AND the user actually belongs to ``ou1``.
    """
    users_url = '/api/users/'

    def visible_count():
        # Number of users returned by the listing for the current credentials.
        return len(app.get(users_url).json['results'])

    member = User.objects.create(username='user1')
    api_client.ou = ou1
    api_client.save()

    # Wrong credentials are rejected.
    app.authorization = ('Basic', ('foo', 'bar'))
    app.get(users_url, status=401)

    # Valid credentials, but no role yet: nothing is listed.
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
    assert visible_count() == 0

    # give permissions
    view_role = Role.objects.get_admin_role(
        ContentType.objects.get_for_model(User),
        name='role',
        slug='role',
        ou=ou1,
        operation=VIEW_OP,
    )
    api_client.apiclient_roles.add(view_role)
    # The user is not in ou1 yet, so it is still not listed.
    assert visible_count() == 0

    member.ou = ou1
    member.save()
    assert visible_count() == 1
def test_api_user_synchronization(app, api_client):
uuids = []
for _ in range(100):
......
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
def test_api_user_synchronization_ou(app, api_client, ou1):
    """Check that synchronization only knows users of OUs the client may search.

    Half of the created users live in ``ou1`` (on which the client is later
    granted the search permission), the other half in the default OU. Once
    the role is granted, the endpoint must report as unknown the random
    uuids and the default-OU users, and none of the ``ou1`` users.
    """
    uuids = []
    authorized_uuids = []
    for index in range(100):
        # Odd indexes land in ou1 (authorized), even ones in the default OU.
        ou = ou1 if index % 2 else get_default_ou()
        user = User.objects.create(first_name='ben', last_name='dauve', ou=ou)
        uuids.append(user.uuid)
        if index % 2:
            authorized_uuids.append(user.uuid)
    unknown_uuids = [uuid.uuid4().hex for _ in range(100)]
    url = reverse('a2-api-users-synchronization')
    content = {
        'known_uuids': uuids + unknown_uuids,
    }
    random.shuffle(content['known_uuids'])

    # Wrong credentials are rejected.
    app.authorization = ('Basic', ('foo', 'bar'))
    app.post_json(url, params=content, status=401)

    # Valid credentials without any search permission are forbidden.
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
    app.post_json(url, params=content, status=403)

    # give custom_user.search_user permission to user
    r = Role.objects.get_admin_role(
        ContentType.objects.get_for_model(User),
        name='role',
        slug='role',
        ou=ou1,
        operation=SEARCH_OP,
    )
    api_client.apiclient_roles.add(r)
    response = app.post_json(url, params=content)
    assert response.json['result'] == 1

    reported_unknown = set(response.json['unknown_uuids'])
    assert reported_unknown != set(unknown_uuids)
    assert set(unknown_uuids).issubset(reported_unknown)
    # Users of ou1 are visible to the client: none may be reported unknown.
    assert reported_unknown.isdisjoint(authorized_uuids)
    # Users of the default OU are filtered out of the queryset, so they are
    # expected to be reported unknown (assumes the endpoint reports every
    # submitted uuid that is absent from the OU-filtered queryset).
    unauthorized_uuids = set(uuids) - set(authorized_uuids)
    assert unauthorized_uuids.issubset(reported_unknown)
def test_api_users_create(app, api_client):
payload = {
'username': 'janedoe',