From d8286c886d02d7c968dede5632222929e5055f22 Mon Sep 17 00:00:00 2001 From: Paul Marillonnet Date: Mon, 4 Dec 2017 17:41:07 +0100 Subject: [PATCH] WIP ldap_backend: groups to A2 roles mapping (#16523) --- src/authentic2/backends/ldap_backend.py | 79 ++++++++++++++++++++++++++++----- tests/test_ldap.py | 46 +++++++++++++++++++ 2 files changed, 113 insertions(+), 12 deletions(-) diff --git a/src/authentic2/backends/ldap_backend.py b/src/authentic2/backends/ldap_backend.py index ba43f64d..785f3241 100644 --- a/src/authentic2/backends/ldap_backend.py +++ b/src/authentic2/backends/ldap_backend.py @@ -228,6 +228,7 @@ class LDAPBackend(object): 'groupstaff': None, 'groupactive': None, 'group_mapping': (), + 'group_to_role_mapping': (), 'replicas': True, 'email_field': 'mail', 'fname_field': 'givenName', @@ -542,6 +543,29 @@ class LDAPBackend(object): elif dn not in group_dns and group in groups: user.groups.remove(group) + def populate_roles_by_mapping(self, user, dn, conn, block, role_dns): + '''Assign role to user based on a mapping from group DNs''' + group_to_role_mapping = block.get('group_to_role_mapping') + if not group_to_role_mapping: + return + if not user.pk: + user.save() + user._changed = False + roles = user.roles.all() + for dn, role_names in group_to_role_mapping: + for role_name in role_names: + role, error = self.get_role(block, role_id=role_name) + if role is None: + log.warning('error %s: couldn\'t retrieve role %r', + error, role_name) + continue + # Add missing roles + if dn in role_dns and role not in roles: + user.roles.add(role) + # Remove extra roles + elif dn not in role_dns and role in roles: + user.roles.remove(role) + def get_ldap_group_dns(self, user, dn, conn, block, attributes): '''Retrieve group DNs from the LDAP by attributes (memberOf) or by filter. @@ -551,9 +575,7 @@ class LDAPBackend(object): group_filter = block['group_filter'] group_dns = set() if member_of_attribute: - member_of_attribute = str(member_of_attribute) - results = conn.search_s(dn, ldap.SCOPE_BASE, '', [member_of_attribute]) - group_dns.update(results[0][1].get(member_of_attribute, [])) + group_dns.update(attributes.get(member_of_attribute, [])) if group_filter: group_filter = str(group_filter) params = attributes.copy() @@ -574,6 +596,12 @@ class LDAPBackend(object): self.populate_admin_flags_by_group(user, block, group_dns) self.populate_groups_by_mapping(user, dn, conn, block, group_dns) + def populate_user_roles(self, user, dn, conn, block, attributes): + group_dns = self.get_ldap_group_dns(user, dn, conn, block, attributes) + log.debug('roles for dn %r: %r', dn, group_dns) + # TODO? Admin flags by roles? + self.populate_roles_by_mapping(user, dn, conn, block, group_dns) + def get_group_by_name(self, block, group_name, create=None): '''Obtain a Django group''' if create is None: @@ -587,18 +615,41 @@ class LDAPBackend(object): except Group.DoesNotExist: return None - def get_role_by_name(self, block, role_name, create=None): + def get_role(self, block, role_id, create=None): '''Obtain a Django role''' + kwargs = {} + slug = None if create is None: create = block['create_role'] - if create: - role, created = Role.objects.get_or_create(name=role_name) - return role - else: + if isinstance(role_id, basestring): + slug = role_id + elif isinstance(role_id, (tuple, list)): + try: + slug, ou__slug = role_id + kwargs = {'ou__slug': ou__slug} + except ValueError: + try: + slug, ou__slug, service__slug = role_id + kwargs = {'ou__slug': ou__slug, 'service__slug': service__slug} + except ValueError: + pass + if slug: try: - return Role.objects.get(name=role_name) + return Role.objects.get(slug=slug, **kwargs), None except Role.DoesNotExist: - return None + try: + if create: + role, _ = Role.objects.get_or_create(name=slug, **kwargs) + return role, None + else: + return Role.objects.get(name=slug, **kwargs), None + except Role.DoesNotExist: + error = ('role %r does not exist' % role_id) + except Role.MultipleObjectsReturned: + error = 'multiple objects returned, identifier is imprecise' + else: + error = 'invalid role identifier must be slug, (slug, ou__slug) or (slug, ou__slug, service__slug)' + return None, error def populate_mandatory_groups(self, user, block): mandatory_groups = block.get('set_mandatory_groups') @@ -625,8 +676,10 @@ class LDAPBackend(object): user._changed = False roles = user.roles.all() for role_name in mandatory_roles: - role = self.get_role_by_name(block, role_name) + role, error = self.get_role(block, role_id=role_name) if role is None: + log.warning('error %s: couldn\'t retrieve role %r', + error, role_name) continue if role not in roles: user.roles.add(role) @@ -649,6 +702,7 @@ class LDAPBackend(object): self.populate_mandatory_groups(user, block) self.populate_mandatory_roles(user, block) self.populate_user_groups(user, dn, conn, block, attributes) + self.populate_user_roles(user, dn, conn, block, attributes) def populate_user_ou(self, user, dn, conn, block, attributes): '''Assign LDAP user to an ou, the default one if ou_slug setting is @@ -679,7 +733,8 @@ class LDAPBackend(object): def get_ldap_attributes_names(cls, block): attributes = set() attributes.update(map(str, block['attributes'])) - for field in ('email_field', 'fname_field', 'lname_field'): + for field in ('email_field', 'fname_field', 'lname_field', + 'member_of_attribute'): if block[field]: attributes.add(block[field]) for external_id_tuple in block['external_id_tuples']: diff --git a/tests/test_ldap.py b/tests/test_ldap.py index 1ab923bf..72d565c7 100644 --- a/tests/test_ldap.py +++ b/tests/test_ldap.py @@ -272,6 +272,52 @@ def test_posix_group_mapping(slapd, settings, client): @pytest.mark.django_db +def test_group_to_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'create_role': True, + 'group_to_role_mapping': [ + ('cn=group1,o=orga', ['Role1']), + ], + }] + assert Role.objects.filter(name='Role1').count() == 0 + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert Role.objects.filter(name='Role1').count() == 1 + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + +@pytest.mark.django_db +def test_posix_group_to_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'create_role': True, + 'group_to_role_mapping': [ + ('cn=group2,o=orga', ['Role2']), + ], + 'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))', + }] + assert Role.objects.filter(name='Role2').count() == 0 + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert Role.objects.filter(name='Role2').count() == 1 + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + + +@pytest.mark.django_db def test_group_su(slapd, settings, client): from django.contrib.auth.models import Group -- 2.11.0