0001-add-FranceConnect-authentication-method-14510.patch
tests/test_fc_auth.py | ||
---|---|---|
1 |
import urlparse |
|
2 |
import base64 |
|
3 |
import json |
|
4 |
import urllib |
|
5 | ||
6 |
from quixote import cleanup, get_session_manager |
|
7 | ||
8 |
from utilities import get_app, create_temporary_pub |
|
9 |
import mock |
|
10 | ||
11 |
def _profile_string_field(label, name):
    """Return the description of one mandatory, user-editable string field."""
    return {
        'kind': 'string',
        'description': '',
        'required': True,
        'user_visible': True,
        'label': label,
        'disabled': False,
        'user_editable': True,
        'asked_on_registration': True,
        'name': name,
    }


# User profile pushed to the publisher by setup_user_profile(): first names,
# last name and email, all declared the same way.
PROFILE = {
    'fields': [
        _profile_string_field(u'Prenoms', 'prenoms'),
        _profile_string_field('Nom', 'nom'),
        _profile_string_field('Email', 'email'),
    ]
}
|
48 | ||
49 | ||
50 |
def base64url_encode(v):
    """Return the URL-safe base64 encoding of *v* without its trailing padding.

    The padding is removed with a bytes literal so that the helper behaves
    the same on Python 2 (where ``b'='`` is ``'='``) and does not fail on
    Python 3, where the encoded value is ``bytes``.
    """
    # padding only ever appears at the end of the encoded value
    return base64.urlsafe_b64encode(v).rstrip(b'=')
|
52 | ||
53 | ||
54 |
def setup_module(module):
    """Module-level setup: call quixote's cleanup() then create the shared publisher.

    The publisher is stored in the module global ``pub`` used by the tests.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
|
58 | ||
59 | ||
60 |
def setup_user_profile(pub):
    """Install the PROFILE user fields on *pub* and wipe existing users."""
    if not pub.cfg:
        pub.cfg = {}

    from wcs.ctl.check_hobos import CmdCheckHobos

    # push the hobo profile into the publisher
    hobo_command = CmdCheckHobos()
    hobo_command.update_profile(PROFILE, pub)

    # fields used for the user name (the login test expects 'John Doe')
    users_cfg = pub.cfg['users']
    users_cfg['field_name'] = ['_prenoms', '_nom']

    pub.user_class.wipe()
    pub.write_cfg()
|
71 | ||
72 |
def _verified_mapping(field_varname, attribute):
    """Return a mapping copying FranceConnect *attribute* to *field_varname*, always verified."""
    return {
        'field_varname': field_varname,
        'value': '[%s ""]' % attribute,
        'verified': 'always',
    }


# FranceConnect settings used by the login test and expected back from the
# settings form in test_fc_settings().
FC_CONFIG = {
    'client_id': '123',
    'client_secret': 'xyz',
    'platform': 'dev-particulier',
    'scopes': 'identite_pivot',
    'user_field_mappings': [
        _verified_mapping('prenoms', 'given_name'),
        _verified_mapping('nom', 'family_name'),
        _verified_mapping('email', 'email'),
    ],
}
|
96 | ||
97 | ||
98 |
def setup_fc_environment(pub):
    """Enable FranceConnect as the only identification method on *pub*.

    The FranceConnect settings are stored as a deep copy of FC_CONFIG, so
    that nothing done through the publisher configuration can modify the
    module-level reference data other tests compare against.
    """
    import copy

    if not pub.cfg:
        pub.cfg = {}
    pub.cfg['identification'] = {
        'methods': ['fc'],
    }
    pub.cfg['fc'] = copy.deepcopy(FC_CONFIG)
    pub.user_class.wipe()
    pub.write_cfg()
|
107 | ||
108 | ||
109 |
_FC_AUTHORIZE_URL = 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize'


def _start_fc_login(app):
    """Request /login/ and return the (state, nonce) sent to FranceConnect."""
    resp = app.get('/login/')
    assert resp.status_int == 302
    assert resp.location.startswith(_FC_AUTHORIZE_URL)
    qs = urlparse.parse_qs(resp.location.split('?')[1])
    return qs['state'][0], qs['nonce'][0]


def _finish_fc_login(app, state, nonce, user_info):
    """Call the callback URL with the token and user-info requests mocked."""
    token_result = {
        'access_token': 'abcd',
        # only the payload segment of the id_token is filled in
        'id_token': '.%s.' % base64url_encode(json.dumps({'nonce': nonce})),
    }
    with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
            mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
        http_post_request.return_value = (None, 200, json.dumps(token_result), None)
        http_get_page.return_value = (None, 200, json.dumps(user_info), None)
        return app.get('/ident/fc/callback?%s' % urllib.urlencode({
            'code': '1234', 'state': state,
        }))


def _current_session(app):
    """Return (session id, session object) for the cookie currently held by *app*."""
    session_id = app.cookies.values()[0].strip('"')
    return session_id, get_session_manager().session_class.get(session_id)


def test_fc_login_page():
    """Check account creation, login of an existing user and rejection of an incomplete profile."""
    setup_user_profile(pub)
    setup_fc_environment(pub)
    app = get_app(pub)
    app.get('/')

    user_info_result = {
        'sub': 'ymca',
        'given_name': 'John',
        'family_name': 'Doe',
        'email': 'john.doe@example.com',
    }

    # First login: the user gets created
    assert pub.user_class.count() == 0
    state, nonce = _start_fc_login(app)
    _finish_fc_login(app, state, nonce, user_info_result)
    assert pub.user_class.count() == 1
    user = pub.user_class.select()[0]
    assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'}
    assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email'])
    assert user.email == 'john.doe@example.com'
    assert user.name_identifiers == ['ymca']
    assert user.name == 'John Doe'

    # Verify we are logged in
    session_id, session = _current_session(app)
    assert session.user == user.id
    assert session.extra_variables['fc_user_given_name'] == 'John'
    assert session.extra_variables['fc_user_family_name'] == 'Doe'
    assert session.extra_variables['fc_user_email'] == 'john.doe@example.com'
    assert session.extra_variables['fc_user_sub'] == 'ymca'
    assert session.display_message() == ''

    # Login existing user
    app.get('/logout')
    state, nonce = _start_fc_login(app)
    _finish_fc_login(app, state, nonce, user_info_result)
    new_session_id, session = _current_session(app)
    assert session_id != new_session_id, 'no new session created'
    assert pub.user_class.count() == 1, 'existing user has not been used'
    assert session.user == user.id
    assert session.display_message() == ''

    # User with missing attributes
    app.get('/logout')
    state, nonce = _start_fc_login(app)
    bad_user_info_result = {
        'sub': 'ymca2',
        'given_name': 'John',
        'family_name': 'Deux',
        # 'email': 'john.deux@example.com',  # missing
    }
    _finish_fc_login(app, state, nonce, bad_user_info_result)
    assert pub.user_class.count() == 1, 'an invalid user (no email) has been created'
    _unused_id, session = _current_session(app)
    assert session.user is None
    assert 'FranceConnect authentication failed: missing name or email' in str(session.display_message())
|
214 | ||
215 |
def test_fc_settings():
    """Configure FranceConnect through the backoffice and compare the saved settings to FC_CONFIG."""
    setup_user_profile(pub)
    app = get_app(pub)

    # tick the FranceConnect method on the identification page
    resp = app.get('/backoffice/settings/identification/')
    resp.forms[0]['methods$elementfc'].checked = True
    resp = resp.forms[0].submit().follow()

    assert 'FranceConnect' in resp.body
    resp = resp.click('FranceConnect')
    # two rows are added; rows 0 to 2 are then filled (the form presumably starts with one)
    for _unused in range(2):
        resp = resp.forms[0].submit('user_field_mappings$add_element')

    settings_form = resp.forms[0]
    settings_form['client_id'].value = '123'
    settings_form['client_secret'].value = 'xyz'
    settings_form['platform'].value = 'Development citizens'
    settings_form['scopes'].value = 'identite_pivot'

    mappings = (
        ('prenoms', '[given_name ""]'),
        ('nom', '[family_name ""]'),
        ('email', '[email ""]'),
    )
    for index, (field_varname, expression) in enumerate(mappings):
        prefix = 'user_field_mappings$element%d$' % index
        settings_form[prefix + 'field_varname'] = field_varname
        settings_form[prefix + 'value'] = expression
        settings_form[prefix + 'verified'] = 'Always'

    resp = settings_form.submit('submit').follow()
    assert pub.cfg['fc'] == FC_CONFIG
wcs/admin/settings.py | ||
---|---|---|
71 | 71 |
if lasso is not None: |
72 | 72 |
methods.insert(0, |
73 | 73 |
('idp', _('Delegated to SAML identity provider'), 'idp')) |
74 |
methods.insert(0, ('fc', _('Delegated to FranceConnect'), 'fc')) |
|
74 | 75 |
form.add(CheckboxesWidget, 'methods', title = _('Methods'), |
75 | 76 |
value=identification_cfg.get('methods'), |
76 | 77 |
options=methods, |
wcs/qommon/ident/franceconnect.py | ||
---|---|---|
1 |
# w.c.s. - web application for online forms |
|
2 |
# Copyright (C) 2005-2017 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software; you can redistribute it and/or modify |
|
5 |
# it under the terms of the GNU General Public License as published by |
|
6 |
# the Free Software Foundation; either version 2 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU General Public License |
|
15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import json |
|
18 |
import urllib |
|
19 |
import uuid |
|
20 |
import hashlib |
|
21 |
import base64 |
|
22 | ||
23 |
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager |
|
24 |
from quixote.directory import Directory |
|
25 |
from quixote.html import htmltext, TemplateIO |
|
26 | ||
27 |
from qommon import _ |
|
28 |
from qommon.backoffice.menu import html_top |
|
29 |
from qommon import template, get_cfg, get_logger |
|
30 |
from qommon.form import (Form, StringWidget, CompositeWidget, ComputedExpressionWidget, |
|
31 |
RadiobuttonsWidget, SingleSelectWidget, WidgetListAsTable) |
|
32 |
from qommon.misc import http_post_request, http_get_page, json_loads |
|
33 |
from .base import AuthMethod |
|
34 | ||
35 |
from wcs.workflows import WorkflowStatusItem |
|
36 |
from wcs.formdata import flatten_dict |
|
37 | ||
38 | ||
39 |
ADMIN_TITLE = N_('FranceConnect') |
|
40 | ||
41 |
# XXX: make an OIDC auth method that FranceConnect would inherit from |
|
42 | ||
43 | ||
44 |
def base64url_decode(input):
    """Decode a URL-safe base64 value whose trailing ``=`` padding was stripped.

    *input* may be a byte string or a text string; text is encoded to ASCII
    first, as the padding below is a byte string and the decoder works on
    bytes.  The missing padding is restored before decoding.

    Returns the decoded bytes.
    """
    if not isinstance(input, bytes):
        # text (unicode on Python 2, str on Python 3)
        input = input.encode('ascii')
    rem = len(input) % 4
    if rem > 0:
        input += b'=' * (4 - rem)
    return base64.urlsafe_b64decode(input)
|
49 | ||
50 | ||
51 |
class UserFieldMappingRowWidget(CompositeWidget):
    """One row of the mapping table: target user field, value expression, verified flag."""

    def __init__(self, name, value=None, **kwargs):
        CompositeWidget.__init__(self, name, value, **kwargs)
        current = value or {}

        self.add(SingleSelectWidget, name='field_varname', title=_('Field'),
                 value=current.get('field_varname'),
                 options=self._get_field_options(), **kwargs)
        self.add(ComputedExpressionWidget, name='value', title=_('Value'),
                 value=current.get('value'))
        verified_options = [
            ('never', _('Never')),
            ('always', _('Always')),
        ]
        self.add(SingleSelectWidget, 'verified',
                 title=_('Is attribute verified'),
                 value=current.get('verified'),
                 options=verified_options)

    def _get_field_options(self):
        """Return the (value, label, key) options of the target field selector.

        '__name' and '__email' are offered when there is no user formdef or
        when the matching 'field_name' / 'field_email' setting is empty;
        they are followed by every formdef field that has a varname.
        """
        users_cfg = get_cfg('users', {})
        user_formdef = get_publisher().user_class.get_formdef()

        options = []
        for builtin, label, cfg_key in (('__name', _('Name'), 'field_name'),
                                        ('__email', _('Email'), 'field_email')):
            if not user_formdef or not users_cfg.get(cfg_key):
                options.append((builtin, label, builtin))
        if user_formdef and user_formdef.fields:
            options.extend(
                (field.varname, field.label, field.varname)
                for field in user_formdef.fields if field.varname)
        return options

    def _parse(self, request):
        # a row only counts when its three parts are all filled in
        complete = self.get('value') and self.get('field_varname') and self.get('verified')
        if not complete:
            self.value = None
            return
        self.value = {
            'value': self.get('value'),
            'field_varname': self.get('field_varname'),
            'verified': self.get('verified'),
        }
|
91 | ||
92 | ||
93 |
class UserFieldMappingTableWidget(WidgetListAsTable):
    """Table widget whose rows are UserFieldMappingRowWidget instances."""

    readonly = False

    def __init__(self, name, **kwargs):
        WidgetListAsTable.__init__(
            self, name, element_type=UserFieldMappingRowWidget, **kwargs)
|
99 | ||
100 | ||
101 |
class MethodDirectory(Directory):
    """Public URLs of the method: both pages are handled by a fresh FCAuthMethod."""

    _q_exports = ['login', 'callback']

    def login(self):
        method = FCAuthMethod()
        return method.login()

    def callback(self):
        method = FCAuthMethod()
        return method.callback()
|
109 | ||
110 | ||
111 |
class MethodAdminDirectory(Directory):
    """Backoffice page displaying and saving the FranceConnect settings."""

    title = ADMIN_TITLE
    label = N_('Configure FranceConnect identification method')

    _q_exports = ['']

    # FranceConnect platforms offered in the settings form; the 'slug' is
    # the value stored in the configuration, the URLs are the endpoints
    # used by FCAuthMethod.
    PLATFORMS = [
        {
            'name': N_('Development citizens'),
            'slug': 'dev-particulier',
            'authorization_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize',
            'token_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token',
            'user_info_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo',
            'logout_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout',
        },
        {
            'name': N_('Development enterprise'),
            'slug': 'dev-entreprise',
            'authorization_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/authorize',
            'token_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/token',
            'user_info_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/userinfo',
            'logout_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/logout',
        },
        {
            'name': N_('Production citizens'),
            'slug': 'prod-particulier',
            'authorization_url': 'https://app.franceconnect.gouv.fr/api/v1/authorize',
            'token_url': 'https://app.franceconnect.gouv.fr/api/v1/token',
            'user_info_url': 'https://app.franceconnect.gouv.fr/api/v1/userinfo',
            'logout_url': 'https://app.franceconnect.gouv.fr/api/v1/logout',
        }
    ]
    # misspelled name the attribute was first published under, kept as an
    # alias so that existing references keep working
    PLAFTORMS = PLATFORMS

    # (configuration key, form label) pairs, in display order
    CONFIG = [
        ('client_id', N_('Client ID')),
        ('client_secret', N_('Client secret')),
        ('platform', N_('Platform')),
        ('scopes', N_('Scopes')),
        ('user_field_mappings', N_('User field mappings')),
    ]

    # (attribute, description) pairs listed under the settings form
    KNOWN_ATTRIBUTES = [
        ('given_name', N_('first names separated by spaces')),
        ('family_name', N_('birth\'s last name')),
        ('birthdate', N_('birthdate formatted as YYYY-MM-DD')),
        ('gender', N_('gender \'male\' for men, and \'female\' for women')),
        ('birthplace', N_('INSEE code of the place of birth')),
        ('birthcountry', N_('INSEE code of the country of birth')),
        ('email', N_('email')),
        ('siret', N_('SIRET or SIREN number of the enterprise')),
        # Note: FranceConnect website also refers to address and phones
        # attributes but we don't know what must be expected of their value.
    ]

    @classmethod
    def get_form(cls, instance=None):
        """Build the settings form.

        *instance* is the current 'fc' configuration dict, used for the
        initial values; it defaults to an empty configuration.
        """
        if instance is None:
            # do not share a mutable default between calls
            instance = {}
        form = Form(enctype='multipart/form-data')
        for key, title in cls.CONFIG:
            attrs = {}
            default = None
            hint = None
            kwargs = {}
            widget = StringWidget

            if key == 'user_field_mappings':
                widget = UserFieldMappingTableWidget
            elif key == 'platform':
                widget = SingleSelectWidget
                kwargs['options'] = [
                    (platform['slug'], platform['name']) for platform in cls.PLATFORMS
                ]
            elif key == 'scopes':
                default = 'identite_pivot address email phones'
                hint = _('Space separated values among: identite_pivot, address, email, phones, '
                         'profile, birth, preferred_username, gender, birthdate, '
                         'birthcountry, birthplace')
            if widget == StringWidget:
                kwargs['size'] = '80'
            form.add(widget, key,
                     title=_(title),
                     hint=hint,
                     value=instance.get(key, default),
                     attrs=attrs, **kwargs)
        form.add_submit('submit', _('Submit'))
        return form

    def submit(self, form):
        """Store the parsed value of every CONFIG widget as the 'fc' configuration."""
        cfg = {}
        for key, title in self.CONFIG:
            cfg[key] = form.get_widget(key).parse()
        get_publisher().cfg['fc'] = cfg
        get_publisher().write_cfg()
        return redirect('.')

    def _q_index(self):
        """Render the settings page, or save the settings on a valid submission."""
        fc_cfg = get_cfg('fc', {})
        form = self.get_form(fc_cfg)
        pub = get_publisher()

        if not ('submit' in get_request().form and form.is_submitted()) or form.has_errors():
            html_top('settings', title=_(self.title))
            r = TemplateIO(html=True)
            # the title is only marked for translation, translate it here
            # as done for the page title
            r += htmltext('<h2>%s</h2>') % _(self.title)
            fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
            r += htmltext(_('<p>Callback URL is <a href="%s">%s</a>.</p>')) % (
                fc_callback, fc_callback)
            r += htmltext(_('<p>See <a target="_blank" href="https://franceconnect.gouv.fr/fournisseur-service">'
                            'FranceConnect partners\'site</a> for getting a client_id and '
                            'a client_secret.</p>'))
            r += form.render()
            r += htmltext('<div><p>See <a '
                          'href="https://franceconnect.gouv.fr/fournisseur-service#identite-pivot" '
                          'target="_blank">FranceConnect partners\'site</a> for more '
                          'informations on available scopes and attributes. Known ones '
                          'are :</p>')
            r += (htmltext('<table><thead><tr><th>%s</th><th>%s</th></tr></thead><tbody>') %
                  (_('Attribute'), _('Description')))
            for attribute, description in self.KNOWN_ATTRIBUTES:
                r += htmltext('<tr><td><pre>%s</pre></td><td>%s</td></tr>') % (attribute, _(description))
            r += htmltext('</tbody></table></div>')

            return r.getvalue()
        else:
            return self.submit(form)
|
235 | ||
236 | ||
237 |
class FCAuthMethod(AuthMethod):
    """Authentication method delegating the login to FranceConnect.

    login() redirects to the platform authorization URL and callback()
    exchanges the returned code for an access token, fetches the user
    information and creates or updates the local user.
    """

    key = 'fc'
    description = ADMIN_TITLE
    method_directory = MethodDirectory
    method_admin_directory = MethodAdminDirectory

    def is_ok(self):
        """Return True when every configuration key has a non-empty value."""
        fc_cfg = get_cfg('fc', {})
        for key, title in self.method_admin_directory.CONFIG:
            if not fc_cfg.get(key):
                return False
        return True

    def login(self):
        """Redirect to the FranceConnect authorization URL.

        The URL to come back to after login is kept in the session under a
        key derived from the generated state; the nonce is the SHA-256 hex
        digest of the session id.
        """
        if not self.is_ok():
            return template.error_page(_('FranceConnect support is not yet configured'))

        fc_cfg = get_cfg('fc', {})
        pub = get_publisher()
        session = get_session()

        authorization_url = self.get_authorization_url()
        client_id = fc_cfg.get('client_id')
        state = str(uuid.uuid4())
        next_url = get_request().form.get('next') or pub.get_frontoffice_url()
        session.set_extra_attribute('fc_next_url_' + state, next_url)

        # generate a session id if none exists, ugly but necessary
        get_session_manager().maintain_session(session)

        nonce = hashlib.sha256(str(session.id)).hexdigest()
        fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
        qs = urllib.urlencode({
            'response_type': 'code',
            'client_id': client_id,
            'redirect_uri': fc_callback,
            'scope': 'openid ' + fc_cfg.get('scopes', ''),
            'state': state,
            'nonce': nonce,
        })
        redirect_url = '%s?%s' % (authorization_url, qs)
        return redirect(redirect_url)

    def is_interactive(self):
        return False

    def get_access_token(self, code):
        """Exchange *code* for an access token at the platform token URL.

        Returns the access token, or None (after logging an error) when the
        request fails, the response reports an error or is malformed, or
        the nonce of the id_token does not match the current session.
        """
        logger = get_logger()
        session = get_session()
        fc_cfg = get_cfg('fc', {})
        client_id = fc_cfg.get('client_id')
        client_secret = fc_cfg.get('client_secret')
        redirect_uri = get_request().get_frontoffice_url().split('?')[0]
        body = {
            'grant_type': 'authorization_code',
            'redirect_uri': redirect_uri,
            'client_id': client_id,
            'client_secret': client_secret,
            'code': code,
        }
        response, status, data, auth_header = http_post_request(
            self.get_token_url(),
            urllib.urlencode(body),
            headers={
                'Content-Type': 'application/x-www-form-urlencoded',
            })
        if status != 200:
            logger.error('status from FranceConnect token_url is not 200')
            return None
        result = json_loads(data)
        if 'error' in result:
            logger.error('FranceConnect code resolution failed: %s', result['error'])
            return None
        # check id_token nonce
        # NOTE(review): only the payload segment is read; the header and
        # signature segments of the id_token are not checked here.
        try:
            header, payload, signature = result['id_token'].split('.')
            payload = json_loads(base64url_decode(payload))
            returned_nonce = payload['nonce']
            access_token = result['access_token']
        except (KeyError, ValueError, TypeError, AttributeError):
            # missing key, wrong number of segments, bad base64 or bad JSON
            logger.error('FranceConnect returned a malformed token response')
            return None
        nonce = hashlib.sha256(str(session.id)).hexdigest()
        if returned_nonce != nonce:
            logger.error('FranceConnect returned nonce did not match')
            return None
        return access_token

    def get_user_info(self, access_token):
        """Fetch the user information with *access_token*.

        Returns the decoded JSON response, or None (after logging an error)
        when the status is not 200.
        """
        logger = get_logger()
        response, status, data, auth_header = http_get_page(
            self.get_user_info_url(),
            headers={
                'Authorization': 'Bearer %s' % access_token,
            })
        if status != 200:
            logger.error('status from FranceConnect user_info_url is not 200 but %s and data is'
                         ' %s', status, data[:100])
            return None
        return json.loads(data)

    def get_platform(self):
        """Return the platform description matching the configured slug.

        Raises KeyError when no platform has that slug.
        """
        fc_cfg = get_cfg('fc', {})
        slug = fc_cfg.get('platform')
        for platform in self.method_admin_directory.PLAFTORMS:
            if platform['slug'] == slug:
                return platform
        raise KeyError('platform %s not found' % slug)

    def get_authorization_url(self):
        return self.get_platform()['authorization_url']

    def get_token_url(self):
        return self.get_platform()['token_url']

    def get_user_info_url(self):
        return self.get_platform()['user_info_url']

    def get_logout_url(self):
        return self.get_platform()['logout_url']

    def fill_user_attributes(self, user, user_info):
        """Apply the configured field mappings to *user*.

        Each mapping value is computed with *user_info* as context; the
        result goes to user.name ('__name'), user.email ('__email') or to
        the form data of the user field with the mapped varname.  Mappings
        whose computation fails or whose field is unknown are skipped.
        user.verified_fields is updated from the 'verified' setting of each
        mapping, except for '__name'.
        """
        fc_cfg = get_cfg('fc', {})
        user_field_mappings = fc_cfg.get('user_field_mappings', [])
        user_formdef = get_publisher().user_class.get_formdef()
        # there may be no user formdef, or one without fields
        formdef_fields = (user_formdef and user_formdef.fields) or []

        form_data = user.form_data or {}
        user.verified_fields = user.verified_fields or []

        for user_field_mapping in user_field_mappings:
            field_varname = user_field_mapping['field_varname']
            value = user_field_mapping['value']
            verified = user_field_mapping['verified']
            field_id = None

            try:
                value = WorkflowStatusItem.compute(value, context=user_info)
            except Exception:
                # best effort: a mapping that cannot be computed is ignored
                continue
            if field_varname == '__name':
                user.name = value
            elif field_varname == '__email':
                user.email = value
                field_id = 'email'  # special value for verified email field
            else:
                for field in formdef_fields:
                    if field_varname == field.varname:
                        field_id = str(field.id)
                        break
                else:
                    continue
                form_data[field.id] = value

            # Update verified fields; field_id is used throughout as no
            # formdef field is involved for '__email'
            if field_id:
                if verified == 'always' and field_id not in user.verified_fields:
                    user.verified_fields.append(field_id)
                elif verified != 'always' and field_id in user.verified_fields:
                    user.verified_fields.remove(field_id)

        user.form_data = form_data

        if user.form_data:
            user.set_attributes_from_formdata(user.form_data)

    # messages displayed for the authorization errors that are not logged
    AUTHORIZATION_REQUEST_ERRORS = {
        'access_denied': N_('You refused the connection'),
    }

    def callback(self):
        """Handle the return from FranceConnect and log the user in.

        Always ends with a redirect to the URL saved by login() for the
        received state, or to the frontoffice URL when there is none.
        """
        if not self.is_ok():
            return template.error_page(_('FranceConnect support is not yet configured'))
        pub = get_publisher()
        request = get_request()
        session = get_session()
        logger = get_logger()
        state = request.form.get('state', '')
        next_url = session.pop_extra_attribute('fc_next_url_' + state, '') or pub.get_frontoffice_url()

        if 'code' not in request.form:
            error = request.form.get('error')
            # if no error parameter, we stay silent
            if error:
                # we log only errors whose user is not responsible
                known_message = self.AUTHORIZATION_REQUEST_ERRORS.get(error)
                if known_message:
                    msg = _(known_message)
                else:
                    msg = _('FranceConnect authentication failed')
                    logger.error('FranceConnect authentication failed with an unknown error: %s',
                                 error)
                session.message = ('error', msg)
            return redirect(next_url)
        access_token = self.get_access_token(request.form['code'])
        if not access_token:
            return redirect(next_url)
        user_info = self.get_user_info(access_token)
        if not user_info:
            return redirect(next_url)
        # Store user info in session
        flattened_user_info = user_info.copy()
        flatten_dict(flattened_user_info)
        session_var_fc_user = {}
        for key in flattened_user_info:
            session_var_fc_user['fc_user_' + key] = flattened_user_info[key]
        session.add_extra_variables(**session_var_fc_user)

        # Lookup or create user
        sub = user_info['sub']
        user = None
        for user in pub.user_class.get_users_with_name_identifier(sub):
            break
        if not user:
            user = pub.user_class(sub)
            user.name_identifiers = [sub]

        self.fill_user_attributes(user, user_info)

        if not (user.name and user.email):
            # we didn't get useful attributes, forget it.
            logger.error('failed to get name and/or email attribute from FranceConnect')
            session.message = ('error',
                               _('FranceConnect authentication failed: missing name or email'))
            return redirect(next_url)

        user.store()
        session.set_user(user.id)
        return redirect(next_url)
wcs/qommon/publisher.py | ||
---|---|---|
674 | 674 |
if lasso: |
675 | 675 |
import qommon.ident.idp |
676 | 676 |
classes.append(qommon.ident.idp.IdPAuthMethod) |
677 |
import qommon.ident.franceconnect |
|
678 |
classes.append(qommon.ident.franceconnect.FCAuthMethod) |
|
677 | 679 |
import qommon.ident.password |
678 | 680 |
classes.append(qommon.ident.password.PasswordAuthMethod) |
679 | 681 |
self.ident_methods = {} |
wcs/qommon/sessions.py | ||
---|---|---|
82 | 82 |
jsonp_display_values = None |
83 | 83 |
extra_variables = None |
84 | 84 |
expire = None |
85 |
extra_attributes = None |
|
85 | 86 | |
86 | 87 |
username = None # only set on password authentication |
87 | 88 | |
... | ... | |
116 | 117 |
self.extra_variables or \ |
117 | 118 |
CaptchaSession.has_info(self) or \ |
118 | 119 |
self.expire or \ |
120 |
self.extra_attributes or \ |
|
119 | 121 |
QuixoteSession.has_info(self) |
120 | 122 |
is_dirty = has_info |
121 | 123 | |
... | ... | |
248 | 250 |
d[prefix + k] = v |
249 | 251 |
return d |
250 | 252 | |
253 |
def set_extra_attribute(self, key, value):
    """Record *value* under *key* in extra_attributes, creating the dict when it is unset or empty."""
    attributes = self.extra_attributes
    if not attributes:
        attributes = self.extra_attributes = {}
    attributes[key] = value
|
257 | ||
258 |
def pop_extra_attribute(self, key, default=None):
    """Remove *key* from extra_attributes and return its value, or *default* when absent."""
    attributes = self.extra_attributes
    if not attributes:
        # nothing was ever stored
        return default
    return attributes.pop(key, default)
|
260 | ||
251 | 261 | |
252 | 262 |
class QommonSessionManager(QuixoteSessionManager): |
253 | 263 |
def start_request(self): |
wcs/workflows.py | ||
---|---|---|
1604 | 1604 |
setattr(self, f, value) |
1605 | 1605 | |
1606 | 1606 |
@classmethod |
1607 |
def compute(cls, var, do_ezt=True, raises=False): |
|
1607 |
def compute(cls, var, do_ezt=True, raises=False, context=None):
|
|
1608 | 1608 |
if not isinstance(var, basestring): |
1609 | 1609 |
return var |
1610 | 1610 | |
... | ... | |
1612 | 1612 |
return var |
1613 | 1613 | |
1614 | 1614 |
vars = get_publisher().substitutions.get_context_variables() |
1615 |
vars.update(context or {}) |
|
1616 | ||
1615 | 1617 |
if not var.startswith('='): |
1616 | 1618 |
try: |
1617 | 1619 |
processor = ezt.Template(compress_whitespace=False) |
1618 |
- |