Projet

Général

Profil

0001-misc-validate-and-use-a-real-identifier-for-services.patch

Benjamin Dauvergne, 21 août 2020 16:39

Télécharger (20 ko)

Voir les différences:

Subject: [PATCH] misc: validate and use a real identifier for services
 (#45672)

 src/authentic2/authenticators.py | 28 ++++++-------
 src/authentic2/utils/__init__.py | 19 +++++----
 src/authentic2/utils/service.py  | 68 ++++++++++++++++++++++++++++++++
 src/authentic2/views.py          | 29 ++++++++------
 src/authentic2_auth_fc/views.py  | 18 +++++----
 tests/auth_fc/test_auth_fc.py    |  7 ++++
 tests/test_idp_oidc.py           |  4 +-
 tests/test_idp_saml2.py          |  2 +-
 8 files changed, 129 insertions(+), 46 deletions(-)
 create mode 100644 src/authentic2/utils/service.py
src/authentic2/authenticators.py
22 22

  
23 23
from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role
24 24
from authentic2.custom_user.models import User
25
from . import views, app_settings, utils, constants
25
from . import views, app_settings, utils
26 26
from .utils.views import csrf_token_check
27
from .utils.service import get_service_from_request
27 28
from .forms import authentication as authentication_forms
28 29
from .utils.evaluate import evaluate_condition
29 30

  
......
65 66
    def name(self):
66 67
        return ugettext_lazy('Password')
67 68

  
68
    def get_service_ous(self, request):
69
        service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
70
        if not service_slug:
71
            return []
72
        roles = Role.objects.filter(allowed_services__slug=service_slug).children()
69
    def get_service_ous(self, service):
70
        roles = Role.objects.filter(allowed_services=service).children()
73 71
        if not roles:
74 72
            return []
75 73
        service_ou_ids = []
......
82 80
            return []
83 81
        return OU.objects.filter(pk__in=service_ou_ids)
84 82

  
85
    def get_preferred_ous(self, request):
83
    def get_preferred_ous(self, request, service):
86 84
        preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous')
87 85
        preferred_ous = []
88 86
        if preferred_ous_cookie:
89 87
            preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie))
90 88
        # for the special case of services open to only one OU, pre-select it
91
        for ou in self.get_service_ous(request):
92
            if ou in preferred_ous:
93
                continue
94
            preferred_ous.append(ou)
89
        if service:
90
            for ou in self.get_service_ous(service):
91
                if ou in preferred_ous:
92
                    continue
93
                preferred_ous.append(ou)
95 94
        return preferred_ous
96 95

  
97 96
    def login(self, request, *args, **kwargs):
98
        service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
97
        service = get_service_from_request(request)
99 98
        context = kwargs.get('context', {})
100 99
        is_post = request.method == 'POST' and self.submit_name in request.POST
101 100
        data = request.POST if is_post else None
......
104 103

  
105 104
        # Special handling when the form contains an OU selector
106 105
        if app_settings.A2_LOGIN_FORM_OU_SELECTOR:
107
            preferred_ous = self.get_preferred_ous(request)
106
            preferred_ous = self.get_preferred_ous(request, service)
108 107
            if preferred_ous:
109 108
                initial['ou'] = preferred_ous[0]
110 109

  
......
129 128
                if form.cleaned_data.get('remember_me'):
130 129
                    request.session['remember_me'] = True
131 130
                    request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME)
132
                response = utils.login(request, form.get_user(), how,
133
                                       service_slug=service_slug)
131
                response = utils.login(request, form.get_user(), how, service=service)
134 132
                if 'ou' in form.fields:
135 133
                    utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk)
136 134

  
src/authentic2/utils/__init__.py
63 63
    filter_element_private_key
64 64

  
65 65
from .. import plugins, app_settings, constants, crypto
66
from .service import set_service_ref
66 67

  
67 68

  
68 69
class CleanLogMessage(logging.Filter):
......
431 432
    return None
432 433

  
433 434

  
434
def login(request, user, how, service_slug=None, nonce=None, **kwargs):
435
def login(request, user, how, service=None, service_slug=None, nonce=None, **kwargs):
435 436
    '''Login a user model, record the authentication event and redirect to next
436 437
       URL or settings.LOGIN_REDIRECT_URL.'''
437 438
    from .. import hooks
438 439
    from .views import check_cookie_works
439 440

  
441
    if service:
442
        assert service_slug is None
443
        service_slug = service.slug
440 444
    check_cookie_works(request)
441 445
    last_login = user.last_login
442 446
    auth_login(request, user)
......
455 459

  
456 460
def login_require(request, next_url=None, login_url='auth_login', service=None, login_hint=(), **kwargs):
457 461
    '''Require a login and come back to current URL'''
462

  
458 463
    next_url = next_url or request.get_full_path()
459 464
    params = kwargs.setdefault('params', {})
460 465
    params[REDIRECT_FIELD_NAME] = next_url
461 466
    if service:
462
        params['service'] = service.slug
467
        set_service_ref(params, service)
463 468
    if login_hint:
464 469
        request.session['login-hint'] = list(login_hint)
465 470
    elif 'login-hint' in request.session:
......
679 684
        return field.related_model
680 685

  
681 686

  
682
def get_registration_url(request, service_slug=None):
687
def get_registration_url(request, service=None):
683 688
    next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL)
684 689
    next_url = make_url(next_url, request=request, keep_params=True,
685 690
                        include=(constants.NONCE_FIELD_NAME,), resolve=False)
686 691
    params = {REDIRECT_FIELD_NAME: next_url}
687
    if service_slug:
688
        params[constants.SERVICE_FIELD_NAME] = service_slug
692
    if service:
693
        set_service_ref(params, service)
689 694
    return make_url('registration_register', params=params)
690 695

  
691 696

  
......
1049 1054

  
1050 1055
def simulate_authentication(request, user, method,
1051 1056
                            backend='authentic2.backends.models_backend.ModelBackend',
1052
                            service_slug=None, **kwargs):
1057
                            service=None, **kwargs):
1053 1058
    '''Simulate a normal login by forcing a backend attribute on the user instance'''
1054 1059
    # do not modify the passed user
1055 1060
    user = copy.deepcopy(user)
1056 1061
    user.backend = backend
1057
    return login(request, user, method, service_slug=service_slug, **kwargs)
1062
    return login(request, user, method, service=service, **kwargs)
1058 1063

  
1059 1064

  
1060 1065
def get_manager_login_url():
src/authentic2/utils/service.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from authentic2.constants import SERVICE_FIELD_NAME
18

  
19

  
20
def service_ref(service):
21
    if service.ou:
22
        return '%s %s' % (service.ou.slug, service.slug)
23
    else:
24
        return service.slug
25

  
26

  
27
def get_service_from_ref(ref):
28
    from authentic2.models import Service
29

  
30
    splitted = ref.split(' ')
31

  
32
    try:
33
        ou_slug, service_slug = splitted
34
    except ValueError:
35
        pass
36
    else:
37
        return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first()
38

  
39
    try:
40
        service_slug, = splitted
41
    except ValueError:
42
        return None
43

  
44
    service = Service.objects.filter(ou__isnull=True, slug=service_slug).first()
45
    if service:
46
        return service
47
    try:
48
        return Service.objects.get(slug=service_slug)
49
    except (Service.DoesNotExist, Service.MultipleObjectsReturned):
50
        return None
51

  
52

  
53
def get_service_from_request(request):
54
    service_ref = request.GET.get(SERVICE_FIELD_NAME)
55
    if not service_ref or '\x00' in service_ref:
56
        return None
57
    return get_service_from_ref(service_ref)
58

  
59

  
60
def get_service_from_token(params):
61
    ref = params.get(SERVICE_FIELD_NAME)
62
    if not ref:
63
        return None
64
    return get_service_from_ref(ref)
65

  
66

  
67
def set_service_ref(params, service):
68
    params[SERVICE_FIELD_NAME] = service_ref(service)
src/authentic2/views.py
54 54
from authentic2.custom_user.models import iter_attributes
55 55
from . import (utils, app_settings, decorators, constants,
56 56
               models, cbv, hooks, validators, attribute_kinds)
57
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref
57 58
from .utils import switch_user
58 59
from .a2_rbac.utils import get_default_ou
59 60
from .a2_rbac.models import OrganizationalUnit as OU
......
258 259

  
259 260
    redirect_to = request.GET.get(redirect_field_name)
260 261

  
262
    service = get_service_from_request(request)
263

  
261 264
    if not redirect_to or ' ' in redirect_to:
262 265
        redirect_to = settings.LOGIN_REDIRECT_URL
263 266
    # Heavier security check -- redirects to http://example.com should
......
272 275

  
273 276
    blocks = []
274 277

  
275
    registration_url = utils.get_registration_url(
276
        request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME))
278
    registration_url = utils.get_registration_url(request, service=service)
277 279

  
278 280
    context = {
279 281
        'cancel': nonce is not None,
......
313 315
            parameters = {'request': request,
314 316
                          'context': context}
315 317
            remote_addr = request.META.get('REMOTE_ADDR')
316
            service = request.GET.get('service')
317 318
            login_hint = set(request.session.get('login-hint', []))
318 319
            show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint)
319
            if service and models.Service.objects.filter(slug=service).exists():
320
                    show_ctx['service_slug'] = service
320
            if service:
321
                show_ctx['service_ou_slug'] = service.ou and service.ou.slug
322
                show_ctx['service_slug'] = service.slug
323
                show_ctx['service'] = service
321 324
            # check if the authenticator has multiple instances
322 325
            if hasattr(authenticator, 'instances'):
323 326
                for instance_id, instance in authenticator.instances(**parameters):
......
846 849
            self.token[field] = form.cleaned_data[field]
847 850

  
848 851
        # propagate service to the registration completion view
849
        if constants.SERVICE_FIELD_NAME in self.request.GET:
850
            self.token[constants.SERVICE_FIELD_NAME] = \
851
                self.request.GET[constants.SERVICE_FIELD_NAME]
852
        service = get_service_from_request(self.request)
853
        if service:
854
            set_service_ref(self.token, service)
852 855

  
853 856
        self.token.pop(REDIRECT_FIELD_NAME, None)
854 857
        self.token.pop('email', None)
......
930 933
            self.email_is_unique |= self.ou.email_is_unique
931 934
        self.init_fields_labels_and_help_texts()
932 935
        # if registration is done during an SSO add the service to the registration event
933
        self.service = self.token.get(constants.SERVICE_FIELD_NAME)
936
        self.service = get_service_from_token(self.token)
934 937
        return super(RegistrationCompletionView, self) \
935 938
            .dispatch(request, *args, **kwargs)
936 939

  
......
1048 1051
            utils.simulate_authentication(
1049 1052
                request, self.users[0],
1050 1053
                method=self.authentication_method,
1051
                service_slug=self.service)
1054
                service=self.service)
1052 1055
            return utils.redirect(request, self.get_success_url())
1053 1056
        confirm_data = self.token.get('confirm_data', False)
1054 1057

  
......
1087 1090
                    utils.simulate_authentication(
1088 1091
                        request, user,
1089 1092
                        method=self.authentication_method,
1090
                        service_slug=self.service)
1093
                        service=self.service)
1091 1094
                    return utils.redirect(request, self.get_success_url())
1092 1095
        return super(RegistrationCompletionView, self).post(request, *args, **kwargs)
1093 1096

  
......
1127 1130
    def registration_success(self, request, user, form):
1128 1131
        hooks.call_hooks('event', name='registration', user=user, form=form, view=self,
1129 1132
                         authentication_method=self.authentication_method,
1130
                         token=self.token, service=self.service)
1133
                         token=self.token, service=self.service and self.service.slug)
1131 1134
        self.token_obj.delete()
1132 1135
        utils.simulate_authentication(
1133 1136
            request, user,
1134 1137
            method=self.authentication_method,
1135
            service_slug=self.service)
1138
            service=self.service)
1136 1139
        message_template = loader.get_template('authentic2/registration_success_message.html')
1137 1140
        messages.info(self.request, message_template.render(request=request))
1138 1141
        self.send_registration_success_email(user)
src/authentic2_auth_fc/views.py
47 47
from authentic2.a2_rbac.utils import get_default_ou
48 48
from authentic2.forms.passwords import SetPasswordForm
49 49
from authentic2.utils import views as views_utils
50
from authentic2.utils.service import get_service_from_request, set_service_ref
50 51

  
51 52
from . import app_settings, models, utils
52 53

  
......
372 373
    def get(self, request, *args, **kwargs):
373 374
        registration = True if 'registration' in request.GET else False
374 375
        '''Request an access grant code and associate it to the current user'''
375
        self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
376
        self.service = get_service_from_request(request)
376 377
        if request.user.is_authenticated:
377 378
            # Prevent to add a link with an FC account already linked with another user.
378 379
            try:
......
446 447
                        return self.redirect(request)
447 448
        if user:
448 449
            views_utils.check_cookie_works(request)
449
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
450
            a2_utils.login(request, user, 'france-connect', service=self.service)
450 451
            # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
451 452
            request.session.set_expiry(0)
452 453
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
......
457 458
            return self.redirect(request)
458 459
        else:
459 460
            params = {}
460
            if self.service_slug:
461
                params[constants.SERVICE_FIELD_NAME] = self.service_slug
461
            if self.service:
462
                set_service_ref(params, self.service)
462 463
            if registration:
463 464
                return self.redirect_and_come_back(request,
464 465
                                                   a2_utils.make_url('fc-registration',
......
496 497
        params = {
497 498
            REDIRECT_FIELD_NAME: redirect_to,
498 499
        }
499
        if constants.SERVICE_FIELD_NAME in request.GET:
500
            params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
500
        service = get_service_from_request(request)
501
        if service:
502
            set_service_ref(params, service)
501 503
        if self.get_in_popup():
502 504
            params['popup'] = ''
503 505
        redirect_to = a2_utils.make_url('fc-login-or-link', params=params)
......
510 512
        data['valid_email'] = False
511 513
        data['franceconnect'] = True
512 514
        data['authentication_method'] = 'france-connect'
513
        if constants.SERVICE_FIELD_NAME in request.GET:
514
            data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
515
        if service:
516
            set_service_ref(data, service)
515 517
        activation_url = a2_utils.build_activation_url(request,
516 518
                                                       next_url=redirect_to,
517 519
                                                       **data)
tests/auth_fc/test_auth_fc.py
32 32
from django.utils.six.moves.urllib import parse as urlparse
33 33
from django.utils.timezone import now
34 34

  
35
from authentic2.models import Service
36

  
35 37
from authentic2_auth_fc import models
36 38
from authentic2_auth_fc.utils import requests_retry_session
37 39

  
......
41 43
User = get_user_model()
42 44

  
43 45

  
46
@pytest.fixture(autouse=True)
47
def service(db):
48
    return Service.objects.create(name='portail', slug='portail')
49

  
50

  
44 51
def path(url):
45 52
    return urlparse.urlparse(url).path
46 53

  
tests/test_idp_oidc.py
959 959

  
960 960
    location = urlparse.urlparse(response['Location'])
961 961
    query = urlparse.parse_qs(location.query)
962
    assert query['service'] == ['client']
962
    assert query['service'] == ['default client']
963 963
    response = response.follow().click('Register')
964 964
    location = urlparse.urlparse(response.request.url)
965 965
    query = urlparse.parse_qs(location.query)
966
    assert query['service'] == ['client']
966
    assert query['service'] == ['default client']
967 967

  
968 968
    response.form.set('email', 'john.doe@example.com')
969 969
    response = response.form.submit()
tests/test_idp_saml2.py
346 346
            reverse('auth_login'),
347 347
            **{
348 348
                'nonce': '*',
349
                SERVICE_FIELD_NAME: self.sp.slug,
349
                SERVICE_FIELD_NAME: 'default ' + self.sp.slug,
350 350
                REDIRECT_FIELD_NAME: make_url(
351 351
                    'a2-idp-saml-continue',
352 352
                    params={NONCE_FIELD_NAME: request_id}),
353
-