From a9c41a7895307fe478860822c6a28acc26a66400 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sat, 15 Jun 2019 15:19:44 +0200 Subject: [PATCH 3/5] add csv import framework (#32833) --- debian/control | 4 +- setup.py | 2 + src/authentic2/csv_import.py | 569 +++++++++++++++++++++++++++++++++++ src/authentic2/models.py | 3 + tests/test_csv_import.py | 366 ++++++++++++++++++++++ 5 files changed, 943 insertions(+), 1 deletion(-) create mode 100644 src/authentic2/csv_import.py create mode 100644 tests/test_csv_import.py diff --git a/debian/control b/debian/control index c4e526f1..0cfdd507 100644 --- a/debian/control +++ b/debian/control @@ -29,7 +29,9 @@ Depends: ${misc:Depends}, ${python:Depends}, python-django-filters (>= 1), python-django-filters (<< 2), python-pil, - python-tablib + python-tablib, + python-chardet, + python-attr Breaks: python-authentic2-auth-fc (<< 0.26) Replaces: python-authentic2-auth-fc (<< 0.26) Provides: ${python:Provides}, python-authentic2-auth-fc diff --git a/setup.py b/setup.py index cfe9920c..43193a43 100755 --- a/setup.py +++ b/setup.py @@ -140,6 +140,8 @@ setup(name="authentic2", 'xstatic-select2', 'pillow', 'tablib', + 'chardet', + 'attrs', ], zip_safe=False, classifiers=[ diff --git a/src/authentic2/csv_import.py b/src/authentic2/csv_import.py new file mode 100644 index 00000000..e02b9cd0 --- /dev/null +++ b/src/authentic2/csv_import.py @@ -0,0 +1,569 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
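+
+# Overview of this new module:
+# - CsvImporter turns raw bytes, text or a file-like object into a list of
+#   rows: it decodes the input (optionally guessing the charset with chardet
+#   when encoding='detect') and sniffs the CSV dialect.
+# - UserCsvImporter maps the first row onto User fields and Attribute names;
+#   each header may carry flags (key, unique, globally-unique, verified,
+#   create/no-create, update/no-update), rows are validated through a
+#   ModelForm, then created or updated inside a single transaction, with a
+#   simulate mode that rolls everything back.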
+ +from __future__ import unicode_literals + +import csv +import io + +from chardet.universaldetector import UniversalDetector +import attr + +from django import forms +from django.core.exceptions import FieldDoesNotExist +from django.core.validators import RegexValidator +from django.db import IntegrityError +from django.db.transaction import atomic +from django.utils import six +from django.utils.translation import ugettext as _ + +from authentic2 import app_settings +from authentic2.a2_rbac.utils import get_default_ou +from authentic2.custom_user.models import User +from authentic2.forms.profile import modelform_factory, BaseUserForm +from authentic2.models import Attribute, AttributeValue, UserExternalId + + +class UTF8Recoder(object): + def __init__(self, fd): + self.fd = fd + + def __iter__(self): + return self + + def next(self): + return self.fd.next().encode('utf-8') + + +class UnicodeReader(object): + def __init__(self, fd, dialect='excel', **kwargs): + self.reader = csv.reader(UTF8Recoder(fd), dialect=dialect, **kwargs) + + def next(self): + row = self.reader.next() + return [s.decode('utf-8') for s in row] + + def __iter__(self): + return self + + +class CsvImporter(object): + rows = None + error = None + error_description = None + encoding = None + + def run(self, fd_or_str, encoding): + if isinstance(fd_or_str, six.binary_type): + input_fd = io.BytesIO(fd_or_str) + elif isinstance(fd_or_str, six.text_type): + input_fd = io.StringIO(fd_or_str) + elif not hasattr(fd_or_str, 'read1'): + try: + input_fd = io.open(fd_or_str.fileno(), closefd=False) + except Exception: + try: + fd_or_str.seek(0) + except Exception: + pass + content = fd_or_str.read() + if isinstance(content, six.text_type): + input_fd = io.StringIO(content) + else: + input_fd = io.BytesIO(content) + else: + input_fd = fd_or_str + + assert hasattr(input_fd, 'read'), 'fd_or_str is not a string or a file object' + + def detect_encoding(input_fd, encoding): + # detect StringIO + if hasattr(input_fd, 'line_buffering'): + return input_fd + + if encoding == 'detect': + detector = UniversalDetector() + + try: + for line in input_fd: + detector.feed(line) + if detector.done: + break + else: + self.error = Error('cannot-detect-encoding', _('Cannot detect encoding')) + return None + detector.close() + encoding = detector.result['encoding'] + finally: + input_fd.seek(0) + + if not hasattr(input_fd, 'readable'): + input_fd = io.open(input_fd.fileno(), 'rb', closefd=False) + return io.TextIOWrapper(input_fd, encoding=encoding) + + def parse_csv(): + try: + dialect = csv.Sniffer().sniff(input_fd.read().encode('utf-8')) + except csv.Error as e: + self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect: %s') % e) + return False + finally: + input_fd.seek(0) + + if not dialect: + self.error = Error('unknown-csv-dialect', _('Unknown CSV dialect')) + return False + reader = UnicodeReader(input_fd, dialect) + self.rows = list(reader) + return True + + input_fd = detect_encoding(input_fd, encoding) + if input_fd is None: + return False + + return parse_csv() + + +@attr.s +class CsvHeader(object): + column = attr.ib() + name = attr.ib(default='') + field = attr.ib(default=False, converter=bool) + attribute = attr.ib(default=False, converter=bool) + create = attr.ib(default=True, metadata={'flag': True}) + update = attr.ib(default=True, metadata={'flag': True}) + key = attr.ib(default=False, metadata={'flag': True}) + unique = attr.ib(default=False, metadata={'flag': True}) + globally_unique = attr.ib(default=False, 
metadata={'flag': True})
+    verified = attr.ib(default=False, metadata={'flag': True})
+
+    @property
+    def flags(self):
+        flags = []
+        for attribute in attr.fields(self.__class__):
+            if attribute.metadata.get('flag'):
+                if getattr(self, attribute.name):
+                    flags.append(attribute.name)
+                else:
+                    flags.append('no-' + attribute.name.replace('_', '-'))
+        return flags
+
+
+@attr.s
+class Error(object):
+    # equality is the one generated by attrs: two errors are equal when their
+    # codes match, the human readable description is ignored (cmp=False)
+    code = attr.ib()
+    description = attr.ib(default='', cmp=False)
+
+
+@attr.s(cmp=False)
+class LineError(Error):
+    line = attr.ib(default=0)
+    column = attr.ib(default=0)
+
+    @classmethod
+    def from_error(cls, error):
+        return cls(**attr.asdict(error))
+
+    def as_error(self):
+        return Error(self.code, self.description)
+
+    def __eq__(self, other):
+        if isinstance(other, Error):
+            return self.as_error() == other
+        return (self.code, self.line, self.column) == (other.code, other.line, other.column)
+
+
+class ImportUserForm(BaseUserForm):
+    def clean(self):
+        super(ImportUserForm, self).clean()
+        # unicity is checked by UserCsvImporter.check_unique_constraints()
+        self._validate_unique = False
+
+
+SOURCE_NAME = '_source_name'
+SOURCE_ID = '_source_id'
+SOURCE_COLUMNS = set([SOURCE_NAME, SOURCE_ID])
+
+
+class ImportUserFormWithExternalId(ImportUserForm):
+    locals()[SOURCE_NAME] = forms.CharField(
+        label=_('Source name'),
+        required=False,
+        validators=[
+            RegexValidator(
+                r'^[a-zA-Z0-9_-]+$',
+                _('source_name must contain no spaces and only letters, digits, - and _'),
+                'invalid')])
+    locals()[SOURCE_ID] = forms.CharField(
+        label=_('Source external id'))
+
+
+@attr.s
+class CsvRow(object):
+    line = attr.ib()
+    cells = attr.ib(default=attr.Factory(list))
+    errors = attr.ib(default=attr.Factory(list))
+    is_valid = attr.ib(default=True)
+    action = attr.ib(default=None)
+
+    def __getitem__(self, header):
+        for cell in self.cells:
+            if cell.header == header or cell.header.name == header:
+                return cell
+        raise KeyError(header)
+
+    def __iter__(self):
+        return iter(self.cells)
+
+
+@attr.s
+class CsvCell(object):
+    line = attr.ib()
+    header = attr.ib()
+    value = attr.ib(default=None)
+    missing = attr.ib(default=False)
+    errors = attr.ib(default=attr.Factory(list))
+    action = attr.ib(default=None)
+
+    @property
+    def column(self):
+        return self.header.column
+
+
+class Simulate(Exception):
+    pass
+
+
+class CancelImport(Exception):
+    pass
+
+
+class UserCsvImporter(object):
+    csv_importer = None
+    errors = None
+    headers = None
+    headers_by_name = None
+    rows = None
+    has_errors = False
+    ou = None
+    updated = 0
+    created = 0
+    rows_with_errors = 0
+
+    def add_error(self, line_error):
+        if not hasattr(line_error, 'line'):
+            line_error = LineError.from_error(line_error)
+        self.errors.append(line_error)
+
+    def run(self, fd_or_str, encoding, ou=None, simulate=False):
+        self.ou = ou or get_default_ou()
+        self.errors = []
+        self.csv_importer = CsvImporter()
+
+        def parse_csv():
+            if not self.csv_importer.run(fd_or_str, encoding):
+                self.add_error(self.csv_importer.error)
+
+        def do_import():
+            try:
+                with atomic():
+                    for row in self.rows:
+                        if not self.do_import_row(row):
+                            self.rows_with_errors += 1
+                    if simulate:
+                        # raised inside atomic() so the whole import is rolled back
+                        raise Simulate
+            except Simulate:
+                pass
+
+        for action in [
+                parse_csv,
+                self.parse_header_row,
+                self.parse_rows,
+                do_import]:
+            action()
+            if self.errors:
+                break
+
+        self.has_errors = self.has_errors or bool(self.errors)
+        return not bool(self.errors)
+
+    def parse_header_row(self):
+        self.headers = []
+        self.headers_by_name = {}
+
+        try:
+            header_row = self.csv_importer.rows[0]
+        except IndexError:
+            self.add_error(Error('no-header-row', _('Missing header row')))
+            return
+
+        for i, head in enumerate(header_row):
+            self.parse_header(head, column=i + 1)
+
+        if not self.headers:
+            self.add_error(Error('empty-header-row', _('Empty header row')))
+            return
+
+        key_counts = sum(1 for header in self.headers if header.key)
+
+        if not key_counts:
+            self.add_error(Error('missing-key-column', _('Missing key column')))
+        if key_counts > 1:
+            self.add_error(Error('too-many-key-columns', _('Too many key columns')))
+
+        header_names = set(self.headers_by_name)
+        if header_names & SOURCE_COLUMNS and not SOURCE_COLUMNS.issubset(header_names):
+            self.add_error(
+                Error('invalid-external-id-pair',
+                      _('You must have a source_name and a source_id column')))
+
+    def parse_header(self, head, column):
+        splitted = head.split()
+        try:
+            header = CsvHeader(column, splitted[0])
+            if header.name in self.headers_by_name:
+                self.add_error(
+                    Error('duplicate-header', _('Header "%s" is duplicated') % header.name))
+                return
+            self.headers_by_name[header.name] = header
+        except IndexError:
+            header = CsvHeader(column)
+        else:
+            if header.name in (SOURCE_NAME, SOURCE_ID):
+                if header.name == SOURCE_ID:
+                    header.key = True
+            else:
+                try:
+                    if header.name in ['email', 'first_name', 'last_name', 'username']:
+                        field = User._meta.get_field(header.name)  # NOQA: F841
+                        header.field = True
+                        if header.name == 'email':
+                            # email values coming from the CSV are considered verified by default
+                            header.verified = True
+                        if header.name == 'email' and self.email_is_unique:
+                            header.unique = True
+                            if app_settings.A2_EMAIL_IS_UNIQUE:
+                                header.globally_unique = True
+                        if header.name == 'username' and self.username_is_unique:
+                            header.unique = True
+                            if app_settings.A2_USERNAME_IS_UNIQUE:
+                                header.globally_unique = True
+                except FieldDoesNotExist:
+                    pass
+                if not header.field:
+                    try:
+                        attribute = Attribute.objects.get(name=header.name)  # NOQA: F841
+                        header.attribute = True
+                    except Attribute.DoesNotExist:
+                        pass
+
+        self.headers.append(header)
+
+        if (not (header.field or header.attribute)
+                and header.name not in SOURCE_COLUMNS):
+            self.add_error(LineError('unknown-or-missing-attribute',
+                                     _('unknown or missing attribute "%s"') % head,
+                                     line=1, column=column))
+            return
+
+        for flag in splitted[1:]:
+            if header.name in SOURCE_COLUMNS:
+                self.add_error(LineError(
+                    'flag-forbidden-on-source-columns',
+                    _('You cannot set flags on source_name and source_id columns'),
+                    line=1, column=column))
+                break
+            value = True
+            if flag.startswith('no-'):
+                value = False
+                flag = flag[3:]
+            flag = flag.replace('-', '_')
+            try:
+                if not getattr(attr.fields(CsvHeader), flag).metadata['flag']:
+                    raise TypeError
+                setattr(header, flag, value)
+            except (AttributeError, TypeError, KeyError):
+                self.add_error(LineError('unknown-flag', _('unknown flag "%s"') % flag,
+                                         line=1, column=column))
+
+    def parse_rows(self):
+        base_form_class = ImportUserForm
+        if SOURCE_NAME in self.headers_by_name:
+            base_form_class = ImportUserFormWithExternalId
+        form_class = modelform_factory(User, fields=self.headers_by_name.keys(), form=base_form_class)
+        rows = self.rows = []
+        for i, row in enumerate(self.csv_importer.rows[1:]):
+            csv_row = self.parse_row(form_class, row, line=i + 2)
+            self.has_errors = self.has_errors or not csv_row.is_valid
+            rows.append(csv_row)
+
+    def parse_row(self, form_class, row, line):
+        data = {}
+
+        for header in self.headers:
+            try:
+                data[header.name] = row[header.column - 1]
+            except IndexError:
+                pass
+
+        form = form_class(data=data)
+        form.is_valid()
+
+        def get_form_errors(form, name):
+            return [Error('data-error', six.text_type(value)) for value in form.errors.get(name, [])]
+
+        cells = [
+            CsvCell(
+                line=line,
+                header=header,
+                value=data.get(header.name),
+                missing=header.name not in data,
+                errors=get_form_errors(form, header.name))
+            for header in self.headers]
+        cell_errors = any(bool(cell.errors) for cell in cells)
+        errors = get_form_errors(form, '__all__')
+        return CsvRow(
+            line=line,
+            cells=cells,
+            errors=errors,
+            is_valid=not bool(cell_errors or errors))
+
+    @property
+    def email_is_unique(self):
+        return app_settings.A2_EMAIL_IS_UNIQUE or self.ou.email_is_unique
+
+    @property
+    def username_is_unique(self):
+        return app_settings.A2_USERNAME_IS_UNIQUE or self.ou.username_is_unique
+
+    def check_unique_constraints(self, row, user=None):
+        # "unique" headers are checked inside the target OU, "globally-unique"
+        # headers against all users; on update the matched user is excluded
+        ou_users = User.objects.filter(ou=self.ou)
+        users = User.objects.all()
+        if user:
+            users = users.exclude(pk=user.pk)
+            ou_users = ou_users.exclude(pk=user.pk)
+        errors = []
+        for cell in row:
+            if (not cell.header.globally_unique and not cell.header.unique) or (user and not cell.header.update):
+                continue
+            qs = ou_users
+            if cell.header.globally_unique:
+                qs = users
+            if cell.header.field:
+                unique = not qs.filter(**{cell.header.name: cell.value}).exists()
+            elif cell.header.attribute:
+                atvs = AttributeValue.objects.filter(attribute__name=cell.header.name, content=cell.value)
+                unique = not qs.filter(attribute_values__in=atvs).exists()
+            if not unique:
+                errors.append(
+                    Error('unique-constraint-failed',
+                          _('Unique constraint on column "%s" failed') % cell.header.name))
+        row.errors.extend(errors)
+        row.is_valid = row.is_valid and not bool(errors)
+        return not bool(errors)
+
+    @atomic
+    def do_import_row(self, row):
+        if not row.is_valid:
+            return False
+
+        # parse_header_row() guarantees exactly one key column
+        for header in self.headers:
+            if header.key:
+                header_key = header
+                break
+        else:
+            assert False, 'should not happen'
+
+        user = None
+        if header_key.name == SOURCE_ID:
+            # lookup by external id
+            source_name = row[SOURCE_NAME].value
+            source_id = row[SOURCE_ID].value
+            key_value = source_id
+            userexternalids = UserExternalId.objects.filter(source=source_name, external_id=source_id)
+            users = User.objects.filter(userexternalid__in=userexternalids)[:2]
+        else:
+            # lookup by field/attribute
+            key_value = row[header_key].value
+            if header_key.field:
+                users = User.objects.filter(
+                    **{header_key.name: key_value})
+            elif header_key.attribute:
+                atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
+                users = User.objects.filter(attribute_values__in=atvs)
+            users = users[:2]
+
+        if users:
+            row.action = 'update'
+        else:
+            row.action = 'create'
+
+        if len(users) > 1:
+            row.errors.append(
+                Error('key-matches-too-many-users',
+                      _('Key value "%s" matches too many users') % key_value))
+            return False
+
+        if users:
+            user = users[0]
+
+        if not self.check_unique_constraints(row, user=user):
+            return False
+
+        if not user:
+            user = User()
+
+        for cell in row.cells:
+            if not cell.header.field:
+                continue
+            if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update):
+                if getattr(user, cell.header.name) != cell.value:
+                    setattr(user, cell.header.name, cell.value)
+                    if cell.header.name == 'email' and cell.header.verified:
+                        user.email_verified = True
+                    cell.action = 'updated'
+                    continue
+            cell.action = 'nothing'
+
+        user.save()
+
+        if header_key.name == SOURCE_ID and row.action == 'create':
+            # on update the external id mapping already exists, it is the key
+            # we just matched on
+            try:
+                UserExternalId.objects.create(user=user,
+                                              source=source_name,
+                                              external_id=source_id)
except IntegrityError: + # should never happen since we have a unique index... + self.errors.append( + Error('external-id-already-exist', + _('External id "%s.%s" already exists') % (source_name, source_id))) + raise CancelImport + + for cell in row.cells: + if cell.header.field or not cell.header.attribute: + continue + if (row.action == 'create' and cell.header.create) or (row.action == 'update' and cell.header.update): + attributes = user.attributes + if cell.header.verified: + attributes = user.verified_attributes + if getattr(attributes, cell.header.name) != cell.value: + setattr(attributes, cell.header.name, cell.value) + cell.action = 'updated' + continue + cell.action = 'nothing' + + setattr(self, row.action + 'd', getattr(self, row.action + 'd') + 1) + return True diff --git a/src/authentic2/models.py b/src/authentic2/models.py index 9f2d239b..a16be648 100644 --- a/src/authentic2/models.py +++ b/src/authentic2/models.py @@ -246,6 +246,9 @@ class Attribute(models.Model): def natural_key(self): return (self.name,) + def __repr__(self): + return '<%s %s>' % (self.__class__.__name__, repr(str(self))) + def __str__(self): return self.label diff --git a/tests/test_csv_import.py b/tests/test_csv_import.py new file mode 100644 index 00000000..5a7aa411 --- /dev/null +++ b/tests/test_csv_import.py @@ -0,0 +1,366 @@ +# -*- coding: utf-8 -*- +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
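+
+# These tests are parametrized by pytest_generate_tests below: each test
+# requesting the 'encoding' fixture runs once per charset in ENCODINGS, and
+# each test requesting 'style' runs once with the CSV passed as a byte string
+# and once wrapped in a BytesIO, so both input paths of CsvImporter are
+# covered.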
+ +from __future__ import unicode_literals + +import pytest + +import io + +from authentic2.custom_user.models import User +from authentic2.models import Attribute +from authentic2.a2_rbac.utils import get_default_ou + +from authentic2.csv_import import CsvImporter, UserCsvImporter, CsvHeader, Error, LineError + +ENCODINGS = [ + 'iso-8859-1', + 'iso-8859-15', + 'utf-8', + 'cp1252', +] + + +def pytest_generate_tests(metafunc): + if 'encoding' in metafunc.fixturenames: + metafunc.parametrize('encoding', ENCODINGS) + if 'style' in metafunc.fixturenames: + metafunc.parametrize('style', ['str', 'file']) + + +@pytest.fixture +def profile(db): + Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone') + + +@pytest.fixture +def csv_importer_factory(encoding, style): + def factory(content): + content = content.encode(encoding) + if style == 'file': + content = io.BytesIO(content) + importer = CsvImporter() + run = importer.run + importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs) + return importer + return factory + + +@pytest.fixture +def user_csv_importer_factory(encoding, style): + def factory(content): + content = content.encode(encoding) + if style == 'file': + content = io.BytesIO(content) + importer = UserCsvImporter() + run = importer.run + importer.run = lambda *args, **kwargs: run(content, *args, encoding=encoding, **kwargs) + return importer + return factory + + +def test_unknown_csv_dialect_error(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [Error('unknown-csv-dialect')] + + +def test_empty_header_row_error(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('\n1,2,3') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [Error('empty-header-row')] + + +def test_unknown_or_missing_attribute_error1(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('email key,first_name," "\n1,2,3') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=2)] + + +def test_unknown_or_missing_attribute_error2(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('email key,first_name,x\n1,2,3') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [LineError('unknown-or-missing-attribute', line=1, column=3)] + + +def test_unknown_flag_error(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('email key,first_name xxx\n1,2') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [LineError('unknown-flag', line=1, column=2)] + + +def test_missing_key_column_error(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('email,first_name\n1,2') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [Error('missing-key-column')] + + +def test_too_many_key_columns_error(profile, user_csv_importer_factory): + importer = user_csv_importer_factory('email key,first_name key\n1,2') + assert not importer.run() + assert importer.has_errors + assert importer.errors == [Error('too-many-key-columns')] + + +def test_run(profile, user_csv_importer_factory): + assert User.objects.count() == 0 + content = '''email key,first_name,last_name,phone update +tnoel@entrouvert.com,Thomas,Noël,1234 
+fpeters@entrouvert.com,Frédéric,Péters,5678 +x,x,x,x''' + importer = user_csv_importer_factory(content) + + assert importer.run(), importer.errors + assert importer.headers == [ + CsvHeader(1, 'email', field=True, key=True, verified=True), + CsvHeader(2, 'first_name', field=True), + CsvHeader(3, 'last_name', field=True), + CsvHeader(4, 'phone', attribute=True), + ] + assert importer.has_errors + assert len(importer.rows) == 3 + assert all(row.is_valid for row in importer.rows[:2]) + assert not importer.rows[2].is_valid + assert importer.rows[2].cells[0].errors + assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors) + assert not importer.rows[2].cells[1].errors + assert not importer.rows[2].cells[2].errors + assert importer.rows[2].cells[3].errors + assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors) + + assert importer.updated == 0 + assert importer.created == 2 + + assert User.objects.count() == 2 + thomas = User.objects.get(email='tnoel@entrouvert.com') + assert thomas.email_verified is True + assert thomas.first_name == 'Thomas' + assert thomas.attributes.first_name == 'Thomas' + assert thomas.last_name == 'Noël' + assert thomas.attributes.last_name == 'Noël' + assert thomas.attributes.phone == '1234' + + fpeters = User.objects.get(email='fpeters@entrouvert.com') + assert fpeters.first_name == 'Frédéric' + assert fpeters.email_verified is True + assert fpeters.attributes.first_name == 'Frédéric' + assert fpeters.last_name == 'Péters' + assert fpeters.attributes.last_name == 'Péters' + assert fpeters.attributes.phone == '5678' + + +def test_simulate(profile, user_csv_importer_factory): + assert User.objects.count() == 0 + content = '''email key,first_name,last_name,phone update +tnoel@entrouvert.com,Thomas,Noël,1234 +fpeters@entrouvert.com,Frédéric,Péters,5678 +x,x,x,x''' + importer = user_csv_importer_factory(content) + + assert importer.run(simulate=True), importer.errors + assert importer.headers == [ + CsvHeader(1, 'email', field=True, key=True, verified=True), + CsvHeader(2, 'first_name', field=True), + CsvHeader(3, 'last_name', field=True), + CsvHeader(4, 'phone', attribute=True), + ] + assert importer.has_errors + assert len(importer.rows) == 3 + assert all(row.is_valid for row in importer.rows[:2]) + assert not importer.rows[2].is_valid + assert importer.rows[2].cells[0].errors + assert all(error == Error('data-error') for error in importer.rows[2].cells[0].errors) + assert not importer.rows[2].cells[1].errors + assert not importer.rows[2].cells[2].errors + assert importer.rows[2].cells[3].errors + assert all(error == Error('data-error') for error in importer.rows[2].cells[3].errors) + + assert importer.updated == 0 + assert importer.created == 2 + + assert User.objects.count() == 0 + + +def test_create_unique_error(profile, user_csv_importer_factory): + + content = '''email key verified,first_name,last_name,phone unique +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create(ou=get_default_ou()) + user.attributes.phone = '1234' + + assert importer.run() + + assert importer.created == 0 + assert importer.updated == 0 + assert len(importer.rows) == 1 + assert not importer.rows[0].is_valid + assert importer.rows[0].action == 'create' + assert all(not cell.errors for cell in importer.rows[0]) + assert all(not cell.action for cell in importer.rows[0]) + assert importer.rows[0].errors == [Error('unique-constraint-failed')] + + +def 
test_create_unique_in_ou(profile, user_csv_importer_factory): + + content = '''email key verified,first_name,last_name,phone unique +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create() + user.attributes.phone = '1234' + + assert importer.run() + + assert len(importer.rows) == 1 + assert importer.rows[0].is_valid + assert importer.rows[0].action == 'create' + assert all(not cell.errors for cell in importer.rows[0]) + assert all(cell.action == 'updated' for cell in importer.rows[0]) + assert importer.created == 1 + assert importer.updated == 0 + + +def test_create_unique_globally_error(profile, user_csv_importer_factory): + + content = '''email key verified,first_name,last_name,phone globally-unique +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create() + user.attributes.phone = '1234' + + assert importer.run() + + assert importer.created == 0 + assert importer.updated == 0 + assert len(importer.rows) == 1 + assert not importer.rows[0].is_valid + assert importer.rows[0].action == 'create' + assert all(not cell.errors for cell in importer.rows[0]) + assert all(not cell.action for cell in importer.rows[0]) + assert importer.rows[0].errors == [Error('unique-constraint-failed')] + + +def test_update_unique_error(profile, user_csv_importer_factory): + content = '''email key verified,first_name,last_name,phone unique update +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create(ou=get_default_ou()) + user.attributes.phone = '1234' + + user = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou()) + + assert importer.run() + + assert importer.created == 0 + assert importer.updated == 0 + assert len(importer.rows) == 1 + assert not importer.rows[0].is_valid + assert importer.rows[0].action == 'update' + assert all(not cell.errors for cell in importer.rows[0]) + assert all(not cell.action for cell in importer.rows[0]) + assert importer.rows[0].errors == [Error('unique-constraint-failed')] + + +def test_update_unique_globally_error(profile, user_csv_importer_factory): + content = '''email key verified,first_name,last_name,phone globally-unique update +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create() + user.attributes.phone = '1234' + + User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou()) + + assert importer.run() + + assert importer.created == 0 + assert importer.updated == 0 + assert len(importer.rows) == 1 + assert not importer.rows[0].is_valid + assert importer.rows[0].action == 'update' + assert all(not cell.errors for cell in importer.rows[0]) + assert all(not cell.action for cell in importer.rows[0]) + assert importer.rows[0].errors == [Error('unique-constraint-failed')] + + +def test_update_unique_globally(profile, user_csv_importer_factory): + content = '''email key verified no-update,first_name no-update,last_name no-update,phone unique update +tnoel@entrouvert.com,Thomas,Noël,1234''' + importer = user_csv_importer_factory(content) + + user = User.objects.create() + user.attributes.phone = '1234' + + thomas = User.objects.create(email='tnoel@entrouvert.com', ou=get_default_ou()) + + assert importer.run() + + assert importer.created == 0 + assert importer.updated == 1 + assert len(importer.rows) == 1 + assert importer.rows[0].is_valid + assert importer.rows[0].action == 'update' + assert 
all(not cell.errors for cell in importer.rows[0]) + assert all(cell.action == 'nothing' for cell in importer.rows[0].cells[:3]) + assert importer.rows[0].cells[3].action == 'updated' + + thomas.refresh_from_db() + assert not thomas.first_name + assert not thomas.last_name + assert thomas.attributes.phone == '1234' + + +def test_external_id(profile, user_csv_importer_factory): + assert User.objects.count() == 0 + content = '''_source_name,_source_id,email,first_name,last_name,phone +app1,1,tnoel@entrouvert.com,Thomas,Noël,1234 +app1,2,tnoel@entrouvert.com,Thomas,Noël,1234 +''' + importer = user_csv_importer_factory(content) + + assert importer.run(), importer.errors + assert importer.headers == [ + CsvHeader(1, '_source_name'), + CsvHeader(2, '_source_id', key=True), + CsvHeader(3, 'email', field=True, verified=True), + CsvHeader(4, 'first_name', field=True), + CsvHeader(5, 'last_name', field=True), + CsvHeader(6, 'phone', attribute=True), + ] + assert not importer.has_errors + assert len(importer.rows) == 2 + for external_id in ['1', '2']: + thomas = User.objects.get( + userexternalid__source='app1', + userexternalid__external_id=external_id) + + assert thomas.email_verified is True + assert thomas.first_name == 'Thomas' + assert thomas.attributes.first_name == 'Thomas' + assert thomas.last_name == 'Noël' + assert thomas.attributes.last_name == 'Noël' + assert thomas.attributes.phone == '1234' -- 2.20.1
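
A minimal usage sketch of the importer added by this patch, assuming a
configured authentic2/Django environment with a 'phone' attribute defined (as
in the tests above); names and values are illustrative only:

    from authentic2.csv_import import UserCsvImporter

    content = 'email key,first_name,last_name,phone\ntnoel@entrouvert.com,Thomas,Noel,1234'
    importer = UserCsvImporter()
    # encoding may also be 'detect' to let chardet guess; simulate=True would
    # validate and count the rows, then roll the transaction back
    if importer.run(content, 'utf-8'):
        print('%s created, %s updated' % (importer.created, importer.updated))
    else:
        print(importer.errors)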