|
1 |
import base64
|
|
2 |
import urlparse
|
|
3 |
import zlib
|
|
4 |
|
1 |
5 |
import lasso
|
2 |
6 |
|
3 |
7 |
from pytest import fixture
|
... | ... | |
57 |
61 |
def process_authn_request_redirect(self, url, auth_result=True, consent=True):
|
58 |
62 |
login = lasso.Login(self.server)
|
59 |
63 |
login.processAuthnRequestMsg(url.split('?', 1)[1])
|
|
64 |
# See
|
|
65 |
# https://docs.python.org/2/library/zlib.html#zlib.decompress
|
|
66 |
# for the -15 magic value.
|
|
67 |
#
|
|
68 |
# * -8 to -15: Uses the absolute value of wbits as the window size
|
|
69 |
# logarithm. The input must be a raw stream with no header or trailer.
|
|
70 |
#
|
|
71 |
# i.e. the payload is a raw Deflate stream: the same compressed data as
# zlib/GZIP output, but with no header and no trailer.
|
|
72 |
self.request = zlib.decompress(
|
|
73 |
base64.b64decode(
|
|
74 |
urlparse.parse_qs(
|
|
75 |
urlparse.urlparse(url).query)['SAMLRequest'][0]), -15)
|
60 |
76 |
try:
|
61 |
77 |
login.validateRequestMsg(auth_result, consent)
|
62 |
78 |
except lasso.LoginRequestDeniedError:
|
... | ... | |
75 |
91 |
login.buildAuthnResponseMsg()
|
76 |
92 |
else:
|
77 |
93 |
raise NotImplementedError
|
78 |
|
return login.msgUrl, login.msgBody
|
|
94 |
return login.msgUrl, login.msgBody, login.msgRelayState
|
79 |
95 |
|
80 |
96 |
def resolve_artifact(self, soap_message):
|
81 |
97 |
login = lasso.Login(self.server)
|
... | ... | |
104 |
120 |
|
105 |
121 |
|
106 |
122 |
def test_sso_slo(db, app, idp, caplog, sp_settings):
    """Log in with a ``next`` URL and check it survives the round trip.

    The login view is requested with ``?next=/whatever/``; the mock IdP
    must hand back a non-empty RelayState, the AuthnRequest must not carry
    an ``eo:next_url`` element, and after posting the SAML response together
    with the RelayState the final redirect must point at ``/whatever/``.
    """
    # NOTE(review): despite the name, no logout step is exercised here.
    login_url = reverse('mellon_login')

    response = app.get(login_url + '?next=/whatever/')
    url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
    assert relay_state
    # idp.request is the decoded AuthnRequest captured by the mock IdP.
    assert 'eo:next_url' not in idp.request
    assert url.endswith(login_url)

    response = app.post(
        login_url,
        params={'SAMLResponse': body, 'RelayState': relay_state})
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert response['Location'].endswith('/whatever/')
|
114 |
132 |
|
115 |
133 |
|
116 |
134 |
def test_sso(db, app, idp, caplog, sp_settings):
    """Log in without a ``next`` URL and land on the default redirect.

    No ``next`` parameter is sent, so the mock IdP is expected to return an
    empty RelayState, and after posting the SAML response the redirect must
    end with ``sp_settings.LOGIN_REDIRECT_URL``.
    """
    login_url = reverse('mellon_login')

    response = app.get(login_url)
    url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
    assert not relay_state
    assert url.endswith(login_url)

    response = app.post(
        login_url,
        params={'SAMLResponse': body, 'RelayState': relay_state})
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
... | ... | |
125 |
144 |
|
126 |
145 |
def test_sso_request_denied(db, app, idp, caplog, sp_settings):
    """Check that a denied authentication is reported in the log.

    The mock IdP is asked to process the AuthnRequest with
    ``auth_result=False``; posting its response back to the login view must
    log the Responder / RequestDenied status codes.
    """
    login_url = reverse('mellon_login')

    response = app.get(login_url)
    url, body, relay_state = idp.process_authn_request_redirect(
        response['Location'], auth_result=False)
    assert not relay_state
    assert url.endswith(login_url)

    response = app.post(
        login_url,
        params={'SAMLResponse': body, 'RelayState': relay_state})
    # Adjacent literals instead of a backslash continuation inside the
    # string: the ", " separator no longer depends on the indentation of the
    # continuation line. The text is presumably the repr of a Python 2 list
    # of unicode status codes, hence the u'' prefixes and the comma-space.
    expected = (
        "status is not success codes: "
        "[u'urn:oasis:names:tc:SAML:2.0:status:Responder', "
        "u'urn:oasis:names:tc:SAML:2.0:status:RequestDenied']"
    )
    assert expected in caplog.text
|
133 |
153 |
|
... | ... | |
137 |
157 |
request = rf.get('/')
|
138 |
158 |
sp_metadata = create_metadata(request)
|
139 |
159 |
idp = MockIdp(idp_metadata, idp_private_key, sp_metadata)
|
140 |
|
response = app.get(reverse('mellon_login'))
|
141 |
|
url, body = idp.process_authn_request_redirect(response['Location'])
|
|
160 |
response = app.get(reverse('mellon_login') + '?next=/whatever/')
|
|
161 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
162 |
assert relay_state
|
142 |
163 |
assert body is None
|
143 |
164 |
assert reverse('mellon_login') in url
|
144 |
165 |
assert 'SAMLart' in url
|
145 |
166 |
acs_artifact_url = url.split('testserver', 1)[1]
|
146 |
167 |
with HTTMock(idp.mock_artifact_resolver()):
|
147 |
|
response = app.get(acs_artifact_url)
|
|
168 |
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
148 |
169 |
assert 'created new user' in caplog.text
|
149 |
170 |
assert 'logged in using SAML' in caplog.text
|
150 |
|
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
|
171 |
assert response['Location'].endswith('/whatever/')
|
151 |
172 |
# force delog
|
152 |
|
app.session.flush()
|
|
173 |
del app.session['_auth_user_id']
|
153 |
174 |
assert 'dead artifact' not in caplog.text
|
154 |
175 |
with HTTMock(idp.mock_artifact_resolver()):
|
155 |
|
response = app.get(acs_artifact_url)
|
|
176 |
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
156 |
177 |
# verify retry login was asked
|
157 |
178 |
assert 'dead artifact' in caplog.text
|
158 |
179 |
assert response.status_code == 302
|
159 |
180 |
assert reverse('mellon_login') in url
|
160 |
181 |
response = response.follow()
|
161 |
|
url, body = idp.process_authn_request_redirect(response['Location'])
|
|
182 |
url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
|
|
183 |
assert relay_state
|
162 |
184 |
reset_caplog(caplog)
|
163 |
185 |
# verify caplog has been cleaned
|
164 |
186 |
assert 'created new user' not in caplog.text
|
... | ... | |
167 |
189 |
assert 'SAMLart' in url
|
168 |
190 |
acs_artifact_url = url.split('testserver', 1)[1]
|
169 |
191 |
with HTTMock(idp.mock_artifact_resolver()):
|
170 |
|
response = app.get(acs_artifact_url)
|
|
192 |
response = app.get(acs_artifact_url, params={'RelayState': relay_state})
|
171 |
193 |
assert 'created new user' in caplog.text
|
172 |
194 |
assert 'logged in using SAML' in caplog.text
|
173 |
|
assert response['Location'].endswith(sp_settings.LOGIN_REDIRECT_URL)
|
|
195 |
assert response['Location'].endswith('/whatever/')
|
|
196 |
|
|
197 |
|
|
198 |
def test_sso_slo_pass_next_url(db, app, idp, caplog, sp_settings):
    """Log in with the next-URL AuthnRequest extension switched on.

    With ``MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION`` set to True, the
    AuthnRequest received by the mock IdP must contain ``eo:next_url``, and
    the login must still end with a redirect to ``/whatever/``.
    """
    sp_settings.MELLON_ADD_AUTHNREQUEST_NEXT_URL_EXTENSION = True
    login_url = reverse('mellon_login')

    response = app.get(login_url + '?next=/whatever/')
    url, body, relay_state = idp.process_authn_request_redirect(response['Location'])
    # idp.request is the decoded AuthnRequest captured by the mock IdP.
    assert 'eo:next_url' in idp.request
    assert url.endswith(login_url)

    response = app.post(
        login_url,
        params={'SAMLResponse': body, 'RelayState': relay_state})
    assert 'created new user' in caplog.text
    assert 'logged in using SAML' in caplog.text
    assert response['Location'].endswith('/whatever/')
|
174 |
|
-
|