Projet

Général

Profil

0004-tests-split-user-synchronization-API-tests-67901.patch

Benjamin Dauvergne, 10 octobre 2022 10:37

Télécharger (12,6 ko)

Voir les différences:

Subject: [PATCH 4/6] tests: split user synchronization API tests (#67901)

 tests/api/test_all.py                  | 148 ---------------------
 tests/api/test_user_synchronization.py | 172 +++++++++++++++++++++++++
 2 files changed, 172 insertions(+), 148 deletions(-)
 create mode 100644 tests/api/test_user_synchronization.py
tests/api/test_all.py
17 17

  
18 18
import datetime
19 19
import json
20
import random
21
import uuid
22 20
from unittest import mock
23 21

  
24 22
import django
25 23
import pytest
26 24
from django.contrib.auth import get_user_model
27 25
from django.contrib.auth.hashers import check_password
28
from django.contrib.contenttypes.models import ContentType
29 26
from django.core import mail
30 27
from django.urls import reverse
31 28
from django.utils.encoding import force_str
......
40 37
from authentic2.models import APIClient, Attribute, AttributeValue, AuthorizedRole, PasswordReset, Service
41 38
from authentic2.utils.misc import good_next_url
42 39
from authentic2_idp_cas.models import Service as CASService
43
from django_rbac.models import SEARCH_OP
44 40

  
45 41
from ..utils import assert_event, basic_authorization_header, get_link_from_mail, login
46 42

  
......
1126 1122
    assert user.check_password(password)
1127 1123

  
1128 1124

  
1129
def test_user_synchronization(app, simple_user):
1130
    headers = basic_authorization_header(simple_user)
1131
    uuids = []
1132
    for _ in range(100):
1133
        user = User.objects.create(first_name='ben', last_name='dauve')
1134
        uuids.append(user.uuid)
1135
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
1136
    url = reverse('a2-api-users-synchronization')
1137
    content = {
1138
        'known_uuids': uuids + unknown_uuids,
1139
    }
1140
    random.shuffle(content['known_uuids'])
1141
    response = app.post_json(url, params=content, headers=headers, status=403)
1142

  
1143
    # give custom_user.search_user permission to user
1144
    r = Role.objects.get_admin_role(
1145
        ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
1146
    )
1147

  
1148
    r.members.add(simple_user)
1149
    response = app.post_json(url, params=content, headers=headers)
1150
    assert response.json['result'] == 1
1151
    assert set(response.json['unknown_uuids']) == set(unknown_uuids)
1152

  
1153

  
1154
def test_user_synchronization_full(app, admin):
1155
    headers = basic_authorization_header(admin)
1156
    uuids = []
1157
    for _ in range(100):
1158
        user = User.objects.create(first_name='jim', last_name='jam')
1159
        uuids.append(user.uuid)
1160
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
1161
    url = reverse('a2-api-users-synchronization')
1162
    content = {
1163
        'known_uuids': uuids + unknown_uuids,
1164
        'full_known_users': 1,
1165
    }
1166
    random.shuffle(content['known_uuids'])
1167
    response = app.post_json(url, params=content, headers=headers)
1168
    assert response.json['result'] == 1
1169

  
1170
    # known users returned as part of api's full mode:
1171
    assert len(response.json['known_users']) == 100
1172
    for user_dict in response.json['known_users']:
1173
        assert user_dict['first_name'] == 'jim'
1174
        assert user_dict['last_name'] == 'jam'
1175
        assert {
1176
            'uuid',
1177
            'email',
1178
            'is_staff',
1179
            'is_superuser',
1180
            'email_verified',
1181
            'ou',
1182
            'is_active',
1183
            'deactivation',
1184
            'modified',
1185
        }.issubset(set(user_dict.keys()))
1186

  
1187

  
1188
def test_user_synchronization_timestamp(app, admin):
1189
    headers = basic_authorization_header(admin)
1190
    url = reverse('a2-api-users-synchronization')
1191
    now = datetime.datetime.now()
1192

  
1193
    ou = get_default_ou()
1194
    User = get_user_model()
1195
    users = []
1196

  
1197
    for i in range(6):
1198
        users.append(
1199
            User.objects.create(
1200
                first_name='john%s' % i,
1201
                last_name='doe',
1202
                username='user%s' % i,
1203
                email='user%s' % i,
1204
                ou=ou,
1205
            )
1206
        )
1207

  
1208
    for i, event_name in enumerate(
1209
        [
1210
            'manager.user.creation',
1211
            'manager.user.profile.edit',
1212
            'manager.user.activation',
1213
            'manager.user.deactivation',
1214
            'manager.user.password.change.force',
1215
            'manager.user.password.change.unforce',
1216
        ]
1217
    ):
1218
        event_type = EventType.objects.get_for_name(event_name)
1219
        Event.objects.create(
1220
            type=event_type,
1221
            timestamp=now - datetime.timedelta(days=i, hours=1),
1222
            references=[users[i]],
1223
        )
1224

  
1225
    content = {
1226
        'known_uuids': [user.uuid for user in users],
1227
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
1228
    }
1229

  
1230
    response = app.post(url, params=content, headers=headers)
1231

  
1232
    for user in users[:3]:
1233
        assert user.uuid in response.json['modified_users_uuids']
1234
        assert user.uuid not in response.json['unknown_uuids']
1235
    for user in users[3:]:
1236
        assert user.uuid not in response.json['modified_users_uuids']
1237
        assert user.uuid not in response.json['unknown_uuids']
1238

  
1239
    for user in users[:3]:
1240
        user.delete()
1241

  
1242
    content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
1243

  
1244
    response = app.post(url, params=content, headers=headers)
1245

  
1246
    for user in users[:3]:
1247
        assert user.uuid not in response.json['modified_users_uuids']
1248
        assert user.uuid in response.json['unknown_uuids']
1249
    for user in users[3:]:
1250
        assert user.uuid in response.json['modified_users_uuids']
1251
        assert user.uuid not in response.json['unknown_uuids']
1252

  
1253
    for user in users[3:]:
1254
        user.delete()
1255

  
1256
    response = app.post(url, params=content, headers=headers)
1257

  
1258
    assert not response.json['modified_users_uuids']
1259
    for user in users:
1260
        assert user.uuid in response.json['unknown_uuids']
1261

  
1262
    for user in users[:3]:
1263
        content['known_uuids'].remove(user.uuid)
1264

  
1265
    response = app.post(url, params=content, headers=headers)
1266

  
1267
    assert not response.json['modified_users_uuids']
1268
    assert len(response.json['unknown_uuids']) == 3
1269
    for user in users[3:]:
1270
        assert user.uuid in response.json['unknown_uuids']
1271

  
1272

  
1273 1125
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
1274 1126
    url = '/api/users/%s/' % user_ou1.uuid
1275 1127
    # test invalid client
tests/api/test_user_synchronization.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2022 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18
import random
19
import uuid
20

  
21
from django.contrib.contenttypes.models import ContentType
22
from django.urls import reverse
23

  
24
from authentic2.a2_rbac.models import Role
25
from authentic2.a2_rbac.utils import get_default_ou
26
from authentic2.apps.journal.models import Event, EventType
27
from authentic2.custom_user.models import User
28
from django_rbac.models import SEARCH_OP
29

  
30
from ..utils import basic_authorization_header
31

  
32

  
33
def test_basic(app, simple_user):
34
    headers = basic_authorization_header(simple_user)
35
    uuids = []
36
    for _ in range(100):
37
        user = User.objects.create(first_name='ben', last_name='dauve')
38
        uuids.append(user.uuid)
39
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
40
    url = reverse('a2-api-users-synchronization')
41
    content = {
42
        'known_uuids': uuids + unknown_uuids,
43
    }
44
    random.shuffle(content['known_uuids'])
45

  
46
    # test permission check
47
    response = app.post_json(url, params=content, headers=headers, status=403)
48
    r = Role.objects.get_admin_role(
49
        ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
50
    )
51
    r.members.add(simple_user)
52
    response = app.post_json(url, params=content, headers=headers)
53
    assert response.json['result'] == 1
54
    assert set(response.json['unknown_uuids']) == set(unknown_uuids)
55

  
56

  
57
def test_full_known_users(app, admin):
58
    headers = basic_authorization_header(admin)
59
    uuids = []
60
    for _ in range(100):
61
        user = User.objects.create(first_name='jim', last_name='jam')
62
        uuids.append(user.uuid)
63
    unknown_uuids = [uuid.uuid4().hex for i in range(100)]
64
    url = reverse('a2-api-users-synchronization')
65
    content = {
66
        'known_uuids': uuids + unknown_uuids,
67
        'full_known_users': 1,
68
    }
69
    random.shuffle(content['known_uuids'])
70
    response = app.post_json(url, params=content, headers=headers)
71
    assert response.json['result'] == 1
72

  
73
    # known users returned as part of api's full mode:
74
    assert len(response.json['known_users']) == 100
75
    for user_dict in response.json['known_users']:
76
        assert user_dict['first_name'] == 'jim'
77
        assert user_dict['last_name'] == 'jam'
78
        assert {
79
            'uuid',
80
            'email',
81
            'is_staff',
82
            'is_superuser',
83
            'email_verified',
84
            'ou',
85
            'is_active',
86
            'deactivation',
87
            'modified',
88
        }.issubset(set(user_dict.keys()))
89

  
90

  
91
def test_timestamp(app, admin):
92
    headers = basic_authorization_header(admin)
93
    url = reverse('a2-api-users-synchronization')
94
    now = datetime.datetime.now()
95

  
96
    ou = get_default_ou()
97
    users = []
98

  
99
    for i in range(6):
100
        users.append(
101
            User.objects.create(
102
                first_name='john%s' % i,
103
                last_name='doe',
104
                username='user%s' % i,
105
                email='user%s' % i,
106
                ou=ou,
107
            )
108
        )
109

  
110
    for i, event_name in enumerate(
111
        [
112
            'manager.user.creation',
113
            'manager.user.profile.edit',
114
            'manager.user.activation',
115
            'manager.user.deactivation',
116
            'manager.user.password.change.force',
117
            'manager.user.password.change.unforce',
118
        ]
119
    ):
120
        event_type = EventType.objects.get_for_name(event_name)
121
        Event.objects.create(
122
            type=event_type,
123
            timestamp=now - datetime.timedelta(days=i, hours=1),
124
            references=[users[i]],
125
        )
126

  
127
    content = {
128
        'known_uuids': [user.uuid for user in users],
129
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
130
    }
131

  
132
    response = app.post(url, params=content, headers=headers)
133

  
134
    for user in users[:3]:
135
        assert user.uuid in response.json['modified_users_uuids']
136
        assert user.uuid not in response.json['unknown_uuids']
137
    for user in users[3:]:
138
        assert user.uuid not in response.json['modified_users_uuids']
139
        assert user.uuid not in response.json['unknown_uuids']
140

  
141
    for user in users[:3]:
142
        user.delete()
143

  
144
    content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
145

  
146
    response = app.post(url, params=content, headers=headers)
147

  
148
    for user in users[:3]:
149
        assert user.uuid not in response.json['modified_users_uuids']
150
        assert user.uuid in response.json['unknown_uuids']
151
    for user in users[3:]:
152
        assert user.uuid in response.json['modified_users_uuids']
153
        assert user.uuid not in response.json['unknown_uuids']
154

  
155
    for user in users[3:]:
156
        user.delete()
157

  
158
    response = app.post(url, params=content, headers=headers)
159

  
160
    assert not response.json['modified_users_uuids']
161
    for user in users:
162
        assert user.uuid in response.json['unknown_uuids']
163

  
164
    for user in users[:3]:
165
        content['known_uuids'].remove(user.uuid)
166

  
167
    response = app.post(url, params=content, headers=headers)
168

  
169
    assert not response.json['modified_users_uuids']
170
    assert len(response.json['unknown_uuids']) == 3
171
    for user in users[3:]:
172
        assert user.uuid in response.json['unknown_uuids']
0
-