Projet

Général

Profil

0002-tests-move-idp_oidc-tests-in-a-subdirectory-54740.patch

Benjamin Dauvergne, 10 juin 2021 16:45

Télécharger (40,9 ko)

Voir les différences:

Subject: [PATCH 2/3] tests: move idp_oidc tests in a subdirectory (#54740)

 tests/idp_oidc/__init__.py                    |   0
 tests/idp_oidc/conftest.py                    | 148 +++++++
 tests/idp_oidc/test_api.py                    |  41 ++
 tests/idp_oidc/test_migrations.py             | 125 ++++++
 .../test_misc.py}                             | 416 +-----------------
 tests/idp_oidc/test_models.py                 | 115 +++++
 tests/idp_oidc/test_views.py                  |  90 ++++
 7 files changed, 522 insertions(+), 413 deletions(-)
 create mode 100644 tests/idp_oidc/__init__.py
 create mode 100644 tests/idp_oidc/conftest.py
 create mode 100644 tests/idp_oidc/test_api.py
 create mode 100644 tests/idp_oidc/test_migrations.py
 rename tests/{test_idp_oidc.py => idp_oidc/test_misc.py} (79%)
 create mode 100644 tests/idp_oidc/test_models.py
 create mode 100644 tests/idp_oidc/test_views.py
tests/idp_oidc/conftest.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
from importlib import import_module
19

  
20
import pytest
21
from django.urls import reverse
22

  
23
from authentic2.a2_rbac.utils import get_default_ou
24
from authentic2.models import Attribute
25
from authentic2_idp_oidc import app_settings
26
from authentic2_idp_oidc.models import OIDCClaim, OIDCClient
27
from tests import utils
28

  
29
JWKSET = {
30
    "keys": [
31
        {
32
            "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
33
            "kty": "RSA",
34
            "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
35
            "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
36
            "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
37
            "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
38
            "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
39
            "e": "AQAB",
40
            "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
41
            "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
42
        },
43
        {
44
            "kty": "EC",
45
            "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
46
            "use": "sig",
47
            "crv": "P-256",
48
            "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
49
            "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
50
            "alg": "ES256",
51
            "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
52
        },
53
    ]
54
}
55

  
56

  
57
@pytest.fixture
58
def jwkset():
59
    return JWKSET
60

  
61

  
62
@pytest.fixture
63
def oidc_settings(settings, jwkset):
64
    settings.A2_IDP_OIDC_JWKSET = jwkset
65
    settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
66
    return settings
67

  
68

  
69
def make_client(app, superuser, params=None):
70
    Attribute.objects.create(
71
        name='cityscape_image',
72
        label='cityscape',
73
        kind='profile_image',
74
        asked_on_registration=True,
75
        required=False,
76
        user_visible=True,
77
        user_editable=True,
78
    )
79

  
80
    client = OIDCClient(
81
        name='oidcclient',
82
        slug='oidcclient',
83
        ou=get_default_ou(),
84
        unauthorized_url='https://example.com/southpark/',
85
        redirect_uris='https://example.com/callbac%C3%A9',
86
    )
87

  
88
    for key, value in (params or {}).items():
89
        setattr(client, key, value)
90
    client.save()
91
    for mapping in app_settings.DEFAULT_MAPPINGS:
92
        OIDCClaim.objects.create(
93
            client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
94
        )
95
    return client
96

  
97

  
98
@pytest.fixture
99
def client(app, superuser):
100
    return make_client(app, superuser, {})
101

  
102

  
103
@pytest.fixture
104
def simple_oidc_client(db):
105
    return OIDCClient.objects.create(
106
        name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
107
    )
108

  
109

  
110
@pytest.fixture
111
def oidc_client(request, superuser, app, simple_user, oidc_settings):
112
    return make_client(app, superuser, getattr(request, 'param', None) or {})
113

  
114

  
115
@pytest.fixture
116
def normal_oidc_client(superuser, app, simple_user):
117
    url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
118
    assert OIDCClient.objects.count() == 0
119
    response = utils.login(app, superuser, path=url)
120
    response.form.set('name', 'oidcclient')
121
    response.form.set('slug', 'oidcclient')
122
    response.form.set('ou', get_default_ou().pk)
123
    response.form.set('unauthorized_url', 'https://example.com/southpark/')
124
    response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
125
    response = response.form.submit(name='_save').follow()
126
    assert OIDCClient.objects.count() == 1
127
    client = OIDCClient.objects.get()
128
    utils.logout(app)
129
    return client
130

  
131

  
132
@pytest.fixture
133
def session(settings, db, simple_user):
134
    engine = import_module(settings.SESSION_ENGINE)
135
    session = engine.SessionStore()
136
    session['_auth_user_id'] = str(simple_user.id)
137
    session.create()
138
    return session
139

  
140

  
141
def client_authentication_headers(oidc_client):
142
    client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
143
    token = base64.b64encode(client_creds.encode('ascii'))
144
    return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
145

  
146

  
147
def bearer_authentication_headers(access_token):
148
    return {'Authorization': 'Bearer %s' % str(access_token)}
tests/idp_oidc/test_api.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from authentic2.custom_user.models import User
18
from authentic2_idp_oidc.models import OIDCClient
19
from authentic2_idp_oidc.utils import make_sub
20

  
21

  
22
def test_api_synchronization(app, oidc_client):
23
    oidc_client.has_api_access = True
24
    oidc_client.save()
25
    users = [User.objects.create(username='user-%s' % i) for i in range(10)]
26
    for user in users[5:]:
27
        user.delete()
28
    deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
29

  
30
    app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
31
    status = 200
32
    if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
33
        status = 401
34
    response = app.post_json(
35
        '/api/users/synchronization/',
36
        params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
37
        status=status,
38
    )
39
    if status == 200:
40
        assert response.json['result'] == 1
41
        assert set(response.json['unknown_uuids']) == deleted_subs
tests/idp_oidc/test_migrations.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
18
def test_oidclient_claims_data_migration(migration):
19
    app = 'authentic2_idp_oidc'
20
    migrate_from = [(app, '0009_auto_20180313_1156')]
21
    migrate_to = [(app, '0010_oidcclaim')]
22

  
23
    old_apps = migration.before(migrate_from)
24
    OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
25

  
26
    client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
27
    client.save()
28

  
29
    new_apps = migration.apply(migrate_to)
30
    OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
31
    OIDCClaim = new_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
32

  
33
    client = OIDCClient.objects.first()
34
    assert OIDCClaim.objects.filter(client=client.id).count() == 5
35

  
36

  
37
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
38
    app = 'authentic2_idp_oidc'
39
    migrate_from = [(app, '0010_oidcclaim')]
40
    migrate_to = [(app, '0011_auto_20180808_1546')]
41

  
42
    old_apps = migration.before(migrate_from)
43
    OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
44
    OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
45

  
46
    client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
47
    client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
48
    client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
49
    client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
50
    for client in (client1, client2, client3, client4):
51
        if client.name == 'test1':
52
            continue
53
        if client.name == 'test3':
54
            OIDCClaim.objects.create(
55
                client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
56
            )
57
        else:
58
            OIDCClaim.objects.create(
59
                client=client, name='preferred_username', value='django_user_username', scopes='profile'
60
            )
61
        OIDCClaim.objects.create(
62
            client=client, name='given_name', value='django_user_first_name', scopes='profile'
63
        )
64
        OIDCClaim.objects.create(
65
            client=client, name='family_name', value='django_user_last_name', scopes='profile'
66
        )
67
        if client.name == 'test2':
68
            continue
69
        OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
70
        OIDCClaim.objects.create(
71
            client=client, name='email_verified', value='django_user_email_verified', scopes='email'
72
        )
73

  
74
    new_apps = migration.apply(migrate_to)
75
    OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
76

  
77
    client = OIDCClient.objects.first()
78
    for client in OIDCClient.objects.all():
79
        claims = client.oidcclaim_set.all()
80
        if client.name == 'test':
81
            assert claims.count() == 5
82
            assert sorted(claims.values_list('name', flat=True)) == [
83
                'email',
84
                'email_verified',
85
                'family_name',
86
                'given_name',
87
                'preferred_username',
88
            ]
89
            assert sorted(claims.values_list('value', flat=True)) == [
90
                'django_user_email',
91
                'django_user_email_verified',
92
                'django_user_first_name',
93
                'django_user_identifier',
94
                'django_user_last_name',
95
            ]
96
        elif client.name == 'test2':
97
            assert claims.count() == 3
98
            assert sorted(claims.values_list('name', flat=True)) == [
99
                'family_name',
100
                'given_name',
101
                'preferred_username',
102
            ]
103
            assert sorted(claims.values_list('value', flat=True)) == [
104
                'django_user_first_name',
105
                'django_user_last_name',
106
                'django_user_username',
107
            ]
108
        elif client.name == 'test3':
109
            assert claims.count() == 5
110
            assert sorted(claims.values_list('name', flat=True)) == [
111
                'email',
112
                'email_verified',
113
                'family_name',
114
                'given_name',
115
                'preferred_username',
116
            ]
117
            assert sorted(claims.values_list('value', flat=True)) == [
118
                'django_user_email',
119
                'django_user_email_verified',
120
                'django_user_first_name',
121
                'django_user_full_name',
122
                'django_user_last_name',
123
            ]
124
        else:
125
            assert claims.count() == 0
tests/test_idp_oidc.py → tests/idp_oidc/test_misc.py
1 1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
2
# Copyright (C) 2010-2021 Entr'ouvert
3 3
#
4 4
# This program is free software: you can redistribute it and/or modify it
5 5
# under the terms of the GNU Affero General Public License as published
......
19 19
import functools
20 20
import json
21 21
import urllib.parse
22
from importlib import import_module
23 22

  
24 23
import pytest
25 24
from django.contrib.auth import get_user_model
......
37 36
from authentic2.models import Attribute, AuthorizedRole
38 37
from authentic2.utils import good_next_url, make_url
39 38
from authentic2_auth_oidc.utils import parse_timestamp
40
from authentic2_idp_oidc import app_settings
41 39
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode
42 40
from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub
43 41
from django_rbac.utils import get_ou_model, get_role_model
44 42

  
45
from . import utils
43
from .. import utils
44
from .conftest import bearer_authentication_headers, client_authentication_headers
46 45

  
47 46
User = get_user_model()
48 47

  
49 48
pytestmark = pytest.mark.django_db
50 49

  
51
JWKSET = {
52
    "keys": [
53
        {
54
            "qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4",
55
            "kty": "RSA",
56
            "d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ",
57
            "q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE",
58
            "dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU",
59
            "dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE",
60
            "n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q",
61
            "e": "AQAB",
62
            "p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0",
63
            "kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e",
64
        },
65
        {
66
            "kty": "EC",
67
            "d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog",
68
            "use": "sig",
69
            "crv": "P-256",
70
            "x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw",
71
            "y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ",
72
            "alg": "ES256",
73
            "kid": "ac85baf4-835b-49b2-8272-ffecce7654c9",
74
        },
75
    ]
76
}
77

  
78

  
79
@pytest.fixture
80
def oidc_settings(settings):
81
    settings.A2_IDP_OIDC_JWKSET = JWKSET
82
    settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m'
83
    return settings
84

  
85 50

  
86 51
def test_get_jwkset(oidc_settings):
87 52
    from authentic2_idp_oidc.utils import get_jwkset
......
159 124
    assert OIDCClient.objects.count() == 1
160 125

  
161 126

  
162
def make_client(app, superuser, params=None):
163
    Attribute.objects.create(
164
        name='cityscape_image',
165
        label='cityscape',
166
        kind='profile_image',
167
        asked_on_registration=True,
168
        required=False,
169
        user_visible=True,
170
        user_editable=True,
171
    )
172

  
173
    client = OIDCClient(
174
        name='oidcclient',
175
        slug='oidcclient',
176
        ou=get_default_ou(),
177
        unauthorized_url='https://example.com/southpark/',
178
        redirect_uris='https://example.com/callbac%C3%A9',
179
    )
180

  
181
    for key, value in (params or {}).items():
182
        setattr(client, key, value)
183
    client.save()
184
    for mapping in app_settings.DEFAULT_MAPPINGS:
185
        OIDCClaim.objects.create(
186
            client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes']
187
        )
188
    return client
189

  
190

  
191
@pytest.fixture
192
def client(app, superuser):
193
    return make_client(app, superuser, {})
194

  
195

  
196
@pytest.fixture
197
def oidc_client(request, superuser, app, simple_user, oidc_settings):
198
    return make_client(app, superuser, getattr(request, 'param', None) or {})
199

  
200

  
201
@pytest.fixture
202
def normal_oidc_client(superuser, app, simple_user):
203
    url = reverse('admin:authentic2_idp_oidc_oidcclient_add')
204
    assert OIDCClient.objects.count() == 0
205
    response = utils.login(app, superuser, path=url)
206
    response.form.set('name', 'oidcclient')
207
    response.form.set('slug', 'oidcclient')
208
    response.form.set('ou', get_default_ou().pk)
209
    response.form.set('unauthorized_url', 'https://example.com/southpark/')
210
    response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9')
211
    response = response.form.submit(name='_save').follow()
212
    assert OIDCClient.objects.count() == 1
213
    client = OIDCClient.objects.get()
214
    utils.logout(app)
215
    return client
216

  
217

  
218
def client_authentication_headers(oidc_client):
219
    client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret)
220
    token = base64.b64encode(client_creds.encode('ascii'))
221
    return {'Authorization': 'Basic %s' % str(token.decode('ascii'))}
222

  
223

  
224
def bearer_authentication_headers(access_token):
225
    return {'Authorization': 'Bearer %s' % str(access_token)}
226

  
227

  
228 127
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True)
229 128
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)])
230 129
@pytest.mark.parametrize('login_first', [(True,), (False,)])
......
934 833
        assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected'
935 834

  
936 835

  
937
def test_expired_manager(db, simple_user):
938
    expired = now() - datetime.timedelta(seconds=1)
939
    not_expired = now() + datetime.timedelta(days=1)
940
    client = OIDCClient.objects.create(
941
        name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
942
    )
943
    OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired)
944
    OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired)
945
    assert OIDCAuthorization.objects.count() == 2
946
    OIDCAuthorization.objects.cleanup()
947
    assert OIDCAuthorization.objects.count() == 1
948

  
949
    OIDCCode.objects.create(
950
        client=client,
951
        user=simple_user,
952
        scopes='openid',
953
        redirect_uri='https://example.com/',
954
        session_key='xxx',
955
        auth_time=now(),
956
        expired=expired,
957
    )
958
    OIDCCode.objects.create(
959
        client=client,
960
        user=simple_user,
961
        scopes='openid',
962
        redirect_uri='https://example.com/',
963
        session_key='xxx',
964
        auth_time=now(),
965
        expired=not_expired,
966
    )
967
    assert OIDCCode.objects.count() == 2
968
    OIDCCode.objects.cleanup()
969
    assert OIDCCode.objects.count() == 1
970

  
971
    OIDCAccessToken.objects.create(
972
        client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired
973
    )
974
    OIDCAccessToken.objects.create(
975
        client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired
976
    )
977
    assert OIDCAccessToken.objects.count() == 2
978
    OIDCAccessToken.objects.cleanup()
979
    assert OIDCAccessToken.objects.count() == 1
980

  
981

  
982
@pytest.fixture
983
def simple_oidc_client(db):
984
    return OIDCClient.objects.create(
985
        name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
986
    )
987

  
988

  
989 836
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user):
990 837
    utils.login(app, simple_user)
991 838
    redirect_uri = simple_oidc_client.redirect_uris.split()[0]
......
1149 996
    assert hooks.event[2]['kwargs']['service'] == 'client'
1150 997

  
1151 998

  
1152
def test_oidclient_claims_data_migration(migration):
1153
    app = 'authentic2_idp_oidc'
1154
    migrate_from = [(app, '0009_auto_20180313_1156')]
1155
    migrate_to = [(app, '0010_oidcclaim')]
1156

  
1157
    old_apps = migration.before(migrate_from)
1158
    OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
1159

  
1160
    client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/')
1161
    client.save()
1162

  
1163
    new_apps = migration.apply(migrate_to)
1164
    OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
1165

  
1166
    client = OIDCClient.objects.first()
1167
    assert OIDCClaim.objects.filter(client=client.id).count() == 5
1168

  
1169

  
1170
def test_oidclient_preferred_username_as_identifier_data_migration(migration):
1171
    app = 'authentic2_idp_oidc'
1172
    migrate_from = [(app, '0010_oidcclaim')]
1173
    migrate_to = [(app, '0011_auto_20180808_1546')]
1174

  
1175
    old_apps = migration.before(migrate_from)
1176
    OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
1177
    OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim')
1178

  
1179
    client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/')
1180
    client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/')
1181
    client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/')
1182
    client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/')
1183
    for client in (client1, client2, client3, client4):
1184
        if client.name == 'test1':
1185
            continue
1186
        if client.name == 'test3':
1187
            OIDCClaim.objects.create(
1188
                client=client, name='preferred_username', value='django_user_full_name', scopes='profile'
1189
            )
1190
        else:
1191
            OIDCClaim.objects.create(
1192
                client=client, name='preferred_username', value='django_user_username', scopes='profile'
1193
            )
1194
        OIDCClaim.objects.create(
1195
            client=client, name='given_name', value='django_user_first_name', scopes='profile'
1196
        )
1197
        OIDCClaim.objects.create(
1198
            client=client, name='family_name', value='django_user_last_name', scopes='profile'
1199
        )
1200
        if client.name == 'test2':
1201
            continue
1202
        OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email')
1203
        OIDCClaim.objects.create(
1204
            client=client, name='email_verified', value='django_user_email_verified', scopes='email'
1205
        )
1206

  
1207
    new_apps = migration.apply(migrate_to)
1208
    OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient')
1209

  
1210
    client = OIDCClient.objects.first()
1211
    for client in OIDCClient.objects.all():
1212
        claims = client.oidcclaim_set.all()
1213
        if client.name == 'test':
1214
            assert claims.count() == 5
1215
            assert sorted(claims.values_list('name', flat=True)) == [
1216
                'email',
1217
                'email_verified',
1218
                'family_name',
1219
                'given_name',
1220
                'preferred_username',
1221
            ]
1222
            assert sorted(claims.values_list('value', flat=True)) == [
1223
                'django_user_email',
1224
                'django_user_email_verified',
1225
                'django_user_first_name',
1226
                'django_user_identifier',
1227
                'django_user_last_name',
1228
            ]
1229
        elif client.name == 'test2':
1230
            assert claims.count() == 3
1231
            assert sorted(claims.values_list('name', flat=True)) == [
1232
                'family_name',
1233
                'given_name',
1234
                'preferred_username',
1235
            ]
1236
            assert sorted(claims.values_list('value', flat=True)) == [
1237
                'django_user_first_name',
1238
                'django_user_last_name',
1239
                'django_user_username',
1240
            ]
1241
        elif client.name == 'test3':
1242
            assert claims.count() == 5
1243
            assert sorted(claims.values_list('name', flat=True)) == [
1244
                'email',
1245
                'email_verified',
1246
                'family_name',
1247
                'given_name',
1248
                'preferred_username',
1249
            ]
1250
            assert sorted(claims.values_list('value', flat=True)) == [
1251
                'django_user_email',
1252
                'django_user_email_verified',
1253
                'django_user_first_name',
1254
                'django_user_full_name',
1255
                'django_user_last_name',
1256
            ]
1257
        else:
1258
            assert claims.count() == 0
1259

  
1260

  
1261
def test_api_synchronization(app, oidc_client):
1262
    oidc_client.has_api_access = True
1263
    oidc_client.save()
1264
    users = [User.objects.create(username='user-%s' % i) for i in range(10)]
1265
    for user in users[5:]:
1266
        user.delete()
1267
    deleted_subs = set(make_sub(oidc_client, user) for user in users[5:])
1268

  
1269
    app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret))
1270
    status = 200
1271
    if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID):
1272
        status = 401
1273
    response = app.post_json(
1274
        '/api/users/synchronization/',
1275
        params={'known_uuids': [make_sub(oidc_client, user) for user in users]},
1276
        status=status,
1277
    )
1278
    if status == 200:
1279
        assert response.json['result'] == 1
1280
        assert set(response.json['unknown_uuids']) == deleted_subs
1281

  
1282

  
1283 999
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app):
1284 1000
    oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone']
1285 1001
    Attribute.objects.create(
......
1922 1638
    rf = RequestFactory()
1923 1639
    request = rf.get('/')
1924 1640
    assert good_next_url(request, 'https://example.com/')
1925

  
1926

  
1927
@pytest.fixture
1928
def access_token(client, simple_user):
1929
    return OIDCAccessToken.objects.create(
1930
        client=client,
1931
        user=simple_user,
1932
        scopes='openid profile email',
1933
        expired=now() + datetime.timedelta(seconds=3600),
1934
    )
1935

  
1936

  
1937
def test_user_info(app, client, access_token, freezer):
    """Walk the user_info endpoint through its authentication outcomes.

    Covers, in order: no Authorization header, a malformed Bearer header, a
    usable token, an expired token, a deleted token, and a token bound to a
    session that is then flushed.
    """
    endpoint = '/idp/oidc/user_info/'
    expired_or_disconnected = (
        'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
    )

    def call_with_token(**kwargs):
        # headers are rebuilt on every call from the token's uuid
        headers = bearer_authentication_headers(access_token.uuid)
        return app.get(endpoint, headers=headers, **kwargs)

    # no Authorization header at all
    resp = app.get(endpoint, status=401)
    assert (
        resp['WWW-Authenticate']
        == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
    )

    # Bearer scheme without any token value
    resp = app.get(endpoint, headers={'Authorization': 'Bearer'}, status=401)
    assert (
        resp['WWW-Authenticate']
        == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
    )

    # usable token: 'sub' is blanked on both sides so its value is not compared
    resp = call_with_token(status=200)
    expected_claims = {
        'email': 'user@example.net',
        'email_verified': False,
        'family_name': 'Dôe',
        'family_name_verified': True,
        'given_name': 'Jôhn',
        'given_name_verified': True,
        'preferred_username': 'user',
        'sub': '',
    }
    assert dict(resp.json, sub='') == expected_claims

    # token is expired
    access_token.expired = now() - datetime.timedelta(seconds=1)
    access_token.save()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == expired_or_disconnected

    # token is unknown
    access_token.delete()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'

    # the same instance is saved again after delete(), now tied to the logged-in
    # session (presumably this stores the token anew - confirm against Django's
    # behavior for save() on a deleted instance)
    utils.login(app, access_token.user)
    access_token.expired = now() + datetime.timedelta(seconds=1)
    access_token.session_key = app.session.session_key
    access_token.save()

    call_with_token(status=200)

    # once the session is flushed the token must be refused
    app.session.flush()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == expired_or_disconnected
1994

  
1995

  
1996
@pytest.fixture
def session(settings, db, simple_user):
    """Return a created session store holding ``simple_user``'s id as ``_auth_user_id``."""
    # the store class comes from the module named by settings.SESSION_ENGINE
    store = import_module(settings.SESSION_ENGINE).SessionStore()
    store['_auth_user_id'] = str(simple_user.id)
    store.create()
    return store
2003

  
2004

  
2005
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
    """A token bound to a session is valid until that session is flushed."""
    token_fields = {
        'client': simple_oidc_client,
        'user': simple_user,
        'scopes': 'openid',
        'session_key': session.session_key,
    }
    token = OIDCAccessToken.objects.create(**token_fields)

    # valid while the session exists
    assert token.is_valid()

    # invalid once the session has been flushed
    session.flush()
    assert not token.is_valid()
2013

  
2014

  
2015
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
    """A token with an expiry is valid up to and including that instant, not after."""
    issued_at = now()
    deadline = issued_at + datetime.timedelta(seconds=30)
    just_after_deadline = deadline + datetime.timedelta(seconds=1)

    token = OIDCAccessToken.objects.create(
        client=simple_oidc_client,
        user=simple_user,
        scopes='openid',
        expired=deadline,
    )

    # before the deadline
    assert token.is_valid()

    # exactly at the deadline the token is still accepted
    freezer.move_to(deadline)
    assert token.is_valid()

    # one second later it is not
    freezer.move_to(just_after_deadline)
    assert not token.is_valid()
2028

  
2029

  
2030
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
    """A token with both a session and an expiry must satisfy both to be valid."""
    issued_at = now()
    deadline = issued_at + datetime.timedelta(seconds=30)
    just_after_deadline = deadline + datetime.timedelta(seconds=1)

    token_fields = {
        'client': simple_oidc_client,
        'user': simple_user,
        'scopes': 'openid',
        'session_key': session.session_key,
        'expired': deadline,
    }
    token = OIDCAccessToken.objects.create(**token_fields)

    # expiry check: valid before and at the deadline, invalid after it
    assert token.is_valid()
    freezer.move_to(deadline)
    assert token.is_valid()
    freezer.move_to(just_after_deadline)
    assert not token.is_valid()

    # moving the clock back to the start makes the token valid again...
    freezer.move_to(issued_at)
    assert token.is_valid()

    # ...until the session it is bound to is flushed
    session.flush()
    assert not token.is_valid()
tests/idp_oidc/test_models.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18

  
19
from django.utils.timezone import now
20

  
21
from authentic2.a2_rbac.utils import get_default_ou
22
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClient, OIDCCode
23

  
24

  
25
def test_expired_manager(db, simple_user):
    """Check that ``cleanup()`` on each manager leaves one of the two created objects.

    For OIDCAuthorization, OIDCCode and OIDCAccessToken, one object is created
    with an expiry in the past and one with an expiry in the future; after
    ``cleanup()`` the count must drop from 2 to 1.
    """
    expired = now() - datetime.timedelta(seconds=1)
    not_expired = now() + datetime.timedelta(days=1)
    expiries = (expired, not_expired)

    client = OIDCClient.objects.create(
        name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/'
    )

    # authorizations
    for expiry in expiries:
        OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expiry)
    assert OIDCAuthorization.objects.count() == 2
    OIDCAuthorization.objects.cleanup()
    assert OIDCAuthorization.objects.count() == 1

    # authorization codes; auth_time is taken afresh for each object
    for expiry in expiries:
        OIDCCode.objects.create(
            client=client,
            user=simple_user,
            scopes='openid',
            redirect_uri='https://example.com/',
            session_key='xxx',
            auth_time=now(),
            expired=expiry,
        )
    assert OIDCCode.objects.count() == 2
    OIDCCode.objects.cleanup()
    assert OIDCCode.objects.count() == 1

    # access tokens
    for expiry in expiries:
        OIDCAccessToken.objects.create(
            client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expiry
        )
    assert OIDCAccessToken.objects.count() == 2
    OIDCAccessToken.objects.cleanup()
    assert OIDCAccessToken.objects.count() == 1
68

  
69

  
70
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session):
    """A token bound to a session is valid until that session is flushed."""
    token_fields = {
        'client': simple_oidc_client,
        'user': simple_user,
        'scopes': 'openid',
        'session_key': session.session_key,
    }
    token = OIDCAccessToken.objects.create(**token_fields)

    # valid while the session exists
    assert token.is_valid()

    # invalid once the session has been flushed
    session.flush()
    assert not token.is_valid()
78

  
79

  
80
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer):
    """A token with an expiry is valid up to and including that instant, not after."""
    issued_at = now()
    deadline = issued_at + datetime.timedelta(seconds=30)
    just_after_deadline = deadline + datetime.timedelta(seconds=1)

    token = OIDCAccessToken.objects.create(
        client=simple_oidc_client,
        user=simple_user,
        scopes='openid',
        expired=deadline,
    )

    # before the deadline
    assert token.is_valid()

    # exactly at the deadline the token is still accepted
    freezer.move_to(deadline)
    assert token.is_valid()

    # one second later it is not
    freezer.move_to(just_after_deadline)
    assert not token.is_valid()
93

  
94

  
95
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer):
    """A token with both a session and an expiry must satisfy both to be valid."""
    issued_at = now()
    deadline = issued_at + datetime.timedelta(seconds=30)
    just_after_deadline = deadline + datetime.timedelta(seconds=1)

    token_fields = {
        'client': simple_oidc_client,
        'user': simple_user,
        'scopes': 'openid',
        'session_key': session.session_key,
        'expired': deadline,
    }
    token = OIDCAccessToken.objects.create(**token_fields)

    # expiry check: valid before and at the deadline, invalid after it
    assert token.is_valid()
    freezer.move_to(deadline)
    assert token.is_valid()
    freezer.move_to(just_after_deadline)
    assert not token.is_valid()

    # moving the clock back to the start makes the token valid again...
    freezer.move_to(issued_at)
    assert token.is_valid()

    # ...until the session it is bound to is flushed
    session.flush()
    assert not token.is_valid()
tests/idp_oidc/test_views.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import datetime
18

  
19
from django.utils.timezone import now
20

  
21
from authentic2_idp_oidc.models import OIDCAccessToken
22

  
23
from .. import utils
24
from .conftest import bearer_authentication_headers
25

  
26

  
27
def test_user_info(app, client, freezer, simple_user):
    """Walk the user_info endpoint through its authentication outcomes.

    A token for ``simple_user`` is created first. The test then covers, in
    order: no Authorization header, a malformed Bearer header, a usable
    token, an expired token, a deleted token, and a token bound to a session
    that is then flushed.
    """
    endpoint = '/idp/oidc/user_info/'
    expired_or_disconnected = (
        'Bearer error="invalid_token", error_description="Token expired or user disconnected"'
    )

    lifetime = datetime.timedelta(seconds=3600)
    access_token = OIDCAccessToken.objects.create(
        client=client,
        user=simple_user,
        scopes='openid profile email',
        expired=now() + lifetime,
    )

    def call_with_token(**kwargs):
        # headers are rebuilt on every call from the token's uuid
        headers = bearer_authentication_headers(access_token.uuid)
        return app.get(endpoint, headers=headers, **kwargs)

    # no Authorization header at all
    resp = app.get(endpoint, status=401)
    assert (
        resp['WWW-Authenticate']
        == 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"'
    )

    # Bearer scheme without any token value
    resp = app.get(endpoint, headers={'Authorization': 'Bearer'}, status=401)
    assert (
        resp['WWW-Authenticate']
        == 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"'
    )

    # usable token: 'sub' is blanked on both sides so its value is not compared
    resp = call_with_token(status=200)
    expected_claims = {
        'email': 'user@example.net',
        'email_verified': False,
        'family_name': 'Dôe',
        'family_name_verified': True,
        'given_name': 'Jôhn',
        'given_name_verified': True,
        'preferred_username': 'user',
        'sub': '',
    }
    assert dict(resp.json, sub='') == expected_claims

    # token is expired
    access_token.expired = now() - datetime.timedelta(seconds=1)
    access_token.save()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == expired_or_disconnected

    # token is unknown
    access_token.delete()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"'

    # the same instance is saved again after delete(), now tied to the logged-in
    # session (presumably this stores the token anew - confirm against Django's
    # behavior for save() on a deleted instance)
    utils.login(app, access_token.user)
    access_token.expired = now() + datetime.timedelta(seconds=1)
    access_token.session_key = app.session.session_key
    access_token.save()

    call_with_token(status=200)

    # once the session is flushed the token must be refused
    app.session.flush()
    resp = call_with_token(status=401)
    assert resp['WWW-Authenticate'] == expired_or_disconnected
0
-