From 51b29e437d020d3a1a387c5dd801e35f248ec8e8 Mon Sep 17 00:00:00 2001
From: Benjamin Dauvergne
Date: Thu, 5 Jan 2017 15:31:13 +0100
Subject: [PATCH 3/4] add france connect authentication method
---
wcs/admin/settings.py | 2 +
wcs/qommon/ident/france_connect.py | 445 ++++++++++++++++++++++++++++++++++++
wcs/qommon/publisher.py | 2 +
wcs/qommon/sessions.py | 12 +
wcs/qommon/static/css/dc2/admin.css | 4 +
5 files changed, 465 insertions(+)
create mode 100644 wcs/qommon/ident/france_connect.py
diff --git a/wcs/admin/settings.py b/wcs/admin/settings.py
index 21d72fe..6fa1649 100644
--- a/wcs/admin/settings.py
+++ b/wcs/admin/settings.py
@@ -71,6 +71,8 @@ class IdentificationDirectory(Directory):
if lasso is not None:
methods.insert(0,
('idp', _('Delegated to SAML identity provider')))
+ methods.insert(0,
+ ('fc', _('Delegated to France Connect')))
form.add(CheckboxesWidget, 'methods', title = _('Methods'),
value=identification_cfg.get('methods'),
options=methods,
diff --git a/wcs/qommon/ident/france_connect.py b/wcs/qommon/ident/france_connect.py
new file mode 100644
index 0000000..f8f9759
--- /dev/null
+++ b/wcs/qommon/ident/france_connect.py
@@ -0,0 +1,445 @@
+# w.c.s. - web application for online forms
+# Copyright (C) 2005-2017 Entr'ouvert
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, see .
+
+import json
+import urllib
+import uuid
+import hashlib
+import base64
+
+from quixote import redirect, get_session, get_publisher, get_request
+from quixote.directory import Directory
+from quixote.html import htmltext, TemplateIO
+
+from qommon.backoffice.menu import html_top
+from qommon import template, get_cfg, get_logger
+from qommon.form import (Form, StringWidget, CompositeWidget, ComputedExpressionWidget,
+ RadiobuttonsWidget, SingleSelectWidget, WidgetListAsTable)
+from qommon.misc import http_post_request, http_get_page, compute, json_loads, C_, flatten_dict
+from .base import AuthMethod
+
+ADMIN_TITLE = N_('France Connect')
+
+# XXX: make an OIDC auth method that France Connect would inherit from
+
+
+def base64url_decode(input):
+ rem = len(input) % 4
+ if rem > 0:
+ input += b'=' * (4 - rem)
+ return base64.urlsafe_b64decode(input)
+
+
+class UserFieldMappingRowWidget(CompositeWidget):
+ def __init__(self, name, value=None, **kwargs):
+ CompositeWidget.__init__(self, name, value, **kwargs)
+ if not value:
+ value = {}
+
+ fields = []
+ users_cfg = get_cfg('users', {})
+ user_formdef = get_publisher().user_class.get_formdef()
+ if not user_formdef or not users_cfg.get('field_name'):
+ fields.append(('__name', _('Name'), '__name'))
+ if not user_formdef or not users_cfg.get('field_email'):
+ fields.append(('__email', _('Email'), '__email'))
+ if user_formdef and user_formdef.fields:
+ for field in user_formdef.fields:
+ if field.varname:
+ fields.append((field.varname, field.label, field.varname))
+
+ self.add(SingleSelectWidget, name='field_varname', title=_('Field'),
+ value=value.get('field_varname'),
+ options=fields, **kwargs)
+ self.add(ComputedExpressionWidget, name='value', title=_('Value'),
+ value=value.get('value'))
+ self.add(SingleSelectWidget, 'verified',
+ title=_('Is attribute verified'),
+ value=value.get('verified'),
+ options=[('never', _('Never')),
+ ('always', _('Always'))
+ ]
+ )
+
+ def _parse(self, request):
+ if self.get('value') and self.get('field_varname') and self.get('verified'):
+ self.value = {
+ 'value': self.get('value'),
+ 'field_varname': self.get('field_varname'),
+ 'verified': self.get('verified'),
+ }
+ else:
+ self.value = None
+
+
+class UserFieldMappingTableWidget(WidgetListAsTable):
+ readonly = False
+
+ def __init__(self, name, **kwargs):
+ super(UserFieldMappingTableWidget, self).__init__(
+ name, element_type=UserFieldMappingRowWidget, **kwargs)
+
+
+class MethodDirectory(Directory):
+ _q_exports = ['login', 'callback']
+
+ def login(self):
+ return FCAuthMethod().login()
+
+ def callback(self):
+ return FCAuthMethod().callback()
+
+
+class MethodAdminDirectory(Directory):
+ title = ADMIN_TITLE
+ label = N_('Configure France Connect identification method')
+
+ _q_exports = ['']
+
+ PLAFTORMS = [
+ {
+ 'name': N_('Development citizens'),
+ 'slug': 'dev-particulier',
+ 'authorization_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize',
+ 'token_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token',
+ 'user_info_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo',
+ 'logout_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout',
+ },
+ {
+ 'name': N_('Development enterprise'),
+ 'slug': 'dev-entreprise',
+ 'authorization_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/authorize',
+ 'token_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/token',
+ 'user_info_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/userinfo',
+ 'logout_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/logout',
+ },
+ {
+ 'name': N_('Production citizens'),
+ 'slug': 'prod-particulier',
+ 'authorization_url': 'https://app.franceconnect.gouv.fr/api/v1/authorize',
+ 'token_url': 'https://app.franceconnect.gouv.fr/api/v1/token',
+ 'user_info_url': 'https://app.franceconnect.gouv.fr/api/v1/userinfo',
+ 'logout_url': 'https://app.franceconnect.gouv.fr/api/v1/logout',
+ }
+ ]
+
+ CONFIG = [
+ ('client_id', N_('Client ID')),
+ ('client_secret', N_('Client secret')),
+ ('platform', N_('Platform')),
+ ('scopes', N_('Scopes')),
+ ('user_field_mappings', N_('User field mappings')),
+ ]
+
+ KNOWN_ATTRIBUTES = [
+ ('given_name', N_('fc_given_name_description|first names separated by spaces')),
+ ('family_name', N_('fc_last_name_description|birth\'s last name')),
+ ('birthdate', N_('fc_birthdate_description|birthdate formatted as YYYY-MM-DD')),
+ ('gender', N_('fc_gender_description|gender \'male\' for men, and \'female\' for women')),
+ ('birthplace', N_('fc_birthplace_description|INSEE code of the place of birth')),
+ ('birthcountry', N_('fc_birthplace_description|INSEE code of the country of birth')),
+ ('email', N_('fc_email_description|email')),
+ ('siret', N_('fc_siret_description|SIRET or SIREN number of the enterprise')),
+ # XXX: France Connect website also refer to adress and phones attributes but we don't know
+ # what must be expected of their value.
+ ]
+
+ @classmethod
+ def get_form(cls, instance={}):
+ form = Form(enctype='multipart/form-data')
+ for key, title in cls.CONFIG:
+ attrs = {}
+ default = None
+ hint = None
+ kwargs = {}
+ widget = StringWidget
+
+ if key == 'user_field_mappings':
+ widget = UserFieldMappingTableWidget
+ elif key == 'platform':
+ widget = SingleSelectWidget
+ kwargs['options'] = [
+ (platform['slug'], platform['name']) for platform in cls.PLAFTORMS
+ ]
+ elif key == 'scopes':
+ default = 'identite_pivot address email phones'
+ hint = _('Space separated values among: identite_pivot, address, email, phones, '
+ 'profile, birth, preferred_username, gender, birthdate, '
+ 'birthcountry, birthplace')
+ if widget == StringWidget:
+ kwargs['class'] = 'large'
+ form.add(widget, key,
+ title=_(title),
+ hint=hint,
+ value=instance.get(key, default),
+ attrs=attrs, **kwargs)
+ form.add_submit('submit', _('Submit'))
+ return form
+
+ def submit(self, form):
+ cfg = {}
+ for key, title in self.CONFIG:
+ cfg[key] = form.get_widget(key).parse()
+ get_publisher().cfg['fc'] = cfg
+ get_publisher().write_cfg()
+ return redirect('.')
+
+ def _q_index(self):
+ fc_cfg = get_cfg('fc', {})
+ form = self.get_form(fc_cfg)
+ pub = get_publisher()
+
+ if not ('submit' in get_request().form and form.is_submitted()) or form.has_errors():
+ html_top('settings', title=_(self.title))
+ r = TemplateIO(html=True)
+ r += htmltext('%s
') % self.title
+ fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
+ r += htmltext(_('Callback URL is %s.
')) % (
+ fc_callback, fc_callback)
+ r += htmltext(_('See '
+ 'France Connect partners\'site for getting a client_id and '
+ 'a client_secret.
'))
+ r += form.render()
+ r += htmltext('See France Connect partners\'site for more '
+ 'informations on available scopes and attributes. Known ones '
+ 'are :
')
+ r += (htmltext('
%s | %s |
') %
+ (_('Attribute'), _('Description')))
+ for attribute, description in self.KNOWN_ATTRIBUTES:
+ r += htmltext('%s | %s |
') % (attribute, C_(description))
+ r += htmltext('
')
+
+ return r.getvalue()
+ else:
+ return self.submit(form)
+
+
+class FCAuthMethod(AuthMethod):
+ key = 'fc'
+ description = ADMIN_TITLE
+ method_directory = MethodDirectory
+ method_admin_directory = MethodAdminDirectory
+
+ def is_ok(self):
+ fc_cfg = get_cfg('fc', {})
+ for key, title in self.method_admin_directory.CONFIG:
+ if not fc_cfg.get(key):
+ return False
+ return True
+
+ def login(self):
+ if not self.is_ok():
+ return template.error_page(_('France Connect support is not yet configured'))
+
+ fc_cfg = get_cfg('fc', {})
+ pub = get_publisher()
+ session = get_session()
+
+ authorization_url = self.get_authorization_url()
+ client_id = fc_cfg.get('client_id')
+ state = str(uuid.uuid4())
+ session = get_session()
+ nonce = hashlib.sha256(str(session.id)).hexdigest()
+ next_url = get_request().form.get('next') or pub.get_frontoffice_url()
+ session.set_extra('fc_next_url_' + state, next_url)
+ fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
+ qs = urllib.urlencode({
+ 'response_type': 'code',
+ 'client_id': client_id,
+ 'redirect_uri': fc_callback,
+ 'scope': 'openid ' + fc_cfg.get('scopes', ''),
+ 'state': state,
+ 'nonce': nonce,
+ })
+ redirect_url = '%s?%s' % (authorization_url, qs)
+ return redirect(redirect_url)
+
+ def is_interactive(self):
+ return False
+
+ def get_access_token(self, code):
+ logger = get_logger()
+ session = get_session()
+ fc_cfg = get_cfg('fc', {})
+ client_id = fc_cfg.get('client_id')
+ client_secret = fc_cfg.get('client_secret')
+ redirect_uri = get_request().get_frontoffice_url().split('?')[0]
+ body = {
+ 'grant_type': 'authorization_code',
+ 'redirect_uri': redirect_uri,
+ 'client_id': client_id,
+ 'client_secret': client_secret,
+ 'code': code,
+ }
+ response, status, data, auth_header = http_post_request(
+ self.get_token_url(),
+ urllib.urlencode(body),
+ headers={
+ 'Content-Type': 'application/x-www-form-urlencoded',
+ })
+ if status != 200:
+ logger.error('status from France Connect token_url is not 200')
+ return None
+ result = json_loads(data)
+ if 'error' in result:
+ logger.error('France Connect code resolution failed: %s', result['error'])
+ return None
+ # check id_token nonce
+ id_token = result['id_token']
+ header, payload, signature = id_token.split('.')
+ payload = json_loads(base64url_decode(payload))
+ nonce = hashlib.sha256(str(session.id)).hexdigest()
+ if payload['nonce'] != nonce:
+ logger.error('France Connect returned nonce did not match')
+ return None
+ return result['access_token']
+
+ def get_user_info(self, access_token):
+ logger = get_logger()
+ response, status, data, auth_header = http_get_page(
+ self.get_user_info_url(),
+ headers={
+ 'Authorization': 'Bearer %s' % access_token,
+ })
+ if status != 200:
+ logger.error('status from France Connect user_info_url is not 200 but %s and data is'
+ ' %s', status. data[:100])
+ return None
+ return json.loads(data)
+
+ def get_platform(self):
+ fc_cfg = get_cfg('fc', {})
+ slug = fc_cfg.get('platform')
+ for platform in self.method_admin_directory.PLAFTORMS:
+ if platform['slug'] == slug:
+ return platform
+ raise KeyError('platform %s not found' % slug)
+
+ def get_authorization_url(self):
+ return self.get_platform()['authorization_url']
+
+ def get_token_url(self):
+ return self.get_platform()['token_url']
+
+ def get_user_info_url(self):
+ return self.get_platform()['user_info_url']
+
+ def get_logout_url(self):
+ return self.get_platform()['logout_url']
+
+ def fill_user_attributes(self, user, user_info):
+ fc_cfg = get_cfg('fc', {})
+ user_field_mappings = fc_cfg.get('user_field_mappings', [])
+ user_formdef = get_publisher().user_class.get_formdef()
+
+ form_data = user.form_data or {}
+ user.verified_fields = user.verified_fields or []
+
+ for user_field_mapping in user_field_mappings:
+ field_varname = user_field_mapping['field_varname']
+ value = user_field_mapping['value']
+ verified = user_field_mapping['verified']
+ field_id = None
+
+ try:
+ value = compute(value, context=user_info)
+ except:
+ continue
+ if field_varname == '__name':
+ user.name = value
+ elif field_varname == '__email':
+ user.email = value
+ field_id = 'email'
+ else:
+ for field in user_formdef.fields:
+ if field_varname == field.varname:
+ field_id = str(field.id)
+ break
+ else:
+ continue
+ form_data[field.id] = value
+ if field_varname == '__email':
+ field_varname = 'email' # special value for verified email field
+
+ # Update verified fields
+ if field_id:
+ if verified == 'always' and field_id not in user.verified_fields:
+ user.verified_fields.append(field.id)
+ elif verified != 'always' and field_id in user.verified_fields:
+ user.verified_fields.remove(field.id)
+
+ user.form_data = form_data
+
+ if user.form_data:
+ user.set_attributes_from_formdata(user.form_data)
+
+ AUTHORIZATION_REQUEST_ERRORS = {
+ 'access_denied': N_('You refused the connection'),
+ }
+
+ def callback(self):
+ if not self.is_ok():
+ return template.error_page(_('France Connect support is not yet configured'))
+ pub = get_publisher()
+ request = get_request()
+ session = get_session()
+ logger = get_logger()
+ state = request.form.get('state', '')
+ next_url = session.pop_extra('fc_next_url_' + state, '') or pub.get_frontoffice_url()
+
+ if 'code' not in request.form:
+ error = request.form.get('error')
+ # if no error parameter, we stay silent
+ if error:
+ # we log only errors whose user is not responsible
+ msg = _(self.AUTHORIZATION_REQUEST_ERRORS.get(error))
+ if not msg:
+ msg = _('France Connect authentication failed')
+ logger.error('France Connect authentication failed with an unknown error: %s',
+ error)
+ session.message = ('error', msg)
+ return redirect(next_url)
+ access_token = self.get_access_token(request.form['code'])
+ if not access_token:
+ return redirect(next_url)
+ user_info = self.get_user_info(access_token)
+ if not user_info:
+ return redirect(next_url)
+ # Store user info in session
+ flattened_user_info = user_info.copy()
+ flatten_dict(flattened_user_info)
+ session_var_fc_user = {}
+ for key in flattened_user_info:
+ session_var_fc_user['fc_user_' + key] = flattened_user_info[key]
+ session.add_extra_variables(**session_var_fc_user)
+
+ # Lookup or create user
+ sub = user_info['sub']
+ user = None
+ for user in pub.user_class.get_users_with_name_identifier(sub):
+ break
+ if not user:
+ user = pub.user_class(sub)
+ user.name_identifiers = [sub]
+
+ # XXX: should we fail login if user info is not sufficient as with SAML 2.0 login ?
+ # XXX: should we implement an authentication less mode, it would just store FC user informations into the session variables
+ self.fill_user_attributes(user, user_info)
+ user.store()
+ session.set_user(user.id)
+ return redirect(next_url)
diff --git a/wcs/qommon/publisher.py b/wcs/qommon/publisher.py
index 7a7d99b..f717fe0 100644
--- a/wcs/qommon/publisher.py
+++ b/wcs/qommon/publisher.py
@@ -670,6 +670,8 @@ class QommonPublisher(Publisher, object):
if lasso:
import qommon.ident.idp
classes.append(qommon.ident.idp.IdPAuthMethod)
+ import qommon.ident.france_connect
+ classes.append(qommon.ident.france_connect.FCAuthMethod)
import qommon.ident.password
classes.append(qommon.ident.password.PasswordAuthMethod)
self.ident_methods = {}
diff --git a/wcs/qommon/sessions.py b/wcs/qommon/sessions.py
index b980aee..3070a34 100644
--- a/wcs/qommon/sessions.py
+++ b/wcs/qommon/sessions.py
@@ -81,6 +81,7 @@ class Session(QommonSession, CaptchaSession, StorableObject):
jsonp_display_values = None
extra_variables = None
expire = None
+ extra = {}
username = None # only set on password authentication
@@ -114,6 +115,7 @@ class Session(QommonSession, CaptchaSession, StorableObject):
self.extra_variables or \
CaptchaSession.has_info(self) or \
self.expire or \
+ self.extra or \
QuixoteSession.has_info(self)
is_dirty = has_info
@@ -238,6 +240,16 @@ class Session(QommonSession, CaptchaSession, StorableObject):
d[prefix + k] = v
return d
+ def set_extra(self, key, value):
+ self.extra = self.extra = {}
+ self.extra[key] = value
+
+ def get_extra(self, key, default=None):
+ return (self.extra or {}).get(key, default)
+
+ def pop_extra(self, key, default=None):
+ return (self.extra or {}).pop(key, default)
+
class QommonSessionManager(QuixoteSessionManager):
def start_request(self):
diff --git a/wcs/qommon/static/css/dc2/admin.css b/wcs/qommon/static/css/dc2/admin.css
index 35af8c7..a85e883 100644
--- a/wcs/qommon/static/css/dc2/admin.css
+++ b/wcs/qommon/static/css/dc2/admin.css
@@ -1550,3 +1550,7 @@ div#sidebar div.MapWidget {
.ui-dialog .ui-widget-content .ui-state-default.submit-button:hover {
border-color: #283c94;
}
+
+div.widget div.content input.large {
+ width: 100%;
+}
--
2.1.4