Projet

Général

Profil

0001-api-make-sync-endpoint-adapt-to-permissions-by-OU-71.patch

Paul Marillonnet, 21 novembre 2022 14:28

Télécharger (5,92 ko)

Voir les différences:

Subject: [PATCH] api: make sync endpoint adapt to permissions by OU (#71506)

 src/authentic2/api_views.py            | 22 +++++++++++++--
 tests/api/test_user_synchronization.py | 28 +++++++++++++++++++
 tests/test_api_client.py               | 38 ++++++++++++++++++++++++++
 3 files changed, 86 insertions(+), 2 deletions(-)
src/authentic2/api_views.py
769 769
                queryset = queryset.none()
770 770
        return queryset
771 771

  
772
    def filter_queryset_by_ou_perm(self, perm):
773
        queryset = User.objects
774
        allowed_ous = []
775

  
776
        if self.request.user.has_perm(perm):
777
            return queryset
778

  
779
        for ou in OrganizationalUnit.objects.all():
780
            if self.request.user.has_ou_perm(perm, ou):
781
                allowed_ous.append(ou)
782
        if not allowed_ous:
783
            raise PermissionDenied("You do not have permission to perform this action.")
784

  
785
        queryset = queryset.filter(ou__in=allowed_ous)
786
        return queryset
787

  
772 788
    def update(self, request, *args, **kwargs):
773 789
        kwargs['partial'] = True
774 790
        return super().update(request, *args, **kwargs)
......
824 840
                    modified_users_uuids.add(users_pks[instance_pk].uuid)
825 841
        return modified_users_uuids
826 842

  
827
    @action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
843
    @action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
828 844
    def synchronization(self, request):
829 845
        serializer = self.SynchronizationSerializer(data=request.data)
846
        queryset = self.filter_queryset_by_ou_perm('custom_user.search_user')
847

  
830 848
        if not serializer.is_valid():
831 849
            response = {'result': 0, 'errors': serializer.errors}
832 850
            return Response(response, status.HTTP_400_BAD_REQUEST)
833 851
        hooks.call_hooks('api_modify_serializer_after_validation', self, serializer)
834 852
        remote_uuids = serializer.validated_data.get('known_uuids', [])
835
        users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
853
        users = queryset.filter(uuid__in=remote_uuids).only('id', 'uuid')
836 854
        unknown_uuids = self.check_unknown_uuids(remote_uuids, users)
837 855
        data = {
838 856
            'result': 1,
tests/api/test_user_synchronization.py
72 72
    app.post_json(URL, status=403)
73 73

  
74 74

  
75
def test_ou_permission_sufficient(app, user, ou1, users):
76
    user.roles.clear()
77
    r = Role.objects.get_admin_role(
78
        ContentType.objects.get_for_model(User),
79
        name='role',
80
        slug='role',
81
        ou=ou1,
82
        operation=SEARCH_OP,
83
    )
84
    user.roles.add(r)
85

  
86
    users = users[:6]
87
    now = datetime.datetime.now()
88
    content = {
89
        'known_uuids': [user.uuid for user in users],
90
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
91
    }
92
    resp = app.post_json(URL, params=content, status=200)
93
    assert len(resp.json['unknown_uuids']) == 6
94

  
95
    for user in users[:4]:
96
        user.ou = ou1
97
        user.save()
98

  
99
    resp = app.post_json(URL, params=content, status=200)
100
    assert len(resp.json['unknown_uuids']) == 6 - 4
101

  
102

  
75 103
@pytest.fixture(scope='session')
76 104
def unknown_uuids():
77 105
    return [uuid.uuid4().hex for i in range(10)]
tests/test_api_client.py
7 7
from django.urls import reverse
8 8

  
9 9
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role
10
from authentic2.a2_rbac.utils import get_default_ou
10 11
from authentic2.models import APIClient
11 12

  
12 13
User = get_user_model()
......
80 81
    assert set(response.json['unknown_uuids']) == set(unknown_uuids)
81 82

  
82 83

  
84
def test_api_user_synchronization_ou(app, api_client, ou1):
85
    uuids = []
86
    authorized_uuids = []
87
    for index in range(100):
88
        ou = ou1 if index % 2 else get_default_ou()
89
        user = User.objects.create(first_name='ben', last_name='dauve', ou=ou)
90
        uuids.append(user.uuid)
91
        if index % 2:
92
            authorized_uuids.append(user.uuid)
93
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
94
    url = reverse('a2-api-users-synchronization')
95
    content = {
96
        'known_uuids': uuids + unknown_uuids,
97
    }
98
    random.shuffle(content['known_uuids'])
99

  
100
    app.authorization = ('Basic', ('foo', 'bar'))
101
    response = app.post_json(url, params=content, status=401)
102

  
103
    app.authorization = ('Basic', (api_client.identifier, api_client.password))
104
    response = app.post_json(url, params=content, status=403)
105

  
106
    # give custom_user.search_user permission to user
107
    r = Role.objects.get_admin_role(
108
        ContentType.objects.get_for_model(User),
109
        name='role',
110
        slug='role',
111
        ou=ou1,
112
        operation=SEARCH_OP,
113
    )
114
    api_client.apiclient_roles.add(r)
115
    response = app.post_json(url, params=content)
116
    assert response.json['result'] == 1
117
    assert set(response.json['unknown_uuids']) != set(unknown_uuids)
118
    assert set(unknown_uuids).issubset(set(response.json['unknown_uuids']))
119

  
120

  
83 121
def test_api_users_create(app, api_client):
84 122
    payload = {
85 123
        'username': 'janedoe',
86
-