19 |
19 |
import os
|
20 |
20 |
import pytest
|
21 |
21 |
import json
|
|
22 |
import logging
|
22 |
23 |
import time
|
23 |
24 |
import random
|
24 |
25 |
|
... | ... | |
55 |
56 |
base64url_decode('x')
|
56 |
57 |
base64url_decode('aa')
|
57 |
58 |
|
58 |
|
# Key ID advertised in the RSA JOSE header; shared so that the header, the
# generated key and the mocked tokens all refer to the same identifier.
KID = '1e9gdk7'

# Decoded JOSE headers for the two signing algorithms used in this module.
header_rsa_decoded = {'alg': 'RS256', 'kid': KID}
header_hmac_decoded = {'alg': 'HS256'}
|
60 |
62 |
payload_decoded = {
|
61 |
63 |
'sub': '248289761001',
|
... | ... | |
101 |
103 |
|
102 |
104 |
@pytest.fixture
def oidc_provider_jwkset():
    """Return a JWKSet containing one freshly generated RSA key whose kid is KID."""
    # Generate the key exactly once, identified by the module-level KID
    # constant rather than a repeated string literal.
    # NOTE(review): size=512 is presumably chosen to keep key generation
    # fast in tests - confirm before reusing elsewhere.
    key = JWK.generate(kty='RSA', size=512, kid=KID)
    jwkset = JWKSet()
    jwkset.add(key)
    return jwkset
|
... | ... | |
109 |
111 |
OIDC_PROVIDER_PARAMS = [
|
110 |
112 |
{},
|
111 |
113 |
{
|
112 |
|
'idtoken_algo': OIDCProvider.ALGO_HMAC
|
|
114 |
'idtoken_algo': OIDCProvider.ALGO_HMAC,
|
113 |
115 |
},
|
114 |
116 |
{
|
115 |
117 |
'claims_parameter_supported': True,
|
... | ... | |
119 |
121 |
|
120 |
122 |
@pytest.fixture(params=OIDC_PROVIDER_PARAMS)
def oidc_provider(request, db, oidc_provider_jwkset):
    """Create an OIDC provider for each entry of OIDC_PROVIDER_PARAMS.

    Each parameter is a dict that may override:

    * ``idtoken_algo`` - defaults to ``OIDCProvider.ALGO_RSA``;
    * ``claims_parameter_supported`` - defaults to ``False``.

    The provider is built by ``make_oidc_provider`` and always receives
    ``oidc_provider_jwkset`` as its key set.
    """
    params = request.param
    claims_parameter_supported = params.get('claims_parameter_supported', False)
    idtoken_algo = params.get('idtoken_algo', OIDCProvider.ALGO_RSA)

    return make_oidc_provider(
        idtoken_algo=idtoken_algo,
        jwkset=oidc_provider_jwkset,
        claims_parameter_supported=claims_parameter_supported)
|
|
131 |
|
|
132 |
|
|
133 |
@pytest.fixture
def oidc_provider_rsa(request, db, oidc_provider_jwkset):
    """Provide an OIDC provider created with the RSA id-token algorithm.

    The provider is built by ``make_oidc_provider`` using
    ``oidc_provider_jwkset`` as its key set; every other option keeps the
    default chosen by ``make_oidc_provider``.
    """
    # ``request`` and ``db`` are not read here.
    # NOTE(review): ``db`` is presumably the pytest-django database fixture,
    # requested only to enable database access - confirm.
    provider = make_oidc_provider(
        jwkset=oidc_provider_jwkset,
        idtoken_algo=OIDCProvider.ALGO_RSA)
    return provider
|
|
138 |
|
|
139 |
|
|
140 |
def make_oidc_provider(
|
|
141 |
idtoken_algo=OIDCProvider._meta.get_field('idtoken_algo').default,
|
|
142 |
jwkset=None,
|
|
143 |
claims_parameter_supported=False):
|
124 |
144 |
from authentic2_auth_oidc.utils import get_provider, get_provider_by_issuer
|
125 |
145 |
get_provider.cache.clear()
|
126 |
146 |
get_provider_by_issuer.cache.clear()
|
127 |
|
if idtoken_algo == OIDCProvider.ALGO_RSA:
|
128 |
|
jwkset = json.loads(oidc_provider_jwkset.export())
|
129 |
|
else:
|
130 |
|
jwkset = None
|
|
147 |
|
|
148 |
if jwkset is not None:
|
|
149 |
jwkset = json.loads(jwkset.export())
|
|
150 |
|
131 |
151 |
provider = OIDCProvider.objects.create(
|
132 |
152 |
ou=get_default_ou(),
|
133 |
153 |
name='OIDIDP',
|
... | ... | |
182 |
202 |
return 'xxxx'
|
183 |
203 |
|
184 |
204 |
|
185 |
|
@pytest.fixture
|
186 |
205 |
def header(oidc_provider):
|
187 |
206 |
if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
|
188 |
207 |
return header_rsa
|
... | ... | |
190 |
209 |
return header_hmac
|
191 |
210 |
|
192 |
211 |
|
193 |
|
@pytest.fixture
|
194 |
212 |
def signature(oidc_provider):
|
195 |
213 |
if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
|
196 |
|
key = oidc_provider.jwkset.get_key(kid='1e9gdk7')
|
|
214 |
key = oidc_provider.jwkset.get_key(kid=KID)
|
197 |
215 |
header_decoded = header_rsa_decoded
|
198 |
216 |
elif oidc_provider.idtoken_algo == OIDCProvider.ALGO_HMAC:
|
199 |
217 |
key = JWK(kty='oct',
|
... | ... | |
206 |
224 |
|
207 |
225 |
|
208 |
226 |
def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token=None,
|
209 |
|
extra_user_info=None, sub='john.doe', nonce=None):
|
|
227 |
extra_user_info=None, sub='john.doe', nonce=None, kid=KID):
|
210 |
228 |
token_endpoint = urlparse.urlparse(oidc_provider.token_endpoint)
|
211 |
229 |
userinfo_endpoint = urlparse.urlparse(oidc_provider.userinfo_endpoint)
|
212 |
230 |
token_revocation_endpoint = urlparse.urlparse(oidc_provider.token_revocation_endpoint)
|
... | ... | |
227 |
245 |
id_token.update(extra_id_token)
|
228 |
246 |
|
229 |
247 |
if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
|
230 |
|
jwt = JWT(header={'alg': 'RS256', 'kid': '1e9gdk7'},
|
|
248 |
jwt = JWT(header={'alg': 'RS256', 'kid': kid},
|
231 |
249 |
claims=id_token)
|
232 |
250 |
jwt.make_signed_token(list(oidc_provider_jwkset['keys'])[0])
|
233 |
251 |
else:
|
... | ... | |
289 |
307 |
return HTTMock(token_endpoint_mock, user_info_endpoint_mock, token_revocation_endpoint_mock)
|
290 |
308 |
|
291 |
309 |
|
292 |
|
@pytest.fixture
def login_url(oidc_provider):
    """Return the URL reversed from the 'oidc-login' route for the provider's pk."""
    url_kwargs = {'pk': oidc_provider.pk}
    return reverse('oidc-login', kwargs=url_kwargs)
|
295 |
|
|
296 |
|
|
297 |
|
def login_callback_url(oidc_provider):
    """Return the URL reversed from the 'oidc-login-callback' route.

    This is deliberately a plain function, not a pytest fixture: the tests in
    this module invoke it directly as ``login_callback_url(provider)``, and
    pytest refuses direct calls to fixture functions.

    ``oidc_provider`` is accepted so call sites can pass the provider under
    test, but it does not take part in building the URL.
    """
    return reverse('oidc-login-callback')
|
300 |
312 |
|
... | ... | |
333 |
345 |
assert response.pyquery('p#oidc-p-oidcidp-2')
|
334 |
346 |
|
335 |
347 |
|
336 |
|
|
337 |
|
|
338 |
|
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, login_url, login_callback_url, hooks):
|
|
348 |
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
|
339 |
349 |
OU = get_ou_model()
|
340 |
350 |
cassis = OU.objects.create(name='Cassis', slug='cassis')
|
341 |
351 |
OU.cached.cache.clear()
|
... | ... | |
370 |
380 |
|
371 |
381 |
with utils.check_log(caplog, 'failed to contact the token_endpoint'):
|
372 |
382 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
373 |
|
response = app.get(login_callback_url, params={'code': 'yyyy', 'state': query['state']})
|
|
383 |
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']})
|
374 |
384 |
with utils.check_log(caplog, 'invalid id_token %r'):
|
375 |
385 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
376 |
386 |
extra_id_token={'iss': None}):
|
377 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
387 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
378 |
388 |
with utils.check_log(caplog, 'invalid id_token %r'):
|
379 |
389 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
380 |
390 |
extra_id_token={'sub': None}):
|
381 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
391 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
382 |
392 |
with utils.check_log(caplog, 'authentication is too old'):
|
383 |
393 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
384 |
394 |
extra_id_token={'iat': 1}):
|
385 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
395 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
386 |
396 |
with utils.check_log(caplog, 'invalid id_token %r'):
|
387 |
397 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
388 |
398 |
extra_id_token={'exp': 1}):
|
389 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
399 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
390 |
400 |
with utils.check_log(caplog, 'invalid id_token audience'):
|
391 |
401 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
392 |
402 |
extra_id_token={'aud': 'zz'}):
|
393 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
403 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
394 |
404 |
with utils.check_log(caplog, 'expected nonce'):
|
395 |
405 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
396 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
406 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
397 |
407 |
assert not hooks.auth_oidc_backend_modify_user
|
398 |
408 |
with utils.check_log(caplog, 'created user'):
|
399 |
409 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
400 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
410 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
401 |
411 |
assert len(hooks.auth_oidc_backend_modify_user) == 1
|
402 |
412 |
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
|
403 |
413 |
['user', 'provider', 'user_info', 'id_token', 'access_token'])
|
... | ... | |
417 |
427 |
|
418 |
428 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
419 |
429 |
extra_user_info={'family_name_verified': True}, nonce=nonce):
|
420 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
430 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
421 |
431 |
assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0
|
422 |
432 |
assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1
|
423 |
433 |
|
424 |
434 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
|
425 |
435 |
extra_user_info={'ou': 'cassis'}, nonce=nonce):
|
426 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
436 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
427 |
437 |
assert User.objects.count() == 1
|
428 |
438 |
user = User.objects.get()
|
429 |
439 |
assert user.ou == cassis
|
430 |
440 |
|
431 |
441 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
432 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
442 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
433 |
443 |
assert User.objects.count() == 1
|
434 |
444 |
user = User.objects.get()
|
435 |
445 |
assert user.ou == get_default_ou()
|
... | ... | |
438 |
448 |
time.sleep(0.1)
|
439 |
449 |
|
440 |
450 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
|
441 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
451 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
442 |
452 |
assert User.objects.count() == 1
|
443 |
453 |
user = User.objects.get()
|
444 |
454 |
assert user.ou == get_default_ou()
|
... | ... | |
469 |
479 |
assert 'oidc-a-oididp' not in response.text
|
470 |
480 |
|
471 |
481 |
|
472 |
|
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, login_url,
|
473 |
|
login_callback_url, simple_user):
|
|
482 |
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
|
474 |
483 |
|
475 |
484 |
get_providers.cache.clear()
|
476 |
485 |
has_providers.cache.clear()
|
... | ... | |
492 |
501 |
# sub=john.doe, MUST not work
|
493 |
502 |
with utils.check_log(caplog, 'cannot create user'):
|
494 |
503 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
|
495 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
504 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
496 |
505 |
|
497 |
506 |
# sub=simple_user.uuid MUST work
|
498 |
507 |
with utils.check_log(caplog, 'found user using UUID'):
|
499 |
508 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce):
|
500 |
|
response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
|
|
509 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
501 |
510 |
|
502 |
511 |
assert urlparse.urlparse(response['Location']).path == '/'
|
503 |
512 |
assert User.objects.count() == 1
|
... | ... | |
542 |
551 |
openid_configuration=oidc_conf)
|
543 |
552 |
|
544 |
553 |
|
545 |
|
def test_required_keys(db, oidc_provider, header, signature, caplog):
|
|
554 |
def test_required_keys(db, oidc_provider, caplog):
|
546 |
555 |
erroneous_payload = base64url_encode(json.dumps({
|
547 |
556 |
'sub': '248289761001',
|
548 |
557 |
'iss': 'http://server.example.com',
|
... | ... | |
553 |
562 |
|
554 |
563 |
with pytest.raises(IDTokenError):
|
555 |
564 |
with utils.check_log(caplog, 'missing field'):
|
556 |
|
token = IDToken('{}.{}.{}'.format(header, erroneous_payload, signature))
|
|
565 |
token = IDToken('{}.{}.{}'.format(header(oidc_provider), erroneous_payload, signature(oidc_provider)))
|
557 |
566 |
token.deserialize(oidc_provider)
|
|
567 |
|
|
568 |
|
|
569 |
def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
                     oidc_provider_jwkset, simple_user):
    """Check the warnings logged when the ID token's ``kid`` header is bad.

    After starting a login against the RSA provider, the callback is hit
    twice with a mocked provider whose ID token header carries:

    * an unknown kid (``'coin'``) - expects a WARNING containing
      'not in key set';
    * no kid (``None``) - expects a WARNING containing 'Missing Key ID'.
    """
    get_providers.cache.clear()
    has_providers.cache.clear()
    # no mapping please
    OIDCClaimMapping.objects.all().delete()

    User = get_user_model()
    # Only the user created by the simple_user fixture exists at this point.
    assert User.objects.count() == 1

    response = app.get('/').maybe_follow()
    assert oidc_provider_rsa.name in response.text
    response = response.click(oidc_provider_rsa.name)
    location = urlparse.urlparse(response.location)
    query = check_simple_qs(urlparse.parse_qs(location.query))
    # The nonce stored in the session for this authorization request must be
    # echoed in the mocked ID token.
    nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
    callback_params = {'code': code, 'state': query['state']}

    # test invalid kid
    with utils.check_log(caplog, message='not in key set', levelname='WARNING'):
        with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
                                nonce=nonce, kid='coin'):
            app.get(login_callback_url(oidc_provider_rsa), params=callback_params)

    # test missing kid
    with utils.check_log(caplog, message='Missing Key ID', levelname='WARNING'):
        with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code,
                                nonce=nonce, kid=None):
            app.get(login_callback_url(oidc_provider_rsa), params=callback_params)
|