0001-misc-maintain-home-url-service-and-ou-61199.patch
src/authentic2/authenticators.py | ||
---|---|---|
29 | 29 |
from .forms import authentication as authentication_forms |
30 | 30 |
from .utils import misc as utils_misc |
31 | 31 |
from .utils.evaluate import evaluate_condition |
32 |
from .utils.service import get_service_from_request
|
|
32 |
from .utils.service import get_service |
|
33 | 33 |
from .utils.views import csrf_token_check |
34 | 34 | |
35 | 35 |
logger = logging.getLogger(__name__) |
... | ... | |
88 | 88 |
return [] |
89 | 89 |
return OU.objects.filter(pk__in=service_ou_ids) |
90 | 90 | |
91 |
def get_preferred_ous(self, request, service): |
|
91 |
def get_preferred_ous(self, request): |
|
92 |
service = get_service(request) |
|
92 | 93 |
preferred_ous_cookie = utils_misc.get_remember_cookie(request, 'preferred-ous') |
93 | 94 |
preferred_ous = [] |
94 | 95 |
if preferred_ous_cookie: |
... | ... | |
102 | 103 |
return preferred_ous |
103 | 104 | |
104 | 105 |
def login(self, request, *args, **kwargs): |
105 |
service = get_service_from_request(request) |
|
106 | 106 |
context = kwargs.get('context', {}) |
107 | 107 |
is_post = request.method == 'POST' and self.submit_name in request.POST |
108 | 108 |
data = request.POST if is_post else None |
... | ... | |
112 | 112 | |
113 | 113 |
# Special handling when the form contains an OU selector |
114 | 114 |
if app_settings.A2_LOGIN_FORM_OU_SELECTOR: |
115 |
preferred_ous = self.get_preferred_ous(request, service)
|
|
115 |
preferred_ous = self.get_preferred_ous(request) |
|
116 | 116 |
if preferred_ous: |
117 | 117 |
initial['ou'] = preferred_ous[0] |
118 | 118 | |
... | ... | |
135 | 135 |
if form.cleaned_data.get('remember_me'): |
136 | 136 |
request.session['remember_me'] = True |
137 | 137 |
request.session.set_expiry(app_settings.A2_USER_REMEMBER_ME) |
138 |
response = utils_misc.login(request, form.get_user(), how, service=service)
|
|
138 |
response = utils_misc.login(request, form.get_user(), how) |
|
139 | 139 |
if 'ou' in form.fields: |
140 | 140 |
utils_misc.prepend_remember_cookie( |
141 | 141 |
request, response, 'preferred-ous', form.cleaned_data['ou'].pk |
src/authentic2/constants.py | ||
---|---|---|
20 | 20 |
AUTHENTICATION_EVENTS_SESSION_KEY = 'authentication-events' |
21 | 21 |
SWITCH_USER_SESSION_KEY = '_switch_user' |
22 | 22 |
LAST_LOGIN_SESSION_KEY = '_last_login' |
23 |
SERVICE_FIELD_NAME = 'service' |
|
24 | 23 |
NEXT_URL_SIGNATURE = 'next-signature' |
src/authentic2/context_processors.py | ||
---|---|---|
20 | 20 |
from . import app_settings, constants |
21 | 21 |
from .models import Service |
22 | 22 |
from .utils import misc as utils_misc |
23 |
from .utils.service import get_service |
|
23 | 24 | |
24 | 25 | |
25 | 26 |
class UserFederations: |
... | ... | |
69 | 70 |
except Service.DoesNotExist: |
70 | 71 |
pass |
71 | 72 |
return variables |
73 | ||
74 | ||
75 |
def home(request):
    """Template context processor exposing the current service and home URL.

    Adds ``home_service`` (and ``home_ou`` when the service has an OU) if a
    current service is known for the request, and always adds ``home_url``,
    chosen in this order: the URL stored in the session, the home URL of the
    service's OU, then the configured homepage / login redirect URL.
    """
    service = get_service(request)
    ou = service.ou if service else None

    ctx = {}
    if service:
        ctx['home_service'] = service
        if ou:
            ctx['home_ou'] = ou

    home_url = request.session.get('home_url')
    if not home_url:
        if ou and ou.home_url:
            home_url = ou.home_url
        else:
            home_url = app_settings.A2_HOMEPAGE_URL or settings.LOGIN_REDIRECT_URL
    ctx['home_url'] = home_url
    return ctx
src/authentic2/idp/saml/saml2_endpoints.py | ||
---|---|---|
113 | 113 |
from authentic2.utils.misc import datetime_to_xs_datetime, find_authentication_event |
114 | 114 |
from authentic2.utils.misc import get_backends as get_idp_backends |
115 | 115 |
from authentic2.utils.misc import login_require, make_url |
116 |
from authentic2.utils.service import set_service |
|
116 | 117 |
from authentic2.utils.view_decorators import check_view_restriction, enable_view_restriction |
117 | 118 | |
118 | 119 |
from . import app_settings |
... | ... | |
582 | 583 |
}, |
583 | 584 |
) |
584 | 585 |
else: |
586 |
set_service(request, provider_loaded) |
|
585 | 587 |
policy = get_sp_options_policy(provider_loaded) |
586 | 588 |
if not policy: |
587 | 589 |
return error_page(request, _('sso: No SP policy defined'), logger=logger, warning=True) |
... | ... | |
628 | 630 |
return sso_after_process_request(request, login, nid_format=nid_format) |
629 | 631 | |
630 | 632 | |
631 |
def need_login(request, login, nid_format, service):
|
|
633 |
def need_login(request, login, nid_format): |
|
632 | 634 |
"""Redirect to the login page with a nonce parameter to verify later that |
633 | 635 |
the login form was submitted |
634 | 636 |
""" |
... | ... | |
640 | 642 |
request, |
641 | 643 |
next_url=next_url, |
642 | 644 |
params={NONCE_FIELD_NAME: nonce}, |
643 |
service=service, |
|
644 | 645 |
login_hint=get_login_hints_extension(login), |
645 | 646 |
) |
646 | 647 | |
... | ... | |
789 | 790 | |
790 | 791 |
if not passive and (user.is_anonymous or (force_authn and not did_auth)): |
791 | 792 |
logger.debug('login required') |
792 |
return need_login(request, login, nid_format, service)
|
|
793 |
return need_login(request, login, nid_format) |
|
793 | 794 | |
794 | 795 |
# No user is authenticated and passive is True, deny request |
795 | 796 |
if passive and user.is_anonymous: |
... | ... | |
1296 | 1297 |
except ObjectDoesNotExist: |
1297 | 1298 |
logger.warning('provider %r unknown', logout.remoteProviderId) |
1298 | 1299 |
return return_logout_error(request, logout, AUTHENTIC_STATUS_CODE_UNAUTHORIZED) |
1300 |
set_service(request, provider) |
|
1299 | 1301 |
policy = get_sp_options_policy(provider) |
1300 | 1302 |
if not policy: |
1301 | 1303 |
logger.warning('No policy found for %s', logout.remoteProviderId) |
... | ... | |
1385 | 1387 |
except ObjectDoesNotExist: |
1386 | 1388 |
logger.debug('provider %r unknown', logout.remoteProviderId) |
1387 | 1389 |
return return_logout_error(request, logout, AUTHENTIC_STATUS_CODE_UNAUTHORIZED) |
1390 |
set_service(request, provider) |
|
1388 | 1391 |
policy = get_sp_options_policy(provider) |
1389 | 1392 |
if not policy: |
1390 | 1393 |
logger.debug('No policy found for %s', logout.remoteProviderId) |
src/authentic2/journal.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
from authentic2.apps.journal.journal import Journal as BaseJournal |
18 |
from authentic2.utils.service import get_service_from_request
|
|
18 |
from authentic2.utils.service import get_service |
|
19 | 19 | |
20 | 20 | |
21 | 21 |
class Journal(BaseJournal): |
... | ... | |
25 | 25 | |
26 | 26 |
@property
def service(self):
    """Return the service associated with this journal, or None.

    An explicitly set ``_service`` always wins; otherwise the service is
    looked up from the request when one is attached to the journal.
    """
    # Written as explicit branches: the one-line form
    # ``a or b if c else d`` parses as ``(a or b) if c else d`` and would
    # discard ``_service`` whenever there is no request.
    if self._service:
        return self._service
    if self.request:
        return get_service(self.request)
    return None
|
|
29 | 29 | |
30 | 30 |
def massage_kwargs(self, record_parameters, kwargs): |
31 | 31 |
if 'service' not in kwargs and 'service' in record_parameters: |
src/authentic2/middleware.py | ||
---|---|---|
28 | 28 |
from django.contrib import messages |
29 | 29 |
from django.db.models import Model |
30 | 30 |
from django.utils.deprecation import MiddlewareMixin |
31 |
from django.utils.functional import SimpleLazyObject |
|
32 | 31 |
from django.utils.translation import ugettext as _ |
33 | 32 | |
34 | 33 |
from . import app_settings, plugins |
35 | 34 |
from .utils import misc as utils_misc |
36 |
from .utils.service import get_service_from_request, get_service_from_session |
|
37 | 35 | |
38 | 36 | |
39 | 37 |
class CollectIPMiddleware(MiddlewareMixin): |
... | ... | |
263 | 261 |
return response |
264 | 262 | |
265 | 263 | |
266 |
class SaveServiceInSessionMiddleware: |
|
267 |
def __init__(self, get_response): |
|
268 |
self.get_response = get_response |
|
269 | ||
270 |
def __call__(self, request): |
|
271 |
service = get_service_from_request(request) |
|
272 |
if service: |
|
273 |
request.session['service_pk'] = service.pk |
|
274 |
request.service = SimpleLazyObject(lambda: get_service_from_session(request)) |
|
275 |
return self.get_response(request) |
|
276 | ||
277 | ||
278 | 264 |
def journal_middleware(get_response): |
279 | 265 |
from . import journal |
280 | 266 |
src/authentic2/models.py | ||
---|---|---|
451 | 451 |
def get_absolute_url(self): |
452 | 452 |
return reverse('a2-manager-service', kwargs={'service_pk': self.pk}) |
453 | 453 | |
454 |
def get_base_urls(self):
    """Return the list of base URLs for this service.

    This implementation returns an empty list.
    """
    return list()
|
456 | ||
454 | 457 | |
455 | 458 |
Service._meta.natural_key = [['slug', 'ou']] |
456 | 459 |
src/authentic2/settings.py | ||
---|---|---|
81 | 81 |
'django.contrib.messages.context_processors.messages', |
82 | 82 |
'django.template.context_processors.static', |
83 | 83 |
'authentic2.context_processors.a2_processor', |
84 |
'authentic2.context_processors.home', |
|
84 | 85 |
], |
85 | 86 |
}, |
86 | 87 |
}, |
... | ... | |
96 | 97 |
'django.middleware.common.CommonMiddleware', |
97 | 98 |
'django.middleware.http.ConditionalGetMiddleware', |
98 | 99 |
'django.contrib.sessions.middleware.SessionMiddleware', |
99 |
'authentic2.middleware.SaveServiceInSessionMiddleware', |
|
100 | 100 |
'django.middleware.csrf.CsrfViewMiddleware', |
101 | 101 |
'django.middleware.locale.LocaleMiddleware', |
102 | 102 |
'django.contrib.auth.middleware.AuthenticationMiddleware', |
src/authentic2/templates/authentic2/base.html | ||
---|---|---|
12 | 12 |
{% endblock %} |
13 | 13 | |
14 | 14 |
{% block bodyargs %} |
15 |
data-service-slug="{{ service.slug }}" data-service-name="{{ service.name }}" |
|
15 |
{% if home_url %}data-home-url="{{ home_url }}"{% endif %} |
|
16 |
{% if home_service %}data-home-service-slug="{{ home_service.slug }}" data-home-service-name="{{ home_service.name }}"{% endif %} |
|
17 |
{% if home_ou %}data-home-ou-slug="{{ home_ou.slug }}" data-home-ou-name="{{ home_ou.name }}"{% endif %} |
|
16 | 18 |
{% endblock %} |
17 | 19 | |
18 | 20 |
{% block extrascripts %} |
src/authentic2/utils/misc.py | ||
---|---|---|
50 | 50 |
from authentic2.saml.saml2utils import filter_attribute_private_key, filter_element_private_key |
51 | 51 | |
52 | 52 |
from .. import app_settings, constants, crypto, plugins |
53 |
from .service import set_service_ref |
|
54 | 53 | |
55 | 54 | |
56 | 55 |
class CleanLogMessage(logging.Filter): |
... | ... | |
455 | 454 |
return None |
456 | 455 | |
457 | 456 | |
458 |
def login(request, user, how, service=None, service_slug=None, nonce=None, record=True, **kwargs):
|
|
457 |
def login(request, user, how, nonce=None, record=True, **kwargs): |
|
459 | 458 |
"""Login a user model, record the authentication event and redirect to next |
460 | 459 |
URL or settings.LOGIN_REDIRECT_URL.""" |
461 | 460 |
from .. import hooks |
461 |
from .service import get_service |
|
462 | 462 |
from .views import check_cookie_works |
463 | 463 | |
464 |
if service: |
|
465 |
assert service_slug is None |
|
466 |
service_slug = service.slug |
|
467 | 464 |
check_cookie_works(request) |
468 | 465 |
last_login = user.last_login |
469 | 466 |
auth_login(request, user) |
... | ... | |
472 | 469 |
if constants.LAST_LOGIN_SESSION_KEY not in request.session: |
473 | 470 |
request.session[constants.LAST_LOGIN_SESSION_KEY] = localize(to_current_timezone(last_login), True) |
474 | 471 |
record_authentication_event(request, how, nonce=nonce) |
475 |
hooks.call_hooks('event', name='login', user=user, how=how, service=service_slug)
|
|
472 |
hooks.call_hooks('event', name='login', user=user, how=how, service=get_service(request))
|
|
476 | 473 |
# prevent logint-hint to influence next use of the login page |
477 | 474 |
if 'login-hint' in request.session: |
478 | 475 |
del request.session['login-hint'] |
479 | 476 |
if record: |
480 |
request.journal.record('user.login', how=how, service=service)
|
|
477 |
request.journal.record('user.login', how=how) |
|
481 | 478 |
return continue_to_next_url(request, **kwargs) |
482 | 479 | |
483 | 480 | |
484 |
def login_require(request, next_url=None, login_url='auth_login', service=None, login_hint=(), **kwargs):
|
|
481 |
def login_require(request, next_url=None, login_url='auth_login', login_hint=(), **kwargs): |
|
485 | 482 |
'''Require a login and come back to current URL''' |
486 | 483 | |
487 | 484 |
next_url = next_url or request.get_full_path() |
488 | 485 |
params = kwargs.setdefault('params', {}) |
489 | 486 |
params[REDIRECT_FIELD_NAME] = next_url |
490 |
if service: |
|
491 |
set_service_ref(params, service) |
|
492 | 487 |
if login_hint: |
493 | 488 |
request.session['login-hint'] = list(login_hint) |
494 | 489 |
elif 'login-hint' in request.session: |
... | ... | |
735 | 730 |
return field.related_model |
736 | 731 | |
737 | 732 | |
738 |
def get_registration_url(request, service=None):
|
|
733 |
def get_registration_url(request): |
|
739 | 734 |
next_url = select_next_url(request, settings.LOGIN_REDIRECT_URL) |
740 | 735 |
next_url = make_url( |
741 | 736 |
next_url, request=request, keep_params=True, include=(constants.NONCE_FIELD_NAME,), resolve=False |
742 | 737 |
) |
743 | 738 |
params = {REDIRECT_FIELD_NAME: next_url} |
744 |
if service: |
|
745 |
set_service_ref(params, service) |
|
746 | 739 |
return make_url('registration_register', params=params) |
747 | 740 | |
748 | 741 | |
... | ... | |
1041 | 1034 |
return next_url |
1042 | 1035 | |
1043 | 1036 | |
1044 |
def select_next_url(request, default, field_name=None, include_post=False, replace=None): |
|
1037 |
EMPTY = object() |
|
1038 | ||
1039 | ||
1040 |
def select_next_url(request, default=EMPTY, field_name=None, include_post=False, replace=None): |
|
1045 | 1041 |
'''Select the first valid next URL''' |
1046 | 1042 |
# pylint: disable=consider-using-ternary |
1043 |
if default is EMPTY: |
|
1044 |
if request.user.is_authenticated and request.user.ou and request.user.ou.home_url: |
|
1045 |
default = request.user.ou.home_url |
|
1046 |
else: |
|
1047 |
default = settings.LOGIN_REDIRECT_URL |
|
1047 | 1048 |
next_url = (include_post and get_next_url(request.POST, field_name=field_name)) or get_next_url( |
1048 | 1049 |
request.GET, field_name=field_name |
1049 | 1050 |
) |
... | ... | |
1143 | 1144 |
return True |
1144 | 1145 | |
1145 | 1146 | |
1146 |
def simulate_authentication(request, user, method, backend=None, service=None, record=False, **kwargs):
|
|
1147 |
def simulate_authentication(request, user, method, backend=None, record=False, **kwargs): |
|
1147 | 1148 |
"""Simulate a normal login by eventually forcing a backend attribute on the |
1148 | 1149 |
user instance""" |
1149 | 1150 |
if not getattr(user, 'backend', None) and not backend: |
... | ... | |
1151 | 1152 |
if backend: |
1152 | 1153 |
user = copy.deepcopy(user) |
1153 | 1154 |
user.backend = backend |
1154 |
return login(request, user, method, service=service, record=record, **kwargs)
|
|
1155 |
return login(request, user, method, record=record, **kwargs) |
|
1155 | 1156 | |
1156 | 1157 | |
1157 | 1158 |
def get_manager_login_url(): |
src/authentic2/utils/service.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from authentic2.constants import SERVICE_FIELD_NAME |
|
17 |
import urllib.parse |
|
18 | 18 | |
19 |
from django.apps import apps |
|
19 | 20 | |
20 |
def service_ref(service): |
|
21 |
if service.ou: |
|
22 |
return '%s %s' % (service.ou.slug, service.slug) |
|
23 |
else: |
|
24 |
return service.slug |
|
21 |
from authentic2.decorators import GlobalCache |
|
22 |
from authentic2.utils.misc import same_origin |
|
25 | 23 | |
26 | 24 | |
27 |
def get_service_from_ref(ref): |
|
25 |
@GlobalCache(timeout=60) |
|
26 |
def _base_urls_map(): |
|
28 | 27 |
from authentic2.models import Service |
29 | 28 | |
30 |
splitted = ref.split(' ') |
|
31 | ||
32 |
try: |
|
33 |
ou_slug, service_slug = splitted |
|
34 |
except ValueError: |
|
35 |
pass |
|
36 |
else: |
|
37 |
return Service.objects.filter(ou__slug=ou_slug, slug=service_slug).first() |
|
29 |
base_urls_map = {} |
|
30 |
for service in Service.objects.select_related().select_subclasses(): |
|
31 |
for url in service.get_base_urls(): |
|
32 |
base_urls_map[url] = (type(service), service.pk) |
|
33 |
return base_urls_map |
|
38 | 34 | |
39 |
try: |
|
40 |
(service_slug,) = splitted |
|
41 |
except ValueError: |
|
42 |
return None |
|
43 | 35 | |
44 |
service = Service.objects.filter(ou__isnull=True, slug=service_slug).first() |
|
36 |
def _set_service(session, service): |
|
37 |
if 'home_url' in session: |
|
38 |
del session['home_url'] |
|
45 | 39 |
if service: |
46 |
return service |
|
47 |
try: |
|
48 |
return Service.objects.get(slug=service_slug) |
|
49 |
except (Service.DoesNotExist, Service.MultipleObjectsReturned): |
|
50 |
return None |
|
51 | ||
52 | ||
53 |
def get_service_from_request(request): |
|
54 |
service_ref = request.GET.get(SERVICE_FIELD_NAME) |
|
55 |
if service_ref and '\x00' not in service_ref: |
|
56 |
return get_service_from_ref(service_ref) |
|
57 |
return None |
|
58 | ||
59 | ||
60 |
def get_service_from_session(request): |
|
61 |
session = getattr(request, 'session', None) |
|
62 |
if session and 'service_pk' in session: |
|
63 |
from authentic2.models import Service |
|
64 | ||
65 |
return Service.objects.get(pk=session['service_pk']) |
|
66 |
return None |
|
67 | ||
68 | ||
69 |
def get_service_from_token(params): |
|
70 |
ref = params.get(SERVICE_FIELD_NAME) |
|
71 |
if not ref: |
|
72 |
return None |
|
73 |
return get_service_from_ref(ref) |
|
74 | ||
75 | ||
76 |
def set_service_ref(params, service): |
|
77 |
params[SERVICE_FIELD_NAME] = service_ref(service) |
|
40 |
session['service_type'] = [type(service)._meta.app_label, type(service)._meta.model_name] |
|
41 |
session['service_pk'] = service.pk |
|
42 |
else: |
|
43 |
session.pop('sevice_type', None) |
|
44 |
session.pop('sevice_pk', None) |
|
45 | ||
46 | ||
47 |
def set_service(request, service):
    """Make *service* the current service of *request*.

    The service is kept on the request object itself (``_service``) and its
    reference is written to the request's session.
    """
    request._service = service
    session = request.session
    _set_service(session, service)
|
50 | ||
51 | ||
52 |
def set_home_url(request, url=None):
    """Remember *url* as the home URL of the session.

    When *url* is empty, the next URL selected from the request is used
    instead. URLs without a network location are ignored. If the URL has the
    same origin as one of the known service base URLs, that service becomes
    the current service of the request. The URL is then stored in
    ``request.session['home_url']``.
    """
    if not url:
        from .misc import select_next_url

        url = select_next_url(request, default=None)
    if not url or not urllib.parse.urlparse(url).netloc:
        return
    for base_url, (Model, pk) in _base_urls_map().items():
        if same_origin(base_url, url):
            # Django's default manager is ``objects``; ``Model.object`` raised
            # AttributeError on the first matching base URL.
            set_service(request, Model.objects.get(pk=pk))
            break
    # Stored last: set_service() clears any previous home_url from the session.
    request.session['home_url'] = url
|
65 | ||
66 | ||
67 |
def get_service(request):
    """Return the current service of *request*, or None when there is none.

    The result is cached on the request as ``_service``. On first access it
    is loaded from the model type and primary key stored in the session
    (``service_type`` / ``service_pk``). A session that references a service
    which no longer exists yields None instead of raising.
    """
    if hasattr(request, '_service'):
        return request._service

    session = request.session
    service = None
    if 'service_type' in session and 'service_pk' in session:
        ServiceKlass = apps.get_app_config(session['service_type'][0]).get_model(
            session['service_type'][1]
        )
        try:
            service = ServiceKlass.objects.get(pk=session['service_pk'])
        except ServiceKlass.DoesNotExist:
            # The service was deleted after being stored in the session; a
            # stale reference must not break every page for this user.
            service = None
    request._service = service
    return service
src/authentic2/views.py | ||
---|---|---|
63 | 63 |
from .utils import misc as utils_misc |
64 | 64 |
from .utils import switch_user as utils_switch_user |
65 | 65 |
from .utils.evaluate import make_condition_context |
66 |
from .utils.service import get_service_from_request, get_service_from_token, set_service_ref
|
|
66 |
from .utils.service import get_service, set_home_url
|
|
67 | 67 |
from .utils.view_decorators import enable_view_restriction |
68 | 68 | |
69 | 69 |
User = get_user_model() |
... | ... | |
71 | 71 |
logger = logging.getLogger(__name__) |
72 | 72 | |
73 | 73 | |
74 |
class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView): |
|
74 |
class HomeURLMixin:
    """View mixin that records the home URL before the view is dispatched."""

    def dispatch(self, request, *args, **kwargs):
        # Must run before the wrapped view so the session is updated first.
        set_home_url(request)
        response = super().dispatch(request, *args, **kwargs)
        return response
|
78 | ||
79 | ||
80 |
class EditProfile(HomeURLMixin, cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView): |
|
75 | 81 |
model = User |
76 | 82 |
template_names = ['profiles/edit_profile.html', 'authentic2/accounts_edit.html'] |
77 | 83 |
title = _('Edit account data') |
... | ... | |
182 | 188 |
edit_required_profile = login_required(EditRequired.as_view()) |
183 | 189 | |
184 | 190 | |
185 |
class EmailChangeView(cbv.TemplateNamesMixin, FormView): |
|
191 |
class EmailChangeView(HomeURLMixin, cbv.TemplateNamesMixin, FormView):
|
|
186 | 192 |
template_names = ['profiles/email_change.html', 'authentic2/change_email.html'] |
187 | 193 |
title = _('Email Change') |
188 | 194 |
success_url = '..' |
... | ... | |
295 | 301 | |
296 | 302 |
redirect_to = request.GET.get(redirect_field_name) |
297 | 303 | |
298 |
service = get_service_from_request(request) |
|
299 | ||
300 | 304 |
if not redirect_to or ' ' in redirect_to: |
301 | 305 |
redirect_to = settings.LOGIN_REDIRECT_URL |
302 | 306 |
# Heavier security check -- redirects to http://example.com should |
... | ... | |
311 | 315 | |
312 | 316 |
blocks = [] |
313 | 317 | |
314 |
registration_url = utils_misc.get_registration_url(request, service=service)
|
|
318 |
registration_url = utils_misc.get_registration_url(request) |
|
315 | 319 | |
316 | 320 |
context = { |
317 | 321 |
'cancel': app_settings.A2_LOGIN_DISPLAY_A_CANCEL_BUTTON and nonce is not None, |
... | ... | |
346 | 350 |
parameters = {'request': request, 'context': context} |
347 | 351 |
login_hint = set(request.session.get('login-hint', [])) |
348 | 352 |
show_ctx = make_condition_context(request=request, login_hint=login_hint) |
353 |
service = get_service(request) |
|
349 | 354 |
if service: |
350 | 355 |
show_ctx['service_ou_slug'] = service.ou and service.ou.slug |
351 | 356 |
show_ctx['service_slug'] = service.slug |
... | ... | |
421 | 426 |
template_names = ['idp/homepage.html', 'authentic2/homepage.html'] |
422 | 427 | |
423 | 428 |
def dispatch(self, request, *args, **kwargs):
    """Redirect to the applicable home URL, or show the homepage.

    The home URL is taken, in order, from the session, from the OU of the
    authenticated user, then from the ``A2_HOMEPAGE_URL`` setting. When none
    is defined the regular homepage is displayed, which requires a login.
    """
    user = request.user
    # The setting is only a fallback: assigning it unconditionally would
    # discard the session and OU values computed just before.
    home_url = (
        request.session.get('home_url')
        or (user.is_authenticated and user.ou and user.ou.home_url)
        or app_settings.A2_HOMEPAGE_URL
    )
    if home_url:
        return utils_misc.redirect(request, home_url)
    return login_required(super().dispatch)(request, *args, **kwargs)
427 | 437 | |
... | ... | |
435 | 445 |
homepage = enable_view_restriction(Homepage.as_view()) |
436 | 446 | |
437 | 447 | |
438 |
class ProfileView(cbv.TemplateNamesMixin, TemplateView): |
|
448 |
class ProfileView(HomeURLMixin, cbv.TemplateNamesMixin, TemplateView):
|
|
439 | 449 |
template_names = ['idp/account_management.html', 'authentic2/accounts.html'] |
440 | 450 |
title = _('Your account') |
441 | 451 | |
... | ... | |
889 | 899 |
password_reset_confirm = PasswordResetConfirmView.as_view() |
890 | 900 | |
891 | 901 | |
892 |
class BaseRegistrationView(FormView): |
|
902 |
class BaseRegistrationView(HomeURLMixin, FormView):
|
|
893 | 903 |
form_class = registration_forms.RegistrationForm |
894 | 904 |
template_name = 'registration/registration_form.html' |
895 | 905 |
title = _('Registration') |
... | ... | |
912 | 922 |
if 'ou' in self.token: |
913 | 923 |
self.ou = OU.objects.get(pk=self.token['ou']) |
914 | 924 |
self.next_url = self.token.pop(REDIRECT_FIELD_NAME, utils_misc.select_next_url(request, None)) |
925 |
set_home_url(request, self.next_url) |
|
915 | 926 |
return super().dispatch(request, *args, **kwargs) |
916 | 927 | |
917 | 928 |
def form_valid(self, form): |
... | ... | |
978 | 989 |
for field in form.cleaned_data: |
979 | 990 |
self.token[field] = form.cleaned_data[field] |
980 | 991 | |
981 |
# propagate service to the registration completion view |
|
982 |
service = get_service_from_request(self.request) |
|
983 |
if service: |
|
984 |
set_service_ref(self.token, service) |
|
985 | ||
986 | 992 |
self.token.pop(REDIRECT_FIELD_NAME, None) |
987 | 993 |
self.token.pop('email', None) |
988 | 994 | |
... | ... | |
1066 | 1072 |
if self.ou: |
1067 | 1073 |
self.email_is_unique |= self.ou.email_is_unique |
1068 | 1074 |
self.init_fields_labels_and_help_texts() |
1069 |
# if registration is done during an SSO add the service to the registration event |
|
1070 |
self.service = get_service_from_token(self.token) |
|
1075 |
set_home_url(request, self.get_success_url()) |
|
1071 | 1076 |
return super().dispatch(request, *args, **kwargs) |
1072 | 1077 | |
1073 | 1078 |
def init_fields_labels_and_help_texts(self): |
... | ... | |
1180 | 1185 |
def get(self, request, *args, **kwargs): |
1181 | 1186 |
if len(self.users) == 1 and self.email_is_unique: |
1182 | 1187 |
# Found one user, EMAIL is unique, log her in |
1183 |
utils_misc.simulate_authentication( |
|
1184 |
request, self.users[0], method=self.authentication_method, service=self.service |
|
1185 |
) |
|
1188 |
utils_misc.simulate_authentication(request, self.users[0], method=self.authentication_method) |
|
1186 | 1189 |
return utils_misc.redirect(request, self.get_success_url()) |
1187 | 1190 |
confirm_data = self.token.get('confirm_data', False) |
1188 | 1191 | |
... | ... | |
1220 | 1223 |
uid = request.POST['uid'] |
1221 | 1224 |
for user in self.users: |
1222 | 1225 |
if str(user.id) == uid: |
1223 |
utils_misc.simulate_authentication( |
|
1224 |
request, user, method=self.authentication_method, service=self.service |
|
1225 |
) |
|
1226 |
utils_misc.simulate_authentication(request, user, method=self.authentication_method) |
|
1226 | 1227 |
return utils_misc.redirect(request, self.get_success_url()) |
1227 | 1228 |
return super().post(request, *args, **kwargs) |
1228 | 1229 | |
... | ... | |
1284 | 1285 |
view=self, |
1285 | 1286 |
authentication_method=self.authentication_method, |
1286 | 1287 |
token=self.token, |
1287 |
service=self.service and self.service.slug,
|
|
1288 |
service=get_service(request),
|
|
1288 | 1289 |
) |
1289 | 1290 |
self.send_registration_success_email(user) |
1290 | 1291 | |
1291 | 1292 |
def registration_success(self, request, user): |
1292 |
utils_misc.simulate_authentication( |
|
1293 |
request, user, method=self.authentication_method, service=self.service |
|
1294 |
) |
|
1293 |
utils_misc.simulate_authentication(request, user, method=self.authentication_method) |
|
1295 | 1294 |
message_template = loader.get_template('authentic2/registration_success_message.html') |
1296 | 1295 |
messages.info(self.request, message_template.render(request=request)) |
1297 | 1296 |
return utils_misc.redirect(request, self.get_success_url()) |
... | ... | |
1319 | 1318 |
registration_completion = RegistrationCompletionView.as_view() |
1320 | 1319 | |
1321 | 1320 | |
1322 |
class AccountDeleteView(TemplateView): |
|
1321 |
class AccountDeleteView(HomeURLMixin, TemplateView):
|
|
1323 | 1322 |
template_name = 'authentic2/accounts_delete_request.html' |
1324 | 1323 |
title = _('Request account deletion') |
1325 | 1324 | |
... | ... | |
1407 | 1406 |
registration_complete = RegistrationCompleteView.as_view() |
1408 | 1407 | |
1409 | 1408 | |
1410 |
class PasswordChangeView(DjPasswordChangeView): |
|
1409 |
class PasswordChangeView(HomeURLMixin, DjPasswordChangeView):
|
|
1411 | 1410 |
title = _('Password Change') |
1412 | 1411 |
do_not_call_in_templates = True |
1413 | 1412 | |
... | ... | |
1471 | 1470 |
su = SuView.as_view() |
1472 | 1471 | |
1473 | 1472 | |
1474 |
class Consents(ListView): |
|
1473 |
class Consents(HomeURLMixin, ListView):
|
|
1475 | 1474 |
template_name = 'authentic2/consents.html' |
1476 | 1475 |
title = _('Consent Management') |
1477 | 1476 |
model = OIDCAuthorization |
src/authentic2_auth_fc/views.py | ||
---|---|---|
42 | 42 |
from authentic2.utils import misc as utils_misc |
43 | 43 |
from authentic2.utils import views as utils_views |
44 | 44 |
from authentic2.utils.models import safe_get_or_create |
45 |
from authentic2.utils.service import get_service_from_ref, get_service_from_request, service_ref |
|
46 | 45 | |
47 | 46 |
from . import app_settings, models |
48 | 47 |
from .utils import ( |
... | ... | |
69 | 68 |
""" |
70 | 69 | |
71 | 70 |
_next_url = None |
72 |
service = None |
|
73 | 71 | |
74 | 72 |
@property |
75 | 73 |
def next_url(self): |
... | ... | |
114 | 112 |
def handle_authorization_response(self, request, code, state): |
115 | 113 |
# check state signature and parse it |
116 | 114 |
try: |
117 |
state, self._next_url, self.service = self.decode_state(state)
|
|
115 |
state, self._next_url = self.decode_state(state) |
|
118 | 116 |
except ValueError: |
119 | 117 |
return utils_misc.redirect(request, settings.LOGIN_REDIRECT_URL) |
120 | 118 | |
... | ... | |
186 | 184 |
else: |
187 | 185 |
return self.login(request) |
188 | 186 | |
189 |
def encode_state(self, state, next_url, service):
|
|
187 |
def encode_state(self, state, next_url): |
|
190 | 188 |
encoded_state = state + ' ' + self.next_url + ' ' |
191 |
if service: |
|
192 |
encoded_state += service_ref(service) |
|
193 | 189 |
encoded_state += ' ' + hmac_url(settings.SECRET_KEY, encoded_state) |
194 | 190 |
return encoded_state |
195 | 191 | |
... | ... | |
197 | 193 |
payload, signature = state.rsplit(' ', 1) |
198 | 194 |
if not check_hmac_url(settings.SECRET_KEY, payload, signature): |
199 | 195 |
raise ValueError |
200 |
# service_ref can be made of one or two parts |
|
201 | 196 |
try: |
202 |
state, next_url, service_ref = payload.split(' ')
|
|
197 |
state, next_url, dummy = payload.split(' ')
|
|
203 | 198 |
except ValueError: |
204 |
state, next_url, ou_slug, service_slug = payload.split(' ') |
|
205 |
service_ref = ou_slug + ' ' + service_slug |
|
206 |
service = get_service_from_ref(service_ref) |
|
207 |
return state, next_url, service |
|
199 |
state, next_url, dummy, dummy = payload.split(' ') |
|
200 |
return state, next_url |
|
208 | 201 | |
209 | 202 |
def make_authorization_request(self, request): |
210 | 203 |
scope = ' '.join(set(['openid'] + app_settings.scopes)) |
211 |
service = self.service or get_service_from_request(request) |
|
212 | 204 | |
213 | 205 |
nonce_seed, nonce, state = hash_chain(3) |
214 | 206 | |
215 | 207 |
# encode the target service and next_url in the state |
216 | 208 |
full_state = state + ' ' + self.next_url + ' ' |
217 |
if service: |
|
218 |
full_state += service_ref(service) |
|
219 | 209 |
full_state += ' ' + hmac_url(settings.SECRET_KEY, full_state) |
220 | 210 |
params = { |
221 | 211 |
'client_id': app_settings.client_id, |
222 | 212 |
'scope': scope, |
223 | 213 |
'redirect_uri': self.redirect_uri, |
224 | 214 |
'response_type': 'code', |
225 |
'state': self.encode_state(state, self.next_url, service),
|
|
215 |
'state': self.encode_state(state, self.next_url), |
|
226 | 216 |
'nonce': nonce, |
227 | 217 |
'acr_values': 'eidas1', |
228 | 218 |
} |
... | ... | |
340 | 330 |
def finish_login(self, request, user, user_info, created): |
341 | 331 |
self.update_user_info(user, user_info) |
342 | 332 |
utils_views.check_cookie_works(request) |
343 |
utils_misc.login(request, user, 'france-connect', service=self.service)
|
|
333 |
utils_misc.login(request, user, 'france-connect') |
|
344 | 334 | |
345 | 335 |
# keep id_token around for logout |
346 | 336 |
request.session['fc_id_token'] = self.id_token |
src/authentic2_idp_cas/views.py | ||
---|---|---|
36 | 36 |
normalize_attribute_values, |
37 | 37 |
redirect, |
38 | 38 |
) |
39 |
from authentic2.utils.service import set_service |
|
39 | 40 |
from authentic2.utils.view_decorators import enable_view_restriction |
40 | 41 |
from authentic2.views import logout as logout_view |
41 | 42 |
from authentic2_idp_cas.constants import ( |
... | ... | |
151 | 152 |
model = Service.objects.for_service(service) |
152 | 153 |
if not model: |
153 | 154 |
return self.failure(request, service, 'service unknown') |
155 |
set_service(request, model) |
|
154 | 156 |
if renew and gateway: |
155 | 157 |
return self.failure(request, service, 'renew and gateway cannot be requested at the same time') |
156 | 158 | |
... | ... | |
464 | 466 |
if referrer: |
465 | 467 |
model = Service.objects.for_service(referrer) |
466 | 468 |
if model: |
469 |
set_service(request, model) |
|
467 | 470 |
return logout_view(request, next_url=next_url, check_referer=False, do_local=False) |
468 | 471 |
return redirect(request, next_url) |
469 | 472 |
src/authentic2_idp_oidc/views.py | ||
---|---|---|
49 | 49 |
from authentic2.decorators import setting_enabled |
50 | 50 |
from authentic2.exponential_retry_timeout import ExponentialRetryTimeout |
51 | 51 |
from authentic2.utils.misc import last_authentication_event, login_require, make_url, redirect |
52 |
from authentic2.utils.service import set_service |
|
52 | 53 |
from authentic2.utils.view_decorators import check_view_restriction |
53 | 54 |
from authentic2.views import logout as a2_logout |
54 | 55 | |
... | ... | |
254 | 255 |
client = get_client(client_id=client_id) |
255 | 256 |
if not client: |
256 | 257 |
raise InvalidRequest(_('Unknown client identifier: "%s"') % client_id) |
258 |
# define the current service |
|
259 |
set_service(request, client) |
|
257 | 260 |
try: |
258 | 261 |
client.validate_redirect_uri(redirect_uri) |
259 | 262 |
except ValueError: |
... | ... | |
341 | 344 |
params = {} |
342 | 345 |
if nonce is not None: |
343 | 346 |
params['nonce'] = nonce |
344 |
return login_require(request, params=params, service=client, login_hint=login_hint)
|
|
347 |
return login_require(request, params=params, login_hint=login_hint) |
|
345 | 348 | |
346 | 349 |
# view restriction and passive SSO |
347 | 350 |
if hasattr(request, 'view_restriction_response'): |
... | ... | |
360 | 363 |
params = {} |
361 | 364 |
if nonce is not None: |
362 | 365 |
params['nonce'] = nonce |
363 |
return login_require(request, params=params, service=client, login_hint=login_hint)
|
|
366 |
return login_require(request, params=params, login_hint=login_hint) |
|
364 | 367 | |
365 | 368 |
iat = now() # iat = issued at |
366 | 369 | |
... | ... | |
820 | 823 |
) |
821 | 824 |
for provider in providers: |
822 | 825 |
if post_logout_redirect_uri in provider.post_logout_redirect_uris.split(): |
826 |
set_service(request, provider) |
|
823 | 827 |
break |
824 | 828 |
else: |
825 | 829 |
messages.warning(request, _('Invalid post logout URI')) |
tests/auth_fc/conftest.py | ||
---|---|---|
160 | 160 | |
161 | 161 | |
162 | 162 |
@pytest.fixture |
163 |
def franceconnect(settings, db): |
|
163 |
def service(db): |
|
164 |
return Service.objects.create(name='portail', slug='portail', ou=get_default_ou()) |
|
165 | ||
166 | ||
167 |
@pytest.fixture |
|
168 |
def franceconnect(settings, service, db): |
|
164 | 169 |
settings.A2_FC_ENABLE = True |
165 | 170 |
settings.A2_FC_CLIENT_ID = CLIENT_ID |
166 | 171 |
settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET |
167 | 172 | |
168 |
Service.objects.create(name='portail', slug='portail', ou=get_default_ou()) |
|
169 | 173 |
mock_object = FranceConnectMock() |
170 | 174 |
with mock_object(): |
171 | 175 |
yield mock_object |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
32 | 32 |
from authentic2.a2_rbac.utils import get_default_ou |
33 | 33 |
from authentic2.apps.journal.models import Event |
34 | 34 |
from authentic2.custom_user.models import DeletedUser |
35 |
from authentic2.models import Attribute, Service
|
|
35 |
from authentic2.models import Attribute |
|
36 | 36 |
from authentic2_auth_fc import models |
37 | 37 |
from authentic2_auth_fc.backends import FcBackend |
38 | 38 |
from authentic2_auth_fc.utils import requests_retry_session |
39 | 39 | |
40 |
from ..utils import get_link_from_mail, login |
|
40 |
from ..utils import get_link_from_mail, login, set_service
|
|
41 | 41 | |
42 | 42 |
User = get_user_model() |
43 | 43 | |
... | ... | |
54 | 54 | |
55 | 55 | |
56 | 56 |
def test_retry_authorization_if_state_is_lost(settings, app, franceconnect, hooks): |
57 |
response = app.get('/fc/callback/?next=/idp/&service=default%20portail', status=302)
|
|
57 |
response = app.get('/fc/callback/?next=/idp/', status=302) |
|
58 | 58 |
# clear fc-state cookie |
59 | 59 |
app.cookiejar.clear() |
60 | 60 |
response = franceconnect.handle_authorization(app, response.location, status=302) |
... | ... | |
81 | 81 |
assert response.location == reverse('fc-login-or-link') |
82 | 82 | |
83 | 83 | |
84 |
def test_create(settings, app, franceconnect, hooks): |
|
84 |
def test_create(settings, app, franceconnect, hooks, service):
|
|
85 | 85 |
# test direct creation |
86 | ||
87 |
response = app.get('/login/?service=portail&next=/idp/')
|
|
86 |
set_service(app, service) |
|
87 |
response = app.get('/login/?next=/idp/') |
|
88 | 88 |
response = response.click(href='callback') |
89 | 89 | |
90 | 90 |
assert User.objects.count() == 0 |
91 |
assert Event.objects.which_references(Service.objects.get()).count() == 0
|
|
91 |
assert Event.objects.which_references(service).count() == 0
|
|
92 | 92 |
response = franceconnect.handle_authorization(app, response.location, status=302) |
93 | 93 |
assert 'fc-state' not in app.cookies |
94 | 94 |
assert User.objects.count() == 1 |
95 | 95 |
# check login for service=portail was registered |
96 |
assert Event.objects.which_references(Service.objects.get()).count() == 1
|
|
96 |
assert Event.objects.which_references(service).count() == 1
|
|
97 | 97 | |
98 | 98 |
user = User.objects.get() |
99 | 99 |
assert user.verified_attributes.first_name == 'Ÿuñe' |
100 | 100 |
assert user.verified_attributes.last_name == 'Frédérique' |
101 | 101 |
assert path(response.location) == '/idp/' |
102 | 102 |
assert hooks.event[1]['kwargs']['name'] == 'login' |
103 |
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
|
103 |
assert hooks.event[1]['kwargs']['service'] == service
|
|
104 | 104 |
# we must be connected |
105 | 105 |
assert app.session['_auth_user_id'] |
106 | 106 |
assert app.session.get_expire_at_browser_close() |
... | ... | |
130 | 130 |
# test direct creation failure on an expired id_token |
131 | 131 |
franceconnect.exp = now() - datetime.timedelta(seconds=30) |
132 | 132 | |
133 |
response = app.get('/login/?service=portail&next=/idp/')
|
|
133 |
response = app.get('/login/?next=/idp/') |
|
134 | 134 |
response = response.click(href='callback') |
135 | 135 | |
136 | 136 |
assert User.objects.count() == 0 |
tests/idp_oidc/test_misc.py | ||
---|---|---|
965 | 965 |
def test_registration_service_slug(oidc_settings, app, simple_oidc_client, simple_user, hooks, mailoutbox): |
966 | 966 |
redirect_uri = simple_oidc_client.redirect_uris.split()[0] |
967 | 967 | |
968 |
simple_oidc_client.ou.home_url = 'https://portal/' |
|
969 |
simple_oidc_client.ou.save() |
|
970 | ||
968 | 971 |
params = { |
969 | 972 |
'client_id': simple_oidc_client.client_id, |
970 | 973 |
'scope': 'openid profile email', |
... | ... | |
977 | 980 |
authorize_url = make_url('oidc-authorize', params=params) |
978 | 981 |
response = app.get(authorize_url) |
979 | 982 | |
980 |
location = urllib.parse.urlparse(response['Location']) |
|
981 |
query = urllib.parse.parse_qs(location.query) |
|
982 |
assert query['service'] == ['default client'] |
|
983 | 983 |
response = response.follow().click('Register') |
984 |
location = urllib.parse.urlparse(response.request.url) |
|
985 |
query = urllib.parse.parse_qs(location.query) |
|
986 |
assert query['service'] == ['default client'] |
|
987 | ||
988 | 984 |
response.form.set('email', 'john.doe@example.com') |
989 | 985 |
response = response.form.submit() |
990 | 986 |
assert len(mailoutbox) == 1 |
991 | 987 |
link = utils.get_link_from_mail(mailoutbox[0]) |
992 | 988 |
response = app.get(link) |
989 |
body = response.pyquery('body')[0] |
|
990 |
assert body.attrib['data-home-ou-slug'] == 'default' |
|
991 |
assert body.attrib['data-home-ou-name'] == 'Default organizational unit' |
|
992 |
assert body.attrib['data-home-service-slug'] == 'client' |
|
993 |
assert body.attrib['data-home-service-name'] == 'client' |
|
994 |
assert body.attrib['data-home-url'] == 'https://portal/' |
|
993 | 995 |
response.form.set('first_name', 'John') |
994 | 996 |
response.form.set('last_name', 'Doe') |
995 | 997 |
response.form.set('password1', 'T0==toto') |
... | ... | |
999 | 1001 |
assert hooks.event[0]['kwargs']['service'].slug == 'client' |
1000 | 1002 | |
1001 | 1003 |
assert hooks.event[1]['kwargs']['name'] == 'registration' |
1002 |
assert hooks.event[1]['kwargs']['service'] == 'client' |
|
1004 |
assert hooks.event[1]['kwargs']['service'].slug == 'client'
|
|
1003 | 1005 | |
1004 | 1006 |
assert hooks.event[2]['kwargs']['name'] == 'login' |
1005 | 1007 |
assert hooks.event[2]['kwargs']['how'] == 'email' |
1006 |
assert hooks.event[2]['kwargs']['service'] == 'client' |
|
1008 |
assert hooks.event[2]['kwargs']['service'].slug == 'client'
|
|
1007 | 1009 | |
1008 | 1010 | |
1009 | 1011 |
def test_claim_default_value(oidc_settings, normal_oidc_client, simple_user, app): |
tests/test_context_processors.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from . import utils |
|
18 | ||
19 | ||
20 |
def test_home(app, settings, simple_user, service): |
|
21 |
from authentic2.a2_rbac.utils import get_default_ou |
|
22 |
from authentic2.models import Service |
|
23 | ||
24 |
utils.set_service(app, service) |
|
25 | ||
26 |
settings.LOGIN_REDIRECT_URL = 'https://portal1/' |
|
27 | ||
28 |
resp = app.get('/login/') |
|
29 |
body = resp.pyquery('body') |
|
30 |
assert body.attr('data-home-url') == 'https://portal1/' |
|
31 |
assert body.attr('data-home-service-slug') == service.slug |
|
32 |
assert body.attr('data-home-service-name') == service.name |
|
33 |
assert body.attr('data-home-ou-slug') == service.ou.slug |
|
34 |
assert body.attr('data-home-ou-name') == service.ou.name |
|
35 | ||
36 |
settings.A2_HOMEPAGE_URL = 'https://portal2/' |
|
37 |
resp = app.get('/login/') |
|
38 |
body = resp.pyquery('body') |
|
39 |
assert body.attr('data-home-url') == 'https://portal2/' |
|
40 | ||
41 |
service.ou.home_url = 'https://portal3/' |
|
42 |
service.ou.save() |
|
43 |
resp = app.get('/login/') |
|
44 |
body = resp.pyquery('body') |
|
45 |
assert body.attr('data-home-url') == 'https://portal3/' |
|
46 | ||
47 |
# if user comes back from a different service, the information is updated |
|
48 |
new_service = Service.objects.create(ou=get_default_ou(), slug='service2', name='Service2') |
|
49 |
utils.set_service(app, new_service) |
|
50 | ||
51 |
resp = app.get('/login/') |
|
52 |
body = resp.pyquery('body') |
|
53 |
assert body.attr('data-home-url') == 'https://portal3/' |
|
54 |
assert body.attr('data-home-service-slug') == new_service.slug |
|
55 |
assert body.attr('data-home-service-name') == new_service.name |
|
56 |
assert body.attr('data-home-ou-slug') == new_service.ou.slug |
|
57 |
assert body.attr('data-home-ou-name') == new_service.ou.name |
tests/test_idp_saml2.py | ||
---|---|---|
33 | 33 |
from django.utils.translation import gettext as _ |
34 | 34 | |
35 | 35 |
from authentic2.a2_rbac.models import OrganizationalUnit, Role |
36 |
from authentic2.constants import NONCE_FIELD_NAME, SERVICE_FIELD_NAME
|
|
36 |
from authentic2.constants import NONCE_FIELD_NAME |
|
37 | 37 |
from authentic2.custom_user.models import User |
38 | 38 |
from authentic2.idp.saml import saml2_endpoints |
39 | 39 |
from authentic2.idp.saml.saml2_endpoints import get_extensions, get_login_hints_extension |
... | ... | |
330 | 330 |
reverse('auth_login'), |
331 | 331 |
**{ |
332 | 332 |
'nonce': '*', |
333 |
SERVICE_FIELD_NAME: 'default ' + self.sp.slug, |
|
334 | 333 |
REDIRECT_FIELD_NAME: make_url( |
335 | 334 |
'a2-idp-saml-continue', params={NONCE_FIELD_NAME: request_id} |
336 | 335 |
), |
tests/test_login.py | ||
---|---|---|
22 | 22 |
from authentic2 import models |
23 | 23 |
from authentic2.utils.misc import get_token_login_url |
24 | 24 | |
25 |
from .utils import assert_event, login |
|
25 |
from .utils import assert_event, login, set_service
|
|
26 | 26 | |
27 | 27 |
User = get_user_model() |
28 | 28 | |
... | ... | |
85 | 85 |
assert len(caplog.records) == 1 |
86 | 86 | |
87 | 87 | |
88 |
def test_show_condition_service(db, app, settings): |
|
88 |
def test_show_condition_service(db, rf, app, settings): |
|
89 |
portal = models.Service.objects.create(pk=1, name='Service', slug='portal') |
|
90 |
service = models.Service.objects.create(pk=2, name='Service', slug='service') |
|
89 | 91 |
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'service_slug == \'portal\''}} |
90 |
response = app.get('/login/', params={}) |
|
91 |
assert 'name="login-password-submit"' not in response |
|
92 | 92 | |
93 |
# service doesn't exist |
|
94 |
response = app.get('/login/', params={'service': 'portal'}) |
|
93 |
response = app.get('/login/') |
|
95 | 94 |
assert 'name="login-password-submit"' not in response |
96 | 95 | |
97 |
# Create a service
|
|
98 |
models.Service.objects.create(name='Service', slug='portal') |
|
99 |
response = app.get('/login/', params={'service': 'portal'})
|
|
96 |
set_service(app, portal)
|
|
97 | ||
98 |
response = app.get('/login/') |
|
100 | 99 |
assert 'name="login-password-submit"' in response |
101 | 100 | |
102 |
models.Service.objects.create(name='Service', slug='service') |
|
103 |
response = app.get('/login/', params={'service': 'service'}) |
|
101 |
set_service(app, service) |
|
102 | ||
103 |
response = app.get('/login/') |
|
104 | 104 |
assert 'name="login-password-submit"' not in response |
105 | 105 | |
106 | 106 | |
... | ... | |
251 | 251 |
response = app.get('/login/') |
252 | 252 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'Default organizational unit' |
253 | 253 | |
254 |
set_service(app, service) |
|
254 | 255 |
# service is specified but not access-control is defined, default for user is selected |
255 |
response = app.get('/login/?service=service')
|
|
256 |
response = app.get('/login/') |
|
256 | 257 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'Default organizational unit' |
257 | 258 | |
258 | 259 |
# service is specified, access control is defined but role is empty, default for user is selected |
259 | 260 |
service.authorized_roles.through.objects.create(service=service, role=role_ou1) |
260 |
response = app.get('/login/?service=service')
|
|
261 |
response = app.get('/login/') |
|
261 | 262 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'Default organizational unit' |
262 | 263 | |
263 | 264 |
# user is added to role_ou1, default for user is still selected |
264 | 265 |
user_ou1.roles.add(role_ou1) |
265 |
response = app.get('/login/?service=service')
|
|
266 |
response = app.get('/login/') |
|
266 | 267 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'Default organizational unit' |
267 | 268 | |
268 | 269 |
# Clear cookies, OU1 is selected |
269 | 270 |
app.cookiejar.clear() |
270 |
response = app.get('/login/?service=service') |
|
271 |
set_service(app, service) |
|
272 |
response = app.get('/login/') |
|
271 | 273 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'OU1' |
272 | 274 | |
273 | 275 |
# if we change the user's ou, then default selected OU changes |
274 | 276 |
user_ou1.ou = ou2 |
275 | 277 |
user_ou1.save() |
276 |
response = app.get('/login/?service=service')
|
|
278 |
response = app.get('/login/') |
|
277 | 279 |
assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'OU2' |
278 | 280 | |
279 | 281 |
tests/test_template.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import pytest |
18 |
from django.urls import reverse |
|
19 | 18 | |
20 |
from authentic2.a2_rbac.utils import get_default_ou |
|
21 |
from authentic2.models import Service |
|
22 | 19 |
from authentic2.utils.template import Template, TemplateError |
23 | 20 | |
24 | 21 |
pytestmark = pytest.mark.django_db |
... | ... | |
111 | 108 |
with pytest.raises(TemplateError) as raised: |
112 | 109 |
template.render(context=context) |
113 | 110 |
assert 'missing template variable' in raised |
114 | ||
115 | ||
116 |
def test_service_in_template(app, simple_user, service): |
|
117 |
resp = app.get(reverse('auth_login') + '?service=%s' % service.slug) |
|
118 | ||
119 |
assert resp.pyquery('body').attr('data-service-slug') == service.slug |
|
120 |
assert resp.pyquery('body').attr('data-service-name') == service.name |
|
121 | ||
122 |
resp.form.set('username', simple_user.username) |
|
123 |
resp.form.set('password', simple_user.username) |
|
124 |
resp.form.submit(name='login-password-submit') |
|
125 | ||
126 |
resp = app.get(reverse('account_management')) |
|
127 |
assert resp.pyquery('body').attr('data-service-slug') == service.slug |
|
128 |
assert resp.pyquery('body').attr('data-service-name') == service.name |
|
129 | ||
130 |
# if user comes back from a different service, the information is updated |
|
131 |
new_service = Service.objects.create(ou=get_default_ou(), slug='service2', name='Service2') |
|
132 |
resp = app.get(reverse('account_management') + '?service=%s' % new_service.slug) |
|
133 |
assert resp.pyquery('body').attr('data-service-slug') == new_service.slug |
|
134 |
assert resp.pyquery('body').attr('data-service-name') == new_service.name |
tests/utils.py | ||
---|---|---|
307 | 307 |
) |
308 | 308 |
elif data and count > 1: |
309 | 309 |
assert qs.filter(**{'data__' + k: v for k, v in data.items()}).count() == 1 |
310 | ||
311 | ||
312 |
def set_service(app, service): |
|
313 |
from importlib import import_module |
|
314 | ||
315 |
from django.conf import settings |
|
316 | ||
317 |
from authentic2.utils.service import _set_service |
|
318 | ||
319 |
engine = import_module(settings.SESSION_ENGINE) |
|
320 |
if app.session == {}: |
|
321 |
session = engine.SessionStore() |
|
322 |
else: |
|
323 |
session = app.session |
|
324 |
_set_service(session, service) |
|
325 |
session.save() |
|
326 |
if app.session == {}: |
|
327 |
app.set_cookie(settings.SESSION_COOKIE_NAME, session.session_key) |
|
310 |
- |