Projet

Général

Profil

0001-ldap-use-guid-attributes-as-global-external-id-63646.patch

Benjamin Dauvergne, 07 avril 2022 10:20

Télécharger (26,2 ko)

Voir les différences:

Subject: [PATCH 1/2] ldap: use guid attributes as global external id (#63646)

 src/authentic2/backends/ldap_backend.py       | 167 ++++++++++++++----
 .../migrations/0038_add_external_guid.py      |  36 ++++
 src/authentic2/models.py                      |  18 +-
 tests/test_ldap.py                            | 108 ++++++++---
 4 files changed, 267 insertions(+), 62 deletions(-)
 create mode 100644 src/authentic2/migrations/0038_add_external_guid.py
src/authentic2/backends/ldap_backend.py
24 24
import ssl
25 25
import time
26 26
import urllib.parse
27
import uuid
27 28

  
28 29
import ldap
29 30
import ldap.modlist
......
34 35
from django.contrib.auth.models import Group
35 36
from django.core.cache import cache
36 37
from django.core.exceptions import ImproperlyConfigured
38
from django.db.models import Q
37 39
from django.db.transaction import atomic
38 40
from django.utils.encoding import force_bytes, force_text
39 41
from django.utils.translation import ngettext
......
71 73
    '/var/lib/ca-certificates/ca-bundle.pem',  # OpenSuse
72 74
]
73 75

  
76
USUAL_GUID_ATTRIBUTES = ['entryuuid', 'objectguid', 'nsuniqueid']
77

  
74 78

  
75 79
class UserCreationError(Exception):
76 80
    pass
......
123 127
    @to_list
124 128
    def _convert_results_to_unicode(self, result_list):
125 129
        for dn, attrs in result_list:
126
            if dn is not None:
127
                # tuple is a real entry with a DN not a search reference
128
                attrs = {attribute: filter_non_unicode_values(attrs[attribute]) for attribute in attrs}
129
                yield dn, attrs
130
            if dn is None:
131
                continue
132
            new_attrs = {}
133
            for attribute in attrs:
134
                values = attrs[attribute]
135
                # specialize for GUID attributes
136
                if attribute.lower() in USUAL_GUID_ATTRIBUTES and len(values[0]) == 16:
137
                    try:
138
                        values = [str(uuid.UUID(bytes=values[0])).encode()]
139
                    except ValueError:
140
                        values = []
141
                values = filter_non_unicode_values(values)
142
                if not values:
143
                    continue
144
                new_attrs[attribute] = values
145
            yield dn, new_attrs
130 146

  
131 147
    def modify_s(self, dn, modlist):
132 148
        new_modlist = []
......
482 498
        # generated username are unique
483 499
        'update_username': False,
484 500
        # lookup existing user with an external id built from attributes
485
        'lookups': ('external_id', 'username', 'email'),
501
        'lookups': ('guid', 'external_id', 'username', 'email'),
486 502
        'external_id_tuples': (
487 503
            ('uid',),
488 504
            ('dn:noquote',),
......
745 761
                try:
746 762
                    return self._return_user(authz_id, password, conn, block)
747 763
                except UserCreationError as e:
748
                    messages.error(request, str(e))
764
                    if request:
765
                        messages.error(request, str(e))
749 766
                    return None
750 767
            except ldap.CONNECT_ERROR:
751 768
                log.error(
......
1095 1112
            for key in at_mapping:
1096 1113
                if at_mapping[key] != 'dn':
1097 1114
                    attributes.add(at_mapping[key])
1115
        # add usual GUID attributes
1116
        attributes.update(USUAL_GUID_ATTRIBUTES)
1098 1117
        return list({attribute.lower() for attribute in attributes})
1099 1118

  
1100 1119
    @classmethod
......
1350 1369
            return None
1351 1370

  
1352 1371
    def _lookup_by_external_id(self, block, attributes):
1372
        realm = block['realm']
1353 1373
        for eid_tuple in map_text(block['external_id_tuples']):
1354 1374
            external_id = self.build_external_id(eid_tuple, attributes)
1355 1375
            if not external_id:
......
1359 1379
                LDAPUser.objects.prefetch_related('groups')
1360 1380
                .filter(
1361 1381
                    userexternalid__external_id__iexact=external_id,
1362
                    userexternalid__source=force_text(block['realm']),
1382
                    userexternalid__source=realm,
1363 1383
                )
1364 1384
                .order_by('-last_login')
1365 1385
            )
......
1373 1393
                        len(users),
1374 1394
                    )
1375 1395
                    for other in users[1:]:
1376
                        for r in other.roles.all():
1377
                            user.roles.add(r)
1396
                        user.roles.add(*other.roles.all())
1378 1397
                        other.delete()
1379 1398
                return user
1380 1399
        return None
1381 1400

  
1401
    def _lookup_by_external_guid(self, block, attribute, guid):
1402
        if not guid:
1403
            return None
1404
        log.debug('ldap: lookup by external_guid %s=%s', attribute, guid)
1405
        try:
1406
            return LDAPUser.objects.get(
1407
                userexternalid__source=block['realm'], userexternalid__external_guid=guid
1408
            )
1409
        except LDAPUser.DoesNotExist:
1410
            return None
1411

  
1412
    # entryuuid is encoded as the string representation of the UUID, as an octet string
1413
    @classmethod
1414
    def _decode_entryuuid_guid(cls, value):
1415
        try:
1416
            return uuid.UUID(value)
1417
        except (ValueError, UnicodeDecodeError):
1418
            return None
1419

  
1420
    # objectguid is encoded as the byte representation of the UUID
1421
    _decode_objectguid_guid = _decode_entryuuid_guid
1422

  
1423
    # nsuniqueid is encoded as the byte representation of the UUID
1424
    _decode_nsuniqueid_guid = _decode_objectguid_guid
1425

  
1426
    def _lookup_by_guid(self, block, attributes):
1427
        attribute, guid = self._get_guid(attributes)
1428
        return self._lookup_by_external_guid(block=block, attribute=attribute, guid=guid)
1429

  
1430
    @classmethod
1431
    def _get_guid(cls, attributes):
1432
        for attribute in USUAL_GUID_ATTRIBUTES:
1433
            if attribute not in attributes:
1434
                continue
1435
            value = attributes[attribute][0]
1436
            guid = getattr(cls, f'_decode_{attribute}_guid')(value)
1437
            if guid:
1438
                return attribute, guid
1439
        return None, None
1440

  
1382 1441
    def _lookup_existing_user(self, username, block, attributes):
1383 1442
        user = None
1384 1443
        ou = self._get_target_ou(block)
......
1393 1452
                user = self._lookup_by_external_id(block=block, attributes=attributes)
1394 1453
            elif lookup_type == 'email' and attributes:
1395 1454
                user = self._lookup_by_email(ou=ou, block=block, attributes=attributes)
1455
            elif lookup_type == 'guid' and attributes:
1456
                user = self._lookup_by_guid(block=block, attributes=attributes)
1396 1457
            if user:
1397 1458
                return user
1398 1459

  
1399 1460
    def update_user_identifiers(self, user, username, block, attributes):
1461
        realm = block['realm']
1400 1462
        # if username has changed and we propagate those changes, update it
1401 1463
        if block['update_username']:
1402 1464
            if user.username != username:
......
1406 1468
                log_msg = 'updating username from %r to %r'
1407 1469
                log.debug(log_msg, old_username, user.username)
1408 1470
        # if external_id lookup is used, update it
1471
        userexternalid = None
1472
        use_guid = False
1473
        use_external_id = False
1474
        guid = None
1475
        external_id = None
1476
        if 'guid' in block['lookups']:
1477
            use_guid = True
1478
            _attribute, guid = self._get_guid(attributes)
1479

  
1480
        if guid and user.pk:
1481
            if guid:
1482
                try:
1483
                    userexternalid = UserExternalId.objects.get(user=user, external_guid=guid, source=realm)
1484
                except UserExternalId.DoesNotExist:
1485
                    pass
1486

  
1409 1487
        if (
1410 1488
            'external_id' in block['lookups']
1411 1489
            and block.get('external_id_tuples')
1412 1490
            and block['external_id_tuples'][0]
1413 1491
        ):
1414
            if not user.pk:
1415
                user.save()
1416
                user._changed = False
1492
            use_external_id = True
1417 1493
            external_id = self.build_external_id(map_text(block['external_id_tuples'][0]), attributes)
1418
            if external_id:
1419
                new, dummy = UserExternalId.objects.get_or_create(
1420
                    user=user, external_id=external_id, source=force_text(block['realm'])
1494

  
1495
        if external_id and user.pk:
1496
            try:
1497
                userexternalid = UserExternalId.objects.get(
1498
                    user=user, external_id__iexact=external_id, source=realm
1421 1499
                )
1422
                if block['clean_external_id_on_update']:
1423
                    UserExternalId.objects.exclude(id=new.id).filter(
1424
                        user=user, source=force_text(block['realm'])
1425
                    ).delete()
1426
            elif user._created:
1500
            except UserExternalId.DoesNotExist:
1501
                pass
1502

  
1503
        if userexternalid:
1504
            changed = False
1505
            if userexternalid.external_guid != guid:
1506
                userexternalid.external_guid = guid
1507
                changed = True
1508
            if userexternalid.external_id != external_id:
1509
                userexternalid.external_id = external_id
1510
                changed = True
1511
            if changed:
1512
                userexternalid.save()
1513
        elif use_guid or use_external_id:
1514
            if not guid and not external_id:
1427 1515
                log.error(
1428
                    'ldap: unable to build an external_id (%r) with attributes: %r',
1429
                    block['external_id_tuples'][0],
1516
                    'ldap: unable to build an user_external_id (%r) with attributes: %r',
1517
                    block['external_id_tuples'],
1430 1518
                    attributes,
1431 1519
                )
1432 1520
                raise UserCreationError(_('LDAP configuration is broken, please contact your administrator'))
1433
            else:
1434
                log.error(
1435
                    'ldap: unable to update an external_id (%r) with attributes: %r',
1436
                    block['external_id_tuples'][0],
1437
                    attributes,
1438
                )
1521
            if not user.pk:
1522
                user._changed = False
1523
                user.save()
1524
            UserExternalId.objects.create(
1525
                user=user, source=realm, external_id=external_id, external_guid=guid
1526
            )
1439 1527

  
1440 1528
    def _record_failure_for_user(self, request, reason, user_id, block, conn, attributes=None):
1441 1529
        user = None
......
1589 1677
                    'external_id', flat=True
1590 1678
                )
1591 1679
            )
1680
            guids = set(
1681
                UserExternalId.objects.filter(user__is_active=True, source=block['realm']).values_list(
1682
                    'external_guid', flat=True
1683
                )
1684
            )
1592 1685
            basedn = force_text(block.get('user_basedn') or block['basedn'])
1593
            attribute_names = [
1594
                a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])
1595
            ]
1686
            attribute_names = list(
1687
                {a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])}
1688
                | set(USUAL_GUID_ATTRIBUTES)
1689
            )
1596 1690
            user_filter = cls.get_sync_ldap_user_filter(block)
1597 1691
            results = cls.paged_search(
1598 1692
                conn, basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names
......
1600 1694
            for dn, attrs in results:
1601 1695
                data = attrs.copy()
1602 1696
                data['dn'] = dn
1697
                _attribute, guid = cls._get_guid(data)
1698
                if guid and guid in guids:
1699
                    guids.discard(guid)
1603 1700
                for eid_tuple in map_text(block['external_id_tuples']):
1604 1701
                    backend = cls()
1605 1702
                    external_id = backend.build_external_id(eid_tuple, data)
......
1608 1705
                            eids.remove(external_id)
1609 1706
                        except ValueError:
1610 1707
                            pass
1611
            for eid in UserExternalId.objects.filter(
1612
                external_id__in=eids, user__is_active=True, source=block['realm']
1708
            for eid in UserExternalId.objects.filter(user__is_active=True, source=block['realm']).filter(
1709
                Q(external_id__in=eids, external_guid__isnull=True)
1710
                | Q(external_guid__in=guids, external_id__isnull=True)
1711
                | Q(external_guid__in=guids, external_id__in=eids)
1613 1712
            ):
1614 1713
                if eid.user.is_active:
1615 1714
                    eid.user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_NOT_PRESENT)
......
1896 1995
            return
1897 1996
        for user_external_id in user.userexternalid_set.all():
1898 1997
            external_id = user_external_id.external_id
1998
            if not external_id:
1999
                continue
1899 2000
            for block in config:
1900 2001
                if user_external_id.source != force_text(block['realm']):
1901 2002
                    continue
src/authentic2/migrations/0038_add_external_guid.py
1
# Generated by Django 2.2.27 on 2022-04-06 21:11
2

  
3
from django.db import migrations, models
4

  
5

  
6
class Migration(migrations.Migration):
7

  
8
    dependencies = [
9
        ('authentic2', '0037_auto_20220331_1513'),
10
    ]
11

  
12
    operations = [
13
        migrations.AddField(
14
            model_name='userexternalid',
15
            name='external_guid',
16
            field=models.UUIDField(null=True, verbose_name='External GUID'),
17
        ),
18
        migrations.AlterField(
19
            model_name='userexternalid',
20
            name='external_id',
21
            field=models.CharField(max_length=256, null=True, verbose_name='external id'),
22
        ),
23
        migrations.AlterUniqueTogether(
24
            name='userexternalid',
25
            unique_together={('source', 'external_guid'), ('source', 'external_id')},
26
        ),
27
        migrations.AddConstraint(
28
            model_name='userexternalid',
29
            constraint=models.CheckConstraint(
30
                check=models.Q(
31
                    ('external_id__isnull', False), ('external_guid__isnull', False), _connector='OR'
32
                ),
33
                name='at_least_one_id',
34
            ),
35
        ),
36
    ]
src/authentic2/models.py
49 49
class UserExternalId(models.Model):
50 50
    user = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('user'), on_delete=models.CASCADE)
51 51
    source = models.CharField(max_length=256, verbose_name=_('source'))
52
    external_id = models.CharField(max_length=256, verbose_name=_('external id'))
52
    external_id = models.CharField(max_length=256, verbose_name=_('external id'), null=True)
53
    external_guid = models.UUIDField(verbose_name=_('External GUID'), null=True)
53 54
    created = models.DateTimeField(auto_now_add=True, verbose_name=_('creation date'))
54 55
    updated = models.DateTimeField(auto_now=True, verbose_name=_('last update date'))
55 56

  
56 57
    def __str__(self):
57
        return f'{self.user} is {self.external_id} on {self.source}'
58
        return f'{self.user} is {self.external_id or self.external_guid} on {self.source}'
58 59

  
59 60
    def __repr__(self):
60
        return '<UserExternalId user: {!r} source: {!r} external_id: {!r} created: {} updated: {}'.format(
61
            self.user_id, self.source, self.external_id, self.created, self.updated
61
        return (
62
            f'<UserExternalId user: {self.user!r} source: {self.source!r}'
63
            f"{f' external_id: {self.external_id!r}' if self.external_id else ''}"
64
            f"{f' external_guid: {self.external_guid!r}' if self.external_guid else ''}"
65
            f' created: {self.created} updated: {self.updated}'
62 66
        )
63 67

  
64 68
    class Meta:
......
66 70
        verbose_name_plural = _('user external ids')
67 71
        unique_together = [
68 72
            ('source', 'external_id'),
73
            ('source', 'external_guid'),
74
        ]
75
        constraints = [
76
            models.CheckConstraint(
77
                check=Q(external_id__isnull=False) | Q(external_guid__isnull=False), name='at_least_one_id'
78
            ),
69 79
        ]
70 80

  
71 81

  
tests/test_ldap.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import base64
17 18
import json
18 19
import logging
19 20
import os
......
54 55
UPASS = 'passé'
55 56
EMAIL = 'etienne.michu@example.net'
56 57
CARLICENSE = '123445ABC'
58
UUID = '8ff2f34a-4a36-103c-8d0a-e3a0333484d3'
59
OBJECTGUID_RAW = b'\xab' * 16
60
OBJECTGUID_B64 = base64.b64encode(OBJECTGUID_RAW).decode()
57 61

  
58 62
CN_INCOMPLETE = 'Jean Dupond'
59 63
DN_INCOMPLETE = 'cn=%s,o=ôrga' % escape_dn_chars(CN_INCOMPLETE)
......
80 84
        yield s
81 85

  
82 86

  
87
OBJECTGUID_SCHEMA = '''\
88
dn: cn=objectguid,cn=schema,cn=config
89
objectClass: olcSchemaConfig
90
cn: objectguid
91
olcAttributeTypes: ( 1.3.6.1.4.1.36560.1.1.3.2 NAME 'objectGUID'
92
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.40 )
93
olcObjectClasses: ( 1.3.6.1.4.1.36560.1.1.3.1 NAME 'objectWithObjectGuid'
94
  MAY ( objectGUID )
95
  SUP top AUXILIARY )
96
'''
97

  
98

  
83 99
@pytest.fixture
84 100
def slapd_ppolicy():
85 101
    with create_slapd() as slapd:
......
749 765
            'group_to_role_mapping': [
750 766
                ['cn=unknown,o=dn', ['Role2']],
751 767
            ],
768
            'lookups': ['external_id', 'username'],
752 769
        }
753 770
    ]
754 771
    save = mock.Mock(wraps=ldap_backend.LDAPUser.save)
......
1771 1788
def test_sync_ldap_users(slapd, settings, app, db, caplog):
1772 1789
    caplog.set_level('INFO')
1773 1790

  
1791
    conn = slapd.get_connection_admin()
1792
    entryuuid = conn.search_s('o=ôrga', ldap.SCOPE_SUBTREE, f'(uid={UID})', ['entryUUID'])[0][1]['entryUUID'][
1793
        0
1794
    ].decode()
1774 1795
    management.call_command('sync-ldap-users')
1775 1796
    assert caplog.records[0].message == 'No LDAP server configured.'
1776 1797

  
......
1808 1829
    assert caplog.records[2].message == (
1809 1830
        (
1810 1831
            "Created user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], "
1811
            "sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']"
1832
            "sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net'], entryuuid=['%s']"
1812 1833
        )
1813
        % User.objects.first().uuid
1834
        % (User.objects.first().uuid, entryuuid)
1814 1835
    )
1815 1836
    assert caplog.records[-1].message == 'Search for (|(mail=*)(uid=*)) returned 6 users.'
1816 1837

  
......
1834 1855
    caplog.clear()
1835 1856
    User.objects.update(first_name='John')
1836 1857
    management.call_command('sync-ldap-users', verbosity=3)
1837
    assert (
1838
        caplog.records[2].message
1839
        == (
1840
            "Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], "
1841
            "sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']"
1842
        )
1843
        % User.objects.first().uuid
1844
    )
1858
    assert caplog.records[2].message == (
1859
        "Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], "
1860
        "sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net'], entryuuid=['%s']"
1861
    ) % (User.objects.first().uuid, entryuuid)
1845 1862

  
1846 1863

  
1847 1864
def test_get_users_select_realm(slapd, settings, db, caplog):
......
1895 1912
    ]
1896 1913
    user = authenticate(username=USERNAME, password=UPASS)
1897 1914
    assert user
1898
    assert user.get_attributes(object(), {}) == {
1915
    assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
1899 1916
        'dn': 'cn=étienne michu,o=\xf4rga',
1900 1917
        'givenname': ['Étienne'],
1901 1918
        'mail': ['etienne.michu@example.net'],
1902 1919
        'sn': ['Michu'],
1903 1920
        'uid': ['etienne.michu'],
1904 1921
        'carlicense': ['123445ABC'],
1922
        'entryuuid': None,
1905 1923
    }
1906 1924
    # simulate LDAP down
1907 1925
    slapd.stop()
1908
    assert user.get_attributes(object(), {}) == {
1926
    assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
1909 1927
        'dn': 'cn=étienne michu,o=\xf4rga',
1910 1928
        'givenname': ['\xc9tienne'],
1911 1929
        'mail': ['etienne.michu@example.net'],
1912 1930
        'sn': ['Michu'],
1913 1931
        'uid': ['etienne.michu'],
1914 1932
        'carlicense': ['123445ABC'],
1933
        'entryuuid': None,
1915 1934
    }
1916 1935
    assert not user.check_password(UPASS)
1917 1936
    # simulate LDAP come back up
......
1921 1940
    conn = slapd.get_connection_admin()
1922 1941
    ldif = [(ldap.MOD_REPLACE, 'sn', [b'Micho'])]
1923 1942
    conn.modify_s(DN, ldif)
1924
    assert user.get_attributes(object(), {}) == {
1943
    assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
1925 1944
        'dn': 'cn=étienne michu,o=\xf4rga',
1926 1945
        'givenname': ['\xc9tienne'],
1927 1946
        'mail': ['etienne.michu@example.net'],
1928 1947
        'sn': ['Micho'],
1929 1948
        'uid': ['etienne.michu'],
1930 1949
        'carlicense': ['123445ABC'],
1950
        'entryuuid': None,
1931 1951
    }
1932 1952

  
1933 1953

  
......
2275 2295
        assert auth_user == user
2276 2296
        assert auth_user.username == f'{UID}@ldap'
2277 2297

  
2278
    def test_by_username_only(self, backend, slapd, settings, client, db):
2279
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['username']
2280
        user = User.objects.create(username=UID, ou=get_default_ou())
2281
        assert backend.authenticate(None, username=EMAIL, password=PASS) == user
2282
        assert not models.UserExternalId.objects.exists()
2283
        user.username = ''
2284
        user.save()
2285
        new_user = backend.authenticate(None, username=EMAIL, password=PASS)
2286
        assert new_user and new_user != user
2287
        assert new_user.username == f'{UID}@ldap'
2298
    def test_by_guid_migration(self, backend, slapd, settings, client, db):
2299
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['external_id']
2300
        assert backend.authenticate(None, username=USERNAME, password=PASS)
2301
        assert User.objects.count() == 1
2302
        user_external_id = models.UserExternalId.objects.get()
2303
        assert user_external_id.external_id
2304
        assert not user_external_id.external_guid
2305

  
2306
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['guid', 'external_id']
2307
        assert backend.authenticate(None, username=USERNAME, password=PASS)
2308
        assert User.objects.count() == 1
2309
        user_external_id = models.UserExternalId.objects.get()
2310
        assert user_external_id.external_id
2311
        assert user_external_id.external_guid
2312

  
2313
    def test_by_guid_only(self, backend, slapd, settings, client, db):
2314
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['guid']
2315
        assert backend.authenticate(None, username=USERNAME, password=PASS)
2316
        assert User.objects.count() == 1
2317
        user_external_id = models.UserExternalId.objects.get()
2318
        assert not user_external_id.external_id
2319
        assert user_external_id.external_guid
2320

  
2321
        assert backend.authenticate(None, username=USERNAME, password=PASS)
2322
        assert User.objects.count() == 1
2323
        user_external_id = models.UserExternalId.objects.get()
2324
        assert not user_external_id.external_id
2325
        assert user_external_id.external_guid
2326

  
2327
    def test_by_guid_only_objectguid(self, backend, slapd, settings, client, db, monkeypatch):
2328
        slapd.add_ldif(OBJECTGUID_SCHEMA)
2329
        conn = slapd.get_connection_admin()
2330
        ldif = [
2331
            (ldap.MOD_ADD, 'objectClass', b'objectWithObjectGuid'),
2332
            (ldap.MOD_ADD, 'objectguid', OBJECTGUID_RAW),
2333
        ]
2334
        conn.modify_s(DN, ldif)
2335
        conn.unbind_s()
2336

  
2337
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['guid']
2338
        monkeypatch.setattr(ldap_backend, 'USUAL_GUID_ATTRIBUTES', ['objectguid'])
2339
        assert backend.authenticate(None, username=USERNAME, password=PASS)
2340
        assert User.objects.count() == 1
2341
        user_external_id = models.UserExternalId.objects.get()
2342
        assert not user_external_id.external_id
2343
        assert user_external_id.external_guid.bytes == OBJECTGUID_RAW
2288 2344

  
2289 2345

  
2290 2346
def test_build_external_id_failure_authenticate(db, rf, slapd, settings, caplog):
......
2297 2353
            'external_id_tuples': [
2298 2354
                ['missing'],
2299 2355
            ],
2356
            'lookups': ['external_id', 'username'],
2300 2357
        }
2301 2358
    ]
2302 2359
    request = rf.get('/login/')
......
2311 2368
    )
2312 2369
    assert len(caplog.records) == 1
2313 2370
    assert caplog.records[0].levelname == 'ERROR'
2314
    assert 'unable to build an external_id' in caplog.records[0].message
2371
    assert 'unable to build an user_external_id' in caplog.records[0].message
2315 2372

  
2316 2373

  
2317 2374
def test_build_external_id_failure_get_users(db, rf, slapd, settings, caplog):
......
2324 2381
            'external_id_tuples': [
2325 2382
                ['missing'],
2326 2383
            ],
2384
            'lookups': ['external_id', 'username'],
2327 2385
        }
2328 2386
    ]
2329 2387
    backend = ldap_backend.LDAPBackend()
......
2331 2389
    assert not users
2332 2390
    assert len(caplog.records) == 6
2333 2391
    assert all(record.levelname == 'ERROR' for record in caplog.records)
2334
    assert all('unable to build an external_id' in record.message for record in caplog.records)
2392
    assert all('unable to build an user_external_id' in record.message for record in caplog.records)
2335
-