0002-idp-saml-set-SessionNotOnAfter-on-AuthnStatement-fro.patch
src/authentic2/idp/saml/saml2_endpoints.py | ||
---|---|---|
73 | 73 |
from authentic2.idp import signals as idp_signals |
74 | 74 | |
75 | 75 |
from authentic2.utils import (make_url, get_backends as get_idp_backends, |
76 |
get_username, login_require, find_authentication_event) |
|
76 |
get_username, login_require, find_authentication_event, datetime_to_xs_datetime)
|
|
77 | 77 |
from authentic2.decorators import is_transient_user |
78 | 78 |
from authentic2.attributes_ng.engine import get_attributes |
79 | 79 | |
... | ... | |
359 | 359 |
notOnOrAfter.isoformat() + 'Z') |
360 | 360 |
assertion = login.assertion |
361 | 361 |
assertion.conditions.notOnOrAfter = notOnOrAfter.isoformat() + 'Z' |
362 |
# Set SessionNotOnOrAfter to expiry date of the current session, so we are sure no session on |
|
363 |
# service providers can outlive the IdP session. |
|
364 |
expiry_date = request.session.get_expiry_date() |
|
365 |
assertion.authnStatement[0].sessionNotOnOrAfter = datetime_to_xs_datetime(expiry_date) |
|
362 | 366 |
logger.debug("assertion building in progress %s" \ |
363 | 367 |
% assertion.dump()) |
364 | 368 |
logger.debug("fill assertion") |
... | ... | |
395 | 399 |
kwargs['user'] = request.user |
396 | 400 |
logger.info(u'sending nameID %(name_id_format)s: ' |
397 | 401 |
'%(name_id_content)s to %(entity_id)s for user %(user)s' % kwargs) |
402 | ||
398 | 403 |
register_new_saml2_session(request, login) |
399 | 404 | |
400 | 405 |
src/authentic2/idp/saml/tests.py | ||
---|---|---|
1 |
import base64 |
|
2 |
import unittest |
|
3 |
import StringIO |
|
4 |
import urlparse |
|
5 |
from lxml.html import parse |
|
6 | ||
7 |
from django.test import Client |
|
8 |
from django.test.utils import override_settings |
|
9 |
from django.contrib.auth import get_user_model, REDIRECT_FIELD_NAME |
|
10 |
from django.core.urlresolvers import reverse |
|
11 |
from django.utils.translation import gettext as _ |
|
12 | ||
13 |
from authentic2.tests import Authentic2TestCase |
|
14 |
from authentic2.saml import models as saml_models |
|
15 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit |
|
16 |
from authentic2.utils import make_url |
|
17 |
from authentic2.constants import NONCE_FIELD_NAME |
|
18 | ||
19 |
try: |
|
20 |
import lasso |
|
21 |
except ImportError: |
|
22 |
lasso = None |
|
23 | ||
24 |
from . import app_settings |
|
25 | ||
26 | ||
27 |
@unittest.skipUnless(lasso is not None, 'lasso is not installed') |
|
28 |
@override_settings(A2_IDP_SAML2_ENABLE=True) |
|
29 |
class SamlBaseTestCase(Authentic2TestCase): |
|
30 |
METADATA_TPL = '''<?xml version="1.0" encoding="UTF-8" standalone="yes"?> |
|
31 |
<EntityDescriptor |
|
32 |
entityID="{base_url}/" |
|
33 |
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"> |
|
34 |
<SPSSODescriptor |
|
35 |
AuthnRequestsSigned="true" |
|
36 |
WantAssertionsSigned="true" |
|
37 |
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"> |
|
38 |
<SingleLogoutService |
|
39 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" |
|
40 |
Location="https://files.entrouvert.org/mellon/logout" /> |
|
41 |
<AssertionConsumerService |
|
42 |
index="0" |
|
43 |
isDefault="true" |
|
44 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" |
|
45 |
Location="{base_url}/sso/POST" /> |
|
46 |
<AssertionConsumerService |
|
47 |
index="1" |
|
48 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" |
|
49 |
Location="{base_url}/mellon/artifactResponse" /> |
|
50 |
</SPSSODescriptor> |
|
51 |
</EntityDescriptor>''' |
|
52 | ||
53 |
def get_sp_metadata(self, base_url='https://sp.example.com'):
    """Return METADATA_TPL with its ``{base_url}`` placeholders filled in."""
    template = self.METADATA_TPL
    return template.format(base_url=base_url)
|
55 | ||
56 |
def get_idp_metadata(self):
    """Fetch the IdP metadata with a fresh test client and return its body.

    Asserts that the endpoint answers with a ``text/xml`` content type and
    that the body mentions ``IDPSSODescriptor``.
    """
    http = Client()
    metadata_response = http.get(reverse('a2-idp-saml-metadata'))
    # FIXME: add better test of well formedness for metadata
    self.assertEqual(
        metadata_response['Content-type'], 'text/xml',
        msg='metadata endpoint did not return an XML document')
    self.assertIn(
        'IDPSSODescriptor', metadata_response.content,
        msg='metadata endpoint does not contain an IDPSSODescriptor element')
    return metadata_response.content
|
67 | ||
68 |
def get_server(self, base_url='https://sp.example.com'):
    """Build a lasso server from the SP metadata and register the IdP on it."""
    sp_metadata = self.get_sp_metadata(base_url=base_url)
    idp_metadata = self.get_idp_metadata()
    lasso_server = lasso.Server.newFromBuffers(sp_metadata)
    lasso_server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, idp_metadata)
    return lasso_server
|
74 | ||
75 |
def setup(self, default_name_id_format='persistent'):
    """Create the fixtures used by the SSO tests.

    Creates a user, a service provider built from get_sp_metadata(), a
    default SP options policy, an administrator role holding the user, and
    three attribute declarations on the provider.  Everything is stored on
    ``self``.

    default_name_id_format -- value stored as ``default_name_id_format`` on
        the default SP options policy.
    """
    self.base_url = 'https://sp.example.com'
    self.name = 'Test SP'
    self.slug = 'test-sp'
    self.email = 'john.doe@example.com'
    self.username = 'john.doe'
    self.first_name = 'John'
    self.last_name = 'Doe'
    self.password = 'T0toto'

    # User able to log in with self.password
    self.user = get_user_model().objects.create(
        email=self.email,
        username=self.username,
        first_name=self.first_name,
        last_name=self.last_name)
    self.user.set_password(self.password)
    self.user.save()

    # Service provider and its policy
    self.default_ou = OrganizationalUnit.objects.get()
    sp_meta = self.get_sp_metadata()
    self.liberty_provider = saml_models.LibertyProvider(
        name=self.name,
        slug=self.slug,
        ou=self.default_ou,
        metadata=sp_meta)
    self.liberty_provider.clean()
    self.liberty_provider.save()
    self.liberty_service_provider = \
        saml_models.LibertyServiceProvider.objects.create(
            liberty_provider=self.liberty_provider,
            enabled=True)
    self.default_sp_options_idp_policy = \
        saml_models.SPOptionsIdPPolicy.objects.create(
            name='Default',
            enabled=True,
            authn_request_signed=False,
            default_name_id_format=default_name_id_format,
            accepted_name_id_format=['email', 'persistent', 'transient',
                                     'username'])

    # Role attached to the service, carrying a 'superuser' attribute
    self.admin_role = Role.objects.create(
        name='Administrator',
        slug='administrator',
        service=self.liberty_provider)
    # The kind was misspelled 'strig'.
    # NOTE(review): confirm 'string' is the kind the role attribute model
    # expects.
    self.admin_role.attributes.create(
        name='superuser',
        kind='string',
        value='true')
    self.admin_role.members.add(self.user)

    # Attributes the provider declares
    self.first_name_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='first-name',
        friendly_name='First name',
        attribute_name='django_user_first_name')
    self.last_name_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='last-name',
        friendly_name='Last name',
        attribute_name='django_user_last_name')
    self.superuser_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='superuser',
        friendly_name='Superuser status',
        attribute_name='superuser')
|
136 | ||
137 |
def make_authn_request(
        self, idp=None,
        method=None,
        allow_create=None,
        format=None,
        relay_state=None,
        force_authn=None,
        is_passive=None,
        sp_name_qualifier=None,
        sign=False,
        name_id_policy=True):
    """Build an AuthnRequest with lasso, acting as the service provider.

    idp -- passed as is to ``login.initAuthnRequest``.
    method -- lasso HTTP method constant; None (the default) means
        ``lasso.HTTP_METHOD_REDIRECT``.
    allow_create, format, sp_name_qualifier -- when not None, set on the
        NameIDPolicy of the request.
    force_authn, is_passive -- when not None, set on the request.
    relay_state -- when not None, set as ``login.msgRelayState``.
    sign -- when false, signature is forbidden through the signature hint.
    name_id_policy -- when false, the NameIDPolicy is removed from the
        request.

    Returns a ``(msgUrl, msgBody, request id)`` tuple.
    """
    # Resolve the default here and not in the signature: a default argument
    # is evaluated when the class body runs, which fails when lasso is None
    # and would defeat the skipUnless() decorator of the class.
    if method is None:
        method = lasso.HTTP_METHOD_REDIRECT
    server = self.get_server()
    login = lasso.Login(server)
    if not sign:
        login.setSignatureHint(lasso.PROFILE_SIGNATURE_HINT_FORBID)
    login.initAuthnRequest(idp, method)
    request = login.request
    # Keep a reference to the policy, it may be detached from the request
    # below.
    policy = request.nameIdPolicy
    if force_authn is not None:
        request.forceAuthn = force_authn
    if is_passive is not None:
        request.isPassive = is_passive
    if allow_create is not None:
        policy.allowCreate = allow_create
    if format is not None:
        policy.format = format
    if sp_name_qualifier is not None:
        policy.spNameQualifier = sp_name_qualifier
    if relay_state is not None:
        login.msgRelayState = relay_state
    if not name_id_policy:
        request.nameIdPolicy = None
    login.buildAuthnRequestMsg()
    # A body is only expected with the POST method.  An unconditional
    # assertIsNone() used to follow this test and made POST always fail.
    if method == lasso.HTTP_METHOD_REDIRECT:
        self.assertIsNone(login.msgBody, 'body should be None with '
                          'method Redirect')
    elif method == lasso.HTTP_METHOD_POST:
        self.assertIsNotNone(login.msgBody)
    url_parsed = urlparse.urlparse(login.msgUrl)
    self.assertEqual(url_parsed.path, reverse('a2-idp-saml-sso'),
                     'msgUrl should target the sso endpoint')
    return login.msgUrl, login.msgBody, request.id
|
181 | ||
182 |
def parse_authn_response(self, saml_response):
    """Process saml_response as the SP, accept the SSO, return the login."""
    sp_login = lasso.Login(self.get_server())
    sp_login.processAuthnResponseMsg(saml_response)
    sp_login.acceptSso()
    return sp_login
|
188 | ||
189 | ||
190 |
class SamlSSOTestCase(SamlBaseTestCase): |
|
191 |
def test_sso_login_redirect(self):
    """SSO where the request asks for the persistent NameID format."""
    authn_request_kwargs = {
        'allow_create': True,
        'format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
    }
    self.do_test_sso(authn_request_kwargs)
|
194 | ||
195 |
def test_sso_cancel_redirect(self):
    """SSO where the login is cancelled."""
    authn_request_kwargs = {'allow_create': True}
    self.do_test_sso(authn_request_kwargs, cancel=True)
|
197 | ||
198 |
def test_sso_no_name_id_policy_redirect(self):
    """SSO without NameIDPolicy, 'email' default format, no NameID check."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=False,
        default_name_id_format='email')
|
201 | ||
202 |
def test_sso_name_id_policy_username(self):
    """SSO without NameIDPolicy, with 'username' as default NameID format."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=True,
        default_name_id_format='username')
|
205 | ||
206 |
def test_sso_name_id_policy_uuid(self):
    """SSO without NameIDPolicy, with 'uuid' as default NameID format."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=True,
        default_name_id_format='uuid')
|
209 | ||
210 |
def do_test_sso(self, make_authn_request_kwargs=None, check_federation=True,
                cancel=False, default_name_id_format='persistent'):
    """Run an SSO round trip against the IdP and check the result.

    make_authn_request_kwargs -- keyword arguments forwarded to
        make_authn_request(); None means no argument.
    check_federation -- when true, check the NameID of the assertion.
    cancel -- when true, post ``cancel`` on the login page and expect
        parse_authn_response() to raise lasso.ProfileRequestDeniedError.
    default_name_id_format -- forwarded to setup().
    """
    # Do not use a dict literal as default value, it would be shared
    # between calls.
    if make_authn_request_kwargs is None:
        make_authn_request_kwargs = {}
    self.setup(default_name_id_format=default_name_id_format)
    client = Client()
    # Launch an AuthnRequest
    url, _body, request_id = self.make_authn_request(
        **make_authn_request_kwargs)
    response = client.get(url)
    self.assertRedirectsComplex(response, reverse('auth_login'), **{
        'nonce': '*',
        REDIRECT_FIELD_NAME: make_url(
            'a2-idp-saml-continue',
            params={NONCE_FIELD_NAME: request_id}),
    })
    login_url = response['Location']
    nonce = urlparse.parse_qs(urlparse.urlparse(login_url).query)['nonce'][0]
    response = client.get(login_url)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response['Content-Type'].split(';')[0], 'text/html')
    self.assertInHTML(u'<input type="submit" name="cancel" '
                      'value="%s"/>' % _('Cancel'), response.content,
                      count=1)

    if cancel:
        response = client.post(login_url, {
            'cancel': 1,
        })
        self.assertRedirectsComplex(response,
                                    reverse('a2-idp-saml-continue'),
                                    cancel='*', nonce=nonce)
        saml_response = self._get_saml_response(client, response['Location'])
        with self.assertRaises(lasso.ProfileRequestDeniedError):
            self.parse_authn_response(saml_response)
        return

    response = client.post(login_url, {
        'username': self.email,
        'password': self.password,
        'login-password-submit': 1,
    })
    self.assertRedirectsComplex(
        response, reverse('a2-idp-saml-continue'), nonce=nonce)
    saml_response = self._get_saml_response(client, response['Location'])
    login = self.parse_authn_response(saml_response)
    assertion = login.assertion
    assertion_xml = assertion.exportToXml()
    namespaces = {
        'saml': lasso.SAML2_ASSERTION_HREF,
    }
    constraints = ()
    # check nameid
    if check_federation:
        # Not named `format`, that would shadow the builtin.
        name_id_format = make_authn_request_kwargs.get('format')
        policy_format = \
            self.default_sp_options_idp_policy.default_name_id_format
        if not name_id_format:
            if policy_format == 'username':
                self.assertEqual(login.assertion.subject.nameID.format,
                                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                self.assertEqual(login.assertion.subject.nameID.content,
                                 self.user.username.encode('utf-8'))
            elif policy_format == 'uuid':
                self.assertEqual(login.assertion.subject.nameID.format,
                                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                self.assertEqual(login.assertion.subject.nameID.content,
                                 self.user.uuid.encode('utf-8'))
        elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT:
            federation = saml_models.LibertyFederation.objects.get()
            constraints += (
                ('/saml:Assertion/saml:Subject/saml:NameID',
                 federation.name_id_content),
                ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT),
                ('/saml:Assertion/saml:Subject/saml:NameID/@SPNameQualifier',
                 '%s/' % self.base_url),
            )
        # NOTE(review): the `not name_id_format` alternative cannot be true
        # here, this branch is only reached when a format was requested;
        # kept as is to preserve what the test checks.
        elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL or \
                (not name_id_format and default_name_id_format == 'email'):
            constraints += (
                ('/saml:Assertion/saml:Subject/saml:NameID',
                 self.email),
                ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL),
            )
    # Attributes are checked whatever the NameID format is
    constraints += (
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "@FriendlyName", 'First name'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "saml:AttributeValue", 'John'),

        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "@FriendlyName", 'Last name'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "saml:AttributeValue", 'Doe'),

        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "@FriendlyName", 'Superuser status'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "saml:AttributeValue", 'true'),
    )
    self.assertXPathConstraints(assertion_xml, constraints, namespaces)

def _get_saml_response(self, client, continue_url):
    """Load continue_url and return the SAMLResponse field of its only form."""
    response = client.get(continue_url)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response['Content-type'].split(';')[0], 'text/html')
    doc = parse(StringIO.StringIO(response.content)).getroot()
    self.assertEqual(len(doc.forms), 1, msg='the number of forms is not 1')
    self.assertEqual(
        doc.forms[0].get('action'), '%s/sso/POST' % self.base_url)
    self.assertIn('SAMLResponse', doc.forms[0].fields)
    saml_response = doc.forms[0].fields['SAMLResponse']
    try:
        base64.b64decode(saml_response)
    except TypeError:
        self.fail('SAMLResponse is not base64 encoded: %s' % saml_response)
    return saml_response
|
345 |
tests/test_idp_saml2.py | ||
---|---|---|
1 |
import datetime |
|
2 |
import base64 |
|
3 |
import unittest |
|
4 |
import StringIO |
|
5 |
import urlparse |
|
6 |
from lxml.html import parse |
|
7 | ||
8 |
from django.test import Client |
|
9 |
from django.test.utils import override_settings |
|
10 |
from django.contrib.auth import get_user_model, REDIRECT_FIELD_NAME |
|
11 |
from django.core.urlresolvers import reverse |
|
12 |
from django.utils.translation import gettext as _ |
|
13 | ||
14 |
from authentic2.tests import Authentic2TestCase |
|
15 |
from authentic2.saml import models as saml_models |
|
16 |
from authentic2.a2_rbac.models import Role, OrganizationalUnit |
|
17 |
from authentic2.utils import make_url |
|
18 |
from authentic2.constants import NONCE_FIELD_NAME |
|
19 | ||
20 |
try: |
|
21 |
import lasso |
|
22 |
except ImportError: |
|
23 |
lasso = None |
|
24 | ||
25 |
from authentic2.idp.saml import app_settings |
|
26 | ||
27 | ||
28 |
@unittest.skipUnless(lasso is not None, 'lasso is not installed') |
|
29 |
@override_settings(A2_IDP_SAML2_ENABLE=True) |
|
30 |
class SamlBaseTestCase(Authentic2TestCase): |
|
31 |
METADATA_TPL = '''<?xml version="1.0" encoding="UTF-8" standalone="yes"?> |
|
32 |
<EntityDescriptor |
|
33 |
entityID="{base_url}/" |
|
34 |
xmlns="urn:oasis:names:tc:SAML:2.0:metadata"> |
|
35 |
<SPSSODescriptor |
|
36 |
AuthnRequestsSigned="true" |
|
37 |
WantAssertionsSigned="true" |
|
38 |
protocolSupportEnumeration="urn:oasis:names:tc:SAML:2.0:protocol"> |
|
39 |
<SingleLogoutService |
|
40 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" |
|
41 |
Location="https://files.entrouvert.org/mellon/logout" /> |
|
42 |
<AssertionConsumerService |
|
43 |
index="0" |
|
44 |
isDefault="true" |
|
45 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" |
|
46 |
Location="{base_url}/sso/POST" /> |
|
47 |
<AssertionConsumerService |
|
48 |
index="1" |
|
49 |
Binding="urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Artifact" |
|
50 |
Location="{base_url}/mellon/artifactResponse" /> |
|
51 |
</SPSSODescriptor> |
|
52 |
</EntityDescriptor>''' |
|
53 | ||
54 |
def get_sp_metadata(self, base_url='https://sp.example.com'):
    """Return METADATA_TPL with its ``{base_url}`` placeholders filled in."""
    template = self.METADATA_TPL
    return template.format(base_url=base_url)
|
56 | ||
57 |
def get_idp_metadata(self):
    """Fetch the IdP metadata with a fresh test client and return its body.

    Asserts that the endpoint answers with a ``text/xml`` content type and
    that the body mentions ``IDPSSODescriptor``.
    """
    http = Client()
    metadata_response = http.get(reverse('a2-idp-saml-metadata'))
    # FIXME: add better test of well formedness for metadata
    self.assertEqual(
        metadata_response['Content-type'], 'text/xml',
        msg='metadata endpoint did not return an XML document')
    self.assertIn(
        'IDPSSODescriptor', metadata_response.content,
        msg='metadata endpoint does not contain an IDPSSODescriptor element')
    return metadata_response.content
|
68 | ||
69 |
def get_server(self, base_url='https://sp.example.com'):
    """Build a lasso server from the SP metadata and register the IdP on it."""
    sp_metadata = self.get_sp_metadata(base_url=base_url)
    idp_metadata = self.get_idp_metadata()
    lasso_server = lasso.Server.newFromBuffers(sp_metadata)
    lasso_server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, idp_metadata)
    return lasso_server
|
75 | ||
76 |
def setup(self, default_name_id_format='persistent'):
    """Create the fixtures used by the SSO tests.

    Creates a user, a service provider built from get_sp_metadata(), a
    default SP options policy, an administrator role holding the user, and
    three attribute declarations on the provider.  Everything is stored on
    ``self``.

    default_name_id_format -- value stored as ``default_name_id_format`` on
        the default SP options policy.
    """
    self.base_url = 'https://sp.example.com'
    self.name = 'Test SP'
    self.slug = 'test-sp'
    self.email = 'john.doe@example.com'
    self.username = 'john.doe'
    self.first_name = 'John'
    self.last_name = 'Doe'
    self.password = 'T0toto'

    # User able to log in with self.password
    self.user = get_user_model().objects.create(
        email=self.email,
        username=self.username,
        first_name=self.first_name,
        last_name=self.last_name)
    self.user.set_password(self.password)
    self.user.save()

    # Service provider and its policy
    self.default_ou = OrganizationalUnit.objects.get()
    sp_meta = self.get_sp_metadata()
    self.liberty_provider = saml_models.LibertyProvider(
        name=self.name,
        slug=self.slug,
        ou=self.default_ou,
        metadata=sp_meta)
    self.liberty_provider.clean()
    self.liberty_provider.save()
    self.liberty_service_provider = \
        saml_models.LibertyServiceProvider.objects.create(
            liberty_provider=self.liberty_provider,
            enabled=True)
    self.default_sp_options_idp_policy = \
        saml_models.SPOptionsIdPPolicy.objects.create(
            name='Default',
            enabled=True,
            authn_request_signed=False,
            default_name_id_format=default_name_id_format,
            accepted_name_id_format=['email', 'persistent', 'transient',
                                     'username'])

    # Role attached to the service, carrying a 'superuser' attribute
    self.admin_role = Role.objects.create(
        name='Administrator',
        slug='administrator',
        service=self.liberty_provider)
    # The kind was misspelled 'strig'.
    # NOTE(review): confirm 'string' is the kind the role attribute model
    # expects.
    self.admin_role.attributes.create(
        name='superuser',
        kind='string',
        value='true')
    self.admin_role.members.add(self.user)

    # Attributes the provider declares
    self.first_name_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='first-name',
        friendly_name='First name',
        attribute_name='django_user_first_name')
    self.last_name_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='last-name',
        friendly_name='Last name',
        attribute_name='django_user_last_name')
    self.superuser_attribute = self.liberty_provider.attributes.create(
        name_format='basic',
        name='superuser',
        friendly_name='Superuser status',
        attribute_name='superuser')
|
137 | ||
138 |
def make_authn_request(
        self, idp=None,
        method=None,
        allow_create=None,
        format=None,
        relay_state=None,
        force_authn=None,
        is_passive=None,
        sp_name_qualifier=None,
        sign=False,
        name_id_policy=True):
    """Build an AuthnRequest with lasso, acting as the service provider.

    idp -- passed as is to ``login.initAuthnRequest``.
    method -- lasso HTTP method constant; None (the default) means
        ``lasso.HTTP_METHOD_REDIRECT``.
    allow_create, format, sp_name_qualifier -- when not None, set on the
        NameIDPolicy of the request.
    force_authn, is_passive -- when not None, set on the request.
    relay_state -- when not None, set as ``login.msgRelayState``.
    sign -- when false, signature is forbidden through the signature hint.
    name_id_policy -- when false, the NameIDPolicy is removed from the
        request.

    Returns a ``(msgUrl, msgBody, request id)`` tuple.
    """
    # Resolve the default here and not in the signature: a default argument
    # is evaluated when the class body runs, which fails when lasso is None
    # and would defeat the skipUnless() decorator of the class.
    if method is None:
        method = lasso.HTTP_METHOD_REDIRECT
    server = self.get_server()
    login = lasso.Login(server)
    if not sign:
        login.setSignatureHint(lasso.PROFILE_SIGNATURE_HINT_FORBID)
    login.initAuthnRequest(idp, method)
    request = login.request
    # Keep a reference to the policy, it may be detached from the request
    # below.
    policy = request.nameIdPolicy
    if force_authn is not None:
        request.forceAuthn = force_authn
    if is_passive is not None:
        request.isPassive = is_passive
    if allow_create is not None:
        policy.allowCreate = allow_create
    if format is not None:
        policy.format = format
    if sp_name_qualifier is not None:
        policy.spNameQualifier = sp_name_qualifier
    if relay_state is not None:
        login.msgRelayState = relay_state
    if not name_id_policy:
        request.nameIdPolicy = None
    login.buildAuthnRequestMsg()
    # A body is only expected with the POST method.  An unconditional
    # assertIsNone() used to follow this test and made POST always fail.
    if method == lasso.HTTP_METHOD_REDIRECT:
        self.assertIsNone(login.msgBody, 'body should be None with '
                          'method Redirect')
    elif method == lasso.HTTP_METHOD_POST:
        self.assertIsNotNone(login.msgBody)
    url_parsed = urlparse.urlparse(login.msgUrl)
    self.assertEqual(url_parsed.path, reverse('a2-idp-saml-sso'),
                     'msgUrl should target the sso endpoint')
    return login.msgUrl, login.msgBody, request.id
|
182 | ||
183 |
def parse_authn_response(self, saml_response):
    """Process saml_response as the SP, accept the SSO, return the login."""
    sp_login = lasso.Login(self.get_server())
    sp_login.processAuthnResponseMsg(saml_response)
    sp_login.acceptSso()
    return sp_login
|
189 | ||
190 | ||
191 |
class SamlSSOTestCase(SamlBaseTestCase): |
|
192 |
def test_sso_login_redirect(self):
    """SSO where the request asks for the persistent NameID format."""
    authn_request_kwargs = {
        'allow_create': True,
        'format': lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT,
    }
    self.do_test_sso(authn_request_kwargs)
|
195 | ||
196 |
def test_sso_cancel_redirect(self):
    """SSO where the login is cancelled."""
    authn_request_kwargs = {'allow_create': True}
    self.do_test_sso(authn_request_kwargs, cancel=True)
|
198 | ||
199 |
def test_sso_no_name_id_policy_redirect(self):
    """SSO without NameIDPolicy, 'email' default format, no NameID check."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=False,
        default_name_id_format='email')
|
202 | ||
203 |
def test_sso_name_id_policy_username(self):
    """SSO without NameIDPolicy, with 'username' as default NameID format."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=True,
        default_name_id_format='username')
|
206 | ||
207 |
def test_sso_name_id_policy_uuid(self):
    """SSO without NameIDPolicy, with 'uuid' as default NameID format."""
    authn_request_kwargs = {'allow_create': True, 'name_id_policy': False}
    self.do_test_sso(
        authn_request_kwargs,
        check_federation=True,
        default_name_id_format='uuid')
|
210 | ||
211 |
def do_test_sso(self, make_authn_request_kwargs=None, check_federation=True,
                cancel=False, default_name_id_format='persistent'):
    """Run an SSO round trip against the IdP and check the result.

    make_authn_request_kwargs -- keyword arguments forwarded to
        make_authn_request(); None means no argument.
    check_federation -- when true, check the NameID of the assertion.
    cancel -- when true, post ``cancel`` on the login page and expect
        parse_authn_response() to raise lasso.ProfileRequestDeniedError.
    default_name_id_format -- forwarded to setup().
    """
    # Do not use a dict literal as default value, it would be shared
    # between calls.
    if make_authn_request_kwargs is None:
        make_authn_request_kwargs = {}
    self.setup(default_name_id_format=default_name_id_format)
    client = Client()
    # Launch an AuthnRequest
    url, _body, request_id = self.make_authn_request(
        **make_authn_request_kwargs)
    response = client.get(url)
    self.assertRedirectsComplex(response, reverse('auth_login'), **{
        'nonce': '*',
        REDIRECT_FIELD_NAME: make_url(
            'a2-idp-saml-continue',
            params={NONCE_FIELD_NAME: request_id}),
    })
    login_url = response['Location']
    nonce = urlparse.parse_qs(urlparse.urlparse(login_url).query)['nonce'][0]
    response = client.get(login_url)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response['Content-Type'].split(';')[0], 'text/html')
    self.assertInHTML(u'<input type="submit" name="cancel" '
                      'value="%s"/>' % _('Cancel'), response.content,
                      count=1)

    if cancel:
        response = client.post(login_url, {
            'cancel': 1,
        })
        self.assertRedirectsComplex(response,
                                    reverse('a2-idp-saml-continue'),
                                    cancel='*', nonce=nonce)
        saml_response = self._get_saml_response(client, response['Location'])
        with self.assertRaises(lasso.ProfileRequestDeniedError):
            self.parse_authn_response(saml_response)
        return

    response = client.post(login_url, {
        'username': self.email,
        'password': self.password,
        'login-password-submit': 1,
    })
    self.assertRedirectsComplex(
        response, reverse('a2-idp-saml-continue'), nonce=nonce)
    saml_response = self._get_saml_response(client, response['Location'])
    login = self.parse_authn_response(saml_response)
    assertion = login.assertion
    # The AuthnStatement must carry a SessionNotOnOrAfter in the future.
    # TestCase assertions are used instead of bare asserts, like in the
    # rest of this method; they also survive `python -O`.
    session_not_on_or_after = \
        login.assertion.authnStatement[0].sessionNotOnOrAfter
    self.assertIsNotNone(session_not_on_or_after)
    self.assertGreater(
        datetime.datetime.strptime(session_not_on_or_after,
                                   '%Y-%m-%dT%H:%M:%SZ'),
        datetime.datetime.utcnow())
    assertion_xml = assertion.exportToXml()
    namespaces = {
        'saml': lasso.SAML2_ASSERTION_HREF,
    }
    constraints = ()
    # check nameid
    if check_federation:
        # Not named `format`, that would shadow the builtin.
        name_id_format = make_authn_request_kwargs.get('format')
        policy_format = \
            self.default_sp_options_idp_policy.default_name_id_format
        if not name_id_format:
            if policy_format == 'username':
                self.assertEqual(login.assertion.subject.nameID.format,
                                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                self.assertEqual(login.assertion.subject.nameID.content,
                                 self.user.username.encode('utf-8'))
            elif policy_format == 'uuid':
                self.assertEqual(login.assertion.subject.nameID.format,
                                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED)
                self.assertEqual(login.assertion.subject.nameID.content,
                                 self.user.uuid.encode('utf-8'))
        elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT:
            federation = saml_models.LibertyFederation.objects.get()
            constraints += (
                ('/saml:Assertion/saml:Subject/saml:NameID',
                 federation.name_id_content),
                ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_PERSISTENT),
                ('/saml:Assertion/saml:Subject/saml:NameID/@SPNameQualifier',
                 '%s/' % self.base_url),
            )
        # NOTE(review): the `not name_id_format` alternative cannot be true
        # here, this branch is only reached when a format was requested;
        # kept as is to preserve what the test checks.
        elif name_id_format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL or \
                (not name_id_format and default_name_id_format == 'email'):
            constraints += (
                ('/saml:Assertion/saml:Subject/saml:NameID',
                 self.email),
                ('/saml:Assertion/saml:Subject/saml:NameID/@Format',
                 lasso.SAML2_NAME_IDENTIFIER_FORMAT_EMAIL),
            )
    # Attributes are checked whatever the NameID format is
    constraints += (
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "@FriendlyName", 'First name'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='first-name']/"
         "saml:AttributeValue", 'John'),

        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "@FriendlyName", 'Last name'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='last-name']/"
         "saml:AttributeValue", 'Doe'),

        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "@NameFormat", lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "@FriendlyName", 'Superuser status'),
        ("/saml:Assertion/saml:AttributeStatement/saml:Attribute[@Name='superuser']/"
         "saml:AttributeValue", 'true'),
    )
    self.assertXPathConstraints(assertion_xml, constraints, namespaces)

def _get_saml_response(self, client, continue_url):
    """Load continue_url and return the SAMLResponse field of its only form."""
    response = client.get(continue_url)
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response['Content-type'].split(';')[0], 'text/html')
    doc = parse(StringIO.StringIO(response.content)).getroot()
    self.assertEqual(len(doc.forms), 1, msg='the number of forms is not 1')
    self.assertEqual(
        doc.forms[0].get('action'), '%s/sso/POST' % self.base_url)
    self.assertIn('SAMLResponse', doc.forms[0].fields)
    saml_response = doc.forms[0].fields['SAMLResponse']
    try:
        base64.b64decode(saml_response)
    except TypeError:
        self.fail('SAMLResponse is not base64 encoded: %s' % saml_response)
    return saml_response
|
350 | ||
0 |
- |