From 5612a78c78e1e477a42e565934150524b817c6db Mon Sep 17 00:00:00 2001 From: Paul Marillonnet Date: Wed, 6 Dec 2017 18:21:29 +0100 Subject: [PATCH] ldap_backend: groups to A2 roles mapping (#16523) --- src/authentic2/backends/ldap_backend.py | 75 ++++++++++++++++++++++++++++----- tests/test_ldap.py | 41 ++++++++++++++++++ 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/src/authentic2/backends/ldap_backend.py b/src/authentic2/backends/ldap_backend.py index 96f8e625..6a2b322b 100644 --- a/src/authentic2/backends/ldap_backend.py +++ b/src/authentic2/backends/ldap_backend.py @@ -228,6 +228,7 @@ class LDAPBackend(object): 'groupstaff': None, 'groupactive': None, 'group_mapping': (), + 'group_to_role_mapping': (), 'replicas': True, 'email_field': 'mail', 'fname_field': 'givenName', @@ -540,6 +541,29 @@ class LDAPBackend(object): elif dn not in group_dns and group in groups: user.groups.remove(group) + def populate_roles_by_mapping(self, user, dn, conn, block, role_dns): + '''Assign role to user based on a mapping from group DNs''' + group_to_role_mapping = block.get('group_to_role_mapping') + if not group_to_role_mapping: + return + if not user.pk: + user.save() + user._changed = False + roles = user.roles.all() + for dn, role_names in group_to_role_mapping: + for role_name in role_names: + role, error = self.get_role(block, role_id=role_name) + if role is None: + log.warning('error %s: couldn\'t retrieve role %r', + error, role_name) + continue + # Add missing roles + if dn in role_dns and role not in roles: + user.roles.add(role) + # Remove extra roles + elif dn not in role_dns and role in roles: + user.roles.remove(role) + def get_ldap_group_dns(self, user, dn, conn, block, attributes): '''Retrieve group DNs from the LDAP by attributes (memberOf) or by filter. @@ -549,9 +573,7 @@ class LDAPBackend(object): group_filter = block['group_filter'] group_dns = set() if member_of_attribute: - member_of_attribute = str(member_of_attribute) - results = conn.search_s(dn, ldap.SCOPE_BASE, '', [member_of_attribute]) - group_dns.update(results[0][1].get(member_of_attribute, [])) + group_dns.update(attributes.get(member_of_attribute, [])) if group_filter: group_filter = str(group_filter) params = attributes.copy() @@ -572,6 +594,12 @@ class LDAPBackend(object): self.populate_admin_flags_by_group(user, block, group_dns) self.populate_groups_by_mapping(user, dn, conn, block, group_dns) + def populate_user_roles(self, user, dn, conn, block, attributes): + group_dns = self.get_ldap_group_dns(user, dn, conn, block, attributes) + log.debug('roles for dn %r: %r', dn, group_dns) + # TODO? Admin flags by roles? + self.populate_roles_by_mapping(user, dn, conn, block, group_dns) + def get_group_by_name(self, block, group_name, create=None): '''Obtain a Django group''' if create is None: @@ -585,12 +613,35 @@ class LDAPBackend(object): except Group.DoesNotExist: return None - def get_role_by_name(self, block, role_name): + def get_role(self, block, role_id): '''Obtain a Django role''' - try: - return Role.objects.get(name=role_name) - except Role.DoesNotExist: - return None + kwargs = {} + slug = None + if isinstance(role_id, basestring): + slug = role_id + elif isinstance(role_id, (tuple, list)): + try: + slug, ou__slug = role_id + kwargs = {'ou__slug': ou__slug} + except ValueError: + try: + slug, ou__slug, service__slug = role_id + kwargs = {'ou__slug': ou__slug, 'service__slug': service__slug} + except ValueError: + pass + if slug: + try: + return Role.objects.get(slug=slug, **kwargs), None + except Role.DoesNotExist: + try: + return Role.objects.get(name=slug, **kwargs), None + except Role.DoesNotExist: + error = ('role %r does not exist' % role_id) + except Role.MultipleObjectsReturned: + error = 'multiple objects returned, identifier is imprecise' + else: + error = 'invalid role identifier must be slug, (slug, ou__slug) or (slug, ou__slug, service__slug)' + return None, error def populate_mandatory_groups(self, user, block): mandatory_groups = block.get('set_mandatory_groups') @@ -617,8 +668,10 @@ class LDAPBackend(object): user._changed = False roles = user.roles.all() for role_name in mandatory_roles: - role = self.get_role_by_name(block, role_name) + role, error = self.get_role(block, role_id=role_name) if role is None: + log.warning('error %s: couldn\'t retrieve role %r', + error, role_name) continue if role not in roles: user.roles.add(role) @@ -641,6 +694,7 @@ class LDAPBackend(object): self.populate_mandatory_groups(user, block) self.populate_mandatory_roles(user, block) self.populate_user_groups(user, dn, conn, block, attributes) + self.populate_user_roles(user, dn, conn, block, attributes) def populate_user_ou(self, user, dn, conn, block, attributes): '''Assign LDAP user to an ou, the default one if ou_slug setting is @@ -671,7 +725,8 @@ class LDAPBackend(object): def get_ldap_attributes_names(cls, block): attributes = set() attributes.update(map(str, block['attributes'])) - for field in ('email_field', 'fname_field', 'lname_field'): + for field in ('email_field', 'fname_field', 'lname_field', + 'member_of_attribute'): if block[field]: attributes.add(block[field]) for external_id_tuple in block['external_id_tuples']: diff --git a/tests/test_ldap.py b/tests/test_ldap.py index 1fc92a49..5a22dba5 100644 --- a/tests/test_ldap.py +++ b/tests/test_ldap.py @@ -272,6 +272,47 @@ def test_posix_group_mapping(slapd, settings, client): @pytest.mark.django_db +def test_group_to_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + Role.objects.get_or_create(name='Role1') + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'group_to_role_mapping': [ + ('cn=group1,o=orga', ['Role1']), + ], + }] + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + +@pytest.mark.django_db +def test_posix_group_to_role_mapping(slapd, settings, client): + from authentic2.a2_rbac.models import Role + + Role.objects.get_or_create(name='Role2') + settings.LDAP_AUTH_SETTINGS = [{ + 'url': [slapd.ldap_url], + 'basedn': 'o=orga', + 'use_tls': False, + 'group_to_role_mapping': [ + ('cn=group2,o=orga', ['Role2']), + ], + 'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))', + }] + response = client.post('/login/', {'login-password-submit': '1', + 'username': USERNAME, + 'password': PASS}, follow=True) + assert response.context['user'].username == u'%s@ldap' % USERNAME + assert response.context['user'].roles.count() == 1 + + +@pytest.mark.django_db def test_group_su(slapd, settings, client): from django.contrib.auth.models import Group -- 2.11.0