Projet

Général

Profil

0001-journal_event_types-add-ldap-user-deactivation-52671.patch

Valentin Deniaud, 20 mai 2021 15:09

Télécharger (8,68 ko)

Voir les différences:

Subject: [PATCH] journal_event_types: add ldap user deactivation (#52671)

 src/authentic2/backends/ldap_backend.py       |  6 ++++
 src/authentic2/manager/journal_event_types.py | 24 ++++++++++---
 tests/test_ldap.py                            | 20 ++++++++---
 tests/test_manager_journal.py                 | 34 +++++++++++++++++++
 tests/utils.py                                |  4 ++-
 5 files changed, 79 insertions(+), 9 deletions(-)
src/authentic2/backends/ldap_backend.py
55 55
from authentic2.backends import is_user_authenticable
56 56
from authentic2.compat_lasso import lasso
57 57
from authentic2.ldap_utils import FilterFormatter
58
from authentic2.manager.journal_event_types import ManagerUserDeactivation
58 59
from authentic2.middleware import StoreRequestMiddleware
59 60
from authentic2.models import UserExternalId
60 61
from authentic2.user_login_failure import user_login_failure, user_login_success
......
1538 1539
            conn = cls.get_connection(block)
1539 1540
            if conn is None:
1540 1541
                continue
1542
            ldap_uri = conn.get_option(ldap.OPT_URI)
1541 1543
            eids = list(
1542 1544
                UserExternalId.objects.filter(user__is_active=True, source=block['realm']).values_list(
1543 1545
                    'external_id', flat=True
......
1566 1568
                external_id__in=eids, user__is_active=True, source=block['realm']
1567 1569
            ):
1568 1570
                eid.user.mark_as_inactive()
1571
                ManagerUserDeactivation.record(
1572
                    target_user=eid.user, reason='ldap-not-present', origin=ldap_uri
1573
                )
1569 1574
        # Handle users of old sources
1570 1575
        uei_qs = UserExternalId.objects.exclude(source__in=[block['realm'] for block in cls.get_config()])
1571 1576
        for user in User.objects.filter(userexternalid__in=uei_qs):
1572 1577
            user.mark_as_inactive()
1578
            ManagerUserDeactivation.record(target_user=user, reason='ldap-old-source', origin=ldap_uri)
1573 1579

  
1574 1580
    @classmethod
1575 1581
    def ad_encoding(cls, s):
src/authentic2/manager/journal_event_types.py
201 201
    label = _('user deactivation')
202 202

  
203 203
    @classmethod
204
    def record(cls, user, session, target_user):
205
        super().record(user=user, session=session, references=[target_user])
204
    def record(cls, target_user, user=None, session=None, origin='backoffice', reason=None):
205
        data = {'reason': reason, 'origin': origin}
206
        super().record(user=user, session=session, references=[target_user], data=data)
206 207

  
207 208
    @classmethod
208 209
    def get_message(cls, event, context):
209 210
        (user,) = event.get_typed_references(User)
211
        reason = event.get_data('reason')
210 212
        if context and context == user:
211
            return _('deactivation by administrator')
213
            if reason == 'ldap-not-present':
214
                return _('automatic deactivation because the associated LDAP account does not exist anymore')
215
            elif reason == 'ldap-old-source':
216
                return _('automatic deactivation because the associated LDAP source has been deleted')
217
            else:
218
                return _('deactivation by administrator')
212 219
        elif user:
213
            return _('deactivation of user "%s"') % user.get_full_name()
220
            if reason == 'ldap-not-present':
221
                return _(
222
                    'automatic deactivation of user "%s" because the associated LDAP account does not exist anymore'
223
                )
224
            elif reason == 'ldap-old-source':
225
                return _(
226
                    'automatic deactivation of user "%s" because the associated LDAP source has been deleted'
227
                )
228
            else:
229
                return _('deactivation of user "%s"') % user.get_full_name()
214 230
        return super().get_message(event, context)
215 231

  
216 232

  
tests/test_ldap.py
254 254

  
255 255
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
256 256

  
257
    assert (
258
        ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 1
257
    deactivated_user = ldap_backend.UserExternalId.objects.get(user__is_active=False, source=block['realm'])
258
    utils.assert_event(
259
        'manager.user.deactivation',
260
        target_user=deactivated_user.user,
261
        reason='ldap-not-present',
262
        origin=slapd.ldap_url,
259 263
    )
260 264

  
261 265
    # rename source realm
......
264 268
    ]
265 269

  
266 270
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
267
    assert (
268
        ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 6
271
    deactivated_users = ldap_backend.UserExternalId.objects.filter(
272
        user__is_active=False, source=block['realm']
269 273
    )
274
    assert deactivated_users.count() == 6
275
    for ldap_user in deactivated_users.exclude(pk=deactivated_user.pk):
276
        utils.assert_event(
277
            'manager.user.deactivation',
278
            target_user=ldap_user.user,
279
            reason='ldap-old-source',
280
            origin=slapd.ldap_url,
281
        )
270 282

  
271 283

  
272 284
@pytest.mark.django_db
tests/test_manager_journal.py
251 251
        old_email='old@example.com',
252 252
        new_email='new@example.com',
253 253
    )
254
    make(
255
        'manager.user.deactivation',
256
        target_user=user,
257
        reason='ldap-not-present',
258
    )
259
    make(
260
        'manager.user.deactivation',
261
        target_user=user,
262
        reason='ldap-old-source',
263
    )
254 264

  
255 265
    # verify we created at least one event for each type
256 266
    assert set(Event.objects.values_list("type__name", flat=True)) == set(_registry)
......
542 552
            'type': 'user.email.change',
543 553
            'user': 'Johnny doe',
544 554
        },
555
        {
556
            'timestamp': 'Jan. 2, 2020, 5 p.m.',
557
            'type': 'manager.user.deactivation',
558
            'user': '-',
559
            'message': 'automatic deactivation of user "%s" because the associated LDAP account does not exist anymore',
560
        },
561
        {
562
            'timestamp': 'Jan. 2, 2020, 6 p.m.',
563
            'type': 'manager.user.deactivation',
564
            'user': '-',
565
            'message': 'automatic deactivation of user "%s" because the associated LDAP source has been deleted',
566
        },
545 567
    ]
546 568

  
547 569

  
......
727 749
            'type': 'user.email.change',
728 750
            'user': 'Johnny doe',
729 751
        },
752
        {
753
            'timestamp': 'Jan. 2, 2020, 5 p.m.',
754
            'type': 'manager.user.deactivation',
755
            'user': '-',
756
            'message': 'automatic deactivation because the associated LDAP account does not exist anymore',
757
        },
758
        {
759
            'timestamp': 'Jan. 2, 2020, 6 p.m.',
760
            'type': 'manager.user.deactivation',
761
            'user': '-',
762
            'message': 'automatic deactivation because the associated LDAP source has been deleted',
763
        },
730 764
    ]
731 765

  
732 766

  
tests/utils.py
264 264
    return ''.join(node.itertext()) if node is not None else ''
265 265

  
266 266

  
267
def assert_event(event_type_name, user=None, session=None, service=None, **data):
267
def assert_event(event_type_name, user=None, session=None, service=None, target_user=None, **data):
268 268
    qs = Event.objects.filter(type__name=event_type_name)
269 269
    if user:
270 270
        qs = qs.filter(user=user)
......
278 278
        qs = qs.which_references(service)
279 279
    else:
280 280
        qs = qs.exclude(qs._which_references_query(models.Service))
281
    if target_user:
282
        qs = qs.which_references(target_user)
281 283

  
282 284
    assert qs.count() == 1
283 285

  
284
-