Projet

Général

Profil

0001-WIP-ldap_backend-groups-to-A2-roles-mapping-16523.patch

Paul Marillonnet, 05 décembre 2017 17:55

Télécharger (8,88 ko)

Voir les différences:

Subject: [PATCH] WIP ldap_backend: groups to A2 roles mapping (#16523)

 src/authentic2/backends/ldap_backend.py | 79 ++++++++++++++++++++++++++++-----
 tests/test_ldap.py                      | 46 +++++++++++++++++++
 2 files changed, 113 insertions(+), 12 deletions(-)
src/authentic2/backends/ldap_backend.py
228 228
        'groupstaff': None,
229 229
        'groupactive': None,
230 230
        'group_mapping': (),
231
        'group_to_role_mapping': (),
231 232
        'replicas': True,
232 233
        'email_field': 'mail',
233 234
        'fname_field': 'givenName',
......
542 543
                elif dn not in group_dns and group in groups:
543 544
                    user.groups.remove(group)
544 545

  
546
    def populate_roles_by_mapping(self, user, dn, conn, block, role_dns):
547
        '''Assign role to user based on a mapping from group DNs'''
548
        group_to_role_mapping = block.get('group_to_role_mapping')
549
        if not group_to_role_mapping:
550
            return
551
        if not user.pk:
552
            user.save()
553
            user._changed = False
554
        roles = user.roles.all()
555
        for dn, role_names in group_to_role_mapping:
556
            for role_name in role_names:
557
                role, error = self.get_role(block, role_id=role_name)
558
                if role is None:
559
                    log.warning('error %s: couldn\'t retrieve role %r',
560
                            error, role_name)
561
                    continue
562
                # Add missing roles
563
                if dn in role_dns and role not in roles:
564
                    user.roles.add(role)
565
                # Remove extra roles
566
                elif dn not in role_dns and role in roles:
567
                    user.roles.remove(role)
568

  
545 569
    def get_ldap_group_dns(self, user, dn, conn, block, attributes):
546 570
        '''Retrieve group DNs from the LDAP by attributes (memberOf) or by
547 571
           filter.
......
551 575
        group_filter = block['group_filter']
552 576
        group_dns = set()
553 577
        if member_of_attribute:
554
            member_of_attribute = str(member_of_attribute)
555
            results = conn.search_s(dn, ldap.SCOPE_BASE, '', [member_of_attribute])
556
            group_dns.update(results[0][1].get(member_of_attribute, []))
578
            group_dns.update(attributes.get(member_of_attribute, []))
557 579
        if group_filter:
558 580
            group_filter = str(group_filter)
559 581
            params = attributes.copy()
......
574 596
        self.populate_admin_flags_by_group(user, block, group_dns)
575 597
        self.populate_groups_by_mapping(user, dn, conn, block, group_dns)
576 598

  
599
    def populate_user_roles(self, user, dn, conn, block, attributes):
600
        group_dns = self.get_ldap_group_dns(user, dn, conn, block, attributes)
601
        log.debug('roles for dn %r: %r', dn, group_dns)
602
        # TODO? Admin flags by roles?
603
        self.populate_roles_by_mapping(user, dn, conn, block, group_dns)
604

  
577 605
    def get_group_by_name(self, block, group_name, create=None):
578 606
        '''Obtain a Django group'''
579 607
        if create is None:
......
587 615
            except Group.DoesNotExist:
588 616
                return None
589 617

  
590
    def get_role_by_name(self, block, role_name, create=None):
618
    def get_role(self, block, role_id, create=None):
591 619
        '''Obtain a Django role'''
620
        kwargs = {}
621
        slug = None
592 622
        if create is None:
593 623
            create = block['create_role']
594
        if create:
595
            role, created = Role.objects.get_or_create(name=role_name)
596
            return role
597
        else:
624
        if isinstance(role_id, basestring):
625
            slug = role_id
626
        elif isinstance(role_id, (tuple, list)):
627
            try:
628
                slug, ou__slug = role_id
629
                kwargs = {'ou__slug': ou__slug}
630
            except ValueError:
631
                try:
632
                    slug, ou__slug, service__slug = role_id
633
                    kwargs = {'ou__slug': ou__slug, 'service__slug': service__slug}
634
                except ValueError:
635
                    pass
636
        if slug:
598 637
            try:
599
                return Role.objects.get(name=role_name)
638
                return Role.objects.get(slug=slug, **kwargs), None
600 639
            except Role.DoesNotExist:
601
                return None
640
                try:
641
                    if create:
642
                        role, _ = Role.objects.get_or_create(name=slug, **kwargs)
643
                        return role, None
644
                    else:
645
                        return Role.objects.get(name=slug, **kwargs), None
646
                except Role.DoesNotExist:
647
                    error = ('role %r does not exist' % role_id)
648
            except Role.MultipleObjectsReturned:
649
                error = 'multiple objects returned, identifier is imprecise'
650
        else:
651
            error = 'invalid role identifier must be slug, (slug, ou__slug) or (slug, ou__slug, service__slug)'
652
        return None, error
602 653

  
603 654
    def populate_mandatory_groups(self, user, block):
604 655
        mandatory_groups = block.get('set_mandatory_groups')
......
625 676
            user._changed = False
626 677
        roles = user.roles.all()
627 678
        for role_name in mandatory_roles:
628
            role = self.get_role_by_name(block, role_name)
679
            role, error = self.get_role(block, role_id=role_name)
629 680
            if role is None:
681
                log.warning('error %s: couldn\'t retrieve role %r',
682
                        error, role_name)
630 683
                continue
631 684
            if role not in roles:
632 685
                user.roles.add(role)
......
649 702
        self.populate_mandatory_groups(user, block)
650 703
        self.populate_mandatory_roles(user, block)
651 704
        self.populate_user_groups(user, dn, conn, block, attributes)
705
        self.populate_user_roles(user, dn, conn, block, attributes)
652 706

  
653 707
    def populate_user_ou(self, user, dn, conn, block, attributes):
654 708
        '''Assign LDAP user to an ou, the default one if ou_slug setting is
......
679 733
    def get_ldap_attributes_names(cls, block):
680 734
        attributes = set()
681 735
        attributes.update(map(str, block['attributes']))
682
        for field in ('email_field', 'fname_field', 'lname_field'):
736
        for field in ('email_field', 'fname_field', 'lname_field',
737
                'member_of_attribute'):
683 738
            if block[field]:
684 739
                attributes.add(block[field])
685 740
        for external_id_tuple in block['external_id_tuples']:
tests/test_ldap.py
272 272

  
273 273

  
274 274
@pytest.mark.django_db
275
def test_group_to_role_mapping(slapd, settings, client):
276
    from authentic2.a2_rbac.models import Role
277

  
278
    settings.LDAP_AUTH_SETTINGS = [{
279
        'url': [slapd.ldap_url],
280
        'basedn': 'o=orga',
281
        'use_tls': False,
282
        'create_role': True,
283
        'group_to_role_mapping': [
284
            ('cn=group1,o=orga', ['Role1']),
285
        ],
286
    }]
287
    assert Role.objects.filter(name='Role1').count() == 0
288
    response = client.post('/login/', {'login-password-submit': '1',
289
                                       'username': USERNAME,
290
                                       'password': PASS}, follow=True)
291
    assert Role.objects.filter(name='Role1').count() == 1
292
    assert response.context['user'].username == u'%s@ldap' % USERNAME
293
    assert response.context['user'].roles.count() == 1
294

  
295

  
296
@pytest.mark.django_db
297
def test_posix_group_to_role_mapping(slapd, settings, client):
298
    from authentic2.a2_rbac.models import Role
299

  
300
    settings.LDAP_AUTH_SETTINGS = [{
301
        'url': [slapd.ldap_url],
302
        'basedn': 'o=orga',
303
        'use_tls': False,
304
        'create_role': True,
305
        'group_to_role_mapping': [
306
            ('cn=group2,o=orga', ['Role2']),
307
        ],
308
        'group_filter': '(&(memberUid={uid})(objectClass=posixGroup))',
309
    }]
310
    assert Role.objects.filter(name='Role2').count() == 0
311
    response = client.post('/login/', {'login-password-submit': '1',
312
                                       'username': USERNAME,
313
                                       'password': PASS}, follow=True)
314
    assert Role.objects.filter(name='Role2').count() == 1
315
    assert response.context['user'].username == u'%s@ldap' % USERNAME
316
    assert response.context['user'].roles.count() == 1
317

  
318

  
319

  
320
@pytest.mark.django_db
275 321
def test_group_su(slapd, settings, client):
276 322
    from django.contrib.auth.models import Group
277 323

  
278
-