0001-add-FranceConnect-authentication-method-14510.patch
tests/test_fc_auth.py | ||
---|---|---|
1 |
import urlparse |
|
2 |
import base64 |
|
3 |
import json |
|
4 |
import urllib |
|
5 | ||
6 |
from quixote import cleanup, get_session_manager |
|
7 | ||
8 |
from utilities import get_app, create_temporary_pub |
|
9 |
import mock |
|
10 | ||
11 |
# User profile handed to CmdCheckHobos().update_profile() by
# setup_user_profile(): three required string fields that share every
# setting except their label and name.
PROFILE = {
    # a generator is used (not a list comprehension) so that the loop
    # variables do not end up in the module namespace under Python 2
    'fields': list(
        {
            'kind': 'string',
            'description': '',
            'required': True,
            'user_visible': True,
            'label': field_label,
            'disabled': False,
            'user_editable': True,
            'asked_on_registration': True,
            'name': field_name,
        }
        for field_label, field_name in (
            (u'Prenoms', 'prenoms'),
            ('Nom', 'nom'),
            ('Email', 'email'),
        )
    )
}
|
48 | ||
49 | ||
50 |
def base64url_encode(v):
    """Return the unpadded URL-safe base64 encoding of ``v`` as a native string.

    ``v`` may be a byte string or text; text is encoded as UTF-8 first.
    The trailing ``=`` padding is removed, which is the form used for the
    segments of the fake ``id_token`` built in the tests below.
    """
    if not isinstance(v, bytes):
        # json.dumps() gives text on Python 3; the encoder needs bytes
        v = v.encode('utf-8')
    encoded = base64.urlsafe_b64encode(v).rstrip(b'=')
    if not isinstance(encoded, str):
        # Python 3: convert back so '%s' formatting does not yield "b'...'"
        encoded = encoded.decode('ascii')
    return encoded
|
52 | ||
53 | ||
54 |
def setup_module(module):
    """Module-level setup: run quixote's cleanup() and create the publisher.

    The publisher is stored in the module global ``pub`` that every test in
    this file uses.
    """
    global pub

    cleanup()
    pub = create_temporary_pub()
|
58 | ||
59 | ||
60 |
def setup_user_profile(pub):
    """Load PROFILE as the user profile of ``pub`` and start without users.

    The user name is configured to be built from the ``_prenoms`` and
    ``_nom`` fields, the debug logger is enabled, all existing users are
    wiped and the configuration is written.
    """
    if not pub.cfg:
        pub.cfg = {}

    # imported here, as in the rest of the function-level setup
    from wcs.ctl.check_hobos import CmdCheckHobos

    # set up a hobo profile
    hobo_command = CmdCheckHobos()
    hobo_command.update_profile(PROFILE, pub)

    users_cfg = pub.cfg['users']
    users_cfg['field_name'] = ['_prenoms', '_nom']
    pub.cfg['debug'] = {'logger': True}

    pub.user_class.wipe()
    pub.write_cfg()
|
72 | ||
73 |
# FranceConnect settings written by setup_fc_environment() and expected back
# from the settings form in test_fc_settings(): each profile field is filled
# from one FranceConnect attribute and always marked as verified.
FC_CONFIG = {
    'client_id': '123',
    'client_secret': 'xyz',
    'platform': 'dev-particulier',
    'scopes': 'identite_pivot',
    # generator + list() so no loop variable is left in the module namespace
    # under Python 2
    'user_field_mappings': list(
        {
            'field_varname': profile_varname,
            'value': '[%s ""]' % fc_attribute,
            'verified': 'always',
        }
        for profile_varname, fc_attribute in (
            ('prenoms', 'given_name'),
            ('nom', 'family_name'),
            ('email', 'email'),
        )
    ),
}
|
97 | ||
98 | ||
99 |
def setup_fc_environment(pub):
    """Make FranceConnect ('fc') the only identification method of ``pub``.

    FC_CONFIG is installed as the 'fc' settings, all existing users are
    wiped and the configuration is written.
    """
    if not pub.cfg:
        pub.cfg = {}

    identification_cfg = {'methods': ['fc']}
    pub.cfg['identification'] = identification_cfg
    pub.cfg['fc'] = FC_CONFIG

    pub.user_class.wipe()
    pub.write_cfg()
|
108 | ||
109 | ||
110 |
def get_session(app):
    """Return the session matching the first cookie held by ``app``.

    Returns None when ``app`` holds no cookie at all. The cookie value is
    stripped of surrounding double quotes before being looked up with the
    session class of the current session manager.
    """
    cookie_values = list(app.cookies.values())
    if not cookie_values:
        return None

    session_id = cookie_values[0].strip('"')
    session_class = get_session_manager().session_class
    return session_class.get(session_id)
|
118 | ||
119 | ||
120 |
def test_fc_login_page(caplog):
    """Exercise the FranceConnect login and callback endpoints.

    The two HTTP helpers of qommon.ident.franceconnect are mocked, so no
    request leaves the test. Scenarios, in order: first login (a user is
    created), error responses on the callback, login of the existing user,
    login with a ``next`` URL, login through the direct /ident/fc/login
    link, and a user info response without email (no user is created).
    """
    setup_user_profile(pub)
    setup_fc_environment(pub)
    app = get_app(pub)
    resp = app.get('/')
    resp = app.get('/login/')
    # the login page redirects straight to the FranceConnect authorize URL
    assert resp.status_int == 302
    assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
    qs = urlparse.parse_qs(resp.location.split('?')[1])
    nonce = qs['nonce'][0]
    state = qs['state'][0]

    # fake token endpoint answer: the id_token only carries a payload
    # segment (empty header and signature) echoing the nonce of the redirect
    id_token = {
        'nonce': nonce,
    }
    token_result = {
        'access_token': 'abcd',
        'id_token': '.%s.' % base64url_encode(json.dumps(id_token)),
    }
    # fake user info endpoint answer
    user_info_result = {
        'sub': 'ymca',
        'given_name': 'John',
        'family_name': 'Doe',
        'email': 'john.doe@example.com',
    }

    assert pub.user_class.count() == 0
    with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
            mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
        http_post_request.return_value = (None, 200, json.dumps(token_result), None)
        http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
        resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
            'code': '1234', 'state': state,
        }))
    # the callback created one user filled from the user info attributes
    assert pub.user_class.count() == 1
    user = pub.user_class.select()[0]
    assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'}
    assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email'])
    assert user.email == 'john.doe@example.com'
    assert user.name_identifiers == ['ymca']
    assert user.name == 'John Doe'

    # Verify we are logged in
    session = get_session(app)
    assert session.user == user.id
    # every user info attribute is exposed in the session with an fc_ prefix
    assert session.extra_user_variables['fc_given_name'] == 'John'
    assert session.extra_user_variables['fc_family_name'] == 'Doe'
    assert session.extra_user_variables['fc_email'] == 'john.doe@example.com'
    assert session.extra_user_variables['fc_sub'] == 'ymca'

    resp = app.get('/logout')

    # Test error handling path
    resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
        'state': state,
        'error': 'access_denied',
    }))
    # a known error code is logged with its description...
    assert 'user did not authorize login' in caplog.records[-1].message
    resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
        'state': state,
        'error': 'whatever',
    }))
    # ...an unknown one is logged as is
    assert 'whatever' in caplog.records[-1].message

    # Login existing user
    def logme(login_url):
        # Run a complete login starting at login_url and return the response
        # of the callback request. id_token and token_result of the enclosing
        # test are updated in place with the nonce of the new redirect.
        resp = app.get(login_url)
        assert resp.status_int == 302
        assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
        qs = urlparse.parse_qs(resp.location.split('?')[1])
        state = qs['state'][0]
        id_token['nonce'] = qs['nonce'][0]
        token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))

        with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
                mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
            http_post_request.return_value = (None, 200, json.dumps(token_result), None)
            http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
            resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
                'code': '1234', 'state': state,
            }))
        return resp
    app.get('/logout')
    resp = logme('/login/')
    new_session = get_session(app)
    assert session.id != new_session.id, 'no new session created'
    assert pub.user_class.count() == 1, 'existing user has not been used'
    assert new_session.user == user.id

    # Login with next url
    app.get('/logout')
    resp = logme('/login/?next=/foo/bar/')
    assert resp.status_int == 302
    assert resp.location.endswith('/foo/bar/')

    # Direct login link
    app.get('/logout')
    resp = logme('/ident/fc/login')
    new_session = get_session(app)
    assert session.id != new_session.id, 'no new session created'
    assert pub.user_class.count() == 1, 'existing user has not been used'
    assert new_session.user == user.id
    app.get('/logout')
    resp = logme('/ident/fc/login?next=/foo/bar/')
    assert resp.status_int == 302
    assert resp.location.endswith('/foo/bar/')

    # User with missing attributes
    resp = app.get('/logout')
    resp = app.get('/login/')
    assert resp.status_int == 302
    assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
    qs = urlparse.parse_qs(resp.location.split('?')[1])
    state = qs['state'][0]
    id_token['nonce'] = qs['nonce'][0]
    token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))
    # new subject, but the email attribute is left out on purpose
    bad_user_info_result = {
        'sub': 'ymca2',
        'given_name': 'John',
        'family_name': 'Deux',
        # 'email': 'john.deux@example.com',  # missing
    }
    with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
            mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
        http_post_request.return_value = (None, 200, json.dumps(token_result), None)
        http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
        resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
            'code': '1234', 'state': state,
        }))
    # still only the user of the first scenario, and nobody is logged in
    assert pub.user_class.count() == 1, 'an invalid user (no email) has been created'
    session = get_session(app)
    assert not session
|
252 | ||
253 | ||
254 |
def test_fc_settings():
    """Configure FranceConnect through the backoffice forms.

    With the PROFILE user profile in place, the saved 'fc' settings must be
    equal to FC_CONFIG.
    """
    setup_user_profile(pub)
    app = get_app(pub)

    # enable the FranceConnect method on the identification settings page
    resp = app.get('/backoffice/settings/identification/')
    resp.forms[0]['methods$elementfc'].checked = True
    resp = resp.forms[0].submit().follow()

    assert 'FranceConnect' in resp.body
    resp = resp.click('FranceConnect')

    # ask twice for an additional mapping row; rows 0, 1 and 2 are filled below
    for _attempt in range(2):
        resp = resp.forms[0].submit('user_field_mappings$add_element')

    form = resp.forms[0]
    form['client_id'].value = '123'
    form['client_secret'].value = 'xyz'
    form['platform'].value = 'Development citizens'
    form['scopes'].value = 'identite_pivot'

    mapping_rows = (
        ('prenoms', '[given_name ""]'),
        ('nom', '[family_name ""]'),
        ('email', '[email ""]'),
    )
    for row_index, (varname, expression) in enumerate(mapping_rows):
        row_prefix = 'user_field_mappings$element%d$' % row_index
        form[row_prefix + 'field_varname'] = varname
        form[row_prefix + 'value'] = expression
        form[row_prefix + 'verified'] = 'Always'

    resp = form.submit('submit').follow()
    assert pub.cfg['fc'] == FC_CONFIG
|
284 | ||
285 | ||
286 |
def test_fc_settings_no_user_profile():
    """Configure FranceConnect through the backoffice without a user profile.

    The publisher configuration is emptied first, so the mappings target
    the '__name' and '__email' entries instead of profile fields.
    """
    expected_mappings = [
        {
            'field_varname': '__name',
            'value': '[given_name ""] [family_name ""]',
            'verified': 'always',
        },
        {
            'field_varname': '__email',
            'value': '[email ""]',
            'verified': 'always',
        },
    ]
    expected_cfg = {
        'client_id': '123',
        'client_secret': 'xyz',
        'platform': 'dev-particulier',
        'scopes': 'identite_pivot',
        'user_field_mappings': expected_mappings,
    }

    # start from an empty configuration and no users
    pub.cfg = {}
    pub.user_class.wipe()
    pub.write_cfg()
    app = get_app(pub)

    # enable the FranceConnect method on the identification settings page
    resp = app.get('/backoffice/settings/identification/')
    resp.forms[0]['methods$elementfc'].checked = True
    resp = resp.forms[0].submit().follow()

    assert 'FranceConnect' in resp.body
    resp = resp.click('FranceConnect')
    for _attempt in range(2):
        resp = resp.forms[0].submit('user_field_mappings$add_element')

    form = resp.forms[0]
    form['client_id'].value = '123'
    form['client_secret'].value = 'xyz'
    form['platform'].value = 'Development citizens'
    form['scopes'].value = 'identite_pivot'

    # rows 0 and 2 are filled, row 1 is left untouched; the expected
    # settings above only hold the two filled rows
    filled_rows = (
        (0, '__name', '[given_name ""] [family_name ""]'),
        (2, '__email', '[email ""]'),
    )
    for row_index, varname, expression in filled_rows:
        row_prefix = 'user_field_mappings$element%d$' % row_index
        form[row_prefix + 'field_varname'] = varname
        form[row_prefix + 'value'] = expression
        form[row_prefix + 'verified'] = 'Always'

    resp = form.submit('submit').follow()
    assert pub.cfg['fc'] == expected_cfg
wcs/admin/settings.py | ||
---|---|---|
71 | 71 |
if lasso is not None: |
72 | 72 |
methods.insert(0, |
73 | 73 |
('idp', _('Delegated to SAML identity provider'), 'idp')) |
74 |
methods.append(('fc', _('Delegated to FranceConnect'), 'fc')) |
|
74 | 75 |
form.add(CheckboxesWidget, 'methods', title = _('Methods'), |
75 | 76 |
value=identification_cfg.get('methods'), |
76 | 77 |
options=methods, |
wcs/qommon/ident/franceconnect.py | ||
---|---|---|
1 |
# w.c.s. - web application for online forms |
|
2 |
# Copyright (C) 2005-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software; you can redistribute it and/or modify |
|
5 |
# it under the terms of the GNU General Public License as published by |
|
6 |
# the Free Software Foundation; either version 2 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU General Public License |
|
15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import base64 |
|
18 |
import hashlib |
|
19 |
import sys |
|
20 |
import urllib |
|
21 |
import uuid |
|
22 | ||
23 |
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager |
|
24 |
from quixote.directory import Directory |
|
25 |
from quixote.html import htmltext, TemplateIO |
|
26 | ||
27 |
from qommon import _ |
|
28 |
from qommon.backoffice.menu import html_top |
|
29 |
from qommon import template, get_cfg, get_logger |
|
30 |
from qommon.form import (Form, StringWidget, CompositeWidget, ComputedExpressionWidget, |
|
31 |
SingleSelectWidget, WidgetListAsTable) |
|
32 |
from qommon.misc import http_post_request, http_get_page, json_loads |
|
33 | ||
34 |
from wcs.workflows import WorkflowStatusItem |
|
35 |
from wcs.formdata import flatten_dict |
|
36 | ||
37 |
from .base import AuthMethod |
|
38 | ||
39 | ||
40 |
# Title of the FranceConnect method: used as MethodAdminDirectory.title and
# as FCAuthMethod.description. It is wrapped in N_() here and passed through
# _() where the admin page title is rendered.
ADMIN_TITLE = N_('FranceConnect')
|
41 | ||
42 |
# XXX: make an OIDC auth method that FranceConnect would inherit from |
|
43 | ||
44 | ||
45 |
def base64url_decode(input):
    """Decode URL-safe base64 data whose ``=`` padding may have been removed.

    ``input`` may be a byte string or ASCII text (for instance a segment of
    an ID token taken from a parsed JSON document). The padding needed to
    reach a multiple of four characters is restored before decoding.

    Returns the decoded byte string. Text input holding non-ASCII characters
    raises UnicodeEncodeError.
    """
    if not isinstance(input, bytes):
        # the padding below is a byte string, so text must be converted
        # first; base64 data is plain ASCII
        input = input.encode('ascii')
    rem = len(input) % 4
    if rem > 0:
        input += b'=' * (4 - rem)
    return base64.urlsafe_b64decode(input)
|
50 | ||
51 | ||
52 |
class UserFieldMappingRowWidget(CompositeWidget):
    """One row of the user field mappings table.

    A row is made of three sub-widgets: the target user field
    ('field_varname'), the computed expression giving its value ('value')
    and whether the resulting value is to be marked as verified
    ('verified').
    """

    def __init__(self, name, value=None, **kwargs):
        CompositeWidget.__init__(self, name, value, **kwargs)
        current = value or {}

        users_cfg = get_cfg('users', {})
        user_formdef = get_publisher().user_class.get_formdef()

        # '__name' and '__email' are offered unless a user formdef exists
        # and the matching 'field_name' / 'field_email' setting is filled
        target_options = []
        if not (user_formdef and users_cfg.get('field_name')):
            target_options.append(('__name', _('Name'), '__name'))
        if not (user_formdef and users_cfg.get('field_email')):
            target_options.append(('__email', _('Email'), '__email'))
        # every user field that has a varname can be targeted
        if user_formdef and user_formdef.fields:
            target_options.extend(
                (field.varname, field.label, field.varname)
                for field in user_formdef.fields
                if field.varname)

        self.add(
            SingleSelectWidget,
            name='field_varname',
            title=_('Field'),
            value=current.get('field_varname'),
            options=target_options,
            **kwargs)
        self.add(
            ComputedExpressionWidget,
            name='value',
            title=_('Value'),
            value=current.get('value'))
        verified_options = [
            ('never', _('Never')),
            ('always', _('Always')),
        ]
        self.add(
            SingleSelectWidget,
            'verified',
            title=_('Is attribute verified'),
            value=current.get('verified'),
            options=verified_options)

    def _parse(self, request):
        # the row only has a value when its three parts are all filled
        parsed = {}
        for part in ('value', 'field_varname', 'verified'):
            parsed[part] = self.get(part)
            if not parsed[part]:
                self.value = None
                return
        self.value = parsed
|
92 | ||
93 | ||
94 |
class UserFieldMappingTableWidget(WidgetListAsTable):
    """WidgetListAsTable whose elements are UserFieldMappingRowWidget rows."""

    # NOTE(review): presumably overrides a flag of the parent class so the
    # rows stay editable -- confirm against WidgetListAsTable
    readonly = False

    def __init__(self, name, **kwargs):
        parent = super(UserFieldMappingTableWidget, self)
        parent.__init__(
            name,
            element_type=UserFieldMappingRowWidget,
            **kwargs)
|
100 | ||
101 | ||
102 |
class MethodDirectory(Directory):
    """Directory exporting the 'login' and 'callback' FranceConnect pages.

    Both pages are handled by a fresh FCAuthMethod instance.
    """

    _q_exports = ['login', 'callback']

    def login(self):
        auth_method = FCAuthMethod()
        return auth_method.login()

    def callback(self):
        auth_method = FCAuthMethod()
        return auth_method.callback()
|
110 | ||
111 | ||
112 |
class MethodAdminDirectory(Directory):
    """Backoffice page holding the FranceConnect settings form.

    The settings are stored in the 'fc' section of the publisher
    configuration, with one entry per key listed in CONFIG.
    """

    title = ADMIN_TITLE
    label = N_('Configure FranceConnect identification method')

    _q_exports = ['']

    # Known FranceConnect platforms and their endpoints. The attribute name
    # is misspelled (sic) but kept as is: FCAuthMethod.get_platform() reads
    # it under this name.
    PLAFTORMS = [
        {
            'name': N_('Development citizens'),
            'slug': 'dev-particulier',
            'authorization_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize',
            'token_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token',
            'user_info_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo',
            'logout_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout',
        },
        {
            'name': N_('Development enterprise'),
            'slug': 'dev-entreprise',
            'authorization_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/authorize',
            'token_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/token',
            'user_info_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/userinfo',
            'logout_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/logout',
        },
        {
            'name': N_('Production citizens'),
            'slug': 'prod-particulier',
            'authorization_url': 'https://app.franceconnect.gouv.fr/api/v1/authorize',
            'token_url': 'https://app.franceconnect.gouv.fr/api/v1/token',
            'user_info_url': 'https://app.franceconnect.gouv.fr/api/v1/userinfo',
            'logout_url': 'https://app.franceconnect.gouv.fr/api/v1/logout',
        }
    ]

    # (configuration key, form field title) pairs; every key is required,
    # both in the form and in FCAuthMethod.is_ok()
    CONFIG = [
        ('client_id', N_('Client ID')),
        ('client_secret', N_('Client secret')),
        ('platform', N_('Platform')),
        ('scopes', N_('Scopes')),
        ('user_field_mappings', N_('User field mappings')),
    ]

    # (attribute, description) pairs listed in the help table below the form
    KNOWN_ATTRIBUTES = [
        ('given_name', N_('first names separated by spaces')),
        ('family_name', N_('birth\'s last name')),
        ('birthdate', N_('birthdate formatted as YYYY-MM-DD')),
        ('gender', N_('gender \'male\' for men, and \'female\' for women')),
        ('birthplace', N_('INSEE code of the place of birth')),
        ('birthcountry', N_('INSEE code of the country of birth')),
        ('email', N_('email')),
        ('siret', N_('SIRET or SIREN number of the enterprise')),
        # Note: the FranceConnect website also refers to address and phones
        # attributes but we don't know what must be expected of their value.
    ]

    @classmethod
    def get_form(cls, instance=None):
        """Build the settings form, one required widget per CONFIG key.

        ``instance`` is the mapping of current settings used for the initial
        values; it defaults to an empty mapping.
        """
        if instance is None:
            # a fresh dict on every call rather than a shared mutable default
            instance = {}
        form = Form(enctype='multipart/form-data')
        for key, title in cls.CONFIG:
            default = None
            hint = None
            kwargs = {}
            widget = StringWidget

            if key == 'user_field_mappings':
                widget = UserFieldMappingTableWidget
            elif key == 'platform':
                widget = SingleSelectWidget
                # names are only marked with N_() in PLAFTORMS, translate
                # them when they are displayed
                kwargs['options'] = [
                    (platform['slug'], _(platform['name'])) for platform in cls.PLAFTORMS
                ]
            elif key == 'scopes':
                default = 'identite_pivot address email phones'
                hint = _('Space separated values among: identite_pivot, address, email, phones, '
                         'profile, birth, preferred_username, gender, birthdate, '
                         'birthcountry, birthplace')
            if widget == StringWidget:
                kwargs['size'] = '80'
            form.add(widget, key, title=_(title), hint=hint, required=True,
                     value=instance.get(key, default),
                     attrs={}, **kwargs)
        form.add_submit('submit', _('Submit'))
        form.add_submit('cancel', _('Cancel'))

        return form

    def submit(self, form):
        """Store the parsed form values as the 'fc' settings and redirect."""
        cfg = {}
        for key, _title in self.CONFIG:
            cfg[key] = form.get_widget(key).parse()
        get_publisher().cfg['fc'] = cfg
        get_publisher().write_cfg()
        return redirect('../..')

    def _q_index(self):
        """Render the settings page, or handle its cancel/submit buttons."""
        fc_cfg = get_cfg('fc', {})
        form = self.get_form(fc_cfg)
        pub = get_publisher()

        if form.get_submit() == 'cancel':
            return redirect('../..')

        # the form is also submitted by the "add row" button of the mappings
        # table (see the tests), settings are only saved on the real submit
        if 'submit' in get_request().form and form.is_submitted() and not form.has_errors():
            return self.submit(form)

        html_top('settings', title=_(self.title))
        r = TemplateIO(html=True)
        # the title is only marked with N_(), translate it as done for
        # html_top() above
        r += htmltext('<h2>%s</h2>') % _(self.title)
        fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
        r += htmltext('<p>')
        r += _('Callback URL is %s.') % fc_callback
        r += htmltext('</p>')
        r += htmltext('<p>')
        r += htmltext(_('See <a href="https://franceconnect.gouv.fr/fournisseur-service">'
                        'FranceConnect partners\'site</a> for getting a client_id and '
                        'a client_secret.'))
        r += htmltext('</p>')
        r += form.render()
        r += htmltext('<div><p>')
        r += htmltext(_('See <a '
                        'href="https://franceconnect.gouv.fr/fournisseur-service#identite-pivot" '
                        '>FranceConnect partners\'site</a> for more '
                        'informations on available scopes and attributes. Known ones '
                        'are:'))
        r += htmltext('</p>')
        r += htmltext('<table class="franceconnect-attrs"><thead>'
                      '<tr><th>%s</th><th>%s</th></tr></thead><tbody>') % (
                          _('Attribute'), _('Description'))
        for attribute, description in self.KNOWN_ATTRIBUTES:
            r += htmltext('<tr><td><code>%s</code></td><td>%s</td></tr>') % (
                attribute, _(description))
        r += htmltext('</tbody></table></div>')

        return r.getvalue()
|
245 | ||
246 | ||
247 |
class FCAuthMethod(AuthMethod):
    """FranceConnect identification method (authorization code flow).

    login() redirects the user to the authorization URL of the configured
    platform; callback() exchanges the returned code for an access token,
    fetches the user info and logs in the matching (or newly created) user.
    """

    key = 'fc'
    description = ADMIN_TITLE
    method_directory = MethodDirectory
    method_admin_directory = MethodAdminDirectory

    # descriptions of the error codes that may be received on the callback
    AUTHORIZATION_REQUEST_ERRORS = {
        'access_denied': N_('user did not authorize login'),
    }

    @staticmethod
    def _get_nonce(session):
        """Return the nonce bound to ``session``: SHA-256 hex digest of its id."""
        return hashlib.sha256(str(session.id)).hexdigest()

    def is_ok(self):
        """Tell whether every setting listed in CONFIG has a value."""
        fc_cfg = get_cfg('fc', {})
        return all(fc_cfg.get(key) for key, _title in self.method_admin_directory.CONFIG)

    def login(self):
        """Redirect to the FranceConnect authorization URL.

        The URL to go back to after login is kept in the session, under a
        key derived from the random ``state`` sent along with the request.
        """
        if not self.is_ok():
            return template.error_page(_('FranceConnect support is not yet configured.'))

        fc_cfg = get_cfg('fc', {})
        pub = get_publisher()
        session = get_session()

        authorization_url = self.get_authorization_url()
        client_id = fc_cfg.get('client_id')
        state = str(uuid.uuid4())
        # NOTE(review): 'next' comes from the request and is redirected to
        # by callback() as is -- check it cannot point to another site.
        next_url = get_request().form.get('next') or pub.get_frontoffice_url()
        session.extra_user_variables = session.extra_user_variables or {}
        session.extra_user_variables['fc_next_url_' + state] = next_url

        # generate a session id if none exists, ugly but necessary
        get_session_manager().maintain_session(session)

        fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
        qs = urllib.urlencode({
            'response_type': 'code',
            'client_id': client_id,
            'redirect_uri': fc_callback,
            'scope': 'openid ' + fc_cfg.get('scopes', ''),
            'state': state,
            'nonce': self._get_nonce(session),
        })
        redirect_url = '%s?%s' % (authorization_url, qs)
        return redirect(redirect_url)

    def is_interactive(self):
        return False

    def get_access_token(self, code):
        """Exchange the authorization ``code`` for an access token.

        Returns the access token, or None (after logging an error) when the
        token endpoint fails, reports an error, returns an unusable ID token
        or returns a nonce that is not the one of the current session.
        """
        logger = get_logger()
        session = get_session()
        fc_cfg = get_cfg('fc', {})
        client_id = fc_cfg.get('client_id')
        client_secret = fc_cfg.get('client_secret')
        # URL of the current (callback) request without its query string
        redirect_uri = get_request().get_frontoffice_url().split('?')[0]
        body = {
            'grant_type': 'authorization_code',
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            'code': code,
        }
        response, status, data, auth_header = http_post_request(
            self.get_token_url(),
            urllib.urlencode(body),
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
            })
        if status != 200:
            logger.error('status from FranceConnect token_url is not 200')
            return None
        result = json_loads(data)
        if 'error' in result:
            logger.error('FranceConnect code resolution failed: %s', result['error'])
            return None
        # check id_token nonce
        # NOTE(review): only the payload segment is read, the signature of
        # the ID token is not verified here.
        try:
            header, payload, signature = result['id_token'].split('.')
            payload = json_loads(base64url_decode(payload))
            received_nonce = payload['nonce']
        except (KeyError, ValueError, TypeError):
            # missing id_token/nonce, wrong number of segments, bad base64
            # or bad JSON: fail the login instead of crashing the callback
            logger.error('FranceConnect returned an invalid id_token')
            return None
        if received_nonce != self._get_nonce(session):
            logger.error('FranceConnect returned nonce did not match')
            return None
        return result['access_token']

    def get_user_info(self, access_token):
        """Fetch the user info document using ``access_token``.

        Returns the parsed JSON document, or None (after logging an error)
        when the endpoint does not answer with a 200 status.
        """
        logger = get_logger()
        response, status, data, auth_header = http_get_page(
            self.get_user_info_url(),
            headers={
                'Authorization': 'Bearer %s' % access_token,
            })
        if status != 200:
            logger.error('status from FranceConnect user_info_url is not 200 but %s and data is'
                         ' %s', status, data[:100])
            return None
        return json_loads(data)

    def get_platform(self):
        """Return the platform description matching the configured slug.

        Raises KeyError when the slug is not one of the known platforms.
        """
        fc_cfg = get_cfg('fc', {})
        slug = fc_cfg.get('platform')
        for platform in self.method_admin_directory.PLAFTORMS:
            if platform['slug'] == slug:
                return platform
        raise KeyError('platform %s not found' % slug)

    def get_authorization_url(self):
        return self.get_platform()['authorization_url']

    def get_token_url(self):
        return self.get_platform()['token_url']

    def get_user_info_url(self):
        return self.get_platform()['user_info_url']

    def get_logout_url(self):
        return self.get_platform()['logout_url']

    def fill_user_attributes(self, user, user_info):
        """Update ``user`` from ``user_info`` using the configured mappings.

        Each mapping expression is computed with ``user_info`` as context and
        stored in the targeted field; ``user.verified_fields`` is updated
        according to the 'verified' setting of the mapping. Mappings whose
        expression fails or whose field is unknown are skipped.
        """
        fc_cfg = get_cfg('fc', {})
        user_field_mappings = fc_cfg.get('user_field_mappings', [])
        user_formdef = get_publisher().user_class.get_formdef()
        # there may be no user formdef at all (no user profile configured)
        formdef_fields = (user_formdef.fields if user_formdef else None) or []

        form_data = user.form_data or {}
        user.verified_fields = user.verified_fields or []

        for user_field_mapping in user_field_mappings:
            field_varname = user_field_mapping['field_varname']
            value = user_field_mapping['value']
            verified = user_field_mapping['verified']
            field_id = None

            try:
                value = WorkflowStatusItem.compute(value, context=user_info)
            except Exception:
                # report the failing expression and go on with other mappings
                get_publisher().notify_of_exception(sys.exc_info(), context='[FC-user-compute]')
                continue
            if field_varname == '__name':
                user.name = value
            elif field_varname == '__email':
                user.email = value
                field_id = 'email'  # special value for verified email field
            else:
                for field in formdef_fields:
                    if field_varname == field.varname:
                        field_id = str(field.id)
                        break
                else:
                    # no field with this varname
                    continue
                form_data[field.id] = value
            # Update verified fields
            if field_id:
                if verified == 'always' and field_id not in user.verified_fields:
                    user.verified_fields.append(field_id)
                elif verified != 'always' and field_id in user.verified_fields:
                    user.verified_fields.remove(field_id)

        user.form_data = form_data

        if user.form_data:
            user.set_attributes_from_formdata(user.form_data)

    def callback(self):
        """Handle the return from FranceConnect and log the user in.

        Whatever the outcome, the user is redirected to the URL recorded by
        login() for the received ``state``, or to the frontoffice root.
        """
        if not self.is_ok():
            return template.error_page(_('FranceConnect support is not yet configured.'))
        pub = get_publisher()
        request = get_request()
        session = get_session()
        logger = get_logger()
        state = request.form.get('state', '')
        next_url = ((session.extra_user_variables or {}).pop('fc_next_url_' + state, '')
                    or pub.get_frontoffice_url())

        if 'code' not in request.form:
            error = request.form.get('error')
            # if no error parameter, we stay silent
            if error:
                # log the description of known error codes, or the raw code
                msg = self.AUTHORIZATION_REQUEST_ERRORS.get(error)
                logger.error(_('FranceConnect authentication failed: %s'),
                             _(msg) if msg else error)
            return redirect(next_url)
        access_token = self.get_access_token(request.form['code'])
        if not access_token:
            return redirect(next_url)
        user_info = self.get_user_info(access_token)
        if not user_info:
            return redirect(next_url)
        # Store user info in session
        flattened_user_info = user_info.copy()
        flatten_dict(flattened_user_info)
        session_var_fc_user = {}
        for key in flattened_user_info:
            session_var_fc_user['fc_' + key] = flattened_user_info[key]

        # Lookup or create user
        sub = user_info.get('sub')
        if not sub:
            # without a subject identifier there is nothing to match on
            logger.error('failed to get sub attribute from FranceConnect')
            return redirect(next_url)
        user = next(iter(pub.user_class.get_users_with_name_identifier(sub)), None)
        if not user:
            user = pub.user_class(sub)
            user.name_identifiers = [sub]

        self.fill_user_attributes(user, user_info)

        if not (user.name and user.email):
            # we didn't get useful attributes, forget it.
            logger.error('failed to get name and/or email attribute from FranceConnect')
            return redirect(next_url)

        user.store()
        session.set_user(user.id)
        session.extra_user_variables = session_var_fc_user
        return redirect(next_url)
wcs/qommon/publisher.py | ||
---|---|---|
674 | 674 |
if lasso: |
675 | 675 |
import qommon.ident.idp |
676 | 676 |
classes.append(qommon.ident.idp.IdPAuthMethod) |
677 |
import qommon.ident.franceconnect |
|
678 |
classes.append(qommon.ident.franceconnect.FCAuthMethod) |
|
677 | 679 |
import qommon.ident.password |
678 | 680 |
classes.append(qommon.ident.password.PasswordAuthMethod) |
679 | 681 |
self.ident_methods = {} |
wcs/qommon/static/css/dc2/admin.css | ||
---|---|---|
778 | 778 |
border-bottom: 1px solid #bcbcbc; |
779 | 779 |
} |
780 | 780 | |
781 |
table.franceconnect-attrs, |
|
781 | 782 |
table#substvars { |
782 | 783 |
border-bottom: 1px solid #eee; |
783 | 784 |
border-collapse: collapse; |
784 | 785 |
margin: 1em 0; |
785 | 786 |
} |
786 | 787 | |
788 |
table.franceconnect-attrs th, |
|
787 | 789 |
table#substvars th { |
788 | 790 |
background: #eee; |
789 | 791 |
border: 1px solid #eee; |
... | ... | |
792 | 794 |
padding: 0 1em; |
793 | 795 |
} |
794 | 796 | |
797 |
table.franceconnect-attrs td, |
|
795 | 798 |
table#substvars td { |
796 | 799 |
border: 1px solid #eee; |
797 | 800 |
padding: 0 1em; |
798 |
- |