Projet

Général

Profil

0002-backends-add-SAML-LDAP-authentication-backend-30125.patch

Benjamin Dauvergne, 12 avril 2019 18:45

Télécharger (17,3 ko)

Voir les différences:

Subject: [PATCH 2/2] backends: add SAML LDAP authentication backend (#30125)

 src/authentic2/backends/ldap_backend.py | 147 +++++++++++-------------
 src/authentic2_auth_saml/__init__.py    |   3 +-
 src/authentic2_auth_saml/backends.py    |  44 +++++++
 tests/test_auth_saml.py                 |  64 +++++++++++
 tests/test_ldap.py                      |  26 +++++
 5 files changed, 204 insertions(+), 80 deletions(-)
src/authentic2/backends/ldap_backend.py
480 480
        'can_reset_password': False,
481 481
        # mapping from LDAP attributes to User attributes
482 482
        'user_attributes': [],
483
        # entity id of the IDP based on the same LDAP
484
        'saml_entity_id': '',
485
        # template for SAML username
486
        'saml_username_template': '{uid[0]}'
483 487
    }
484 488
    _REQUIRED = ('url', 'basedn')
485 489
    _TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive')
......
541 545
            if user is not None:
542 546
                return user
543 547

  
544
    def authenticate_block(self, block, username, password):
548
    def authenticate_block(self, block, username, password=None):
545 549
        for conn in self.get_connections(block):
546
            authz_ids = []
547
            user_basedn = force_text(block.get('user_basedn') or block['basedn'])
548

  
549 550
            try:
550
                if block['user_dn_template']:
551
                    template = force_text(block['user_dn_template'])
552
                    escaped_username = escape_dn_chars(username)
553
                    authz_ids.append(template.format(username=escaped_username))
554
                else:
555
                    try:
556
                        if block.get('bind_with_username'):
557
                            authz_ids.append(username)
558
                        elif block['user_filter']:
559
                            # allow multiple occurences of the username in the filter
560
                            user_filter = force_text(block['user_filter'])
561
                            n = len(user_filter.split('%s')) - 1
562
                            try:
563
                                query = filter_format(user_filter, (username,) * n)
564
                            except TypeError as e:
565
                                log.error('user_filter syntax error %r: %s', block['user_filter'],
566
                                          e)
567
                                return
568
                            log.debug('looking up dn for username %r using query %r', username,
569
                                      query)
570
                            results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query, [u'1.1'])
571
                            # remove search references
572
                            results = [result for result in results if result[0] is not None]
573
                            log.debug('found dns %r', results)
574
                            if len(results) == 0:
575
                                log.debug('user lookup failed: no entry found, %s' % query)
576
                            elif not block['multimatch'] and len(results) > 1:
577
                                log.error('user lookup failed: too many (%d) entries found: %s',
578
                                          len(results), query)
579
                            else:
580
                                authz_ids.extend(result[0] for result in results)
581
                        else:
582
                            raise NotImplementedError
583
                    except ldap.NO_SUCH_OBJECT:
584
                        log.error('user lookup failed: basedn %s not found', user_basedn)
585
                        if block['replicas']:
586
                            break
587
                        continue
588
                    except ldap.LDAPError as e:
589
                        log.error('user lookup failed: with query %r got error %s: %s', username,
590
                                  query, e)
591
                        continue
592
                if not authz_ids:
593
                    continue
594

  
595
                try:
596
                    failed = False
597
                    for authz_id in authz_ids:
598
                        if failed:
599
                            continue
600
                        try:
601
                            conn.simple_bind_s(authz_id, password)
602
                            user_login_success(authz_id)
603
                            if not block['connect_with_user_credentials']:
604
                                try:
605
                                    self.bind(block, conn)
606
                                except Exception:
607
                                    log.exception(u'rebind failure after login bind')
608
                                    raise ldap.SERVER_DOWN
609
                            break
610
                        except ldap.INVALID_CREDENTIALS:
611
                            user_login_failure(authz_id)
612
                            pass
551
                for dn in self.get_dn_from_username(conn, block, username):
552
                    if password is None or self.try_bind(conn, block, username, dn, password):
553
                        user_login_success(dn)
554
                        return self._return_user(dn, password, conn, block)
613 555
                    else:
614
                        log.debug('user bind failed: invalid credentials')
615
                        if block['replicas']:
616
                            break
617
                        continue
618
                except ldap.NO_SUCH_OBJECT:
619
                    # should not happen as we just searched for this object !
620
                    log.error('user bind failed: authz_id not found %r', ', '.join(authz_ids))
621
                    if block['replicas']:
622
                        break
623
                return self._return_user(authz_id, password, conn, block)
556
                        user_login_failure(dn)
624 557
            except ldap.CONNECT_ERROR:
625 558
                log.error('connection to %r failed, did you forget to declare the TLS certificate '
626 559
                          'in /etc/ldap/ldap.conf ?', block['url'])
......
628 561
                log.error('connection to %r timed out', block['url'])
629 562
            except ldap.SERVER_DOWN:
630 563
                log.error('ldap authentication error: %r is down', block['url'])
564
            except ldap.LDAPError as e:
565
                log.error('ldap error: %s', e)
566
                pass
631 567
            finally:
632 568
                del conn
633 569
        return None
634 570

  
571
    def get_dn_from_username(self, conn, block, username):
572
        user_basedn = block.get('user_basedn') or block['basedn']
573
        if block['user_dn_template']:
574
            template = force_text(block['user_dn_template'])
575
            escaped_username = escape_dn_chars(username)
576
            username = template.format(username=escaped_username)
577

  
578
        # allow multiple occurences of the username in the filter
579
        user_filter = force_text(block['user_filter'])
580
        n = len(user_filter.split('%s')) - 1
581

  
582
        try:
583
            query = filter_format(user_filter, (username,) * n)
584
        except TypeError as e:
585
            log.error('user_filter syntax error %r: %s', block['user_filter'], e)
586
            return []
587
        log.debug('looking up dn for username %r using query %r', username,
588
                  query)
589
        try:
590
            results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query)
591
            results = [result for result in results if result[0] is not None]
592
            log.debug('found dns %r', results)
593
            if len(results) == 0:
594
                log.debug('user lookup failed: no entry found, %s' % query)
595
                return []
596
            elif not block['multimatch'] and len(results) > 1:
597
                log.error('user lookup failed: too many (%d) entries found: %s',
598
                          len(results), query)
599
                return []
600
            else:
601
                return [result[0] for result in results]
602
        except ldap.NO_SUCH_OBJECT:
603
            log.error('user dn lookup failed: basedn %s not found', user_basedn)
604
            raise
605
        except ldap.LDAPError:
606
            log.error('user dn lookup failed: with query %r got error %s', username, query)
607
            raise
608

  
609
    def try_bind(self, conn, block, username, dn, password):
610
        try:
611
            conn.simple_bind_s(dn, password)
612
            if not block['connect_with_user_credentials']:
613
                try:
614
                    self.bind(block, conn)
615
                except Exception:
616
                    log.exception(u'rebind failure after login bind')
617
                    return False
618
            return True
619
        except ldap.INVALID_CREDENTIALS:
620
            return False
621
        except ldap.NO_SUCH_OBJECT:
622
            log.error('user bind failed: username not found %s', username)
623
            return False
624

  
635 625
    def get_user(self, user_id, session=None):
636 626
        try:
637 627
            try:
......
1136 1126

  
1137 1127
    @classmethod
1138 1128
    def get_users(cls):
1139
        logger = logging.getLogger(__name__)
1140 1129
        for block in cls.get_config():
1141 1130
            conn = cls.get_connection(block)
1142 1131
            if conn is None:
......
1335 1324
                if not isinstance(cls._DEFAULTS[d], bool) and d in cls._REQUIRED and not block[d]:
1336 1325
                    raise ImproperlyConfigured(
1337 1326
                        'LDAP_AUTH_SETTINGS: attribute %r is required but is empty')
1338
                # force_bytes all strings in iterable or dict
1327
                # force_text all strings in iterable or dict
1339 1328
                if isinstance(block[d], (list, tuple, dict)):
1340 1329
                    block[d] = map_text(block[d])
1341 1330
        # lowercase LDAP attribute names
src/authentic2_auth_saml/__init__.py
7 7
        return ['mellon', __name__]
8 8

  
9 9
    def get_authentication_backends(self):
10
        return ['authentic2_auth_saml.backends.SAMLBackend']
10
        return ['authentic2_auth_saml.backends.SAMLLdapBackend',
11
                'authentic2_auth_saml.backends.SAMLBackend']
11 12

  
12 13
    def get_authenticators(self):
13 14
        return ['authentic2_auth_saml.authenticators.SAMLAuthenticator']
src/authentic2_auth_saml/backends.py
1
# authentic2_auth_saml - Authentic2 SAML Auth plugin
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
18
from django.conf import settings
19

  
1 20
from mellon.backends import SAMLBackend
2 21

  
3 22
from authentic2.middleware import StoreRequestMiddleware
23
from authentic2.backends.ldap_backend import LDAPBackend
4 24

  
5 25
from . import app_settings
6 26

  
......
22 42

  
23 43
        import lasso
24 44
        return lasso.SAML2_AUTHN_CONTEXT_PREVIOUS_SESSION
45

  
46

  
47
class SAMLLdapBackend(SAMLBackend):
48

  
49
    def get_username_from_saml_attributes(self, block, saml_attributes):
50
        username_template = block.get('saml_username_template', '')
51
        return username_template.format(**saml_attributes)
52

  
53
    def authenticate(self, saml_attributes, request=None):
54

  
55
        if not getattr(settings, 'LDAP_AUTH_SETTINGS', []):
56
            return None
57

  
58
        no_ldap_for_entity_id = True
59
        for block in settings.LDAP_AUTH_SETTINGS:
60
            if block.get('saml_entity_id', '') == saml_attributes['issuer']:
61
                no_ldap_for_entity_id = False
62
                break
63
        if no_ldap_for_entity_id:
64
            return None
65

  
66
        username = self.get_username_from_saml_attributes(block, saml_attributes)
67
        backend = LDAPBackend()
68
        return backend.authenticate_block(block, username)
tests/test_auth_saml.py
1
#
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
1 18
import pytest
19
import mock
2 20

  
3 21
from django.contrib.auth import get_user_model
22
from django.utils.timezone import datetime
23
from django.test.utils import override_settings
24

  
4 25
from authentic2.models import Attribute
5 26

  
6 27
pytestmark = pytest.mark.django_db
......
38 59
    del saml_attributes['mail']
39 60
    with pytest.raises(ValueError):
40 61
        adapter.finish_create_user(idp, saml_attributes, user)
62

  
63

  
64
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.get_dn_from_username')
65
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.authenticate_block')
66
def test_disabled_saml_ldap_backend(ldap_authenticate_block, ldap_user_dn):
67
    from authentic2_auth_saml.backends import SAMLLdapBackend
68
    backend = SAMLLdapBackend()
69
    saml_attributes = {}
70
    assert backend.authenticate(saml_attributes) == None
71
    with override_settings(LDAP_AUTH_SETTINGS=[]):
72
        assert backend.authenticate(saml_attributes) == None
73

  
74

  
75
    LDAP_SETTINGS = {
76
        "realm": "example.com",
77
        "url": "ldaps://ldap.example.com/",
78
        "basedn": "ou=people,o=example,o=com",
79
        "user_filter": "(|(mail=%s)(uid=%s))",
80
        "sync_ldap_users_filter": "",
81
        "username_template": "{uid[0]}",
82
        "attributes": [ "uid" ],
83
        "set_mandatory_groups": ["LDAP Entrouvert"],
84
        "timeout": 3,
85
        "use_tls": False,
86
        'saml_entity_id': 'https://some-idp.com/idp/saml2/metadata',
87
        'global_ldap_options': {},
88
    }
89

  
90
    saml_attributes = {'username': [u'foo'], 'first_name': [u'Foo'],
91
                'last_name': [u'Bar'], 'uid': [u'foobar'],
92
                'name_id_name_qualifier': u'https://some-idp.com/idp/saml2/metadata',
93
                'authn_instant': datetime(2019, 1, 30, 11, 12, 40),
94
                'name_id_format': u'urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified',
95
                'name_id_content': u'9f160fa0eb6045e5a5257a34fb4651e0',
96
                'mail': [u'foo@example.com'],
97
                'issuer': 'https://some-idp.com/idp/saml2/metadata'
98
    }
99

  
100
    with override_settings(A2_AUTH_SAML_LDAP_ENABLE=True, LDAP_AUTH_SETTINGS=[LDAP_SETTINGS]):
101
        ldap_user_dn.return_value = []
102
        ldap_authenticate_block.return_value = None
103
        backend.authenticate(saml_attributes)
104
        assert ldap_authenticate_block.call_args[0][0] == LDAP_SETTINGS
tests/test_ldap.py
542 542
    assert 'Étienne Michu' in response.body
543 543

  
544 544

  
545
def test_authenticate_block(slapd_strict_acl, db, settings, app):
546
    slapd = slapd_strict_acl
547
    settings.LDAP_AUTH_SETTINGS = [{
548
        'url': [slapd.ldap_url],
549
        'binddn': force_text(slapd.root_bind_dn),
550
        'bindpw': force_text(slapd.root_bind_password),
551
        'basedn': u'o=ôrga',
552
        'use_tls': False,
553
    }]
554
    backend = ldap_backend.LDAPBackend()
555
    block = backend.get_config()[0]
556
    user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8'))
557
    assert user is None
558

  
559
    # authenticate user with no password
560
    block = backend.get_config()[0]
561
    user = backend.authenticate_block(block, USERNAME)
562
    assert user.username == 'etienne.michu@ldap'
563

  
564
    # do not authenticate with user credentials
565
    settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False
566
    block = backend.get_config()[0]
567
    user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8'))
568
    assert user.username == 'etienne.michu@ldap'
569

  
570

  
545 571
def test_reset_password_ldap_user(slapd, settings, app, db):
546 572
    settings.LDAP_AUTH_SETTINGS = [{
547 573
        'url': [slapd.ldap_url],
548
-