0002-tests-move-idp_oidc-tests-in-a-subdirectory-54740.patch
tests/idp_oidc/conftest.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import base64 |
|
18 |
from importlib import import_module |
|
19 | ||
20 |
import pytest |
|
21 |
from django.urls import reverse |
|
22 | ||
23 |
from authentic2.a2_rbac.utils import get_default_ou |
|
24 |
from authentic2.models import Attribute |
|
25 |
from authentic2_idp_oidc import app_settings |
|
26 |
from authentic2_idp_oidc.models import OIDCClaim, OIDCClient |
|
27 |
from tests import utils |
|
28 | ||
29 |
JWKSET = { |
|
30 |
"keys": [ |
|
31 |
{ |
|
32 |
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4", |
|
33 |
"kty": "RSA", |
|
34 |
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ", |
|
35 |
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE", |
|
36 |
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU", |
|
37 |
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE", |
|
38 |
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q", |
|
39 |
"e": "AQAB", |
|
40 |
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0", |
|
41 |
"kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e", |
|
42 |
}, |
|
43 |
{ |
|
44 |
"kty": "EC", |
|
45 |
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog", |
|
46 |
"use": "sig", |
|
47 |
"crv": "P-256", |
|
48 |
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw", |
|
49 |
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ", |
|
50 |
"alg": "ES256", |
|
51 |
"kid": "ac85baf4-835b-49b2-8272-ffecce7654c9", |
|
52 |
}, |
|
53 |
] |
|
54 |
} |
|
55 | ||
56 | ||
57 |
@pytest.fixture |
|
58 |
def jwkset(): |
|
59 |
return JWKSET |
|
60 | ||
61 | ||
62 |
@pytest.fixture |
|
63 |
def oidc_settings(settings, jwkset): |
|
64 |
settings.A2_IDP_OIDC_JWKSET = jwkset |
|
65 |
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m' |
|
66 |
return settings |
|
67 | ||
68 | ||
69 |
def make_client(app, superuser, params=None): |
|
70 |
Attribute.objects.create( |
|
71 |
name='cityscape_image', |
|
72 |
label='cityscape', |
|
73 |
kind='profile_image', |
|
74 |
asked_on_registration=True, |
|
75 |
required=False, |
|
76 |
user_visible=True, |
|
77 |
user_editable=True, |
|
78 |
) |
|
79 | ||
80 |
client = OIDCClient( |
|
81 |
name='oidcclient', |
|
82 |
slug='oidcclient', |
|
83 |
ou=get_default_ou(), |
|
84 |
unauthorized_url='https://example.com/southpark/', |
|
85 |
redirect_uris='https://example.com/callbac%C3%A9', |
|
86 |
) |
|
87 | ||
88 |
for key, value in (params or {}).items(): |
|
89 |
setattr(client, key, value) |
|
90 |
client.save() |
|
91 |
for mapping in app_settings.DEFAULT_MAPPINGS: |
|
92 |
OIDCClaim.objects.create( |
|
93 |
client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes'] |
|
94 |
) |
|
95 |
return client |
|
96 | ||
97 | ||
98 |
@pytest.fixture |
|
99 |
def client(app, superuser): |
|
100 |
return make_client(app, superuser, {}) |
|
101 | ||
102 | ||
103 |
@pytest.fixture |
|
104 |
def simple_oidc_client(db): |
|
105 |
return OIDCClient.objects.create( |
|
106 |
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' |
|
107 |
) |
|
108 | ||
109 | ||
110 |
@pytest.fixture |
|
111 |
def oidc_client(request, superuser, app, simple_user, oidc_settings): |
|
112 |
return make_client(app, superuser, getattr(request, 'param', None) or {}) |
|
113 | ||
114 | ||
115 |
@pytest.fixture |
|
116 |
def normal_oidc_client(superuser, app, simple_user): |
|
117 |
url = reverse('admin:authentic2_idp_oidc_oidcclient_add') |
|
118 |
assert OIDCClient.objects.count() == 0 |
|
119 |
response = utils.login(app, superuser, path=url) |
|
120 |
response.form.set('name', 'oidcclient') |
|
121 |
response.form.set('slug', 'oidcclient') |
|
122 |
response.form.set('ou', get_default_ou().pk) |
|
123 |
response.form.set('unauthorized_url', 'https://example.com/southpark/') |
|
124 |
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9') |
|
125 |
response = response.form.submit(name='_save').follow() |
|
126 |
assert OIDCClient.objects.count() == 1 |
|
127 |
client = OIDCClient.objects.get() |
|
128 |
utils.logout(app) |
|
129 |
return client |
|
130 | ||
131 | ||
132 |
@pytest.fixture |
|
133 |
def session(settings, db, simple_user): |
|
134 |
engine = import_module(settings.SESSION_ENGINE) |
|
135 |
session = engine.SessionStore() |
|
136 |
session['_auth_user_id'] = str(simple_user.id) |
|
137 |
session.create() |
|
138 |
return session |
|
139 | ||
140 | ||
141 |
def client_authentication_headers(oidc_client): |
|
142 |
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret) |
|
143 |
token = base64.b64encode(client_creds.encode('ascii')) |
|
144 |
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))} |
|
145 | ||
146 | ||
147 |
def bearer_authentication_headers(access_token): |
|
148 |
return {'Authorization': 'Bearer %s' % str(access_token)} |
tests/idp_oidc/test_api.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from authentic2.custom_user.models import User |
|
18 |
from authentic2_idp_oidc.models import OIDCClient |
|
19 |
from authentic2_idp_oidc.utils import make_sub |
|
20 | ||
21 | ||
22 |
def test_api_synchronization(app, oidc_client): |
|
23 |
oidc_client.has_api_access = True |
|
24 |
oidc_client.save() |
|
25 |
users = [User.objects.create(username='user-%s' % i) for i in range(10)] |
|
26 |
for user in users[5:]: |
|
27 |
user.delete() |
|
28 |
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:]) |
|
29 | ||
30 |
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) |
|
31 |
status = 200 |
|
32 |
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID): |
|
33 |
status = 401 |
|
34 |
response = app.post_json( |
|
35 |
'/api/users/synchronization/', |
|
36 |
params={'known_uuids': [make_sub(oidc_client, user) for user in users]}, |
|
37 |
status=status, |
|
38 |
) |
|
39 |
if status == 200: |
|
40 |
assert response.json['result'] == 1 |
|
41 |
assert set(response.json['unknown_uuids']) == deleted_subs |
tests/idp_oidc/test_migrations.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 | ||
18 |
def test_oidclient_claims_data_migration(migration): |
|
19 |
app = 'authentic2_idp_oidc' |
|
20 |
migrate_from = [(app, '0009_auto_20180313_1156')] |
|
21 |
migrate_to = [(app, '0010_oidcclaim')] |
|
22 | ||
23 |
old_apps = migration.before(migrate_from) |
|
24 |
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
25 | ||
26 |
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/') |
|
27 |
client.save() |
|
28 | ||
29 |
new_apps = migration.apply(migrate_to) |
|
30 |
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
31 |
OIDCClaim = new_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') |
|
32 | ||
33 |
client = OIDCClient.objects.first() |
|
34 |
assert OIDCClaim.objects.filter(client=client.id).count() == 5 |
|
35 | ||
36 | ||
37 |
def test_oidclient_preferred_username_as_identifier_data_migration(migration): |
|
38 |
app = 'authentic2_idp_oidc' |
|
39 |
migrate_from = [(app, '0010_oidcclaim')] |
|
40 |
migrate_to = [(app, '0011_auto_20180808_1546')] |
|
41 | ||
42 |
old_apps = migration.before(migrate_from) |
|
43 |
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
44 |
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') |
|
45 | ||
46 |
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/') |
|
47 |
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/') |
|
48 |
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/') |
|
49 |
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/') |
|
50 |
for client in (client1, client2, client3, client4): |
|
51 |
if client.name == 'test1': |
|
52 |
continue |
|
53 |
if client.name == 'test3': |
|
54 |
OIDCClaim.objects.create( |
|
55 |
client=client, name='preferred_username', value='django_user_full_name', scopes='profile' |
|
56 |
) |
|
57 |
else: |
|
58 |
OIDCClaim.objects.create( |
|
59 |
client=client, name='preferred_username', value='django_user_username', scopes='profile' |
|
60 |
) |
|
61 |
OIDCClaim.objects.create( |
|
62 |
client=client, name='given_name', value='django_user_first_name', scopes='profile' |
|
63 |
) |
|
64 |
OIDCClaim.objects.create( |
|
65 |
client=client, name='family_name', value='django_user_last_name', scopes='profile' |
|
66 |
) |
|
67 |
if client.name == 'test2': |
|
68 |
continue |
|
69 |
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email') |
|
70 |
OIDCClaim.objects.create( |
|
71 |
client=client, name='email_verified', value='django_user_email_verified', scopes='email' |
|
72 |
) |
|
73 | ||
74 |
new_apps = migration.apply(migrate_to) |
|
75 |
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
76 | ||
77 |
client = OIDCClient.objects.first() |
|
78 |
for client in OIDCClient.objects.all(): |
|
79 |
claims = client.oidcclaim_set.all() |
|
80 |
if client.name == 'test': |
|
81 |
assert claims.count() == 5 |
|
82 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
83 |
'email', |
|
84 |
'email_verified', |
|
85 |
'family_name', |
|
86 |
'given_name', |
|
87 |
'preferred_username', |
|
88 |
] |
|
89 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
90 |
'django_user_email', |
|
91 |
'django_user_email_verified', |
|
92 |
'django_user_first_name', |
|
93 |
'django_user_identifier', |
|
94 |
'django_user_last_name', |
|
95 |
] |
|
96 |
elif client.name == 'test2': |
|
97 |
assert claims.count() == 3 |
|
98 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
99 |
'family_name', |
|
100 |
'given_name', |
|
101 |
'preferred_username', |
|
102 |
] |
|
103 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
104 |
'django_user_first_name', |
|
105 |
'django_user_last_name', |
|
106 |
'django_user_username', |
|
107 |
] |
|
108 |
elif client.name == 'test3': |
|
109 |
assert claims.count() == 5 |
|
110 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
111 |
'email', |
|
112 |
'email_verified', |
|
113 |
'family_name', |
|
114 |
'given_name', |
|
115 |
'preferred_username', |
|
116 |
] |
|
117 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
118 |
'django_user_email', |
|
119 |
'django_user_email_verified', |
|
120 |
'django_user_first_name', |
|
121 |
'django_user_full_name', |
|
122 |
'django_user_last_name', |
|
123 |
] |
|
124 |
else: |
|
125 |
assert claims.count() == 0 |
tests/test_idp_oidc.py → tests/idp_oidc/test_misc.py | ||
---|---|---|
1 | 1 |
# authentic2 - versatile identity manager |
2 |
# Copyright (C) 2010-2019 Entr'ouvert
|
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert
|
|
3 | 3 |
# |
4 | 4 |
# This program is free software: you can redistribute it and/or modify it |
5 | 5 |
# under the terms of the GNU Affero General Public License as published |
... | ... | |
19 | 19 |
import functools |
20 | 20 |
import json |
21 | 21 |
import urllib.parse |
22 |
from importlib import import_module |
|
23 | 22 | |
24 | 23 |
import pytest |
25 | 24 |
from django.contrib.auth import get_user_model |
... | ... | |
37 | 36 |
from authentic2.models import Attribute, AuthorizedRole |
38 | 37 |
from authentic2.utils import good_next_url, make_url |
39 | 38 |
from authentic2_auth_oidc.utils import parse_timestamp |
40 |
from authentic2_idp_oidc import app_settings |
|
41 | 39 |
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClaim, OIDCClient, OIDCCode |
42 | 40 |
from authentic2_idp_oidc.utils import base64url, get_first_ec_sig_key, get_first_rsa_sig_key, make_sub |
43 | 41 |
from django_rbac.utils import get_ou_model, get_role_model |
44 | 42 | |
45 |
from . import utils |
|
43 |
from .. import utils |
|
44 |
from .conftest import bearer_authentication_headers, client_authentication_headers |
|
46 | 45 | |
47 | 46 |
User = get_user_model() |
48 | 47 | |
49 | 48 |
pytestmark = pytest.mark.django_db |
50 | 49 | |
51 |
JWKSET = { |
|
52 |
"keys": [ |
|
53 |
{ |
|
54 |
"qi": "h_zifVD-ChelxZUVxhICNcgGkQz26b-EdIlLY9rN7SX_aD3sLI_JHEHV4Bz3kV5eW8O4qJ8SHhfUdHGK-gRH7FVOGoXnXACf47QoXowHzsPLL64wCuZENTl7hIRGLY-BInULkfTQfuiVSMoxPjsVNTMBzMiz0bNjMQyMyvW5xH4", |
|
55 |
"kty": "RSA", |
|
56 |
"d": "pUcL4-LDBy3rqJWip269h5Hd6nLvqjXltfkVe_mL-LwZPHmCrUaj_SX54SnCY3Wyf7kxhoMYUac62lQ71923uJPFFdiavAujbNrtZPq32i4C-1apWXW8OGJr8VoVDqalxj9SAq1G54wbbsaAPrZdyuqy-esNxDqDigfbM-cWgngBBYo5CSsfnmnd05N2cUS26L7QzWbNHwilnBTE9e_J7rK3xUCDKrobv6_LiI-AhMmBHJSrCxjexh0wzfBi_Ntj9BGCcPThDjG8SQvaV-aLNdLfIy2XO3i076RLBB6Hm_yHuAparrwp-pPE48eQdiYjrSAFalz4ojWQ3_ByLA6uAQ", |
|
57 |
"q": "2FvfeWnIlWNUipan7DIBlJrmz5EinJNxrQ-BNwPHrAoIM8qvyC7jPy09YxZs5Y9CMMZSal6C4Nm2LHBFxHU9z1qd5XDzbk19G-y1lDqZizVXr876TpiAjuq03rcoMQm8dQru_pVjUdgxR64vKyJ9CaFMAqcpZeEMIqAvzhQG8uE", |
|
58 |
"dp": "Kg4HPGpzenhK2ser6nfM1Yt-pkqBbWQotvqsxGptECXpbN7vweupvL5kJPeRrbsXKp9QE7DXTN1sG9puJxMSwtgiv4hr9Va9e9WOC6PMd2VY7tgw5uKMpPLMc5y82PusRhBoRh0SUUsjyQxK9PGtWYnGZXbAoaIYPdMyDlosfqU", |
|
59 |
"dq": "QuUNEHYTjZTbo8n2-4FumarXKGBAalbwM8jyc7cYemnTpWfKt8M_gd4T99oMK2IC3h_DhZ3ZK3pE6DKCb76sMLtczH8C1RziTMsATWdc5_zDMtl07O4b-ZQ5_g51P8w515pc0JwRzFFi0z3Y2aZdMKgNX1id5SES5nXOshHhICE", |
|
60 |
"n": "0lN6CiJGFD8BSPV_azLoEl6Nq-WlHkU743D5rqvzw1sOaxstMGxAhVk2YIhWwfvapV6XjO_yvc4778VBTELOdjRw6BGUdBJepdwkL__TPyjEVhqMQj9MKhEU4GUy9w0Lsilb5D01kfrOKpmdcYw4jhcDvb0H4-LZgh1Vk84vF4WaQCUg_AX4drVDQOjoU8kuWIM8gz9w6zEsbIw-gtMRpFwS8ncA0zDX5VfyC77iMxzFftDIP2gM5GvdevMzvP9IRkRRBhP9vV4JchBFPHSA9OPJcnySjJJNW6aAJn6P6JasN1z68khjufM09J8UzmLAZYOq7gUG95Ox1KsV-g337Q", |
|
61 |
"e": "AQAB", |
|
62 |
"p": "-Nyj_Sw3f2HUqSssCZv84y7b3blOtGGAhfYN_JtGfcTQv2bOtxrIUzeonCi-Z_1W4hO10tqxJcOB0ibtDqkDlLhnLaIYOBfriITRFK83EJG5sC-0KTmFzUXFTA2aMc1QgP-Fu6gUfQpPqLgWxhx8EFhkBlBZshKU5-C-385Sco0", |
|
63 |
"kid": "46c686ea-7d4e-41cd-a462-2125fc1dee0e", |
|
64 |
}, |
|
65 |
{ |
|
66 |
"kty": "EC", |
|
67 |
"d": "wwULaR9UYWZW6U2oEbkz3sO1lhPSj6DyA6e7PiUfhog", |
|
68 |
"use": "sig", |
|
69 |
"crv": "P-256", |
|
70 |
"x": "HZMHZkX-63heqA5pvWn-UR7bgcXZNEcQa5wfvG_BzTw", |
|
71 |
"y": "SUCuwjjiyKvGq5Odr0sjDqjha_CBqks0JQFrR7Ei5OQ", |
|
72 |
"alg": "ES256", |
|
73 |
"kid": "ac85baf4-835b-49b2-8272-ffecce7654c9", |
|
74 |
}, |
|
75 |
] |
|
76 |
} |
|
77 | ||
78 | ||
79 |
@pytest.fixture |
|
80 |
def oidc_settings(settings): |
|
81 |
settings.A2_IDP_OIDC_JWKSET = JWKSET |
|
82 |
settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT = '100/m' |
|
83 |
return settings |
|
84 | ||
85 | 50 | |
86 | 51 |
def test_get_jwkset(oidc_settings): |
87 | 52 |
from authentic2_idp_oidc.utils import get_jwkset |
... | ... | |
159 | 124 |
assert OIDCClient.objects.count() == 1 |
160 | 125 | |
161 | 126 | |
162 |
def make_client(app, superuser, params=None): |
|
163 |
Attribute.objects.create( |
|
164 |
name='cityscape_image', |
|
165 |
label='cityscape', |
|
166 |
kind='profile_image', |
|
167 |
asked_on_registration=True, |
|
168 |
required=False, |
|
169 |
user_visible=True, |
|
170 |
user_editable=True, |
|
171 |
) |
|
172 | ||
173 |
client = OIDCClient( |
|
174 |
name='oidcclient', |
|
175 |
slug='oidcclient', |
|
176 |
ou=get_default_ou(), |
|
177 |
unauthorized_url='https://example.com/southpark/', |
|
178 |
redirect_uris='https://example.com/callbac%C3%A9', |
|
179 |
) |
|
180 | ||
181 |
for key, value in (params or {}).items(): |
|
182 |
setattr(client, key, value) |
|
183 |
client.save() |
|
184 |
for mapping in app_settings.DEFAULT_MAPPINGS: |
|
185 |
OIDCClaim.objects.create( |
|
186 |
client=client, name=mapping['name'], value=mapping['value'], scopes=mapping['scopes'] |
|
187 |
) |
|
188 |
return client |
|
189 | ||
190 | ||
191 |
@pytest.fixture |
|
192 |
def client(app, superuser): |
|
193 |
return make_client(app, superuser, {}) |
|
194 | ||
195 | ||
196 |
@pytest.fixture |
|
197 |
def oidc_client(request, superuser, app, simple_user, oidc_settings): |
|
198 |
return make_client(app, superuser, getattr(request, 'param', None) or {}) |
|
199 | ||
200 | ||
201 |
@pytest.fixture |
|
202 |
def normal_oidc_client(superuser, app, simple_user): |
|
203 |
url = reverse('admin:authentic2_idp_oidc_oidcclient_add') |
|
204 |
assert OIDCClient.objects.count() == 0 |
|
205 |
response = utils.login(app, superuser, path=url) |
|
206 |
response.form.set('name', 'oidcclient') |
|
207 |
response.form.set('slug', 'oidcclient') |
|
208 |
response.form.set('ou', get_default_ou().pk) |
|
209 |
response.form.set('unauthorized_url', 'https://example.com/southpark/') |
|
210 |
response.form.set('redirect_uris', 'https://example.com/callbac%C3%A9') |
|
211 |
response = response.form.submit(name='_save').follow() |
|
212 |
assert OIDCClient.objects.count() == 1 |
|
213 |
client = OIDCClient.objects.get() |
|
214 |
utils.logout(app) |
|
215 |
return client |
|
216 | ||
217 | ||
218 |
def client_authentication_headers(oidc_client): |
|
219 |
client_creds = '%s:%s' % (oidc_client.client_id, oidc_client.client_secret) |
|
220 |
token = base64.b64encode(client_creds.encode('ascii')) |
|
221 |
return {'Authorization': 'Basic %s' % str(token.decode('ascii'))} |
|
222 | ||
223 | ||
224 |
def bearer_authentication_headers(access_token): |
|
225 |
return {'Authorization': 'Bearer %s' % str(access_token)} |
|
226 | ||
227 | ||
228 | 127 |
@pytest.mark.parametrize('oidc_client', OIDC_CLIENT_PARAMS, indirect=True) |
229 | 128 |
@pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)]) |
230 | 129 |
@pytest.mark.parametrize('login_first', [(True,), (False,)]) |
... | ... | |
934 | 833 |
assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected' |
935 | 834 | |
936 | 835 | |
937 |
def test_expired_manager(db, simple_user): |
|
938 |
expired = now() - datetime.timedelta(seconds=1) |
|
939 |
not_expired = now() + datetime.timedelta(days=1) |
|
940 |
client = OIDCClient.objects.create( |
|
941 |
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' |
|
942 |
) |
|
943 |
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired) |
|
944 |
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired) |
|
945 |
assert OIDCAuthorization.objects.count() == 2 |
|
946 |
OIDCAuthorization.objects.cleanup() |
|
947 |
assert OIDCAuthorization.objects.count() == 1 |
|
948 | ||
949 |
OIDCCode.objects.create( |
|
950 |
client=client, |
|
951 |
user=simple_user, |
|
952 |
scopes='openid', |
|
953 |
redirect_uri='https://example.com/', |
|
954 |
session_key='xxx', |
|
955 |
auth_time=now(), |
|
956 |
expired=expired, |
|
957 |
) |
|
958 |
OIDCCode.objects.create( |
|
959 |
client=client, |
|
960 |
user=simple_user, |
|
961 |
scopes='openid', |
|
962 |
redirect_uri='https://example.com/', |
|
963 |
session_key='xxx', |
|
964 |
auth_time=now(), |
|
965 |
expired=not_expired, |
|
966 |
) |
|
967 |
assert OIDCCode.objects.count() == 2 |
|
968 |
OIDCCode.objects.cleanup() |
|
969 |
assert OIDCCode.objects.count() == 1 |
|
970 | ||
971 |
OIDCAccessToken.objects.create( |
|
972 |
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired |
|
973 |
) |
|
974 |
OIDCAccessToken.objects.create( |
|
975 |
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired |
|
976 |
) |
|
977 |
assert OIDCAccessToken.objects.count() == 2 |
|
978 |
OIDCAccessToken.objects.cleanup() |
|
979 |
assert OIDCAccessToken.objects.count() == 1 |
|
980 | ||
981 | ||
982 |
@pytest.fixture |
|
983 |
def simple_oidc_client(db): |
|
984 |
return OIDCClient.objects.create( |
|
985 |
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' |
|
986 |
) |
|
987 | ||
988 | ||
989 | 836 |
def test_client_secret_post_authentication(oidc_settings, app, simple_oidc_client, simple_user): |
990 | 837 |
utils.login(app, simple_user) |
991 | 838 |
redirect_uri = simple_oidc_client.redirect_uris.split()[0] |
... | ... | |
1149 | 996 |
assert hooks.event[2]['kwargs']['service'] == 'client' |
1150 | 997 | |
1151 | 998 | |
1152 |
def test_oidclient_claims_data_migration(migration): |
|
1153 |
app = 'authentic2_idp_oidc' |
|
1154 |
migrate_from = [(app, '0009_auto_20180313_1156')] |
|
1155 |
migrate_to = [(app, '0010_oidcclaim')] |
|
1156 | ||
1157 |
old_apps = migration.before(migrate_from) |
|
1158 |
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
1159 | ||
1160 |
client = OIDCClient(name='test', slug='test', redirect_uris='https://example.net/') |
|
1161 |
client.save() |
|
1162 | ||
1163 |
new_apps = migration.apply(migrate_to) |
|
1164 |
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
1165 | ||
1166 |
client = OIDCClient.objects.first() |
|
1167 |
assert OIDCClaim.objects.filter(client=client.id).count() == 5 |
|
1168 | ||
1169 | ||
1170 |
def test_oidclient_preferred_username_as_identifier_data_migration(migration): |
|
1171 |
app = 'authentic2_idp_oidc' |
|
1172 |
migrate_from = [(app, '0010_oidcclaim')] |
|
1173 |
migrate_to = [(app, '0011_auto_20180808_1546')] |
|
1174 | ||
1175 |
old_apps = migration.before(migrate_from) |
|
1176 |
OIDCClient = old_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
1177 |
OIDCClaim = old_apps.get_model('authentic2_idp_oidc', 'OIDCClaim') |
|
1178 | ||
1179 |
client1 = OIDCClient.objects.create(name='test', slug='test', redirect_uris='https://example.net/') |
|
1180 |
client2 = OIDCClient.objects.create(name='test1', slug='test1', redirect_uris='https://example.net/') |
|
1181 |
client3 = OIDCClient.objects.create(name='test2', slug='test2', redirect_uris='https://example.net/') |
|
1182 |
client4 = OIDCClient.objects.create(name='test3', slug='test3', redirect_uris='https://example.net/') |
|
1183 |
for client in (client1, client2, client3, client4): |
|
1184 |
if client.name == 'test1': |
|
1185 |
continue |
|
1186 |
if client.name == 'test3': |
|
1187 |
OIDCClaim.objects.create( |
|
1188 |
client=client, name='preferred_username', value='django_user_full_name', scopes='profile' |
|
1189 |
) |
|
1190 |
else: |
|
1191 |
OIDCClaim.objects.create( |
|
1192 |
client=client, name='preferred_username', value='django_user_username', scopes='profile' |
|
1193 |
) |
|
1194 |
OIDCClaim.objects.create( |
|
1195 |
client=client, name='given_name', value='django_user_first_name', scopes='profile' |
|
1196 |
) |
|
1197 |
OIDCClaim.objects.create( |
|
1198 |
client=client, name='family_name', value='django_user_last_name', scopes='profile' |
|
1199 |
) |
|
1200 |
if client.name == 'test2': |
|
1201 |
continue |
|
1202 |
OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email') |
|
1203 |
OIDCClaim.objects.create( |
|
1204 |
client=client, name='email_verified', value='django_user_email_verified', scopes='email' |
|
1205 |
) |
|
1206 | ||
1207 |
new_apps = migration.apply(migrate_to) |
|
1208 |
OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') |
|
1209 | ||
1210 |
client = OIDCClient.objects.first() |
|
1211 |
for client in OIDCClient.objects.all(): |
|
1212 |
claims = client.oidcclaim_set.all() |
|
1213 |
if client.name == 'test': |
|
1214 |
assert claims.count() == 5 |
|
1215 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
1216 |
'email', |
|
1217 |
'email_verified', |
|
1218 |
'family_name', |
|
1219 |
'given_name', |
|
1220 |
'preferred_username', |
|
1221 |
] |
|
1222 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
1223 |
'django_user_email', |
|
1224 |
'django_user_email_verified', |
|
1225 |
'django_user_first_name', |
|
1226 |
'django_user_identifier', |
|
1227 |
'django_user_last_name', |
|
1228 |
] |
|
1229 |
elif client.name == 'test2': |
|
1230 |
assert claims.count() == 3 |
|
1231 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
1232 |
'family_name', |
|
1233 |
'given_name', |
|
1234 |
'preferred_username', |
|
1235 |
] |
|
1236 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
1237 |
'django_user_first_name', |
|
1238 |
'django_user_last_name', |
|
1239 |
'django_user_username', |
|
1240 |
] |
|
1241 |
elif client.name == 'test3': |
|
1242 |
assert claims.count() == 5 |
|
1243 |
assert sorted(claims.values_list('name', flat=True)) == [ |
|
1244 |
'email', |
|
1245 |
'email_verified', |
|
1246 |
'family_name', |
|
1247 |
'given_name', |
|
1248 |
'preferred_username', |
|
1249 |
] |
|
1250 |
assert sorted(claims.values_list('value', flat=True)) == [ |
|
1251 |
'django_user_email', |
|
1252 |
'django_user_email_verified', |
|
1253 |
'django_user_first_name', |
|
1254 |
'django_user_full_name', |
|
1255 |
'django_user_last_name', |
|
1256 |
] |
|
1257 |
else: |
|
1258 |
assert claims.count() == 0 |
|
1259 | ||
1260 | ||
1261 |
def test_api_synchronization(app, oidc_client): |
|
1262 |
oidc_client.has_api_access = True |
|
1263 |
oidc_client.save() |
|
1264 |
users = [User.objects.create(username='user-%s' % i) for i in range(10)] |
|
1265 |
for user in users[5:]: |
|
1266 |
user.delete() |
|
1267 |
deleted_subs = set(make_sub(oidc_client, user) for user in users[5:]) |
|
1268 | ||
1269 |
app.authorization = ('Basic', (oidc_client.client_id, oidc_client.client_secret)) |
|
1270 |
status = 200 |
|
1271 |
if oidc_client.identifier_policy not in (OIDCClient.POLICY_PAIRWISE_REVERSIBLE, OIDCClient.POLICY_UUID): |
|
1272 |
status = 401 |
|
1273 |
response = app.post_json( |
|
1274 |
'/api/users/synchronization/', |
|
1275 |
params={'known_uuids': [make_sub(oidc_client, user) for user in users]}, |
|
1276 |
status=status, |
|
1277 |
) |
|
1278 |
if status == 200: |
|
1279 |
assert response.json['result'] == 1 |
|
1280 |
assert set(response.json['unknown_uuids']) == deleted_subs |
|
1281 | ||
1282 | ||
1283 | 999 |
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app): |
1284 | 1000 |
oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email', 'phone'] |
1285 | 1001 |
Attribute.objects.create( |
... | ... | |
1922 | 1638 |
rf = RequestFactory() |
1923 | 1639 |
request = rf.get('/') |
1924 | 1640 |
assert good_next_url(request, 'https://example.com/') |
1925 | ||
1926 | ||
1927 |
@pytest.fixture |
|
1928 |
def access_token(client, simple_user): |
|
1929 |
return OIDCAccessToken.objects.create( |
|
1930 |
client=client, |
|
1931 |
user=simple_user, |
|
1932 |
scopes='openid profile email', |
|
1933 |
expired=now() + datetime.timedelta(seconds=3600), |
|
1934 |
) |
|
1935 | ||
1936 | ||
1937 |
def test_user_info(app, client, access_token, freezer): |
|
1938 |
def get_user_info(**kwargs): |
|
1939 |
return app.get( |
|
1940 |
'/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs |
|
1941 |
) |
|
1942 | ||
1943 |
response = app.get('/idp/oidc/user_info/', status=401) |
|
1944 |
assert ( |
|
1945 |
response['WWW-Authenticate'] |
|
1946 |
== 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"' |
|
1947 |
) |
|
1948 | ||
1949 |
response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401) |
|
1950 |
assert ( |
|
1951 |
response['WWW-Authenticate'] |
|
1952 |
== 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"' |
|
1953 |
) |
|
1954 | ||
1955 |
response = get_user_info(status=200) |
|
1956 |
assert dict(response.json, sub='') == { |
|
1957 |
'email': 'user@example.net', |
|
1958 |
'email_verified': False, |
|
1959 |
'family_name': 'Dôe', |
|
1960 |
'family_name_verified': True, |
|
1961 |
'given_name': 'Jôhn', |
|
1962 |
'given_name_verified': True, |
|
1963 |
'preferred_username': 'user', |
|
1964 |
'sub': '', |
|
1965 |
} |
|
1966 | ||
1967 |
# token is expired |
|
1968 |
access_token.expired = now() - datetime.timedelta(seconds=1) |
|
1969 |
access_token.save() |
|
1970 |
response = get_user_info(status=401) |
|
1971 |
assert ( |
|
1972 |
response['WWW-Authenticate'] |
|
1973 |
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' |
|
1974 |
) |
|
1975 | ||
1976 |
# token is unknown |
|
1977 |
access_token.delete() |
|
1978 |
response = get_user_info(status=401) |
|
1979 |
assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"' |
|
1980 | ||
1981 |
utils.login(app, access_token.user) |
|
1982 |
access_token.expired = now() + datetime.timedelta(seconds=1) |
|
1983 |
access_token.session_key = app.session.session_key |
|
1984 |
access_token.save() |
|
1985 | ||
1986 |
get_user_info(status=200) |
|
1987 | ||
1988 |
app.session.flush() |
|
1989 |
response = get_user_info(status=401) |
|
1990 |
assert ( |
|
1991 |
response['WWW-Authenticate'] |
|
1992 |
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' |
|
1993 |
) |
|
1994 | ||
1995 | ||
1996 |
@pytest.fixture |
|
1997 |
def session(settings, db, simple_user): |
|
1998 |
engine = import_module(settings.SESSION_ENGINE) |
|
1999 |
session = engine.SessionStore() |
|
2000 |
session['_auth_user_id'] = str(simple_user.id) |
|
2001 |
session.create() |
|
2002 |
return session |
|
2003 | ||
2004 | ||
2005 |
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session): |
|
2006 |
token = OIDCAccessToken.objects.create( |
|
2007 |
client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key |
|
2008 |
) |
|
2009 | ||
2010 |
assert token.is_valid() |
|
2011 |
session.flush() |
|
2012 |
assert not token.is_valid() |
|
2013 | ||
2014 | ||
2015 |
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer): |
|
2016 |
start = now() |
|
2017 |
expired = start + datetime.timedelta(seconds=30) |
|
2018 | ||
2019 |
token = OIDCAccessToken.objects.create( |
|
2020 |
client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired |
|
2021 |
) |
|
2022 | ||
2023 |
assert token.is_valid() |
|
2024 |
freezer.move_to(expired) |
|
2025 |
assert token.is_valid() |
|
2026 |
freezer.move_to(expired + datetime.timedelta(seconds=1)) |
|
2027 |
assert not token.is_valid() |
|
2028 | ||
2029 | ||
2030 |
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer): |
|
2031 |
start = now() |
|
2032 |
expired = start + datetime.timedelta(seconds=30) |
|
2033 | ||
2034 |
token = OIDCAccessToken.objects.create( |
|
2035 |
client=simple_oidc_client, |
|
2036 |
user=simple_user, |
|
2037 |
scopes='openid', |
|
2038 |
session_key=session.session_key, |
|
2039 |
expired=expired, |
|
2040 |
) |
|
2041 | ||
2042 |
assert token.is_valid() |
|
2043 |
freezer.move_to(expired) |
|
2044 |
assert token.is_valid() |
|
2045 |
freezer.move_to(expired + datetime.timedelta(seconds=1)) |
|
2046 |
assert not token.is_valid() |
|
2047 |
freezer.move_to(start) |
|
2048 |
assert token.is_valid() |
|
2049 |
session.flush() |
|
2050 |
assert not token.is_valid() |
tests/idp_oidc/test_models.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import datetime |
|
18 | ||
19 |
from django.utils.timezone import now |
|
20 | ||
21 |
from authentic2.a2_rbac.utils import get_default_ou |
|
22 |
from authentic2_idp_oidc.models import OIDCAccessToken, OIDCAuthorization, OIDCClient, OIDCCode |
|
23 | ||
24 | ||
25 |
def test_expired_manager(db, simple_user): |
|
26 |
expired = now() - datetime.timedelta(seconds=1) |
|
27 |
not_expired = now() + datetime.timedelta(days=1) |
|
28 |
client = OIDCClient.objects.create( |
|
29 |
name='client', slug='client', ou=get_default_ou(), redirect_uris='https://example.com/' |
|
30 |
) |
|
31 |
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=expired) |
|
32 |
OIDCAuthorization.objects.create(client=client, user=simple_user, scopes='openid', expired=not_expired) |
|
33 |
assert OIDCAuthorization.objects.count() == 2 |
|
34 |
OIDCAuthorization.objects.cleanup() |
|
35 |
assert OIDCAuthorization.objects.count() == 1 |
|
36 | ||
37 |
OIDCCode.objects.create( |
|
38 |
client=client, |
|
39 |
user=simple_user, |
|
40 |
scopes='openid', |
|
41 |
redirect_uri='https://example.com/', |
|
42 |
session_key='xxx', |
|
43 |
auth_time=now(), |
|
44 |
expired=expired, |
|
45 |
) |
|
46 |
OIDCCode.objects.create( |
|
47 |
client=client, |
|
48 |
user=simple_user, |
|
49 |
scopes='openid', |
|
50 |
redirect_uri='https://example.com/', |
|
51 |
session_key='xxx', |
|
52 |
auth_time=now(), |
|
53 |
expired=not_expired, |
|
54 |
) |
|
55 |
assert OIDCCode.objects.count() == 2 |
|
56 |
OIDCCode.objects.cleanup() |
|
57 |
assert OIDCCode.objects.count() == 1 |
|
58 | ||
59 |
OIDCAccessToken.objects.create( |
|
60 |
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=expired |
|
61 |
) |
|
62 |
OIDCAccessToken.objects.create( |
|
63 |
client=client, user=simple_user, scopes='openid', session_key='xxx', expired=not_expired |
|
64 |
) |
|
65 |
assert OIDCAccessToken.objects.count() == 2 |
|
66 |
OIDCAccessToken.objects.cleanup() |
|
67 |
assert OIDCAccessToken.objects.count() == 1 |
|
68 | ||
69 | ||
70 |
def test_access_token_is_valid_session(simple_oidc_client, simple_user, session): |
|
71 |
token = OIDCAccessToken.objects.create( |
|
72 |
client=simple_oidc_client, user=simple_user, scopes='openid', session_key=session.session_key |
|
73 |
) |
|
74 | ||
75 |
assert token.is_valid() |
|
76 |
session.flush() |
|
77 |
assert not token.is_valid() |
|
78 | ||
79 | ||
80 |
def test_access_token_is_valid_expired(simple_oidc_client, simple_user, freezer): |
|
81 |
start = now() |
|
82 |
expired = start + datetime.timedelta(seconds=30) |
|
83 | ||
84 |
token = OIDCAccessToken.objects.create( |
|
85 |
client=simple_oidc_client, user=simple_user, scopes='openid', expired=expired |
|
86 |
) |
|
87 | ||
88 |
assert token.is_valid() |
|
89 |
freezer.move_to(expired) |
|
90 |
assert token.is_valid() |
|
91 |
freezer.move_to(expired + datetime.timedelta(seconds=1)) |
|
92 |
assert not token.is_valid() |
|
93 | ||
94 | ||
95 |
def test_access_token_is_valid_session_and_expired(simple_oidc_client, simple_user, session, freezer): |
|
96 |
start = now() |
|
97 |
expired = start + datetime.timedelta(seconds=30) |
|
98 | ||
99 |
token = OIDCAccessToken.objects.create( |
|
100 |
client=simple_oidc_client, |
|
101 |
user=simple_user, |
|
102 |
scopes='openid', |
|
103 |
session_key=session.session_key, |
|
104 |
expired=expired, |
|
105 |
) |
|
106 | ||
107 |
assert token.is_valid() |
|
108 |
freezer.move_to(expired) |
|
109 |
assert token.is_valid() |
|
110 |
freezer.move_to(expired + datetime.timedelta(seconds=1)) |
|
111 |
assert not token.is_valid() |
|
112 |
freezer.move_to(start) |
|
113 |
assert token.is_valid() |
|
114 |
session.flush() |
|
115 |
assert not token.is_valid() |
tests/idp_oidc/test_views.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import datetime |
|
18 | ||
19 |
from django.utils.timezone import now |
|
20 | ||
21 |
from authentic2_idp_oidc.models import OIDCAccessToken |
|
22 | ||
23 |
from .. import utils |
|
24 |
from .conftest import bearer_authentication_headers |
|
25 | ||
26 | ||
27 |
def test_user_info(app, client, freezer, simple_user): |
|
28 |
access_token = OIDCAccessToken.objects.create( |
|
29 |
client=client, |
|
30 |
user=simple_user, |
|
31 |
scopes='openid profile email', |
|
32 |
expired=now() + datetime.timedelta(seconds=3600), |
|
33 |
) |
|
34 | ||
35 |
def get_user_info(**kwargs): |
|
36 |
return app.get( |
|
37 |
'/idp/oidc/user_info/', headers=bearer_authentication_headers(access_token.uuid), **kwargs |
|
38 |
) |
|
39 | ||
40 |
response = app.get('/idp/oidc/user_info/', status=401) |
|
41 |
assert ( |
|
42 |
response['WWW-Authenticate'] |
|
43 |
== 'Bearer error="invalid_request", error_description="Bearer authentication is mandatory"' |
|
44 |
) |
|
45 | ||
46 |
response = app.get('/idp/oidc/user_info/', headers={'Authorization': 'Bearer'}, status=401) |
|
47 |
assert ( |
|
48 |
response['WWW-Authenticate'] |
|
49 |
== 'Bearer error="invalid_request", error_description="Invalid Bearer authentication"' |
|
50 |
) |
|
51 | ||
52 |
response = get_user_info(status=200) |
|
53 |
assert dict(response.json, sub='') == { |
|
54 |
'email': 'user@example.net', |
|
55 |
'email_verified': False, |
|
56 |
'family_name': 'Dôe', |
|
57 |
'family_name_verified': True, |
|
58 |
'given_name': 'Jôhn', |
|
59 |
'given_name_verified': True, |
|
60 |
'preferred_username': 'user', |
|
61 |
'sub': '', |
|
62 |
} |
|
63 | ||
64 |
# token is expired |
|
65 |
access_token.expired = now() - datetime.timedelta(seconds=1) |
|
66 |
access_token.save() |
|
67 |
response = get_user_info(status=401) |
|
68 |
assert ( |
|
69 |
response['WWW-Authenticate'] |
|
70 |
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' |
|
71 |
) |
|
72 | ||
73 |
# token is unknown |
|
74 |
access_token.delete() |
|
75 |
response = get_user_info(status=401) |
|
76 |
assert response['WWW-Authenticate'] == 'Bearer error="invalid_token", error_description="Token unknown"' |
|
77 | ||
78 |
utils.login(app, access_token.user) |
|
79 |
access_token.expired = now() + datetime.timedelta(seconds=1) |
|
80 |
access_token.session_key = app.session.session_key |
|
81 |
access_token.save() |
|
82 | ||
83 |
get_user_info(status=200) |
|
84 | ||
85 |
app.session.flush() |
|
86 |
response = get_user_info(status=401) |
|
87 |
assert ( |
|
88 |
response['WWW-Authenticate'] |
|
89 |
== 'Bearer error="invalid_token", error_description="Token expired or user disconnected"' |
|
90 |
) |
|
0 |
- |