Projet

Général

Profil

0001-add-ldap-connector-66533.patch

Benjamin Dauvergne, 05 août 2022 10:02

Télécharger (50,6 ko)

Voir les différences:

Subject: [PATCH] add ldap connector (#66533)

 passerelle/apps/ldap/__init__.py              |   0
 passerelle/apps/ldap/forms.py                 |  75 ++++
 .../apps/ldap/migrations/0001_initial.py      |  70 ++++
 passerelle/apps/ldap/migrations/__init__.py   |   0
 passerelle/apps/ldap/models.py                | 347 ++++++++++++++++++
 passerelle/base/models.py                     |   5 +
 .../widgets/clearable_file_input.html         |   5 +
 passerelle/settings.py                        |   1 +
 passerelle/utils/forms.py                     |  22 ++
 passerelle/utils/models.py                    |  33 ++
 setup.py                                      |   2 +
 tests/ldap/__init__.py                        |   0
 tests/ldap/cert.pem                           |  19 +
 tests/ldap/conftest.py                        | 117 ++++++
 tests/ldap/key.pem                            |  28 ++
 tests/ldap/test_manager.py                    | 149 ++++++++
 tests/ldap/test_model.py                      |  70 ++++
 tests/ldap/test_search_endpoint.py            | 178 +++++++++
 tox.ini                                       |   2 +
 19 files changed, 1123 insertions(+)
 create mode 100644 passerelle/apps/ldap/__init__.py
 create mode 100644 passerelle/apps/ldap/forms.py
 create mode 100644 passerelle/apps/ldap/migrations/0001_initial.py
 create mode 100644 passerelle/apps/ldap/migrations/__init__.py
 create mode 100644 passerelle/apps/ldap/models.py
 create mode 100644 passerelle/base/templates/passerelle/widgets/clearable_file_input.html
 create mode 100644 passerelle/utils/forms.py
 create mode 100644 passerelle/utils/models.py
 create mode 100644 tests/ldap/__init__.py
 create mode 100644 tests/ldap/cert.pem
 create mode 100644 tests/ldap/conftest.py
 create mode 100644 tests/ldap/key.pem
 create mode 100644 tests/ldap/test_manager.py
 create mode 100644 tests/ldap/test_model.py
 create mode 100644 tests/ldap/test_search_endpoint.py
passerelle/apps/ldap/forms.py
1
import base64
2

  
3
from django import forms
4
from django.core.exceptions import ValidationError
5
from django.core.files.base import ContentFile
6
from django.utils.translation import gettext_lazy as _
7
from OpenSSL import crypto
8

  
9
from passerelle.forms import GenericConnectorForm
10

  
11

  
12
def validate_certificate(value):
    """Form validator rejecting uploads that are not a PEM certificate.

    ``value`` is the uploaded file object: it is (re)opened, read in full and
    handed to pyOpenSSL. Any failure, whatever its cause, is reported to the
    user as a ValidationError.
    """
    try:
        pem_data = value.open().read()
        crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
    except Exception:
        raise ValidationError(_('Invalid certificate.'))
17

  
18

  
19
def validate_private_key(value):
    """Form validator rejecting uploads that are not a PEM private key.

    ``value`` is the uploaded file object: it is (re)opened, read in full and
    handed to pyOpenSSL. Any failure, whatever its cause, is reported to the
    user as a ValidationError.
    """
    try:
        pem_data = value.open().read()
        crypto.load_privatekey(crypto.FILETYPE_PEM, pem_data)
    except Exception:
        raise ValidationError(_('Invalid private key.'))
24

  
25

  
26
class NamedContentFile(ContentFile):
    """In-memory file usable as the initial value of a file input widget.

    It is displayed as its name and exposes its content through a ``url``
    property holding a base64 ``data:`` URL.
    """

    def __str__(self):
        return self.name if self.name else ''

    @property
    def url(self):
        # embed the whole content in the link, there is no stored file to point to
        encoded = base64.b64encode(self.file.getvalue()).decode()
        return 'data:application/octet-stream;base64,%s' % encoded
33

  
34

  
35
class ResourceForm(GenericConnectorForm):
    """Manager form of the LDAP connector.

    The TLS client certificate and key live in the binary model fields
    ``ldap_tls_cert`` / ``ldap_tls_key`` (excluded from the generated form)
    and are edited through the file upload fields ``tls_cert`` / ``tls_key``
    declared here; this form converts between the two representations.
    """

    tls_cert = forms.FileField(
        label=_('TLS client certificate'), required=False, validators=[validate_certificate]
    )
    tls_key = forms.FileField(label=_('TLS client key'), required=False, validators=[validate_private_key])

    def __init__(self, *args, **kwargs):
        # expose the stored PEM blobs as downloadable initial values of the file fields
        instance = kwargs.get('instance')
        if instance:
            initial = kwargs.setdefault('initial', {})
            if instance.ldap_tls_cert:
                initial['tls_cert'] = NamedContentFile(instance.ldap_tls_cert, 'cert.pem')
            if instance.ldap_tls_key:
                initial['tls_key'] = NamedContentFile(instance.ldap_tls_key, 'key.pem')
        super().__init__(*args, **kwargs)
        for name in ['tls_cert', 'tls_key']:
            self.fields[name].widget.template_name = 'passerelle/widgets/clearable_file_input.html'

    def clean(self):
        """Cross-field validation; copies the uploaded PEM files onto the instance.

        Raises ValidationError when only one of bind DN / bind password, or
        only one of client certificate / client key, is provided.
        """
        # The parent clean() must run: on a ModelForm it is what enables the
        # uniqueness validation of model fields (see Django ModelForm docs).
        cleaned_data = super().clean()
        if cleaned_data is None:
            cleaned_data = self.cleaned_data

        tls_cert = cleaned_data.get('tls_cert')
        tls_key = cleaned_data.get('tls_key')
        ldap_bind_dn = cleaned_data.get('ldap_bind_dn')
        ldap_bind_password = cleaned_data.get('ldap_bind_password')
        if bool(ldap_bind_dn) != bool(ldap_bind_password):
            raise ValidationError(_('Bind DN and password must be set together.'))
        if bool(tls_cert) != bool(tls_key):
            raise ValidationError(_('Client certificate and key must be set together.'))

        # From here both values have the same truthiness: either two files, or
        # two empty values (None when nothing was sent, False when cleared).
        if tls_cert and tls_key:
            self.instance.ldap_tls_cert = tls_cert.open().read()
            self.instance.ldap_tls_key = tls_key.open().read()
        else:
            self.instance.ldap_tls_cert = None
            self.instance.ldap_tls_key = None
        return cleaned_data

    class Meta:
        exclude = ('ldap_tls_cert', 'ldap_tls_key')
passerelle/apps/ldap/migrations/0001_initial.py
1
# Generated by Django 3.2.14 on 2022-08-02 14:37
2

  
3
from django.db import migrations, models
4

  
5
import passerelle.utils.models
6

  
7

  
8
class Migration(migrations.Migration):
9

  
10
    initial = True
11

  
12
    dependencies = [
13
        ('base', '0029_auto_20210202_1627'),
14
    ]
15

  
16
    operations = [
17
        migrations.CreateModel(
18
            name='Resource',
19
            fields=[
20
                (
21
                    'id',
22
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
23
                ),
24
                ('title', models.CharField(max_length=50, verbose_name='Title')),
25
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
26
                ('description', models.TextField(verbose_name='Description')),
27
                ('ldap_url', passerelle.utils.models.LDAPURLField(max_length=512, verbose_name='Server URL')),
28
                (
29
                    'ldap_bind_dn',
30
                    models.CharField(blank=True, max_length=256, null=True, verbose_name='Bind DN'),
31
                ),
32
                (
33
                    'ldap_bind_password',
34
                    models.CharField(blank=True, max_length=128, null=True, verbose_name='Bind password'),
35
                ),
36
                (
37
                    'ldap_tls_cert',
38
                    models.BinaryField(
39
                        blank=True,
40
                        max_length=32768,
41
                        null=True,
42
                        verbose_name='TLS client certificate',
43
                        editable=True,
44
                    ),
45
                ),
46
                (
47
                    'ldap_tls_key',
48
                    models.BinaryField(
49
                        blank=True,
50
                        max_length=32768,
51
                        null=True,
52
                        verbose_name='TLS client key',
53
                        editable=True,
54
                    ),
55
                ),
56
                (
57
                    'users',
58
                    models.ManyToManyField(
59
                        blank=True,
60
                        related_name='_ldap_resource_users_+',
61
                        related_query_name='+',
62
                        to='base.ApiUser',
63
                    ),
64
                ),
65
            ],
66
            options={
67
                'verbose_name': 'LDAP',
68
            },
69
        ),
70
    ]
passerelle/apps/ldap/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import contextlib
19
import tempfile
20

  
21
import ldap
22
import ldap.filter
23
from django.core.cache import cache
24
from django.core.exceptions import ValidationError
25
from django.db import models
26
from django.utils.html import format_html
27
from django.utils.translation import gettext_lazy as _
28
from OpenSSL import crypto
29

  
30
from passerelle.base.models import BaseResource
31
from passerelle.utils.api import endpoint
32
from passerelle.utils.jsonresponse import APIError
33
from passerelle.utils.models import LDAPURLField
34
from passerelle.utils.templates import render_to_string
35

  
36
from . import forms
37

  
38

  
39
class Resource(BaseResource):
40
    ldap_url = LDAPURLField(verbose_name=_('Server URL'), max_length=512)
41
    ldap_bind_dn = models.CharField(verbose_name=_('Bind DN'), max_length=256, null=True, blank=True)
42
    ldap_bind_password = models.CharField(
43
        verbose_name=_('Bind password'), max_length=128, null=True, blank=True
44
    )
45
    ldap_tls_cert = models.BinaryField(
46
        verbose_name=_('TLS client certificate'),
47
        max_length=1024 * 32,
48
        null=True,
49
        blank=True,
50
        editable=True,
51
    )
52
    ldap_tls_key = models.BinaryField(
53
        verbose_name=_('TLS client key'),
54
        max_length=1024 * 32,
55
        null=True,
56
        blank=True,
57
        editable=True,
58
    )
59

  
60
    category = _('Misc')
61

  
62
    manager_form_base_class = forms.ResourceForm
63

  
64
    class Meta:
65
        verbose_name = _('LDAP')
66

  
67
    @classmethod
    def get_manager_form_class(cls, **kwargs):
        """Build the manager form class with the binary TLS fields always excluded.

        The certificate and key are edited through the file fields of
        ``forms.ResourceForm``. Any caller-supplied ``exclude`` (tuple, list or
        None) is kept and appended after the two TLS field names.
        """
        # coerce to tuple so a list or None from the caller cannot break the concatenation
        extra_exclude = tuple(kwargs.get('exclude') or ())
        kwargs['exclude'] = ('ldap_tls_cert', 'ldap_tls_key') + extra_exclude
        return super().get_manager_form_class(**kwargs)
71

  
72
    def tls_cert(self, value):
        """Return an HTML download link for the PEM certificate ``value``.

        The link text is the certificate subject rendered as comma separated
        ``name=value`` components; when the certificate cannot be parsed the
        text falls back to the size of the blob in bytes. The link target is a
        base64 ``data:`` URL embedding the whole certificate.
        """
        try:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, bytes(value))
            name = ','.join(
                '%s=%s' % (a.decode(), b.decode()) for a, b in cert.get_subject().get_components()
            )
        except Exception:
            # best effort: an unreadable certificate is still offered for download
            name = '%s bytes' % len(value)
        # fixed: the anchor was closed with "<a/>" and the MIME type was "octet-string"
        return format_html(
            '<a href="data:application/octet-stream;base64,{}" target="_blank" download="tls.crt">{}</a>',
            base64.b64encode(value).decode(),
            name,
        )
85

  
86
    def clean(self):
        """Model validation: credentials and TLS material must come in pairs.

        Raises ValidationError when exactly one of bind DN / bind password is
        set, then when exactly one of TLS certificate / TLS key is set.
        """
        has_bind_dn = bool(self.ldap_bind_dn)
        has_bind_password = bool(self.ldap_bind_password)
        if has_bind_dn != has_bind_password:
            raise ValidationError('Bind DN and password must be set together.')

        has_tls_cert = bool(self.ldap_tls_cert)
        has_tls_key = bool(self.ldap_tls_key)
        if has_tls_cert != has_tls_key:
            raise ValidationError('Client certificate and key must be set together.')
91

  
92
    def get_description_fields(self):
        """Return the parent's (field, value) pairs with the certificate made readable.

        A non-empty ``ldap_tls_cert`` value is replaced by the HTML download
        link built by ``tls_cert()``; every other pair is passed through as is.
        """
        described = []
        for field, value in super().get_description_fields():
            if field.name == 'ldap_tls_cert' and value:
                value = self.tls_cert(value)
            described.append((field, value))
        return described
99

  
100
    def check_status(self):
        """Check the directory is usable by connecting, binding and issuing a whoami.

        Nothing is returned; any error raised while connecting or by the
        whoami request propagates to the caller.
        """
        with self.get_connection() as connection:
            connection.whoami_s()
103

  
104
    @contextlib.contextmanager
    def get_connection(self):
        """Context manager yielding a bound python-ldap connection to ``ldap_url``.

        Operation and network timeouts are set to 5 seconds. When both a client
        certificate and key are stored they are written to temporary files for
        the duration of the connection, because the TLS options used here take
        file names. The bind uses ``ldap_bind_dn`` / ``ldap_bind_password``
        when a bind DN is configured, otherwise ``simple_bind_s()`` is called
        without credentials.

        The connection is always unbound on exit, including when the bind
        fails, when the caller's ``with`` body raises, or when a generator
        using this context manager is closed before exhaustion.
        """
        with contextlib.ExitStack() as stack:
            conn = ldap.initialize(self.ldap_url)
            try:
                conn.set_option(ldap.OPT_TIMEOUT, 5)
                conn.set_option(ldap.OPT_NETWORK_TIMEOUT, 5)
                # NOTE(review): these two options disable verification of the server
                # certificate (and of its subject alternative names), so an ldaps://
                # server is not authenticated - confirm this is intended.
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_NEVER)
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                if self.ldap_tls_cert and self.ldap_tls_key:
                    # the temporary files are removed when the ExitStack unwinds
                    cert_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
                    cert_tempfile.write(self.ldap_tls_cert)
                    cert_tempfile.flush()
                    key_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
                    key_tempfile.write(self.ldap_tls_key)
                    key_tempfile.flush()

                    conn.set_option(ldap.OPT_X_TLS_CERTFILE, cert_tempfile.name)
                    conn.set_option(ldap.OPT_X_TLS_KEYFILE, key_tempfile.name)
                # must come after the other TLS options so that they are taken into account
                conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
                if self.ldap_bind_dn:
                    conn.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_password or '')
                else:
                    conn.simple_bind_s()
                yield conn
            finally:
                # previously skipped whenever an exception crossed the yield
                conn.unbind()
129

  
130
    def ldap_search(self, base_dn, scope, ldap_filter, ldap_attributes, sizelimit=-1, timeout=5):
        """Run an LDAP search and yield ``(dn, attributes)`` pairs as they arrive.

        ``attributes`` is a ``cidict`` mapping each attribute name to its
        decoded value(s): a single string when exactly one value could be
        decoded, a list of strings when several could. Values that cannot be
        decoded as text are dropped, and an attribute without any decodable
        value is omitted. Results without a DN are skipped.

        Iteration stops when the server reports an empty result or when the
        size limit is exceeded; entries already received are still yielded.
        """
        with self.get_connection() as conn:
            message_id = conn.search_ext(
                base_dn, scope, ldap_filter, ldap_attributes, timeout=timeout, sizelimit=sizelimit
            )
            while True:
                try:
                    # all=0: fetch results one message at a time
                    _result_type, entries = conn.result(message_id, all=0)
                except ldap.SIZELIMIT_EXCEEDED:
                    return
                if not entries:
                    return
                for dn, attributes in entries:
                    if not dn:
                        continue
                    decoded_attributes = cidict()
                    for attribute_name, raw_values in attributes.items():
                        texts = []
                        for raw_value in raw_values:
                            try:
                                texts.append(raw_value.decode())
                            except UnicodeDecodeError:
                                # binary value, not representable as text
                                continue
                        if not texts:
                            continue
                        decoded_attributes[attribute_name] = texts[0] if len(texts) == 1 else texts
                    yield dn, decoded_attributes
159

  
160
    def search(
        self,
        ldap_base_dn,
        scope,
        ldap_filter,
        ldap_attributes,
        sizelimit,
        id_attribute,
        search_attribute,
        text_template,
    ):
        """Search the directory and format entries as ``{'data': [...]}``.

        Each item holds ``id`` (value of ``id_attribute``), ``text`` (rendered
        ``text_template`` if given, else the value of ``search_attribute``),
        ``dn`` and ``attributes``. Entries lacking ``id_attribute`` are
        skipped and items are sorted by ``(text, id)``.

        Successful results are stored in the Django cache, with the cache's
        default timeout, under a key derived from all the search parameters.
        On any ``ldap.LDAPError`` nothing is cached and an error payload
        (``err`` set to 1) with a single disabled placeholder item is returned.
        """
        # function-scope import so that this method does not depend on a module-level change
        import hashlib

        ldap_attributes = tuple(sorted(ldap_attributes))
        cache_fingerprint = str(
            [
                ldap_base_dn,
                scope,
                ldap_filter,
                ldap_attributes,
                sizelimit,
                id_attribute,
                search_attribute,
                text_template,
            ]
        )
        # The builtin hash() of a str is randomized per interpreter process, which
        # made keys differ between workers and restarts; a digest is stable.
        fingerprint_digest = hashlib.sha256(cache_fingerprint.encode()).hexdigest()
        cache_key = f'ldap-{self.id}-{fingerprint_digest}'
        cache_value = cache.get(cache_key)
        # the full fingerprint is stored alongside the data and compared on read
        if cache_value and cache_value[0] == cache_fingerprint:
            return {'data': cache_value[1]}
        try:
            entries = list(
                self.ldap_search(ldap_base_dn, scope, ldap_filter, ldap_attributes, sizelimit=sizelimit)
            )
        except ldap.LDAPError as e:
            # add a disabled entry to show something on search errors, with display_disabled_items on w.c.s.
            # NOTE(review): the key below is spelled 'err_clss'; confirm whether 'err_class' was intended.
            return {
                'err': 1,
                'data': [
                    {
                        'id': '',
                        'text': _('Directory server is unavailable'),
                        'disabled': True,
                    }
                ],
                'err_clss': 'directory-server-unavailable',
                'err_desc': str(e),
            }
        data = []
        for dn, attributes in entries:
            entry_id = attributes.get(id_attribute)
            if not entry_id:
                continue
            if text_template:
                entry_text = render_to_string(text_template, attributes)
            else:
                entry_text = attributes.get(search_attribute)
            data.append(
                {
                    'id': entry_id,
                    'text': entry_text,
                    'dn': dn,
                    'attributes': attributes,
                }
            )
        data.sort(key=lambda x: (x['text'], x['id']))
        cache.set(cache_key, (cache_fingerprint, data))
        return {'data': data}
226

  
227
    @endpoint(
        description=_('Search'),
        name='search',
        perm='can_access',
        parameters={
            'ldap_base_dn': {
                'description': _('Base DN for the LDAP search'),
                'example_value': 'dc=company,dc=com',
            },
            'search_attribute': {
                'description': _('Attribute to search for the substring search'),
                'example_value': 'cn',
            },
            'id_attribute': {
                'description': _('Attribute used as a unique identifier'),
                'example_value': 'uid',
            },
            'text_template': {
                'description': _(
                    'Optional template string based on LDAP attributes '
                    'to create a text value, if none given the search_attribute is used'
                ),
                'example_value': '{{ givenName }} {{ surname }}',
            },
            'ldap_attributes': {
                'description': _('Space separated list of LDAP attributes to retrieve'),
                'example_value': 'l sn givenName locality',
            },
            'id': {
                'description': _('Identifier for exacte retrieval, using the id_attribute'),
                'example_value': 'johndoe',
            },
            'q': {
                'description': _('Substring to search in the search_attribute'),
                'example_value': 'John Doe',
            },
            'sizelimit': {
                'description': _('Maximum number of entries to retrieve, between 1 and 200, default is 30.')
            },
            'scope': {
                'description': _('Scope of the LDAP search, subtree or onelevel, default is subtree.'),
            },
            'filter': {
                'description': _('Extra LDAP filter.'),
                'example_value': 'objectClass=*',
            },
        },
    )
    def search_endpoint(
        self,
        request,
        ldap_base_dn,
        search_attribute,
        id_attribute,
        text_template=None,
        ldap_attributes=None,
        id=None,
        q=None,
        sizelimit=None,
        scope=None,
        filter=None,
    ):
        """HTTP endpoint: look entries up by exact ``id`` or by substring ``q``.

        Attribute names are lower-cased and must be ASCII. ``id`` takes
        precedence over ``q``; both values are escaped before being inserted
        in the LDAP filter, whereas the optional extra ``filter`` is combined
        verbatim with a logical AND. ``sizelimit`` is clamped to 1..200 and
        defaults to 30 when missing, zero or not a number. An unknown ``scope``
        falls back to subtree.

        Raises APIError for non ASCII attribute names, and with HTTP status 400
        when neither ``q`` nor ``id`` is given. Returns the payload built by
        ``search()``.
        """
        search_attribute = search_attribute.lower()
        id_attribute = id_attribute.lower()
        if not search_attribute.isascii():
            raise APIError('search_attribute contains non ASCII characters')
        if not id_attribute.isascii():
            raise APIError('id_attribute contains non ASCII characters')
        ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set()
        # the attributes used to build id and text must always be retrieved
        ldap_attributes.update([search_attribute, id_attribute])
        if not all(attribute.isascii() for attribute in ldap_attributes):
            raise APIError('ldap_attributes contains non ASCII characters')
        try:
            sizelimit = int(sizelimit)
        except (ValueError, TypeError):
            # Missing or non numeric value: use the default. Leaving the raw
            # string in place made the min() below fail with a TypeError.
            sizelimit = 30
        sizelimit = max(1, min(sizelimit or 30, 200))
        if not q and not id:
            raise APIError('q or id are mandatory parameters', http_status=400)
        if id:
            ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id))
        elif q:
            ldap_filter = '(%s=*%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q))
        if filter:
            # accept the extra filter with or without its enclosing parentheses
            if not filter.startswith('('):
                filter = '(%s)' % filter
            ldap_filter = '(&%s%s)' % (ldap_filter, filter)
        scopes = {
            'subtree': ldap.SCOPE_SUBTREE,
            'onelevel': ldap.SCOPE_ONELEVEL,
        }
        scope = scopes.get(scope, ldap.SCOPE_SUBTREE)
        return self.search(
            ldap_base_dn=ldap_base_dn,
            scope=scope,
            ldap_filter=ldap_filter,
            ldap_attributes=ldap_attributes,
            sizelimit=sizelimit,
            id_attribute=id_attribute,
            search_attribute=search_attribute,
            text_template=text_template,
        )
329

  
330

  
331
# Use a case-insensitive dictionary to map attribute names to their values.
332

  
333

  
334
class cidict(dict):
    """Case insensitive dictionary.

    Keys must be strings; they are lower-cased on every access path
    (construction, item access, deletion, membership, ``get``,
    ``setdefault``, ``pop`` and ``update``), so the keys actually stored
    and returned by iteration are always lower case.
    """

    def __init__(self, *args, **kwargs):
        # go through update() so that initial keys are normalized too
        super().__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __delitem__(self, key):
        super().__delitem__(key.lower())

    def __contains__(self, key):
        return super().__contains__(key.lower())

    def get(self, key, default=None, /):
        return super().get(key.lower(), default)

    def setdefault(self, key, default=None, /):
        return super().setdefault(key.lower(), default)

    def pop(self, key, *default):
        # *default preserves dict.pop semantics: KeyError only when no default is given
        return super().pop(key.lower(), *default)

    def update(self, *args, **kwargs):
        # dict.update() would store keys as given, bypassing __setitem__
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
passerelle/base/models.py
379 379
                    d[field.name] = None
380 380
            elif isinstance(field, SFTPField):
381 381
                d[field.name] = value and value.__json__()
382
            elif isinstance(field, models.BinaryField):
383
                d[field.name] = None if value is None else base64.b64encode(bytes(value)).decode()
382 384
            else:
383 385
                raise Exception(
384 386
                    'export_json: field %s of ressource class %s is unsupported' % (field, self.__class__)
......
457 459
                if value:
458 460
                    value = SFTP(**value)
459 461
                setattr(instance, field.attname, value)
462
            elif isinstance(field, models.BinaryField):
463
                value = None if value is None else base64.b64decode(value)
464
                setattr(instance, field.attname, value)
460 465
            else:
461 466
                raise Exception(
462 467
                    'import_json_real: field %s of ressource class ' '%s is unsupported' % (field, cls)
passerelle/base/templates/passerelle/widgets/clearable_file_input.html
1
{% if widget.is_initial %}{{ widget.initial_text }}: <a href="{{ widget.value.url }}" download="{{ widget.value.name }}">{{ widget.value }}</a>{% if not widget.required %}
2
<input type="checkbox" name="{{ widget.checkbox_name }}" id="{{ widget.checkbox_id }}">
3
<label for="{{ widget.checkbox_id }}">{{ widget.clear_checkbox_label }}</label>{% endif %}<br>
4
{{ widget.input_text }}:{% endif %}
5
<input type="{{ widget.type }}" name="{{ widget.name }}"{% include "django/forms/widgets/attrs.html" %}>
passerelle/settings.py
149 149
    'passerelle.apps.gesbac',
150 150
    'passerelle.apps.holidays',
151 151
    'passerelle.apps.jsondatastore',
152
    'passerelle.apps.ldap',
152 153
    'passerelle.apps.maelis',
153 154
    'passerelle.apps.mdel',
154 155
    'passerelle.apps.mdel_ddpacs',
passerelle/utils/forms.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django import forms
18
from django.core import validators
19

  
20

  
21
class LDAPURLField(forms.URLField):
    """URL form field whose default validator only accepts ldap and ldaps URLs."""

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]
passerelle/utils/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.core import validators
18
from django.db import models
19

  
20
from .forms import LDAPURLField as LDAPURLFormField
21

  
22

  
23
class LDAPURLField(models.URLField):
    """URL model field whose default validator only accepts ldap and ldaps URLs."""

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]

    def formfield(self, **kwargs):
        # use the matching form field unless the caller asks for another form_class
        options = {'form_class': LDAPURLFormField}
        options.update(kwargs)
        return super().formfield(**options)
setup.py
165 165
        'pytz',
166 166
        'vobject',
167 167
        'Levenshtein',
168
        'python-ldap',
169
        'pyOpenSSL',
168 170
    ],
169 171
    cmdclass={
170 172
        'build': build,
tests/ldap/cert.pem
1
-----BEGIN CERTIFICATE-----
2
MIIDBjCCAe6gAwIBAgIUTKopT76CFlsVcI7FAilaYLILz0owDQYJKoZIhvcNAQEL
3
BQAwIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVudHJvdXZlcnQub3JnMB4XDTE4MTIw
4
NTE2NTkyNFoXDTI4MTIwMjE2NTkyNFowIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVu
5
dHJvdXZlcnQub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4xsL
6
n25yittbjk5bcKvY2I8zPivL6YWn2MJimaQQSNzCw/8POmVPLmMIb3lcZjydFRad
7
+RTxZfnuvCCJrnGrG7hOsJNenTLLU0ugN/yQ1869cM07a9tjSzL7NCz9H1NIK1+Q
8
cBsTExc77dOWpwWI9TjqYYRL+zex3ml8cdqcQ7BQUQxAvA4UU63DM2G+5O3dE7l8
9
uvyBUU3kW/shHyhfweWNXO8IXXIjvDfPYkOsjc6en2kFMr+sENSUKgfDKjz/Uzqy
10
S7LBb4tkJALZM8QP56VeQAG1JZF2J2/y1RqBfIGRIEkYoaHcj6UATZa1xcZjMubL
11
z3otRNYcRXKJMYWGbQIDAQABozIwMDAJBgNVHRMEAjAAMCMGA1UdEQQcMBqCGGxv
12
Y2FsaG9zdC5lbnRyb3V2ZXJ0Lm9yZzANBgkqhkiG9w0BAQsFAAOCAQEAFVPavBah
13
mIjgnTjq6ZbFxXTNJW0TrqN8olbKJ6SfwWVk0I8px7POekFaXd+egsFJlWYyH9q4
14
HkKotddRYYrWoXcPiodNfUa+bRnh2WYl2rEGMW5dbBf/MYCDts68c3SoA7JIYJ8w
15
0QZGAkijKNtVML0/FrLuJWbfFBAWH8JB46BcAg/8flbMHAULzV3F1g/v0A3FG3Y/
16
9fVr+lN5qs+NB9NXIMdf5wXrmJQYRjotyOjUO6yTFqDFvqE7DEpKQD5hnvqJoXCz
17
zYQS1DjH1qSRc5vC8I7YlJowCfnI9MsEICSrsk75DhT091aJC2XX93o4zhfNxmO5
18
Kj28hP87GHgNIg==
19
-----END CERTIFICATE-----
tests/ldap/conftest.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import contextlib
18
import os.path
19
import pathlib
20
import socket
21

  
22
import pytest
23
from ldaptools.slapd import Slapd, has_slapd
24

  
25
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')
26

  
27
base_dir = os.path.dirname(__file__)
28
cert_file = os.path.join(base_dir, 'cert.pem')
29
key_file = os.path.join(base_dir, 'key.pem')
30

  
31

  
32
@pytest.fixture
33
def cert():
34
    return pathlib.Path(cert_file)
35

  
36

  
37
@pytest.fixture
38
def key():
39
    return pathlib.Path(key_file)
40

  
41

  
42
@pytest.fixture
def cert_content(cert):
    """Raw bytes of the file designated by the ``cert`` fixture."""
    return cert.read_bytes()
46

  
47

  
48
@pytest.fixture
def key_content(key):
    """Raw bytes of the file designated by the ``key`` fixture."""
    return key.read_bytes()
52

  
53

  
54
def find_free_tcp_port():
    """Return a TCP port number that was free at the time of the call.

    The port is chosen by the operating system (bind to port 0) and the
    probing socket is closed before returning, so another process may still
    grab the port before the caller uses it.
    """
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        # socket options only affect a bind when they are set before it
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]
59

  
60

  
61
@pytest.fixture
62
def ldap_params():
63
    return {
64
        'ldap_url': 'ldap://localhost.entrouvert.org:%s' % find_free_tcp_port(),
65
    }
66

  
67

  
68
@pytest.fixture
69
def ldap_object(ldap_params):
70
    with Slapd(**ldap_params) as slapd:
71
        yield slapd
72

  
73

  
74
@pytest.fixture
75
def ldap_configure():
76
    pass
77

  
78

  
79
@pytest.fixture
80
def ldap_server(ldap_object, ldap_configure):
81
    return ldap_object
82

  
83

  
84
@pytest.fixture
85
def resource_class(db):
86
    from passerelle.apps.ldap.models import Resource
87

  
88
    return Resource
89

  
90

  
91
@pytest.fixture
92
def resource_params(ldap_params):
93
    return {
94
        'title': 'resource',
95
        'slug': 'resource',
96
        'description': 'resource',
97
        'ldap_url': ldap_params['ldap_url'],
98
    }
99

  
100

  
101
@pytest.fixture
102
def resource_access_rights(resource_object):
103
    from tests.utils import setup_access_rights
104

  
105
    setup_access_rights(resource_object)
106

  
107

  
108
@pytest.fixture
109
def resource_object(resource_class, resource_params):
110
    resource = resource_class(**resource_params)
111
    resource.save()
112
    return resource
113

  
114

  
115
@pytest.fixture
def resource(resource_object, resource_access_rights):
    """Return the saved connector once its access rights have been set up.

    ``resource_access_rights`` is requested only for its side effects.
    """
    connector = resource_object
    return connector
tests/ldap/key.pem
1
-----BEGIN PRIVATE KEY-----
2
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDjGwufbnKK21uO
3
Tltwq9jYjzM+K8vphafYwmKZpBBI3MLD/w86ZU8uYwhveVxmPJ0VFp35FPFl+e68
4
IImucasbuE6wk16dMstTS6A3/JDXzr1wzTtr22NLMvs0LP0fU0grX5BwGxMTFzvt
5
05anBYj1OOphhEv7N7HeaXxx2pxDsFBRDEC8DhRTrcMzYb7k7d0TuXy6/IFRTeRb
6
+yEfKF/B5Y1c7whdciO8N89iQ6yNzp6faQUyv6wQ1JQqB8MqPP9TOrJLssFvi2Qk
7
AtkzxA/npV5AAbUlkXYnb/LVGoF8gZEgSRihodyPpQBNlrXFxmMy5svPei1E1hxF
8
cokxhYZtAgMBAAECggEAOUZI2BxyprJLlMgOJ4wvU+5JbhR9iJc8jV34n+bQdI+4
9
TtW0cXW7UmeHaRWiR+Zhd0AM9xRhDObLXoaWMnhYPtVsgvunkN2OiaM49OWtYb+x
10
5xDbO4hIsl5ZG/98lrnaKZYgRyWM2fOyGXiTNewfbji8Y3uJ7gFNylmwGMaZQjhr
11
YNaqNEV7Vs2n7oERxqzKG9947oBAx2hpmoaW6eMyXcWl2ov7iHpJSKUBKho+5PWc
12
J731no0OsGuS+3jHa/0nZXrT8nKmemyDMdSfWmtTv659L/guFInZpHPfFVLf56vc
13
J6zb/IzEJV+Nh7CfBsMbHlTYBeUFlRWsy9t70+OZwQKBgQD3J4aVN4vJhXX1zDgL
14
dVAczwLGKXY38BoBjOeRtVhXHs5p/eNIqeZ2YbYwBBy3nL414Un7gqb9fDtg0i3n
15
5mQIOWhvpIYUxtwIPgYwzumxxp/n7XdU4BPDbxejZxkuC7AR5bB34pwJAJvWRGEf
16
0X1TxJlqULhiZ6g18O3S0oiJtwKBgQDrO9ROaj6kkxjYHmljBZIXXhdKDcn0AqPi
17
w20Aaafx0oxNQAoq8Gtu22Z1QHwRdBeUJwqCbmHVCCwbMf/568zFAANuT9bKMe6X
18
J0p0nTDiyn8w9MfduFuG4cUMn4oK6dIuYlscguoPQCvQdciwG+djqwTrHib5TEbm
19
jeKEkY2A+wKBgQDvXt+wy2BeqBzMF6M8Lb2OeUwVgniVyrxVPhPVgk5x6ks+SoAD
20
k1G62/3o2UK67lsmsfDGYA69uMGFj2qYjAHcGUW1wyF9I/BdJz01rmCWJmoe5VXK
21
5U8e3AyH3MV9XCKF4vCb2+UFrwo/ZnCusWVxaRqw5kb+P6ihvZuIsRE+VwKBgQC7
22
2duBg3bjFlUQwbiHSzuPTaRrjvdn1XPq8wVo/vcPNoS0bB+yiqxAqxT3LbfmeD8c
23
INFTt7KI3S3byeIRQy0TZR9YSInOjnFqZAYheiY/9lX8Un4Jod/1pvYlToJ+lJs0
24
T3dTHXitFSHoJydM+/ucrEYRPNMC4tb75vKty06lYQKBgQCx49+g5kQaaRZ+4psw
25
+eolMpAwKDkpK5gYen6OsrT8m4hpxTmtiteMsH5Avb/fxqoJWLOjhN4EnEZTMJzr
26
LyGoKsTv7rhZwhRznE15rOzxmldWrcCkl7DGuM2GcKgguhCYF7U7KA+vUCeqCE0H
27
LA2grkY+TxFpg1pwYdF1hekmTw==
28
-----END PRIVATE KEY-----
tests/ldap/test_manager.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import json
18

  
19
import pytest
20
from webtest import Upload
21

  
22
pytestmark = pytest.mark.django_db
23

  
24

  
25
def login(app, username='admin', password='admin'):
    """Authenticate ``app`` through the first form of ``/login/``.

    The submission must answer with a 302 status. ``app`` is returned so the
    call can be chained.
    """
    form = app.get('/login/').forms[0]
    credentials = (('username', username), ('password', password))
    for field_name, field_value in credentials:
        form[field_name] = field_value
    answer = form.submit()
    assert answer.status_int == 302
    return app
33

  
34

  
35
@pytest.fixture
def app(app, admin_user):
    """Override the inherited ``app`` fixture with one logged in via ``login()``.

    ``admin_user`` is requested so the account exists before logging in.
    """
    # login() hands back the very application object it received.
    return login(app)
39

  
40

  
41
def test_add(app, db, cert_content, key_content, resource_class):
    """Create a connector through ``/manage/ldap/add`` and check what is stored."""
    form = app.get('/manage/ldap/add').form
    submitted = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'ldap_bind_dn': 'uid=user,o=orga',
        'ldap_bind_password': 'password',
        'tls_cert': Upload('cert.pem', cert_content, 'application/octet-stream'),
        'tls_key': Upload('key.pem', key_content, 'application/octet-stream'),
    }
    for field_name, field_value in submitted.items():
        form.set(field_name, field_value)
    # The submission must answer with a redirect.
    form.submit(status=302)

    assert resource_class.objects.count() == 1
    created = resource_class.objects.get()
    assert created.ldap_url == 'ldap://localhost.entrouvert.org'
    assert created.ldap_bind_dn == 'uid=user,o=orga'
    assert created.ldap_bind_password == 'password'
    # The stored TLS material is converted to bytes before comparison.
    assert bytes(created.ldap_tls_cert) == cert_content
    assert bytes(created.ldap_tls_key) == key_content
59

  
60

  
61
def test_missing_bind_password(app, db, cert_content, key_content, resource_class):
    """Submit a bind DN without a bind password; a 200 response is expected."""
    form = app.get('/manage/ldap/add').form
    submitted = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'ldap_bind_dn': 'uid=user,o=orga',
    }
    for field_name, field_value in submitted.items():
        form.set(field_name, field_value)
    form.submit(status=200)
69

  
70

  
71
def test_missing_bind_dn(app, db, cert_content, key_content, resource_class):
    """Submit a bind password without a bind DN; a 200 response is expected."""
    form = app.get('/manage/ldap/add').form
    submitted = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'ldap_bind_password': 'password',
    }
    for field_name, field_value in submitted.items():
        form.set(field_name, field_value)
    form.submit(status=200)
79

  
80

  
81
def test_missing_tls_key(app, db, cert_content, key_content, resource_class):
    """Upload a TLS certificate without its key; a 200 response is expected."""
    form = app.get('/manage/ldap/add').form
    submitted = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'tls_cert': Upload('cert.pem', cert_content, 'application/octet-stream'),
    }
    for field_name, field_value in submitted.items():
        form.set(field_name, field_value)
    form.submit(status=200)
89

  
90

  
91
def test_missing_tls_cert(app, db, cert_content, key_content, resource_class):
    """Upload a TLS key without its certificate; a 200 response is expected."""
    form = app.get('/manage/ldap/add').form
    submitted = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'tls_key': Upload('key.pem', key_content, 'application/octet-stream'),
    }
    for field_name, field_value in submitted.items():
        form.set(field_name, field_value)
    form.submit(status=200)
99

  
100

  
101
def _b64_fixture(filename):
    """Return the base64 text of a file stored next to this test module.

    The export format carries the TLS certificate and key as base64 strings.
    Deriving them from ``cert.pem`` / ``key.pem`` keeps a single copy of the
    key material instead of a hand-maintained multi-kilobyte literal.
    """
    # Function-local imports keep this helper self-contained.
    import base64
    import pathlib

    content = (pathlib.Path(__file__).parent / filename).read_bytes()
    return base64.b64encode(content).decode('ascii')


# Site export expected for the connector built by
# TestImportExport.resource_params; also used as the import payload.
EXPORT_JSON = {
    'resources': [
        {
            '@type': 'passerelle-resource',
            'access_rights': [{'apiuser': 'all', 'codename': 'can_access'}],
            'description': 'resource',
            'ldap_bind_dn': None,
            'ldap_bind_password': None,
            'ldap_tls_cert': _b64_fixture('cert.pem'),
            'ldap_tls_key': _b64_fixture('key.pem'),
            'ldap_url': 'ldap://localhost.entrouvert.org:52271',
            'log_level': 'INFO',
            'resource_type': 'ldap.resource',
            'slug': 'resource',
            'title': 'resource',
        }
    ],
}
119

  
120

  
121
class TestImportExport:
    """Exercise the management import and export views with ``EXPORT_JSON``."""

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Extend the inherited parameters with a fixed URL and TLS material."""
        overrides = {
            'ldap_url': 'ldap://localhost.entrouvert.org:52271',
            'ldap_tls_cert': cert_content,
            'ldap_tls_key': key_content,
        }
        return {**resource_params, **overrides}

    def test_import(self, app, resource_class, resource_params):
        """Import ``EXPORT_JSON`` and compare the created connector field by field."""
        assert not resource_class.objects.count()
        import_form = app.get('/manage/').click('Import').form
        payload = json.dumps(EXPORT_JSON).encode()
        import_form.set('site_json', Upload('ldap.json', payload))
        import_form.set('import_users', False)
        import_form.submit(status=302)
        imported = resource_class.objects.get()

        for name, expected in resource_params.items():
            actual = getattr(imported, name)
            # Binary values are normalised to bytes before comparison
            # (presumably the database returns a buffer-like object - confirm).
            if isinstance(expected, bytes) and actual:
                actual = bytes(actual)
            assert actual == expected

    def test_export(self, app, resource, cert_content, key_content):
        """Export the saved connector and compare the JSON with ``EXPORT_JSON``."""
        exported = app.get('/ldap/resource/').click('Export')
        assert json.loads(exported.content) == EXPORT_JSON
tests/ldap/test_model.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import ldap
18
import pytest
19

  
20

  
21
def test_get_connection(resource):
    """Check that ``get_connection()`` can be called without raising.

    No ``ldap_server`` fixture is requested by this test.
    """
    resource.get_connection()
23

  
24

  
25
class TestCheckStatus:
    """Behaviour of ``check_status()`` with and without the ``ldap_server`` fixture."""

    def test_nok(self, resource):
        """Without ``ldap_server``, ``check_status()`` must raise ``ldap.LDAPError``."""
        expected_failure = pytest.raises(ldap.LDAPError)
        with expected_failure:
            resource.check_status()

    def test_ok(self, resource, ldap_server):
        """With ``ldap_server`` requested, ``check_status()`` must not raise."""
        resource.check_status()
32

  
33

  
34
class TestTLSAuthentication:
    """Run ``check_status()`` against an ``ldaps:`` server with client TLS material."""

    @pytest.fixture
    def ldap_params(self, ldap_params, key, cert):
        """Switch the server URL to ``ldaps:`` and add a ``tls`` (key, cert) pair."""
        secure_url = ldap_params['ldap_url'].replace('ldap:', 'ldaps:')
        # The inherited dictionary is updated in place, as before.
        ldap_params['ldap_url'] = secure_url
        tls_files = (str(key), str(cert))
        return dict(ldap_params, tls=tls_files)

    @pytest.fixture
    def ldap_configure(self, ldap_object, cert):
        """Add the CA certificate file and ``olcTLSVerifyClient: demand`` to ``cn=config``."""
        admin_connection = ldap_object.get_connection_admin()
        modifications = [
            (ldap.MOD_ADD, 'olcTLSCACertificateFile', str(cert).encode()),
            (ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
        ]
        admin_connection.modify_s('cn=config', modifications)

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Give the connector the certificate and key contents."""
        tls_material = {
            'ldap_tls_cert': cert_content,
            'ldap_tls_key': key_content,
        }
        return {**resource_params, **tls_material}

    def test_ok(self, resource, ldap_server):
        """``check_status()`` must not raise with this TLS setup."""
        resource.check_status()
61

  
62

  
63
class TestLdapSearch:
    """Behaviour of ``ldap_search()`` with and without the ``ldap_server`` fixture."""

    def test_nok(self, resource):
        """Without ``ldap_server``, consuming the search must raise ``ldap.LDAPError``."""
        search_args = ('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*'])
        with pytest.raises(ldap.LDAPError):
            # list() consumes the result, as in the success case below.
            list(resource.ldap_search(*search_args))

    def test_ok(self, resource, ldap_server):
        """With ``ldap_server``, the subtree search returns only the ``o=orga`` entry."""
        search_args = ('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*'])
        found = list(resource.ldap_search(*search_args))
        expected = [('o=orga', {'o': 'orga', 'objectclass': 'organization'})]
        assert found == expected
tests/ldap/test_search_endpoint.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import pytest
18

  
19

  
20
@pytest.fixture
def ldap_configure(ldap_object):
    """Load three ``inetOrgPerson`` entries into the test directory.

    ``johndoe`` and ``janedoe`` sit directly under ``o=orga``; ``janefoo`` is
    a child of ``janedoe``.
    """
    people_ldif = '''
dn: uid=johndoe,o=orga
objectClass: inetOrgPerson
uid: johndoe
cn: John Doe
sn: Doe
gn: John

dn: uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janedoe
cn: Jane Doe
sn: Doe
gn: Jane

dn: uid=janefoo,uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janefoo
cn: Jane Foo
sn: Foo
gn: Jane
'''
    ldap_object.add_ldif(people_ldif)
46

  
47

  
48
def test_server_unavailaible(app, resource):
    """Query the search endpoint without requesting ``ldap_server``.

    The endpoint must answer with the ``directory-server-unavailable`` error
    payload, including a single disabled entry in ``data``.
    """
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    # The description is compared literally, errno and message included.
    expected_description = (
        '{\'result\': -1, \'desc\': "Can\'t contact LDAP server", '
        "'errno': 107, 'ctrls': [], 'info': 'Transport endpoint is not "
        "connected'}"
    )
    expected = {
        'data': [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}],
        'err': 1,
        'err_clss': 'directory-server-unavailable',
        'err_desc': expected_description,
    }
    answer = app.get('/ldap/resource/search', params=query)
    assert answer.json == expected
66

  
67

  
68
def test_q(app, resource, ldap_server):
    """Search ``cn`` for ``Doe`` and expect the Jane Doe and John Doe entries, in that order."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    jane = {
        'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
        'dn': 'uid=janedoe,o=orga',
        'id': 'janedoe',
        'text': 'Jane Doe',
    }
    john = {
        'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
        'dn': 'uid=johndoe,o=orga',
        'id': 'johndoe',
        'text': 'John Doe',
    }
    answer = app.get('/ldap/resource/search', params=query)
    assert answer.json == {'err': 0, 'data': [jane, john]}
95

  
96

  
97
def test_id(app, resource, ldap_server):
    """Look an entry up with ``id=janedoe`` and expect exactly the Jane Doe entry."""
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    jane = {
        'attributes': {
            'cn': 'Jane Doe',
            'uid': 'janedoe',
        },
        'dn': 'uid=janedoe,o=orga',
        'id': 'janedoe',
        'text': 'Jane Doe',
    }
    answer = app.get('/ldap/resource/search', params=query)
    assert answer.json == {'err': 0, 'data': [jane]}
121

  
122

  
123
def test_sizelimit(app, resource, ldap_server):
    """With ``sizelimit=1`` the ``Doe`` search must return a single entry."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'sizelimit': '1',
    }
    answer = app.get('/ldap/resource/search', params=query)
    entries = answer.json['data']
    assert len(entries) == 1
135

  
136

  
137
def test_text_template(app, resource, ldap_server):
    """Render ``text`` from a template whose variable names use mixed case.

    The template refers to ``sN``, ``giVenName`` and ``uId`` while the
    requested attributes are ``sn givenname``; the expected text is
    ``Doe Jane (janedoe)``.
    """
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }
    answer = app.get('/ldap/resource/search', params=query)
    first_entry = answer.json['data'][0]
    assert first_entry['text'] == 'Doe Jane (janedoe)'
150

  
151

  
152
def test_scope(app, resource, ldap_server):
    """Compare a ``onelevel`` search with a search without a ``scope`` parameter.

    ``Foo`` only matches ``janefoo``, which is stored below ``janedoe``: the
    ``onelevel`` search from ``o=orga`` must find nothing, while the search
    without ``scope`` must find one entry.
    """
    shared = {
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }

    onelevel_query = {'q': 'Foo', 'scope': 'onelevel', **shared}
    answer = app.get('/ldap/resource/search', params=onelevel_query)
    assert len(answer.json['data']) == 0

    default_query = {'q': 'Foo', **shared}
    answer = app.get('/ldap/resource/search', params=default_query)
    assert len(answer.json['data']) == 1
tox.ini
46 46
  responses
47 47
  zeep<3.3
48 48
  codestyle: pre-commit
49
  ldaptools
49 50
commands =
50 51
  ./get_wcs.sh
51 52
  py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
......
80 81
  pytest-freezegun
81 82
  responses
82 83
  mohawk
84
  ldaptools
83 85
commands =
84 86
  ./pylint.sh passerelle/ tests/
85
-