0001-handling-of-connection-through-a-service-provide-5262.patch
src/authentic2/idp/saml/saml2_endpoints.py | ||
---|---|---|
675 | 675 |
transient = True |
676 | 676 | |
677 | 677 |
decisions = idp_signals.authorize_service.send(sender=None, |
678 |
request=request, user=request.user, audience=login.remoteProviderId, |
|
678 |
#request=request, user=request.user, audience=login.remoteProviderId, |
|
679 |
request=request, user=request.user, audience=load_provider(request, login.remoteProviderId), |
|
679 | 680 |
attributes={}) |
680 | 681 |
logger.info('signal authorize_service sent') |
681 | 682 |
src/authentic2/idp/saml/tests.py | ||
---|---|---|
207 | 207 |
self.do_test_sso(dict(allow_create=True, name_id_policy=False), |
208 | 208 |
check_federation=True, default_name_id_format='uuid') |
209 | 209 | |
210 |
def test_sso_unauthorized_service(self): |
|
211 |
self.do_test_sso(dict(allow_create=True), authorize_service=False) |
|
212 | ||
210 | 213 |
def do_test_sso(self, make_authn_request_kwargs={}, check_federation=True, |
211 |
cancel=False, default_name_id_format='persistent'): |
|
214 |
cancel=False, default_name_id_format='persistent', authorize_service=True):
|
|
212 | 215 |
self.setup(default_name_id_format=default_name_id_format) |
213 | 216 |
client = Client() |
214 | 217 |
# Launch an AuthnRequest |
... | ... | |
258 | 261 |
% saml_response) |
259 | 262 |
with self.assertRaises(lasso.ProfileRequestDeniedError): |
260 | 263 |
assertion = self.parse_authn_response(saml_response) |
264 |
elif not authorize_service: |
|
265 |
self.admin_role.members.remove(self.user) # Removing user role |
|
266 |
response = client.post(url, { |
|
267 |
'username': self.email, |
|
268 |
'password': self.password, |
|
269 |
'login-password-submit': 1, |
|
270 |
}) |
|
271 |
self.assertRedirectsComplex(response, |
|
272 |
reverse('a2-idp-saml-continue'), |
|
273 |
nonce=nonce) |
|
274 |
response = client.get(response['Location']) |
|
275 |
self.assertEqual(response.status_code, 200) |
|
276 |
self.assertEqual(response['Content-type'].split(';')[0], |
|
277 |
'text/html') |
|
278 |
doc = parse(StringIO.StringIO(response.content)).getroot() |
|
279 |
self.assertEqual(len(doc.forms), 1, |
|
280 |
msg='the number of forms is not 1') |
|
281 |
self.assertEqual(doc.forms[0].get('action'), |
|
282 |
'%s/sso/POST' % self.base_url) |
|
283 |
self.assertIn('SAMLResponse', doc.forms[0].fields) |
|
284 |
saml_response = doc.forms[0].fields['SAMLResponse'] |
|
285 |
try: |
|
286 |
saml_response_decoded = base64.b64decode(saml_response) |
|
287 |
except TypeError: |
|
288 |
self.fail('SAMLResponse is not base64 encoded: %s' |
|
289 |
% saml_response) |
|
290 |
with self.assertRaises(lasso.ProfileRequestDeniedError): |
|
291 |
assertion = self.parse_authn_response(saml_response) |
|
292 |
namespaces = {'samlp': lasso.SAML2_PROTOCOL_HREF} |
|
293 |
constraints = ( |
|
294 |
("/samlp:Response/samlp:Status/samlp:StatusCode/@Value", |
|
295 |
lasso.SAML2_STATUS_CODE_RESPONDER), |
|
296 |
("/samlp:Response/samlp:Status/samlp:StatusCode/samlp:StatusCode/@Value", |
|
297 |
lasso.SAML2_STATUS_CODE_REQUEST_DENIED), |
|
298 |
("/samlp:Response/samlp:Status/samlp:StatusMessage", |
|
299 |
'User not allowed to connect from this Service Provider') |
|
300 |
) |
|
301 |
self.assertXPathConstraints(saml_response_decoded, constraints, namespaces) |
|
261 | 302 |
else: |
262 | 303 |
response = client.post(url, { |
263 | 304 |
'username': self.email, |
src/authentic2/models.py | ||
---|---|---|
12 | 12 |
from . import attribute_kinds |
13 | 13 |
from authentic2.a2_rbac.models import Role |
14 | 14 |
from authentic2.a2_rbac.utils import get_default_ou |
15 |
from authentic2.idp import signals as idp_signals |
|
15 | 16 | |
16 | 17 |
try: |
17 | 18 |
from django.contrib.contenttypes.fields import GenericForeignKey |
... | ... | |
292 | 293 |
'ou__slug': self.ou.slug if self.ou else None, |
293 | 294 |
'roles': [role.to_json() for role in roles], |
294 | 295 |
} |
296 | ||
297 |
def is_authorized(self, user): |
|
298 |
"""Check if user is authorized or not |
|
299 |
""" |
|
300 |
if not self.roles.all(): |
|
301 |
return True |
|
302 |
for role in self.roles.all(): |
|
303 |
if user in role.members.all(): |
|
304 |
return True |
|
305 | ||
306 |
return False |
|
307 | ||
308 |
def authorize_service_signal_handler(sender, **kwargs): |
|
309 |
"""Receiver of the authorize_service signal |
|
310 |
""" |
|
311 |
user = kwargs['user'] |
|
312 |
service_provider = kwargs['audience'] |
|
313 | ||
314 | ||
315 |
return {'authz': True} if service_provider.is_authorized(user) else {'authz': False, |
|
316 |
'message': _('User not allowed to connect from this Service Provider')} |
|
317 | ||
318 |
idp_signals.authorize_service.connect(authorize_service_signal_handler) |
|
295 |
- |