From cd5d8b4c660706dfbb3cf334b19721a40269d392 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 13 Aug 2020 16:48:24 +0200 Subject: [PATCH] misc: validate and use a real identifier for services (#45672) --- src/authentic2/authenticators.py | 28 +++++++-------- src/authentic2/utils/__init__.py | 17 +++++---- src/authentic2/utils/service.py | 62 ++++++++++++++++++++++++++++++++ src/authentic2/views.py | 26 +++++++------- src/authentic2_auth_fc/views.py | 18 +++++----- 5 files changed, 109 insertions(+), 42 deletions(-) create mode 100644 src/authentic2/utils/service.py diff --git a/src/authentic2/authenticators.py b/src/authentic2/authenticators.py index 9ad2a52f..931963bb 100644 --- a/src/authentic2/authenticators.py +++ b/src/authentic2/authenticators.py @@ -22,8 +22,9 @@ from django.utils.translation import ugettext as _, ugettext_lazy from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role from authentic2.custom_user.models import User -from . import views, app_settings, utils, constants +from . import views, app_settings, utils from .utils.views import csrf_token_check +from .utils.service import get_service_from_request from .forms import authentication as authentication_forms from .utils.evaluate import evaluate_condition @@ -64,11 +65,8 @@ class LoginPasswordAuthenticator(BaseAuthenticator): def name(self): return ugettext_lazy('Password') - def get_service_ous(self, request): - service_slug = request.GET.get(constants.SERVICE_FIELD_NAME) - if not service_slug: - return [] - roles = Role.objects.filter(allowed_services__slug=service_slug).children() + def get_service_ous(self, service): + roles = Role.objects.filter(allowed_services=service).children() if not roles: return [] service_ou_ids = [] @@ -81,20 +79,21 @@ class LoginPasswordAuthenticator(BaseAuthenticator): return [] return OU.objects.filter(pk__in=service_ou_ids) - def get_preferred_ous(self, request): + def get_preferred_ous(self, request, service): preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous') preferred_ous = [] if preferred_ous_cookie: preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie)) # for the special case of services open to only one OU, pre-select it - for ou in self.get_service_ous(request): - if ou in preferred_ous: - continue - preferred_ous.append(ou) + if service: + for ou in self.get_service_ous(service): + if ou in preferred_ous: + continue + preferred_ous.append(ou) return preferred_ous def login(self, request, *args, **kwargs): - service_slug = request.GET.get(constants.SERVICE_FIELD_NAME) + service = get_service_from_request(request) context = kwargs.get('context', {}) is_post = request.method == 'POST' and self.submit_name in request.POST data = request.POST if is_post else None @@ -103,7 +102,7 @@ class LoginPasswordAuthenticator(BaseAuthenticator): # Special handling when the form contains an OU selector if app_settings.A2_LOGIN_FORM_OU_SELECTOR: - preferred_ous = self.get_preferred_ous(request) + preferred_ous = self.get_preferred_ous(request, service) if preferred_ous: initial['ou'] = preferred_ous[0] @@ -128,8 +127,7 @@ class LoginPasswordAuthenticator(BaseAuthenticator): if form.cleaned_data.get('remember_me'): request.session['remember_me'] = True request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME) - response = utils.login(request, form.get_user(), how, - service_slug=service_slug) + response = utils.login(request, form.get_user(), how, service=service) if 'ou' in form.fields: utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk) diff --git a/src/authentic2/utils/__init__.py b/src/authentic2/utils/__init__.py index d0e53995..61fcbc5c 100644 --- a/src/authentic2/utils/__init__.py +++ b/src/authentic2/utils/__init__.py @@ -63,6 +63,7 @@ from authentic2.saml.saml2utils import filter_attribute_private_key, \ filter_element_private_key from .. import plugins, app_settings, constants, crypto +from .service import set_service class CleanLogMessage(logging.Filter): @@ -431,12 +432,14 @@ def last_authentication_event(request=None, session=None): return None -def login(request, user, how, service_slug=None, nonce=None, **kwargs): +def login(request, user, how, service=None, service_slug=None, nonce=None, **kwargs): '''Login a user model, record the authentication event and redirect to next URL or settings.LOGIN_REDIRECT_URL.''' from .. import hooks from .views import check_cookie_works + if service: + service_slug = service.slug check_cookie_works(request) last_login = user.last_login auth_login(request, user) @@ -459,7 +462,7 @@ def login_require(request, next_url=None, login_url='auth_login', service=None, params = kwargs.setdefault('params', {}) params[REDIRECT_FIELD_NAME] = next_url if service: - params['service'] = service.slug + set_service(params, service) if login_hint: request.session['login-hint'] = list(login_hint) elif 'login-hint' in request.session: @@ -679,13 +682,13 @@ def get_fk_model(model, fieldname): return field.related_model -def get_registration_url(request, service_slug=None): +def get_registration_url(request, service=None): next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL) next_url = make_url(next_url, request=request, keep_params=True, include=(constants.NONCE_FIELD_NAME,), resolve=False) params = {REDIRECT_FIELD_NAME: next_url} - if service_slug: - params[constants.SERVICE_FIELD_NAME] = service_slug + if service: + set_service(params, service) return make_url('registration_register', params=params) @@ -1042,12 +1045,12 @@ def same_origin(url1, url2): def simulate_authentication(request, user, method, backend='authentic2.backends.models_backend.ModelBackend', - service_slug=None, **kwargs): + service=None, **kwargs): '''Simulate a normal login by forcing a backend attribute on the user instance''' # do not modify the passed user user = copy.deepcopy(user) user.backend = backend - return login(request, user, method, service_slug=service_slug, **kwargs) + return login(request, user, method, service=service, **kwargs) def get_manager_login_url(): diff --git a/src/authentic2/utils/service.py b/src/authentic2/utils/service.py new file mode 100644 index 00000000..040b8342 --- /dev/null +++ b/src/authentic2/utils/service.py @@ -0,0 +1,62 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from authentic2.models import Service +from authentic2.constants import SERVICE_FIELD_NAME + + +def service_ref(service): + if service.ou: + return '%s %s' % (service.ou.slug, service.slug) + else: + return service.slug + + +def get_service_from_ref(ref): + splitted = ref.split(' ') + + try: + ou_slug, service_slug = splitted + except ValueError: + pass + else: + return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first() + + try: + service_slug, = splitted + except ValueError: + service = Service.objects.filter(ou__isnull=True, slug=service_slug) + if service: + return service + try: + return Service.objects.get(slug=service_slug) + except (Service.DoesNotExist, Service.MultipleObjectsReturned): + return None + + +def get_service_from_request(request): + service_ref = request.GET.get(SERVICE_FIELD_NAME) + if not service_ref or '\x00' in service_ref: + return None + return get_service_from_ref(service_ref) + + +def get_service_from_token(params): + return get_service_from_ref(params[SERVICE_FIELD_NAME]) + + +def set_service_ref(params, service): + params[SERVICE_FIELD_NAME] = service_ref(service) diff --git a/src/authentic2/views.py b/src/authentic2/views.py index aa6b0ad0..d6cf4d2f 100644 --- a/src/authentic2/views.py +++ b/src/authentic2/views.py @@ -57,6 +57,7 @@ from django.template import loader from authentic2.compat.misc import default_token_generator from . import (utils, app_settings, decorators, constants, models, cbv, hooks, validators) +from .utils.service import get_service_from_request, get_service_from_token, set_service_ref from .utils import switch_user from .a2_rbac.utils import get_default_ou from .a2_rbac.models import OrganizationalUnit as OU @@ -261,6 +262,8 @@ def login(request, template_name='authentic2/login.html', redirect_to = request.GET.get(redirect_field_name) + service = get_service_from_request(request) + if not redirect_to or ' ' in redirect_to: redirect_to = settings.LOGIN_REDIRECT_URL # Heavier security check -- redirects to http://example.com should @@ -275,8 +278,7 @@ def login(request, template_name='authentic2/login.html', blocks = [] - registration_url = utils.get_registration_url( - request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME)) + registration_url = utils.get_registration_url(request, service=service) context = { 'cancel': nonce is not None, @@ -316,11 +318,11 @@ def login(request, template_name='authentic2/login.html', parameters = {'request': request, 'context': context} remote_addr = request.META.get('REMOTE_ADDR') - service = request.GET.get('service') login_hint = set(request.session.get('login-hint', [])) show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint) - if service and models.Service.objects.filter(slug=service).exists(): - show_ctx['service_slug'] = service + if service: + show_ctx['service_slug'] = service.slug + show_ctx['service'] = service # check if the authenticator has multiple instances if hasattr(authenticator, 'instances'): for instance_id, instance in authenticator.instances(**parameters): @@ -835,9 +837,9 @@ class BaseRegistrationView(FormView): self.token[field] = form.cleaned_data[field] # propagate service to the registration completion view - if constants.SERVICE_FIELD_NAME in self.request.GET: - self.token[constants.SERVICE_FIELD_NAME] = \ - self.request.GET[constants.SERVICE_FIELD_NAME] + service = get_service_from_request(self.request) + if service: + set_service_ref(self.token, service) self.token.pop(REDIRECT_FIELD_NAME, None) self.token.pop('email', None) @@ -908,7 +910,7 @@ class RegistrationCompletionView(CreateView): self.email_is_unique |= self.ou.email_is_unique self.init_fields_labels_and_help_texts() # if registration is done during an SSO add the service to the registration event - self.service = self.token.get(constants.SERVICE_FIELD_NAME) + self.service = get_service_from_token(self.token) return super(RegistrationCompletionView, self) \ .dispatch(request, *args, **kwargs) @@ -1026,7 +1028,7 @@ class RegistrationCompletionView(CreateView): utils.simulate_authentication( request, self.users[0], method=self.authentication_method, - service_slug=self.service) + service=self.service) return utils.redirect(request, self.get_success_url()) confirm_data = self.token.get('confirm_data', False) @@ -1065,7 +1067,7 @@ class RegistrationCompletionView(CreateView): utils.simulate_authentication( request, user, method=self.authentication_method, - service_slug=self.service) + service=self.service) return utils.redirect(request, self.get_success_url()) return super(RegistrationCompletionView, self).post(request, *args, **kwargs) @@ -1101,7 +1103,7 @@ class RegistrationCompletionView(CreateView): utils.simulate_authentication( request, user, method=self.authentication_method, - service_slug=self.service) + service=self.service) message_template = loader.get_template('authentic2/registration_success_message.html') messages.info(self.request, message_template.render(request=request)) self.send_registration_success_email(user) diff --git a/src/authentic2_auth_fc/views.py b/src/authentic2_auth_fc/views.py index e64a11ce..8ec0ad61 100644 --- a/src/authentic2_auth_fc/views.py +++ b/src/authentic2_auth_fc/views.py @@ -47,6 +47,7 @@ from authentic2 import utils as a2_utils, hooks, constants from authentic2.a2_rbac.utils import get_default_ou from authentic2.forms.passwords import SetPasswordForm from authentic2.utils import views as views_utils +from authentic2.utils.service import get_service_from_request, set_service from . import app_settings, models, utils @@ -372,7 +373,7 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): def get(self, request, *args, **kwargs): registration = True if 'registration' in request.GET else False '''Request an access grant code and associate it to the current user''' - self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME) + self.service = get_service_from_request(request) if request.user.is_authenticated: # Prevent to add a link with an FC account already linked with another user. try: @@ -446,7 +447,7 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): return self.redirect(request) if user: views_utils.check_cookie_works(request) - a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug) + a2_utils.login(request, user, 'france-connect', service=self.service) # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE request.session.set_expiry(0) self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) @@ -457,8 +458,8 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): return self.redirect(request) else: params = {} - if self.service_slug: - params[constants.SERVICE_FIELD_NAME] = self.service_slug + if self.service: + set_service(params, self.service) if registration: return self.redirect_and_come_back(request, a2_utils.make_url('fc-registration', @@ -496,8 +497,9 @@ class RegistrationView(PopupViewMixin, LoggerMixin, View): params = { REDIRECT_FIELD_NAME: redirect_to, } - if constants.SERVICE_FIELD_NAME in request.GET: - params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME] + service = get_service_from_request(request) + if service: + set_service(params, service) if self.get_in_popup(): params['popup'] = '' redirect_to = a2_utils.make_url('fc-login-or-link', params=params) @@ -510,8 +512,8 @@ class RegistrationView(PopupViewMixin, LoggerMixin, View): data['valid_email'] = False data['franceconnect'] = True data['authentication_method'] = 'france-connect' - if constants.SERVICE_FIELD_NAME in request.GET: - data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME] + if service: + set_service(data, service) activation_url = a2_utils.build_activation_url(request, next_url=redirect_to, **data) -- 2.28.0