From a16388937c3db8a6dcc2a1813fc5f398ff23e6f8 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 23 Oct 2020 22:08:45 +0200 Subject: [PATCH 1/5] idp_oidc: improve error reporting in token endpoint (#47900) --- src/authentic2_idp_oidc/views.py | 582 ++++++++++++++++++------------- tests/test_idp_oidc.py | 449 +++++++++++++----------- 2 files changed, 584 insertions(+), 447 deletions(-) diff --git a/src/authentic2_idp_oidc/views.py b/src/authentic2_idp_oidc/views.py index 4253a59b..d6a0cdc2 100644 --- a/src/authentic2_idp_oidc/views.py +++ b/src/authentic2_idp_oidc/views.py @@ -18,6 +18,7 @@ import logging import math import datetime import base64 +import secrets import time from django.http import (HttpResponse, HttpResponseNotAllowed, JsonResponse) @@ -49,6 +50,132 @@ from . import app_settings, models, utils logger = logging.getLogger(__name__) +class OIDCException(Exception): + error_code = None + error_description = None + show_message = True + + def __init__(self, error_description=None, status=400, client=None, show_message=None): + if error_description: + self.error_description = error_description + self.status = status + self.client = client + if show_message is not None: + self.show_message = show_message + + def json_response(self, request): + content = { + 'error': self.error_code, + } + + if self.error_description: + content['error_description'] = self.error_description + + if self.client: + logger.warning('idp_oidc: error "%s" in token endpoint "%s" for client %s', + self.error_code, self.error_description, self.client) + else: + logger.warning('idp_oidc: error "%s" in token endpoint "%s"', + self.error_code, self.error_description) + return JsonResponse(content, status=self.status) + + def redirect_response(self, request, redirect_uri=None, use_fragment=None, state=None, client=None): + params = { + 'error': self.error_code, + 'error_description': self.error_description, + } + if state is not None: + params['state'] = state + + log_method = logger.warning + if not self.show_message: + # errors not shown as Django messages are regular events, no need to log as warning + log_method = logger.info + + client = client or self.client + if client: + log_method('idp_oidc: error "%s" in authorize endpoint for client %s": %s', + self.error_code, client, self.error_description) + else: + log_method('idp_oidc: error "%s" in authorize endpoint: %s', + self.error_code, self.error_description) + + if self.show_message: + messages.error(request, _('OpenIDConnect Error "%s": %s') % (self.error_code, self.error_description)) + + if redirect_uri: + if use_fragment: + return redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False) + else: + return redirect(request, redirect_uri, params=params, resolve=False) + else: + return redirect(request, 'continue', resolve=True) + + +class InvalidRequest(OIDCException): + error_code = 'invalid_request' + + +class MissingParameter(InvalidRequest): + def __init__(self, parameter): + super().__init__(error_description=_('Missing parameter "%s"') % parameter) + + +class UnsupportedResponseType(OIDCException): + error_code = 'unsupported_response_type' + + +class InvalidScope(OIDCException): + error_code = 'invalid_scope' + + +class LoginRequired(OIDCException): + error_code = 'login_required' + show_message = False + + +class ConsentRequired(OIDCException): + error_code = 'consent_required' + show_message = False + + +class AccessDenied(OIDCException): + error_code = 'access_denied' + show_message = False + + +class UnauthorizedClient(OIDCException): + error_code = 'unauthorized_client' + + +class InvalidClient(OIDCException): + error_code = 'invalid_client' + + +class WrongClientId(InvalidClient): + error_description = _('Wrong client\'s identifier') + + +class WrongClientSecret(InvalidClient): + error_description = _('Wrong client\'s secret') + + +def idtoken_duration(client): + return client.idtoken_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION) + + +def access_token_duration(client): + return client.access_token_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION) + + +def allowed_scopes(client): + return client.scope_set() or app_settings.SCOPES or ['openid', 'email', 'profile'] + + +def is_scopes_allowed(scopes, client): + return scopes <= set(allowed_scopes(client)) + + @setting_enabled('ENABLE', settings=app_settings) def openid_configuration(request, *args, **kwargs): metadata = { @@ -78,153 +205,104 @@ def certs(request, *args, **kwargs): content_type='application/json') -def authorization_error(request, redirect_uri, error, error_description=None, error_uri=None, - state=None, fragment=False): - params = { - 'error': error, - } - if error_description: - params['error_description'] = error_description - if error_uri: - params['error_uri'] = error_uri - if state is not None: - params['state'] = state - logger.warning(u'idp_oidc: authorization request error redirect_uri=%r error=%r error_description=%r', - redirect_uri, error, error_description, extra={'redirect_uri': redirect_uri}) - if fragment: - return redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False) - else: - return redirect(request, redirect_uri, params=params, resolve=False) - - -def idtoken_duration(client): - return client.idtoken_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION) - - -def access_token_duration(client): - return client.access_token_duration or datetime.timedelta(seconds=app_settings.IDTOKEN_DURATION) - - -def allowed_scopes(client): - return client.scope_set() or app_settings.SCOPES or ['openid', 'email', 'profile'] - - -def is_scopes_allowed(scopes, client): - return scopes <= set(allowed_scopes(client)) - - -def log_invalid_request(request, debug_info): - logger.warning('idp_oidc: authorization request error, %s', debug_info) - error_message = _('Authorization request is invalid') - if settings.DEBUG: - error_message += ' (%s)' % debug_info - messages.warning(request, error_message) - - @setting_enabled('ENABLE', settings=app_settings) def authorize(request, *args, **kwargs): - start = now() - - try: - client_id = request.GET['client_id'] - redirect_uri = request.GET['redirect_uri'] - except KeyError as k: - log_invalid_request(request, 'missing %s' % k.args[0]) - return redirect(request, 'auth_homepage') - try: - client = models.OIDCClient.objects.get(client_id=client_id) - except models.OIDCClient.DoesNotExist: - log_invalid_request(request, 'unknown client_id redirect_uri=%r client_id=%r' % (redirect_uri, client_id)) - return redirect(request, 'auth_homepage') - - if client.authorization_flow == client.FLOW_RESOURCE_OWNER_CRED: - messages.warning(request, _('Client is configured for resource owner password credentials grant type')) - return authorization_error(request, 'auth_homepage', - 'unauthorized_client', - error_description='authz endpoint is configured ' - 'for resource owner password credential grant type') - + validated_redirect_uri = None + client_id = None + client = None try: - client.validate_redirect_uri(redirect_uri) - except ValueError as e: - log_invalid_request(request, 'invalid redirect_uri redirect_uri=%r client_id=%r (%s)' % (redirect_uri, client_id, e)) - return redirect(request, 'auth_homepage') - - fragment = client.authorization_flow == client.FLOW_IMPLICIT + client_id = request.GET.get('client_id', '') + if not client_id: + raise MissingParameter('client_id') + redirect_uri = request.GET.get('redirect_uri', '') + if not redirect_uri: + raise MissingParameter('redirect_uri') + client = get_client(client_id=client_id) + if not client: + raise InvalidRequest(_('Unknown client identifier: "%s"') % client_id) + try: + client.validate_redirect_uri(redirect_uri) + except ValueError: + error_description = _( + 'Redirect URI "%s" is unknown.' + ) % redirect_uri + if settings.DEBUG: + error_description += _( + ' Known redirect URIs are: %s' + ) % ', '.join(client.redirect_uris.split()) + raise InvalidRequest(error_description) + state = request.GET.get('state') + use_fragment = client.authorization_flow == client.FLOW_IMPLICIT + validated_redirect_uri = redirect_uri + return authorize_for_client(request, client, validated_redirect_uri) + except OIDCException as e: + return e.redirect_response( + request, + redirect_uri=validated_redirect_uri, + state=validated_redirect_uri and state, + use_fragment=validated_redirect_uri and use_fragment, + client=client) + + +def authorize_for_client(request, client, redirect_uri): + hooks.call_hooks('event', name='sso-request', idp='oidc', service=client) state = request.GET.get('state') + nonce = request.GET.get('nonce') login_hint = set(request.GET.get('login_hint', u'').split()) - - try: - response_type = request.GET['response_type'] - scope = request.GET['scope'] - except KeyError as k: - return authorization_error(request, redirect_uri, 'invalid_request', - state=state, - error_description='missing parameter %s' % k.args[0], - fragment=fragment) - prompt = set(filter(None, request.GET.get('prompt', '').split())) - nonce = request.GET.get('nonce') - scopes = utils.scope_set(scope) - - max_age = request.GET.get('max_age') - if max_age: - try: - max_age = int(max_age) - if max_age < 0: - raise ValueError - except ValueError: - return authorization_error(request, redirect_uri, 'invalid_request', - error_description='max_age is not a positive integer', - state=state, - fragment=fragment) + # check response_type + response_type = request.GET.get('response_type', '') + if not response_type: + raise MissingParameter('response_type') + if client.authorization_flow == client.FLOW_RESOURCE_OWNER_CRED: + raise InvalidRequest(_( + 'Client is configured for resource owner password credentials grant, ' + 'authorize endpoint is not usable' + )) if client.authorization_flow == client.FLOW_AUTHORIZATION_CODE: if response_type != 'code': - return authorization_error(request, redirect_uri, 'unsupported_response_type', - error_description='only code is supported', - state=state, - fragment=fragment) + raise UnsupportedResponseType(_('Response type must be "code"')) elif client.authorization_flow == client.FLOW_IMPLICIT: if not set(filter(None, response_type.split())) in (set(['id_token', 'token']), set(['id_token'])): - return authorization_error(request, redirect_uri, 'unsupported_response_type', - error_description='only "id_token token" or "id_token" ' - 'are supported', - state=state, - fragment=fragment) + raise UnsupportedResponseType(_('Response type must be "id_token token" or "id_token"')) else: raise NotImplementedError - if 'openid' not in scopes: - return authorization_error(request, redirect_uri, 'invalid_request', - error_description='openid scope is missing', - state=state, - fragment=fragment) + # check scope + scope = request.GET.get('scope', '') + if not scope: + raise MissingParameter('scope') + scopes = utils.scope_set(scope) + if 'openid' not in scopes: + raise InvalidScope( + _('Scope must contain "openid", received "%s"') + % ', '.join(sorted(scopes))) if not is_scopes_allowed(scopes, client): - message = 'only "%s" scope(s) are supported, but "%s" requested' % ( - ', '.join(allowed_scopes(client)), ', '.join(scopes)) - return authorization_error(request, redirect_uri, 'invalid_scope', - error_description=message, - state=state, - fragment=fragment) + raise InvalidScope( + _('Scope may contain "%s" scope(s), received "%s"') % ( + ', '.join(sorted(allowed_scopes(client))), + ', '.join(sorted(scopes)))) + + # check max_age + max_age = request.GET.get('max_age') + if max_age: + try: + max_age = int(max_age) + if max_age < 0: + raise ValueError + except ValueError: + raise InvalidRequest(_('Parameter "max_age" must be a positive integer')) - hooks.call_hooks('event', name='sso-request', idp='oidc', service=client) # authentication canceled by user if 'cancel' in request.GET: - logger.info(u'authentication canceled for service %s', client.name) - return authorization_error(request, redirect_uri, 'access_denied', - error_description='user did not authenticate', - state=state, - fragment=fragment) + raise AccessDenied(_('Authentication cancelled by user')) if not request.user.is_authenticated or 'login' in prompt: if 'none' in prompt: - return authorization_error(request, redirect_uri, 'login_required', - error_description='login is required but prompt is none', - state=state, - fragment=fragment) + raise LoginRequired(_('Login is required but prompt parameter is "none"')) params = {} if nonce is not None: params['nonce'] = nonce @@ -237,15 +315,14 @@ def authorize(request, *args, **kwargs): last_auth = last_authentication_event(request=request) if max_age is not None and time.time() - last_auth['when'] >= max_age: if 'none' in prompt: - return authorization_error(request, redirect_uri, 'login_required', - error_description='login is required but prompt is none', - state=state, - fragment=fragment) + raise LoginRequired(_('Login is required because of max_age, but prompt parameter is "none"')) params = {} if nonce is not None: params['nonce'] = nonce return login_require(request, params=params, service=client, login_hint=login_hint) + iat = now() # iat = issued at + if client.authorization_mode != client.AUTHORIZATION_MODE_NONE or 'consent' in prompt: # authorization by user is mandatory, as per local configuration or per explicit request by # the RP @@ -256,24 +333,19 @@ def authorize(request, *args, **kwargs): auth_manager = client.ou.oidc_authorizations qs = auth_manager.filter(user=request.user) - if 'consent' in prompt: # if consent is asked we delete existing authorizations # it seems to be the safer option qs.delete() qs = auth_manager.none() else: - qs = qs.filter(expired__gte=start) + qs = qs.filter(expired__gte=iat) authorized_scopes = set() for authorization in qs: authorized_scopes |= authorization.scope_set() if (authorized_scopes & scopes) < scopes: if 'none' in prompt: - return authorization_error( - request, redirect_uri, 'consent_required', - error_description='consent is required but prompt is none', - state=state, - fragment=fragment) + raise ConsentRequired(_('Consent is required but prompt parameter is "none"')) if request.method == 'POST': if 'accept' in request.POST: if 'do_not_ask_again' in request.POST: @@ -284,25 +356,23 @@ def authorize(request, *args, **kwargs): pk_to_deletes.append(authorization.pk) auth_manager.create( user=request.user, scopes=u' '.join(sorted(scopes)), - expired=start + datetime.timedelta(days=365)) + expired=iat + datetime.timedelta(days=365)) if pk_to_deletes: auth_manager.filter(pk__in=pk_to_deletes).delete() request.journal.record( 'user.service.sso.authorization', service=client, scopes=list(sorted(scopes))) - logger.info(u'authorized scopes %s saved for service %s', ' '.join(scopes), - client.name) + logger.info( + 'idp_oidc: authorized scopes %s saved for service %s', + ' '.join(scopes), client) else: - logger.info(u'authorized scopes %s for service %s', ' '.join(scopes), - client.name) + logger.info( + 'idp_oidc: authorized scopes %s for service %s', + ' '.join(scopes), + client) else: - logger.info(u'refused scopes %s for service %s', ' '.join(scopes), - client.name) - return authorization_error(request, redirect_uri, 'access_denied', - error_description='user denied access', - state=state, - fragment=fragment) + raise AccessDenied(_('User consent refused')) else: return render(request, 'authentic2_idp_oidc/authorization.html', { @@ -313,12 +383,12 @@ def authorize(request, *args, **kwargs): code = models.OIDCCode.objects.create( client=client, user=request.user, scopes=u' '.join(scopes), state=state, nonce=nonce, redirect_uri=redirect_uri, - expired=start + datetime.timedelta(seconds=30), + expired=iat + datetime.timedelta(seconds=30), auth_time=datetime.datetime.fromtimestamp(last_auth['when'], utc), session_key=request.session.session_key) - logger.info(u'sending code %s for scopes %s for service %s', - code.uuid, ' '.join(scopes), - client.name) + logger.info( + 'idp_oidc: sending code %s for scopes %s for service %s', + code.uuid, ' '.join(scopes), client) params = { 'code': six.text_type(code.uuid), } @@ -326,7 +396,6 @@ def authorize(request, *args, **kwargs): params['state'] = state response = redirect(request, redirect_uri, params=params, resolve=False) else: - # FIXME: we should probably factorize this part with the token endpoint similar code need_access_token = 'token' in response_type.split() expires_in = access_token_duration(client) if need_access_token: @@ -335,7 +404,7 @@ def authorize(request, *args, **kwargs): user=request.user, scopes=u' '.join(scopes), session_key=request.session.session_key, - expired=start + expires_in) + expired=iat + expires_in) acr = '0' if nonce is not None and last_auth.get('nonce') == nonce: acr = '1' @@ -344,12 +413,12 @@ def authorize(request, *args, **kwargs): request.user, scopes, id_token=True) - exp = start + idtoken_duration(client) + exp = iat + idtoken_duration(client) id_token.update({ 'iss': utils.get_issuer(request), 'aud': client.client_id, 'exp': int(exp.timestamp()), - 'iat': int(start.timestamp()), + 'iat': int(iat.timestamp()), 'auth_time': last_auth['when'], 'acr': acr, 'sid': utils.get_session_id(request, client), @@ -378,73 +447,99 @@ def authorize(request, *args, **kwargs): return response -def authenticate_client(request, client=None): - '''Authenticate client on the token endpoint''' +def parse_http_basic(request): + authorization = request.META['HTTP_AUTHORIZATION'].split() + if authorization[0] != 'Basic' or len(authorization) != 2: + return None, None + try: + decoded = force_text(base64.b64decode(authorization[1])) + except Base64Error: + return None, None + parts = decoded.split(':') + if len(parts) != 2: + return None, None + return parts - if 'HTTP_AUTHORIZATION' in request.META: - authorization = request.META['HTTP_AUTHORIZATION'].split() - if authorization[0] != 'Basic' or len(authorization) != 2: - return None - try: - decoded = force_text(base64.b64decode(authorization[1])) - except Base64Error: - return None - parts = decoded.split(':') - if len(parts) != 2: - return None - client_id, client_secret = parts - elif 'client_id' in request.POST: - client_id = request.POST['client_id'] - client_secret = request.POST.get('client_secret', '') - else: - return None + +def get_client(client_id, client=None): if not client: try: client = models.OIDCClient.objects.get(client_id=client_id) except models.OIDCClient.DoesNotExist: return None - if client.client_secret != client_secret: - return None + else: + if client.client_id != client_id: + return None return client -def error_response(error, error_description=None, status=400): - content = { - 'error': error, - } - if error_description: - content['error_description'] = error_description - return JsonResponse(content, status=status) - +def authenticate_client_secret(client, client_secret): + raw_client_client_secret = client.client_secret.encode('utf-8') + raw_provided_client_secret = client_secret.encode('utf-8') + if len(raw_client_client_secret) != len(raw_provided_client_secret): + raise WrongClientSecret(client=client) + if not secrets.compare_digest( + raw_client_client_secret, + raw_provided_client_secret): + raise WrongClientSecret(client=client) + return client -def invalid_request_response(error_description=None): - return error_response('invalid_request', error_description=error_description) +def check_ratelimited(request, key='ip', increment=True): + return is_ratelimited( + request, group='ro-cred-grant', increment=increment, + key=key, rate=app_settings.PASSWORD_GRANT_RATELIMIT) -def access_denied_response(error_description=None): - return error_response('access_denied', error_description=error_description) +def authenticate_client(request, ratelimit=False, client=None): + '''Authenticate client on the token endpoint''' -def unauthorized_client_response(error_description=None): - return error_response('unauthorized_client', error_description=error_description) + if 'HTTP_AUTHORIZATION' in request.META: + client_id, client_secret = parse_http_basic(request) + elif 'client_id' in request.POST: + client_id = request.POST.get('client_id', '') + client_secret = request.POST.get('client_secret', '') + else: + return None + if not client_id: + raise WrongClientId -def invalid_client_response(error_description=None): - return error_response('invalid_client', error_description=error_description) + if not client_secret: + raise InvalidRequest('missing client_secret', client=client_id) + client = get_client(client_id) + if not client: + raise WrongClientId -def credential_grant_ratelimit_key(group, request): - client = authenticate_client(request, client=None) - if client: - return client.client_id - # return remote address when no valid client credentials have been provided - return request.META['REMOTE_ADDR'] + return authenticate_client_secret(client, client_secret) def idtoken_from_user_credential(request): + # if rate limit by ip is exceeded, do not even try client authentication + if check_ratelimited(request, increment=False): + raise InvalidRequest('Rate limit exceeded for IP address "%s"' % request.META.get('REMOTE_ADDR', '')) + + try: + client = authenticate_client(request, ratelimit=True, client=None) + except InvalidClient: + # increment rate limit by IP + if check_ratelimited(request): + raise InvalidRequest( + _('Rate limit exceeded for IP address "%s"') % request.META.get('REMOTE_ADDR', '')) + raise + + # check rate limit by client id + if check_ratelimited(request, key=lambda group, request: client.client_id): + raise InvalidClient( + _('Rate limit of %s exceeded for client "%s"') % ( + app_settings.PASSWORD_GRANT_RATELIMIT, client), + client=client) + if request.META.get('CONTENT_TYPE') != 'application/x-www-form-urlencoded': - return invalid_request_response( - 'wrong content type. request content type must be \'application/x-www-form-urlencoded\'') + raise InvalidRequest( + _('Wrong content type. request content type must be ' + '\'application/x-www-form-urlencoded\''), client=client) username = request.POST.get('username') scope = request.POST.get('scope') OrganizationalUnit = get_ou_model() @@ -452,27 +547,15 @@ def idtoken_from_user_credential(request): # scope is ignored, we used the configured scope if not all((username, request.POST.get('password'))): - return invalid_request_response( - 'request must bear both username and password as ' - 'parameters using the "application/x-www-form-urlencoded" ' - 'media type') - - if is_ratelimited( - request, group='ro-cred-grant', increment=True, - key=credential_grant_ratelimit_key, - rate=app_settings.PASSWORD_GRANT_RATELIMIT): - return invalid_request_response( - 'reached rate limitation, too many erroneous requests') - - client = authenticate_client(request, client=None) - - if not client: - return invalid_client_response('client authentication failed') + raise InvalidRequest( + _('Request must bear both username and password as ' + 'parameters using the "application/x-www-form-urlencoded" ' + 'media type'), client=client) if client.authorization_flow != models.OIDCClient.FLOW_RESOURCE_OWNER_CRED: - return unauthorized_client_response( - 'client is not configured for resource owner password ' - 'credential grant') + raise UnauthorizedClient( + _('Client is not configured for resource owner password ' + 'credential grant'), client=client) exponential_backoff = ExponentialRetryTimeout( key_prefix='idp-oidc-ro-cred-grant', @@ -484,22 +567,22 @@ def idtoken_from_user_credential(request): if seconds_to_wait > a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_MAX_DURATION: seconds_to_wait = a2_app_settings.A2_LOGIN_EXPONENTIAL_RETRY_TIMEOUT_MAX_DURATION if seconds_to_wait: - return invalid_request_response( - 'too many attempts with erroneous RO password, you must wait ' - '%s seconds to try again.' % int(math.ceil(seconds_to_wait))) + raise InvalidRequest( + _('Too many attempts with erroneous RO password, you must wait ' + '%s seconds to try again.') % int(math.ceil(seconds_to_wait)), client=client) ou = None if 'ou_slug' in request.POST: try: ou = OrganizationalUnit.objects.get(slug=request.POST.get('ou_slug')) except OrganizationalUnit.DoesNotExist: - return invalid_request_response( - 'ou_slug parameter does not match a valid organization unit') + raise InvalidRequest( + _('Parameter "ou_slug" does not match an existing organizational unit'), client=client) user = authenticate(request, username=username, password=request.POST.get('password'), ou=ou) if not user: exponential_backoff.failure(*backoff_keys) - return access_denied_response('invalid resource owner credentials') + raise AccessDenied(_('Invalid user credentials'), client=client) # limit requested scopes if scope is not None: @@ -508,7 +591,7 @@ def idtoken_from_user_credential(request): scopes = client.scope_set() exponential_backoff.success(*backoff_keys) - start = now() + iat = now() # iat = issued at # make access_token expires_in = access_token_duration(client) access_token = models.OIDCAccessToken.objects.create( @@ -516,7 +599,7 @@ def idtoken_from_user_credential(request): user=user, scopes=' '.join(scopes), session_key='', - expired=start + expires_in) + expired=iat + expires_in) # make id_token id_token = utils.create_user_info( request, @@ -524,13 +607,13 @@ def idtoken_from_user_credential(request): user, scopes, id_token=True) - exp = start + idtoken_duration(client) + exp = iat + idtoken_duration(client) id_token.update({ 'iss': utils.get_issuer(request), 'aud': client.client_id, 'exp': int(exp.timestamp()), - 'iat': int(start.timestamp()), - 'auth_time': int(start.timestamp()), + 'iat': int(iat.timestamp()), + 'auth_time': int(iat.timestamp()), 'acr': '0', }) return JsonResponse({ @@ -542,23 +625,21 @@ def idtoken_from_user_credential(request): def tokens_from_authz_code(request): + client = authenticate_client(request) + code = request.POST.get('code') - if code is None: - return invalid_request_response('missing code') + if not code: + raise MissingParameter('code', client=client) try: oidc_code = models.OIDCCode.objects.select_related().get(uuid=code) except models.OIDCCode.DoesNotExist: - return invalid_request_response('invalid code') + raise InvalidRequest(_('Parameter "code" is invalid'), client=client) if not oidc_code.is_valid(): - return invalid_request_response('code has expired or user is disconnected') - client = authenticate_client(request, client=oidc_code.client) - if client is None: - return HttpResponse('unauthenticated', status=401) - # delete immediately + raise InvalidRequest(_('Parameter "code" has expired or user is disconnected'), client=client) models.OIDCCode.objects.filter(uuid=code).delete() redirect_uri = request.POST.get('redirect_uri') if oidc_code.redirect_uri != redirect_uri: - return invalid_request_response('invalid redirect_uri') + raise InvalidRequest(_('Parameter "redirect_uri" does not match the code.'), client=client) expires_in = access_token_duration(client) access_token = models.OIDCAccessToken.objects.create( client=client, @@ -604,15 +685,22 @@ def token(request, *args, **kwargs): if request.method != 'POST': return HttpResponseNotAllowed(['POST']) grant_type = request.POST.get('grant_type') - if grant_type == 'password': - response = idtoken_from_user_credential(request) - elif grant_type == 'authorization_code': - response = tokens_from_authz_code(request) - else: - return invalid_request_response('grant_type must be either authorization_code or password') - response['Cache-Control'] = 'no-store' - response['Pragma'] = 'no-cache' - return response + try: + if grant_type == 'password': + response = idtoken_from_user_credential(request) + elif grant_type == 'authorization_code': + response = tokens_from_authz_code(request) + else: + raise InvalidRequest('grant_type must be either authorization_code or password') + response['Cache-Control'] = 'no-store' + response['Pragma'] = 'no-cache' + return response + except OIDCException as e: + response = e.json_response(request) + # special case of client authentication error with HTTP Basic + if 'HTTP_AUTHORIZATION' in request and e.error_code == 'invalid_client': + response['WWW-Authenticate'] = 'Basic' + return response def authenticate_access_token(request): diff --git a/tests/test_idp_oidc.py b/tests/test_idp_oidc.py index c158a536..939494f1 100644 --- a/tests/test_idp_oidc.py +++ b/tests/test_idp_oidc.py @@ -15,8 +15,9 @@ # along with this program. If not, see . import base64 -import json import datetime +import functools +import json import pytest @@ -25,9 +26,9 @@ from jwcrypto.jwk import JWKSet, JWK from . import utils -from django import VERSION as DJ_VERSION from django.core.exceptions import ValidationError from django.core.files import File +from django.http import QueryDict from django.test.utils import override_settings from django.urls import reverse from django.utils.encoding import force_text @@ -191,7 +192,7 @@ def bearer_authentication_headers(access_token): @pytest.mark.parametrize('do_not_ask_again', [(True,), (False,)]) @pytest.mark.parametrize('login_first', [(True,), (False,)]) -def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oidc_client, simple_user, app): +def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oidc_client, simple_user, app, caplog): redirect_uri = oidc_client.redirect_uris.split()[0] params = { 'client_id': oidc_client.client_id, @@ -221,6 +222,7 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi response = response.follow() assert response.request.path == reverse('oidc-authorize') if oidc_client.authorization_mode != OIDCClient.AUTHORIZATION_MODE_NONE: + response = response.maybe_follow() assert 'a2-oidc-authorization-form' in response.text assert OIDCAuthorization.objects.count() == 0 assert OIDCCode.objects.count() == 0 @@ -391,27 +393,52 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi assert iframes.attr('onload').endswith(', 300)') -def assert_oidc_error(response, error, error_description=None, fragment=False): - location = urlparse.urlparse(response['Location']) - query = location.fragment if fragment else location.query - query = urlparse.parse_qs(query) - assert query['error'] == [error] - if error_description: - assert len(query['error_description']) == 1 - assert error_description in query['error_description'][0] +def check_authorize_error(response, error, error_description, fragment, caplog, + check_next=True, redirect_uri=None, message=True): + # check next_url qs + if message: + location = urlparse.urlparse(response.location) + assert location.path == '/continue/' + if check_next: + location_qs = QueryDict(location.query or '') + assert 'next' in location_qs + assert location_qs['next'].startswith(redirect_uri) + next_url = urlparse.urlparse(location_qs['next']) + next_url_qs = QueryDict(next_url.fragment if fragment else next_url.query) + assert next_url_qs['error'] == error + assert next_url_qs['error_description'] == error_description + # check continue page + continue_response = response.follow() + assert error_description in continue_response.pyquery('.error').text() + elif check_next: + assert response.location.startswith(redirect_uri) + location = urlparse.urlparse(response.location) + location_qs = QueryDict(location.fragment if fragment else location.query) + assert location_qs['error'] == error + assert location_qs['error_description'] == error_description + # check logs + last_record = caplog.records[-1] + if message: + assert last_record.levelname == 'WARNING' + else: + assert last_record.levelname == 'INFO' + assert 'error "%s" in authorize endpoint' % error in last_record.message + assert error_description in last_record.message + if message: + return continue_response def assert_authorization_response(response, fragment=False, **kwargs): - location = urlparse.urlparse(response['Location']) - query = location.fragment if fragment else location.query - query = urlparse.parse_qs(query) + location = urlparse.urlparse(response.location) + location_qs = QueryDict(location.fragment if fragment else location.query) + assert set(location_qs) == set(kwargs) for key, value in kwargs.items(): if value is None: - assert key in query + assert key in location_qs elif isinstance(value, list): - assert query[key] == value + assert set(location_qs.getlist(key)) == set(value) else: - assert value in query[key][0] + assert value in location_qs[key] def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): @@ -425,119 +452,102 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): else: raise NotImplementedError - # missing client_id - authorize_url = make_url('oidc-authorize', params={}) + assert_authorize_error = functools.partial( + check_authorize_error, + caplog=caplog, + fragment=fragment, + redirect_uri=redirect_uri) - response = app.get(authorize_url, status=302) - assert urlparse.urlparse(response['Location']).path == '/' - response = response.maybe_follow() - assert 'Authorization request is invalid' in response + # missing client_id + response = app.get(make_url('oidc-authorize', params={})) + assert_authorize_error(response, 'invalid_request', 'Missing parameter "client_id"', check_next=False) # missing redirect_uri - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, - }) - - response = app.get(authorize_url, status=302) - assert urlparse.urlparse(response['Location']).path == '/' - response = response.maybe_follow() - assert 'Authorization request is invalid' in response + })) + assert_authorize_error(response, 'invalid_request', 'Missing parameter "redirect_uri"', check_next=False) # invalid client_id - authorize_url = make_url('oidc-authorize', params={ + authorize_url = app.get(make_url('oidc-authorize', params={ 'client_id': 'xxx', 'redirect_uri': redirect_uri, - }) - - response = app.get(authorize_url, status=302) - assert urlparse.urlparse(response['Location']).path == '/' - response = response.maybe_follow() - assert 'Authorization request is invalid' in response + })) + assert_authorize_error(response, 'invalid_request', 'Unknown client identifier: "xxx"', check_next=False) # invalid redirect_uri - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': 'xxx', 'response_type': 'code', 'scope': 'openid', - }) - - response = app.get(authorize_url, status=302) - assert urlparse.urlparse(response['Location']).path == '/' - response = response.maybe_follow() - assert 'Authorization request is invalid' in response - assert not 'invalid redirect_uri' in response + }), status=302) + continue_response = assert_authorize_error( + response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False) + assert 'Known' not in continue_response.pyquery('.error').text() + # invalid redirect_uri with DEBUG=True, list of redirect_uris is shown with override_settings(DEBUG=True): - response = app.get(authorize_url, status=302) - assert urlparse.urlparse(response['Location']).path == '/' - response = response.maybe_follow() - assert 'invalid redirect_uri' in response + response = app.get(make_url('oidc-authorize', params={ + 'client_id': oidc_client.client_id, + 'redirect_uri': 'xxx', + 'response_type': 'code', + 'scope': 'openid', + }), status=302) + continue_response = assert_authorize_error( + response, 'invalid_request', 'Redirect URI "xxx" is unknown.', check_next=False) + assert ( + 'Known redirect URIs are: https://example.com/callbac%C3%A9' + in continue_response.pyquery('.error').text() + ) # missing response_type - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, - }) - - response = app.get(authorize_url) - if DJ_VERSION < (2, 0): - errmsg1 = 'missing parameter \'response_type\'' - errmsg2 = 'missing parameter \'scope\'' - else: - errmsg1 = 'missing parameter response_type' - errmsg2 = 'missing parameter scope' - assert_oidc_error(response, 'invalid_request', errmsg1, - fragment=fragment) - logrecord = [rec for rec in caplog.records if rec.funcName == 'authorization_error'][0] - assert logrecord.levelname == 'WARNING' - assert logrecord.redirect_uri == 'https://example.com/callbac%C3%A9' - assert errmsg1 in logrecord.message + })) + assert_authorize_error(response, 'invalid_request', 'Missing parameter "response_type"') - # missing scope - authorize_url = make_url('oidc-authorize', params={ + # unsupported response_type + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, - 'response_type': 'code', - }) + 'response_type': 'xxx', + })) - response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_request', errmsg2, fragment=fragment) + if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE: + assert_authorize_error(response, 'unsupported_response_type', 'Response type must be "code"') + elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT: + assert_authorize_error( + response, 'unsupported_response_type', 'Response type must be "id_token token" or "id_token"') - # invalid max_age - authorize_url = make_url('oidc-authorize', params={ + # missing scope + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, - 'response_type': 'code', - 'scope': 'openid', - 'max_age': 'xxx', - }) - response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment) - authorize_url = make_url('oidc-authorize', params={ + 'response_type': response_type, + })) + assert_authorize_error(response, 'invalid_request', 'Missing parameter "scope"') + + # invalid max_age : not an integer + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, - 'response_type': 'code', + 'response_type': response_type, 'scope': 'openid', - 'max_age': '-1', - }) - response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_request', 'max_age is not', fragment=fragment) + 'max_age': 'xxx', + })) + assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer') - # unsupported response_type - authorize_url = make_url('oidc-authorize', params={ + # invalid max_age : not positive + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, - 'response_type': 'xxx', + 'response_type': response_type, 'scope': 'openid', - }) - - response = app.get(authorize_url) - if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE: - assert_oidc_error(response, 'unsupported_response_type', 'only code is supported') - elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT: - assert_oidc_error(response, 'unsupported_response_type', - 'only "id_token token" or "id_token" are supported', fragment=fragment) + 'max_age': '-1', + })) + assert_authorize_error(response, 'invalid_request', 'Parameter "max_age" must be a positive integer') # openid scope is missing authorize_url = make_url('oidc-authorize', params={ @@ -548,7 +558,7 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): }) response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_request', 'openid scope is missing', fragment=fragment) + assert_authorize_error(response, 'invalid_scope', 'Scope must contain "openid", received "profile"') # use of an unknown scope authorize_url = make_url('oidc-authorize', params={ @@ -559,73 +569,71 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): }) response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_scope', fragment=fragment) + assert_authorize_error( + response, + 'invalid_scope', + 'Scope may contain "email, openid, profile" scope(s), received "email, openid, profile, zob"') # restriction on scopes - oidc_settings.A2_IDP_OIDC_SCOPES = ['openid'] - authorize_url = make_url('oidc-authorize', params={ - 'client_id': oidc_client.client_id, - 'redirect_uri': redirect_uri, - 'response_type': response_type, - 'scope': 'openid email', - }) - - response = app.get(authorize_url) - assert_oidc_error(response, 'invalid_scope', fragment=fragment) - del oidc_settings.A2_IDP_OIDC_SCOPES + with override_settings(A2_IDP_OIDC_SCOPES=['openid']): + response = app.get(make_url('oidc-authorize', params={ + 'client_id': oidc_client.client_id, + 'redirect_uri': redirect_uri, + 'response_type': response_type, + 'scope': 'openid email', + })) + assert_authorize_error( + response, + 'invalid_scope', + 'Scope may contain "openid" scope(s), received "email, openid"') # cancel - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, 'response_type': response_type, 'scope': 'openid email profile', 'cancel': '1', - }) - - response = app.get(authorize_url) - assert_oidc_error(response, 'access_denied', error_description='user did not authenticate', - fragment=fragment) + })) + assert_authorize_error(response, 'access_denied', 'Authentication cancelled by user', message=False) # prompt=none - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, 'response_type': response_type, 'scope': 'openid email profile', 'prompt': 'none', - }) - - response = app.get(authorize_url) - assert_oidc_error(response, 'login_required', error_description='prompt is none', - fragment=fragment) + })) + assert_authorize_error(response, + 'login_required', + error_description='Login is required but prompt parameter is "none"', + message=False) utils.login(app, simple_user) # prompt=none max_age=0 - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, 'response_type': response_type, 'scope': 'openid email profile', 'max_age': '0', 'prompt': 'none', - }) - - response = app.get(authorize_url) - assert_oidc_error(response, 'login_required', error_description='prompt is none', - fragment=fragment) + })) + assert_authorize_error(response, 'login_required', + error_description='Login is required because of max_age, but prompt parameter is "none"', + message=False) # max_age=0 - authorize_url = make_url('oidc-authorize', params={ + response = app.get(make_url('oidc-authorize', params={ 'client_id': oidc_client.client_id, 'redirect_uri': redirect_uri, 'response_type': response_type, 'scope': 'openid email profile', 'max_age': '0', - }) - response = app.get(authorize_url) - assert urlparse.urlparse(response['Location']).path == reverse('auth_login') + })) + assert response.location.startswith(reverse('auth_login') + '?') # prompt=login authorize_url = make_url('oidc-authorize', params={ @@ -638,42 +646,50 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): response = app.get(authorize_url) assert urlparse.urlparse(response['Location']).path == reverse('auth_login') - # user refuse authorization - authorize_url = make_url('oidc-authorize', params={ - 'client_id': oidc_client.client_id, - 'redirect_uri': redirect_uri, - 'response_type': response_type, - 'scope': 'openid email profile', - 'prompt': 'none', - }) - response = app.get(authorize_url) - if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE: - assert_oidc_error(response, 'consent_required', error_description='prompt is none', - fragment=fragment) - - # user refuse authorization - authorize_url = make_url('oidc-authorize', params={ - 'client_id': oidc_client.client_id, - 'redirect_uri': redirect_uri, - 'response_type': response_type, - 'scope': 'openid email profile', - }) - response = app.get(authorize_url) if oidc_client.authorization_mode != oidc_client.AUTHORIZATION_MODE_NONE: + # prompt is none, but consent is required + response = app.get(make_url('oidc-authorize', params={ + 'client_id': oidc_client.client_id, + 'redirect_uri': redirect_uri, + 'response_type': response_type, + 'scope': 'openid email profile', + 'prompt': 'none', + })) + assert_authorize_error( + response, + 'consent_required', + 'Consent is required but prompt parameter is "none"', + message=False) + + # user do not consent + response = app.get(make_url('oidc-authorize', params={ + 'client_id': oidc_client.client_id, + 'redirect_uri': redirect_uri, + 'response_type': response_type, + 'scope': 'openid email profile', + })) response = response.form.submit('refuse') - assert_oidc_error(response, 'access_denied', error_description='user denied access', - fragment=fragment) + assert_authorize_error( + response, + 'access_denied', + 'User consent refused', + message=False) # authorization exists authorize = OIDCAuthorization.objects.create( client=oidc_client, user=simple_user, scopes='openid profile email', expired=now() + datetime.timedelta(days=2)) - response = app.get(authorize_url) + response = app.get(make_url('oidc-authorize', params={ + 'client_id': oidc_client.client_id, + 'redirect_uri': redirect_uri, + 'response_type': response_type, + 'scope': 'openid email profile', + })) if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE: - assert_authorization_response(response, code=None, fragment=fragment) + assert_authorization_response(response, code=None) elif oidc_client.authorization_flow == oidc_client.FLOW_IMPLICIT: assert_authorization_response(response, access_token=None, id_token=None, expires_in=None, - token_type=None, fragment=fragment) + token_type=None, fragment=True) # client ask for explicit authorization authorize_url = make_url('oidc-authorize', params={ @@ -729,7 +745,7 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): }, headers=client_authentication_headers(oidc_client), status=400) assert 'error' in response.json assert response.json['error'] == 'invalid_request' - assert response.json['error_description'] == 'code has expired or user is disconnected' + assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected' # invalid logout logout_url = make_url('oidc-logout', params={ @@ -755,7 +771,7 @@ def test_invalid_request(caplog, oidc_settings, oidc_client, simple_user, app): }, headers=client_authentication_headers(oidc_client), status=400) assert 'error' in response.json assert response.json['error'] == 'invalid_request' - assert response.json['error_description'] == 'code has expired or user is disconnected' + assert response.json['error_description'] == 'Parameter "code" has expired or user is disconnected' def test_expired_manager(db, simple_user): @@ -1029,15 +1045,22 @@ def test_oidclient_preferred_username_as_identifier_data_migration(migration): if client.name == 'test1': continue if client.name == 'test3': - OIDCClaim.objects.create(client=client, name='preferred_username', value='django_user_full_name', scopes='profile') + OIDCClaim.objects.create(client=client, name='preferred_username', + value='django_user_full_name', + scopes='profile') else: - OIDCClaim.objects.create(client=client, name='preferred_username', value='django_user_username', scopes='profile') + OIDCClaim.objects.create(client=client, name='preferred_username', + value='django_user_username', + scopes='profile') OIDCClaim.objects.create(client=client, name='given_name', value='django_user_first_name', scopes='profile') OIDCClaim.objects.create(client=client, name='family_name', value='django_user_last_name', scopes='profile') if client.name == 'test2': continue - OIDCClaim.objects.create(client=client, name='email', value='django_user_email', scopes='email') - OIDCClaim.objects.create(client=client, name='email_verified', value='django_user_email_verified', scopes='email') + OIDCClaim.objects.create(client=client, name='email', + value='django_user_email', scopes='email') + OIDCClaim.objects.create(client=client, name='email_verified', + value='django_user_email_verified', + scopes='email') new_apps = migration.apply(migrate_to) OIDCClient = new_apps.get_model('authentic2_idp_oidc', 'OIDCClient') @@ -1047,16 +1070,38 @@ def test_oidclient_preferred_username_as_identifier_data_migration(migration): claims = client.oidcclaim_set.all() if client.name == 'test': assert claims.count() == 5 - assert sorted(claims.values_list('name', flat=True)) == [u'email', u'email_verified', u'family_name', u'given_name', u'preferred_username'] - assert sorted(claims.values_list('value', flat=True)) == [u'django_user_email', u'django_user_email_verified', u'django_user_first_name', u'django_user_identifier', u'django_user_last_name'] + assert ( + sorted(claims.values_list('name', flat=True)) + == ['email', 'email_verified', 'family_name', 'given_name', 'preferred_username'] + ) + assert ( + sorted(claims.values_list('value', flat=True)) + == ['django_user_email', 'django_user_email_verified', + 'django_user_first_name', 'django_user_identifier', + 'django_user_last_name'] + ) elif client.name == 'test2': assert claims.count() == 3 - assert sorted(claims.values_list('name', flat=True)) == [u'family_name', u'given_name', u'preferred_username'] - assert sorted(claims.values_list('value', flat=True)) == [u'django_user_first_name', u'django_user_last_name', u'django_user_username'] + assert ( + sorted(claims.values_list('name', flat=True)) + == ['family_name', 'given_name', 'preferred_username'] + ) + assert ( + sorted(claims.values_list('value', flat=True)) + == ['django_user_first_name', 'django_user_last_name', 'django_user_username'] + ) elif client.name == 'test3': assert claims.count() == 5 - assert sorted(claims.values_list('name', flat=True)) == [u'email', u'email_verified', u'family_name', u'given_name', u'preferred_username'] - assert sorted(claims.values_list('value', flat=True)) == [u'django_user_email', u'django_user_email_verified', u'django_user_first_name', u'django_user_full_name', u'django_user_last_name'] + assert ( + sorted(claims.values_list('name', flat=True)) + == ['email', 'email_verified', 'family_name', 'given_name', 'preferred_username'] + ) + assert ( + sorted(claims.values_list('value', flat=True)) + == ['django_user_email', 'django_user_email_verified', + 'django_user_first_name', 'django_user_full_name', + 'django_user_last_name'] + ) else: assert claims.count() == 0 @@ -1131,7 +1176,7 @@ def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app access_token = response.json['access_token'] id_token = response.json['id_token'] - k=base64.b64encode(oidc_client.client_secret.encode('utf-8')) + k = base64.b64encode(oidc_client.client_secret.encode('utf-8')) key = JWK(kty='oct', k=force_text(k)) jwt = JWT(jwt=id_token, key=key) claims = json.loads(jwt.claims) @@ -1181,20 +1226,18 @@ def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app def test_claim_templated(oidc_settings, normal_oidc_client, simple_user, app): oidc_settings.A2_IDP_OIDC_SCOPES = ['openid', 'profile', 'email'] - OIDCClaim.objects.filter( - client=normal_oidc_client, name='given_name').delete() - OIDCClaim.objects.filter( - client=normal_oidc_client, name='family_name').delete() - claim1 = OIDCClaim.objects.create( - client=normal_oidc_client, - name='given_name', - value='{{ django_user_first_name|add:"ounet" }}', - scopes='profile') - claim2 = OIDCClaim.objects.create( - client=normal_oidc_client, - name='family_name', - value='{{ "Von der "|add:django_user_last_name }}', - scopes='profile') + OIDCClaim.objects.filter(client=normal_oidc_client, name='given_name').delete() + OIDCClaim.objects.filter(client=normal_oidc_client, name='family_name').delete() + OIDCClaim.objects.create( + client=normal_oidc_client, + name='given_name', + value='{{ django_user_first_name|add:"ounet" }}', + scopes='profile') + OIDCClaim.objects.create( + client=normal_oidc_client, + name='family_name', + value='{{ "Von der "|add:django_user_last_name }}', + scopes='profile') normal_oidc_client.authorization_flow = normal_oidc_client.FLOW_AUTHORIZATION_CODE normal_oidc_client.authorization_mode = normal_oidc_client.AUTHORIZATION_MODE_NONE normal_oidc_client.save() @@ -1337,7 +1380,7 @@ def test_credentials_grant(app, oidc_client, admin, simple_user): oidc_client.save() token_url = make_url('oidc-token') if oidc_client.idtoken_algo == OIDCClient.ALGO_HMAC: - k=base64url(oidc_client.client_secret.encode('utf-8')) + k = base64url(oidc_client.client_secret.encode('utf-8')) jwk = JWK(kty='oct', k=force_text(k)) elif oidc_client.idtoken_algo == OIDCClient.ALGO_RSA: jwk = get_first_rsa_sig_key() @@ -1396,10 +1439,10 @@ def test_credentials_grant_ratelimitation_invalid_client( for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])): response = app.post(token_url, params=params, status=400) assert response.json['error'] == 'invalid_client' - assert 'client authentication failed' in response.json['error_description'] + assert 'Wrong client\'s secret' in response.json['error_description'] response = app.post(token_url, params=params, status=400) assert response.json['error'] == 'invalid_request' - assert 'reached rate limitation' in response.json['error_description'] + assert response.json['error_description'] == 'Rate limit exceeded for IP address "127.0.0.1"' def test_credentials_grant_ratelimitation_valid_client( @@ -1419,8 +1462,8 @@ def test_credentials_grant_ratelimitation_valid_client( for i in range(int(oidc_settings.A2_IDP_OIDC_PASSWORD_GRANT_RATELIMIT.split('/')[0])): app.post(token_url, params=params) response = app.post(token_url, params=params, status=400) - assert response.json['error'] == 'invalid_request' - assert 'reached rate limitation' in response.json['error_description'] + assert response.json['error'] == 'invalid_client' + assert response.json['error_description'] == 'Rate limit of 100/m exceeded for client "oidcclient"' def test_credentials_grant_retrytimout( @@ -1436,7 +1479,7 @@ def test_credentials_grant_retrytimout( 'client_secret': oidc_client.client_secret, 'grant_type': 'password', 'username': simple_user.username, - 'password': u'SurelyNotTheRightPassword', + 'password': 'SurelyNotTheRightPassword', } attempts = 0 while attempts < 100: @@ -1444,7 +1487,7 @@ def test_credentials_grant_retrytimout( attempts += 1 if attempts >= 10: assert response.json['error'] == 'invalid_request' - assert 'too many attempts with erroneous RO password' in response.json['error_description'] + assert 'Too many attempts with erroneous RO password' in response.json['error_description'] # freeze some time after backoff delay expiration freezer.move_to(datetime.timedelta(days=2)) @@ -1462,7 +1505,7 @@ def test_credentials_grant_invalid_flow( 'client_secret': oidc_client.client_secret, 'grant_type': 'password', 'username': simple_user.username, - 'password': u'SurelyNotTheRightPassword', + 'password': 'SurelyNotTheRightPassword', } token_url = make_url('oidc-token') response = app.post(token_url, params=params, status=400) @@ -1484,7 +1527,7 @@ def test_credentials_grant_invalid_client( token_url = make_url('oidc-token') response = app.post(token_url, params=params, status=400) assert response.json['error'] == 'invalid_client' - assert response.json['error_description'] == 'client authentication failed' + assert response.json['error_description'] == 'Wrong client\'s secret' def test_credentials_grant_unauthz_client( @@ -1499,7 +1542,7 @@ def test_credentials_grant_unauthz_client( token_url = make_url('oidc-token') response = app.post(token_url, params=params, status=400) assert response.json['error'] == 'unauthorized_client' - assert 'client is not configured for resource owner'in response.json['error_description'] + assert 'Client is not configured for resource owner' in response.json['error_description'] def test_credentials_grant_invalid_content_type( @@ -1519,7 +1562,7 @@ def test_credentials_grant_invalid_content_type( content_type='multipart/form-data', status=400) assert response.json['error'] == 'invalid_request' - assert 'wrong content type' in response.json['error_description'] + assert 'Wrong content type' in response.json['error_description'] def test_credentials_grant_ou_selection_simple( @@ -1535,11 +1578,13 @@ def test_credentials_grant_ou_selection_simple( 'password': user_ou1.username, } token_url = make_url('oidc-token') - response = app.post(token_url, params=params) + response = app.post(token_url, params=params, status=200) params['username'] = user_ou2.username params['password'] = user_ou2.password response = app.post(token_url, params=params, status=400) + assert response.json['error'] == 'access_denied' + assert response.json['error_description'] == 'Invalid user credentials' def test_credentials_grant_ou_selection_username_not_unique( @@ -1560,13 +1605,11 @@ def test_credentials_grant_ou_selection_username_not_unique( } token_url = make_url('oidc-token') response = app.post(token_url, params=params) - assert OIDCAccessToken.objects.get( - uuid=response.json['access_token']).user == user_ou1 + assert OIDCAccessToken.objects.get(uuid=response.json['access_token']).user == user_ou1 params['ou_slug'] = ou2.slug response = app.post(token_url, params=params) - assert OIDCAccessToken.objects.get( - uuid=response.json['access_token']).user == admin_ou2 + assert OIDCAccessToken.objects.get(uuid=response.json['access_token']).user == admin_ou2 def test_credentials_grant_ou_selection_username_not_unique_wrong_ou( @@ -1589,10 +1632,14 @@ def test_credentials_grant_ou_selection_username_not_unique_wrong_ou( params['username'] = admin_ou2.username params['password'] = admin_ou2.password response = app.post(token_url, params=params, status=400) + assert response.json['error'] == 'access_denied' + assert response.json['error_description'] == 'Invalid user credentials' def test_credentials_grant_ou_selection_invalid_ou( app, oidc_client, admin, user_ou1, settings): + oidc_client.authorization_flow = OIDCClient.FLOW_RESOURCE_OWNER_CRED + oidc_client.save() params = { 'client_id': oidc_client.client_id, 'client_secret': oidc_client.client_secret, @@ -1603,6 +1650,8 @@ def test_credentials_grant_ou_selection_invalid_ou( } token_url = make_url('oidc-token') response = app.post(token_url, params=params, status=400) + assert response.json['error'] == 'invalid_request' + assert response.json['error_description'] == 'Parameter "ou_slug" does not match an existing organizational unit' def test_oidc_client_clean(): -- 2.29.2