0002-auth_fc-do-not-update-non-uniq-email-returned-by-FC-.patch
src/authentic2_auth_fc/utils.py | ||
---|---|---|
160 | 160 | |
161 | 161 | |
162 | 162 |
def apply_user_info_mappings(user, user_info): |
163 | 163 |
assert user |
164 | 164 |
assert user_info |
165 | 165 | |
166 | 166 |
logger = logging.getLogger(__name__) |
167 | 167 |
mappings = app_settings.user_info_mappings |
168 |
status = {'err': 0} |
|
168 | 169 | |
169 | 170 |
save_user = False |
170 | 171 |
tags = set() |
171 | 172 |
for attribute, mapping in mappings.items(): |
172 | 173 |
# normalize mapping to dictionaries: if string, convert into a simple reference |
173 | 174 |
if hasattr(mapping, 'format'): |
174 | 175 |
mapping = {'ref': mapping} |
175 | 176 |
try: |
176 | 177 |
value = mapping_to_value(mapping, user_info) |
177 | 178 |
except (ValueError, KeyError, NotImplementedError) as e: |
178 | 179 |
logger.warning(u'auth_fc: cannot apply mapping %s <- %r: %s', attribute, mapping, e) |
179 | 180 |
continue |
180 | 181 |
if mapping.get('if-tag') and mapping['if-tag'] not in tags: |
181 | 182 |
continue |
182 | 183 | |
184 |
# do not update email if it is not unique |
|
185 |
if attribute == 'email' and email_is_unique(): |
|
186 |
qs = users_having_email(value) |
|
187 |
qs = qs.exclude(uuid=user.uuid) |
|
188 |
if qs.exists(): |
|
189 |
logger.error(u'other accounts already use the same mail %s, %s', value, list(qs)) |
|
190 |
status = {'err': 1, 'email': value} |
|
191 |
continue |
|
192 | ||
183 | 193 |
if attribute == 'password': |
184 | 194 |
if mapping.get('if-empty') and user.has_usable_password(): |
185 | 195 |
continue |
186 | 196 |
save_user = True |
187 | 197 |
user.set_password(value) |
188 | 198 |
elif hasattr(user.attributes, attribute): |
189 | 199 |
if mapping.get('if-empty') and getattr(user.attributes, attribute): |
190 | 200 |
continue |
... | ... | |
199 | 209 |
setattr(user, attribute, value) |
200 | 210 |
else: |
201 | 211 |
logger.warning(u'auth_fc: unknown attribute in user_info mapping: %s', attribute) |
202 | 212 |
continue |
203 | 213 |
if mapping.get('tag'): |
204 | 214 |
tags.add(mapping['tag']) |
205 | 215 |
if save_user: |
206 | 216 |
user.save() |
217 |
return status |
|
207 | 218 | |
208 | 219 | |
209 | 220 |
def requests_retry_session( |
210 | 221 |
retries=3, |
211 | 222 |
backoff_factor=0.5, |
212 | 223 |
status_forcelist=(500, 502, 504), |
213 | 224 |
session=None, |
214 | 225 |
): |
src/authentic2_auth_fc/views.py | ||
---|---|---|
339 | 339 | |
340 | 340 | |
341 | 341 |
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View): |
342 | 342 |
'''Login with FC, if the FC account is already linked, connect this user, |
343 | 343 |
if a user is logged in, link the user to this account, otherwise display an |
344 | 344 |
error message. |
345 | 345 |
''' |
346 | 346 | |
347 |
def update_user_info(self): |
|
347 |
def update_user_info(self, request):
|
|
348 | 348 |
self.fc_account.token = json.dumps(self.token) |
349 | 349 |
self.fc_account.user_info = json.dumps(self.user_info) |
350 | 350 |
self.fc_account.save(update_fields=['token', 'user_info']) |
351 |
utils.apply_user_info_mappings(self.fc_account.user, self.user_info) |
|
351 | ||
352 |
status = utils.apply_user_info_mappings(self.fc_account.user, self.user_info) |
|
353 |
if status['err'] and status.get('email'): |
|
354 |
messages.warning(request, _( |
|
355 |
'Your FranceConnect email address \'%s\' is already used by another ' |
|
356 |
'account, so we cannot update your account for you. Please ensure your ' |
|
357 |
'account email is still up to date using your account management page.' |
|
358 |
) % status.get('email')) |
|
352 | 359 |
self.logger.debug('updating user_info %s', self.fc_account.user_info) |
353 | 360 | |
354 | 361 |
def uniqueness_check_failed(self, request): |
355 | 362 |
if request.user.is_authenticated(): |
356 | 363 |
# currently logged : |
357 | 364 |
if models.FcAccount.objects.filter(user=request.user, order=0).count(): |
358 | 365 |
# cannot link because we are already linked to another FC account |
359 | 366 |
messages.error(request, |
... | ... | |
384 | 391 | |
385 | 392 |
if created: |
386 | 393 |
self.logger.info('fc link created sub %s', self.sub) |
387 | 394 |
messages.info(request, |
388 | 395 |
_('Your FranceConnect account {} has been linked.').format(self.fc_display_name)) |
389 | 396 |
hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub, request=request) |
390 | 397 |
else: |
391 | 398 |
messages.info(request, _('Your local account has been updated.')) |
392 |
self.update_user_info() |
|
399 |
self.update_user_info(request)
|
|
393 | 400 |
return self.redirect(request) |
394 | 401 | |
395 | 402 |
user = a2_utils.authenticate( |
396 | 403 |
request, |
397 | 404 |
sub=self.sub, |
398 | 405 |
user_info=self.user_info, |
399 | 406 |
token=self.token) |
400 | 407 |
if user: |
... | ... | |
440 | 447 |
if user: |
441 | 448 |
views_utils.check_cookie_works(request) |
442 | 449 |
a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug) |
443 | 450 |
# set session expiration policy to EXPIRE_AT_BROWSER_CLOSE |
444 | 451 |
request.session.set_expiry(0) |
445 | 452 |
self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user) |
446 | 453 |
self.fc_account.token = json.dumps(self.token) |
447 | 454 |
self.fc_account.save(update_fields=['token']) |
448 |
self.update_user_info() |
|
455 |
self.update_user_info(request)
|
|
449 | 456 |
self.logger.info('logged in using fc sub %s', self.sub) |
450 | 457 |
return self.redirect(request) |
451 | 458 |
else: |
452 | 459 |
params = {} |
453 | 460 |
if self.service_slug: |
454 | 461 |
params[constants.SERVICE_FIELD_NAME] = self.service_slug |
455 | 462 |
if registration: |
456 | 463 |
return self.redirect_and_come_back(request, |
tests/auth_fc/test_auth_fc.py | ||
---|---|---|
650 | 650 |
response = app.get(reverse('fc-logout') + '?state=' + state) |
651 | 651 |
assert path(response['Location']) == '/accounts/' |
652 | 652 |
response = response.follow() |
653 | 653 |
assert len(response.pyquery('[href*="password/change"]')) > 0 |
654 | 654 | |
655 | 655 | |
656 | 656 |
def test_invalid_next_url(app, fc_settings, caplog, hooks):
    """The FC callback answers with the raw ``next`` value as its location.

    The request carries a dummy authorization code and a ``next`` parameter
    that is not a proper URL; the response location must be that value.
    """
    response = app.get('/fc/callback/?code=coin&next=JJJ72QQQ')
    assert response.location == 'JJJ72QQQ'
658 | ||
659 | ||
660 |
def test_update_user_info_having_non_uniq_new_email_from_fc(app, fc_settings, caplog):
    """Check that a non-unique email returned by FranceConnect is not applied.

    Two local accounts exist and only the first one is linked to FC. FC then
    returns the second account's email for the linked user:

    * with ``A2_EMAIL_IS_UNIQUE`` enabled, the linked account keeps its email
      and a warning message is displayed;
    * with ``A2_EMAIL_IS_UNIQUE`` disabled, the linked account's email is
      updated with the value returned by FC.

    Each case is exercised twice: first with an anonymous user, then with the
    session opened by the first callback.
    """
    EMAIL1 = 'fred@example.com'
    EMAIL2 = 'john.doe@example.com'
    SUB = '1234'

    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    state = check_authorization_url(response['Location'])

    # objects.create() already persists the row, no extra save() is needed
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
    models.FcAccount.objects.create(user=user1, sub=SUB, token='xxx', user_info='{}')
    User.objects.create(email=EMAIL2, first_name='John', last_name='Doe')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        parsed = {x: y[0] for x, y in urlparse.parse_qs(request.body).items()}
        assert set(parsed.keys()) == {
            'code', 'client_id', 'client_secret', 'redirect_uri', 'grant_type'}
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
        parsed_callback = urlparse.urlparse(callback)
        assert parsed_redirect.path == parsed_callback.path
        redirect_query = urlparse.parse_qs(parsed_redirect.query)
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
            # this comparison used to be a bare expression and checked nothing
            assert redirect_query[cb_key] == cb_value
        exp = now() + datetime.timedelta(seconds=1000)
        id_token = {
            'sub': SUB,
            'aud': 'xxx',
            # `state` is read when the mock is called, so the value obtained
            # from the second authorization request is used later on
            'nonce': state,
            'exp': int(exp.timestamp()),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': SUB,
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': EMAIL2,  # get a new mail from FC
        })

    def fc_callback():
        # `callback` and `state` are resolved at call time on purpose
        with httmock.HTTMock(access_token_response, user_info_response):
            return app.get(callback + '?code=zzz&state=%s' % state, status=302)

    def linked_user_email():
        # FC returns 'Frédérique' as family_name; presumably the configured
        # mappings store it in last_name, hence the lookup on that field
        return User.objects.get(last_name='Frédérique').email

    def check_email_not_updated():
        response = fc_callback()
        assert User.objects.count() == 2
        assert linked_user_email() == EMAIL1
        response = response.follow()
        warning = response.html.find('li', {'class': 'warning'})
        assert 'already used by another account' in warning.text

    # Emails must be unique: account1's email is not updated when FC returns
    # account2's email.
    fc_settings.A2_EMAIL_IS_UNIQUE = True

    # user not already authenticated
    check_email_not_updated()

    # user is authenticated
    assert app.session['_auth_user_id']
    check_email_not_updated()

    # Emails may be shared: the email returned by FC is applied.
    fc_settings.A2_EMAIL_IS_UNIQUE = False
    app.session.flush()
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    state = check_authorization_url(response['Location'])

    # user not already authenticated
    fc_callback()
    assert linked_user_email() == EMAIL2

    # user is authenticated: restore the initial email before trying again
    user1 = User.objects.get(id=user1.id)
    user1.email = EMAIL1
    user1.save()
    assert linked_user_email() == EMAIL1
    assert app.session['_auth_user_id']
    fc_callback()
    assert linked_user_email() == EMAIL2
|
658 |
- |