Projet

Général

Profil

0003-add-csv-import-framework-32833.patch

Benjamin Dauvergne, 17 juin 2019 12:27

Télécharger (35,4 ko)

Voir les différences:

Subject: [PATCH 3/4] add csv import framework (#32833)

 debian/control               |   4 +-
 setup.py                     |   2 +
 src/authentic2/csv_import.py | 569 +++++++++++++++++++++++++++++++++++
 src/authentic2/models.py     |   3 +
 tests/test_csv_import.py     | 366 ++++++++++++++++++++++
 5 files changed, 943 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/csv_import.py
 create mode 100644 tests/test_csv_import.py
debian/control
29 29
    python-django-filters (>= 1),
30 30
    python-django-filters (<< 2),
31 31
    python-pil,
32
    python-tablib
32
    python-tablib,
33
    python-chardet,
34
    python-attr
33 35
Breaks: python-authentic2-auth-fc (<< 0.26)
34 36
Replaces: python-authentic2-auth-fc (<< 0.26)
35 37
Provides: ${python:Provides}, python-authentic2-auth-fc
setup.py
140 140
          'xstatic-select2',
141 141
          'pillow',
142 142
          'tablib',
143
          'chardet',
144
          'attrs',
143 145
      ],
144 146
      zip_safe=False,
145 147
      classifiers=[
src/authentic2/csv_import.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals
18

  
19
import csv
20
import io
21

  
22
from chardet.universaldetector import UniversalDetector
23
import attr
24

  
25
from django import forms
26
from django.core.exceptions import FieldDoesNotExist
27
from django.core.validators import RegexValidator
28
from django.db import IntegrityError
29
from django.db.transaction import atomic
30
from django.utils import six
31
from django.utils.translation import ugettext as _
32

  
33
from authentic2 import app_settings
34
from authentic2.a2_rbac.utils import get_default_ou
35
from authentic2.custom_user.models import User
36
from authentic2.forms.profile import modelform_factory, BaseUserForm
37
from authentic2.models import Attribute, AttributeValue, UserExternalId
38

  
39

  
40
class UTF8Recoder(object):
41
    def __init__(self, fd):
42
        self.fd = fd
43

  
44
    def __iter__(self):
45
        return self
46

  
47
    def next(self):
48
        return self.fd.next().encode('utf-8')
49

  
50

  
51
class UnicodeReader(object):
52
    def __init__(self, fd, dialect='excel', **kwargs):
53
        self.reader = csv.reader(UTF8Recoder(fd), dialect=dialect, **kwargs)
54

  
55
    def next(self):
56
        row = self.reader.next()
57
        return [s.decode('utf-8') for s in row]
58

  
59
    def __iter__(self):
60
        return self
61

  
62

  
63
class CsvImporter(object):
64
    rows = None
65
    error = None
66
    error_description = None
67
    encoding = None
68

  
69
    def run(self, fd_or_str, encoding):
70
        if isinstance(fd_or_str, six.binary_type):
71
            input_fd = io.BytesIO(fd_or_str)
72
        elif isinstance(fd_or_str, six.text_type):
73
            input_fd = io.StringIO(fd_or_str)
74
        elif not hasattr(fd_or_str, 'read1'):
75
            try:
76
                input_fd = io.open(fd_or_str.fileno(), closefd=False)
77
            except Exception:
78
                try:
79
                    fd_or_str.seek(0)
80
                except Exception:
81
                    pass
82
                content = fd_or_str.read()
83
                if isinstance(content, six.text_type):
84
                    input_fd = io.StringIO(content)
85
                else:
86
                    input_fd = io.BytesIO(content)
87
        else:
88
            input_fd = fd_or_str
89

  
90
        assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object'
91

  
92
        def detect_encoding(input_fd, encoding):
93
            # detect StringIO
94
            if hasattr(input_fd, 'line_buffering'):
95
                return input_fd
96

  
97
            if encoding == 'detect':
98
                detector = UniversalDetector()
99

  
100
                try:
101
                    for line in input_fd:
102
                        detector.feed(line)
103
                        if detector.done:
104
                            break
105
                    else:
106
                        self.error = Error('cannot-detect-encoding', _('Cannot detect encoding'))
107
                        return None
108
                    detector.close()
109
                    encoding = detector.result['encoding']
110
                finally:
111
                    input_fd.seek(0)
112

  
113
            if not hasattr(input_fd, 'readable'):
114
                input_fd = io.open(input_fd.fileno(), 'rb', closefd=False)
115
            return io.TextIOWrapper(input_fd, encoding=encoding)
116

  
117
        def parse_csv():
118
            try:
119
                dialect = csv.Sniffer().sniff(input_fd.read().encode('utf-8'))
120
            except csv.Error as e:
121
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e)
122
                return False
123
            finally:
124
                input_fd.seek(0)
125

  
126
            if not dialect:
127
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect'))
128
                return False
129
            reader = UnicodeReader(input_fd, dialect)
130
            self.rows = list(reader)
131
            return True
132

  
133
        input_fd = detect_encoding(input_fd, encoding)
134
        if input_fd is None:
135
            return False
136

  
137
        return parse_csv()
138

  
139

  
140
@attr.s
141
class CsvHeader(object):
142
    column = attr.ib()
143
    name = attr.ib(default='')
144
    field = attr.ib(default=False, converter=bool)
145
    attribute = attr.ib(default=False, converter=bool)
146
    create = attr.ib(default=True, metadata={'flag': True})
147
    update = attr.ib(default=True, metadata={'flag': True})
148
    key = attr.ib(default=False, metadata={'flag': True})
149
    unique = attr.ib(default=False, metadata={'flag': True})
150
    globally_unique = attr.ib(default=False, metadata={'flag': True})
151
    verified = attr.ib(default=False, metadata={'flag': True})
152

  
153
    @property
154
    def flags(self):
155
        flags = []
156
        for attribute in attr.fields(self.__class__):
157
            if attribute.metadata.get('flag'):
158
                if getattr(self, attribute.name):
159
                    flags.append(attribute.name)
160
                else:
161
                    flags.append('no-' + attribute.name.replace('_', '-'))
162
        return flags
163

  
164

  
165
@attr.s
166
class Error(object):
167
    code = attr.ib()
168
    description = attr.ib(default='', cmp=False)
169

  
170
    def __eq__(self, other):
171
        if isinstance(other, Error):
172
            return self.as_error.__eq__(other)
173
        return super(LineError, self).__eq__(other)
174

  
175

  
176
@attr.s(cmp=False)
177
class LineError(Error):
178
    line = attr.ib(default=0)
179
    column = attr.ib(default=0)
180

  
181
    @classmethod
182
    def from_error(cls, error):
183
        return cls(**attr.asdict(error))
184

  
185
    def as_error(self):
186
        return Error(self.code, self.description)
187

  
188
    def __eq__(self, other):
189
        if isinstance(other, Error):
190
            return self.as_error() == other
191
        return (self.code, self.line, self.column) == (other.code, other.line, other.column)
192

  
193

  
194
class ImportUserForm(BaseUserForm):
195
    def clean(self):
196
        super(BaseUserForm, self).clean()
197
        self._validate_unique = False
198

  
199
SOURCE_NAME = '_source_name'
200
SOURCE_ID = '_source_id'
201
SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
202

  
203

  
204
class ImportUserFormWithExternalId(ImportUserForm):
205
    locals()[SOURCE_NAME] = forms.CharField(
206
        label=_('Source name'),
207
        required=False,
208
        validators=[
209
            RegexValidator(
210
                r'^[a-zA-Z0-9_-]+$',
211
                _('source_name must no spaces and only letters, digits, - and _'),
212
                'invalid')])
213
    locals()[SOURCE_ID] = forms.CharField(
214
        label=_('Source external id'))
215

  
216

  
217
@attr.s
218
class CsvRow(object):
219
    line = attr.ib()
220
    cells = attr.ib(default=[])
221
    errors = attr.ib(default=[])
222
    is_valid = attr.ib(default=True)
223
    action = attr.ib(default=None)
224

  
225
    def __getitem__(self, header):
226
        for cell in self.cells:
227
            if cell.header == header or cell.header.name == header:
228
                return cell
229
        raise KeyError(header.name)
230

  
231
    def __iter__(self):
232
        return iter(self.cells)
233

  
234

  
235
@attr.s
236
class CsvCell(object):
237
    line = attr.ib()
238
    header = attr.ib()
239
    value = attr.ib(default=None)
240
    missing = attr.ib(default=False)
241
    errors = attr.ib(default=[])
242
    action = attr.ib(default=None)
243

  
244
    @property
245
    def column(self):
246
        return self.header.column
247

  
248

  
249
class Simulate(Exception):
250
    pass
251

  
252

  
253
class CancelImport(Exception):
254
    pass
255

  
256

  
257
class UserCsvImporter(object):
258
    csv_importer = None
259
    errors = None
260
    headers = None
261
    headers_by_name = None
262
    rows = None
263
    has_errors = False
264
    ou = None
265
    updated = 0
266
    created = 0
267
    rows_with_errors = 0
268

  
269
    def add_error(self, line_error):
270
        if not hasattr(line_error, 'line'):
271
            line_error = LineError.from_error(line_error)
272
        self.errors.append(line_error)
273

  
274
    def run(self, fd_or_str, encoding, ou=None, simulate=False):
275
        self.ou = ou or get_default_ou()
276
        self.errors = []
277
        self.csv_importer = CsvImporter()
278

  
279
        def parse_csv():
280
            if not self.csv_importer.run(fd_or_str, encoding):
281
                self.add_error(self.csv_importer.error)
282

  
283
        def do_import():
284
            try:
285
                with atomic():
286
                    for row in self.rows:
287
                        if not self.do_import_row(row):
288
                            self.rows_with_errors += 1
289
                    if simulate:
290
                        raise Simulate
291
            except Simulate:
292
                pass
293

  
294
        for action in [
295
                parse_csv,
296
                self.parse_header_row,
297
                self.parse_rows,
298
                do_import]:
299
            action()
300
            if self.errors:
301
                break
302

  
303
        self.has_errors = self.has_errors or bool(self.errors)
304
        return not bool(self.errors)
305

  
306
    def parse_header_row(self):
307
        self.headers = []
308
        self.headers_by_name = {}
309

  
310
        try:
311
            header_row = self.csv_importer.rows[0]
312
        except IndexError:
313
            self.add_error(Error('no-header-row', _('Missing header row')))
314
            return
315

  
316
        for i, head in enumerate(header_row):
317
            self.parse_header(head, column=i + 1)
318

  
319
        if not self.headers:
320
            self.add_error(Error('empty-header-row', _('Empty header row')))
321
            return
322

  
323
        key_counts = sum(1 for header in self.headers if header.key)
324

  
325
        if not key_counts:
326
            self.add_error(Error('missing-key-column', _('Missing key column')))
327
        if key_counts > 1:
328
            self.add_error(Error('too-many-key-columns', _('Too many key columns')))
329

  
330
        header_names = set(self.headers_by_name)
331
        if header_names & SOURCE_COLUMNS and not SOURCE_COLUMNS.issubset(header_names):
332
            self.add_error(
333
                Error('invalid-external-id-pair',
334
                      _('You must have a source_name and a source_id column')))
335

  
336
    def parse_header(self, head, column):
337
        splitted = head.split()
338
        try:
339
            header = CsvHeader(column, splitted[0])
340
            if header.name in self.headers_by_name:
341
                self.add_error(
342
                    Error('duplicate-header', _('Header "%s" is duplicated') % header.name))
343
                return
344
            self.headers_by_name[header.name] = header
345
        except IndexError:
346
            header = CsvHeader(column)
347
        else:
348
            if header.name in (SOURCE_NAME, SOURCE_ID):
349
                if header.name == SOURCE_ID:
350
                    header.key = True
351
            else:
352
                try:
353
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
354
                        field = User._meta.get_field(header.name)  # NOQA: F841
355
                        header.field = True
356
                        if header.name == 'email':
357
                            # by default email are expected to be verified
358
                            header.verified = True
359
                        if header.name == 'email' and self.email_is_unique:
360
                            header.unique = True
361
                            if app_settings.A2_EMAIL_IS_UNIQUE:
362
                                header.globally_unique = True
363
                        if header.name == 'username' and self.username_is_unique:
364
                            header.unique = True
365
                            if app_settings.A2_USERNAME_IS_UNIQUE:
366
                                header.globally_unique = True
367
                except FieldDoesNotExist:
368
                    pass
369
                if not header.field:
370
                    try:
371
                        attribute = Attribute.objects.get(name=header.name)  # NOQA: F841
372
                        header.attribute = True
373
                    except Attribute.DoesNotExist:
374
                        pass
375

  
376
        self.headers.append(header)
377

  
378
        if (not (header.field or header.attribute)
379
                and header.name not in SOURCE_COLUMNS):
380
            self.add_error(LineError('unknown-or-missing-attribute',
381
                                     _('unknown or missing attribute "%s"') % head,
382
                                     line=1, column=column))
383
            return
384

  
385
        for flag in splitted[1:]:
386
            if header.name in SOURCE_COLUMNS:
387
                self.add_error(LineError(
388
                    'flag-forbidden-on-source-columns',
389
                    _('You cannot set flags on source_app and source_id columns')))
390
                break
391
            value = True
392
            if flag.startswith('no-'):
393
                value = False
394
                flag = flag[3:]
395
            flag = flag.replace('-', '_')
396
            try:
397
                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
398
                    raise TypeError
399
                setattr(header, flag, value)
400
            except (AttributeError, TypeError, KeyError):
401
                self.add_error(LineError('unknown-flag', _('unknown flag "%s"'), line=1, column=column))
402

  
403
    def parse_rows(self):
404
        base_form_class = ImportUserForm
405
        if SOURCE_NAME in self.headers_by_name:
406
            base_form_class = ImportUserFormWithExternalId
407
        form_class = modelform_factory(User, fields=self.headers_by_name.keys(), form=base_form_class)
408
        rows = self.rows = []
409
        for i, row in enumerate(self.csv_importer.rows[1:]):
410
            csv_row = self.parse_row(form_class, row, line=i + 2)
411
            self.has_errors = self.has_errors or not(csv_row.is_valid)
412
            rows.append(csv_row)
413

  
414
    def parse_row(self, form_class, row, line):
415
        data = {}
416

  
417
        for header in self.headers:
418
            try:
419
                data[header.name] = row[header.column - 1]
420
            except IndexError:
421
                pass
422

  
423
        form = form_class(data=data)
424
        form.is_valid()
425

  
426
        def get_form_errors(form, name):
427
            return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]
428

  
429
        cells = [
430
            CsvCell(
431
                line=line,
432
                header=header,
433
                value=data.get(header.name),
434
                missing=header.name not in data,
435
                errors=get_form_errors(form, header.name))
436
            for header in self.headers]
437
        cell_errors = any(bool(cell.errors) for cell in cells)
438
        errors = get_form_errors(form, '__all__')
439
        return CsvRow(
440
            line=line,
441
            cells=cells,
442
            errors=errors,
443
            is_valid=not bool(cell_errors or errors))
444

  
445
    @property
446
    def email_is_unique(self):
447
        return app_settings.A2_EMAIL_IS_UNIQUE or self.ou.email_is_unique
448

  
449
    @property
450
    def username_is_unique(self):
451
        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique
452

  
453
    def check_unique_constraints(self, row, user=None):
454
        ou_users = User.objects.filter(ou=self.ou)
455
        users = User.objects.all()
456
        if user:
457
            users = users.exclude(pk=user.pk)
458
            ou_users = ou_users.exclude(pk=user.pk)
459
        errors = []
460
        for cell in row:
461
            if (not cell.header.globally_unique and not cell.header.unique) or (user and not cell.header.update):
462
                continue
463
            qs = ou_users
464
            if cell.header.globally_unique:
465
                qs = users
466
            if cell.header.field:
467
                unique = not qs.filter(**{cell.header.name: cell.value}).exists()
468
            elif cell.header.attribute:
469
                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
470
                unique = not qs.filter(attribute_values__in=atvs).exists()
471
            if not unique:
472
                errors.append(
473
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
474
        row.errors.extend(errors)
475
        row.is_valid = row.is_valid and not bool(errors)
476
        return not bool(errors)
477

  
478
    @atomic
479
    def do_import_row(self, row):
480
        if not row.is_valid:
481
            return False
482

  
483
        for header in self.headers:
484
            if header.key:
485
                header_key = header
486
                break
487
        else:
488
            assert False, 'should not happen'
489

  
490
        user = None
491
        if header_key.name == SOURCE_ID:
492
            # lookup by external id
493
            source_name = row[SOURCE_NAME].value
494
            source_id = row[SOURCE_ID].value
495
            userexternalids = UserExternalId.objects.filter(source=source_name, external_id=source_id)
496
            users = User.objects.filter(userexternalid__in=userexternalids)[:2]
497
        else:
498
            # lookup by field/attribute
499
            key_value = row[header_key].value
500
            if header_key.field:
501
                users = User.objects.filter(
502
                    **{header_key.name: key_value})
503
            elif header_key.attribute:
504
                atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
505
                users = User.objects.filter(attribute_values__in=atvs)
506
            users = users[:2]
507

  
508
        if users:
509
            row.action = 'update'
510
        else:
511
            row.action = 'create'
512

  
513
        if len(users) > 1:
514
            row.errors.append(
515
                Error('key-matches-too-many-users',
516
                      _('Key value "%s" matches too many users') % key_value))
517
            return False
518

  
519
        user = None
520
        if users:
521
            user = users[0]
522

  
523
        if not self.check_unique_constraints(row, user=user):
524
            return False
525

  
526
        if not user:
527
            user = User()
528

  
529
        for cell in row.cells:
530
            if not cell.header.field:
531
                continue
532
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
533
                if getattr(user, cell.header.name) != cell.value:
534
                    setattr(user, cell.header.name, cell.value)
535
                    if cell.header.name == 'email' and cell.header.verified:
536
                        user.email_verified = True
537
                    cell.action = 'updated'
538
                    continue
539
            cell.action = 'nothing'
540

  
541
        user.save()
542

  
543
        if header_key.name == SOURCE_ID:
544
            try:
545
                UserExternalId.objects.create(user=user,
546
                                              source=source_name,
547
                                              external_id=source_id)
548
            except IntegrityError:
549
                # should never happen since we have a unique index...
550
                self.errors.append(
551
                    Error('external-id-already-exist',
552
                          _('External id "%s.%s" already exists') % (source_name, source_id)))
553
                raise CancelImport
554

  
555
        for cell in row.cells:
556
            if cell.header.field or not cell.header.attribute:
557
                continue
558
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
559
                attributes = user.attributes
560
                if cell.header.verified:
561
                    attributes = user.verified_attributes
562
                if getattr(attributes, cell.header.name) != cell.value:
563
                    setattr(attributes, cell.header.name, cell.value)
564
                    cell.action = 'updated'
565
                    continue
566
            cell.action = 'nothing'
567

  
568
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
569
        return True
src/authentic2/models.py
246 246
    def natural_key(self):
247 247
        return (self.name,)
248 248

  
249
    def __repr__(self):
250
        return '<%s %s>' % (self.__class__.__name__, repr(str(self)))
251

  
249 252
    def __str__(self):
250 253
        return self.label
251 254

  
tests/test_csv_import.py
1
# -*- coding: utf-8 -*-
2
# authentic2 - versatile identity manager
3
# Copyright (C) 2010-2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import pytest
21

  
22
import io
23

  
24
from authentic2.custom_user.models import User
25
from authentic2.models import Attribute
26
from authentic2.a2_rbac.utils import get_default_ou
27

  
28
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29

  
30
ENCODINGS = [
31
    'iso-8859-1',
32
    'iso-8859-15',
33
    'utf-8',
34
    'cp1252',
35
]
36

  
37

  
38
def pytest_generate_tests(metafunc):
39
    if 'encoding' in metafunc.fixturenames:
40
        metafunc.parametrize('encoding', ENCODINGS)
41
    if 'style' in metafunc.fixturenames:
42
        metafunc.parametrize('style', ['str', 'file'])
43

  
44

  
45
@pytest.fixture
46
def profile(db):
47
    Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
48

  
49

  
50
@pytest.fixture
51
def csv_importer_factory(encoding, style):
52
    def factory(content):
53
        content = content.encode(encoding)
54
        if style == 'file':
55
            content = io.BytesIO(content)
56
        importer = CsvImporter()
57
        run = importer.run
58
        importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
59
        return importer
60
    return factory
61

  
62

  
63
@pytest.fixture
64
def user_csv_importer_factory(encoding, style):
65
    def factory(content):
66
        content = content.encode(encoding)
67
        if style == 'file':
68
            content = io.BytesIO(content)
69
        importer = UserCsvImporter()
70
        run = importer.run
71
        importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs)
72
        return importer
73
    return factory
74

  
75

  
76
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
77
    importer = user_csv_importer_factory('')
78
    assert not importer.run()
79
    assert importer.has_errors
80
    assert importer.errors == [Error('unknown-csv-dialect')]
81

  
82

  
83
def test_empty_header_row_error(profile, user_csv_importer_factory):
84
    importer = user_csv_importer_factory('\n1,2,3')
85
    assert not importer.run()
86
    assert importer.has_errors
87
    assert importer.errors == [Error('empty-header-row')]
88

  
89

  
90
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
91
    importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
92
    assert not importer.run()
93
    assert importer.has_errors
94
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=2)]
95

  
96

  
97
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
98
    importer = user_csv_importer_factory('email key,first_name,x\n1,2,3')
99
    assert not importer.run()
100
    assert importer.has_errors
101
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
102

  
103

  
104
def test_unknown_flag_error(profile, user_csv_importer_factory):
105
    importer = user_csv_importer_factory('email key,first_name xxx\n1,2')
106
    assert not importer.run()
107
    assert importer.has_errors
108
    assert importer.errors == [LineError('unknown-flag', line=1, column=2)]
109

  
110

  
111
def test_missing_key_column_error(profile, user_csv_importer_factory):
112
    importer = user_csv_importer_factory('email,first_name\n1,2')
113
    assert not importer.run()
114
    assert importer.has_errors
115
    assert importer.errors == [Error('missing-key-column')]
116

  
117

  
118
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
119
    importer = user_csv_importer_factory('email key,first_name key\n1,2')
120
    assert not importer.run()
121
    assert importer.has_errors
122
    assert importer.errors == [Error('too-many-key-columns')]
123

  
124

  
125
def test_run(profile, user_csv_importer_factory):
126
    assert User.objects.count() == 0
127
    content = '''email key,first_name,last_name,phone update
128
tnoel@entrouvert.com,Thomas,Noël,1234
129
fpeters@entrouvert.com,Frédéric,Péters,5678
130
x,x,x,x'''
131
    importer = user_csv_importer_factory(content)
132

  
133
    assert importer.run(), importer.errors
134
    assert importer.headers == [
135
        CsvHeader(1, 'email', field=True, key=True, verified=True),
136
        CsvHeader(2, 'first_name', field=True),
137
        CsvHeader(3, 'last_name', field=True),
138
        CsvHeader(4, 'phone', attribute=True),
139
    ]
140
    assert importer.has_errors
141
    assert len(importer.rows) == 3
142
    assert all(row.is_valid for row in importer.rows[:2])
143
    assert not importer.rows[2].is_valid
144
    assert importer.rows[2].cells[0].errors
145
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
146
    assert not importer.rows[2].cells[1].errors
147
    assert not importer.rows[2].cells[2].errors
148
    assert importer.rows[2].cells[3].errors
149
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
150

  
151
    assert importer.updated == 0
152
    assert importer.created == 2
153

  
154
    assert User.objects.count() == 2
155
    thomas = User.objects.get(email='tnoel@entrouvert.com')
156
    assert thomas.email_verified is True
157
    assert thomas.first_name == 'Thomas'
158
    assert thomas.attributes.first_name == 'Thomas'
159
    assert thomas.last_name == 'Noël'
160
    assert thomas.attributes.last_name == 'Noël'
161
    assert thomas.attributes.phone == '1234'
162

  
163
    fpeters = User.objects.get(email='fpeters@entrouvert.com')
164
    assert fpeters.first_name == 'Frédéric'
165
    assert fpeters.email_verified is True
166
    assert fpeters.attributes.first_name == 'Frédéric'
167
    assert fpeters.last_name == 'Péters'
168
    assert fpeters.attributes.last_name == 'Péters'
169
    assert fpeters.attributes.phone == '5678'
170

  
171

  
172
def test_simulate(profile, user_csv_importer_factory):
173
    assert User.objects.count() == 0
174
    content = '''email key,first_name,last_name,phone update
175
tnoel@entrouvert.com,Thomas,Noël,1234
176
fpeters@entrouvert.com,Frédéric,Péters,5678
177
x,x,x,x'''
178
    importer = user_csv_importer_factory(content)
179

  
180
    assert importer.run(simulate=True), importer.errors
181
    assert importer.headers == [
182
        CsvHeader(1, 'email', field=True, key=True, verified=True),
183
        CsvHeader(2, 'first_name', field=True),
184
        CsvHeader(3, 'last_name', field=True),
185
        CsvHeader(4, 'phone', attribute=True),
186
    ]
187
    assert importer.has_errors
188
    assert len(importer.rows) == 3
189
    assert all(row.is_valid for row in importer.rows[:2])
190
    assert not importer.rows[2].is_valid
191
    assert importer.rows[2].cells[0].errors
192
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
193
    assert not importer.rows[2].cells[1].errors
194
    assert not importer.rows[2].cells[2].errors
195
    assert importer.rows[2].cells[3].errors
196
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)
197

  
198
    assert importer.updated == 0
199
    assert importer.created == 2
200

  
201
    assert User.objects.count() == 0
202

  
203

  
204
def test_create_unique_error(profile, user_csv_importer_factory):
205

  
206
    content = '''email key verified,first_name,last_name,phone unique
207
tnoel@entrouvert.com,Thomas,Noël,1234'''
208
    importer = user_csv_importer_factory(content)
209

  
210
    user = User.objects.create(ou=get_default_ou())
211
    user.attributes.phone = '1234'
212

  
213
    assert importer.run()
214

  
215
    assert importer.created == 0
216
    assert importer.updated == 0
217
    assert len(importer.rows) == 1
218
    assert not importer.rows[0].is_valid
219
    assert importer.rows[0].action == 'create'
220
    assert all(not cell.errors for cell in importer.rows[0])
221
    assert all(not cell.action for cell in importer.rows[0])
222
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
223

  
224

  
225
def test_create_unique_in_ou(profile, user_csv_importer_factory):
226

  
227
    content = '''email key verified,first_name,last_name,phone unique
228
tnoel@entrouvert.com,Thomas,Noël,1234'''
229
    importer = user_csv_importer_factory(content)
230

  
231
    user = User.objects.create()
232
    user.attributes.phone = '1234'
233

  
234
    assert importer.run()
235

  
236
    assert len(importer.rows) == 1
237
    assert importer.rows[0].is_valid
238
    assert importer.rows[0].action == 'create'
239
    assert all(not cell.errors for cell in importer.rows[0])
240
    assert all(cell.action == 'updated' for cell in importer.rows[0])
241
    assert importer.created == 1
242
    assert importer.updated == 0
243

  
244

  
245
def test_create_unique_globally_error(profile, user_csv_importer_factory):
246

  
247
    content = '''email key verified,first_name,last_name,phone globally-unique
248
tnoel@entrouvert.com,Thomas,Noël,1234'''
249
    importer = user_csv_importer_factory(content)
250

  
251
    user = User.objects.create()
252
    user.attributes.phone = '1234'
253

  
254
    assert importer.run()
255

  
256
    assert importer.created == 0
257
    assert importer.updated == 0
258
    assert len(importer.rows) == 1
259
    assert not importer.rows[0].is_valid
260
    assert importer.rows[0].action == 'create'
261
    assert all(not cell.errors for cell in importer.rows[0])
262
    assert all(not cell.action for cell in importer.rows[0])
263
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
264

  
265

  
266
def test_update_unique_error(profile, user_csv_importer_factory):
267
    content = '''email key verified,first_name,last_name,phone unique update
268
tnoel@entrouvert.com,Thomas,Noël,1234'''
269
    importer = user_csv_importer_factory(content)
270

  
271
    user = User.objects.create(ou=get_default_ou())
272
    user.attributes.phone = '1234'
273

  
274
    user = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
275

  
276
    assert importer.run()
277

  
278
    assert importer.created == 0
279
    assert importer.updated == 0
280
    assert len(importer.rows) == 1
281
    assert not importer.rows[0].is_valid
282
    assert importer.rows[0].action == 'update'
283
    assert all(not cell.errors for cell in importer.rows[0])
284
    assert all(not cell.action for cell in importer.rows[0])
285
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
286

  
287

  
288
def test_update_unique_globally_error(profile, user_csv_importer_factory):
289
    content = '''email key verified,first_name,last_name,phone globally-unique update
290
tnoel@entrouvert.com,Thomas,Noël,1234'''
291
    importer = user_csv_importer_factory(content)
292

  
293
    user = User.objects.create()
294
    user.attributes.phone = '1234'
295

  
296
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
297

  
298
    assert importer.run()
299

  
300
    assert importer.created == 0
301
    assert importer.updated == 0
302
    assert len(importer.rows) == 1
303
    assert not importer.rows[0].is_valid
304
    assert importer.rows[0].action == 'update'
305
    assert all(not cell.errors for cell in importer.rows[0])
306
    assert all(not cell.action for cell in importer.rows[0])
307
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
308

  
309

  
310
def test_update_unique_globally(profile, user_csv_importer_factory):
311
    content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
312
tnoel@entrouvert.com,Thomas,Noël,1234'''
313
    importer = user_csv_importer_factory(content)
314

  
315
    user = User.objects.create()
316
    user.attributes.phone = '1234'
317

  
318
    thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())
319

  
320
    assert importer.run()
321

  
322
    assert importer.created == 0
323
    assert importer.updated == 1
324
    assert len(importer.rows) == 1
325
    assert importer.rows[0].is_valid
326
    assert importer.rows[0].action == 'update'
327
    assert all(not cell.errors for cell in importer.rows[0])
328
    assert all(cell.action == 'nothing' for cell in importer.rows[0].cells[:3])
329
    assert importer.rows[0].cells[3].action == 'updated'
330

  
331
    thomas.refresh_from_db()
332
    assert not thomas.first_name
333
    assert not thomas.last_name
334
    assert thomas.attributes.phone == '1234'
335

  
336

  
337
def test_external_id(profile, user_csv_importer_factory):
338
    assert User.objects.count() == 0
339
    content = '''_source_name,_source_id,email,first_name,last_name,phone
340
app1,1,tnoel@entrouvert.com,Thomas,Noël,1234
341
app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
342
'''
343
    importer = user_csv_importer_factory(content)
344

  
345
    assert importer.run(), importer.errors
346
    assert importer.headers == [
347
        CsvHeader(1, '_source_name'),
348
        CsvHeader(2, '_source_id', key=True),
349
        CsvHeader(3, 'email', field=True, verified=True),
350
        CsvHeader(4, 'first_name', field=True),
351
        CsvHeader(5, 'last_name', field=True),
352
        CsvHeader(6, 'phone', attribute=True),
353
    ]
354
    assert not importer.has_errors
355
    assert len(importer.rows) == 2
356
    for external_id in ['1', '2']:
357
        thomas = User.objects.get(
358
            userexternalid__source='app1',
359
            userexternalid__external_id=external_id)
360

  
361
        assert thomas.email_verified is True
362
        assert thomas.first_name == 'Thomas'
363
        assert thomas.attributes.first_name == 'Thomas'
364
        assert thomas.last_name == 'Noël'
365
        assert thomas.attributes.last_name == 'Noël'
366
        assert thomas.attributes.phone == '1234'
0
-