0001-qommon-saml2-on-artifact-resolution-error-show-an-er.patch
tests/test_saml_auth.py | ||
---|---|---|
9 | 9 |
import lasso |
10 | 10 |
except ImportError: |
11 | 11 |
lasso = None |
12 | ||
12 |
import mock |
|
13 | 13 |
import pytest |
14 | 14 |
from quixote import get_session_manager |
15 | 15 |
from quixote.errors import RequestError |
... | ... | |
18 | 18 |
from wcs.qommon.http_request import HTTPRequest |
19 | 19 |
from wcs.qommon.ident.idp import MethodAdminDirectory |
20 | 20 |
from wcs.qommon.misc import get_lasso_server |
21 |
from wcs.qommon.saml2 import Saml2Directory |
|
21 |
from wcs.qommon.saml2 import Saml2Directory, SOAPException
|
|
22 | 22 | |
23 | 23 |
from .test_hobo_notify import PROFILE |
24 | 24 |
from .utilities import clean_temporary_pub, create_temporary_pub, get_app |
... | ... | |
118 | 118 |
assert 'rsa-sha256' in req.response.headers['location'] |
119 | 119 | |
120 | 120 | |
121 |
def get_authn_response_msg(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT): |
|
121 |
def get_authn_response_msg( |
|
122 |
pub, |
|
123 |
ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT, |
|
124 |
protocol_binding=lasso.SAML2_METADATA_BINDING_POST, |
|
125 |
): |
|
122 | 126 |
idp_metadata_filepath = os.path.join(pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-metadata.xml') |
123 | 127 |
idp_key_filepath = os.path.join(pub.app_dir, 'idp-http-sso.example.net-saml2-metadata-privatekey.pem') |
124 | 128 |
idp = lasso.Server(idp_metadata_filepath, idp_key_filepath, None, None) |
... | ... | |
131 | 135 |
login.initIdpInitiatedAuthnRequest(pub.cfg['sp']['saml2_providerid']) |
132 | 136 |
login.request.nameIDPolicy.format = ni_format |
133 | 137 |
login.request.nameIDPolicy.allowCreate = True |
134 |
login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
|
|
138 |
login.request.protocolBinding = protocol_binding
|
|
135 | 139 |
login.processAuthnRequestMsg(None) |
136 | 140 |
login.validateRequestMsg(True, True) |
137 | 141 |
login.buildAssertion( |
... | ... | |
181 | 185 |
attributes.append(role_slug_attribute) |
182 | 186 |
login.assertion.attributeStatement[0].attribute = attributes |
183 | 187 | |
184 |
login.buildAuthnResponseMsg() |
|
185 |
return login.msgBody |
|
188 |
if protocol_binding == lasso.SAML2_METADATA_BINDING_POST: |
|
189 |
login.buildAuthnResponseMsg() |
|
190 |
return login.msgBody |
|
191 |
else: |
|
192 |
login.buildArtifactMsg(lasso.HTTP_METHOD_ARTIFACT_GET) |
|
193 |
return login.msgUrl |
|
186 | 194 | |
187 | 195 | |
188 | 196 |
def get_assertion_consumer_request(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT): |
... | ... | |
343 | 351 |
saml2.assertionConsumerPost() |
344 | 352 | |
345 | 353 | |
354 |
@mock.patch('wcs.qommon.saml2.soap_call') |
|
355 |
def test_assertion_consumer_artifact_error(mock_soap_call, pub): |
|
356 |
def get_assertion_consumer_request(pub, ni_format=lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT): |
|
357 |
msg_url = get_authn_response_msg(pub, protocol_binding=lasso.SAML2_METADATA_BINDING_ARTIFACT) |
|
358 |
artifact = urllib.parse.parse_qs(urllib.parse.urlparse(msg_url).query)['SAMLart'][0] |
|
359 |
req = HTTPRequest( |
|
360 |
None, |
|
361 |
{ |
|
362 |
'SERVER_NAME': 'example.net', |
|
363 |
'SCRIPT_NAME': '', |
|
364 |
'PATH_INFO': '/saml/assertionConsumerArtifact', |
|
365 |
'QUERY_STRING': urllib.parse.urlencode( |
|
366 |
{'SAMLart': artifact, 'RelayState': '/foobar/?test=ok'} |
|
367 |
), |
|
368 |
}, |
|
369 |
) |
|
370 |
req.process_inputs() |
|
371 |
pub._set_request(req) |
|
372 |
pub.session_class.wipe() |
|
373 |
req.session = pub.session_class(id=1) |
|
374 |
assert req.session.user is None |
|
375 |
return req |
|
376 | ||
377 |
mock_soap_call.side_effect = SOAPException() |
|
378 |
req = get_assertion_consumer_request(pub) |
|
379 |
saml2 = Saml2Directory() |
|
380 |
saml2.assertionConsumerArtifact() |
|
381 |
assert req.response.status_code == 302 |
|
382 |
assert req.response.headers['location'] == 'http://example.net/saml/error?RelayState=/foobar/%3Ftest%3Dok' |
|
383 | ||
384 | ||
385 |
def test_saml_error_page(pub): |
|
386 |
resp = get_app(pub).get('/saml/error?RelayState=/foobar/%3Ftest%3Dok') |
|
387 |
resp = resp.form.submit() |
|
388 |
assert resp.status_int == 302 |
|
389 |
assert urllib.parse.parse_qs(urllib.parse.urlparse(resp.location).query)['RelayState'] == [ |
|
390 |
'/foobar/?test=ok' |
|
391 |
] |
|
392 | ||
393 | ||
346 | 394 |
def test_saml_login_page(pub): |
347 | 395 |
resp = get_app(pub).get('/login/') |
348 | 396 |
assert resp.status_int == 302 |
wcs/qommon/saml2.py | ||
---|---|---|
37 | 37 |
) |
38 | 38 |
from quixote.directory import Directory |
39 | 39 |
from quixote.errors import RequestError |
40 |
from quixote.html import TemplateIO, htmltext |
|
40 | 41 |
from quixote.http_request import parse_header |
41 | 42 | |
42 | 43 |
from . import _, errors, force_str, misc |
43 | 44 |
from .publisher import get_cfg, get_logger |
44 |
from .template import error_page |
|
45 |
from .template import error_page, html_top
|
|
45 | 46 | |
46 | 47 | |
47 | 48 |
class SOAPException(Exception): |
... | ... | |
120 | 121 |
'metadata', |
121 | 122 |
('metadata.xml', 'metadata'), |
122 | 123 |
'public_key', |
124 |
'error', |
|
123 | 125 |
] |
124 | 126 | |
125 | 127 |
def _q_traverse(self, path): |
... | ... | |
168 | 170 |
def login(self): |
169 | 171 |
return self.perform_login() |
170 | 172 | |
171 |
def perform_login(self, idp=None): |
|
173 |
def perform_login(self, idp=None, relay_state=None):
|
|
172 | 174 |
server = misc.get_lasso_server() |
173 | 175 |
if not server: |
174 | 176 |
return error_page(_('SAML 2.0 support not yet configured.')) |
... | ... | |
185 | 187 |
login.request.forceAuthn = get_request().form.get('forceAuthn') == 'true' |
186 | 188 |
login.request.isPassive = get_request().form.get('IsPassive') == 'true' |
187 | 189 |
login.request.consent = 'urn:oasis:names:tc:SAML:2.0:consent:current-implicit' |
188 |
if isinstance(get_request().form.get('next'), str): |
|
189 |
login.msgRelayState = get_request().form.get('next') |
|
190 | 190 | |
191 |
next_url = login.msgRelayState or get_publisher().get_frontoffice_url() |
|
191 |
if not relay_state and isinstance(get_request().form.get('next'), str): |
|
192 |
relay_state = get_request().form.get('next') |
|
193 |
if relay_state: |
|
194 |
login.msgRelayState = relay_state |
|
195 | ||
196 |
next_url = relay_state or get_publisher().get_frontoffice_url() |
|
192 | 197 |
parsed_url = urllib.parse.urlparse(next_url) |
193 | 198 |
request = get_request() |
194 | 199 |
scheme = parsed_url.scheme or request.get_scheme() |
... | ... | |
242 | 247 |
try: |
243 | 248 |
soap_answer = soap_call(login.msgUrl, login.msgBody, client_cert=client_cert) |
244 | 249 |
except SOAPException: |
245 |
return error_page(_('Failure to communicate with identity provider')) |
|
250 |
relay_state = request.form.get('RelayState', None) |
|
251 |
path = '/saml/error' |
|
252 |
if relay_state: |
|
253 |
path += '?RelayState=' + urllib.parse.quote(relay_state) |
|
254 |
return redirect(path) |
|
246 | 255 | |
247 | 256 |
try: |
248 | 257 |
login.processResponseMsg(force_str(soap_answer)) |
... | ... | |
786 | 795 |
publickey = open(misc.get_abs_path(get_cfg('sp')['publickey'])).read() |
787 | 796 |
return publickey |
788 | 797 | |
798 |
def error(self): |
|
799 |
request = get_request() |
|
800 |
if request.get_method() == 'POST': |
|
801 |
return self.perform_login(relay_state=request.form.get('RelayState')) |
|
802 |
html_top(title=_('Authentication error')) |
|
803 |
r = TemplateIO(html=True) |
|
804 |
r += htmltext('<p>%s</p>') % ( |
|
805 |
_('There was a temporary error during your authentication, please retry later.') |
|
806 |
) |
|
807 |
r += htmltext( |
|
808 |
'''<form method="post"> |
|
809 |
<button>%s</button> |
|
810 |
</form>''' |
|
811 |
% _('Continue') |
|
812 |
) |
|
813 |
return r.getvalue() |
|
814 | ||
789 | 815 |
# retain compatibility with old metadatas |
790 | 816 |
singleSignOnArtifact = assertionConsumerArtifact |
791 | 817 |
singleSignOnPost = assertionConsumerPost |
792 |
- |