Projet

Général

Profil

0002-add-ldap-connector-66533.patch

Benjamin Dauvergne, 03 août 2022 15:01

Télécharger (43,9 ko)

Voir les différences:

Subject: [PATCH 2/2] add ldap connector (#66533)

 passerelle/apps/ldap/__init__.py              |   0
 .../apps/ldap/migrations/0001_initial.py      |  62 ++++
 passerelle/apps/ldap/migrations/__init__.py   |   0
 passerelle/apps/ldap/models.py                | 350 ++++++++++++++++++
 passerelle/settings.py                        |   1 +
 passerelle/utils/forms.py                     |   5 +
 passerelle/utils/models.py                    |  15 +
 setup.py                                      |   2 +
 tests/ldap/__init__.py                        |   0
 tests/ldap/cert.pem                           |  19 +
 tests/ldap/conftest.py                        | 117 ++++++
 tests/ldap/key.pem                            |  28 ++
 tests/ldap/test_manager.py                    | 149 ++++++++
 tests/ldap/test_model.py                      |  70 ++++
 tests/ldap/test_search_endpoint.py            | 178 +++++++++
 tox.ini                                       |   2 +
 16 files changed, 998 insertions(+)
 create mode 100644 passerelle/apps/ldap/__init__.py
 create mode 100644 passerelle/apps/ldap/migrations/0001_initial.py
 create mode 100644 passerelle/apps/ldap/migrations/__init__.py
 create mode 100644 passerelle/apps/ldap/models.py
 create mode 100644 tests/ldap/__init__.py
 create mode 100644 tests/ldap/cert.pem
 create mode 100644 tests/ldap/conftest.py
 create mode 100644 tests/ldap/key.pem
 create mode 100644 tests/ldap/test_manager.py
 create mode 100644 tests/ldap/test_model.py
 create mode 100644 tests/ldap/test_search_endpoint.py
passerelle/apps/ldap/migrations/0001_initial.py
1
# Generated by Django 3.2.14 on 2022-08-02 14:37
2

  
3
from django.db import migrations, models
4

  
5
import passerelle.utils.models
6

  
7

  
8
class Migration(migrations.Migration):
    """Initial migration of the LDAP connector: creates the ``Resource`` model."""

    initial = True

    # The ``users`` relation below points to ``base.ApiUser``.
    dependencies = [('base', '0029_auto_20210202_1627')]

    operations = [
        migrations.CreateModel(
            name='Resource',
            fields=[
                (
                    'id',
                    models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID'),
                ),
                # Generic connector fields.
                ('title', models.CharField(max_length=50, verbose_name='Title')),
                ('slug', models.SlugField(unique=True, verbose_name='Identifier')),
                ('description', models.TextField(verbose_name='Description')),
                # LDAP server location and optional bind credentials.
                ('ldap_url', passerelle.utils.models.LDAPURLField(max_length=512, verbose_name='Server URL')),
                (
                    'ldap_bind_dn',
                    models.CharField(blank=True, max_length=256, null=True, verbose_name='Bind DN'),
                ),
                (
                    'ldap_bind_password',
                    models.CharField(blank=True, max_length=128, null=True, verbose_name='Bind password'),
                ),
                # Optional TLS client certificate and key, stored as binary data.
                (
                    'ldap_tls_cert',
                    passerelle.utils.models.BinaryFileField(
                        blank=True, max_length=32768, null=True, verbose_name='TLS client certificate'
                    ),
                ),
                (
                    'ldap_tls_key',
                    passerelle.utils.models.BinaryFileField(
                        blank=True, max_length=32768, null=True, verbose_name='TLS client key'
                    ),
                ),
                (
                    'users',
                    models.ManyToManyField(
                        blank=True,
                        related_name='_ldap_resource_users_+',
                        related_query_name='+',
                        to='base.ApiUser',
                    ),
                ),
            ],
            options={'verbose_name': 'LDAP'},
        ),
    ]
passerelle/apps/ldap/models.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import contextlib
19
import tempfile
20

  
21
import ldap
22
import ldap.filter
23
from django.core.cache import cache
24
from django.core.exceptions import ValidationError
25
from django.db import models
26
from django.utils.html import format_html
27
from django.utils.translation import gettext_lazy as _
28
from OpenSSL import crypto
29

  
30
from passerelle.base.models import BaseResource
31
from passerelle.utils.api import endpoint
32
from passerelle.utils.jsonresponse import APIError
33
from passerelle.utils.models import BinaryFileField, LDAPURLField
34
from passerelle.utils.templates import render_to_string
35

  
36

  
37
def validate_certificate(value):
    """Validate that ``value`` can be loaded as a PEM encoded certificate.

    Args:
        value: bytes-like content of the uploaded certificate.

    Raises:
        ValidationError: if the content cannot be loaded as a PEM certificate.
    """
    try:
        crypto.load_certificate(crypto.FILETYPE_PEM, bytes(value))
    except Exception as exc:
        # Any failure means the content is unusable; chain the original error
        # so that the parsing cause stays visible in tracebacks.
        raise ValidationError(_('Invalid certificate.')) from exc
42

  
43

  
44
def validate_private_key(value):
    """Validate that ``value`` can be loaded as a PEM encoded private key.

    Args:
        value: bytes-like content of the uploaded private key.

    Raises:
        ValidationError: if the content cannot be loaded as a PEM private key.
    """
    try:
        crypto.load_privatekey(crypto.FILETYPE_PEM, bytes(value))
    except Exception as exc:
        # Any failure means the content is unusable; chain the original error
        # so that the parsing cause stays visible in tracebacks.
        raise ValidationError(_('Invalid private key.')) from exc
49

  
50

  
51
class Resource(BaseResource):
    """Connector giving access to an LDAP directory through a ``search`` endpoint.

    The connection is described by the server URL, optional simple-bind
    credentials and an optional TLS client certificate/key pair.
    """

    ldap_url = LDAPURLField(verbose_name=_('Server URL'), max_length=512)
    ldap_bind_dn = models.CharField(verbose_name=_('Bind DN'), max_length=256, null=True, blank=True)
    ldap_bind_password = models.CharField(
        verbose_name=_('Bind password'), max_length=128, null=True, blank=True
    )
    ldap_tls_cert = BinaryFileField(
        verbose_name=_('TLS client certificate'),
        max_length=1024 * 32,
        null=True,
        blank=True,
        validators=[validate_certificate],
    )
    ldap_tls_key = BinaryFileField(
        verbose_name=_('TLS client key'),
        max_length=1024 * 32,
        null=True,
        blank=True,
        validators=[validate_private_key],
    )

    category = _('Misc')

    class Meta:
        verbose_name = _('LDAP')

    def tls_cert(self, value):
        """Return an HTML download link for the client certificate ``value``.

        The link text is the certificate subject when ``value`` loads as a PEM
        certificate, otherwise the size of ``value`` in bytes.
        """
        try:
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, bytes(value))
            name = ','.join(
                '%s=%s' % (a.decode(), b.decode()) for a, b in cert.get_subject().get_components()
            )
        except Exception:
            # best effort: this is only a display helper, fall back to the size
            name = '%s bytes' % len(value)
        return format_html(
            '<a href="data:application/octet-stream;base64,{}" target="_blank" download="tls.crt">{}</a>',
            base64.b64encode(value).decode(),
            name,
        )

    def clean(self):
        """Check that credentials and TLS material are provided in pairs.

        Raises:
            ValidationError: if only one of bind DN/password, or only one of
                client certificate/key, is set.
        """
        super().clean()
        if bool(self.ldap_bind_dn) != bool(self.ldap_bind_password):
            raise ValidationError(_('Bind DN and password must be set together.'))
        if bool(self.ldap_tls_cert) != bool(self.ldap_tls_key):
            raise ValidationError(_('Client certificate and key must be set together.'))

    def get_description_fields(self):
        """Return the parent's description fields, showing the certificate as a download link."""
        fields = super().get_description_fields()
        return [
            (field, self.tls_cert(value) if field.name == 'ldap_tls_cert' and value else value)
            for field, value in fields
        ]

    def check_status(self):
        """Open a connection and send a "Who am I?" request; any error propagates."""
        with self.get_connection() as conn:
            conn.whoami_s()

    @contextlib.contextmanager
    def get_connection(self):
        """Context manager yielding a bound LDAP connection.

        The connection is always unbound on exit, including when the bind or
        the body of the ``with`` statement raises.
        """
        with contextlib.ExitStack() as stack:
            conn = ldap.initialize(self.ldap_url)
            try:
                # NOTE(review): verification of the server certificate and of its
                # SAN is disabled here, so the identity of the server is never
                # checked; confirm this is intended.
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_SAN, ldap.OPT_X_TLS_NEVER)
                conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                if self.ldap_tls_cert and self.ldap_tls_key:
                    # the TLS options take file paths: dump the stored content into
                    # temporary files kept until the ExitStack is closed.
                    cert_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
                    cert_tempfile.write(self.ldap_tls_cert)
                    cert_tempfile.flush()
                    key_tempfile = stack.enter_context(tempfile.NamedTemporaryFile())
                    key_tempfile.write(self.ldap_tls_key)
                    key_tempfile.flush()

                    conn.set_option(ldap.OPT_X_TLS_CERTFILE, cert_tempfile.name)
                    conn.set_option(ldap.OPT_X_TLS_KEYFILE, key_tempfile.name)
                conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
                if self.ldap_bind_dn:
                    conn.simple_bind_s(self.ldap_bind_dn, self.ldap_bind_password or '')
                else:
                    conn.simple_bind_s()
                yield conn
            finally:
                # never let a cleanup failure hide the original error
                with contextlib.suppress(ldap.LDAPError):
                    conn.unbind_s()

    @staticmethod
    def _decode_attributes(attributes):
        """Decode raw attribute values into a case-insensitive mapping of text values.

        Values which are not valid text are dropped; an attribute with a
        single decoded value maps to that value, otherwise to the list of
        decoded values. Attributes without any decodable value are omitted.
        """
        decoded_attributes = cidict()
        for name, values in attributes.items():
            decoded_values = []
            for value in values:
                try:
                    decoded_values.append(value.decode())
                except UnicodeDecodeError:
                    # not text (presumably binary data): ignore this value
                    continue
            if len(decoded_values) == 1:
                decoded_attributes[name] = decoded_values[0]
            elif decoded_values:
                decoded_attributes[name] = decoded_values
        return decoded_attributes

    def ldap_search(self, base_dn, scope, ldap_filter, ldap_attributes, sizelimit=-1, timeout=5):
        """Generate ``(dn, attributes)`` pairs for the entries matching a search.

        Args:
            base_dn: base DN of the search.
            scope: one of the ``ldap.SCOPE_*`` constants.
            ldap_filter: LDAP filter string.
            ldap_attributes: names of the attributes to retrieve.
            sizelimit: maximum number of entries asked to the server.
            timeout: time limit in seconds, used for the search request and
                for waiting each result.

        Raises:
            ldap.LDAPError: on connection, bind or search failure, including
                ``ldap.TIMEOUT`` when the server does not answer in time.
        """
        with self.get_connection() as conn:
            message_id = conn.search_ext(
                base_dn, scope, ldap_filter, ldap_attributes, timeout=timeout, sizelimit=sizelimit
            )
            while True:
                try:
                    # all=0: results are fetched one message at a time; the wait
                    # is bounded so that a stalled server cannot block forever.
                    _result_type, entries = conn.result(message_id, all=0, timeout=timeout)
                except ldap.SIZELIMIT_EXCEEDED:
                    # the entries received before the limit was hit were already yielded
                    break
                if not entries:
                    break
                for dn, attributes in entries:
                    if not dn:
                        continue
                    yield dn, self._decode_attributes(attributes)

    def search(
        self,
        ldap_base_dn,
        scope,
        ldap_filter,
        ldap_attributes,
        sizelimit,
        id_attribute,
        search_attribute,
        text_template,
    ):
        """Run a search and format the entries as a list of ``id``/``text`` items.

        Successful results are stored in the cache, keyed by the resource id
        and the search parameters. On LDAP errors a dictionary with ``err``
        set to 1 and a single disabled item is returned instead of raising.
        """
        import hashlib

        ldap_attributes = tuple(sorted(ldap_attributes))
        cache_fingerprint = str(
            [
                ldap_base_dn,
                scope,
                ldap_filter,
                ldap_attributes,
                sizelimit,
                id_attribute,
                search_attribute,
                text_template,
            ]
        )
        # hash() of a str is salted per process, it would give a different key
        # in each worker; use a stable digest so that the cache is shared.
        digest = hashlib.sha256(cache_fingerprint.encode()).hexdigest()
        cache_key = f'ldap-{self.id}-{digest}'
        cache_value = cache.get(cache_key)
        if cache_value and cache_value[0] == cache_fingerprint:
            return {'data': cache_value[1]}
        try:
            entries = list(
                self.ldap_search(ldap_base_dn, scope, ldap_filter, ldap_attributes, sizelimit=sizelimit)
            )
        except ldap.LDAPError as e:
            # add a disabled entry to show something on search errors, with display_disabled_items on w.c.s.
            return {
                'err': 1,
                'data': [
                    {
                        'id': '',
                        'text': _('Directory server is unavailable'),
                        'disabled': True,
                    }
                ],
                'err_clss': 'directory-server-unavailable',
                'err_desc': str(e),
            }
        data = []
        for dn, attributes in entries:
            entry_id = attributes.get(id_attribute)
            if not entry_id:
                continue
            if text_template:
                entry_text = render_to_string(text_template, attributes)
            else:
                entry_text = attributes.get(search_attribute)
            data.append(
                {
                    'id': entry_id,
                    'text': entry_text,
                    'dn': dn,
                    'attributes': attributes,
                }
            )
        # text and id can be a str, a list (multi-valued attribute) or None:
        # compare their string form so that sorting never raises TypeError.
        data.sort(key=lambda x: (str(x['text']), str(x['id'])))
        cache.set(cache_key, (cache_fingerprint, data))
        return {'data': data}

    @endpoint(
        description=_('Search'),
        name='search',
        perm='can_access',
        parameters={
            'ldap_base_dn': {
                'description': _('Base DN for the LDAP search'),
                'example_value': 'dc=company,dc=com',
            },
            'search_attribute': {
                'description': _('Attribute to search for the substring search'),
                'example_value': 'cn',
            },
            'id_attribute': {
                'description': _('Attribute used as a unique identifier'),
                'example_value': 'uid',
            },
            'text_template': {
                'description': _(
                    'Optional template string based on LDAP attributes '
                    'to create a text value, if none given the search_attribute is used'
                ),
                'example_value': '{{ givenName }} {{ surname }}',
            },
            'ldap_attributes': {
                'description': _('Space separated list of LDAP attributes to retrieve'),
                'example_value': 'l sn givenName locality',
            },
            'id': {
                'description': _('Identifier for exact retrieval, using the id_attribute'),
                'example_value': 'johndoe',
            },
            'q': {
                'description': _('Substring to search in the search_attribute'),
                'example_value': 'John Doe',
            },
            'sizelimit': {
                'description': _('Maximum number of entries to retrieve, between 1 and 200, default is 30.')
            },
            'scope': {
                'description': _('Scope of the LDAP search, subtree or onelevel, default is subtree.'),
            },
            'filter': {
                'description': _('Extra LDAP filter.'),
                'example_value': 'objectClass=*',
            },
        },
    )
    def search_endpoint(
        self,
        request,
        ldap_base_dn,
        search_attribute,
        id_attribute,
        text_template=None,
        ldap_attributes=None,
        id=None,
        q=None,
        sizelimit=None,
        scope=None,
        filter=None,
    ):
        """Validate the query parameters, build the LDAP filter and run the search.

        Raises:
            APIError: if an attribute name contains non ASCII characters, or
                if neither ``q`` nor ``id`` is given.
        """
        search_attribute = search_attribute.lower()
        id_attribute = id_attribute.lower()
        if not search_attribute.isascii():
            raise APIError('search_attribute contains non ASCII characters')
        if not id_attribute.isascii():
            raise APIError('id_attribute contains non ASCII characters')
        ldap_attributes = set(ldap_attributes.split()) if ldap_attributes else set()
        ldap_attributes.update([search_attribute, id_attribute])
        if not all(attribute.isascii() for attribute in ldap_attributes):
            raise APIError('ldap_attributes contains non ASCII characters')
        try:
            sizelimit = int(sizelimit)
        except (ValueError, TypeError):
            # missing or non numeric value: use the default applied below
            sizelimit = 0
        sizelimit = max(1, min(sizelimit or 30, 200))
        if not q and not id:
            raise APIError('q or id are mandatory parameters', http_status=400)
        if id:
            ldap_filter = '(%s=%s)' % (id_attribute, ldap.filter.escape_filter_chars(id))
        else:
            ldap_filter = '(%s=*%s*)' % (search_attribute, ldap.filter.escape_filter_chars(q))
        if filter:
            # the extra filter is used as is, only the missing parentheses are added
            if not filter.startswith('('):
                filter = '(%s)' % filter
            ldap_filter = '(&%s%s)' % (ldap_filter, filter)
        scopes = {
            'subtree': ldap.SCOPE_SUBTREE,
            'onelevel': ldap.SCOPE_ONELEVEL,
        }
        scope = scopes.get(scope, ldap.SCOPE_SUBTREE)
        return self.search(
            ldap_base_dn=ldap_base_dn,
            scope=scope,
            ldap_filter=ldap_filter,
            ldap_attributes=ldap_attributes,
            sizelimit=sizelimit,
            id_attribute=id_attribute,
            search_attribute=search_attribute,
            text_template=text_template,
        )
332

  
333

  
334
# Use a case-insensitive dictionary to map attribute names to their values.
335

  
336

  
337
class cidict(dict):
    """Case-insensitive dictionary: keys are lower-cased on every access.

    Keys must be strings. Initial content given to the constructor and
    content merged through ``update()`` is lower-cased too, so a lookup
    never depends on the case used when a key was stored.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.update(*args, **kwargs)

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __delitem__(self, key):
        super().__delitem__(key.lower())

    def __contains__(self, key):
        return super().__contains__(key.lower())

    def get(self, key, default=None, /):
        return super().get(key.lower(), default)

    def pop(self, key, *default):
        return super().pop(key.lower(), *default)

    def setdefault(self, key, default=None, /):
        return super().setdefault(key.lower(), default)

    def update(self, *args, **kwargs):
        # go through __setitem__ so that every key is lower-cased
        for key, value in dict(*args, **kwargs).items():
            self[key] = value
passerelle/settings.py
149 149
    'passerelle.apps.gesbac',
150 150
    'passerelle.apps.holidays',
151 151
    'passerelle.apps.jsondatastore',
152
    'passerelle.apps.ldap',
152 153
    'passerelle.apps.maelis',
153 154
    'passerelle.apps.mdel',
154 155
    'passerelle.apps.mdel_ddpacs',
passerelle/utils/forms.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
from django import forms
18
from django.core import validators
19

  
20

  
21
class LDAPURLField(forms.URLField):
    """URL form field whose default validator only accepts the ``ldap`` and ``ldaps`` schemes."""

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]
18 23

  
19 24

  
20 25
class BinaryFileInput(forms.ClearableFileInput):
passerelle/utils/models.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
from django.core import validators
17 18
from django.db import models
18 19

  
19 20

  
21
class LDAPURLField(models.URLField):
    """URL model field whose default validator only accepts the ``ldap`` and ``ldaps`` schemes."""

    default_validators = [
        validators.URLValidator(schemes=['ldap', 'ldaps']),
    ]

    def formfield(self, **kwargs):
        """Build the form field, using the LDAP URL form field unless ``form_class`` is given."""
        # imported at call time and aliased, as the form field has the same name as this class
        from .forms import LDAPURLField as LDAPURLFormField

        kwargs.setdefault('form_class', LDAPURLFormField)
        return super().formfield(**kwargs)
33

  
34

  
20 35
class BinaryFileField(models.BinaryField):
21 36
    def __init__(self, *args, **kwargs):
22 37
        kwargs.setdefault('editable', True)
setup.py
165 165
        'pytz',
166 166
        'vobject',
167 167
        'Levenshtein',
168
        'python-ldap',
169
        'pyOpenSSL',
168 170
    ],
169 171
    cmdclass={
170 172
        'build': build,
tests/ldap/cert.pem
1
-----BEGIN CERTIFICATE-----
2
MIIDBjCCAe6gAwIBAgIUTKopT76CFlsVcI7FAilaYLILz0owDQYJKoZIhvcNAQEL
3
BQAwIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVudHJvdXZlcnQub3JnMB4XDTE4MTIw
4
NTE2NTkyNFoXDTI4MTIwMjE2NTkyNFowIzEhMB8GA1UEAwwYbG9jYWxob3N0LmVu
5
dHJvdXZlcnQub3JnMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA4xsL
6
n25yittbjk5bcKvY2I8zPivL6YWn2MJimaQQSNzCw/8POmVPLmMIb3lcZjydFRad
7
+RTxZfnuvCCJrnGrG7hOsJNenTLLU0ugN/yQ1869cM07a9tjSzL7NCz9H1NIK1+Q
8
cBsTExc77dOWpwWI9TjqYYRL+zex3ml8cdqcQ7BQUQxAvA4UU63DM2G+5O3dE7l8
9
uvyBUU3kW/shHyhfweWNXO8IXXIjvDfPYkOsjc6en2kFMr+sENSUKgfDKjz/Uzqy
10
S7LBb4tkJALZM8QP56VeQAG1JZF2J2/y1RqBfIGRIEkYoaHcj6UATZa1xcZjMubL
11
z3otRNYcRXKJMYWGbQIDAQABozIwMDAJBgNVHRMEAjAAMCMGA1UdEQQcMBqCGGxv
12
Y2FsaG9zdC5lbnRyb3V2ZXJ0Lm9yZzANBgkqhkiG9w0BAQsFAAOCAQEAFVPavBah
13
mIjgnTjq6ZbFxXTNJW0TrqN8olbKJ6SfwWVk0I8px7POekFaXd+egsFJlWYyH9q4
14
HkKotddRYYrWoXcPiodNfUa+bRnh2WYl2rEGMW5dbBf/MYCDts68c3SoA7JIYJ8w
15
0QZGAkijKNtVML0/FrLuJWbfFBAWH8JB46BcAg/8flbMHAULzV3F1g/v0A3FG3Y/
16
9fVr+lN5qs+NB9NXIMdf5wXrmJQYRjotyOjUO6yTFqDFvqE7DEpKQD5hnvqJoXCz
17
zYQS1DjH1qSRc5vC8I7YlJowCfnI9MsEICSrsk75DhT091aJC2XX93o4zhfNxmO5
18
Kj28hP87GHgNIg==
19
-----END CERTIFICATE-----
tests/ldap/conftest.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import contextlib
18
import os.path
19
import pathlib
20
import socket
21

  
22
import pytest
23
from ldaptools.slapd import Slapd, has_slapd
24

  
25
# Paths of the TLS certificate and key stored next to this file.
base_dir = os.path.dirname(__file__)
cert_file = os.path.join(base_dir, 'cert.pem')
key_file = os.path.join(base_dir, 'key.pem')

# NOTE(review): a module-level ``pytestmark`` marks the tests of the module
# defining it; confirm that it has the intended effect from a conftest.py.
pytestmark = pytest.mark.skipif(not has_slapd(), reason='slapd is not installed')
30

  
31

  
32
@pytest.fixture
def cert():
    """Path of the test certificate, as a ``pathlib.Path``."""
    certificate_path = pathlib.Path(cert_file)
    return certificate_path
35

  
36

  
37
@pytest.fixture
def key():
    """Path of the test private key, as a ``pathlib.Path``."""
    key_path = pathlib.Path(key_file)
    return key_path
40

  
41

  
42
@pytest.fixture
def cert_content(cert):
    """Raw bytes of the test certificate file."""
    return cert.read_bytes()
46

  
47

  
48
@pytest.fixture
def key_content(key):
    """Raw bytes of the test private key file."""
    return key.read_bytes()
52

  
53

  
54
def find_free_tcp_port():
    """Return a TCP port number which was free when this function ran.

    The port is picked by binding a socket to port 0; the socket is closed
    before returning, so another process may grab the port afterwards.
    """
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        # SO_REUSEADDR must be set before bind() to have any effect on it
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]
59

  
60

  
61
@pytest.fixture
def ldap_params():
    """Parameters of the test LDAP server: an ``ldap://`` URL on a free TCP port."""
    port = find_free_tcp_port()
    return {'ldap_url': 'ldap://localhost.entrouvert.org:%s' % port}
66

  
67

  
68
@pytest.fixture
def ldap_object(ldap_params):
    """Run a ``Slapd`` server for the duration of the test, or skip when slapd is missing."""
    # A module-level ``pytestmark`` only marks the tests of the module which
    # defines it, so tests depending on this fixture are skipped from here.
    if not has_slapd():
        pytest.skip('slapd is not installed')
    with Slapd(**ldap_params) as slapd:
        yield slapd
72

  
73

  
74
@pytest.fixture
def ldap_configure():
    """Do nothing; ``ldap_server`` depends on this fixture."""
    return None
77

  
78

  
79
@pytest.fixture
def ldap_server(ldap_object, ldap_configure):
    """The LDAP server object, requested together with ``ldap_configure``."""
    server = ldap_object
    return server
82

  
83

  
84
@pytest.fixture
def resource_class(db):
    """The LDAP connector model class, imported once the ``db`` fixture is set up."""
    from passerelle.apps.ldap import models as ldap_models

    return ldap_models.Resource
89

  
90

  
91
@pytest.fixture
def resource_params(ldap_params):
    """Keyword arguments used to build the test connector."""
    params = dict.fromkeys(('title', 'slug', 'description'), 'resource')
    params['ldap_url'] = ldap_params['ldap_url']
    return params
99

  
100

  
101
@pytest.fixture
def resource_access_rights(resource_object):
    """Call ``setup_access_rights`` on the test connector."""
    from tests import utils as tests_utils

    tests_utils.setup_access_rights(resource_object)
106

  
107

  
108
@pytest.fixture
def resource_object(resource_class, resource_params):
    """A saved connector instance built from ``resource_params``."""
    instance = resource_class(**resource_params)
    instance.save()
    return instance
113

  
114

  
115
@pytest.fixture
def resource(resource_object, resource_access_rights):
    """The saved connector, requested together with its access rights."""
    connector = resource_object
    return connector
tests/ldap/key.pem
1
-----BEGIN PRIVATE KEY-----
2
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDjGwufbnKK21uO
3
Tltwq9jYjzM+K8vphafYwmKZpBBI3MLD/w86ZU8uYwhveVxmPJ0VFp35FPFl+e68
4
IImucasbuE6wk16dMstTS6A3/JDXzr1wzTtr22NLMvs0LP0fU0grX5BwGxMTFzvt
5
05anBYj1OOphhEv7N7HeaXxx2pxDsFBRDEC8DhRTrcMzYb7k7d0TuXy6/IFRTeRb
6
+yEfKF/B5Y1c7whdciO8N89iQ6yNzp6faQUyv6wQ1JQqB8MqPP9TOrJLssFvi2Qk
7
AtkzxA/npV5AAbUlkXYnb/LVGoF8gZEgSRihodyPpQBNlrXFxmMy5svPei1E1hxF
8
cokxhYZtAgMBAAECggEAOUZI2BxyprJLlMgOJ4wvU+5JbhR9iJc8jV34n+bQdI+4
9
TtW0cXW7UmeHaRWiR+Zhd0AM9xRhDObLXoaWMnhYPtVsgvunkN2OiaM49OWtYb+x
10
5xDbO4hIsl5ZG/98lrnaKZYgRyWM2fOyGXiTNewfbji8Y3uJ7gFNylmwGMaZQjhr
11
YNaqNEV7Vs2n7oERxqzKG9947oBAx2hpmoaW6eMyXcWl2ov7iHpJSKUBKho+5PWc
12
J731no0OsGuS+3jHa/0nZXrT8nKmemyDMdSfWmtTv659L/guFInZpHPfFVLf56vc
13
J6zb/IzEJV+Nh7CfBsMbHlTYBeUFlRWsy9t70+OZwQKBgQD3J4aVN4vJhXX1zDgL
14
dVAczwLGKXY38BoBjOeRtVhXHs5p/eNIqeZ2YbYwBBy3nL414Un7gqb9fDtg0i3n
15
5mQIOWhvpIYUxtwIPgYwzumxxp/n7XdU4BPDbxejZxkuC7AR5bB34pwJAJvWRGEf
16
0X1TxJlqULhiZ6g18O3S0oiJtwKBgQDrO9ROaj6kkxjYHmljBZIXXhdKDcn0AqPi
17
w20Aaafx0oxNQAoq8Gtu22Z1QHwRdBeUJwqCbmHVCCwbMf/568zFAANuT9bKMe6X
18
J0p0nTDiyn8w9MfduFuG4cUMn4oK6dIuYlscguoPQCvQdciwG+djqwTrHib5TEbm
19
jeKEkY2A+wKBgQDvXt+wy2BeqBzMF6M8Lb2OeUwVgniVyrxVPhPVgk5x6ks+SoAD
20
k1G62/3o2UK67lsmsfDGYA69uMGFj2qYjAHcGUW1wyF9I/BdJz01rmCWJmoe5VXK
21
5U8e3AyH3MV9XCKF4vCb2+UFrwo/ZnCusWVxaRqw5kb+P6ihvZuIsRE+VwKBgQC7
22
2duBg3bjFlUQwbiHSzuPTaRrjvdn1XPq8wVo/vcPNoS0bB+yiqxAqxT3LbfmeD8c
23
INFTt7KI3S3byeIRQy0TZR9YSInOjnFqZAYheiY/9lX8Un4Jod/1pvYlToJ+lJs0
24
T3dTHXitFSHoJydM+/ucrEYRPNMC4tb75vKty06lYQKBgQCx49+g5kQaaRZ+4psw
25
+eolMpAwKDkpK5gYen6OsrT8m4hpxTmtiteMsH5Avb/fxqoJWLOjhN4EnEZTMJzr
26
LyGoKsTv7rhZwhRznE15rOzxmldWrcCkl7DGuM2GcKgguhCYF7U7KA+vUCeqCE0H
27
LA2grkY+TxFpg1pwYdF1hekmTw==
28
-----END PRIVATE KEY-----
tests/ldap/test_manager.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import json
18

  
19
import pytest
20
from webtest import Upload
21

  
22
pytestmark = pytest.mark.django_db
23

  
24

  
25
def login(app, username='admin', password='admin'):
    """Submit the first form of ``/login/`` with the credentials and return ``app``.

    The submission must answer with a redirect (HTTP 302).
    """
    form = app.get('/login/').forms[0]
    form['username'] = username
    form['password'] = password
    submitted = form.submit()
    assert submitted.status_int == 302
    return app
33

  
34

  
35
@pytest.fixture
def app(app, admin_user):
    """The ``app`` fixture after logging in with the default ``login`` credentials."""
    return login(app)
39

  
40

  
41
def test_add(app, db, cert_content, key_content, resource_class):
    """Create a connector through the management form and check the stored values."""
    text_values = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        'ldap_bind_dn': 'uid=user,o=orga',
        'ldap_bind_password': 'password',
    }
    form = app.get('/manage/ldap/add').form
    for field_name, field_value in text_values.items():
        form.set(field_name, field_value)
    form.set('ldap_tls_cert', Upload('cert.pem', cert_content, 'application/octet-stream'))
    form.set('ldap_tls_key', Upload('key.pem', key_content, 'application/octet-stream'))
    form.submit(status=302)

    assert resource_class.objects.count() == 1
    created = resource_class.objects.get()
    assert created.ldap_url == 'ldap://localhost.entrouvert.org'
    assert created.ldap_bind_dn == 'uid=user,o=orga'
    assert created.ldap_bind_password == 'password'
    # binary fields are converted with bytes() before comparing
    assert bytes(created.ldap_tls_cert) == cert_content
    assert bytes(created.ldap_tls_key) == key_content
59

  
60

  
61
def _submit_incomplete_add_form(app, **extra_fields):
    """Submit the add form with the mandatory fields plus ``extra_fields``.

    The form is expected to be displayed again (HTTP 200) instead of
    redirecting; the response is returned.
    """
    fields = {
        'slug': 'resource',
        'title': 'resource',
        'description': 'resource',
        'ldap_url': 'ldap://localhost.entrouvert.org',
        **extra_fields,
    }
    form = app.get('/manage/ldap/add').form
    for name, value in fields.items():
        form.set(name, value)
    return form.submit(status=200)


def test_missing_bind_password(app, db, cert_content, key_content, resource_class):
    """A bind DN without a password is refused."""
    _submit_incomplete_add_form(app, ldap_bind_dn='uid=user,o=orga')
    assert not resource_class.objects.exists()


def test_missing_bind_dn(app, db, cert_content, key_content, resource_class):
    """A bind password without a DN is refused."""
    _submit_incomplete_add_form(app, ldap_bind_password='password')
    assert not resource_class.objects.exists()


def test_missing_tls_key(app, db, cert_content, key_content, resource_class):
    """A client certificate without its key is refused."""
    _submit_incomplete_add_form(
        app, ldap_tls_cert=Upload('cert.pem', cert_content, 'application/octet-stream')
    )
    assert not resource_class.objects.exists()


def test_missing_tls_cert(app, db, cert_content, key_content, resource_class):
    """A client key without its certificate is refused."""
    _submit_incomplete_add_form(
        app, ldap_tls_key=Upload('key.pem', key_content, 'application/octet-stream')
    )
    assert not resource_class.objects.exists()
99

  
100

  
101
EXPORT_JSON = {
102
    'resources': [
103
        {
104
            '@type': 'passerelle-resource',
105
            'access_rights': [{'apiuser': 'all', 'codename': 'can_access'}],
106
            'description': 'resource',
107
            'ldap_bind_dn': None,
108
            'ldap_bind_password': None,
109
            'ldap_tls_cert': 'LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURCakNDQWU2Z0F3SUJBZ0lVVEtvcFQ3NkNGbHNWY0k3RkFpbGFZTElMejBvd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1ZEhKdmRYWmxjblF1YjNKbk1CNFhEVEU0TVRJdwpOVEUyTlRreU5Gb1hEVEk0TVRJd01qRTJOVGt5TkZvd0l6RWhNQjhHQTFVRUF3d1liRzlqWVd4b2IzTjBMbVZ1CmRISnZkWFpsY25RdWIzSm5NSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQTR4c0wKbjI1eWl0dGJqazViY0t2WTJJOHpQaXZMNllXbjJNSmltYVFRU056Q3cvOFBPbVZQTG1NSWIzbGNaanlkRlJhZAorUlR4WmZudXZDQ0pybkdyRzdoT3NKTmVuVExMVTB1Z04veVExODY5Y00wN2E5dGpTekw3TkN6OUgxTklLMStRCmNCc1RFeGM3N2RPV3B3V0k5VGpxWVlSTCt6ZXgzbWw4Y2RxY1E3QlFVUXhBdkE0VVU2M0RNMkcrNU8zZEU3bDgKdXZ5QlVVM2tXL3NoSHloZndlV05YTzhJWFhJanZEZlBZa09zamM2ZW4ya0ZNcitzRU5TVUtnZkRLanovVXpxeQpTN0xCYjR0a0pBTFpNOFFQNTZWZVFBRzFKWkYySjIveTFScUJmSUdSSUVrWW9hSGNqNlVBVFphMXhjWmpNdWJMCnozb3RSTlljUlhLSk1ZV0diUUlEQVFBQm96SXdNREFKQmdOVkhSTUVBakFBTUNNR0ExVWRFUVFjTUJxQ0dHeHYKWTJGc2FHOXpkQzVsYm5SeWIzVjJaWEowTG05eVp6QU5CZ2txaGtpRzl3MEJBUXNGQUFPQ0FRRUFGVlBhdkJhaAptSWpnblRqcTZaYkZ4WFROSlcwVHJxTjhvbGJLSjZTZndXVmswSThweDdQT2VrRmFYZCtlZ3NGSmxXWXlIOXE0CkhrS290ZGRSWVlyV29YY1Bpb2ROZlVhK2JSbmgyV1lsMnJFR01XNWRiQmYvTVlDRHRzNjhjM1NvQTdKSVlKOHcKMFFaR0FraWpLTnRWTUwwL0ZyTHVKV2JmRkJBV0g4SkI0NkJjQWcvOGZsYk1IQVVMelYzRjFnL3YwQTNGRzNZLwo5ZlZyK2xONXFzK05COU5YSU1kZjV3WHJtSlFZUmpvdHlPalVPNnlURnFERnZxRTdERXBLUUQ1aG52cUpvWEN6CnpZUVMxRGpIMXFTUmM1dkM4STdZbEpvd0Nmbkk5TXNFSUNTcnNrNzVEaFQwOTFhSkMyWFg5M280emhmTnhtTzUKS2oyOGhQODdHSGdOSWc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==',  # noqa: E501
110
            'ldap_tls_key': 'LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2d0lCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktrd2dnU2xBZ0VBQW9JQkFRRGpHd3VmYm5LSzIxdU8KVGx0d3E5allqek0rSzh2cGhhZll3bUtacEJCSTNNTEQvdzg2WlU4dVl3aHZlVnhtUEowVkZwMzVGUEZsK2U2OApJSW11Y2FzYnVFNndrMTZkTXN0VFM2QTMvSkRYenIxd3pUdHIyMk5MTXZzMExQMGZVMGdyWDVCd0d4TVRGenZ0CjA1YW5CWWoxT09waGhFdjdON0hlYVh4eDJweERzRkJSREVDOERoUlRyY016WWI3azdkMFR1WHk2L0lGUlRlUmIKK3lFZktGL0I1WTFjN3doZGNpTzhOODlpUTZ5TnpwNmZhUVV5djZ3UTFKUXFCOE1xUFA5VE9ySkxzc0Z2aTJRawpBdGt6eEEvbnBWNUFBYlVsa1hZbmIvTFZHb0Y4Z1pFZ1NSaWhvZHlQcFFCTmxyWEZ4bU15NXN2UGVpMUUxaHhGCmNva3hoWVp0QWdNQkFBRUNnZ0VBT1VaSTJCeHlwckpMbE1nT0o0d3ZVKzVKYmhSOWlKYzhqVjM0bitiUWRJKzQKVHRXMGNYVzdVbWVIYVJXaVIrWmhkMEFNOXhSaERPYkxYb2FXTW5oWVB0VnNndnVua04yT2lhTTQ5T1d0WWIreAo1eERiTzRoSXNsNVpHLzk4bHJuYUtaWWdSeVdNMmZPeUdYaVROZXdmYmppOFkzdUo3Z0ZOeWxtd0dNYVpRamhyCllOYXFORVY3VnMybjdvRVJ4cXpLRzk5NDdvQkF4MmhwbW9hVzZlTXlYY1dsMm92N2lIcEpTS1VCS2hvKzVQV2MKSjczMW5vME9zR3VTKzNqSGEvMG5aWHJUOG5LbWVteURNZFNmV210VHY2NTlML2d1RkluWnBIUGZGVkxmNTZ2YwpKNnpiL0l6RUpWK05oN0NmQnNNYkhsVFlCZVVGbFJXc3k5dDcwK09ad1FLQmdRRDNKNGFWTjR2SmhYWDF6RGdMCmRWQWN6d0xHS1hZMzhCb0JqT2VSdFZoWEhzNXAvZU5JcWVaMlliWXdCQnkzbkw0MTRVbjdncWI5ZkR0ZzBpM24KNW1RSU9XaHZwSVlVeHR3SVBnWXd6dW14eHAvbjdYZFU0QlBEYnhlalp4a3VDN0FSNWJCMzRwd0pBSnZXUkdFZgowWDFUeEpscVVMaGlaNmcxOE8zUzBvaUp0d0tCZ1FEck85Uk9hajZra3hqWUhtbGpCWklYWGhkS0RjbjBBcVBpCncyMEFhYWZ4MG94TlFBb3E4R3R1MjJaMVFId1JkQmVVSndxQ2JtSFZDQ3diTWYvNTY4ekZBQU51VDliS01lNlgKSjBwMG5URGl5bjh3OU1mZHVGdUc0Y1VNbjRvSzZkSXVZbHNjZ3VvUFFDdlFkY2l3RytkanF3VHJIaWI1VEVibQpqZUtFa1kyQSt3S0JnUUR2WHQrd3kyQmVxQnpNRjZNOExiMk9lVXdWZ25pVnlyeFZQaFBWZ2s1eDZrcytTb0FECmsxRzYyLzNvMlVLNjdsc21zZkRHWUE2OXVNR0ZqMnFZakFIY0dVVzF3eUY5SS9CZEp6MDFybUNXSm1vZTVWWEsKNVU4ZTNBeUgzTVY5WENLRjR2Q2IyK1VGcndvL1puQ3VzV1Z4YVJxdzVrYitQNmlodlp1SXNSRStWd0tCZ1FDNwoyZHVCZzNiakZsVVF3YmlIU3p1UFRhUnJqdmRuMVhQcTh3Vm8vdmNQTm9TMGJCK3lpcXhBcXhUM0xiZm1lRDhjCklORlR0N0tJM1MzYnllSVJReTBUWlI5WVNJbk9qbkZxWkFZaGVpWS85bFg4VW40Sm9kLzFwdllsVG9KK2xKczAKVDNkVEhYaXRGU0hvSnlkTSsvdWN
yRVlSUE5NQzR0Yjc1dkt0eTA2bFlRS0JnUUN4NDkrZzVrUWFhUlorNHBzdworZW9sTXBBd0tEa3BLNWdZZW42T3NyVDhtNGhweFRtdGl0ZU1zSDVBdmIvZnhxb0pXTE9qaE40RW5FWlRNSnpyCkx5R29Lc1R2N3JoWndoUnpuRTE1ck96eG1sZFdyY0NrbDdER3VNMkdjS2dndWhDWUY3VTdLQSt2VUNlcUNFMEgKTEEyZ3JrWStUeEZwZzFwd1lkRjFoZWttVHc9PQotLS0tLUVORCBQUklWQVRFIEtFWS0tLS0tCg==',  # noqa: E501
111
            'ldap_url': 'ldap://localhost.entrouvert.org:52271',
112
            'log_level': 'INFO',
113
            'resource_type': 'ldap.resource',
114
            'slug': 'resource',
115
            'title': 'resource',
116
        }
117
    ],
118
}
119

  
120

  
121
class TestImportExport:
    """Exercise the site import and export views with an LDAP resource."""

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Override the inherited parameters with a fixed URL and TLS material."""
        params = dict(resource_params)
        params.update(
            {
                'ldap_url': 'ldap://localhost.entrouvert.org:52271',
                'ldap_tls_cert': cert_content,
                'ldap_tls_key': key_content,
            }
        )
        return params

    def test_import(self, app, resource_class, resource_params):
        """Upload EXPORT_JSON through the import form and check the created instance."""
        assert resource_class.objects.count() == 0

        import_page = app.get('/manage/').click('Import')
        import_form = import_page.form
        import_form.set('site_json', Upload('ldap.json', json.dumps(EXPORT_JSON).encode()))
        import_form.set('import_users', False)
        # The view is expected to answer with a redirect once the import is done.
        import_form.submit(status=302)

        instance = resource_class.objects.get()
        for field_name, expected in resource_params.items():
            actual = getattr(instance, field_name)
            # Coerce the stored value to bytes before comparing it with a
            # bytes expectation.
            if isinstance(expected, bytes):
                actual = bytes(actual)
            assert actual == expected

    def test_export(self, app, resource, cert_content, key_content):
        """Export the resource from its page and compare the payload with EXPORT_JSON."""
        export_response = app.get('/ldap/resource/').click('Export')
        exported = json.loads(export_response.content)
        assert exported == EXPORT_JSON
tests/ldap/test_model.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import ldap
18
import pytest
19

  
20

  
21
def test_get_connection(resource):
    """Check that asking the resource for a connection does not raise.

    The ``ldap_server`` fixture is not requested by this test.
    """
    resource.get_connection()
23

  
24

  
25
class TestCheckStatus:
    """Behaviour of ``check_status()`` without and with the ``ldap_server`` fixture."""

    def test_nok(self, resource):
        """Without the ``ldap_server`` fixture, ``check_status()`` raises an LDAP error."""
        with pytest.raises(ldap.LDAPError):
            resource.check_status()

    def test_ok(self, resource, ldap_server):
        """With the ``ldap_server`` fixture, ``check_status()`` completes without raising."""
        resource.check_status()
32

  
33

  
34
class TestTLSAuthentication:
    """Check the connector against a server configured to demand a client certificate."""

    @pytest.fixture
    def ldap_params(self, ldap_params, key, cert):
        """Return the server parameters switched to ``ldaps:`` with TLS key/cert paths.

        A new dict is built instead of rewriting ``ldap_url`` inside the
        inherited fixture value, so the dict provided by the parent fixture is
        left untouched. The returned keys, their order and their values are
        the same as before.
        """
        return {
            **ldap_params,
            'ldap_url': ldap_params['ldap_url'].replace('ldap:', 'ldaps:'),
            'tls': (str(key), str(cert)),
        }

    @pytest.fixture
    def ldap_configure(self, ldap_object, cert):
        """Set the CA certificate file and ``olcTLSVerifyClient: demand`` on ``cn=config``."""
        conn = ldap_object.get_connection_admin()
        conn.modify_s(
            'cn=config',
            [
                (ldap.MOD_ADD, 'olcTLSCACertificateFile', str(cert).encode()),
                (ldap.MOD_ADD, 'olcTLSVerifyClient', b'demand'),
            ],
        )

    @pytest.fixture
    def resource_params(self, resource_params, cert_content, key_content):
        """Add the client certificate and key contents to the resource parameters."""
        return {
            **resource_params,
            'ldap_tls_cert': cert_content,
            'ldap_tls_key': key_content,
        }

    def test_ok(self, resource, ldap_server):
        """``check_status()`` completes without raising under this TLS setup."""
        resource.check_status()
61

  
62

  
63
class TestLdapSearch:
    """Direct calls to ``resource.ldap_search()`` on the ``o=orga`` subtree."""

    @staticmethod
    def _search_orga(resource):
        """Run the subtree search shared by both tests and return its results as a list."""
        results = resource.ldap_search('o=orga', ldap.SCOPE_SUBTREE, 'objectClass=*', ['*'])
        return list(results)

    def test_nok(self, resource):
        """Without the ``ldap_server`` fixture, the search raises an LDAP error."""
        with pytest.raises(ldap.LDAPError):
            self._search_orga(resource)

    def test_ok(self, resource, ldap_server):
        """With the ``ldap_server`` fixture, only the organization entry is returned."""
        expected = [('o=orga', {'o': 'orga', 'objectclass': 'organization'})]
        assert self._search_orga(resource) == expected
tests/ldap/test_search_endpoint.py
1
# passerelle - uniform access to multiple data sources and services
2
# Copyright (C) 2022  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import pytest
18

  
19

  
20
@pytest.fixture
def ldap_configure(ldap_object):
    """Load three ``inetOrgPerson`` entries into the directory.

    ``johndoe`` and ``janedoe`` sit directly under ``o=orga``; ``janefoo`` is
    a child of ``janedoe``.
    """
    people_ldif = '''
dn: uid=johndoe,o=orga
objectClass: inetOrgPerson
uid: johndoe
cn: John Doe
sn: Doe
gn: John

dn: uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janedoe
cn: Jane Doe
sn: Doe
gn: Jane

dn: uid=janefoo,uid=janedoe,o=orga
objectClass: inetOrgPerson
uid: janefoo
cn: Jane Foo
sn: Foo
gn: Jane
'''
    ldap_object.add_ldif(people_ldif)
46

  
47

  
48
def test_server_unavailaible(app, resource):
    """Query the search endpoint without the ``ldap_server`` fixture and check the error payload."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    # NOTE(review): the errno value and its message in err_desc look
    # platform-dependent -- confirm on the platforms the suite runs on.
    expected_payload = {
        'data': [{'disabled': True, 'id': '', 'text': 'Directory server is unavailable'}],
        'err': 1,
        'err_clss': 'directory-server-unavailable',
        'err_desc': '{\'result\': -1, \'desc\': "Can\'t contact LDAP server", '
        "'errno': 107, 'ctrls': [], 'info': 'Transport endpoint is not "
        "connected'}",
    }

    response = app.get('/ldap/resource/search', params=query)

    assert response.json == expected_payload
66

  
67

  
68
def test_q(app, resource, ldap_server):
    """Search ``cn`` for 'Doe' and expect the two matching entries, Jane before John."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    jane = {
        'attributes': {'cn': 'Jane Doe', 'uid': 'janedoe'},
        'dn': 'uid=janedoe,o=orga',
        'id': 'janedoe',
        'text': 'Jane Doe',
    }
    john = {
        'attributes': {'cn': 'John Doe', 'uid': 'johndoe'},
        'dn': 'uid=johndoe,o=orga',
        'id': 'johndoe',
        'text': 'John Doe',
    }

    response = app.get('/ldap/resource/search', params=query)

    assert response.json == {'err': 0, 'data': [jane, john]}
95

  
96

  
97
def test_id(app, resource, ldap_server):
    """Look an entry up by ``id`` and expect exactly that entry in the response."""
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
    }
    expected_entry = {
        'attributes': {
            'cn': 'Jane Doe',
            'uid': 'janedoe',
        },
        'dn': 'uid=janedoe,o=orga',
        'id': 'janedoe',
        'text': 'Jane Doe',
    }

    response = app.get('/ldap/resource/search', params=query)

    assert response.json == {'err': 0, 'data': [expected_entry]}
121

  
122

  
123
def test_sizelimit(app, resource, ldap_server):
    """With ``sizelimit`` set to '1', the 'Doe' search returns a single entry."""
    query = {
        'q': 'Doe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'sizelimit': '1',
    }

    response = app.get('/ldap/resource/search', params=query)

    entries = response.json['data']
    assert len(entries) == 1
135

  
136

  
137
def test_text_template(app, resource, ldap_server):
    """Render ``text`` from a template that names the attributes in mixed case.

    The template spells the attribute names differently from the requested
    ``ldap_attributes`` and still has to render 'Doe Jane (janedoe)'.
    """
    query = {
        'id': 'janedoe',
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }

    response = app.get('/ldap/resource/search', params=query)

    first_entry = response.json['data'][0]
    assert first_entry['text'] == 'Doe Jane (janedoe)'
150

  
151

  
152
def test_scope(app, resource, ldap_server):
    """Search for 'Foo' with and without ``scope=onelevel``.

    With ``scope=onelevel`` the search returns nothing. Without an explicit
    scope it returns one entry.
    """
    # Parameters common to both requests; 'q' (and 'scope') are placed first
    # so the query parameters keep their original order.
    shared = {
        'ldap_base_dn': 'o=orga',
        'search_attribute': 'cn',
        'id_attribute': 'uid',
        'ldap_attributes': 'sn givenname',
        'text_template': '{{ sN }} {{ giVenName }} ({{ uId }})',
    }

    onelevel = app.get(
        '/ldap/resource/search',
        params={'q': 'Foo', 'scope': 'onelevel', **shared},
    )
    assert len(onelevel.json['data']) == 0

    default_scope = app.get(
        '/ldap/resource/search',
        params={'q': 'Foo', **shared},
    )
    assert len(default_scope.json['data']) == 1
tox.ini
46 46
  responses
47 47
  zeep<3.3
48 48
  codestyle: pre-commit
49
  ldaptools
49 50
commands =
50 51
  ./get_wcs.sh
51 52
  py.test {posargs: --numprocesses {env:NUMPROCESSES:1} --dist loadfile {env:FAST:} {env:COVERAGE:} {env:JUNIT:} tests/}
......
80 81
  pytest-freezegun
81 82
  responses
82 83
  mohawk
84
  ldaptools
83 85
commands =
84 86
  ./pylint.sh passerelle/ tests/
85
-