0003-ldap_backend-add-setting-connect_with_user_credentia.patch
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
294 | 294 |
'use_password_modify': True, |
295 | 295 |
# Target OU |
296 | 296 |
'ou_slug': '', |
297 |
# use user credentials when we have them to connect to the LDAP |
|
298 |
'connect_with_user_credentials': True, |
|
297 | 299 |
} |
298 | 300 |
_REQUIRED = ('url', 'basedn') |
299 | 301 |
_TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive') |
... | ... | |
400 | 402 |
continue |
401 | 403 | |
402 | 404 |
try: |
405 |
failed = False |
|
403 | 406 |
for authz_id in authz_ids: |
407 |
if failed: |
|
408 |
continue |
|
404 | 409 |
try: |
405 | 410 |
conn.simple_bind_s(authz_id, utf8_password) |
406 | 411 |
user_login_success(authz_id) |
412 |
if not block['connect_with_user_credentials']: |
|
413 |
try: |
|
414 |
self.bind(block, conn) |
|
415 |
except Exception as e: |
|
416 |
log.exception(u'rebind failure after login bind') |
|
417 |
raise ldap.SERVER_DOWN |
|
407 | 418 |
break |
408 | 419 |
except ldap.INVALID_CREDENTIALS: |
409 | 420 |
user_login_failure(authz_id) |
... | ... | |
947 | 958 |
else: |
948 | 959 |
log.error('ldap %r is down', url) |
949 | 960 |
continue |
950 |
try: |
|
951 |
if credentials: |
|
952 |
conn.bind_s(*credentials) |
|
953 |
elif block['bindsasl']: |
|
954 |
sasl_mech, who, sasl_params = block['bindsasl'] |
|
955 |
handler_class = getattr(ldap.sasl, sasl_mech) |
|
956 |
auth = handler_class(*sasl_params) |
|
957 |
conn.sasl_interactive_bind_s(who, auth) |
|
958 |
elif block['binddn'] and block['bindpw']: |
|
959 |
conn.bind_s(block['binddn'], block['bindpw']) |
|
961 |
user_credentials = block['connect_with_user_credentials'] and credentials |
|
962 |
success, error = cls.bind(block, conn, credentials=user_credentials) |
|
963 |
if success: |
|
960 | 964 |
yield conn |
961 |
except ldap.INVALID_CREDENTIALS: |
|
962 |
log.error('admin bind failed on %s: invalid credentials', url) |
|
963 |
if block['replicas']: |
|
964 |
break |
|
965 |
except ldap.INVALID_DN_SYNTAX: |
|
966 |
log.error('admin bind failed on %s: invalid dn syntax %r', url, who) |
|
967 |
if block['replicas']: |
|
968 |
break |
|
969 |
except (ldap.TIMEOUT, ldap.CONNECT_ERROR, ldap.SERVER_DOWN): |
|
965 |
else: |
|
970 | 966 |
if block['replicas']: |
971 |
log.warning('ldap %r is down', url)
|
|
967 |
log.warning(u'admin bind failed on %s: %s', url, error)
|
|
972 | 968 |
else: |
973 |
log.error('ldap %r is down', url) |
|
974 |
continue |
|
969 |
log.error(u'admin bind failed on %s: %s', url, error) |
|
970 | ||
971 |
@classmethod |
|
972 |
def bind(cls, block, conn, credentials=()): |
|
973 |
'''Bind to the LDAP server''' |
|
974 |
try: |
|
975 |
if credentials: |
|
976 |
who = credentials[0] |
|
977 |
conn.bind_s(*credentials) |
|
978 |
elif block['bindsasl']: |
|
979 |
sasl_mech, who, sasl_params = block['bindsasl'] |
|
980 |
handler_class = getattr(ldap.sasl, sasl_mech) |
|
981 |
auth = handler_class(*sasl_params) |
|
982 |
conn.sasl_interactive_bind_s(who, auth) |
|
983 |
elif block['binddn'] and block['bindpw']: |
|
984 |
who = block['binddn'] |
|
985 |
conn.bind_s(block['binddn'], block['bindpw']) |
|
986 |
else: |
|
987 |
who = 'anonymous' |
|
988 |
conn.simple_bind_s() |
|
989 |
return True, None |
|
990 |
except ldap.INVALID_CREDENTIALS: |
|
991 |
return False, 'invalid credentials' |
|
992 |
except ldap.INVALID_DN_SYNTAX: |
|
993 |
return False, 'invalid dn syntax %r' % who |
|
994 |
except (ldap.TIMEOUT, ldap.CONNECT_ERROR, ldap.SERVER_DOWN): |
|
995 |
return False, 'ldap is down' |
|
975 | 996 | |
976 | 997 |
@classmethod |
977 | 998 |
def get_connection(cls, block, credentials=()): |
978 | 999 |
'''Try to get at least one connection''' |
979 | 1000 |
for conn in cls.get_connections(block, credentials=credentials): |
980 | 1001 |
return conn |
1002 |
log.error('could not get a connection') |
|
981 | 1003 | |
982 | 1004 |
@classmethod |
983 | 1005 |
def update_default(cls, block): |
tests/test_ldap.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 |
import pytest |
3 | 3 |
import mock |
4 | ||
5 |
import ldap |
|
4 | 6 |
from ldap.dn import escape_dn_chars |
5 | 7 | |
6 | 8 |
from ldaptools.slapd import Slapd, has_slapd |
... | ... | |
11 | 13 |
from authentic2.backends import ldap_backend |
12 | 14 |
from authentic2 import crypto |
13 | 15 | |
14 |
from pytest_django.migrations import DisableMigrations |
|
15 | ||
16 | 16 |
import utils |
17 | 17 | |
18 | 18 |
pytestmark = pytest.mark.skipunless(has_slapd(), reason='slapd is not installed') |
... | ... | |
390 | 390 |
'create_role': False, |
391 | 391 |
}] |
392 | 392 | |
393 |
users = list(ldap_backend.LDAPBackend.get_users())
|
|
393 |
list(ldap_backend.LDAPBackend.get_users()) |
|
394 | 394 |
assert User.objects.first().roles.count() == 0 |
395 | ||
396 | ||
397 |
@pytest.fixture |
|
398 |
def slapd_strict_acl(slapd): |
|
399 |
# forbid modifications by user themselves |
|
400 |
conn = slapd.get_connection_external() |
|
401 |
conn.modify_s( |
|
402 |
'olcDatabase={1}mdb,cn=config', |
|
403 |
[ |
|
404 |
(ldap.MOD_REPLACE, 'olcAccess', [ |
|
405 |
'{0}to * by dn.subtree="o=orga" none by * manage' |
|
406 |
]) |
|
407 |
]) |
|
408 |
return slapd |
|
409 | ||
410 | ||
411 |
def test_no_connect_with_user_credentials(slapd_strict_acl, db, settings, app): |
|
412 |
slapd = slapd_strict_acl |
|
413 |
settings.LDAP_AUTH_SETTINGS = [{ |
|
414 |
'url': [slapd.ldap_url], |
|
415 |
'basedn': 'o=orga', |
|
416 |
'use_tls': False, |
|
417 |
'create_group': True, |
|
418 |
'group_mapping': [ |
|
419 |
('cn=group2,o=orga', ['Group2']), |
|
420 |
], |
|
421 |
'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))', |
|
422 |
'set_mandatory_roles': ['tech', 'admin'], |
|
423 |
'create_role': False, |
|
424 |
}] |
|
425 |
response = app.get('/login/') |
|
426 |
response.form.set('username', USERNAME) |
|
427 |
response.form.set('password', PASS) |
|
428 |
response = response.form.submit('login-password-submit') |
|
429 |
assert response.status_code == 200 |
|
430 |
assert 'Étienne Michu' not in response.body |
|
431 | ||
432 |
settings.LDAP_AUTH_SETTINGS[0]['connect_with_user_credentials'] = False |
|
433 |
response = app.get('/login/') |
|
434 |
response.form.set('username', USERNAME) |
|
435 |
response.form.set('password', PASS) |
|
436 |
response = response.form.submit('login-password-submit').follow() |
|
437 |
assert 'Étienne Michu' in response.body |
|
395 |
- |