26 |
26 |
import pytest
|
27 |
27 |
from pytest import fixture
|
28 |
28 |
|
|
29 |
from django.contrib.sessions.models import Session
|
|
30 |
from django.contrib.auth.models import User
|
29 |
31 |
from django.urls import reverse
|
30 |
32 |
from django.utils import six
|
31 |
33 |
from django.utils.six.moves.urllib import parse as urlparse
|
... | ... | |
78 |
80 |
|
79 |
81 |
|
80 |
82 |
class MockIdp(object):
|
|
83 |
session_dump = None
|
|
84 |
identity_dump = None
|
|
85 |
|
81 |
86 |
def __init__(self, idp_metadata, private_key, sp_metadata):
|
82 |
87 |
self.server = server = lasso.Server.newFromBuffers(idp_metadata, private_key)
|
83 |
88 |
self.server.signatureMethod = lasso.SIGNATURE_METHOD_RSA_SHA256
|
... | ... | |
85 |
90 |
|
86 |
91 |
def process_authn_request_redirect(self, url, auth_result=True, consent=True, msg=None):
|
87 |
92 |
login = lasso.Login(self.server)
|
|
93 |
if self.identity_dump:
|
|
94 |
login.setIdentityFromDump(self.identity_dump)
|
|
95 |
if self.session_dump:
|
|
96 |
login.setSessionFromDump(self.session_dump)
|
88 |
97 |
login.processAuthnRequestMsg(url.split('?', 1)[1])
|
89 |
98 |
# See
|
90 |
99 |
# https://docs.python.org/2/library/zlib.html#zlib.decompress
|
... | ... | |
151 |
160 |
raise NotImplementedError
|
152 |
161 |
if login.msgBody:
|
153 |
162 |
assert b'rsa-sha256' in base64.b64decode(login.msgBody)
|
|
163 |
if login.identity:
|
|
164 |
self.identity_dump = login.identity.dump()
|
|
165 |
else:
|
|
166 |
self.identity_dump = None
|
|
167 |
if login.session:
|
|
168 |
self.session_dump = login.session.dump()
|
|
169 |
else:
|
|
170 |
self.session_dump = None
|
154 |
171 |
return login.msgUrl, login.msgBody, login.msgRelayState
|
155 |
172 |
|
156 |
173 |
def resolve_artifact(self, soap_message):
|
... | ... | |
167 |
184 |
assert 'rsa-sha256' in login.msgBody
|
168 |
185 |
return '<?xml version="1.0"?>\n' + login.msgBody
|
169 |
186 |
|
|
187 |
def init_slo(self, full=False, method=lasso.HTTP_METHOD_REDIRECT, relay_state=None):
|
|
188 |
logout = lasso.Logout(self.server)
|
|
189 |
logout.setIdentityFromDump(self.identity_dump)
|
|
190 |
logout.setSessionFromDump(self.session_dump)
|
|
191 |
logout.initRequest(None, method)
|
|
192 |
logout.msgRelayState = relay_state
|
|
193 |
if full:
|
|
194 |
logout.request.sessionIndexes = ()
|
|
195 |
logout.request.sessionIndex = None
|
|
196 |
logout.buildRequestMsg()
|
|
197 |
return logout.msgUrl, logout.msgBody, logout.msgRelayState
|
|
198 |
|
|
199 |
def check_slo_return(self, url=None, body=None):
|
|
200 |
logout = lasso.Logout(self.server)
|
|
201 |
logout.setIdentityFromDump(self.identity_dump)
|
|
202 |
logout.setSessionFromDump(self.session_dump)
|
|
203 |
if body:
|
|
204 |
logout.processResponseMsg(force_str(body))
|
|
205 |
else:
|
|
206 |
logout.processResponseMsg(force_str(url.split('?', 1)[-1]))
|
|
207 |
|
170 |
208 |
def mock_artifact_resolver(self):
|
171 |
209 |
@all_requests
|
172 |
210 |
def f(url, request):
|
... | ... | |
195 |
233 |
assert urlparse.urlparse(response['Location']).path == '/singleLogout'
|
196 |
234 |
|
197 |
235 |
|
|
236 |
def test_sso_idp_slo(db, app, idp, caplog, sp_settings):
|
|
237 |
assert Session.objects.count() == 0
|
|
238 |
assert User.objects.count() == 0
|
|
239 |
|
|
240 |
# first session
|
|
241 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
242 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
243 |
assert relay_state
|
|
244 |
assert 'eo:next_url' not in str(idp.request)
|
|
245 |
assert url.endswith(reverse('mellon_login'))
|
|
246 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
247 |
assert 'created new user' in caplog.text
|
|
248 |
assert 'logged in using SAML' in caplog.text
|
|
249 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
250 |
|
|
251 |
# second session
|
|
252 |
app.cookiejar.clear()
|
|
253 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
254 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
255 |
assert relay_state
|
|
256 |
assert 'eo:next_url' not in str(idp.request)
|
|
257 |
assert url.endswith(reverse('mellon_login'))
|
|
258 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
259 |
assert 'created new user' in caplog.text
|
|
260 |
assert 'logged in using SAML' in caplog.text
|
|
261 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
262 |
|
|
263 |
assert Session.objects.count() == 2
|
|
264 |
assert User.objects.count() == 1
|
|
265 |
|
|
266 |
# idp logout
|
|
267 |
url, body, relay_state = idp.init_slo()
|
|
268 |
response = app.get(url)
|
|
269 |
assert response.location.startswith('http://idp5/singleLogoutReturn?')
|
|
270 |
assert Session.objects.count() == 1
|
|
271 |
idp.check_slo_return(response.location)
|
|
272 |
|
|
273 |
|
|
274 |
def test_sso_idp_slo_soap(db, app, idp, caplog, sp_settings):
|
|
275 |
assert Session.objects.count() == 0
|
|
276 |
assert User.objects.count() == 0
|
|
277 |
|
|
278 |
# first session
|
|
279 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
280 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
281 |
assert relay_state
|
|
282 |
assert 'eo:next_url' not in str(idp.request)
|
|
283 |
assert url.endswith(reverse('mellon_login'))
|
|
284 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
285 |
assert 'created new user' in caplog.text
|
|
286 |
assert 'logged in using SAML' in caplog.text
|
|
287 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
288 |
|
|
289 |
# second session
|
|
290 |
app.cookiejar.clear()
|
|
291 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
292 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
293 |
assert relay_state
|
|
294 |
assert 'eo:next_url' not in str(idp.request)
|
|
295 |
assert url.endswith(reverse('mellon_login'))
|
|
296 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
297 |
assert 'created new user' in caplog.text
|
|
298 |
assert 'logged in using SAML' in caplog.text
|
|
299 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
300 |
|
|
301 |
assert Session.objects.count() == 2
|
|
302 |
assert User.objects.count() == 1
|
|
303 |
|
|
304 |
# idp logout
|
|
305 |
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP)
|
|
306 |
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
|
|
307 |
assert Session.objects.count() == 1
|
|
308 |
idp.check_slo_return(body=response.content)
|
|
309 |
|
|
310 |
|
|
311 |
def test_sso_idp_slo_full(db, app, idp, caplog, sp_settings):
|
|
312 |
assert Session.objects.count() == 0
|
|
313 |
assert User.objects.count() == 0
|
|
314 |
|
|
315 |
# first session
|
|
316 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
317 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
318 |
assert relay_state
|
|
319 |
assert 'eo:next_url' not in str(idp.request)
|
|
320 |
assert url.endswith(reverse('mellon_login'))
|
|
321 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
322 |
assert 'created new user' in caplog.text
|
|
323 |
assert 'logged in using SAML' in caplog.text
|
|
324 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
325 |
|
|
326 |
# second session
|
|
327 |
app.cookiejar.clear()
|
|
328 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
329 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
330 |
assert relay_state
|
|
331 |
assert 'eo:next_url' not in str(idp.request)
|
|
332 |
assert url.endswith(reverse('mellon_login'))
|
|
333 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
334 |
assert 'created new user' in caplog.text
|
|
335 |
assert 'logged in using SAML' in caplog.text
|
|
336 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
337 |
|
|
338 |
assert Session.objects.count() == 2
|
|
339 |
assert User.objects.count() == 1
|
|
340 |
|
|
341 |
# idp logout
|
|
342 |
url, body, relay_state = idp.init_slo(full=True)
|
|
343 |
response = app.get(url)
|
|
344 |
assert response.location.startswith('http://idp5/singleLogoutReturn?')
|
|
345 |
assert Session.objects.count() == 0
|
|
346 |
idp.check_slo_return(url=response.location)
|
|
347 |
|
|
348 |
|
|
349 |
def test_sso_idp_slo_full_soap(db, app, idp, caplog, sp_settings):
|
|
350 |
assert Session.objects.count() == 0
|
|
351 |
assert User.objects.count() == 0
|
|
352 |
|
|
353 |
# first session
|
|
354 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
355 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
356 |
assert relay_state
|
|
357 |
assert 'eo:next_url' not in str(idp.request)
|
|
358 |
assert url.endswith(reverse('mellon_login'))
|
|
359 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
360 |
assert 'created new user' in caplog.text
|
|
361 |
assert 'logged in using SAML' in caplog.text
|
|
362 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
363 |
|
|
364 |
# second session
|
|
365 |
app.cookiejar.clear()
|
|
366 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
367 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
368 |
assert relay_state
|
|
369 |
assert 'eo:next_url' not in str(idp.request)
|
|
370 |
assert url.endswith(reverse('mellon_login'))
|
|
371 |
response = app.post(reverse('mellon_login'), params={'SAMLResponse': body, 'RelayState': relay_state})
|
|
372 |
assert 'created new user' in caplog.text
|
|
373 |
assert 'logged in using SAML' in caplog.text
|
|
374 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
|
375 |
|
|
376 |
assert Session.objects.count() == 2
|
|
377 |
assert User.objects.count() == 1
|
|
378 |
|
|
379 |
# idp logout
|
|
380 |
url, body, relay_state = idp.init_slo(method=lasso.HTTP_METHOD_SOAP, full=True)
|
|
381 |
response = app.post(url, params=body, headers={'Content-Type': force_str('text/xml')})
|
|
382 |
assert Session.objects.count() == 0
|
|
383 |
idp.check_slo_return(body=response.content)
|
|
384 |
|
|
385 |
|
198 |
386 |
def test_sso(db, app, idp, caplog, sp_settings):
|
199 |
387 |
response = app.get(reverse('mellon_login'))
|
200 |
388 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
... | ... | |
330 |
518 |
acs_artifact_url = url.split('testserver', 1)[1]
|
331 |
519 |
with HTTMock(idp.mock_artifact_resolver()):
|
332 |
520 |
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
333 |
|
assert 'created new user' in caplog.text
|
|
521 |
assert 'created new user' not in caplog.text
|
334 |
522 |
assert 'logged in using SAML' in caplog.text
|
335 |
523 |
assert urlparse.urlparse(response['Location']).path == '/whatever/'
|
336 |
524 |
|