Projet

Général

Profil

0001-ldap-add-useful-output-to-sync-ldap-users-command-54.patch

Valentin Deniaud, 14 septembre 2021 11:22

Télécharger (8,55 ko)

Voir les différences:

Subject: [PATCH] ldap: add useful output to sync-ldap-users command (#54078)

 src/authentic2/backends/ldap_backend.py       | 28 +++++++++++-
 .../management/commands/sync-ldap-users.py    | 21 ++++++---
 tests/test_ldap.py                            | 45 ++++++++++++++++---
 3 files changed, 80 insertions(+), 14 deletions(-)
src/authentic2/backends/ldap_backend.py
76 76
    '/var/lib/ca-certificates/ca-bundle.pem',  # OpenSuse
77 77
]
78 78

  
79
INTERMEDIATE_DEBUG_LEVEL = logging.DEBUG + 1
80

  
79 81

  
80 82
# Select a system certificate store
81 83
for bundle_path in CA_BUNDLE_PATHS:
......
1543 1545

  
1544 1546
    @classmethod
1545 1547
    def get_users(cls):
1546
        for block in cls.get_config():
1548
        blocks = cls.get_config()
1549
        if not blocks:
1550
            log.log(INTERMEDIATE_DEBUG_LEVEL, 'No LDAP server configured.')
1551
            return
1552
        for block in blocks:
1553
            log.log(INTERMEDIATE_DEBUG_LEVEL, 'Synchronising users from realm "%s"', block['realm'])
1547 1554
            conn = cls.get_connection(block)
1548 1555
            if conn is None:
1549 1556
                log.warning('unable to synchronize with LDAP servers %s', force_text(block['url']))
......
1552 1559
            user_basedn = force_text(block.get('user_basedn') or block['basedn'])
1553 1560
            user_filter = cls.get_sync_ldap_user_filter(block)
1554 1561
            attribute_names = cls.get_ldap_attributes_names(block)
1562
            log.log(INTERMEDIATE_DEBUG_LEVEL, 'Searching for %s', user_filter)
1555 1563
            results = cls.paged_search(
1556 1564
                conn, user_basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names
1557 1565
            )
1558 1566
            backend = cls()
1567
            count = 0
1559 1568
            for dn, attrs in results:
1560
                yield backend._return_user(dn, None, conn, block, attrs)
1569
                count += 1
1570
                user = backend._return_user(dn, None, conn, block, attrs)
1571
                if getattr(user, '_changed', False):
1572
                    log.debug(
1573
                        'Updated user %s (username %s, full name %s)',
1574
                        user.uuid,
1575
                        user.get_username(),
1576
                        user.get_full_name(),
1577
                    )
1578
                yield user
1579
            log.log(INTERMEDIATE_DEBUG_LEVEL, 'Server returned %s users.', count)
1561 1580

  
1562 1581
    @classmethod
1563 1582
    def deactivate_orphaned_users(cls):
......
1711 1730
            user_credentials = block['connect_with_user_credentials'] and credentials
1712 1731
            success, error = cls.bind(block, conn, credentials=user_credentials)
1713 1732
            if success:
1733
                log.log(INTERMEDIATE_DEBUG_LEVEL, 'Connected to server %s', url)
1714 1734
                yield conn
1715 1735
            else:
1716 1736
                if block['replicas']:
......
1726 1746
                who, password = credentials[0], credentials[1]
1727 1747
                password = force_text(password)
1728 1748
                conn.simple_bind_s(who, password)
1749
                log.log(INTERMEDIATE_DEBUG_LEVEL, 'Binding with %s account: success', who)
1729 1750
            elif block['bindsasl']:
1730 1751
                sasl_mech, who, sasl_params = map_text(block['bindsasl'])
1731 1752
                handler_class = getattr(ldap.sasl, sasl_mech)
1732 1753
                auth = handler_class(*sasl_params)
1733 1754
                conn.sasl_interactive_bind_s(who, auth)
1755
                log.log(INTERMEDIATE_DEBUG_LEVEL, 'Binding with %s account: success', who)
1734 1756
            elif block['binddn'] and block['bindpw']:
1735 1757
                who = force_text(block['binddn'])
1736 1758
                conn.simple_bind_s(who, force_text(block['bindpw']))
1759
                log.log(INTERMEDIATE_DEBUG_LEVEL, 'Binding with %s binddn: success', who)
1737 1760
            else:
1738 1761
                who = 'anonymous'
1739 1762
                conn.simple_bind_s()
1763
                log.log(INTERMEDIATE_DEBUG_LEVEL, 'Binding anonymously: success')
1740 1764
            return True, None
1741 1765
        except ldap.INVALID_CREDENTIALS:
1742 1766
            return False, 'invalid credentials'
src/authentic2/management/commands/sync-ldap-users.py
21 21
except ImportError:
22 22
    ldap = None
23 23

  
24
import logging
25

  
24 26
from django.core.management.base import BaseCommand
25 27

  
26
from authentic2.backends.ldap_backend import LDAPBackend
28
from authentic2.backends.ldap_backend import INTERMEDIATE_DEBUG_LEVEL, LDAPBackend
29

  
30
logger = logging.getLogger(__name__)
27 31

  
28 32

  
29 33
class Command(BaseCommand):
30 34
    def handle(self, *args, **kwargs):
35
        root_logger = logging.getLogger()
36
        root_logger.addHandler(logging.StreamHandler())
37

  
31 38
        verbosity = int(kwargs['verbosity'])
32
        if verbosity > 1:
33
            print('Updated users :')
39
        if verbosity == 0:
40
            root_logger.setLevel(logging.WARNING)
41
        elif verbosity == 1:
42
            root_logger.setLevel(INTERMEDIATE_DEBUG_LEVEL)
43
        elif verbosity == 2:
44
            root_logger.setLevel(logging.DEBUG)
45

  
34 46
        for user in LDAPBackend.get_users():
35
            if getattr(user, '_changed', False) and verbosity > 1:
36
                print(' -', user.uuid, user.get_username(), user.get_full_name())
47
            continue
tests/test_ldap.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import json
18
import logging
18 19
import os
19 20
import time
20 21
import urllib.parse
......
1605 1606
    assert '_auth_user_id' in app.session
1606 1607

  
1607 1608

  
1608
def test_sync_ldap_users(slapd, settings, app, db, capsys):
1609
def test_sync_ldap_users(slapd, settings, app, db, caplog):
1610
    caplog.set_level(logging.DEBUG)  # force pytest to reset log level after test
1611

  
1612
    management.call_command('sync-ldap-users')
1613
    assert len(caplog.records) == 1
1614
    assert caplog.records[0].message == 'No LDAP server configured.'
1615

  
1609 1616
    settings.LDAP_AUTH_SETTINGS = [
1610 1617
        {
1611 1618
            'url': [slapd.ldap_url],
......
1633 1640
    )
1634 1641

  
1635 1642
    assert User.objects.count() == 0
1636
    capsys.readouterr()
1637
    management.call_command('sync-ldap-users', verbosity=2)
1638
    assert len(capsys.readouterr().out.splitlines()) == 7
1643
    caplog.clear()
1644
    management.call_command('sync-ldap-users')
1645
    assert caplog.messages == [
1646
        'Synchronising users from realm "ldap"',
1647
        'Binding anonymously: success',
1648
        'Connected to server %s' % slapd.ldap_url,
1649
        'Searching for (|(mail=*)(uid=*))',
1650
        'Server returned 6 users.',
1651
    ]
1652

  
1639 1653
    assert User.objects.count() == 6
1640 1654
    assert all(user.first_name == 'Étienne' for user in User.objects.all())
1641 1655
    assert all(user.attributes.first_name == 'Étienne' for user in User.objects.all())
......
1652 1666
            for user in User.objects.all()
1653 1667
        ]
1654 1668
    )
1655
    capsys.readouterr()
1669

  
1670
    User.objects.update(first_name='John')
1671
    caplog.clear()
1672
    management.call_command('sync-ldap-users', verbosity=2)
1673
    assert len(caplog.records) == 42
1674
    assert (
1675
        caplog.records[10].message
1676
        == 'Updated user %s (username etienne.michu@ldap, full name Étienne Michu)'
1677
        % User.objects.first().uuid
1678
    )
1679

  
1680
    caplog.clear()
1656 1681
    management.call_command('sync-ldap-users', verbosity=2)
1657
    assert len(capsys.readouterr().out.splitlines()) == 1
1682
    assert len(caplog.records) == 36  # users have not been updated
1683

  
1684
    caplog.clear()
1685
    management.call_command('sync-ldap-users', verbosity=0)
1686
    assert len(caplog.records) == 0
1658 1687

  
1659 1688

  
1660 1689
def test_alert_on_wrong_user_filter(slapd, settings, client, db, caplog):
......
1819 1848
    }
1820 1849

  
1821 1850

  
1822
def test_switch_user_ldap_user(slapd, settings, app, db):
1851
def test_switch_user_ldap_user(slapd, settings, app, db, caplog):
1852
    caplog.set_level(logging.DEBUG)  # force pytest to reset log level after test
1853

  
1823 1854
    settings.LDAP_AUTH_SETTINGS = [
1824 1855
        {
1825 1856
            'url': [slapd.ldap_url],
1826
-