From 232d567c822c0ea88fea1616e3f9b30df3c78011 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 13 Oct 2020 19:08:00 +0200 Subject: [PATCH 2/3] misc: integration of journal authentic views (#47155) --- src/authentic2/authenticators.py | 4 + src/authentic2/journal.py | 37 ++++ src/authentic2/journal_event_types.py | 266 ++++++++++++++++++++++++++ src/authentic2/middleware.py | 17 +- src/authentic2/settings.py | 1 + src/authentic2/utils/__init__.py | 4 + src/authentic2/utils/service.py | 14 +- src/authentic2/views.py | 18 +- src/authentic2_idp_oidc/views.py | 8 + tests/conftest.py | 3 +- tests/test_all.py | 5 +- tests/test_idp_oidc.py | 7 + tests/test_login.py | 20 +- tests/test_password_reset.py | 18 +- tests/test_registration.py | 6 +- tests/test_utils.py | 2 + tests/test_views.py | 25 ++- tests/utils.py | 54 +++++- 18 files changed, 470 insertions(+), 39 deletions(-) create mode 100644 src/authentic2/journal.py create mode 100644 src/authentic2/journal_event_types.py diff --git a/src/authentic2/authenticators.py b/src/authentic2/authenticators.py index 0f9ba781..f5958dfb 100644 --- a/src/authentic2/authenticators.py +++ b/src/authentic2/authenticators.py @@ -133,6 +133,10 @@ class LoginPasswordAuthenticator(BaseAuthenticator): utils.prepend_remember_cookie(request, response, 'preferred-ous', form.cleaned_data['ou'].pk) return response + else: + username = form.cleaned_data.get('username', '').strip() + if username: + request.journal.record('user.login.failure', username=username) context['form'] = form return render(request, 'authentic2/login_password_form.html', context) diff --git a/src/authentic2/journal.py b/src/authentic2/journal.py new file mode 100644 index 00000000..dfd7b9a1 --- /dev/null +++ b/src/authentic2/journal.py @@ -0,0 +1,37 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2020 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from authentic2.utils.service import get_service_from_request + +from authentic2.apps.journal.journal import Journal + + +class Journal(Journal): + def __init__(self, **kwargs): + self._service = kwargs.pop('service', None) + super().__init__(**kwargs) + + @property + def service(self): + return self._service or (get_service_from_request(self.request) if self.request else None) + + def massage_kwargs(self, record_parameters, kwargs): + if 'service' not in kwargs and 'service' in record_parameters: + kwargs['service'] = self.service + return super().massage_kwargs(record_parameters, kwargs) + + +journal = Journal() diff --git a/src/authentic2/journal_event_types.py b/src/authentic2/journal_event_types.py new file mode 100644 index 00000000..0126abd4 --- /dev/null +++ b/src/authentic2/journal_event_types.py @@ -0,0 +1,266 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2020 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from django.utils.translation import ugettext_lazy as _ + +from authentic2.custom_user.models import get_attributes_map +from authentic2.apps.journal.models import EventTypeDefinition +from authentic2.apps.journal.utils import form_to_old_new +from authentic2.custom_user.models import User + +from . import models + + +class EventTypeWithService(EventTypeDefinition): + @classmethod + def record(cls, user=None, service=None, session=None, references=None, data=None): + if service: + if not data: + data = {} + data['service_name'] = str(service) + if not references: + references = [] + references = [service] + references + super().record(user=user, session=session, references=references, data=data) + + @classmethod + def get_service_name(self, event): + service, = event.get_typed_references(models.Service) + if service is not None: + return str(service) + if 'service_name' in event.data: + return event.data['service_name'] + return '' + + +def login_method_label(how): + if how.startswith('password'): + return _('password') + elif how == 'fc': + return _('FranceConnect') + elif how == 'saml': + return _('SAML') + elif how == 'oidc': + return _('OpenIDConnect') + elif how: + return how + else: + return _('none') + + +def get_attributes_label(attributes_new_values): + # FIXME: attributes cache should be factorized at the level of the AttributeManager as done on ContentType, + # main difference is that attributes are editable objects + attributes_map = get_attributes_map() + for name in attributes_new_values: + if name in ('email', 'first_name', 'last_name'): + yield str(User._meta.get_field(name).verbose_name) + else: + if name in attributes_map: + yield attributes_map[name].label + else: + yield name + + +class UserLogin(EventTypeWithService): + name = 'user.login' + label = _('login') + + @classmethod + def record(cls, user, session, service, how): + super().record(user=user, session=session, service=service, data={'how': how}) + + @classmethod + def get_message(cls, event, context): + how = event.get_data('how') + return _('login using {method}').format(method=login_method_label(how)) + + +class UserLoginFailure(EventTypeWithService): + name = 'user.login.failure' + label = _('login failure') + + @classmethod + def record(cls, service, username): + super().record(service=service, data={'username': username}) + + @classmethod + def get_message(cls, event, context): + username = event.get_data('username') + return _('login failure with username "{username}"').format(username=username) + + +class UserRegistrationRequest(EventTypeDefinition): + name = 'user.registration.request' + label = _('registration request') + + @classmethod + def record(cls, email): + super().record(data={'email': email.lower()}) + + @classmethod + def get_message(cls, event, context): + email = event.get_data('email') + return _('registration request with email "%s"') % email + + +class UserRegistration(EventTypeWithService): + name = 'user.registration' + label = _('registration') + + @classmethod + def record(cls, user, session, service, how): + super().record(user=user, session=session, service=service, data={'how': how}) + + @classmethod + def get_message(cls, event, context): + how = event.get_data('how') + return _('registration using {method}').format(method=login_method_label(how)) + + +class UserLogout(EventTypeWithService): + name = 'user.logout' + label = _('logout') + + @classmethod + def record(cls, user, session, service): + super().record(user=user, session=session, service=service) + + @classmethod + def get_message(cls, event, context): + return _('logout') + + +class UserRequestPasswordReset(EventTypeDefinition): + name = 'user.password.reset.request' + label = _('password reset request') + + @classmethod + def record(cls, user, email): + super().record(user=user, data={'email': email.lower()}) + + @classmethod + def get_message(cls, event, context): + email = event.get_data('email') + if email: + return _('password reset request with email "%s"') % email + return super().get_message(event, context) + + +class UserResetPassword(EventTypeDefinition): + name = 'user.password.reset' + label = _('password reset') + + @classmethod + def record(cls, user, session): + super().record(user=user, session=session) + + +class UserResetPasswordFailure(EventTypeDefinition): + name = 'user.password.reset.failure' + label = _('password reset failure') + + @classmethod + def record(cls, email): + super().record(data={'email': email}) + + @classmethod + def get_message(cls, event, context): + email = event.get_data('email') + if email: + return _('password reset failure with email "%s"') % email + return super().get_message(event, context) + + +class UserChangePassword(EventTypeWithService): + name = 'user.password.change' + label = _('password change') + + @classmethod + def record(cls, user, session, service): + super().record(user=user, session=session, service=service) + + +class UserEdit(EventTypeWithService): + name = 'user.profile.edit' + label = _('profile edit') + + @classmethod + def record(cls, user, session, service, form): + data = form_to_old_new(form) + super().record(user=user, session=session, service=service, data=data) + + @classmethod + def get_message(cls, event, context): + new = event.get_data('new') + if new: + edited_attributes = ', '.join(get_attributes_label(new)) + return _('profile edit (%s)') % edited_attributes + return super().get_message(event, context) + + +class UserDeletion(EventTypeWithService): + name = 'user.deletion' + label = _('deletion') + + @classmethod + def record(cls, user, session, service): + super().record(user=user, session=session, service=service) + + +class UserServiceSSO(EventTypeWithService): + name = 'user.service.sso' + label = _('service single sign on') + + @classmethod + def record(cls, user, session, service, how): + super().record(user=user, session=session, service=service, data={'how': how}) + + @classmethod + def get_message(cls, event, context): + service_name = cls.get_service_name(event) + return _('service single sign on with "{service}"').format( + service=service_name) + + +class UserServiceSSOAuthorization(EventTypeWithService): + name = 'user.service.sso.authorization' + label = _('consentment to single sign on') + + @classmethod + def record(cls, user, session, service, **kwargs): + super().record(user=user, session=session, service=service, data=kwargs) + + @classmethod + def get_message(cls, event, context): + service_name = cls.get_service_name(event) + return _('authorization of single sign on with "{service}"').format( + service=service_name) + + +class UserServiceSSOUnauthorization(EventTypeWithService): + name = 'user.service.sso.unauthorization' + label = _('remove consentment to single sign on') + + @classmethod + def record(cls, user, session, service): + super().record(user=user, session=session, service=service) + + @classmethod + def get_message(cls, event, context): + service_name = cls.get_service_name(event) + return _('unauthorization of single sign on with "{service}"').format( + service=service_name) diff --git a/src/authentic2/middleware.py b/src/authentic2/middleware.py index f3b32aa0..67638cd6 100644 --- a/src/authentic2/middleware.py +++ b/src/authentic2/middleware.py @@ -27,12 +27,13 @@ from django.conf import settings from django.contrib import messages from django.utils.deprecation import MiddlewareMixin from django.utils.encoding import force_text +from django.utils.functional import SimpleLazyObject from django.utils.translation import ugettext as _ from django.utils.six.moves.urllib import parse as urlparse from django.shortcuts import render from . import app_settings, utils, plugins -from .utils.service import get_service_from_request +from .utils.service import get_service_from_request, get_service_from_session class CollectIPMiddleware(MiddlewareMixin): @@ -208,10 +209,18 @@ class SaveServiceInSessionMiddleware: self.get_response = get_response def __call__(self, request): - service = None - service = get_service_from_request(request) if service: request.session['service_pk'] = service.pk - + request.service = SimpleLazyObject(lambda: get_service_from_session(request)) return self.get_response(request) + + +def journal_middleware(get_response): + from . import journal + + def middleware(request): + request.journal = journal.Journal(request=request) + return get_response(request) + + return middleware diff --git a/src/authentic2/settings.py b/src/authentic2/settings.py index 44704435..ab8ce4fe 100644 --- a/src/authentic2/settings.py +++ b/src/authentic2/settings.py @@ -100,6 +100,7 @@ MIDDLEWARE = ( 'django.middleware.locale.LocaleMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', + 'authentic2.middleware.journal_middleware', ) DATABASES['default']['ATOMIC_REQUESTS'] = True diff --git a/src/authentic2/utils/__init__.py b/src/authentic2/utils/__init__.py index 8d344891..56052c2c 100644 --- a/src/authentic2/utils/__init__.py +++ b/src/authentic2/utils/__init__.py @@ -447,6 +447,7 @@ def login(request, user, how, service=None, service_slug=None, nonce=None, **kwa # prevent logint-hint to influence next use of the login page if 'login-hint' in request.session: del request.session['login-hint'] + request.journal.record('user.login', how=how) return continue_to_next_url(request, **kwargs) @@ -757,6 +758,7 @@ def send_registration_mail(request, email, ou, template_names=None, next_url=Non legacy_html_body_templates=['registration/activation_email.html']) logger.info(u'registration mail sent to %s with registration URL %s...', email, registration_url) + request.journal.record('user.registration.request', email=email) def send_account_deletion_code(request, user): @@ -823,6 +825,7 @@ def send_password_reset_mail(user, template_names=None, request=None, sign_next_url=True, **kwargs): from .. import middleware + from authentic2.journal import journal if not user.email: raise ValueError('user must have an email') @@ -852,6 +855,7 @@ def send_password_reset_mail(user, template_names=None, request=None, per_ou_templates=True, **kwargs) logger.info(u'password reset request for user %s, email sent to %s ' 'with token %s', user, user.email, token.uuid) + journal.record('user.password.reset.request', email=user.email, user=user) def batch(iterable, size): diff --git a/src/authentic2/utils/service.py b/src/authentic2/utils/service.py index 8468b0e4..bcce22ce 100644 --- a/src/authentic2/utils/service.py +++ b/src/authentic2/utils/service.py @@ -52,9 +52,17 @@ def get_service_from_ref(ref): def get_service_from_request(request): service_ref = request.GET.get(SERVICE_FIELD_NAME) - if not service_ref or '\x00' in service_ref: - return None - return get_service_from_ref(service_ref) + if service_ref and '\x00' not in service_ref: + return get_service_from_ref(service_ref) + return None + + +def get_service_from_session(request): + session = getattr(request, 'session', None) + if session and 'service_pk' in session: + from authentic2.models import Service + return Service.objects.get(pk=session['service_pk']) + return None def get_service_from_token(params): diff --git a/src/authentic2/views.py b/src/authentic2/views.py index d886d5c7..1827ce46 100644 --- a/src/authentic2/views.py +++ b/src/authentic2/views.py @@ -151,6 +151,7 @@ class EditProfile(cbv.HookMixin, cbv.TemplateNamesMixin, UpdateView): def form_valid(self, form): response = super(EditProfile, self).form_valid(form) hooks.call_hooks('event', name='edit-profile', user=self.request.user, form=form) + self.request.journal.record('user.profile.edit', form=form) return response edit_profile = decorators.setting_enabled('A2_PROFILE_CAN_EDIT_PROFILE')( @@ -575,6 +576,7 @@ def logout(request, targets = redirect_logout_list(request) logger.debug('Accumulated redirections : {}'.format(targets)) # Local logout + request.journal.record('user.logout') auth_logout(request) logger.info('Logged out') local_logout_done = True @@ -684,6 +686,7 @@ class PasswordResetView(FormView): if is_ratelimited(self.request, key='post:email', group='pw-reset-email', rate=app_settings.A2_EMAILS_ADDRESS_RATELIMIT, increment=True): + self.request.journal.record('user.password.reset.failure', email=email) form.add_error( 'email', _('Multiple emails have already been sent to this address. Further attempts are ' @@ -692,6 +695,7 @@ class PasswordResetView(FormView): return self.form_invalid(form) if is_ratelimited(self.request, key='ip', group='pw-reset-email', rate=app_settings.A2_EMAILS_IP_RATELIMIT, increment=True): + self.request.journal.record('user.password.reset.failure', email=email) form.add_error( 'email', _('Multiple password reset attempts have already been made from this IP address. No ' @@ -783,7 +787,9 @@ class PasswordResetConfirmView(cbv.RedirectToNextURLViewMixin, FormView): return self.finish() def finish(self): - return utils.simulate_authentication(self.request, self.user, 'email') + response = utils.simulate_authentication(self.request, self.user, 'email') + self.request.journal.record('user.password.reset') + return response password_reset_confirm = PasswordResetConfirmView.as_view() @@ -1136,6 +1142,11 @@ class RegistrationCompletionView(CreateView): return self.registration_success(self.request, form.instance, form) def registration_success(self, request, user, form): + request.journal.record( + 'user.registration', + user=user, + session=None, + how=self.authentication_method) hooks.call_hooks('event', name='registration', user=user, form=form, view=self, authentication_method=self.authentication_method, token=self.token, service=self.service and self.service.slug) @@ -1233,6 +1244,7 @@ class ValidateDeletionView(TemplateView): self.user.mark_as_deleted() logger.info(u'deletion of account %s performed', self.user) hooks.call_hooks('event', name='delete-account', user=self.user) + request.journal.record('user.deletion', user=self.user) if self.user == request.user: # No validation message displayed, as the user will surely # notice their own account deletion... @@ -1289,7 +1301,9 @@ class PasswordChangeView(DjPasswordChangeView): hooks.call_hooks('event', name='change-password', user=self.request.user, request=self.request) messages.info(self.request, _('Password changed')) models.PasswordReset.objects.filter(user=self.request.user).delete() - return super(PasswordChangeView, self).form_valid(form) + response = super(PasswordChangeView, self).form_valid(form) + self.request.journal.record('user.password.change', session=self.request.session) + return response def get_form_class(self): if self.request.user.has_usable_password(): diff --git a/src/authentic2_idp_oidc/views.py b/src/authentic2_idp_oidc/views.py index 83965aab..4253a59b 100644 --- a/src/authentic2_idp_oidc/views.py +++ b/src/authentic2_idp_oidc/views.py @@ -287,6 +287,10 @@ def authorize(request, *args, **kwargs): expired=start + datetime.timedelta(days=365)) if pk_to_deletes: auth_manager.filter(pk__in=pk_to_deletes).delete() + request.journal.record( + 'user.service.sso.authorization', + service=client, + scopes=list(sorted(scopes))) logger.info(u'authorized scopes %s saved for service %s', ' '.join(scopes), client.name) else: @@ -365,6 +369,10 @@ def authorize(request, *args, **kwargs): }) # query is transfered through the hashtag response = redirect(request, redirect_uri + '#%s' % urlencode(params), resolve=False) + request.journal.record( + 'user.service.sso', + service=client, + how=last_auth and last_auth.get('how')) hooks.call_hooks('event', name='sso-success', idp='oidc', service=client, user=request.user) utils.add_oidc_session(request, client) return response diff --git a/tests/conftest.py b/tests/conftest.py index dc5476c3..6e12af59 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -126,6 +126,7 @@ def create_user(**kwargs): password = kwargs.pop('password', None) or kwargs['username'] user, created = User.objects.get_or_create(**kwargs) if password: + user.clear_password = password user.set_password(password) user.save() return user @@ -167,7 +168,7 @@ def user_ou1(db, ou1): @pytest.fixture def user_ou2(db, ou2): - return create_user(username='john.doe', first_name=u'Jôhn', last_name=u'Dôe', + return create_user(username='john.doe.ou2', first_name=u'Jôhn', last_name=u'Dôe', email='john.doe@example.net', ou=ou2) diff --git a/tests/test_all.py b/tests/test_all.py index 9243bfc9..98215100 100644 --- a/tests/test_all.py +++ b/tests/test_all.py @@ -41,7 +41,7 @@ from django_rbac.utils import get_role_model, get_ou_model from authentic2 import utils, models, attribute_kinds -from .utils import Authentic2TestCase, get_response_form, get_link_from_mail +from .utils import Authentic2TestCase, get_response_form, get_link_from_mail, assert_event class SerializerTests(TestCase): @@ -215,6 +215,7 @@ class UserProfileTests(TestCase): user = User.objects.create(username='testbot') user.set_password('secret') user.save() + self.user = user self.client = Client() def test_edit_profile_attributes(self): @@ -249,6 +250,8 @@ class UserProfileTests(TestCase): for k, v in kwargs.items()) response = self.client.post(reverse('profile_edit'), kwargs) + new = {'custom': 'random data', 'next_url': '', 'national_number': 'xx20153566342yy'} + assert_event('user.profile.edit', user=self.user, session=self.client.session, old={}, new=new) self.assertEqual(response.status_code, 302) response = self.client.get(reverse('account_management')) diff --git a/tests/test_idp_oidc.py b/tests/test_idp_oidc.py index 962be9c1..3bb06f47 100644 --- a/tests/test_idp_oidc.py +++ b/tests/test_idp_oidc.py @@ -236,6 +236,13 @@ def test_authorization_code_sso(login_first, do_not_ask_again, oidc_settings, oi assert authz.expired >= now() else: assert OIDCAuthorization.objects.count() == 0 + utils.assert_event('user.service.sso.authorization', + session=app.session, + user=simple_user, service=oidc_client, + scopes=['email', 'openid', 'profile']) + utils.assert_event('user.service.sso', session=app.session, + user=simple_user, service=oidc_client, + how='password-on-https') if oidc_client.authorization_flow == oidc_client.FLOW_AUTHORIZATION_CODE: assert OIDCCode.objects.count() == 1 code = OIDCCode.objects.get() diff --git a/tests/test_login.py b/tests/test_login.py index 7302df19..dc9e97a6 100644 --- a/tests/test_login.py +++ b/tests/test_login.py @@ -21,11 +21,25 @@ from django.contrib.auth import get_user_model from authentic2 import models -from .utils import login, check_log +from .utils import login, check_log, assert_event + +User = get_user_model() + + +def test_success(db, app, simple_user): + login(app, simple_user) + assert_event('user.login', user=simple_user, session=app.session, how='password-on-https') + session = app.session + app.get('/logout/').form.submit() + assert_event('user.logout', user=simple_user, session=session) + + +def test_failure(db, app, simple_user): + login(app, simple_user, password='wrong', fail=True) + assert_event('user.login.failure', username=simple_user.username) def test_login_inactive_user(db, app): - User = get_user_model() user1 = User.objects.create(username='john.doe') user1.set_password('john.doe') user1.save() @@ -39,7 +53,7 @@ def test_login_inactive_user(db, app): assert '_auth_user_id' not in app.session user1.is_active = False user1.save() - login(app, user1) + login(app, user2) assert int(app.session['_auth_user_id']) == user2.id app.get('/logout/').form.submit() assert '_auth_user_id' not in app.session diff --git a/tests/test_password_reset.py b/tests/test_password_reset.py index 3e1c8490..3681efd5 100644 --- a/tests/test_password_reset.py +++ b/tests/test_password_reset.py @@ -23,14 +23,16 @@ from . import utils def test_send_password_reset_email(app, simple_user, mailoutbox): from authentic2.utils import send_password_reset_mail assert len(mailoutbox) == 0 - send_password_reset_mail( - simple_user, - legacy_subject_templates=['registration/password_reset_subject.txt'], - legacy_body_templates=['registration/password_reset_email.html'], - context={ - 'base_url': 'http://testserver', - }) + with utils.run_on_commit_hooks(): + send_password_reset_mail( + simple_user, + legacy_subject_templates=['registration/password_reset_subject.txt'], + legacy_body_templates=['registration/password_reset_email.html'], + context={ + 'base_url': 'http://testserver', + }) assert len(mailoutbox) == 1 + utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email) url = utils.get_link_from_mail(mailoutbox[0]) relative_url = url.split('testserver')[1] resp = app.get(relative_url, status=200) @@ -38,6 +40,7 @@ def test_send_password_reset_email(app, simple_user, mailoutbox): resp.form.set('new_password2', '1234==aA') resp = resp.form.submit().follow() assert str(app.session['_auth_user_id']) == str(simple_user.pk) + utils.assert_event('user.password.reset', user=simple_user, session=app.session) def test_view(app, simple_user, mailoutbox, settings): @@ -47,6 +50,7 @@ def test_view(app, simple_user, mailoutbox, settings): assert len(mailoutbox) == 0 settings.DEFAULT_FROM_EMAIL = 'show only addr ' resp = resp.form.submit() + utils.assert_event('user.password.reset.request', user=simple_user, email=simple_user.email) assert resp['Location'].endswith('/instructions/') resp = resp.follow() assert simple_user.email in resp.text diff --git a/tests/test_registration.py b/tests/test_registration.py index de785b25..2963d678 100644 --- a/tests/test_registration.py +++ b/tests/test_registration.py @@ -25,13 +25,13 @@ from django.utils.six.moves.urllib.parse import urlparse from authentic2 import utils, models from authentic2.validators import EmailValidator -from .utils import get_link_from_mail +from .utils import get_link_from_mail, assert_event User = get_user_model() -def test_registration(app, db, settings, mailoutbox, external_redirect): +def test_registration_success(app, db, settings, mailoutbox, external_redirect): next_url, good_next_url = external_redirect settings.LANGUAGE_CODE = 'en-us' @@ -45,6 +45,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect): response.form.set('email', 'testbot@entrouvert.com') response = response.form.submit() + assert_event('user.registration.request', email='testbot@entrouvert.com') assert urlparse(response['Location']).path == reverse('registration_complete') if not good_next_url: assert not urlparse(response['Location']).query @@ -87,6 +88,7 @@ def test_registration(app, db, settings, mailoutbox, external_redirect): assert 'was successful' in mailoutbox[1].body new_user = User.objects.get() + assert_event('user.registration', user=new_user, how='email') assert new_user.email == 'testbot@entrouvert.com' assert new_user.username is None assert new_user.check_password('T0==toto') diff --git a/tests/test_utils.py b/tests/test_utils.py index 0dd78930..c3b0b9a6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -23,6 +23,7 @@ from django.utils.functional import lazy from django_rbac.utils import get_ou_model +from authentic2.journal import Journal from authentic2.utils import (good_next_url, same_origin, select_next_url, user_can_change_password, login, get_authentication_events, authenticate, @@ -91,6 +92,7 @@ def test_get_authentication_events_hows(rf, simple_user): middleware = AuthenticationMiddleware() middleware.process_request(request) MessageMiddleware().process_request(request) + request.journal = Journal(request=request) assert 'password' not in [ev['how'] for ev in get_authentication_events(request)] login(request, user, 'password') assert 'password' in [ev['how'] for ev in get_authentication_events(request)] diff --git a/tests/test_views.py b/tests/test_views.py index 6824a861..5828b345 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -16,7 +16,7 @@ # authentic2 import datetime -from .utils import login, logout, get_link_from_mail +from .utils import login, logout, get_link_from_mail, assert_event import pytest from django.urls import reverse @@ -38,13 +38,19 @@ def test_password_change(app, simple_user): simple_user.set_password('hop') simple_user.save() resp = login(app, simple_user, password='hop', path=reverse('password_change')) + old_session_key = app.session.session_key resp.form['old_password'] = 'hop' resp.form['new_password1'] = 'hopAbcde1' resp.form['new_password2'] = 'hopAbcde1' resp = resp.form.submit() + new_session_key = app.session.session_key + + assert old_session_key != new_session_key, 'session\'s key has not been cycled' + assert resp.location == '/accounts/password/change/done/' + assert_event('user.password.change', user=simple_user, session=app.session) def test_account_delete(app, simple_user, mailoutbox): @@ -84,8 +90,8 @@ def test_account_delete_when_logged_out(app, simple_user, mailoutbox): link = get_link_from_mail(mailoutbox[0]) logout(app) page = app.get(link) - assert 'You are about to delete the account of %s.' % \ - escape(simple_user.get_full_name()) in page.text + assert 'You are about to delete the account of %s.' % escape( + simple_user.get_full_name()) in page.text response = page.form.submit(name='delete').follow().follow() assert not User.objects.get(pk=simple_user.pk).is_active assert len(mailoutbox) == 2 @@ -105,8 +111,8 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox): logout(app) login(app, user_ou1, path=reverse('account_management')) page = app.get(link) - assert 'You are about to delete the account of %s.' % \ - escape(simple_user.get_full_name()) in page.text + assert 'You are about to delete the account of %s.' % escape( + simple_user.get_full_name()) in page.text response = page.form.submit(name='delete').follow() assert not User.objects.get(pk=simple_user.pk).is_active assert User.objects.get(pk=user_ou1.pk).is_active @@ -117,7 +123,12 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox): def test_account_delete_fake_token(app, simple_user, mailoutbox): - response = app.get(reverse('validate_deletion', kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'})).follow().follow() + response = ( + app.get(reverse('validate_deletion', + kwargs={'deletion_token': 'thisismostlikelynotavalidtoken'})) + .follow() + .follow() + ) assert "The account deletion request is invalid, try again" in response.text @@ -182,6 +193,8 @@ def test_views_email_ratelimit(app, db, simple_user, settings, mailoutbox, freez response = response.form.submit() assert len(mailoutbox) == 3 assert 'try again later' in response.text + if view_name == 'password_reset': + assert_event('user.password.reset.failure', email=simple_user.email) # reach ip limit for i in range(7): diff --git a/tests/utils.py b/tests/utils.py index a3765f6f..69ab3bc1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -30,10 +30,11 @@ from django.shortcuts import resolve_url from django.utils import six from django.utils.six.moves.urllib import parse as urlparse -from authentic2 import utils +from authentic2 import utils, models +from authentic2.apps.journal.models import Event -def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None): +def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None, fail=False): if path: args = args or [] kwargs = kwargs or {} @@ -43,17 +44,24 @@ def login(app, user, path=None, password=None, remember_me=None, args=None, kwar login_page = app.get(reverse('auth_login')) assert login_page.request.path == reverse('auth_login') form = login_page.form - form.set('username', user.username if hasattr(user, 'username') else user) + username = user.username if hasattr(user, 'username') else user + form.set('username', username) # password is supposed to be the same as username - form.set('password', password or user.username) + form.set('password', password or (user.clear_password if hasattr(user, 'clear_password') else username)) if remember_me is not None: form.set('remember_me', bool(remember_me)) - response = form.submit(name='login-password-submit').follow() - if path: - assert response.request.path == path + response = form.submit(name='login-password-submit') + if fail: + assert response.status_code == 200 + assert '_auth_user_id' not in app.session else: - assert response.request.path == reverse('auth_homepage') - assert '_auth_user_id' in app.session + response = response.follow() + if path: + assert response.request.path == path + else: + assert response.request.path == reverse('auth_homepage') + assert '_auth_user_id' in app.session + assert not hasattr(user, 'id') or (app.session['_auth_user_id'] == str(user.id)) return response @@ -242,4 +250,30 @@ def call_command(*args, **kwargs): def text_content(node): '''Extract text content from node and all its children. Equivalent to xmlNodeGetContent from libxml.''' - return u''.join(node.itertext()) + return u''.join(node.itertext()) if node is not None else '' + + +def assert_event(event_type_name, user=None, session=None, service=None, **data): + qs = Event.objects.filter(type__name=event_type_name) + if user: + qs = qs.filter(user=user) + else: + qs = qs.filter(user__isnull=True) + if session: + qs = qs.filter(session=session.session_key) + else: + qs = qs.filter(session__isnull=True) + if service: + qs = qs.which_references(service) + else: + qs = qs.exclude(qs._which_references_query(models.Service)) + + assert qs.count() == 1 + + if data: + event = qs.get() + assert event.data, 'no event.data, should be %s' % data + for key, value in data.items(): + assert event.data.get(key) == value, ( + 'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value) + ) -- 2.28.0