0001-idp_oidc-make-user-info-depend-on-profile-choice-dur.patch
src/authentic2/app_settings.py | ||
---|---|---|
333 | 333 |
default=250, |
334 | 334 |
definition='Maximum number of mails to send per period', |
335 | 335 |
), |
336 |
A2_USER_PROFILE_MANAGEMENT=Setting( |
|
337 |
default=False, |
|
338 |
definition='Activate user profiles for juridical entity management.', |
|
339 |
), |
|
336 | 340 |
) |
337 | 341 | |
338 | 342 |
app_settings = AppSettings(default_settings) |
src/authentic2/custom_user/migrations/0031_profile_email.py | ||
---|---|---|
1 |
# Generated by Django 2.2.24 on 2022-02-15 11:39 |
|
2 | ||
3 |
from django.db import migrations, models |
|
4 | ||
5 | ||
6 |
class Migration(migrations.Migration):
    """Add the ``email`` field to the ``profile`` model of ``custom_user``."""

    dependencies = [('custom_user', '0030_auto_20220222_1028')]

    operations = [
        migrations.AddField(
            model_name='profile',
            name='email',
            # blank=True: the profile e-mail address is optional.
            field=models.EmailField(
                verbose_name='profile email address',
                max_length=254,
                blank=True,
            ),
        ),
    ]
src/authentic2/custom_user/models.py | ||
---|---|---|
520 | 520 |
to=User, verbose_name=_('user'), related_name='subprofiles', on_delete=models.CASCADE |
521 | 521 |
) |
522 | 522 |
identifier = models.CharField(max_length=256, verbose_name=_('profile identifier'), default='') |
523 |
email = models.EmailField(blank=True, max_length=254, verbose_name=_('profile email address')) |
|
523 | 524 |
data = JSONField(verbose_name=_('profile data'), null=True, blank=True) |
524 | 525 | |
525 | 526 |
class Meta: |
src/authentic2_idp_oidc/app_settings.py | ||
---|---|---|
79 | 79 |
], |
80 | 80 |
) |
81 | 81 | |
82 |
@property
def PROFILE_OVERRIDE_MAPPING(self):
    '''Mapping of profile attribute names to the user-info keys they override.

    The value is looked up through ``self._setting`` under the name
    ``PROFILE_OVERRIDE_MAPPING``, with ``{'email': 'email'}`` passed as the
    default.
    '''
    fallback = {'email': 'email'}
    return self._setting('PROFILE_OVERRIDE_MAPPING', fallback)
|
85 | ||
82 | 86 | |
83 | 87 |
app_settings = AppSettings('A2_IDP_OIDC_') |
84 | 88 |
app_settings.__name__ = __name__ |
src/authentic2_idp_oidc/apps.py | ||
---|---|---|
88 | 88 |
sub = smart_bytes(view.kwargs[lookup_url_kwarg]) |
89 | 89 |
decrypted = utils.reverse_pairwise_sub(client, sub) |
90 | 90 |
if decrypted: |
91 |
view.kwargs[lookup_url_kwarg] = uuid.UUID(bytes=decrypted).hex |
|
91 |
view.kwargs[lookup_url_kwarg] = uuid.UUID(bytes=decrypted.split(b'#')[0]).hex
|
|
92 | 92 | |
93 | 93 |
def a2_hook_api_modify_serializer_after_validation(self, view, serializer): |
94 | 94 |
import uuid |
... | ... | |
111 | 111 |
for u in serializer.validated_data['known_uuids']: |
112 | 112 |
decrypted = utils.reverse_pairwise_sub(client, smart_bytes(u)) |
113 | 113 |
if decrypted: |
114 |
new_known_uuid = uuid.UUID(bytes=decrypted).hex |
|
114 |
new_known_uuid = uuid.UUID(bytes=decrypted.split(b'#')[0]).hex
|
|
115 | 115 |
new_known_uuids.append(new_known_uuid) |
116 | 116 |
uuid_map[new_known_uuid] = u |
117 | 117 |
else: |
src/authentic2_idp_oidc/migrations/0015_oidccode_profile.py | ||
---|---|---|
1 |
# Generated by Django 2.2.24 on 2022-02-15 09:50 |
|
2 | ||
3 |
import django.db.models.deletion |
|
4 |
from django.db import migrations, models |
|
5 | ||
6 | ||
7 |
class Migration(migrations.Migration):
    """Add the nullable ``profile`` foreign key to the ``oidccode`` model."""

    dependencies = [
        ('custom_user', '0030_auto_20220222_1028'),
        ('authentic2_idp_oidc', '0014_auto_20201126_1812'),
    ]

    operations = [
        migrations.AddField(
            model_name='oidccode',
            name='profile',
            # null=True: a code may be issued without any selected profile.
            field=models.ForeignKey(
                to='custom_user.Profile',
                verbose_name='user selected profile',
                on_delete=django.db.models.deletion.CASCADE,
                null=True,
            ),
        ),
    ]
src/authentic2_idp_oidc/models.py | ||
---|---|---|
27 | 27 |
from django.utils.translation import ugettext_lazy as _ |
28 | 28 | |
29 | 29 |
from authentic2.a2_rbac.models import OrganizationalUnit |
30 |
from authentic2.custom_user.models import Profile |
|
30 | 31 |
from authentic2.models import Service |
31 | 32 | |
32 | 33 |
from . import app_settings, managers, utils |
... | ... | |
291 | 292 |
uuid = models.CharField(max_length=128, verbose_name=_('uuid'), default=generate_uuid) |
292 | 293 |
client = models.ForeignKey(to=OIDCClient, verbose_name=_('client'), on_delete=models.CASCADE) |
293 | 294 |
user = models.ForeignKey(to=settings.AUTH_USER_MODEL, verbose_name=_('user'), on_delete=models.CASCADE) |
295 |
profile = models.ForeignKey( |
|
296 |
to=Profile, verbose_name=_('user selected profile'), null=True, on_delete=models.CASCADE |
|
297 |
) |
|
294 | 298 |
scopes = models.TextField(verbose_name=_('scopes')) |
295 | 299 |
state = models.TextField(null=True, verbose_name=_('state')) |
296 | 300 |
nonce = models.TextField(null=True, verbose_name=_('nonce')) |
src/authentic2_idp_oidc/templates/authentic2_idp_oidc/authorization.html | ||
---|---|---|
16 | 16 |
{% endfor %} |
17 | 17 |
</ul> |
18 | 18 |
{% endif %} |
19 |
{% if needs_profile_validation %} |
|
20 |
<p>{% trans "Additionally, you may authenticate as owner of the following juridical entity management profile, which may change the aforementioned information:" %}</p> |
|
21 |
<div id="profile-validation-none"> |
|
22 |
<input type="radio" id="_none" name="profile-validation" value=""> |
|
23 |
<label for="_none">{% trans "Keep my default user profile." %}</label> |
|
24 |
</div> |
|
25 |
{% for profile in user.subprofiles.all %} |
|
26 |
<div id="profile-validation-{{ profile.id }}"> |
|
27 |
<input type="radio" id="{{ profile.id }}" name="profile-validation" value="{{ profile.id }}"> |
|
28 |
<label for="{{ profile.id }}"> |
|
29 |
{% blocktrans with type_name=profile.profile_type.name identifier=profile.identifier %}Profile of type {{ type_name }}: {{ identifier }}{% endblocktrans %} |
|
30 |
{% if profile.email %} |
|
31 |
{% blocktrans with email=profile.email %} Email address: {{ email }}{% endblocktrans %} |
|
32 |
{% endif %}</label> |
|
33 |
</div> |
|
34 |
{% endfor %} |
|
35 |
{% endif %} |
|
19 | 36 |
{% csrf_token %} |
20 | 37 |
<p class="a2-oidc-authorization-form--do-not-ask-again"> |
21 | 38 |
<label for="id_do_not_ask_again"><input id="id_do_not_ask_again" type="checkbox" name="do_not_ask_again" value="on"/><span>{% trans "Do not ask again" %}</span></label> |
src/authentic2_idp_oidc/utils.py | ||
---|---|---|
120 | 120 |
return urllib.parse.urlparse(url).netloc.split(':')[0] |
121 | 121 | |
122 | 122 | |
123 |
def make_sub(client, user): |
|
123 |
def make_sub(client, user, profile=None):
|
|
124 | 124 |
if client.identifier_policy in (client.POLICY_PAIRWISE, client.POLICY_PAIRWISE_REVERSIBLE): |
125 |
return make_pairwise_sub(client, user) |
|
125 |
return make_pairwise_sub(client, user, profile=profile)
|
|
126 | 126 |
elif client.identifier_policy == client.POLICY_UUID: |
127 | 127 |
return force_text(user.uuid) |
128 | 128 |
elif client.identifier_policy == client.POLICY_EMAIL: |
... | ... | |
131 | 131 |
raise NotImplementedError |
132 | 132 | |
133 | 133 | |
134 |
def make_pairwise_sub(client, user, profile=None):
    '''Build a pairwise subject identifier according to the client policy.

    Delegates to the unreversible builder for ``client.POLICY_PAIRWISE`` and
    to the reversible builder for ``client.POLICY_PAIRWISE_REVERSIBLE``; the
    optional ``profile`` is forwarded unchanged in both cases.

    Raises NotImplementedError for any other ``client.identifier_policy``.
    '''
    policy = client.identifier_policy
    if policy == client.POLICY_PAIRWISE:
        return make_pairwise_unreversible_sub(client, user, profile=profile)
    if policy == client.POLICY_PAIRWISE_REVERSIBLE:
        return make_pairwise_reversible_sub(client, user, profile=profile)
    raise NotImplementedError('unknown pairwise client.identifier_policy %s' % policy)
142 | 142 | |
143 | 143 | |
144 |
def make_pairwise_unreversible_sub(client, user, profile=None):
    '''Build a hashed (one-way) pairwise subject identifier.

    The sub is the base64-encoded SHA-256 digest of the client's sector
    identifier, the user's uuid and ``settings.SECRET_KEY`` concatenated in
    that order; when a profile is given its id is appended, so each profile
    yields a distinct sub.
    '''
    pieces = [
        client.get_sector_identifier(),
        str(user.uuid),
        settings.SECRET_KEY,
    ]
    if profile:
        pieces.append(str(profile.id))
    digest = hashlib.sha256(''.join(pieces).encode('utf-8')).digest()
    return base64.b64encode(digest).decode('utf-8')
149 | 151 | |
150 | 152 | |
151 |
def make_pairwise_reversible_sub(client, user, profile=None):
    '''Build a reversible pairwise sub for ``user``, keyed on its uuid.

    Thin wrapper around ``make_pairwise_reversible_sub_from_uuid``; the
    optional ``profile`` is passed through as-is.
    '''
    user_uuid = user.uuid
    return make_pairwise_reversible_sub_from_uuid(client, user_uuid, profile=profile)
|
|
153 | 155 | |
154 | 156 | |
155 |
def make_pairwise_reversible_sub_from_uuid(client, user_uuid, profile=None):
    '''Encrypt a user uuid (and optional profile id) into a reversible sub.

    The cleartext is the 16 raw bytes of ``user_uuid``; when ``profile`` is
    given, ``b'#'`` followed by the profile id is appended. The cleartext is
    encrypted with ``crypto.aes_base64url_deterministic_encrypt`` using
    ``settings.SECRET_KEY`` as key and the client's sector identifier as the
    third argument.

    Returns the sub as ``str``, or ``None`` when ``user_uuid`` is not a valid
    UUID string.
    '''
    try:
        cleartext = uuid.UUID(user_uuid).bytes
    except ValueError:
        return None
    if profile:
        # The uuid is handled as raw bytes, so the suffix must be bytes too:
        # appending a str here raises TypeError on Python 3.
        # NOTE(review): the raw uuid bytes can themselves contain 0x23 ('#');
        # code decoding this value should slice the first 16 bytes rather
        # than split on b'#' -- confirm against the callers of
        # reverse_pairwise_sub.
        cleartext += b'#' + str(profile.id).encode('utf-8')
    sector_identifier = client.get_sector_identifier()
    secret = settings.SECRET_KEY.encode('utf-8')
    encrypted = crypto.aes_base64url_deterministic_encrypt(secret, cleartext, sector_identifier)
    return encrypted.decode('utf-8')
|
164 | 171 | |
165 | 172 | |
166 | 173 |
def reverse_pairwise_sub(client, sub): |
... | ... | |
183 | 190 |
return str(value) |
184 | 191 | |
185 | 192 | |
186 |
def create_user_info(request, client, user, scope_set, id_token=False): |
|
193 |
def create_user_info(request, client, user, scope_set, id_token=False, profile=None):
|
|
187 | 194 |
'''Create user info dictionary''' |
188 | 195 |
user_info = {} |
189 | 196 |
if 'openid' in scope_set: |
190 |
user_info['sub'] = make_sub(client, user) |
|
197 |
user_info['sub'] = make_sub(client, user, profile=profile)
|
|
191 | 198 |
attributes = get_attributes( |
192 | 199 |
{ |
193 | 200 |
'user': user, |
... | ... | |
240 | 247 |
]: |
241 | 248 |
default_value = '' |
242 | 249 |
user_info[claim.name] = default_value |
250 |
if profile: |
|
251 |
for attr, userinfo_key in app_settings.PROFILE_OVERRIDE_MAPPING.items(): |
|
252 |
if getattr(profile, attr, None) and userinfo_key in user_info: |
|
253 |
user_info[userinfo_key] = getattr(profile, attr) |
|
243 | 254 |
hooks.call_hooks('idp_oidc_modify_user_info', client, user, scope_set, user_info) |
244 | 255 |
return user_info |
245 | 256 |
src/authentic2_idp_oidc/views.py | ||
---|---|---|
38 | 38 |
from authentic2 import app_settings as a2_app_settings |
39 | 39 |
from authentic2 import hooks |
40 | 40 |
from authentic2.a2_rbac.models import OrganizationalUnit |
41 |
from authentic2.custom_user.models import Profile |
|
41 | 42 |
from authentic2.decorators import setting_enabled |
42 | 43 |
from authentic2.exponential_retry_timeout import ExponentialRetryTimeout |
43 | 44 |
from authentic2.utils.misc import last_authentication_event, login_require, make_url, redirect |
... | ... | |
359 | 360 | |
360 | 361 |
iat = now() # iat = issued at |
361 | 362 | |
363 |
needs_profile_validation = False |
|
364 |
profile = None |
|
365 |
if request.user.subprofiles.count() and a2_app_settings.A2_USER_PROFILE_MANAGEMENT: |
|
366 |
needs_profile_validation = True |
|
362 | 367 |
if client.authorization_mode != client.AUTHORIZATION_MODE_NONE or 'consent' in prompt: |
363 | 368 |
# authorization by user is mandatory, as per local configuration or per explicit request by |
364 | 369 |
# the RP |
... | ... | |
381 | 386 |
authorized_scopes = set() |
382 | 387 |
for authorization in qs: |
383 | 388 |
authorized_scopes |= authorization.scope_set() |
384 |
if (authorized_scopes & scopes) < scopes: |
|
389 |
if (authorized_scopes & scopes) < scopes or needs_profile_validation:
|
|
385 | 390 |
if 'none' in prompt: |
386 | 391 |
raise ConsentRequired(_('Consent is required but prompt parameter is "none"')) |
387 | 392 |
if request.method == 'POST': |
393 |
if request.POST.get('profile-validation', ''): |
|
394 |
try: |
|
395 |
profile = Profile.objects.get( |
|
396 |
user=request.user, |
|
397 |
id=request.POST['profile-validation'], |
|
398 |
) |
|
399 |
except Profile.DoesNotExist: |
|
400 |
pass |
|
388 | 401 |
if 'accept' in request.POST: |
389 | 402 |
if 'do_not_ask_again' in request.POST: |
390 | 403 |
pk_to_deletes = [] |
... | ... | |
417 | 430 |
request, |
418 | 431 |
'authentic2_idp_oidc/authorization.html', |
419 | 432 |
{ |
433 |
'needs_profile_validation': needs_profile_validation, |
|
420 | 434 |
'client': client, |
421 | 435 |
'scopes': scopes - {'openid'}, |
422 | 436 |
}, |
... | ... | |
425 | 439 |
code = models.OIDCCode.objects.create( |
426 | 440 |
client=client, |
427 | 441 |
user=request.user, |
442 |
profile=profile, |
|
428 | 443 |
scopes=' '.join(scopes), |
429 | 444 |
state=state, |
430 | 445 |
nonce=nonce, |
... | ... | |
721 | 736 |
): |
722 | 737 |
acr = '1' |
723 | 738 |
# prefill id_token with user info |
724 |
id_token = utils.create_user_info(request, client, oidc_code.user, oidc_code.scope_set(), id_token=True) |
|
739 |
id_token = utils.create_user_info( |
|
740 |
request, |
|
741 |
client, |
|
742 |
oidc_code.user, |
|
743 |
oidc_code.scope_set(), |
|
744 |
id_token=True, |
|
745 |
profile=oidc_code.profile, |
|
746 |
) |
|
725 | 747 |
exp = start + idtoken_duration(client) |
726 | 748 |
id_token.update( |
727 | 749 |
{ |
728 | 750 |
'iss': utils.get_issuer(request), |
729 |
'sub': utils.make_sub(client, oidc_code.user), |
|
751 |
'sub': utils.make_sub(client, oidc_code.user, profile=oidc_code.profile),
|
|
730 | 752 |
'aud': client.client_id, |
731 | 753 |
'exp': int(exp.timestamp()), |
732 | 754 |
'iat': int(start.timestamp()), |
tests/idp_oidc/conftest.py | ||
---|---|---|
75 | 75 |
return settings |
76 | 76 | |
77 | 77 | |
78 |
@pytest.fixture
def profile_settings(settings, jwkset):
    '''Settings fixture with the OIDC key set installed and profiles enabled.

    Sets the JWK set, a password-grant rate limit of ``100/m`` and turns on
    ``A2_USER_PROFILE_MANAGEMENT``; returns the same ``settings`` object.
    '''
    overrides = (
        ('A2_IDP_OIDC_JWKSET', jwkset),
        ('A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT', '100/m'),
        ('A2_USER_PROFILE_MANAGEMENT', True),
    )
    for name, value in overrides:
        setattr(settings, name, value)
    return settings
|
84 | ||
85 | ||
78 | 86 |
def make_client(app, superuser, params=None): |
79 | 87 |
Attribute.objects.create( |
80 | 88 |
name='cityscape_image', |
tests/idp_oidc/test_user_profiles.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import base64 |
|
18 |
import json |
|
19 |
import urllib.parse |
|
20 | ||
21 |
import pytest |
|
22 |
from django.contrib.auth import get_user_model |
|
23 |
from django.urls import reverse |
|
24 |
from django.utils.encoding import force_text |
|
25 |
from django.utils.timezone import now |
|
26 |
from jwcrypto.jwk import JWK |
|
27 |
from jwcrypto.jwt import JWT |
|
28 | ||
29 |
from authentic2.custom_user.models import Profile, ProfileType |
|
30 |
from authentic2.utils.misc import make_url |
|
31 |
from authentic2_idp_oidc.models import OIDCCode |
|
32 |
from authentic2_idp_oidc.utils import make_sub |
|
33 | ||
34 |
from .. import utils |
|
35 |
from .conftest import client_authentication_headers |
|
36 | ||
37 |
User = get_user_model() |
|
38 | ||
39 |
pytestmark = pytest.mark.django_db |
|
40 | ||
41 | ||
42 |
@pytest.fixture
def profile_user():
    '''Create user ``foobar`` (password ``foobar``) owning two profiles.

    Two profile types are created first, then one profile per type, each
    with its own identifier and e-mail address.
    '''
    user = User.objects.create(
        first_name='Foo',
        last_name='Bar',
        username='foobar',
        email='foobar@example.org',
    )

    type_specs = (
        ('One Manager Type', 'one-manager-type'),
        ('One Delegate Type', 'one-delegate-type'),
    )
    profile_types = [ProfileType.objects.create(name=name, slug=slug) for name, slug in type_specs]

    profile_specs = (
        ('Entity 789', 'manager@example789.org'),
        ('Entity 1011', 'delegate@example1011.org'),
    )
    for profile_type, (identifier, email) in zip(profile_types, profile_specs):
        Profile.objects.create(
            user=user,
            profile_type=profile_type,
            identifier=identifier,
            email=email,
        )

    # Keep the clear-text password on the instance, next to the hashed one.
    user.clear_password = 'foobar'
    user.set_password('foobar')
    user.save()
    return user
|
74 | ||
75 | ||
76 |
def test_admin_base_models(app, superuser, simple_user, profile_settings):
    '''Profile types and profiles can be created through the admin add forms.'''

    def fill_and_save(page, values):
        # Set every field in order, submit with "_save" and follow the redirect.
        form = page.form
        for field_name, value in values:
            form.set(field_name, value)
        return form.submit(name='_save').follow()

    type_add_url = reverse('admin:custom_user_profiletype_add')
    assert ProfileType.objects.count() == 0
    page = utils.login(app, superuser, path=type_add_url)
    fill_and_save(page, [('name', 'Manager'), ('slug', 'manager')])
    assert ProfileType.objects.count() == 1

    page = app.get(type_add_url)
    fill_and_save(page, [('name', 'Delegate'), ('slug', 'delegate')])
    assert ProfileType.objects.count() == 2

    profile_add_url = reverse('admin:custom_user_profile_add')
    assert Profile.objects.count() == 0
    page = app.get(profile_add_url)
    fill_and_save(
        page,
        [
            ('user', simple_user.id),
            ('profile_type', ProfileType.objects.first().pk),
            ('email', 'john.doe@example.org'),
            ('identifier', 'Entity 0123'),
        ],
    )
    assert Profile.objects.count() == 1

    page = app.get(profile_add_url)
    fill_and_save(
        page,
        [
            ('user', simple_user.id),
            ('profile_type', ProfileType.objects.last().pk),
            ('email', 'john.doe@anotherexample.org'),
            ('identifier', 'Entity 5678'),
        ],
    )
    assert Profile.objects.count() == 2
|
108 | ||
109 | ||
110 |
def test_login_profiles_absent(app, oidc_client, simple_user, profile_settings):
    '''The authorization page offers no profile choice when logged in as ``simple_user``.'''
    redirect_uri = oidc_client.redirect_uris.split()[0]
    params = {
        'client_id': oidc_client.client_id,
        'scope': 'openid profile email',
        'redirect_uri': redirect_uri,
        'state': 'xxx',
        'nonce': 'yyy',
        'login_hint': 'backoffice john@example.com',
        'response_type': 'code',
    }
    authorize_url = make_url('oidc-authorize', params=params)
    utils.login(app, simple_user)
    response = app.get(authorize_url)
    assert 'a2-oidc-authorization-form' in response.text
    # no interface changes for users without a profile
    assert 'profile-validation-' not in response.text
|
127 | ||
128 | ||
129 |
def test_login_profile_selection(app, oidc_client, profile_user, profile_settings):
    '''Selecting a profile at authorization changes the issued ``sub`` and ``email``.

    Runs the authorization-code flow with HMAC-signed ID tokens: pick the
    user's first profile on the consent page, exchange the code for tokens
    and check the ID token claims against the selected profile.
    '''
    oidc_client.idtoken_algo = oidc_client.ALGO_HMAC
    oidc_client.save()
    redirect_uri = oidc_client.redirect_uris.split()[0]
    assert profile_user.subprofiles.count() == 2

    # authorization request
    authorize_url = make_url(
        'oidc-authorize',
        params={
            'client_id': oidc_client.client_id,
            'scope': 'openid profile email',
            'redirect_uri': redirect_uri,
            'state': 'xxx',
            'nonce': 'yyy',
            'login_hint': 'backoffice john@example.com',
            'response_type': 'code',
        },
    )
    utils.login(app, profile_user)
    consent_page = app.get(authorize_url)
    assert 'a2-oidc-authorization-form' in consent_page.text
    assert 'profile-validation-' in consent_page.text

    # choose the first profile and accept
    selected_profile = profile_user.subprofiles.first()
    consent_page.form.set('profile-validation', selected_profile.id)
    redirection = consent_page.form.submit('accept')

    # the issued code records the selected profile
    assert OIDCCode.objects.count() == 1
    oidc_code = OIDCCode.objects.get()
    assert oidc_code.client == oidc_client
    assert oidc_code.user == profile_user
    assert oidc_code.profile == selected_profile
    assert oidc_code.scope_set() == set('openid profile email'.split())
    assert oidc_code.state == 'xxx'
    assert oidc_code.nonce == 'yyy'
    assert oidc_code.redirect_uri == redirect_uri
    assert oidc_code.session_key == app.session.session_key
    assert oidc_code.auth_time <= now()
    assert oidc_code.expired >= now()

    # the redirection carries the code and the state
    assert redirection['Location'].startswith(redirect_uri)
    parsed_location = urllib.parse.urlparse(redirection['Location'])
    query = urllib.parse.parse_qs(parsed_location.query)
    assert set(query.keys()) == {'code', 'state'}
    assert query['code'] == [oidc_code.uuid]
    code_value = query['code'][0]
    assert query['state'] == ['xxx']

    # exchange the code for tokens
    token_response = app.post(
        make_url('oidc-token'),
        params={
            'grant_type': 'authorization_code',
            'code': code_value,
            'redirect_uri': redirect_uri,
        },
        headers=client_authentication_headers(oidc_client),
    )
    payload = token_response.json
    assert 'error' not in payload
    assert 'access_token' in payload
    assert 'expires_in' in payload
    assert 'id_token' in payload
    assert payload['token_type'] == 'Bearer'
    assert payload['access_token']

    # decode the ID token, keyed on the client secret
    encoded_secret = base64.b64encode(oidc_client.client_secret.encode('utf-8'))
    hmac_key = JWK(kty='oct', k=force_text(encoded_secret))
    token = JWT(jwt=payload['id_token'], key=hmac_key, algs=['HS256'])
    claims = json.loads(token.claims)

    # check subject identifier substitution:
    assert claims['sub'] != make_sub(oidc_client, profile_user)
    assert claims['sub'] == make_sub(oidc_client, profile_user, profile=selected_profile)

    # check email substitution
    assert claims['email'] != profile_user.email
    assert claims['email'] == selected_profile.email
|
0 |
- |