0001-misc-validate-and-use-a-real-identifier-for-services.patch
src/authentic2/authenticators.py | ||
---|---|---|
22 | 22 | |
23 | 23 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role |
24 | 24 |
from authentic2.custom_user.models import User |
25 |
from . import views, app_settings, utils, constants
|
|
25 |
from . import views, app_settings, utils |
|
26 | 26 |
from .utils.views import csrf_token_check |
27 |
from .utils.service import get_service_from_request |
|
27 | 28 |
from .forms import authentication as authentication_forms |
28 | 29 |
from .utils.evaluate import evaluate_condition |
29 | 30 | |
... | ... | |
64 | 65 |
def name(self): |
65 | 66 |
return ugettext_lazy('Password') |
66 | 67 | |
67 |
def get_service_ous(self, request): |
|
68 |
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME) |
|
69 |
if not service_slug: |
|
70 |
return [] |
|
71 |
roles = Role.objects.filter(allowed_services__slug=service_slug).children() |
|
68 |
def get_service_ous(self, service): |
|
69 |
roles = Role.objects.filter(allowed_services=service).children() |
|
72 | 70 |
if not roles: |
73 | 71 |
return [] |
74 | 72 |
service_ou_ids = [] |
... | ... | |
81 | 79 |
return [] |
82 | 80 |
return OU.objects.filter(pk__in=service_ou_ids) |
83 | 81 | |
84 |
def get_preferred_ous(self, request): |
|
82 |
def get_preferred_ous(self, request, service):
|
|
85 | 83 |
preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous') |
86 | 84 |
preferred_ous = [] |
87 | 85 |
if preferred_ous_cookie: |
88 | 86 |
preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie)) |
89 | 87 |
# for the special case of services open to only one OU, pre-select it |
90 |
for ou in self.get_service_ous(request): |
|
91 |
if ou in preferred_ous: |
|
92 |
continue |
|
93 |
preferred_ous.append(ou) |
|
88 |
if service: |
|
89 |
for ou in self.get_service_ous(service): |
|
90 |
if ou in preferred_ous: |
|
91 |
continue |
|
92 |
preferred_ous.append(ou) |
|
94 | 93 |
return preferred_ous |
95 | 94 | |
96 | 95 |
def login(self, request, *args, **kwargs): |
97 |
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
|
|
96 |
service = get_service_from_request(request)
|
|
98 | 97 |
context = kwargs.get('context', {}) |
99 | 98 |
is_post = request.method == 'POST' and self.submit_name in request.POST |
100 | 99 |
data = request.POST if is_post else None |
... | ... | |
103 | 102 | |
104 | 103 |
# Special handling when the form contains an OU selector |
105 | 104 |
if app_settings.A2_LOGIN_FORM_OU_SELECTOR: |
106 |
preferred_ous = self.get_preferred_ous(request) |
|
105 |
preferred_ous = self.get_preferred_ous(request, service)
|
|
107 | 106 |
if preferred_ous: |
108 | 107 |
initial['ou'] = preferred_ous[0] |
109 | 108 | |
... | ... | |
128 | 127 |
if form.cleaned_data.get('remember_me'): |
129 | 128 |
request.session['remember_me'] = True |
130 | 129 |
request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME) |
131 |
response = utils.login(request, form.get_user(), how, |
|
132 |
service_slug=service_slug) |
|
130 |
response = utils.login(request, form.get_user(), how, service=service) |
|
133 | 131 |
if 'ou' in form.fields: |
134 | 132 |
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk) |
135 | 133 |
src/authentic2/utils/__init__.py | ||
---|---|---|
63 | 63 |
filter_element_private_key |
64 | 64 | |
65 | 65 |
from .. import plugins, app_settings, constants, crypto |
66 |
from .service import set_service |
|
66 | 67 | |
67 | 68 | |
68 | 69 |
class CleanLogMessage(logging.Filter): |
... | ... | |
431 | 432 |
return None |
432 | 433 | |
433 | 434 | |
434 |
def login(request, user, how, service_slug=None, nonce=None, **kwargs): |
|
435 |
def login(request, user, how, service=None, service_slug=None, nonce=None, **kwargs):
|
|
435 | 436 |
'''Login a user model, record the authentication event and redirect to next |
436 | 437 |
URL or settings.LOGIN_REDIRECT_URL.''' |
437 | 438 |
from .. import hooks |
438 | 439 |
from .views import check_cookie_works |
439 | 440 | |
441 |
if service: |
|
442 |
service_slug = service.slug |
|
440 | 443 |
check_cookie_works(request) |
441 | 444 |
last_login = user.last_login |
442 | 445 |
auth_login(request, user) |
... | ... | |
459 | 462 |
params = kwargs.setdefault('params', {}) |
460 | 463 |
params[REDIRECT_FIELD_NAME] = next_url |
461 | 464 |
if service: |
462 |
params['service'] = service.slug
|
|
465 |
set_service(params, service)
|
|
463 | 466 |
if login_hint: |
464 | 467 |
request.session['login-hint'] = list(login_hint) |
465 | 468 |
elif 'login-hint' in request.session: |
... | ... | |
679 | 682 |
return field.related_model |
680 | 683 | |
681 | 684 | |
682 |
def get_registration_url(request, service_slug=None):
|
|
685 |
def get_registration_url(request, service=None): |
|
683 | 686 |
next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL) |
684 | 687 |
next_url = make_url(next_url, request=request, keep_params=True, |
685 | 688 |
include=(constants.NONCE_FIELD_NAME,), resolve=False) |
686 | 689 |
params = {REDIRECT_FIELD_NAME: next_url} |
687 |
if service_slug:
|
|
688 |
params[constants.SERVICE_FIELD_NAME] = service_slug
|
|
690 |
if service: |
|
691 |
set_service(params, service)
|
|
689 | 692 |
return make_url('registration_register', params=params) |
690 | 693 | |
691 | 694 | |
... | ... | |
1042 | 1045 | |
1043 | 1046 |
def simulate_authentication(request, user, method, |
1044 | 1047 |
backend='authentic2.backends.models_backend.ModelBackend', |
1045 |
service_slug=None, **kwargs):
|
|
1048 |
service=None, **kwargs): |
|
1046 | 1049 |
'''Simulate a normal login by forcing a backend attribute on the user instance''' |
1047 | 1050 |
# do not modify the passed user |
1048 | 1051 |
user = copy.deepcopy(user) |
1049 | 1052 |
user.backend = backend |
1050 |
return login(request, user, method, service_slug=service_slug, **kwargs)
|
|
1053 |
return login(request, user, method, service=service, **kwargs)
|
|
1051 | 1054 | |
1052 | 1055 | |
1053 | 1056 |
def get_manager_login_url(): |
src/authentic2/utils/service.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from authentic2.models import Service |
|
18 |
from authentic2.constants import SERVICE_FIELD_NAME |
|
19 | ||
20 | ||
21 |
def service_ref(service): |
|
22 |
if service.ou: |
|
23 |
return '%s %s' % (service.ou.slug, service.slug) |
|
24 |
else: |
|
25 |
return service.slug |
|
26 | ||
27 | ||
28 |
def get_service_from_ref(ref): |
|
29 |
splitted = ref.split(' ') |
|
30 | ||
31 |
try: |
|
32 |
ou_slug, service_slug = splitted |
|
33 |
except ValueError: |
|
34 |
pass |
|
35 |
else: |
|
36 |
return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first() |
|
37 | ||
38 |
try: |
|
39 |
service_slug, = splitted |
|
40 |
except ValueError: |
|
41 |
service = Service.objects.filter(ou__isnull=True, slug=service_slug) |
|
42 |
if service: |
|
43 |
return service |
|
44 |
try: |
|
45 |
return Service.objects.get(slug=service_slug) |
|
46 |
except (Service.DoesNotExist, Service.MultipleObjectsReturned): |
|
47 |
return None |
|
48 | ||
49 | ||
50 |
def get_service_from_request(request): |
|
51 |
service_ref = request.GET.get(SERVICE_FIELD_NAME) |
|
52 |
if not service_ref or '\x00' in service_ref: |
|
53 |
return None |
|
54 |
return get_service_from_ref(service_ref) |
|
55 | ||
56 | ||
57 |
def get_service_from_token(params): |
|
58 |
return get_service_from_ref(params[SERVICE_FIELD_NAME]) |
|
59 | ||
60 | ||
61 |
def set_service_ref(params, service): |
|
62 |
params[SERVICE_FIELD_NAME] = service_ref(service) |
src/authentic2/views.py | ||
---|---|---|
57 | 57 |
from authentic2.compat.misc import default_token_generator |
58 | 58 |
from . import (utils, app_settings, decorators, constants, |
59 | 59 |
models, cbv, hooks, validators) |
60 |
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref |
|
60 | 61 |
from .utils import switch_user |
61 | 62 |
from .a2_rbac.utils import get_default_ou |
62 | 63 |
from .a2_rbac.models import OrganizationalUnit as OU |
... | ... | |
261 | 262 | |
262 | 263 |
redirect_to = request.GET.get(redirect_field_name) |
263 | 264 | |
265 |
service = get_service_from_request(request) |
|
266 | ||
264 | 267 |
if not redirect_to or ' ' in redirect_to: |
265 | 268 |
redirect_to = settings.LOGIN_REDIRECT_URL |
266 | 269 |
# Heavier security check -- redirects to http://example.com should |
... | ... | |
275 | 278 | |
276 | 279 |
blocks = [] |
277 | 280 | |
278 |
registration_url = utils.get_registration_url( |
|
279 |
request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME)) |
|
281 |
registration_url = utils.get_registration_url(request, service=service) |
|
280 | 282 | |
281 | 283 |
context = { |
282 | 284 |
'cancel': nonce is not None, |
... | ... | |
316 | 318 |
parameters = {'request': request, |
317 | 319 |
'context': context} |
318 | 320 |
remote_addr = request.META.get('REMOTE_ADDR') |
319 |
service = request.GET.get('service') |
|
320 | 321 |
login_hint = set(request.session.get('login-hint', [])) |
321 | 322 |
show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint) |
322 |
if service and models.Service.objects.filter(slug=service).exists(): |
|
323 |
show_ctx['service_slug'] = service |
|
323 |
if service: |
|
324 |
show_ctx['service_slug'] = service.slug |
|
325 |
show_ctx['service'] = service |
|
324 | 326 |
# check if the authenticator has multiple instances |
325 | 327 |
if hasattr(authenticator, 'instances'): |
326 | 328 |
for instance_id, instance in authenticator.instances(**parameters): |
... | ... | |
835 | 837 |
self.token[field] = form.cleaned_data[field] |
836 | 838 | |
837 | 839 |
# propagate service to the registration completion view |
838 |
if constants.SERVICE_FIELD_NAME in self.request.GET:
|
|
839 |
self.token[constants.SERVICE_FIELD_NAME] = \
|
|
840 |
self.request.GET[constants.SERVICE_FIELD_NAME]
|
|
840 |
service = get_service_from_request(self.request)
|
|
841 |
if service:
|
|
842 |
set_service_ref(self.token, service)
|
|
841 | 843 | |
842 | 844 |
self.token.pop(REDIRECT_FIELD_NAME, None) |
843 | 845 |
self.token.pop('email', None) |
... | ... | |
908 | 910 |
self.email_is_unique |= self.ou.email_is_unique |
909 | 911 |
self.init_fields_labels_and_help_texts() |
910 | 912 |
# if registration is done during an SSO add the service to the registration event |
911 |
self.service = self.token.get(constants.SERVICE_FIELD_NAME)
|
|
913 |
self.service = get_service_from_token(self.token)
|
|
912 | 914 |
return super(RegistrationCompletionView, self) \ |
913 | 915 |
.dispatch(request, *args, **kwargs) |
914 | 916 | |
... | ... | |
1026 | 1028 |
utils.simulate_authentication( |
1027 | 1029 |
request, self.users[0], |
1028 | 1030 |
method=self.authentication_method, |
1029 |
service_slug=self.service)
|
|
1031 |
service=self.service) |
|
1030 | 1032 |
return utils.redirect(request, self.get_success_url()) |
1031 | 1033 |
confirm_data = self.token.get('confirm_data', False) |
1032 | 1034 | |
... | ... | |
1065 | 1067 |
utils.simulate_authentication( |
1066 | 1068 |
request, user, |
1067 | 1069 |
method=self.authentication_method, |
1068 |
service_slug=self.service)
|
|
1070 |
service=self.service) |
|
1069 | 1071 |
return utils.redirect(request, self.get_success_url()) |
1070 | 1072 |
return super(RegistrationCompletionView, self).post(request, *args, **kwargs) |
1071 | 1073 | |
... | ... | |
1101 | 1103 |
utils.simulate_authentication( |
1102 | 1104 |
request, user, |
1103 | 1105 |
method=self.authentication_method, |
1104 |
service_slug=self.service)
|
|
1106 |
service=self.service) |
|
1105 | 1107 |
message_template = loader.get_template('authentic2/registration_success_message.html') |
1106 | 1108 |
messages.info(self.request, message_template.render(request=request)) |
1107 | 1109 |
self.send_registration_success_email(user) |
src/authentic2_auth_fc/views.py | ||
---|---|---|
47 | 47 |
from authentic2.a2_rbac.utils import get_default_ou |
48 | 48 |
from authentic2.forms.passwords import SetPasswordForm |
49 | 49 |
from authentic2.utils import views as views_utils |
50 |
from authentic2.utils.service import get_service_from_request, set_service |
|
50 | 51 | |
51 | 52 |
from . import app_settings, models, utils |
52 | 53 | |
... | ... | |
372 | 373 |
def get(self, request, *args, **kwargs): |
373 | 374 |
registration = True if 'registration' in request.GET else False |
374 | 375 |
'''Request an access grant code and associate it to the current user''' |
375 |
self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
|
|
376 |
self.service = get_service_from_request(request)
|
|
376 | 377 |
if request.user.is_authenticated: |
377 | 378 |
# Prevent to add a link with an FC account already linked with another user. |
378 | 379 |
try: |
... | ... | |
446 | 447 |
return self.redirect(request) |
447 | 448 |
if user: |
448 | 449 |
views_utils.check_cookie_works(request) |
449 |
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
|
|
450 |
a2_utils.login(request, user, 'france-connect', service=self.service)
|
|
450 | 451 |
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE |
451 | 452 |
request.session.set_expiry(0) |
452 | 453 |
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) |
... | ... | |
457 | 458 |
return self.redirect(request) |
458 | 459 |
else: |
459 | 460 |
params = {} |
460 |
if self.service_slug:
|
|
461 |
params[constants.SERVICE_FIELD_NAME] = self.service_slug
|
|
461 |
if self.service: |
|
462 |
set_service(params, self.service)
|
|
462 | 463 |
if registration: |
463 | 464 |
return self.redirect_and_come_back(request, |
464 | 465 |
a2_utils.make_url('fc-registration', |
... | ... | |
496 | 497 |
params = { |
497 | 498 |
REDIRECT_FIELD_NAME: redirect_to, |
498 | 499 |
} |
499 |
if constants.SERVICE_FIELD_NAME in request.GET: |
|
500 |
params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME] |
|
500 |
service = get_service_from_request(request) |
|
501 |
if service: |
|
502 |
set_service(params, service) |
|
501 | 503 |
if self.get_in_popup(): |
502 | 504 |
params['popup'] = '' |
503 | 505 |
redirect_to = a2_utils.make_url('fc-login-or-link', params=params) |
... | ... | |
510 | 512 |
data['valid_email'] = False |
511 | 513 |
data['franceconnect'] = True |
512 | 514 |
data['authentication_method'] = 'france-connect' |
513 |
if constants.SERVICE_FIELD_NAME in request.GET:
|
|
514 |
data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
|
|
515 |
if service:
|
|
516 |
set_service(data, service)
|
|
515 | 517 |
activation_url = a2_utils.build_activation_url(request, |
516 | 518 |
next_url=redirect_to, |
517 | 519 |
**data) |
518 |
- |