From abf13814882c040e65270779b3da3cf56314de58 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Mon, 9 Mar 2020 12:02:55 +0100 Subject: [PATCH] rest_authentication: raise APIError for signature errors (#39911) --- hobo/rest_authentication.py | 27 ++++--- tests_authentic/test_rest_authentication.py | 87 ++++++++++++++++++++- 2 files changed, 101 insertions(+), 13 deletions(-) diff --git a/hobo/rest_authentication.py b/hobo/rest_authentication.py index 485c1c0..3d2f7bc 100644 --- a/hobo/rest_authentication.py +++ b/hobo/rest_authentication.py @@ -1,6 +1,6 @@ import logging -from rest_framework import authentication, exceptions +from rest_framework import authentication, exceptions, status from hobo import signature @@ -61,6 +61,14 @@ class AnonymousAdminServiceUser(AnonymousUser): return 'Publik Service Admin' +class PublikAuthenticationFailed(exceptions.APIException): + status_code = status.HTTP_401_UNAUTHORIZED + default_code = 'invalid-signature' + + def __init__(self, code): + self.detail = {'err': code} + + class PublikAuthentication(authentication.BaseAuthentication): def __init__(self, *args, **kwargs): self.logger = logging.getLogger(__name__) @@ -80,16 +88,15 @@ class PublikAuthentication(authentication.BaseAuthentication): try: return User.objects.get(uuid=name_id) except User.DoesNotExist: - raise exceptions.AuthenticationFailed('No user matches uuid=%r' % name_id) + raise PublikAuthenticationFailed('user-not-found') + elif UserSAMLIdentifier: try: return UserSAMLIdentifier.objects.get(name_id=name_id).user except UserSAMLIdentifier.DoesNotExist: - raise exceptions.AuthenticationFailed( - 'No user matches nameid=%r' % name_id) + raise PublikAuthenticationFailed('user-not-found') else: - raise exceptions.AuthenticationFailed( - 'No usable model to match nameid=%r' % name_id) + raise PublikAuthenticationFailed('no-usable-model') else: orig = request.GET['orig'] try: @@ -100,18 +107,18 @@ class PublikAuthentication(authentication.BaseAuthentication): klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS) self.logger.info('anonymous signature validated') return klass() - raise exceptions.AuthenticationFailed('Anonymous service user is unsupported') + raise PublikAuthenticationFailed('no-user-for-orig') def get_orig_key(self, orig): if not hasattr(settings, 'KNOWN_SERVICES'): self.logger.warning('no known services') - raise exceptions.AuthenticationFailed('No KNOWN_SERVICES setting') + raise PublikAuthenticationFailed('no-known-services-setting') for service_id in settings.KNOWN_SERVICES: for slug, service in settings.KNOWN_SERVICES[service_id].items(): if service.get('verif_orig') == orig and service.get('secret'): return service['secret'] self.logger.warning('no secret found for origin %r', orig) - raise exceptions.AuthenticationFailed('no secret found for origin %r' % orig) + raise PublikAuthenticationFailed('no-secret-found-for-orig') def authenticate(self, request): full_path = request.get_full_path() @@ -120,7 +127,7 @@ class PublikAuthentication(authentication.BaseAuthentication): key = self.get_orig_key(request.GET['orig']) if not signature.check_url(full_path, key): self.logger.warning('invalid signature') - raise exceptions.AuthenticationFailed('Invalid signature') + raise PublikAuthenticationFailed('invalid-signature') user = self.resolve_user(request) self.logger.info('user authenticated with signature %s', user) return (user, None) diff --git a/tests_authentic/test_rest_authentication.py b/tests_authentic/test_rest_authentication.py index 349247d..f853c7a 100644 --- a/tests_authentic/test_rest_authentication.py +++ b/tests_authentic/test_rest_authentication.py @@ -1,12 +1,30 @@ +# hobo - portal to configure and deploy applications +# Copyright (C) 2015-2020 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + import pytest from django.utils.six.moves.urllib import parse as urllib -from rest_framework.exceptions import AuthenticationFailed - from django.contrib.auth import get_user_model from django.test import RequestFactory from tenant_schemas.utils import tenant_context +from rest_framework.views import APIView +from rest_framework.response import Response +from rest_framework import permissions + from hobo import signature, rest_authentication pytestmark = pytest.mark.django_db @@ -71,6 +89,69 @@ def test_publik_authentication(tenant, settings): request = factory.get(signature.sign_url(URL + AUTH_QUERY, key + 'zob')) publik_authentication = rest_authentication.PublikAuthentication() - with pytest.raises(AuthenticationFailed): + with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info: publik_authentication.authenticate(request) + assert exc_info.value.detail['err'] == 'invalid-signature' + + +def test_response(rf, settings, tenant): + + with tenant_context(tenant): + request = rf.get('/') + + del settings.KNOWN_SERVICES + + class View(APIView): + authentication_classes = (rest_authentication.PublikAuthentication,) + permission_classes = (permissions.IsAuthenticated,) + + def get(self, request, format=None): + return Response({'err': 0}) + + view = View.as_view() + + response = view(request) + assert response.status_code == 403 + + request = rf.get('/?signature=aaa&orig=zzz') + + response = view(request) + assert response.status_code == 401 + assert response.data['err'] == 'no-known-services-setting' + + secret_key = 'bbb' + settings.KNOWN_SERVICES = { + 'whatever': { + 'whatever': { + 'verif_orig': 'zzz', + } + } + } + + response = view(request) + assert response.status_code == 401 + assert response.data['err'] == 'no-secret-found-for-orig' + + settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key + response = view(request) + assert response.status_code == 401 + assert response.data['err'] == 'invalid-signature' + + # User authentication + request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key)) + + response = view(request) + assert response.status_code == 401 + assert response.data == {'err': 'user-not-found'} + + # Service authentication + request = rf.get(signature.sign_url('/?orig=zzz', secret_key)) + + response = view(request) + assert response.status_code == 200 + assert response.data == {'err': 0} + del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS + response = view(request) + assert response.status_code == 401 + assert response.data == {'err': 'no-user-for-orig'} -- 2.24.0