From e3f131b977d57404939c98b4ae42dc44fe6789f2 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 6 Nov 2015 13:30:53 +0100 Subject: [PATCH] add rest_framework authentication module for authentic2 --- hobo/rest_authentication.py | 63 +++++++++++++++++++++++++++++ tests_authentic/conftest.py | 3 ++ tests_authentic/test_rest_authentication.py | 41 +++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 hobo/rest_authentication.py create mode 100644 tests_authentic/test_rest_authentication.py diff --git a/hobo/rest_authentication.py b/hobo/rest_authentication.py new file mode 100644 index 0000000..90bd100 --- /dev/null +++ b/hobo/rest_authentication.py @@ -0,0 +1,63 @@ +import logging + +from rest_framework import authentication, exceptions + +from hobo import signature + +from authentic2.saml import models as saml_models +from django.contrib.auth import get_user_model +from django.conf import settings +from django.contrib.auth.models import AnonymousUser + +class AnonymousServiceUser(AnonymousUser): + '''This virtual user hold permissions for other publik services''' + def is_authenticated(self): + return True + + def has_perm(self, perm_or_perms, obj=None): + return 'a2_rbac.change_role' == perm_or_perms + + def __unicode__(self): + return 'Publik Authentikation Service User' + + +class PublikAuthentication(authentication.BaseAuthentication): + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(__name__) + super(PublikAuthentication, self).__init__(*args, **kwargs) + + def resolve_user(self, request): + if 'NameID' in request.GET: + name_id = request.GET['NameID'] + User = get_user_model() + try: + return User.objects.get(uuid=name_id) + except User.DoesNotExist: + self.logger.warning('no user found %r', uuid) + raise exceptions.AuthenticationFailed('No user matches uuid=%r' % uuid) + else: + self.logger.info('anonymous signature validated') + return AnonymousServiceUser() + + def get_orig_key(self, orig): + if not hasattr(settings, 'KNOWN_SERVICES'): + self.logger.warning('no known services') + raise exceptions.AuthenticationFailed('No KNOWN_SERVICES setting') + for service_id in settings.KNOWN_SERVICES: + for slug, service in settings.KNOWN_SERVICES[service_id].iteritems(): + if service.get('verif_orig') == orig and service.get('secret'): + return service['secret'] + self.logger.warning('no secret found for origin %r', orig) + raise exceptions.AuthenticationFailed('no secret found for origin %r' % orig) + + def authenticate(self, request): + full_path = request.get_full_path() + if not request.GET.get('orig') or not request.GET.get('signature'): + return None + key = self.get_orig_key(request.GET['orig']) + if not signature.check_url(full_path, key): + self.logger.warning('invalid signature') + raise exceptions.AuthenticationFailed('Invalid signature') + user = self.resolve_user(request) + self.logger.info('user authenticated with signature %s', user) + return (user, None) diff --git a/tests_authentic/conftest.py b/tests_authentic/conftest.py index fae0189..4b6c8a2 100644 --- a/tests_authentic/conftest.py +++ b/tests_authentic/conftest.py @@ -31,6 +31,7 @@ def tenant(db, request, settings, tenant_base): }, 'services': [ {'slug': 'test', + 'service-id': 'authentic', 'title': 'Test', 'this': True, 'secret_key': '12345', @@ -41,6 +42,8 @@ def tenant(db, request, settings, tenant_base): }, {'slug': 'other', 'title': 'Other', + 'service-id': 'welco', + 'secret_key': 'abcdef', 'base_url': 'http://other.example.net'}, ]}, fd) t = Tenant(domain_url=name, diff --git a/tests_authentic/test_rest_authentication.py b/tests_authentic/test_rest_authentication.py new file mode 100644 index 0000000..f4137b3 --- /dev/null +++ b/tests_authentic/test_rest_authentication.py @@ -0,0 +1,41 @@ +import pytest +import urllib + +from rest_framework.exceptions import AuthenticationFailed +from django.contrib.auth import get_user_model +from django.test import RequestFactory +from tenant_schemas.utils import tenant_context + +from hobo import signature, rest_authentication + +pytestmark = pytest.mark.django_db + +def test_publik_authentication(tenant, settings): + with tenant_context(tenant): + key = settings.KNOWN_SERVICES['welco']['other']['secret'] + + settings.HOBO_ROLE_EXPORT = False + User = get_user_model() + user = User.objects.create(username='foo', password='foo') + ORIG = 'other.example.net' + AUTH_QUERY = '&NameID=%s&&orig=%s' % (user.uuid, urllib.quote(ORIG)) + + URL = '/api/?coucou=zob' + factory = RequestFactory() + request = factory.get(signature.sign_url(URL + AUTH_QUERY, key)) + + publik_authentication = rest_authentication.PublikAuthentication() + result = publik_authentication.authenticate(request) + assert result is not None + assert isinstance(result, tuple) + assert len(result) == 2 + assert result[0] == user + assert result[1] == None + + # Failure + request = factory.get(signature.sign_url(URL + AUTH_QUERY, key+'zob')) + + publik_authentication = rest_authentication.PublikAuthentication() + with pytest.raises(AuthenticationFailed): + publik_authentication.authenticate(request) + -- 2.1.4