Projet

Général

Profil

0002-ldap-add-method-and-command-to-deactivate-orphaned-u.patch

Voir les différences:

Subject: [PATCH 2/2] ldap: add method and command to deactivate orphaned users
 (#6379)

 debian/authentic2-multitenant.cron.d          |  1 +
 debian/authentic2.cron.d                      |  1 +
 src/authentic2/backends/ldap_backend.py       | 38 ++++++++++++++++++-
 .../deactivate-orphaned-ldap-users.py         | 25 ++++++++++++
 tests/test_ldap.py                            | 22 +++++++++++
 5 files changed, 85 insertions(+), 2 deletions(-)
 create mode 100644 src/authentic2/management/commands/deactivate-orphaned-ldap-users.py
debian/authentic2-multitenant.cron.d
5 5
5 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command cleanupauthentic --all-tenants
6 6
10 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command sync-ldap-users --all-tenants
7 7
0 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command clean-unused-accounts --all-tenants
8
30 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command deactivate-orphaned-ldap-users --all-tenants
debian/authentic2.cron.d
5 5
5 * * * * authentic2 authentic2-manage cleanupauthentic
6 6
10 * * * * authentic2 authentic2-manage sync-ldap-users
7 7
0 5 * * * authentic2 authentic2-manage clean-unused-accounts
8
30 5 * * * authentic2 authentic2-manage deactivate-orphaned-ldap-users
src/authentic2/backends/ldap_backend.py
1015 1015
                attribute = attribute.split(':', 1)[0]
1016 1016
            yield attribute
1017 1017

  
1018
    @classmethod
    def get_sync_ldap_user_filter(cls, block):
        '''Return the LDAP filter used to enumerate users of a block.

        The block's ``sync_ldap_users_filter`` is used when it is truthy,
        otherwise ``user_filter``; every ``%s`` placeholder found in the
        filter is replaced by ``*``.
        '''
        raw_filter = block['sync_ldap_users_filter'] or block['user_filter']
        return force_text(raw_filter).replace('%s', '*')
1023

  
1018 1024
    @classmethod
1019 1025
    def get_ldap_attributes_names(cls, block):
1020 1026
        attributes = set()
......
1309 1315
                logger.warning(u'unable to synchronize with LDAP servers %s', force_text(block['url']))
1310 1316
                continue
1311 1317
            user_basedn = force_text(block.get('user_basedn') or block['basedn'])
1312
            user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter'])
1313
            user_filter = user_filter.replace('%s', '*')
1318
            user_filter = cls.get_sync_ldap_user_filter(block)
1314 1319
            attribute_names = cls.get_ldap_attributes_names(block)
1315 1320
            results = cls.paged_search(conn, user_basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names)
1316 1321
            backend = cls()
......
1318 1323
                yield backend._return_user(dn, None, conn, block, attrs)
1319 1324

  
1320 1325

  
1326
    @classmethod
    def deactivate_orphaned_users(cls):
        '''Deactivate local users whose LDAP entry cannot be found anymore.

        For each configured LDAP block, collect the external ids of the
        active users attached to the block's realm, search the directory
        with the synchronization filter, and discard every external id that
        can be rebuilt from a returned entry. Users of that realm whose
        external id is left over are marked as inactive.
        '''
        for block in cls.get_config():
            conn = cls.get_connection(block)
            if conn is None:
                # Without a connection nothing can be said about which
                # entries still exist, so no user of this block is touched.
                continue
            realm = block['realm']
            # A set gives O(1) removal while scanning the search results.
            orphaned_eids = set(
                UserExternalId.objects.filter(
                    user__is_active=True, source=realm
                ).values_list('external_id', flat=True)
            )
            if not orphaned_eids:
                # No active user for this realm, no need to query the server.
                continue
            basedn = force_text(block.get('user_basedn') or block['basedn'])
            # NOTE(review): this keeps only the first element of whatever
            # attribute_name_from_external_id_tuple() yields for the whole
            # list of tuples; confirm that every attribute needed by
            # build_external_id() is actually requested.
            attribute_names = [a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])]
            user_filter = cls.get_sync_ldap_user_filter(block)
            results = cls.paged_search(conn, basedn, ldap.SCOPE_SUBTREE,
                                       user_filter,
                                       attrlist=attribute_names)
            # Loop invariants: one backend instance and one materialized
            # list of external id tuples per block.
            backend = cls()
            eid_tuples = list(map_text(block['external_id_tuples']))
            for dn, attrs in results:
                data = attrs.copy()
                data['dn'] = dn
                for eid_tuple in eid_tuples:
                    external_id = backend.build_external_id(eid_tuple, data)
                    if external_id:
                        orphaned_eids.discard(external_id)
            # Restrict to the current realm: the same external id may exist
            # for another source and those users must not be deactivated.
            orphans = UserExternalId.objects.filter(
                source=realm,
                user__is_active=True,
                external_id__in=orphaned_eids,
            ).select_related('user')
            for eid in orphans:
                eid.user.mark_as_inactive()
1353

  
1354

  
1321 1355
    @classmethod
1322 1356
    def ad_encoding(cls, s):
1323 1357
        '''Encode a string for AD consumption as a password'''
src/authentic2/management/commands/deactivate-orphaned-ldap-users.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2021 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.core.management.base import BaseCommand
18

  
19
from authentic2.backends.ldap_backend import LDAPBackend
20

  
21

  
22
class Command(BaseCommand):
    '''Management command deactivating users whose LDAP entry disappeared.'''

    help = 'Deactivate active users whose LDAP entry cannot be found anymore'

    def handle(self, *args, **kwargs):
        # All the work is delegated to the LDAP backend.
        LDAPBackend.deactivate_orphaned_users()
tests/test_ldap.py
218 218
    assert 'password' not in client.session['ldap-data']
219 219

  
220 220

  
221
def test_deactivate_orphaned_users(slapd, settings, client, db):
    '''Deleting an LDAP entry then running the deactivation leaves exactly
    one inactive user attached to the realm.'''
    settings.LDAP_AUTH_SETTINGS = [{
        'url': [slapd.ldap_url],
        'basedn': u'o=ôrga',
        'use_tls': False,
    }]

    # iterating over the users creates them as a side effect
    for _user in ldap_backend.LDAPBackend.get_users():
        pass
    block = settings.LDAP_AUTH_SETTINGS[0]

    def count_inactive():
        # number of external ids of the realm whose user is inactive
        return ldap_backend.UserExternalId.objects.filter(
            source=block['realm'], user__is_active=False).count()

    assert count_inactive() == 0

    # remove the entry from the directory
    slapd.get_connection_admin().delete_s(DN)

    ldap_backend.LDAPBackend.deactivate_orphaned_users()

    assert count_inactive() == 1
241

  
242

  
221 243
@pytest.mark.django_db
222 244
def test_simple_with_binddn(slapd, settings, client):
223 245
    settings.LDAP_AUTH_SETTINGS = [{
224
-