Projet

Général

Profil

0001-add-FranceConnect-authentication-method-14510.patch

Thomas Noël, 03 mai 2017 11:35

Télécharger (32,2 ko)

Voir les différences:

Subject: [PATCH] add FranceConnect authentication method (#14510)

use session.extra_user_variables to store attributes retrieved
during FranceConnect SSO.
 tests/test_fc_auth.py               | 333 ++++++++++++++++++++++++++
 wcs/admin/settings.py               |   1 +
 wcs/qommon/ident/franceconnect.py   | 465 ++++++++++++++++++++++++++++++++++++
 wcs/qommon/publisher.py             |   2 +
 wcs/qommon/static/css/dc2/admin.css |   3 +
 5 files changed, 804 insertions(+)
 create mode 100644 tests/test_fc_auth.py
 create mode 100644 wcs/qommon/ident/franceconnect.py
tests/test_fc_auth.py
1
import urlparse
2
import base64
3
import json
4
import urllib
5

  
6
from quixote import cleanup, get_session_manager
7

  
8
from utilities import get_app, create_temporary_pub
9
import mock
10

  
11
PROFILE = {
12
    'fields': [
13
      {
14
        'kind': 'string',
15
        'description': '',
16
        'required': True,
17
        'user_visible': True,
18
        'label': u'Prenoms',
19
        'disabled': False,
20
        'user_editable': True,
21
        'asked_on_registration': True,
22
        'name': 'prenoms'
23
      },
24
      {
25
        'kind': 'string',
26
        'description': '',
27
        'required': True,
28
        'user_visible': True,
29
        'label': 'Nom',
30
        'disabled': False,
31
        'user_editable': True,
32
        'asked_on_registration': True,
33
        'name': 'nom'
34
      },
35
      {
36
        'kind': 'string',
37
        'description': '',
38
        'required': True,
39
        'user_visible': True,
40
        'label': 'Email',
41
        'disabled': False,
42
        'user_editable': True,
43
        'asked_on_registration': True,
44
        'name': 'email'
45
      },
46
    ]
47
}
48

  
49

  
50
def base64url_encode(v):
51
    return base64.urlsafe_b64encode(v).strip('=')
52

  
53

  
54
def setup_module(module):
55
    cleanup()
56
    global pub
57
    pub = create_temporary_pub()
58

  
59

  
60
def setup_user_profile(pub):
61
    if not pub.cfg:
62
        pub.cfg = {}
63
    # create some roles
64
    from wcs.ctl.check_hobos import CmdCheckHobos
65

  
66
    # setup an hobo profile
67
    CmdCheckHobos().update_profile(PROFILE, pub)
68
    pub.cfg['users']['field_name'] = ['_prenoms', '_nom']
69
    pub.cfg['debug'] = {'logger': True}
70
    pub.user_class.wipe()
71
    pub.write_cfg()
72

  
73
FC_CONFIG = {
74
    'client_id': '123',
75
    'client_secret': 'xyz',
76
    'platform': 'dev-particulier',
77
    'scopes': 'identite_pivot',
78
    'user_field_mappings': [
79
        {
80
            'field_varname': 'prenoms',
81
            'value': '[given_name ""]',
82
            'verified': 'always',
83
        },
84
        {
85
            'field_varname': 'nom',
86
            'value': '[family_name ""]',
87
            'verified': 'always',
88
        },
89
        {
90
            'field_varname': 'email',
91
            'value': '[email ""]',
92
            'verified': 'always',
93
        },
94
    ]
95

  
96
}
97

  
98

  
99
def setup_fc_environment(pub):
100
    if not pub.cfg:
101
        pub.cfg = {}
102
    pub.cfg['identification'] = {
103
        'methods': ['fc'],
104
    }
105
    pub.cfg['fc'] = FC_CONFIG
106
    pub.user_class.wipe()
107
    pub.write_cfg()
108

  
109

  
110
def get_session(app):
111
    try:
112
        session_id = app.cookies.values()[0]
113
    except IndexError:
114
        return None
115
    else:
116
        session_id = session_id.strip('"')
117
        return get_session_manager().session_class.get(session_id)
118

  
119

  
120
def test_fc_login_page(caplog):
121
    setup_user_profile(pub)
122
    setup_fc_environment(pub)
123
    app = get_app(pub)
124
    resp = app.get('/')
125
    resp = app.get('/login/')
126
    assert resp.status_int == 302
127
    assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
128
    qs = urlparse.parse_qs(resp.location.split('?')[1])
129
    nonce = qs['nonce'][0]
130
    state = qs['state'][0]
131

  
132
    id_token = {
133
        'nonce': nonce,
134
    }
135
    token_result = {
136
        'access_token': 'abcd',
137
        'id_token': '.%s.' % base64url_encode(json.dumps(id_token)),
138
    }
139
    user_info_result = {
140
        'sub': 'ymca',
141
        'given_name': 'John',
142
        'family_name': 'Doe',
143
        'email': 'john.doe@example.com',
144
    }
145

  
146
    assert pub.user_class.count() == 0
147
    with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
148
            mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
149
        http_post_request.return_value = (None, 200, json.dumps(token_result), None)
150
        http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
151
        resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
152
            'code': '1234', 'state': state,
153
        }))
154
    assert pub.user_class.count() == 1
155
    user = pub.user_class.select()[0]
156
    assert user.form_data == {'_email': 'john.doe@example.com', '_nom': 'Doe', '_prenoms': 'John'}
157
    assert set(user.verified_fields) == set(['_nom', '_prenoms', '_email'])
158
    assert user.email == 'john.doe@example.com'
159
    assert user.name_identifiers == ['ymca']
160
    assert user.name == 'John Doe'
161

  
162
    # Verify we are logged in
163
    session = get_session(app)
164
    assert session.user == user.id
165
    assert session.extra_user_variables['fc_given_name'] == 'John'
166
    assert session.extra_user_variables['fc_family_name'] == 'Doe'
167
    assert session.extra_user_variables['fc_email'] == 'john.doe@example.com'
168
    assert session.extra_user_variables['fc_sub'] == 'ymca'
169

  
170
    resp = app.get('/logout')
171

  
172
    # Test error handling path
173
    resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
174
        'state': state,
175
        'error': 'access_denied',
176
    }))
177
    assert 'user did not authorize login' in caplog.records[-1].message
178
    resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
179
        'state': state,
180
        'error': 'whatever',
181
    }))
182
    assert 'whatever' in caplog.records[-1].message
183

  
184
    # Login existing user
185
    def logme(login_url):
186
        resp = app.get(login_url)
187
        assert resp.status_int == 302
188
        assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
189
        qs = urlparse.parse_qs(resp.location.split('?')[1])
190
        state = qs['state'][0]
191
        id_token['nonce'] = qs['nonce'][0]
192
        token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))
193

  
194
        with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
195
                mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
196
            http_post_request.return_value = (None, 200, json.dumps(token_result), None)
197
            http_get_page.return_value = (None, 200, json.dumps(user_info_result), None)
198
            resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
199
                'code': '1234', 'state': state,
200
            }))
201
        return resp
202
    app.get('/logout')
203
    resp = logme('/login/')
204
    new_session = get_session(app)
205
    assert session.id != new_session.id, 'no new session created'
206
    assert pub.user_class.count() == 1, 'existing user has not been used'
207
    assert new_session.user == user.id
208

  
209
    # Login with next url
210
    app.get('/logout')
211
    resp = logme('/login/?next=/foo/bar/')
212
    assert resp.status_int == 302
213
    assert resp.location.endswith('/foo/bar/')
214

  
215
    # Direct login link
216
    app.get('/logout')
217
    resp = logme('/ident/fc/login')
218
    new_session = get_session(app)
219
    assert session.id != new_session.id, 'no new session created'
220
    assert pub.user_class.count() == 1, 'existing user has not been used'
221
    assert new_session.user == user.id
222
    app.get('/logout')
223
    resp = logme('/ident/fc/login?next=/foo/bar/')
224
    assert resp.status_int == 302
225
    assert resp.location.endswith('/foo/bar/')
226

  
227
    # User with missing attributes
228
    resp = app.get('/logout')
229
    resp = app.get('/login/')
230
    assert resp.status_int == 302
231
    assert resp.location.startswith('https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize')
232
    qs = urlparse.parse_qs(resp.location.split('?')[1])
233
    state = qs['state'][0]
234
    id_token['nonce'] = qs['nonce'][0]
235
    token_result['id_token'] = '.%s.' % base64url_encode(json.dumps(id_token))
236
    bad_user_info_result = {
237
        'sub': 'ymca2',
238
        'given_name': 'John',
239
        'family_name': 'Deux',
240
        # 'email': 'john.deux@example.com',  # missing
241
    }
242
    with mock.patch('qommon.ident.franceconnect.http_post_request') as http_post_request, \
243
            mock.patch('qommon.ident.franceconnect.http_get_page') as http_get_page:
244
        http_post_request.return_value = (None, 200, json.dumps(token_result), None)
245
        http_get_page.return_value = (None, 200, json.dumps(bad_user_info_result), None)
246
        resp = app.get('/ident/fc/callback?%s' % urllib.urlencode({
247
            'code': '1234', 'state': state,
248
        }))
249
    assert pub.user_class.count() == 1, 'an invalid user (no email) has been created'
250
    session = get_session(app)
251
    assert not session
252

  
253

  
254
def test_fc_settings():
255
    setup_user_profile(pub)
256
    app = get_app(pub)
257
    resp = app.get('/backoffice/settings/identification/')
258
    resp.forms[0]['methods$elementfc'].checked = True
259
    resp = resp.forms[0].submit().follow()
260

  
261
    assert 'FranceConnect' in resp.body
262
    resp = resp.click('FranceConnect')
263
    resp = resp.forms[0].submit('user_field_mappings$add_element')
264
    resp = resp.forms[0].submit('user_field_mappings$add_element')
265
    resp.forms[0]['client_id'].value = '123'
266
    resp.forms[0]['client_secret'].value = 'xyz'
267
    resp.forms[0]['platform'].value = 'Development citizens'
268
    resp.forms[0]['scopes'].value = 'identite_pivot'
269

  
270
    resp.forms[0]['user_field_mappings$element0$field_varname'] = 'prenoms'
271
    resp.forms[0]['user_field_mappings$element0$value'] = '[given_name ""]'
272
    resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
273

  
274
    resp.forms[0]['user_field_mappings$element1$field_varname'] = 'nom'
275
    resp.forms[0]['user_field_mappings$element1$value'] = '[family_name ""]'
276
    resp.forms[0]['user_field_mappings$element1$verified'] = 'Always'
277

  
278
    resp.forms[0]['user_field_mappings$element2$field_varname'] = 'email'
279
    resp.forms[0]['user_field_mappings$element2$value'] = '[email ""]'
280
    resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
281

  
282
    resp = resp.forms[0].submit('submit').follow()
283
    assert pub.cfg['fc'] == FC_CONFIG
284

  
285

  
286
def test_fc_settings_no_user_profile():
287
    FC_CONFIG = {
288
        'client_id': '123',
289
        'client_secret': 'xyz',
290
        'platform': 'dev-particulier',
291
        'scopes': 'identite_pivot',
292
        'user_field_mappings': [
293
            {
294
                'field_varname': '__name',
295
                'value': '[given_name ""] [family_name ""]',
296
                'verified': 'always',
297
            },
298
            {
299
                'field_varname': '__email',
300
                'value': '[email ""]',
301
                'verified': 'always',
302
            },
303
        ]
304

  
305
    }
306

  
307
    pub.cfg = {}
308
    pub.user_class.wipe()
309
    pub.write_cfg()
310
    app = get_app(pub)
311
    resp = app.get('/backoffice/settings/identification/')
312
    resp.forms[0]['methods$elementfc'].checked = True
313
    resp = resp.forms[0].submit().follow()
314

  
315
    assert 'FranceConnect' in resp.body
316
    resp = resp.click('FranceConnect')
317
    resp = resp.forms[0].submit('user_field_mappings$add_element')
318
    resp = resp.forms[0].submit('user_field_mappings$add_element')
319
    resp.forms[0]['client_id'].value = '123'
320
    resp.forms[0]['client_secret'].value = 'xyz'
321
    resp.forms[0]['platform'].value = 'Development citizens'
322
    resp.forms[0]['scopes'].value = 'identite_pivot'
323

  
324
    resp.forms[0]['user_field_mappings$element0$field_varname'] = '__name'
325
    resp.forms[0]['user_field_mappings$element0$value'] = '[given_name ""] [family_name ""]'
326
    resp.forms[0]['user_field_mappings$element0$verified'] = 'Always'
327

  
328
    resp.forms[0]['user_field_mappings$element2$field_varname'] = '__email'
329
    resp.forms[0]['user_field_mappings$element2$value'] = '[email ""]'
330
    resp.forms[0]['user_field_mappings$element2$verified'] = 'Always'
331

  
332
    resp = resp.forms[0].submit('submit').follow()
333
    assert pub.cfg['fc'] == FC_CONFIG
wcs/admin/settings.py
71 71
        if lasso is not None:
72 72
            methods.insert(0,
73 73
                    ('idp', _('Delegated to SAML identity provider'), 'idp'))
74
        methods.append(('fc', _('Delegated to FranceConnect'), 'fc'))
74 75
        form.add(CheckboxesWidget, 'methods', title = _('Methods'),
75 76
                value=identification_cfg.get('methods'),
76 77
                options=methods,
wcs/qommon/ident/franceconnect.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2017  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import hashlib
19
import sys
20
import urllib
21
import uuid
22

  
23
from quixote import redirect, get_session, get_publisher, get_request, get_session_manager
24
from quixote.directory import Directory
25
from quixote.html import htmltext, TemplateIO
26

  
27
from qommon import _
28
from qommon.backoffice.menu import html_top
29
from qommon import template, get_cfg, get_logger
30
from qommon.form import (Form, StringWidget, CompositeWidget, ComputedExpressionWidget,
31
                         SingleSelectWidget, WidgetListAsTable)
32
from qommon.misc import http_post_request, http_get_page, json_loads
33

  
34
from wcs.workflows import WorkflowStatusItem
35
from wcs.formdata import flatten_dict
36

  
37
from .base import AuthMethod
38

  
39

  
40
ADMIN_TITLE = N_('FranceConnect')
41

  
42
# XXX: make an OIDC auth method that FranceConnect would inherit from
43

  
44

  
45
def base64url_decode(input):
46
    rem = len(input) % 4
47
    if rem > 0:
48
        input += b'=' * (4 - rem)
49
    return base64.urlsafe_b64decode(input)
50

  
51

  
52
class UserFieldMappingRowWidget(CompositeWidget):
53
    def __init__(self, name, value=None, **kwargs):
54
        CompositeWidget.__init__(self, name, value, **kwargs)
55
        if not value:
56
            value = {}
57

  
58
        fields = []
59
        users_cfg = get_cfg('users', {})
60
        user_formdef = get_publisher().user_class.get_formdef()
61
        if not user_formdef or not users_cfg.get('field_name'):
62
            fields.append(('__name', _('Name'), '__name'))
63
        if not user_formdef or not users_cfg.get('field_email'):
64
            fields.append(('__email', _('Email'), '__email'))
65
        if user_formdef and user_formdef.fields:
66
            for field in user_formdef.fields:
67
                if field.varname:
68
                    fields.append((field.varname, field.label, field.varname))
69

  
70
        self.add(SingleSelectWidget, name='field_varname', title=_('Field'),
71
                 value=value.get('field_varname'),
72
                 options=fields, **kwargs)
73
        self.add(ComputedExpressionWidget, name='value', title=_('Value'),
74
                 value=value.get('value'))
75
        self.add(SingleSelectWidget, 'verified',
76
                 title=_('Is attribute verified'),
77
                 value=value.get('verified'),
78
                 options=[('never', _('Never')),
79
                          ('always', _('Always'))
80
                          ]
81
                 )
82

  
83
    def _parse(self, request):
84
        if self.get('value') and self.get('field_varname') and self.get('verified'):
85
            self.value = {
86
                'value': self.get('value'),
87
                'field_varname': self.get('field_varname'),
88
                'verified': self.get('verified'),
89
            }
90
        else:
91
            self.value = None
92

  
93

  
94
class UserFieldMappingTableWidget(WidgetListAsTable):
95
    readonly = False
96

  
97
    def __init__(self, name, **kwargs):
98
        super(UserFieldMappingTableWidget, self).__init__(
99
            name, element_type=UserFieldMappingRowWidget, **kwargs)
100

  
101

  
102
class MethodDirectory(Directory):
103
    _q_exports = ['login', 'callback']
104

  
105
    def login(self):
106
        return FCAuthMethod().login()
107

  
108
    def callback(self):
109
        return FCAuthMethod().callback()
110

  
111

  
112
class MethodAdminDirectory(Directory):
113
    title = ADMIN_TITLE
114
    label = N_('Configure FranceConnect identification method')
115

  
116
    _q_exports = ['']
117

  
118
    PLAFTORMS = [
119
        {
120
            'name': N_('Development citizens'),
121
            'slug': 'dev-particulier',
122
            'authorization_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/authorize',
123
            'token_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/token',
124
            'user_info_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/userinfo',
125
            'logout_url': 'https://fcp.integ01.dev-franceconnect.fr/api/v1/logout',
126
        },
127
        {
128
            'name': N_('Development enterprise'),
129
            'slug': 'dev-entreprise',
130
            'authorization_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/authorize',
131
            'token_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/token',
132
            'user_info_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/userinfo',
133
            'logout_url': 'https://fce.integ01.dev-franceconnect.fr/api/v1/logout',
134
        },
135
        {
136
            'name': N_('Production citizens'),
137
            'slug': 'prod-particulier',
138
            'authorization_url': 'https://app.franceconnect.gouv.fr/api/v1/authorize',
139
            'token_url': 'https://app.franceconnect.gouv.fr/api/v1/token',
140
            'user_info_url': 'https://app.franceconnect.gouv.fr/api/v1/userinfo',
141
            'logout_url': 'https://app.franceconnect.gouv.fr/api/v1/logout',
142
        }
143
    ]
144

  
145
    CONFIG = [
146
        ('client_id', N_('Client ID')),
147
        ('client_secret', N_('Client secret')),
148
        ('platform', N_('Platform')),
149
        ('scopes', N_('Scopes')),
150
        ('user_field_mappings', N_('User field mappings')),
151
    ]
152

  
153
    KNOWN_ATTRIBUTES = [
154
        ('given_name', N_('first names separated by spaces')),
155
        ('family_name', N_('birth\'s last name')),
156
        ('birthdate', N_('birthdate formatted as YYYY-MM-DD')),
157
        ('gender', N_('gender \'male\' for men, and \'female\' for women')),
158
        ('birthplace', N_('INSEE code of the place of birth')),
159
        ('birthcountry', N_('INSEE code of the country of birth')),
160
        ('email', N_('email')),
161
        ('siret', N_('SIRET or SIREN number of the enterprise')),
162
        # Note: FranceConnect website also refer to adress and phones attributes
163
        # but we don't know what must be expected of their value.
164
    ]
165

  
166
    @classmethod
167
    def get_form(cls, instance={}):
168
        form = Form(enctype='multipart/form-data')
169
        for key, title in cls.CONFIG:
170
            attrs = {}
171
            default = None
172
            hint = None
173
            kwargs = {}
174
            widget = StringWidget
175

  
176
            if key == 'user_field_mappings':
177
                widget = UserFieldMappingTableWidget
178
            elif key == 'platform':
179
                widget = SingleSelectWidget
180
                kwargs['options'] = [
181
                    (platform['slug'], platform['name']) for platform in cls.PLAFTORMS
182
                ]
183
            elif key == 'scopes':
184
                default = 'identite_pivot address email phones'
185
                hint = _('Space separated values among: identite_pivot, address, email, phones, '
186
                         'profile, birth, preferred_username, gender, birthdate, '
187
                         'birthcountry, birthplace')
188
            if widget == StringWidget:
189
                kwargs['size'] = '80'
190
            form.add(widget, key, title=_(title), hint=hint, required=True,
191
                     value=instance.get(key, default),
192
                     attrs=attrs, **kwargs)
193
        form.add_submit('submit', _('Submit'))
194
        form.add_submit('cancel', _('Cancel'))
195

  
196
        return form
197

  
198
    def submit(self, form):
199
        cfg = {}
200
        for key, title in self.CONFIG:
201
            cfg[key] = form.get_widget(key).parse()
202
        get_publisher().cfg['fc'] = cfg
203
        get_publisher().write_cfg()
204
        return redirect('../..')
205

  
206
    def _q_index(self):
207
        fc_cfg = get_cfg('fc', {})
208
        form = self.get_form(fc_cfg)
209
        pub = get_publisher()
210

  
211
        if form.get_submit() == 'cancel':
212
            return redirect('../..')
213

  
214
        if 'submit' in get_request().form and form.is_submitted() and not form.has_errors():
215
            return self.submit(form)
216

  
217
        html_top('settings', title=_(self.title))
218
        r = TemplateIO(html=True)
219
        r += htmltext('<h2>%s</h2>') % self.title
220
        fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
221
        r += htmltext('<p>')
222
        r += _('Callback URL is %s.') % fc_callback
223
        r += htmltext('</p>')
224
        r += htmltext('<p>')
225
        r += htmltext(_('See <a href="https://franceconnect.gouv.fr/fournisseur-service">'
226
                        'FranceConnect partners\'site</a> for getting a client_id and '
227
                        'a client_secret.'))
228
        r += htmltext('</p>')
229
        r += form.render()
230
        r += htmltext('<div><p>')
231
        r += htmltext(_('See <a '
232
                      'href="https://franceconnect.gouv.fr/fournisseur-service#identite-pivot" '
233
                      '>FranceConnect partners\'site</a> for more '
234
                      'informations on available scopes and attributes. Known ones '
235
                      'are:'))
236
        r += htmltext('</p>')
237
        r += htmltext('<table class="franceconnect-attrs"><thead>'
238
                      '<tr><th>%s</th><th>%s</th></tr></thead><tbody>') % (
239
                      _('Attribute'), _('Description'))
240
        for attribute, description in self.KNOWN_ATTRIBUTES:
241
            r += htmltext('<tr><td><code>%s</code></td><td>%s</td></tr>') % (attribute, _(description))
242
        r += htmltext('</tbody></table></div>')
243

  
244
        return r.getvalue()
245

  
246

  
247
class FCAuthMethod(AuthMethod):
248
    key = 'fc'
249
    description = ADMIN_TITLE
250
    method_directory = MethodDirectory
251
    method_admin_directory = MethodAdminDirectory
252

  
253
    def is_ok(self):
254
        fc_cfg = get_cfg('fc', {})
255
        for key, title in self.method_admin_directory.CONFIG:
256
            if not fc_cfg.get(key):
257
                return False
258
        return True
259

  
260
    def login(self):
261
        if not self.is_ok():
262
            return template.error_page(_('FranceConnect support is not yet configured.'))
263

  
264
        fc_cfg = get_cfg('fc', {})
265
        pub = get_publisher()
266
        session = get_session()
267

  
268
        authorization_url = self.get_authorization_url()
269
        client_id = fc_cfg.get('client_id')
270
        state = str(uuid.uuid4())
271
        session = get_session()
272
        next_url = get_request().form.get('next') or pub.get_frontoffice_url()
273
        session.extra_user_variables = session.extra_user_variables or {}
274
        session.extra_user_variables['fc_next_url_' + state] = next_url
275

  
276
        # generate a session id if none exists, ugly but necessary
277
        get_session_manager().maintain_session(session)
278

  
279
        nonce = hashlib.sha256(str(session.id)).hexdigest()
280
        fc_callback = pub.get_frontoffice_url() + '/ident/fc/callback'
281
        qs = urllib.urlencode({
282
            'response_type': 'code',
283
            'client_id': client_id,
284
            'redirect_uri': fc_callback,
285
            'scope': 'openid ' + fc_cfg.get('scopes', ''),
286
            'state': state,
287
            'nonce': nonce,
288
        })
289
        redirect_url = '%s?%s' % (authorization_url, qs)
290
        return redirect(redirect_url)
291

  
292
    def is_interactive(self):
293
        return False
294

  
295
    def get_access_token(self, code):
296
        logger = get_logger()
297
        session = get_session()
298
        fc_cfg = get_cfg('fc', {})
299
        client_id = fc_cfg.get('client_id')
300
        client_secret = fc_cfg.get('client_secret')
301
        redirect_uri = get_request().get_frontoffice_url().split('?')[0]
302
        body = {
303
            'grant_type': 'authorization_code',
304
            'redirect_uri': redirect_uri,
305
            'client_id': client_id,
306
            'client_secret': client_secret,
307
            'code': code,
308
        }
309
        response, status, data, auth_header = http_post_request(
310
            self.get_token_url(),
311
            urllib.urlencode(body),
312
            headers={
313
                'Content-Type': 'application/x-www-form-urlencoded',
314
            })
315
        if status != 200:
316
            logger.error('status from FranceConnect token_url is not 200')
317
            return None
318
        result = json_loads(data)
319
        if 'error' in result:
320
            logger.error('FranceConnect code resolution failed: %s', result['error'])
321
            return None
322
        # check id_token nonce
323
        id_token = result['id_token']
324
        header, payload, signature = id_token.split('.')
325
        payload = json_loads(base64url_decode(payload))
326
        nonce = hashlib.sha256(str(session.id)).hexdigest()
327
        if payload['nonce'] != nonce:
328
            logger.error('FranceConnect returned nonce did not match')
329
            return None
330
        return result['access_token']
331

  
332
    def get_user_info(self, access_token):
333
        logger = get_logger()
334
        response, status, data, auth_header = http_get_page(
335
            self.get_user_info_url(),
336
            headers={
337
                'Authorization': 'Bearer %s' % access_token,
338
            })
339
        if status != 200:
340
            logger.error('status from FranceConnect user_info_url is not 200 but %s and data is'
341
                         ' %s', status. data[:100])
342
            return None
343
        return json_loads(data)
344

  
345
    def get_platform(self):
346
        fc_cfg = get_cfg('fc', {})
347
        slug = fc_cfg.get('platform')
348
        for platform in self.method_admin_directory.PLAFTORMS:
349
            if platform['slug'] == slug:
350
                return platform
351
        raise KeyError('platform %s not found' % slug)
352

  
353
    def get_authorization_url(self):
354
        return self.get_platform()['authorization_url']
355

  
356
    def get_token_url(self):
357
        return self.get_platform()['token_url']
358

  
359
    def get_user_info_url(self):
360
        return self.get_platform()['user_info_url']
361

  
362
    def get_logout_url(self):
363
        return self.get_platform()['logout_url']
364

  
365
    def fill_user_attributes(self, user, user_info):
366
        fc_cfg = get_cfg('fc', {})
367
        user_field_mappings = fc_cfg.get('user_field_mappings', [])
368
        user_formdef = get_publisher().user_class.get_formdef()
369

  
370
        form_data = user.form_data or {}
371
        user.verified_fields = user.verified_fields or []
372

  
373
        for user_field_mapping in user_field_mappings:
374
            field_varname = user_field_mapping['field_varname']
375
            value = user_field_mapping['value']
376
            verified = user_field_mapping['verified']
377
            field_id = None
378

  
379
            try:
380
                value = WorkflowStatusItem.compute(value, context=user_info)
381
            except Exception, e:
382
                get_publisher().notify_of_exception(sys.exc_info(), context='[FC-user-compute]')
383
                continue
384
            if field_varname == '__name':
385
                user.name = value
386
            elif field_varname == '__email':
387
                user.email = value
388
                field_id = 'email' # special value for verified email field
389
            else:
390
                for field in user_formdef.fields:
391
                    if field_varname == field.varname:
392
                        field_id = str(field.id)
393
                        break
394
                else:
395
                    continue
396
                form_data[field.id] = value
397
            # Update verified fields
398
            if field_id:
399
                if verified == 'always' and field_id not in user.verified_fields:
400
                    user.verified_fields.append(field_id)
401
                elif verified != 'always' and field_id in user.verified_fields:
402
                    user.verified_fields.remove(field_id)
403

  
404
        user.form_data = form_data
405

  
406
        if user.form_data:
407
            user.set_attributes_from_formdata(user.form_data)
408

  
409
    AUTHORIZATION_REQUEST_ERRORS = {
410
        'access_denied': N_('user did not authorize login'),
411
    }
412

  
413
    def callback(self):
414
        if not self.is_ok():
415
            return template.error_page(_('FranceConnect support is not yet configured.'))
416
        pub = get_publisher()
417
        request = get_request()
418
        session = get_session()
419
        logger = get_logger()
420
        state = request.form.get('state', '')
421
        next_url = ((session.extra_user_variables or {}).pop('fc_next_url_' + state, '')
422
                    or pub.get_frontoffice_url())
423

  
424
        if 'code' not in request.form:
425
            error = request.form.get('error')
426
            # if no error parameter, we stay silent
427
            if error:
428
                # we log only errors whose user is not responsible
429
                msg = self.AUTHORIZATION_REQUEST_ERRORS.get(error)
430
                logger.error(_('FranceConnect authentication failed: %s'),
431
                             _(msg) if msg else error)
432
            return redirect(next_url)
433
        access_token = self.get_access_token(request.form['code'])
434
        if not access_token:
435
            return redirect(next_url)
436
        user_info = self.get_user_info(access_token)
437
        if not user_info:
438
            return redirect(next_url)
439
        # Store user info in session
440
        flattened_user_info = user_info.copy()
441
        flatten_dict(flattened_user_info)
442
        session_var_fc_user = {}
443
        for key in flattened_user_info:
444
            session_var_fc_user['fc_' + key] = flattened_user_info[key]
445

  
446
        #  Lookup or create user
447
        sub = user_info['sub']
448
        user = None
449
        for user in pub.user_class.get_users_with_name_identifier(sub):
450
            break
451
        if not user:
452
            user = pub.user_class(sub)
453
            user.name_identifiers = [sub]
454

  
455
        self.fill_user_attributes(user, user_info)
456

  
457
        if not (user.name and user.email):
458
            # we didn't get useful attributes, forget it.
459
            logger.error('failed to get name and/or email attribute from FranceConnect')
460
            return redirect(next_url)
461

  
462
        user.store()
463
        session.set_user(user.id)
464
        session.extra_user_variables = session_var_fc_user
465
        return redirect(next_url)
wcs/qommon/publisher.py
674 674
        if lasso:
675 675
            import qommon.ident.idp
676 676
            classes.append(qommon.ident.idp.IdPAuthMethod)
677
        import qommon.ident.franceconnect
678
        classes.append(qommon.ident.franceconnect.FCAuthMethod)
677 679
        import qommon.ident.password
678 680
        classes.append(qommon.ident.password.PasswordAuthMethod)
679 681
        self.ident_methods = {}
wcs/qommon/static/css/dc2/admin.css
778 778
	border-bottom: 1px solid #bcbcbc;
779 779
}
780 780

  
781
table.franceconnect-attrs,
781 782
table#substvars {
782 783
	border-bottom: 1px solid #eee;
783 784
	border-collapse: collapse;
784 785
	margin: 1em 0;
785 786
}
786 787

  
788
table.franceconnect-attrs th,
787 789
table#substvars th {
788 790
	background: #eee;
789 791
	border: 1px solid #eee;
......
792 794
	padding: 0 1em;
793 795
}
794 796

  
797
table.franceconnect-attrs td,
795 798
table#substvars td {
796 799
	border: 1px solid #eee;
797 800
	padding: 0 1em;
798
-