From 7dd0b25e367c7b3d8e478d64a36ca974da3d1579 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 5 Jan 2017 15:31:13 +0100 Subject: [PATCH 3/4] add FranceConnect authentication method (fixes #14510) --- tests/test_fc_auth.py | 205 +++++++++++++++++ wcs/admin/settings.py | 2 + wcs/qommon/ident/franceconnect.py | 449 ++++++++++++++++++++++++++++++++++++++ wcs/qommon/publisher.py | 2 + wcs/qommon/sessions.py | 12 + 5 files changed, 670 insertions(+) create mode 100644 tests/test_fc_auth.py create mode 100644 wcs/qommon/ident/franceconnect.py diff --git a/tests/test_fc_auth.py b/tests/test_fc_auth.py new file mode 100644 index 0000000..0d4ee7f --- /dev/null +++ b/tests/test_fc_auth.py @@ -0,0 +1,205 @@ +import urlparse +import base64 +import json +import urllib + +from quixote import cleanup, get_session_manager + +from utilities import get_app, create_temporary_pub +import mock + +PROFILE = { + 'fields': [ + { + 'kind': 'string', + 'description': '', + 'required': True, + 'user_visible': True, + 'label': u'Prenoms', + 'disabled': False, + 'user_editable': True, + 'asked_on_registration': True, + 'name': 'prenoms' + }, + { + 'kind': 'string', + 'description': '', + 'required': True, + 'user_visible': True, + 'label': 'Nom', + 'disabled': False, + 'user_editable': True, + 'asked_on_registration': True, + 'name': 'nom' + }, + { + 'kind': 'string', + 'description': '', + 'required': True, + 'user_visible': True, + 'label': 'Email', + 'disabled': False, + 'user_editable': True, + 'asked_on_registration': True, + 'name': 'email' + }, + ] +} + + +def base64url_encode(v): + return base64.urlsafe_b64encode(v).strip('=') + + +def setup_module(module): + cleanup() + global pub + pub = create_temporary_pub() + + +def setup_user_profile(pub): + if not pub.cfg: + pub.cfg = {} + # create some roles + from wcs.ctl.check_hobos import CmdCheckHobos + + # setup an hobo profile + CmdCheckHobos().update_profile(PROFILE, pub) + pub.cfg['users']['field_name'] = ['_prenoms', '_nom'] + pub.user_class.wipe() + pub.write_cfg() + +FC_CONFIG = { + 'client_id': '123', + 'client_secret': 'xyz', + 'platform': 'dev-particulier', + 'scopes': 'identite_pivot', + 'user_field_mappings': [ + { + 'field_varname': 'prenoms', + 'value': '[given_name]', + 'verified': 'always', + }, + { + 'field_varname': 'nom', + 'value': '[family_name]', + 'verified': 'always', + }, + { + 'field_varname': 'email', + 'value': '[email]', + 'verified': 'always', + }, + ] + +} + + +def setup_fc_environment(pub): + if not pub.cfg: + pub.cfg = {} + pub.cfg['identification'] = { + 'methods': ['fc'], + } + pub.cfg['fc'] = FC_CONFIG + pub.user_class.wipe() + pub.write_cfg() + + +def test_fc_login_page(): + setup_user_profile(pub) + setup_fc_environment(pub) + app = get_app(pub) + resp = app.get('/') + resp = app.get('/login/') + assert resp.status_int == 302 + assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize') + qs = urlparse.parse_qs(resp.location.split('?')[1]) + nonce = qs['nonce'][0] + state = qs['state'][0] + + id_token = { + 'nonce': nonce, + } + token_result = { + 'access_token': 'abcd', + 'id_token': '.%s.' % base64url_encode(json.dumps(id_token)), + } + user_info_result = { + 'sub': 'ymca', + 'given_name': 'John', + 'family_name': 'Doe', + 'email': 'john.doe@example.com', + } + + assert pub.user_class.count() == 0 + with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \ + mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page: + http_post_request.return_value = (None, 200, json.dumps(token_result), None) + http_get_page.return_value = (None, 200, json.dumps(user_info_result), None) + resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({ + 'code': '1234', 'state': state, + })) + assert pub.user_class.count() == 1 + user = pub.user_class.select()[0] + assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'} + assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email']) + assert user.email == 'john.doe@example.com' + assert user.name_identifiers == ['ymca'] + assert user.name == 'John Doe' + + # Verify we are logged in + session_id = app.cookies.values()[0].strip('"') + session = get_session_manager().session_class.get(session_id) + assert session.user == user.id + assert session.extra_variables['fc_user_given_name'] == 'John' + assert session.extra_variables['fc_user_family_name'] == 'Doe' + assert session.extra_variables['fc_user_email'] == 'john.doe@example.com' + assert session.extra_variables['fc_user_sub'] == 'ymca' + + # Login existing user + resp = app.get('/logout') + resp = app.get('/login/') + with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \ + mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page: + http_post_request.return_value = (None, 200, json.dumps(token_result), None) + http_get_page.return_value = (None, 200, json.dumps(user_info_result), None) + resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({ + 'code': '1234', 'state': state, + })) + new_session_id = app.cookies.values()[0].strip('"') + assert session_id != new_session_id, 'no new session created' + session = get_session_manager().session_class.get(new_session_id) + assert pub.user_class.count() == 1, 'existing user has not been used' + + +def test_fc_settings(): + setup_user_profile(pub) + app = get_app(pub) + resp = app.get('/backoffice/settings/identification/') + resp.forms[0]['methods$elementfc'].checked = True + resp = resp.forms[0].submit().follow() + + assert 'FranceConnect' in resp.body + resp = resp.click('FranceConnect') + resp = resp.forms[0].submit('user_field_mappings$add_element') + resp = resp.forms[0].submit('user_field_mappings$add_element') + resp.forms[0]['client_id'].value = '123' + resp.forms[0]['client_secret'].value = 'xyz' + resp.forms[0]['platform'].value = 'Development citizens' + resp.forms[0]['scopes'].value = 'identite_pivot' + + resp.forms[0]['user_field_mappings$element0$field_varname'] = 'prenoms' + resp.forms[0]['user_field_mappings$element0$value'] = '[given_name]' + resp.forms[0]['user_field_mappings$element0$verified'] = 'Always' + + resp.forms[0]['user_field_mappings$element1$field_varname'] = 'nom' + resp.forms[0]['user_field_mappings$element1$value'] = '[family_name]' + resp.forms[0]['user_field_mappings$element1$verified'] = 'Always' + + resp.forms[0]['user_field_mappings$element2$field_varname'] = 'email' + resp.forms[0]['user_field_mappings$element2$value'] = '[email]' + resp.forms[0]['user_field_mappings$element2$verified'] = 'Always' + + resp = resp.forms[0].submit('submit').follow() + assert pub.cfg['fc'] == FC_CONFIG diff --git a/wcs/admin/settings.py b/wcs/admin/settings.py index fe19605..9449fd6 100644 --- a/wcs/admin/settings.py +++ b/wcs/admin/settings.py @@ -71,6 +71,8 @@ class IdentificationDirectory(Directory): if lasso is not None: methods.insert(0, ('idp', _('Delegated to SAML identity provider'), 'idp')) + methods.insert(0, + ('fc', _('Delegated to France Connect'), 'fc')) form.add(CheckboxesWidget, 'methods', title = _('Methods'), value=identification_cfg.get('methods'), options=methods, diff --git a/wcs/qommon/ident/franceconnect.py b/wcs/qommon/ident/franceconnect.py new file mode 100644 index 0000000..6954025 --- /dev/null +++ b/wcs/qommon/ident/franceconnect.py @@ -0,0 +1,449 @@ +# w.c.s. - web application for online forms +# Copyright (C) 2005-2017 Entr'ouvert +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see . + +import json +import urllib +import uuid +import hashlib +import base64 + +from quixote import redirect, get_session, get_publisher, get_request, get_session_manager +from quixote.directory import Directory +from quixote.html import htmltext, TemplateIO + +from qommon.backoffice.menu import html_top +from qommon import template, get_cfg, get_logger +from qommon.form import (Form, StringWidget, CompositeWidget, ComputedExpressionWidget, + RadiobuttonsWidget, SingleSelectWidget, WidgetListAsTable) +from qommon.misc import http_post_request, http_get_page, compute, json_loads, flatten_dict +from .base import AuthMethod + +ADMIN_TITLE = N_('FranceConnect') + +# XXX: make an OIDC auth method that FranceConnect would inherit from + + +def base64url_decode(input): + rem = len(input) % 4 + if rem > 0: + input += b'=' * (4 - rem) + return base64.urlsafe_b64decode(input) + + +class UserFieldMappingRowWidget(CompositeWidget): + def __init__(self, name, value=None, **kwargs): + CompositeWidget.__init__(self, name, value, **kwargs) + if not value: + value = {} + + fields = [] + users_cfg = get_cfg('users', {}) + user_formdef = get_publisher().user_class.get_formdef() + if not user_formdef or not users_cfg.get('field_name'): + fields.append(('__name', _('Name'), '__name')) + if not user_formdef or not users_cfg.get('field_email'): + fields.append(('__email', _('Email'), '__email')) + if user_formdef and user_formdef.fields: + for field in user_formdef.fields: + if field.varname: + fields.append((field.varname, field.label, field.varname)) + + self.add(SingleSelectWidget, name='field_varname', title=_('Field'), + value=value.get('field_varname'), + options=fields, **kwargs) + self.add(ComputedExpressionWidget, name='value', title=_('Value'), + value=value.get('value')) + self.add(SingleSelectWidget, 'verified', + title=_('Is attribute verified'), + value=value.get('verified'), + options=[('never', _('Never')), + ('always', _('Always')) + ] + ) + + def _parse(self, request): + if self.get('value') and self.get('field_varname') and self.get('verified'): + self.value = { + 'value': self.get('value'), + 'field_varname': self.get('field_varname'), + 'verified': self.get('verified'), + } + else: + self.value = None + + +class UserFieldMappingTableWidget(WidgetListAsTable): + readonly = False + + def __init__(self, name, **kwargs): + super(UserFieldMappingTableWidget, self).__init__( + name, element_type=UserFieldMappingRowWidget, **kwargs) + + +class MethodDirectory(Directory): + _q_exports = ['login', 'callback'] + + def login(self): + return FCAuthMethod().login() + + def callback(self): + return FCAuthMethod().callback() + + +class MethodAdminDirectory(Directory): + title = ADMIN_TITLE + label = N_('Configure FranceConnect identification method') + + _q_exports = [''] + + PLAFTORMS = [ + { + 'name': N_('Development citizens'), + 'slug': 'dev-particulier', + 'authorization_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize', + 'token_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token', + 'user_info_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo', + 'logout_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout', + }, + { + 'name': N_('Development enterprise'), + 'slug': 'dev-entreprise', + 'authorization_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/authorize', + 'token_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/token', + 'user_info_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/userinfo', + 'logout_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/logout', + }, + { + 'name': N_('Production citizens'), + 'slug': 'prod-particulier', + 'authorization_url': 'https://app.franceconnect.gouv.fr/api/v1/authorize', + 'token_url': 'https://app.franceconnect.gouv.fr/api/v1/token', + 'user_info_url': 'https://app.franceconnect.gouv.fr/api/v1/userinfo', + 'logout_url': 'https://app.franceconnect.gouv.fr/api/v1/logout', + } + ] + + CONFIG = [ + ('client_id', N_('Client ID')), + ('client_secret', N_('Client secret')), + ('platform', N_('Platform')), + ('scopes', N_('Scopes')), + ('user_field_mappings', N_('User field mappings')), + ] + + KNOWN_ATTRIBUTES = [ + ('given_name', N_('first names separated by spaces')), + ('family_name', N_('birth\'s last name')), + ('birthdate', N_('birthdate formatted as YYYY-MM-DD')), + ('gender', N_('gender \'male\' for men, and \'female\' for women')), + ('birthplace', N_('INSEE code of the place of birth')), + ('birthcountry', N_('INSEE code of the country of birth')), + ('email', N_('email')), + ('siret', N_('SIRET or SIREN number of the enterprise')), + # XXX: FranceConnect website also refer to adress and phones attributes but we don't know + # what must be expected of their value. + ] + + @classmethod + def get_form(cls, instance={}): + form = Form(enctype='multipart/form-data') + for key, title in cls.CONFIG: + attrs = {} + default = None + hint = None + kwargs = {} + widget = StringWidget + + if key == 'user_field_mappings': + widget = UserFieldMappingTableWidget + elif key == 'platform': + widget = SingleSelectWidget + kwargs['options'] = [ + (platform['slug'], platform['name']) for platform in cls.PLAFTORMS + ] + elif key == 'scopes': + default = 'identite_pivot address email phones' + hint = _('Space separated values among: identite_pivot, address, email, phones, ' + 'profile, birth, preferred_username, gender, birthdate, ' + 'birthcountry, birthplace') + if widget == StringWidget: + kwargs['size'] = '80' + form.add(widget, key, + title=_(title), + hint=hint, + value=instance.get(key, default), + attrs=attrs, **kwargs) + form.add_submit('submit', _('Submit')) + return form + + def submit(self, form): + cfg = {} + for key, title in self.CONFIG: + cfg[key] = form.get_widget(key).parse() + get_publisher().cfg['fc'] = cfg + get_publisher().write_cfg() + return redirect('.') + + def _q_index(self): + fc_cfg = get_cfg('fc', {}) + form = self.get_form(fc_cfg) + pub = get_publisher() + + if not ('submit' in get_request().form and form.is_submitted()) or form.has_errors(): + html_top('settings', title=_(self.title)) + r = TemplateIO(html=True) + r += htmltext('

%s

') % self.title + fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback' + r += htmltext(_('

Callback URL is %s.

')) % ( + fc_callback, fc_callback) + r += htmltext(_('See ' + 'FranceConnect partners\'site for getting a client_id and ' + 'a client_secret.

')) + r += form.render() + r += htmltext('

See FranceConnect partners\'site for more ' + 'informations on available scopes and attributes. Known ones ' + 'are :

') + r += (htmltext('') % + (_('Attribute'), _('Description'))) + for attribute, description in self.KNOWN_ATTRIBUTES: + r += htmltext('') % (attribute, _(description)) + r += htmltext('
%s%s
%s
%s

') + + return r.getvalue() + else: + return self.submit(form) + + +class FCAuthMethod(AuthMethod): + key = 'fc' + description = ADMIN_TITLE + method_directory = MethodDirectory + method_admin_directory = MethodAdminDirectory + + def is_ok(self): + fc_cfg = get_cfg('fc', {}) + for key, title in self.method_admin_directory.CONFIG: + if not fc_cfg.get(key): + return False + return True + + def login(self): + if not self.is_ok(): + return template.error_page(_('FranceConnect support is not yet configured')) + + fc_cfg = get_cfg('fc', {}) + pub = get_publisher() + session = get_session() + + authorization_url = self.get_authorization_url() + client_id = fc_cfg.get('client_id') + state = str(uuid.uuid4()) + session = get_session() + next_url = get_request().form.get('next') or pub.get_frontoffice_url() + session.set_extra_attribute('fc_next_url_' + state, next_url) + + # generate a session id if none exists, ugly but necessary + get_session_manager().maintain_session(session) + + nonce = hashlib.sha256(str(session.id)).hexdigest() + fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback' + qs = urllib.urlencode({ + 'response_type': 'code', + 'client_id': client_id, + 'redirect_uri': fc_callback, + 'scope': 'openid ' + fc_cfg.get('scopes', ''), + 'state': state, + 'nonce': nonce, + }) + redirect_url = '%s?%s' % (authorization_url, qs) + return redirect(redirect_url) + + def is_interactive(self): + return False + + def get_access_token(self, code): + logger = get_logger() + session = get_session() + fc_cfg = get_cfg('fc', {}) + client_id = fc_cfg.get('client_id') + client_secret = fc_cfg.get('client_secret') + redirect_uri = get_request().get_frontoffice_url().split('?')[0] + body = { + 'grant_type': 'authorization_code', + 'redirect_uri': redirect_uri, + 'client_id': client_id, + 'client_secret': client_secret, + 'code': code, + } + response, status, data, auth_header = http_post_request( + self.get_token_url(), + urllib.urlencode(body), + headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + }) + if status != 200: + logger.error('status from FranceConnect token_url is not 200') + return None + result = json_loads(data) + if 'error' in result: + logger.error('FranceConnect code resolution failed: %s', result['error']) + return None + # check id_token nonce + id_token = result['id_token'] + header, payload, signature = id_token.split('.') + payload = json_loads(base64url_decode(payload)) + nonce = hashlib.sha256(str(session.id)).hexdigest() + if payload['nonce'] != nonce: + logger.error('FranceConnect returned nonce did not match') + return None + return result['access_token'] + + def get_user_info(self, access_token): + logger = get_logger() + response, status, data, auth_header = http_get_page( + self.get_user_info_url(), + headers={ + 'Authorization': 'Bearer %s' % access_token, + }) + if status != 200: + logger.error('status from FranceConnect user_info_url is not 200 but %s and data is' + ' %s', status. data[:100]) + return None + return json.loads(data) + + def get_platform(self): + fc_cfg = get_cfg('fc', {}) + slug = fc_cfg.get('platform') + for platform in self.method_admin_directory.PLAFTORMS: + if platform['slug'] == slug: + return platform + raise KeyError('platform %s not found' % slug) + + def get_authorization_url(self): + return self.get_platform()['authorization_url'] + + def get_token_url(self): + return self.get_platform()['token_url'] + + def get_user_info_url(self): + return self.get_platform()['user_info_url'] + + def get_logout_url(self): + return self.get_platform()['logout_url'] + + def fill_user_attributes(self, user, user_info): + fc_cfg = get_cfg('fc', {}) + user_field_mappings = fc_cfg.get('user_field_mappings', []) + user_formdef = get_publisher().user_class.get_formdef() + + form_data = user.form_data or {} + user.verified_fields = user.verified_fields or [] + + for user_field_mapping in user_field_mappings: + field_varname = user_field_mapping['field_varname'] + value = user_field_mapping['value'] + verified = user_field_mapping['verified'] + field_id = None + + try: + value = compute(value, context=user_info) + except: + continue + if field_varname == '__name': + user.name = value + elif field_varname == '__email': + user.email = value + field_id = 'email' + else: + for field in user_formdef.fields: + if field_varname == field.varname: + field_id = str(field.id) + break + else: + continue + form_data[field.id] = value + if field_varname == '__email': + field_varname = 'email' # special value for verified email field + + # Update verified fields + if field_id: + if verified == 'always' and field_id not in user.verified_fields: + user.verified_fields.append(field.id) + elif verified != 'always' and field_id in user.verified_fields: + user.verified_fields.remove(field.id) + + user.form_data = form_data + + if user.form_data: + user.set_attributes_from_formdata(user.form_data) + + AUTHORIZATION_REQUEST_ERRORS = { + 'access_denied': N_('You refused the connection'), + } + + def callback(self): + if not self.is_ok(): + return template.error_page(_('FranceConnect support is not yet configured')) + pub = get_publisher() + request = get_request() + session = get_session() + logger = get_logger() + state = request.form.get('state', '') + next_url = session.pop_extra_attribute('fc_next_url_' + state, '') or pub.get_frontoffice_url() + + if 'code' not in request.form: + error = request.form.get('error') + # if no error parameter, we stay silent + if error: + # we log only errors whose user is not responsible + msg = _(self.AUTHORIZATION_REQUEST_ERRORS.get(error)) + if not msg: + msg = _('FranceConnect authentication failed') + logger.error('FranceConnect authentication failed with an unknown error: %s', + error) + session.message = ('error', msg) + return redirect(next_url) + access_token = self.get_access_token(request.form['code']) + if not access_token: + return redirect(next_url) + user_info = self.get_user_info(access_token) + if not user_info: + return redirect(next_url) + # Store user info in session + flattened_user_info = user_info.copy() + flatten_dict(flattened_user_info) + session_var_fc_user = {} + for key in flattened_user_info: + session_var_fc_user['fc_user_' + key] = flattened_user_info[key] + session.add_extra_variables(**session_var_fc_user) + + # Lookup or create user + sub = user_info['sub'] + user = None + for user in pub.user_class.get_users_with_name_identifier(sub): + break + if not user: + user = pub.user_class(sub) + user.name_identifiers = [sub] + + # XXX: should we fail login if user info is not sufficient as with SAML 2.0 login ? + # XXX: should we implement an authentication less mode, it would just store FC user informations into the session variables + self.fill_user_attributes(user, user_info) + user.store() + session.set_user(user.id) + return redirect(next_url) diff --git a/wcs/qommon/publisher.py b/wcs/qommon/publisher.py index c62995d..87eb5e6 100644 --- a/wcs/qommon/publisher.py +++ b/wcs/qommon/publisher.py @@ -670,6 +670,8 @@ class QommonPublisher(Publisher, object): if lasso: import qommon.ident.idp classes.append(qommon.ident.idp.IdPAuthMethod) + import qommon.ident.franceconnect + classes.append(qommon.ident.franceconnect.FCAuthMethod) import qommon.ident.password classes.append(qommon.ident.password.PasswordAuthMethod) self.ident_methods = {} diff --git a/wcs/qommon/sessions.py b/wcs/qommon/sessions.py index a3c730c..192ddb0 100644 --- a/wcs/qommon/sessions.py +++ b/wcs/qommon/sessions.py @@ -82,6 +82,7 @@ class Session(QommonSession, CaptchaSession, StorableObject): jsonp_display_values = None extra_variables = None expire = None + extra_attributes = None username = None # only set on password authentication @@ -116,6 +117,7 @@ class Session(QommonSession, CaptchaSession, StorableObject): self.extra_variables or \ CaptchaSession.has_info(self) or \ self.expire or \ + self.extra_attributes or \ QuixoteSession.has_info(self) is_dirty = has_info @@ -248,6 +250,16 @@ class Session(QommonSession, CaptchaSession, StorableObject): d[prefix + k] = v return d + def set_extra_attribute(self, key, value): + self.extra_attributes = self.extra_attributes = {} + self.extra_attributes[key] = value + + def get_extra_attribute(self, key, default=None): + return (self.extra_attributes or {}).get(key, default) + + def pop_extra_attribute(self, key, default=None): + return (self.extra_attributes or {}).pop(key, default) + class QommonSessionManager(QuixoteSessionManager): def start_request(self): -- 2.1.4