Projet

Général

Profil

0001-ldap-optionally-collects-messages-from-ppolicy.patch

Loïc Dachary, 12 février 2021 00:01

Télécharger (16,5 ko)

Voir les différences:

Subject: [PATCH] ldap: optionally collects messages from ppolicy

Enable PasswordPolicyControl[0] in authenticate() and log the
information it returns, on success or error. In the context of a
request, this information is also set as a message[1] to be displayed
to the user.

[0] https://github.com/python-ldap/python-ldap/blob/python-ldap-3.3.1/Lib/ldap/controls/ppolicy.py
[1] https://docs.djangoproject.com/en/3.1/ref/contrib/messages/

Fixes: #50959

License: MIT
 src/authentic2/backends/ldap_backend.py |  71 +++++-
 tests/test_ldap.py                      | 275 ++++++++++++++++++++++++
 2 files changed, 340 insertions(+), 6 deletions(-)
src/authentic2/backends/ldap_backend.py
23 23
    from ldap.filter import filter_format
24 24
    from ldap.dn import escape_dn_chars
25 25
    from ldap.ldapobject import ReconnectLDAPObject as NativeLDAPObject
26
    from ldap.controls import SimplePagedResultsControl
26
    from ldap.controls import SimplePagedResultsControl, DecodeControlTuples
27
    from ldap.controls import ppolicy
28
    from pyasn1.codec.der import decoder
27 29
    PYTHON_LDAP3 = [int(x) for x in ldap.__version__.split('.')] >= [3]
28 30
    LDAPObject = NativeLDAPObject
29 31
except ImportError:
......
34 36
import base64
35 37
import os
36 38
import json
39
import time
37 40

  
38 41
# code originaly copied from by now merely inspired by
39 42
# http://www.amherst.k12.oh.us/django-ldap.html
......
41 44
from django.core.cache import cache
42 45
from django.core.exceptions import ImproperlyConfigured
43 46
from django.conf import settings
47
from django.contrib import messages
44 48
from django.contrib.auth import get_user_model
45 49
from django.contrib.auth.models import Group
46 50
from django.utils.encoding import force_bytes, force_text
47 51
from django.utils import six
48 52
from django.utils.six.moves.urllib import parse as urlparse
53
from django.utils.translation import ugettext as _, ngettext
49 54

  
50 55
from authentic2.a2_rbac.models import Role
51 56

  
......
232 237
    raise NotImplementedError
233 238

  
234 239

  
240
def password_policy_control_messages(ctrl):
241
    messages = []
242

  
243
    if ctrl.error:
244
        error = ppolicy.PasswordPolicyError.namedValues[ctrl.error]
245
        error2message = {
246
            'passwordExpired': _('The password expired'),
247
            'accountLocked': _('The account is locked.'),
248
            'changeAfterReset': _('The password was reset and must be changed.'),
249
            'passwordModNotAllowed': _('It is not possible to modify the password.'),
250
            'mustSupplyOldPassword': _('The old password must be supplied.'),
251
            'insufficientPasswordQuality': _('The password does not meet the quality requirements.'),
252
            'passwordTooShort': _('The password is too short.'),
253
            'passwordTooYoung': _('It is too soon to change the password.'),
254
            'passwordInHistory': _('This password was recently used and cannot be used again.'),
255
        }
256
        messages.append(error2message.get(error, _('Unexpected error {error}').format(error=error)))
257
        return messages
258

  
259
    if ctrl.timeBeforeExpiration:
260
        timeBeforeExpiration = time.asctime(time.localtime(time.time() + ctrl.timeBeforeExpiration))
261
        messages.append(_('The password will expire at {timeBeforeExpiration}.').format(
262
            timeBeforeExpiration=timeBeforeExpiration))
263
    if ctrl.graceAuthNsRemaining:
264
        messages.append(ngettext(
265
            'This password expired: this is the last time it can be used.',
266
            'This password expired and can only be used {graceAuthNsRemaining} times, including this one.',
267
            ctrl.graceAuthNsRemaining).format(graceAuthNsRemaining=ctrl.graceAuthNsRemaining))
268
    return messages
269

  
235 270
class LDAPUser(User):
236 271
    SESSION_LDAP_DATA_KEY = 'ldap-data'
237 272
    _changed = False
......
358 393
            if not conn:
359 394
                log.warning('ldap: set_password failed, could not get a connection')
360 395
                return
361
            self.ldap_backend.modify_password(conn, self.block, self.dn, _current_password, new_password)
396
            try:
397
                self.ldap_backend.modify_password(conn, self.block, self.dn, _current_password, new_password)
398
            except ldap.STRONG_AUTH_REQUIRED:
399
                log.warning('ldap: set_password failed, STRONG_AUTH_REQUIRED')
400
                return
362 401
            self._current_password = new_password
363 402
        self.keep_password_in_session(new_password)
364 403
        if self.block['keep_password']:
......
507 546
        'can_reset_password': False,
508 547
        # mapping from LDAP attributes to User attributes
509 548
        'user_attributes': [],
549
        # https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html#ldap-controls
550
        'use_controls': True,
510 551
    }
511 552
    _REQUIRED = ('url', 'basedn')
512 553
    _TO_ITERABLE = ('url', 'groupsu', 'groupstaff', 'groupactive')
......
537 578
        log.debug('got config %r', blocks)
538 579
        return blocks
539 580

  
581
    @staticmethod
582
    def process_controls(request, authz_id, ctrls):
583
        for c in ctrls:
584
            if c.controlType == ppolicy.PasswordPolicyControl.controlType:
585
                message = ' '.join(password_policy_control_messages(c))
586
                if request is not None:
587
                    messages.add_message(request, messages.WARNING, message)
588
            else:
589
                message = str(vars(c))
590
            log.info('ldap: bind error with authz_id "%s" -> "%s"', authz_id, message)
591

  
540 592
    def authenticate(self, request=None, username=None, password=None, realm=None, ou=None):
541 593
        if username is None or password is None:
542 594
            return None
......
566 618
                log.error(
567 619
                    "account name authentication filter doesn't contain '%s'")
568 620
                continue
569
            user = self.authenticate_block(block, uid, password)
621
            user = self.authenticate_block(request, block, uid, password)
570 622
            if user is not None:
571 623
                return user
572 624

  
573
    def authenticate_block(self, block, username, password):
625
    def authenticate_block(self, request, block, username, password):
574 626
        for conn in self.get_connections(block):
575 627
            ldap_uri = conn.get_option(ldap.OPT_URI)
576 628
            authz_ids = []
......
628 680
                        if failed:
629 681
                            continue
630 682
                        try:
631
                            conn.simple_bind_s(authz_id, password)
683
                            if block.get('use_controls'):
684
                                serverctrls = [ppolicy.PasswordPolicyControl()]
685
                            else:
686
                                serverctrls = []
687
                            results = conn.simple_bind_s(authz_id, password, serverctrls=serverctrls)
688
                            self.process_controls(request, authz_id, results[3])
632 689
                            user_login_success(authz_id)
633 690
                            if not block['connect_with_user_credentials']:
634 691
                                try:
......
637 694
                                    log.exception(u'rebind failure after login bind')
638 695
                                    raise ldap.SERVER_DOWN
639 696
                            break
640
                        except ldap.INVALID_CREDENTIALS:
697
                        except ldap.INVALID_CREDENTIALS as e:
698
                            if block.get('use_controls') and len(e.args) > 0 and 'ctrls' in e.args[0]:
699
                                self.process_controls(request, authz_id, DecodeControlTuples(e.args[0]['ctrls']))
641 700
                            user_login_failure(authz_id)
642 701
                            pass
643 702
                    else:
tests/test_ldap.py
20 20

  
21 21
import pytest
22 22
import mock
23
import time
23 24

  
24 25
import ldap
25 26
from ldap.dn import escape_dn_chars
......
77 78
    with create_slapd() as s:
78 79
        yield s
79 80

  
81
@pytest.fixture
82
def slapd_ppolicy():
83
    with create_slapd() as slapd:
84
        conn = slapd.get_connection_admin()
85
        assert conn.protocol_version == ldap.VERSION3
86
        conn.modify_s(
87
            'cn=module{0},cn=config',
88
            [
89
                (ldap.MOD_ADD, 'olcModuleLoad', [
90
                    force_bytes('ppolicy')
91
                ])
92
            ])
93
        with open('/etc/ldap/schema/ppolicy.ldif') as fd:
94
            slapd.add_ldif(fd.read())
95
        slapd.add_ldif('''
96
dn: olcOverlay={0}ppolicy,olcDatabase={2}mdb,cn=config
97
objectclass: olcOverlayConfig
98
objectclass: olcPPolicyConfig
99
olcoverlay: {0}ppolicy
100
olcppolicydefault: cn=default,ou=ppolicies,o=ôrga
101
olcppolicyforwardupdates: FALSE
102
olcppolicyhashcleartext: TRUE
103
olcppolicyuselockout: TRUE
104
''')
105

  
106
        slapd.add_ldif('''
107
dn: ou=ppolicies,o=ôrga
108
objectclass: organizationalUnit
109
ou: ppolicies
110
''')
111
        yield slapd
112

  
80 113

  
81 114
@pytest.fixture
82 115
def tls_slapd():
......
872 905
    assert user.pk == user2.pk
873 906

  
874 907

  
908
def test_login_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, app):
909
    settings.LDAP_AUTH_SETTINGS = [{
910
        'url': [slapd_ppolicy.ldap_url],
911
        'basedn': u'o=ôrga',
912
        'use_tls': False,
913
    }]
914

  
915
    pwdMaxFailure = 2
916
    slapd_ppolicy.add_ldif('''
917
dn: cn=default,ou=ppolicies,o=ôrga
918
cn: default
919
objectclass: top
920
objectclass: device
921
objectclass: pwdPolicy
922
objectclass: pwdPolicyChecker
923
pwdAttribute: userPassword
924
pwdMinAge: 0
925
pwdMaxAge: 0
926
pwdInHistory: 0
927
pwdCheckQuality: 0
928
pwdMinLength: 0
929
pwdExpireWarning: 0
930
pwdGraceAuthnLimit: 0
931
pwdLockout: TRUE
932
pwdLockoutDuration: 0
933
pwdMaxFailure: {pwdMaxFailure}
934
pwdMaxRecordedFailure: 0
935
pwdFailureCountInterval: 0
936
pwdMustChange: FALSE
937
pwdAllowUserChange: FALSE
938
pwdSafeModify: FALSE
939
'''.format(pwdMaxFailure=pwdMaxFailure))
940

  
941
    for _ in range(pwdMaxFailure):
942
        response = app.get('/login/')
943
        response.form.set('username', USERNAME)
944
        response.form.set('password', 'invalid')
945
        response = response.form.submit(name='login-password-submit')
946
        assert 'Incorrect Username or password' in str(response.pyquery('.errornotice'))
947
        assert 'account is locked' not in str(response.pyquery('.messages'))
948
    response = app.get('/login/')
949
    response.form.set('username', USERNAME)
950
    response.form.set('password', 'invalid')
951
    response = response.form.submit(name='login-password-submit')
952
    assert 'account is locked' in str(response.pyquery('.messages'))
953

  
954

  
955
def ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog):
956
    pwdMaxFailure = 2
957
    slapd_ppolicy.add_ldif('''
958
dn: cn=default,ou=ppolicies,o=ôrga
959
cn: default
960
objectclass: top
961
objectclass: device
962
objectclass: pwdPolicy
963
objectclass: pwdPolicyChecker
964
pwdAttribute: userPassword
965
pwdMinAge: 0
966
pwdMaxAge: 0
967
pwdInHistory: 0
968
pwdCheckQuality: 0
969
pwdMinLength: 0
970
pwdExpireWarning: 0
971
pwdGraceAuthnLimit: 0
972
pwdLockout: TRUE
973
pwdLockoutDuration: 0
974
pwdMaxFailure: {pwdMaxFailure}
975
pwdMaxRecordedFailure: 0
976
pwdFailureCountInterval: 0
977
pwdMustChange: FALSE
978
pwdAllowUserChange: FALSE
979
pwdSafeModify: FALSE
980
'''.format(pwdMaxFailure=pwdMaxFailure))
981

  
982
    for _ in range(pwdMaxFailure):
983
        assert authenticate(username=USERNAME, password='incorrect') is None
984
        assert "failed to login" in caplog.text
985

  
986

  
987
def test_authenticate_ppolicy_pwdMaxFailure(slapd_ppolicy, settings, db, caplog):
988
    settings.LDAP_AUTH_SETTINGS = [{
989
        'url': [slapd_ppolicy.ldap_url],
990
        'basedn': u'o=ôrga',
991
        'use_tls': False,
992
    }]
993

  
994
    ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
995
    assert 'account is locked' not in caplog.text
996
    assert authenticate(username=USERNAME, password='incorrect') is None
997
    assert 'account is locked' in caplog.text
998

  
999

  
1000
def test_do_not_use_controls(slapd_ppolicy, settings, db, caplog):
1001
    """
1002
    Same as test_authenticate_ppolicy_pwdMaxFailure but with use_controls
1003
    deactivated and therefore not logging when an account is locked.
1004
    """
1005
    settings.LDAP_AUTH_SETTINGS = [{
1006
        'url': [slapd_ppolicy.ldap_url],
1007
        'basedn': u'o=ôrga',
1008
        'use_tls': False,
1009
        'use_controls': False,
1010
    }]
1011

  
1012
    ppolicy_authenticate_exactly_pwdMaxFailure(slapd_ppolicy, caplog)
1013
    assert 'account is locked' not in caplog.text
1014
    assert authenticate(username=USERNAME, password='incorrect') is None
1015
    # this following line is the difference with test_authenticate_ppolicy_pwdMaxFailure
1016
    assert 'account is locked' not in caplog.text
1017

  
1018

  
1019
def test_authenticate_ppolicy_pwdGraceAuthnLimit(slapd_ppolicy, settings, db, caplog):
1020
    settings.LDAP_AUTH_SETTINGS = [{
1021
        'url': [slapd_ppolicy.ldap_url],
1022
        'basedn': u'o=ôrga',
1023
        'use_tls': False,
1024
    }]
1025

  
1026
    pwdMaxAge = 1
1027
    pwdGraceAuthnLimit = 2
1028
    slapd_ppolicy.add_ldif('''
1029
dn: cn=default,ou=ppolicies,o=ôrga
1030
cn: default
1031
objectclass: top
1032
objectclass: device
1033
objectclass: pwdPolicy
1034
objectclass: pwdPolicyChecker
1035
pwdAttribute: userPassword
1036
pwdMinAge: 0
1037
pwdMaxAge: {pwdMaxAge}
1038
pwdInHistory: 1
1039
pwdCheckQuality: 0
1040
pwdMinLength: 0
1041
pwdExpireWarning: 0
1042
pwdGraceAuthnLimit: {pwdGraceAuthnLimit}
1043
pwdLockout: TRUE
1044
pwdLockoutDuration: 0
1045
pwdMaxFailure: 0
1046
pwdMaxRecordedFailure: 0
1047
pwdFailureCountInterval: 0
1048
pwdMustChange: FALSE
1049
pwdAllowUserChange: TRUE
1050
pwdSafeModify: FALSE
1051
'''.format(pwdMaxAge=pwdMaxAge, pwdGraceAuthnLimit=pwdGraceAuthnLimit))
1052

  
1053
    user = authenticate(username=USERNAME, password=UPASS)
1054
    assert user.check_password(UPASS)
1055
    password = u'ogutOmyetew4'
1056
    user.set_password(password)
1057

  
1058
    time.sleep(pwdMaxAge * 3)
1059

  
1060
    assert 'used 2 time' not in caplog.text
1061
    assert authenticate(username=USERNAME, password=password) is not None
1062
    assert 'used 2 times' in caplog.text
1063

  
1064
    assert 'last time' not in caplog.text
1065
    assert authenticate(username=USERNAME, password=password) is not None
1066
    assert 'last time' in caplog.text
1067

  
1068

  
1069
def test_authenticate_ppolicy_pwdExpireWarning(slapd_ppolicy, settings, db, caplog):
1070
    settings.LDAP_AUTH_SETTINGS = [{
1071
        'url': [slapd_ppolicy.ldap_url],
1072
        'basedn': u'o=ôrga',
1073
        'use_tls': False,
1074
    }]
1075

  
1076
    pwdMaxAge = 3600
1077
    slapd_ppolicy.add_ldif('''
1078
dn: cn=default,ou=ppolicies,o=ôrga
1079
cn: default
1080
objectclass: top
1081
objectclass: device
1082
objectclass: pwdPolicy
1083
objectclass: pwdPolicyChecker
1084
pwdAttribute: userPassword
1085
pwdMinAge: 0
1086
pwdMaxAge: {pwdMaxAge}
1087
pwdInHistory: 1
1088
pwdCheckQuality: 0
1089
pwdMinLength: 0
1090
pwdExpireWarning: {pwdMaxAge}
1091
pwdGraceAuthnLimit: 0
1092
pwdLockout: TRUE
1093
pwdLockoutDuration: 0
1094
pwdMaxFailure: 0
1095
pwdMaxRecordedFailure: 0
1096
pwdFailureCountInterval: 0
1097
pwdMustChange: FALSE
1098
pwdAllowUserChange: TRUE
1099
pwdSafeModify: FALSE
1100
'''.format(pwdMaxAge=pwdMaxAge))
1101

  
1102
    user = authenticate(username=USERNAME, password=UPASS)
1103
    assert user.check_password(UPASS)
1104
    password = u'ogutOmyetew4'
1105
    user.set_password(password)
1106

  
1107
    time.sleep(2)
1108

  
1109
    assert 'password will expire' not in caplog.text
1110
    assert authenticate(username=USERNAME, password=password) is not None
1111
    assert 'password will expire' in caplog.text
1112

  
1113

  
1114
def test_authenticate_ppolicy_pwdAllowUserChange(slapd_ppolicy, settings, db, caplog):
1115
    settings.LDAP_AUTH_SETTINGS = [{
1116
        'url': [slapd_ppolicy.ldap_url],
1117
        'basedn': u'o=ôrga',
1118
        'use_tls': False,
1119
    }]
1120

  
1121
    slapd_ppolicy.add_ldif('''
1122
dn: cn=default,ou=ppolicies,o=ôrga
1123
cn: default
1124
objectclass: top
1125
objectclass: device
1126
objectclass: pwdPolicy
1127
pwdAttribute: userPassword
1128
pwdMinAge: 0
1129
pwdMaxAge: 0
1130
pwdInHistory: 0
1131
pwdCheckQuality: 0
1132
pwdMinLength: 0
1133
pwdExpireWarning: 0
1134
pwdGraceAuthnLimit: 0
1135
pwdLockout: TRUE
1136
pwdLockoutDuration: 0
1137
pwdMaxFailure: 0
1138
pwdMaxRecordedFailure: 0
1139
pwdFailureCountInterval: 0
1140
pwdMustChange: FALSE
1141
pwdAllowUserChange: FALSE
1142
pwdSafeModify: FALSE
1143
''')
1144

  
1145
    user = authenticate(username=USERNAME, password=UPASS)
1146
    assert user.set_password(u'ogutOmyetew4') is None
1147
    assert 'STRONG_AUTH_REQUIRED' in caplog.text
1148

  
1149

  
875 1150
def test_ou_selector(slapd, settings, app, ou1):
876 1151
    settings.LDAP_AUTH_SETTINGS = [{
877 1152
        'url': [slapd.ldap_url],
878
-