Projet

Général

Profil

0002-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch

Nicolas Roche, 30 juillet 2020 14:54

Télécharger (11,1 ko)

Voir les différences:

Subject: [PATCH 2/2] auth_fc: do not update non-uniq email returned by FC
 (#45199)

 src/authentic2_auth_fc/utils.py | 11 ++++
 src/authentic2_auth_fc/views.py | 15 +++--
 tests/auth_fc/test_auth_fc.py   | 99 +++++++++++++++++++++++++++++++++
 3 files changed, 121 insertions(+), 4 deletions(-)
src/authentic2_auth_fc/utils.py
160 160

  
161 161

  
162 162
def apply_user_info_mappings(user, user_info):
163 163
    assert user
164 164
    assert user_info
165 165

  
166 166
    logger = logging.getLogger(__name__)
167 167
    mappings = app_settings.user_info_mappings
168
    status = {'err': 0}
168 169

  
169 170
    save_user = False
170 171
    tags = set()
171 172
    for attribute, mapping in mappings.items():
172 173
        # normalize mapping to dictionaries: if string, convert into a simple reference
173 174
        if hasattr(mapping, 'format'):
174 175
            mapping = {'ref': mapping}
175 176
        try:
176 177
            value = mapping_to_value(mapping, user_info)
177 178
        except (ValueError, KeyError, NotImplementedError) as e:
178 179
            logger.warning(u'auth_fc: cannot apply mapping %s <- %r: %s', attribute, mapping, e)
179 180
            continue
180 181
        if mapping.get('if-tag') and mapping['if-tag'] not in tags:
181 182
            continue
182 183

  
184
        # do not update email if it is not unique
185
        if attribute == 'email' and email_is_unique():
186
            qs = users_having_email(value)
187
            qs = qs.exclude(uuid=user.uuid)
188
            if qs.exists():
189
                logger.error(u'other accounts already use the same mail %s, %s', value, list(qs))
190
                status = {'err': 1, 'email': value}
191
                continue
192

  
183 193
        if attribute == 'password':
184 194
            if mapping.get('if-empty') and user.has_usable_password():
185 195
                continue
186 196
            save_user = True
187 197
            user.set_password(value)
188 198
        elif hasattr(user.attributes, attribute):
189 199
            if mapping.get('if-empty') and getattr(user.attributes, attribute):
190 200
                continue
......
199 209
            setattr(user, attribute, value)
200 210
        else:
201 211
            logger.warning(u'auth_fc: unknown attribute in user_info mapping: %s', attribute)
202 212
            continue
203 213
        if mapping.get('tag'):
204 214
            tags.add(mapping['tag'])
205 215
    if save_user:
206 216
        user.save()
217
    return status
207 218

  
208 219

  
209 220
def requests_retry_session(
210 221
    retries=3,
211 222
    backoff_factor=0.5,
212 223
    status_forcelist=(500, 502, 504),
213 224
    session=None,
214 225
):
src/authentic2_auth_fc/views.py
339 339

  
340 340

  
341 341
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
342 342
    '''Login with FC, if the FC account is already linked, connect this user,
343 343
       if a user is logged link the user to this account, otherwise display an
344 344
       error message.
345 345
    '''
346 346

  
347
    def update_user_info(self):
347
    def update_user_info(self, request):
348 348
        self.fc_account.token = json.dumps(self.token)
349 349
        self.fc_account.user_info = json.dumps(self.user_info)
350 350
        self.fc_account.save(update_fields=['token', 'user_info'])
351
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
351

  
352
        status = utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
353
        if status['err'] and status.get('email'):
354
            messages.warning(request, _(
355
                'Your FranceConnect email address \'%s\' is already used by another '
356
                'account, so we cannot update your account for you. Please ensure your '
357
                'account email is still up to date using your account management page.'
358
            ) % status.get('email'))
352 359
        self.logger.debug('updating user_info %s', self.fc_account.user_info)
353 360

  
354 361
    def uniqueness_check_failed(self, request):
355 362
        if request.user.is_authenticated():
356 363
            # currently logged :
357 364
            if models.FcAccount.objects.filter(user=request.user, order=0).count():
358 365
                # cannot link because we are already linked to another FC account
359 366
                messages.error(request,
......
384 391

  
385 392
            if created:
386 393
                self.logger.info('fc link created sub %s', self.sub)
387 394
                messages.info(request,
388 395
                              _('Your FranceConnect account {} has been linked.').format(self.fc_display_name))
389 396
                hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request)
390 397
            else:
391 398
                messages.info(request, _('Your local account has been updated.'))
392
            self.update_user_info()
399
            self.update_user_info(request)
393 400
            return self.redirect(request)
394 401

  
395 402
        user = a2_utils.authenticate(
396 403
            request,
397 404
            sub=self.sub,
398 405
            user_info=self.user_info,
399 406
            token=self.token)
400 407
        if user:
......
440 447
        if user:
441 448
            views_utils.check_cookie_works(request)
442 449
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
443 450
            # set session expiration policy to EXPIRE_AT_BROWSER_CLOSE
444 451
            request.session.set_expiry(0)
445 452
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
446 453
            self.fc_account.token = json.dumps(self.token)
447 454
            self.fc_account.save(update_fields=['token'])
448
            self.update_user_info()
455
            self.update_user_info(request)
449 456
            self.logger.info('logged in using fc sub %s', self.sub)
450 457
            return self.redirect(request)
451 458
        else:
452 459
            params = {}
453 460
            if self.service_slug:
454 461
                params[constants.SERVICE_FIELD_NAME] = self.service_slug
455 462
            if registration:
456 463
                return self.redirect_and_come_back(request,
tests/auth_fc/test_auth_fc.py
650 650
    response = app.get(reverse('fc-logout') + '?state=' + state)
651 651
    assert path(response['Location']) == '/accounts/'
652 652
    response = response.follow()
653 653
    assert len(response.pyquery('[href*="password/change"]')) > 0
654 654

  
655 655

  
656 656
def test_invalid_next_url(app, fc_settings, caplog, hooks):
    """The FC callback answers with the raw ``next`` value as its location."""
    response = app.get('/fc/callback/?code=coin&next=JJJ72QQQ')
    assert response.location == 'JJJ72QQQ'
658

  
659

  
660
def test_update_user_info_having_non_uniq_new_email_from_fc(app, fc_settings, caplog):
    """Check that a non-unique email returned by FC never overwrites the local one.

    Two accounts exist and only the first one is linked to FC. While
    ``A2_EMAIL_IS_UNIQUE`` is enabled, the first account keeps its email when
    FC returns the second account's email, and a warning message is shown.
    Once the setting is disabled, the email returned by FC is applied.
    Both phases are exercised with an anonymous and an authenticated session.
    """
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    location = response['Location']
    state = check_authorization_url(location)

    EMAIL1 = 'fred@example.com'
    EMAIL2 = 'john.doe@example.com'
    SUB = '1234'
    # objects.create() already persists the row, no extra save() is needed
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
    models.FcAccount.objects.create(user=user1, sub=SUB, token='xxx', user_info='{}')
    User.objects.create(email=EMAIL2, first_name='John', last_name='Doe')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # fake token endpoint: validate the request, then return a signed id_token
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {
            'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
        parsed_callback = urlparse.urlparse(callback)
        assert parsed_redirect.path == parsed_callback.path
        redirect_query = urlparse.parse_qs(parsed_redirect.query)
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
            # this comparison used to be a bare expression and checked nothing
            assert redirect_query[cb_key] == cb_value
        exp = now() + datetime.timedelta(seconds=1000)
        id_token = {
            'sub': SUB,
            'aud': 'xxx',
            'nonce': state,
            'exp': int(exp.timestamp()),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy')
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # fake userinfo endpoint: FC answers with the email owned by the second account
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': SUB,
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': EMAIL2,  # get a new mail from FC
        })

    fc_settings.A2_EMAIL_IS_UNIQUE = True

    # user not already authenticated
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 2
    assert User.objects.get(last_name='Frédérique').email == EMAIL1
    response = response.follow()
    assert 'already used by another account' in response.html.find(
        'li', {'class': 'warning'}).text

    # user is authenticated
    assert app.session['_auth_user_id']
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.count() == 2
    assert User.objects.get(last_name='Frédérique').email == EMAIL1
    response = response.follow()
    assert 'already used by another account' in response.html.find(
        'li', {'class': 'warning'}).text

    # second phase: without the uniqueness constraint the FC email is applied
    fc_settings.A2_EMAIL_IS_UNIQUE = False
    app.session.flush()
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    location = response['Location']
    state = check_authorization_url(location)

    # user not already authenticated
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.get(last_name='Frédérique').email == EMAIL2

    # user is authenticated: restore the first email to observe a new update
    user1 = User.objects.get(id=user1.id)
    user1.email = EMAIL1
    user1.save()
    assert User.objects.get(last_name='Frédérique').email == EMAIL1
    assert app.session['_auth_user_id']
    with httmock.HTTMock(access_token_response, user_info_response):
        response = app.get(callback + '?code=zzz&state=%s' % state, status=302)
    assert User.objects.get(last_name='Frédérique').email == EMAIL2
658
-