22 |
22 |
import json
|
23 |
23 |
import base64
|
24 |
24 |
from jwcrypto import jwk, jwt
|
25 |
25 |
import datetime
|
26 |
26 |
|
27 |
27 |
import requests
|
28 |
28 |
|
29 |
29 |
from django.contrib.auth import get_user_model
|
|
30 |
from django.test import override_settings
|
30 |
31 |
from django.urls import reverse
|
31 |
32 |
from django.utils.encoding import force_text
|
32 |
33 |
from django.utils.six.moves.urllib import parse as urlparse
|
33 |
34 |
from django.utils.timezone import now
|
34 |
35 |
|
35 |
36 |
from authentic2_auth_fc import models
|
36 |
37 |
from authentic2_auth_fc.utils import requests_retry_session
|
37 |
38 |
|
... | ... | |
650 |
651 |
response = app.get(reverse('fc-logout') + '?state=' + state)
|
651 |
652 |
assert path(response['Location']) == '/accounts/'
|
652 |
653 |
response = response.follow()
|
653 |
654 |
assert len(response.pyquery('[href*="password/change"]')) > 0
|
654 |
655 |
|
655 |
656 |
|
656 |
657 |
def test_invalid_next_url(app, fc_settings, caplog, hooks):
    """Request the FC callback with ``next=JJJ72QQQ`` and check the redirect.

    The response's ``location`` is expected to be exactly the ``next`` value
    that was passed in the query string.
    """
    callback_url = '/fc/callback/?code=coin&next=JJJ72QQQ'
    response = app.get(callback_url)
    redirect_target = response.location
    assert redirect_target == 'JJJ72QQQ'
|
|
659 |
|
|
660 |
|
|
661 |
def test_update_user_info_having_non_uniq_new_email_from_fc(app, fc_settings, caplog):
    """Check email handling when FC reports an email owned by another account.

    We have 2 accounts, account1 is linked to FC.

    * With the mappings provided by ``fc_settings``, account1's email is not
      updated when FC returns account2's email, and the email is not flagged
      as verified; the user info stored on the FC account still records the
      email sent by FC.
    * With ``A2_FC_USER_INFO_MAPPINGS`` overridden to map ``email``, account1's
      email is replaced by the one sent by FC.

    Each scenario is played twice: once while logging in through the callback
    and once more while the session is already authenticated.
    """
    callback = reverse('fc-login-or-link')
    response = app.get(callback, status=302)
    location = response['Location']
    state = check_authorization_url(location)

    EMAIL1 = 'fred@example.com'
    EMAIL2 = 'john.doe@example.com'
    SUB = '1234'
    # objects.create() already persists the rows, no extra save() is needed.
    user1 = User.objects.create(email=EMAIL1, first_name='Frédérique', last_name='Ÿuñe')
    models.FcAccount.objects.create(user=user1, sub=SUB, token='xxx', user_info='{}')
    User.objects.create(email=EMAIL2, first_name='John', last_name='Doe')

    @httmock.urlmatch(path=r'.*/token$')
    def access_token_response(url, request):
        # Mocked token endpoint: validate the token request, then answer with
        # an access token and a signed id_token bound to the current state.
        parsed = {key: values[0] for key, values in urlparse.parse_qs(request.body).items()}
        assert set(parsed) == {
            'code',
            'client_id',
            'client_secret',
            'redirect_uri',
            'grant_type',
        }
        assert parsed['code'] == 'zzz'
        assert parsed['client_id'] == 'xxx'
        assert parsed['client_secret'] == 'yyy'
        assert parsed['grant_type'] == 'authorization_code'
        parsed_redirect = urlparse.urlparse(parsed['redirect_uri'])
        parsed_callback = urlparse.urlparse(callback)
        assert parsed_redirect.path == parsed_callback.path
        # Every query parameter of the callback must be carried by the
        # redirect_uri with the same values.
        redirect_query = urlparse.parse_qs(parsed_redirect.query)
        for cb_key, cb_value in urlparse.parse_qs(parsed_callback.query).items():
            assert redirect_query[cb_key] == cb_value
        exp = now() + datetime.timedelta(seconds=1000)
        id_token = {
            'sub': SUB,
            'aud': 'xxx',
            # `state` is read when the mock is called, so it follows the
            # re-assignment done after the session flush below.
            'nonce': state,
            'exp': int(exp.timestamp()),
            'iss': 'https://fcp.integ01.dev-franceconnect.fr/',
        }
        return json.dumps({
            'access_token': 'uuu',
            'id_token': hmac_jwt(id_token, 'yyy'),
        })

    @httmock.urlmatch(path=r'.*userinfo$')
    def user_info_response(url, request):
        # Mocked userinfo endpoint: same sub as account1, but account2's email.
        assert request.headers['Authorization'] == 'Bearer uuu'
        return json.dumps({
            'sub': SUB,
            'family_name': u'Frédérique',
            'given_name': u'Ÿuñe',
            'email': EMAIL2,  # get a new mail from FC
        })

    def fc_callback():
        # Replay the FC callback against the mocked token/userinfo endpoints.
        with httmock.HTTMock(access_token_response, user_info_response):
            return app.get(callback + '?code=zzz&state=%s' % state, status=302)

    def check_email_left_untouched():
        # The FC account records the email sent by FC, but the user keeps its
        # own, unverified, email and no extra account is created.
        # NOTE(review): looking the user up by last_name='Frédérique' presumes
        # the fc_settings mappings copy family_name into last_name - confirm.
        assert json.loads(models.FcAccount.objects.get(sub=SUB).user_info)['email'] == EMAIL2
        assert User.objects.count() == 2
        assert User.objects.get(last_name='Frédérique').email == EMAIL1
        assert not User.objects.get(last_name='Frédérique').email_verified

    # user not already authenticated
    fc_callback()
    check_email_left_untouched()

    # user is authenticated
    assert app.session['_auth_user_id']
    fc_callback()
    check_email_left_untouched()

    # Start over with a fresh session and a fresh authorization state.
    app.session.flush()
    response = app.get(callback, status=302)
    location = response['Location']
    state = check_authorization_url(location)

    with override_settings(A2_FC_USER_INFO_MAPPINGS={
        'last_name': {
            'ref': 'family_name',
            'verified': True,
        },
        'first_name': {
            'ref': 'given_name',
            'verified': True,
        },
        'email': 'email',
    }):
        # user not already authenticated
        fc_callback()
        assert User.objects.get(last_name='Frédérique').email == EMAIL2

        # user is authenticated
        # Restore the original email so the second callback has something to
        # change again.
        user1 = User.objects.get(id=user1.id)
        user1.email = EMAIL1
        user1.save()
        assert User.objects.get(last_name='Frédérique').email == EMAIL1
        assert app.session['_auth_user_id']
        fc_callback()
        assert User.objects.get(last_name='Frédérique').email == EMAIL2
|
658 |
|
-
|