26 |
26 |
from ldaptools.slapd import Slapd, has_slapd
|
27 |
27 |
from django.contrib.auth import get_user_model, authenticate
|
28 |
28 |
from django.core.exceptions import ImproperlyConfigured
|
|
29 |
from django.core import management
|
29 |
30 |
from django.core import mail
|
30 |
31 |
from django.utils.encoding import force_text
|
31 |
32 |
from django.utils import timezone
|
|
33 |
from django.utils.six.moves.urllib import parse as urlparse
|
32 |
34 |
|
33 |
35 |
from authentic2.a2_rbac.utils import get_default_ou
|
34 |
36 |
from django_rbac.utils import get_ou_model
|
... | ... | |
88 |
90 |
member: {dn}
|
89 |
91 |
|
90 |
92 |
'''.format(dn=DN, uid=UID, password=PASS))
|
91 |
|
for i in range(100):
|
92 |
|
slapd.add_ldif('''dn: uid=michu{i},o=ôrga
|
|
93 |
for i in range(5):
|
|
94 |
slapd.add_ldif('''dn: uid=mïchu{i},o=ôrga
|
93 |
95 |
objectClass: inetOrgPerson
|
94 |
96 |
userPassword: {password}
|
95 |
|
uid: michu{i}
|
|
97 |
uid: mïchu{i}
|
96 |
98 |
cn: Étienne Michu
|
97 |
99 |
sn: Michu
|
98 |
100 |
gn: Étienne
|
... | ... | |
105 |
107 |
objectClass: posixGroup
|
106 |
108 |
memberUid: {uid}
|
107 |
109 |
'''.format(uid=UID)
|
108 |
|
for i in range(100):
|
109 |
|
group_ldif += 'memberUid: michu{i}\n'.format(i=i)
|
110 |
110 |
group_ldif += '\n\n'
|
111 |
111 |
slapd.add_ldif(group_ldif)
|
112 |
112 |
return slapd
|
... | ... | |
381 |
381 |
def test_get_users(slapd, settings, db):
|
382 |
382 |
import django.db.models.base
|
383 |
383 |
from types import MethodType
|
|
384 |
from django.contrib.auth.models import Group
|
384 |
385 |
|
385 |
386 |
settings.LDAP_AUTH_SETTINGS = [{
|
386 |
387 |
'url': [slapd.ldap_url],
|
... | ... | |
398 |
399 |
django.db.models.query.QuerySet.bulk_create = MethodType(bulk_create, None,
|
399 |
400 |
django.db.models.query.QuerySet)
|
400 |
401 |
|
|
402 |
assert Group.objects.count() == 0
|
401 |
403 |
# Provision all users and their groups
|
402 |
404 |
assert User.objects.count() == 0
|
403 |
405 |
users = list(ldap_backend.LDAPBackend.get_users())
|
404 |
|
assert len(users) == 101
|
405 |
|
assert User.objects.count() == 101
|
406 |
|
assert bulk_create.call_count == 101
|
407 |
|
assert save.call_count == 303
|
|
406 |
assert len(users) == 6
|
|
407 |
assert User.objects.count() == 6
|
|
408 |
assert bulk_create.call_count == 1
|
|
409 |
assert save.call_count == 18
|
|
410 |
assert Group.objects.count() == 1
|
|
411 |
assert Group.objects.get().user_set.count() == 1
|
408 |
412 |
|
409 |
413 |
# Check that if nothing changed no save() is made
|
410 |
414 |
save.reset_mock()
|
... | ... | |
416 |
420 |
# Check that if we delete 1 user, only this user is created
|
417 |
421 |
save.reset_mock()
|
418 |
422 |
bulk_create.reset_mock()
|
419 |
|
User.objects.last().delete()
|
420 |
|
assert User.objects.count() == 100
|
|
423 |
User.objects.filter(username='etienne.michu@ldap').delete()
|
|
424 |
assert User.objects.count() == 5
|
421 |
425 |
users = list(ldap_backend.LDAPBackend.get_users())
|
422 |
|
assert len(users) == 101
|
423 |
|
assert User.objects.count() == 101
|
|
426 |
assert len(users) == 6
|
|
427 |
assert User.objects.count() == 6
|
424 |
428 |
assert save.call_count == 3
|
425 |
429 |
assert bulk_create.call_count == 1
|
426 |
430 |
|
... | ... | |
431 |
435 |
save.reset_mock()
|
432 |
436 |
bulk_create.reset_mock()
|
433 |
437 |
users = list(ldap_backend.LDAPBackend.get_users())
|
434 |
|
assert len(users) == 101
|
435 |
|
assert User.objects.count() == 101
|
|
438 |
assert len(users) == 6
|
|
439 |
assert User.objects.count() == 6
|
436 |
440 |
assert save.call_count == 0
|
437 |
441 |
assert bulk_create.call_count == 0
|
438 |
442 |
|
... | ... | |
446 |
450 |
user = ldap_backend.LDAPUser.objects.get(username='%s@ldap' % UID)
|
447 |
451 |
user.last_login = timezone.now()
|
448 |
452 |
user.save()
|
449 |
|
assert ldap_backend.LDAPUser.objects.count() == 102
|
|
453 |
assert ldap_backend.LDAPUser.objects.count() == 7
|
450 |
454 |
users = list(ldap_backend.LDAPBackend.get_users())
|
451 |
|
assert len(users) == 101
|
|
455 |
assert len(users) == 6
|
452 |
456 |
assert ldap_backend.LDAPUser.objects.filter(username='%s' % UID.capitalize()).count() == 0
|
453 |
457 |
|
454 |
458 |
|
... | ... | |
694 |
698 |
client.post('/login/',
|
695 |
699 |
{
|
696 |
700 |
'login-password-submit': '1',
|
697 |
|
'username': 'michu%s' % i,
|
|
701 |
'username': u'mïchu%s' % i,
|
698 |
702 |
'password': PASS,
|
699 |
703 |
},
|
700 |
704 |
follow=True)
|
701 |
|
username = u'michu%s@ldap' % i
|
|
705 |
username = u'mïchu%s@ldap' % i
|
702 |
706 |
user = User.objects.get(username=username)
|
703 |
707 |
assert user.attributes.locality == u'locality%s' % i
|
704 |
708 |
client.session.flush()
|
... | ... | |
774 |
778 |
response.form.set('ou', str(get_default_ou().pk))
|
775 |
779 |
response = response.form.submit(name='login-password-submit').follow()
|
776 |
780 |
assert '_auth_user_id' in app.session
|
|
781 |
|
|
782 |
|
|
783 |
def test_sync_ldap_users(slapd, settings, app, db):
|
|
784 |
settings.LDAP_AUTH_SETTINGS = [{
|
|
785 |
'url': [slapd.ldap_url],
|
|
786 |
'basedn': u'o=ôrga',
|
|
787 |
'use_tls': False,
|
|
788 |
'user_attributes': [
|
|
789 |
{
|
|
790 |
'from_ldap': 'l',
|
|
791 |
'to_user': 'locality',
|
|
792 |
},
|
|
793 |
]
|
|
794 |
}]
|
|
795 |
|
|
796 |
# create a locality attribute
|
|
797 |
models.Attribute.objects.create(
|
|
798 |
label='locality',
|
|
799 |
name='locality',
|
|
800 |
kind='string',
|
|
801 |
required=False,
|
|
802 |
user_visible=True,
|
|
803 |
user_editable=False,
|
|
804 |
asked_on_registration=False,
|
|
805 |
multiple=False)
|
|
806 |
|
|
807 |
assert User.objects.count() == 0
|
|
808 |
management.call_command('sync-ldap-users')
|
|
809 |
assert User.objects.count() == 6
|
|
810 |
assert all(user.first_name == u'Étienne' for user in User.objects.all())
|
|
811 |
assert all(user.attributes.first_name == u'Étienne' for user in User.objects.all())
|
|
812 |
assert all(user.last_name == u'Michu' for user in User.objects.all())
|
|
813 |
assert all(user.attributes.last_name == u'Michu' for user in User.objects.all())
|
|
814 |
assert all(user.attributes.locality == u'Paris' or user.attributes.locality.startswith('locality')
|
|
815 |
for user in User.objects.all())
|
|
816 |
assert all([user.userexternalid_set.first().external_id
|
|
817 |
== urlparse.quote(user.username.split('@')[0].encode('utf-8'))
|
|
818 |
for user in User.objects.all()])
|
777 |
|
-
|
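Review bot: In `test_sync_ldap_users` the expected external id is computed from `user.username` itself (new lines 816-818), so the check only shows that the two fields agree with each other, not that either matches the directory. A sync that mapped entries to the wrong uid consistently would still pass. The fixture's uids are known (`UID` plus `mïchu0` to `mïchu4`), so they can be pinned directly: `assert set(User.objects.values_list('username', flat=True)) == {u'%s@ldap' % UID} | {u'mïchu%s@ldap' % i for i in range(5)}`. The locality assertion at new line 814 is similarly loose, and it would raise AttributeError instead of failing cleanly if `locality` were ever None. Because this command provisions accounts, I would also call `management.call_command('sync-ldap-users')` a second time and re-assert `User.objects.count() == 6`, to show that a re-run links to the existing accounts and does not duplicate them.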