Projet

Général

Profil

0001-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch

Nicolas Roche, 28 juillet 2020 18:25

Télécharger (9,33 ko)

Voir les différences:

Subject: [PATCH] auth_fc: do not update non-uniq email returned by FC (#45199)

 src/authentic2_auth_fc/views.py | 37 ++++++++++++--
 tests/auth_fc/test_auth_fc.py   | 85 +++++++++++++++++++++++++++++++++
 2 files changed, 119 insertions(+), 3 deletions(-)
src/authentic2_auth_fc/views.py
340 340

  
341 341

  
342 342
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
343 343
    '''Login with FC, if the FC account is already linked, connect this user,
344 344
       if a user is logged link the user to this account, otherwise display an
345 345
       error message.
346 346
    '''
347 347

  
348
    def update_user_info(self):
348
    def update_user_info(self, request):
349 349
        self.fc_account.token = json.dumps(self.token)
350 350
        self.fc_account.user_info = json.dumps(self.user_info)
351 351
        self.fc_account.save(update_fields=['token', 'user_info'])
352

  
353
        # do not update email if it is not unique
354
        default_ou = get_default_ou()
355
        email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
356
        if email_is_unique:
357
            mappings = app_settings.user_info_mappings
358
            mapping = {'ref': mappings['email']}
359
            email = ''
360
            try:
361
                email = utils.mapping_to_value(mapping, self.user_info)
362
            except (ValueError, KeyError, NotImplementedError):
363
                pass
364
            if email:
365
                User = get_user_model()
366
                qs = User.objects.filter(email__iexact=email)
367
                if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
368
                    qs = qs.filter(ou=default_ou)
369

  
370
                qs = qs.exclude(uuid=self.fc_account.user.uuid)
371
                if qs.exists():
372
                # there should not be other accounts with the same mail
373
                    if self.logger:
374
                        self.logger.error(u'other account already use the same mail %s, %s',
375
                                          email, list(qs))
376
                    messages.warning(request, _(
377
                        'Your FranceConnect email address \'%s\' is already used by another '
378
                        'account, so we cannot update your account for you. Please ensure your '
379
                        'account email is still up to date using your account management page.'
380
                    ) % email)
381
                    del self.user_info[mapping['ref']]
382

  
352 383
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
353 384
        self.logger.debug('updating user_info %s', self.fc_account.user_info)
354 385

  
355 386
    def uniqueness_check_failed(self, request):
356 387
        if request.user.is_authenticated():
357 388
            # user is currently logged in:
358 389
            if models.FcAccount.objects.filter(user=request.user, order=0).count():
359 390
                # cannot link because we are already linked to another FC account
......
385 416

  
386 417
            if created:
387 418
                self.logger.info('fc link created sub %s', self.sub)
388 419
                messages.info(request,
389 420
                              _('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
390 421
                hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
391 422
            else:
392 423
                messages.info(request, _('Your local account has been updated.'))
393
            self.update_user_info()
424
            self.update_user_info(request)
394 425
            return self.redirect(request)
395 426

  
396 427
        default_ou = get_default_ou()
397 428
        email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
398 429
        user = a2_utils.authenticate(
399 430
            request,
400 431
            sub=self.sub,
401 432
            user_info=self.user_info,
......
447 478
        if user:
448 479
            views_utils.check_cookie_works(request)
449 480
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
450 481
            # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
451 482
            request.session.set_expiry(0)
452 483
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
453 484
            self.fc_account.token = json.dumps(self.token)
454 485
            self.fc_account.save(update_fields=['token'])
455
            self.update_user_info()
486
            self.update_user_info(request)
456 487
            self.logger.info('logged in using fc sub %s', self.sub)
457 488
            return self.redirect(request)
458 489
        else:
459 490
            params = {}
460 491
            if self.service_slug:
461 492
                params[constants.SERVICE_FIELD_NAME] = self.service_slug
462 493
            if registration:
463 494
                return self.redirect_and_come_back(request,
tests/auth_fc/test_auth_fc.py
650 650
    response = app.get(reverse('fc-logout') + '?state=' + state)
651 651
    assert path(response['Location']) == '/accounts/'
652 652
    response = response.follow()
653 653
    assert len(response.pyquery('[href*="password/change"]')) > 0
654 654

  
655 655

  
656 656
def test_invalid_next_url(app, fc_settings, caplog, hooks):
657 657
    assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
658

  
659

  
660
def test_update_user_email_from_fc(app, fc_settings, caplog):
661
    # 1. we have 2 accounts, one of them linked to FC
662
    # 2. the mail is updated if FC returns a new one,
663
    # 3. but only if the mail is not already used
664

  
665
    callback = reverse('fc-login-or-link')
666
    response = app.get(callback, status=302)
667
    location = response['Location']
668
    state = check_authorization_url(location)
669

  
670
    EMAIL1 = 'fred@example.com'
671
    EMAIL2 = 'foo@example.com'
672
    EMAIL3 = 'john.doe@example.com'
673
    SUB = '1234'
674
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
675
    user1.save()
676
    models.FcAccount.objects.create(user=user1, sub='1234', token='xxx', user_info='{}')
677
    user2 = User.objects.create(email=EMAIL3, first_name='John', last_name='Doe')
678
    user2.save()
679

  
680
    @httmock.urlmatch(path=r'.*/token$')
681
    def access_token_response(url, request):
682
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
683
        assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
684
                                          'grant_type'])
685
        assert parsed['code'] == 'zzz'
686
        assert parsed['client_id'] == 'xxx'
687
        assert parsed['client_secret'] == 'yyy'
688
        assert parsed['grant_type'] == 'authorization_code'
689
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
690
        parsed_callback = urlparse.urlparse(callback)
691
        assert parsed_redirect.path == parsed_callback.path
692
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
693
            urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
694
        exp = now() + datetime.timedelta(seconds=1000)
695
        id_token = {
696
            'sub': SUB,
697
            'aud': 'xxx',
698
            'nonce': state,
699
            'exp': int(exp.timestamp()),
700
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
701
        }
702
        return json.dumps({
703
            'access_token': 'uuu',
704
            'id_token': hmac_jwt(id_token, 'yyy')
705
        })
706

  
707
    @httmock.urlmatch(path=r'.*userinfo$')
708
    def user_info_response(url, request):
709
        assert request.headers['Authorization'] == 'Bearer uuu'
710
        return json.dumps({
711
            'sub': SUB,
712
            'family_name': u'Frédérique',
713
            'given_name': u'Ÿuñe',
714
            'email': email,
715
        })
716

  
717
    fc_settings.A2_EMAIL_IS_UNIQUE = True
718
    email = EMAIL2
719
    with httmock.HTTMock(access_token_response, user_info_response):
720
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
721
    assert User.objects.count() == 2
722
    assert User.objects.get(last_name='Frédérique').email == 'foo@example.com'
723
    assert app.session['_auth_user_id']
724
    response = response.follow()
725
    assert not response.html.find('li', {'class': 'warning'})
726

  
727
    email = EMAIL3
728
    with httmock.HTTMock(access_token_response, user_info_response):
729
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
730
    assert User.objects.count() == 2
731
    assert User.objects.get(last_name='Frédérique').email == 'foo@example.com'
732
    assert app.session['_auth_user_id']
733
    response = response.follow()
734
    assert 'already used by another account' in response.html.find(
735
        'li', {'class': 'warning'}).text
736

  
737
    fc_settings.A2_EMAIL_IS_UNIQUE = False
738
    with httmock.HTTMock(access_token_response, user_info_response):
739
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
740
    assert User.objects.count() == 2
741
    assert User.objects.get(last_name='Frédérique').email == 'john.doe@example.com'
742
    assert app.session['_auth_user_id']
658
-