Projet

Général

Profil

0004-tests-split-user-synchronization-API-tests-67901.patch

Benjamin Dauvergne, 02 novembre 2022 14:22

Télécharger (12,5 ko)

Voir les différences:

Subject: [PATCH 4/7] tests: split user synchronization API tests (#67901)

 tests/api/test_all.py                  | 148 ---------------------
 tests/api/test_user_synchronization.py | 171 +++++++++++++++++++++++++
 2 files changed, 171 insertions(+), 148 deletions(-)
 create mode 100644 tests/api/test_user_synchronization.py
tests/api/test_all.py
17 17

  
18 18
import datetime
19 19
import json
20
import random
21
import uuid
22 20
from unittest import mock
23 21

  
24 22
import django
25 23
import pytest
26 24
from django.contrib.auth import get_user_model
27 25
from django.contrib.auth.hashers import check_password
28
from django.contrib.contenttypes.models import ContentType
29 26
from django.core import mail
30 27
from django.urls import reverse
31 28
from django.utils.encoding import force_str
32 29
from django.utils.text import slugify
33 30
from requests.models import Response
34 31

  
35
from authentic2.a2_rbac.models import SEARCH_OP
36 32
from authentic2.a2_rbac.models import OrganizationalUnit as OU
37 33
from authentic2.a2_rbac.models import Role
38 34
from authentic2.a2_rbac.utils import get_default_ou
......
1128 1124
    assert user.check_password(password)
1129 1125

  
1130 1126

  
1131
def test_user_synchronization(app, simple_user):
    """Exercise the users synchronization endpoint as a non-admin user.

    The request is first expected to be refused with a 403. Once the user is
    made a member of an admin role built for the search operation on the user
    model, the endpoint must answer with result 1 and report as unknown
    exactly the UUIDs that do not belong to any created user.
    """
    auth_headers = basic_authorization_header(simple_user)
    endpoint = reverse('a2-api-users-synchronization')

    existing = [
        User.objects.create(first_name='ben', last_name='dauve').uuid
        for _ in range(100)
    ]
    missing = [uuid.uuid4().hex for _ in range(100)]

    # mix existing and random UUIDs so the answer cannot depend on ordering
    submitted = existing + missing
    random.shuffle(submitted)
    payload = {'known_uuids': submitted}

    # without the search permission the call must be rejected
    app.post_json(endpoint, params=payload, headers=auth_headers, status=403)

    # give custom_user.search_user permission to user
    user_content_type = ContentType.objects.get_for_model(User)
    search_role = Role.objects.get_admin_role(
        user_content_type, name='role', slug='role', operation=SEARCH_OP
    )
    search_role.members.add(simple_user)

    answer = app.post_json(endpoint, params=payload, headers=auth_headers).json
    assert answer['result'] == 1
    assert set(answer['unknown_uuids']) == set(missing)
1154

  
1155

  
1156
def test_user_synchronization_full(app, admin):
    """Check the ``full_known_users`` mode of the users synchronization API.

    With ``full_known_users`` set in the request, the response must contain
    one ``known_users`` entry per created user, each carrying the names used
    at creation and at least a fixed set of attribute keys.
    """
    required_keys = {
        'uuid',
        'email',
        'is_staff',
        'is_superuser',
        'email_verified',
        'ou',
        'is_active',
        'deactivation',
        'modified',
    }
    auth_headers = basic_authorization_header(admin)

    existing = [
        User.objects.create(first_name='jim', last_name='jam').uuid
        for _ in range(100)
    ]
    missing = [uuid.uuid4().hex for _ in range(100)]
    submitted = existing + missing
    random.shuffle(submitted)

    payload = {
        'known_uuids': submitted,
        'full_known_users': 1,
    }
    reply = app.post_json(
        reverse('a2-api-users-synchronization'), params=payload, headers=auth_headers
    )
    assert reply.json['result'] == 1

    # known users returned as part of the API's full mode
    known_users = reply.json['known_users']
    assert len(known_users) == 100
    for entry in known_users:
        assert entry['first_name'] == 'jim'
        assert entry['last_name'] == 'jam'
        assert required_keys <= set(entry.keys())
1188

  
1189

  
1190
def test_user_synchronization_timestamp(app, admin):
    """Check how the ``timestamp`` parameter drives the synchronization answer.

    Six users are created and each gets one journal event; the event of the
    user at index ``i`` is dated ``i`` days and one hour before now. The
    endpoint is then queried several times while the reference timestamp is
    moved and users are deleted, checking ``modified_users_uuids`` and
    ``unknown_uuids`` after every step.
    """
    event_names = [
        'manager.user.creation',
        'manager.user.profile.edit',
        'manager.user.activation',
        'manager.user.deactivation',
        'manager.user.password.change.force',
        'manager.user.password.change.unforce',
    ]
    headers = basic_authorization_header(admin)
    url = reverse('a2-api-users-synchronization')
    now = datetime.datetime.now()

    ou = get_default_ou()
    user_model = get_user_model()
    users = [
        user_model.objects.create(
            first_name='john%s' % i,
            last_name='doe',
            username='user%s' % i,
            email='user%s' % i,
            ou=ou,
        )
        for i in range(6)
    ]

    # one event per user, each one day older than the previous one
    for days_ago, (owner, event_name) in enumerate(zip(users, event_names)):
        Event.objects.create(
            type=EventType.objects.get_for_name(event_name),
            timestamp=now - datetime.timedelta(days=days_ago, hours=1),
            references=[owner],
        )

    content = {
        'known_uuids': [user.uuid for user in users],
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
    }

    def synchronize():
        # form-encoded POST of the current content, decoded as JSON
        return app.post(url, params=content, headers=headers).json

    first_three, last_three = users[:3], users[3:]

    # only the events of the first three users are newer than three days
    data = synchronize()
    for user in first_three:
        assert user.uuid in data['modified_users_uuids']
        assert user.uuid not in data['unknown_uuids']
    for user in last_three:
        assert user.uuid not in data['modified_users_uuids']
        assert user.uuid not in data['unknown_uuids']

    for user in first_three:
        user.delete()

    # a seven days window covers every event, but deleted users are unknown
    content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
    data = synchronize()
    for user in first_three:
        assert user.uuid not in data['modified_users_uuids']
        assert user.uuid in data['unknown_uuids']
    for user in last_three:
        assert user.uuid in data['modified_users_uuids']
        assert user.uuid not in data['unknown_uuids']

    for user in last_three:
        user.delete()

    # every user is gone: nothing modified, everything unknown
    data = synchronize()
    assert not data['modified_users_uuids']
    for user in users:
        assert user.uuid in data['unknown_uuids']

    # stop submitting the first three UUIDs
    dropped = {user.uuid for user in first_three}
    content['known_uuids'] = [value for value in content['known_uuids'] if value not in dropped]

    data = synchronize()
    assert not data['modified_users_uuids']
    assert len(data['unknown_uuids']) == 3
    for user in last_three:
        assert user.uuid in data['unknown_uuids']
1273

  
1274

  
1275 1127
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
1276 1128
    url = '/api/users/%s/' % user_ou1.uuid
1277 1129
    # test invalid client
tests/api/test_user_synchronization.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2022 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18
import random
19
import uuid
20

  
21
from django.contrib.contenttypes.models import ContentType
22
from django.urls import reverse
23

  
24
from authentic2.a2_rbac.models import SEARCH_OP, Role
25
from authentic2.a2_rbac.utils import get_default_ou
26
from authentic2.apps.journal.models import Event, EventType
27
from authentic2.custom_user.models import User
28

  
29
from ..utils import basic_authorization_header
30

  
31

  
32
def test_basic(app, simple_user):
    """Check permission enforcement and unknown UUID detection.

    A user lacking the search permission must get a 403 from the users
    synchronization endpoint. After joining an admin role built for the
    search operation on the user model, the same request must succeed and
    list as unknown exactly the UUIDs matching no created user.
    """
    auth_headers = basic_authorization_header(simple_user)
    endpoint = reverse('a2-api-users-synchronization')

    created_uuids = []
    for _ in range(100):
        created_uuids.append(User.objects.create(first_name='ben', last_name='dauve').uuid)
    random_uuids = [uuid.uuid4().hex for _ in range(100)]

    submitted = created_uuids + random_uuids
    random.shuffle(submitted)
    payload = {'known_uuids': submitted}

    # test permission check
    app.post_json(endpoint, params=payload, headers=auth_headers, status=403)

    role = Role.objects.get_admin_role(
        ContentType.objects.get_for_model(User),
        name='role',
        slug='role',
        operation=SEARCH_OP,
    )
    role.members.add(simple_user)

    body = app.post_json(endpoint, params=payload, headers=auth_headers).json
    assert body['result'] == 1
    assert set(body['unknown_uuids']) == set(random_uuids)
54

  
55

  
56
def test_full_known_users(app, admin):
    """Check that ``full_known_users`` makes the API return known users.

    The request submits the UUIDs of 100 created users mixed with 100 random
    ones. The response must hold 100 ``known_users`` entries, each with the
    expected names and at least a fixed set of attribute keys.
    """
    auth_headers = basic_authorization_header(admin)
    endpoint = reverse('a2-api-users-synchronization')

    created_uuids = []
    for _ in range(100):
        created_uuids.append(User.objects.create(first_name='jim', last_name='jam').uuid)
    random_uuids = [uuid.uuid4().hex for _ in range(100)]

    submitted = created_uuids + random_uuids
    random.shuffle(submitted)
    payload = {'known_uuids': submitted, 'full_known_users': 1}

    reply = app.post_json(endpoint, params=payload, headers=auth_headers)
    assert reply.json['result'] == 1

    # known users returned as part of the API's full mode
    mandatory_keys = {
        'uuid',
        'email',
        'is_staff',
        'is_superuser',
        'email_verified',
        'ou',
        'is_active',
        'deactivation',
        'modified',
    }
    known_users = reply.json['known_users']
    assert len(known_users) == 100
    for known_user in known_users:
        assert known_user['first_name'] == 'jim'
        assert known_user['last_name'] == 'jam'
        assert mandatory_keys.issubset(known_user.keys())
88

  
89

  
90
def test_timestamp(app, admin):
    """Check the effect of the ``timestamp`` parameter on the answer.

    Six users are created in the default OU, and the user at index ``i`` is
    referenced by one journal event dated ``i`` days and one hour before now.
    The endpoint is queried repeatedly while the reference timestamp changes
    and users are deleted; ``modified_users_uuids`` and ``unknown_uuids`` are
    verified after each query.
    """
    headers = basic_authorization_header(admin)
    url = reverse('a2-api-users-synchronization')
    now = datetime.datetime.now()
    ou = get_default_ou()

    users = [
        User.objects.create(
            first_name='john%s' % index,
            last_name='doe',
            username='user%s' % index,
            email='user%s' % index,
            ou=ou,
        )
        for index in range(6)
    ]

    event_names = (
        'manager.user.creation',
        'manager.user.profile.edit',
        'manager.user.activation',
        'manager.user.deactivation',
        'manager.user.password.change.force',
        'manager.user.password.change.unforce',
    )
    # the event attached to users[age] is `age` days and one hour old
    for age, event_name in enumerate(event_names):
        Event.objects.create(
            type=EventType.objects.get_for_name(event_name),
            timestamp=now - datetime.timedelta(days=age, hours=1),
            references=[users[age]],
        )

    recent_users = users[:3]
    older_users = users[3:]
    content = {
        'known_uuids': [user.uuid for user in users],
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
    }

    # three days window: only the three most recent events are included
    result = app.post(url, params=content, headers=headers).json
    for user in recent_users:
        assert user.uuid in result['modified_users_uuids']
        assert user.uuid not in result['unknown_uuids']
    for user in older_users:
        assert user.uuid not in result['modified_users_uuids']
        assert user.uuid not in result['unknown_uuids']

    for user in recent_users:
        user.delete()

    # seven days window: every event is included, deleted users are unknown
    content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
    result = app.post(url, params=content, headers=headers).json
    for user in recent_users:
        assert user.uuid not in result['modified_users_uuids']
        assert user.uuid in result['unknown_uuids']
    for user in older_users:
        assert user.uuid in result['modified_users_uuids']
        assert user.uuid not in result['unknown_uuids']

    for user in older_users:
        user.delete()

    # all users deleted: nothing is modified and every UUID is unknown
    result = app.post(url, params=content, headers=headers).json
    assert not result['modified_users_uuids']
    for user in users:
        assert user.uuid in result['unknown_uuids']

    # only keep the UUIDs of the last three users in the request
    for gone in recent_users:
        content['known_uuids'].remove(gone.uuid)

    result = app.post(url, params=content, headers=headers).json
    assert not result['modified_users_uuids']
    assert len(result['unknown_uuids']) == 3
    for user in older_users:
        assert user.uuid in result['unknown_uuids']
0
-