From 68e81131ea6eff69075859b89c9bfc0627bb1846 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 7 Oct 2022 10:53:22 +0200 Subject: [PATCH 4/7] tests: split user synchronization API tests (#67901) --- tests/api/test_all.py | 148 --------------------- tests/api/test_user_synchronization.py | 171 +++++++++++++++++++++++++ 2 files changed, 171 insertions(+), 148 deletions(-) create mode 100644 tests/api/test_user_synchronization.py diff --git a/tests/api/test_all.py b/tests/api/test_all.py index 7e9fc25a..24bd654c 100644 --- a/tests/api/test_all.py +++ b/tests/api/test_all.py @@ -17,22 +17,18 @@ import datetime import json -import random -import uuid from unittest import mock import django import pytest from django.contrib.auth import get_user_model from django.contrib.auth.hashers import check_password -from django.contrib.contenttypes.models import ContentType from django.core import mail from django.urls import reverse from django.utils.encoding import force_str from django.utils.text import slugify from requests.models import Response -from authentic2.a2_rbac.models import SEARCH_OP from authentic2.a2_rbac.models import OrganizationalUnit as OU from authentic2.a2_rbac.models import Role from authentic2.a2_rbac.utils import get_default_ou @@ -1128,150 +1124,6 @@ def test_register_ou_no_email_validation(settings, app, admin, django_user_model assert user.check_password(password) -def test_user_synchronization(app, simple_user): - headers = basic_authorization_header(simple_user) - uuids = [] - for _ in range(100): - user = User.objects.create(first_name='ben', last_name='dauve') - uuids.append(user.uuid) - unknown_uuids = [uuid.uuid4().hex for i in range(100)] - url = reverse('a2-api-users-synchronization') - content = { - 'known_uuids': uuids + unknown_uuids, - } - random.shuffle(content['known_uuids']) - response = app.post_json(url, params=content, headers=headers, status=403) - - # give custom_user.search_user permission to user - r = Role.objects.get_admin_role( - ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP - ) - - r.members.add(simple_user) - response = app.post_json(url, params=content, headers=headers) - assert response.json['result'] == 1 - assert set(response.json['unknown_uuids']) == set(unknown_uuids) - - -def test_user_synchronization_full(app, admin): - headers = basic_authorization_header(admin) - uuids = [] - for _ in range(100): - user = User.objects.create(first_name='jim', last_name='jam') - uuids.append(user.uuid) - unknown_uuids = [uuid.uuid4().hex for i in range(100)] - url = reverse('a2-api-users-synchronization') - content = { - 'known_uuids': uuids + unknown_uuids, - 'full_known_users': 1, - } - random.shuffle(content['known_uuids']) - response = app.post_json(url, params=content, headers=headers) - assert response.json['result'] == 1 - - # known users returned as part of api's full mode: - assert len(response.json['known_users']) == 100 - for user_dict in response.json['known_users']: - assert user_dict['first_name'] == 'jim' - assert user_dict['last_name'] == 'jam' - assert { - 'uuid', - 'email', - 'is_staff', - 'is_superuser', - 'email_verified', - 'ou', - 'is_active', - 'deactivation', - 'modified', - }.issubset(set(user_dict.keys())) - - -def test_user_synchronization_timestamp(app, admin): - headers = basic_authorization_header(admin) - url = reverse('a2-api-users-synchronization') - now = datetime.datetime.now() - - ou = get_default_ou() - User = get_user_model() - users = [] - - for i in range(6): - users.append( - User.objects.create( - first_name='john%s' % i, - last_name='doe', - username='user%s' % i, - email='user%s' % i, - ou=ou, - ) - ) - - for i, event_name in enumerate( - [ - 'manager.user.creation', - 'manager.user.profile.edit', - 'manager.user.activation', - 'manager.user.deactivation', - 'manager.user.password.change.force', - 'manager.user.password.change.unforce', - ] - ): - event_type = EventType.objects.get_for_name(event_name) - Event.objects.create( - type=event_type, - timestamp=now - datetime.timedelta(days=i, hours=1), - references=[users[i]], - ) - - content = { - 'known_uuids': [user.uuid for user in users], - 'timestamp': (now - datetime.timedelta(days=3)).isoformat(), - } - - response = app.post(url, params=content, headers=headers) - - for user in users[:3]: - assert user.uuid in response.json['modified_users_uuids'] - assert user.uuid not in response.json['unknown_uuids'] - for user in users[3:]: - assert user.uuid not in response.json['modified_users_uuids'] - assert user.uuid not in response.json['unknown_uuids'] - - for user in users[:3]: - user.delete() - - content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat() - - response = app.post(url, params=content, headers=headers) - - for user in users[:3]: - assert user.uuid not in response.json['modified_users_uuids'] - assert user.uuid in response.json['unknown_uuids'] - for user in users[3:]: - assert user.uuid in response.json['modified_users_uuids'] - assert user.uuid not in response.json['unknown_uuids'] - - for user in users[3:]: - user.delete() - - response = app.post(url, params=content, headers=headers) - - assert not response.json['modified_users_uuids'] - for user in users: - assert user.uuid in response.json['unknown_uuids'] - - for user in users[:3]: - content['known_uuids'].remove(user.uuid) - - response = app.post(url, params=content, headers=headers) - - assert not response.json['modified_users_uuids'] - assert len(response.json['unknown_uuids']) == 3 - for user in users[3:]: - assert user.uuid in response.json['unknown_uuids'] - - def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client): url = '/api/users/%s/' % user_ou1.uuid # test invalid client diff --git a/tests/api/test_user_synchronization.py b/tests/api/test_user_synchronization.py new file mode 100644 index 00000000..f3c3c572 --- /dev/null +++ b/tests/api/test_user_synchronization.py @@ -0,0 +1,171 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2022 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import datetime +import random +import uuid + +from django.contrib.contenttypes.models import ContentType +from django.urls import reverse + +from authentic2.a2_rbac.models import SEARCH_OP, Role +from authentic2.a2_rbac.utils import get_default_ou +from authentic2.apps.journal.models import Event, EventType +from authentic2.custom_user.models import User + +from ..utils import basic_authorization_header + + +def test_basic(app, simple_user): + headers = basic_authorization_header(simple_user) + uuids = [] + for _ in range(100): + user = User.objects.create(first_name='ben', last_name='dauve') + uuids.append(user.uuid) + unknown_uuids = [uuid.uuid4().hex for i in range(100)] + url = reverse('a2-api-users-synchronization') + content = { + 'known_uuids': uuids + unknown_uuids, + } + random.shuffle(content['known_uuids']) + + # test permission check + response = app.post_json(url, params=content, headers=headers, status=403) + r = Role.objects.get_admin_role( + ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP + ) + r.members.add(simple_user) + response = app.post_json(url, params=content, headers=headers) + assert response.json['result'] == 1 + assert set(response.json['unknown_uuids']) == set(unknown_uuids) + + +def test_full_known_users(app, admin): + headers = basic_authorization_header(admin) + uuids = [] + for _ in range(100): + user = User.objects.create(first_name='jim', last_name='jam') + uuids.append(user.uuid) + unknown_uuids = [uuid.uuid4().hex for i in range(100)] + url = reverse('a2-api-users-synchronization') + content = { + 'known_uuids': uuids + unknown_uuids, + 'full_known_users': 1, + } + random.shuffle(content['known_uuids']) + response = app.post_json(url, params=content, headers=headers) + assert response.json['result'] == 1 + + # known users returned as part of api's full mode: + assert len(response.json['known_users']) == 100 + for user_dict in response.json['known_users']: + assert user_dict['first_name'] == 'jim' + assert user_dict['last_name'] == 'jam' + assert { + 'uuid', + 'email', + 'is_staff', + 'is_superuser', + 'email_verified', + 'ou', + 'is_active', + 'deactivation', + 'modified', + }.issubset(set(user_dict.keys())) + + +def test_timestamp(app, admin): + headers = basic_authorization_header(admin) + url = reverse('a2-api-users-synchronization') + now = datetime.datetime.now() + + ou = get_default_ou() + users = [] + + for i in range(6): + users.append( + User.objects.create( + first_name='john%s' % i, + last_name='doe', + username='user%s' % i, + email='user%s' % i, + ou=ou, + ) + ) + + for i, event_name in enumerate( + [ + 'manager.user.creation', + 'manager.user.profile.edit', + 'manager.user.activation', + 'manager.user.deactivation', + 'manager.user.password.change.force', + 'manager.user.password.change.unforce', + ] + ): + event_type = EventType.objects.get_for_name(event_name) + Event.objects.create( + type=event_type, + timestamp=now - datetime.timedelta(days=i, hours=1), + references=[users[i]], + ) + + content = { + 'known_uuids': [user.uuid for user in users], + 'timestamp': (now - datetime.timedelta(days=3)).isoformat(), + } + + response = app.post(url, params=content, headers=headers) + + for user in users[:3]: + assert user.uuid in response.json['modified_users_uuids'] + assert user.uuid not in response.json['unknown_uuids'] + for user in users[3:]: + assert user.uuid not in response.json['modified_users_uuids'] + assert user.uuid not in response.json['unknown_uuids'] + + for user in users[:3]: + user.delete() + + content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat() + + response = app.post(url, params=content, headers=headers) + + for user in users[:3]: + assert user.uuid not in response.json['modified_users_uuids'] + assert user.uuid in response.json['unknown_uuids'] + for user in users[3:]: + assert user.uuid in response.json['modified_users_uuids'] + assert user.uuid not in response.json['unknown_uuids'] + + for user in users[3:]: + user.delete() + + response = app.post(url, params=content, headers=headers) + + assert not response.json['modified_users_uuids'] + for user in users: + assert user.uuid in response.json['unknown_uuids'] + + for user in users[:3]: + content['known_uuids'].remove(user.uuid) + + response = app.post(url, params=content, headers=headers) + + assert not response.json['modified_users_uuids'] + assert len(response.json['unknown_uuids']) == 3 + for user in users[3:]: + assert user.uuid in response.json['unknown_uuids'] -- 2.37.2