Projet

Général

Profil

0001-handling-of-connection-through-a-service-provide-5262.patch

Josué Kouka, 20 octobre 2015 12:22

Télécharger (5,88 ko)

Voir les différences:

Subject: [PATCH] handling of connection through a service provide 5262

 src/authentic2/idp/saml/saml2_endpoints.py |  3 ++-
 src/authentic2/idp/saml/tests.py           | 43 +++++++++++++++++++++++++++++-
 src/authentic2/models.py                   | 24 +++++++++++++++++
 3 files changed, 68 insertions(+), 2 deletions(-)
src/authentic2/idp/saml/saml2_endpoints.py
675 675
        transient = True
676 676

  
677 677
    decisions = idp_signals.authorize_service.send(sender=None,
678
         request=request, user=request.user, audience=login.remoteProviderId,
678
         #request=request, user=request.user, audience=login.remoteProviderId,
679
         request=request, user=request.user, audience=load_provider(request, login.remoteProviderId),
679 680
         attributes={})
680 681
    logger.info('signal authorize_service sent')
681 682

  
src/authentic2/idp/saml/tests.py
207 207
        self.do_test_sso(dict(allow_create=True, name_id_policy=False),
208 208
                         check_federation=True, default_name_id_format='uuid')
209 209

  
210
    def test_sso_unauthorized_service(self):
211
        self.do_test_sso(dict(allow_create=True), authorize_service=False)
212

  
210 213
    def do_test_sso(self, make_authn_request_kwargs={}, check_federation=True,
211
                    cancel=False, default_name_id_format='persistent'):
214
                    cancel=False, default_name_id_format='persistent', authorize_service=True):
212 215
        self.setup(default_name_id_format=default_name_id_format)
213 216
        client = Client()
214 217
        # Launch an AuthnRequest
......
258 261
                          % saml_response)
259 262
            with self.assertRaises(lasso.ProfileRequestDeniedError):
260 263
                assertion = self.parse_authn_response(saml_response)
264
        elif not authorize_service:
265
            self.admin_role.members.remove(self.user) # Removing user role
266
            response = client.post(url, {
267
                'username': self.email,
268
                'password': self.password,
269
                'login-password-submit': 1,
270
            })
271
            self.assertRedirectsComplex(response,
272
                    reverse('a2-idp-saml-continue'),
273
                    nonce=nonce)
274
            response = client.get(response['Location'])
275
            self.assertEqual(response.status_code, 200)
276
            self.assertEqual(response['Content-type'].split(';')[0],
277
                             'text/html')
278
            doc = parse(StringIO.StringIO(response.content)).getroot()
279
            self.assertEqual(len(doc.forms), 1,
280
                             msg='the number of forms is not 1')
281
            self.assertEqual(doc.forms[0].get('action'),
282
                             '%s/sso/POST' % self.base_url)
283
            self.assertIn('SAMLResponse', doc.forms[0].fields)
284
            saml_response = doc.forms[0].fields['SAMLResponse']
285
            try:
286
                saml_response_decoded = base64.b64decode(saml_response)
287
            except TypeError:
288
                self.fail('SAMLResponse is not base64 encoded: %s'
289
                          % saml_response)
290
            with self.assertRaises(lasso.ProfileRequestDeniedError):
291
                assertion = self.parse_authn_response(saml_response)
292
            namespaces = {'samlp': lasso.SAML2_PROTOCOL_HREF}
293
            constraints = (
294
                    ("/samlp:Response/samlp:Status/samlp:StatusCode/@Value", 
295
                        lasso.SAML2_STATUS_CODE_RESPONDER),
296
                    ("/samlp:Response/samlp:Status/samlp:StatusCode/samlp:StatusCode/@Value", 
297
                        lasso.SAML2_STATUS_CODE_REQUEST_DENIED),
298
                    ("/samlp:Response/samlp:Status/samlp:StatusMessage", 
299
                        'User not allowed to connect from this Service Provider')
300
            )
301
            self.assertXPathConstraints(saml_response_decoded, constraints, namespaces)
261 302
        else:
262 303
            response = client.post(url, {
263 304
                'username': self.email,
src/authentic2/models.py
12 12
from . import attribute_kinds
13 13
from authentic2.a2_rbac.models import Role
14 14
from authentic2.a2_rbac.utils import get_default_ou
15
from authentic2.idp import signals as idp_signals
15 16

  
16 17
try:
17 18
    from django.contrib.contenttypes.fields import GenericForeignKey
......
292 293
            'ou__slug': self.ou.slug if self.ou else None,
293 294
            'roles': [role.to_json() for role in roles],
294 295
        }
296

  
297
    def is_authorized(self, user):
        """Check if user is authorized or not.

        An object without any role is open to everybody; otherwise the user
        must be a member of at least one of the roles in ``self.roles``.

        Returns True when access is granted, False otherwise.
        """
        # Materialize the roles once so they are not fetched a second time
        # for the membership check below.
        roles = list(self.roles.all())
        if not roles:
            return True
        return any(user in role.members.all() for role in roles)
307

  
308
def authorize_service_signal_handler(sender, **kwargs):
    """Receiver of the authorize_service signal.

    Reads ``user`` and ``audience`` from the signal keyword arguments and
    asks the audience object, through its ``is_authorized`` method, whether
    the user may connect.

    Returns ``{'authz': True}`` when the user is authorized; otherwise a
    dict with ``'authz': False`` and a ``'message'`` built with ``_()``.
    """
    requesting_user = kwargs['user']
    audience = kwargs['audience']

    if audience.is_authorized(requesting_user):
        return {'authz': True}
    return {
        'authz': False,
        'message': _('User not allowed to connect from this Service Provider'),
    }


idp_signals.authorize_service.connect(authorize_service_signal_handler)
295
-