Projet

Général

Profil

0003-add-csv-import-framework-32833.patch

Benjamin Dauvergne, 21 juin 2019 18:57

Télécharger (36,7 ko)

Voir les différences:

Subject: [PATCH 3/4] add csv import framework (#32833)

 debian/control               |   4 +-
 setup.py                     |   2 +
 src/authentic2/csv_import.py | 583 +++++++++++++++++++++++++++++++++++
 src/authentic2/models.py     |   3 +
 tests/test_csv_import.py     | 384 +++++++++++++++++++++++
 5 files changed, 975 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/csv_import.py
 create mode 100644 tests/test_csv_import.py
debian/control
29 29
    python-django-filters (>= 1),
30 30
    python-django-filters (<< 2),
31 31
    python-pil,
32
    python-tablib
32
    python-tablib,
33
    python-chardet,
34
    python-attr
33 35
Breaks: python-authentic2-auth-fc (<< 0.26)
34 36
Replaces: python-authentic2-auth-fc (<< 0.26)
35 37
Provides: ${python:Provides}, python-authentic2-auth-fc
setup.py
140 140
          'xstatic-select2',
141 141
          'pillow',
142 142
          'tablib',
143
          'chardet',
144
          'attrs',
143 145
      ],
144 146
      zip_safe=False,
145 147
      classifiers=[
src/authentic2/csv_import.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals
18

  
19
import csv
20
import io
21

  
22
from chardet.universaldetector import UniversalDetector
23
import attr
24

  
25
from django import forms
26
from django.core.exceptions import FieldDoesNotExist
27
from django.core.validators import RegexValidator
28
from django.db import IntegrityError
29
from django.db.transaction import atomic
30
from django.utils import six
31
from django.utils.translation import ugettext as _
32

  
33
from authentic2 import app_settings
34
from authentic2.a2_rbac.utils import get_default_ou
35
from authentic2.custom_user.models import User
36
from authentic2.forms.profile import modelform_factory, BaseUserForm
37
from authentic2.models import Attribute, AttributeValue, UserExternalId
38

  
39

  
40
class UTF8Recoder(object):
41
    def __init__(self, fd):
42
        self.fd = fd
43

  
44
    def __iter__(self):
45
        return self
46

  
47
    def next(self):
48
        return self.fd.next().encode('utf-8')
49

  
50

  
51
class UnicodeReader(object):
52
    def __init__(self, fd, dialect='excel', **kwargs):
53
        self.reader = csv.reader(UTF8Recoder(fd), dialect=dialect, **kwargs)
54

  
55
    def next(self):
56
        row = self.reader.next()
57
        return [s.decode('utf-8') for s in row]
58

  
59
    def __iter__(self):
60
        return self
61

  
62

  
63
class CsvImporter(object):
64
    rows = None
65
    error = None
66
    error_description = None
67
    encoding = None
68

  
69
    def run(self, fd_or_str, encoding):
70
        if isinstance(fd_or_str, six.binary_type):
71
            input_fd = io.BytesIO(fd_or_str)
72
        elif isinstance(fd_or_str, six.text_type):
73
            input_fd = io.StringIO(fd_or_str)
74
        elif not hasattr(fd_or_str, 'read1'):
75
            try:
76
                input_fd = io.open(fd_or_str.fileno(), closefd=False, mode='rb')
77
            except Exception:
78
                try:
79
                    fd_or_str.seek(0)
80
                except Exception:
81
                    pass
82
                content = fd_or_str.read()
83
                if isinstance(content, six.text_type):
84
                    input_fd = io.StringIO(content)
85
                else:
86
                    input_fd = io.BytesIO(content)
87
        else:
88
            input_fd = fd_or_str
89

  
90
        assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object'
91

  
92
        def set_encoding(input_fd, encoding):
93
            # detect StringIO
94
            if hasattr(input_fd, 'line_buffering'):
95
                return input_fd
96

  
97
            if encoding == 'detect':
98
                detector = UniversalDetector()
99

  
100
                try:
101
                    for line in input_fd:
102
                        detector.feed(line)
103
                        if detector.done:
104
                            break
105
                    else:
106
                        self.error = Error('cannot-detect-encoding', _('Cannot detect encoding'))
107
                        return None
108
                    detector.close()
109
                    encoding = detector.result['encoding']
110
                finally:
111
                    input_fd.seek(0)
112

  
113
            if not hasattr(input_fd, 'readable'):
114
                input_fd = io.open(input_fd.fileno(), 'rb', closefd=False)
115
            return io.TextIOWrapper(input_fd, encoding=encoding)
116

  
117
        def parse_csv():
118
            try:
119
                dialect = csv.Sniffer().sniff(input_fd.read().encode('utf-8'))
120
            except csv.Error as e:
121
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e)
122
                return False
123
            finally:
124
                input_fd.seek(0)
125

  
126
            if not dialect:
127
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect'))
128
                return False
129
            reader = UnicodeReader(input_fd, dialect)
130
            self.rows = list(reader)
131
            return True
132

  
133
        input_fd = set_encoding(input_fd, encoding)
134
        if input_fd is None:
135
            return False
136

  
137
        return parse_csv()
138

  
139

  
140
@attr.s
141
class CsvHeader(object):
142
    column = attr.ib()
143
    name = attr.ib(default='')
144
    field = attr.ib(default=False, converter=bool)
145
    attribute = attr.ib(default=False, converter=bool)
146
    create = attr.ib(default=True, metadata={'flag': True})
147
    update = attr.ib(default=True, metadata={'flag': True})
148
    key = attr.ib(default=False, metadata={'flag': True})
149
    unique = attr.ib(default=False, metadata={'flag': True})
150
    globally_unique = attr.ib(default=False, metadata={'flag': True})
151
    verified = attr.ib(default=False, metadata={'flag': True})
152

  
153
    @property
154
    def flags(self):
155
        flags = []
156
        for attribute in attr.fields(self.__class__):
157
            if attribute.metadata.get('flag'):
158
                if getattr(self, attribute.name):
159
                    flags.append(attribute.name)
160
                else:
161
                    flags.append('no-' + attribute.name.replace('_', '-'))
162
        return flags
163

  
164

  
165
@attr.s
166
class Error(object):
167
    code = attr.ib()
168
    description = attr.ib(default='', cmp=False)
169

  
170

  
171
@attr.s(cmp=False)
172
class LineError(Error):
173
    line = attr.ib(default=0)
174
    column = attr.ib(default=0)
175

  
176
    @classmethod
177
    def from_error(cls, error):
178
        return cls(**attr.asdict(error))
179

  
180
    def as_error(self):
181
        return Error(self.code, self.description)
182

  
183
    def __eq__(self, other):
184
        if isinstance(other, Error):
185
            return self.as_error() == other
186
        return (self.code, self.line, self.column) == (other.code, other.line, other.column)
187

  
188

  
189
class ImportUserForm(BaseUserForm):
190
    def clean(self):
191
        super(BaseUserForm, self).clean()
192
        self._validate_unique = False
193

  
194
SOURCE_NAME = '_source_name'
195
SOURCE_ID = '_source_id'
196
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
197

  
198

  
199
class ImportUserFormWithExternalId(ImportUserForm):
200
    locals()[SOURCE_NAME] = forms.CharField(
201
        label=_('Source name'),
202
        required=False,
203
        validators=[
204
            RegexValidator(
205
                r'^[a-zA-Z0-9_-]+$',
206
                _('source_name must no spaces and only letters, digits, - and _'),
207
                'invalid')])
208
    locals()[SOURCE_ID] = forms.CharField(
209
        label=_('Source external id'))
210

  
211

  
212
@attr.s
213
class CsvRow(object):
214
    line = attr.ib()
215
    cells = attr.ib(default=[])
216
    errors = attr.ib(default=[])
217
    is_valid = attr.ib(default=True)
218
    action = attr.ib(default=None)
219

  
220
    def __getitem__(self, header):
221
        for cell in self.cells:
222
            if cell.header == header or cell.header.name == header:
223
                return cell
224
        raise KeyError(header.name)
225

  
226
    def __iter__(self):
227
        return iter(self.cells)
228

  
229

  
230
@attr.s
231
class CsvCell(object):
232
    line = attr.ib()
233
    header = attr.ib()
234
    value = attr.ib(default=None)
235
    missing = attr.ib(default=False)
236
    errors = attr.ib(default=[])
237
    action = attr.ib(default=None)
238

  
239
    @property
240
    def column(self):
241
        return self.header.column
242

  
243

  
244
class Simulate(Exception):
245
    pass
246

  
247

  
248
class CancelImport(Exception):
249
    pass
250

  
251

  
252
class UserCsvImporter(object):
253
    csv_importer = None
254
    errors = None
255
    headers = None
256
    headers_by_name = None
257
    rows = None
258
    has_errors = False
259
    ou = None
260
    updated = 0
261
    created = 0
262
    rows_with_errors = 0
263

  
264
    def add_error(self, line_error):
265
        if not hasattr(line_error, 'line'):
266
            line_error = LineError.from_error(line_error)
267
        self.errors.append(line_error)
268

  
269
    def run(self, fd_or_str, encoding, ou=None, simulate=False):
270
        self.ou = ou or get_default_ou()
271
        self.errors = []
272
        self.csv_importer = CsvImporter()
273

  
274
        def parse_csv():
275
            if not self.csv_importer.run(fd_or_str, encoding):
276
                self.add_error(self.csv_importer.error)
277

  
278
        def do_import():
279
            unique_map = {}
280

  
281
            try:
282
                with atomic():
283
                    for row in self.rows:
284
                        if not self.do_import_row(row, unique_map):
285
                            self.rows_with_errors += 1
286
                    if simulate:
287
                        raise Simulate
288
            except Simulate:
289
                pass
290

  
291
        for action in [
292
                parse_csv,
293
                self.parse_header_row,
294
                self.parse_rows,
295
                do_import]:
296
            action()
297
            if self.errors:
298
                break
299

  
300
        self.has_errors = self.has_errors or bool(self.errors)
301
        return not bool(self.errors)
302

  
303
    def parse_header_row(self):
304
        self.headers = []
305
        self.headers_by_name = {}
306

  
307
        try:
308
            header_row = self.csv_importer.rows[0]
309
        except IndexError:
310
            self.add_error(Error('no-header-row', _('Missing header row')))
311
            return
312

  
313
        for i, head in enumerate(header_row):
314
            self.parse_header(head, column=i + 1)
315

  
316
        if not self.headers:
317
            self.add_error(Error('empty-header-row', _('Empty header row')))
318
            return
319

  
320
        key_counts = sum(1 for header in self.headers if header.key)
321

  
322
        if not key_counts:
323
            self.add_error(Error('missing-key-column', _('Missing key column')))
324
        if key_counts > 1:
325
            self.add_error(Error('too-many-key-columns', _('Too many key columns')))
326

  
327
        header_names = set(self.headers_by_name)
328
        if header_names & SOURCE_COLUMNS and not SOURCE_COLUMNS.issubset(header_names):
329
            self.add_error(
330
                Error('invalid-external-id-pair',
331
                      _('You must have a source_name and a source_id column')))
332

  
333
    def parse_header(self, head, column):
334
        splitted = head.split()
335
        try:
336
            header = CsvHeader(column, splitted[0])
337
            if header.name in self.headers_by_name:
338
                self.add_error(
339
                    Error('duplicate-header', _('Header "%s" is duplicated') % header.name))
340
                return
341
            self.headers_by_name[header.name] = header
342
        except IndexError:
343
            header = CsvHeader(column)
344
        else:
345
            if header.name in SOURCE_COLUMNS:
346
                if header.name == SOURCE_ID:
347
                    header.key = True
348
            else:
349
                try:
350
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
351
                        User._meta.get_field(header.name)
352
                        header.field = True
353
                        if header.name == 'email':
354
                            # by default email are expected to be verified
355
                            header.verified = True
356
                        if header.name == 'email' and self.email_is_unique:
357
                            header.unique = True
358
                            if app_settings.A2_EMAIL_IS_UNIQUE:
359
                                header.globally_unique = True
360
                        if header.name == 'username' and self.username_is_unique:
361
                            header.unique = True
362
                            if app_settings.A2_USERNAME_IS_UNIQUE:
363
                                header.globally_unique = True
364
                except FieldDoesNotExist:
365
                    pass
366
                if not header.field:
367
                    try:
368
                        attribute = Attribute.objects.get(name=header.name)  # NOQA: F841
369
                        header.attribute = True
370
                    except Attribute.DoesNotExist:
371
                        pass
372

  
373
        self.headers.append(header)
374

  
375
        if (not (header.field or header.attribute)
376
                and header.name not in SOURCE_COLUMNS):
377
            self.add_error(LineError('unknown-or-missing-attribute',
378
                                     _('unknown or missing attribute "%s"') % head,
379
                                     line=1, column=column))
380
            return
381

  
382
        for flag in splitted[1:]:
383
            if header.name in SOURCE_COLUMNS:
384
                self.add_error(LineError(
385
                    'flag-forbidden-on-source-columns',
386
                    _('You cannot set flags on source_app and source_id columns'),
387
                    line=1))
388
                break
389
            value = True
390
            if flag.startswith('no-'):
391
                value = False
392
                flag = flag[3:]
393
            flag = flag.replace('-', '_')
394
            try:
395
                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
396
                    raise TypeError
397
                setattr(header, flag, value)
398
            except (AttributeError, TypeError, KeyError):
399
                self.add_error(LineError('unknown-flag', _('unknown flag "%s"'), line=1, column=column))
400

  
401
    def parse_rows(self):
402
        base_form_class = ImportUserForm
403
        if SOURCE_NAME in self.headers_by_name:
404
            base_form_class = ImportUserFormWithExternalId
405
        form_class = modelform_factory(User, fields=self.headers_by_name.keys(), form=base_form_class)
406
        rows = self.rows = []
407
        for i, row in enumerate(self.csv_importer.rows[1:]):
408
            csv_row = self.parse_row(form_class, row, line=i + 2)
409
            self.has_errors = self.has_errors or not(csv_row.is_valid)
410
            rows.append(csv_row)
411

  
412
    def parse_row(self, form_class, row, line):
413
        data = {}
414

  
415
        for header in self.headers:
416
            try:
417
                data[header.name] = row[header.column - 1]
418
            except IndexError:
419
                pass
420

  
421
        form = form_class(data=data)
422
        form.is_valid()
423

  
424
        def get_form_errors(form, name):
425
            return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]
426

  
427
        cells = [
428
            CsvCell(
429
                line=line,
430
                header=header,
431
                value=data.get(header.name),
432
                missing=header.name not in data,
433
                errors=get_form_errors(form, header.name))
434
            for header in self.headers]
435
        cell_errors = any(bool(cell.errors) for cell in cells)
436
        errors = get_form_errors(form, '__all__')
437
        return CsvRow(
438
            line=line,
439
            cells=cells,
440
            errors=errors,
441
            is_valid=not bool(cell_errors or errors))
442

  
443
    @property
444
    def email_is_unique(self):
445
        return app_settings.A2_EMAIL_IS_UNIQUE or self.ou.email_is_unique
446

  
447
    @property
448
    def username_is_unique(self):
449
        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique
450

  
451
    def check_unique_constraints(self, row, unique_map, user=None):
452
        ou_users = User.objects.filter(ou=self.ou)
453
        users = User.objects.all()
454
        if user:
455
            users = users.exclude(pk=user.pk)
456
            ou_users = ou_users.exclude(pk=user.pk)
457
        errors = []
458
        for cell in row:
459
            header = cell.header
460
            if header.name == SOURCE_ID:
461
                unique_key = (SOURCE_ID, row[SOURCE_NAME].value, cell.value)
462
            elif header.key or header.globally_unique or header.unique:
463
                unique_key = (header.name, cell.value)
464
            else:
465
                continue
466
            if unique_key in unique_map:
467
                errors.append(
468
                    Error('unique-constraint-failed',
469
                          _('Unique constraint on column "%s" failed: '
470
                            'value already appear on line %d') % (header.name, row.line)))
471
            else:
472
                unique_map[unique_key] = row.line
473

  
474
        for cell in row:
475
            if (not cell.header.globally_unique and not cell.header.unique) or (user and not cell.header.update):
476
                continue
477
            qs = ou_users
478
            if cell.header.globally_unique:
479
                qs = users
480
            if cell.header.field:
481
                unique = not qs.filter(**{cell.header.name: cell.value}).exists()
482
            elif cell.header.attribute:
483
                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
484
                unique = not qs.filter(attribute_values__in=atvs).exists()
485
            if not unique:
486
                errors.append(
487
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
488
        row.errors.extend(errors)
489
        row.is_valid = row.is_valid and not bool(errors)
490
        return not bool(errors)
491

  
492
    @atomic
493
    def do_import_row(self, row, unique_map):
494
        if not row.is_valid:
495
            return False
496

  
497
        for header in self.headers:
498
            if header.key:
499
                header_key = header
500
                break
501
        else:
502
            assert False, 'should not happen'
503

  
504
        user = None
505
        if header_key.name == SOURCE_ID:
506
            # lookup by external id
507
            source_name = row[SOURCE_NAME].value
508
            source_id = row[SOURCE_ID].value
509
            userexternalids = UserExternalId.objects.filter(source=source_name, external_id=source_id)
510
            users = User.objects.filter(userexternalid__in=userexternalids)[:2]
511
        else:
512
            # lookup by field/attribute
513
            key_value = row[header_key].value
514
            if header_key.field:
515
                users = User.objects.filter(
516
                    **{header_key.name: key_value})
517
            elif header_key.attribute:
518
                atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
519
                users = User.objects.filter(attribute_values__in=atvs)
520
            users = users[:2]
521

  
522
        if users:
523
            row.action = 'update'
524
        else:
525
            row.action = 'create'
526

  
527
        if len(users) > 1:
528
            row.errors.append(
529
                Error('key-matches-too-many-users',
530
                      _('Key value "%s" matches too many users') % key_value))
531
            return False
532

  
533
        user = None
534
        if users:
535
            user = users[0]
536

  
537
        if not self.check_unique_constraints(row, unique_map, user=user):
538
            return False
539

  
540
        if not user:
541
            user = User()
542

  
543
        for cell in row.cells:
544
            if not cell.header.field:
545
                continue
546
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
547
                if getattr(user, cell.header.name) != cell.value:
548
                    setattr(user, cell.header.name, cell.value)
549
                    if cell.header.name == 'email' and cell.header.verified:
550
                        user.email_verified = True
551
                    cell.action = 'updated'
552
                    continue
553
            cell.action = 'nothing'
554

  
555
        user.save()
556

  
557
        if header_key.name == SOURCE_ID:
558
            try:
559
                UserExternalId.objects.create(user=user,
560
                                              source=source_name,
561
                                              external_id=source_id)
562
            except IntegrityError:
563
                # should never happen since we have a unique index...
564
                self.errors.append(
565
                    Error('external-id-already-exist',
566
                          _('External id "%s.%s" already exists') % (source_name, source_id)))
567
                raise CancelImport
568

  
569
        for cell in row.cells:
570
            if cell.header.field or not cell.header.attribute:
571
                continue
572
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
573
                attributes = user.attributes
574
                if cell.header.verified:
575
                    attributes = user.verified_attributes
576
                if getattr(attributes, cell.header.name) != cell.value:
577
                    setattr(attributes, cell.header.name, cell.value)
578
                    cell.action = 'updated'
579
                    continue
580
            cell.action = 'nothing'
581

  
582
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
583
        return True
src/authentic2/models.py
246 246
    def natural_key(self):
247 247
        return (self.name,)
248 248

  
249
    def __repr__(self):
250
        return '<%s %s>' % (self.__class__.__name__, repr(str(self)))
251

  
249 252
    def __str__(self):
250 253
        return self.label
251 254

  
tests/test_csv_import.py
1
# -*- coding: utf-8 -*-
2
# authentic2 - versatile identity manager
3
# Copyright (C) 2010-2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import pytest
21

  
22
import io
23

  
24
from authentic2.custom_user.models import User
25
from authentic2.models import Attribute
26
from authentic2.a2_rbac.utils import get_default_ou
27

  
28
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29

  
30
ENCODINGS = [
31
    'iso-8859-1',
32
    'iso-8859-15',
33
    'utf-8',
34
    'cp1252',
35
]
36

  
37

  
38
def pytest_generate_tests(metafunc):
39
    if 'encoding' in metafunc.fixturenames:
40
        metafunc.parametrize('encoding', ENCODINGS)
41
    if 'style' in metafunc.fixturenames:
42
        metafunc.parametrize('style', ['str', 'file'])
43

  
44

  
45
@pytest.fixture
46
def profile(db):
47
    Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
48

  
49

  
50
@pytest.fixture
51
def csv_importer_factory(encoding, style):
52
    def factory(content):
53
        content = content.encode(encoding)
54
        if style == 'file':
55
            content = io.BytesIO(content)
56
        importer = CsvImporter()
57
        run = importer.run
58
        importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
59
        return importer
60
    return factory
61

  
62

  
63
@pytest.fixture
64
def user_csv_importer_factory(encoding, style):
65
    def factory(content):
66
        content = content.encode(encoding)
67
        if style == 'file':
68
            content = io.BytesIO(content)
69
        importer = UserCsvImporter()
70
        run = importer.run
71
        importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
72
        return importer
73
    return factory
74

  
75

  
76
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
77
    importer = user_csv_importer_factory('')
78
    assert not importer.run()
79
    assert importer.has_errors
80
    assert importer.errors == [Error('unknown-csv-dialect')]
81

  
82

  
83
def test_empty_header_row_error(profile, user_csv_importer_factory):
84
    importer = user_csv_importer_factory('\n1,2,3')
85
    assert not importer.run()
86
    assert importer.has_errors
87
    assert importer.errors == [Error('empty-header-row')]
88

  
89

  
90
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
91
    importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
92
    assert not importer.run()
93
    assert importer.has_errors
94
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=2)]
95

  
96

  
97
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
98
    importer = user_csv_importer_factory('email key,first_name,x\n1,2,3')
99
    assert not importer.run()
100
    assert importer.has_errors
101
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
102

  
103

  
104
def test_unknown_flag_error(profile, user_csv_importer_factory):
105
    importer = user_csv_importer_factory('email key,first_name xxx\n1,2')
106
    assert not importer.run()
107
    assert importer.has_errors
108
    assert importer.errors == [LineError('unknown-flag', line=1, column=2)]
109

  
110

  
111
def test_missing_key_column_error(profile, user_csv_importer_factory):
112
    importer = user_csv_importer_factory('email,first_name\n1,2')
113
    assert not importer.run()
114
    assert importer.has_errors
115
    assert importer.errors == [Error('missing-key-column')]
116

  
117

  
118
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
119
    importer = user_csv_importer_factory('email key,first_name key\n1,2')
120
    assert not importer.run()
121
    assert importer.has_errors
122
    assert importer.errors == [Error('too-many-key-columns')]
123

  
124

  
125
def test_run(profile, user_csv_importer_factory):
126
    assert User.objects.count() == 0
127
    content = '''email key,first_name,last_name,phone update
128
tnoel@entrouvert.com,Thomas,Noël,1234
129
fpeters@entrouvert.com,Frédéric,Péters,5678
130
x,x,x,x'''
131
    importer = user_csv_importer_factory(content)
132

  
133
    assert importer.run(), importer.errors
134
    assert importer.headers == [
135
        CsvHeader(1, 'email', field=True, key=True, verified=True),
136
        CsvHeader(2, 'first_name', field=True),
137
        CsvHeader(3, 'last_name', field=True),
138
        CsvHeader(4, 'phone', attribute=True),
139
    ]
140
    assert importer.has_errors
141
    assert len(importer.rows) == 3
142
    assert all(row.is_valid for row in importer.rows[:2])
143
    assert not importer.rows[2].is_valid
144
    assert importer.rows[2].cells[0].errors
145
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
146
    assert not importer.rows[2].cells[1].errors
147
    assert not importer.rows[2].cells[2].errors
148
    assert importer.rows[2].cells[3].errors
149
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
150

  
151
    assert importer.updated == 0
152
    assert importer.created == 2
153

  
154
    assert User.objects.count() == 2
155
    thomas = User.objects.get(email='tnoel@entrouvert.com')
156
    assert thomas.email_verified is True
157
    assert thomas.first_name == 'Thomas'
158
    assert thomas.attributes.first_name == 'Thomas'
159
    assert thomas.last_name == 'Noël'
160
    assert thomas.attributes.last_name == 'Noël'
161
    assert thomas.attributes.phone == '1234'
162

  
163
    fpeters = User.objects.get(email='fpeters@entrouvert.com')
164
    assert fpeters.first_name == 'Frédéric'
165
    assert fpeters.email_verified is True
166
    assert fpeters.attributes.first_name == 'Frédéric'
167
    assert fpeters.last_name == 'Péters'
168
    assert fpeters.attributes.last_name == 'Péters'
169
    assert fpeters.attributes.phone == '5678'
170

  
171

  
172
def test_simulate(profile, user_csv_importer_factory):
173
    assert User.objects.count() == 0
174
    content = '''email key,first_name,last_name,phone update
175
tnoel@entrouvert.com,Thomas,Noël,1234
176
fpeters@entrouvert.com,Frédéric,Péters,5678
177
x,x,x,x'''
178
    importer = user_csv_importer_factory(content)
179

  
180
    assert importer.run(simulate=True), importer.errors
181
    assert importer.headers == [
182
        CsvHeader(1, 'email', field=True, key=True, verified=True),
183
        CsvHeader(2, 'first_name', field=True),
184
        CsvHeader(3, 'last_name', field=True),
185
        CsvHeader(4, 'phone', attribute=True),
186
    ]
187
    assert importer.has_errors
188
    assert len(importer.rows) == 3
189
    assert all(row.is_valid for row in importer.rows[:2])
190
    assert not importer.rows[2].is_valid
191
    assert importer.rows[2].cells[0].errors
192
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
193
    assert not importer.rows[2].cells[1].errors
194
    assert not importer.rows[2].cells[2].errors
195
    assert importer.rows[2].cells[3].errors
196
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
197

  
198
    assert importer.updated == 0
199
    assert importer.created == 2
200

  
201
    assert User.objects.count() == 0
202

  
203

  
204
def test_create_unique_error(profile, user_csv_importer_factory):
205

  
206
    content = '''email key verified,first_name,last_name,phone unique
207
tnoel@entrouvert.com,Thomas,Noël,1234'''
208
    importer = user_csv_importer_factory(content)
209

  
210
    user = User.objects.create(ou=get_default_ou())
211
    user.attributes.phone = '1234'
212

  
213
    assert importer.run()
214

  
215
    assert importer.created == 0
216
    assert importer.updated == 0
217
    assert len(importer.rows) == 1
218
    assert not importer.rows[0].is_valid
219
    assert importer.rows[0].action == 'create'
220
    assert all(not cell.errors for cell in importer.rows[0])
221
    assert all(not cell.action for cell in importer.rows[0])
222
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
223

  
224

  
225
def test_create_unique_in_ou(profile, user_csv_importer_factory):
226

  
227
    content = '''email key verified,first_name,last_name,phone unique
228
tnoel@entrouvert.com,Thomas,Noël,1234'''
229
    importer = user_csv_importer_factory(content)
230

  
231
    user = User.objects.create()
232
    user.attributes.phone = '1234'
233

  
234
    assert importer.run()
235

  
236
    assert len(importer.rows) == 1
237
    assert importer.rows[0].is_valid
238
    assert importer.rows[0].action == 'create'
239
    assert all(not cell.errors for cell in importer.rows[0])
240
    assert all(cell.action == 'updated' for cell in importer.rows[0])
241
    assert importer.created == 1
242
    assert importer.updated == 0
243

  
244

  
245
def test_create_unique_globally_error(profile, user_csv_importer_factory):
246

  
247
    content = '''email key verified,first_name,last_name,phone globally-unique
248
tnoel@entrouvert.com,Thomas,Noël,1234'''
249
    importer = user_csv_importer_factory(content)
250

  
251
    user = User.objects.create()
252
    user.attributes.phone = '1234'
253

  
254
    assert importer.run()
255

  
256
    assert importer.created == 0
257
    assert importer.updated == 0
258
    assert len(importer.rows) == 1
259
    assert not importer.rows[0].is_valid
260
    assert importer.rows[0].action == 'create'
261
    assert all(not cell.errors for cell in importer.rows[0])
262
    assert all(not cell.action for cell in importer.rows[0])
263
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
264

  
265

  
266
def test_create_key_self_reference_error(profile, user_csv_importer_factory):
267
    content = '''email key,first_name,last_name,phone
268
tnoel@entrouvert.com,Thomas,Noël,1234
269
tnoel@entrouvert.com,Frédéric,Péters,1234'''
270
    importer = user_csv_importer_factory(content)
271

  
272
    assert importer.run()
273

  
274
    assert importer.created == 1
275
    assert importer.updated == 0
276
    assert len(importer.rows) == 2
277
    assert importer.rows[0].is_valid
278
    assert importer.rows[0].action == 'create'
279
    assert not importer.rows[1].is_valid
280
    assert importer.rows[1].action == 'update'
281
    assert importer.rows[1].errors == [Error('unique-constraint-failed')]
282

  
283

  
284
def test_update_unique_error(profile, user_csv_importer_factory):
285
    content = '''email key verified,first_name,last_name,phone unique update
286
tnoel@entrouvert.com,Thomas,Noël,1234'''
287
    importer = user_csv_importer_factory(content)
288

  
289
    user = User.objects.create(ou=get_default_ou())
290
    user.attributes.phone = '1234'
291

  
292
    user = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
293

  
294
    assert importer.run()
295

  
296
    assert importer.created == 0
297
    assert importer.updated == 0
298
    assert len(importer.rows) == 1
299
    assert not importer.rows[0].is_valid
300
    assert importer.rows[0].action == 'update'
301
    assert all(not cell.errors for cell in importer.rows[0])
302
    assert all(not cell.action for cell in importer.rows[0])
303
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
304

  
305

  
306
def test_update_unique_globally_error(profile, user_csv_importer_factory):
307
    content = '''email key verified,first_name,last_name,phone globally-unique update
308
tnoel@entrouvert.com,Thomas,Noël,1234'''
309
    importer = user_csv_importer_factory(content)
310

  
311
    user = User.objects.create()
312
    user.attributes.phone = '1234'
313

  
314
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
315

  
316
    assert importer.run()
317

  
318
    assert importer.created == 0
319
    assert importer.updated == 0
320
    assert len(importer.rows) == 1
321
    assert not importer.rows[0].is_valid
322
    assert importer.rows[0].action == 'update'
323
    assert all(not cell.errors for cell in importer.rows[0])
324
    assert all(not cell.action for cell in importer.rows[0])
325
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
326

  
327

  
328
def test_update_unique_globally(profile, user_csv_importer_factory):
329
    content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
330
tnoel@entrouvert.com,Thomas,Noël,1234'''
331
    importer = user_csv_importer_factory(content)
332

  
333
    user = User.objects.create()
334
    user.attributes.phone = '1234'
335

  
336
    thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
337

  
338
    assert importer.run()
339

  
340
    assert importer.created == 0
341
    assert importer.updated == 1
342
    assert len(importer.rows) == 1
343
    assert importer.rows[0].is_valid
344
    assert importer.rows[0].action == 'update'
345
    assert all(not cell.errors for cell in importer.rows[0])
346
    assert all(cell.action == 'nothing' for cell in importer.rows[0].cells[:3])
347
    assert importer.rows[0].cells[3].action == 'updated'
348

  
349
    thomas.refresh_from_db()
350
    assert not thomas.first_name
351
    assert not thomas.last_name
352
    assert thomas.attributes.phone == '1234'
353

  
354

  
355
def test_external_id(profile, user_csv_importer_factory):
356
    assert User.objects.count() == 0
357
    content = '''_source_name,_source_id,email,first_name,last_name,phone
358
app1,1,tnoel@entrouvert.com,Thomas,Noël,1234
359
app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
360
'''
361
    importer = user_csv_importer_factory(content)
362

  
363
    assert importer.run(), importer.errors
364
    assert importer.headers == [
365
        CsvHeader(1, '_source_name'),
366
        CsvHeader(2, '_source_id', key=True),
367
        CsvHeader(3, 'email', field=True, verified=True),
368
        CsvHeader(4, 'first_name', field=True),
369
        CsvHeader(5, 'last_name', field=True),
370
        CsvHeader(6, 'phone', attribute=True),
371
    ]
372
    assert not importer.has_errors
373
    assert len(importer.rows) == 2
374
    for external_id in ['1', '2']:
375
        thomas = User.objects.get(
376
            userexternalid__source='app1',
377
            userexternalid__external_id=external_id)
378

  
379
        assert thomas.email_verified is True
380
        assert thomas.first_name == 'Thomas'
381
        assert thomas.attributes.first_name == 'Thomas'
382
        assert thomas.last_name == 'Noël'
383
        assert thomas.attributes.last_name == 'Noël'
384
        assert thomas.attributes.phone == '1234'
0
-