0001-auth_oidc-use-a-signed-state-47825.patch
src/authentic2_auth_oidc/views.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import uuid |
|
18 |
import logging |
|
17 |
import hashlib |
|
19 | 18 |
import json |
19 |
import logging |
|
20 |
import uuid |
|
20 | 21 | |
21 | 22 |
import requests |
22 | 23 | |
24 |
from django.conf import settings |
|
25 |
from django.core import signing |
|
23 | 26 |
from django.urls import reverse |
24 | 27 |
from django.utils.translation import get_language, ugettext as _ |
25 | 28 |
from django.contrib import messages |
... | ... | |
34 | 37 |
from . import app_settings, models |
35 | 38 |
from .utils import get_provider, get_provider_by_issuer |
36 | 39 | |
40 |
logger = logging.getLogger(__name__) |
|
41 | ||
42 | ||
43 |
def make_nonce(state): |
|
44 |
return hashlib.sha256(state.encode() + settings.SECRET_KEY.encode()).hexdigest() |
|
45 | ||
37 | 46 | |
38 | 47 |
@setting_enabled('ENABLE', settings=app_settings) |
39 | 48 |
def oidc_login(request, pk, next_url=None, *args, **kwargs): |
40 |
logger = logging.getLogger(__name__) |
|
41 | 49 |
provider = get_provider(pk) |
42 | 50 |
scopes = set(provider.scopes.split()) | set(['openid']) |
43 |
state = str(uuid.uuid4()) |
|
44 |
nonce = request.GET.get('nonce') or str(uuid.uuid4()) |
|
51 |
state_id = str(uuid.uuid4()) |
|
52 |
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '') |
|
53 |
if next_url and not good_next_url(request, next_url): |
|
54 |
next_url = None |
|
55 |
nonce = make_nonce(state_id) |
|
45 | 56 |
display = set() |
46 | 57 |
prompt = set() |
58 |
state_content = { |
|
59 |
'state_id': state_id, |
|
60 |
'issuer': provider.issuer, |
|
61 |
} |
|
62 |
if next_url: |
|
63 |
state_content['next'] = next_url |
|
47 | 64 |
params = { |
48 | 65 |
'client_id': provider.client_id, |
49 | 66 |
'scope': ' '.join(scopes), |
50 | 67 |
'response_type': 'code', |
51 | 68 |
'redirect_uri': request.build_absolute_uri(reverse('oidc-login-callback')), |
52 |
'state': state,
|
|
69 |
'state': signing.dumps(state_content),
|
|
53 | 70 |
'nonce': nonce, |
54 | 71 |
} |
55 | 72 |
if provider.claims_parameter_supported: |
... | ... | |
70 | 87 |
# FIXME: id_token_hint ? |
71 | 88 |
# FIXME: acr_values ? |
72 | 89 |
# save request state |
73 |
saved_state = request.session.setdefault('auth_oidc', {}).setdefault(state, {}) |
|
74 |
saved_state['request'] = params |
|
75 |
saved_state['issuer'] = provider.issuer |
|
76 |
next_url = next_url or request.GET.get(REDIRECT_FIELD_NAME, '') |
|
77 |
if good_next_url(request, next_url): |
|
78 |
saved_state['next_url'] = next_url |
|
79 |
request.session.modified = True # necessary if auth_oidc already exists |
|
80 | 90 |
logger.debug('auth_oidc: sent request to authorization endpoint %r', params) |
81 |
return redirect(request, provider.authorization_endpoint, params=params, resolve=False) |
|
91 |
response = redirect(request, provider.authorization_endpoint, params=params, resolve=False) |
|
92 |
response.set_cookie( |
|
93 |
'oidc-state', value=state_id, path=reverse('oidc-login-callback'), |
|
94 |
httponly=True, secure=request.is_secure()) |
|
95 |
return response |
|
82 | 96 | |
83 | 97 | |
84 | 98 |
@setting_enabled('ENABLE', settings=app_settings) |
... | ... | |
89 | 103 |
try: |
90 | 104 |
provider = get_provider_by_issuer(issuer) |
91 | 105 |
except models.OIDCProvider.DoesNotExist: |
92 |
return HttpResponseBadRequest(u'unknown issuer %s' % issuer, content_type='text/plain')
|
|
106 |
return HttpResponseBadRequest('unknown issuer %s' % issuer, content_type='text/plain') |
|
93 | 107 |
return oidc_login(request, pk=provider.pk, next_url=request.GET.get('target_link_uri')) |
94 | 108 | |
95 | 109 | |
96 | 110 |
class LoginCallback(View): |
97 |
def continue_to_next_url(self): |
|
98 |
return redirect(self.request, |
|
99 |
self.oidc_state.get('next_url', settings.LOGIN_REDIRECT_URL), |
|
100 |
resolve=False) |
|
111 |
next_url = None |
|
112 | ||
113 |
def continue_to_next_url(self, request): |
|
114 |
if self.next_url: |
|
115 |
return redirect(request, self.next_url, resolve=False) |
|
116 |
else: |
|
117 |
return redirect(request, settings.LOGIN_REDIRECT_URL) |
|
101 | 118 | |
102 | 119 |
def get(self, request, *args, **kwargs): |
103 |
logger = logging.getLogger(__name__) |
|
120 |
response = self.handle_authorization_response(request) |
|
121 |
# clean the state cookie in all cases |
|
122 |
if 'oidc-state' in request.COOKIES: |
|
123 |
response.delete_cookie('oidc-state') |
|
124 |
return response |
|
125 | ||
126 |
def handle_authorization_response(self, request): |
|
104 | 127 |
code = request.GET.get('code') |
105 |
state = request.GET.get('state') |
|
106 |
oidc_state = self.oidc_state = request.session.get('auth_oidc', {}).get(state) |
|
107 |
if not state or not oidc_state or 'request' not in oidc_state: |
|
108 |
messages.warning(request, _('Login with OpenIDConnect failed, state lost.')) |
|
109 |
logger.warning('auth_oidc: state lost') |
|
128 |
raw_state = request.GET.get('state') |
|
129 |
if not raw_state: |
|
110 | 130 |
return redirect(request, settings.LOGIN_REDIRECT_URL) |
111 |
oidc_request = oidc_state.get('request') |
|
112 |
assert isinstance(oidc_request, dict), 'state is not properly initialized' |
|
113 |
nonce = oidc_request.get('nonce') |
|
114 | 131 |
try: |
115 |
issuer = oidc_state.get('issuer') |
|
132 |
state_content = signing.loads(raw_state) |
|
133 |
except signing.BadSignature: |
|
134 |
return redirect(request, settings.LOGIN_REDIRECT_URL) |
|
135 | ||
136 |
state = state_content['state_id'] |
|
137 |
issuer = state_content['issuer'] |
|
138 |
nonce = make_nonce(state) |
|
139 |
self.next_url = state_content.get('next') |
|
140 | ||
141 |
try: |
|
116 | 142 |
provider = get_provider_by_issuer(issuer) |
117 | 143 |
except models.OIDCProvider.DoesNotExist: |
118 |
messages.warning(request, _('Unknown OpenID connect issuer'))
|
|
144 |
messages.warning(request, _('Unknown OpenID connect issuer: "%s"') % issuer)
|
|
119 | 145 |
logger.warning('auth_oidc: unknown issuer, %s', issuer) |
120 |
return self.continue_to_next_url() |
|
146 |
return self.continue_to_next_url(request) |
|
147 | ||
148 |
# Check state |
|
149 |
if 'oidc-state' not in request.COOKIES or request.COOKIES['oidc-state'] != state: |
|
150 |
logger.warning('auth-oidc: state %s for issuer %s has been lost', state, issuer) |
|
151 |
params = {} |
|
152 |
if self.next_url: |
|
153 |
params['next'] = self.next_url |
|
154 |
response = redirect(request, 'oidc-login', kwargs={'pk': str(provider.pk)}, params=params) |
|
155 |
return response |
|
121 | 156 | |
122 |
# FIXME is idp initiated SSO allowed ? in this case state is maybe not mandatory |
|
123 | 157 |
if 'error' in request.GET: # error code path |
124 |
error_description = request.GET.get('error_description') |
|
125 |
error_url = request.GET.get('error_url') |
|
126 |
msg = u'auth_oidc: error received ' |
|
127 |
if error_description: |
|
128 |
msg += u'%s (%s)' % (error_description, request.GET['error']) |
|
129 |
else: |
|
130 |
msg += request.GET['error'] |
|
131 |
if error_url: |
|
132 |
msg += u' see %s' % error_url |
|
133 |
logger.warning(msg) |
|
134 |
if provider: |
|
135 |
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s ' |
|
136 |
'to an administrator.') |
|
137 |
% { |
|
138 |
'name': provider.name, |
|
139 |
'request_id': request.request_id, |
|
140 |
}) |
|
141 |
else: |
|
142 |
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an ' |
|
143 |
'administrator') % request.request_id) |
|
144 |
return self.continue_to_next_url() |
|
145 |
if not code: |
|
158 |
return self.handle_error(request, provider) |
|
159 |
elif not code: |
|
146 | 160 |
messages.warning(request, _('Missing code, report %s to an administrator') % |
147 | 161 |
request.request_id) |
148 | 162 |
logger.warning('auth_oidc: missing code, %r', request.GET) |
149 |
return self.continue_to_next_url() |
|
163 |
return self.continue_to_next_url(request) |
|
164 |
else: |
|
165 |
return self.handle_code(request, provider, nonce, code) |
|
166 | ||
167 |
def handle_code(self, request, provider, nonce, code): |
|
150 | 168 |
try: |
151 | 169 |
token_endpoint_request = { |
152 | 170 |
'grant_type': 'authorization_code', |
153 | 171 |
'code': code, |
154 | 172 |
'redirect_uri': request.build_absolute_uri(request.path), |
155 | 173 |
} |
156 |
logger.debug('auth_oidc: sent request to token endpoint %r', token_endpoint_request) |
|
157 | 174 |
response = requests.post(provider.token_endpoint, data=token_endpoint_request, |
158 | 175 |
auth=(provider.client_id, provider.client_secret), timeout=10) |
159 | 176 |
response.raise_for_status() |
160 | 177 |
except requests.RequestException as e: |
161 | 178 |
logger.warning( |
162 | 179 |
'auth_oidc: failed to contact the token_endpoint for %(issuer)s, %(exception)s' % { |
163 |
'issuer': issuer, |
|
180 |
'issuer': provider.issuer,
|
|
164 | 181 |
'exception': e, |
165 | 182 |
}) |
166 | 183 |
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' |
... | ... | |
169 | 186 |
'name': provider.name, |
170 | 187 |
'request_id': request.request_id, |
171 | 188 |
}) |
172 |
return self.continue_to_next_url() |
|
189 |
return self.continue_to_next_url(request)
|
|
173 | 190 |
try: |
174 | 191 |
result = response.json() |
175 | 192 |
except ValueError as e: |
176 |
logger.warning(u'auth_oidc: response from %s is not a JSON document, %s, %r' %
|
|
193 |
logger.warning('auth_oidc: response from %s is not a JSON document, %s, %r' % |
|
177 | 194 |
(provider.token_endpoint, e, response.content)) |
178 | 195 |
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' |
179 | 196 |
'an administrator. ') % |
... | ... | |
181 | 198 |
'name': provider.name, |
182 | 199 |
'request_id': request.request_id, |
183 | 200 |
}) |
184 |
return self.continue_to_next_url() |
|
201 |
return self.continue_to_next_url(request)
|
|
185 | 202 |
# token_type is case insensitive, https://tools.ietf.org/html/rfc6749#section-4.2.2 |
186 | 203 |
if ('access_token' not in result |
187 | 204 |
or 'token_type' not in result |
188 | 205 |
or result['token_type'].lower() != 'bearer' |
189 | 206 |
or 'id_token' not in result): |
190 |
logger.warning(u'auth_oidc: invalid token endpoint response from %s: %r' % (
|
|
207 |
logger.warning('auth_oidc: invalid token endpoint response from %s: %r' % ( |
|
191 | 208 |
provider.token_endpoint, result)) |
192 | 209 |
messages.warning(request, _('Provider %(name)s is down, report %(request_id)s to ' |
193 | 210 |
'an administrator. ') % |
... | ... | |
195 | 212 |
'name': provider.name, |
196 | 213 |
'request_id': request.request_id, |
197 | 214 |
}) |
198 |
return self.continue_to_next_url() |
|
199 |
logger.info(u'got token response %s', result)
|
|
215 |
return self.continue_to_next_url(request)
|
|
216 |
logger.info('auth_oidc: got token response %s', result)
|
|
200 | 217 |
access_token = result.get('access_token') |
201 |
user = authenticate(request, access_token=access_token, nonce=nonce, id_token=result['id_token'], provider=provider) |
|
218 |
user = authenticate( |
|
219 |
request, |
|
220 |
access_token=access_token, |
|
221 |
nonce=nonce, |
|
222 |
id_token=result['id_token'], |
|
223 |
provider=provider) |
|
202 | 224 |
if user: |
203 | 225 |
# remember last tokens for logout |
226 |
login(request, user, 'oidc', nonce=nonce) |
|
204 | 227 |
tokens = request.session.setdefault('auth_oidc', {}).setdefault('tokens', []) |
205 | 228 |
tokens.append({ |
206 | 229 |
'token_response': result, |
207 | 230 |
'provider_pk': provider.pk, |
208 | 231 |
}) |
209 |
request.session.modified = True |
|
210 |
login(request, user, 'oidc', nonce=nonce) |
|
211 | 232 |
else: |
212 | 233 |
messages.warning(request, _('No user found')) |
213 |
return self.continue_to_next_url() |
|
234 |
return self.continue_to_next_url(request) |
|
235 | ||
236 |
def handle_error(self, request, provider): |
|
237 |
error_description = request.GET.get('error_description') |
|
238 |
error_url = request.GET.get('error_url') |
|
239 |
msg = 'auth_oidc: error received ' |
|
240 |
if error_description: |
|
241 |
msg += '%s (%s)' % (error_description, request.GET['error']) |
|
242 |
else: |
|
243 |
msg += request.GET['error'] |
|
244 |
if error_url: |
|
245 |
msg += ' see %s' % error_url |
|
246 |
logger.warning(msg) |
|
247 |
if provider: |
|
248 |
messages.warning(request, _('Login with %(name)s failed, report %(request_id)s ' |
|
249 |
'to an administrator.') |
|
250 |
% { |
|
251 |
'name': provider.name, |
|
252 |
'request_id': request.request_id, |
|
253 |
}) |
|
254 |
else: |
|
255 |
messages.warning(request, _('Login with OpenIDConnect failed, report %s to an ' |
|
256 |
'administrator') % request.request_id) |
|
257 |
return self.continue_to_next_url(request) |
|
214 | 258 | |
215 | 259 | |
216 | 260 |
login_callback = setting_enabled('ENABLE', settings=app_settings)(LoginCallback.as_view()) |
tests/test_auth_oidc.py | ||
---|---|---|
16 | 16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 | |
18 | 18 |
import datetime |
19 |
import json |
|
19 | 20 |
import os |
20 | 21 |
import pytest |
21 |
import json |
|
22 |
import time |
|
23 | 22 |
import random |
23 |
import re |
|
24 |
import time |
|
24 | 25 | |
26 |
from jwcrypto.common import base64url_encode, base64url_decode, json_encode |
|
25 | 27 |
from jwcrypto.jwk import JWKSet, JWK |
26 |
from jwcrypto.jwt import JWT |
|
27 | 28 |
from jwcrypto.jws import JWS, InvalidJWSObject |
28 |
from jwcrypto.common import base64url_encode, base64url_decode, json_encode
|
|
29 |
from jwcrypto.jwt import JWT
|
|
29 | 30 | |
30 | 31 |
from httmock import urlmatch, HTTMock |
31 | 32 | |
32 |
from django.urls import reverse |
|
33 |
from django.utils.timezone import utc |
|
34 | 33 |
from django.contrib.auth import get_user_model |
34 |
from django.urls import reverse |
|
35 | 35 |
from django.utils.encoding import force_text, force_str |
36 |
from django.utils.timezone import now
|
|
36 |
from django.http import QueryDict
|
|
37 | 37 |
from django.utils.six.moves.urllib import parse as urlparse |
38 |
from django.utils.timezone import now |
|
39 |
from django.utils.timezone import utc |
|
38 | 40 | |
39 | 41 |
from django_rbac.utils import get_ou_model |
40 | 42 | |
... | ... | |
261 | 263 |
if extra_id_token: |
262 | 264 |
id_token.update(extra_id_token) |
263 | 265 | |
264 |
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, |
|
265 |
OIDCProvider.ALGO_EC): |
|
266 |
if oidc_provider.idtoken_algo in (OIDCProvider.ALGO_RSA, OIDCProvider.ALGO_EC): |
|
266 | 267 |
alg = { |
267 | 268 |
OIDCProvider.ALGO_RSA: 'RS256', |
268 | 269 |
OIDCProvider.ALGO_EC: 'ES256', |
... | ... | |
270 | 271 |
jwk = None |
271 | 272 |
for key in oidc_provider_jwkset['keys']: |
272 | 273 |
if key.key_type == { |
273 |
OIDCProvider.ALGO_RSA: 'RSA',
|
|
274 |
OIDCProvider.ALGO_EC: 'EC',
|
|
275 |
}.get(oidc_provider.idtoken_algo):
|
|
274 |
OIDCProvider.ALGO_RSA: 'RSA', |
|
275 |
OIDCProvider.ALGO_EC: 'EC', |
|
276 |
}.get(oidc_provider.idtoken_algo): |
|
276 | 277 |
jwk = key |
277 | 278 |
break |
278 | 279 |
if provides_kid_header: |
... | ... | |
281 | 282 |
header = {'alg': alg, 'kid': jwk.key_id} |
282 | 283 |
jwt = JWT(header=header, claims=id_token) |
283 | 284 |
jwt.make_signed_token(jwk) |
284 |
else: # hmac |
|
285 |
else: # hmac
|
|
285 | 286 |
jwt = JWT(header={'alg': 'HS256'}, |
286 | 287 |
claims=id_token) |
287 | 288 |
k = base64url_encode(oidc_provider.client_secret.encode('utf-8')) |
... | ... | |
346 | 347 |
return reverse('oidc-login-callback') |
347 | 348 | |
348 | 349 | |
349 |
def check_simple_qs(qs): |
|
350 |
for k in qs: |
|
351 |
assert len(qs[k]) == 1 |
|
352 |
qs[k] = qs[k][0] |
|
353 |
return qs |
|
354 | ||
355 | ||
356 | 350 |
def test_providers_on_login_page(oidc_provider, app): |
357 | 351 |
response = app.get('/login/') |
358 | 352 |
# two frontends should be present on login page |
... | ... | |
381 | 375 | |
382 | 376 | |
383 | 377 |
def test_login_with_conditional_authenticators(oidc_provider, app, settings, caplog): |
384 |
oidc2_provider = OIDCProvider.objects.create(
|
|
378 |
OIDCProvider.objects.create( |
|
385 | 379 |
id=2, |
386 | 380 |
ou=get_default_ou(), |
387 | 381 |
name='My IDP', |
... | ... | |
482 | 476 |
assert response['Location'] == '/accounts/oidc/login/%s/' % oidc_provider.pk |
483 | 477 | |
484 | 478 | |
485 | ||
486 | 479 |
def test_sso(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): |
487 | 480 |
OU = get_ou_model() |
488 | 481 |
cassis = OU.objects.create(name='Cassis', slug='cassis') |
... | ... | |
495 | 488 |
assert location.scheme == endpoint.scheme |
496 | 489 |
assert location.netloc == endpoint.netloc |
497 | 490 |
assert location.path == endpoint.path |
498 |
query = check_simple_qs(urlparse.parse_qs(location.query))
|
|
499 |
assert query['state'] in app.session['auth_oidc']
|
|
491 |
query = QueryDict(location.query)
|
|
492 |
state = query['state']
|
|
500 | 493 |
assert query['response_type'] == 'code' |
501 | 494 |
assert query['client_id'] == str(oidc_provider.client_id) |
502 | 495 |
assert query['scope'] == 'openid' |
503 | 496 |
assert query['redirect_uri'] == 'http://testserver' + reverse('oidc-login-callback') |
504 |
# get the nonce |
|
505 |
nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] |
|
497 |
nonce = query['nonce'] |
|
506 | 498 | |
507 | 499 |
if oidc_provider.claims_parameter_supported: |
508 | 500 |
claims = json.loads(query['claims']) |
... | ... | |
517 | 509 | |
518 | 510 |
with utils.check_log(caplog, 'failed to contact the token_endpoint'): |
519 | 511 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): |
520 |
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': query['state']})
|
|
512 |
response = app.get(login_callback_url(oidc_provider), params={'code': 'yyyy', 'state': state})
|
|
521 | 513 |
with utils.check_log(caplog, 'invalid id_token'): |
522 | 514 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
523 | 515 |
extra_id_token={'iss': None}): |
524 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
516 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
525 | 517 |
with utils.check_log(caplog, 'invalid id_token'): |
526 | 518 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
527 | 519 |
extra_id_token={'sub': None}): |
528 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
520 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
529 | 521 |
with utils.check_log(caplog, 'authentication is too old'): |
530 | 522 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
531 | 523 |
extra_id_token={'iat': 1}): |
532 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
524 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
533 | 525 |
with utils.check_log(caplog, 'invalid id_token'): |
534 | 526 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
535 | 527 |
extra_id_token={'exp': 1}): |
536 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
528 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
537 | 529 |
with utils.check_log(caplog, 'invalid id_token audience'): |
538 | 530 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
539 | 531 |
extra_id_token={'aud': 'zz'}): |
540 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
532 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
541 | 533 |
with utils.check_log(caplog, 'expected nonce'): |
542 | 534 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): |
543 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
535 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
544 | 536 |
assert not hooks.auth_oidc_backend_modify_user |
545 | 537 |
with utils.check_log(caplog, 'created user'): |
546 | 538 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
547 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
539 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
548 | 540 |
assert len(hooks.auth_oidc_backend_modify_user) == 1 |
549 | 541 |
assert set(hooks.auth_oidc_backend_modify_user[0]['kwargs']) >= set( |
550 | 542 |
['user', 'provider', 'user_info', 'id_token', 'access_token']) |
... | ... | |
564 | 556 | |
565 | 557 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
566 | 558 |
extra_user_info={'family_name_verified': True}, nonce=nonce): |
567 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
559 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
568 | 560 |
assert AttributeValue.objects.filter(content='Doe', verified=False).count() == 0 |
569 | 561 |
assert AttributeValue.objects.filter(content='Doe', verified=True).count() == 1 |
570 | 562 | |
571 | 563 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, |
572 | 564 |
extra_user_info={'ou': 'cassis'}, nonce=nonce): |
573 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
565 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
574 | 566 |
assert User.objects.count() == 1 |
575 | 567 |
user = User.objects.get() |
576 | 568 |
assert user.ou == cassis |
577 | 569 | |
578 | 570 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
579 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
571 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
580 | 572 |
assert User.objects.count() == 1 |
581 | 573 |
user = User.objects.get() |
582 | 574 |
assert user.ou == get_default_ou() |
... | ... | |
585 | 577 |
time.sleep(0.1) |
586 | 578 | |
587 | 579 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code): |
588 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
580 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
589 | 581 |
assert User.objects.count() == 1 |
590 | 582 |
user = User.objects.get() |
591 | 583 |
assert user.ou == get_default_ou() |
... | ... | |
626 | 618 |
assert oidc_provider.name in response.text |
627 | 619 |
response = response.click(oidc_provider.name) |
628 | 620 |
location = urlparse.urlparse(response.location) |
629 |
query = check_simple_qs(urlparse.parse_qs(location.query)) |
|
630 |
nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] |
|
621 |
query = QueryDict(location.query) |
|
622 |
state = query['state'] |
|
623 |
nonce = query['nonce'] |
|
631 | 624 | |
632 | 625 |
# sub=john.doe, MUST not work |
633 | 626 |
with utils.check_log(caplog, 'cannot create user'): |
634 | 627 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
635 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
628 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
636 | 629 | |
637 | 630 |
# sub=simple_user.uuid MUST work |
638 | 631 |
with utils.check_log(caplog, 'found user using UUID'): |
639 | 632 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub=simple_user.uuid, nonce=nonce): |
640 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
633 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
641 | 634 | |
642 | 635 |
assert urlparse.urlparse(response['Location']).path == '/' |
643 | 636 |
assert User.objects.count() == 1 |
... | ... | |
668 | 661 |
assert oidc_provider.name in response.text |
669 | 662 |
response = response.click(oidc_provider.name) |
670 | 663 |
location = urlparse.urlparse(response.location) |
671 |
query = check_simple_qs(urlparse.parse_qs(location.query)) |
|
672 |
nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] |
|
664 |
query = QueryDict(location.query) |
|
665 |
state = query['state'] |
|
666 |
nonce = query['nonce'] |
|
673 | 667 | |
674 | 668 |
# sub=john.doe |
675 | 669 |
with utils.check_log(caplog, 'auth_oidc: created user'): |
676 | 670 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
677 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
671 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
678 | 672 |
assert User.objects.count() == 1 |
679 | 673 | |
680 | 674 |
# second time |
681 | 675 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
682 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
676 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
683 | 677 |
assert User.objects.count() == 1 |
684 | 678 | |
685 | 679 |
# different sub, same user |
686 | 680 |
with utils.check_log(caplog, 'auth_oidc: changed user'): |
687 | 681 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, sub='other', nonce=nonce): |
688 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']})
|
|
682 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state})
|
|
689 | 683 |
assert User.objects.count() == 1 |
690 | 684 | |
691 | 685 | |
... | ... | |
742 | 736 |
assert oidc_provider_rsa.name in response.text |
743 | 737 |
response = response.click(oidc_provider_rsa.name) |
744 | 738 |
location = urlparse.urlparse(response.location) |
745 |
query = check_simple_qs(urlparse.parse_qs(location.query)) |
|
746 |
nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] |
|
739 |
query = QueryDict(location.query) |
|
740 |
state = query['state'] |
|
741 |
nonce = query['nonce'] |
|
747 | 742 | |
748 | 743 |
# test invalid kid |
749 | 744 |
with utils.check_log(caplog, message='not in key set', levelname='WARNING'): |
750 |
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid='coin'): |
|
751 |
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']}) |
|
745 |
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, |
|
746 |
nonce=nonce, provides_kid_header=True, |
|
747 |
kid='coin'): |
|
748 |
response = app.get(login_callback_url(oidc_provider_rsa), |
|
749 |
params={'code': code, 'state': state}) |
|
752 | 750 | |
753 | 751 |
# test missing kid |
754 | 752 |
with utils.check_log(caplog, message='Key ID None not in key set', levelname='WARNING'): |
755 |
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, nonce=nonce, provides_kid_header=True, kid=None): |
|
756 |
response = app.get(login_callback_url(oidc_provider_rsa), params={'code': code, 'state': query['state']}) |
|
753 |
with oidc_provider_mock(oidc_provider_rsa, oidc_provider_jwkset, code, |
|
754 |
nonce=nonce, provides_kid_header=True, |
|
755 |
kid=None): |
|
756 |
response = app.get(login_callback_url(oidc_provider_rsa), |
|
757 |
params={'code': code, 'state': state}) |
|
757 | 758 | |
758 | 759 | |
759 | 760 |
def test_templated_claim_mapping(app, caplog, code, oidc_provider, oidc_provider_jwkset): |
... | ... | |
807 | 808 |
response = app.get('/').maybe_follow() |
808 | 809 |
response = response.click(oidc_provider.name) |
809 | 810 |
location = urlparse.urlparse(response.location) |
810 |
query = check_simple_qs(urlparse.parse_qs(location.query)) |
|
811 |
nonce = app.session['auth_oidc'][query['state']]['request']['nonce'] |
|
811 |
query = QueryDict(location.query) |
|
812 |
state = query['state'] |
|
813 |
nonce = query['nonce'] |
|
812 | 814 | |
813 | 815 |
with oidc_provider_mock(oidc_provider, oidc_provider_jwkset, code, nonce=nonce): |
814 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': query['state']}).maybe_follow() |
|
816 |
response = app.get(login_callback_url(oidc_provider), |
|
817 |
params={'code': code, 'state': state}).maybe_follow() |
|
815 | 818 | |
816 | 819 |
assert User.objects.count() == 1 |
817 | 820 |
user = User.objects.first() |
... | ... | |
822 | 825 |
assert user.last_name == 'DOE' |
823 | 826 |
# typo in template string, no rendering |
824 | 827 |
assert user.first_name == '{{ given_name' |
828 | ||
829 | ||
830 |
def test_lost_state(app, caplog, code, oidc_provider, oidc_provider_jwkset, hooks): |
|
831 |
response = app.get('/login/?next=/whatever/') |
|
832 |
assert oidc_provider.name in response.text |
|
833 |
response = response.click(oidc_provider.name) |
|
834 |
qs = urlparse.parse_qs(urlparse.urlparse(response.location).query) |
|
835 |
state = qs['state'] |
|
836 | ||
837 |
# reset the session to forget the state |
|
838 |
app.cookiejar.clear() |
|
839 | ||
840 |
caplog.clear() |
|
841 |
with utils.norequest: |
|
842 |
response = app.get(login_callback_url(oidc_provider), params={'code': code, 'state': state}) |
|
843 |
# not logged |
|
844 |
assert re.match('^auth-oidc: state.*has been lost', caplog.records[-1].message) |
|
845 |
# event is recorded |
|
846 |
assert '_auth_user_id' not in app.session |
|
847 |
# we are automatically redirected to our destination |
|
848 |
assert response.location == '/accounts/oidc/login/%s/?next=/whatever/' % oidc_provider.pk |
tests/utils.py | ||
---|---|---|
20 | 20 |
import socket |
21 | 21 |
from contextlib import contextmanager, closing |
22 | 22 | |
23 |
import httmock |
|
23 | 24 |
from lxml import etree |
24 | 25 | |
25 | 26 |
from django.core.management import call_command as django_call_command |
... | ... | |
277 | 278 |
assert event.data.get(key) == value, ( |
278 | 279 |
'event.data[%s] != data[%s] (%s != %s)' % (key, key, event.data.get(key), value) |
279 | 280 |
) |
281 | ||
282 | ||
283 |
@httmock.HTTMock |
|
284 |
@httmock.urlmatch() |
|
285 |
def norequest(request, url): |
|
286 |
assert False, 'no request should be done' |
|
280 |
- |