0002-ldap-add-method-and-command-to-deactivate-orphaned-u.patch
debian/authentic2-multitenant.cron.d | ||
---|---|---|
5 | 5 |
5 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command cleanupauthentic --all-tenants |
6 | 6 |
10 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command sync-ldap-users --all-tenants |
7 | 7 |
0 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command clean-unused-accounts --all-tenants |
8 |
30 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command deactivate-orphaned-ldap-users --all-tenants |
debian/authentic2.cron.d | ||
---|---|---|
5 | 5 |
5 * * * * authentic2 authentic2-manage cleanupauthentic |
6 | 6 |
10 * * * * authentic2 authentic2-manage sync-ldap-users |
7 | 7 |
0 5 * * * authentic2 authentic2-manage clean-unused-accounts |
8 |
30 5 * * * authentic2 authentic2-manage deactivate-orphaned-ldap-users |
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
1015 | 1015 |
attribute = attribute.split(':', 1)[0] |
1016 | 1016 |
yield attribute |
1017 | 1017 | |
1018 |
@classmethod
def get_sync_ldap_user_filter(cls, block):
    '''Return the LDAP filter used to enumerate the users of a block.

    The block's 'sync_ldap_users_filter' value is preferred; when it is
    empty the 'user_filter' value is used instead. Every '%s' placeholder
    of the chosen filter is replaced by the '*' wildcard.
    '''
    raw_filter = block['sync_ldap_users_filter']
    if not raw_filter:
        raw_filter = block['user_filter']
    return force_text(raw_filter).replace('%s', '*')
|
1023 | ||
1018 | 1024 |
@classmethod |
1019 | 1025 |
def get_ldap_attributes_names(cls, block): |
1020 | 1026 |
attributes = set() |
... | ... | |
1309 | 1315 |
logger.warning(u'unable to synchronize with LDAP servers %s', force_text(block['url'])) |
1310 | 1316 |
continue |
1311 | 1317 |
user_basedn = force_text(block.get('user_basedn') or block['basedn']) |
1312 |
user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter']) |
|
1313 |
user_filter = user_filter.replace('%s', '*') |
|
1318 |
user_filter = cls.get_sync_ldap_user_filter(block) |
|
1314 | 1319 |
attribute_names = cls.get_ldap_attributes_names(block) |
1315 | 1320 |
results = cls.paged_search(conn, user_basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names) |
1316 | 1321 |
backend = cls() |
... | ... | |
1318 | 1323 |
yield backend._return_user(dn, None, conn, block, attrs) |
1319 | 1324 | |
1320 | 1325 | |
1326 |
@classmethod
def deactivate_orphaned_users(cls):
    '''Mark as inactive the local users that cannot be found in LDAP anymore.

    For every configured LDAP block for which a connection is obtained, the
    external ids of the active users attached to the block's realm are
    collected, the directory is searched with the synchronization filter
    and every external id that can be rebuilt from a returned entry is
    struck off. The users owning the remaining external ids of that realm
    are marked as inactive.

    Blocks without a connection are skipped, so nobody is deactivated
    because a server could not be reached.
    '''
    for block in cls.get_config():
        conn = cls.get_connection(block)
        if conn is None:
            continue
        realm = block['realm']
        # A set makes striking off an id O(1) and tolerates ids that were
        # never recorded locally.
        orphaned_eids = set(
            UserExternalId.objects.filter(
                user__is_active=True, source=realm
            ).values_list('external_id', flat=True))
        basedn = force_text(block.get('user_basedn') or block['basedn'])
        # NOTE(review): the helper is fed the whole list of tuples and the
        # first item of each yielded value is kept, as before; confirm this
        # requests every attribute needed to rebuild the external ids.
        attribute_names = [
            a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])]
        user_filter = cls.get_sync_ldap_user_filter(block)
        results = cls.paged_search(
            conn, basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names)
        # One backend instance is enough to rebuild all the external ids.
        backend = cls()
        for dn, attrs in results:
            data = attrs.copy()
            data['dn'] = dn
            for eid_tuple in map_text(block['external_id_tuples']):
                external_id = backend.build_external_id(eid_tuple, data)
                if external_id:
                    # The entry still exists in the directory.
                    orphaned_eids.discard(external_id)
        if not orphaned_eids:
            continue
        # Restrict the lookup to this realm: the same external id under
        # another source belongs to a different directory and must not be
        # deactivated from here.
        orphans = UserExternalId.objects.filter(
            user__is_active=True, source=realm, external_id__in=orphaned_eids)
        for eid in orphans:
            eid.user.mark_as_inactive()
|
1353 | ||
1354 | ||
1321 | 1355 |
@classmethod |
1322 | 1356 |
def ad_encoding(cls, s): |
1323 | 1357 |
'''Encode a string for AD consumption as a password''' |
src/authentic2/management/commands/deactivate-orphaned-ldap-users.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2021 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from django.core.management.base import BaseCommand |
|
18 | ||
19 |
from authentic2.backends.ldap_backend import LDAPBackend |
|
20 | ||
21 | ||
22 |
class Command(BaseCommand):
    '''Management command deactivating the users that disappeared from LDAP.'''

    help = 'Deactivate active users whose LDAP entry cannot be found anymore'

    def handle(self, *args, **kwargs):
        '''Delegate the whole job to LDAPBackend.deactivate_orphaned_users().'''
        LDAPBackend.deactivate_orphaned_users()
tests/test_ldap.py | ||
---|---|---|
218 | 218 |
assert 'password' not in client.session['ldap-data'] |
219 | 219 | |
220 | 220 | |
221 |
def test_deactivate_orphaned_users(slapd, settings, client, db):
    '''Deleting an LDAP entry then running the deactivation leaves exactly
    one inactive user attached to the realm.'''
    settings.LDAP_AUTH_SETTINGS = [{
        'url': [slapd.ldap_url],
        'basedn': u'o=ôrga',
        'use_tls': False,
    }]

    def count_inactive(realm):
        # number of external ids of the realm owned by an inactive user
        inactive = ldap_backend.UserExternalId.objects.filter(
            user__is_active=False, source=realm)
        return inactive.count()

    # consuming get_users() creates the users as a side effect
    for _ in ldap_backend.LDAPBackend.get_users():
        pass
    ldap_block = settings.LDAP_AUTH_SETTINGS[0]
    assert count_inactive(ldap_block['realm']) == 0

    # remove one entry from the directory
    admin_conn = slapd.get_connection_admin()
    admin_conn.delete_s(DN)

    ldap_backend.LDAPBackend.deactivate_orphaned_users()

    assert count_inactive(ldap_block['realm']) == 1
|
241 | ||
242 | ||
221 | 243 |
@pytest.mark.django_db |
222 | 244 |
def test_simple_with_binddn(slapd, settings, client): |
223 | 245 |
settings.LDAP_AUTH_SETTINGS = [{ |
224 |
- |