Projet

Général

Profil

0003-tests-test-invalid-kid-in-id_token-39136.patch

Benjamin Dauvergne, 21 janvier 2020 13:44

Télécharger (15 ko)

Voir les différences:

Subject: [PATCH 3/3] tests: test invalid kid in id_token (#39136)

 tests/test_auth_oidc.py | 122 +++++++++++++++++++++++++++-------------
 tests/utils.py          |   7 ++-
 2 files changed, 86 insertions(+), 43 deletions(-)
tests/test_auth_oidc.py
19 19
import os
20 20
import pytest
21 21
import json
22
import logging
22 23
import time
23 24
import random
24 25

  
......
55 56
        base64url_decode('x')
56 57
    base64url_decode('aa')
57 58

  
58
header_rsa_decoded = {'alg': 'RS256', 'kid': '1e9gdk7'}
59
KID = '1e9gdk7'
60
header_rsa_decoded = {'alg': 'RS256', 'kid': KID}
59 61
header_hmac_decoded = {'alg': 'HS256'}
60 62
payload_decoded = {
61 63
    'sub': '248289761001',
......
101 103

  
102 104
@pytest.fixture
103 105
def oidc_provider_jwkset():
104
    key = JWK.generate(kty='RSA', size=512, kid='1e9gdk7')
106
    key = JWK.generate(kty='RSA', size=512, kid=KID)
105 107
    jwkset = JWKSet()
106 108
    jwkset.add(key)
107 109
    return jwkset
......
109 111
OIDC_PROVIDER_PARAMS = [
110 112
    {},
111 113
    {
112
        'idtoken_algo': OIDCProvider.ALGO_HMAC
114
        'idtoken_algo': OIDCProvider.ALGO_HMAC,
113 115
    },
114 116
    {
115 117
        'claims_parameter_supported': True,
......
119 121

  
120 122
@pytest.fixture(params=OIDC_PROVIDER_PARAMS)
121 123
def oidc_provider(request, db, oidc_provider_jwkset):
122
    idtoken_algo = request.param.get('idtoken_algo', OIDCProvider.ALGO_RSA)
123 124
    claims_parameter_supported = request.param.get('claims_parameter_supported', False)
125
    idtoken_algo = request.param.get('idtoken_algo', OIDCProvider.ALGO_RSA)
126

  
127
    return make_oidc_provider(
128
        idtoken_algo=idtoken_algo,
129
        jwkset=oidc_provider_jwkset,
130
        claims_parameter_supported=claims_parameter_supported)
131

  
132

  
133
@pytest.fixture
134
def oidc_provider_rsa(request, db, oidc_provider_jwkset):
135
    return make_oidc_provider(
136
        idtoken_algo=OIDCProvider.ALGO_RSA,
137
        jwkset=oidc_provider_jwkset)
138

  
139

  
140
def make_oidc_provider(
141
        idtoken_algo=OIDCProvider._meta.get_field('idtoken_algo').default,
142
        jwkset=None,
143
        claims_parameter_supported=False):
124 144
    from authentic2_auth_oidc.utils import get_provider, get_provider_by_issuer
125 145
    get_provider.cache.clear()
126 146
    get_provider_by_issuer.cache.clear()
127
    if idtoken_algo == OIDCProvider.ALGO_RSA:
128
        jwkset = json.loads(oidc_provider_jwkset.export())
129
    else:
130
        jwkset = None
147

  
148
    if jwkset is not None:
149
        jwkset = json.loads(jwkset.export())
150

  
131 151
    provider = OIDCProvider.objects.create(
132 152
        ou=get_default_ou(),
133 153
        name='OIDIDP',
......
182 202
    return 'xxxx'
183 203

  
184 204

  
185
@pytest.fixture
186 205
def header(oidc_provider):
187 206
    if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
188 207
        return header_rsa
......
190 209
        return header_hmac
191 210

  
192 211

  
193
@pytest.fixture
194 212
def signature(oidc_provider):
195 213
    if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
196
        key = oidc_provider.jwkset.get_key(kid='1e9gdk7')
214
        key = oidc_provider.jwkset.get_key(kid=KID)
197 215
        header_decoded = header_rsa_decoded
198 216
    elif oidc_provider.idtoken_algo == OIDCProvider.ALGO_HMAC:
199 217
        key = JWK(kty='oct',
......
206 224

  
207 225

  
208 226
def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token=None,
209
                       extra_user_info=None, sub='john.doe', nonce=None):
227
                       extra_user_info=None, sub='john.doe', nonce=None, kid=KID):
210 228
    token_endpoint = urlparse.urlparse(oidc_provider.token_endpoint)
211 229
    userinfo_endpoint = urlparse.urlparse(oidc_provider.userinfo_endpoint)
212 230
    token_revocation_endpoint = urlparse.urlparse(oidc_provider.token_revocation_endpoint)
......
227 245
                id_token.update(extra_id_token)
228 246

  
229 247
            if oidc_provider.idtoken_algo == OIDCProvider.ALGO_RSA:
230
                jwt = JWT(header={'alg': 'RS256', 'kid': '1e9gdk7'},
248
                jwt = JWT(header={'alg': 'RS256', 'kid': kid},
231 249
                          claims=id_token)
232 250
                jwt.make_signed_token(list(oidc_provider_jwkset['keys'])[0])
233 251
            else:
......
289 307
    return HTTMock(token_endpoint_mock, user_info_endpoint_mock, token_revocation_endpoint_mock)
290 308

  
291 309

  
292
@pytest.fixture
293
def login_url(oidc_provider):
294
    return reverse('oidc-login', kwargs={'pk': oidc_provider.pk})
295

  
296

  
297
@pytest.fixture
298 310
def login_callback_url(oidc_provider):
299 311
    return reverse('oidc-login-callback')
300 312

  
......
333 345
    assert response.pyquery('p#oidc-p-oidcidp-2')
334 346

  
335 347

  
336

  
337

  
338
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, login_url, login_callback_url, hooks):
348
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks):
339 349
    OU = get_ou_model()
340 350
    cassis = OU.objects.create(name='Cassis', slug='cassis')
341 351
    OU.cached.cache.clear()
......
370 380

  
371 381
    with utils.check_log(caplog, 'failed to contact the token_endpoint'):
372 382
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
373
            response = app.get(login_callback_url, params={'code': 'yyyy', 'state': query['state']})
383
            response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']})
374 384
    with utils.check_log(caplog, 'invalid id_token %r'):
375 385
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
376 386
                                extra_id_token={'iss': None}):
377
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
387
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
378 388
    with utils.check_log(caplog, 'invalid id_token %r'):
379 389
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
380 390
                                extra_id_token={'sub': None}):
381
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
391
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
382 392
    with utils.check_log(caplog, 'authentication is too old'):
383 393
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
384 394
                                extra_id_token={'iat': 1}):
385
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
395
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
386 396
    with utils.check_log(caplog, 'invalid id_token %r'):
387 397
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
388 398
                                extra_id_token={'exp': 1}):
389
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
399
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
390 400
    with utils.check_log(caplog, 'invalid id_token audience'):
391 401
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
392 402
                                extra_id_token={'aud': 'zz'}):
393
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
403
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
394 404
    with utils.check_log(caplog, 'expected nonce'):
395 405
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
396
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
406
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
397 407
    assert not hooks.auth_oidc_backend_modify_user
398 408
    with utils.check_log(caplog, 'created user'):
399 409
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
400
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
410
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
401 411
    assert len(hooks.auth_oidc_backend_modify_user) == 1
402 412
    assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set(
403 413
        ['user', 'provider', 'user_info', 'id_token', 'access_token'])
......
417 427

  
418 428
    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
419 429
                            extra_user_info={'family_name_verified': True}, nonce=nonce):
420
        response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
430
        response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
421 431
    assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0
422 432
    assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1
423 433

  
424 434
    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code,
425 435
                            extra_user_info={'ou': 'cassis'}, nonce=nonce):
426
        response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
436
        response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
427 437
    assert User.objects.count() == 1
428 438
    user = User.objects.get()
429 439
    assert user.ou == cassis
430 440

  
431 441
    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
432
        response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
442
        response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
433 443
    assert User.objects.count() == 1
434 444
    user = User.objects.get()
435 445
    assert user.ou == get_default_ou()
......
438 448
    time.sleep(0.1)
439 449

  
440 450
    with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code):
441
        response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
451
        response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
442 452
    assert User.objects.count() == 1
443 453
    user = User.objects.get()
444 454
    assert user.ou == get_default_ou()
......
469 479
    assert 'oidc-a-oididp' not in response.text
470 480

  
471 481

  
472
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, login_url,
473
                            login_callback_url, simple_user):
482
def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwkset, simple_user):
474 483

  
475 484
    get_providers.cache.clear()
476 485
    has_providers.cache.clear()
......
492 501
    # sub=john.doe, MUST not work
493 502
    with utils.check_log(caplog, 'cannot create user'):
494 503
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce):
495
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
504
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
496 505

  
497 506
    # sub=simple_user.uuid MUST work
498 507
    with utils.check_log(caplog, 'found user using UUID'):
499 508
        with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce):
500
            response = app.get(login_callback_url, params={'code': code, 'state': query['state']})
509
            response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
501 510

  
502 511
    assert urlparse.urlparse(response['Location']).path == '/'
503 512
    assert User.objects.count() == 1
......
542 551
            openid_configuration=oidc_conf)
543 552

  
544 553

  
545
def test_required_keys(db, oidc_provider, header, signature, caplog):
554
def test_required_keys(db, oidc_provider, caplog):
546 555
    erroneous_payload = base64url_encode(json.dumps({
547 556
        'sub': '248289761001',
548 557
        'iss': 'http://server.example.com',
......
553 562

  
554 563
    with pytest.raises(IDTokenError):
555 564
        with utils.check_log(caplog, 'missing field'):
556
            token = IDToken('{}.{}.{}'.format(header, erroneous_payload, signature))
565
            token = IDToken('{}.{}.{}'.format(header(oidc_provider), erroneous_payload, signature(oidc_provider)))
557 566
            token.deserialize(oidc_provider)
567

  
568

  
569
def test_invalid_kid(app, caplog, code, oidc_provider_rsa,
570
                     oidc_provider_jwkset, simple_user):
571

  
572
    get_providers.cache.clear()
573
    has_providers.cache.clear()
574
    # no mapping please
575
    OIDCClaimMapping.objects.all().delete()
576

  
577
    User = get_user_model()
578
    assert User.objects.count() == 1
579

  
580
    response = app.get('/').maybe_follow()
581
    assert oidc_provider_rsa.name in response.text
582
    response = response.click(oidc_provider_rsa.name)
583
    location = urlparse.urlparse(response.location)
584
    query = check_simple_qs(urlparse.parse_qs(location.query))
585
    nonce = app.session['auth_oidc'][query['state']]['request']['nonce']
586

  
587
    # test invalid kid
588
    with utils.check_log(caplog, message='not in key set', levelname='WARNING'):
589
        with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, kid='coin'):
590
            response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
591
            import pdb
592
            pdb.set_trace()
593

  
594
    # test missing kid
595
    with utils.check_log(caplog, message='Missing Key ID', levelname='WARNING'):
596
        with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, kid=None):
597
            response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']})
598
            import pdb
599
            pdb.set_trace()
tests/utils.py
154 154

  
155 155

  
156 156
@contextmanager
157
def check_log(caplog, msg):
157
def check_log(caplog, message, levelname=None):
158 158
    idx = len(caplog.records)
159 159
    yield
160
    assert any(msg in record.msg for record in caplog.records[idx:]), \
161
        '%r not found in log records' % msg
160
    assert any(message in record.message for record in caplog.records[idx:]
161
               if not levelname or record.levelname == levelname), \
162
        '%r not found in log records' % message
162 163

  
163 164

  
164 165
def can_resolve_dns():
165
-