0001-ldap-use-guid-attributes-as-global-external-id-63646.patch
src/authentic2/backends/ldap_backend.py | ||
---|---|---|
24 | 24 |
import ssl |
25 | 25 |
import time |
26 | 26 |
import urllib.parse |
27 |
import uuid |
|
27 | 28 | |
28 | 29 |
import ldap |
29 | 30 |
import ldap.modlist |
... | ... | |
34 | 35 |
from django.contrib.auth.models import Group |
35 | 36 |
from django.core.cache import cache |
36 | 37 |
from django.core.exceptions import ImproperlyConfigured |
38 |
from django.db.models import Q |
|
37 | 39 |
from django.db.transaction import atomic |
38 | 40 |
from django.utils.encoding import force_bytes, force_text |
39 | 41 |
from django.utils.translation import ngettext |
... | ... | |
71 | 73 |
'/var/lib/ca-certificates/ca-bundle.pem', # OpenSuse |
72 | 74 |
] |
73 | 75 | |
76 |
USUAL_GUID_ATTRIBUTES = ['entryuuid', 'objectguid', 'nsuniqueid'] |
|
77 | ||
74 | 78 | |
75 | 79 |
class UserCreationError(Exception): |
76 | 80 |
pass |
... | ... | |
123 | 127 |
@to_list
def _convert_results_to_unicode(self, result_list):
    """Yield ``(dn, attributes)`` pairs with cleaned-up attribute values.

    Tuples whose DN is ``None`` are skipped.  A 16-byte first value of one of
    the usual GUID attributes is replaced by the string form of the matching
    UUID.  All values then go through ``filter_non_unicode_values`` and
    attributes left without any value are dropped from the result.
    """
    for dn, attrs in result_list:
        if dn is None:
            # tuple is a search reference, not a real entry with a DN
            continue
        new_attrs = {}
        for attribute in attrs:
            values = attrs[attribute]
            # specialize for GUID attributes; the name is compared lowercased
            # because USUAL_GUID_ATTRIBUTES only holds lowercase names, and the
            # value list is checked first so an empty list cannot raise IndexError
            # NOTE(review): objectGUID may use a mixed-endian layout (bytes_le);
            # bytes= is kept so existing identifiers stay stable -- confirm
            if values and attribute.lower() in USUAL_GUID_ATTRIBUTES and len(values[0]) == 16:
                try:
                    values = [str(uuid.UUID(bytes=values[0]))]
                except (ValueError, TypeError):
                    # not usable as raw UUID bytes: treat the attribute as empty
                    values = []
            values = filter_non_unicode_values(values)
            if not values:
                continue
            new_attrs[attribute] = values
        yield dn, new_attrs
|
130 | 146 | |
131 | 147 |
def modify_s(self, dn, modlist): |
132 | 148 |
new_modlist = [] |
... | ... | |
482 | 498 |
# generated username are unique |
483 | 499 |
'update_username': False, |
484 | 500 |
# lookup existing user with an external id build with attributes |
485 |
'lookups': ('external_id', 'username', 'email'), |
|
501 |
'lookups': ('guid', 'external_id', 'username', 'email'),
|
|
486 | 502 |
'external_id_tuples': ( |
487 | 503 |
('uid',), |
488 | 504 |
('dn:noquote',), |
... | ... | |
745 | 761 |
try: |
746 | 762 |
return self._return_user(authz_id, password, conn, block) |
747 | 763 |
except UserCreationError as e: |
748 |
messages.error(request, str(e)) |
|
764 |
if request: |
|
765 |
messages.error(request, str(e)) |
|
749 | 766 |
return None |
750 | 767 |
except ldap.CONNECT_ERROR: |
751 | 768 |
log.error( |
... | ... | |
1095 | 1112 |
for key in at_mapping: |
1096 | 1113 |
if at_mapping[key] != 'dn': |
1097 | 1114 |
attributes.add(at_mapping[key]) |
1115 |
# add usual GUID attributes |
|
1116 |
attributes.update(USUAL_GUID_ATTRIBUTES) |
|
1098 | 1117 |
return list({attribute.lower() for attribute in attributes}) |
1099 | 1118 | |
1100 | 1119 |
@classmethod |
... | ... | |
1350 | 1369 |
return None |
1351 | 1370 | |
1352 | 1371 |
def _lookup_by_external_id(self, block, attributes): |
1372 |
realm = block['realm'] |
|
1353 | 1373 |
for eid_tuple in map_text(block['external_id_tuples']): |
1354 | 1374 |
external_id = self.build_external_id(eid_tuple, attributes) |
1355 | 1375 |
if not external_id: |
... | ... | |
1359 | 1379 |
LDAPUser.objects.prefetch_related('groups') |
1360 | 1380 |
.filter( |
1361 | 1381 |
userexternalid__external_id__iexact=external_id, |
1362 |
userexternalid__source=force_text(block['realm']),
|
|
1382 |
userexternalid__source=realm,
|
|
1363 | 1383 |
) |
1364 | 1384 |
.order_by('-last_login') |
1365 | 1385 |
) |
... | ... | |
1373 | 1393 |
len(users), |
1374 | 1394 |
) |
1375 | 1395 |
for other in users[1:]: |
1376 |
for r in other.roles.all(): |
|
1377 |
user.roles.add(r) |
|
1396 |
user.roles.add(*other.roles.all()) |
|
1378 | 1397 |
other.delete() |
1379 | 1398 |
return user |
1380 | 1399 |
return None |
1381 | 1400 | |
1401 |
def _lookup_by_external_guid(self, block, attribute, guid):
    """Return the LDAPUser bound to ``guid`` in the realm of ``block``.

    ``attribute`` is only used in the debug log.  ``None`` is returned when
    ``guid`` is falsy or when no user matches.
    """
    if guid:
        log.debug('ldap: lookup by external_guid %s=%s', attribute, guid)
        try:
            matching_user = LDAPUser.objects.get(
                userexternalid__source=block['realm'],
                userexternalid__external_guid=guid,
            )
        except LDAPUser.DoesNotExist:
            matching_user = None
        return matching_user
    return None
|
1411 | ||
1412 |
# entryuuid is encoded as the string representation of the UUID, as an octet string |
|
1413 |
@classmethod
def _decode_entryuuid_guid(cls, value):
    """Parse ``value`` as the textual form of a UUID.

    Returns a ``uuid.UUID`` instance, or ``None`` when ``value`` cannot be
    parsed (malformed string, ``None``, bytes or any other non-string).
    """
    try:
        return uuid.UUID(value)
    except (ValueError, TypeError, AttributeError):
        # ValueError (UnicodeDecodeError included): malformed string;
        # TypeError / AttributeError: value is None, bytes or not a string
        return None
|
1419 | ||
1420 |
# objectguid is encoded as the byte representation of the UUID |
|
1421 |
_decode_objectguid_guid = _decode_entryuuid_guid |
|
1422 | ||
1423 |
# nsuniqueid is encoded as the byte representation of the UUID |
|
1424 |
_decode_nsuniqueid_guid = _decode_objectguid_guid |
|
1425 | ||
1426 |
def _lookup_by_guid(self, block, attributes):
    """Look up a user from the GUID that ``_get_guid`` extracts from ``attributes``."""
    guid_attribute, guid_value = self._get_guid(attributes)
    found = self._lookup_by_external_guid(block=block, attribute=guid_attribute, guid=guid_value)
    return found
|
1429 | ||
1430 |
@classmethod
def _get_guid(cls, attributes):
    """Return ``(attribute_name, guid)`` for the first usable GUID attribute.

    The names of ``USUAL_GUID_ATTRIBUTES`` are tried in order; the first value
    of each one present in ``attributes`` is handed to the matching
    ``_decode_<name>_guid`` classmethod.  ``(None, None)`` is returned when
    ``attributes`` is empty or ``None``, or when no attribute yields a GUID.
    """
    if not attributes:
        return None, None
    for attribute in USUAL_GUID_ATTRIBUTES:
        if attribute not in attributes:
            continue
        values = attributes[attribute]
        if not values:
            # attribute is present but carries no value (empty list or None)
            continue
        guid = getattr(cls, f'_decode_{attribute}_guid')(values[0])
        if guid:
            return attribute, guid
    return None, None
|
1440 | ||
1382 | 1441 |
def _lookup_existing_user(self, username, block, attributes): |
1383 | 1442 |
user = None |
1384 | 1443 |
ou = self._get_target_ou(block) |
... | ... | |
1393 | 1452 |
user = self._lookup_by_external_id(block=block, attributes=attributes) |
1394 | 1453 |
elif lookup_type == 'email' and attributes: |
1395 | 1454 |
user = self._lookup_by_email(ou=ou, block=block, attributes=attributes) |
1455 |
elif lookup_type == 'guid' and attributes: |
|
1456 |
user = self._lookup_by_guid(block=block, attributes=attributes) |
|
1396 | 1457 |
if user: |
1397 | 1458 |
return user |
1398 | 1459 | |
1399 | 1460 |
def update_user_identifiers(self, user, username, block, attributes): |
1461 |
realm = block['realm'] |
|
1400 | 1462 |
# if username has changed and we propagate those changes, update it |
1401 | 1463 |
if block['update_username']: |
1402 | 1464 |
if user.username != username: |
... | ... | |
1406 | 1468 |
log_msg = 'updating username from %r to %r' |
1407 | 1469 |
log.debug(log_msg, old_username, user.username) |
1408 | 1470 |
# if external_id lookup is used, update it |
1471 |
userexternalid = None |
|
1472 |
use_guid = False |
|
1473 |
use_external_id = False |
|
1474 |
guid = None |
|
1475 |
external_id = None |
|
1476 |
if 'guid' in block['lookups']: |
|
1477 |
use_guid = True |
|
1478 |
_attribute, guid = self._get_guid(attributes) |
|
1479 | ||
1480 |
if guid and user.pk: |
|
1481 |
if guid: |
|
1482 |
try: |
|
1483 |
userexternalid = UserExternalId.objects.get(user=user, external_guid=guid, source=realm) |
|
1484 |
except UserExternalId.DoesNotExist: |
|
1485 |
pass |
|
1486 | ||
1409 | 1487 |
if ( |
1410 | 1488 |
'external_id' in block['lookups'] |
1411 | 1489 |
and block.get('external_id_tuples') |
1412 | 1490 |
and block['external_id_tuples'][0] |
1413 | 1491 |
): |
1414 |
if not user.pk: |
|
1415 |
user.save() |
|
1416 |
user._changed = False |
|
1492 |
use_external_id = True |
|
1417 | 1493 |
external_id = self.build_external_id(map_text(block['external_id_tuples'][0]), attributes) |
1418 |
if external_id: |
|
1419 |
new, dummy = UserExternalId.objects.get_or_create( |
|
1420 |
user=user, external_id=external_id, source=force_text(block['realm']) |
|
1494 | ||
1495 |
if external_id and user.pk: |
|
1496 |
try: |
|
1497 |
userexternalid = UserExternalId.objects.get( |
|
1498 |
user=user, external_id__iexact=external_id, source=realm |
|
1421 | 1499 |
) |
1422 |
if block['clean_external_id_on_update']: |
|
1423 |
UserExternalId.objects.exclude(id=new.id).filter( |
|
1424 |
user=user, source=force_text(block['realm']) |
|
1425 |
).delete() |
|
1426 |
elif user._created: |
|
1500 |
except UserExternalId.DoesNotExist: |
|
1501 |
pass |
|
1502 | ||
1503 |
if userexternalid: |
|
1504 |
changed = False |
|
1505 |
if userexternalid.external_guid != guid: |
|
1506 |
userexternalid.external_guid = guid |
|
1507 |
changed = True |
|
1508 |
if userexternalid.external_id != external_id: |
|
1509 |
userexternalid.external_id = external_id |
|
1510 |
changed = True |
|
1511 |
if changed: |
|
1512 |
userexternalid.save() |
|
1513 |
elif use_guid or use_external_id: |
|
1514 |
if not guid and not external_id: |
|
1427 | 1515 |
log.error( |
1428 |
'ldap: unable to build an external_id (%r) with attributes: %r', |
|
1429 |
block['external_id_tuples'][0],
|
|
1516 |
'ldap: unable to build an user_external_id (%r) with attributes: %r',
|
|
1517 |
block['external_id_tuples'], |
|
1430 | 1518 |
attributes, |
1431 | 1519 |
) |
1432 | 1520 |
raise UserCreationError(_('LDAP configuration is broken, please contact your administrator')) |
1433 |
else:
|
|
1434 |
log.error(
|
|
1435 |
'ldap: unable to update an external_id (%r) with attributes: %r',
|
|
1436 |
block['external_id_tuples'][0],
|
|
1437 |
attributes,
|
|
1438 |
)
|
|
1521 |
if not user.pk:
|
|
1522 |
user._changed = False
|
|
1523 |
user.save()
|
|
1524 |
UserExternalId.objects.create(
|
|
1525 |
user=user, source=realm, external_id=external_id, external_guid=guid
|
|
1526 |
) |
|
1439 | 1527 | |
1440 | 1528 |
def _record_failure_for_user(self, request, reason, user_id, block, conn, attributes=None): |
1441 | 1529 |
user = None |
... | ... | |
1589 | 1677 |
'external_id', flat=True |
1590 | 1678 |
) |
1591 | 1679 |
) |
1680 |
guids = set( |
|
1681 |
UserExternalId.objects.filter(user__is_active=True, source=block['realm']).values_list( |
|
1682 |
'external_guid', flat=True |
|
1683 |
) |
|
1684 |
) |
|
1592 | 1685 |
basedn = force_text(block.get('user_basedn') or block['basedn']) |
1593 |
attribute_names = [ |
|
1594 |
a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples']) |
|
1595 |
] |
|
1686 |
attribute_names = list( |
|
1687 |
{a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])} |
|
1688 |
| set(USUAL_GUID_ATTRIBUTES) |
|
1689 |
) |
|
1596 | 1690 |
user_filter = cls.get_sync_ldap_user_filter(block) |
1597 | 1691 |
results = cls.paged_search( |
1598 | 1692 |
conn, basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names |
... | ... | |
1600 | 1694 |
for dn, attrs in results: |
1601 | 1695 |
data = attrs.copy() |
1602 | 1696 |
data['dn'] = dn |
1697 |
_attribute, guid = cls._get_guid(data) |
|
1698 |
if guid and guid in guids: |
|
1699 |
guids.discard(guid) |
|
1603 | 1700 |
for eid_tuple in map_text(block['external_id_tuples']): |
1604 | 1701 |
backend = cls() |
1605 | 1702 |
external_id = backend.build_external_id(eid_tuple, data) |
... | ... | |
1608 | 1705 |
eids.remove(external_id) |
1609 | 1706 |
except ValueError: |
1610 | 1707 |
pass |
1611 |
for eid in UserExternalId.objects.filter( |
|
1612 |
external_id__in=eids, user__is_active=True, source=block['realm'] |
|
1708 |
for eid in UserExternalId.objects.filter(user__is_active=True, source=block['realm']).filter( |
|
1709 |
Q(external_id__in=eids, external_guid__isnull=True) |
|
1710 |
| Q(external_guid__in=guids, external_id__isnull=True) |
|
1711 |
| Q(external_guid__in=guids, external_id__in=eids) |
|
1613 | 1712 |
): |
1614 | 1713 |
if eid.user.is_active: |
1615 | 1714 |
eid.user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_NOT_PRESENT) |
... | ... | |
1896 | 1995 |
return |
1897 | 1996 |
for user_external_id in user.userexternalid_set.all(): |
1898 | 1997 |
external_id = user_external_id.external_id |
1998 |
if not external_id: |
|
1999 |
continue |
|
1899 | 2000 |
for block in config: |
1900 | 2001 |
if user_external_id.source != force_text(block['realm']): |
1901 | 2002 |
continue |
src/authentic2/migrations/0038_add_external_guid.py | ||
---|---|---|
1 |
# Generated by Django 2.2.27 on 2022-04-06 21:11 |
|
2 | ||
3 |
from django.db import migrations, models |
|
4 | ||
5 | ||
6 |
class Migration(migrations.Migration):
    """Add ``external_guid`` to ``UserExternalId`` and make ``external_id`` nullable."""

    dependencies = [('authentic2', '0037_auto_20220331_1513')]

    operations = [
        # new nullable UUID column holding the external GUID
        migrations.AddField(
            model_name='userexternalid',
            name='external_guid',
            field=models.UUIDField(verbose_name='External GUID', null=True),
        ),
        # external_id becomes nullable
        migrations.AlterField(
            model_name='userexternalid',
            name='external_id',
            field=models.CharField(verbose_name='external id', max_length=256, null=True),
        ),
        # each identifier kind is unique per source
        migrations.AlterUniqueTogether(
            name='userexternalid',
            unique_together={('source', 'external_guid'), ('source', 'external_id')},
        ),
        # a row must carry at least one of the two identifiers
        migrations.AddConstraint(
            model_name='userexternalid',
            constraint=models.CheckConstraint(
                check=models.Q(external_id__isnull=False) | models.Q(external_guid__isnull=False),
                name='at_least_one_id',
            ),
        ),
    ]
src/authentic2/models.py | ||
---|---|---|
49 | 49 |
class UserExternalId(models.Model): |
50 | 50 |
user = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('user'), on_delete=models.CASCADE) |
51 | 51 |
source = models.CharField(max_length=256, verbose_name=_('source')) |
52 |
external_id = models.CharField(max_length=256, verbose_name=_('external id')) |
|
52 |
external_id = models.CharField(max_length=256, verbose_name=_('external id'), null=True) |
|
53 |
external_guid = models.UUIDField(verbose_name=_('External GUID'), null=True) |
|
53 | 54 |
created = models.DateTimeField(auto_now_add=True, verbose_name=_('creation date')) |
54 | 55 |
updated = models.DateTimeField(auto_now=True, verbose_name=_('last update date')) |
55 | 56 | |
56 | 57 |
def __str__(self):
    """Describe the link using the external id, or the GUID when the id is not set."""
    identifier = self.external_id or self.external_guid
    return f'{self.user} is {identifier} on {self.source}'
|
|
58 | 59 | |
59 | 60 |
def __repr__(self):
    """Return a debugging representation; identifiers that are not set are omitted."""
    # user_id is used instead of user so that repr() never has to load the
    # related user object
    parts = [f'<UserExternalId user: {self.user_id!r} source: {self.source!r}']
    if self.external_id:
        parts.append(f' external_id: {self.external_id!r}')
    if self.external_guid:
        parts.append(f' external_guid: {self.external_guid!r}')
    # close the angle bracket opened at the start of the representation
    parts.append(f' created: {self.created} updated: {self.updated}>')
    return ''.join(parts)
63 | 67 | |
64 | 68 |
class Meta: |
... | ... | |
66 | 70 |
verbose_name_plural = _('user external ids') |
67 | 71 |
unique_together = [ |
68 | 72 |
('source', 'external_id'), |
73 |
('source', 'external_guid'), |
|
74 |
] |
|
75 |
constraints = [ |
|
76 |
models.CheckConstraint( |
|
77 |
check=Q(external_id__isnull=False) | Q(external_guid__isnull=False), name='at_least_one_id' |
|
78 |
), |
|
69 | 79 |
] |
70 | 80 | |
71 | 81 |
tests/test_ldap.py | ||
---|---|---|
54 | 54 |
UPASS = 'passé' |
55 | 55 |
EMAIL = 'etienne.michu@example.net' |
56 | 56 |
CARLICENSE = '123445ABC' |
57 |
UUID = '8ff2f34a-4a36-103c-8d0a-e3a0333484d3' |
|
57 | 58 | |
58 | 59 |
CN_INCOMPLETE = 'Jean Dupond' |
59 | 60 |
DN_INCOMPLETE = 'cn=%s,o=ôrga' % escape_dn_chars(CN_INCOMPLETE) |
... | ... | |
749 | 750 |
'group_to_role_mapping': [ |
750 | 751 |
['cn=unknown,o=dn', ['Role2']], |
751 | 752 |
], |
753 |
'lookups': ['external_id', 'username'], |
|
752 | 754 |
} |
753 | 755 |
] |
754 | 756 |
save = mock.Mock(wraps=ldap_backend.LDAPUser.save) |
... | ... | |
1771 | 1773 |
def test_sync_ldap_users(slapd, settings, app, db, caplog): |
1772 | 1774 |
caplog.set_level('INFO') |
1773 | 1775 | |
1776 |
conn = slapd.get_connection_admin() |
|
1777 |
entryuuid = conn.search_s('o=ôrga', ldap.SCOPE_SUBTREE, f'(uid={UID})', ['entryUUID'])[0][1]['entryUUID'][ |
|
1778 |
0 |
|
1779 |
].decode() |
|
1774 | 1780 |
management.call_command('sync-ldap-users') |
1775 | 1781 |
assert caplog.records[0].message == 'No LDAP server configured.' |
1776 | 1782 | |
... | ... | |
1808 | 1814 |
assert caplog.records[2].message == ( |
1809 | 1815 |
( |
1810 | 1816 |
"Created user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], " |
1811 |
"sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']" |
|
1817 |
"sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net'], entryuuid=['%s']"
|
|
1812 | 1818 |
) |
1813 |
% User.objects.first().uuid
|
|
1819 |
% (User.objects.first().uuid, entryuuid)
|
|
1814 | 1820 |
) |
1815 | 1821 |
assert caplog.records[-1].message == 'Search for (|(mail=*)(uid=*)) returned 6 users.' |
1816 | 1822 | |
... | ... | |
1834 | 1840 |
caplog.clear() |
1835 | 1841 |
User.objects.update(first_name='John') |
1836 | 1842 |
management.call_command('sync-ldap-users', verbosity=3) |
1837 |
assert ( |
|
1838 |
caplog.records[2].message |
|
1839 |
== ( |
|
1840 |
"Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], " |
|
1841 |
"sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net']" |
|
1842 |
) |
|
1843 |
% User.objects.first().uuid |
|
1844 |
) |
|
1843 |
assert caplog.records[2].message == ( |
|
1844 |
"Updated user etienne.michu@ldap (uuid %s) from dn=cn=Étienne Michu,o=ôrga, uid=['etienne.michu'], " |
|
1845 |
"sn=['Michu'], givenname=['Étienne'], l=['Paris'], mail=['etienne.michu@example.net'], entryuuid=['%s']" |
|
1846 |
) % (User.objects.first().uuid, entryuuid) |
|
1845 | 1847 | |
1846 | 1848 | |
1847 | 1849 |
def test_get_users_select_realm(slapd, settings, db, caplog): |
... | ... | |
1895 | 1897 |
] |
1896 | 1898 |
user = authenticate(username=USERNAME, password=UPASS) |
1897 | 1899 |
assert user |
1898 |
assert user.get_attributes(object(), {}) == {
|
|
1900 |
assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
|
|
1899 | 1901 |
'dn': 'cn=étienne michu,o=\xf4rga', |
1900 | 1902 |
'givenname': ['Étienne'], |
1901 | 1903 |
'mail': ['etienne.michu@example.net'], |
1902 | 1904 |
'sn': ['Michu'], |
1903 | 1905 |
'uid': ['etienne.michu'], |
1904 | 1906 |
'carlicense': ['123445ABC'], |
1907 |
'entryuuid': None, |
|
1905 | 1908 |
} |
1906 | 1909 |
# simulate LDAP down |
1907 | 1910 |
slapd.stop() |
1908 |
assert user.get_attributes(object(), {}) == {
|
|
1911 |
assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
|
|
1909 | 1912 |
'dn': 'cn=étienne michu,o=\xf4rga', |
1910 | 1913 |
'givenname': ['\xc9tienne'], |
1911 | 1914 |
'mail': ['etienne.michu@example.net'], |
1912 | 1915 |
'sn': ['Michu'], |
1913 | 1916 |
'uid': ['etienne.michu'], |
1914 | 1917 |
'carlicense': ['123445ABC'], |
1918 |
'entryuuid': None, |
|
1915 | 1919 |
} |
1916 | 1920 |
assert not user.check_password(UPASS) |
1917 | 1921 |
# simulate LDAP come back up |
... | ... | |
1921 | 1925 |
conn = slapd.get_connection_admin() |
1922 | 1926 |
ldif = [(ldap.MOD_REPLACE, 'sn', [b'Micho'])] |
1923 | 1927 |
conn.modify_s(DN, ldif) |
1924 |
assert user.get_attributes(object(), {}) == {
|
|
1928 |
assert dict(user.get_attributes(object(), {}), entryuuid=None) == {
|
|
1925 | 1929 |
'dn': 'cn=étienne michu,o=\xf4rga', |
1926 | 1930 |
'givenname': ['\xc9tienne'], |
1927 | 1931 |
'mail': ['etienne.michu@example.net'], |
1928 | 1932 |
'sn': ['Micho'], |
1929 | 1933 |
'uid': ['etienne.michu'], |
1930 | 1934 |
'carlicense': ['123445ABC'], |
1935 |
'entryuuid': None, |
|
1931 | 1936 |
} |
1932 | 1937 | |
1933 | 1938 | |
... | ... | |
2275 | 2280 |
assert auth_user == user |
2276 | 2281 |
assert auth_user.username == f'{UID}@ldap' |
2277 | 2282 | |
2278 |
def test_by_username_only(self, backend, slapd, settings, client, db): |
|
2279 |
settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['username'] |
|
2280 |
user = User.objects.create(username=UID, ou=get_default_ou()) |
|
2281 |
assert backend.authenticate(None, username=EMAIL, password=PASS) == user |
|
2282 |
assert not models.UserExternalId.objects.exists() |
|
2283 |
user.username = '' |
|
2284 |
user.save() |
|
2285 |
new_user = backend.authenticate(None, username=EMAIL, password=PASS) |
|
2286 |
assert new_user and new_user != user |
|
2287 |
assert new_user.username == f'{UID}@ldap' |
|
2283 |
def test_by_guid_migration(self, backend, slapd, settings, client, db):
    """A link created through external_id lookup gains a GUID once 'guid' lookup is enabled."""
    scenarios = (
        (['external_id'], False),  # first login: no GUID recorded
        (['guid', 'external_id'], True),  # second login: same record, GUID now set
    )
    for lookups, guid_expected in scenarios:
        settings.LDAP_AUTH_SETTINGS[0]['lookups'] = lookups
        assert backend.authenticate(None, username=USERNAME, password=PASS)
        assert User.objects.count() == 1
        user_external_id = models.UserExternalId.objects.get()
        assert user_external_id.external_id
        assert bool(user_external_id.external_guid) is guid_expected
|
2297 | ||
2298 |
def test_by_guid_only(self, backend, slapd, settings, client, db):
    """With only 'guid' lookup, the link stores a GUID and no external_id, on every login."""
    settings.LDAP_AUTH_SETTINGS[0]['lookups'] = ['guid']

    # the second pass checks that logging in again leaves a single user and record
    for _attempt in range(2):
        assert backend.authenticate(None, username=USERNAME, password=PASS)
        assert User.objects.count() == 1
        user_external_id = models.UserExternalId.objects.get()
        assert not user_external_id.external_id
        assert user_external_id.external_guid
|
2288 | 2311 | |
2289 | 2312 | |
2290 | 2313 |
def test_build_external_id_failure_authenticate(db, rf, slapd, settings, caplog): |
... | ... | |
2297 | 2320 |
'external_id_tuples': [ |
2298 | 2321 |
['missing'], |
2299 | 2322 |
], |
2323 |
'lookups': ['external_id', 'username'], |
|
2300 | 2324 |
} |
2301 | 2325 |
] |
2302 | 2326 |
request = rf.get('/login/') |
... | ... | |
2311 | 2335 |
) |
2312 | 2336 |
assert len(caplog.records) == 1 |
2313 | 2337 |
assert caplog.records[0].levelname == 'ERROR' |
2314 |
assert 'unable to build an external_id' in caplog.records[0].message |
|
2338 |
assert 'unable to build an user_external_id' in caplog.records[0].message
|
|
2315 | 2339 | |
2316 | 2340 | |
2317 | 2341 |
def test_build_external_id_failure_get_users(db, rf, slapd, settings, caplog): |
... | ... | |
2324 | 2348 |
'external_id_tuples': [ |
2325 | 2349 |
['missing'], |
2326 | 2350 |
], |
2351 |
'lookups': ['external_id', 'username'], |
|
2327 | 2352 |
} |
2328 | 2353 |
] |
2329 | 2354 |
backend = ldap_backend.LDAPBackend() |
... | ... | |
2331 | 2356 |
assert not users |
2332 | 2357 |
assert len(caplog.records) == 6 |
2333 | 2358 |
assert all(record.levelname == 'ERROR' for record in caplog.records) |
2334 |
assert all('unable to build an external_id' in record.message for record in caplog.records) |
|
2359 |
assert all('unable to build an user_external_id' in record.message for record in caplog.records) |
|
2335 |
- |