Projet

Général

Profil

0003-add-csv-import-framework-32833.patch

Benjamin Dauvergne, 21 juin 2019 13:33

Télécharger (35,3 ko)

Voir les différences:

Subject: [PATCH 3/6] add csv import framework (#32833)

 debian/control               |   4 +-
 setup.py                     |   2 +
 src/authentic2/csv_import.py | 565 +++++++++++++++++++++++++++++++++++
 src/authentic2/models.py     |   3 +
 tests/test_csv_import.py     | 366 +++++++++++++++++++++++
 5 files changed, 939 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/csv_import.py
 create mode 100644 tests/test_csv_import.py
debian/control
29 29
    python-django-filters (>= 1),
30 30
    python-django-filters (<< 2),
31 31
    python-pil,
32
    python-tablib
32
    python-tablib,
33
    python-chardet,
34
    python-attr
33 35
Breaks: python-authentic2-auth-fc (<< 0.26)
34 36
Replaces: python-authentic2-auth-fc (<< 0.26)
35 37
Provides: ${python:Provides}, python-authentic2-auth-fc
setup.py
140 140
          'xstatic-select2',
141 141
          'pillow',
142 142
          'tablib',
143
          'chardet',
144
          'attrs',
143 145
      ],
144 146
      zip_safe=False,
145 147
      classifiers=[
src/authentic2/csv_import.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals
18

  
19
import csv
20
import io
21

  
22
from chardet.universaldetector import UniversalDetector
23
import attr
24

  
25
from django import forms
26
from django.core.exceptions import FieldDoesNotExist
27
from django.core.validators import RegexValidator
28
from django.db import IntegrityError
29
from django.db.transaction import atomic
30
from django.utils import six
31
from django.utils.translation import ugettext as _
32

  
33
from authentic2 import app_settings
34
from authentic2.a2_rbac.utils import get_default_ou
35
from authentic2.custom_user.models import User
36
from authentic2.forms.profile import modelform_factory, BaseUserForm
37
from authentic2.models import Attribute, AttributeValue, UserExternalId
38

  
39

  
40
class UTF8Recoder(object):
    """Iterator wrapper re-encoding each text line of ``fd`` as UTF-8 bytes.

    It is used to feed the ``csv`` reader built by ``UnicodeReader``.
    """

    def __init__(self, fd):
        # fd: an iterator yielding text lines
        self.fd = fd

    def __iter__(self):
        return self

    def next(self):
        """Return the next line of the wrapped iterator encoded as UTF-8."""
        # The next() builtin dispatches to .next() on Python 2 and to
        # .__next__() on Python 3, contrary to an explicit self.fd.next().
        return next(self.fd).encode('utf-8')

    # Python 3 spelling of the iterator protocol.
    __next__ = next
49

  
50

  
51
class UnicodeReader(object):
    """CSV reader returning each row as a list of text strings.

    Lines of ``fd`` are re-encoded to UTF-8 through ``UTF8Recoder`` before
    being handed to ``csv.reader``; every parsed cell is decoded back from
    UTF-8.
    """

    def __init__(self, fd, dialect='excel', **kwargs):
        self.reader = csv.reader(UTF8Recoder(fd), dialect=dialect, **kwargs)

    def next(self):
        """Return the next row with every cell decoded from UTF-8."""
        # next() builtin: works with both Python 2 and Python 3 iterators.
        row = next(self.reader)
        return [s.decode('utf-8') for s in row]

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def __iter__(self):
        return self
61

  
62

  
63
class CsvImporter(object):
    """Load a CSV document, given as a string or a file object, into ``rows``.

    After a failed :meth:`run`, ``error`` holds an ``Error`` describing the
    problem.
    """
    rows = None
    error = None
    error_description = None
    encoding = None

    def run(self, fd_or_str, encoding):
        """Parse ``fd_or_str`` and store the parsed rows in ``self.rows``.

        ``fd_or_str`` may be a byte string, a text string or a file-like
        object.  ``encoding`` is the name of the encoding used to decode
        binary input, or ``'detect'`` to let chardet guess it.

        Returns True on success; on failure returns False and sets
        ``self.error``.
        """
        if isinstance(fd_or_str, six.binary_type):
            input_fd = io.BytesIO(fd_or_str)
        elif isinstance(fd_or_str, six.text_type):
            input_fd = io.StringIO(fd_or_str)
        elif not hasattr(fd_or_str, 'read1'):
            # Not an ``io`` buffered object: try to reopen its file
            # descriptor through ``io``, else load its whole content.
            try:
                input_fd = io.open(fd_or_str.fileno(), closefd=False)
            except Exception:
                try:
                    fd_or_str.seek(0)
                except Exception:
                    pass
                content = fd_or_str.read()
                if isinstance(content, six.text_type):
                    input_fd = io.StringIO(content)
                else:
                    input_fd = io.BytesIO(content)
        else:
            input_fd = fd_or_str

        assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object'

        def set_encoding(input_fd, encoding):
            # detect StringIO: text streams are returned untouched
            if hasattr(input_fd, 'line_buffering'):
                return input_fd

            if encoding == 'detect':
                detector = UniversalDetector()

                try:
                    for line in input_fd:
                        detector.feed(line)
                        if detector.done:
                            break
                    # close() must be called even when ``done`` was never
                    # set: for short inputs the detector only settles on a
                    # result at this point.
                    detector.close()
                    encoding = detector.result['encoding']
                finally:
                    input_fd.seek(0)

                if not encoding:
                    self.error = Error('cannot-detect-encoding', _('Cannot detect encoding'))
                    return None

            if not hasattr(input_fd, 'readable'):
                input_fd = io.open(input_fd.fileno(), 'rb', closefd=False)
            return io.TextIOWrapper(input_fd, encoding=encoding)

        def parse_csv():
            try:
                dialect = csv.Sniffer().sniff(input_fd.read().encode('utf-8'))
            except csv.Error as e:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e)
                return False
            finally:
                # rewind so that the reader below starts from the first line
                input_fd.seek(0)

            if not dialect:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect'))
                return False
            reader = UnicodeReader(input_fd, dialect)
            self.rows = list(reader)
            return True

        input_fd = set_encoding(input_fd, encoding)
        if input_fd is None:
            return False

        return parse_csv()
138

  
139

  
140
@attr.s
class CsvHeader(object):
    """Description of one column of the header row.

    Attributes declared with ``metadata={'flag': True}`` are the ones
    reported by :attr:`flags`.
    """
    column = attr.ib()
    name = attr.ib(default='')
    field = attr.ib(default=False, converter=bool)
    attribute = attr.ib(default=False, converter=bool)
    create = attr.ib(default=True, metadata={'flag': True})
    update = attr.ib(default=True, metadata={'flag': True})
    key = attr.ib(default=False, metadata={'flag': True})
    unique = attr.ib(default=False, metadata={'flag': True})
    globally_unique = attr.ib(default=False, metadata={'flag': True})
    verified = attr.ib(default=False, metadata={'flag': True})

    @property
    def flags(self):
        """Return the state of every flag as a list of strings.

        A set flag is reported by its name, an unset one is prefixed with
        ``no-``.  Underscores are turned into dashes in both cases, so that
        a flag is spelled the same way whether it is set or not
        (``globally-unique`` / ``no-globally-unique``).
        """
        flags = []
        for attribute in attr.fields(self.__class__):
            if not attribute.metadata.get('flag'):
                continue
            label = attribute.name.replace('_', '-')
            if getattr(self, attribute.name):
                flags.append(label)
            else:
                flags.append('no-' + label)
        return flags
163

  
164

  
165
@attr.s
class Error(object):
    """An import error identified by a machine readable ``code``."""
    # identifier of the kind of error, e.g. 'unknown-csv-dialect'
    code = attr.ib()
    # human readable message; declared with cmp=False so it is left out of
    # the comparison methods generated by attrs
    description = attr.ib(default='', cmp=False)
169

  
170

  
171
@attr.s(cmp=False)
class LineError(Error):
    """An ``Error`` carrying a line and column position."""
    line = attr.ib(default=0)
    column = attr.ib(default=0)

    @classmethod
    def from_error(cls, error):
        """Build a ``LineError`` from the attributes of ``error``."""
        return cls(**attr.asdict(error))

    def as_error(self):
        """Return a plain ``Error`` with the same code and description."""
        return Error(self.code, self.description)

    def __eq__(self, other):
        # NOTE(review): LineError is itself a subclass of Error, so two
        # LineError instances also go through this branch; line and column
        # then appear to be left out of the comparison -- confirm this is
        # intended.
        if isinstance(other, Error):
            return self.as_error() == other
        try:
            return (self.code, self.line, self.column) == (other.code, other.line, other.column)
        except AttributeError:
            # unrelated object (None, str, ...): let Python fall back to its
            # default behaviour instead of raising
            return NotImplemented
187

  
188

  
189
class ImportUserForm(BaseUserForm):
    """User form used to validate the cells of an imported row."""

    def clean(self):
        # NOTE(review): super() is anchored on BaseUserForm, so
        # BaseUserForm.clean() itself is skipped -- presumably on purpose,
        # confirm.
        super(BaseUserForm, self).clean()
        # turn off the model form uniqueness validation
        self._validate_unique = False
193

  
194
# Names of the two special columns holding an external identifier.
SOURCE_NAME = '_source_name'
SOURCE_ID = '_source_id'
SOURCE_COLUMNS = {SOURCE_NAME, SOURCE_ID}
197

  
198

  
199
class ImportUserFormWithExternalId(ImportUserForm):
    """Import form with two extra fields for the external id columns."""
    # The field names are held by the SOURCE_NAME / SOURCE_ID constants, so
    # the fields are declared through the class namespace instead of plain
    # assignments.
    locals()[SOURCE_NAME] = forms.CharField(
        label=_('Source name'),
        required=False,
        validators=[
            RegexValidator(
                r'^[a-zA-Z0-9_-]+$',
                _('source_name must contain no spaces and only letters, digits, - and _'),
                'invalid')])
    locals()[SOURCE_ID] = forms.CharField(
        label=_('Source external id'))
210

  
211

  
212
@attr.s
class CsvRow(object):
    """One data row: its cells, its errors and the action taken for it."""
    line = attr.ib()
    # attr.Factory gives every instance its own list; a literal [] default
    # would be shared by all rows created without explicit values.
    cells = attr.ib(default=attr.Factory(list))
    errors = attr.ib(default=attr.Factory(list))
    is_valid = attr.ib(default=True)
    action = attr.ib(default=None)

    def __getitem__(self, header):
        """Return the cell of ``header``, given as a header object or a name.

        Raises KeyError when no cell matches.
        """
        for cell in self.cells:
            if cell.header == header or cell.header.name == header:
                return cell
        # ``header`` may be a plain string, which has no ``name`` attribute
        raise KeyError(getattr(header, 'name', header))

    def __iter__(self):
        return iter(self.cells)
228

  
229

  
230
@attr.s
class CsvCell(object):
    """One cell of a data row, bound to the header of its column."""
    line = attr.ib()
    header = attr.ib()
    value = attr.ib(default=None)
    missing = attr.ib(default=False)
    # attr.Factory gives every instance its own list; a literal [] default
    # would be shared by all cells created without explicit errors.
    errors = attr.ib(default=attr.Factory(list))
    action = attr.ib(default=None)

    @property
    def column(self):
        """Column number of the cell, taken from its header."""
        return self.header.column
242

  
243

  
244
class Simulate(Exception):
    """Exception raised to leave the import transaction when simulating."""
246

  
247

  
248
class CancelImport(Exception):
    """Exception raised to abort the whole import."""
250

  
251

  
252
class UserCsvImporter(object):
253
    csv_importer = None
254
    errors = None
255
    headers = None
256
    headers_by_name = None
257
    rows = None
258
    has_errors = False
259
    ou = None
260
    updated = 0
261
    created = 0
262
    rows_with_errors = 0
263

  
264
    def add_error(self, line_error):
        """Append an error to ``self.errors``.

        Errors without a ``line`` attribute are first converted with
        ``LineError.from_error``.
        """
        has_position = hasattr(line_error, 'line')
        error = line_error if has_position else LineError.from_error(line_error)
        self.errors.append(error)
268

  
269
    def run(self, fd_or_str, encoding, ou=None, simulate=False):
        """Import the users described by the CSV document ``fd_or_str``.

        ``encoding`` is passed to ``CsvImporter.run``.  ``ou`` defaults to
        the value returned by ``get_default_ou()``.  When ``simulate`` is
        true the rows are processed but the transaction is rolled back.

        Returns True when no global error was recorded in ``self.errors``;
        errors of individual rows are reported through ``has_errors`` and on
        the rows themselves.
        """
        self.ou = ou or get_default_ou()
        self.errors = []
        self.csv_importer = CsvImporter()
        # start from a clean state so that an importer can be run twice
        self.has_errors = False
        self.updated = 0
        self.created = 0
        self.rows_with_errors = 0

        def parse_csv():
            if not self.csv_importer.run(fd_or_str, encoding):
                self.add_error(self.csv_importer.error)

        def do_import():
            try:
                with atomic():
                    for row in self.rows:
                        if not self.do_import_row(row):
                            self.rows_with_errors += 1
                    if simulate:
                        # leaving the atomic block through an exception
                        # rolls the transaction back
                        raise Simulate
            except Simulate:
                pass
            except CancelImport:
                # do_import_row recorded the error in self.errors before
                # raising; the transaction has been rolled back
                pass

        # each step only runs if the previous ones recorded no error
        for action in [
                parse_csv,
                self.parse_header_row,
                self.parse_rows,
                do_import]:
            action()
            if self.errors:
                break

        self.has_errors = self.has_errors or bool(self.errors)
        return not bool(self.errors)
300

  
301
    def parse_header_row(self):
        """Parse the first CSV row into ``headers`` and ``headers_by_name``.

        Errors (no header row, empty header row, zero or several key
        columns, incomplete source_name/source_id pair) are recorded with
        ``add_error``.
        """
        self.headers = []
        self.headers_by_name = {}

        try:
            first_row = self.csv_importer.rows[0]
        except IndexError:
            self.add_error(Error('no-header-row', _('Missing header row')))
            return

        # columns are numbered from 1
        for column, head in enumerate(first_row, 1):
            self.parse_header(head, column=column)

        if not self.headers:
            self.add_error(Error('empty-header-row', _('Empty header row')))
            return

        key_headers = [header for header in self.headers if header.key]

        if len(key_headers) == 0:
            self.add_error(Error('missing-key-column', _('Missing key column')))
        elif len(key_headers) > 1:
            self.add_error(Error('too-many-key-columns', _('Too many key columns')))

        # the two source columns must come together
        source_columns = SOURCE_COLUMNS & set(self.headers_by_name)
        if source_columns and source_columns != SOURCE_COLUMNS:
            self.add_error(
                Error('invalid-external-id-pair',
                      _('You must have a source_name and a source_id column')))
330

  
331
    def parse_header(self, head, column):
        """Parse one header cell and append the result to ``self.headers``.

        ``head`` is the text of the cell: a name optionally followed by
        whitespace separated flags (``key``, ``no-update``, ...).
        ``column`` is the 1-based column number.  Problems are recorded with
        ``add_error``.
        """
        splitted = head.split()
        try:
            header = CsvHeader(column, splitted[0])
            if header.name in self.headers_by_name:
                self.add_error(
                    Error('duplicate-header', _('Header "%s" is duplicated') % header.name))
                return
            self.headers_by_name[header.name] = header
        except IndexError:
            # blank cell: anonymous header, reported below as unknown
            header = CsvHeader(column)
        else:
            if header.name in SOURCE_COLUMNS:
                if header.name == SOURCE_ID:
                    # the external id is always the lookup key
                    header.key = True
            else:
                try:
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
                        User._meta.get_field(header.name)
                        header.field = True
                        if header.name == 'email':
                            # by default email are expected to be verified
                            header.verified = True
                        if header.name == 'email' and self.email_is_unique:
                            header.unique = True
                            if app_settings.A2_EMAIL_IS_UNIQUE:
                                header.globally_unique = True
                        if header.name == 'username' and self.username_is_unique:
                            header.unique = True
                            if app_settings.A2_USERNAME_IS_UNIQUE:
                                header.globally_unique = True
                except FieldDoesNotExist:
                    pass
                if not header.field:
                    try:
                        # only the existence of the attribute matters
                        Attribute.objects.get(name=header.name)
                        header.attribute = True
                    except Attribute.DoesNotExist:
                        pass

        self.headers.append(header)

        if (not (header.field or header.attribute)
                and header.name not in SOURCE_COLUMNS):
            self.add_error(LineError('unknown-or-missing-attribute',
                                     _('unknown or missing attribute "%s"') % head,
                                     line=1, column=column))
            return

        for raw_flag in splitted[1:]:
            if header.name in SOURCE_COLUMNS:
                self.add_error(LineError(
                    'flag-forbidden-on-source-columns',
                    _('You cannot set flags on source_app and source_id columns'),
                    line=1, column=column))
                break
            # "no-xxx" unsets the flag, dashes map to underscores
            value = True
            flag = raw_flag
            if flag.startswith('no-'):
                value = False
                flag = flag[3:]
            flag = flag.replace('-', '_')
            try:
                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
                    raise TypeError
                setattr(header, flag, value)
            except (AttributeError, TypeError, KeyError):
                # report the flag as written in the file
                self.add_error(LineError(
                    'unknown-flag', _('unknown flag "%s"') % raw_flag,
                    line=1, column=column))
398

  
399
    def parse_rows(self):
        """Validate every data row and store the results in ``self.rows``.

        ``has_errors`` is set as soon as one row is invalid.
        """
        uses_external_id = SOURCE_NAME in self.headers_by_name
        base_form_class = ImportUserFormWithExternalId if uses_external_id else ImportUserForm
        form_class = modelform_factory(User, fields=self.headers_by_name.keys(), form=base_form_class)
        self.rows = parsed_rows = []
        # line 1 is the header row, data starts on line 2
        for line, row in enumerate(self.csv_importer.rows[1:], 2):
            csv_row = self.parse_row(form_class, row, line=line)
            if not csv_row.is_valid:
                self.has_errors = True
            parsed_rows.append(csv_row)
409

  
410
    def parse_row(self, form_class, row, line):
        """Validate ``row`` with ``form_class`` and return it as a ``CsvRow``.

        ``row`` is the list of raw values, ``line`` its line number.  Cells
        absent from a short row are flagged ``missing``.
        """
        # map header names to raw values, skipping columns beyond the end of
        # a short row
        data = {}
        for header in self.headers:
            index = header.column - 1
            if index < len(row):
                data[header.name] = row[index]

        form = form_class(data=data)
        form.is_valid()

        def errors_for(name):
            messages = form.errors.get(name, [])
            return [Error('data-error', six.text_type(message)) for message in messages]

        cells = []
        for header in self.headers:
            cells.append(CsvCell(
                line=line,
                header=header,
                value=data.get(header.name),
                missing=header.name not in data,
                errors=errors_for(header.name)))

        has_cell_errors = any(bool(cell.errors) for cell in cells)
        # '__all__' holds the errors not bound to a single field
        row_errors = errors_for('__all__')
        return CsvRow(
            line=line,
            cells=cells,
            errors=row_errors,
            is_valid=not bool(has_cell_errors or row_errors))
440

  
441
    @property
    def email_is_unique(self):
        """Truthy when emails must be unique, by settings or by the OU."""
        if app_settings.A2_EMAIL_IS_UNIQUE:
            return app_settings.A2_EMAIL_IS_UNIQUE
        return self.ou.email_is_unique
444

  
445
    @property
    def username_is_unique(self):
        """Truthy when usernames must be unique, by settings or by the OU."""
        if app_settings.A2_USERNAME_IS_UNIQUE:
            return app_settings.A2_USERNAME_IS_UNIQUE
        return self.ou.username_is_unique
448

  
449
    def check_unique_constraints(self, row, user=None):
        """Check the ``unique`` and ``globally_unique`` columns of ``row``.

        ``user`` is the user about to be updated, if any; it is left out of
        the lookups and its non-updatable columns are skipped.  Failures are
        appended to ``row.errors`` and ``row.is_valid`` is updated.

        Returns True when every constraint holds.
        """
        ou_users = User.objects.filter(ou=self.ou)
        users = User.objects.all()
        if user:
            users = users.exclude(pk=user.pk)
            ou_users = ou_users.exclude(pk=user.pk)
        errors = []
        for cell in row:
            header = cell.header
            if not header.globally_unique and not header.unique:
                continue
            if user and not header.update:
                continue
            # global uniqueness is checked on all users, plain uniqueness
            # only inside the organizational unit
            qs = users if header.globally_unique else ou_users
            if header.field:
                duplicates = qs.filter(**{header.name: cell.value})
            elif header.attribute:
                atvs = AttributeValue.objects.filter(attribute__name=header.name, content=cell.value)
                duplicates = qs.filter(attribute_values__in=atvs)
            else:
                # neither a field nor an attribute: nothing to look up (and
                # no result of a previous cell to reuse by mistake)
                continue
            if duplicates.exists():
                errors.append(
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % header.name))
        row.errors.extend(errors)
        row.is_valid = row.is_valid and not bool(errors)
        return not bool(errors)
473

  
474
    @atomic
    def do_import_row(self, row):
        """Create or update the user described by ``row``.

        The user is looked up through the key column (external id, field or
        attribute).  ``row.action`` is set to ``'create'`` or ``'update'``,
        every handled cell gets an ``action`` and the ``created`` /
        ``updated`` counter is incremented.

        Returns True when the row was imported, False when it was invalid or
        an error was recorded on it.  Raises CancelImport when the external
        id cannot be created.
        """
        if not row.is_valid:
            return False

        for header in self.headers:
            if header.key:
                header_key = header
                break
        else:
            assert False, 'should not happen'

        if header_key.name == SOURCE_ID:
            # lookup by external id
            source_name = row[SOURCE_NAME].value
            source_id = row[SOURCE_ID].value
            # also used by the "too many users" message below
            key_value = '%s.%s' % (source_name, source_id)
            userexternalids = UserExternalId.objects.filter(source=source_name, external_id=source_id)
            users = User.objects.filter(userexternalid__in=userexternalids)[:2]
        else:
            # lookup by field/attribute
            key_value = row[header_key].value
            if header_key.field:
                users = User.objects.filter(
                    **{header_key.name: key_value})
            elif header_key.attribute:
                atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
                users = User.objects.filter(attribute_values__in=atvs)
            # two results are enough to detect an ambiguous key
            users = users[:2]

        if users:
            row.action = 'update'
        else:
            row.action = 'create'

        if len(users) > 1:
            row.errors.append(
                Error('key-matches-too-many-users',
                      _('Key value "%s" matches too many users') % key_value))
            return False

        user = None
        if users:
            user = users[0]

        if not self.check_unique_constraints(row, user=user):
            return False

        if not user:
            # NOTE(review): the new user is not attached to self.ou although
            # uniqueness is checked against it -- confirm this is intended.
            user = User()

        for cell in row.cells:
            if not cell.header.field:
                continue
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
                if getattr(user, cell.header.name) != cell.value:
                    setattr(user, cell.header.name, cell.value)
                    if cell.header.name == 'email' and cell.header.verified:
                        user.email_verified = True
                    cell.action = 'updated'
                    continue
            cell.action = 'nothing'

        user.save()

        # On update the user was found through this very external id, so it
        # must only be recorded for new users.
        if header_key.name == SOURCE_ID and row.action == 'create':
            try:
                UserExternalId.objects.create(user=user,
                                              source=source_name,
                                              external_id=source_id)
            except IntegrityError:
                # should never happen since we have a unique index...
                self.errors.append(
                    Error('external-id-already-exist',
                          _('External id "%s.%s" already exists') % (source_name, source_id)))
                raise CancelImport

        # attributes are set after save()
        for cell in row.cells:
            if cell.header.field or not cell.header.attribute:
                continue
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
                attributes = user.attributes
                if cell.header.verified:
                    attributes = user.verified_attributes
                if getattr(attributes, cell.header.name) != cell.value:
                    setattr(attributes, cell.header.name, cell.value)
                    cell.action = 'updated'
                    continue
            cell.action = 'nothing'

        # 'create' -> self.created, 'update' -> self.updated
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
        return True
src/authentic2/models.py
246 246
    def natural_key(self):
247 247
        return (self.name,)
248 248

  
249
    def __repr__(self):
        """Return ``<ClassName 'text'>`` where text is ``str(self)``."""
        class_name = self.__class__.__name__
        return '<%s %r>' % (class_name, str(self))
251

  
249 252
    def __str__(self):
250 253
        return self.label
251 254

  
tests/test_csv_import.py
1
# -*- coding: utf-8 -*-
2
# authentic2 - versatile identity manager
3
# Copyright (C) 2010-2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import pytest
21

  
22
import io
23

  
24
from authentic2.custom_user.models import User
25
from authentic2.models import Attribute
26
from authentic2.a2_rbac.utils import get_default_ou
27

  
28
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29

  
30
# Encodings used to parametrize the ``encoding`` fixture argument.
ENCODINGS = ['iso-8859-1', 'iso-8859-15', 'utf-8', 'cp1252']
36

  
37

  
38
def pytest_generate_tests(metafunc):
    """Parametrize tests using the ``encoding`` and/or ``style`` arguments."""
    parameters = (
        ('encoding', ENCODINGS),
        ('style', ['str', 'file']),
    )
    for name, values in parameters:
        if name in metafunc.fixturenames:
            metafunc.parametrize(name, values)
43

  
44

  
45
@pytest.fixture
def profile(db):
    """Declare the ``phone`` attribute used by the import tests."""
    phone = dict(name='phone', kind='phone_number', label='Numéro de téléphone')
    Attribute.objects.create(**phone)
48

  
49

  
50
def _make_importer_factory(importer_class, encoding, style):
    """Return a factory building ``importer_class`` importers bound to a content.

    The factory encodes the given text with ``encoding``, wraps it in a
    ``BytesIO`` when ``style`` is ``'file'``, and returns an importer whose
    ``run()`` is pre-bound to that content and encoding.
    """
    def factory(content):
        content = content.encode(encoding)
        if style == 'file':
            content = io.BytesIO(content)
        importer = importer_class()
        run = importer.run
        importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
        return importer
    return factory


@pytest.fixture
def csv_importer_factory(encoding, style):
    """Factory of ``CsvImporter`` instances bound to a content."""
    return _make_importer_factory(CsvImporter, encoding, style)


@pytest.fixture
def user_csv_importer_factory(encoding, style):
    """Factory of ``UserCsvImporter`` instances bound to a content."""
    return _make_importer_factory(UserCsvImporter, encoding, style)
74

  
75

  
76
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
    """An empty document is reported as an unknown CSV dialect."""
    importer = user_csv_importer_factory('')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('unknown-csv-dialect')]
    assert importer.errors == expected_errors
81

  
82

  
83
def test_empty_header_row_error(profile, user_csv_importer_factory):
    """A blank first line is reported as an empty header row."""
    importer = user_csv_importer_factory('\n1,2,3')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('empty-header-row')]
    assert importer.errors == expected_errors
88

  
89

  
90
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
    """A blank header cell is reported as a missing attribute."""
    importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
    assert not importer.run()
    assert importer.has_errors
    # the blank header is the third cell of the header row
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
95

  
96

  
97
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
    """A header naming no known field or attribute is rejected."""
    importer = user_csv_importer_factory('email key,first_name,x\n1,2,3')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [LineError('unknown-or-missing-attribute', line=1, column=3)]
    assert importer.errors == expected_errors
102

  
103

  
104
def test_unknown_flag_error(profile, user_csv_importer_factory):
    """An unknown flag after a header name is rejected."""
    importer = user_csv_importer_factory('email key,first_name xxx\n1,2')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [LineError('unknown-flag', line=1, column=2)]
    assert importer.errors == expected_errors
109

  
110

  
111
def test_missing_key_column_error(profile, user_csv_importer_factory):
    """A header row without any key column is rejected."""
    importer = user_csv_importer_factory('email,first_name\n1,2')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('missing-key-column')]
    assert importer.errors == expected_errors
116

  
117

  
118
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
    """A header row with two key columns is rejected."""
    importer = user_csv_importer_factory('email key,first_name key\n1,2')
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('too-many-key-columns')]
    assert importer.errors == expected_errors
123

  
124

  
125
def test_run(profile, user_csv_importer_factory):
    """Two valid rows create two users, the invalid third row is reported."""
    assert User.objects.count() == 0
    content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    assert importer.run(), importer.errors
    expected_headers = [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert importer.has_errors
    assert len(importer.rows) == 3
    for valid_row in importer.rows[:2]:
        assert valid_row.is_valid

    # only the email and phone cells of the last row are invalid
    bad_row = importer.rows[2]
    assert not bad_row.is_valid
    for index in (0, 3):
        cell_errors = bad_row.cells[index].errors
        assert cell_errors
        assert all(error == Error('data-error') for error in cell_errors)
    for index in (1, 2):
        assert not bad_row.cells[index].errors

    assert importer.updated == 0
    assert importer.created == 2

    assert User.objects.count() == 2
    expected_users = [
        ('tnoel@entrouvert.com', 'Thomas', 'Noël', '1234'),
        ('fpeters@entrouvert.com', 'Frédéric', 'Péters', '5678'),
    ]
    for email, first_name, last_name, phone in expected_users:
        user = User.objects.get(email=email)
        assert user.email_verified is True
        assert user.first_name == first_name
        assert user.attributes.first_name == first_name
        assert user.last_name == last_name
        assert user.attributes.last_name == last_name
        assert user.attributes.phone == phone
170

  
171

  
172
def test_simulate(profile, user_csv_importer_factory):
    """A simulated run reports the same results but creates no user."""
    assert User.objects.count() == 0
    content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    assert importer.run(simulate=True), importer.errors
    expected_headers = [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert importer.has_errors
    assert len(importer.rows) == 3
    for valid_row in importer.rows[:2]:
        assert valid_row.is_valid

    # only the email and phone cells of the last row are invalid
    bad_row = importer.rows[2]
    assert not bad_row.is_valid
    for index in (0, 3):
        cell_errors = bad_row.cells[index].errors
        assert cell_errors
        assert all(error == Error('data-error') for error in cell_errors)
    for index in (1, 2):
        assert not bad_row.cells[index].errors

    assert importer.updated == 0
    assert importer.created == 2

    # nothing was written to the database
    assert User.objects.count() == 0
202

  
203

  
204
def test_create_unique_error(profile, user_csv_importer_factory):
    """Check that creation is refused when a ``unique`` value already exists.

    A user in the default OU already holds phone ``1234``; importing a new
    user with the same phone, flagged ``unique``, must leave the row invalid
    with a single ``unique-constraint-failed`` error.
    """
    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Pre-existing user owning the phone number used by the CSV row.
    existing = User.objects.create(ou=get_default_ou())
    existing.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'create'
    # The failure is reported on the row, not on individual cells.
    for cell in row:
        assert not cell.errors
    for cell in row:
        assert not cell.action
    assert row.errors == [Error('unique-constraint-failed')]
223

  
224

  
225
def test_create_unique_in_ou(profile, user_csv_importer_factory):
    """Check that a ``unique`` column does not clash with a user without OU.

    The user already holding phone ``1234`` is created without any OU, and
    the import is expected to create the new user with every cell marked
    ``updated``.
    """
    # NOTE(review): presumably ``unique`` is only enforced inside the
    # importer's OU, which is why this user does not conflict - confirm
    # against the importer.
    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Same phone number, but the user is created without an OU.
    existing = User.objects.create()
    existing.attributes.phone = '1234'

    assert importer.run()

    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert row.is_valid
    assert row.action == 'create'
    for cell in row:
        assert not cell.errors
    for cell in row:
        assert cell.action == 'updated'

    assert importer.created == 1
    assert importer.updated == 0
243

  
244

  
245
def test_create_unique_globally_error(profile, user_csv_importer_factory):
    """Check that ``globally-unique`` clashes even with a user without OU.

    A user created without any OU holds phone ``1234``; importing the same
    phone in a ``globally-unique`` column must leave the row invalid with a
    single ``unique-constraint-failed`` error and create nothing.
    """
    content = '''email key verified,first_name,last_name,phone globally-unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Conflicting user, created without an OU.
    existing = User.objects.create()
    existing.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'create'
    # The failure is reported on the row, not on individual cells.
    for cell in row:
        assert not cell.errors
    for cell in row:
        assert not cell.action
    assert row.errors == [Error('unique-constraint-failed')]
264

  
265

  
266
def test_update_unique_error(profile, user_csv_importer_factory):
    """Check that an update is refused when it would break a ``unique`` column.

    Two users exist in the default OU: one already holds phone ``1234`` and
    the other matches the CSV row by email.  Updating the latter with the
    same phone must leave the row invalid with a single
    ``unique-constraint-failed`` error, and nothing is counted as updated.
    """
    content = '''email key verified,first_name,last_name,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # User already owning the phone number.
    phone_owner = User.objects.create(ou=get_default_ou())
    phone_owner.attributes.phone = '1234'

    # User targeted by the CSV row; the instance itself is never read, so it
    # is not bound to a name (same form as test_update_unique_globally_error).
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    # The failure is reported on the row, not on individual cells.
    for cell in row:
        assert not cell.errors
    for cell in row:
        assert not cell.action
    assert row.errors == [Error('unique-constraint-failed')]
286

  
287

  
288
def test_update_unique_globally_error(profile, user_csv_importer_factory):
    """Check that an update is refused when it breaks ``globally-unique``.

    A user created without any OU holds phone ``1234``; a second user in the
    default OU matches the CSV row by email.  Updating the second user with
    the same phone must leave the row invalid with a single
    ``unique-constraint-failed`` error.
    """
    content = '''email key verified,first_name,last_name,phone globally-unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Conflicting user, created without an OU.
    phone_owner = User.objects.create()
    phone_owner.attributes.phone = '1234'

    # User targeted by the CSV row.
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    # The failure is reported on the row, not on individual cells.
    for cell in row:
        assert not cell.errors
    for cell in row:
        assert not cell.action
    assert row.errors == [Error('unique-constraint-failed')]
308

  
309

  
310
def test_update_unique_globally(profile, user_csv_importer_factory):
    """Check a successful update of a ``unique`` column on an existing user.

    The email, first name and last name columns are flagged ``no-update``
    and only the phone column is flagged ``update``.  A user created without
    any OU already holds phone ``1234``; the user matched by email lives in
    the default OU.  The run is expected to update that user's phone only.
    """
    # NOTE(review): despite the test name, the header uses the plain
    # ``unique`` flag and not ``globally-unique`` - confirm which scope this
    # test is meant to cover.
    content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # User holding the same phone number, created without an OU.
    phone_owner = User.objects.create()
    phone_owner.attributes.phone = '1234'

    thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 1
    assert len(importer.rows) == 1

    row = importer.rows[0]
    assert row.is_valid
    assert row.action == 'update'
    for cell in row:
        assert not cell.errors
    # no-update columns are left alone, only the phone cell is written.
    for cell in row.cells[:3]:
        assert cell.action == 'nothing'
    assert row.cells[3].action == 'updated'

    thomas.refresh_from_db()
    assert not thomas.first_name
    assert not thomas.last_name
    assert thomas.attributes.phone == '1234'
335

  
336

  
337
def test_external_id(profile, user_csv_importer_factory):
    """Check import of users identified by ``_source_name``/``_source_id``.

    Both rows carry the same email and personal data but different source
    ids; after the run one user must be retrievable for each
    ``(app1, <id>)`` external id, with the imported values set.
    """
    assert User.objects.count() == 0
    content = '''_source_name,_source_id,email,first_name,last_name,phone
app1,1,tnoel@entrouvert.com,Thomas,Noël,1234
app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
'''
    importer = user_csv_importer_factory(content)

    assert importer.run(), importer.errors

    expected_headers = [
        CsvHeader(1, '_source_name'),
        CsvHeader(2, '_source_id', key=True),
        CsvHeader(3, 'email', field=True, verified=True),
        CsvHeader(4, 'first_name', field=True),
        CsvHeader(5, 'last_name', field=True),
        CsvHeader(6, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert not importer.has_errors
    assert len(importer.rows) == 2

    for source_id in ('1', '2'):
        # Look the user up through its external id, not through its email.
        lookup = {
            'userexternalid__source': 'app1',
            'userexternalid__external_id': source_id,
        }
        thomas = User.objects.get(**lookup)

        assert thomas.email_verified is True
        assert thomas.first_name == 'Thomas'
        assert thomas.attributes.first_name == 'Thomas'
        assert thomas.last_name == 'Noël'
        assert thomas.attributes.last_name == 'Noël'
        assert thomas.attributes.phone == '1234'
0
-