0003-add-csv-import-framework-32833.patch
debian/control | ||
---|---|---|
29 | 29 |
python-django-filters (>= 1), |
30 | 30 |
python-django-filters (<< 2), |
31 | 31 |
python-pil, |
32 |
python-tablib |
|
32 |
python-tablib, |
|
33 |
python-chardet, |
|
34 |
python-attr |
|
33 | 35 |
Breaks: python-authentic2-auth-fc (<< 0.26) |
34 | 36 |
Replaces: python-authentic2-auth-fc (<< 0.26) |
35 | 37 |
Provides: ${python:Provides}, python-authentic2-auth-fc |
setup.py | ||
---|---|---|
140 | 140 |
'xstatic-select2', |
141 | 141 |
'pillow', |
142 | 142 |
'tablib', |
143 |
'chardet', |
|
144 |
'attrs', |
|
143 | 145 |
], |
144 | 146 |
zip_safe=False, |
145 | 147 |
classifiers=[ |
src/authentic2/csv_import.py | ||
---|---|---|
1 |
# authentic2 - versatile identity manager |
|
2 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software: you can redistribute it and/or modify it |
|
5 |
# under the terms of the GNU Affero General Public License as published |
|
6 |
# by the Free Software Foundation, either version 3 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU Affero General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU Affero General Public License |
|
15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from __future__ import unicode_literals |
|
18 | ||
19 |
import csv |
|
20 |
import io |
|
21 | ||
22 |
from chardet.universaldetector import UniversalDetector |
|
23 |
import attr |
|
24 | ||
25 |
from django import forms |
|
26 |
from django.core.exceptions import FieldDoesNotExist |
|
27 |
from django.core.validators import RegexValidator |
|
28 |
from django.db import IntegrityError |
|
29 |
from django.db.transaction import atomic |
|
30 |
from django.utils import six |
|
31 |
from django.utils.translation import ugettext as _ |
|
32 | ||
33 |
from authentic2 import app_settings |
|
34 |
from authentic2.a2_rbac.utils import get_default_ou |
|
35 |
from authentic2.custom_user.models import User |
|
36 |
from authentic2.forms.profile import modelform_factory, BaseUserForm |
|
37 |
from authentic2.models import Attribute, AttributeValue, UserExternalId |
|
38 | ||
39 | ||
40 |
class UTF8Recoder(object):
    """Iterator that re-encodes unicode lines from a file-like iterator
    into UTF-8 byte strings (needed by the Python 2 csv module, which
    only consumes bytes)."""

    def __init__(self, fd):
        self.fd = fd

    def __iter__(self):
        return self

    def next(self):
        # Use the builtin next() so the wrapped iterator works under both
        # Python 2 (fd.next) and Python 3 (fd.__next__); the original
        # called self.fd.next() directly, which is Python-2-only.
        return next(self.fd).encode('utf-8')

    # Python 3 spells the iterator protocol __next__.
    __next__ = next


class UnicodeReader(object):
    """CSV reader yielding rows of unicode strings.

    On Python 2 the input is piped through UTF8Recoder because csv only
    handles bytes there; on Python 3 csv natively consumes text, so the
    recoding step is skipped (the original class was Python-2-only)."""

    def __init__(self, fd, dialect='excel', **kwargs):
        if str is bytes:
            # Python 2 only: feed csv UTF-8 encoded byte lines.
            fd = UTF8Recoder(fd)
        self.reader = csv.reader(fd, dialect=dialect, **kwargs)

    def next(self):
        row = next(self.reader)
        # Python 2 yields bytes that must be decoded back to unicode;
        # Python 3 already yields str.
        return [s.decode('utf-8') if isinstance(s, bytes) else s for s in row]

    __next__ = next

    def __iter__(self):
        return self
61 | ||
62 | ||
63 |
class CsvImporter(object):
    """Low-level CSV loader.

    run() accepts bytes, text or a file-like object, optionally detects
    the character encoding with chardet, sniffs the CSV dialect and
    stores the parsed rows in ``self.rows``.  On failure ``self.error``
    holds an Error instance and run() returns False.
    """

    rows = None               # list of rows (lists of unicode) after a successful run()
    error = None              # Error instance set on failure
    error_description = None
    encoding = None

    def run(self, fd_or_str, encoding):
        # Normalize the input into a readable file object.
        if isinstance(fd_or_str, six.binary_type):
            input_fd = io.BytesIO(fd_or_str)
        elif isinstance(fd_or_str, six.text_type):
            input_fd = io.StringIO(fd_or_str)
        elif not hasattr(fd_or_str, 'read1'):
            # Not an io-module object: try wrapping the underlying file
            # descriptor, falling back to slurping the full content.
            try:
                input_fd = io.open(fd_or_str.fileno(), closefd=False)
            except Exception:
                try:
                    fd_or_str.seek(0)
                except Exception:
                    pass
                content = fd_or_str.read()
                if isinstance(content, six.text_type):
                    input_fd = io.StringIO(content)
                else:
                    input_fd = io.BytesIO(content)
        else:
            input_fd = fd_or_str

        assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object'

        def set_encoding(input_fd, encoding):
            # detect StringIO: already decoded text, nothing to do
            if hasattr(input_fd, 'line_buffering'):
                return input_fd

            if encoding == 'detect':
                detector = UniversalDetector()

                try:
                    for line in input_fd:
                        detector.feed(line)
                        if detector.done:
                            break
                    else:
                        # Detector never became confident before EOF.
                        self.error = Error('cannot-detect-encoding', _('Cannot detect encoding'))
                        return None
                    detector.close()
                    encoding = detector.result['encoding']
                finally:
                    # Rewind so parsing restarts from the beginning.
                    input_fd.seek(0)

            if not hasattr(input_fd, 'readable'):
                # Raw file object: rewrap through io for TextIOWrapper.
                input_fd = io.open(input_fd.fileno(), 'rb', closefd=False)
            return io.TextIOWrapper(input_fd, encoding=encoding)

        def parse_csv():
            try:
                # NOTE(review): Sniffer is fed UTF-8 *bytes* — a Python 2
                # idiom; under Python 3 sniff() expects str and would
                # raise TypeError, which is not caught here. Confirm the
                # intended runtime.
                dialect = csv.Sniffer().sniff(input_fd.read().encode('utf-8'))
            except csv.Error as e:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e)
                return False
            finally:
                input_fd.seek(0)

            if not dialect:
                self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect'))
                return False
            reader = UnicodeReader(input_fd, dialect)
            self.rows = list(reader)
            return True

        input_fd = set_encoding(input_fd, encoding)
        if input_fd is None:
            return False

        return parse_csv()
|
138 | ||
139 | ||
140 |
@attr.s
class CsvHeader(object):
    """Description of one CSV column: 1-based position, resolved name,
    whether it maps to a User model field or an Attribute, and its
    behaviour flags (the attr fields whose metadata carries 'flag')."""
    column = attr.ib()
    name = attr.ib(default='')
    field = attr.ib(default=False, converter=bool)
    attribute = attr.ib(default=False, converter=bool)
    create = attr.ib(default=True, metadata={'flag': True})
    update = attr.ib(default=True, metadata={'flag': True})
    key = attr.ib(default=False, metadata={'flag': True})
    unique = attr.ib(default=False, metadata={'flag': True})
    globally_unique = attr.ib(default=False, metadata={'flag': True})
    verified = attr.ib(default=False, metadata={'flag': True})

    @property
    def flags(self):
        """Render every flag field as its name when set, or as
        'no-<name>' (dashes instead of underscores) when unset."""
        flag_fields = (f for f in attr.fields(type(self)) if f.metadata.get('flag'))
        return [
            f.name if getattr(self, f.name) else 'no-' + f.name.replace('_', '-')
            for f in flag_fields
        ]
|
163 | ||
164 | ||
165 |
@attr.s
class Error(object):
    """Import error with a machine-readable code.

    The human-readable description is excluded from comparisons
    (cmp=False) so callers can compare against Error(code) alone.
    """
    code = attr.ib()
    description = attr.ib(default='', cmp=False)
|
169 | ||
170 | ||
171 |
@attr.s(cmp=False)
class LineError(Error):
    """Error attached to a specific line (and optionally column).

    Comparison semantics: against a plain Error only the code is
    compared (description carries cmp=False); against another LineError
    the (code, line, column) triple is compared.
    """
    line = attr.ib(default=0)
    column = attr.ib(default=0)

    @classmethod
    def from_error(cls, error):
        # Promote a plain Error; line/column keep their 0 defaults.
        return cls(**attr.asdict(error))

    def as_error(self):
        # Strip the positional information.
        return Error(self.code, self.description)

    def __eq__(self, other):
        # Check LineError FIRST: since LineError subclasses Error, the
        # original `isinstance(other, Error)` test matched LineError too,
        # making the positional comparison below unreachable (two
        # LineErrors compared by code only, ignoring line/column).
        if isinstance(other, LineError):
            return (self.code, self.line, self.column) == (other.code, other.line, other.column)
        if isinstance(other, Error):
            return self.as_error() == other
        return NotImplemented
|
187 | ||
188 | ||
189 |
class ImportUserForm(BaseUserForm):
    """Modelform used to validate one CSV data row.

    Model-level uniqueness validation is disabled because the importer
    applies its own unique-constraint logic per column flag (see
    UserCsvImporter.check_unique_constraints).
    """

    def clean(self):
        # Call the parent chain starting AFTER this class. The original
        # passed BaseUserForm to super(), which skipped any clean()
        # defined on BaseUserForm itself.
        super(ImportUserForm, self).clean()
        self._validate_unique = False
|
193 | ||
194 |
# Pseudo-column names linking an imported user to an external identity
# source; leading underscore avoids clashing with real attribute names.
SOURCE_NAME = '_source_name'
SOURCE_ID = '_source_id'
SOURCE_COLUMNS = {SOURCE_NAME, SOURCE_ID}
|
197 | ||
198 | ||
199 |
class ImportUserFormWithExternalId(ImportUserForm):
    """Row-validation form with the two external-id columns.

    The field names come from the SOURCE_NAME/SOURCE_ID constants
    ('_source_name'/'_source_id'), hence the locals() injection — they
    cannot be written as plain class attributes.
    """
    locals()[SOURCE_NAME] = forms.CharField(
        label=_('Source name'),
        required=False,
        validators=[
            RegexValidator(
                r'^[a-zA-Z0-9_-]+$',
                # Fixed wording: the original read "must no spaces".
                _('source_name must contain no spaces and only letters, digits, - and _'),
                'invalid')])
    locals()[SOURCE_ID] = forms.CharField(
        label=_('Source external id'))
|
210 | ||
211 | ||
212 |
@attr.s
class CsvRow(object):
    """One parsed data row: its 1-based line number, CsvCell list,
    row-level errors, validity and resulting action ('create'/'update')."""
    line = attr.ib()
    # attr.Factory(list) gives each instance a fresh list; the original
    # default=[] shared ONE list object between every row created with
    # defaults (classic mutable-default bug — do_import_row appends to
    # row.errors in place).
    cells = attr.ib(default=attr.Factory(list))
    errors = attr.ib(default=attr.Factory(list))
    is_valid = attr.ib(default=True)
    action = attr.ib(default=None)

    def __getitem__(self, header):
        """Look a cell up by CsvHeader instance or by header name."""
        for cell in self.cells:
            if cell.header == header or cell.header.name == header:
                return cell
        raise KeyError(header.name)

    def __iter__(self):
        return iter(self.cells)
|
228 | ||
229 | ||
230 |
@attr.s
class CsvCell(object):
    """One cell of a data row: its header, raw value, whether the row was
    too short to provide it, its validation errors and applied action."""
    line = attr.ib()
    header = attr.ib()
    value = attr.ib(default=None)
    missing = attr.ib(default=False)
    # Fresh list per cell; the original default=[] was one shared list
    # across all cells built with the default (mutable-default bug).
    errors = attr.ib(default=attr.Factory(list))
    action = attr.ib(default=None)

    @property
    def column(self):
        # 1-based column index, delegated to the header.
        return self.header.column
|
242 | ||
243 | ||
244 |
class Simulate(Exception):
    """Raised inside the atomic() block to abort the transaction so a
    simulated import rolls every database change back."""
    pass
|
246 | ||
247 | ||
248 |
class CancelImport(Exception):
    """Raised to abandon the import of the current row."""
    pass
|
250 | ||
251 | ||
252 |
class UserCsvImporter(object):
    """High-level user import.

    Drives a CsvImporter, validates the header row into CsvHeader
    descriptors, validates each data row through a modelform, then
    creates or updates users inside a transaction.  run() returns True
    when no *global* error occurred; per-row failures are reported
    through has_errors / rows_with_errors.
    """

    csv_importer = None       # underlying CsvImporter
    errors = None             # global LineError list (not per-row errors)
    headers = None            # CsvHeader list, in column order
    headers_by_name = None    # header name -> CsvHeader
    rows = None               # CsvRow list after parse_rows()
    has_errors = False        # any global or per-row error
    ou = None                 # target organizational unit
    updated = 0               # counters maintained by do_import_row()
    created = 0
    rows_with_errors = 0

    def add_error(self, line_error):
        # Normalize plain Errors to LineError so self.errors is uniform.
        if not hasattr(line_error, 'line'):
            line_error = LineError.from_error(line_error)
        self.errors.append(line_error)

    def run(self, fd_or_str, encoding, ou=None, simulate=False):
        """Run the full import pipeline; with simulate=True everything
        is rolled back but counters/rows are still populated."""
        self.ou = ou or get_default_ou()
        self.errors = []
        self.csv_importer = CsvImporter()

        def parse_csv():
            if not self.csv_importer.run(fd_or_str, encoding):
                self.add_error(self.csv_importer.error)

        def do_import():
            try:
                with atomic():
                    for row in self.rows:
                        if not self.do_import_row(row):
                            self.rows_with_errors += 1
                    if simulate:
                        # Abort the transaction: full rollback.
                        raise Simulate
            except Simulate:
                pass

        # Stop at the first phase that produced a global error.
        for action in [
                parse_csv,
                self.parse_header_row,
                self.parse_rows,
                do_import]:
            action()
            if self.errors:
                break

        self.has_errors = self.has_errors or bool(self.errors)
        return not bool(self.errors)

    def parse_header_row(self):
        """Build CsvHeader descriptors from the first row and check
        key-column and source-column consistency."""
        self.headers = []
        self.headers_by_name = {}

        try:
            header_row = self.csv_importer.rows[0]
        except IndexError:
            self.add_error(Error('no-header-row', _('Missing header row')))
            return

        for i, head in enumerate(header_row):
            self.parse_header(head, column=i + 1)

        if not self.headers:
            self.add_error(Error('empty-header-row', _('Empty header row')))
            return

        # Exactly one column must carry the 'key' flag.
        key_counts = sum(1 for header in self.headers if header.key)

        if not key_counts:
            self.add_error(Error('missing-key-column', _('Missing key column')))
        if key_counts > 1:
            self.add_error(Error('too-many-key-columns', _('Too many key columns')))

        # source_name / source_id only make sense as a pair.
        header_names = set(self.headers_by_name)
        if header_names & SOURCE_COLUMNS and not SOURCE_COLUMNS.issubset(header_names):
            self.add_error(
                Error('invalid-external-id-pair',
                      _('You must have a source_name and a source_id column')))

    def parse_header(self, head, column):
        """Parse one header cell: 'name [flag ...]' — resolve the name to
        a User field, an Attribute or a source column, then apply flags."""
        splitted = head.split()
        try:
            header = CsvHeader(column, splitted[0])
            if header.name in self.headers_by_name:
                self.add_error(
                    Error('duplicate-header', _('Header "%s" is duplicated') % header.name))
                return
            self.headers_by_name[header.name] = header
        except IndexError:
            # Blank header cell: nameless header, reported as unknown below.
            header = CsvHeader(column)
        else:
            if header.name in SOURCE_COLUMNS:
                # source_id implicitly is the lookup key.
                if header.name == SOURCE_ID:
                    header.key = True
            else:
                try:
                    if header.name in ['email', 'first_name', 'last_name', 'username']:
                        User._meta.get_field(header.name)
                        header.field = True
                        if header.name == 'email':
                            # by default email are expected to be verified
                            header.verified = True
                        if header.name == 'email' and self.email_is_unique:
                            header.unique = True
                            if app_settings.A2_EMAIL_IS_UNIQUE:
                                header.globally_unique = True
                        if header.name == 'username' and self.username_is_unique:
                            header.unique = True
                            if app_settings.A2_USERNAME_IS_UNIQUE:
                                header.globally_unique = True
                except FieldDoesNotExist:
                    pass
                if not header.field:
                    # Not a model field: try a declared Attribute.
                    try:
                        attribute = Attribute.objects.get(name=header.name)  # NOQA: F841
                        header.attribute = True
                    except Attribute.DoesNotExist:
                        pass

        self.headers.append(header)

        if (not (header.field or header.attribute)
                and header.name not in SOURCE_COLUMNS):
            self.add_error(LineError('unknown-or-missing-attribute',
                                     _('unknown or missing attribute "%s"') % head,
                                     line=1, column=column))
            return

        for flag in splitted[1:]:
            if header.name in SOURCE_COLUMNS:
                self.add_error(LineError(
                    'flag-forbidden-on-source-columns',
                    _('You cannot set flags on source_app and source_id columns'),
                    line=1))
                break
            value = True
            if flag.startswith('no-'):
                value = False
                flag = flag[3:]
            flag = flag.replace('-', '_')
            try:
                # Only CsvHeader fields declared with metadata flag=True
                # may be toggled from the CSV header.
                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
                    raise TypeError
                setattr(header, flag, value)
            except (AttributeError, TypeError, KeyError):
                # NOTE(review): the "%s" is never %-formatted with the
                # offending flag — the description keeps a literal %s.
                self.add_error(LineError('unknown-flag', _('unknown flag "%s"'), line=1, column=column))

    def parse_rows(self):
        """Validate every data row (rows[1:]) through a modelform built
        from the declared header names."""
        base_form_class = ImportUserForm
        if SOURCE_NAME in self.headers_by_name:
            base_form_class = ImportUserFormWithExternalId
        form_class = modelform_factory(User, fields=self.headers_by_name.keys(), form=base_form_class)
        rows = self.rows = []
        for i, row in enumerate(self.csv_importer.rows[1:]):
            # line is 1-based and accounts for the header row.
            csv_row = self.parse_row(form_class, row, line=i + 2)
            self.has_errors = self.has_errors or not(csv_row.is_valid)
            rows.append(csv_row)

    def parse_row(self, form_class, row, line):
        """Validate one data row; returns a CsvRow with per-cell and
        row-level ('__all__') form errors."""
        data = {}

        # Map cell values to header names; short rows just lack keys.
        for header in self.headers:
            try:
                data[header.name] = row[header.column - 1]
            except IndexError:
                pass

        form = form_class(data=data)
        form.is_valid()

        def get_form_errors(form, name):
            return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]

        cells = [
            CsvCell(
                line=line,
                header=header,
                value=data.get(header.name),
                missing=header.name not in data,
                errors=get_form_errors(form, header.name))
            for header in self.headers]
        cell_errors = any(bool(cell.errors) for cell in cells)
        errors = get_form_errors(form, '__all__')
        return CsvRow(
            line=line,
            cells=cells,
            errors=errors,
            is_valid=not bool(cell_errors or errors))

    @property
    def email_is_unique(self):
        # Unique either globally or within the target OU.
        return app_settings.A2_EMAIL_IS_UNIQUE or self.ou.email_is_unique

    @property
    def username_is_unique(self):
        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique

    def check_unique_constraints(self, row, user=None):
        """Check unique / globally-unique flagged cells against existing
        users (excluding the updated user); returns True when all pass."""
        ou_users = User.objects.filter(ou=self.ou)
        users = User.objects.all()
        if user:
            users = users.exclude(pk=user.pk)
            ou_users = ou_users.exclude(pk=user.pk)
        errors = []
        for cell in row:
            # Skip unflagged cells; on update, skip non-updatable ones.
            if (not cell.header.globally_unique and not cell.header.unique) or (user and not cell.header.update):
                continue
            qs = ou_users
            if cell.header.globally_unique:
                qs = users
            if cell.header.field:
                unique = not qs.filter(**{cell.header.name: cell.value}).exists()
            elif cell.header.attribute:
                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
                unique = not qs.filter(attribute_values__in=atvs).exists()
            # NOTE(review): `unique` is unbound if a flagged header is
            # neither field nor attribute; flags are currently only
            # settable on resolved headers, so presumed unreachable.
            if not unique:
                errors.append(
                    Error('unique-constraint-failed', _('Unique constraint on column "%s" failed') % cell.header.name))
        row.errors.extend(errors)
        row.is_valid = row.is_valid and not bool(errors)
        return not bool(errors)

    @atomic
    def do_import_row(self, row):
        """Create or update one user from a validated row; returns True
        on success, False when the row is skipped."""
        if not row.is_valid:
            return False

        # Find the (single, validated by parse_header_row) key column.
        for header in self.headers:
            if header.key:
                header_key = header
                break
        else:
            assert False, 'should not happen'

        user = None
        if header_key.name == SOURCE_ID:
            # lookup by external id
            source_name = row[SOURCE_NAME].value
            source_id = row[SOURCE_ID].value
            userexternalids = UserExternalId.objects.filter(source=source_name, external_id=source_id)
            users = User.objects.filter(userexternalid__in=userexternalids)[:2]
        else:
            # lookup by field/attribute
            key_value = row[header_key].value
            if header_key.field:
                users = User.objects.filter(
                    **{header_key.name: key_value})
            elif header_key.attribute:
                atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
                users = User.objects.filter(attribute_values__in=atvs)
            users = users[:2]

        if users:
            row.action = 'update'
        else:
            row.action = 'create'

        if len(users) > 1:
            # NOTE(review): key_value is undefined on the SOURCE_ID
            # branch above — this %-format would raise NameError there.
            row.errors.append(
                Error('key-matches-too-many-users',
                      _('Key value "%s" matches too many users') % key_value))
            return False

        user = None
        if users:
            user = users[0]

        if not self.check_unique_constraints(row, user=user):
            return False

        if not user:
            user = User()

        # Apply model-field cells honoring the create/update flags.
        for cell in row.cells:
            if not cell.header.field:
                continue
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
                if getattr(user, cell.header.name) != cell.value:
                    setattr(user, cell.header.name, cell.value)
                    if cell.header.name == 'email' and cell.header.verified:
                        user.email_verified = True
                    cell.action = 'updated'
                    continue
            cell.action = 'nothing'

        user.save()

        if header_key.name == SOURCE_ID:
            # NOTE(review): this create() also runs on the 'update'
            # path, where the (source, external_id) pair already exists —
            # it would hit the unique index below. Confirm intent.
            try:
                UserExternalId.objects.create(user=user,
                                              source=source_name,
                                              external_id=source_id)
            except IntegrityError:
                # should never happen since we have a unique index...
                self.errors.append(
                    Error('external-id-already-exist',
                          _('External id "%s.%s" already exists') % (source_name, source_id)))
                raise CancelImport

        # Apply attribute cells, through the verified proxy when flagged.
        for cell in row.cells:
            if cell.header.field or not cell.header.attribute:
                continue
            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
                attributes = user.attributes
                if cell.header.verified:
                    attributes = user.verified_attributes
                if getattr(attributes, cell.header.name) != cell.value:
                    setattr(attributes, cell.header.name, cell.value)
                    cell.action = 'updated'
                    continue
            cell.action = 'nothing'

        # Bump self.created or self.updated.
        setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1)
        return True
src/authentic2/models.py | ||
---|---|---|
246 | 246 |
def natural_key(self): |
247 | 247 |
return (self.name,) |
248 | 248 | |
249 |
def __repr__(self): |
|
250 |
return '<%s %s>' % (self.__class__.__name__, repr(str(self))) |
|
251 | ||
249 | 252 |
def __str__(self): |
250 | 253 |
return self.label |
251 | 254 |
tests/test_csv_import.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# authentic2 - versatile identity manager |
|
3 |
# Copyright (C) 2010-2019 Entr'ouvert |
|
4 |
# |
|
5 |
# This program is free software: you can redistribute it and/or modify it |
|
6 |
# under the terms of the GNU Affero General Public License as published |
|
7 |
# by the Free Software Foundation, either version 3 of the License, or |
|
8 |
# (at your option) any later version. |
|
9 |
# |
|
10 |
# This program is distributed in the hope that it will be useful, |
|
11 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
12 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
13 |
# GNU Affero General Public License for more details. |
|
14 |
# |
|
15 |
# You should have received a copy of the GNU Affero General Public License |
|
16 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
17 | ||
18 |
from __future__ import unicode_literals |
|
19 | ||
20 |
import pytest |
|
21 | ||
22 |
import io |
|
23 | ||
24 |
from authentic2.custom_user.models import User |
|
25 |
from authentic2.models import Attribute |
|
26 |
from authentic2.a2_rbac.utils import get_default_ou |
|
27 | ||
28 |
from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError |
|
29 | ||
30 |
# Encodings every test is exercised against.
ENCODINGS = [
    'iso-8859-1',
    'iso-8859-15',
    'utf-8',
    'cp1252',
]


def pytest_generate_tests(metafunc):
    """Parametrize any test using the 'encoding' or 'style' fixture over
    all supported encodings and both input styles (raw string vs file)."""
    if 'encoding' in metafunc.fixturenames:
        metafunc.parametrize('encoding', ENCODINGS)
    if 'style' in metafunc.fixturenames:
        metafunc.parametrize('style', ['str', 'file'])
|
43 | ||
44 | ||
45 |
@pytest.fixture
def profile(db):
    # The 'phone' attribute must exist for the CSV headers used below;
    # the label deliberately contains non-ASCII to exercise encodings.
    Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
|
48 | ||
49 | ||
50 |
@pytest.fixture
def csv_importer_factory(encoding, style):
    """Yield a factory building a CsvImporter whose run() is pre-bound to
    the given content, encoded per the parametrized encoding and passed
    either as a byte string or as a file object per 'style'."""
    def factory(content):
        payload = content.encode(encoding)
        if style == 'file':
            payload = io.BytesIO(payload)
        importer = CsvImporter()
        original_run = importer.run
        importer.run = lambda *args, **kwargs: original_run(payload, *args, encoding=encoding, **kwargs)
        return importer
    return factory
|
61 | ||
62 | ||
63 |
@pytest.fixture
def user_csv_importer_factory(encoding, style):
    """Same as csv_importer_factory but producing a UserCsvImporter."""
    def factory(content):
        payload = content.encode(encoding)
        if style == 'file':
            payload = io.BytesIO(payload)
        importer = UserCsvImporter()
        original_run = importer.run
        importer.run = lambda *args, **kwargs: original_run(payload, *args, encoding=encoding, **kwargs)
        return importer
    return factory
|
74 | ||
75 | ||
76 |
def test_unknown_csv_dialect_error(profile, user_csv_importer_factory):
    # Empty content: the sniffer cannot find a dialect.
    importer = user_csv_importer_factory('')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [Error('unknown-csv-dialect')]
|
81 | ||
82 | ||
83 |
def test_empty_header_row_error(profile, user_csv_importer_factory):
    # First line empty: no headers can be parsed.
    importer = user_csv_importer_factory('\n1,2,3')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [Error('empty-header-row')]
|
88 | ||
89 | ||
90 |
def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory):
    # Blank header cell (only whitespace) in column 2.
    importer = user_csv_importer_factory('email key,first_name," "\n1,2,3')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=2)]
|
95 | ||
96 | ||
97 |
def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory):
    # 'x' is neither a User field nor a declared Attribute.
    importer = user_csv_importer_factory('email key,first_name,x\n1,2,3')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)]
|
102 | ||
103 | ||
104 |
def test_unknown_flag_error(profile, user_csv_importer_factory):
    # 'xxx' is not a declared flag on CsvHeader.
    importer = user_csv_importer_factory('email key,first_name xxx\n1,2')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [LineError('unknown-flag', line=1, column=2)]
|
109 | ||
110 | ||
111 |
def test_missing_key_column_error(profile, user_csv_importer_factory):
    # No header carries the 'key' flag.
    importer = user_csv_importer_factory('email,first_name\n1,2')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [Error('missing-key-column')]
|
116 | ||
117 | ||
118 |
def test_too_many_key_columns_error(profile, user_csv_importer_factory):
    # Two key-flagged headers: exactly one is required.
    importer = user_csv_importer_factory('email key,first_name key\n1,2')
    assert not importer.run()
    assert importer.has_errors
    assert importer.errors == [Error('too-many-key-columns')]
|
123 | ||
124 | ||
125 |
def test_run(profile, user_csv_importer_factory):
    """Nominal import: two valid rows create users, the third row is
    invalid (bad email/phone) and only produces per-row errors — run()
    still returns True because no global error occurred."""
    assert User.objects.count() == 0
    content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    assert importer.run(), importer.errors
    assert importer.headers == [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.has_errors
    assert len(importer.rows) == 3
    assert all(row.is_valid for row in importer.rows[:2])
    # Third row: bad email (cell 0) and bad phone number (cell 3).
    assert not importer.rows[2].is_valid
    assert importer.rows[2].cells[0].errors
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
    assert not importer.rows[2].cells[1].errors
    assert not importer.rows[2].cells[2].errors
    assert importer.rows[2].cells[3].errors
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)

    assert importer.updated == 0
    assert importer.created == 2

    assert User.objects.count() == 2
    thomas = User.objects.get(email='tnoel@entrouvert.com')
    # email column carries the default 'verified' flag.
    assert thomas.email_verified is True
    assert thomas.first_name == 'Thomas'
    assert thomas.attributes.first_name == 'Thomas'
    assert thomas.last_name == 'Noël'
    assert thomas.attributes.last_name == 'Noël'
    assert thomas.attributes.phone == '1234'

    fpeters = User.objects.get(email='fpeters@entrouvert.com')
    assert fpeters.first_name == 'Frédéric'
    assert fpeters.email_verified is True
    assert fpeters.attributes.first_name == 'Frédéric'
    assert fpeters.last_name == 'Péters'
    assert fpeters.attributes.last_name == 'Péters'
    assert fpeters.attributes.phone == '5678'
|
170 | ||
171 | ||
172 |
def test_simulate(profile, user_csv_importer_factory):
    """Same scenario as test_run but with simulate=True: rows, errors and
    counters are populated identically, yet the transaction is rolled
    back so no user is actually created."""
    assert User.objects.count() == 0
    content = '''email key,first_name,last_name,phone update
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''
    importer = user_csv_importer_factory(content)

    assert importer.run(simulate=True), importer.errors
    assert importer.headers == [
        CsvHeader(1, 'email', field=True, key=True, verified=True),
        CsvHeader(2, 'first_name', field=True),
        CsvHeader(3, 'last_name', field=True),
        CsvHeader(4, 'phone', attribute=True),
    ]
    assert importer.has_errors
    assert len(importer.rows) == 3
    assert all(row.is_valid for row in importer.rows[:2])
    assert not importer.rows[2].is_valid
    assert importer.rows[2].cells[0].errors
    assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors)
    assert not importer.rows[2].cells[1].errors
    assert not importer.rows[2].cells[2].errors
    assert importer.rows[2].cells[3].errors
    assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors)

    assert importer.updated == 0
    assert importer.created == 2

    # Rollback happened: nothing persisted.
    assert User.objects.count() == 0
|
202 | ||
203 | ||
204 |
def test_create_unique_error(profile, user_csv_importer_factory):
    """'unique' flag on phone: an existing user in the SAME OU with the
    same phone blocks creation with a unique-constraint row error."""

    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    user = User.objects.create(ou=get_default_ou())
    user.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    assert not importer.rows[0].is_valid
    assert importer.rows[0].action == 'create'
    assert all(not cell.errors for cell in importer.rows[0])
    assert all(not cell.action for cell in importer.rows[0])
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
|
223 | ||
224 | ||
225 |
def test_create_unique_in_ou(profile, user_csv_importer_factory):
    """'unique' is scoped to the OU: a clashing user OUTSIDE the target
    OU does not block creation."""

    content = '''email key verified,first_name,last_name,phone unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # No ou= : this user is not in the default OU targeted by the import.
    user = User.objects.create()
    user.attributes.phone = '1234'

    assert importer.run()

    assert len(importer.rows) == 1
    assert importer.rows[0].is_valid
    assert importer.rows[0].action == 'create'
    assert all(not cell.errors for cell in importer.rows[0])
    assert all(cell.action == 'updated' for cell in importer.rows[0])
    assert importer.created == 1
    assert importer.updated == 0
|
243 | ||
244 | ||
245 |
def test_create_unique_globally_error(profile, user_csv_importer_factory):
    """'globally-unique' flag: a clashing user in ANY OU blocks creation."""

    content = '''email key verified,first_name,last_name,phone globally-unique
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    user = User.objects.create()
    user.attributes.phone = '1234'

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    assert not importer.rows[0].is_valid
    assert importer.rows[0].action == 'create'
    assert all(not cell.errors for cell in importer.rows[0])
    assert all(not cell.action for cell in importer.rows[0])
    assert importer.rows[0].errors == [Error('unique-constraint-failed')]
|
264 | ||
265 | ||
266 |
def test_update_unique_error(profile, user_csv_importer_factory):
    """Updating a user with a per-ou 'unique update' phone must fail when
    another user in the same ou already holds that phone."""
    content = '''email key verified,first_name,last_name,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Same-ou conflicting phone holder.
    holder = User.objects.create(ou=get_default_ou())
    holder.attributes.phone = '1234'

    # The user the import row is keyed to.
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    for cell in row:
        assert not cell.errors
        assert not cell.action
    # The failure is reported at row level, not on any cell.
    assert row.errors == [Error('unique-constraint-failed')]
286 | ||
287 | ||
288 |
def test_update_unique_globally_error(profile, user_csv_importer_factory):
    """Updating with a 'globally-unique update' phone must fail when any
    user — even one without an ou — already holds that phone."""
    content = '''email key verified,first_name,last_name,phone globally-unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Conflicting phone held by an ou-less user: still a global conflict.
    holder = User.objects.create()
    holder.attributes.phone = '1234'

    # The user the import row is keyed to.
    User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 0
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert not row.is_valid
    assert row.action == 'update'
    for cell in row:
        assert not cell.errors
        assert not cell.action
    # The failure is reported at row level, not on any cell.
    assert row.errors == [Error('unique-constraint-failed')]
308 | ||
309 | ||
310 |
def test_update_unique_globally(profile, user_csv_importer_factory):
    """An update keyed on email may still set a per-ou 'unique' phone when
    the conflicting holder is outside the ou; 'no-update' columns stay put.

    NOTE(review): the test name says "globally" but the header uses plain
    'unique' (per-ou) — confirm this mismatch is intended.
    """
    content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update
tnoel@entrouvert.com,Thomas,Noël,1234'''
    importer = user_csv_importer_factory(content)

    # Phone held by an ou-less user: no conflict inside the target ou.
    holder = User.objects.create()
    holder.attributes.phone = '1234'

    thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou())

    assert importer.run()

    assert importer.created == 0
    assert importer.updated == 1
    assert len(importer.rows) == 1
    row = importer.rows[0]
    assert row.is_valid
    assert row.action == 'update'
    assert all(not cell.errors for cell in row)
    # email/first_name/last_name are flagged no-update → untouched.
    for cell in row.cells[:3]:
        assert cell.action == 'nothing'
    assert row.cells[3].action == 'updated'

    thomas.refresh_from_db()
    assert not thomas.first_name
    assert not thomas.last_name
    assert thomas.attributes.phone == '1234'
335 | ||
336 | ||
337 |
def test_external_id(profile, user_csv_importer_factory):
    """Rows keyed by (_source_name, _source_id) create one user per external
    id, even when every other column (email included) is identical."""
    assert User.objects.count() == 0
    content = '''_source_name,_source_id,email,first_name,last_name,phone
app1,1,tnoel@entrouvert.com,Thomas,Noël,1234
app1,2,tnoel@entrouvert.com,Thomas,Noël,1234
'''
    importer = user_csv_importer_factory(content)

    assert importer.run(), importer.errors

    # Header parsing: _source_id becomes the key, email keeps its implicit
    # verified flag, phone resolves to an attribute.
    expected_headers = [
        CsvHeader(1, '_source_name'),
        CsvHeader(2, '_source_id', key=True),
        CsvHeader(3, 'email', field=True, verified=True),
        CsvHeader(4, 'first_name', field=True),
        CsvHeader(5, 'last_name', field=True),
        CsvHeader(6, 'phone', attribute=True),
    ]
    assert importer.headers == expected_headers
    assert not importer.has_errors
    assert len(importer.rows) == 2

    # One distinct user must exist per external id.
    for external_id in ('1', '2'):
        user = User.objects.get(
            userexternalid__source='app1',
            userexternalid__external_id=external_id)

        assert user.email_verified is True
        assert user.first_name == 'Thomas'
        assert user.attributes.first_name == 'Thomas'
        assert user.last_name == 'Noël'
        assert user.attributes.last_name == 'Noël'
        assert user.attributes.phone == '1234'
0 |
- |