From a63ff26655f0b8baf85ea2a6f1a09dc8ff425329 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 19 Oct 2017 12:26:49 +0200 Subject: [PATCH 3/3] ldap_backend: add setting connect_with_user_credentials The default is True, if False we never try to query the LDAP server with the user credentials apart from checking the password. --- src/authentic2/backends/ldap_backend.py | 66 ++++++++++++++++++++++----------- tests/test_ldap.py | 49 ++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 25 deletions(-) diff --git a/src/authentic2/backends/ldap_backend.py b/src/authentic2/backends/ldap_backend.py index 18f2d3a..48e43cb 100644 --- a/src/authentic2/backends/ldap_backend.py +++ b/src/authentic2/backends/ldap_backend.py @@ -294,6 +294,8 @@ class LDAPBackend(object): 'use_password_modify': True, # Target OU 'ou_slug': '', + # use user credentials when we have them to connect to the LDAP + 'connect_with_user_credentials': True, } _REQUIRED = ('url', 'basedn') _TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive') @@ -400,10 +402,19 @@ class LDAPBackend(object): continue try: + failed = False for authz_id in authz_ids: + if failed: + continue try: conn.simple_bind_s(authz_id, utf8_password) user_login_success(authz_id) + if not block['connect_with_user_credentials']: + try: + self.bind(block, conn) + except Exception as e: + log.exception(u'rebind failure after login bind') + raise ldap.SERVER_DOWN break except ldap.INVALID_CREDENTIALS: user_login_failure(authz_id) @@ -947,37 +958,48 @@ class LDAPBackend(object): else: log.error('ldap %r is down', url) continue - try: - if credentials: - conn.bind_s(*credentials) - elif block['bindsasl']: - sasl_mech, who, sasl_params = block['bindsasl'] - handler_class = getattr(ldap.sasl, sasl_mech) - auth = handler_class(*sasl_params) - conn.sasl_interactive_bind_s(who, auth) - elif block['binddn'] and block['bindpw']: - conn.bind_s(block['binddn'], block['bindpw']) + user_credentials = block['connect_with_user_credentials'] and credentials + success, error = cls.bind(block, conn, credentials=user_credentials) + if success: yield conn - except ldap.INVALID_CREDENTIALS: - log.error('admin bind failed on %s: invalid credentials', url) - if block['replicas']: - break - except ldap.INVALID_DN_SYNTAX: - log.error('admin bind failed on %s: invalid dn syntax %r', url, who) - if block['replicas']: - break - except (ldap.TIMEOUT, ldap.CONNECT_ERROR, ldap.SERVER_DOWN): + else: if block['replicas']: - log.warning('ldap %r is down', url) + log.warning(u'admin bind failed on %s: %s', url, error) else: - log.error('ldap %r is down', url) - continue + log.error(u'admin bind failed on %s: %s', url, error) + + @classmethod + def bind(cls, block, conn, credentials=()): + '''Bind to the LDAP server''' + try: + if credentials: + who = credentials[0] + conn.bind_s(*credentials) + elif block['bindsasl']: + sasl_mech, who, sasl_params = block['bindsasl'] + handler_class = getattr(ldap.sasl, sasl_mech) + auth = handler_class(*sasl_params) + conn.sasl_interactive_bind_s(who, auth) + elif block['binddn'] and block['bindpw']: + who = block['binddn'] + conn.bind_s(block['binddn'], block['bindpw']) + else: + who = 'anonymous' + conn.simple_bind_s() + return True, None + except ldap.INVALID_CREDENTIALS: + return False, 'invalid credentials' + except ldap.INVALID_DN_SYNTAX: + return False, 'invalid dn syntax %r' % who + except (ldap.TIMEOUT, ldap.CONNECT_ERROR, ldap.SERVER_DOWN): + return False, 'ldap is down' @classmethod def get_connection(cls, block, credentials=()): '''Try to get at least one connection''' for conn in cls.get_connections(block, credentials=credentials): return conn + log.error('could not get a connection') @classmethod def update_default(cls, block): diff --git a/tests/test_ldap.py b/tests/test_ldap.py index 34d1014..d9a7cfb 100644 --- a/tests/test_ldap.py +++ b/tests/test_ldap.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import pytest import mock + +import ldap from ldap.dn import escape_dn_chars from ldaptools.slapd import Slapd, has_slapd @@ -11,8 +13,6 @@ from django_rbac.utils import get_ou_model from authentic2.backends import ldap_backend from authentic2 import crypto -from pytest_django.migrations import DisableMigrations - import utils pytestmark = pytest.mark.skipunless(has_slapd(), reason='slapd is not installed') @@ -390,5 +390,48 @@ def test_nocreate_mandatory_roles(slapd, settings): 'create_role': False, }] - users = list(ldap_backend.LDAPBackend.get_users()) + list(ldap_backend.LDAPBackend.get_users()) assert User.objects.first().roles.count() == 0 + + +@pytest.fixture +def slapd_strict_acl(slapd): + # forbid modifications by user themselves + conn = slapd.get_connection_external() + conn.modify_s( + 'olcDatabase={1}mdb,cn=config', + [ + (ldap.MOD_REPLACE, 'olcAccess', [ + '{0}to * by dn.subtree="o=orga" none by * manage' + ]) + ]) + return slapd + + +def test_no_connect_with_user_credentials(slapd_strict_acl, db, settings, app): + slapd = slapd_strict_acl + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'create_group': True, + 'group_mapping': [ + ('cn=group2,o=orga', ['Group2']), + ], + 'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))', + 'set_mandatory_roles': ['tech', 'admin'], + 'create_role': False, + }] + response = app.get('/login/') + response.form.set('username', USERNAME) + response.form.set('password', PASS) + response = response.form.submit('login-password-submit') + assert response.status_code == 200 + assert 'Étienne Michu' not in response.body + + settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False + response = app.get('/login/') + response.form.set('username', USERNAME) + response.form.set('password', PASS) + response = response.form.submit('login-password-submit').follow() + assert 'Étienne Michu' in response.body -- 2.1.4