Projet

Général

Profil

0002-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch

Nicolas Roche, 29 juillet 2020 22:35

Télécharger (8,98 ko)

Voir les différences:

Subject: [PATCH 2/2] auth_fc: do not update non-uniq email returned by FC
 (#45199)

 src/authentic2_auth_fc/views.py | 30 ++++++++++--
 tests/auth_fc/test_auth_fc.py   | 85 +++++++++++++++++++++++++++++++++
 2 files changed, 112 insertions(+), 3 deletions(-)
src/authentic2_auth_fc/views.py
352 352
    def users_having_email(self, email):
353 353
        default_ou = get_default_ou()
354 354
        User = get_user_model()
355 355
        qs = User.objects.filter(email__iexact=email)
356 356
        if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
357 357
            qs = qs.filter(ou=default_ou)
358 358
        return qs
359 359

  
360
    def update_user_info(self):
360
    def update_user_info(self, request):
361 361
        self.fc_account.token = json.dumps(self.token)
362 362
        self.fc_account.user_info = json.dumps(self.user_info)
363 363
        self.fc_account.save(update_fields=['token', 'user_info'])
364

  
365
        # do not update email if it is not unique
366
        if self.email_is_unique():
367
            mappings = app_settings.user_info_mappings
368
            mapping = {'ref': mappings['email']}
369
            email = ''
370
            try:
371
                email = utils.mapping_to_value(mapping, self.user_info)
372
            except (ValueError, KeyError, NotImplementedError):
373
                pass
374
            if email:
375
                qs = self.users_having_email(email)
376
                qs = qs.exclude(uuid=self.fc_account.user.uuid)
377
                if qs.exists():
378
                # there should not be other accounts with the same mail
379
                    self.logger.error(u'other account already use the same mail %s, %s',
380
                                      email, list(qs))
381
                    messages.warning(request, _(
382
                        'Your FranceConnect email address \'%s\' is already used by another '
383
                        'account, so we cannot update your account for you. Please ensure your '
384
                        'account email is still up to date using your account management page.'
385
                    ) % email)
386
                    del self.user_info[mapping['ref']]
387

  
364 388
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
365 389
        self.logger.debug('updating user_info %s', self.fc_account.user_info)
366 390

  
367 391
    def uniqueness_check_failed(self, request):
368 392
        if request.user.is_authenticated():
369 393
            # currently logged :
370 394
            if models.FcAccount.objects.filter(user=request.user, order=0).count():
371 395
                # cannot link because we are already linked to another FC account
......
397 421

  
398 422
            if created:
399 423
                self.logger.info('fc link created sub %s', self.sub)
400 424
                messages.info(request,
401 425
                              _('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
402 426
                hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
403 427
            else:
404 428
                messages.info(request, _('Your local account has been updated.'))
405
            self.update_user_info()
429
            self.update_user_info(request)
406 430
            return self.redirect(request)
407 431

  
408 432
        user = a2_utils.authenticate(
409 433
            request,
410 434
            sub=self.sub,
411 435
            user_info=self.user_info,
412 436
            token=self.token)
413 437
        if user:
......
453 477
        if user:
454 478
            views_utils.check_cookie_works(request)
455 479
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
456 480
            # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
457 481
            request.session.set_expiry(0)
458 482
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
459 483
            self.fc_account.token = json.dumps(self.token)
460 484
            self.fc_account.save(update_fields=['token'])
461
            self.update_user_info()
485
            self.update_user_info(request)
462 486
            self.logger.info('logged in using fc sub %s', self.sub)
463 487
            return self.redirect(request)
464 488
        else:
465 489
            params = {}
466 490
            if self.service_slug:
467 491
                params[constants.SERVICE_FIELD_NAME] = self.service_slug
468 492
            if registration:
469 493
                return self.redirect_and_come_back(request,
tests/auth_fc/test_auth_fc.py
650 650
    response = app.get(reverse('fc-logout') + '?state=' + state)
651 651
    assert path(response['Location']) == '/accounts/'
652 652
    response = response.follow()
653 653
    assert len(response.pyquery('[href*="password/change"]')) > 0
654 654

  
655 655

  
656 656
def test_invalid_next_url(app, fc_settings, caplog, hooks):
657 657
    assert app.get('/fc/callback/?code=coin&next=JJJ72QQQ').location == 'JJJ72QQQ'
658

  
659

  
660
def test_update_user_email_from_fc(app, fc_settings, caplog):
661
    # 1. we have 2 accounts havinf one linked to FC
662
    # 2. mail is updated if FC return a new one,
663
    # 3. but only if the mail is not already used
664

  
665
    callback = reverse('fc-login-or-link')
666
    response = app.get(callback, status=302)
667
    location = response['Location']
668
    state = check_authorization_url(location)
669

  
670
    EMAIL1 = 'fred@example.com'
671
    EMAIL2 = 'foo@example.com'
672
    EMAIL3 = 'john.doe@example.com'
673
    SUB = '1234'
674
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
675
    user1.save()
676
    models.FcAccount.objects.create(user=user1, sub='1234', token='xxx', user_info='{}')
677
    user2 = User.objects.create(email=EMAIL3, first_name='John', last_name='Doe')
678
    user2.save()
679

  
680
    @httmock.urlmatch(path=r'.*/token$')
681
    def access_token_response(url, request):
682
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
683
        assert set(parsed.keys()) == set(['code', 'client_id', 'client_secret', 'redirect_uri',
684
                                          'grant_type'])
685
        assert parsed['code'] == 'zzz'
686
        assert parsed['client_id'] == 'xxx'
687
        assert parsed['client_secret'] == 'yyy'
688
        assert parsed['grant_type'] == 'authorization_code'
689
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
690
        parsed_callback = urlparse.urlparse(callback)
691
        assert parsed_redirect.path == parsed_callback.path
692
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
693
            urlparse.parse_qs(parsed_redirect.query)[cb_key] == cb_value
694
        exp = now() + datetime.timedelta(seconds=1000)
695
        id_token = {
696
            'sub': SUB,
697
            'aud': 'xxx',
698
            'nonce': state,
699
            'exp': int(exp.timestamp()),
700
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
701
        }
702
        return json.dumps({
703
            'access_token': 'uuu',
704
            'id_token': hmac_jwt(id_token, 'yyy')
705
        })
706

  
707
    @httmock.urlmatch(path=r'.*userinfo$')
708
    def user_info_response(url, request):
709
        assert request.headers['Authorization'] == 'Bearer uuu'
710
        return json.dumps({
711
            'sub': SUB,
712
            'family_name': u'Frédérique',
713
            'given_name': u'Ÿuñe',
714
            'email': email,
715
        })
716

  
717
    fc_settings.A2_EMAIL_IS_UNIQUE = True
718
    email = EMAIL2
719
    with httmock.HTTMock(access_token_response, user_info_response):
720
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
721
    assert User.objects.count() == 2
722
    assert User.objects.get(last_name='Frédérique').email == 'foo@example.com'
723
    assert app.session['_auth_user_id']
724
    response = response.follow()
725
    assert not response.html.find('li', {'class': 'warning'})
726

  
727
    email = EMAIL3
728
    with httmock.HTTMock(access_token_response, user_info_response):
729
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
730
    assert User.objects.count() == 2
731
    assert User.objects.get(last_name='Frédérique').email == 'foo@example.com'
732
    assert app.session['_auth_user_id']
733
    response = response.follow()
734
    assert 'already used by another account' in response.html.find(
735
        'li', {'class': 'warning'}).text
736

  
737
    fc_settings.A2_EMAIL_IS_UNIQUE = False
738
    with httmock.HTTMock(access_token_response, user_info_response):
739
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
740
    assert User.objects.count() == 2
741
    assert User.objects.get(last_name='Frédérique').email == 'john.doe@example.com'
742
    assert app.session['_auth_user_id']
658
-