From 2cad719e19f3c8aa8d4286d47bfa12c6893107e5 Mon Sep 17 00:00:00 2001 From: Paul Marillonnet Date: Wed, 31 May 2017 16:31:26 +0200 Subject: [PATCH] ldap_backend: A2 roles mapped to LDAP entries (#16523) --- src/authentic2/backends/ldap_backend.py | 69 ++++++++++++++++++++++++++++++--- tests/test_ldap.py | 46 ++++++++++++++++++++++ 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/src/authentic2/backends/ldap_backend.py b/src/authentic2/backends/ldap_backend.py index 542c8abe..16cc7187 100644 --- a/src/authentic2/backends/ldap_backend.py +++ b/src/authentic2/backends/ldap_backend.py @@ -209,13 +209,16 @@ class LDAPBackend(object): 'sync_ldap_users_filter': None, 'user_basedn': None, 'group_dn_template': None, - 'member_of_attribute': None, + 'group_member_of_attribute': None, + 'role_member_of_attribute': None, 'group_filter': '(&(member={user_dn})(objectClass=groupOfNames))', + 'role_filter': '(&(member={user_dn})(objectClass=groupOfNames))', 'group': None, 'groupsu': None, 'groupstaff': None, 'groupactive': None, 'group_mapping': (), + 'role_mapping': (), 'replicas': True, 'email_field': 'mail', 'fname_field': 'givenName', @@ -514,18 +517,39 @@ class LDAPBackend(object): elif dn not in group_dns and group in groups: user.groups.remove(group) + def populate_roles_by_mapping(self, user, dn, conn, block, role_dns): + '''Assign role to user based on a mapping from role DNs''' + role_mapping = block.get('role_mapping') + if not role_mapping: + return + if not user.pk: + user.save() + user._changed = False + roles = user.roles.all() + for dn, role_names in role_mapping: + for role_name in role_names: + role = self.get_role_by_name(block, role_name) + if role is None: + continue + # Add missing roles + if dn in role_dns and role not in roles: + user.roles.add(role) + # Remove extra roles + elif dn not in role_dns and role in roles: + user.roles.remove(role) + def get_ldap_group_dns(self, user, dn, conn, block, attributes): '''Retrieve group DNs from the LDAP by attributes (memberOf) or by filter. ''' group_base_dn = block.get('group_basedn', block['basedn']) - member_of_attribute = block['member_of_attribute'] + group_member_of_attribute = block['group_member_of_attribute'] group_filter = block['group_filter'] group_dns = set() - if member_of_attribute: - member_of_attribute = str(member_of_attribute) - results = conn.search_s(dn, ldap.SCOPE_BASE, '', [member_of_attribute]) - group_dns.update(results[0][1].get(member_of_attribute, [])) + if group_member_of_attribute: + group_member_of_attribute = str(group_member_of_attribute) + results = conn.search_s(dn, ldap.SCOPE_BASE, '', [group_member_of_attribute]) + group_dns.update(results[0][1].get(group_member_of_attribute, [])) if group_filter: group_filter = str(group_filter) params = attributes.copy() @@ -540,12 +564,44 @@ class LDAPBackend(object): group_dns.update(dn for dn, attributes in results if dn) return group_dns + def get_ldap_role_dns(self, user, dn, conn, block, attributes): + '''Retrieve role DNs from the LDAP by attributes (memberOf) or by + filter. + ''' + role_base_dn = block.get('role_basedn', block['basedn']) + role_member_of_attribute = block['role_member_of_attribute'] + role_filter = block['role_filter'] + role_dns = set() + if role_member_of_attribute: + role_member_of_attribute = str(role_member_of_attribute) + results = conn.search_s(dn, ldap.SCOPE_BASE, '', [role_member_of_attribute]) + role_dns.update(results[0][1].get(role_member_of_attribute, [])) + if role_filter: + role_filter = str(role_filter) + params = attributes.copy() + params['user_dn'] = dn + query = FilterFormatter().format(role_filter, **params) + try: + results = conn.search_s(role_base_dn, ldap.SCOPE_SUBTREE, query, []) + except ldap.NO_SUCH_OBJECT: + pass + else: + # ignore referrals by checking if bool(dn) is True + role_dns.update(dn for dn, attributes in results if dn) + return role_dns + def populate_user_groups(self, user, dn, conn, block, attributes): group_dns = self.get_ldap_group_dns(user, dn, conn, block, attributes) log.debug('groups for dn %r: %r', dn, group_dns) self.populate_admin_flags_by_group(user, block, group_dns) self.populate_groups_by_mapping(user, dn, conn, block, group_dns) + def populate_user_roles(self, user, dn, conn, block, attributes): + role_dns = self.get_ldap_role_dns(user, dn, conn, block, attributes) + log.debug('roles for dn %r: %r', dn, role_dns) + # Admin flags by roles ? + self.populate_roles_by_mapping(user, dn, conn, block, role_dns) + def get_group_by_name(self, block, group_name, create=None): '''Obtain a Django group''' if create is None: @@ -621,6 +677,7 @@ class LDAPBackend(object): self.populate_mandatory_groups(user, block) self.populate_mandatory_roles(user, block) self.populate_user_groups(user, dn, conn, block, attributes) + self.populate_user_roles(user, dn, conn, block, attributes) def populate_user_ou(self, user, dn, conn, block, attributes): '''Assign LDAP user to an ou, the default one if ou_slug setting is diff --git a/tests/test_ldap.py b/tests/test_ldap.py index 14787338..067914f5 100644 --- a/tests/test_ldap.py +++ b/tests/test_ldap.py @@ -266,6 +266,52 @@ def test_posix_group_mapping(slapd, settings, client): @pytest.mark.django_db +def test_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'create_role': True, + 'role_mapping': [ + ('cn=group1,o=orga', ['Group1']), + ], + }] + assert Role.objects.filter(name='Group1').count() == 0 + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert Role.objects.filter(name='Group1').count() == 1 + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + +@pytest.mark.django_db +def test_posix_group_to_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'create_role': True, + 'role_mapping': [ + ('cn=group2,o=orga', ['Group2']), + ], + 'role_filter': '(&(memberUid={uid})(objectClass=posixGroup))', + }] + assert Role.objects.filter(name='Group2').count() == 0 + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert Role.objects.filter(name='Group2').count() == 1 + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + + +@pytest.mark.django_db def test_group_su(slapd, settings, client): from django.contrib.auth.models import Group -- 2.11.0