Projet

Général

Profil

0002-add-ldap-connector-66533.patch

Benjamin Dauvergne, 03 août 2022 01:05

Télécharger (31,1 ko)

Voir les différences:

Subject: [PATCH 2/2] add ldap connector (#66533)

 passerelle/apps/ldap/__init__.py              |   0
 .../apps/ldap/migrations/0001_initial.py      |  63 ++++
 passerelle/apps/ldap/migrations/__init__.py   |   0
 passerelle/apps/ldap/models.py                | 311 ++++++++++++++++++
 passerelle/settings.py                        |   1 +
 passerelle/utils/forms.py                     |   5 +
 passerelle/utils/models.py                    |  15 +
 passerelle/views.py                           |   1 +
 setup.py                                      |   2 +
 tests/ldap/__init__.py                        |   0
 tests/ldap/cert.pem                           |  19 ++
 tests/ldap/conftest.py                        |  76 +++++
 tests/ldap/key.pem                            |  28 ++
 tests/ldap/test_model.py                      | 189 +++++++++++
 tox.ini                                       |   1 +
 15 files changed, 711 insertions(+)
 create mode 100644 passerelle/apps/ldap/__init__.py
 create mode 100644 passerelle/apps/ldap/migrations/0001_initial.py
 create mode 100644 passerelle/apps/ldap/migrations/__init__.py
 create mode 100644 passerelle/apps/ldap/models.py
 create mode 100644 tests/ldap/__init__.py
 create mode 100644 tests/ldap/cert.pem
 create mode 100644 tests/ldap/conftest.py
 create mode 100644 tests/ldap/key.pem
 create mode 100644 tests/ldap/test_model.py
passerelle/apps/ldap/migrations/0001_initial.py
1
# Generated by Django 3.2.14 on 2022-08-02 14:37
2

  
3
from django.db import migrations, models
4

  
5
import passerelle.utils.models
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    initial = True
11

  
12
    dependencies = [
13
        ('base', '0029_auto_20210202_1627'),
14
    ]
15

  
16
    operations = [
17
        migrations.CreateModel(
18
            name='Resource',
19
            fields=[
20
                (
21
                    'id',
22
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
23
                ),
24
                ('title', models.CharField(max_length=50, verbose_name='Title')),
25
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
26
                ('description', models.TextField(verbose_name='Description')),
27
                ('ldap_url', passerelle.utils.models.LDAPURLField(max_length=512, verbose_name='Server URL')),
28
                ('ldap_base_dn', models.CharField(max_length=256, verbose_name='Base DN of the directory')),
29
                (
30
                    'ldap_bind_dn',
31
                    models.CharField(blank=True, max_length=256, null=True, verbose_name='Bind DN'),
32
                ),
33
                (
34
                    'ldap_bind_password',
35
                    models.CharField(blank=True, max_length=128, null=True, verbose_name='Bind password'),
36
                ),
37
                (
38
                    'ldap_tls_cert',
39
                    passerelle.utils.models.BinaryFileField(
40
                        blank=True, max_length=32768, null=True, verbose_name='TLS client certificate'
41
                    ),
42
                ),
43
                (
44
                    'ldap_tls_key',
45
                    passerelle.utils.models.BinaryFileField(
46
                        blank=True, max_length=32768, null=True, verbose_name='TLS client key'
47
                    ),
48
                ),
49
                (
50
                    'users',
51
                    models.ManyToManyField(
52
                        blank=True,
53
                        related_name='_ldap_resource_users_+',
54
                        related_query_name='+',
55
                        to='base.ApiUser',
56
                    ),
57
                ),
58
            ],
59
            options={
60
                'verbose_name': 'LDAP',
61
            },
62
        ),
63
    ]
passerelle/apps/ldap/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import contextlib
19
import tempfile
20

  
21
import ldap
22
import ldap.filter
23
from django.core.exceptions import ValidationError
24
from django.db import models
25
from django.utils.html import format_html
26
from django.utils.translation import gettext_lazy as _
27
from OpenSSL import crypto
28

  
29
from passerelle.base.models import BaseResource
30
from passerelle.utils.api import endpoint
31
from passerelle.utils.jsonresponse import APIError
32
from passerelle.utils.models import BinaryFileField, LDAPURLField
33
from passerelle.utils.templates import render_to_string
34

  
35

  
36
def validate_certificate(value):
37
    try:
38
        crypto.load_certificate(crypto.FILETYPE_PEM, bytes(value))
39
    except Exception:
40
        raise ValidationError(_('Invalid private key.'))
41

  
42

  
43
def validate_private_key(value):
44
    try:
45
        crypto.load_privatekey(crypto.FILETYPE_PEM, bytes(value))
46
    except Exception:
47
        raise ValidationError(_('Invalid private key.'))
48

  
49

  
50
class Resource(BaseResource):
51
    ldap_url = LDAPURLField(verbose_name=_('Server URL'), max_length=512)
52
    ldap_base_dn = models.CharField(verbose_name=_('Base DN of the directory'), max_length=256)
53
    ldap_bind_dn = models.CharField(verbose_name=_('Bind DN'), max_length=256, null=True, blank=True)
54
    ldap_bind_password = models.CharField(
55
        verbose_name=_('Bind password'), max_length=128, null=True, blank=True
56
    )
57
    ldap_tls_cert = BinaryFileField(
58
        verbose_name=_('TLS client certificate'),
59
        max_length=1024 * 32,
60
        null=True,
61
        blank=True,
62
        validators=[validate_certificate],
63
    )
64
    ldap_tls_key = BinaryFileField(
65
        verbose_name=_('TLS client key'),
66
        max_length=1024 * 32,
67
        null=True,
68
        blank=True,
69
        validators=[validate_private_key],
70
    )
71

  
72
    category = _('Misc')
73

  
74
    class Meta:
75
        verbose_name = _('LDAP')
76

  
77
    def tls_cert(self, value):
78
        try:
79
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, bytes(value))
80
            name = ','.join(
81
                '%s=%s' % (a.decode(), b.decode()) for a, b in cert.get_subject().get_components()
82
            )
83
        except Exception:
84
            name = ('%s bytes') % len(value)
85
        return format_html(
86
            '<a href="data:application/octet-string;base64,{}" target="_blank" download="tls.crt">{}<a/>',
87
            base64.b64encode(value).decode(),
88
            name,
89
        )
90

  
91
    def clean(self):
92
        if bool(self.ldap_bind_dn) != bool(self.ldap_bind_password):
93
            raise ValidationError('Bind DN and password must be set together.')
94
        if bool(self.ldap_tls_cert) != bool(self.ldap_tls_key):
95
            raise ValidationError('Client certificate and key must be set together.')
96

  
97
    def get_description_fields(self):
98
        fields = super().get_description_fields()
99
        fields = [
100
            (field, self.tls_cert(value) if field.name == 'ldap_tls_cert' and value else value)
101
            for field, value in fields
102
        ]
103
        return fields
104

  
105
    def check_status(self):
106
        with self.get_connection() as conn:
107
            conn.whoami_s()
108

  
109
    @contextlib.contextmanager
110
    def get_connection(self):
111
        with contextlib.ExitStack() as stack:
112
            conn = ldap.initialize(self.ldap_url)
113
            conn.set_option(ldap.OPT_DEBUG_LEVEL, 255)
114
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_NEVER)
115
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
116
            if self.ldap_tls_cert and self.ldap_tls_key:
117
                cert_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
118
                cert_tempfile.write(self.ldap_tls_cert)
119
                cert_tempfile.flush()
120
                key_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
121
                key_tempfile.write(self.ldap_tls_key)
122
                key_tempfile.flush()
123

  
124
                conn.set_option(ldap.OPT_X_TLS_CACERTFILE, cert_tempfile.name)
125
                conn.set_option(ldap.OPT_X_TLS_CERTFILE, cert_tempfile.name)
126
                conn.set_option(ldap.OPT_X_TLS_KEYFILE, key_tempfile.name)
127
            conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
128
            if self.ldap_bind_dn:
129
                conn.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_password or '')
130
            else:
131
                conn.simple_bind_s()
132
            yield conn
133
            conn.unbind()
134

  
135
    def ldap_search(self, base_dn, scope, ldap_filter, ldap_attributes, sizelimit=-1, timeout=5):
136
        with self.get_connection() as conn:
137
            message_id = conn.search_ext(
138
                base_dn, scope, ldap_filter, ldap_attributes, timeout=timeout, sizelimit=sizelimit
139
            )
140
            while True:
141
                try:
142
                    result_type, entries = conn.result(message_id, all=0)
143
                except ldap.SIZELIMIT_EXCEEDED:
144
                    break
145
                if not entries:
146
                    break
147
                for dn, attributes in entries:
148
                    if dn:
149
                        decoded_attributes = cidict()
150
                        # decode values to unicode, if possible, and keep only the first value
151
                        for k, values in attributes.items():
152
                            decoded_values = []
153
                            for value in values:
154
                                try:
155
                                    decoded_values.append(value.decode())
156
                                except UnicodeDecodeError:
157
                                    pass
158
                            if decoded_values:
159
                                if len(decoded_values) == 1:
160
                                    decoded_attributes[k] = decoded_values[0]
161
                                else:
162
                                    decoded_attributes[k] = decoded_values
163
                        yield dn, decoded_attributes
164

  
165
    @endpoint(
166
        description=_('Search'),
167
        perm='can_access',
168
        parameters={
169
            'ldap_base_dn': {
170
                'description': _('Base DN for the LDAP search'),
171
                'example_value': 'dc=company,dc=com',
172
            },
173
            'search_attribute': {
174
                'description': _('Attribute to search for the substring search'),
175
                'example_value': 'cn',
176
            },
177
            'id_attribute': {
178
                'description': _('Attribute used as a unique identifier'),
179
                'example_value': 'uid',
180
            },
181
            'text_template': {
182
                'description': _(
183
                    'Optional template string based on LDAP attributes '
184
                    'to create a text value, if none given the search_attribute is used'
185
                ),
186
                'example_value': '{{ givenName }} {{ surname }}',
187
            },
188
            'ldap_attributes': {
189
                'description': _('Space separated list of LDAP attributes to retrieve'),
190
                'example_value': 'l sn givenName locality',
191
            },
192
            'id': {
193
                'description': _('Identifier for exacte retrieval, using the id_attribute'),
194
                'example_value': 'johndoe',
195
            },
196
            'q': {
197
                'description': _('Substring to search in the search_attribute'),
198
                'example_value': 'John Doe',
199
            },
200
            'sizelimit': {
201
                'description': _('Maximum number of entries to retrieve, between 1 and 200, default is 30.')
202
            },
203
            'scope': {
204
                'description': _('Scope of the LDAP search, subtree or onelevel, default is subtree.'),
205
            },
206
            'filter': {
207
                'description': _('Extra LDAP filter.'),
208
                'example_value': 'objectClass=*',
209
            },
210
        },
211
    )
212
    def search(
213
        self,
214
        request,
215
        ldap_base_dn,
216
        search_attribute,
217
        id_attribute,
218
        text_template=None,
219
        ldap_attributes=None,
220
        id=None,
221
        q=None,
222
        sizelimit=None,
223
        scope=None,
224
        filter=None,
225
    ):
226
        search_attribute = search_attribute.lower()
227
        id_attribute = id_attribute.lower()
228
        if not search_attribute.isascii():
229
            raise APIError('search_attribute contains non ASCII characters')
230
        if not id_attribute.isascii():
231
            raise APIError('id_attribute contains non ASCII characters')
232
        ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set()
233
        ldap_attributes.update([search_attribute, id_attribute])
234
        if not all(attribute.isascii() for attribute in ldap_attributes):
235
            raise APIError('ldap_attributes contains non ASCII characters')
236
        try:
237
            sizelimit = int(sizelimit)
238
        except (ValueError, TypeError):
239
            pass
240
        sizelimit = max(1, min(sizelimit or 30, 200))
241
        if not q and not id:
242
            raise APIError('q or id are mandatory parameters', http_status=400)
243
        if id:
244
            ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id))
245
        elif q:
246
            ldap_filter = '(%s=*%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q))
247
        if filter:
248
            if not filter.startswith('('):
249
                filter = '(%s)' % filter
250
            ldap_filter = '(&%s%s)' % (ldap_filter, filter)
251
        print('ldap_filter', ldap_filter)
252
        scopes = {
253
            'subtree': ldap.SCOPE_SUBTREE,
254
            'onelevel': ldap.SCOPE_ONELEVEL,
255
        }
256
        scope = scopes.get(scope, ldap.SCOPE_SUBTREE)
257
        try:
258
            entries = list(
259
                self.ldap_search(ldap_base_dn, scope, ldap_filter, ldap_attributes, sizelimit=sizelimit)
260
            )
261
        except ldap.LDAPError as e:
262
            return {
263
                'err': 1,
264
                'data': [
265
                    {
266
                        'id': '',
267
                        'text': _('Directory server is unavailable'),
268
                        'disabled': True,
269
                    }
270
                ],
271
                'err_clss': 'directory-server-unavailable',
272
                'err_desc': str(e),
273
            }
274
        data = []
275
        for dn, attributes in entries:
276
            entry_id = attributes.get(id_attribute)
277
            if not entry_id:
278
                continue
279
            if text_template:
280
                entry_text = render_to_string(text_template, attributes)
281
            else:
282
                entry_text = attributes.get(search_attribute)
283
            data.append(
284
                {
285
                    'id': entry_id,
286
                    'text': entry_text,
287
                    'dn': dn,
288
                    'attributes': attributes,
289
                }
290
            )
291
        data.sort(key=lambda x: (x['text'], x['id']))
292
        return {'data': data}
293

  
294

  
295
# use a case-insensitive dictionnary to handle map of attribute to values.
296

  
297

  
298
class cidict(dict):
299
    '''Case insensitive dictionnary'''
300

  
301
    def __setitem__(self, key, value):
302
        super().__setitem__(key.lower(), value)
303

  
304
    def __getitem__(self, key):
305
        return super().__getitem__(key.lower())
306

  
307
    def __contains__(self, key):
308
        return super().__contains__(key.lower())
309

  
310
    def get(self, key, default=None, /):
311
        return super().get(key.lower(), default)
passerelle/settings.py
149 149
    'passerelle.apps.gesbac',
150 150
    'passerelle.apps.holidays',
151 151
    'passerelle.apps.jsondatastore',
152
    'passerelle.apps.ldap',
152 153
    'passerelle.apps.maelis',
153 154
    'passerelle.apps.mdel',
154 155
    'passerelle.apps.mdel_ddpacs',
passerelle/utils/forms.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
from django import forms
18
from django.core import validators
19

  
20

  
21
class LDAPURLField(forms.URLField):
22
    default_validators = [validators.URLValidator(schemes=['ldap', 'ldaps'])]
18 23

  
19 24

  
20 25
class BinaryFileInput(forms.ClearableFileInput):
passerelle/utils/models.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from django.core import validators
17 18
from django.db import models
18 19

  
19 20

  
21
class LDAPURLField(models.URLField):
22
    default_validators = [validators.URLValidator(schemes=['ldap', 'ldaps'])]
23

  
24
    def formfield(self, **kwargs):
25
        from .forms import LDAPURLField
26

  
27
        return super().formfield(
28
            **{
29
                'form_class': LDAPURLField,
30
                **kwargs,
31
            }
32
        )
33

  
34

  
20 35
class BinaryFileField(models.BinaryField):
21 36
    def __init__(self, *args, **kwargs):
22 37
        kwargs.setdefault('editable', True)
passerelle/views.py
501 501
        try:
502 502
            sig.bind(request, **params)
503 503
        except TypeError:
504
            raise
504 505
            # prevent errors if using name of an ignored parameter in an endpoint argspec
505 506
            ignored = set(parameters) & set(IGNORED_PARAMS)
506 507
            assert not ignored, 'endpoint %s has ignored parameter %s' % (request.path, ignored)
setup.py
165 165
        'pytz',
166 166
        'vobject',
167 167
        'Levenshtein',
168
        'python-ldap',
169
        'pyOpenSSL',
168 170
    ],
169 171
    cmdclass={
170 172
        'build': build,
tests/ldap/cert.pem
1
-----BEGIN CERTIFICATE-----
2
MIIDBjCCAe6gAwIBAgIUTKopT76CFlsVcI7FAilaYLILz0owDQYJKoZIhvcNAQEL
3
BQAwIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVudHJvdXZlcnQub3JnMB4XDTE4MTIw
4
NTE2NTkyNFoXDTI4MTIwMjE2NTkyNFowIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVu
5
dHJvdXZlcnQub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4xsL
6
n25yittbjk5bcKvY2I8zPivL6YWn2MJimaQQSNzCw/8POmVPLmMIb3lcZjydFRad
7
+RTxZfnuvCCJrnGrG7hOsJNenTLLU0ugN/yQ1869cM07a9tjSzL7NCz9H1NIK1+Q
8
cBsTExc77dOWpwWI9TjqYYRL+zex3ml8cdqcQ7BQUQxAvA4UU63DM2G+5O3dE7l8
9
uvyBUU3kW/shHyhfweWNXO8IXXIjvDfPYkOsjc6en2kFMr+sENSUKgfDKjz/Uzqy
10
S7LBb4tkJALZM8QP56VeQAG1JZF2J2/y1RqBfIGRIEkYoaHcj6UATZa1xcZjMubL
11
z3otRNYcRXKJMYWGbQIDAQABozIwMDAJBgNVHRMEAjAAMCMGA1UdEQQcMBqCGGxv
12
Y2FsaG9zdC5lbnRyb3V2ZXJ0Lm9yZzANBgkqhkiG9w0BAQsFAAOCAQEAFVPavBah
13
mIjgnTjq6ZbFxXTNJW0TrqN8olbKJ6SfwWVk0I8px7POekFaXd+egsFJlWYyH9q4
14
HkKotddRYYrWoXcPiodNfUa+bRnh2WYl2rEGMW5dbBf/MYCDts68c3SoA7JIYJ8w
15
0QZGAkijKNtVML0/FrLuJWbfFBAWH8JB46BcAg/8flbMHAULzV3F1g/v0A3FG3Y/
16
9fVr+lN5qs+NB9NXIMdf5wXrmJQYRjotyOjUO6yTFqDFvqE7DEpKQD5hnvqJoXCz
17
zYQS1DjH1qSRc5vC8I7YlJowCfnI9MsEICSrsk75DhT091aJC2XX93o4zhfNxmO5
18
Kj28hP87GHgNIg==
19
-----END CERTIFICATE-----
tests/ldap/conftest.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import contextlib
18
import socket
19

  
20
import pytest
21
from ldaptools.slapd import Slapd, has_slapd
22

  
23
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')
24

  
25

  
26
def find_free_tcp_port():
27
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
28
        s.bind(('', 0))
29
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
30
        return s.getsockname()[1]
31

  
32

  
33
@pytest.fixture
34
def ldap_params():
35
    return {
36
        'ldap_url': 'ldap://localhost.entrouvert.org:%s' % find_free_tcp_port(),
37
    }
38

  
39

  
40
@pytest.fixture
41
def ldap_object(ldap_params):
42
    with Slapd(**ldap_params) as slapd:
43
        yield slapd
44

  
45

  
46
@pytest.fixture
47
def ldap_configure():
48
    pass
49

  
50

  
51
@pytest.fixture
52
def ldap_server(ldap_object, ldap_configure):
53
    return ldap_object
54

  
55

  
56
@pytest.fixture
57
def resource_class(db):
58
    from passerelle.apps.ldap.models import Resource
59

  
60
    return Resource
61

  
62

  
63
@pytest.fixture
64
def resource_params(ldap_params):
65
    return {
66
        'title': 'resource',
67
        'slug': 'resource',
68
        'description': 'resource',
69
        'ldap_url': ldap_params['ldap_url'],
70
        'ldap_base_dn': 'o=orga',
71
    }
72

  
73

  
74
@pytest.fixture
75
def resource(resource_class, resource_params):
76
    return resource_class(**resource_params)
tests/ldap/key.pem
1
-----BEGIN PRIVATE KEY-----
2
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDjGwufbnKK21uO
3
Tltwq9jYjzM+K8vphafYwmKZpBBI3MLD/w86ZU8uYwhveVxmPJ0VFp35FPFl+e68
4
IImucasbuE6wk16dMstTS6A3/JDXzr1wzTtr22NLMvs0LP0fU0grX5BwGxMTFzvt
5
05anBYj1OOphhEv7N7HeaXxx2pxDsFBRDEC8DhRTrcMzYb7k7d0TuXy6/IFRTeRb
6
+yEfKF/B5Y1c7whdciO8N89iQ6yNzp6faQUyv6wQ1JQqB8MqPP9TOrJLssFvi2Qk
7
AtkzxA/npV5AAbUlkXYnb/LVGoF8gZEgSRihodyPpQBNlrXFxmMy5svPei1E1hxF
8
cokxhYZtAgMBAAECggEAOUZI2BxyprJLlMgOJ4wvU+5JbhR9iJc8jV34n+bQdI+4
9
TtW0cXW7UmeHaRWiR+Zhd0AM9xRhDObLXoaWMnhYPtVsgvunkN2OiaM49OWtYb+x
10
5xDbO4hIsl5ZG/98lrnaKZYgRyWM2fOyGXiTNewfbji8Y3uJ7gFNylmwGMaZQjhr
11
YNaqNEV7Vs2n7oERxqzKG9947oBAx2hpmoaW6eMyXcWl2ov7iHpJSKUBKho+5PWc
12
J731no0OsGuS+3jHa/0nZXrT8nKmemyDMdSfWmtTv659L/guFInZpHPfFVLf56vc
13
J6zb/IzEJV+Nh7CfBsMbHlTYBeUFlRWsy9t70+OZwQKBgQD3J4aVN4vJhXX1zDgL
14
dVAczwLGKXY38BoBjOeRtVhXHs5p/eNIqeZ2YbYwBBy3nL414Un7gqb9fDtg0i3n
15
5mQIOWhvpIYUxtwIPgYwzumxxp/n7XdU4BPDbxejZxkuC7AR5bB34pwJAJvWRGEf
16
0X1TxJlqULhiZ6g18O3S0oiJtwKBgQDrO9ROaj6kkxjYHmljBZIXXhdKDcn0AqPi
17
w20Aaafx0oxNQAoq8Gtu22Z1QHwRdBeUJwqCbmHVCCwbMf/568zFAANuT9bKMe6X
18
J0p0nTDiyn8w9MfduFuG4cUMn4oK6dIuYlscguoPQCvQdciwG+djqwTrHib5TEbm
19
jeKEkY2A+wKBgQDvXt+wy2BeqBzMF6M8Lb2OeUwVgniVyrxVPhPVgk5x6ks+SoAD
20
k1G62/3o2UK67lsmsfDGYA69uMGFj2qYjAHcGUW1wyF9I/BdJz01rmCWJmoe5VXK
21
5U8e3AyH3MV9XCKF4vCb2+UFrwo/ZnCusWVxaRqw5kb+P6ihvZuIsRE+VwKBgQC7
22
2duBg3bjFlUQwbiHSzuPTaRrjvdn1XPq8wVo/vcPNoS0bB+yiqxAqxT3LbfmeD8c
23
INFTt7KI3S3byeIRQy0TZR9YSInOjnFqZAYheiY/9lX8Un4Jod/1pvYlToJ+lJs0
24
T3dTHXitFSHoJydM+/ucrEYRPNMC4tb75vKty06lYQKBgQCx49+g5kQaaRZ+4psw
25
+eolMpAwKDkpK5gYen6OsrT8m4hpxTmtiteMsH5Avb/fxqoJWLOjhN4EnEZTMJzr
26
LyGoKsTv7rhZwhRznE15rOzxmldWrcCkl7DGuM2GcKgguhCYF7U7KA+vUCeqCE0H
27
LA2grkY+TxFpg1pwYdF1hekmTw==
28
-----END PRIVATE KEY-----
tests/ldap/test_model.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import os.path
18

  
19
import ldap
20
import pytest
21

  
22
base_dir = os.path.dirname(__file__)
23
cert_file = os.path.join(base_dir, 'cert.pem')
24
key_file = os.path.join(base_dir, 'key.pem')
25

  
26

  
27
def test_get_connection(resource):
28
    resource.get_connection()
29

  
30

  
31
class TestCheckStatus:
32
    def test_nok(self, resource):
33
        with pytest.raises(ldap.LDAPError):
34
            resource.check_status()
35

  
36
    def test_ok(self, resource, ldap_server):
37
        resource.check_status()
38

  
39

  
40
class TestTLSAuthentication:
41
    @pytest.fixture
42
    def ldap_params(self, ldap_params):
43
        ldap_params['ldap_url'] = ldap_params['ldap_url'].replace('ldap:', 'ldaps:')
44
        return {**ldap_params, 'tls': (key_file, cert_file)}
45

  
46
    @pytest.fixture
47
    def ldap_configure(self, ldap_object):
48
        conn = ldap_object.get_connection_admin()
49
        conn.modify_s(
50
            'cn=config',
51
            [
52
                (ldap.MOD_ADD, 'olcTLSCACertificateFile', cert_file.encode()),
53
                (ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
54
            ],
55
        )
56

  
57
    @pytest.fixture
58
    def resource_params(self, resource_params):
59
        with open(cert_file, 'rb') as cert_file_fd, open(key_file, 'rb') as key_file_fd:
60
            return {
61
                **resource_params,
62
                'ldap_tls_cert': cert_file_fd.read(),
63
                'ldap_tls_key': key_file_fd.read(),
64
            }
65

  
66
    def test_ok(self, resource, ldap_server):
67
        resource.check_status()
68

  
69

  
70
class TestLdapSearch:
71
    def test_nok(self, resource):
72
        with pytest.raises(ldap.LDAPError):
73
            list(resource.ldap_search('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*']))
74

  
75
    def test_ok(self, resource, ldap_server):
76
        entries = list(resource.ldap_search('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*']))
77
        assert entries == [('o=orga', {'o': 'orga', 'objectclass': 'organization'})]
78

  
79

  
80
class TestSearch:
81
    @pytest.fixture
82
    def ldap_configure(self, ldap_object):
83
        ldap_object.add_ldif(
84
            '''
85
dn: uid=johndoe,o=orga
86
objectClass: inetOrgPerson
87
uid: johndoe
88
cn: John Doe
89
sn: Doe
90
gn: John
91

  
92
dn: uid=janedoe,o=orga
93
objectClass: inetOrgPerson
94
uid: janedoe
95
cn: Jane Doe
96
sn: Doe
97
gn: Jane
98

  
99
dn: uid=janefoo,o=orga
100
objectClass: inetOrgPerson
101
uid: janefoo
102
cn: Jane Foo
103
sn: Foo
104
gn: Jane
105
'''
106
        )
107

  
108
    def test_server_unavailaible(self, resource):
109
        assert resource.search(
110
            request=None, q='Doe', ldap_base_dn='o=orga', search_attribute='cn', id_attribute='uid'
111
        ) == {
112
            'data': [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}],
113
            'err': 1,
114
            'err_clss': 'directory-server-unavailable',
115
            'err_desc': '{\'result\': -1, \'desc\': "Can\'t contact LDAP server", '
116
            "'errno': 107, 'ctrls': [], 'info': 'Transport endpoint is not "
117
            "connected'}",
118
        }
119

  
120
    def test_q(self, resource, ldap_server):
121
        result = resource.search(
122
            request=None, q='Doe', ldap_base_dn='o=orga', search_attribute='cn', id_attribute='uid'
123
        )
124
        assert result == {
125
            'data': [
126
                {'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'}, 'id': 'janedoe', 'text': 'Jane Doe'},
127
                {'attributes': {'cn': 'John Doe', 'uid': 'johndoe'}, 'id': 'johndoe', 'text': 'John Doe'},
128
            ]
129
        }
130

  
131
    def test_id(self, resource, ldap_server):
132
        result = resource.search(
133
            request=None, id='janedoe', ldap_base_dn='o=orga', search_attribute='cn', id_attribute='uid'
134
        )
135
        assert result == {
136
            'data': [
137
                {
138
                    'attributes': {
139
                        'cn': 'Jane Doe',
140
                        'uid': 'janedoe',
141
                    },
142
                    'id': 'janedoe',
143
                    'text': 'Jane Doe',
144
                }
145
            ]
146
        }
147

  
148
    def test_q_sizelimit(self, resource, ldap_server):
149
        result = resource.search(
150
            request=None,
151
            q='Doe',
152
            ldap_base_dn='o=orga',
153
            search_attribute='cn',
154
            id_attribute='uid',
155
            sizelimit='1',
156
        )
157
        assert result == {
158
            'data': [
159
                {
160
                    'attributes': {
161
                        'cn': 'Jane Doe',
162
                        'uid': 'janedoe',
163
                    },
164
                    'id': 'janedoe',
165
                    'text': 'Jane Doe',
166
                }
167
            ]
168
        }
169

  
170
    def test_q_text_template(self, resource, ldap_server):
171
        result = resource.search(
172
            request=None,
173
            q='Doe',
174
            ldap_base_dn='o=orga',
175
            search_attribute='cn',
176
            id_attribute='uid',
177
            sizelimit='1',
178
            text_template='{{ sN }} {{ giVenName }} ({{ uId }})',
179
            ldap_attributes='givenname sn',
180
        )
181
        assert result == {
182
            'data': [
183
                {
184
                    'attributes': {'cn': 'Jane Doe', 'givenname': 'Jane', 'sn': 'Doe', 'uid': 'janedoe'},
185
                    'id': 'janedoe',
186
                    'text': 'Doe Jane (janedoe)',
187
                }
188
            ]
189
        }
tox.ini
46 46
  responses
47 47
  zeep<3.3
48 48
  codestyle: pre-commit
49
  ldaptools
49 50
commands =
50 51
  ./get_wcs.sh
51 52
  py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
52
-