Projet

Général

Profil

0001-import-users-from-csv-file-32833.patch

Paul Marillonnet, 10 mai 2019 15:21

Télécharger (18,2 ko)

Voir les différences:

Subject: [PATCH] import users from csv file (#32833)

 src/authentic2/app_settings.py                |  1 +
 src/authentic2/data_transfer.py               | 49 +++++++++++
 src/authentic2/manager/forms.py               | 13 +++
 .../templates/authentic2/manager/users.html   | 14 +++
 .../authentic2/manager/users_csv_import.html  | 34 ++++++++
 src/authentic2/manager/urls.py                |  2 +
 src/authentic2/manager/user_views.py          | 57 ++++++++++++-
 src/authentic2/manager/utils.py               |  6 ++
 tests/test_data_transfer.py                   | 85 ++++++++++++++++++-
 tests/test_manager.py                         | 43 ++++++++++
 tests/users_csv_import_testfile.csv           |  3 +
 tests/users_csv_import_testfile2.csv          |  3 +
 tests/users_csv_import_testfile3.csv          |  3 +
 13 files changed, 309 insertions(+), 4 deletions(-)
 create mode 100644 src/authentic2/manager/templates/authentic2/manager/users_csv_import.html
 create mode 100644 tests/users_csv_import_testfile.csv
 create mode 100644 tests/users_csv_import_testfile2.csv
 create mode 100644 tests/users_csv_import_testfile3.csv
src/authentic2/app_settings.py
209 209
    A2_ACCOUNTS_URL=Setting(default=None, definition='IdP has no account page, redirect to this one.'),
210 210
    A2_CACHE_ENABLED=Setting(default=True, definition='Disable all cache decorators for testing purpose.'),
211 211
    A2_ACCEPT_EMAIL_AUTHENTICATION=Setting(default=True, definition='Enable authentication by email'),
212
    A2_USERS_CSV_IMPORT_APP_ID_LABEL=Setting(default='app_id', definition='Label of the applicative identifier used when importing users from csv'),
212 213

  
213 214
)
214 215

  
src/authentic2/data_transfer.py
1
import csv
2
import logging
3

  
4
from django.contrib.auth import get_user_model
1 5
from django.contrib.contenttypes.models import ContentType
6
from django.db import transaction
2 7

  
3 8
from django_rbac.models import Operation
4 9
from django_rbac.utils import (
5 10
    get_ou_model, get_role_model, get_role_parenting_model, get_permission_model)
6 11
from authentic2.a2_rbac.models import RoleAttribute
12
from authentic2 import app_settings
13
from authentic2.models import UserExternalId, Attribute
14

  
15
logger = logging.getLogger(__name__)
7 16

  
8 17

  
9 18
def update_model(obj, d):
......
318 327
                    import_context.role_delete_orphans))
319 328

  
320 329
    return result
330

  
331

  
332
def create_user_from_data(user_data):
    """Create, populate and return a new user from a name -> value mapping.

    Each key of ``user_data`` is handled as follows:

    * if the freshly created user object has an attribute of that name, the
      value is assigned to it as-is;
    * otherwise, if ``user.attributes`` exposes that name, the value is
      deserialized with the ``deserialize`` callable of the matching
      ``Attribute`` kind and stored through ``Attribute.set_value``;
    * otherwise the key is ignored.

    A value that fails to deserialize is logged as an error and skipped; the
    remaining keys are still processed.

    :param user_data: mapping of field / attribute names to raw values.
    :returns: the saved user instance.
    """
    User = get_user_model()
    # The user is created up front because it is passed as ``owner`` to
    # ``Attribute.set_value`` below.
    user = User.objects.create()
    for name, value in user_data.items():
        # NOTE(review): any attribute that exists on the user object can be
        # set from the input mapping, whatever it is -- confirm whether the
        # accepted names should be restricted to a whitelist.
        if hasattr(user, name):
            setattr(user, name, value)
        elif hasattr(user.attributes, name):
            attribute = Attribute.objects.get(name=name)
            deserialize = attribute.get_kind()['deserialize']
            try:
                value = deserialize(value)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit are not swallowed.
                logger.error(
                    'invalid value %s for attribute %s', value, name)
                continue
            attribute.set_value(owner=user, value=value)
    user.save()
    return user
350

  
351

  
352
def import_users_from_csv(csv_file, ou):
    """Create one user per data row of a CSV file.

    The first row is the header; it must contain the column named by the
    ``A2_USERS_CSV_IMPORT_APP_ID_LABEL`` setting.  For every following row a
    user is created through ``create_user_from_data`` (with ``ou`` added to
    the row data) and a ``UserExternalId`` with source ``'import'`` is
    attached, holding the value of the identifier column.  All creations
    happen inside a single ``transaction.atomic()`` block.

    :param csv_file: iterable of CSV lines, as accepted by ``csv.reader``.
    :param ou: value stored under the ``'ou'`` key of every row's data.
    :returns: the number of users created.
    :raises ValueError: if the file is empty, if the header lacks the
        identifier column, or if a data row has no value for that column.
    """
    rows = list(csv.reader(csv_file))  # XXX dialect
    if not rows:
        # Previously an IndexError on rows[0]; ValueError is what the
        # header check below already raises for a malformed file.
        raise ValueError('empty csv file')

    header = rows[0]
    app_id_label = app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL
    # list.index raises ValueError when the identifier column is missing.
    app_id_index = header.index(app_id_label)

    created = 0
    with transaction.atomic():
        # Data rows start at row 2, the header being row 1.
        for row_number, row in enumerate(rows[1:], 2):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            if len(row) <= app_id_index:
                raise ValueError(
                    'missing %s value at row %d' % (app_id_label, row_number))
            user_data = dict(zip(header, row))
            user_data['ou'] = ou
            app_id = user_data.pop(app_id_label)
            user = create_user_from_data(user_data)
            UserExternalId.objects.create(
                user=user, external_id=app_id, source='import')
            created += 1

    return created
src/authentic2/manager/forms.py
694 694

  
695 695
    class Meta:
696 696
        fields = ()
697

  
698

  
699
class UsersCsvImportForm(forms.Form):
    """Form asking for the CSV file to import and the target organizational unit."""

    csv_file = forms.FileField(
        label=_('CSV file'),
        help_text=_('CSV file for the import.')
    )
    ou = forms.ChoiceField(
        label=_('Organizational unit'),
        help_text=_('The newly created users will be registered to the chosen OU.'),
    )

    def __init__(self, *args, **kwargs):
        super(UsersCsvImportForm, self).__init__(*args, **kwargs)
        # The choices and the initial value are looked up for each form
        # instance: expressions in the class body run only once, when the
        # module is imported, which would freeze the list of organizational
        # units for the whole life of the process.
        ou_field = self.fields['ou']
        ou_field.choices = utils.get_ou_choices()
        ou_field.initial = get_default_ou().uuid
src/authentic2/manager/templates/authentic2/manager/users.html
20 20
         {% trans "Add user" %}
21 21
     </a>
22 22
   {% endif %}
23
   {% if import_users %}
24
     <a
25
        href="{% url "a2-manager-users-import" %}"
26
        id="import-users-btn">
27
         {% trans "Import users from csv" %}
28
     </a>
29
   {% else %}
30
     <a
31
        href="#"
32
        class="disabled"
33
        id="import-users-btn">
34
         {% trans "Import users from csv" %}
35
     </a>
36
   {% endif %}
23 37
  </span>
24 38
{% endblock %}
25 39

  
src/authentic2/manager/templates/authentic2/manager/users_csv_import.html
1
{% extends "authentic2/manager/form.html" %}
{% load i18n %}

{% block page-title %}
  {{ block.super }} - {{ view.title }}
{% endblock %}

{% block breadcrumb %}
  {{ block.super }}
  <a href="..">{% trans "Users" %}</a>
  <a href="">{{ view.title }}</a>
{% endblock %}

{% block content %}
  {# List of the column names accepted in the uploaded file: the mandatory identifier column first, then every attribute passed by the view. #}
  {% trans "Comma-separated value file defining optional standard user attributes as well as the extended profile attributes below:" %}
  <ul>
    <li><b>{{ app_id_label }}</b> ({% trans "mandatory" %})</li>
  {% for attr in attributes %}
    <li>{{ attr.name }}</li>
  {% endfor %}
  </ul>

  <form enctype="multipart/form-data" method="post">
    {% csrf_token %}
    {{ form.as_p }}
    {# The import form is a plain form without an "instance", so only the creation button applies. #}
    <button class="submit-button">{% trans "Create" %}</button>
  </form>
{% endblock %}
src/authentic2/manager/urls.py
21 21
        url(r'^users/$', user_views.users, name='a2-manager-users'),
22 22
        url(r'^users/export/(?P<format>csv)/$',
23 23
            user_views.users_export, name='a2-manager-users-export'),
24
        url(r'^users/import/$',
25
            user_views.users_csv_import, name='a2-manager-users-import'),
24 26
        url(r'^users/add/$', user_views.user_add_default_ou,
25 27
            name='a2-manager-user-add-default-ou'),
26 28
        url(r'^users/(?P<ou_pk>\d+)/add/$', user_views.user_add,
src/authentic2/manager/user_views.py
15 15
from django.contrib import messages
16 16
from django.http import HttpResponseRedirect, QueryDict
17 17
from django.views.generic.detail import SingleObjectMixin
18
from django.views.generic import View
18
from django.views.generic import View, FormView
19 19

  
20 20
from import_export.fields import Field
21 21
import tablib
......
24 24
from authentic2.models import Attribute, AttributeValue, PasswordReset
25 25
from authentic2.utils import switch_user, send_password_reset_mail, redirect, select_next_url
26 26
from authentic2.a2_rbac.utils import get_default_ou
27
from authentic2 import hooks
27
from authentic2 import hooks, app_settings as a2_app_settings
28 28
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model
29 29

  
30 30

  
......
33 33
    BaseSubTableView, HideOUColumnMixin, BaseDeleteView, BaseDetailView
34 34
from .tables import UserTable, UserRolesTable, OuUserRolesTable
35 35
from .forms import (UserSearchForm, UserAddForm, UserEditForm,
36
    UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm)
36
    UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm,
37
    UsersCsvImportForm)
37 38
from .resources import UserResource
38 39
from .utils import get_ou_count
39 40
from . import app_settings
......
88 89
            ou = self.search_form.cleaned_data.get('ou')
89 90
        if ou and self.request.user.has_ou_perm('custom_user.add_user', ou):
90 91
            ctx['add_ou'] = ou
92
        if self.request.user.has_perm('custom_user.add_user'):
93
            ctx['import_users'] = True
91 94
        return ctx
92 95

  
93 96

  
......
603 606

  
604 607

  
605 608
user_delete = UserDeleteView.as_view()
609

  
610

  
611
class UsersCsvImportView(FormView):
    """Manager view creating users from an uploaded CSV file.

    On a valid submission the chosen organizational unit is resolved, the
    file is handed to ``import_users_from_csv`` and the outcome is reported
    through the messages framework before redirecting to ``success_url``.
    """
    form_class = UsersCsvImportForm
    template_name = 'authentic2/manager/users_csv_import.html'
    title = _('CSV import')
    success_url = '..'
    permissions = ['custom_user.add_user']

    def dispatch(self, request, *args, **kwargs):
        """Refuse access to users lacking any of ``permissions``."""
        # Nothing in this class otherwise reads ``permissions``.
        # NOTE(review): if access is already restricted by code not visible
        # here, this check is redundant but harmless.
        from django.core.exceptions import PermissionDenied

        if not all(request.user.has_perm(perm) for perm in self.permissions):
            raise PermissionDenied
        return super(UsersCsvImportView, self).dispatch(
            request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the known attributes and the identifier column label."""
        # Forward the keyword arguments so that context passed by the caller
        # is not discarded.
        ctx = super(UsersCsvImportView, self).get_context_data(**kwargs)
        ctx.update({
            'attributes': Attribute.objects.all(),
            'app_id_label': a2_app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL
        })
        return ctx

    def form_valid(self, form):
        """Run the import and report the result as a message."""
        from authentic2.data_transfer import import_users_from_csv

        OU = get_ou_model()
        try:
            ou = OU.objects.get(uuid=form.cleaned_data.get('ou'))
        except OU.DoesNotExist:
            # ``get`` raises instead of returning a falsy value, so the
            # fallback has to live in the exception handler.
            ou = get_default_ou()
        csv_file = form.cleaned_data.get('csv_file')

        if not csv_file:
            messages.warning(
                self.request,
                _('No import performed. Please provide a csv file.')
            )
        else:
            try:
                nb_created = import_users_from_csv(csv_file, ou)
            except ValueError as e:
                messages.error(
                    self.request,
                    _('Value error while importing the users: %s') % e
                )
            else:
                messages.info(
                    self.request,
                    _('Created %s users.') % nb_created
                )
        return super(UsersCsvImportView, self).form_valid(form)


users_csv_import = UsersCsvImportView.as_view()
src/authentic2/manager/utils.py
26 26
@GlobalCache(timeout=10)
27 27
def get_ou_count():
28 28
    return get_ou_model().objects.count()
29

  
30

  
31
@GlobalCache(timeout=10)
def get_ou_choices():
    """Return a list of ``(uuid, name)`` pairs, one per organizational unit."""
    choices = []
    for unit in get_ou_model().objects.all():
        choices.append((unit.uuid, unit.name))
    return choices
tests/test_data_transfer.py
1
from django.contrib.auth import get_user_model
1 2
from django_rbac.utils import get_role_model, get_ou_model
2 3
import pytest
4
import csv
3 5

  
4 6
from authentic2.a2_rbac.models import RoleParenting
5 7
from authentic2.data_transfer import (
......
12 14
    ImportContext,
13 15
    RoleDeserializer,
14 16
    search_role,
15
    import_ou)
17
    import_ou,
18
    import_users_from_csv)
19
from authentic2.models import UserExternalId, Attribute
16 20
from authentic2.utils import get_hex_uuid
17 21

  
18 22

  
......
523 527
    d = export_site(ExportContext(export_ous=False))
524 528
    assert 'ous' not in d
525 529

  
530

  
531
def test_import_users_from_csv(db):
    """Importing the basic fixture creates one user per data row.

    Each user must carry the first name, last name and email of its row,
    belong to the given OU and own an external id with source ``'import'``.
    """
    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == 2

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))

    # Explicit ordering by primary key, so that pairing users with rows does
    # not rely on the model's default ordering (not visible here); users are
    # created in file order.
    users = list(User.objects.order_by('id'))
    assert len(users) == len(rows) - 1
    for user, row in zip(users, rows[1:]):
        assert user.first_name == row[1].decode('utf-8')
        assert user.last_name == row[2].decode('utf-8')
        assert user.email == row[3].decode('utf-8')
        assert user.ou == ou
        external_id = UserExternalId.objects.get(user=user)
        assert external_id.source == 'import'
        assert external_id.external_id == row[0].decode('utf-8')
552

  
553

  
554
def test_import_users_from_csv_profile_attributes(db):
    """Profile attribute columns of the fixture end up on the created users.

    One attribute of each of the kinds ``string``, ``boolean``, ``date``,
    ``phone_number`` and ``fr_postcode`` is declared before the import.
    """
    declared_attributes = (
        ('Nickname', 'nickname', 'string'),
        ('Is a spy', 'is_a_spy', 'boolean'),
        ('Enrolled on', 'enrolled_on', 'date'),
        ('Secret phone number', 'secret_phone_number', 'phone_number'),
        ('Headquarters postcode', 'headquarters_postcode', 'fr_postcode'),
    )
    for label, name, kind in declared_attributes:
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == 2

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))

    # Explicit ordering by primary key, so that pairing users with rows does
    # not rely on the model's default ordering (not visible here); users are
    # created in file order.
    users = list(User.objects.order_by('id'))
    assert len(users) == len(rows) - 1
    for user, row in zip(users, rows[1:]):
        assert user.attributes.nickname == row[3].decode('utf-8')
        assert user.attributes.is_a_spy == bool(int(row[4].decode('utf-8')))
        assert user.attributes.enrolled_on.isoformat() == row[5].decode('utf-8')
        assert user.attributes.secret_phone_number == row[6].decode('utf-8')
        assert user.attributes.headquarters_postcode == row[7].decode('utf-8')
tests/test_manager.py
8 8

  
9 9
from webtest import Upload
10 10

  
11
from authentic2 import app_settings
11 12
from authentic2.a2_rbac.utils import get_default_ou
13
from authentic2.models import Attribute
12 14

  
13 15
from django_rbac.utils import get_ou_model, get_role_model
14 16
from django.contrib.auth import get_user_model
......
889 891

  
890 892
    user = User.objects.get(id=simple_user.id)
891 893
    assert not user.email_verified
894

  
895

  
896
def test_manager_import_users_from_csv(admin, app, db, ou1, ou2):
    """The manager import page lists the expected columns and OUs, and
    submitting the fixture creates two users in the selected OU."""
    for label, name, kind in (
            ('Nickname', 'nickname', 'string'),
            ('Freemason', 'freemason', 'boolean')):
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    user_model = get_user_model()
    initial_count = len(user_model.objects.all())

    login(app, admin, '/manage/')
    import_url = u'/manage/users/import/'

    expected_ous = [ou1, ou2, get_default_ou()]

    page = app.get(import_url)
    # The page advertises the declared attributes and the identifier column.
    for expected in ('nickname', 'freemason',
                     app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL):
        assert expected in page.text
    # Every organizational unit is offered as a choice.
    for expected_ou in expected_ous:
        assert expected_ou.uuid in page.text

    import_form = page.form
    import_form.set('csv_file', Upload('tests/users_csv_import_testfile3.csv'))
    import_form.set('ou', ou1.uuid)
    page = import_form.submit().follow()

    assert len(user_model.objects.all()) == initial_count + 2
    for last_name in ('Dupont', 'Durant'):
        imported = user_model.objects.get(last_name=last_name)
        assert imported.ou == ou1
tests/users_csv_import_testfile.csv
1
app_id,first_name,last_name,email
2
123,Hervé,Dupont,foo@bar.null
3
456,René,Durant,boo@far.null
tests/users_csv_import_testfile2.csv
1
app_id,first_name,last_name,nickname,is_a_spy,enrolled_on,secret_phone_number,headquarters_postcode
2
123,Hervé,Dupont,Herbie,1,2006-06-06,+12345,75001
3
456,René,Durant,Ronnie,0,2007-07-07,+12356,13001
tests/users_csv_import_testfile3.csv
1
app_id,first_name,last_name,nickname,freemason
2
123,Hervé,Dupont,Herv,0
3
456,René,Durant,RR,1
0
-