Projet

Général

Profil

0001-import-users-from-csv-file-32833.patch

Paul Marillonnet, 14 mai 2019 16:41

Télécharger (21,5 ko)

Voir les différences:

Subject: [PATCH] import users from csv file (#32833)

 src/authentic2/app_settings.py                |  1 +
 src/authentic2/data_transfer.py               | 81 +++++++++++++++
 src/authentic2/manager/forms.py               | 18 ++++
 .../templates/authentic2/manager/users.html   | 14 +++
 .../authentic2/manager/users_csv_import.html  | 29 ++++++
 src/authentic2/manager/urls.py                |  2 +
 src/authentic2/manager/user_views.py          | 64 +++++++++++-
 src/authentic2/manager/utils.py               |  5 +
 tests/test_data_transfer.py                   | 99 ++++++++++++++++++-
 tests/test_manager.py                         | 54 ++++++++++
 tests/users_csv_import_testfile.csv           |  3 +
 tests/users_csv_import_testfile2.csv          |  3 +
 tests/users_csv_import_testfile3.csv          |  3 +
 tests/users_csv_import_testfile_modify1.csv   |  2 +
 tests/users_csv_import_testfile_modify2.csv   |  2 +
 15 files changed, 376 insertions(+), 4 deletions(-)
 create mode 100644 src/authentic2/manager/templates/authentic2/manager/users_csv_import.html
 create mode 100644 tests/users_csv_import_testfile.csv
 create mode 100644 tests/users_csv_import_testfile2.csv
 create mode 100644 tests/users_csv_import_testfile3.csv
 create mode 100644 tests/users_csv_import_testfile_modify1.csv
 create mode 100644 tests/users_csv_import_testfile_modify2.csv
src/authentic2/app_settings.py
209 209
    A2_ACCOUNTS_URL=Setting(default=None, definition='IdP has no account page, redirect to this one.'),
210 210
    A2_CACHE_ENABLED=Setting(default=True, definition='Disable all cache decorators for testing purpose.'),
211 211
    A2_ACCEPT_EMAIL_AUTHENTICATION=Setting(default=True, definition='Enable authentication by email'),
212
    A2_USERS_CSV_IMPORT_APP_ID_LABEL=Setting(default='app_id', definition='Label of the applicative identifier used when importing users from csv'),
212 213

  
213 214
)
214 215

  
src/authentic2/data_transfer.py
1
import csv
2
import logging
3

  
4
from django.contrib.auth import get_user_model
1 5
from django.contrib.contenttypes.models import ContentType
6
from django.db import transaction
2 7

  
3 8
from django_rbac.models import Operation
4 9
from django_rbac.utils import (
5 10
    get_ou_model, get_role_model, get_role_parenting_model, get_permission_model)
6 11
from authentic2.a2_rbac.models import RoleAttribute
12
from authentic2 import app_settings
13
from authentic2.models import UserExternalId, Attribute
14

  
15
logger = logging.getLogger(__name__)
7 16

  
8 17

  
9 18
def update_model(obj, d):
......
318 327
                    import_context.role_delete_orphans))
319 328

  
320 329
    return result
330

  
331

  
332
def create_user_from_data(user_data, app_id):
    """Create a user from one imported row and tie it to its external id.

    ``user_data`` maps column names to raw values. A name that is an
    attribute of the user object is assigned directly; otherwise a name
    exposed by ``user.attributes`` is deserialized through the matching
    ``Attribute`` kind and stored with ``Attribute.set_value``. Any other
    name is ignored.

    A ``UserExternalId`` with source ``'import'`` and ``external_id=app_id``
    is created for the new user, which is then returned.
    """
    # NOTE(review): column names come straight from the imported file, so any
    # existing attribute of the user object can be overwritten this way --
    # confirm whether an allow-list of importable fields is needed.
    User = get_user_model()
    user = User.objects.create()
    for name, value in user_data.items():
        if hasattr(user, name):
            setattr(user, name, value)
        elif hasattr(user.attributes, name):
            attribute = Attribute.objects.get(name=name)
            deserialize = attribute.get_kind()['deserialize']
            try:
                value = deserialize(value)
            except Exception:
                # A bad cell only costs that attribute, not the whole user.
                # Unlike a bare ``except`` this lets KeyboardInterrupt and
                # SystemExit propagate.
                logger.error(
                    'invalid value %s for attribute %s', value, name)
                continue
            attribute.set_value(owner=user, value=value)
    user.save()
    UserExternalId.objects.create(
        user=user, external_id=app_id, source='import')
    return user
352

  
353

  
354
def import_users_from_csv(csv_file, ou, erase=False):
    """Import users from a CSV file into the organizational unit ``ou``.

    The first row is the header and must contain the column named by
    ``app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL``; that column identifies
    each user through a ``UserExternalId`` whose source is ``'import'``.
    For every data row:

    * unknown identifier: the user is created;
    * known identifier and ``erase`` true: the existing user is deleted and
      created again from the row;
    * known identifier and ``erase`` false: the row is ignored.

    Blank lines are skipped; rows too short to carry an identifier, or whose
    identifier matches several external ids, are counted as ignored.
    All rows are processed inside a single database transaction.

    Returns a ``(nb_created, nb_erased, nb_ignored)`` tuple, which is
    ``(0, 0, 0)`` when the file is empty or lacks the identifier column.
    """
    reader = csv.reader(csv_file)  # XXX dialect # XXX bom
    rows = list(reader)
    app_id_label = app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL

    # An empty file has no header row at all: handle it like a missing column
    # instead of failing on rows[0].
    if not rows or app_id_label not in rows[0]:
        logger.error(
            "csv file doesn't contain a column named %s", app_id_label)
        return 0, 0, 0

    header = rows[0]
    nb_created = 0
    nb_erased = 0
    nb_ignored = 0

    with transaction.atomic():
        for row in rows[1:]:
            if not row:
                # csv.reader yields an empty list for a blank line
                continue

            user_data = dict(zip(header, row))
            user_data.update({'ou': ou})
            app_id = user_data.pop(app_id_label, None)
            if app_id is None:
                # zip() stopped before the identifier column: short row
                logger.error(
                    'ignoring csv row without a %s value', app_id_label)
                nb_ignored += 1
                continue

            try:
                user_external_id = UserExternalId.objects.get(
                    external_id=app_id, source='import')
            except UserExternalId.MultipleObjectsReturned:
                logger.error(
                    '''multiple user external ids return for external id value
                    '%s' and source 'import'.''', app_id)
                nb_ignored += 1
                continue
            except UserExternalId.DoesNotExist:
                user_external_id = None

            if user_external_id is not None:
                if not erase:
                    nb_ignored += 1
                    continue
                # Only erase when this very row matched an existing user;
                # erasing unconditionally would hit an unbound name or a
                # leftover match from a previous row.
                user_external_id.user.delete()
                user_external_id.delete()
                nb_erased += 1

            create_user_from_data(user_data, app_id)
            nb_created += 1

    return nb_created, nb_erased, nb_ignored
src/authentic2/manager/forms.py
694 694

  
695 695
    class Meta:
696 696
        fields = ()
697

  
698

  
699
class UsersCsvImportForm(forms.Form):
    """Form of the users CSV import: the file, the target OU, the erase flag."""

    # Uploaded file holding the users to import
    csv_file = forms.FileField(
        help_text=_('CSV file for the import.'),
        label=_('CSV file'),
    )
    # The available choices are provided by utils.get_ou_choices
    ou = forms.ChoiceField(
        choices=utils.get_ou_choices,
        help_text=_('The newly created users will be registered to the chosen OU.'),
        label=_('Organizational unit'),
    )
    # Optional checkbox, left unchecked by default
    erase = forms.BooleanField(
        required=False,
        help_text=_('''Defines whether to erase already existing users. If
            unchecked, these users won't undergo any deletion.'''),
        label=_('Erase existing users'),
    )
src/authentic2/manager/templates/authentic2/manager/users.html
20 20
         {% trans "Add user" %}
21 21
     </a>
22 22
   {% endif %}
23
   {% if import_users %}
24
     <a
25
        href="{% url "a2-manager-users-import" %}"
26
        id="import-users-btn">
27
         {% trans "Import users from csv" %}
28
     </a>
29
   {% else %}
30
     <a
31
        href="#"
32
        class="disabled"
33
        id="import-users-btn">
34
         {% trans "Import users from csv" %}
35
     </a>
36
   {% endif %}
23 37
  </span>
24 38
{% endblock %}
25 39

  
src/authentic2/manager/templates/authentic2/manager/users_csv_import.html
1
{% extends "authentic2/manager/form.html" %}
2
{% load i18n %}
3

  
4
{% block page-title %}
5
  {{ block.super }} - {{ view.title }}
6
{% endblock %}
7

  
8
{% block breadcrumb %}
9
  {{ block.super }}
10
  <a href="..">{% trans "Users" %}</a>
11
  <a href="">{{ view.title }}</a>
12
{% endblock %}
13

  
14
{% block content %}
15
  {% trans "Comma-separated value file defining optional standard user attributes as well as the extended profile attributes below:" %}
16
  <ul>
17
    <li><b>{{ app_id_label }}</b> ({% trans "mandatory" %})</li>
18
  {% for attr in attributes %}
19
    <li>{{ attr.name }}</li>
20
  {% endfor %}
21
  </ul>
22

  
23
  <form enctype="multipart/form-data" method="post">
24

  
25
    {% csrf_token %}
26
    {{ form.as_p }}
27
    <button class="submit-button">{% trans "Create" %}</button>
28
  </form>
29
{% endblock %}
src/authentic2/manager/urls.py
21 21
        url(r'^users/$', user_views.users, name='a2-manager-users'),
22 22
        url(r'^users/export/(?P<format>csv)/$',
23 23
            user_views.users_export, name='a2-manager-users-export'),
24
        url(r'^users/import/$',
25
            user_views.users_csv_import, name='a2-manager-users-import'),
24 26
        url(r'^users/add/$', user_views.user_add_default_ou,
25 27
            name='a2-manager-user-add-default-ou'),
26 28
        url(r'^users/(?P<ou_pk>\d+)/add/$', user_views.user_add,
src/authentic2/manager/user_views.py
15 15
from django.contrib import messages
16 16
from django.http import HttpResponseRedirect, QueryDict
17 17
from django.views.generic.detail import SingleObjectMixin
18
from django.views.generic import View
18
from django.views.generic import View, FormView
19 19

  
20 20
from import_export.fields import Field
21 21
import tablib
......
24 24
from authentic2.models import Attribute, AttributeValue, PasswordReset
25 25
from authentic2.utils import switch_user, send_password_reset_mail, redirect, select_next_url
26 26
from authentic2.a2_rbac.utils import get_default_ou
27
from authentic2 import hooks
27
from authentic2 import hooks, app_settings as a2_app_settings
28 28
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model
29 29

  
30 30

  
......
33 33
    BaseSubTableView, HideOUColumnMixin, BaseDeleteView, BaseDetailView
34 34
from .tables import UserTable, UserRolesTable, OuUserRolesTable
35 35
from .forms import (UserSearchForm, UserAddForm, UserEditForm,
36
    UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm)
36
    UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm,
37
    UsersCsvImportForm)
37 38
from .resources import UserResource
38 39
from .utils import get_ou_count
39 40
from . import app_settings
......
88 89
            ou = self.search_form.cleaned_data.get('ou')
89 90
        if ou and self.request.user.has_ou_perm('custom_user.add_user', ou):
90 91
            ctx['add_ou'] = ou
92
        if self.request.user.has_perm('custom_user.add_user'):
93
            ctx['import_users'] = True
91 94
        return ctx
92 95

  
93 96

  
......
603 606

  
604 607

  
605 608
user_delete = UserDeleteView.as_view()
609

  
610

  
611
class UsersCsvImportView(FormView):
    """Manager view importing users from an uploaded CSV file.

    On a valid submission the file is handed to
    ``authentic2.data_transfer.import_users_from_csv`` and the outcome is
    reported through the messages framework before the redirection to
    ``success_url``.
    """
    form_class = UsersCsvImportForm
    template_name = 'authentic2/manager/users_csv_import.html'
    title = _('CSV import')
    success_url = '..'
    # NOTE(review): nothing in this class reads ``permissions`` and FormView
    # is its only base -- confirm the permission is enforced elsewhere.
    permissions = ['custom_user.add_user']

    def get_context_data(self, **kwargs):
        """Add the known attributes and the identifier column label."""
        from authentic2.models import Attribute

        # Forward the keyword arguments instead of dropping them, so that
        # whatever the caller supplies reaches the parent implementation.
        ctx = super(UsersCsvImportView, self).get_context_data(**kwargs)
        ctx.update({
            'attributes': Attribute.objects.all(),
            'app_id_label': a2_app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL
        })
        return ctx

    def form_valid(self, form):
        """Run the import and report the resulting counts to the user."""
        from authentic2.data_transfer import import_users_from_csv

        ou_id = form.cleaned_data.get('ou')
        # objects.get() raises when nothing matches, so the fallback on the
        # default OU could never be reached; first() returns None instead.
        ou = get_ou_model().objects.filter(id=ou_id).first() or get_default_ou()
        csv_file = form.cleaned_data.get('csv_file')
        erase = form.cleaned_data.get('erase')

        if not csv_file:
            messages.warning(
                self.request,
                _('No import performed. Please provide a csv file.')
            )
        else:
            try:
                # The second count is the number of users erased and created
                # again; the message below reports them as modified.
                nb_created, nb_modified, nb_ignored = import_users_from_csv(
                        csv_file, ou, erase)
            except ValueError as e:
                messages.error(
                    self.request,
                    _('Value error while importing the users: %s') % e
                )
            else:
                message = _('Created %s users.') % nb_created
                if nb_modified:
                    message = message + (' Modified %s users.') % nb_modified
                if nb_ignored:
                    message = message + (' Ignored %s users.') % nb_ignored
                messages.info(
                    self.request,
                    message
                )
        return super(UsersCsvImportView, self).form_valid(form)
661

  
662

  
663
users_csv_import = UsersCsvImportView.as_view()
src/authentic2/manager/utils.py
26 26
@GlobalCache(timeout=10)
27 27
def get_ou_count():
28 28
    return get_ou_model().objects.count()
29

  
30

  
31
def get_ou_choices():
    """Return the list of ``(id, name)`` pairs of all organizational units."""
    choices = []
    for unit in get_ou_model().objects.all():
        choices.append((unit.id, unit.name))
    return choices
tests/test_data_transfer.py
1
from django.contrib.auth import get_user_model
1 2
from django_rbac.utils import get_role_model, get_ou_model
2 3
import pytest
4
import csv
3 5

  
4 6
from authentic2.a2_rbac.models import RoleParenting
7
from authentic2.a2_rbac.utils import get_default_ou
5 8
from authentic2.data_transfer import (
6 9
    export_site,
7 10
    ExportContext,
......
12 15
    ImportContext,
13 16
    RoleDeserializer,
14 17
    search_role,
15
    import_ou)
18
    import_ou,
19
    import_users_from_csv)
20
from authentic2.models import UserExternalId, Attribute
16 21
from authentic2.utils import get_hex_uuid
17 22

  
18 23

  
......
523 528
    d = export_site(ExportContext(export_ous=False))
524 529
    assert 'ous' not in d
525 530

  
531

  
532
def test_import_users_from_csv(db):
    """Import two users and check the fields, OU and external id of each."""
    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == (2, 0, 0)

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))

    assert User.objects.count() == len(rows) - 1
    for row in rows[1:]:
        app_id, first_name, last_name, email = [
            cell.decode('utf-8') for cell in row]
        # Reach each user through its external id: an unordered queryset
        # gives no guarantee that users come back in file order.
        external_id = UserExternalId.objects.get(external_id=app_id)
        assert external_id.source == 'import'
        user = external_id.user
        assert user.first_name == first_name
        assert user.last_name == last_name
        assert user.email == email
        assert user.ou == ou
553

  
554

  
555
def test_import_users_from_csv_profile_attributes(db):
    """Import users whose rows carry extended profile attributes."""
    # (label, name, kind) of the profile attributes used by the test file
    profile_attributes = (
        ('Nickname', 'nickname', 'string'),
        ('Is a spy', 'is_a_spy', 'boolean'),
        ('Enrolled on', 'enrolled_on', 'date'),
        ('Secret phone number', 'secret_phone_number', 'phone_number'),
        ('Headquarters postcode', 'headquarters_postcode', 'fr_postcode'),
    )
    for label, name, kind in profile_attributes:
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == (2, 0, 0)

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))

    assert User.objects.count() == len(rows) - 1
    for row in rows[1:]:
        cells = [cell.decode('utf-8') for cell in row]
        # Reach each user through its external id: an unordered queryset
        # gives no guarantee that users come back in file order.
        user = UserExternalId.objects.get(external_id=cells[0]).user
        assert user.attributes.nickname == cells[3]
        assert user.attributes.is_a_spy == bool(int(cells[4]))
        assert user.attributes.enrolled_on.isoformat() == cells[5]
        assert user.attributes.secret_phone_number == cells[6]
        assert user.attributes.headquarters_postcode == cells[7]
610

  
611

  
612
def test_import_users_from_csv_modify(db):
    """Create a user, erase and recreate it, then check it gets ignored."""
    ou = get_default_ou()
    original_file = 'tests/users_csv_import_testfile_modify1.csv'
    modified_file = 'tests/users_csv_import_testfile_modify2.csv'

    def run_import(path, *flags):
        # Open the file afresh for every import, as each run consumes it
        with open(path, 'r') as csv_file:
            return import_users_from_csv(csv_file, ou, *flags)

    # 1 created
    assert run_import(original_file) == (1, 0, 0)
    # 1 created and 1 erased
    assert run_import(modified_file, True) == (1, 1, 0)
    # 1 ignored
    assert run_import(original_file, False) == (0, 0, 1)
tests/test_manager.py
3 3
import pytest
4 4
import json
5 5

  
6
from django import VERSION as django_version
6 7
from django.core.urlresolvers import reverse
7 8
from django.core import mail
8 9

  
9 10
from webtest import Upload
10 11

  
12
from authentic2 import app_settings
11 13
from authentic2.a2_rbac.utils import get_default_ou
14
from authentic2.models import Attribute
12 15

  
13 16
from django_rbac.utils import get_ou_model, get_role_model
14 17
from django.contrib.auth import get_user_model
......
889 892

  
890 893
    user = User.objects.get(id=simple_user.id)
891 894
    assert not user.email_verified
895

  
896

  
897
@pytest.mark.skipif(django_version < (1, 11),
        reason='Requires Django v1.11 or higher.')
def test_manager_import_users_from_csv(admin, app, db):
    """Submit the manager import form and check the users land in the OU."""
    for label, name, kind in (
            ('Nickname', 'nickname', 'string'),
            ('Freemason', 'freemason', 'boolean')):
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    User = get_user_model()
    nb_initial = User.objects.count()

    login(app, admin, '/manage/')
    url = u'/manage/users/import/'

    OrganizationalUnit = get_ou_model()
    ou_foo = OrganizationalUnit.objects.create(name='Foo', slug='foo')
    ou_foo.save()
    ou_bar = OrganizationalUnit.objects.create(name='Bar', slug='bar')
    ou_bar.save()

    # The page lists the attributes, the identifier label and every OU
    response = app.get(url)
    page = response.text
    for expected in ('nickname', 'freemason',
                     app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL):
        assert expected in page
    for ou in (ou_foo, ou_bar, get_default_ou()):
        assert 'value="%s"' % ou.id in page

    form = response.form
    form.set('csv_file', Upload('tests/users_csv_import_testfile3.csv'))
    form.set('ou', str(ou_bar.id))
    response = form.submit().follow()

    assert User.objects.count() == nb_initial + 2
    for last_name in ('Dupont', 'Durant'):
        imported = User.objects.get(last_name=last_name)
        assert imported.ou == ou_bar
tests/users_csv_import_testfile.csv
1
app_id,first_name,last_name,email
2
123,Hervé,Dupont,foo@bar.null
3
456,René,Durant,boo@far.null
tests/users_csv_import_testfile2.csv
1
app_id,first_name,last_name,nickname,is_a_spy,enrolled_on,secret_phone_number,headquarters_postcode
2
123,Hervé,Dupont,Herbie,1,2006-06-06,+12345,75001
3
456,René,Durant,Ronnie,0,2007-07-07,+12356,13001
tests/users_csv_import_testfile3.csv
1
app_id,first_name,last_name,nickname,freemason
2
123,Hervé,Dupont,Herv,0
3
456,René,Durant,RR,1
tests/users_csv_import_testfile_modify1.csv
1
app_id,first_name,last_name,email
2
123,Hervé,Dupont,foo@bar.null
tests/users_csv_import_testfile_modify2.csv
1
app_id,first_name,last_name,email
2
123,Hervé,Durant,foo@bar.null
0
-