Projet

Général

Profil

0001-add-ldap-connector-66533.patch

Benjamin Dauvergne, 07 septembre 2022 09:30

Télécharger (54,9 ko)

Voir les différences:

Subject: [PATCH 1/2] add ldap connector (#66533)

 passerelle/apps/ldap/__init__.py              |   0
 passerelle/apps/ldap/forms.py                 |  17 +
 .../apps/ldap/migrations/0001_initial.py      |  81 ++++
 passerelle/apps/ldap/migrations/__init__.py   |   0
 passerelle/apps/ldap/models.py                | 365 ++++++++++++++++++
 .../apps/sector/migrations/0001_initial.py    |   3 +-
 passerelle/apps/sector/models.py              |   7 +-
 .../migrations/0004_csv_upload_to.py          |   3 +-
 passerelle/contrib/nancypoll/models.py        |   7 +-
 passerelle/settings.py                        |   1 +
 passerelle/utils/forms.py                     |  22 ++
 passerelle/utils/models.py                    |  37 ++
 setup.py                                      |   2 +
 tests/ldap/__init__.py                        |   0
 tests/ldap/cert.pem                           |  19 +
 tests/ldap/conftest.py                        | 115 ++++++
 tests/ldap/key.pem                            |  28 ++
 tests/ldap/test_manager.py                    | 168 ++++++++
 tests/ldap/test_model.py                      |  72 ++++
 tests/ldap/test_search_endpoint.py            | 265 +++++++++++++
 tox.ini                                       |   2 +
 21 files changed, 1202 insertions(+), 12 deletions(-)
 create mode 100644 passerelle/apps/ldap/__init__.py
 create mode 100644 passerelle/apps/ldap/forms.py
 create mode 100644 passerelle/apps/ldap/migrations/0001_initial.py
 create mode 100644 passerelle/apps/ldap/migrations/__init__.py
 create mode 100644 passerelle/apps/ldap/models.py
 create mode 100644 passerelle/utils/forms.py
 create mode 100644 passerelle/utils/models.py
 create mode 100644 tests/ldap/__init__.py
 create mode 100644 tests/ldap/cert.pem
 create mode 100644 tests/ldap/conftest.py
 create mode 100644 tests/ldap/key.pem
 create mode 100644 tests/ldap/test_manager.py
 create mode 100644 tests/ldap/test_model.py
 create mode 100644 tests/ldap/test_search_endpoint.py
passerelle/apps/ldap/forms.py
1
from django.core.exceptions import ValidationError
2
from django.utils.translation import gettext_lazy as _
3
from OpenSSL import crypto
4

  
5

  
6
def validate_certificate(value):
    '''Validate that a file holds a PEM encoded certificate.

    ``value`` is the file object given to the validator; it is opened, read
    entirely and handed to ``crypto.load_certificate``.

    Raises:
        ValidationError: when the file cannot be opened, read or parsed.
    '''
    try:
        crypto.load_certificate(crypto.FILETYPE_PEM, value.open().read())
    except Exception as exc:
        # any failure (I/O or parsing) is reported as a validation error; the
        # original exception is chained explicitly to keep the root cause.
        raise ValidationError(_('Invalid certificate.')) from exc
11

  
12

  
13
def validate_private_key(value):
    '''Validate that a file holds a PEM encoded private key.

    ``value`` is the file object given to the validator; it is opened, read
    entirely and handed to ``crypto.load_privatekey``.

    Raises:
        ValidationError: when the file cannot be opened, read or parsed.
    '''
    try:
        crypto.load_privatekey(crypto.FILETYPE_PEM, value.open().read())
    except Exception as exc:
        # any failure (I/O or parsing) is reported as a validation error; the
        # original exception is chained explicitly to keep the root cause.
        raise ValidationError(_('Invalid private key.')) from exc
passerelle/apps/ldap/migrations/0001_initial.py
1
# Generated by Django 3.2.14 on 2022-08-02 14:37
2

  
3
from django.db import migrations, models
4

  
5
import passerelle.apps.ldap.forms
6
import passerelle.utils.models
7

  
8

  
9
class Migration(migrations.Migration):
    '''Initial migration creating the ``Resource`` model of the LDAP connector.'''

    initial = True

    dependencies = [
        ('base', '0029_auto_20210202_1627'),
    ]

    operations = [
        migrations.CreateModel(
            name='Resource',
            fields=[
                # generic connector columns
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                ('title', models.CharField(max_length=50, verbose_name='Title')),
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
                ('description', models.TextField(verbose_name='Description')),
                # LDAP server location and optional bind credentials
                ('ldap_url', passerelle.utils.models.LDAPURLField(max_length=512, verbose_name='Server URL')),
                (
                    'ldap_bind_dn',
                    models.CharField(blank=True, max_length=256, null=True, verbose_name='Bind DN'),
                ),
                (
                    'ldap_bind_password',
                    models.CharField(blank=True, max_length=128, null=True, verbose_name='Bind password'),
                ),
                # optional TLS material, stored as uploaded files
                (
                    'ldap_tls_cert',
                    models.FileField(
                        blank=True,
                        null=True,
                        upload_to=passerelle.utils.models.resource_file_upload_to,
                        validators=[passerelle.apps.ldap.forms.validate_certificate],
                        verbose_name='TLS client certificate',
                    ),
                ),
                (
                    'ldap_tls_key',
                    models.FileField(
                        blank=True,
                        null=True,
                        upload_to=passerelle.utils.models.resource_file_upload_to,
                        validators=[passerelle.apps.ldap.forms.validate_private_key],
                        verbose_name='TLS client key',
                    ),
                ),
                (
                    'ldap_tls_cacert',
                    models.FileField(
                        blank=True,
                        null=True,
                        upload_to=passerelle.utils.models.resource_file_upload_to,
                        validators=[passerelle.apps.ldap.forms.validate_certificate],
                        verbose_name='TLS trusted certificate',
                    ),
                ),
                (
                    'users',
                    models.ManyToManyField(
                        blank=True,
                        related_name='_ldap_resource_users_+',
                        related_query_name='+',
                        to='base.ApiUser',
                    ),
                ),
            ],
            options={
                'verbose_name': 'LDAP',
            },
        ),
    ]
passerelle/apps/ldap/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import contextlib
19

  
20
import ldap
21
import ldap.filter
22
from django.core.cache import cache
23
from django.core.exceptions import ValidationError
24
from django.db import models
25
from django.utils.html import format_html
26
from django.utils.translation import gettext_lazy as _
27
from OpenSSL import crypto
28

  
29
from passerelle.base.models import BaseResource
30
from passerelle.utils.api import endpoint
31
from passerelle.utils.jsonresponse import APIError
32
from passerelle.utils.models import LDAPURLField, resource_file_upload_to
33
from passerelle.utils.templates import render_to_string
34

  
35
from . import forms
36

  
37
SEARCH_OP_SUBSTRING = 'substring'
38
SEARCH_OP_PREFIX = 'prefix'
39
SEARCH_OP_APPROX = 'approx'
40
SEARCH_OPS = [SEARCH_OP_SUBSTRING, SEARCH_OP_PREFIX, SEARCH_OP_APPROX]
41

  
42

  
43
class Resource(BaseResource):
44
    ldap_url = LDAPURLField(verbose_name=_('Server URL'), max_length=512)
45
    ldap_bind_dn = models.CharField(verbose_name=_('Bind DN'), max_length=256, null=True, blank=True)
46
    ldap_bind_password = models.CharField(
47
        verbose_name=_('Bind password'), max_length=128, null=True, blank=True
48
    )
49
    ldap_tls_cert = models.FileField(
50
        verbose_name=_('TLS client certificate'),
51
        upload_to=resource_file_upload_to,
52
        null=True,
53
        blank=True,
54
        validators=[forms.validate_certificate],
55
    )
56
    ldap_tls_key = models.FileField(
57
        verbose_name=_('TLS client key'),
58
        upload_to=resource_file_upload_to,
59
        null=True,
60
        blank=True,
61
        validators=[forms.validate_private_key],
62
    )
63
    ldap_tls_cacert = models.FileField(
64
        verbose_name=_('TLS trusted certificate'),
65
        upload_to=resource_file_upload_to,
66
        null=True,
67
        blank=True,
68
        validators=[forms.validate_certificate],
69
    )
70

  
71
    category = _('Misc')
72

  
73
    class Meta:
74
        verbose_name = _('LDAP')
75

  
76
    def tls_cert(self, value):
        '''Render a stored certificate file as an HTML download link.

        ``value`` is the file field value; when it has no name (no file
        stored) ``None`` is returned.  Otherwise the file content is embedded
        in a ``data:`` URL and the link text is the certificate subject, or
        the content size in bytes when the certificate cannot be parsed.
        '''
        if not value.name:
            return None
        with value as fd:
            content = fd.read()
        try:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, content)
            name = ','.join(
                '%s=%s' % (a.decode(), b.decode()) for a, b in cert.get_subject().get_components()
            )
        except Exception:
            # unparsable content: fall back to a neutral label
            name = '%s bytes' % len(content)
        # NOTE(review): "application/octet-string" looks like a typo for
        # "application/octet-stream" - confirm before changing, tests may match it.
        return format_html(
            '<a href="data:application/octet-string;base64,{}" target="_blank" download="tls.crt">{}</a>',
            base64.b64encode(content).decode(),
            name,
        )
93

  
94
    def clean(self):
        '''Check that paired settings are either both set or both empty.

        Raises:
            ValidationError: when only one of bind DN / bind password, or only
                one of client certificate / client key, is provided.
        '''
        has_bind_dn = bool(self.ldap_bind_dn)
        has_bind_password = bool(self.ldap_bind_password)
        if has_bind_dn != has_bind_password:
            raise ValidationError('Bind DN and password must be set together.')

        has_client_cert = bool(self.ldap_tls_cert.name)
        has_client_key = bool(self.ldap_tls_key.name)
        if has_client_cert != has_client_key:
            raise ValidationError('Client certificate and key must be set together.')
99

  
100
    def get_description_fields(self):
        '''Return the parent's description fields, with the client
        certificate value replaced by the output of ``tls_cert``.'''
        described = []
        for field, value in super().get_description_fields():
            if field.name == 'ldap_tls_cert':
                value = self.tls_cert(value)
            described.append((field, value))
        return described
107

  
108
    def check_status(self):
        '''Open a connection and issue a "who am I" request; any error
        raised while connecting, binding or querying propagates.'''
        with self.get_connection() as connection:
            connection.whoami_s()
111

  
112
    @contextlib.contextmanager
113
    def get_connection(self):
        '''Yield a bound LDAP connection and unbind it on exit.

        The connection uses 5 seconds operation and network timeouts, the
        optional client certificate/key and trusted certificate files, and a
        simple bind (anonymous when no bind DN is configured).

        The connection is unbound even when binding fails, when the caller's
        ``with`` body raises or when a consuming generator is closed early.
        '''
        conn = ldap.initialize(self.ldap_url)
        try:
            conn.set_option(ldap.OPT_TIMEOUT, 5)
            conn.set_option(ldap.OPT_NETWORK_TIMEOUT, 5)
            # NOTE(review): server certificate checks are disabled here, even
            # when a trusted certificate is configured - confirm this is intended.
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            if self.ldap_tls_cert.name and self.ldap_tls_key.name:
                conn.set_option(ldap.OPT_X_TLS_CERTFILE, self.ldap_tls_cert.path)
                conn.set_option(ldap.OPT_X_TLS_KEYFILE, self.ldap_tls_key.path)
            if self.ldap_tls_cacert.name:
                conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ldap_tls_cacert.path)
            # set last, after all the other TLS options
            conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
            if self.ldap_bind_dn:
                conn.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_password or '')
            else:
                conn.simple_bind_s()
            yield conn
        finally:
            conn.unbind()
131

  
132
    def ldap_search(self, base_dn, scope, ldap_filter, ldap_attributes, sizelimit=-1, timeout=5):
        '''Run an LDAP search and yield ``(dn, attributes)`` pairs.

        Results are fetched one message at a time.  Iteration stops on an
        empty result or when the size limit is exceeded.  Entries without a
        DN are ignored.  ``attributes`` is a ``cidict``: values that cannot be
        decoded are dropped, an attribute with a single decoded value maps to
        that string, otherwise to the list of decoded values.
        '''
        with self.get_connection() as conn:
            msgid = conn.search_ext(
                base_dn, scope, ldap_filter, ldap_attributes, timeout=timeout, sizelimit=sizelimit
            )
            while True:
                try:
                    _result_type, batch = conn.result(msgid, all=0)
                except ldap.SIZELIMIT_EXCEEDED:
                    return
                if not batch:
                    return
                for dn, raw_attributes in batch:
                    if not dn:
                        continue
                    entry = cidict()
                    for attr_name, raw_values in raw_attributes.items():
                        # decode values to str when possible, skip the others
                        texts = []
                        for raw in raw_values:
                            try:
                                texts.append(raw.decode())
                            except UnicodeDecodeError:
                                continue
                        if not texts:
                            continue
                        # single valued attributes are flattened to a string
                        entry[attr_name] = texts[0] if len(texts) == 1 else texts
                    yield dn, entry
161

  
162
    def search(
        self,
        ldap_base_dn,
        scope,
        ldap_filter,
        ldap_attributes,
        sizelimit,
        id_attribute,
        search_attribute,
        text_template,
    ):
        '''Search the directory and return a ``{'data': [...]}`` payload.

        Each returned item has the keys ``id``, ``text``, ``dn`` and
        ``attributes``; entries lacking the ``id_attribute`` are skipped and
        items are sorted by ``(text, id)``.  ``text`` is rendered from
        ``text_template`` when given, else taken from ``search_attribute``.

        Successful results are cached under a key derived from the resource
        id and all the search parameters.  On an LDAP error nothing is cached
        and an error payload (``err`` set to 1) with a single disabled item
        is returned.
        '''
        import hashlib

        ldap_attributes = tuple(sorted(ldap_attributes))
        cache_fingerprint = str(
            [
                ldap_base_dn,
                scope,
                ldap_filter,
                ldap_attributes,
                sizelimit,
                id_attribute,
                search_attribute,
                text_template,
            ]
        )
        # hash() of a str is salted per process, which would give every worker
        # its own set of keys; a digest is stable across processes.
        fingerprint_digest = hashlib.sha256(cache_fingerprint.encode()).hexdigest()
        cache_key = f'ldap-{self.id}-{fingerprint_digest}'
        cache_value = cache.get(cache_key)
        # the fingerprint is stored with the data and compared on read
        if cache_value and cache_value[0] == cache_fingerprint:
            return {'data': cache_value[1]}
        try:
            entries = list(
                self.ldap_search(ldap_base_dn, scope, ldap_filter, ldap_attributes, sizelimit=sizelimit)
            )
        except ldap.LDAPError as e:
            # add a disabled entry to show something on search errors, with display_disabled_items on w.c.s.
            return {
                'err': 1,
                'data': [
                    {
                        'id': '',
                        'text': _('Directory server is unavailable'),
                        'disabled': True,
                    }
                ],
                'err_class': 'directory-server-unavailable',
                'err_desc': str(e),
            }
        data = []
        for dn, attributes in entries:
            entry_id = attributes.get(id_attribute)
            if not entry_id:
                continue
            if text_template:
                entry_text = render_to_string(text_template, attributes)
            else:
                entry_text = attributes.get(search_attribute)
            data.append(
                {
                    'id': entry_id,
                    'text': entry_text,
                    'dn': dn,
                    'attributes': attributes,
                }
            )
        # NOTE(review): text may be None and id/text may be lists for multi-valued
        # attributes; mixed types would make this sort raise - confirm with callers.
        data.sort(key=lambda x: (x['text'], x['id']))
        cache.set(cache_key, (cache_fingerprint, data))
        return {'data': data}
228

  
229
    @endpoint(
230
        description=_('Search'),
231
        name='search',
232
        perm='can_access',
233
        parameters={
234
            'ldap_base_dn': {
235
                'description': _('Base DN for the LDAP search'),
236
                'example_value': 'dc=company,dc=com',
237
            },
238
            'search_attribute': {
239
                'description': _('Attribute to search for the substring search'),
240
                'example_value': 'cn',
241
            },
242
            'id_attribute': {
243
                'description': _('Attribute used as a unique identifier'),
244
                'example_value': 'uid',
245
            },
246
            'text_template': {
247
                'description': _(
248
                    'Optional template string based on LDAP attributes '
249
                    'to create a text value, if none given the search_attribute is used'
250
                ),
251
                'example_value': '{{ givenName }} {{ surname }}',
252
            },
253
            'ldap_attributes': {
254
                'description': _('Space separated list of LDAP attributes to retrieve'),
255
                'example_value': 'l sn givenName locality',
256
            },
257
            'id': {
258
                'description': _('Identifier for exacte retrieval, using the id_attribute'),
259
                'example_value': 'johndoe',
260
            },
261
            'q': {
262
                'description': _('Substring to search in the search_attribute'),
263
                'example_value': 'John Doe',
264
            },
265
            'sizelimit': {
266
                'description': _('Maximum number of entries to retrieve, between 1 and 200, default is 30.')
267
            },
268
            'scope': {
269
                'description': _('Scope of the LDAP search, subtree or onelevel, default is subtree.'),
270
            },
271
            'filter': {
272
                'description': _('Extra LDAP filter.'),
273
                'example_value': 'objectClass=*',
274
            },
275
            'search_op': {
276
                'description': _(
277
                    'Search operator, can be "substring" (the default value), "prefix" or "approx"'
278
                ),
279
                'example_value': SEARCH_OP_SUBSTRING,
280
            },
281
        },
282
    )
283
    def search_endpoint(
        self,
        request,
        ldap_base_dn,
        search_attribute,
        id_attribute,
        text_template=None,
        ldap_attributes=None,
        id=None,
        q=None,
        sizelimit=None,
        scope=None,
        filter=None,
        search_op=SEARCH_OP_SUBSTRING,
    ):
        '''Validate the endpoint parameters, build the LDAP filter and search.

        ``id`` takes precedence over ``q``: with ``id`` an exact match on
        ``id_attribute`` is done, otherwise ``q`` is matched against
        ``search_attribute`` using ``search_op``.  ``filter``, when given, is
        AND-ed with that filter.  ``sizelimit`` is clamped between 1 and 200
        and defaults to 30 when missing, zero or not a number.  Unknown
        ``scope`` and ``search_op`` values fall back to subtree and substring.

        Raises:
            APIError: when an attribute name is not ASCII or when neither
                ``q`` nor ``id`` is given.
        '''
        search_attribute = search_attribute.lower()
        id_attribute = id_attribute.lower()
        if not search_attribute.isascii():
            raise APIError('search_attribute contains non ASCII characters')
        if not id_attribute.isascii():
            raise APIError('id_attribute contains non ASCII characters')
        ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set()
        # always retrieve the attributes needed to build id and text
        ldap_attributes.update([search_attribute, id_attribute])
        if not all(attribute.isascii() for attribute in ldap_attributes):
            raise APIError('ldap_attributes contains non ASCII characters')
        try:
            sizelimit = int(sizelimit)
        except (ValueError, TypeError):
            # missing or non numeric value: use the default instead of letting
            # a string reach min() below, which would raise a TypeError
            sizelimit = 30
        sizelimit = max(1, min(sizelimit or 30, 200))
        if search_op not in SEARCH_OPS:
            search_op = SEARCH_OP_SUBSTRING
        if not q and not id:
            raise APIError('q or id are mandatory parameters', http_status=400)
        if id:
            ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id))
        else:
            filter_templates = {
                SEARCH_OP_SUBSTRING: '(%s=*%s*)',
                SEARCH_OP_PREFIX: '(%s=%s*)',
                SEARCH_OP_APPROX: '(%s~=%s)',
            }
            filter_template = filter_templates.get(search_op)
            if filter_template is None:
                raise APIError('unknown search_op %r' % search_op)
            ldap_filter = filter_template % (search_attribute, ldap.filter.escape_filter_chars(q))
        if filter:
            # the extra filter is used as given, only parenthesized if needed
            if not filter.startswith('('):
                filter = '(%s)' % filter
            ldap_filter = '(&%s%s)' % (ldap_filter, filter)
        scopes = {
            'subtree': ldap.SCOPE_SUBTREE,
            'onelevel': ldap.SCOPE_ONELEVEL,
        }
        scope = scopes.get(scope, ldap.SCOPE_SUBTREE)
        return self.search(
            ldap_base_dn=ldap_base_dn,
            scope=scope,
            ldap_filter=ldap_filter,
            ldap_attributes=ldap_attributes,
            sizelimit=sizelimit,
            id_attribute=id_attribute,
            search_attribute=search_attribute,
            text_template=text_template,
        )
347

  
348

  
349
# use a case-insensitive dictionary to map attribute names to values.
350

  
351

  
352
class cidict(dict):
    '''Case insensitive dictionary.

    Keys are expected to be strings; they are lower-cased on every write and
    lookup, so ``d['CN']`` and ``d['cn']`` address the same entry.
    '''

    def __init__(self, *args, **kwargs):
        # dict.__init__ would store keys as given, bypassing __setitem__
        super().__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __delitem__(self, key):
        super().__delitem__(key.lower())

    def __contains__(self, key):
        return super().__contains__(key.lower())

    def get(self, key, default=None, /):
        return super().get(key.lower(), default)

    def pop(self, key, *default):
        return super().pop(key.lower(), *default)

    def setdefault(self, key, default=None, /):
        return super().setdefault(key.lower(), default)

    def update(self, *args, **kwargs):
        # go through __setitem__ so that every key gets normalized
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
passerelle/apps/sector/migrations/0001_initial.py
4 4
from django.db import migrations, models
5 5

  
6 6
import passerelle.apps.sector.models
7
import passerelle.utils.models
7 8

  
8 9

  
9 10
class Migration(migrations.Migration):
......
43 44
                    'csv_file',
44 45
                    models.FileField(
45 46
                        help_text='CSV file',
46
                        upload_to=passerelle.apps.sector.models.upload_to,
47
                        upload_to=passerelle.utils.models.resource_file_upload_to,
47 48
                        verbose_name='Sectorization file',
48 49
                    ),
49 50
                ),
passerelle/apps/sector/models.py
32 32
from passerelle.base.models import BaseResource
33 33
from passerelle.utils.api import endpoint
34 34
from passerelle.utils.jsonresponse import APIError
35
from passerelle.utils.models import resource_file_upload_to
35 36

  
36 37
PARITY_ALL = 0
37 38
PARITY_ODD = 1
......
56 57
MAX_HOUSENUMBER = 999_999
57 58

  
58 59

  
59
def upload_to(instance, filename):
60
    return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
61

  
62

  
63 60
class SectorResource(BaseResource):
64 61
    csv_file = models.FileField(
65 62
        _('Sectorization file'),
66
        upload_to=upload_to,
63
        upload_to=resource_file_upload_to,
67 64
        help_text=_('CSV file'),
68 65
    )
69 66
    titles_in_first_line = models.BooleanField(
passerelle/contrib/nancypoll/migrations/0004_csv_upload_to.py
4 4
from django.db import migrations, models
5 5

  
6 6
import passerelle.contrib.nancypoll.models
7
import passerelle.utils.models
7 8

  
8 9

  
9 10
class Migration(migrations.Migration):
......
17 18
            model_name='nancypoll',
18 19
            name='csv_file',
19 20
            field=models.FileField(
20
                upload_to=passerelle.contrib.nancypoll.models.upload_to, verbose_name='CSV File'
21
                upload_to=passerelle.utils.models.resource_file_upload_to, verbose_name='CSV File'
21 22
            ),
22 23
        ),
23 24
    ]
passerelle/contrib/nancypoll/models.py
7 7
from passerelle.base.models import BaseResource
8 8
from passerelle.utils.api import endpoint
9 9
from passerelle.utils.jsonresponse import APIError
10
from passerelle.utils.models import resource_file_upload_to
10 11

  
11 12
COLUMN_NAMES = (
12 13
    'street_start_number, street_end_number,,,street_side,,,,code,id,text,address,,,street_name,,canton,,,'
......
17 18
    return force_str(value, 'utf-8')
18 19

  
19 20

  
20
def upload_to(instance, filename):
21
    return '%s/%s/%s' % (instance.get_connector_slug(), instance.slug, filename)
22

  
23

  
24 21
class NancyPoll(BaseResource):
25
    csv_file = models.FileField(_('CSV File'), upload_to=upload_to)
22
    csv_file = models.FileField(_('CSV File'), upload_to=resource_file_upload_to)
26 23
    category = _('Data Sources')
27 24

  
28 25
    class Meta:
passerelle/settings.py
150 150
    'passerelle.apps.gesbac',
151 151
    'passerelle.apps.holidays',
152 152
    'passerelle.apps.jsondatastore',
153
    'passerelle.apps.ldap',
153 154
    'passerelle.apps.maelis',
154 155
    'passerelle.apps.mdel',
155 156
    'passerelle.apps.mdel_ddpacs',
passerelle/utils/forms.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django import forms
18
from django.core import validators
19

  
20

  
21
class LDAPURLField(forms.URLField):
    '''Form URL field validated with a URLValidator restricted to the
    ``ldap`` and ``ldaps`` schemes.'''

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]
passerelle/utils/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from django.core import validators
18
from django.db import models
19

  
20
from .forms import LDAPURLField as LDAPURLFormField
21

  
22

  
23
def resource_file_upload_to(instance, filename):
    '''Build the storage path of a file attached to a resource:
    ``<connector slug>/<resource slug>/<filename>``.'''
    connector_slug = instance.get_connector_slug()
    resource_slug = instance.slug
    return f'{connector_slug!s}/{resource_slug!s}/{filename!s}'
25

  
26

  
27
class LDAPURLField(models.URLField):
    '''Model URL field validated with a URLValidator restricted to the
    ``ldap`` and ``ldaps`` schemes.'''

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]

    def formfield(self, **kwargs):
        '''Use the LDAP form field by default; a caller supplied
        ``form_class`` still takes precedence.'''
        options = {'form_class': LDAPURLFormField}
        options.update(kwargs)
        return super().formfield(**options)
setup.py
165 165
        'pytz',
166 166
        'vobject',
167 167
        'Levenshtein',
168
        'python-ldap',
169
        'pyOpenSSL',
168 170
    ],
169 171
    cmdclass={
170 172
        'build': build,
tests/ldap/cert.pem
1
-----BEGIN CERTIFICATE-----
2
MIIDBjCCAe6gAwIBAgIUTKopT76CFlsVcI7FAilaYLILz0owDQYJKoZIhvcNAQEL
3
BQAwIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVudHJvdXZlcnQub3JnMB4XDTE4MTIw
4
NTE2NTkyNFoXDTI4MTIwMjE2NTkyNFowIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVu
5
dHJvdXZlcnQub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4xsL
6
n25yittbjk5bcKvY2I8zPivL6YWn2MJimaQQSNzCw/8POmVPLmMIb3lcZjydFRad
7
+RTxZfnuvCCJrnGrG7hOsJNenTLLU0ugN/yQ1869cM07a9tjSzL7NCz9H1NIK1+Q
8
cBsTExc77dOWpwWI9TjqYYRL+zex3ml8cdqcQ7BQUQxAvA4UU63DM2G+5O3dE7l8
9
uvyBUU3kW/shHyhfweWNXO8IXXIjvDfPYkOsjc6en2kFMr+sENSUKgfDKjz/Uzqy
10
S7LBb4tkJALZM8QP56VeQAG1JZF2J2/y1RqBfIGRIEkYoaHcj6UATZa1xcZjMubL
11
z3otRNYcRXKJMYWGbQIDAQABozIwMDAJBgNVHRMEAjAAMCMGA1UdEQQcMBqCGGxv
12
Y2FsaG9zdC5lbnRyb3V2ZXJ0Lm9yZzANBgkqhkiG9w0BAQsFAAOCAQEAFVPavBah
13
mIjgnTjq6ZbFxXTNJW0TrqN8olbKJ6SfwWVk0I8px7POekFaXd+egsFJlWYyH9q4
14
HkKotddRYYrWoXcPiodNfUa+bRnh2WYl2rEGMW5dbBf/MYCDts68c3SoA7JIYJ8w
15
0QZGAkijKNtVML0/FrLuJWbfFBAWH8JB46BcAg/8flbMHAULzV3F1g/v0A3FG3Y/
16
9fVr+lN5qs+NB9NXIMdf5wXrmJQYRjotyOjUO6yTFqDFvqE7DEpKQD5hnvqJoXCz
17
zYQS1DjH1qSRc5vC8I7YlJowCfnI9MsEICSrsk75DhT091aJC2XX93o4zhfNxmO5
18
Kj28hP87GHgNIg==
19
-----END CERTIFICATE-----
tests/ldap/conftest.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import contextlib
18
import os.path
19
import pathlib
20
import socket
21

  
22
import pytest
23
from ldaptools.slapd import Slapd, has_slapd
24

  
25
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')
26

  
27
base_dir = os.path.dirname(__file__)
28
cert_file = os.path.join(base_dir, 'cert.pem')
29
key_file = os.path.join(base_dir, 'key.pem')
30

  
31

  
32
@pytest.fixture
33
def cert():
    '''Path of the test certificate, as a ``pathlib.Path``.'''
    certificate_path = pathlib.Path(cert_file)
    return certificate_path
35

  
36

  
37
@pytest.fixture
38
def key():
    '''Path of the test private key, as a ``pathlib.Path``.'''
    key_path = pathlib.Path(key_file)
    return key_path
40

  
41

  
42
@pytest.fixture
43
def cert_content(cert):
    '''Raw bytes of the file designated by the ``cert`` path.'''
    return cert.read_bytes()
46

  
47

  
48
@pytest.fixture
49
def key_content(key):
    '''Raw bytes of the file designated by the ``key`` path.'''
    return key.read_bytes()
52

  
53

  
54
def find_free_tcp_port():
    '''Return a TCP port number chosen by the system for a temporary socket.

    The socket is closed before returning, so the port is only known to have
    been free at the time of the call.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with sock:
        # port 0 lets the system pick any available port
        sock.bind(('', 0))
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = sock.getsockname()
        return port
59

  
60

  
61
@pytest.fixture
62
def ldap_params():
    '''Parameters of the test LDAP server: an ``ldap://`` URL on a port
    obtained from ``find_free_tcp_port``.'''
    port = find_free_tcp_port()
    return {'ldap_url': 'ldap://localhost.entrouvert.org:%s' % port}
66

  
67

  
68
@pytest.fixture
69
def ldap_object(ldap_params):
    '''Yield a ``Slapd`` instance built from ``ldap_params``, used as a
    context manager for the duration of the test.'''
    with Slapd(**ldap_params) as server:
        yield server
72

  
73

  
74
@pytest.fixture
75
def ldap_configure():
    '''Do nothing; presumably a hook overridden by test modules needing to
    configure the server - verify against the tests.'''
    return None
77

  
78

  
79
@pytest.fixture
80
def ldap_server(ldap_object, ldap_configure):
    '''Return ``ldap_object``; requesting ``ldap_configure`` only makes sure
    that fixture has run.'''
    server = ldap_object
    return server
82

  
83

  
84
@pytest.fixture
85
def resource_class(db):
    '''Return the LDAP ``Resource`` model class, imported lazily inside the
    fixture.'''
    from passerelle.apps.ldap import models as ldap_models

    return ldap_models.Resource
89

  
90

  
91
@pytest.fixture
92
def resource_params(ldap_params):
    '''Keyword arguments used to create the test resource, pointing at the
    URL found in ``ldap_params``.'''
    params = dict.fromkeys(('title', 'slug', 'description'), 'resource')
    params['ldap_url'] = ldap_params['ldap_url']
    return params
99

  
100

  
101
@pytest.fixture
102
def resource_access_rights(resource_object):
    '''Call ``tests.utils.setup_access_rights`` on the test resource.'''
    from tests import utils as test_utils

    test_utils.setup_access_rights(resource_object)
106

  
107

  
108
@pytest.fixture
109
def resource_object(resource_class, resource_params):
    '''Create and return a resource through the model manager, using
    ``resource_params`` as keyword arguments.'''
    manager = resource_class.objects
    return manager.create(**resource_params)
111

  
112

  
113
@pytest.fixture
def resource(resource_object, resource_access_rights):
    """Return the test connector once ``resource_access_rights`` has been applied."""
    connector = resource_object
    return connector
tests/ldap/key.pem
1
-----BEGIN PRIVATE KEY-----
2
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDjGwufbnKK21uO
3
Tltwq9jYjzM+K8vphafYwmKZpBBI3MLD/w86ZU8uYwhveVxmPJ0VFp35FPFl+e68
4
IImucasbuE6wk16dMstTS6A3/JDXzr1wzTtr22NLMvs0LP0fU0grX5BwGxMTFzvt
5
05anBYj1OOphhEv7N7HeaXxx2pxDsFBRDEC8DhRTrcMzYb7k7d0TuXy6/IFRTeRb
6
+yEfKF/B5Y1c7whdciO8N89iQ6yNzp6faQUyv6wQ1JQqB8MqPP9TOrJLssFvi2Qk
7
AtkzxA/npV5AAbUlkXYnb/LVGoF8gZEgSRihodyPpQBNlrXFxmMy5svPei1E1hxF
8
cokxhYZtAgMBAAECggEAOUZI2BxyprJLlMgOJ4wvU+5JbhR9iJc8jV34n+bQdI+4
9
TtW0cXW7UmeHaRWiR+Zhd0AM9xRhDObLXoaWMnhYPtVsgvunkN2OiaM49OWtYb+x
10
5xDbO4hIsl5ZG/98lrnaKZYgRyWM2fOyGXiTNewfbji8Y3uJ7gFNylmwGMaZQjhr
11
YNaqNEV7Vs2n7oERxqzKG9947oBAx2hpmoaW6eMyXcWl2ov7iHpJSKUBKho+5PWc
12
J731no0OsGuS+3jHa/0nZXrT8nKmemyDMdSfWmtTv659L/guFInZpHPfFVLf56vc
13
J6zb/IzEJV+Nh7CfBsMbHlTYBeUFlRWsy9t70+OZwQKBgQD3J4aVN4vJhXX1zDgL
14
dVAczwLGKXY38BoBjOeRtVhXHs5p/eNIqeZ2YbYwBBy3nL414Un7gqb9fDtg0i3n
15
5mQIOWhvpIYUxtwIPgYwzumxxp/n7XdU4BPDbxejZxkuC7AR5bB34pwJAJvWRGEf
16
0X1TxJlqULhiZ6g18O3S0oiJtwKBgQDrO9ROaj6kkxjYHmljBZIXXhdKDcn0AqPi
17
w20Aaafx0oxNQAoq8Gtu22Z1QHwRdBeUJwqCbmHVCCwbMf/568zFAANuT9bKMe6X
18
J0p0nTDiyn8w9MfduFuG4cUMn4oK6dIuYlscguoPQCvQdciwG+djqwTrHib5TEbm
19
jeKEkY2A+wKBgQDvXt+wy2BeqBzMF6M8Lb2OeUwVgniVyrxVPhPVgk5x6ks+SoAD
20
k1G62/3o2UK67lsmsfDGYA69uMGFj2qYjAHcGUW1wyF9I/BdJz01rmCWJmoe5VXK
21
5U8e3AyH3MV9XCKF4vCb2+UFrwo/ZnCusWVxaRqw5kb+P6ihvZuIsRE+VwKBgQC7
22
2duBg3bjFlUQwbiHSzuPTaRrjvdn1XPq8wVo/vcPNoS0bB+yiqxAqxT3LbfmeD8c
23
INFTt7KI3S3byeIRQy0TZR9YSInOjnFqZAYheiY/9lX8Un4Jod/1pvYlToJ+lJs0
24
T3dTHXitFSHoJydM+/ucrEYRPNMC4tb75vKty06lYQKBgQCx49+g5kQaaRZ+4psw
25
+eolMpAwKDkpK5gYen6OsrT8m4hpxTmtiteMsH5Avb/fxqoJWLOjhN4EnEZTMJzr
26
LyGoKsTv7rhZwhRznE15rOzxmldWrcCkl7DGuM2GcKgguhCYF7U7KA+vUCeqCE0H
27
LA2grkY+TxFpg1pwYdF1hekmTw==
28
-----END PRIVATE KEY-----
tests/ldap/test_manager.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import json
18

  
19
import pytest
20
from django.core.files.base import ContentFile
21
from webtest import Upload
22

  
23
pytestmark = pytest.mark.django_db
24

  
25

  
26
def login(app, username='admin', password='admin'):
    """Log into the application through the first form of ``/login/``.

    The submission must answer with a 302 redirect, otherwise the
    assertion fails. Returns ``app`` so the call can be chained.
    """
    form = app.get('/login/').forms[0]
    for field, value in (('username', username), ('password', password)):
        form[field] = value
    assert form.submit().status_int == 302
    return app
34

  
35

  
36
@pytest.fixture
def app(app, admin_user):
    """Return the test application after logging in with the default credentials of ``login()``."""
    return login(app)
40

  
41

  
42
def test_add(app, db, cert_content, key_content, resource_class):
    """Create a connector through the manager add form and check what was stored."""
    form = app.get('/manage/ldap/add').form
    field_values = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'ldap_bind_dn': 'uid=user,o=orga',
        'ldap_bind_password': 'password',
        'ldap_tls_cert': Upload('cert.pem', cert_content, 'application/octet-stream'),
        'ldap_tls_key': Upload('key.pem', key_content, 'application/octet-stream'),
    }
    for name, value in field_values.items():
        form.set(name, value)
    # a successful creation answers with a redirect
    form.submit(status=302)

    assert resource_class.objects.count() == 1
    created = resource_class.objects.get()
    assert created.ldap_url == 'ldap://localhost.entrouvert.org'
    assert created.ldap_bind_dn == 'uid=user,o=orga'
    assert created.ldap_bind_password == 'password'
    # uploaded files must be stored unchanged
    with created.ldap_tls_cert as stored_cert:
        assert stored_cert.read() == cert_content
    with created.ldap_tls_key as stored_key:
        assert stored_key.read() == key_content
62

  
63

  
64
def test_missing_bind_password(app, db, cert_content, key_content, resource_class):
    """Submit the add form with a bind DN but no bind password.

    The form is expected to answer 200 (no redirect) and no connector
    must have been created.
    """
    form = app.get('/manage/ldap/add').form
    form.set('slug', 'resource')
    form.set('title', 'resource')
    form.set('description', 'resource')
    form.set('ldap_url', 'ldap://localhost.entrouvert.org')
    form.set('ldap_bind_dn', 'uid=user,o=orga')
    form.submit(status=200)
    # the status code alone does not prove that nothing was saved
    assert resource_class.objects.count() == 0
72

  
73

  
74
def test_missing_bind_dn(app, db, cert_content, key_content, resource_class):
    """Submit the add form with a bind password but no bind DN.

    The form is expected to answer 200 (no redirect) and no connector
    must have been created.
    """
    form = app.get('/manage/ldap/add').form
    form.set('slug', 'resource')
    form.set('title', 'resource')
    form.set('description', 'resource')
    form.set('ldap_url', 'ldap://localhost.entrouvert.org')
    form.set('ldap_bind_password', 'password')
    form.submit(status=200)
    # the status code alone does not prove that nothing was saved
    assert resource_class.objects.count() == 0
82

  
83

  
84
def test_missing_tls_key(app, db, cert_content, key_content, resource_class):
    """Submit the add form with a TLS certificate but no TLS key.

    The form is expected to answer 200 (no redirect) and no connector
    must have been created.
    """
    form = app.get('/manage/ldap/add').form
    form.set('slug', 'resource')
    form.set('title', 'resource')
    form.set('description', 'resource')
    form.set('ldap_url', 'ldap://localhost.entrouvert.org')
    form.set('ldap_tls_cert', Upload('cert.pem', cert_content, 'application/octet-stream'))
    form.submit(status=200)
    # the status code alone does not prove that nothing was saved
    assert resource_class.objects.count() == 0
92

  
93

  
94
def test_missing_tls_cert(app, db, cert_content, key_content, resource_class):
    """Submit the add form with a TLS key but no TLS certificate.

    The form is expected to answer 200 (no redirect) and no connector
    must have been created.
    """
    form = app.get('/manage/ldap/add').form
    form.set('slug', 'resource')
    form.set('title', 'resource')
    form.set('description', 'resource')
    form.set('ldap_url', 'ldap://localhost.entrouvert.org')
    form.set('ldap_tls_key', Upload('key.pem', key_content, 'application/octet-stream'))
    form.submit(status=200)
    # the status code alone does not prove that nothing was saved
    assert resource_class.objects.count() == 0
102

  
103

  
104
EXPORT_JSON = {
105
    'resources': [
106
        {
107
            '@type': 'passerelle-resource',
108
            'access_rights': [{'apiuser': 'all', 'codename': 'can_access'}],
109
            'description': 'resource',
110
            'ldap_bind_dn': None,
111
            'ldap_bind_password': None,
112
            'ldap_tls_cert': {
113
                'name': 'cert.pem',
114
                'content': 'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCakNDQWU2Z0F3SUJBZ0lVVEtvcFQ3NkNGbHNWY0k3RkFpbGFZTElMejBvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1ZEhKdmRYWmxjblF1YjNKbk1CNFhEVEU0TVRJdwpOVEUyTlRreU5Gb1hEVEk0TVRJd01qRTJOVGt5TkZvd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1CmRISnZkWFpsY25RdWIzSm5NSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQTR4c0wKbjI1eWl0dGJqazViY0t2WTJJOHpQaXZMNllXbjJNSmltYVFRU056Q3cvOFBPbVZQTG1NSWIzbGNaanlkRlJhZAorUlR4WmZudXZDQ0pybkdyRzdoT3NKTmVuVExMVTB1Z04veVExODY5Y00wN2E5dGpTekw3TkN6OUgxTklLMStRCmNCc1RFeGM3N2RPV3B3V0k5VGpxWVlSTCt6ZXgzbWw4Y2RxY1E3QlFVUXhBdkE0VVU2M0RNMkcrNU8zZEU3bDgKdXZ5QlVVM2tXL3NoSHloZndlV05YTzhJWFhJanZEZlBZa09zamM2ZW4ya0ZNcitzRU5TVUtnZkRLanovVXpxeQpTN0xCYjR0a0pBTFpNOFFQNTZWZVFBRzFKWkYySjIveTFScUJmSUdSSUVrWW9hSGNqNlVBVFphMXhjWmpNdWJMCnozb3RSTlljUlhLSk1ZV0diUUlEQVFBQm96SXdNREFKQmdOVkhSTUVBakFBTUNNR0ExVWRFUVFjTUJxQ0dHeHYKWTJGc2FHOXpkQzVsYm5SeWIzVjJaWEowTG05eVp6QU5CZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFGVlBhdkJhaAptSWpnblRqcTZaYkZ4WFROSlcwVHJxTjhvbGJLSjZTZndXVmswSThweDdQT2VrRmFYZCtlZ3NGSmxXWXlIOXE0CkhrS290ZGRSWVlyV29YY1Bpb2ROZlVhK2JSbmgyV1lsMnJFR01XNWRiQmYvTVlDRHRzNjhjM1NvQTdKSVlKOHcKMFFaR0FraWpLTnRWTUwwL0ZyTHVKV2JmRkJBV0g4SkI0NkJjQWcvOGZsYk1IQVVMelYzRjFnL3YwQTNGRzNZLwo5ZlZyK2xONXFzK05COU5YSU1kZjV3WHJtSlFZUmpvdHlPalVPNnlURnFERnZxRTdERXBLUUQ1aG52cUpvWEN6CnpZUVMxRGpIMXFTUmM1dkM4STdZbEpvd0Nmbkk5TXNFSUNTcnNrNzVEaFQwOTFhSkMyWFg5M280emhmTnhtTzUKS2oyOGhQODdHSGdOSWc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==',  # pylint: disable=line-too-long
115
            },
116
            'ldap_tls_key': {
117
                'name': 'key.pem',
118
                'content': 'LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2d0lCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktrd2dnU2xBZ0VBQW9JQkFRRGpHd3VmYm5LSzIxdU8KVGx0d3E5allqek0rSzh2cGhhZll3bUtacEJCSTNNTEQvdzg2WlU4dVl3aHZlVnhtUEowVkZwMzVGUEZsK2U2OApJSW11Y2FzYnVFNndrMTZkTXN0VFM2QTMvSkRYenIxd3pUdHIyMk5MTXZzMExQMGZVMGdyWDVCd0d4TVRGenZ0CjA1YW5CWWoxT09waGhFdjdON0hlYVh4eDJweERzRkJSREVDOERoUlRyY016WWI3azdkMFR1WHk2L0lGUlRlUmIKK3lFZktGL0I1WTFjN3doZGNpTzhOODlpUTZ5TnpwNmZhUVV5djZ3UTFKUXFCOE1xUFA5VE9ySkxzc0Z2aTJRawpBdGt6eEEvbnBWNUFBYlVsa1hZbmIvTFZHb0Y4Z1pFZ1NSaWhvZHlQcFFCTmxyWEZ4bU15NXN2UGVpMUUxaHhGCmNva3hoWVp0QWdNQkFBRUNnZ0VBT1VaSTJCeHlwckpMbE1nT0o0d3ZVKzVKYmhSOWlKYzhqVjM0bitiUWRJKzQKVHRXMGNYVzdVbWVIYVJXaVIrWmhkMEFNOXhSaERPYkxYb2FXTW5oWVB0VnNndnVua04yT2lhTTQ5T1d0WWIreAo1eERiTzRoSXNsNVpHLzk4bHJuYUtaWWdSeVdNMmZPeUdYaVROZXdmYmppOFkzdUo3Z0ZOeWxtd0dNYVpRamhyCllOYXFORVY3VnMybjdvRVJ4cXpLRzk5NDdvQkF4MmhwbW9hVzZlTXlYY1dsMm92N2lIcEpTS1VCS2hvKzVQV2MKSjczMW5vME9zR3VTKzNqSGEvMG5aWHJUOG5LbWVteURNZFNmV210VHY2NTlML2d1RkluWnBIUGZGVkxmNTZ2YwpKNnpiL0l6RUpWK05oN0NmQnNNYkhsVFlCZVVGbFJXc3k5dDcwK09ad1FLQmdRRDNKNGFWTjR2SmhYWDF6RGdMCmRWQWN6d0xHS1hZMzhCb0JqT2VSdFZoWEhzNXAvZU5JcWVaMlliWXdCQnkzbkw0MTRVbjdncWI5ZkR0ZzBpM24KNW1RSU9XaHZwSVlVeHR3SVBnWXd6dW14eHAvbjdYZFU0QlBEYnhlalp4a3VDN0FSNWJCMzRwd0pBSnZXUkdFZgowWDFUeEpscVVMaGlaNmcxOE8zUzBvaUp0d0tCZ1FEck85Uk9hajZra3hqWUhtbGpCWklYWGhkS0RjbjBBcVBpCncyMEFhYWZ4MG94TlFBb3E4R3R1MjJaMVFId1JkQmVVSndxQ2JtSFZDQ3diTWYvNTY4ekZBQU51VDliS01lNlgKSjBwMG5URGl5bjh3OU1mZHVGdUc0Y1VNbjRvSzZkSXVZbHNjZ3VvUFFDdlFkY2l3RytkanF3VHJIaWI1VEVibQpqZUtFa1kyQSt3S0JnUUR2WHQrd3kyQmVxQnpNRjZNOExiMk9lVXdWZ25pVnlyeFZQaFBWZ2s1eDZrcytTb0FECmsxRzYyLzNvMlVLNjdsc21zZkRHWUE2OXVNR0ZqMnFZakFIY0dVVzF3eUY5SS9CZEp6MDFybUNXSm1vZTVWWEsKNVU4ZTNBeUgzTVY5WENLRjR2Q2IyK1VGcndvL1puQ3VzV1Z4YVJxdzVrYitQNmlodlp1SXNSRStWd0tCZ1FDNwoyZHVCZzNiakZsVVF3YmlIU3p1UFRhUnJqdmRuMVhQcTh3Vm8vdmNQTm9TMGJCK3lpcXhBcXhUM0xiZm1lRDhjCklORlR0N0tJM1MzYnllSVJReTBUWlI5WVNJbk9qbkZxWkFZaGVpWS85bFg4VW40Sm9kLzFwdllsVG9KK2xKczAKVDNkVEhYaXRGU0hvSnlkTSsvdWNy
RVlSUE5NQzR0Yjc1dkt0eTA2bFlRS0JnUUN4NDkrZzVrUWFhUlorNHBzdworZW9sTXBBd0tEa3BLNWdZZW42T3NyVDhtNGhweFRtdGl0ZU1zSDVBdmIvZnhxb0pXTE9qaE40RW5FWlRNSnpyCkx5R29Lc1R2N3JoWndoUnpuRTE1ck96eG1sZFdyY0NrbDdER3VNMkdjS2dndWhDWUY3VTdLQSt2VUNlcUNFMEgKTEEyZ3JrWStUeEZwZzFwd1lkRjFoZWttVHc9PQotLS0tLUVORCBQUklWQVRFIEtFWS0tLS0tCg==',  # pylint: disable=line-too-long
119
            },
120
            'ldap_tls_cacert': None,
121
            'ldap_url': 'ldap://localhost.entrouvert.org:52271',
122
            'log_level': 'INFO',
123
            'resource_type': 'ldap.resource',
124
            'slug': 'resource',
125
            'title': 'resource',
126
        }
127
    ],
128
}
129

  
130

  
131
class TestImportExport:
    """Import and export of an LDAP connector through the manager pages."""

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Extend the base parameters with a fixed URL and TLS files matching EXPORT_JSON."""
        overrides = {
            'ldap_url': 'ldap://localhost.entrouvert.org:52271',
            'ldap_tls_cert': ContentFile(cert_content, name='cert.pem'),
            'ldap_tls_key': ContentFile(key_content, name='key.pem'),
        }
        return dict(resource_params, **overrides)

    def test_import(self, app, resource_class, resource_params):
        """Import EXPORT_JSON and compare the created connector with ``resource_params``."""
        assert not resource_class.objects.count()
        form = app.get('/manage/').click('Import').form
        form.set('site_json', Upload('ldap.json', json.dumps(EXPORT_JSON).encode()))
        form.set('import_users', False)
        form.submit(status=302)
        imported = resource_class.objects.get()

        for name, expected in resource_params.items():
            actual = getattr(imported, name)
            if not isinstance(expected, ContentFile):
                assert actual == expected
                continue
            # file fields are compared by content
            with actual as stored:
                with expected as reference:
                    assert stored.read() == reference.read()

    def test_export(self, app, resource, cert_content, key_content):
        """Export the connector and compare the JSON payload with EXPORT_JSON."""
        content = app.get('/ldap/resource/').click('Export').json
        # file names are forced to the expected values before comparing
        # (presumably the stored name can differ from the uploaded one)
        expected_names = (('ldap_tls_cert', 'cert.pem'), ('ldap_tls_key', 'key.pem'))
        for exported in content['resources']:
            for field, filename in expected_names:
                if exported.get(field):
                    exported[field]['name'] = filename
        assert content == EXPORT_JSON
tests/ldap/test_model.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import ldap
18
import pytest
19
from django.core.files.base import ContentFile
20

  
21

  
22
def test_get_connection(resource):
    """Smoke test: ``get_connection()`` must not raise."""
    open_connection = resource.get_connection
    open_connection()
24

  
25

  
26
class TestCheckStatus:
    """Behaviour of ``check_status()`` with and without the ``ldap_server`` fixture."""

    def test_nok(self, resource):
        """Without the ``ldap_server`` fixture the check raises ``ldap.LDAPError``."""
        expected_error = ldap.LDAPError
        with pytest.raises(expected_error):
            resource.check_status()

    def test_ok(self, resource, ldap_server):
        """With the ``ldap_server`` fixture the check completes without raising."""
        status_check = resource.check_status
        status_check()
33

  
34

  
35
class TestTLSAuthentication:
    """Status check against an ``ldaps://`` server that demands a client certificate."""

    @pytest.fixture
    def ldap_params(self, ldap_params, key, cert):
        """Switch the server URL to ``ldaps:`` and pass the key/certificate paths as ``tls``."""
        secure_url = ldap_params['ldap_url'].replace('ldap:', 'ldaps:')
        ldap_params['ldap_url'] = secure_url
        return dict(ldap_params, tls=(str(key), str(cert)))

    @pytest.fixture
    def ldap_configure(self, ldap_object, cert):
        """Set the CA certificate file and ``olcTLSVerifyClient: demand`` on ``cn=config``."""
        modifications = [
            (ldap.MOD_ADD, 'olcTLSCACertificateFile', str(cert).encode()),
            (ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
        ]
        admin_connection = ldap_object.get_connection_admin()
        admin_connection.modify_s('cn=config', modifications)

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Add client certificate, key and CA certificate files to the connector parameters."""
        return dict(
            resource_params,
            ldap_tls_cert=ContentFile(cert_content, name='cert.pem'),
            ldap_tls_key=ContentFile(key_content, name='key.pem'),
            # the test certificate is also used as the CA certificate
            ldap_tls_cacert=ContentFile(cert_content, name='cert.pem'),
        )

    def test_ok(self, resource, ldap_server):
        """The status check must complete without raising."""
        status_check = resource.check_status
        status_check()
63

  
64

  
65
class TestLdapSearch:
    """Behaviour of ``ldap_search()`` with and without the ``ldap_server`` fixture."""

    def test_nok(self, resource):
        """Without the ``ldap_server`` fixture, consuming the search raises ``ldap.LDAPError``."""
        search_args = ('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*'])
        with pytest.raises(ldap.LDAPError):
            list(resource.ldap_search(*search_args))

    def test_ok(self, resource, ldap_server):
        """With the ``ldap_server`` fixture, only the ``o=orga`` entry is returned."""
        search_args = ('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*'])
        found = list(resource.ldap_search(*search_args))
        expected_entry = ('o=orga', {'o': 'orga', 'objectclass': 'organization'})
        assert found == [expected_entry]
tests/ldap/test_search_endpoint.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import ldap
18
import pytest
19

  
20

  
21
@pytest.fixture
def ldap_configure(ldap_object):
    """Add an index on cn, sn and mail, then load three ``inetOrgPerson`` entries."""
    # index definition including approximate matching
    index_change = [(ldap.MOD_ADD, 'olcDbIndex', b'cn,sn,mail pres,eq,approx')]
    database_dn = 'olcDatabase={%s}mdb,cn=config' % (ldap_object.db_index - 1)
    admin_connection = ldap_object.get_connection_admin()
    admin_connection.modify_s(database_dn, index_change)

    # two entries directly under o=orga and one below uid=janedoe
    entries = '''
dn: uid=johndoe,o=orga
objectClass: inetOrgPerson
uid: johndoe
cn: John Doe
sn: Doe
gn: John

dn: uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janedoe
cn: Jane Doe
sn: Doe
gn: Jane

dn: uid=janefoo,uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janefoo
cn: Jane Foo
sn: Foo
gn: Jane
'''
    ldap_object.add_ldif(entries)
59

  
60

  
61
def test_server_unavailaible(app, resource):
    """Search without the ``ldap_server`` fixture and check the error payload.

    ``err_desc`` holds the textual form of the LDAP error; its errno,
    OS message and exact set of keys vary with the platform and the
    python-ldap version, so only the LDAP description is matched.
    """
    response = app.get(
        '/ldap/resource/search',
        params={
            'q': 'Doe',
            'ldap_base_dn': 'o=orga',
            'search_attribute': 'cn',
            'id_attribute': 'uid',
        },
    )
    payload = dict(response.json)
    err_desc = payload.pop('err_desc')
    assert payload == {
        'data': [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}],
        'err': 1,
        'err_class': 'directory-server-unavailable',
    }
    assert "Can't contact LDAP server" in err_desc
79

  
80

  
81
def test_q(app, resource, ldap_server):
    """A ``q=Doe`` search on ``cn`` returns Jane Doe then John Doe."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    expected_data = [
        {
            'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
            'dn': 'uid=janedoe,o=orga',
            'id': 'janedoe',
            'text': 'Jane Doe',
        },
        {
            'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
            'dn': 'uid=johndoe,o=orga',
            'id': 'johndoe',
            'text': 'John Doe',
        },
    ]
    response = app.get('/ldap/resource/search', params=query)
    assert response.json == {'err': 0, 'data': expected_data}
108

  
109

  
110
def test_q_prefix(app, resource, ldap_server):
    """With ``search_op=prefix``, 'Doe' matches nothing while 'jane' matches both Janes."""
    base_query = {
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'search_op': 'prefix',
    }

    # no cn starts with "Doe"
    response = app.get('/ldap/resource/search', params={'q': 'Doe', **base_query})
    assert response.json == {'err': 0, 'data': []}

    response = app.get('/ldap/resource/search', params={'q': 'jane', **base_query})
    expected_data = [
        {
            'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
            'dn': 'uid=janedoe,o=orga',
            'id': 'janedoe',
            'text': 'Jane Doe',
        },
        {
            'attributes': {'cn': 'Jane Foo', 'uid': 'janefoo'},
            'dn': 'uid=janefoo,uid=janedoe,o=orga',
            'id': 'janefoo',
            'text': 'Jane Foo',
        },
    ]
    assert response.json == {'err': 0, 'data': expected_data}
152

  
153

  
154
def test_q_approx(app, resource, ldap_server):
    """With ``search_op=approx``, the misspelled query 'jne  do' returns Jane Doe and John Doe."""
    query = {
        'q': 'jne  do',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'search_op': 'approx',
    }
    expected_data = [
        {
            'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
            'dn': 'uid=janedoe,o=orga',
            'id': 'janedoe',
            'text': 'Jane Doe',
        },
        {
            'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
            'dn': 'uid=johndoe,o=orga',
            'id': 'johndoe',
            'text': 'John Doe',
        },
    ]
    response = app.get('/ldap/resource/search', params=query)
    assert response.json == {'err': 0, 'data': expected_data}
182

  
183

  
184
def test_id(app, resource, ldap_server):
    """A lookup by ``id=janedoe`` returns exactly the Jane Doe entry."""
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    jane_doe = {
        'attributes': {
            'cn': 'Jane Doe',
            'uid': 'janedoe',
        },
        'dn': 'uid=janedoe,o=orga',
        'id': 'janedoe',
        'text': 'Jane Doe',
    }
    response = app.get('/ldap/resource/search', params=query)
    assert response.json == {'err': 0, 'data': [jane_doe]}
208

  
209

  
210
def test_sizelimit(app, resource, ldap_server):
    """With ``sizelimit=1`` the 'Doe' search returns a single result."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'sizelimit': '1',
    }
    results = app.get('/ldap/resource/search', params=query).json['data']
    assert len(results) == 1
222

  
223

  
224
def test_text_template(app, resource, ldap_server):
    """``text_template`` renders the result text, here using mixed-case variable names."""
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }
    first_result = app.get('/ldap/resource/search', params=query).json['data'][0]
    assert first_result['text'] == 'Doe Jane (janedoe)'
237

  
238

  
239
def test_scope(app, resource, ldap_server):
    """'Foo' is not found with ``scope=onelevel`` but is found when no scope is given."""
    common_query = {
        'q': 'Foo',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }

    one_level = app.get('/ldap/resource/search', params={**common_query, 'scope': 'onelevel'})
    assert len(one_level.json['data']) == 0

    default_scope = app.get('/ldap/resource/search', params=common_query)
    assert len(default_scope.json['data']) == 1
tox.ini
46 46
  responses
47 47
  zeep<3.3
48 48
  codestyle: pre-commit
49
  ldaptools
49 50
commands =
50 51
  ./get_wcs.sh
51 52
  py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
......
80 81
  pytest-freezegun
81 82
  responses
82 83
  mohawk
84
  ldaptools
83 85
commands =
84 86
  ./pylint.sh passerelle/ tests/
85
-