0001-import-users-from-csv-file-32833.patch
src/authentic2/app_settings.py | ||
---|---|---|
209 | 209 |
A2_ACCOUNTS_URL=Setting(default=None, definition='IdP has no account page, redirect to this one.'), |
210 | 210 |
A2_CACHE_ENABLED=Setting(default=True, definition='Disable all cache decorators for testing purpose.'), |
211 | 211 |
A2_ACCEPT_EMAIL_AUTHENTICATION=Setting(default=True, definition='Enable authentication by email'), |
212 |
A2_USERS_CSV_IMPORT_APP_ID_LABEL=Setting(default='app_id', definition='Label of the applicative identifier used when importing users from csv'), |
|
212 | 213 | |
213 | 214 |
) |
214 | 215 |
src/authentic2/data_transfer.py | ||
---|---|---|
1 |
import csv |
|
2 |
import logging |
|
3 | ||
4 |
from django.contrib.auth import get_user_model |
|
1 | 5 |
from django.contrib.contenttypes.models import ContentType |
6 |
from django.db import transaction |
|
2 | 7 | |
3 | 8 |
from django_rbac.models import Operation |
4 | 9 |
from django_rbac.utils import ( |
5 | 10 |
get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) |
6 | 11 |
from authentic2.a2_rbac.models import RoleAttribute |
12 |
from authentic2 import app_settings |
|
13 |
from authentic2.models import UserExternalId, Attribute |
|
14 | ||
15 |
logger = logging.getLogger(__name__) |
|
7 | 16 | |
8 | 17 | |
9 | 18 |
def update_model(obj, d): |
... | ... | |
318 | 327 |
import_context.role_delete_orphans)) |
319 | 328 | |
320 | 329 |
return result |
330 | ||
331 | ||
332 |
def create_user_from_data(user_data):
    """Create a user and populate it from a mapping of names to values.

    For each ``name, value`` pair of ``user_data``:

    * if the user object has a non-callable attribute called ``name``, the
      value is assigned to it;
    * otherwise, if ``user.attributes`` exposes ``name``, the matching
      ``Attribute`` is looked up, the value is run through the
      ``deserialize`` callable of its kind and stored with ``set_value``.
      A value that cannot be deserialized is logged and skipped.

    Names matching neither case are ignored.

    Returns the saved user.
    """
    User = get_user_model()
    # The user is created first so that attribute values can reference it
    # as their owner.
    user = User.objects.create()
    for name, value in user_data.items():
        # Never overwrite methods (e.g. a column named "save" would replace
        # user.save and break the final save() call below).
        if hasattr(user, name) and not callable(getattr(user, name)):
            setattr(user, name, value)
        elif hasattr(user.attributes, name):
            attribute = Attribute.objects.get(name=name)
            deserialize = attribute.get_kind()['deserialize']
            try:
                value = deserialize(value)
            except Exception:
                # Best effort: an invalid value only skips this attribute,
                # but SystemExit/KeyboardInterrupt are no longer swallowed.
                logger.error(
                    'invalid value %s for attribute %s', value, name)
                continue
            attribute.set_value(owner=user, value=value)
    user.save()
    return user
|
350 | ||
351 | ||
352 |
def import_users_from_csv(csv_file, ou):
    """Create one user per data row of a CSV file.

    The first row is the header naming the columns. It must contain the
    column named by ``app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL``; the
    value of that column is stored as a ``UserExternalId`` with source
    ``'import'`` for the created user. The remaining columns, plus ``ou``,
    are handed to ``create_user_from_data``.

    All creations happen in a single transaction, so an error on any row
    leaves no partially imported users behind.

    :param csv_file: iterable of CSV lines (e.g. an open file)
    :param ou: organizational unit assigned to every created user
    :returns: the number of users created
    :raises ValueError: if the file is empty, the application identifier
        column is missing from the header, or a row has no value for it
    """
    reader = csv.reader(csv_file)  # XXX dialect
    rows = list(reader)
    app_id_label = app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL

    # Validate the header up front; callers only handle ValueError.
    if not rows:
        raise ValueError('the csv file is empty')
    header = rows[0]
    if app_id_label not in header:
        raise ValueError(
            'missing column %s in the csv header' % app_id_label)

    created = 0
    with transaction.atomic():
        for row_number, row in enumerate(rows[1:], 2):
            if not row:
                # csv.reader yields an empty list for a blank line.
                continue
            user_data = dict(zip(header, row))
            if app_id_label not in user_data:
                # zip() truncated a row shorter than the header.
                raise ValueError(
                    'missing value for column %s in row %s'
                    % (app_id_label, row_number))
            user_data['ou'] = ou
            app_id = user_data.pop(app_id_label)
            user = create_user_from_data(user_data)
            UserExternalId.objects.create(
                user=user, external_id=app_id, source='import')
            created += 1

    return created
src/authentic2/manager/forms.py | ||
---|---|---|
694 | 694 | |
695 | 695 |
class Meta: |
696 | 696 |
fields = () |
697 | ||
698 | ||
699 |
class UsersCsvImportForm(forms.Form):
    """Form asking for the CSV file to import and the target OU."""

    csv_file = forms.FileField(
        label=_('CSV file'),
        help_text=_('CSV file for the import.')
    )
    ou = forms.ChoiceField(
        label=_('Organizational unit'),
        help_text=_('The newly created users will be registered to the chosen OU.'),
    )

    def __init__(self, *args, **kwargs):
        super(UsersCsvImportForm, self).__init__(*args, **kwargs)
        # Resolve the OU choices and default for each form instance rather
        # than in the class body: evaluating them at class definition would
        # query the database at import time and freeze the choices for the
        # lifetime of the process.
        ou_field = self.fields['ou']
        ou_field.choices = utils.get_ou_choices()
        ou_field.initial = get_default_ou().uuid
src/authentic2/manager/templates/authentic2/manager/users.html | ||
---|---|---|
20 | 20 |
{% trans "Add user" %} |
21 | 21 |
</a> |
22 | 22 |
{% endif %} |
23 |
{% if import_users %} |
|
24 |
<a |
|
25 |
href="{% url "a2-manager-users-import" %}" |
|
26 |
id="import-users-btn"> |
|
27 |
{% trans "Import users from csv" %} |
|
28 |
</a> |
|
29 |
{% else %} |
|
30 |
<a |
|
31 |
href="#" |
|
32 |
class="disabled" |
|
33 |
id="import-users-btn"> |
|
34 |
{% trans "Import users from csv" %} |
|
35 |
</a> |
|
36 |
{% endif %} |
|
23 | 37 |
</span> |
24 | 38 |
{% endblock %} |
25 | 39 |
src/authentic2/manager/templates/authentic2/manager/users_csv_import.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block page-title %} |
|
5 |
{{ block.super }} - {{ view.title }} |
|
6 |
{% endblock %} |
|
7 | ||
8 |
{% block breadcrumb %} |
|
9 |
{{ block.super }} |
|
10 |
<a href="..">{% trans "Users" %}</a> |
|
11 |
<a href="">{{ view.title }}</a> |
|
12 |
{% endblock %} |
|
13 | ||
14 |
{% block content %} |
|
15 |
{% trans "Comma-separated value file with named columns:" %} |
|
16 |
<ul> |
|
17 |
<li>{{ app_id_label }}</li> |
|
18 |
{% for attr in attributes %} |
|
19 |
<li>{{ attr.name }}</li> |
|
20 |
{% endfor %} |
|
21 |
</ul> |
|
22 | ||
23 |
<form enctype="multipart/form-data" method="post"> |
|
24 | ||
25 |
{% csrf_token %} |
|
26 |
{{ form.as_p }} |
|
27 |
{% if form.instance and form.instance.id %} |
|
28 |
<button class="submit-button">{% trans "Submit" %}</button> |
|
29 |
<button class="cancel-button" name="cancel" formnovalidate>{% trans "Cancel" %}</button> |
|
30 |
{% else %} |
|
31 |
<button class="submit-button">{% trans "Create" %}</button> |
|
32 |
{% endif %} |
|
33 |
</form> |
|
34 |
{% endblock %} |
src/authentic2/manager/urls.py | ||
---|---|---|
21 | 21 |
url(r'^users/$', user_views.users, name='a2-manager-users'), |
22 | 22 |
url(r'^users/export/(?P<format>csv)/$', |
23 | 23 |
user_views.users_export, name='a2-manager-users-export'), |
24 |
url(r'^users/import/$', |
|
25 |
user_views.users_csv_import, name='a2-manager-users-import'), |
|
24 | 26 |
url(r'^users/add/$', user_views.user_add_default_ou, |
25 | 27 |
name='a2-manager-user-add-default-ou'), |
26 | 28 |
url(r'^users/(?P<ou_pk>\d+)/add/$', user_views.user_add, |
src/authentic2/manager/user_views.py | ||
---|---|---|
15 | 15 |
from django.contrib import messages |
16 | 16 |
from django.http import HttpResponseRedirect, QueryDict |
17 | 17 |
from django.views.generic.detail import SingleObjectMixin |
18 |
from django.views.generic import View |
|
18 |
from django.views.generic import View, FormView
|
|
19 | 19 | |
20 | 20 |
from import_export.fields import Field |
21 | 21 |
import tablib |
... | ... | |
24 | 24 |
from authentic2.models import Attribute, AttributeValue, PasswordReset |
25 | 25 |
from authentic2.utils import switch_user, send_password_reset_mail, redirect, select_next_url |
26 | 26 |
from authentic2.a2_rbac.utils import get_default_ou |
27 |
from authentic2 import hooks |
|
27 |
from authentic2 import hooks, app_settings as a2_app_settings
|
|
28 | 28 |
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model |
29 | 29 | |
30 | 30 | |
... | ... | |
33 | 33 |
BaseSubTableView, HideOUColumnMixin, BaseDeleteView, BaseDetailView |
34 | 34 |
from .tables import UserTable, UserRolesTable, OuUserRolesTable |
35 | 35 |
from .forms import (UserSearchForm, UserAddForm, UserEditForm, |
36 |
UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm) |
|
36 |
UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm, |
|
37 |
UsersCsvImportForm) |
|
37 | 38 |
from .resources import UserResource |
38 | 39 |
from .utils import get_ou_count |
39 | 40 |
from . import app_settings |
... | ... | |
88 | 89 |
ou = self.search_form.cleaned_data.get('ou') |
89 | 90 |
if ou and self.request.user.has_ou_perm('custom_user.add_user', ou): |
90 | 91 |
ctx['add_ou'] = ou |
92 |
if self.request.user.has_perm('custom_user.add_user'): |
|
93 |
ctx['import_users'] = True |
|
91 | 94 |
return ctx |
92 | 95 | |
93 | 96 | |
... | ... | |
603 | 606 | |
604 | 607 | |
605 | 608 |
user_delete = UserDeleteView.as_view() |
609 | ||
610 | ||
611 |
class UsersCsvImportView(FormView):
    """Manager view importing users from an uploaded CSV file.

    Access requires every permission listed in ``permissions``. On a valid
    submission the users are created in the selected organizational unit
    and the outcome is reported through the messages framework before
    redirecting to ``success_url``.
    """
    form_class = UsersCsvImportForm
    template_name = 'authentic2/manager/users_csv_import.html'
    title = _('CSV import')
    success_url = '..'
    permissions = ['custom_user.add_user']

    def dispatch(self, request, *args, **kwargs):
        """Refuse access to users lacking the required permissions."""
        from django.core.exceptions import PermissionDenied

        # FormView does not look at ``permissions`` by itself; without this
        # check the declared permission would never be enforced.
        if not request.user.has_perms(self.permissions):
            raise PermissionDenied
        return super(UsersCsvImportView, self).dispatch(
            request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Add the importable attributes and the app id column label."""
        from authentic2.models import Attribute

        # Forward kwargs: they carry the bound form (with its errors) when
        # an invalid submission is re-rendered.
        ctx = super(UsersCsvImportView, self).get_context_data(**kwargs)
        ctx.update({
            'attributes': Attribute.objects.all(),
            'app_id_label': a2_app_settings.A2_USERS_CSV_IMPORT_APP_ID_LABEL
        })
        return ctx

    def form_valid(self, form):
        """Run the import and report the result to the user."""
        from authentic2.data_transfer import import_users_from_csv

        ou_uuid = form.cleaned_data.get('ou')
        # first() returns None when no OU matches, so the fallback to the
        # default OU is actually reachable (objects.get() would raise).
        ou = (get_ou_model().objects.filter(uuid=ou_uuid).first()
              or get_default_ou())
        csv_file = form.cleaned_data.get('csv_file')

        if not csv_file:
            messages.warning(
                self.request,
                _('No import performed. Please provide a csv file.')
            )
        else:
            try:
                nb_created = import_users_from_csv(csv_file, ou)
            except ValueError as e:
                messages.error(
                    self.request,
                    _('Value error while importing the users: %s') % e
                )
            else:
                messages.info(
                    self.request,
                    _('Created %s users.') % nb_created
                )
        return super(UsersCsvImportView, self).form_valid(form)
|
654 | ||
655 | ||
656 |
users_csv_import = UsersCsvImportView.as_view() |
src/authentic2/manager/utils.py | ||
---|---|---|
26 | 26 |
@GlobalCache(timeout=10)
def get_ou_count():
    """Return the number of organizational units.

    NOTE(review): the result presumably stays cached for about 10 seconds
    through ``GlobalCache`` - confirm against the decorator.
    """
    ou_model = get_ou_model()
    return ou_model.objects.count()
29 | ||
30 | ||
31 |
@GlobalCache(timeout=10)
def get_ou_choices():
    """Return ``(uuid, name)`` pairs for every organizational unit.

    The pairs are suitable as the ``choices`` of a form field.

    NOTE(review): the result presumably stays cached for about 10 seconds
    through ``GlobalCache`` - confirm against the decorator.
    """
    # values_list() fetches only the two needed columns instead of building
    # a full model instance per organizational unit.
    return list(get_ou_model().objects.values_list('uuid', 'name'))
tests/test_data_transfer.py | ||
---|---|---|
1 |
from django.contrib.auth import get_user_model |
|
1 | 2 |
from django_rbac.utils import get_role_model, get_ou_model |
2 | 3 |
import pytest |
4 |
import csv |
|
3 | 5 | |
4 | 6 |
from authentic2.a2_rbac.models import RoleParenting |
5 | 7 |
from authentic2.data_transfer import ( |
... | ... | |
12 | 14 |
ImportContext, |
13 | 15 |
RoleDeserializer, |
14 | 16 |
search_role, |
15 |
import_ou) |
|
17 |
import_ou, |
|
18 |
import_users_from_csv) |
|
19 |
from authentic2.models import UserExternalId, Attribute |
|
16 | 20 |
from authentic2.utils import get_hex_uuid |
17 | 21 | |
18 | 22 | |
... | ... | |
523 | 527 |
d = export_site(ExportContext(export_ous=False)) |
524 | 528 |
assert 'ous' not in d |
525 | 529 | |
530 | ||
531 |
def test_import_users_from_csv(db):
    """Importing the basic test file creates one user per data row.

    Checks the first and last names, the organizational unit and the
    external identifier recorded for each imported user.
    """
    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == 2

    with open('tests/users_csv_import_testfile.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    data_rows = rows[1:]

    # Order explicitly: users are created in file order, so sorting on the
    # primary key pairs each user with its row deterministically.
    users = list(User.objects.order_by('id'))
    assert len(users) == len(data_rows)
    for user, row in zip(users, data_rows):
        assert user.first_name == row[1].decode('utf-8')
        assert user.last_name == row[2].decode('utf-8')
        assert user.ou == ou
        external_id = UserExternalId.objects.get(user=user)
        assert external_id.source == 'import'
        assert external_id.external_id == row[0].decode('utf-8')
|
551 | ||
552 | ||
553 |
def test_import_users_from_csv_profile_attributes(db):
    """Importing a file with profile attribute columns stores their values.

    One attribute of each tested kind is declared before the import, then
    the value stored on every imported user is compared with the file.
    """
    attribute_specs = (
        ('Nickname', 'nickname', 'string'),
        ('Is a spy', 'is_a_spy', 'boolean'),
        ('Enrolled on', 'enrolled_on', 'date'),
        ('Secret phone number', 'secret_phone_number', 'phone_number'),
        ('Headquarters postcode', 'headquarters_postcode', 'fr_postcode'),
    )
    for label, name, kind in attribute_specs:
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    ou = OU.objects.create(name='ou')
    User = get_user_model()
    assert not User.objects.exists()

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        assert import_users_from_csv(csv_file, ou) == 2

    with open('tests/users_csv_import_testfile2.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    data_rows = rows[1:]

    # Order explicitly: users are created in file order, so sorting on the
    # primary key pairs each user with its row deterministically.
    users = list(User.objects.order_by('id'))
    assert len(users) == len(data_rows)
    for user, row in zip(users, data_rows):
        assert user.attributes.nickname == row[3].decode('utf-8')
        assert user.attributes.is_a_spy == bool(int(row[4].decode('utf-8')))
        assert user.attributes.enrolled_on.isoformat() == row[5].decode('utf-8')
        assert user.attributes.secret_phone_number == row[6].decode('utf-8')
        assert user.attributes.headquarters_postcode == row[7].decode('utf-8')
tests/users_csv_import_testfile.csv | ||
---|---|---|
1 |
app_id,first_name,last_name |
|
2 |
123,Hervé,Dupont |
|
3 |
456,René,Durant |
tests/users_csv_import_testfile2.csv | ||
---|---|---|
1 |
app_id,first_name,last_name,nickname,is_a_spy,enrolled_on,secret_phone_number,headquarters_postcode |
|
2 |
123,Hervé,Dupont,Herbie,1,2006-06-06,+12345,75001 |
|
3 |
456,René,Durant,Ronnie,0,2007-07-07,+12356,13001 |
|
0 |
- |