0001-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch
src/authentic2_auth_fc/views.py | ||
---|---|---|
340 | 340 | |
341 | 341 | |
342 | 342 |
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): |
343 | 343 |
'''Login with FC, if the FC account is already linked, connect this user, |
344 | 344 |
if a user is logged in, link the user to this account, otherwise display an |
345 | 345 |
error message. |
346 | 346 |
''' |
347 | 347 | |
348 |
def update_user_info(self, request):
    '''Store the FC token and user info, then apply them to the linked user.

    When email addresses must be unique (globally, or within the default
    OU), the email resolved from the FC user info is first compared with
    the emails of the other users. If another user already has it, an
    error is logged, a warning message is queued on ``request`` and the
    email entry is removed from ``self.user_info`` before the mappings
    are applied.

    request -- the current HTTP request, used only to queue the warning.
    '''
    self.fc_account.token = json.dumps(self.token)
    self.fc_account.user_info = json.dumps(self.user_info)
    self.fc_account.save(update_fields=['token', 'user_info'])

    # do not update email if it is not unique
    default_ou = get_default_ou()
    email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
    if email_is_unique:
        mappings = app_settings.user_info_mappings
        mapping = None
        email = ''
        try:
            # a missing 'email' mapping is treated like an unresolvable
            # one: the uniqueness check is simply skipped
            mapping = {'ref': mappings['email']}
            email = utils.mapping_to_value(mapping, self.user_info)
        except (ValueError, KeyError, NotImplementedError):
            pass
        if email:
            User = get_user_model()
            qs = User.objects.filter(email__iexact=email)
            if not a2_app_settings.A2_EMAIL_IS_UNIQUE:
                # uniqueness is only required by the default OU here (we
                # are inside "if email_is_unique"), so restrict the
                # lookup to that OU
                qs = qs.filter(ou=default_ou)

            # the linked user itself may legitimately own this email
            qs = qs.exclude(uuid=self.fc_account.user.uuid)
            if qs.exists():
                # there should not be other accounts with the same mail
                self.logger.error(u'other account already use the same mail %s, %s',
                                  email, list(qs))
                messages.warning(request, _(
                    'Your FranceConnect email address \'%s\' is already used by another '
                    'account, so we cannot update your account for you. Please ensure your '
                    'account email is still up to date using your account management page.'
                ) % email)
                # drop the email from the user info handed to the mappings
                # below; the copy stored on fc_account above is untouched
                del self.user_info[mapping['ref']]

    utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
    self.logger.debug('updating user_info %s', self.fc_account.user_info)
354 | 385 | |
355 | 386 |
def uniqueness_check_failed(self, request): |
356 | 387 |
if request.user.is_authenticated(): |
357 | 388 |
# currently logged : |
358 | 389 |
if models.FcAccount.objects.filter(user=request.user, order=0).count(): |
359 | 390 |
# cannot link because we are already linked to another FC account |
... | ... | |
385 | 416 | |
386 | 417 |
if created: |
387 | 418 |
self.logger.info('fc link created sub %s', self.sub) |
388 | 419 |
messages.info(request, |
389 | 420 |
_('Your FranceConnect account {} has been linked.').format(self.fc_display_name)) |
390 | 421 |
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request) |
391 | 422 |
else: |
392 | 423 |
messages.info(request, _('Your local account has been updated.')) |
393 |
self.update_user_info() |
|
424 |
self.update_user_info(request)
|
|
394 | 425 |
return self.redirect(request) |
395 | 426 | |
396 | 427 |
default_ou = get_default_ou() |
397 | 428 |
email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique |
398 | 429 |
user = a2_utils.authenticate( |
399 | 430 |
request, |
400 | 431 |
sub=self.sub, |
401 | 432 |
user_info=self.user_info, |
... | ... | |
447 | 478 |
if user: |
448 | 479 |
views_utils.check_cookie_works(request) |
449 | 480 |
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug) |
450 | 481 |
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE |
451 | 482 |
request.session.set_expiry(0) |
452 | 483 |
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) |
453 | 484 |
self.fc_account.token = json.dumps(self.token) |
454 | 485 |
self.fc_account.save(update_fields=['token']) |
455 |
self.update_user_info() |
|
486 |
self.update_user_info(request)
|
|
456 | 487 |
self.logger.info('logged in using fc sub %s', self.sub) |
457 | 488 |
return self.redirect(request) |
458 | 489 |
else: |
459 | 490 |
params = {} |
460 | 491 |
if self.service_slug: |
461 | 492 |
params[constants.SERVICE_FIELD_NAME] = self.service_slug |
462 | 493 |
if registration: |
463 | 494 |
return self.redirect_and_come_back(request, |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
650 | 650 |
response = app.get(reverse('fc-logout') + '?state=' + state) |
651 | 651 |
assert path(response['Location']) == '/accounts/' |
652 | 652 |
response = response.follow() |
653 | 653 |
assert len(response.pyquery('[href*="password/change"]')) > 0 |
654 | 654 | |
655 | 655 | |
656 | 656 |
def test_invalid_next_url(app, fc_settings, caplog, hooks):
    '''Request the FC callback with ``next=JJJ72QQQ`` and check that the
    response location is that same value.
    '''
    response = app.get('/fc/callback/?code=coin&next=JJJ72QQQ')
    assert response.location == 'JJJ72QQQ'
658 | ||
659 | ||
660 |
def test_update_user_email_from_fc(app, fc_settings, caplog):
    '''Check how the email returned by FC is applied to the linked user.

    Three logins are replayed against mocked token and userinfo endpoints:
    a new unused email with A2_EMAIL_IS_UNIQUE enabled, an email already
    held by another user with A2_EMAIL_IS_UNIQUE enabled, then that same
    email with A2_EMAIL_IS_UNIQUE disabled.
    '''
    # 1. we have 2 accounts, one of them linked to FC
    # 2. mail is updated if FC returns a new one,
    # 3. but only if the mail is not already used

    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    location = response['Location']
    state = check_authorization_url(location)

    EMAIL1 = 'fred@example.com'
    EMAIL2 = 'foo@example.com'
    EMAIL3 = 'john.doe@example.com'
    SUB = '1234'
    # objects.create() already saves the instance
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
    models.FcAccount.objects.create(user=user1, sub=SUB, token='xxx', user_info='{}')
    User.objects.create(email=EMAIL3, first_name='John', last_name='Doe')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {
            'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type',
        }
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
        parsed_callback = urlparse.urlparse(callback)
        assert parsed_redirect.path == parsed_callback.path
        redirect_query = urlparse.parse_qs(parsed_redirect.query)
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
            # this comparison was previously evaluated without being asserted
            assert redirect_query[cb_key] == cb_value
        exp = now() + datetime.timedelta(seconds=1000)
        id_token = {
            'sub': SUB,
            'aud': 'xxx',
            'nonce': state,
            'exp': int(exp.timestamp()),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        # "email" is read from the enclosing scope at call time, so each
        # phase below picks the returned address by rebinding it
        return json.dumps({
            'sub': SUB,
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': email,
        })

    # unique emails, FC returns an unused address: the user is updated
    fc_settings.A2_EMAIL_IS_UNIQUE = True
    email = EMAIL2
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 2
    assert User.objects.get(last_name='Frédérique').email == EMAIL2
    assert app.session['_auth_user_id']
    response = response.follow()
    assert not response.html.find('li', {'class': 'warning'})

    # unique emails, FC returns the other user's address: the email is
    # kept and a warning is displayed
    email = EMAIL3
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 2
    assert User.objects.get(last_name='Frédérique').email == EMAIL2
    assert app.session['_auth_user_id']
    response = response.follow()
    assert 'already used by another account' in response.html.find(
        'li', {'class': 'warning'}).text

    # emails no longer unique, FC still returns EMAIL3: the user is updated
    fc_settings.A2_EMAIL_IS_UNIQUE = False
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 2
    assert User.objects.get(last_name='Frédérique').email == EMAIL3
    assert app.session['_auth_user_id']
|
658 |
- |