0002-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch
src/authentic2_auth_fc/views.py | ||
---|---|---|
352 | 352 |
def users_having_email(self, email): |
353 | 353 |
default_ou = get_default_ou() |
354 | 354 |
User = get_user_model() |
355 | 355 |
qs = User.objects.filter(email__iexact=email) |
356 | 356 |
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique: |
357 | 357 |
qs = qs.filter(ou=default_ou) |
358 | 358 |
return qs |
359 | 359 | |
360 |
def update_user_info(self): |
|
360 |
def update_user_info(self, request):
|
|
361 | 361 |
self.fc_account.token = json.dumps(self.token) |
362 | 362 |
self.fc_account.user_info = json.dumps(self.user_info) |
363 | 363 |
self.fc_account.save(update_fields=['token', 'user_info']) |
364 | ||
365 |
# do not update email if it is not unique |
|
366 |
if self.email_is_unique(): |
|
367 |
mappings = app_settings.user_info_mappings |
|
368 |
mapping = {'ref': mappings['email']} |
|
369 |
email = '' |
|
370 |
try: |
|
371 |
email = utils.mapping_to_value(mapping, self.user_info) |
|
372 |
except (ValueError, KeyError, NotImplementedError): |
|
373 |
pass |
|
374 |
if email: |
|
375 |
qs = self.users_having_email(email) |
|
376 |
qs = qs.exclude(uuid=self.fc_account.user.uuid) |
|
377 |
if qs.exists(): |
|
378 |
# there should not be other accounts with the same mail |
|
379 |
self.logger.error(u'other account already use the same mail %s, %s', |
|
380 |
email, list(qs)) |
|
381 |
messages.warning(request, _( |
|
382 |
'Your FranceConnect email address \'%s\' is already used by another ' |
|
383 |
'account, so we cannot update your account for you. Please ensure your ' |
|
384 |
'account email is still up to date using your account management page.' |
|
385 |
) % email) |
|
386 |
del self.user_info[mapping['ref']] |
|
387 | ||
364 | 388 |
utils.apply_user_info_mappings(self.fc_account.user, self.user_info) |
365 | 389 |
self.logger.debug('updating user_info %s', self.fc_account.user_info) |
366 | 390 | |
367 | 391 |
def uniqueness_check_failed(self, request): |
368 | 392 |
if request.user.is_authenticated(): |
369 | 393 |
# currently logged : |
370 | 394 |
if models.FcAccount.objects.filter(user=request.user, order=0).count(): |
371 | 395 |
# cannot link because we are already linked to another FC account |
... | ... | |
397 | 421 | |
398 | 422 |
if created: |
399 | 423 |
self.logger.info('fc link created sub %s', self.sub) |
400 | 424 |
messages.info(request, |
401 | 425 |
_('Your FranceConnect account {} has been linked.').format(self.fc_display_name)) |
402 | 426 |
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request) |
403 | 427 |
else: |
404 | 428 |
messages.info(request, _('Your local account has been updated.')) |
405 |
self.update_user_info() |
|
429 |
self.update_user_info(request)
|
|
406 | 430 |
return self.redirect(request) |
407 | 431 | |
408 | 432 |
user = a2_utils.authenticate( |
409 | 433 |
request, |
410 | 434 |
sub=self.sub, |
411 | 435 |
user_info=self.user_info, |
412 | 436 |
token=self.token) |
413 | 437 |
if user: |
... | ... | |
453 | 477 |
if user: |
454 | 478 |
views_utils.check_cookie_works(request) |
455 | 479 |
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug) |
456 | 480 |
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE |
457 | 481 |
request.session.set_expiry(0) |
458 | 482 |
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) |
459 | 483 |
self.fc_account.token = json.dumps(self.token) |
460 | 484 |
self.fc_account.save(update_fields=['token']) |
461 |
self.update_user_info() |
|
485 |
self.update_user_info(request)
|
|
462 | 486 |
self.logger.info('logged in using fc sub %s', self.sub) |
463 | 487 |
return self.redirect(request) |
464 | 488 |
else: |
465 | 489 |
params = {} |
466 | 490 |
if self.service_slug: |
467 | 491 |
params[constants.SERVICE_FIELD_NAME] = self.service_slug |
468 | 492 |
if registration: |
469 | 493 |
return self.redirect_and_come_back(request, |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
650 | 650 |
response = app.get(reverse('fc-logout') + '?state=' + state) |
651 | 651 |
assert path(response['Location']) == '/accounts/' |
652 | 652 |
response = response.follow() |
653 | 653 |
assert len(response.pyquery('[href*="password/change"]')) > 0 |
654 | 654 | |
655 | 655 | |
656 | 656 |
def test_invalid_next_url(app, fc_settings, caplog, hooks): |
657 | 657 |
assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ' |
658 | ||
659 | ||
660 |
def test_update_user_info_having_new_email_from_fc(app, fc_settings, caplog): |
|
661 |
# 1. we have 2 accounts havinf one linked to FC |
|
662 |
# 2. mail is updated if FC return a new one, |
|
663 |
# 3. but only if the mail is not already used |
|
664 | ||
665 |
callback = reverse('fc-login-or-link') |
|
666 |
response = app.get(callback, status=302) |
|
667 |
location = response['Location'] |
|
668 |
state = check_authorization_url(location) |
|
669 | ||
670 |
EMAIL1 = 'fred@example.com' |
|
671 |
EMAIL2 = 'foo@example.com' |
|
672 |
EMAIL3 = 'john.doe@example.com' |
|
673 |
SUB = '1234' |
|
674 |
user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe') |
|
675 |
user1.save() |
|
676 |
models.FcAccount.objects.create(user=user1, sub='1234', token='xxx', user_info='{}') |
|
677 |
user2 = User.objects.create(email=EMAIL3, first_name='John', last_name='Doe') |
|
678 |
user2.save() |
|
679 | ||
680 |
@httmock.urlmatch(path=r'.*/token$') |
|
681 |
def access_token_response(url, request): |
|
682 |
parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()} |
|
683 |
assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri', |
|
684 |
'grant_type']) |
|
685 |
assert parsed['code'] == 'zzz' |
|
686 |
assert parsed['client_id'] == 'xxx' |
|
687 |
assert parsed['client_secret'] == 'yyy' |
|
688 |
assert parsed['grant_type'] == 'authorization_code' |
|
689 |
parsed_redirect = urlparse.urlparse(parsed['redirect_uri']) |
|
690 |
parsed_callback = urlparse.urlparse(callback) |
|
691 |
assert parsed_redirect.path == parsed_callback.path |
|
692 |
for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items(): |
|
693 |
urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value |
|
694 |
exp = now() + datetime.timedelta(seconds=1000) |
|
695 |
id_token = { |
|
696 |
'sub': SUB, |
|
697 |
'aud': 'xxx', |
|
698 |
'nonce': state, |
|
699 |
'exp': int(exp.timestamp()), |
|
700 |
'iss': 'https://fcp.integ01.dev-franceconnect.fr/', |
|
701 |
} |
|
702 |
return json.dumps({ |
|
703 |
'access_token': 'uuu', |
|
704 |
'id_token': hmac_jwt(id_token, 'yyy') |
|
705 |
}) |
|
706 | ||
707 |
@httmock.urlmatch(path=r'.*userinfo$') |
|
708 |
def user_info_response(url, request): |
|
709 |
assert request.headers['Authorization'] == 'Bearer uuu' |
|
710 |
return json.dumps({ |
|
711 |
'sub': SUB, |
|
712 |
'family_name': u'Frédérique', |
|
713 |
'given_name': u'Ÿuñe', |
|
714 |
'email': email, |
|
715 |
}) |
|
716 | ||
717 |
fc_settings.A2_EMAIL_IS_UNIQUE = True |
|
718 |
email = EMAIL2 |
|
719 | ||
720 |
# user not already authenticated |
|
721 |
with httmock.HTTMock(access_token_response, user_info_response): |
|
722 |
response = app.get(callback + '?code=zzz&state=%s' % state, status=302) |
|
723 |
assert User.objects.count() == 2 |
|
724 |
assert User.objects.get(last_name='Frédérique').email == 'foo@example.com' |
|
725 |
assert app.session['_auth_user_id'] |
|
726 |
response = response.follow() |
|
727 |
assert not response.html.find('li', {'class': 'warning'}) |
|
728 | ||
729 |
# user is authenticated |
|
730 |
with httmock.HTTMock(access_token_response, user_info_response): |
|
731 |
response = app.get(callback + '?code=zzz&state=%s' % state, status=302) |
|
732 |
assert User.objects.count() == 2 |
|
733 |
assert User.objects.get(last_name='Frédérique').email == 'foo@example.com' |
|
734 |
assert app.session['_auth_user_id'] |
|
735 |
response = response.follow() |
|
736 |
assert not response.html.find('li', {'class': 'warning'}) |
|
737 | ||
738 |
# user get a new mail from FC |
|
739 |
email = EMAIL3 |
|
740 |
with httmock.HTTMock(access_token_response, user_info_response): |
|
741 |
response = app.get(callback + '?code=zzz&state=%s' % state, status=302) |
|
742 |
assert User.objects.count() == 2 |
|
743 |
assert User.objects.get(last_name='Frédérique').email == 'foo@example.com' |
|
744 |
assert app.session['_auth_user_id'] |
|
745 |
response = response.follow() |
|
746 |
assert 'already used by another account' in response.html.find( |
|
747 |
'li', {'class': 'warning'}).text |
|
748 | ||
749 |
fc_settings.A2_EMAIL_IS_UNIQUE = False |
|
750 |
with httmock.HTTMock(access_token_response, user_info_response): |
|
751 |
response = app.get(callback + '?code=zzz&state=%s' % state, status=302) |
|
752 |
assert User.objects.count() == 2 |
|
753 |
assert User.objects.get(last_name='Frédérique').email == 'john.doe@example.com' |
|
754 |
assert app.session['_auth_user_id'] |
|
658 |
- |