Projet

Général

Profil

0001-LDAPBackend-reactive-user-on-login-synchronization-i.patch

Benjamin Dauvergne, 18 mai 2021 21:09

Télécharger (13,7 ko)

Voir les différences:

Subject: [PATCH] LDAPBackend: reactive user on login/synchronization if
 inactive (#52670)

 src/authentic2/backends/ldap_backend.py       |  11 +-
 .../0027_user_deactivation_reason.py          |  18 ++
 src/authentic2/custom_user/models.py          |  12 +-
 tests/test_api.py                             | 170 ++++--------------
 tests/test_ldap.py                            |  43 ++++-
 5 files changed, 106 insertions(+), 148 deletions(-)
 create mode 100644 src/authentic2/custom_user/migrations/0027_user_deactivation_reason.py
src/authentic2/backends/ldap_backend.py
37 37
import os
38 38
import random
39 39
import time
40
import urllib.parse
41 40

  
42 41
from django.conf import settings
43 42
from django.contrib import messages
......
338 337
    return messages
339 338

  
340 339

  
340
LDAP_DEACTIVATION_REASON_ORPHANED = 'ldap-orphaned'
341

  
342

  
341 343
class LDAPUser(User):
342 344
    SESSION_LDAP_DATA_KEY = 'ldap-data'
343 345
    _changed = False
......
1477 1479
        if not is_user_authenticable(user):
1478 1480
            return None
1479 1481

  
1482
        if not user.is_active and user.deactivation_reason == LDAP_DEACTIVATION_REASON_ORPHANED:
1483
            user.mark_as_active()
1484

  
1480 1485
        user_login_success(user.get_username())
1481 1486
        return user
1482 1487

  
......
1562 1567
            for eid in UserExternalId.objects.filter(
1563 1568
                external_id__in=eids, user__is_active=True, source=block['realm']
1564 1569
            ):
1565
                eid.user.mark_as_inactive()
1570
                eid.user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_ORPHANED)
1566 1571
        # Handle users of old sources
1567 1572
        uei_qs = UserExternalId.objects.exclude(source__in=[block['realm'] for block in cls.get_config()])
1568 1573
        for user in User.objects.filter(userexternalid__in=uei_qs):
1569
            user.mark_as_inactive()
1574
            user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_ORPHANED)
1570 1575

  
1571 1576
    @classmethod
1572 1577
    def ad_encoding(cls, s):
src/authentic2/custom_user/migrations/0027_user_deactivation_reason.py
1
# Generated by Django 2.2.23 on 2021-05-18 16:14
2

  
3
from django.db import migrations, models
4

  
5

  
6
class Migration(migrations.Migration):
7

  
8
    dependencies = [
9
        ('custom_user', '0026_remove_user_deleted'),
10
    ]
11

  
12
    operations = [
13
        migrations.AddField(
14
            model_name='user',
15
            name='deactivation_reason',
16
            field=models.TextField(blank=True, null=True, verbose_name='Deactivation reason'),
17
        ),
18
    ]
src/authentic2/custom_user/models.py
177 177
        verbose_name=_('Last account deletion alert'), null=True, blank=True
178 178
    )
179 179
    deactivation = models.DateTimeField(verbose_name=_('Deactivation datetime'), null=True, blank=True)
180
    deactivation_reason = models.TextField(verbose_name=_('Deactivation reason'), null=True, blank=True)
180 181

  
181 182
    objects = UserManager.from_queryset(UserQuerySet)()
182 183
    attributes = AttributesDescriptor()
......
360 361
            del self._a2_attributes_cache
361 362
        return super(User, self).refresh_from_db(*args, **kwargs)
362 363

  
363
    def mark_as_inactive(self, timestamp=None):
364
    def mark_as_active(self):
365
        self.is_active = True
366
        self.deactivation = None
367
        self.deactivation_reason = None
368
        self.save(update_fields=['is_active', 'deactivation', 'deactivation_reason'])
369

  
370
    def mark_as_inactive(self, timestamp=None, reason=None):
364 371
        self.is_active = False
365 372
        self.deactivation = timestamp or timezone.now()
366
        self.save(update_fields=['is_active', 'deactivation'])
373
        self.deactivation_reason = reason
374
        self.save(update_fields=['is_active', 'deactivation', 'deactivation_reason'])
367 375

  
368 376
    def set_random_password(self):
369 377
        self.set_password(base64.b64encode(os.urandom(32)).decode('ascii'))
tests/test_api.py
48 48

  
49 49
User = get_user_model()
50 50

  
51
USER_ATTRIBUTES_SET = set(
52
    [
53
        'ou',
54
        'id',
55
        'uuid',
56
        'is_staff',
57
        'is_superuser',
58
        'first_name',
59
        'first_name_verified',
60
        'last_name',
61
        'last_name_verified',
62
        'date_joined',
63
        'last_login',
64
        'username',
65
        'password',
66
        'email',
67
        'is_active',
68
        'title',
69
        'title_verified',
70
        'modified',
71
        'email_verified',
72
        'last_account_deletion_alert',
73
        'deactivation',
74
        'deactivation_reason',
75
    ]
76
)
77

  
51 78

  
52 79
def test_api_user_simple(logged_app):
53 80
    resp = logged_app.get('/api/user/')
......
497 524

  
498 525
    resp = app.post_json('/api/users/', params=payload, status=status)
499 526
    if api_user.is_superuser or api_user.roles.exists():
500
        assert (
501
            set(
502
                [
503
                    'ou',
504
                    'id',
505
                    'uuid',
506
                    'is_staff',
507
                    'is_superuser',
508
                    'first_name',
509
                    'first_name_verified',
510
                    'last_name',
511
                    'last_name_verified',
512
                    'date_joined',
513
                    'last_login',
514
                    'username',
515
                    'password',
516
                    'email',
517
                    'is_active',
518
                    'title',
519
                    'title_verified',
520
                    'modified',
521
                    'email_verified',
522
                    'last_account_deletion_alert',
523
                    'deactivation',
524
                ]
525
            )
526
            == set(resp.json.keys())
527
        )
527
        assert USER_ATTRIBUTES_SET == set(resp.json)
528 528
        assert resp.json['first_name'] == payload['first_name']
529 529
        assert resp.json['last_name'] == payload['last_name']
530 530
        assert resp.json['email'] == payload['email']
......
584 584

  
585 585
    resp = app.post_json('/api/users/', params=payload, status=status)
586 586
    if api_user.is_superuser or api_user.roles.exists():
587
        assert (
588
            set(
589
                [
590
                    'ou',
591
                    'id',
592
                    'uuid',
593
                    'is_staff',
594
                    'is_superuser',
595
                    'first_name',
596
                    'first_name_verified',
597
                    'last_name',
598
                    'last_name_verified',
599
                    'date_joined',
600
                    'last_login',
601
                    'username',
602
                    'password',
603
                    'email',
604
                    'is_active',
605
                    'title',
606
                    'title_verified',
607
                    'modified',
608
                    'email_verified',
609
                    'last_account_deletion_alert',
610
                    'deactivation',
611
                ]
612
            )
613
            == set(resp.json.keys())
614
        )
587
        assert USER_ATTRIBUTES_SET == set(resp.json)
615 588
        user = get_user_model().objects.get(pk=resp.json['id'])
616 589
        assert AttributeValue.objects.with_owner(user).filter(verified=True).count() == 3
617 590
        assert AttributeValue.objects.with_owner(user).filter(verified=False).count() == 0
......
761 734
        member.roles.add(role)
762 735
        resp = app.get('/api/roles/{0}/members/{1}/'.format(role.uuid, member.uuid))
763 736
        assert resp.json['uuid'] == member.uuid
764
        assert (
765
            set(
766
                [
767
                    'id',
768
                    'ou',
769
                    'date_joined',
770
                    'last_login',
771
                    'password',
772
                    'is_superuser',
773
                    'uuid',
774
                    'username',
775
                    'first_name',
776
                    'last_name',
777
                    'email',
778
                    'email_verified',
779
                    'is_staff',
780
                    'is_active',
781
                    'modified',
782
                    'last_account_deletion_alert',
783
                    'deactivation',
784
                    'first_name_verified',
785
                    'last_name_verified',
786
                ]
787
            )
788
            == set(resp.json.keys())
789
        )
737
        assert USER_ATTRIBUTES_SET == set(resp.json)
790 738
    else:
791 739
        assert resp.json['result'] == 0
792 740
        assert resp.json['errors'] == 'User not allowed to view role'
......
819 767
    # api call with nested users
820 768
    resp = app.get(url, params={'nested': 'true'})
821 769
    assert resp.json['username'] == 'admin.ou1'
822
    assert (
823
        set(
824
            [
825
                'ou',
826
                'id',
827
                'uuid',
828
                'is_staff',
829
                'is_superuser',
830
                'first_name',
831
                'first_name_verified',
832
                'last_name',
833
                'last_name_verified',
834
                'date_joined',
835
                'last_login',
836
                'username',
837
                'password',
838
                'email',
839
                'is_active',
840
                'modified',
841
                'email_verified',
842
                'last_account_deletion_alert',
843
                'deactivation',
844
                'birthdate',
845
                'birthdate_verified',
846
            ]
847
        )
848
        == set(resp.json)
849
    )
770
    assert USER_ATTRIBUTES_SET == set(resp.json)
850 771

  
851 772

  
852 773
def test_api_role_add_member(app, api_user, role, member):
......
1581 1502
    resp = app.get(url)
1582 1503
    assert len(resp.json['results']) > 0
1583 1504
    for user_dict in resp.json['results']:
1584
        assert (
1585
            set(
1586
                [
1587
                    'ou',
1588
                    'id',
1589
                    'uuid',
1590
                    'is_staff',
1591
                    'is_superuser',
1592
                    'first_name',
1593
                    'first_name_verified',
1594
                    'last_name',
1595
                    'last_name_verified',
1596
                    'date_joined',
1597
                    'last_login',
1598
                    'username',
1599
                    'password',
1600
                    'email',
1601
                    'is_active',
1602
                    'modified',
1603
                    'email_verified',
1604
                    'last_account_deletion_alert',
1605
                    'deactivation',
1606
                    'birthdate',
1607
                    'birthdate_verified',
1608
                ]
1609
            )
1610
            == set(user_dict.keys())
1611
        )
1505
        assert USER_ATTRIBUTES_SET == set(user_dict)
1612 1506
    assert [x['username'] for x in resp.json['results']] == ['john.doe']
1613 1507

  
1614 1508
    # api call with nested users
tests/test_ldap.py
253 253
    conn.delete_s(DN)
254 254

  
255 255
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
256
    list(ldap_backend.LDAPBackend.get_users())
256 257

  
257 258
    assert (
258
        ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 1
259
        ldap_backend.UserExternalId.objects.filter(
260
            user__is_active=False,
261
            source=block['realm'],
262
            user__deactivation__isnull=False,
263
            user__deactivation_reason='ldap-orphaned',
264
        ).count()
265
        == 1
259 266
    )
260 267

  
261 268
    # rename source realm
262
    settings.LDAP_AUTH_SETTINGS = [
263
        {'url': [slapd.ldap_url], 'basedn': 'o=ôrga', 'use_tls': False, 'realm': 'test'}
264
    ]
269
    settings.LDAP_AUTH_SETTINGS = []
270
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
271
    list(ldap_backend.LDAPBackend.get_users())
272

  
273
    assert (
274
        ldap_backend.UserExternalId.objects.filter(
275
            user__is_active=False,
276
            source=block['realm'],
277
            user__deactivation__isnull=False,
278
            user__deactivation_reason='ldap-orphaned',
279
        ).count()
280
        == 6
281
    )
265 282

  
283
    # reactivate users
284
    settings.LDAP_AUTH_SETTINGS = [block]
285
    list(ldap_backend.LDAPBackend.get_users())
266 286
    ldap_backend.LDAPBackend.deactivate_orphaned_users()
267 287
    assert (
268
        ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 6
288
        ldap_backend.UserExternalId.objects.filter(
289
            user__is_active=False,
290
            source=block['realm'],
291
            user__deactivation__isnull=False,
292
            user__deactivation_reason='ldap-orphaned',
293
        ).count()
294
        == 1
269 295
    )
296
    assert (
297
        User.objects.filter(
298
            is_active=True, deactivation_reason__isnull=True, deactivation__isnull=True
299
        ).count()
300
        == 5
301
    )
302
    assert User.objects.count() == 6
270 303

  
271 304

  
272 305
@pytest.mark.django_db
273
-