0001-backends-add-SAML-LDAP-authentication-backend-30125.patch
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
463 | 463 |
'can_reset_password': False, |
464 | 464 |
# mapping from LDAP attributes to User attributes |
465 | 465 |
'user_attributes': [], |
466 |
# entity id of the IDP based on the same LDAP |
|
467 |
'saml_entity_id': '', |
|
468 |
# template for SAML username |
|
469 |
'saml_username_template': '{uid[0]}' |
|
466 | 470 |
} |
467 | 471 |
_REQUIRED = ('url', 'basedn') |
468 | 472 |
_TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive') |
... | ... | |
520 | 524 |
if user is not None: |
521 | 525 |
return user |
522 | 526 | |
523 |
def authenticate_block(self, block, username, password): |
|
527 |
def authenticate_block(self, block, username, password=None): |
|
528 |
utf8_username = force_bytes(username) |
|
524 | 529 |
for conn in self.get_connections(block): |
525 |
authz_ids = [] |
|
526 |
user_basedn = force_text(block.get('user_basedn') or block['basedn']) |
|
527 | ||
528 | 530 |
try: |
529 |
if block['user_dn_template']: |
|
530 |
template = force_text(block['user_dn_template']) |
|
531 |
escaped_username = escape_dn_chars(username) |
|
532 |
authz_ids.append(template.format(username=escaped_username)) |
|
533 |
else: |
|
534 |
try: |
|
535 |
if block.get('bind_with_username'): |
|
536 |
authz_ids.append(username) |
|
537 |
elif block['user_filter']: |
|
538 |
# allow multiple occurences of the username in the filter |
|
539 |
user_filter = force_text(block['user_filter']) |
|
540 |
n = len(user_filter.split('%s')) - 1 |
|
541 |
try: |
|
542 |
query = filter_format(user_filter, (username,) * n) |
|
543 |
except TypeError as e: |
|
544 |
log.error('user_filter syntax error %r: %s', block['user_filter'], |
|
545 |
e) |
|
546 |
return |
|
547 |
log.debug('looking up dn for username %r using query %r', username, |
|
548 |
query) |
|
549 |
results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query, [u'1.1']) |
|
550 |
# remove search references |
|
551 |
results = [result for result in results if result[0] is not None] |
|
552 |
log.debug('found dns %r', results) |
|
553 |
if len(results) == 0: |
|
554 |
log.debug('user lookup failed: no entry found, %s' % query) |
|
555 |
elif not block['multimatch'] and len(results) > 1: |
|
556 |
log.error('user lookup failed: too many (%d) entries found: %s', |
|
557 |
len(results), query) |
|
558 |
else: |
|
559 |
authz_ids.extend(result[0] for result in results) |
|
560 |
else: |
|
561 |
raise NotImplementedError |
|
562 |
except ldap.NO_SUCH_OBJECT: |
|
563 |
log.error('user lookup failed: basedn %s not found', user_basedn) |
|
564 |
if block['replicas']: |
|
565 |
break |
|
566 |
continue |
|
567 |
except ldap.LDAPError as e: |
|
568 |
log.error('user lookup failed: with query %r got error %s: %s', username, |
|
569 |
query, e) |
|
570 |
continue |
|
571 |
if not authz_ids: |
|
572 |
continue |
|
573 | ||
574 |
try: |
|
575 |
failed = False |
|
576 |
for authz_id in authz_ids: |
|
577 |
if failed: |
|
578 |
continue |
|
579 |
try: |
|
580 |
conn.simple_bind_s(authz_id, password) |
|
581 |
user_login_success(authz_id) |
|
582 |
if not block['connect_with_user_credentials']: |
|
583 |
try: |
|
584 |
self.bind(block, conn) |
|
585 |
except Exception as e: |
|
586 |
log.exception(u'rebind failure after login bind') |
|
587 |
raise ldap.SERVER_DOWN |
|
588 |
break |
|
589 |
except ldap.INVALID_CREDENTIALS: |
|
590 |
user_login_failure(authz_id) |
|
591 |
pass |
|
531 |
for dn in self.get_dn_from_username(conn, block, username): |
|
532 |
if password is None or self.try_bind(conn, block, dn, password): |
|
533 |
user_login_success(dn) |
|
534 |
return self._return_user(dn, password, conn, block) |
|
592 | 535 |
else: |
593 |
log.debug('user bind failed: invalid credentials') |
|
594 |
if block['replicas']: |
|
595 |
break |
|
596 |
continue |
|
597 |
except ldap.NO_SUCH_OBJECT: |
|
598 |
# should not happen as we just searched for this object ! |
|
599 |
log.error('user bind failed: authz_id not found %r', ', '.join(authz_ids)) |
|
600 |
if block['replicas']: |
|
601 |
break |
|
602 |
return self._return_user(authz_id, password, conn, block) |
|
536 |
user_login_failure(dn) |
|
603 | 537 |
except ldap.CONNECT_ERROR: |
604 | 538 |
log.error('connection to %r failed, did you forget to declare the TLS certificate ' |
605 | 539 |
'in /etc/ldap/ldap.conf ?', block['url']) |
... | ... | |
607 | 541 |
log.error('connection to %r timed out', block['url']) |
608 | 542 |
except ldap.SERVER_DOWN: |
609 | 543 |
log.error('ldap authentication error: %r is down', block['url']) |
544 |
except ldap.LDAPError, e: |
|
545 |
log.error('ldap error: %s', e) |
|
546 |
pass |
|
610 | 547 |
finally: |
611 | 548 |
del conn |
612 | 549 |
return None |
613 | 550 | |
551 |
def get_dn_from_username(self, conn, block, username): |
|
552 |
user_basedn = block.get('user_basedn') or block['basedn'] |
|
553 |
if block['user_dn_template']: |
|
554 |
template = force_bytes(block['user_dn_template']) |
|
555 |
escaped_username = escape_dn_chars(username) |
|
556 |
username = template.format(username=escaped_username) |
|
557 | ||
558 |
# allow multiple occurences of the username in the filter |
|
559 |
user_filter = block['user_filter'] |
|
560 |
n = len(user_filter.split('%s')) - 1 |
|
561 | ||
562 |
try: |
|
563 |
query = filter_format(user_filter, (username,) * n) |
|
564 |
except TypeError, e: |
|
565 |
log.error('user_filter syntax error %r: %s', block['user_filter'], e) |
|
566 |
return [] |
|
567 |
log.debug('looking up dn for username %r using query %r', username, |
|
568 |
query) |
|
569 |
try: |
|
570 |
results = conn.search_s(user_basedn, ldap.SCOPE_SUBTREE, query) |
|
571 |
results = [result for result in results if result[0] is not None] |
|
572 |
log.debug('found dns %r', results) |
|
573 |
if len(results) == 0: |
|
574 |
log.debug('user lookup failed: no entry found, %s' % query) |
|
575 |
return [] |
|
576 |
elif not block['multimatch'] and len(results) > 1: |
|
577 |
log.error('user lookup failed: too many (%d) entries found: %s', |
|
578 |
len(results), query) |
|
579 |
return [] |
|
580 |
else: |
|
581 |
return [result[0] for result in results] |
|
582 |
except ldap.NO_SUCH_OBJECT: |
|
583 |
log.error('user dn lookup failed: basedn %s not found', user_basedn) |
|
584 |
raise |
|
585 |
except ldap.LDAPError, e: |
|
586 |
log.error('user dn lookup failed: with query %r got error %s', username, |
|
587 |
query) |
|
588 |
raise |
|
589 | ||
590 |
def try_bind(self, conn, block, dn, password): |
|
591 |
try: |
|
592 |
conn.simple_bind_s(dn, password) |
|
593 |
if not block['connect_with_user_credentials']: |
|
594 |
try: |
|
595 |
self.bind(block, conn) |
|
596 |
except Exception as e: |
|
597 |
log.exception(u'rebind failure after login bind') |
|
598 |
return False |
|
599 |
return True |
|
600 |
except ldap.INVALID_CREDENTIALS: |
|
601 |
return False |
|
602 |
except ldap.NO_SUCH_OBJECT: |
|
603 |
log.error('user bind failed: username not found %s', username) |
|
604 |
return False |
|
605 | ||
614 | 606 |
def get_user(self, user_id, session=None): |
615 | 607 |
try: |
616 | 608 |
try: |
... | ... | |
1119 | 1111 | |
1120 | 1112 |
@classmethod |
1121 | 1113 |
def get_users(cls): |
1122 |
logger = logging.getLogger(__name__) |
|
1123 | 1114 |
for block in cls.get_config(): |
1124 | 1115 |
conn = cls.get_connection(block) |
1125 | 1116 |
if conn is None: |
src/authentic2_auth_saml/__init__.py | ||
---|---|---|
7 | 7 |
return ['mellon', __name__] |
8 | 8 | |
9 | 9 |
def get_authentication_backends(self): |
10 |
return ['authentic2_auth_saml.backends.SAMLBackend'] |
|
10 |
return ['authentic2_auth_saml.backends.SAMLLdapBackend', |
|
11 |
'authentic2_auth_saml.backends.SAMLBackend'] |
|
11 | 12 | |
12 | 13 |
def get_authenticators(self): |
13 | 14 |
return ['authentic2_auth_saml.authenticators.SAMLAuthenticator'] |
src/authentic2_auth_saml/backends.py | ||
---|---|---|
1 |
# authentic2_auth_saml - Authentic2 SAML Auth plugin |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 | ||
18 |
from django.conf import settings |
|
19 | ||
1 | 20 |
from mellon.backends import SAMLBackend |
2 | 21 | |
3 | 22 |
from authentic2.middleware import StoreRequestMiddleware |
23 |
from authentic2.backends.ldap_backend import LDAPBackend |
|
4 | 24 | |
5 | 25 |
from . import app_settings |
6 | 26 | |
... | ... | |
22 | 42 | |
23 | 43 |
import lasso |
24 | 44 |
return lasso.SAML2_AUTHN_CONTEXT_PREVIOUS_SESSION |
45 | ||
46 | ||
47 |
class SAMLLdapBackend(SAMLBackend): |
|
48 | ||
49 |
def get_username_from_saml_attributes(self, block, saml_attributes): |
|
50 |
username_template = block.get('saml_username_template', '') |
|
51 |
return username_template.format(**saml_attributes) |
|
52 | ||
53 |
def authenticate(self, saml_attributes, request=None): |
|
54 | ||
55 |
if not getattr(settings, 'LDAP_AUTH_SETTINGS', []): |
|
56 |
return None |
|
57 | ||
58 |
no_ldap_for_entity_id = True |
|
59 |
for block in settings.LDAP_AUTH_SETTINGS: |
|
60 |
if block.get('saml_entity_id', '') == saml_attributes['issuer']: |
|
61 |
no_ldap_for_entity_id = False |
|
62 |
break |
|
63 |
if no_ldap_for_entity_id: |
|
64 |
return None |
|
65 | ||
66 |
username = self.get_username_from_saml_attributes(block, saml_attributes) |
|
67 |
backend = LDAPBackend() |
|
68 |
return backend.authenticate_block(block, username) |
tests/test_auth_saml.py | ||
---|---|---|
1 |
# |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 | ||
1 | 18 |
import pytest |
19 |
import mock |
|
2 | 20 | |
3 | 21 |
from django.contrib.auth import get_user_model |
22 |
from django.utils.timezone import datetime |
|
23 |
from django.test.utils import override_settings |
|
24 | ||
4 | 25 |
from authentic2.models import Attribute |
5 | 26 | |
6 | 27 |
pytestmark = pytest.mark.django_db |
... | ... | |
38 | 59 |
del saml_attributes['mail'] |
39 | 60 |
with pytest.raises(ValueError): |
40 | 61 |
adapter.finish_create_user(idp, saml_attributes, user) |
62 | ||
63 | ||
64 |
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.get_dn_from_username') |
|
65 |
@mock.patch('authentic2.backends.ldap_backend.LDAPBackend.authenticate_block') |
|
66 |
def test_disabled_saml_ldap_backend(ldap_authenticate_block, ldap_user_dn): |
|
67 |
from authentic2_auth_saml.backends import SAMLLdapBackend |
|
68 |
backend = SAMLLdapBackend() |
|
69 |
saml_attributes = {} |
|
70 |
assert backend.authenticate(saml_attributes) == None |
|
71 |
with override_settings(LDAP_AUTH_SETTINGS=[]): |
|
72 |
assert backend.authenticate(saml_attributes) == None |
|
73 | ||
74 | ||
75 |
LDAP_SETTINGS = { |
|
76 |
"realm": "example.com", |
|
77 |
"url": "ldaps://ldap.example.com/", |
|
78 |
"basedn": "ou=people,o=example,o=com", |
|
79 |
"user_filter": "(|(mail=%s)(uid=%s))", |
|
80 |
"sync_ldap_users_filter": "", |
|
81 |
"username_template": "{uid[0]}", |
|
82 |
"attributes": [ "uid" ], |
|
83 |
"set_mandatory_groups": ["LDAP Entrouvert"], |
|
84 |
"timeout": 3, |
|
85 |
"use_tls": False, |
|
86 |
'saml_entity_id': 'https://some-idp.com/idp/saml2/metadata', |
|
87 |
'global_ldap_options': {}, |
|
88 |
} |
|
89 | ||
90 |
saml_attributes = {'username': [u'foo'], 'first_name': [u'Foo'], |
|
91 |
'last_name': [u'Bar'], 'uid': [u'foobar'], |
|
92 |
'name_id_name_qualifier': u'https://some-idp.com/idp/saml2/metadata', |
|
93 |
'authn_instant': datetime(2019, 1, 30, 11, 12, 40), |
|
94 |
'name_id_format': u'urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified', |
|
95 |
'name_id_content': u'9f160fa0eb6045e5a5257a34fb4651e0', |
|
96 |
'mail': [u'foo@example.com'], |
|
97 |
'issuer': 'https://some-idp.com/idp/saml2/metadata' |
|
98 |
} |
|
99 | ||
100 |
with override_settings(A2_AUTH_SAML_LDAP_ENABLE=True, LDAP_AUTH_SETTINGS=[LDAP_SETTINGS]): |
|
101 |
ldap_user_dn.return_value = [] |
|
102 |
ldap_authenticate_block.return_value = None |
|
103 |
backend.authenticate(saml_attributes) |
|
104 |
assert ldap_authenticate_block.call_args[0][0] == LDAP_SETTINGS |
tests/test_ldap.py | ||
---|---|---|
542 | 542 |
assert 'Étienne Michu' in response.body |
543 | 543 | |
544 | 544 | |
545 |
def test_authenticate_block(slapd_strict_acl, db, settings, app): |
|
546 |
slapd = slapd_strict_acl |
|
547 |
settings.LDAP_AUTH_SETTINGS = [{ |
|
548 |
'url': [slapd.ldap_url], |
|
549 |
'binddn': force_text(slapd.root_bind_dn), |
|
550 |
'bindpw': force_text(slapd.root_bind_password), |
|
551 |
'basedn': u'o=ôrga', |
|
552 |
'use_tls': False, |
|
553 |
}] |
|
554 |
backend = ldap_backend.LDAPBackend() |
|
555 |
block = backend.get_config()[0] |
|
556 |
user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8')) |
|
557 |
assert user is None |
|
558 | ||
559 |
# authenticate user with no password |
|
560 |
block = backend.get_config()[0] |
|
561 |
user = backend.authenticate_block(block, USERNAME) |
|
562 |
assert user.username == 'etienne.michu@ldap' |
|
563 | ||
564 |
# do not authenticate with user credentials |
|
565 |
settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False |
|
566 |
block = backend.get_config()[0] |
|
567 |
user = backend.authenticate_block(block, USERNAME, PASS.decode('utf-8')) |
|
568 |
assert user.username == 'etienne.michu@ldap' |
|
569 | ||
570 | ||
545 | 571 |
def test_reset_password_ldap_user(slapd, settings, app, db): |
546 | 572 |
settings.LDAP_AUTH_SETTINGS = [{ |
547 | 573 |
'url': [slapd.ldap_url], |
548 |
- |