Projet

Général

Profil

0001-backends-add-SAML-LDAP-authentication-backend-30125.patch

Serghei Mihai, 21 mars 2019 09:15

Télécharger (16,8 ko)

Voir les différences:

Subject: [PATCH] backends: add SAML LDAP authentication backend (#30125)

 src/authentic2/backends/ldap_backend.py | 147 +++++++++++-------------
 src/authentic2_auth_saml/__init__.py    |   3 +-
 src/authentic2_auth_saml/backends.py    |  44 +++++++
 tests/test_auth_saml.py                 |  64 +++++++++++
 tests/test_ldap.py                      |  26 +++++
 5 files changed, 205 insertions(+), 79 deletions(-)
src/authentic2/backends/ldap_backend.py
463 463
        'can_reset_password': False,
464 464
        # mapping from LDAP attributes to User attributes
465 465
        'user_attributes': [],
466
        # entity id of the IDP based on the same LDAP
467
        'saml_entity_id': '',
468
        # template for SAML username
469
        'saml_username_template': '{uid[0]}'
466 470
    }
467 471
    _REQUIRED = ('url', 'basedn')
468 472
    _TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive')
......
520 524
            if user is not None:
521 525
                return user
522 526

  
523
    def authenticate_block(self, block, username, password):
527
    def authenticate_block(self, block, username, password=None):
528
        utf8_username = force_bytes(username)
524 529
        for conn in self.get_connections(block):
525
            authz_ids = []
526
            user_basedn = force_text(block.get('user_basedn') or block['basedn'])
527

  
528 530
            try:
529
                if block['user_dn_template']:
530
                    template = force_text(block['user_dn_template'])
531
                    escaped_username = escape_dn_chars(username)
532
                    authz_ids.append(template.format(username=escaped_username))
533
                else:
534
                    try:
535
                        if block.get('bind_with_username'):
536
                            authz_ids.append(username)
537
                        elif block['user_filter']:
538
                            # allow multiple occurences of the username in the filter
539
                            user_filter = force_text(block['user_filter'])
540
                            n = len(user_filter.split('%s')) - 1
541
                            try:
542
                                query = filter_format(user_filter, (username,) * n)
543
                            except TypeError as e:
544
                                log.error('user_filter syntax error %r: %s', block['user_filter'],
545
                                          e)
546
                                return
547
                            log.debug('looking up dn for username %r using query %r', username,
548
                                      query)
549
                            results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query, [u'1.1'])
550
                            # remove search references
551
                            results = [result for result in results if result[0] is not None]
552
                            log.debug('found dns %r', results)
553
                            if len(results) == 0:
554
                                log.debug('user lookup failed: no entry found, %s' % query)
555
                            elif not block['multimatch'] and len(results) > 1:
556
                                log.error('user lookup failed: too many (%d) entries found: %s',
557
                                          len(results), query)
558
                            else:
559
                                authz_ids.extend(result[0] for result in results)
560
                        else:
561
                            raise NotImplementedError
562
                    except ldap.NO_SUCH_OBJECT:
563
                        log.error('user lookup failed: basedn %s not found', user_basedn)
564
                        if block['replicas']:
565
                            break
566
                        continue
567
                    except ldap.LDAPError as e:
568
                        log.error('user lookup failed: with query %r got error %s: %s', username,
569
                                  query, e)
570
                        continue
571
                if not authz_ids:
572
                    continue
573

  
574
                try:
575
                    failed = False
576
                    for authz_id in authz_ids:
577
                        if failed:
578
                            continue
579
                        try:
580
                            conn.simple_bind_s(authz_id, password)
581
                            user_login_success(authz_id)
582
                            if not block['connect_with_user_credentials']:
583
                                try:
584
                                    self.bind(block, conn)
585
                                except Exception as e:
586
                                    log.exception(u'rebind failure after login bind')
587
                                    raise ldap.SERVER_DOWN
588
                            break
589
                        except ldap.INVALID_CREDENTIALS:
590
                            user_login_failure(authz_id)
591
                            pass
531
                for dn in self.get_dn_from_username(conn, block, username):
532
                    if password is None or self.try_bind(conn, block, dn, password):
533
                        user_login_success(dn)
534
                        return self._return_user(dn, password, conn, block)
592 535
                    else:
593
                        log.debug('user bind failed: invalid credentials')
594
                        if block['replicas']:
595
                            break
596
                        continue
597
                except ldap.NO_SUCH_OBJECT:
598
                    # should not happen as we just searched for this object !
599
                    log.error('user bind failed: authz_id not found %r', ', '.join(authz_ids))
600
                    if block['replicas']:
601
                        break
602
                return self._return_user(authz_id, password, conn, block)
536
                        user_login_failure(dn)
603 537
            except ldap.CONNECT_ERROR:
604 538
                log.error('connection to %r failed, did you forget to declare the TLS certificate '
605 539
                          'in /etc/ldap/ldap.conf ?', block['url'])
......
607 541
                log.error('connection to %r timed out', block['url'])
608 542
            except ldap.SERVER_DOWN:
609 543
                log.error('ldap authentication error: %r is down', block['url'])
544
            except ldap.LDAPError, e:
545
                log.error('ldap error: %s', e)
546
                pass
610 547
            finally:
611 548
                del conn
612 549
        return None
613 550

  
551
    def get_dn_from_username(self, conn, block, username):
552
        user_basedn = block.get('user_basedn') or block['basedn']
553
        if block['user_dn_template']:
554
            template = force_bytes(block['user_dn_template'])
555
            escaped_username = escape_dn_chars(username)
556
            username = template.format(username=escaped_username)
557

  
558
        # allow multiple occurences of the username in the filter
559
        user_filter = block['user_filter']
560
        n = len(user_filter.split('%s')) - 1
561

  
562
        try:
563
            query = filter_format(user_filter, (username,) * n)
564
        except TypeError, e:
565
            log.error('user_filter syntax error %r: %s', block['user_filter'], e)
566
            return []
567
        log.debug('looking up dn for username %r using query %r', username,
568
                  query)
569
        try:
570
            results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query)
571
            results = [result for result in results if result[0] is not None]
572
            log.debug('found dns %r', results)
573
            if len(results) == 0:
574
                log.debug('user lookup failed: no entry found, %s' % query)
575
                return []
576
            elif not block['multimatch'] and len(results) > 1:
577
                log.error('user lookup failed: too many (%d) entries found: %s',
578
                          len(results), query)
579
                return []
580
            else:
581
                return [result[0] for result in results]
582
        except ldap.NO_SUCH_OBJECT:
583
            log.error('user dn lookup failed: basedn %s not found', user_basedn)
584
            raise
585
        except ldap.LDAPError, e:
586
            log.error('user dn lookup failed: with query %r got error %s', username,
587
                      query)
588
            raise
589

  
590
    def try_bind(self, conn, block, dn, password):
591
        try:
592
            conn.simple_bind_s(dn, password)
593
            if not block['connect_with_user_credentials']:
594
                try:
595
                    self.bind(block, conn)
596
                except Exception as e:
597
                    log.exception(u'rebind failure after login bind')
598
                    return False
599
            return True
600
        except ldap.INVALID_CREDENTIALS:
601
            return False
602
        except ldap.NO_SUCH_OBJECT:
603
            log.error('user bind failed: username not found %s', username)
604
            return False
605

  
614 606
    def get_user(self, user_id, session=None):
615 607
        try:
616 608
            try:
......
1119 1111

  
1120 1112
    @classmethod
1121 1113
    def get_users(cls):
1122
        logger = logging.getLogger(__name__)
1123 1114
        for block in cls.get_config():
1124 1115
            conn = cls.get_connection(block)
1125 1116
            if conn is None:
src/authentic2_auth_saml/__init__.py
7 7
        return ['mellon', __name__]
8 8

  
9 9
    def get_authentication_backends(self):
10
        return ['authentic2_auth_saml.backends.SAMLBackend']
10
        return ['authentic2_auth_saml.backends.SAMLLdapBackend',
11
                'authentic2_auth_saml.backends.SAMLBackend']
11 12

  
12 13
    def get_authenticators(self):
13 14
        return ['authentic2_auth_saml.authenticators.SAMLAuthenticator']
src/authentic2_auth_saml/backends.py
1
# authentic2_auth_saml - Authentic2 SAML Auth plugin
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
18
from django.conf import settings
19

  
1 20
from mellon.backends import SAMLBackend
2 21

  
3 22
from authentic2.middleware import StoreRequestMiddleware
23
from authentic2.backends.ldap_backend import LDAPBackend
4 24

  
5 25
from . import app_settings
6 26

  
......
22 42

  
23 43
        import lasso
24 44
        return lasso.SAML2_AUTHN_CONTEXT_PREVIOUS_SESSION
45

  
46

  
47
class SAMLLdapBackend(SAMLBackend):
48

  
49
    def get_username_from_saml_attributes(self, block, saml_attributes):
50
        username_template = block.get('saml_username_template', '')
51
        return username_template.format(**saml_attributes)
52

  
53
    def authenticate(self, saml_attributes, request=None):
54

  
55
        if not getattr(settings, 'LDAP_AUTH_SETTINGS', []):
56
            return None
57

  
58
        no_ldap_for_entity_id = True
59
        for block in settings.LDAP_AUTH_SETTINGS:
60
            if block.get('saml_entity_id', '') == saml_attributes['issuer']:
61
                no_ldap_for_entity_id = False
62
                break
63
        if no_ldap_for_entity_id:
64
            return None
65

  
66
        username = self.get_username_from_saml_attributes(block, saml_attributes)
67
        backend = LDAPBackend()
68
        return backend.authenticate_block(block, username)
tests/test_auth_saml.py
1
#
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17

  
1 18
import pytest
19
import mock
2 20

  
3 21
from django.contrib.auth import get_user_model
22
from django.utils.timezone import datetime
23
from django.test.utils import override_settings
24

  
4 25
from authentic2.models import Attribute
5 26

  
6 27
pytestmark = pytest.mark.django_db
......
38 59
    del saml_attributes['mail']
39 60
    with pytest.raises(ValueError):
40 61
        adapter.finish_create_user(idp, saml_attributes, user)
62

  
63

  
64
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.get_dn_from_username')
65
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.authenticate_block')
66
def test_disabled_saml_ldap_backend(ldap_authenticate_block, ldap_user_dn):
67
    from authentic2_auth_saml.backends import SAMLLdapBackend
68
    backend = SAMLLdapBackend()
69
    saml_attributes = {}
70
    assert backend.authenticate(saml_attributes) == None
71
    with override_settings(LDAP_AUTH_SETTINGS=[]):
72
        assert backend.authenticate(saml_attributes) == None
73

  
74

  
75
    LDAP_SETTINGS = {
76
        "realm": "example.com",
77
        "url": "ldaps://ldap.example.com/",
78
        "basedn": "ou=people,o=example,o=com",
79
        "user_filter": "(|(mail=%s)(uid=%s))",
80
        "sync_ldap_users_filter": "",
81
        "username_template": "{uid[0]}",
82
        "attributes": [ "uid" ],
83
        "set_mandatory_groups": ["LDAP Entrouvert"],
84
        "timeout": 3,
85
        "use_tls": False,
86
        'saml_entity_id': 'https://some-idp.com/idp/saml2/metadata',
87
        'global_ldap_options': {},
88
    }
89

  
90
    saml_attributes = {'username': [u'foo'], 'first_name': [u'Foo'],
91
                'last_name': [u'Bar'], 'uid': [u'foobar'],
92
                'name_id_name_qualifier': u'https://some-idp.com/idp/saml2/metadata',
93
                'authn_instant': datetime(2019, 1, 30, 11, 12, 40),
94
                'name_id_format': u'urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified',
95
                'name_id_content': u'9f160fa0eb6045e5a5257a34fb4651e0',
96
                'mail': [u'foo@example.com'],
97
                'issuer': 'https://some-idp.com/idp/saml2/metadata'
98
    }
99

  
100
    with override_settings(A2_AUTH_SAML_LDAP_ENABLE=True, LDAP_AUTH_SETTINGS=[LDAP_SETTINGS]):
101
        ldap_user_dn.return_value = []
102
        ldap_authenticate_block.return_value = None
103
        backend.authenticate(saml_attributes)
104
        assert ldap_authenticate_block.call_args[0][0] == LDAP_SETTINGS
tests/test_ldap.py
542 542
    assert 'Étienne Michu' in response.body
543 543

  
544 544

  
545
def test_authenticate_block(slapd_strict_acl, db, settings, app):
546
    slapd = slapd_strict_acl
547
    settings.LDAP_AUTH_SETTINGS = [{
548
        'url': [slapd.ldap_url],
549
        'binddn': force_text(slapd.root_bind_dn),
550
        'bindpw': force_text(slapd.root_bind_password),
551
        'basedn': u'o=ôrga',
552
        'use_tls': False,
553
    }]
554
    backend = ldap_backend.LDAPBackend()
555
    block = backend.get_config()[0]
556
    user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8'))
557
    assert user is None
558

  
559
    # authenticate user with no password
560
    block = backend.get_config()[0]
561
    user = backend.authenticate_block(block, USERNAME)
562
    assert user.username == 'etienne.michu@ldap'
563

  
564
    # do not authenticate with user credentials
565
    settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False
566
    block = backend.get_config()[0]
567
    user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8'))
568
    assert user.username == 'etienne.michu@ldap'
569

  
570

  
545 571
def test_reset_password_ldap_user(slapd, settings, app, db):
546 572
    settings.LDAP_AUTH_SETTINGS = [{
547 573
        'url': [slapd.ldap_url],
548
-