Projet

Général

Profil

0001-ldap-add-useful-output-to-sync-ldap-users-command-54.patch

Valentin Deniaud, 14 septembre 2021 18:27

Télécharger (8,77 ko)

Voir les différences:

Subject: [PATCH] ldap: add useful output to sync-ldap-users command (#54078)

 src/authentic2/backends/ldap_backend.py       | 30 +++++++++++++--
 .../management/commands/sync-ldap-users.py    | 22 +++++++++--
 tests/test_ldap.py                            | 37 +++++++++++++++----
 3 files changed, 75 insertions(+), 14 deletions(-)
src/authentic2/backends/ldap_backend.py
346 346
class LDAPUser(User):
347 347
    SESSION_LDAP_DATA_KEY = 'ldap-data'
348 348
    _changed = False
349
    _created = False
349 350

  
350 351
    class Meta:
351 352
        proxy = True
......
666 667
        # First get our configuration into a standard format
667 668
        for block in blocks:
668 669
            cls.update_default(block)
669
        log.debug('got config %r', blocks)
670 670
        return blocks
671 671

  
672 672
    @classmethod
......
1499 1499
        user.keep_password(password)
1500 1500
        self.populate_user(user, dn, username, conn, block, attributes)
1501 1501
        if not user.pk or getattr(user, '_changed', False):
1502
            user._created = bool(not user.pk)
1502 1503
            user.save()
1503 1504

  
1504 1505
        if not is_user_authenticable(user):
......
1543 1544

  
1544 1545
    @classmethod
1545 1546
    def get_users(cls):
1546
        for block in cls.get_config():
1547
        blocks = cls.get_config()
1548
        if not blocks:
1549
            log.info('No LDAP server configured.')
1550
            return
1551
        for block in blocks:
1552
            log.info('Synchronising users from realm "%s"', block['realm'])
1547 1553
            conn = cls.get_connection(block)
1548 1554
            if conn is None:
1549 1555
                log.warning('unable to synchronize with LDAP servers %s', force_text(block['url']))
......
1556 1562
                conn, user_basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names
1557 1563
            )
1558 1564
            backend = cls()
1565
            count = 0
1559 1566
            for dn, attrs in results:
1560
                yield backend._return_user(dn, None, conn, block, attrs)
1567
                count += 1
1568
                user = backend._return_user(dn, None, conn, block, attrs)
1569
                if user._changed or user._created:
1570
                    log.info(
1571
                        '%s user %s (uuid %s) from %s',
1572
                        'Created' if user._created else 'Updated',
1573
                        user.get_username(),
1574
                        user.uuid,
1575
                        ', '.join('%s=%s' % (k, v) for k, v in attrs.items()),
1576
                    )
1577
                yield user
1578
            log.info('Search for %s returned %s users.', user_filter, count)
1561 1579

  
1562 1580
    @classmethod
1563 1581
    def deactivate_orphaned_users(cls):
......
1721 1739
    @classmethod
1722 1740
    def bind(cls, block, conn, credentials=()):
1723 1741
        '''Bind to the LDAP server'''
1742
        ldap_uri = conn.get_option(ldap.OPT_URI)
1724 1743
        try:
1725 1744
            if credentials:
1726 1745
                who, password = credentials[0], credentials[1]
1727 1746
                password = force_text(password)
1728 1747
                conn.simple_bind_s(who, password)
1748
                log_message = 'with user %s' % who
1729 1749
            elif block['bindsasl']:
1730 1750
                sasl_mech, who, sasl_params = map_text(block['bindsasl'])
1731 1751
                handler_class = getattr(ldap.sasl, sasl_mech)
1732 1752
                auth = handler_class(*sasl_params)
1733 1753
                conn.sasl_interactive_bind_s(who, auth)
1754
                log_message = 'with account %s' % who
1734 1755
            elif block['binddn'] and block['bindpw']:
1735 1756
                who = force_text(block['binddn'])
1736 1757
                conn.simple_bind_s(who, force_text(block['bindpw']))
1758
                log_message = 'with binddn %s' % who
1737 1759
            else:
1738 1760
                who = 'anonymous'
1739 1761
                conn.simple_bind_s()
1762
                log_message = 'anonymously'
1763
            log.info('Binding to server %s (%s)', ldap_uri, log_message)
1740 1764
            return True, None
1741 1765
        except ldap.INVALID_CREDENTIALS:
1742 1766
            return False, 'invalid credentials'
src/authentic2/management/commands/sync-ldap-users.py
21 21
except ImportError:
22 22
    ldap = None
23 23

  
24
import logging
25

  
24 26
from django.core.management.base import BaseCommand
25 27

  
26 28
from authentic2.backends.ldap_backend import LDAPBackend
......
28 30

  
29 31
class Command(BaseCommand):
30 32
    def handle(self, *args, **kwargs):
33
        root_logger = logging.getLogger()
34
        ldap_logger = logging.getLogger('authentic2.backends.ldap_backend')
35

  
36
        # ensure log messages are displayed only once on terminal
37
        stream_handlers = [x for x in root_logger.handlers if isinstance(x, logging.StreamHandler)]
38
        if not any(handler.stream.isatty() for handler in stream_handlers):
39
            ldap_logger.addHandler(logging.StreamHandler())
40

  
31 41
        verbosity = int(kwargs['verbosity'])
32
        if verbosity > 1:
33
            print('Updated users :')
42
        if verbosity == 1:
43
            ldap_logger.setLevel(logging.ERROR)
44
        elif verbosity == 2:
45
            ldap_logger.setLevel(logging.INFO)
46
        elif verbosity == 3:
47
            ldap_logger.setLevel(logging.DEBUG)
48

  
34 49
        for user in LDAPBackend.get_users():
35
            if getattr(user, '_changed', False) and verbosity > 1:
36
                print(' -', user.uuid, user.get_username(), user.get_full_name())
50
            continue
tests/test_ldap.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import json
18
import logging
18 19
import os
19 20
import time
20 21
import urllib.parse
......
1605 1606
    assert '_auth_user_id' in app.session
1606 1607

  
1607 1608

  
1608
def test_sync_ldap_users(slapd, settings, app, db, capsys):
1609
def test_sync_ldap_users(slapd, settings, app, db, caplog):
1610
    caplog.set_level(logging.DEBUG)  # force pytest to reset log level after test
1611

  
1612
    management.call_command('sync-ldap-users')
1613
    assert len(caplog.records) == 0
1614

  
1615
    management.call_command('sync-ldap-users', verbosity=2)
1616
    assert len(caplog.records) == 1
1617
    assert caplog.records[0].message == 'No LDAP server configured.'
1618

  
1609 1619
    settings.LDAP_AUTH_SETTINGS = [
1610 1620
        {
1611 1621
            'url': [slapd.ldap_url],
......
1633 1643
    )
1634 1644

  
1635 1645
    assert User.objects.count() == 0
1636
    capsys.readouterr()
1646
    caplog.clear()
1637 1647
    management.call_command('sync-ldap-users', verbosity=2)
1638
    assert len(capsys.readouterr().out.splitlines()) == 7
1648
    assert len(caplog.records) == 9
1649
    assert caplog.messages[0] == 'Synchronising users from realm "ldap"'
1650
    assert caplog.messages[1] == 'Binding to server %s (anonymously)' % slapd.ldap_url
1651
    assert (
1652
        caplog.messages[2]
1653
        == "Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']"
1654
        % User.objects.first().uuid
1655
    )
1656
    assert caplog.messages[-1] == 'Search for (|(mail=*)(uid=*)) returned 6 users.'
1657

  
1639 1658
    assert User.objects.count() == 6
1640 1659
    assert all(user.first_name == 'Étienne' for user in User.objects.all())
1641 1660
    assert all(user.attributes.first_name == 'Étienne' for user in User.objects.all())
......
1652 1671
            for user in User.objects.all()
1653 1672
        ]
1654 1673
    )
1655
    capsys.readouterr()
1656
    management.call_command('sync-ldap-users', verbosity=2)
1657
    assert len(capsys.readouterr().out.splitlines()) == 1
1674

  
1675
    User.objects.update(first_name='John')
1676
    caplog.clear()
1677
    management.call_command('sync-ldap-users', verbosity=3)
1678
    assert len(caplog.records) == 39
1658 1679

  
1659 1680

  
1660 1681
def test_alert_on_wrong_user_filter(slapd, settings, client, db, caplog):
......
1819 1840
    }
1820 1841

  
1821 1842

  
1822
def test_switch_user_ldap_user(slapd, settings, app, db):
1843
def test_switch_user_ldap_user(slapd, settings, app, db, caplog):
1844
    caplog.set_level(logging.DEBUG)  # force pytest to reset log level after test
1845

  
1823 1846
    settings.LDAP_AUTH_SETTINGS = [
1824 1847
        {
1825 1848
            'url': [slapd.ldap_url],
1826
-