From 6b9f4fc05c0c804101fb33e075b210e29c646b49 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 29 Nov 2022 16:32:45 +0100 Subject: [PATCH 3/3] misc: proxy passive SSO from SAML2 services to OIDC idps (#27135) Behaviour of the SAML2 when receiving a Passive AuthnRequest and not user is logged is modified. Before an immediate response with StatusCode no-passive was returned. Now if one authenticator with the method passive_login is found, the request is transferred to this authentication source. --- src/authentic2/idp/saml/saml2_endpoints.py | 29 +++++++-- src/authentic2/views.py | 31 +++++++++ src/authentic2_auth_oidc/models.py | 5 ++ src/authentic2_auth_oidc/views.py | 7 +- tests/test_auth_oidc.py | 76 ++++++++++++++++++++++ tests/test_idp_saml2.py | 59 +++++++++++++++++ tests/test_views.py | 23 +++++++ 7 files changed, 224 insertions(+), 6 deletions(-) diff --git a/src/authentic2/idp/saml/saml2_endpoints.py b/src/authentic2/idp/saml/saml2_endpoints.py index 56a31b33..3fa27552 100644 --- a/src/authentic2/idp/saml/saml2_endpoints.py +++ b/src/authentic2/idp/saml/saml2_endpoints.py @@ -114,6 +114,7 @@ from authentic2.utils.misc import get_backends as get_idp_backends from authentic2.utils.misc import login_require, make_url from authentic2.utils.service import set_service from authentic2.utils.view_decorators import check_view_restriction, enable_view_restriction +from authentic2.views import passive_login from . import app_settings @@ -629,17 +630,28 @@ def sso(request): return sso_after_process_request(request, login, nid_format=nid_format) +def make_continue_url(login, nid_format): + nonce = login.request.id or get_nonce() + save_key_values(nonce, force_str(login.dump()), False, nid_format) + return make_url(continue_sso, params={NONCE_FIELD_NAME: nonce}) + + +def saml_passive_login(request, login, nid_format): + return passive_login( + request, + next_url=make_continue_url(login, nid_format), + login_hint=get_login_hints_extension(login), + ) + + def need_login(request, login, nid_format): """Redirect to the login page with a nonce parameter to verify later that the login form was submitted """ nonce = login.request.id or get_nonce() - save_key_values(nonce, force_str(login.dump()), False, nid_format) - next_url = make_url(continue_sso, params={NONCE_FIELD_NAME: nonce}) - logger.debug('redirect to login page with next url %s', next_url) return login_require( request, - next_url=next_url, + next_url=make_continue_url(login, nid_format), params={NONCE_FIELD_NAME: nonce}, login_hint=get_login_hints_extension(login), ) @@ -722,6 +734,7 @@ def continue_sso(request): consent_obtained=consent_obtained, consent_attribute_answer=consent_attribute_answer, nid_format=nid_format, + passive_login=False, ) @@ -769,6 +782,7 @@ def sso_after_process_request( user=None, nid_format='transient', return_profile=False, + passive_login=True, ): """Common path for sso and idp_initiated_sso. @@ -802,7 +816,12 @@ def sso_after_process_request( return need_login(request, login, nid_format) # No user is authenticated and passive is True, deny request - if passive and user.is_anonymous: + if passive and not user.is_authenticated: + # passive_login is false if caller is continue_sso (after a failed passive login) + if passive_login: + passive_login_response = saml_passive_login(request, login, nid_format) + if passive_login_response is not None: + return passive_login_response logger.debug('no user connected and passive request, returning NoPassive') set_saml2_response_responder_status_code(login.response, lasso.SAML2_STATUS_CODE_NO_PASSIVE) return finish_sso(request, login) diff --git a/src/authentic2/views.py b/src/authentic2/views.py index 2e044592..8a36120b 100644 --- a/src/authentic2/views.py +++ b/src/authentic2/views.py @@ -331,6 +331,37 @@ class EmailChangeVerifyView(TemplateView): email_change_verify = EmailChangeVerifyView.as_view() +def passive_login(request, *, next_url, login_hint=None): + '''View to use in IdP backends to implement passive login toward IdPs''' + service = get_service(request) + authenticators = utils_misc.get_authenticators() + + login_hint = login_hint or {} + show_ctx = make_condition_context(request=request, login_hint=login_hint) + if service: + show_ctx['service_ou_slug'] = service.ou and service.ou.slug + show_ctx['service_slug'] = service.slug + show_ctx['service'] = service + else: + show_ctx['service_ou_slug'] = '' + show_ctx['service_slug'] = '' + show_ctx['service'] = None + visible_authenticators = [ + authenticator + for authenticator in authenticators + if (authenticator.shown(ctx=show_ctx) and getattr(authenticator, 'passive_login', None)) + ] + + if len(visible_authenticators) != 1: + return None + unique_authenticator = visible_authenticators[0] + return unique_authenticator.passive_login( + request, + block_id=unique_authenticator.get_identifier(), + next_url=next_url, + ) + + @csrf_exempt @ensure_csrf_cookie @never_cache diff --git a/src/authentic2_auth_oidc/models.py b/src/authentic2_auth_oidc/models.py index 8bac71a2..949520ae 100644 --- a/src/authentic2_auth_oidc/models.py +++ b/src/authentic2_auth_oidc/models.py @@ -237,6 +237,11 @@ class OIDCProvider(BaseAuthenticator): return views.oidc_login(request, pk=self.pk, next_url=next_url) + def passive_login(self, request, block_id, next_url): + from . import views + + return views.oidc_login(request, pk=self.pk, next_url=next_url, passive=True) + def login(self, request, *args, **kwargs): context = kwargs.get('context', {}).copy() context['provider'] = self diff --git a/src/authentic2_auth_oidc/views.py b/src/authentic2_auth_oidc/views.py index 70e27d93..602eb512 100644 --- a/src/authentic2_auth_oidc/views.py +++ b/src/authentic2_auth_oidc/views.py @@ -42,7 +42,7 @@ def make_nonce(state): return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest() -def oidc_login(request, pk, next_url=None, *args, **kwargs): +def oidc_login(request, pk, next_url=None, passive=None, *args, **kwargs): provider = get_provider(pk) scopes = set(provider.scopes.split()) | {'openid'} state_id = str(uuid.uuid4()) @@ -58,6 +58,11 @@ def oidc_login(request, pk, next_url=None, *args, **kwargs): } if next_url: state_content['next'] = next_url + if passive is True or passive is False: + if passive: + prompt.add('none') + else: + prompt.add('login') params = { 'client_id': provider.client_id, 'scope': ' '.join(scopes), diff --git a/tests/test_auth_oidc.py b/tests/test_auth_oidc.py index fd9efecb..78c5c5a4 100644 --- a/tests/test_auth_oidc.py +++ b/tests/test_auth_oidc.py @@ -21,6 +21,7 @@ import random import re import time import urllib.parse +from unittest import mock import pytest from django.contrib.auth import get_user_model @@ -45,6 +46,7 @@ from authentic2.utils.misc import last_authentication_event from authentic2_auth_oidc.backends import OIDCBackend from authentic2_auth_oidc.models import OIDCAccount, OIDCClaimMapping, OIDCProvider from authentic2_auth_oidc.utils import IDToken, IDTokenError, parse_id_token, register_issuer +from authentic2_auth_oidc.views import oidc_login from . import utils @@ -1394,3 +1396,77 @@ def test_double_link(app, caplog, code, simple_user, oidc_provider_jwkset): warnings = response.pyquery('.warning') assert len(warnings) == 1 assert 'Your email is already linked' in warnings.text() + + +@mock.patch('authentic2_auth_oidc.views.get_provider') +def test_oidc_login(get_provider, rf): + AUTHORIZE_URL = 'https://op.example.com/authorize' + SCOPES = {'profile'} + + provider = OIDCProvider( + pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES) + ) + get_provider.return_value = provider + + url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/').url + assert url + prefix, query = url.split('?', 1) + assert prefix == AUTHORIZE_URL + qs = dict(urllib.parse.parse_qsl(query)) + assert qs['client_id'] == '1234' + assert qs['nonce'] + assert qs['state'] + assert qs['redirect_uri'] == 'https://testserver/accounts/oidc/callback/' + assert qs['ui_locales'] == 'en' + assert set(qs['scope'].split()) == {'profile', 'openid'} + assert 'prompt' not in qs + + # passive + url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/', passive=True).url + prefix, query = url.split('?', 1) + qs = dict(urllib.parse.parse_qsl(query)) + assert qs['prompt'] == 'none' + + # not passive + url = oidc_login(rf.get('/', secure=True), 1, next_url='/idp/x/', passive=False).url + prefix, query = url.split('?', 1) + qs = dict(urllib.parse.parse_qsl(query)) + assert qs['prompt'] == 'login' + + +@mock.patch('authentic2_auth_oidc.views.get_provider') +def test_autorun(get_provider, rf): + AUTHORIZE_URL = 'https://op.example.com/authorize' + SCOPES = {'profile'} + + provider = OIDCProvider( + pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES) + ) + get_provider.return_value = provider + req = rf.get('/?next=/idp/x/') + req.user = mock.Mock() + req.user.is_authenticated = False + + url = provider.autorun(req, block_id=1, next_url='/').url + _, query = url.split('?', 1) + qs = dict(urllib.parse.parse_qsl(query)) + assert 'prompt' not in qs + + +@mock.patch('authentic2_auth_oidc.views.get_provider') +def test_passive_login(get_provider, rf): + AUTHORIZE_URL = 'https://op.example.com/authorize' + SCOPES = {'profile'} + + provider = OIDCProvider( + pk=1, client_id='1234', authorization_endpoint=AUTHORIZE_URL, scopes=' '.join(SCOPES) + ) + get_provider.return_value = provider + req = rf.get('/?next=/idp/x/') + req.user = mock.Mock() + req.user.is_authenticated = False + + url = provider.passive_login(req, block_id=1, next_url='/').url + _, query = url.split('?', 1) + qs = dict(urllib.parse.parse_qsl(query)) + assert qs['prompt'] == 'none' diff --git a/tests/test_idp_saml2.py b/tests/test_idp_saml2.py index 2322f156..342178c0 100644 --- a/tests/test_idp_saml2.py +++ b/tests/test_idp_saml2.py @@ -27,6 +27,7 @@ import lasso import pytest from django.contrib.auth import REDIRECT_FIELD_NAME from django.core.files import File +from django.http import HttpResponseRedirect from django.template import Context, Template from django.urls import reverse from django.utils.encoding import force_bytes, force_str @@ -1055,3 +1056,61 @@ def test_sso_view_restriction(app, idp, user, cgu_attribute): scenario.launch_authn_request() scenario.login(user=user) assert scenario.idp_response.location.startswith('/accounts/edit/required/?') + + +def test_sso_is_passive(app, idp, user, cgu_attribute, caplog): + scenario = Scenario( + app, + make_authn_request_kwargs={'is_passive': True}, + authn_request_success=False, + ) + scenario.launch_authn_request() + + with pytest.raises(lasso.ProfileStatusNotSuccessError): + scenario.handle_post_response() + + assert ( + scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Responder' + ) + assert ( + scenario.sp.login.response.status.statusCode.statusCode.value + == 'urn:oasis:names:tc:SAML:2.0:status:NoPassive' + ) + + +def test_sso_with_authenticator_passive_sso(app, idp, user, cgu_attribute, caplog): + scenario = Scenario( + app, + make_authn_request_kwargs={'is_passive': True}, + authn_request_success=False, + ) + + authenticator = mock.Mock() + authenticator.show.return_value = True + + mock_passive_login = mock.Mock() + + def passive_login(request, block_id, next_url, passive=None): + mock_passive_login(request=request, block_id=block_id, next_url=next_url, passive=passive) + return HttpResponseRedirect('https://idp.example.com/?passive') + + authenticator.passive_login = passive_login + + with mock.patch('authentic2.utils.misc.get_authenticators', return_value=[authenticator]): + scenario.launch_authn_request() + + assert scenario.idp_response.location == 'https://idp.example.com/?passive' + assert mock_passive_login.call_args[1]['next_url'].startswith('/idp/saml2/continue?nonce=') + + # check NoPassive status code response after if conitnue is called and still no user is logged in + response = app.get(mock_passive_login.call_args[1]['next_url']) + scenario.idp_response = response + with pytest.raises(lasso.ProfileStatusNotSuccessError): + scenario.handle_post_response() + assert ( + scenario.sp.login.response.status.statusCode.value == 'urn:oasis:names:tc:SAML:2.0:status:Responder' + ) + assert ( + scenario.sp.login.response.status.statusCode.statusCode.value + == 'urn:oasis:names:tc:SAML:2.0:status:NoPassive' + ) diff --git a/tests/test_views.py b/tests/test_views.py index f5c8a70f..731fd5c0 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -26,6 +26,7 @@ from django.utils.html import escape from authentic2.custom_user.models import DeletedUser, User from authentic2.forms.passwords import PasswordChangeForm, SetPasswordForm from authentic2.models import Attribute +from authentic2.views import passive_login from .utils import assert_event, get_link_from_mail, login, logout @@ -419,3 +420,25 @@ def test_redirected_views(app): assert ( app.get('/accounts/password/reset/confirm/abcd1234/').location == '/password/reset/confirm/abcd1234/' ) + + +def test_passive_login(rf): + from django.contrib.sessions.middleware import SessionMiddleware + + req = rf.get('/') + SessionMiddleware(lambda x: None).process_request(req) + assert passive_login(req, next_url='/', login_hint={'pop'}) is None + + authenticator1 = mock.Mock() + authenticator1.show.return_value = True + authenticator1.passive_login.return_value = 'response' + authenticator2 = mock.Mock() + authenticator2.show.return_value = True + authenticator2.passive_login.return_value = 'response' + + with mock.patch( + 'authentic2.utils.misc.get_authenticators', return_value=[authenticator1, authenticator2] + ): + assert passive_login(req, next_url='/', login_hint={'pop'}) is None + authenticator2.passive_login = None + assert passive_login(req, next_url='/', login_hint={'pop'}) == 'response' -- 2.37.2