From 75fa16edf3fa72d7c7d2b0d8a30fd2a17068ce36 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sun, 18 Oct 2020 12:54:50 +0200 Subject: [PATCH] auth_oidc: use a signed state (#47825) State is no more stored in the session, it's made using signing.dumps() instead, to be more resilient. It's associated to a cookie scoped to the callback path and the nonce created from the state id using an HMAC construction with settings.SECRET_KEY. --- src/authentic2_auth_oidc/views.py | 178 +++++++++++++++++++----------- tests/test_auth_oidc.py | 136 +++++++++++++---------- tests/utils.py | 7 ++ 3 files changed, 198 insertions(+), 123 deletions(-) diff --git a/src/authentic2_auth_oidc/views.py b/src/authentic2_auth_oidc/views.py index 49bad4e0..eb8a5863 100644 --- a/src/authentic2_auth_oidc/views.py +++ b/src/authentic2_auth_oidc/views.py @@ -14,12 +14,15 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import uuid -import logging +import hashlib import json +import logging +import uuid import requests +from django.conf import settings +from django.core import signing from django.urls import reverse from django.utils.translation import get_language, ugettext as _ from django.contrib import messages @@ -34,22 +37,36 @@ from authentic2.utils import redirect, login, good_next_url, authenticate from . import app_settings, models from .utils import get_provider, get_provider_by_issuer +logger = logging.getLogger(__name__) + + +def make_nonce(state): + return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest() + @setting_enabled('ENABLE', settings=app_settings) def oidc_login(request, pk, next_url=None, *args, **kwargs): - logger = logging.getLogger(__name__) provider = get_provider(pk) scopes = set(provider.scopes.split()) | set(['openid']) - state = str(uuid.uuid4()) - nonce = request.GET.get('nonce') or str(uuid.uuid4()) + state_id = str(uuid.uuid4()) + next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '') + if next_url and not good_next_url(request, next_url): + next_url = None + nonce = make_nonce(state_id) display = set() prompt = set() + state_content = { + 'state_id': state_id, + 'issuer': provider.issuer, + } + if next_url: + state_content['next'] = next_url params = { 'client_id': provider.client_id, 'scope': ' '.join(scopes), 'response_type': 'code', 'redirect_uri': request.build_absolute_uri(reverse('oidc-login-callback')), - 'state': state, + 'state': signing.dumps(state_content), 'nonce': nonce, } if provider.claims_parameter_supported: @@ -70,15 +87,12 @@ def oidc_login(request, pk, next_url=None, *args, **kwargs): # FIXME: id_token_hint ? # FIXME: acr_values ? # save request state - saved_state = request.session.setdefault('auth_oidc', {}).setdefault(state, {}) - saved_state['request'] = params - saved_state['issuer'] = provider.issuer - next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '') - if good_next_url(request, next_url): - saved_state['next_url'] = next_url - request.session.modified = True # necessary if auth_oidc already exists logger.debug('auth_oidc: sent request to authorization endpoint %r', params) - return redirect(request, provider.authorization_endpoint, params=params, resolve=False) + response = redirect(request, provider.authorization_endpoint, params=params, resolve=False) + response.set_cookie( + 'oidc-state', value=state_id, path=reverse('oidc-login-callback'), + httponly=True, secure=request.is_secure()) + return response @setting_enabled('ENABLE', settings=app_settings) @@ -89,78 +103,81 @@ def login_initiate(request, *args, **kwargs): try: provider = get_provider_by_issuer(issuer) except models.OIDCProvider.DoesNotExist: - return HttpResponseBadRequest(u'unknown issuer %s' % issuer, content_type='text/plain') + return HttpResponseBadRequest('unknown issuer %s' % issuer, content_type='text/plain') return oidc_login(request, pk=provider.pk, next_url=request.GET.get('target_link_uri')) class LoginCallback(View): - def continue_to_next_url(self): - return redirect(self.request, - self.oidc_state.get('next_url', settings.LOGIN_REDIRECT_URL), - resolve=False) + next_url = None + + def continue_to_next_url(self, request): + if self.next_url: + return redirect(request, self.next_url, resolve=False) + else: + return redirect(request, settings.LOGIN_REDIRECT_URL) def get(self, request, *args, **kwargs): - logger = logging.getLogger(__name__) + response = self.handle_authorization_response(request) + # clean the state cookie in all cases + if 'oidc-state' in request.COOKIES: + response.delete_cookie('oidc-state') + return response + + def handle_authorization_response(self, request): code = request.GET.get('code') - state = request.GET.get('state') - oidc_state = self.oidc_state = request.session.get('auth_oidc', {}).get(state) - if not state or not oidc_state or 'request' not in oidc_state: - messages.warning(request, _('Login with OpenIDConnect failed, state lost.')) - logger.warning('auth_oidc: state lost') + raw_state = request.GET.get('state') + if not raw_state: return redirect(request, settings.LOGIN_REDIRECT_URL) - oidc_request = oidc_state.get('request') - assert isinstance(oidc_request, dict), 'state is not properly initialized' - nonce = oidc_request.get('nonce') try: - issuer = oidc_state.get('issuer') + state_content = signing.loads(raw_state) + except signing.BadSignature: + return redirect(request, settings.LOGIN_REDIRECT_URL) + + state = state_content['state_id'] + issuer = state_content['issuer'] + nonce = make_nonce(state) + self.next_url = state_content.get('next') + + try: provider = get_provider_by_issuer(issuer) except models.OIDCProvider.DoesNotExist: - messages.warning(request, _('Unknown OpenID connect issuer')) + messages.warning(request, _('Unknown OpenID connect issuer: "%s"') % issuer) logger.warning('auth_oidc: unknown issuer, %s', issuer) - return self.continue_to_next_url() + return self.continue_to_next_url(request) + + # Check state + if 'oidc-state' not in request.COOKIES or request.COOKIES['oidc-state'] != state: + logger.warning('auth-oidc: state %s for issuer %s has been lost', state, issuer) + params = {} + if self.next_url: + params['next'] = self.next_url + response = redirect(request, 'oidc-login', kwargs={'pk': str(provider.pk)}, params=params) + return response - # FIXME is idp initiated SSO allowed ? in this case state is maybe not mandatory if 'error' in request.GET: # error code path - error_description = request.GET.get('error_description') - error_url = request.GET.get('error_url') - msg = u'auth_oidc: error received ' - if error_description: - msg += u'%s (%s)' % (error_description, request.GET['error']) - else: - msg += request.GET['error'] - if error_url: - msg += u' see %s' % error_url - logger.warning(msg) - if provider: - messages.warning(request, _('Login with %(name)s failed, report %(request_id)s ' - 'to an administrator.') - % { - 'name': provider.name, - 'request_id': request.request_id, - }) - else: - messages.warning(request, _('Login with OpenIDConnect failed, report %s to an ' - 'administrator') % request.request_id) - return self.continue_to_next_url() - if not code: + return self.handle_error(request, provider) + elif not code: messages.warning(request, _('Missing code, report %s to an administrator') % request.request_id) logger.warning('auth_oidc: missing code, %r', request.GET) - return self.continue_to_next_url() + return self.continue_to_next_url(request) + else: + return self.handle_code(request, provider, nonce, code) + + def handle_code(self, request, provider, nonce, code): try: token_endpoint_request = { 'grant_type': 'authorization_code', 'code': code, 'redirect_uri': request.build_absolute_uri(request.path), } - logger.debug('auth_oidc: sent request to token endpoint %r', token_endpoint_request) response = requests.post(provider.token_endpoint, data=token_endpoint_request, auth=(provider.client_id, provider.client_secret), timeout=10) response.raise_for_status() except requests.RequestException as e: logger.warning( 'auth_oidc: failed to contact the token_endpoint for %(issuer)s, %(exception)s' % { - 'issuer': issuer, + 'issuer': provider.issuer, 'exception': e, }) messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' @@ -169,11 +186,11 @@ class LoginCallback(View): 'name': provider.name, 'request_id': request.request_id, }) - return self.continue_to_next_url() + return self.continue_to_next_url(request) try: result = response.json() except ValueError as e: - logger.warning(u'auth_oidc: response from %s is not a JSON document, %s, %r' % + logger.warning('auth_oidc: response from %s is not a JSON document, %s, %r' % (provider.token_endpoint, e, response.content)) messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' 'an administrator. ') % @@ -181,13 +198,13 @@ class LoginCallback(View): 'name': provider.name, 'request_id': request.request_id, }) - return self.continue_to_next_url() + return self.continue_to_next_url(request) # token_type is case insensitive, https://tools.ietf.org/html/rfc6749#section-4.2.2 if ('access_token' not in result or 'token_type' not in result or result['token_type'].lower() != 'bearer' or 'id_token' not in result): - logger.warning(u'auth_oidc: invalid token endpoint response from %s: %r' % ( + logger.warning('auth_oidc: invalid token endpoint response from %s: %r' % ( provider.token_endpoint, result)) messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' 'an administrator. ') % @@ -195,22 +212,49 @@ class LoginCallback(View): 'name': provider.name, 'request_id': request.request_id, }) - return self.continue_to_next_url() - logger.info(u'got token response %s', result) + return self.continue_to_next_url(request) + logger.info('auth_oidc: got token response %s', result) access_token = result.get('access_token') - user = authenticate(request, access_token=access_token, nonce=nonce, id_token=result['id_token'], provider=provider) + user = authenticate( + request, + access_token=access_token, + nonce=nonce, + id_token=result['id_token'], + provider=provider) if user: # remember last tokens for logout + login(request, user, 'oidc', nonce=nonce) tokens = request.session.setdefault('auth_oidc', {}).setdefault('tokens', []) tokens.append({ 'token_response': result, 'provider_pk': provider.pk, }) - request.session.modified = True - login(request, user, 'oidc', nonce=nonce) else: messages.warning(request, _('No user found')) - return self.continue_to_next_url() + return self.continue_to_next_url(request) + + def handle_error(self, request, provider): + error_description = request.GET.get('error_description') + error_url = request.GET.get('error_url') + msg = 'auth_oidc: error received ' + if error_description: + msg += '%s (%s)' % (error_description, request.GET['error']) + else: + msg += request.GET['error'] + if error_url: + msg += ' see %s' % error_url + logger.warning(msg) + if provider: + messages.warning(request, _('Login with %(name)s failed, report %(request_id)s ' + 'to an administrator.') + % { + 'name': provider.name, + 'request_id': request.request_id, + }) + else: + messages.warning(request, _('Login with OpenIDConnect failed, report %s to an ' + 'administrator') % request.request_id) + return self.continue_to_next_url(request) login_callback = setting_enabled('ENABLE', settings=app_settings)(LoginCallback.as_view()) diff --git a/tests/test_auth_oidc.py b/tests/test_auth_oidc.py index 3c83e9eb..e12c4f42 100644 --- a/tests/test_auth_oidc.py +++ b/tests/test_auth_oidc.py @@ -16,25 +16,27 @@ # along with this program. If not, see . import datetime +import json import os import pytest -import json -import time import random +import re +import time +from jwcrypto.common import base64url_encode, base64url_decode, json_encode from jwcrypto.jwk import JWKSet, JWK -from jwcrypto.jwt import JWT from jwcrypto.jws import JWS, InvalidJWSObject -from jwcrypto.common import base64url_encode, base64url_decode, json_encode +from jwcrypto.jwt import JWT from httmock import urlmatch, HTTMock -from django.urls import reverse -from django.utils.timezone import utc from django.contrib.auth import get_user_model +from django.urls import reverse from django.utils.encoding import force_text, force_str -from django.utils.timezone import now +from django.http import QueryDict from django.utils.six.moves.urllib import parse as urlparse +from django.utils.timezone import now +from django.utils.timezone import utc from django_rbac.utils import get_ou_model @@ -261,8 +263,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token if extra_id_token: id_token.update(extra_id_token) - if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, - OIDCProvider.ALGO_EC): + if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC): alg = { OIDCProvider.ALGO_RSA: 'RS256', OIDCProvider.ALGO_EC: 'ES256', @@ -270,9 +271,9 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token jwk = None for key in oidc_provider_jwkset['keys']: if key.key_type == { - OIDCProvider.ALGO_RSA: 'RSA', - OIDCProvider.ALGO_EC: 'EC', - }.get(oidc_provider.idtoken_algo): + OIDCProvider.ALGO_RSA: 'RSA', + OIDCProvider.ALGO_EC: 'EC', + }.get(oidc_provider.idtoken_algo): jwk = key break if provides_kid_header: @@ -281,7 +282,7 @@ def oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token header = {'alg': alg, 'kid': jwk.key_id} jwt = JWT(header=header, claims=id_token) jwt.make_signed_token(jwk) - else: # hmac + else: # hmac jwt = JWT(header={'alg': 'HS256'}, claims=id_token) k = base64url_encode(oidc_provider.client_secret.encode('utf-8')) @@ -346,13 +347,6 @@ def login_callback_url(oidc_provider): return reverse('oidc-login-callback') -def check_simple_qs(qs): - for k in qs: - assert len(qs[k]) == 1 - qs[k] = qs[k][0] - return qs - - def test_providers_on_login_page(oidc_provider, app): response = app.get('/login/') # two frontends should be present on login page @@ -381,7 +375,7 @@ def test_providers_on_login_page(oidc_provider, app): def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog): - oidc2_provider = OIDCProvider.objects.create( + OIDCProvider.objects.create( id=2, ou=get_default_ou(), name='My IDP', @@ -482,7 +476,6 @@ def test_login_autorun(oidc_provider, app, settings): assert response['Location'] == '/accounts/oidc/login/%s/' % oidc_provider.pk - def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): OU = get_ou_model() cassis = OU.objects.create(name='Cassis', slug='cassis') @@ -495,14 +488,13 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): assert location.scheme == endpoint.scheme assert location.netloc == endpoint.netloc assert location.path == endpoint.path - query = check_simple_qs(urlparse.parse_qs(location.query)) - assert query['state'] in app.session['auth_oidc'] + query = QueryDict(location.query) + state = query['state'] assert query['response_type'] == 'code' assert query['client_id'] == str(oidc_provider.client_id) assert query['scope'] == 'openid' assert query['redirect_uri'] == 'http://testserver' + reverse('oidc-login-callback') - # get the nonce - nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] + nonce = query['nonce'] if oidc_provider.claims_parameter_supported: claims = json.loads(query['claims']) @@ -517,34 +509,34 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): with utils.check_log(caplog, 'failed to contact the token_endpoint'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): - response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': state}) with utils.check_log(caplog, 'invalid id_token'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token={'iss': None}): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) with utils.check_log(caplog, 'invalid id_token'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token={'sub': None}): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) with utils.check_log(caplog, 'authentication is too old'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token={'iat': 1}): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) with utils.check_log(caplog, 'invalid id_token'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token={'exp': 1}): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) with utils.check_log(caplog, 'invalid id_token audience'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_id_token={'aud': 'zz'}): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) with utils.check_log(caplog, 'expected nonce'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert not hooks.auth_oidc_backend_modify_user with utils.check_log(caplog, 'created user'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert len(hooks.auth_oidc_backend_modify_user) == 1 assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set( ['user', 'provider', 'user_info', 'id_token', 'access_token']) @@ -564,19 +556,19 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_user_info={'family_name_verified': True}, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0 assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1 with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, extra_user_info={'ou': 'cassis'}, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 user = User.objects.get() assert user.ou == cassis with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 user = User.objects.get() assert user.ou == get_default_ou() @@ -585,7 +577,7 @@ def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): time.sleep(0.1) with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 user = User.objects.get() assert user.ou == get_default_ou() @@ -626,18 +618,19 @@ def test_strategy_find_uuid(app, caplog, code, oidc_provider, oidc_provider_jwks assert oidc_provider.name in response.text response = response.click(oidc_provider.name) location = urlparse.urlparse(response.location) - query = check_simple_qs(urlparse.parse_qs(location.query)) - nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] + query = QueryDict(location.query) + state = query['state'] + nonce = query['nonce'] # sub=john.doe, MUST not work with utils.check_log(caplog, 'cannot create user'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) # sub=simple_user.uuid MUST work with utils.check_log(caplog, 'found user using UUID'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert urlparse.urlparse(response['Location']).path == '/' assert User.objects.count() == 1 @@ -668,24 +661,25 @@ def test_strategy_create(app, caplog, code, oidc_provider, oidc_provider_jwkset) assert oidc_provider.name in response.text response = response.click(oidc_provider.name) location = urlparse.urlparse(response.location) - query = check_simple_qs(urlparse.parse_qs(location.query)) - nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] + query = QueryDict(location.query) + state = query['state'] + nonce = query['nonce'] # sub=john.doe with utils.check_log(caplog, 'auth_oidc: created user'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 # second time with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 # different sub, same user with utils.check_log(caplog, 'auth_oidc: changed user'): with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub='other', nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}) + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) assert User.objects.count() == 1 @@ -742,18 +736,25 @@ def test_invalid_kid(app, caplog, code, oidc_provider_rsa, assert oidc_provider_rsa.name in response.text response = response.click(oidc_provider_rsa.name) location = urlparse.urlparse(response.location) - query = check_simple_qs(urlparse.parse_qs(location.query)) - nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] + query = QueryDict(location.query) + state = query['state'] + nonce = query['nonce'] # test invalid kid with utils.check_log(caplog, message='not in key set', levelname='WARNING'): - with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid='coin'): - response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']}) + with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, + nonce=nonce, provides_kid_header=True, + kid='coin'): + response = app.get(login_callback_url(oidc_provider_rsa), + params={'code': code, 'state': state}) # test missing kid with utils.check_log(caplog, message='Key ID None not in key set', levelname='WARNING'): - with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid=None): - response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']}) + with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, + nonce=nonce, provides_kid_header=True, + kid=None): + response = app.get(login_callback_url(oidc_provider_rsa), + params={'code': code, 'state': state}) def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider_jwkset): @@ -807,11 +808,13 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider response = app.get('/').maybe_follow() response = response.click(oidc_provider.name) location = urlparse.urlparse(response.location) - query = check_simple_qs(urlparse.parse_qs(location.query)) - nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] + query = QueryDict(location.query) + state = query['state'] + nonce = query['nonce'] with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): - response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}).maybe_follow() + response = app.get(login_callback_url(oidc_provider), + params={'code': code, 'state': state}).maybe_follow() assert User.objects.count() == 1 user = User.objects.first() @@ -822,3 +825,24 @@ def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider assert user.last_name == 'DOE' # typo in template string, no rendering assert user.first_name == '{{ given_name' + + +def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): + response = app.get('/login/?next=/whatever/') + assert oidc_provider.name in response.text + response = response.click(oidc_provider.name) + qs = urlparse.parse_qs(urlparse.urlparse(response.location).query) + state = qs['state'] + + # reset the session to forget the state + app.cookiejar.clear() + + caplog.clear() + with utils.norequest: + response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) + # not logged + assert re.match('^auth-oidc: state.*has been lost', caplog.records[-1].message) + # event is recorded + assert '_auth_user_id' not in app.session + # we are automatically redirected to our destination + assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk diff --git a/tests/utils.py b/tests/utils.py index 69ab3bc1..4da3eb26 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,6 +20,7 @@ import base64 import socket from contextlib import contextmanager, closing +import httmock from lxml import etree from django.core.management import call_command as django_call_command @@ -277,3 +278,9 @@ def assert_event(event_type_name, user=None, session=None, service=None, **data) assert event.data.get(key) == value, ( 'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value) ) + + +@httmock.HTTMock +@httmock.urlmatch() +def norequest(request, url): + assert False, 'no request should be done' -- 2.28.0