0001-misc-validate-and-use-a-real-identifier-for-services.patch
src/authentic2/authenticators.py | ||
---|---|---|
22 | 22 | |
23 | 23 |
from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role |
24 | 24 |
from authentic2.custom_user.models import User |
25 |
from . import views, app_settings, utils, constants
|
|
25 |
from . import views, app_settings, utils |
|
26 | 26 |
from .utils.views import csrf_token_check |
27 |
from .utils.service import get_service_from_request |
|
27 | 28 |
from .forms import authentication as authentication_forms |
28 | 29 |
from .utils.evaluate import evaluate_condition |
29 | 30 | |
... | ... | |
65 | 66 |
def name(self): |
66 | 67 |
return ugettext_lazy('Password') |
67 | 68 | |
68 |
def get_service_ous(self, request): |
|
69 |
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME) |
|
70 |
if not service_slug: |
|
71 |
return [] |
|
72 |
roles = Role.objects.filter(allowed_services__slug=service_slug).children() |
|
69 |
def get_service_ous(self, service): |
|
70 |
roles = Role.objects.filter(allowed_services=service).children() |
|
73 | 71 |
if not roles: |
74 | 72 |
return [] |
75 | 73 |
service_ou_ids = [] |
... | ... | |
82 | 80 |
return [] |
83 | 81 |
return OU.objects.filter(pk__in=service_ou_ids) |
84 | 82 | |
85 |
def get_preferred_ous(self, request): |
|
83 |
def get_preferred_ous(self, request, service):
|
|
86 | 84 |
preferred_ous_cookie = utils.get_remember_cookie(request, 'preferred-ous') |
87 | 85 |
preferred_ous = [] |
88 | 86 |
if preferred_ous_cookie: |
89 | 87 |
preferred_ous.extend(OU.objects.filter(pk__in=preferred_ous_cookie)) |
90 | 88 |
# for the special case of services open to only one OU, pre-select it |
91 |
for ou in self.get_service_ous(request): |
|
92 |
if ou in preferred_ous: |
|
93 |
continue |
|
94 |
preferred_ous.append(ou) |
|
89 |
if service: |
|
90 |
for ou in self.get_service_ous(service): |
|
91 |
if ou in preferred_ous: |
|
92 |
continue |
|
93 |
preferred_ous.append(ou) |
|
95 | 94 |
return preferred_ous |
96 | 95 | |
97 | 96 |
def login(self, request, *args, **kwargs): |
98 |
service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
|
|
97 |
service = get_service_from_request(request)
|
|
99 | 98 |
context = kwargs.get('context', {}) |
100 | 99 |
is_post = request.method == 'POST' and self.submit_name in request.POST |
101 | 100 |
data = request.POST if is_post else None |
... | ... | |
104 | 103 | |
105 | 104 |
# Special handling when the form contains an OU selector |
106 | 105 |
if app_settings.A2_LOGIN_FORM_OU_SELECTOR: |
107 |
preferred_ous = self.get_preferred_ous(request) |
|
106 |
preferred_ous = self.get_preferred_ous(request, service)
|
|
108 | 107 |
if preferred_ous: |
109 | 108 |
initial['ou'] = preferred_ous[0] |
110 | 109 | |
... | ... | |
129 | 128 |
if form.cleaned_data.get('remember_me'): |
130 | 129 |
request.session['remember_me'] = True |
131 | 130 |
request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME) |
132 |
response = utils.login(request, form.get_user(), how, |
|
133 |
service_slug=service_slug) |
|
131 |
response = utils.login(request, form.get_user(), how, service=service) |
|
134 | 132 |
if 'ou' in form.fields: |
135 | 133 |
utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk) |
136 | 134 |
src/authentic2/utils/__init__.py | ||
---|---|---|
63 | 63 |
filter_element_private_key |
64 | 64 | |
65 | 65 |
from .. import plugins, app_settings, constants, crypto |
66 |
from .service import set_service_ref |
|
66 | 67 | |
67 | 68 | |
68 | 69 |
class CleanLogMessage(logging.Filter): |
... | ... | |
431 | 432 |
return None |
432 | 433 | |
433 | 434 | |
434 |
def login(request, user, how, service_slug=None, nonce=None, **kwargs): |
|
435 |
def login(request, user, how, service=None, service_slug=None, nonce=None, **kwargs):
|
|
435 | 436 |
'''Login a user model, record the authentication event and redirect to next |
436 | 437 |
URL or settings.LOGIN_REDIRECT_URL.''' |
437 | 438 |
from .. import hooks |
438 | 439 |
from .views import check_cookie_works |
439 | 440 | |
441 |
if service: |
|
442 |
assert service_slug is None |
|
443 |
service_slug = service.slug |
|
440 | 444 |
check_cookie_works(request) |
441 | 445 |
last_login = user.last_login |
442 | 446 |
auth_login(request, user) |
... | ... | |
455 | 459 | |
456 | 460 |
def login_require(request, next_url=None, login_url='auth_login', service=None, login_hint=(), **kwargs): |
457 | 461 |
'''Require a login and come back to current URL''' |
462 | ||
458 | 463 |
next_url = next_url or request.get_full_path() |
459 | 464 |
params = kwargs.setdefault('params', {}) |
460 | 465 |
params[REDIRECT_FIELD_NAME] = next_url |
461 | 466 |
if service: |
462 |
params['service'] = service.slug
|
|
467 |
set_service_ref(params, service)
|
|
463 | 468 |
if login_hint: |
464 | 469 |
request.session['login-hint'] = list(login_hint) |
465 | 470 |
elif 'login-hint' in request.session: |
... | ... | |
679 | 684 |
return field.related_model |
680 | 685 | |
681 | 686 | |
682 |
def get_registration_url(request, service_slug=None):
|
|
687 |
def get_registration_url(request, service=None): |
|
683 | 688 |
next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL) |
684 | 689 |
next_url = make_url(next_url, request=request, keep_params=True, |
685 | 690 |
include=(constants.NONCE_FIELD_NAME,), resolve=False) |
686 | 691 |
params = {REDIRECT_FIELD_NAME: next_url} |
687 |
if service_slug:
|
|
688 |
params[constants.SERVICE_FIELD_NAME] = service_slug
|
|
692 |
if service: |
|
693 |
set_service_ref(params, service)
|
|
689 | 694 |
return make_url('registration_register', params=params) |
690 | 695 | |
691 | 696 | |
... | ... | |
1049 | 1054 | |
1050 | 1055 |
def simulate_authentication(request, user, method, |
1051 | 1056 |
backend='authentic2.backends.models_backend.ModelBackend', |
1052 |
service_slug=None, **kwargs):
|
|
1057 |
service=None, **kwargs): |
|
1053 | 1058 |
'''Simulate a normal login by forcing a backend attribute on the user instance''' |
1054 | 1059 |
# do not modify the passed user |
1055 | 1060 |
user = copy.deepcopy(user) |
1056 | 1061 |
user.backend = backend |
1057 |
return login(request, user, method, service_slug=service_slug, **kwargs)
|
|
1062 |
return login(request, user, method, service=service, **kwargs)
|
|
1058 | 1063 | |
1059 | 1064 | |
1060 | 1065 |
def get_manager_login_url(): |
src/authentic2/utils/service.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from authentic2.constants import SERVICE_FIELD_NAME |
|
18 | ||
19 | ||
20 |
def service_ref(service): |
|
21 |
if service.ou: |
|
22 |
return '%s %s' % (service.ou.slug, service.slug) |
|
23 |
else: |
|
24 |
return service.slug |
|
25 | ||
26 | ||
27 |
def get_service_from_ref(ref): |
|
28 |
from authentic2.models import Service |
|
29 | ||
30 |
splitted = ref.split(' ') |
|
31 | ||
32 |
try: |
|
33 |
ou_slug, service_slug = splitted |
|
34 |
except ValueError: |
|
35 |
pass |
|
36 |
else: |
|
37 |
return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first() |
|
38 | ||
39 |
try: |
|
40 |
service_slug, = splitted |
|
41 |
except ValueError: |
|
42 |
return None |
|
43 | ||
44 |
service = Service.objects.filter(ou__isnull=True, slug=service_slug).first() |
|
45 |
if service: |
|
46 |
return service |
|
47 |
try: |
|
48 |
return Service.objects.get(slug=service_slug) |
|
49 |
except (Service.DoesNotExist, Service.MultipleObjectsReturned): |
|
50 |
return None |
|
51 | ||
52 | ||
53 |
def get_service_from_request(request): |
|
54 |
service_ref = request.GET.get(SERVICE_FIELD_NAME) |
|
55 |
if not service_ref or '\x00' in service_ref: |
|
56 |
return None |
|
57 |
return get_service_from_ref(service_ref) |
|
58 | ||
59 | ||
60 |
def get_service_from_token(params): |
|
61 |
ref = params.get(SERVICE_FIELD_NAME) |
|
62 |
if not ref: |
|
63 |
return None |
|
64 |
return get_service_from_ref(ref) |
|
65 | ||
66 | ||
67 |
def set_service_ref(params, service): |
|
68 |
params[SERVICE_FIELD_NAME] = service_ref(service) |
src/authentic2/views.py | ||
---|---|---|
54 | 54 |
from authentic2.custom_user.models import iter_attributes |
55 | 55 |
from . import (utils, app_settings, decorators, constants, |
56 | 56 |
models, cbv, hooks, validators, attribute_kinds) |
57 |
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref |
|
57 | 58 |
from .utils import switch_user |
58 | 59 |
from .a2_rbac.utils import get_default_ou |
59 | 60 |
from .a2_rbac.models import OrganizationalUnit as OU |
... | ... | |
258 | 259 | |
259 | 260 |
redirect_to = request.GET.get(redirect_field_name) |
260 | 261 | |
262 |
service = get_service_from_request(request) |
|
263 | ||
261 | 264 |
if not redirect_to or ' ' in redirect_to: |
262 | 265 |
redirect_to = settings.LOGIN_REDIRECT_URL |
263 | 266 |
# Heavier security check -- redirects to http://example.com should |
... | ... | |
272 | 275 | |
273 | 276 |
blocks = [] |
274 | 277 | |
275 |
registration_url = utils.get_registration_url( |
|
276 |
request, service_slug=request.GET.get(constants.SERVICE_FIELD_NAME)) |
|
278 |
registration_url = utils.get_registration_url(request, service=service) |
|
277 | 279 | |
278 | 280 |
context = { |
279 | 281 |
'cancel': nonce is not None, |
... | ... | |
313 | 315 |
parameters = {'request': request, |
314 | 316 |
'context': context} |
315 | 317 |
remote_addr = request.META.get('REMOTE_ADDR') |
316 |
service = request.GET.get('service') |
|
317 | 318 |
login_hint = set(request.session.get('login-hint', [])) |
318 | 319 |
show_ctx = dict(remote_addr=remote_addr, login_hint=login_hint) |
319 |
if service and models.Service.objects.filter(slug=service).exists(): |
|
320 |
show_ctx['service_slug'] = service |
|
320 |
if service: |
|
321 |
show_ctx['service_ou_slug'] = service.ou and service.ou.slug |
|
322 |
show_ctx['service_slug'] = service.slug |
|
323 |
show_ctx['service'] = service |
|
321 | 324 |
# check if the authenticator has multiple instances |
322 | 325 |
if hasattr(authenticator, 'instances'): |
323 | 326 |
for instance_id, instance in authenticator.instances(**parameters): |
... | ... | |
846 | 849 |
self.token[field] = form.cleaned_data[field] |
847 | 850 | |
848 | 851 |
# propagate service to the registration completion view |
849 |
if constants.SERVICE_FIELD_NAME in self.request.GET:
|
|
850 |
self.token[constants.SERVICE_FIELD_NAME] = \
|
|
851 |
self.request.GET[constants.SERVICE_FIELD_NAME]
|
|
852 |
service = get_service_from_request(self.request)
|
|
853 |
if service:
|
|
854 |
set_service_ref(self.token, service)
|
|
852 | 855 | |
853 | 856 |
self.token.pop(REDIRECT_FIELD_NAME, None) |
854 | 857 |
self.token.pop('email', None) |
... | ... | |
930 | 933 |
self.email_is_unique |= self.ou.email_is_unique |
931 | 934 |
self.init_fields_labels_and_help_texts() |
932 | 935 |
# if registration is done during an SSO add the service to the registration event |
933 |
self.service = self.token.get(constants.SERVICE_FIELD_NAME)
|
|
936 |
self.service = get_service_from_token(self.token)
|
|
934 | 937 |
return super(RegistrationCompletionView, self) \ |
935 | 938 |
.dispatch(request, *args, **kwargs) |
936 | 939 | |
... | ... | |
1048 | 1051 |
utils.simulate_authentication( |
1049 | 1052 |
request, self.users[0], |
1050 | 1053 |
method=self.authentication_method, |
1051 |
service_slug=self.service)
|
|
1054 |
service=self.service) |
|
1052 | 1055 |
return utils.redirect(request, self.get_success_url()) |
1053 | 1056 |
confirm_data = self.token.get('confirm_data', False) |
1054 | 1057 | |
... | ... | |
1087 | 1090 |
utils.simulate_authentication( |
1088 | 1091 |
request, user, |
1089 | 1092 |
method=self.authentication_method, |
1090 |
service_slug=self.service)
|
|
1093 |
service=self.service) |
|
1091 | 1094 |
return utils.redirect(request, self.get_success_url()) |
1092 | 1095 |
return super(RegistrationCompletionView, self).post(request, *args, **kwargs) |
1093 | 1096 | |
... | ... | |
1127 | 1130 |
def registration_success(self, request, user, form): |
1128 | 1131 |
hooks.call_hooks('event', name='registration', user=user, form=form, view=self, |
1129 | 1132 |
authentication_method=self.authentication_method, |
1130 |
token=self.token, service=self.service) |
|
1133 |
token=self.token, service=self.service and self.service.slug)
|
|
1131 | 1134 |
self.token_obj.delete() |
1132 | 1135 |
utils.simulate_authentication( |
1133 | 1136 |
request, user, |
1134 | 1137 |
method=self.authentication_method, |
1135 |
service_slug=self.service)
|
|
1138 |
service=self.service) |
|
1136 | 1139 |
message_template = loader.get_template('authentic2/registration_success_message.html') |
1137 | 1140 |
messages.info(self.request, message_template.render(request=request)) |
1138 | 1141 |
self.send_registration_success_email(user) |
src/authentic2_auth_fc/views.py | ||
---|---|---|
47 | 47 |
from authentic2.a2_rbac.utils import get_default_ou |
48 | 48 |
from authentic2.forms.passwords import SetPasswordForm |
49 | 49 |
from authentic2.utils import views as views_utils |
50 |
from authentic2.utils.service import get_service_from_request, set_service_ref |
|
50 | 51 | |
51 | 52 |
from . import app_settings, models, utils |
52 | 53 | |
... | ... | |
372 | 373 |
def get(self, request, *args, **kwargs): |
373 | 374 |
registration = True if 'registration' in request.GET else False |
374 | 375 |
'''Request an access grant code and associate it to the current user''' |
375 |
self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
|
|
376 |
self.service = get_service_from_request(request)
|
|
376 | 377 |
if request.user.is_authenticated: |
377 | 378 |
# Prevent to add a link with an FC account already linked with another user. |
378 | 379 |
try: |
... | ... | |
446 | 447 |
return self.redirect(request) |
447 | 448 |
if user: |
448 | 449 |
views_utils.check_cookie_works(request) |
449 |
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
|
|
450 |
a2_utils.login(request, user, 'france-connect', service=self.service)
|
|
450 | 451 |
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE |
451 | 452 |
request.session.set_expiry(0) |
452 | 453 |
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) |
... | ... | |
457 | 458 |
return self.redirect(request) |
458 | 459 |
else: |
459 | 460 |
params = {} |
460 |
if self.service_slug:
|
|
461 |
params[constants.SERVICE_FIELD_NAME] = self.service_slug
|
|
461 |
if self.service: |
|
462 |
set_service_ref(params, self.service)
|
|
462 | 463 |
if registration: |
463 | 464 |
return self.redirect_and_come_back(request, |
464 | 465 |
a2_utils.make_url('fc-registration', |
... | ... | |
496 | 497 |
params = { |
497 | 498 |
REDIRECT_FIELD_NAME: redirect_to, |
498 | 499 |
} |
499 |
if constants.SERVICE_FIELD_NAME in request.GET: |
|
500 |
params[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME] |
|
500 |
service = get_service_from_request(request) |
|
501 |
if service: |
|
502 |
set_service_ref(params, service) |
|
501 | 503 |
if self.get_in_popup(): |
502 | 504 |
params['popup'] = '' |
503 | 505 |
redirect_to = a2_utils.make_url('fc-login-or-link', params=params) |
... | ... | |
510 | 512 |
data['valid_email'] = False |
511 | 513 |
data['franceconnect'] = True |
512 | 514 |
data['authentication_method'] = 'france-connect' |
513 |
if constants.SERVICE_FIELD_NAME in request.GET:
|
|
514 |
data[constants.SERVICE_FIELD_NAME] = request.GET[constants.SERVICE_FIELD_NAME]
|
|
515 |
if service:
|
|
516 |
set_service_ref(data, service)
|
|
515 | 517 |
activation_url = a2_utils.build_activation_url(request, |
516 | 518 |
next_url=redirect_to, |
517 | 519 |
**data) |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
32 | 32 |
from django.utils.six.moves.urllib import parse as urlparse |
33 | 33 |
from django.utils.timezone import now |
34 | 34 | |
35 |
from authentic2.models import Service |
|
36 | ||
35 | 37 |
from authentic2_auth_fc import models |
36 | 38 |
from authentic2_auth_fc.utils import requests_retry_session |
37 | 39 | |
... | ... | |
41 | 43 |
User = get_user_model() |
42 | 44 | |
43 | 45 | |
46 |
@pytest.fixture(autouse=True) |
|
47 |
def service(db): |
|
48 |
return Service.objects.create(name='portail', slug='portail') |
|
49 | ||
50 | ||
44 | 51 |
def path(url): |
45 | 52 |
return urlparse.urlparse(url).path |
46 | 53 |
tests/test_idp_oidc.py | ||
---|---|---|
959 | 959 | |
960 | 960 |
location = urlparse.urlparse(response['Location']) |
961 | 961 |
query = urlparse.parse_qs(location.query) |
962 |
assert query['service'] == ['client'] |
|
962 |
assert query['service'] == ['default client']
|
|
963 | 963 |
response = response.follow().click('Register') |
964 | 964 |
location = urlparse.urlparse(response.request.url) |
965 | 965 |
query = urlparse.parse_qs(location.query) |
966 |
assert query['service'] == ['client'] |
|
966 |
assert query['service'] == ['default client']
|
|
967 | 967 | |
968 | 968 |
response.form.set('email', 'john.doe@example.com') |
969 | 969 |
response = response.form.submit() |
tests/test_idp_saml2.py | ||
---|---|---|
346 | 346 |
reverse('auth_login'), |
347 | 347 |
**{ |
348 | 348 |
'nonce': '*', |
349 |
SERVICE_FIELD_NAME: self.sp.slug, |
|
349 |
SERVICE_FIELD_NAME: 'default ' + self.sp.slug,
|
|
350 | 350 |
REDIRECT_FIELD_NAME: make_url( |
351 | 351 |
'a2-idp-saml-continue', |
352 | 352 |
params={NONCE_FIELD_NAME: request_id}), |
353 |
- |