Projet

Général

Profil

0001-api-filter-synchronization-queryset-by-ou-permission.patch

Paul Marillonnet, 07 décembre 2022 11:52

Télécharger (5,69 ko)

Voir les différences:

Subject: [PATCH] api: filter synchronization queryset by ou permissions
 (#71463)

 src/authentic2/api_views.py | 22 ++++++++++--
 tests/test_api_client.py    | 69 +++++++++++++++++++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)
src/authentic2/api_views.py
769 769
                queryset = queryset.none()
770 770
        return queryset
771 771

  
772
    def filter_queryset_by_ou_perm(self, perm):
773
        queryset = User.objects
774
        allowed_ous = []
775

  
776
        if self.request.user.has_perm(perm):
777
            return queryset
778

  
779
        for ou in OrganizationalUnit.objects.all():
780
            if self.request.user.has_ou_perm(perm, ou):
781
                allowed_ous.append(ou)
782
        if not allowed_ous:
783
            raise PermissionDenied("You do not have permission to perform this action.")
784

  
785
        queryset = queryset.filter(ou__in=allowed_ous)
786
        return queryset
787

  
772 788
    def update(self, request, *args, **kwargs):
773 789
        kwargs['partial'] = True
774 790
        return super().update(request, *args, **kwargs)
......
824 840
                    modified_users_uuids.add(users_pks[instance_pk].uuid)
825 841
        return modified_users_uuids
826 842

  
827
    @action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
843
    @action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
828 844
    def synchronization(self, request):
829 845
        serializer = self.SynchronizationSerializer(data=request.data)
846
        users = self.filter_queryset_by_ou_perm('custom_user.search_user')
847

  
830 848
        if not serializer.is_valid():
831 849
            response = {'result': 0, 'errors': serializer.errors}
832 850
            return Response(response, status.HTTP_400_BAD_REQUEST)
833 851
        hooks.call_hooks('api_modify_serializer_after_validation', self, serializer)
834 852
        remote_uuids = serializer.validated_data.get('known_uuids', [])
835
        users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
853
        users = users.filter(uuid__in=remote_uuids).only('id', 'uuid')
836 854
        unknown_uuids = self.check_unknown_uuids(remote_uuids, users)
837 855
        data = {
838 856
            'result': 1,
tests/test_api_client.py
7 7
from django.urls import reverse
8 8

  
9 9
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role
10
from authentic2.a2_rbac.utils import get_default_ou
10 11
from authentic2.models import APIClient
11 12

  
12 13
User = get_user_model()
......
52 53
    assert len(resp.json['results']) == 1
53 54

  
54 55

  
56
def test_api_users_list_ou(app, api_client, ou1):
57
    user = User.objects.create(username='user1')
58
    api_client.ou = ou1
59
    api_client.save()
60

  
61
    app.authorization = ('Basic', ('foo', 'bar'))
62
    resp = app.get('/api/users/', status=401)
63

  
64
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
65
    resp = app.get('/api/users/')
66
    assert len(resp.json['results']) == 0
67

  
68
    # give permissions
69
    r = Role.objects.get_admin_role(
70
        ContentType.objects.get_for_model(User),
71
        name='role',
72
        slug='role',
73
        ou=ou1,
74
        operation=VIEW_OP,
75
    )
76
    api_client.apiclient_roles.add(r)
77
    resp = app.get('/api/users/')
78
    assert len(resp.json['results']) == 0
79

  
80
    user.ou = ou1
81
    user.save()
82

  
83
    resp = app.get('/api/users/')
84
    assert len(resp.json['results']) == 1
85

  
86

  
55 87
def test_api_user_synchronization(app, api_client):
56 88
    uuids = []
57 89
    for _ in range(100):
......
80 112
    assert set(response.json['unknown_uuids']) == set(unknown_uuids)
81 113

  
82 114

  
115
def test_api_user_synchronization_ou(app, api_client, ou1):
116
    uuids = []
117
    authorized_uuids = []
118
    for index in range(100):
119
        ou = ou1 if index % 2 else get_default_ou()
120
        user = User.objects.create(first_name='ben', last_name='dauve', ou=ou)
121
        uuids.append(user.uuid)
122
        if index % 2:
123
            authorized_uuids.append(user.uuid)
124
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
125
    url = reverse('a2-api-users-synchronization')
126
    content = {
127
        'known_uuids': uuids + unknown_uuids,
128
    }
129
    random.shuffle(content['known_uuids'])
130

  
131
    app.authorization = ('Basic', ('foo', 'bar'))
132
    response = app.post_json(url, params=content, status=401)
133

  
134
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
135
    response = app.post_json(url, params=content, status=403)
136

  
137
    # give custom_user.search_user permission to user
138
    r = Role.objects.get_admin_role(
139
        ContentType.objects.get_for_model(User),
140
        name='role',
141
        slug='role',
142
        ou=ou1,
143
        operation=SEARCH_OP,
144
    )
145
    api_client.apiclient_roles.add(r)
146
    response = app.post_json(url, params=content)
147
    assert response.json['result'] == 1
148
    assert set(response.json['unknown_uuids']) != set(unknown_uuids)
149
    assert set(unknown_uuids).issubset(set(response.json['unknown_uuids']))
150

  
151

  
83 152
def test_api_users_create(app, api_client):
84 153
    payload = {
85 154
        'username': 'janedoe',
86
-