From 4890ed834e8cf3d014ca117d2263af1ee0ab867d Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 10 Jun 2021 13:16:04 +0200 Subject: [PATCH 2/3] tests: move idp_oidc tests in a subdirectory (#54740) --- tests/idp_oidc/__init__.py | 0 tests/idp_oidc/conftest.py | 148 +++++++ tests/idp_oidc/test_api.py | 41 ++ tests/idp_oidc/test_migrations.py | 125 ++++++ .../test_misc.py} | 416 +----------------- tests/idp_oidc/test_models.py | 115 +++++ tests/idp_oidc/test_views.py | 90 ++++ 7 files changed, 522 insertions(+), 413 deletions(-) create mode 100644 tests/idp_oidc/__init__.py create mode 100644 tests/idp_oidc/conftest.py create mode 100644 tests/idp_oidc/test_api.py create mode 100644 tests/idp_oidc/test_migrations.py rename tests/{test_idp_oidc.py => idp_oidc/test_misc.py} (79%) create mode 100644 tests/idp_oidc/test_models.py create mode 100644 tests/idp_oidc/test_views.py diff --git a/tests/idp_oidc/__init__.py b/tests/idp_oidc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/idp_oidc/conftest.py b/tests/idp_oidc/conftest.py new file mode 100644 index 00000000..2aa431c4 --- /dev/null +++ b/tests/idp_oidc/conftest.py @@ -0,0 +1,148 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +from importlib import import_module + +import pytest +from django.urls import reverse + +from authentic2.a2_rbac.utils import get_default_ou +from authentic2.models import Attribute +from authentic2_idp_oidc import app_settings +from authentic2_idp_oidc.models import OIDCClaim, OIDCClient +from tests import utils + +JWKSET = { + "keys": [ + { + "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4", + "kty": "RSA", + "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ", + "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE", + "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU", + "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE", + "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q", + "e": "AQAB", + "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0", + "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e", + }, + { + "kty": "EC", + "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog", + "use": "sig", + "crv": "P-256", + "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw", + "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ", + "alg": "ES256", + "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9", + }, + ] +} + + +@pytest.fixture +def jwkset(): + return JWKSET + + +@pytest.fixture +def oidc_settings(settings, jwkset): + settings.A2_IDP_OIDC_JWKSET = jwkset + settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m' + return settings + + +def make_client(app, superuser, params=None): + Attribute.objects.create( + name='cityscape_image', + label='cityscape', + kind='profile_image', + asked_on_registration=True, + required=False, + user_visible=True, + user_editable=True, + ) + + client = OIDCClient( + name='oidcclient', + slug='oidcclient', + ou=get_default_ou(), + unauthorized_url='https://example.com/southpark/', + redirect_uris='https://example.com/callbac%C3%A9', + ) + + for key, value in (params or {}).items(): + setattr(client, key, value) + client.save() + for mapping in app_settings.DEFAULT_MAPPINGS: + OIDCClaim.objects.create( + client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes'] + ) + return client + + +@pytest.fixture +def client(app, superuser): + return make_client(app, superuser, {}) + + +@pytest.fixture +def simple_oidc_client(db): + return OIDCClient.objects.create( + name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' + ) + + +@pytest.fixture +def oidc_client(request, superuser, app, simple_user, oidc_settings): + return make_client(app, superuser, getattr(request, 'param', None) or {}) + + +@pytest.fixture +def normal_oidc_client(superuser, app, simple_user): + url = reverse('admin:authentic2_idp_oidc_oidcclient_add') + assert OIDCClient.objects.count() == 0 + response = utils.login(app, superuser, path=url) + response.form.set('name', 'oidcclient') + response.form.set('slug', 'oidcclient') + response.form.set('ou', get_default_ou().pk) + response.form.set('unauthorized_url', 'https://example.com/southpark/') + response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9') + response = response.form.submit(name='_save').follow() + assert OIDCClient.objects.count() == 1 + client = OIDCClient.objects.get() + utils.logout(app) + return client + + +@pytest.fixture +def session(settings, db, simple_user): + engine = import_module(settings.SESSION_ENGINE) + session = engine.SessionStore() + session['_auth_user_id'] = str(simple_user.id) + session.create() + return session + + +def client_authentication_headers(oidc_client): + client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret) + token = base64.b64encode(client_creds.encode('ascii')) + return {'Authorization': 'Basic %s' % str(token.decode('ascii'))} + + +def bearer_authentication_headers(access_token): + return {'Authorization': 'Bearer %s' % str(access_token)} diff --git a/tests/idp_oidc/test_api.py b/tests/idp_oidc/test_api.py new file mode 100644 index 00000000..ad36aead --- /dev/null +++ b/tests/idp_oidc/test_api.py @@ -0,0 +1,41 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from authentic2.custom_user.models import User +from authentic2_idp_oidc.models import OIDCClient +from authentic2_idp_oidc.utils import make_sub + + +def test_api_synchronization(app, oidc_client): + oidc_client.has_api_access = True + oidc_client.save() + users = [User.objects.create(username='user-%s' % i) for i in range(10)] + for user in users[5:]: + user.delete() + deleted_subs = set(make_sub(oidc_client, user) for user in users[5:]) + + app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) + status = 200 + if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID): + status = 401 + response = app.post_json( + '/api/users/synchronization/', + params={'known_uuids': [make_sub(oidc_client, user) for user in users]}, + status=status, + ) + if status == 200: + assert response.json['result'] == 1 + assert set(response.json['unknown_uuids']) == deleted_subs diff --git a/tests/idp_oidc/test_migrations.py b/tests/idp_oidc/test_migrations.py new file mode 100644 index 00000000..e8c25d98 --- /dev/null +++ b/tests/idp_oidc/test_migrations.py @@ -0,0 +1,125 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +def test_oidclient_claims_data_migration(migration): + app = 'authentic2_idp_oidc' + migrate_from = [(app, '0009_auto_20180313_1156')] + migrate_to = [(app, '0010_oidcclaim')] + + old_apps = migration.before(migrate_from) + OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') + + client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/') + client.save() + + new_apps = migration.apply(migrate_to) + OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') + OIDCClaim = new_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') + + client = OIDCClient.objects.first() + assert OIDCClaim.objects.filter(client=client.id).count() == 5 + + +def test_oidclient_preferred_username_as_identifier_data_migration(migration): + app = 'authentic2_idp_oidc' + migrate_from = [(app, '0010_oidcclaim')] + migrate_to = [(app, '0011_auto_20180808_1546')] + + old_apps = migration.before(migrate_from) + OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') + OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') + + client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/') + client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/') + client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/') + client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/') + for client in (client1, client2, client3, client4): + if client.name == 'test1': + continue + if client.name == 'test3': + OIDCClaim.objects.create( + client=client, name='preferred_username', value='django_user_full_name', scopes='profile' + ) + else: + OIDCClaim.objects.create( + client=client, name='preferred_username', value='django_user_username', scopes='profile' + ) + OIDCClaim.objects.create( + client=client, name='given_name', value='django_user_first_name', scopes='profile' + ) + OIDCClaim.objects.create( + client=client, name='family_name', value='django_user_last_name', scopes='profile' + ) + if client.name == 'test2': + continue + OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email') + OIDCClaim.objects.create( + client=client, name='email_verified', value='django_user_email_verified', scopes='email' + ) + + new_apps = migration.apply(migrate_to) + OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') + + client = OIDCClient.objects.first() + for client in OIDCClient.objects.all(): + claims = client.oidcclaim_set.all() + if client.name == 'test': + assert claims.count() == 5 + assert sorted(claims.values_list('name', flat=True)) == [ + 'email', + 'email_verified', + 'family_name', + 'given_name', + 'preferred_username', + ] + assert sorted(claims.values_list('value', flat=True)) == [ + 'django_user_email', + 'django_user_email_verified', + 'django_user_first_name', + 'django_user_identifier', + 'django_user_last_name', + ] + elif client.name == 'test2': + assert claims.count() == 3 + assert sorted(claims.values_list('name', flat=True)) == [ + 'family_name', + 'given_name', + 'preferred_username', + ] + assert sorted(claims.values_list('value', flat=True)) == [ + 'django_user_first_name', + 'django_user_last_name', + 'django_user_username', + ] + elif client.name == 'test3': + assert claims.count() == 5 + assert sorted(claims.values_list('name', flat=True)) == [ + 'email', + 'email_verified', + 'family_name', + 'given_name', + 'preferred_username', + ] + assert sorted(claims.values_list('value', flat=True)) == [ + 'django_user_email', + 'django_user_email_verified', + 'django_user_first_name', + 'django_user_full_name', + 'django_user_last_name', + ] + else: + assert claims.count() == 0 diff --git a/tests/test_idp_oidc.py b/tests/idp_oidc/test_misc.py similarity index 79% rename from tests/test_idp_oidc.py rename to tests/idp_oidc/test_misc.py index 04745073..b75a570f 100644 --- a/tests/test_idp_oidc.py +++ b/tests/idp_oidc/test_misc.py @@ -1,5 +1,5 @@ # authentic2 - versatile identity manager -# Copyright (C) 2010-2019 Entr'ouvert +# Copyright (C) 2010-2021 Entr'ouvert # # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU Affero General Public License as published @@ -19,7 +19,6 @@ import datetime import functools import json import urllib.parse -from importlib import import_module import pytest from django.contrib.auth import get_user_model @@ -37,51 +36,17 @@ from authentic2.a2_rbac.utils import get_default_ou from authentic2.models import Attribute, AuthorizedRole from authentic2.utils import good_next_url, make_url from authentic2_auth_oidc.utils import parse_timestamp -from authentic2_idp_oidc import app_settings from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub from django_rbac.utils import get_ou_model, get_role_model -from . import utils +from .. import utils +from .conftest import bearer_authentication_headers, client_authentication_headers User = get_user_model() pytestmark = pytest.mark.django_db -JWKSET = { - "keys": [ - { - "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4", - "kty": "RSA", - "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ", - "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE", - "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU", - "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE", - "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q", - "e": "AQAB", - "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0", - "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e", - }, - { - "kty": "EC", - "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog", - "use": "sig", - "crv": "P-256", - "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw", - "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ", - "alg": "ES256", - "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9", - }, - ] -} - - -@pytest.fixture -def oidc_settings(settings): - settings.A2_IDP_OIDC_JWKSET = JWKSET - settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m' - return settings - def test_get_jwkset(oidc_settings): from authentic2_idp_oidc.utils import get_jwkset @@ -159,72 +124,6 @@ def test_admin(other_attributes, app, superuser, oidc_settings): assert OIDCClient.objects.count() == 1 -def make_client(app, superuser, params=None): - Attribute.objects.create( - name='cityscape_image', - label='cityscape', - kind='profile_image', - asked_on_registration=True, - required=False, - user_visible=True, - user_editable=True, - ) - - client = OIDCClient( - name='oidcclient', - slug='oidcclient', - ou=get_default_ou(), - unauthorized_url='https://example.com/southpark/', - redirect_uris='https://example.com/callbac%C3%A9', - ) - - for key, value in (params or {}).items(): - setattr(client, key, value) - client.save() - for mapping in app_settings.DEFAULT_MAPPINGS: - OIDCClaim.objects.create( - client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes'] - ) - return client - - -@pytest.fixture -def client(app, superuser): - return make_client(app, superuser, {}) - - -@pytest.fixture -def oidc_client(request, superuser, app, simple_user, oidc_settings): - return make_client(app, superuser, getattr(request, 'param', None) or {}) - - -@pytest.fixture -def normal_oidc_client(superuser, app, simple_user): - url = reverse('admin:authentic2_idp_oidc_oidcclient_add') - assert OIDCClient.objects.count() == 0 - response = utils.login(app, superuser, path=url) - response.form.set('name', 'oidcclient') - response.form.set('slug', 'oidcclient') - response.form.set('ou', get_default_ou().pk) - response.form.set('unauthorized_url', 'https://example.com/southpark/') - response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9') - response = response.form.submit(name='_save').follow() - assert OIDCClient.objects.count() == 1 - client = OIDCClient.objects.get() - utils.logout(app) - return client - - -def client_authentication_headers(oidc_client): - client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret) - token = base64.b64encode(client_creds.encode('ascii')) - return {'Authorization': 'Basic %s' % str(token.decode('ascii'))} - - -def bearer_authentication_headers(access_token): - return {'Authorization': 'Bearer %s' % str(access_token)} - - @pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True) @pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)]) @pytest.mark.parametrize('login_first', [(True,), (False,)]) @@ -934,58 +833,6 @@ def test_invalid_request(oidc_client, caplog, oidc_settings, simple_user, app): assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected' -def test_expired_manager(db, simple_user): - expired = now() - datetime.timedelta(seconds=1) - not_expired = now() + datetime.timedelta(days=1) - client = OIDCClient.objects.create( - name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' - ) - OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired) - OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired) - assert OIDCAuthorization.objects.count() == 2 - OIDCAuthorization.objects.cleanup() - assert OIDCAuthorization.objects.count() == 1 - - OIDCCode.objects.create( - client=client, - user=simple_user, - scopes='openid', - redirect_uri='https://example.com/', - session_key='xxx', - auth_time=now(), - expired=expired, - ) - OIDCCode.objects.create( - client=client, - user=simple_user, - scopes='openid', - redirect_uri='https://example.com/', - session_key='xxx', - auth_time=now(), - expired=not_expired, - ) - assert OIDCCode.objects.count() == 2 - OIDCCode.objects.cleanup() - assert OIDCCode.objects.count() == 1 - - OIDCAccessToken.objects.create( - client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired - ) - OIDCAccessToken.objects.create( - client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired - ) - assert OIDCAccessToken.objects.count() == 2 - OIDCAccessToken.objects.cleanup() - assert OIDCAccessToken.objects.count() == 1 - - -@pytest.fixture -def simple_oidc_client(db): - return OIDCClient.objects.create( - name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' - ) - - def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user): utils.login(app, simple_user) redirect_uri = simple_oidc_client.redirect_uris.split()[0] @@ -1149,137 +996,6 @@ def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simpl assert hooks.event[2]['kwargs']['service'] == 'client' -def test_oidclient_claims_data_migration(migration): - app = 'authentic2_idp_oidc' - migrate_from = [(app, '0009_auto_20180313_1156')] - migrate_to = [(app, '0010_oidcclaim')] - - old_apps = migration.before(migrate_from) - OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') - - client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/') - client.save() - - new_apps = migration.apply(migrate_to) - OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') - - client = OIDCClient.objects.first() - assert OIDCClaim.objects.filter(client=client.id).count() == 5 - - -def test_oidclient_preferred_username_as_identifier_data_migration(migration): - app = 'authentic2_idp_oidc' - migrate_from = [(app, '0010_oidcclaim')] - migrate_to = [(app, '0011_auto_20180808_1546')] - - old_apps = migration.before(migrate_from) - OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') - OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') - - client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/') - client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/') - client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/') - client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/') - for client in (client1, client2, client3, client4): - if client.name == 'test1': - continue - if client.name == 'test3': - OIDCClaim.objects.create( - client=client, name='preferred_username', value='django_user_full_name', scopes='profile' - ) - else: - OIDCClaim.objects.create( - client=client, name='preferred_username', value='django_user_username', scopes='profile' - ) - OIDCClaim.objects.create( - client=client, name='given_name', value='django_user_first_name', scopes='profile' - ) - OIDCClaim.objects.create( - client=client, name='family_name', value='django_user_last_name', scopes='profile' - ) - if client.name == 'test2': - continue - OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email') - OIDCClaim.objects.create( - client=client, name='email_verified', value='django_user_email_verified', scopes='email' - ) - - new_apps = migration.apply(migrate_to) - OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') - - client = OIDCClient.objects.first() - for client in OIDCClient.objects.all(): - claims = client.oidcclaim_set.all() - if client.name == 'test': - assert claims.count() == 5 - assert sorted(claims.values_list('name', flat=True)) == [ - 'email', - 'email_verified', - 'family_name', - 'given_name', - 'preferred_username', - ] - assert sorted(claims.values_list('value', flat=True)) == [ - 'django_user_email', - 'django_user_email_verified', - 'django_user_first_name', - 'django_user_identifier', - 'django_user_last_name', - ] - elif client.name == 'test2': - assert claims.count() == 3 - assert sorted(claims.values_list('name', flat=True)) == [ - 'family_name', - 'given_name', - 'preferred_username', - ] - assert sorted(claims.values_list('value', flat=True)) == [ - 'django_user_first_name', - 'django_user_last_name', - 'django_user_username', - ] - elif client.name == 'test3': - assert claims.count() == 5 - assert sorted(claims.values_list('name', flat=True)) == [ - 'email', - 'email_verified', - 'family_name', - 'given_name', - 'preferred_username', - ] - assert sorted(claims.values_list('value', flat=True)) == [ - 'django_user_email', - 'django_user_email_verified', - 'django_user_first_name', - 'django_user_full_name', - 'django_user_last_name', - ] - else: - assert claims.count() == 0 - - -def test_api_synchronization(app, oidc_client): - oidc_client.has_api_access = True - oidc_client.save() - users = [User.objects.create(username='user-%s' % i) for i in range(10)] - for user in users[5:]: - user.delete() - deleted_subs = set(make_sub(oidc_client, user) for user in users[5:]) - - app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) - status = 200 - if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID): - status = 401 - response = app.post_json( - '/api/users/synchronization/', - params={'known_uuids': [make_sub(oidc_client, user) for user in users]}, - status=status, - ) - if status == 200: - assert response.json['result'] == 1 - assert set(response.json['unknown_uuids']) == deleted_subs - - def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app): oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone'] Attribute.objects.create( @@ -1922,129 +1638,3 @@ def test_oidc_good_next_url_hook(app, oidc_client): rf = RequestFactory() request = rf.get('/') assert good_next_url(request, 'https://example.com/') - - -@pytest.fixture -def access_token(client, simple_user): - return OIDCAccessToken.objects.create( - client=client, - user=simple_user, - scopes='openid profile email', - expired=now() + datetime.timedelta(seconds=3600), - ) - - -def test_user_info(app, client, access_token, freezer): - def get_user_info(**kwargs): - return app.get( - '/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs - ) - - response = app.get('/idp/oidc/user_info/', status=401) - assert ( - response['WWW-Authenticate'] - == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"' - ) - - response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401) - assert ( - response['WWW-Authenticate'] - == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"' - ) - - response = get_user_info(status=200) - assert dict(response.json, sub='') == { - 'email': 'user@example.net', - 'email_verified': False, - 'family_name': 'Dôe', - 'family_name_verified': True, - 'given_name': 'Jôhn', - 'given_name_verified': True, - 'preferred_username': 'user', - 'sub': '', - } - - # token is expired - access_token.expired = now() - datetime.timedelta(seconds=1) - access_token.save() - response = get_user_info(status=401) - assert ( - response['WWW-Authenticate'] - == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' - ) - - # token is unknown - access_token.delete() - response = get_user_info(status=401) - assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"' - - utils.login(app, access_token.user) - access_token.expired = now() + datetime.timedelta(seconds=1) - access_token.session_key = app.session.session_key - access_token.save() - - get_user_info(status=200) - - app.session.flush() - response = get_user_info(status=401) - assert ( - response['WWW-Authenticate'] - == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' - ) - - -@pytest.fixture -def session(settings, db, simple_user): - engine = import_module(settings.SESSION_ENGINE) - session = engine.SessionStore() - session['_auth_user_id'] = str(simple_user.id) - session.create() - return session - - -def test_access_token_is_valid_session(simple_oidc_client, simple_user, session): - token = OIDCAccessToken.objects.create( - client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key - ) - - assert token.is_valid() - session.flush() - assert not token.is_valid() - - -def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer): - start = now() - expired = start + datetime.timedelta(seconds=30) - - token = OIDCAccessToken.objects.create( - client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired - ) - - assert token.is_valid() - freezer.move_to(expired) - assert token.is_valid() - freezer.move_to(expired + datetime.timedelta(seconds=1)) - assert not token.is_valid() - - -def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer): - start = now() - expired = start + datetime.timedelta(seconds=30) - - token = OIDCAccessToken.objects.create( - client=simple_oidc_client, - user=simple_user, - scopes='openid', - session_key=session.session_key, - expired=expired, - ) - - assert token.is_valid() - freezer.move_to(expired) - assert token.is_valid() - freezer.move_to(expired + datetime.timedelta(seconds=1)) - assert not token.is_valid() - freezer.move_to(start) - assert token.is_valid() - session.flush() - assert not token.is_valid() diff --git a/tests/idp_oidc/test_models.py b/tests/idp_oidc/test_models.py new file mode 100644 index 00000000..c764d72e --- /dev/null +++ b/tests/idp_oidc/test_models.py @@ -0,0 +1,115 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import datetime + +from django.utils.timezone import now + +from authentic2.a2_rbac.utils import get_default_ou +from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClient, OIDCCode + + +def test_expired_manager(db, simple_user): + expired = now() - datetime.timedelta(seconds=1) + not_expired = now() + datetime.timedelta(days=1) + client = OIDCClient.objects.create( + name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' + ) + OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired) + OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired) + assert OIDCAuthorization.objects.count() == 2 + OIDCAuthorization.objects.cleanup() + assert OIDCAuthorization.objects.count() == 1 + + OIDCCode.objects.create( + client=client, + user=simple_user, + scopes='openid', + redirect_uri='https://example.com/', + session_key='xxx', + auth_time=now(), + expired=expired, + ) + OIDCCode.objects.create( + client=client, + user=simple_user, + scopes='openid', + redirect_uri='https://example.com/', + session_key='xxx', + auth_time=now(), + expired=not_expired, + ) + assert OIDCCode.objects.count() == 2 + OIDCCode.objects.cleanup() + assert OIDCCode.objects.count() == 1 + + OIDCAccessToken.objects.create( + client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired + ) + OIDCAccessToken.objects.create( + client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired + ) + assert OIDCAccessToken.objects.count() == 2 + OIDCAccessToken.objects.cleanup() + assert OIDCAccessToken.objects.count() == 1 + + +def test_access_token_is_valid_session(simple_oidc_client, simple_user, session): + token = OIDCAccessToken.objects.create( + client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key + ) + + assert token.is_valid() + session.flush() + assert not token.is_valid() + + +def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer): + start = now() + expired = start + datetime.timedelta(seconds=30) + + token = OIDCAccessToken.objects.create( + client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired + ) + + assert token.is_valid() + freezer.move_to(expired) + assert token.is_valid() + freezer.move_to(expired + datetime.timedelta(seconds=1)) + assert not token.is_valid() + + +def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer): + start = now() + expired = start + datetime.timedelta(seconds=30) + + token = OIDCAccessToken.objects.create( + client=simple_oidc_client, + user=simple_user, + scopes='openid', + session_key=session.session_key, + expired=expired, + ) + + assert token.is_valid() + freezer.move_to(expired) + assert token.is_valid() + freezer.move_to(expired + datetime.timedelta(seconds=1)) + assert not token.is_valid() + freezer.move_to(start) + assert token.is_valid() + session.flush() + assert not token.is_valid() diff --git a/tests/idp_oidc/test_views.py b/tests/idp_oidc/test_views.py new file mode 100644 index 00000000..86997a7f --- /dev/null +++ b/tests/idp_oidc/test_views.py @@ -0,0 +1,90 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2021 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import datetime + +from django.utils.timezone import now + +from authentic2_idp_oidc.models import OIDCAccessToken + +from .. import utils +from .conftest import bearer_authentication_headers + + +def test_user_info(app, client, freezer, simple_user): + access_token = OIDCAccessToken.objects.create( + client=client, + user=simple_user, + scopes='openid profile email', + expired=now() + datetime.timedelta(seconds=3600), + ) + + def get_user_info(**kwargs): + return app.get( + '/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs + ) + + response = app.get('/idp/oidc/user_info/', status=401) + assert ( + response['WWW-Authenticate'] + == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"' + ) + + response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401) + assert ( + response['WWW-Authenticate'] + == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"' + ) + + response = get_user_info(status=200) + assert dict(response.json, sub='') == { + 'email': 'user@example.net', + 'email_verified': False, + 'family_name': 'Dôe', + 'family_name_verified': True, + 'given_name': 'Jôhn', + 'given_name_verified': True, + 'preferred_username': 'user', + 'sub': '', + } + + # token is expired + access_token.expired = now() - datetime.timedelta(seconds=1) + access_token.save() + response = get_user_info(status=401) + assert ( + response['WWW-Authenticate'] + == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' + ) + + # token is unknown + access_token.delete() + response = get_user_info(status=401) + assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"' + + utils.login(app, access_token.user) + access_token.expired = now() + datetime.timedelta(seconds=1) + access_token.session_key = app.session.session_key + access_token.save() + + get_user_info(status=200) + + app.session.flush() + response = get_user_info(status=401) + assert ( + response['WWW-Authenticate'] + == 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' + ) -- 2.32.0.rc0