15 |
15 |
# You should have received a copy of the GNU Affero General Public License
|
16 |
16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
17 |
17 |
|
18 |
|
import pytest
|
19 |
|
import re
|
20 |
|
import httmock
|
21 |
|
import mock
|
22 |
|
import json
|
23 |
|
import base64
|
24 |
|
from jwcrypto import jwk, jwt
|
25 |
18 |
import datetime
|
|
19 |
import mock
|
26 |
20 |
|
27 |
21 |
import requests
|
28 |
22 |
|
29 |
23 |
from django.contrib.auth import get_user_model
|
30 |
24 |
from django.urls import reverse
|
31 |
|
from django.utils.encoding import force_text
|
32 |
25 |
from django.utils.six.moves.urllib import parse as urlparse
|
33 |
26 |
from django.utils.timezone import now
|
34 |
27 |
|
35 |
|
from authentic2.models import Service
|
36 |
|
|
37 |
28 |
from authentic2_auth_fc import models
|
38 |
29 |
from authentic2_auth_fc.utils import requests_retry_session
|
39 |
30 |
|
40 |
|
from ..utils import login
|
|
31 |
from ..utils import login, get_link_from_mail
|
41 |
32 |
|
42 |
33 |
|
43 |
34 |
User = get_user_model()
|
44 |
35 |
|
45 |
36 |
|
46 |
|
@pytest.fixture(autouse=True)
|
47 |
|
def service(db):
|
48 |
|
return Service.objects.create(name='portail', slug='portail')
|
49 |
|
|
50 |
|
|
51 |
37 |
def path(url):
|
52 |
38 |
return urlparse.urlparse(url).path
|
53 |
39 |
|
54 |
40 |
|
55 |
|
def get_links_from_mail(mail):
|
56 |
|
'''Extract links from mail sent by Django'''
|
57 |
|
return re.findall('https?://[^ \n]*', mail.body)
|
58 |
|
|
59 |
|
|
60 |
|
def hmac_jwt(payload, key):
|
61 |
|
header = {'alg': 'HS256'}
|
62 |
|
k = jwk.JWK(
|
63 |
|
kty='oct', k=force_text(base64.b64encode(key.encode('utf-8'))))
|
64 |
|
t = jwt.JWT(header=header, claims=payload)
|
65 |
|
t.make_signed_token(k)
|
66 |
|
return t.serialize()
|
67 |
|
|
68 |
|
|
69 |
|
def test_login_redirect(app, fc_settings):
|
|
41 |
def test_login_redirect(app, franceconnect):
|
70 |
42 |
url = reverse('fc-login-or-link')
|
71 |
43 |
response = app.get(url, status=302)
|
72 |
44 |
assert response['Location'].startswith('https://fcp.integ01')
|
73 |
45 |
|
74 |
46 |
|
75 |
|
def check_authorization_url(url):
|
76 |
|
callback = reverse('fc-login-or-link')
|
77 |
|
assert url.startswith('https://fcp.integ01')
|
78 |
|
query_string = url.split('?')[1]
|
79 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(query_string).items()}
|
80 |
|
assert 'redirect_uri' in parsed
|
81 |
|
assert callback in parsed['redirect_uri']
|
82 |
|
assert 'client_id' in parsed
|
83 |
|
assert parsed['client_id'] == 'xxx'
|
84 |
|
assert 'scope' in parsed
|
85 |
|
assert set(parsed['scope'].split()) == set(['openid', 'profile', 'email'])
|
86 |
|
assert 'state' in parsed
|
87 |
|
assert 'nonce' in parsed
|
88 |
|
assert parsed['state'] == parsed['nonce']
|
89 |
|
assert 'response_type' in parsed
|
90 |
|
assert parsed['response_type'] == 'code'
|
91 |
|
assert parsed['acr_values'] == 'eidas1'
|
92 |
|
return parsed['state']
|
93 |
|
|
94 |
|
|
95 |
|
def test_login_with_condition(app, fc_settings, settings):
|
|
47 |
def test_login_with_condition(settings, app, franceconnect):
|
96 |
48 |
# open the page first time so session cookie can be set
|
97 |
49 |
response = app.get('/login/')
|
98 |
50 |
assert 'fc-button' in response
|
... | ... | |
105 |
57 |
assert 'fc-button' not in response
|
106 |
58 |
|
107 |
59 |
|
108 |
|
def test_login_autorun(app, fc_settings, settings):
|
|
60 |
def test_login_autorun(settings, app, franceconnect):
|
109 |
61 |
# hide password block
|
110 |
62 |
settings.AUTH_FRONTENDS_KWARGS = {'password': {'show_condition': 'remote_addr==\'0.0.0.0\''}}
|
111 |
63 |
response = app.get('/login/')
|
112 |
64 |
assert response['Location'] == reverse('fc-login-or-link')
|
113 |
65 |
|
114 |
66 |
|
115 |
|
@pytest.mark.parametrize('exp', [now() + datetime.timedelta(seconds=1000),
|
116 |
|
now() - datetime.timedelta(seconds=1000)])
|
117 |
|
def test_login_simple(app, fc_settings, caplog, hooks, exp):
|
|
67 |
def test_no_create(app, franceconnect):
|
118 |
68 |
response = app.get('/login/?service=portail&next=/idp/')
|
119 |
69 |
response = response.click(href='callback')
|
120 |
|
location = response['Location']
|
121 |
|
state = check_authorization_url(location)
|
122 |
|
|
123 |
|
@httmock.urlmatch(path=r'.*/token$')
|
124 |
|
def access_token_response(url, request):
|
125 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
126 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
127 |
|
'grant_type'])
|
128 |
|
assert parsed['code'] == 'zzz'
|
129 |
|
assert parsed['client_id'] == 'xxx'
|
130 |
|
assert parsed['client_secret'] == 'yyy'
|
131 |
|
assert parsed['grant_type'] == 'authorization_code'
|
132 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
133 |
|
parsed_callback = urlparse.urlparse(callback)
|
134 |
|
assert parsed_redirect.path == parsed_callback.path
|
135 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
136 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
137 |
|
id_token = {
|
138 |
|
'sub': '1234',
|
139 |
|
'aud': 'xxx',
|
140 |
|
'nonce': state,
|
141 |
|
'exp': int(exp.timestamp()),
|
142 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
143 |
|
}
|
144 |
|
return json.dumps({
|
145 |
|
'access_token': 'uuu',
|
146 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
147 |
|
})
|
148 |
|
|
149 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
150 |
|
def user_info_response(url, request):
|
151 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
152 |
|
return json.dumps({
|
153 |
|
'sub': '1234',
|
154 |
|
'family_name': u'Frédérique',
|
155 |
|
'given_name': u'Ÿuñe',
|
156 |
|
})
|
157 |
|
|
158 |
|
callback = reverse('fc-login-or-link')
|
159 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
160 |
|
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
|
|
70 |
franceconnect.handle_authorization(app, response.location, status=302)
|
161 |
71 |
assert User.objects.count() == 0
|
162 |
|
fc_settings.A2_FC_CREATE = True
|
163 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
164 |
|
response = app.get(callback + '?service=portail&next=/idp/&code=zzz&state=%s' % state, status=302)
|
165 |
|
if exp < now():
|
166 |
|
assert User.objects.count() == 0
|
167 |
|
else:
|
168 |
|
assert User.objects.count() == 1
|
169 |
|
if User.objects.count():
|
170 |
|
user = User.objects.get()
|
171 |
|
assert user.verified_attributes.first_name == u'Ÿuñe'
|
172 |
|
assert user.verified_attributes.last_name == u'Frédérique'
|
173 |
|
assert path(response['Location']) == '/idp/'
|
174 |
|
assert hooks.event[1]['kwargs']['name'] == 'login'
|
175 |
|
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
176 |
|
# we must be connected
|
177 |
|
assert app.session['_auth_user_id']
|
178 |
|
assert app.session.get_expire_at_browser_close()
|
179 |
|
assert models.FcAccount.objects.count() == 1
|
180 |
|
|
181 |
|
# test unlink cancel case
|
182 |
|
response = app.get('/accounts/')
|
183 |
|
response = response.click('Delete link')
|
184 |
|
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
|
185 |
|
response = response.form.submit(name='cancel')
|
186 |
|
response = response.follow()
|
187 |
|
|
188 |
|
# test unlink submit case
|
189 |
|
response = app.get('/accounts/')
|
190 |
|
response = response.click('Delete link')
|
191 |
|
response.form.set('new_password1', 'ikKL1234')
|
192 |
|
response.form.set('new_password2', 'ikKL1234')
|
193 |
|
response = response.form.submit(name='unlink')
|
194 |
|
assert 'The link with the FranceConnect account has been deleted' in response.text
|
195 |
|
assert models.FcAccount.objects.count() == 0
|
196 |
|
continue_url = response.pyquery('a#a2-continue').attr['href']
|
197 |
|
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
198 |
|
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
199 |
|
response = app.get(reverse('fc-logout') + '?state=' + state)
|
200 |
|
assert path(response['Location']) == '/accounts/'
|
201 |
|
|
202 |
|
|
203 |
|
def test_login_email_is_unique(app, fc_settings, caplog):
|
204 |
|
callback = reverse('fc-login-or-link')
|
205 |
|
response = app.get(callback, status=302)
|
206 |
|
location = response['Location']
|
207 |
|
state = check_authorization_url(location)
|
208 |
|
|
209 |
|
@httmock.urlmatch(path=r'.*/token$')
|
210 |
|
def access_token_response(url, request):
|
211 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
212 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
213 |
|
'grant_type'])
|
214 |
|
assert parsed['code'] == 'zzz'
|
215 |
|
assert parsed['client_id'] == 'xxx'
|
216 |
|
assert parsed['client_secret'] == 'yyy'
|
217 |
|
assert parsed['grant_type'] == 'authorization_code'
|
218 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
219 |
|
parsed_callback = urlparse.urlparse(callback)
|
220 |
|
assert parsed_redirect.path == parsed_callback.path
|
221 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
222 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
223 |
|
exp = now() + datetime.timedelta(seconds=1000)
|
224 |
|
id_token = {
|
225 |
|
'sub': '1234',
|
226 |
|
'aud': 'xxx',
|
227 |
|
'nonce': state,
|
228 |
|
'exp': int(exp.timestamp()),
|
229 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
230 |
|
}
|
231 |
|
return json.dumps({
|
232 |
|
'access_token': 'uuu',
|
233 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
234 |
|
})
|
235 |
|
|
236 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
237 |
|
def user_info_response(url, request):
|
238 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
239 |
|
return json.dumps({
|
240 |
|
'sub': '1234',
|
241 |
|
'family_name': u'Frédérique',
|
242 |
|
'given_name': u'Ÿuñe',
|
243 |
|
'email': 'jOhn.dOe@eXample.com',
|
244 |
|
})
|
245 |
72 |
|
246 |
|
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
|
247 |
|
user.set_password('toto')
|
248 |
|
user.save()
|
249 |
|
fc_settings.A2_EMAIL_IS_UNIQUE = True
|
250 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
251 |
|
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
|
|
73 |
|
|
74 |
def test_create(settings, app, franceconnect, hooks):
|
|
75 |
# test direct creation
|
|
76 |
settings.A2_FC_CREATE = True
|
|
77 |
|
|
78 |
response = app.get('/login/?service=portail&next=/idp/')
|
|
79 |
response = response.click(href='callback')
|
|
80 |
|
|
81 |
assert User.objects.count() == 0
|
|
82 |
response = franceconnect.handle_authorization(app, response.location, status=302)
|
252 |
83 |
assert User.objects.count() == 1
|
|
84 |
|
|
85 |
user = User.objects.get()
|
|
86 |
assert user.verified_attributes.first_name == 'Ÿuñe'
|
|
87 |
assert user.verified_attributes.last_name == 'Frédérique'
|
|
88 |
assert path(response['Location']) == '/idp/'
|
|
89 |
assert hooks.event[1]['kwargs']['name'] == 'login'
|
|
90 |
assert hooks.event[1]['kwargs']['service'] == 'portail'
|
|
91 |
# we must be connected
|
253 |
92 |
assert app.session['_auth_user_id']
|
|
93 |
assert app.session.get_expire_at_browser_close()
|
|
94 |
assert models.FcAccount.objects.count() == 1
|
254 |
95 |
|
255 |
|
# logout, test unlinking when logging with password
|
256 |
|
app.session.flush()
|
257 |
|
response = app.get('/login/')
|
258 |
|
response.form.set('username', User.objects.get().email)
|
259 |
|
response.form.set('password', 'toto')
|
260 |
|
response = response.form.submit(name='login-password-submit').follow()
|
|
96 |
# test unlink cancel case
|
|
97 |
response = app.get('/accounts/')
|
|
98 |
response = response.click('Delete link')
|
|
99 |
assert len(response.pyquery('[name=cancel][formnovalidate]')) == 1
|
|
100 |
response = response.form.submit(name='cancel')
|
|
101 |
response = response.follow()
|
261 |
102 |
|
|
103 |
# test unlink submit case
|
262 |
104 |
response = app.get('/accounts/')
|
263 |
105 |
response = response.click('Delete link')
|
|
106 |
response.form.set('new_password1', 'ikKL1234')
|
|
107 |
response.form.set('new_password2', 'ikKL1234')
|
|
108 |
response = response.form.submit(name='unlink')
|
|
109 |
assert 'The link with the FranceConnect account has been deleted' in response.text
|
|
110 |
assert models.FcAccount.objects.count() == 0
|
|
111 |
continue_url = response.pyquery('a#a2-continue').attr['href']
|
|
112 |
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
|
113 |
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
|
114 |
response = app.get(reverse('fc-logout') + '?state=' + state)
|
|
115 |
assert path(response['Location']) == '/accounts/'
|
|
116 |
|
|
117 |
|
|
118 |
def test_create_expired(settings, app, franceconnect, hooks):
|
|
119 |
# test direct creation failure on an expired id_token
|
|
120 |
settings.A2_FC_CREATE = True
|
|
121 |
franceconnect.exp = now() - datetime.timedelta(seconds=30)
|
|
122 |
|
|
123 |
response = app.get('/login/?service=portail&next=/idp/')
|
|
124 |
response = response.click(href='callback')
|
|
125 |
|
|
126 |
assert User.objects.count() == 0
|
|
127 |
response = franceconnect.handle_authorization(app, response.location, status=302)
|
|
128 |
assert User.objects.count() == 0
|
|
129 |
|
|
130 |
|
|
131 |
def test_login_email_is_unique(settings, app, franceconnect, caplog):
|
|
132 |
settings.A2_EMAIL_IS_UNIQUE = True
|
|
133 |
user = User(
|
|
134 |
email='john.doe@example.com',
|
|
135 |
first_name='John',
|
|
136 |
last_name='Doe')
|
|
137 |
user.set_password('toto')
|
|
138 |
user.save()
|
|
139 |
franceconnect.user_info['email'] = user.email
|
|
140 |
|
|
141 |
assert User.objects.count() == 1
|
|
142 |
franceconnect.login_with_fc_fixed_params(app)
|
|
143 |
assert User.objects.count() == 1
|
|
144 |
assert app.session['_auth_user_id'] == str(user.pk)
|
|
145 |
|
|
146 |
|
|
147 |
def test_unlink_after_login_with_password(app, franceconnect, simple_user):
|
|
148 |
models.FcAccount.objects.create(user=simple_user, user_info='{}')
|
|
149 |
|
|
150 |
response = login(app, simple_user, path='/accounts/')
|
|
151 |
response = response.click('Delete link')
|
264 |
152 |
assert 'new_password1' not in response.form.fields
|
265 |
153 |
response = response.form.submit(name='unlink').follow()
|
266 |
154 |
assert 'The link with the FranceConnect account has been deleted' in response.text
|
|
155 |
# no logout from FC since we are not logged to it
|
267 |
156 |
assert response.request.path == '/accounts/'
|
268 |
157 |
|
269 |
158 |
|
270 |
|
def test_login_email_is_unique_and_already_linked(app, fc_settings, caplog):
|
271 |
|
callback = reverse('fc-login-or-link')
|
272 |
|
response = app.get(callback, status=302)
|
273 |
|
location = response['Location']
|
274 |
|
state = check_authorization_url(location)
|
|
159 |
def test_unlink_after_login_with_fc(app, franceconnect, simple_user):
|
|
160 |
models.FcAccount.objects.create(user=simple_user, sub=franceconnect.sub, user_info='{}')
|
275 |
161 |
|
276 |
|
EMAIL = 'john.doe@example.com'
|
277 |
|
SUB = '1234'
|
278 |
|
user = User.objects.create(email=EMAIL, first_name='John', last_name='Doe')
|
279 |
|
models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')
|
|
162 |
response = franceconnect.login_with_fc(app, path='/accounts/')
|
|
163 |
response = response.click('Delete link')
|
|
164 |
response.form.set('new_password1', 'ikKL1234')
|
|
165 |
response.form.set('new_password2', 'ikKL1234')
|
|
166 |
response = response.form.submit(name='unlink')
|
|
167 |
assert 'The link with the FranceConnect account has been deleted' in response.text
|
|
168 |
assert models.FcAccount.objects.count() == 0
|
|
169 |
continue_url = response.pyquery('a#a2-continue').attr['href']
|
|
170 |
response = franceconnect.handle_logout(app, continue_url)
|
|
171 |
assert path(response.location) == '/accounts/'
|
280 |
172 |
|
281 |
|
@httmock.urlmatch(path=r'.*/token$')
|
282 |
|
def access_token_response(url, request):
|
283 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
284 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
285 |
|
'grant_type'])
|
286 |
|
assert parsed['code'] == 'zzz'
|
287 |
|
assert parsed['client_id'] == 'xxx'
|
288 |
|
assert parsed['client_secret'] == 'yyy'
|
289 |
|
assert parsed['grant_type'] == 'authorization_code'
|
290 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
291 |
|
parsed_callback = urlparse.urlparse(callback)
|
292 |
|
assert parsed_redirect.path == parsed_callback.path
|
293 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
294 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
295 |
|
exp = now() + datetime.timedelta(seconds=1000)
|
296 |
|
id_token = {
|
297 |
|
'sub': SUB,
|
298 |
|
'aud': 'xxx',
|
299 |
|
'nonce': state,
|
300 |
|
'exp': int(exp.timestamp()),
|
301 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
302 |
|
}
|
303 |
|
return json.dumps({
|
304 |
|
'access_token': 'uuu',
|
305 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
306 |
|
})
|
307 |
|
|
308 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
309 |
|
def user_info_response(url, request):
|
310 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
311 |
|
return json.dumps({
|
312 |
|
'sub': '1234',
|
313 |
|
'family_name': u'Frédérique',
|
314 |
|
'given_name': u'Ÿuñe',
|
315 |
|
'email': EMAIL,
|
316 |
|
})
|
317 |
|
|
318 |
|
fc_settings.A2_EMAIL_IS_UNIQUE = True
|
319 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
320 |
|
response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
|
321 |
|
assert 'is already used' in str(response)
|
322 |
|
assert User.objects.count() == 1
|
|
173 |
|
|
174 |
def test_login_email_is_unique_and_already_linked(settings, app, franceconnect, caplog):
|
|
175 |
settings.A2_EMAIL_IS_UNIQUE = True
|
|
176 |
|
|
177 |
# setup an already linked user account
|
|
178 |
user = User.objects.create(email='john.doe@example.com', first_name='John', last_name='Doe')
|
|
179 |
models.FcAccount.objects.create(user=user, sub='4567', token='xxx', user_info='{}')
|
|
180 |
response = app.get('/login/?service=portail&next=/idp/')
|
|
181 |
response = response.click(href='callback')
|
|
182 |
response = franceconnect.handle_authorization(app, response.location, status=302)
|
|
183 |
assert models.FcAccount.objects.count() == 1
|
|
184 |
assert 'is already used' in app.cookies['messages']
|
323 |
185 |
assert '_auth_user_id' not in app.session
|
324 |
186 |
|
325 |
187 |
|
326 |
|
def test_requests_proxies_support(app, fc_settings, caplog):
|
|
188 |
def test_requests_proxies_support(settings, app):
|
327 |
189 |
session = requests_retry_session()
|
328 |
190 |
assert session.proxies == {}
|
329 |
191 |
other_session = requests.Session()
|
... | ... | |
331 |
193 |
session = requests_retry_session(session=other_session)
|
332 |
194 |
assert session is other_session
|
333 |
195 |
assert session.proxies == {'http': 'http://example.net'}
|
334 |
|
fc_settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
|
|
196 |
|
|
197 |
settings.REQUESTS_PROXIES = {'https': 'http://pubproxy.com/api/proxy'}
|
335 |
198 |
session = requests_retry_session()
|
336 |
199 |
assert session.proxies == {'https': 'http://pubproxy.com/api/proxy'}
|
337 |
200 |
|
... | ... | |
341 |
204 |
assert mocked_send.call_args[1]['proxies'] == {'https': 'http://pubproxy.com/api/proxy'}
|
342 |
205 |
|
343 |
206 |
|
344 |
|
def test_password_reset(app, mailoutbox):
|
|
207 |
def test_no_password_with_fc_account_can_reset_password(app, db, mailoutbox):
|
345 |
208 |
user = User.objects.create(email='john.doe@example.com')
|
|
209 |
# No FC account, forbidden to set a password
|
346 |
210 |
response = app.get('/login/')
|
347 |
211 |
response = response.click('Reset it!').maybe_follow()
|
348 |
212 |
response.form['email'] = user.email
|
349 |
213 |
assert len(mailoutbox) == 0
|
350 |
214 |
response = response.form.submit()
|
351 |
215 |
assert len(mailoutbox) == 1
|
352 |
|
url = get_links_from_mail(mailoutbox[0])[0]
|
|
216 |
url = get_link_from_mail(mailoutbox[0])
|
|
217 |
response = app.get(url).follow().follow()
|
|
218 |
assert '_auth_user_id' not in app.session
|
|
219 |
assert 'not possible to reset' in response
|
|
220 |
|
|
221 |
# With FC account, can set a password
|
353 |
222 |
models.FcAccount.objects.create(user=user, sub='xxx', token='aaa')
|
354 |
|
response = app.get(url).maybe_follow()
|
355 |
|
assert 'new_password1' in response.form.fields
|
|
223 |
response = app.get('/login/')
|
|
224 |
response = response.click('Reset it!').maybe_follow()
|
|
225 |
response.form['email'] = user.email
|
|
226 |
assert len(mailoutbox) == 1
|
|
227 |
response = response.form.submit()
|
|
228 |
assert len(mailoutbox) == 2
|
|
229 |
url = get_link_from_mail(mailoutbox[1])
|
|
230 |
response = app.get(url, status=200)
|
|
231 |
response.form.set('new_password1', 'ikKL1234')
|
|
232 |
response.form.set('new_password2', 'ikKL1234')
|
|
233 |
response = response.form.submit().follow()
|
|
234 |
assert '_auth_user_id' in app.session
|
356 |
235 |
|
357 |
236 |
|
358 |
|
def test_registration1(app, fc_settings, caplog, hooks):
|
359 |
|
exp = now() + datetime.timedelta(seconds=1000)
|
360 |
|
response = app.get('/login/?service=portail&next=/idp/')
|
361 |
|
response = response.click(href="callback")
|
362 |
|
# 1. Try a login
|
363 |
|
# 2. Verify we come back to login page
|
364 |
|
# 3. Check presence of registration link
|
365 |
|
# 4. Follow it
|
366 |
|
location = response['Location']
|
367 |
|
state = check_authorization_url(location)
|
368 |
|
|
369 |
|
@httmock.urlmatch(path=r'.*/token$')
|
370 |
|
def access_token_response(url, request):
|
371 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
372 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
373 |
|
'grant_type'])
|
374 |
|
assert parsed['code'] == 'zzz'
|
375 |
|
assert parsed['client_id'] == 'xxx'
|
376 |
|
assert parsed['client_secret'] == 'yyy'
|
377 |
|
assert parsed['grant_type'] == 'authorization_code'
|
378 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
379 |
|
parsed_callback = urlparse.urlparse(callback)
|
380 |
|
assert parsed_redirect.path == parsed_callback.path
|
381 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
382 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
383 |
|
id_token = {
|
384 |
|
'sub': '1234',
|
385 |
|
'aud': 'xxx',
|
386 |
|
'nonce': state,
|
387 |
|
'exp': int(exp.timestamp()),
|
388 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
389 |
|
'email': 'john.doe@example.com',
|
390 |
|
}
|
391 |
|
return json.dumps({
|
392 |
|
'access_token': 'uuu',
|
393 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
394 |
|
})
|
395 |
|
|
396 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
397 |
|
def user_info_response(url, request):
|
398 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
399 |
|
return json.dumps({
|
400 |
|
'sub': '1234',
|
401 |
|
'family_name': u'Frédérique',
|
402 |
|
'given_name': u'Ÿuñe',
|
403 |
|
'email': 'john.doe@example.com',
|
404 |
|
})
|
405 |
|
|
406 |
|
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
407 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
408 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
|
237 |
def test_registration1(settings, app, franceconnect, caplog, hooks):
|
|
238 |
response = franceconnect.login_with_fc_fixed_params(app)
|
409 |
239 |
assert User.objects.count() == 0
|
410 |
|
assert path(response['Location']) == '/login/'
|
|
240 |
assert path(response.location) == '/login/'
|
411 |
241 |
response = response.follow()
|
412 |
242 |
response = response.click(href='/accounts/fc/register')
|
413 |
|
location = response['Location']
|
414 |
|
location.startswith('http://testserver/accounts/activate/')
|
|
243 |
response.location.startswith('http://testserver/accounts/activate/')
|
|
244 |
assert User.objects.count() == 0
|
415 |
245 |
response = response.follow()
|
416 |
|
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
|
246 |
assert response.location.startswith('/fc/callback/')
|
|
247 |
# a new user has been created
|
|
248 |
assert User.objects.count() == 1
|
|
249 |
# but no FcAccount
|
|
250 |
assert models.FcAccount.objects.count() == 0
|
417 |
251 |
# we must be connected
|
418 |
252 |
assert app.session['_auth_user_id']
|
419 |
|
parsed_location = urlparse.urlparse(response['Location'])
|
420 |
|
parsed_callback = urlparse.urlparse(callback)
|
421 |
|
assert parsed_location.path == parsed_callback.path
|
422 |
|
assert (urlparse.parse_qs(parsed_location.query) ==
|
423 |
|
urlparse.parse_qs(parsed_callback.query))
|
|
253 |
# hook must have been called
|
|
254 |
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
|
255 |
|
424 |
256 |
response = response.follow()
|
425 |
|
location = response['Location']
|
426 |
|
state = check_authorization_url(location)
|
427 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
428 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
|
257 |
# a new redirect to FC is done
|
|
258 |
response = franceconnect.handle_authorization(app, response.location)
|
|
259 |
|
|
260 |
# FcAccount now exists
|
429 |
261 |
assert models.FcAccount.objects.count() == 1
|
430 |
262 |
user = User.objects.get()
|
431 |
|
assert user.verified_attributes.first_name == u'Ÿuñe'
|
432 |
|
assert user.verified_attributes.last_name == u'Frédérique'
|
|
263 |
assert user.verified_attributes.first_name == 'Ÿuñe'
|
|
264 |
assert user.verified_attributes.last_name == 'Frédérique'
|
|
265 |
|
433 |
266 |
response = app.get('/accounts/')
|
434 |
267 |
response = response.click('Delete link')
|
435 |
268 |
response.form.set('new_password1', 'ikKL1234')
|
... | ... | |
438 |
271 |
assert 'The link with the FranceConnect account has been deleted' in response.text
|
439 |
272 |
assert models.FcAccount.objects.count() == 0
|
440 |
273 |
continue_url = response.pyquery('a#a2-continue').attr['href']
|
441 |
|
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
442 |
|
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
443 |
|
response = app.get(reverse('fc-logout') + '?state=' + state)
|
444 |
|
assert path(response['Location']) == '/accounts/'
|
|
274 |
response = franceconnect.handle_logout(app, continue_url)
|
|
275 |
assert path(response.location) == '/accounts/'
|
445 |
276 |
|
446 |
277 |
|
447 |
|
def test_registration2(app, fc_settings, caplog, hooks):
|
448 |
|
exp = now() + datetime.timedelta(seconds=1000)
|
|
278 |
def test_registration2(settings, app, franceconnect, hooks):
|
449 |
279 |
response = app.get('/login/?service=portail&next=/idp/')
|
450 |
280 |
response = response.click("Register")
|
451 |
281 |
response = response.click(href='callback')
|
452 |
|
# 1. Try a login
|
453 |
|
# 2. Verify we come back to login page
|
454 |
|
# 3. Check presence of registration link
|
455 |
|
# 4. Follow it
|
456 |
|
location = response['Location']
|
457 |
|
state = check_authorization_url(location)
|
458 |
|
|
459 |
|
@httmock.urlmatch(path=r'.*/token$')
|
460 |
|
def access_token_response(url, request):
|
461 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
462 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
463 |
|
'grant_type'])
|
464 |
|
assert parsed['code'] == 'zzz'
|
465 |
|
assert parsed['client_id'] == 'xxx'
|
466 |
|
assert parsed['client_secret'] == 'yyy'
|
467 |
|
assert parsed['grant_type'] == 'authorization_code'
|
468 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
469 |
|
parsed_callback = urlparse.urlparse(callback)
|
470 |
|
assert parsed_redirect.path == parsed_callback.path
|
471 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
472 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
473 |
|
id_token = {
|
474 |
|
'sub': '1234',
|
475 |
|
'aud': 'xxx',
|
476 |
|
'nonce': state,
|
477 |
|
'exp': int(exp.timestamp()),
|
478 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
479 |
|
'email': 'john.doe@example.com',
|
480 |
|
}
|
481 |
|
return json.dumps({
|
482 |
|
'access_token': 'uuu',
|
483 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
484 |
|
})
|
485 |
|
|
486 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
487 |
|
def user_info_response(url, request):
|
488 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
489 |
|
return json.dumps({
|
490 |
|
'sub': '1234',
|
491 |
|
'family_name': u'Frédérique',
|
492 |
|
'given_name': u'Ÿuñe',
|
493 |
|
'email': 'john.doe@example.com',
|
494 |
|
})
|
495 |
|
|
496 |
|
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
497 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
498 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
|
282 |
franceconnect.callback_params['registration'] = ''
|
|
283 |
response = franceconnect.handle_authorization(app, response.location)
|
|
284 |
|
499 |
285 |
assert User.objects.count() == 0
|
500 |
|
assert path(response['Location']) == '/accounts/fc/register/'
|
|
286 |
assert path(response.location) == '/accounts/fc/register/'
|
501 |
287 |
response = response.follow()
|
502 |
|
location = response['Location']
|
503 |
|
location.startswith('http://testserver/accounts/activate/')
|
|
288 |
response.location.startswith('http://testserver/accounts/activate/')
|
504 |
289 |
response = response.follow()
|
|
290 |
assert User.objects.count() == 1
|
|
291 |
user = User.objects.get()
|
|
292 |
assert user.verified_attributes.first_name is None
|
|
293 |
assert user.verified_attributes.last_name is None
|
505 |
294 |
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
506 |
295 |
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
|
507 |
296 |
# we must be connected
|
508 |
297 |
assert app.session['_auth_user_id']
|
509 |
|
# remove the registration parameter
|
510 |
|
callback = callback.replace('®istration=', '')
|
511 |
|
callback = callback.replace('?registration=', '?')
|
512 |
|
callback = callback.replace('?&', '?')
|
513 |
|
parsed_location = urlparse.urlparse(response['Location'])
|
514 |
|
parsed_callback = urlparse.urlparse(callback)
|
515 |
|
assert parsed_location.path == parsed_callback.path
|
516 |
|
assert (urlparse.parse_qs(parsed_location.query) ==
|
517 |
|
urlparse.parse_qs(parsed_callback.query))
|
518 |
298 |
response = response.follow()
|
519 |
|
location = response['Location']
|
520 |
|
state = check_authorization_url(location)
|
521 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
522 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
523 |
|
assert models.FcAccount.objects.count() == 1
|
|
299 |
|
|
300 |
del franceconnect.callback_params['registration']
|
|
301 |
response = franceconnect.handle_authorization(app, response.location)
|
524 |
302 |
user = User.objects.get()
|
525 |
|
assert user.verified_attributes.first_name == u'Ÿuñe'
|
526 |
|
assert user.verified_attributes.last_name == u'Frédérique'
|
527 |
|
response = app.get('/accounts/')
|
528 |
|
response = response.click('Delete link')
|
529 |
|
response.form.set('new_password1', 'ikKL1234')
|
530 |
|
response.form.set('new_password2', 'ikKL1234')
|
531 |
|
response = response.form.submit(name='unlink')
|
532 |
|
assert 'The link with the FranceConnect account has been deleted' in response.text
|
533 |
|
assert models.FcAccount.objects.count() == 0
|
534 |
|
continue_url = response.pyquery('a#a2-continue').attr['href']
|
535 |
|
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
536 |
|
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
537 |
|
response = app.get(reverse('fc-logout') + '?state=' + state)
|
538 |
|
assert path(response['Location']) == '/accounts/'
|
|
303 |
assert user.verified_attributes.first_name == 'Ÿuñe'
|
|
304 |
assert user.verified_attributes.last_name == 'Frédérique'
|
539 |
305 |
|
540 |
306 |
|
541 |
|
def test_can_change_password(app, fc_settings, caplog, hooks):
|
542 |
|
exp = now() + datetime.timedelta(seconds=1000)
|
543 |
|
response = app.get('/login/?service=portail&next=/idp/')
|
544 |
|
response = response.click("Register")
|
545 |
|
response = response.click(href='callback')
|
546 |
|
# 1. Try a login
|
547 |
|
# 2. Verify we come back to login page
|
548 |
|
# 3. Check presence of registration link
|
549 |
|
# 4. Follow it
|
550 |
|
location = response['Location']
|
551 |
|
state = check_authorization_url(location)
|
552 |
|
|
553 |
|
@httmock.urlmatch(path=r'.*/token$')
|
554 |
|
def access_token_response(url, request):
|
555 |
|
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
|
556 |
|
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
|
557 |
|
'grant_type'])
|
558 |
|
assert parsed['code'] == 'zzz'
|
559 |
|
assert parsed['client_id'] == 'xxx'
|
560 |
|
assert parsed['client_secret'] == 'yyy'
|
561 |
|
assert parsed['grant_type'] == 'authorization_code'
|
562 |
|
parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
|
563 |
|
parsed_callback = urlparse.urlparse(callback)
|
564 |
|
assert parsed_redirect.path == parsed_callback.path
|
565 |
|
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
|
566 |
|
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
|
567 |
|
id_token = {
|
568 |
|
'sub': '1234',
|
569 |
|
'aud': 'xxx',
|
570 |
|
'nonce': state,
|
571 |
|
'exp': int(exp.timestamp()),
|
572 |
|
'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
|
573 |
|
'email': 'john.doe@example.com',
|
574 |
|
}
|
575 |
|
return json.dumps({
|
576 |
|
'access_token': 'uuu',
|
577 |
|
'id_token': hmac_jwt(id_token, 'yyy')
|
578 |
|
})
|
579 |
|
|
580 |
|
@httmock.urlmatch(path=r'.*userinfo$')
|
581 |
|
def user_info_response(url, request):
|
582 |
|
assert request.headers['Authorization'] == 'Bearer uuu'
|
583 |
|
return json.dumps({
|
584 |
|
'sub': '1234',
|
585 |
|
'family_name': u'Frédérique',
|
586 |
|
'given_name': u'Ÿuñe',
|
587 |
|
'email': 'john.doe@example.com',
|
588 |
|
})
|
589 |
|
|
590 |
|
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
591 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
592 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
593 |
|
assert User.objects.count() == 0
|
594 |
|
assert path(response['Location']) == '/accounts/fc/register/'
|
595 |
|
response = response.follow()
|
596 |
|
location = response['Location']
|
597 |
|
location.startswith('http://testserver/accounts/activate/')
|
598 |
|
response = response.follow()
|
599 |
|
assert hooks.calls['event'][0]['kwargs']['service'] == 'portail'
|
600 |
|
assert hooks.calls['event'][1]['kwargs']['service'] == 'portail'
|
601 |
|
# we must be connected
|
602 |
|
assert app.session['_auth_user_id']
|
603 |
|
# remove the registration parameter
|
604 |
|
callback = callback.replace('&registration=', '')
|
605 |
|
callback = callback.replace('?registration=', '?')
|
606 |
|
callback = callback.replace('?&', '?')
|
607 |
|
parsed_location = urlparse.urlparse(response['Location'])
|
608 |
|
parsed_callback = urlparse.urlparse(callback)
|
609 |
|
assert parsed_location.path == parsed_callback.path
|
610 |
|
assert (urlparse.parse_qs(parsed_location.query) ==
|
611 |
|
urlparse.parse_qs(parsed_callback.query))
|
612 |
|
response = response.follow()
|
613 |
|
location = response['Location']
|
614 |
|
state = check_authorization_url(location)
|
615 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
616 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
617 |
|
assert models.FcAccount.objects.count() == 1
|
618 |
|
user = User.objects.get()
|
619 |
|
assert user.verified_attributes.first_name == u'Ÿuñe'
|
620 |
|
assert user.verified_attributes.last_name == u'Frédérique'
|
621 |
|
response = app.get('/accounts/')
|
|
307 |
def test_can_change_password(settings, app, franceconnect):
|
|
308 |
user = User.objects.create(email='john.doe@example.com')
|
|
309 |
models.FcAccount.objects.create(user=user, sub=franceconnect.sub)
|
|
310 |
|
|
311 |
response = franceconnect.login_with_fc(app, path='/accounts/')
|
622 |
312 |
assert len(response.pyquery('[href*="password/change"]')) == 0
|
|
313 |
response = response.click('Logout')
|
|
314 |
response = franceconnect.handle_logout(app, response.location).follow()
|
|
315 |
assert '_auth_user_id' not in app.session
|
623 |
316 |
|
624 |
317 |
# Login with password
|
625 |
|
user = User.objects.get()
|
|
318 |
user.username = 'test'
|
626 |
319 |
user.set_password('test')
|
627 |
320 |
user.save()
|
628 |
|
app.session.flush()
|
629 |
|
response = app.get('/login/')
|
630 |
|
response.form.set('username', User.objects.get().email)
|
631 |
|
response.form.set('password', 'test')
|
632 |
|
response = response.form.submit(name='login-password-submit').follow()
|
633 |
|
response = app.get('/accounts/')
|
|
321 |
|
|
322 |
response = login(app, user, path='/accounts/')
|
634 |
323 |
assert len(response.pyquery('[href*="password/change"]')) > 0
|
|
324 |
response = response.click('Logout').follow()
|
635 |
325 |
|
636 |
326 |
# Relogin with FC
|
637 |
|
app.session.flush()
|
638 |
|
response = app.get('/login/?service=portail&next=/accounts/')
|
639 |
|
response = response.click(href='callback')
|
640 |
|
location = response['Location']
|
641 |
|
state = check_authorization_url(location)
|
642 |
|
callback = urlparse.parse_qs(urlparse.urlparse(location).query)['redirect_uri'][0]
|
643 |
|
with httmock.HTTMock(access_token_response, user_info_response):
|
644 |
|
response = app.get(callback + '&code=zzz&state=%s' % state, status=302)
|
645 |
|
# we must be connected
|
646 |
|
assert app.session['_auth_user_id']
|
647 |
|
assert path(response['Location']) == '/accounts/'
|
648 |
|
response = response.follow()
|
|
327 |
response = franceconnect.login_with_fc(app, path='/accounts/')
|
649 |
328 |
assert len(response.pyquery('[href*="password/change"]')) == 0
|
650 |
329 |
|
651 |
330 |
# Unlink
|
... | ... | |
653 |
332 |
response.form.set('new_password1', 'ikKL1234')
|
654 |
333 |
response.form.set('new_password2', 'ikKL1234')
|
655 |
334 |
response = response.form.submit(name='unlink')
|
|
335 |
assert 'The link with the FranceConnect account has been deleted' in response.text
|
656 |
336 |
continue_url = response.pyquery('a#a2-continue').attr['href']
|
657 |
|
state = urlparse.parse_qs(urlparse.urlparse(continue_url).query)['state'][0]
|
658 |
|
assert app.session['fc_states'][state]['next'] == '/accounts/'
|
659 |
|
response = app.get(reverse('fc-logout') + '?state=' + state)
|
660 |
|
assert path(response['Location']) == '/accounts/'
|
661 |
|
response = response.follow()
|
|
337 |
response = franceconnect.handle_logout(app, continue_url).follow()
|
662 |
338 |
assert len(response.pyquery('[href*="password/change"]')) > 0
|
663 |
339 |
|
664 |
340 |
|
665 |
|
def test_invalid_next_url(app, fc_settings, caplog, hooks):
|
|
341 |
def test_invalid_next_url(app, franceconnect):
|
666 |
342 |
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
|
667 |
343 |
|
668 |
344 |
|
669 |
|
def test_manager_user_sidebar(app, fc_settings, superuser, simple_user):
|
|
345 |
def test_manager_user_sidebar(app, superuser, simple_user):
|
670 |
346 |
login(app, superuser, '/manage/')
|
671 |
347 |
response = app.get('/manage/users/%s/' % simple_user.id)
|
672 |
348 |
assert 'FranceConnect' not in response
|