Projet

Général

Profil

0001-journal-add-ldap-down-info-on-failed-user-login-5815.patch

Paul Marillonnet, 03 novembre 2021 15:30

Télécharger (8,59 ko)

Voir les différences:

Subject: [PATCH] journal: add ldap down info on failed user login (#58151)

 src/authentic2/authenticators.py          | 11 +++++--
 src/authentic2/backends/ldap_backend.py   | 28 ++++++++++++++++--
 src/authentic2/backends/models_backend.py |  4 +--
 src/authentic2/journal_event_types.py     | 10 +++++--
 tests/test_ldap.py                        | 36 +++++++++++++++++++++++
 5 files changed, 78 insertions(+), 11 deletions(-)
src/authentic2/authenticators.py
108 108
        data = request.POST if is_post else None
109 109
        initial = {}
110 110
        preferred_ous = []
111
        request.failed_logins = set()
111
        request.failed_logins = {}
112 112

  
113 113
        # Special handling when the form contains an OU selector
114 114
        if app_settings.A2_LOGIN_FORM_OU_SELECTOR:
......
151 151
            else:
152 152
                username = form.cleaned_data.get('username', '').strip()
153 153
                if request.failed_logins:
154
                    for user in request.failed_logins:
155
                        request.journal.record('user.login.failure', user=user, username=username)
154
                    for user, failure_data in request.failed_logins.items():
155
                        request.journal.record(
156
                            'user.login.failure',
157
                            user=user,
158
                            reason=failure_data.get('reason', None),
159
                            username=username,
160
                        )
156 161
                elif username:
157 162
                    request.journal.record('user.login.failure', username=username)
158 163
        context['form'] = form
src/authentic2/backends/ldap_backend.py
708 708
                            if success:
709 709
                                attributes = self.get_ldap_attributes(block, conn, authz_id)
710 710
                                user = self.lookup_existing_user(authz_id, block, attributes)
711
                                if user and hasattr(request, 'failed_logins'):
712
                                    request.failed_logins.add(user)
711
                                if (
712
                                    user
713
                                    and hasattr(request, 'failed_logins')
714
                                    and user not in request.failed_logins
715
                                ):
716
                                    request.failed_logins.update({user: {}})
713 717
                            else:
714 718
                                log.warning(
715 719
                                    'could not rebind after a bind failure, unable to attach the error to the'
......
735 739
                    ldap_uri,
736 740
                    block['url'],
737 741
                )
742
                self._record_failure_for_user(request, 'ldap connect error', authz_id, block, conn)
738 743
            except ldap.TIMEOUT:
739 744
                log.error('connection to %r timed out', block['url'])
745
                self._record_failure_for_user(request, 'ldap timeout', authz_id, block, conn)
740 746
            except ldap.SERVER_DOWN:
741 747
                log.error('ldap authentication error: %r is down', block['url'])
748
                self._record_failure_for_user(request, 'ldap server down', authz_id, block, conn)
742 749
            finally:
743 750
                del conn
744 751
        return None
......
1278 1285
        return '(&{})'.format(''.join(filters))
1279 1286

  
1280 1287
    def build_external_id(self, external_id_tuple, attributes):
1281
        """Build the exernal id for the user, use attribute that eventually
1288
        """Build the external id for the user, use attribute that eventually
1282 1289
        never change like GUID or UUID.
1283 1290
        """
1284 1291
        parts = []
......
1370 1377
                        user=user, source=force_text(block['realm'])
1371 1378
                    ).delete()
1372 1379

  
1380
    def _record_failure_for_user(self, request, reason, user_id, block, conn, attributes=None):
1381
        user = None
1382
        if not reason or not hasattr(request, 'failed_logins'):
1383
            return
1384
        attributes = attributes or self.get_ldap_attributes(block, conn, user_id)
1385
        for lookup_type in block['lookups']:
1386
            if lookup_type == 'external_id':
1387
                user = self.lookup_by_external_id(block, attributes)
1388
            elif lookup_type == 'username':
1389
                user = self.lookup_by_username(user_id)
1390
            if user:
1391
                break
1392
        if user:
1393
            request.failed_logins.update({user: {'username': user_id, 'reason': reason}})
1394

  
1373 1395
    def _return_user(self, dn, password, conn, block, attributes=None):
1374 1396
        attributes = attributes or self.get_ldap_attributes(block, conn, dn)
1375 1397
        if attributes is None:
src/authentic2/backends/models_backend.py
83 83
                return user
84 84
            else:
85 85
                user_login_failure(user.get_username())
86
                if hasattr(request, 'failed_logins'):
87
                    request.failed_logins.add(user)
86
                if hasattr(request, 'failed_logins') and user not in request.failed_logins:
87
                    request.failed_logins.update({user: {}})
88 88

  
89 89
    def get_user(self, user_id):
90 90
        UserModel = get_user_model()
src/authentic2/journal_event_types.py
150 150
    label = _('login failure')
151 151

  
152 152
    @classmethod
153
    def record(cls, service, username, user):
154
        super().record(user=user, service=service, data={'username': username})
153
    def record(cls, service, username, user, reason=None):
154
        super().record(user=user, service=service, data={'username': username, 'reason': reason})
155 155

  
156 156
    @classmethod
157 157
    def get_message(cls, event, context):
158 158
        username = event.get_data('username')
159
        return _('login failure with username "{username}"').format(username=username)
159
        reason = event.get_data('reason')
160
        msg = _('login failure with username "{username}"').format(username=username)
161
        if reason:
162
            msg.append(_(' (reason: {reason})').format(reason=reason))
163
        return msg
160 164

  
161 165

  
162 166
class UserRegistrationRequest(EventTypeDefinition):
tests/test_ldap.py
2073 2073
    user.userexternalid_set.all().delete()
2074 2074
    resp = app.get('/manage/users/%s/' % user.pk)
2075 2075
    assert 'LDAP' not in resp.text
2076

  
2077

  
2078
def test_user_journal_login_failure(slapd, settings, client, db, monkeypatch):
2079
    settings.LDAP_AUTH_SETTINGS = [
2080
        {
2081
            'url': [slapd.ldap_url],
2082
            'basedn': 'o=ôrga',
2083
            'use_tls': False,
2084
            'attributes': ['jpegPhoto'],
2085
        }
2086
    ]
2087

  
2088
    # create ldap user
2089
    result = client.post(
2090
        '/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True
2091
    )
2092

  
2093
    for exception, reason in {
2094
        ldap.CONNECT_ERROR: 'ldap connect error',
2095
        ldap.TIMEOUT: 'ldap timeout',
2096
        ldap.SERVER_DOWN: 'ldap server down',
2097
    }.items():
2098

  
2099
        def patched_process_controls(cls, request, block, conn, authz_id, ctrls):
2100
            raise exception('oops')
2101

  
2102
        monkeypatch.setattr(
2103
            ldap_backend.LDAPBackend,
2104
            'process_controls',
2105
            patched_process_controls,
2106
        )
2107
        result = client.post(
2108
            '/login/', {'login-password-submit': '1', 'username': USERNAME, 'password': PASS}, follow=True
2109
        )
2110
        user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)
2111
        utils.assert_event('user.login.failure', user=user, username=UID, reason=reason)
2076
-