0001-api-filter-synchronization-queryset-by-ou-permission.patch
| src/authentic2/api_views.py | ||
|---|---|---|
|
queryset = queryset.none()
|
||
|
return queryset
|
||
|
def filter_queryset_by_ou_perm(self, perm):
    """Return the users visible to the requesting user for ``perm``.

    When ``self.request.user.has_perm(perm)`` is true, ``User.objects`` is
    returned unfiltered. Otherwise every ``OrganizationalUnit`` is checked
    with ``has_ou_perm(perm, ou)`` and the result is limited to users whose
    ``ou`` is one of the units that passed the check.

    Raises:
        PermissionDenied: if the permission is held neither directly nor
            on any organizational unit.
    """
    requester = self.request.user
    every_user = User.objects

    # Permission held without an OU restriction: nothing to filter.
    if requester.has_perm(perm):
        return every_user

    permitted_ous = [
        unit for unit in OrganizationalUnit.objects.all() if requester.has_ou_perm(perm, unit)
    ]
    if not permitted_ous:
        raise PermissionDenied("You do not have permission to perform this action.")

    return every_user.filter(ou__in=permitted_ous)
|
||
|
def update(self, request, *args, **kwargs):
    """Delegate to the parent ``update`` with ``partial`` forced to True.

    Any ``partial`` value supplied by the caller is overwritten before the
    call is forwarded.
    """
    kwargs.update(partial=True)
    return super().update(request, *args, **kwargs)
|
||
| ... | ... | |
|
modified_users_uuids.add(users_pks[instance_pk].uuid)
|
||
|
return modified_users_uuids
|
||
|
@action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
|
||
|
@action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
|
||
|
def synchronization(self, request):
|
||
|
serializer = self.SynchronizationSerializer(data=request.data)
|
||
|
users = self.filter_queryset_by_ou_perm('custom_user.search_user')
|
||
|
if not serializer.is_valid():
|
||
|
response = {'result': 0, 'errors': serializer.errors}
|
||
|
return Response(response, status.HTTP_400_BAD_REQUEST)
|
||
|
hooks.call_hooks('api_modify_serializer_after_validation', self, serializer)
|
||
|
remote_uuids = serializer.validated_data.get('known_uuids', [])
|
||
|
users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
||
|
users = users.filter(uuid__in=remote_uuids).only('id', 'uuid')
|
||
|
unknown_uuids = self.check_unknown_uuids(remote_uuids, users)
|
||
|
data = {
|
||
|
'result': 1,
|
||
| tests/test_api_client.py | ||
|---|---|---|
|
from django.urls import reverse
|
||
|
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role
|
||
|
from authentic2.a2_rbac.utils import get_default_ou
|
||
|
from authentic2.models import APIClient
|
||
|
User = get_user_model()
|
||
| ... | ... | |
|
assert len(resp.json['results']) == 1
|
||
|
def test_api_users_list_ou(app, api_client, ou1):
    """Check how ``/api/users/`` responds for an API client attached to ``ou1``.

    Steps exercised:
    * bad credentials are rejected with a 401;
    * the client gets an empty list before a role is granted;
    * the list is still empty after a VIEW_OP role on ``ou1`` is granted,
      while the user has not yet been assigned to ``ou1``;
    * once the user is moved into ``ou1``, exactly one result is returned.
    """

    def listed_count():
        # Number of entries returned by the users listing endpoint.
        return len(app.get('/api/users/').json['results'])

    user = User.objects.create(username='user1')
    api_client.ou = ou1
    api_client.save()

    # Unknown credentials must not get through.
    app.authorization = ('Basic', ('foo', 'bar'))
    app.get('/api/users/', status=401)

    app.authorization = ('Basic', (api_client.identifier, api_client.password))
    assert listed_count() == 0

    # Grant the client a view role scoped to ou1.
    view_role = Role.objects.get_admin_role(
        ContentType.objects.get_for_model(User),
        name='role',
        slug='role',
        ou=ou1,
        operation=VIEW_OP,
    )
    api_client.apiclient_roles.add(view_role)
    assert listed_count() == 0

    # Move the user into ou1: it should now be listed.
    user.ou = ou1
    user.save()
    assert listed_count() == 1
|
||
|
def test_api_user_synchronization(app, api_client):
|
||
|
uuids = []
|
||
|
for _ in range(100):
|
||
| ... | ... | |
|
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
|
||
|
def test_api_user_synchronization_ou(app, api_client, ou1):
    """Exercise the users synchronization endpoint with an OU-scoped role.

    One hundred users are created, alternating between the default OU and
    ``ou1``, and their uuids are posted together with one hundred random
    uuids. The test expects a 401 for bad credentials, a 403 for the API
    client before any role is granted, and, once a SEARCH_OP role on
    ``ou1`` is granted, a successful response whose ``unknown_uuids``
    contains every random uuid but is not equal to that set alone.
    """
    uuids = []
    authorized_uuids = []
    for position in range(100):
        in_ou1 = position % 2
        created = User.objects.create(
            first_name='ben',
            last_name='dauve',
            ou=ou1 if in_ou1 else get_default_ou(),
        )
        uuids.append(created.uuid)
        if in_ou1:
            authorized_uuids.append(created.uuid)

    unknown_uuids = [uuid.uuid4().hex for _ in range(100)]
    url = reverse('a2-api-users-synchronization')

    known = uuids + unknown_uuids
    random.shuffle(known)
    content = {'known_uuids': known}

    # Bad credentials are rejected outright.
    app.authorization = ('Basic', ('foo', 'bar'))
    app.post_json(url, params=content, status=401)

    # Valid client, but no role granted yet: forbidden.
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
    app.post_json(url, params=content, status=403)

    # Grant the API client a search role restricted to ou1.
    search_role = Role.objects.get_admin_role(
        ContentType.objects.get_for_model(User),
        name='role',
        slug='role',
        ou=ou1,
        operation=SEARCH_OP,
    )
    api_client.apiclient_roles.add(search_role)

    response = app.post_json(url, params=content)
    assert response.json['result'] == 1

    random_only = set(unknown_uuids)
    reported = set(response.json['unknown_uuids'])
    assert reported != random_only
    assert random_only.issubset(reported)
|
||
|
def test_api_users_create(app, api_client):
|
||
|
payload = {
|
||
|
'username': 'janedoe',
|
||