Projet

Général

Profil

0002-add-CSV-import-framework-32833.patch

Benjamin Dauvergne, 17 juin 2019 09:50

Télécharger (31 ko)

Voir les différences:

Subject: [PATCH 2/3] add CSV import framework (#32833)

 debian/control               |   4 +-
 setup.py                     |   2 +
 src/authentic2/csv_import.py | 495 +++++++++++++++++++++++++++++++++++
 src/authentic2/models.py     |   3 +
 tests/test_csv_import.py     | 334 +++++++++++++++++++++++
 5 files changed, 837 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/csv_import.py
 create mode 100644 tests/test_csv_import.py
debian/control
29 29
    python-django-filters (>= 1),
30 30
    python-django-filters (<< 2),
31 31
    python-pil,
32
    python-tablib
32
    python-tablib,
33
    python-chardet,
34
    python-attr
33 35
Breaks: python-authentic2-auth-fc (<< 0.26)
34 36
Replaces: python-authentic2-auth-fc (<< 0.26)
35 37
Provides: ${python:Provides}, python-authentic2-auth-fc
setup.py
140 140
          'xstatic-select2',
141 141
          'pillow',
142 142
          'tablib',
143
          'chardet',
144
          'attrs',
143 145
      ],
144 146
      zip_safe=False,
145 147
      classifiers=[
src/authentic2/csv_import.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
from __future__ import unicode_literals
18

  
19
import csv
20
import io
21

  
22
from chardet.universaldetector import UniversalDetector
23
import attr
24

  
25
from django.core.exceptions import FieldDoesNotExist
26
from django.db.transaction import atomic
27
from django.utils import six
28
from django.utils.translation import ugettext as _
29

  
30
from authentic2 import app_settings
31
from authentic2.a2_rbac.utils import get_default_ou
32
from authentic2.custom_user.models import User
33
from authentic2.forms.profile import modelform_factory, BaseUserForm
34
from authentic2.models import Attribute, AttributeValue
35

  
36

  
37
class UTF8Recoder(object):
    """Iterator re-encoding the text lines of `fd` as UTF-8 byte strings.

    `fd` must itself be an iterator yielding text lines.
    """

    def __init__(self, fd):
        self.fd = fd

    def __iter__(self):
        return self

    def next(self):
        # the next() builtin works with both the Python 2 (.next) and the
        # Python 3 (.__next__) iterator protocols of the wrapped object
        return next(self.fd).encode('utf-8')

    # Python 3 spelling of the iterator protocol
    __next__ = next
46

  
47

  
48
class UnicodeReader(object):
    """CSV reader yielding rows as lists of text cells.

    On Python 2 the csv module only handles byte strings, so the lines of
    `fd` are encoded to UTF-8 before parsing and every cell is decoded back.
    On Python 3 the csv module works on text, `fd` is parsed directly.
    """

    def __init__(self, fd, dialect='excel', **kwargs):
        if six.PY2:
            fd = UTF8Recoder(fd)
        self.reader = csv.reader(fd, dialect=dialect, **kwargs)

    def __iter__(self):
        return self

    def next(self):
        row = next(self.reader)
        if six.PY2:
            return [s.decode('utf-8') for s in row]
        return row

    # Python 3 spelling of the iterator protocol
    __next__ = next
58

  
59

  
60
class CsvImporter(object):
    """Decode an input and parse it as CSV.

    After a successful run() the parsed rows (lists of cells, the header row
    included) are stored in `rows`; after a failed run `error` holds an Error
    describing the problem.
    """
    rows = None
    error = None
    error_description = None
    encoding = None

    def run(self, fd_or_str, encoding):
        """Parse `fd_or_str` and return True on success, False on failure.

        `fd_or_str` is a byte string, a text string or a file-like object.
        `encoding` is the name of the encoding used to decode binary input,
        or 'detect' to let chardet guess it; it is ignored when the input is
        already text.
        """
        # start from a clean state so that an instance can be run again
        self.rows = None
        self.error = None

        if isinstance(fd_or_str, six.binary_type):
            input_fd = io.BytesIO(fd_or_str)
        elif isinstance(fd_or_str, six.text_type):
            input_fd = io.StringIO(fd_or_str)
        elif not hasattr(fd_or_str, 'read1'):
            # not an io buffered object: reopen its file descriptor through
            # the io module, or fall back to loading its whole content
            try:
                input_fd = io.open(fd_or_str.fileno(), closefd=False)
            except Exception:
                try:
                    fd_or_str.seek(0)
                except Exception:
                    pass
                content = fd_or_str.read()
                if isinstance(content, six.text_type):
                    input_fd = io.StringIO(content)
                else:
                    input_fd = io.BytesIO(content)
        else:
            input_fd = fd_or_str

        assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object'

        def detect_encoding(input_fd, encoding):
            # text streams expose line_buffering, they are already decoded
            if hasattr(input_fd, 'line_buffering'):
                return input_fd

            if encoding == 'detect':
                detector = UniversalDetector()

                try:
                    for line in input_fd:
                        detector.feed(line)
                        if detector.done:
                            break
                    # the detector only sets `done` when it is certain before
                    # the end of the data; close() computes the best guess in
                    # every other case, so it must always be called
                    detector.close()
                    encoding = detector.result['encoding']
                finally:
                    input_fd.seek(0)

                if not encoding:
                    self.error = Error('cannot-detect-encoding', _('Cannot detect encoding'))
                    return None

            if not hasattr(input_fd, 'readable'):
                input_fd = io.open(input_fd.fileno(), 'rb', closefd=False)
            return io.TextIOWrapper(input_fd, encoding=encoding)

        def parse_csv():
            try:
                content = input_fd.read()
                if six.PY2:
                    # the Python 2 csv module only handles byte strings
                    content = content.encode('utf-8')
                dialect = csv.Sniffer().sniff(content)
            except csv.Error as e:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e)
                return False
            finally:
                input_fd.seek(0)

            if not dialect:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect'))
                return False
            reader = UnicodeReader(input_fd, dialect)
            self.rows = list(reader)
            return True

        input_fd = detect_encoding(input_fd, encoding)
        if input_fd is None:
            return False

        return parse_csv()
135

  
136

  
137
@attr.s
class CsvHeader(object):
    """Description of one column of the header row.

    `column` is the column number and `name` the column name; `field` and
    `attribute` are coerced to booleans.  The attributes carrying the
    `flag` metadata are the ones listed by the `flags` property.
    """
    column = attr.ib()
    name = attr.ib(default='')
    field = attr.ib(default=False, converter=bool)
    attribute = attr.ib(default=False, converter=bool)
    create = attr.ib(default=True, metadata={'flag': True})
    update = attr.ib(default=True, metadata={'flag': True})
    key = attr.ib(default=False, metadata={'flag': True})
    unique = attr.ib(default=False, metadata={'flag': True})
    globally_unique = attr.ib(default=False, metadata={'flag': True})
    verified = attr.ib(default=False, metadata={'flag': True})

    @property
    def flags(self):
        """List the state of every flag, in declaration order.

        A set flag is listed under its attribute name, an unset one as
        'no-' followed by the name with underscores turned into dashes.
        """
        def spell(flag_attribute):
            flag_name = flag_attribute.name
            if getattr(self, flag_name):
                return flag_name
            return 'no-' + flag_name.replace('_', '-')

        return [
            spell(flag_attribute)
            for flag_attribute in attr.fields(self.__class__)
            if flag_attribute.metadata.get('flag')
        ]
160

  
161

  
162
@attr.s
class Error(object):
    """An import error identified by a code, with a human readable description.

    Equality is generated by attrs: two Error instances are equal when their
    codes are equal, the description is excluded from comparisons.  A
    hand-written __eq__ here would be replaced by the generated one (the
    previous one also referenced names only defined on LineError), so none is
    defined.
    """
    code = attr.ib()
    # excluded from comparisons, so errors can be matched on their code only
    description = attr.ib(default='', cmp=False)
171

  
172

  
173
@attr.s(cmp=False)
class LineError(Error):
    """An Error located at a line and column of the imported file.

    attrs comparison methods are disabled (cmp=False), equality is defined
    by hand below.
    """
    line = attr.ib(default=0)
    column = attr.ib(default=0)

    @classmethod
    def from_error(cls, error):
        """Build a LineError from the attributes of `error`."""
        return cls(**attr.asdict(error))

    def as_error(self):
        """Return a plain Error with the same code and description."""
        return Error(self.code, self.description)

    def __eq__(self, other):
        # LineError must be tested first: it is a subclass of Error, testing
        # Error first would never compare the positions
        if isinstance(other, LineError):
            return (self.code, self.line, self.column) == (other.code, other.line, other.column)
        if isinstance(other, Error):
            # against a plain Error only the code matters
            return self.as_error() == other
        return NotImplemented

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
189

  
190

  
191
class ImportUserForm(BaseUserForm):
    """Form used to validate the cells of imported CSV rows."""

    def clean(self):
        # NOTE(review): super() is anchored on BaseUserForm, so
        # BaseUserForm.clean() itself is skipped - presumably deliberate,
        # confirm.
        super(BaseUserForm, self).clean()
        # presumably disables the model form uniqueness validation, unique
        # constraints being checked by the importer itself - verify
        self._validate_unique = False
195

  
196

  
197
@attr.s
class CsvRow(object):
    """One data row of the imported file: its line number, cells and errors."""
    line = attr.ib()
    # attr.Factory gives every row its own list; a literal [] default would
    # be shared by all the rows created without these arguments
    cells = attr.ib(default=attr.Factory(list))
    errors = attr.ib(default=attr.Factory(list))
    is_valid = attr.ib(default=True)
    action = attr.ib(default=None)

    def __getitem__(self, header):
        """Return the cell of `header`, given as a header object or a name.

        Raises KeyError when the row has no such cell.
        """
        for cell in self.cells:
            if cell.header == header or cell.header.name == header:
                return cell
        # `header` may be a plain name, which has no .name attribute
        raise KeyError(getattr(header, 'name', header))

    def __iter__(self):
        return iter(self.cells)
213

  
214

  
215
@attr.s
class CsvCell(object):
    """One cell of a data row, tied to the header of its column."""
    line = attr.ib()
    header = attr.ib()
    value = attr.ib(default=None)
    missing = attr.ib(default=False)
    # attr.Factory gives every cell its own list; a literal [] default would
    # be shared by all the cells created without this argument
    errors = attr.ib(default=attr.Factory(list))
    action = attr.ib(default=None)

    @property
    def column(self):
        """Column number of the cell, taken from its header."""
        return self.header.column
227

  
228

  
229
class Simulate(Exception):
    """Raised inside the import transaction of a simulated run.

    UserCsvImporter.run() raises it within its atomic() block and catches it
    outside of it.
    """
    pass
231

  
232

  
233
class UserCsvImporter(object):
    """Create or update users from a CSV file.

    The first row is a header row.  Each header is a user field or attribute
    name, optionally followed by whitespace separated flags (see CsvHeader);
    a flag prefixed by 'no-' is unset and dashes stand for underscores.
    Exactly one column must carry the `key` flag, it is used to look up
    existing users.

    After run(): `errors` lists the file level errors, `rows` the parsed
    CsvRow objects, `created` and `updated` count the imported users.
    """
    csv_importer = None
    errors = None
    headers = None
    headers_by_name = None
    rows = None
    has_errors = False
    ou = None
    updated = 0
    created = 0

    def add_error(self, line_error):
        """Record a file level error, converting plain Errors to LineErrors."""
        if not hasattr(line_error, 'line'):
            line_error = LineError.from_error(line_error)
        self.errors.append(line_error)

    def run(self, fd_or_str, encoding, ou=None, simulate=False):
        """Import users from `fd_or_str`; return True if no file level error occurred.

        `fd_or_str` and `encoding` are passed to CsvImporter.run().  Users
        are looked up and created relative to `ou`, or to the default
        organizational unit when it is not given.  With `simulate` the import
        is executed then its transaction is rolled back.

        Row level problems do not make run() fail; they are reported through
        `has_errors` and the `is_valid`/`errors` attributes of the rows.
        """
        self.ou = ou or get_default_ou()
        self.errors = []
        self.csv_importer = CsvImporter()
        # reset the results so that running an importer twice does not
        # accumulate the counters of the previous run
        self.rows = None
        self.has_errors = False
        self.updated = 0
        self.created = 0

        def parse_csv():
            if not self.csv_importer.run(fd_or_str, encoding):
                self.add_error(self.csv_importer.error)

        def do_import():
            try:
                with atomic():
                    for row in self.rows:
                        self.do_import_row(row)
                    if simulate:
                        # leaving atomic() through an exception rolls back
                        raise Simulate
            except Simulate:
                pass

        # each step needs the result of the previous ones, stop at the first
        # one reporting file level errors
        for action in [
                parse_csv,
                self.parse_header_row,
                self.parse_rows,
                do_import]:
            action()
            if self.errors:
                break

        self.has_errors = self.has_errors or bool(self.errors)
        return not bool(self.errors)

    def parse_header_row(self):
        """Parse the first row into `headers` and check there is exactly one key."""
        self.headers = []
        self.headers_by_name = {}

        try:
            header_row = self.csv_importer.rows[0]
        except IndexError:
            self.add_error(Error('no-header-row', _('Missing header row')))
            return

        for i, head in enumerate(header_row):
            self.parse_header(head, column=i + 1)

        if not self.headers:
            self.add_error(Error('empty-header-row', _('Empty header row')))
            return

        key_counts = sum(1 for header in self.headers if header.key)

        if not key_counts:
            self.add_error(Error('missing-key-column', _('Missing key column')))
        if key_counts > 1:
            self.add_error(Error('too-many-key-columns', _('Too many key columns')))

    def parse_header(self, head, column):
        """Parse the header cell `head` of column number `column`.

        The header is appended to `headers`; an error is recorded when its
        name is neither a known user field nor an existing attribute, or when
        one of its flags is unknown.
        """
        splitted = head.split()
        if not splitted:
            # blank header cell: nameless header, reported as unknown below
            header = CsvHeader(column)
        else:
            header = CsvHeader(column, splitted[0])
            self.headers_by_name[header.name] = header
            try:
                if header.name in ['email', 'first_name', 'last_name', 'username']:
                    # only called to check that the field exists
                    User._meta.get_field(header.name)
                    header.field = True
                    if header.name == 'email' and self.email_is_unique:
                        header.unique = True
                        if app_settings.A2_EMAIL_IS_UNIQUE:
                            header.globally_unique = True
                    if header.name == 'username' and self.username_is_unique:
                        header.unique = True
                        if app_settings.A2_USERNAME_IS_UNIQUE:
                            header.globally_unique = True
            except FieldDoesNotExist:
                pass
            if not header.field:
                header.attribute = Attribute.objects.filter(name=header.name).exists()

        self.headers.append(header)

        if not (header.field or header.attribute):
            self.add_error(LineError('unknown-or-missing-attribute',
                                     _('unknown or missing attribute "%s"') % head,
                                     line=1, column=column))
            return

        for raw_flag in splitted[1:]:
            value = True
            flag = raw_flag
            if flag.startswith('no-'):
                value = False
                flag = flag[3:]
            flag = flag.replace('-', '_')
            try:
                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
                    raise TypeError
                setattr(header, flag, value)
            except (AttributeError, TypeError, KeyError):
                # report the flag as it was written in the file
                self.add_error(LineError('unknown-flag', _('unknown flag "%s"') % raw_flag,
                                         line=1, column=column))

    def parse_rows(self):
        """Validate every data row and store the resulting CsvRow list in `rows`."""
        form_class = modelform_factory(User, fields=list(self.headers_by_name), form=ImportUserForm)
        rows = self.rows = []
        # data starts on the second line of the file
        for i, row in enumerate(self.csv_importer.rows[1:]):
            csv_row = self.parse_row(form_class, row, line=i + 2)
            self.has_errors = self.has_errors or not csv_row.is_valid
            rows.append(csv_row)

    def parse_row(self, form_class, row, line):
        """Validate the cells of `row` with `form_class` and return a CsvRow."""
        data = {}

        for header in self.headers:
            try:
                data[header.name] = row[header.column - 1]
            except IndexError:
                # short row: the cell is flagged as missing below
                pass

        form = form_class(data=data)
        form.is_valid()

        def get_form_errors(form, name):
            return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]

        cells = [
            CsvCell(
                line=line,
                header=header,
                value=data.get(header.name),
                missing=header.name not in data,
                errors=get_form_errors(form, header.name))
            for header in self.headers]
        cell_errors = any(bool(cell.errors) for cell in cells)
        errors = get_form_errors(form, '__all__')
        return CsvRow(
            line=line,
            cells=cells,
            errors=errors,
            is_valid=not bool(cell_errors or errors))

    @property
    def email_is_unique(self):
        """True if emails must be unique, by setting or for the target ou."""
        return app_settings.A2_EMAIL_IS_UNIQUE or self.ou.email_is_unique

    @property
    def username_is_unique(self):
        """True if usernames must be unique, by setting or for the target ou."""
        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique

    def check_unique_constraints(self, row, user=None):
        """Check the unique and globally unique columns of `row`.

        `user` is the user about to be updated, if any; it is excluded from
        the search and its non updatable columns are not checked.  Failures
        are added to the errors of the row, which is then marked invalid.
        Returns True when every constraint holds.
        """
        ou_users = User.objects.filter(ou=self.ou)
        users = User.objects.all()
        if user:
            users = users.exclude(pk=user.pk)
            ou_users = ou_users.exclude(pk=user.pk)
        errors = []
        for cell in row:
            header = cell.header
            if not (header.globally_unique or header.unique):
                continue
            if user and not header.update:
                continue
            qs = users if header.globally_unique else ou_users
            if header.field:
                conflict = qs.filter(**{header.name: cell.value}).exists()
            elif header.attribute:
                atvs = AttributeValue.objects.filter(attribute__name=header.name, content=cell.value)
                conflict = qs.filter(attribute_values__in=atvs).exists()
            else:
                # neither a field nor an attribute: nothing to compare with
                continue
            if conflict:
                errors.append(
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % header.name))
        row.errors.extend(errors)
        row.is_valid = row.is_valid and not bool(errors)
        return not bool(errors)

    @staticmethod
    def _should_write(row, header):
        """Tell whether the flags of `header` allow writing it for the action of `row`."""
        return bool((row.action == 'create' and header.create)
                    or (row.action == 'update' and header.update))

    def do_import_row(self, row):
        """Create or update the user described by `row`; invalid rows are skipped.

        Sets `row.action` to 'create' or 'update', the `action` of every
        written or ignored cell to 'updated' or 'nothing', and increments
        `created` or `updated`.
        """
        if not row.is_valid:
            return

        header_key = next((header for header in self.headers if header.key), None)
        assert header_key is not None, 'no header key'

        key_value = row[header_key].value
        if header_key.field:
            users = User.objects.filter(
                **{header_key.name: key_value})
        else:
            atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
            users = User.objects.filter(attribute_values__in=atvs)
        # two results are enough to detect an ambiguous key; list() runs the
        # query only once
        users = list(users[:2])
        row.action = 'update' if users else 'create'

        if len(users) > 1:
            row.errors.append(
                Error('key-matches-too-many-users',
                      _('Key value "%s" matches too many users') % key_value))
            # a row carrying errors must not be reported as valid
            row.is_valid = False
            return

        user = users[0] if users else None

        if not self.check_unique_constraints(row, user=user):
            return

        if not user:
            user = User()

        # first the model fields, the user must be saved before its
        # attributes are set
        for cell in row.cells:
            if not cell.header.field:
                continue
            cell.action = 'nothing'
            if not self._should_write(row, cell.header):
                continue
            if getattr(user, cell.header.name) != cell.value:
                setattr(user, cell.header.name, cell.value)
                if cell.header.name == 'email' and cell.header.verified:
                    user.email_verified = True
                cell.action = 'updated'

        user.save()

        for cell in row.cells:
            if cell.header.field or not cell.header.attribute:
                continue
            cell.action = 'nothing'
            if not self._should_write(row, cell.header):
                continue
            attributes = user.attributes
            if cell.header.verified:
                attributes = user.verified_attributes
            if getattr(attributes, cell.header.name) != cell.value:
                setattr(attributes, cell.header.name, cell.value)
                cell.action = 'updated'

        if row.action == 'create':
            self.created += 1
        else:
            self.updated += 1
src/authentic2/models.py
243 243
    def natural_key(self):
        """Return the natural key of the object: a 1-tuple holding its name."""
        return (self.name,)
245 245

  
246
    def __repr__(self):
        """Return '<ClassName repr-of-str>' to ease debugging."""
        class_name = self.__class__.__name__
        return '<%s %r>' % (class_name, str(self))
248

  
246 249
    def __str__(self):
        """Return the label of the object."""
        return self.label
248 251

  
tests/test_csv_import.py
1
# -*- coding: utf-8 -*-
2
# authentic2 - versatile identity manager
3
# Copyright (C) 2010-2019 Entr'ouvert
4
#
5
# This program is free software: you can redistribute it and/or modify it
6
# under the terms of the GNU Affero General Public License as published
7
# by the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

  
18
from __future__ import unicode_literals
19

  
20
import pytest
21

  
22
import io
23

  
24
from authentic2.custom_user.models import User
25
from authentic2.models import Attribute
26
from authentic2.a2_rbac.utils import get_default_ou
27

  
28
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError
29

  
30
# Encodings used to parametrize the `encoding` fixture
# (see pytest_generate_tests).
ENCODINGS = [
    'iso-8859-1',
    'iso-8859-15',
    'utf-8',
    'cp1252',
]
36

  
37

  
38
def pytest_generate_tests(metafunc):
    """Parametrize the tests using the `encoding` and/or `style` fixtures."""
    parametrizations = (
        ('encoding', ENCODINGS),
        ('style', ['str', 'file']),
    )
    for fixture_name, values in parametrizations:
        if fixture_name in metafunc.fixturenames:
            metafunc.parametrize(fixture_name, values)
43

  
44

  
45
@pytest.fixture
def profile(db):
    """Create the `phone` attribute imported by the tests."""
    Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
48

  
49

  
50
@pytest.fixture
def csv_importer_factory(encoding, style):
    """Return a factory of CsvImporter whose run() is bound to a content.

    The content is encoded with `encoding` and given to run() as a byte
    string or, for the 'file' style, as a BytesIO.
    """
    def factory(content):
        payload = content.encode(encoding)
        source = io.BytesIO(payload) if style == 'file' else payload
        importer = CsvImporter()
        original_run = importer.run

        def run(*args, **kwargs):
            return original_run(source, *args, encoding=encoding, **kwargs)

        importer.run = run
        return importer
    return factory
61

  
62

  
63
@pytest.fixture
def user_csv_importer_factory(encoding, style):
    """Return a factory of UserCsvImporter whose run() is bound to a content.

    The content is encoded with `encoding` and given to run() as a byte
    string or, for the 'file' style, as a BytesIO.
    """
    def factory(content):
        payload = content.encode(encoding)
        source = io.BytesIO(payload) if style == 'file' else payload
        importer = UserCsvImporter()
        original_run = importer.run

        def run(*args, **kwargs):
            return original_run(source, *args, encoding=encoding, **kwargs)

        importer.run = run
        return importer
    return factory
74

  
75

  
76
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
    """An empty content has no detectable CSV dialect."""
    csv_content = ''
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('unknown-csv-dialect')]
    assert importer.errors == expected_errors
81

  
82

  
83
def test_empty_header_row_error(profile, user_csv_importer_factory):
    """A blank first line is reported as an empty header row."""
    csv_content = '\n1,2,3'
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('empty-header-row')]
    assert importer.errors == expected_errors
88

  
89

  
90
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
    """A blank header cell is reported as an unknown or missing attribute."""
    importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
    assert not importer.run()
    assert importer.has_errors
    # the blank header is the third cell of the header row, not the second
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
95

  
96

  
97
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
    """A header naming no field or attribute is reported with its position."""
    csv_content = 'email key,first_name,x\n1,2,3'
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [LineError('unknown-or-missing-attribute', line=1, column=3)]
    assert importer.errors == expected_errors
102

  
103

  
104
def test_unknown_flag_error(profile, user_csv_importer_factory):
    """An unknown header flag is reported with its position."""
    csv_content = 'email key,first_name xxx\n1,2'
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [LineError('unknown-flag', line=1, column=2)]
    assert importer.errors == expected_errors
109

  
110

  
111
def test_missing_key_column_error(profile, user_csv_importer_factory):
    """A header row without a key column is rejected."""
    csv_content = 'email,first_name\n1,2'
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('missing-key-column')]
    assert importer.errors == expected_errors
116

  
117

  
118
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
    """A header row with more than one key column is rejected."""
    csv_content = 'email key,first_name key\n1,2'
    importer = user_csv_importer_factory(csv_content)
    succeeded = importer.run()
    assert not succeeded
    assert importer.has_errors
    expected_errors = [Error('too-many-key-columns')]
    assert importer.errors == expected_errors
123

  
124

  
125
def test_run(profile, user_csv_importer_factory):
    """Two valid rows are imported, the invalid third one is reported."""
    assert User.objects.count() == 0
    content = '''email key verified,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    succeeded = importer.run()
    assert succeeded, importer.errors
    expected_headers = [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert importer.has_errors
    assert len(importer.rows) == 3
    for valid_row in importer.rows[:2]:
        assert valid_row.is_valid

    # the last row has an invalid email and an invalid phone number
    bad_row = importer.rows[2]
    assert not bad_row.is_valid
    assert bad_row.cells[0].errors
    for error in bad_row.cells[0].errors:
        assert error == Error('data-error')
    assert not bad_row.cells[1].errors
    assert not bad_row.cells[2].errors
    assert bad_row.cells[3].errors
    for error in bad_row.cells[3].errors:
        assert error == Error('data-error')

    assert importer.updated == 0
    assert importer.created == 2

    assert User.objects.count() == 2
    thomas = User.objects.get(email='tnoel@entrouvert.com')
    assert thomas.email_verified is True
    assert thomas.first_name == 'Thomas'
    assert thomas.attributes.first_name == 'Thomas'
    assert thomas.last_name == 'Noël'
    assert thomas.attributes.last_name == 'Noël'
    assert thomas.attributes.phone == '1234'

    fpeters = User.objects.get(email='fpeters@entrouvert.com')
    assert fpeters.first_name == 'Frédéric'
    assert fpeters.email_verified is True
    assert fpeters.attributes.first_name == 'Frédéric'
    assert fpeters.last_name == 'Péters'
    assert fpeters.attributes.last_name == 'Péters'
    assert fpeters.attributes.phone == '5678'
170

  
171

  
172
def test_simulate(profile, user_csv_importer_factory):
    """A simulated run reports the same results but creates no user."""
    assert User.objects.count() == 0
    content = '''email key verified,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    succeeded = importer.run(simulate=True)
    assert succeeded, importer.errors
    expected_headers = [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert importer.has_errors
    assert len(importer.rows) == 3
    for valid_row in importer.rows[:2]:
        assert valid_row.is_valid

    # the last row has an invalid email and an invalid phone number
    bad_row = importer.rows[2]
    assert not bad_row.is_valid
    assert bad_row.cells[0].errors
    for error in bad_row.cells[0].errors:
        assert error == Error('data-error')
    assert not bad_row.cells[1].errors
    assert not bad_row.cells[2].errors
    assert bad_row.cells[3].errors
    for error in bad_row.cells[3].errors:
        assert error == Error('data-error')

    assert importer.updated == 0
    assert importer.created == 2

    # nothing was kept in the database
    assert User.objects.count() == 0
202

  
203

  
204
def test_create_unique_error(profile, user_csv_importer_factory):
    """Creation fails when a unique value is already used in the same ou."""
    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # an existing user of the default ou already has this phone number
    existing = User.objects.create(ou=get_default_ou())
    existing.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'create'
    assert all(not cell.errors for cell in row)
    assert all(not cell.action for cell in row)
    assert row.errors == [Error('unique-constraint-failed')]
223

  
224

  
225
def test_create_unique_in_ou(profile, user_csv_importer_factory):
    """A unique value used by a user created without an ou does not block creation."""
    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # this user has the same phone number but was created without an ou
    existing = User.objects.create()
    existing.attributes.phone = '1234'

    assert importer.run()

    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert row.is_valid
    assert row.action == 'create'
    assert all(not cell.errors for cell in row)
    assert all(cell.action == 'updated' for cell in row)
    assert importer.created == 1
    assert importer.updated == 0
243

  
244

  
245
def test_create_unique_globally_error(profile, user_csv_importer_factory):
    """Creation fails when a globally unique value is used by any user."""
    content = '''email key verified,first_name,last_name,phone globally-unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # this user has the same phone number and was created without an ou
    existing = User.objects.create()
    existing.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'create'
    assert all(not cell.errors for cell in row)
    assert all(not cell.action for cell in row)
    assert row.errors == [Error('unique-constraint-failed')]
264

  
265

  
266
def test_update_unique_error(profile, user_csv_importer_factory):
    """Update fails when a unique value belongs to another user of the ou."""
    content = '''email key verified,first_name,last_name,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # another user of the default ou already has this phone number
    other = User.objects.create(ou=get_default_ou())
    other.attributes.phone = '1234'

    # the user matched by the key column
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    assert all(not cell.errors for cell in row)
    assert all(not cell.action for cell in row)
    assert row.errors == [Error('unique-constraint-failed')]
286

  
287

  
288
def test_update_unique_globally_error(profile, user_csv_importer_factory):
    """Update fails when a globally unique value belongs to any other user."""
    content = '''email key verified,first_name,last_name,phone globally-unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # another user, created without an ou, already has this phone number
    other = User.objects.create()
    other.attributes.phone = '1234'

    # the user matched by the key column
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    assert all(not cell.errors for cell in row)
    assert all(not cell.action for cell in row)
    assert row.errors == [Error('unique-constraint-failed')]
308

  
309

  
310
def test_update_unique_globally(profile, user_csv_importer_factory):
    """Only the updatable phone column is written; no-update columns are left alone."""
    content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # another user, created without an ou, already has this phone number
    other = User.objects.create()
    other.attributes.phone = '1234'

    # the user matched by the key column
    thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 1
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert row.is_valid
    assert row.action == 'update'
    assert all(not cell.errors for cell in row)
    for untouched_cell in row.cells[:3]:
        assert untouched_cell.action == 'nothing'
    assert row.cells[3].action == 'updated'

    thomas.refresh_from_db()
    assert not thomas.first_name
    assert not thomas.last_name
    assert thomas.attributes.phone == '1234'
0
-