From 99bd15362b1fd773af4772e25bd1097753a03306 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Mon, 26 Oct 2020 19:38:34 +0100 Subject: [PATCH 1/4] tests: simplify FranceConnect tests (#48042) --- tests/auth_fc/conftest.py | 244 ++++++----- tests/auth_fc/test_auth_fc.py | 688 ++++++++---------------------- tests/auth_fc/test_auth_fc_api.py | 14 +- 3 files changed, 322 insertions(+), 624 deletions(-) diff --git a/tests/auth_fc/conftest.py b/tests/auth_fc/conftest.py index 8b17fc6d..b0df872c 100644 --- a/tests/auth_fc/conftest.py +++ b/tests/auth_fc/conftest.py @@ -14,129 +14,149 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import base64 +import contextlib +import datetime import json -import pytest -import django_webtest - -from django.contrib.auth import get_user_model -from django.core.cache import cache -from django_rbac.utils import get_ou_model - -from authentic2 import hooks as a2_hooks -from authentic2.manager.utils import get_ou_count -from authentic2_auth_fc.models import FcAccount - - -CARTMAN_FC_INFO = { - "token": { - "access_token": "cartmane_access_token", - "token_type": "Bearer", - "expires_in": 1200, - "id_token": "cartman_token_id" - }, - "sub": "c11661ed00014db58149c8a886c8180d", - "user_info": { - "birthcountry": "99404", - "birthdate": "2006-06-06", - "birthplace": "southpark", - "email": "ecartman@ou_southpark.org", - "family_name": "CARTMAN", - "gender": "male", - "given_name": "Eric", - "preferred_username": "CARTMAN", - "sub": "c11661ed00014db58149c8a886c8180d" - } -} - - -def create_user(**kwargs): - User = get_user_model() - password = kwargs.pop('password', None) or kwargs['username'] - federation = kwargs.pop('federation', None) - user, created = User.objects.get_or_create(**kwargs) - if password: - user.set_password(password) - user.save() - - if federation: - create_fc_federation(user, federation) - return user - - -def create_fc_federation(user, info): - kwargs = { - 'user': user, - 'token': json.dumps(info['token']), - 'user_info': json.dumps(info['user_info']), - 'sub': info['sub'] - } - return FcAccount.objects.create(**kwargs) - - -@pytest.fixture -def app(request, db): - wtm = django_webtest.WebTestMixin() - wtm._patch_settings() - request.addfinalizer(wtm._unpatch_settings) - return django_webtest.DjangoTestApp(extra_environ={'HTTP_HOST': 'testserver'}) - +import urllib.parse as urlparse +import uuid -@pytest.fixture -def fc_settings(settings): - settings.A2_FC_ENABLE = True - settings.A2_FC_CLIENT_ID = 'xxx' - settings.A2_FC_CLIENT_SECRET = 'yyy' - return settings - - -@pytest.fixture -def ou_southpark(db): - OU = get_ou_model() - return OU.objects.create(name='southpark', slug='southpark') +from jwcrypto import jwk, jwt +import httmock +import pytest +from django.http import QueryDict +from django.urls import reverse +from django.utils.http import urlencode +from django.utils.timezone import now -@pytest.fixture -def admin(db): - return create_user(username='admin', is_superuser=True, is_staff=True) +from authentic2.models import Service +from authentic2.utils import make_url -@pytest.fixture -def user_cartman(db, ou_southpark): - return create_user(username='ecartman', first_name='eric', last_name='cartman', - email='ecartman@southpark.org', ou=ou_southpark, federation=CARTMAN_FC_INFO) +from ..utils import assert_equals_url +CLIENT_ID = 'xxx' +CLIENT_SECRET = 'yyy' -@pytest.fixture(autouse=True) -def clear_cache(): - OU = get_ou_model() - cache.clear() - for cached_el in (OU.cached, a2_hooks.get_hooks, get_ou_count): - cached_el.cache.clear() +class FranceConnectMock: + exp = None - -class AllHook(object): def __init__(self): - self.calls = {} - - def __call__(self, hook_name, *args, **kwargs): - calls = self.calls.setdefault(hook_name, []) - calls.append({'args': args, 'kwargs': kwargs}) - - def __getattr__(self, name): - return self.calls.get(name, []) - - def clear(self): - self.calls = {} + self.sub = '1234' + self.id_token = { + 'aud': 'xxx', + 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', + } + self.user_info = { + 'family_name': 'Frédérique', + 'given_name': 'Ÿuñe', + 'email': 'john.doe@example.com', + } + self.access_token = str(uuid.uuid4()) + self.client_id = CLIENT_ID + self.client_secret = CLIENT_SECRET + self.scopes = {'openid', 'profile', 'email'} + self.callback_params = {'service': 'portail', 'next': '/idp/'} + + def handle_authorization(self, app, url, **kwargs): + assert url.startswith('https://fcp.integ01') + parsed_url = urlparse.urlparse(url) + query = QueryDict(parsed_url.query) + assert_equals_url(query['redirect_uri'], self.callback_url) + assert query['client_id'] == self.client_id + assert set(query['scope'].split()) == self.scopes + assert query['state'] + assert query['nonce'] + assert query['response_type'] == 'code' + assert query['acr_values'] == 'eidas1' + self.state = query['state'] + self.nonce = query['nonce'] + self.code = str(uuid.uuid4().hex) + return app.get( + make_url(self.callback_url, params={'code': self.code, 'state': self.state}), **kwargs) + + @property + def callback_url(self): + return 'http://testserver' + reverse('fc-login-or-link') + '?' + urlencode(self.callback_params) + + def login_with_fc_fixed_params(self, app): + if app.session: + app.session.flush() + response = app.get('/login/?' + urlencode(self.callback_params)) + response = response.click(href='callback') + return self.handle_authorization(app, response.location, status=302) + + def login_with_fc(self, app, path): + if app.session: + app.session.flush() + response = app.get(path) + self.callback_params = {k: v for k, v in QueryDict(urlparse.urlparse(response.location).query).items()} + response = response.follow() + response = response.click(href='callback') + return self.handle_authorization(app, response.location, status=302).follow() + + def access_token_response(self, url, request): + formdata = QueryDict(request.body) + assert set(formdata.keys()) == {'code', 'client_id', 'client_secret', + 'redirect_uri', 'grant_type'} + assert formdata['code'] == self.code + assert formdata['client_id'] == self.client_id + assert formdata['client_secret'] == self.client_secret + assert formdata['grant_type'] == 'authorization_code' + assert_equals_url(formdata['redirect_uri'], self.callback_url) + + # make response + id_token = self.id_token.copy() + id_token.update({ + 'sub': self.sub, + 'nonce': self.nonce, + 'exp': int((self.exp or (now() + datetime.timedelta(seconds=60))).timestamp()), + }) + id_token.update(self.user_info) + return json.dumps({ + 'access_token': self.access_token, + 'id_token': self.hmac_jwt(id_token, self.client_secret) + }) + + def hmac_jwt(self, payload, key): + header = {'alg': 'HS256'} + k = jwk.JWK(kty='oct', k=base64.b64encode(key.encode('utf-8')).decode('ascii')) + t = jwt.JWT(header=header, claims=payload) + t.make_signed_token(k) + return t.serialize() + + def user_info_response(self, url, request): + assert request.headers['Authorization'] == 'Bearer %s' % self.access_token + user_info = self.user_info.copy() + user_info['sub'] = self.sub + return json.dumps(user_info) + + @contextlib.contextmanager + def __call__(self): + with httmock.HTTMock( + httmock.urlmatch(path=r'.*/token$')(self.access_token_response), + httmock.urlmatch(path=r'.*userinfo$')(self.user_info_response)): + yield None + + def handle_logout(self, app, url): + assert url.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/logout') + parsed_url = urlparse.urlparse(url) + query = QueryDict(parsed_url.query) + assert_equals_url(query['post_logout_redirect_uri'], 'http://testserver' + reverse('fc-logout')) + assert query['state'] + self.state = query['state'] + return app.get(reverse('fc-logout') + '?state=' + self.state) @pytest.fixture -def hooks(settings): - if hasattr(settings, 'A2_HOOKS'): - hooks = settings.A2_HOOKS - else: - hooks = settings.A2_HOOKS = {} - hook = hooks['__all__'] = AllHook() - yield hook - hook.clear() - del settings.A2_HOOKS['__all__'] +def franceconnect(settings, service): + settings.A2_FC_ENABLE = True + settings.A2_FC_CLIENT_ID = CLIENT_ID + settings.A2_FC_CLIENT_SECRET = CLIENT_SECRET + + Service.objects.create(name='portail', slug='portail') + mock_object = FranceConnectMock() + with mock_object(): + yield mock_object diff --git a/tests/auth_fc/test_auth_fc.py b/tests/auth_fc/test_auth_fc.py index 1dae183a..da539b3f 100644 --- a/tests/auth_fc/test_auth_fc.py +++ b/tests/auth_fc/test_auth_fc.py @@ -15,84 +15,36 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import pytest -import re -import httmock -import mock -import json -import base64 -from jwcrypto import jwk, jwt import datetime +import mock import requests from django.contrib.auth import get_user_model from django.urls import reverse -from django.utils.encoding import force_text from django.utils.six.moves.urllib import parse as urlparse from django.utils.timezone import now -from authentic2.models import Service - from authentic2_auth_fc import models from authentic2_auth_fc.utils import requests_retry_session -from ..utils import login +from ..utils import login, get_link_from_mail User = get_user_model() -@pytest.fixture(autouse=True) -def service(db): - return Service.objects.create(name='portail', slug='portail') - - def path(url): return urlparse.urlparse(url).path -def get_links_from_mail(mail): - '''Extract links from mail sent by Django''' - return re.findall('https?://[^ \n]*', mail.body) - - -def hmac_jwt(payload, key): - header = {'alg': 'HS256'} - k = jwk.JWK( - kty='oct', k=force_text(base64.b64encode(key.encode('utf-8')))) - t = jwt.JWT(header=header, claims=payload) - t.make_signed_token(k) - return t.serialize() - - -def test_login_redirect(app, fc_settings): +def test_login_redirect(app, franceconnect): url = reverse('fc-login-or-link') response = app.get(url, status=302) assert response['Location'].startswith('https://fcp.integ01') -def check_authorization_url(url): - callback = reverse('fc-login-or-link') - assert url.startswith('https://fcp.integ01') - query_string = url.split('?')[1] - parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()} - assert 'redirect_uri' in parsed - assert callback in parsed['redirect_uri'] - assert 'client_id' in parsed - assert parsed['client_id'] == 'xxx' - assert 'scope' in parsed - assert set(parsed['scope'].split()) == set(['openid', 'profile', 'email']) - assert 'state' in parsed - assert 'nonce' in parsed - assert parsed['state'] == parsed['nonce'] - assert 'response_type' in parsed - assert parsed['response_type'] == 'code' - assert parsed['acr_values'] == 'eidas1' - return parsed['state'] - - -def test_login_with_condition(app, fc_settings, settings): +def test_login_with_condition(settings, app, franceconnect): # open the page first time so session cookie can be set response = app.get('/login/') assert 'fc-button' in response @@ -105,225 +57,135 @@ def test_login_with_condition(app, fc_settings, settings): assert 'fc-button' not in response -def test_login_autorun(app, fc_settings, settings): +def test_login_autorun(settings, app, franceconnect): # hide password block settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}} response = app.get('/login/') assert response['Location'] == reverse('fc-login-or-link') -@pytest.mark.parametrize('exp', [now() + datetime.timedelta(seconds=1000), - now() - datetime.timedelta(seconds=1000)]) -def test_login_simple(app, fc_settings, caplog, hooks, exp): +def test_no_create(app, franceconnect): response = app.get('/login/?service=portail&next=/idp/') response = response.click(href='callback') - location = response['Location'] - state = check_authorization_url(location) - - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - id_token = { - 'sub': '1234', - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - }) - - callback = reverse('fc-login-or-link') - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302) + franceconnect.handle_authorization(app, response.location, status=302) assert User.objects.count() == 0 - fc_settings.A2_FC_CREATE = True - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302) - if exp < now(): - assert User.objects.count() == 0 - else: - assert User.objects.count() == 1 - if User.objects.count(): - user = User.objects.get() - assert user.verified_attributes.first_name == u'Ÿuñe' - assert user.verified_attributes.last_name == u'Frédérique' - assert path(response['Location']) == '/idp/' - assert hooks.event[1]['kwargs']['name'] == 'login' - assert hooks.event[1]['kwargs']['service'] == 'portail' - # we must be connected - assert app.session['_auth_user_id'] - assert app.session.get_expire_at_browser_close() - assert models.FcAccount.objects.count() == 1 - - # test unlink cancel case - response = app.get('/accounts/') - response = response.click('Delete link') - assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1 - response = response.form.submit(name='cancel') - response = response.follow() - - # test unlink submit case - response = app.get('/accounts/') - response = response.click('Delete link') - response.form.set('new_password1', 'ikKL1234') - response.form.set('new_password2', 'ikKL1234') - response = response.form.submit(name='unlink') - assert 'The link with the FranceConnect account has been deleted' in response.text - assert models.FcAccount.objects.count() == 0 - continue_url = response.pyquery('a#a2-continue').attr['href'] - state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0] - assert app.session['fc_states'][state]['next'] == '/accounts/' - response = app.get(reverse('fc-logout') + '?state=' + state) - assert path(response['Location']) == '/accounts/' - - -def test_login_email_is_unique(app, fc_settings, caplog): - callback = reverse('fc-login-or-link') - response = app.get(callback, status=302) - location = response['Location'] - state = check_authorization_url(location) - - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - exp = now() + datetime.timedelta(seconds=1000) - id_token = { - 'sub': '1234', - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - 'email': 'jOhn.dOe@eXample.com', - }) - user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe') - user.set_password('toto') - user.save() - fc_settings.A2_EMAIL_IS_UNIQUE = True - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '?code=zzz&state=%s' % state, status=302) + +def test_create(settings, app, franceconnect, hooks): + # test direct creation + settings.A2_FC_CREATE = True + + response = app.get('/login/?service=portail&next=/idp/') + response = response.click(href='callback') + + assert User.objects.count() == 0 + response = franceconnect.handle_authorization(app, response.location, status=302) assert User.objects.count() == 1 + + user = User.objects.get() + assert user.verified_attributes.first_name == 'Ÿuñe' + assert user.verified_attributes.last_name == 'Frédérique' + assert path(response['Location']) == '/idp/' + assert hooks.event[1]['kwargs']['name'] == 'login' + assert hooks.event[1]['kwargs']['service'] == 'portail' + # we must be connected assert app.session['_auth_user_id'] + assert app.session.get_expire_at_browser_close() + assert models.FcAccount.objects.count() == 1 - # logout, test unlinking when logging with password - app.session.flush() - response = app.get('/login/') - response.form.set('username', User.objects.get().email) - response.form.set('password', 'toto') - response = response.form.submit(name='login-password-submit').follow() + # test unlink cancel case + response = app.get('/accounts/') + response = response.click('Delete link') + assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1 + response = response.form.submit(name='cancel') + response = response.follow() + # test unlink submit case response = app.get('/accounts/') response = response.click('Delete link') + response.form.set('new_password1', 'ikKL1234') + response.form.set('new_password2', 'ikKL1234') + response = response.form.submit(name='unlink') + assert 'The link with the FranceConnect account has been deleted' in response.text + assert models.FcAccount.objects.count() == 0 + continue_url = response.pyquery('a#a2-continue').attr['href'] + state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0] + assert app.session['fc_states'][state]['next'] == '/accounts/' + response = app.get(reverse('fc-logout') + '?state=' + state) + assert path(response['Location']) == '/accounts/' + + +def test_create_expired(settings, app, franceconnect, hooks): + # test direct creation failure on an expired id_token + settings.A2_FC_CREATE = True + franceconnect.exp = now() - datetime.timedelta(seconds=30) + + response = app.get('/login/?service=portail&next=/idp/') + response = response.click(href='callback') + + assert User.objects.count() == 0 + response = franceconnect.handle_authorization(app, response.location, status=302) + assert User.objects.count() == 0 + + +def test_login_email_is_unique(settings, app, franceconnect, caplog): + settings.A2_EMAIL_IS_UNIQUE = True + user = User( + email='john.doe@example.com', + first_name='John', + last_name='Doe') + user.set_password('toto') + user.save() + franceconnect.user_info['email'] = user.email + + assert User.objects.count() == 1 + franceconnect.login_with_fc_fixed_params(app) + assert User.objects.count() == 1 + assert app.session['_auth_user_id'] == str(user.pk) + + +def test_unlink_after_login_with_password(app, franceconnect, simple_user): + models.FcAccount.objects.create(user=simple_user, user_info='{}') + + response = login(app, simple_user, path='/accounts/') + response = response.click('Delete link') assert 'new_password1' not in response.form.fields response = response.form.submit(name='unlink').follow() assert 'The link with the FranceConnect account has been deleted' in response.text + # no logout from FC since we are not logged to it assert response.request.path == '/accounts/' -def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog): - callback = reverse('fc-login-or-link') - response = app.get(callback, status=302) - location = response['Location'] - state = check_authorization_url(location) +def test_unlink_after_login_with_fc(app, franceconnect, simple_user): + models.FcAccount.objects.create(user=simple_user, sub=franceconnect.sub, user_info='{}') - EMAIL = 'john.doe@example.com' - SUB = '1234' - user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe') - models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}') + response = franceconnect.login_with_fc(app, path='/accounts/') + response = response.click('Delete link') + response.form.set('new_password1', 'ikKL1234') + response.form.set('new_password2', 'ikKL1234') + response = response.form.submit(name='unlink') + assert 'The link with the FranceConnect account has been deleted' in response.text + assert models.FcAccount.objects.count() == 0 + continue_url = response.pyquery('a#a2-continue').attr['href'] + response = franceconnect.handle_logout(app, continue_url) + assert path(response.location) == '/accounts/' - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - exp = now() + datetime.timedelta(seconds=1000) - id_token = { - 'sub': SUB, - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - 'email': EMAIL, - }) - - fc_settings.A2_EMAIL_IS_UNIQUE = True - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '?code=zzz&state=%s' % state, status=302) - assert 'is already used' in str(response) - assert User.objects.count() == 1 + +def test_login_email_is_unique_and_already_linked(settings, app, franceconnect, caplog): + settings.A2_EMAIL_IS_UNIQUE = True + + # setup an already linked user account + user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe') + models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}') + response = app.get('/login/?service=portail&next=/idp/') + response = response.click(href='callback') + response = franceconnect.handle_authorization(app, response.location, status=302) + assert models.FcAccount.objects.count() == 1 + assert 'is already used' in app.cookies['messages'] assert '_auth_user_id' not in app.session -def test_requests_proxies_support(app, fc_settings, caplog): +def test_requests_proxies_support(settings, app): session = requests_retry_session() assert session.proxies == {} other_session = requests.Session() @@ -331,7 +193,8 @@ def test_requests_proxies_support(app, fc_settings, caplog): session = requests_retry_session(session=other_session) assert session is other_session assert session.proxies == {'http': 'http://example.net'} - fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'} + + settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'} session = requests_retry_session() assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'} @@ -341,95 +204,65 @@ def test_requests_proxies_support(app, fc_settings, caplog): assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'} -def test_password_reset(app, mailoutbox): +def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox): user = User.objects.create(email='john.doe@example.com') + # No FC account, forbidden to set a password response = app.get('/login/') response = response.click('Reset it!').maybe_follow() response.form['email'] = user.email assert len(mailoutbox) == 0 response = response.form.submit() assert len(mailoutbox) == 1 - url = get_links_from_mail(mailoutbox[0])[0] + url = get_link_from_mail(mailoutbox[0]) + response = app.get(url).follow().follow() + assert '_auth_user_id' not in app.session + assert 'not possible to reset' in response + + # With FC account, can set a password models.FcAccount.objects.create(user=user, sub='xxx', token='aaa') - response = app.get(url).maybe_follow() - assert 'new_password1' in response.form.fields + response = app.get('/login/') + response = response.click('Reset it!').maybe_follow() + response.form['email'] = user.email + assert len(mailoutbox) == 1 + response = response.form.submit() + assert len(mailoutbox) == 2 + url = get_link_from_mail(mailoutbox[1]) + response = app.get(url, status=200) + response.form.set('new_password1', 'ikKL1234') + response.form.set('new_password2', 'ikKL1234') + response = response.form.submit().follow() + assert '_auth_user_id' in app.session -def test_registration1(app, fc_settings, caplog, hooks): - exp = now() + datetime.timedelta(seconds=1000) - response = app.get('/login/?service=portail&next=/idp/') - response = response.click(href="callback") - # 1. Try a login - # 2. Verify we come back to login page - # 3. Check presence of registration link - # 4. Follow it - location = response['Location'] - state = check_authorization_url(location) - - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - id_token = { - 'sub': '1234', - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - 'email': 'john.doe@example.com', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - 'email': 'john.doe@example.com', - }) - - callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0] - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) +def test_registration1(settings, app, franceconnect, caplog, hooks): + response = franceconnect.login_with_fc_fixed_params(app) assert User.objects.count() == 0 - assert path(response['Location']) == '/login/' + assert path(response.location) == '/login/' response = response.follow() response = response.click(href='/accounts/fc/register') - location = response['Location'] - location.startswith('http://testserver/accounts/activate/') + response.location.startswith('http://testserver/accounts/activate/') + assert User.objects.count() == 0 response = response.follow() - assert hooks.calls['event'][0]['kwargs']['service'] == 'portail' + assert response.location.startswith('/fc/callback/') + # a new user has been created + assert User.objects.count() == 1 + # but no FcAccount + assert models.FcAccount.objects.count() == 0 # we must be connected assert app.session['_auth_user_id'] - parsed_location = urlparse.urlparse(response['Location']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_location.path == parsed_callback.path - assert (urlparse.parse_qs(parsed_location.query) == - urlparse.parse_qs(parsed_callback.query)) + # hook must have been called + assert hooks.calls['event'][0]['kwargs']['service'] == 'portail' + response = response.follow() - location = response['Location'] - state = check_authorization_url(location) - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) + # a new redirect to FC is done + response = franceconnect.handle_authorization(app, response.location) + + # FcAccount now exists assert models.FcAccount.objects.count() == 1 user = User.objects.get() - assert user.verified_attributes.first_name == u'Ÿuñe' - assert user.verified_attributes.last_name == u'Frédérique' + assert user.verified_attributes.first_name == 'Ÿuñe' + assert user.verified_attributes.last_name == 'Frédérique' + response = app.get('/accounts/') response = response.click('Delete link') response.form.set('new_password1', 'ikKL1234') @@ -438,214 +271,60 @@ def test_registration1(app, fc_settings, caplog, hooks): assert 'The link with the FranceConnect account has been deleted' in response.text assert models.FcAccount.objects.count() == 0 continue_url = response.pyquery('a#a2-continue').attr['href'] - state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0] - assert app.session['fc_states'][state]['next'] == '/accounts/' - response = app.get(reverse('fc-logout') + '?state=' + state) - assert path(response['Location']) == '/accounts/' + response = franceconnect.handle_logout(app, continue_url) + assert path(response.location) == '/accounts/' -def test_registration2(app, fc_settings, caplog, hooks): - exp = now() + datetime.timedelta(seconds=1000) +def test_registration2(settings, app, franceconnect, hooks): response = app.get('/login/?service=portail&next=/idp/') response = response.click("Register") response = response.click(href='callback') - # 1. Try a login - # 2. Verify we come back to login page - # 3. Check presence of registration link - # 4. Follow it - location = response['Location'] - state = check_authorization_url(location) - - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - id_token = { - 'sub': '1234', - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - 'email': 'john.doe@example.com', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - 'email': 'john.doe@example.com', - }) - - callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0] - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) + franceconnect.callback_params['registration'] = '' + response = franceconnect.handle_authorization(app, response.location) + assert User.objects.count() == 0 - assert path(response['Location']) == '/accounts/fc/register/' + assert path(response.location) == '/accounts/fc/register/' response = response.follow() - location = response['Location'] - location.startswith('http://testserver/accounts/activate/') + response.location.startswith('http://testserver/accounts/activate/') response = response.follow() + assert User.objects.count() == 1 + user = User.objects.get() + assert user.verified_attributes.first_name is None + assert user.verified_attributes.last_name is None assert hooks.calls['event'][0]['kwargs']['service'] == 'portail' assert hooks.calls['event'][1]['kwargs']['service'] == 'portail' # we must be connected assert app.session['_auth_user_id'] - # remove the registration parameter - callback = callback.replace('®istration=', '') - callback = callback.replace('?registration=', '?') - callback = callback.replace('?&', '?') - parsed_location = urlparse.urlparse(response['Location']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_location.path == parsed_callback.path - assert (urlparse.parse_qs(parsed_location.query) == - urlparse.parse_qs(parsed_callback.query)) response = response.follow() - location = response['Location'] - state = check_authorization_url(location) - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) - assert models.FcAccount.objects.count() == 1 + + del franceconnect.callback_params['registration'] + response = franceconnect.handle_authorization(app, response.location) user = User.objects.get() - assert user.verified_attributes.first_name == u'Ÿuñe' - assert user.verified_attributes.last_name == u'Frédérique' - response = app.get('/accounts/') - response = response.click('Delete link') - response.form.set('new_password1', 'ikKL1234') - response.form.set('new_password2', 'ikKL1234') - response = response.form.submit(name='unlink') - assert 'The link with the FranceConnect account has been deleted' in response.text - assert models.FcAccount.objects.count() == 0 - continue_url = response.pyquery('a#a2-continue').attr['href'] - state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0] - assert app.session['fc_states'][state]['next'] == '/accounts/' - response = app.get(reverse('fc-logout') + '?state=' + state) - assert path(response['Location']) == '/accounts/' + assert user.verified_attributes.first_name == 'Ÿuñe' + assert user.verified_attributes.last_name == 'Frédérique' -def test_can_change_password(app, fc_settings, caplog, hooks): - exp = now() + datetime.timedelta(seconds=1000) - response = app.get('/login/?service=portail&next=/idp/') - response = response.click("Register") - response = response.click(href='callback') - # 1. Try a login - # 2. Verify we come back to login page - # 3. Check presence of registration link - # 4. Follow it - location = response['Location'] - state = check_authorization_url(location) - - @httmock.urlmatch(path=r'.*/token$') - def access_token_response(url, request): - parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} - assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', - 'grant_type']) - assert parsed['code'] == 'zzz' - assert parsed['client_id'] == 'xxx' - assert parsed['client_secret'] == 'yyy' - assert parsed['grant_type'] == 'authorization_code' - parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_redirect.path == parsed_callback.path - for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): - urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value - id_token = { - 'sub': '1234', - 'aud': 'xxx', - 'nonce': state, - 'exp': int(exp.timestamp()), - 'iss': 'https://fcp.integ01.dev-franceconnect.fr/', - 'email': 'john.doe@example.com', - } - return json.dumps({ - 'access_token': 'uuu', - 'id_token': hmac_jwt(id_token, 'yyy') - }) - - @httmock.urlmatch(path=r'.*userinfo$') - def user_info_response(url, request): - assert request.headers['Authorization'] == 'Bearer uuu' - return json.dumps({ - 'sub': '1234', - 'family_name': u'Frédérique', - 'given_name': u'Ÿuñe', - 'email': 'john.doe@example.com', - }) - - callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0] - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) - assert User.objects.count() == 0 - assert path(response['Location']) == '/accounts/fc/register/' - response = response.follow() - location = response['Location'] - location.startswith('http://testserver/accounts/activate/') - response = response.follow() - assert hooks.calls['event'][0]['kwargs']['service'] == 'portail' - assert hooks.calls['event'][1]['kwargs']['service'] == 'portail' - # we must be connected - assert app.session['_auth_user_id'] - # remove the registration parameter - callback = callback.replace('®istration=', '') - callback = callback.replace('?registration=', '?') - callback = callback.replace('?&', '?') - parsed_location = urlparse.urlparse(response['Location']) - parsed_callback = urlparse.urlparse(callback) - assert parsed_location.path == parsed_callback.path - assert (urlparse.parse_qs(parsed_location.query) == - urlparse.parse_qs(parsed_callback.query)) - response = response.follow() - location = response['Location'] - state = check_authorization_url(location) - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) - assert models.FcAccount.objects.count() == 1 - user = User.objects.get() - assert user.verified_attributes.first_name == u'Ÿuñe' - assert user.verified_attributes.last_name == u'Frédérique' - response = app.get('/accounts/') +def test_can_change_password(settings, app, franceconnect): + user = User.objects.create(email='john.doe@example.com') + models.FcAccount.objects.create(user=user, sub=franceconnect.sub) + + response = franceconnect.login_with_fc(app, path='/accounts/') assert len(response.pyquery('[href*="password/change"]')) == 0 + response = response.click('Logout') + response = franceconnect.handle_logout(app, response.location).follow() + assert '_auth_user_id' not in app.session # Login with password - user = User.objects.get() + user.username = 'test' user.set_password('test') user.save() - app.session.flush() - response = app.get('/login/') - response.form.set('username', User.objects.get().email) - response.form.set('password', 'test') - response = response.form.submit(name='login-password-submit').follow() - response = app.get('/accounts/') + + response = login(app, user, path='/accounts/') assert len(response.pyquery('[href*="password/change"]')) > 0 + response = response.click('Logout').follow() # Relogin with FC - app.session.flush() - response = app.get('/login/?service=portail&next=/accounts/') - response = response.click(href='callback') - location = response['Location'] - state = check_authorization_url(location) - callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0] - with httmock.HTTMock(access_token_response, user_info_response): - response = app.get(callback + '&code=zzz&state=%s' % state, status=302) - # we must be connected - assert app.session['_auth_user_id'] - assert path(response['Location']) == '/accounts/' - response = response.follow() + response = franceconnect.login_with_fc(app, path='/accounts/') assert len(response.pyquery('[href*="password/change"]')) == 0 # Unlink @@ -653,20 +332,17 @@ def test_can_change_password(app, fc_settings, caplog, hooks): response.form.set('new_password1', 'ikKL1234') response.form.set('new_password2', 'ikKL1234') response = response.form.submit(name='unlink') + assert 'The link with the FranceConnect account has been deleted' in response.text continue_url = response.pyquery('a#a2-continue').attr['href'] - state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0] - assert app.session['fc_states'][state]['next'] == '/accounts/' - response = app.get(reverse('fc-logout') + '?state=' + state) - assert path(response['Location']) == '/accounts/' - response = response.follow() + response = franceconnect.handle_logout(app, continue_url).follow() assert len(response.pyquery('[href*="password/change"]')) > 0 -def test_invalid_next_url(app, fc_settings, caplog, hooks): +def test_invalid_next_url(app, franceconnect): assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ' -def test_manager_user_sidebar(app, fc_settings, superuser, simple_user): +def test_manager_user_sidebar(app, superuser, simple_user): login(app, superuser, '/manage/') response = app.get('/manage/users/%s/' % simple_user.id) assert 'FranceConnect' not in response diff --git a/tests/auth_fc/test_auth_fc_api.py b/tests/auth_fc/test_auth_fc_api.py index 20160045..8a6f9190 100644 --- a/tests/auth_fc/test_auth_fc_api.py +++ b/tests/auth_fc/test_auth_fc_api.py @@ -18,8 +18,9 @@ from authentic2_auth_fc.models import FcAccount -def test_api_fc_unlink(app, admin, user_cartman): - url = '/api/users/%s/fc-unlink/' % user_cartman.uuid +def test_api_fc_unlink(app, admin, simple_user): + FcAccount.objects.create(user=simple_user) + url = '/api/users/%s/fc-unlink/' % simple_user.uuid # test unauthorized caller app.delete(url, status=401) # test unauthorized method @@ -27,13 +28,14 @@ def test_api_fc_unlink(app, admin, user_cartman): app.get(url, status=405) # test success app.delete(url, status=204) - assert FcAccount.objects.filter(user=user_cartman).exists() is False + assert FcAccount.objects.filter(user=simple_user).exists() is False -def test_api_user_franceconnect(settings, app, admin, user_cartman): +def test_api_user_franceconnect(settings, app, admin, simple_user): settings.A2_FC_ENABLE = True + FcAccount.objects.create(user=simple_user, sub='1234') - url = '/api/users/%s/' % user_cartman.uuid + url = '/api/users/%s/' % simple_user.uuid # test unauthorized method app.authorization = ('Basic', (admin.username, admin.username)) response = app.get(url) @@ -48,7 +50,7 @@ def test_api_user_franceconnect(settings, app, admin, user_cartman): assert content.get('unlink_url').startswith('http://') assert content.get('unlink_url').endswith('/unlink/') - unlink_url = '/api/users/%s/fc-unlink/' % user_cartman.uuid + unlink_url = '/api/users/%s/fc-unlink/' % simple_user.uuid app.delete(unlink_url, status=204) response = app.get(url + '?full') -- 2.28.0