0001-manager-users-csv-import.patch
MANIFEST.in | ||
---|---|---|
55 | 55 |
include doc/pictures/* |
56 | 56 |
include COPYING NEWS README AUTHORS.txt |
57 | 57 |
include src/authentic2/auth2_auth/auth2_ssl/authentic_ssl.vhost |
58 |
include media/misc/users_csv_import_example.csv |
|
58 | 59 |
include getlasso.sh |
59 | 60 |
include getlasso3.sh |
60 | 61 |
include src/authentic2/nonce/README.rst |
media/misc/users_csv_import_example.csv | ||
---|---|---|
1 |
source_domain,source_id,first_name,last_name,username,email,ou_slug |
|
2 |
domain.nowhere.null,123,Hervé,Dupont,hdup,foo@bar.null,foo |
|
3 |
domain.nowhere.null,456,René,Durant,rdur,boo@far.null,bar |
src/authentic2/data_transfer.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import logging |
|
18 | ||
19 |
from django.contrib.auth import get_user_model |
|
17 | 20 |
from django.contrib.contenttypes.models import ContentType |
21 |
from django.db import transaction |
|
18 | 22 | |
19 | 23 |
from django_rbac.models import Operation |
20 | 24 |
from django_rbac.utils import ( |
21 | 25 |
get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) |
22 | 26 |
from authentic2.a2_rbac.models import RoleAttribute |
27 |
from authentic2.models import Attribute, UserExternalId |
|
23 | 28 | |
24 | 29 | |
25 | 30 |
def update_model(obj, d): |
... | ... | |
334 | 339 |
import_context.role_delete_orphans)) |
335 | 340 | |
336 | 341 |
return result |
342 | ||
343 | ||
344 |
def import_users_from_csv_rows(rows, configuration, attributes_configuration,
                               dry_run=False, erase=False):
    """
    Backend csv import function called by the users csv import management
    form.

    Mandatory inputs:

    (i) `rows`: the csv rows, i.e. a list of lists, each inner list being a
        split csv line; the first row must hold the column names;
    (ii) `configuration`: a dictionary giving the names of the csv columns
        holding the external identifier (`source_id_column_name`) and the
        source domain (`source_domain_column_name`) of each user, optionally
        the name of the OU column (`ou_column_name`), and the id of the
        fallback OU (`fallback_ou`);
    (iii) `attributes_configuration`: a dictionary mapping user attribute
        names to the csv column names providing their values; attributes
        mapped to an empty column name are not provisioned.

    If the `dry_run` optional argument is set, the import is simulated only:
    the whole import runs inside a database transaction which is cancelled
    at the end.

    If the `erase` optional argument is set, csv entries mapping to
    already-existing users result in the deletion of those user accounts,
    newly-created accounts replacing them. Otherwise such entries are
    ignored.

    The `results` dictionary returned by this function provides global as
    well as ou- and source-wise counters (`nb_created`, `nb_erased`,
    `nb_ignored`) about the import process, and an `error` message (None
    when no error occurred) for rendering in the templates.
    """

    logger = logging.getLogger(__name__)

    class UsersCsvImportDryRun(Exception):
        """
        Raised at the end of the transaction when the import is simulated,
        so that every database change is cancelled.
        """

    def get_ou(configuration, user_data):
        """
        Retrieve the organizational unit of the user depending on its entry
        in the csv file. If the entry doesn't mention any valid OU, the
        (mandatory) fallback OU is chosen.
        """
        OrganizationalUnit = get_ou_model()
        ou_column_name = configuration.get('ou_column_name') or ''

        # the lookup field is deduced from the suffix of the column name
        qs_keyword = None
        for candidate_keyword in ('id', 'name', 'slug'):
            if ou_column_name.endswith(candidate_keyword):
                qs_keyword = candidate_keyword
                break

        ou = None
        if qs_keyword:
            qs_filter = {qs_keyword: user_data.get(ou_column_name)}
            try:
                ou = OrganizationalUnit.objects.get(**qs_filter)
            except (OrganizationalUnit.DoesNotExist,
                    OrganizationalUnit.MultipleObjectsReturned,
                    ValueError):
                # ValueError: e.g. a non-numeric value in an `id` column
                logger.error('No OU found for filter %s', qs_filter)

        if not ou:
            ou = OrganizationalUnit.objects.get(id=configuration['fallback_ou'])
        return ou

    def create_user_from_data(user_data, attributes_configuration, ou):
        """
        Create the user according to the csv entry line, the organizational
        unit and the user-attribute configuration defined in the users
        management csv import form.
        """
        User = get_user_model()
        # created first, presumably because extended attribute values need
        # a saved owner -- TODO confirm
        user = User.objects.create()
        for key, column_name in attributes_configuration.items():
            if not column_name:
                # empty column name: attribute not provisioned
                continue
            raw_value = user_data.get(column_name)
            if hasattr(user, key):
                setattr(user, key, raw_value)
            elif hasattr(user.attributes, key):
                attribute = Attribute.objects.get(name=key)
                deserialize = attribute.get_kind()['deserialize']
                try:
                    value = deserialize(raw_value)
                except Exception:
                    logger.error('invalid value %s for attribute %s',
                                 raw_value, key)
                    continue
                attribute.set_value(owner=user, value=value)
        user.ou = ou
        user.save()
        return user

    def update_results(results, ou, source, modified_entry):
        """Increment the `modified_entry` counter globally, by ou and by source."""
        # update ou local results (an erased user may have no ou)
        if ou is not None:
            ou_results = results['by_ou'].setdefault(ou.slug, {'name': ou.name})
            ou_results[modified_entry] = ou_results.get(modified_entry, 0) + 1
        # update source local results
        source_results = results['by_source'].setdefault(source, {})
        source_results[modified_entry] = source_results.get(modified_entry, 0) + 1
        # update global results
        results[modified_entry] += 1

    results = {
        # global results
        'nb_created': 0, 'nb_erased': 0, 'nb_ignored': 0,
        # local results
        'by_ou': {}, 'by_source': {},
        # error message for rendering in template
        'error': None,
    }

    try:
        source_id_column = configuration.get('source_id_column_name')
        source_domain_column = configuration.get('source_domain_column_name')

        if not (source_id_column and source_domain_column):
            error_msg = 'Missing external id or source domain column names.'
            logger.warning(error_msg)
            results['error'] = error_msg
            return results

        header = rows[0] if rows else []

        with transaction.atomic():
            for row in rows[1:]:
                user_data = dict(zip(header, row))

                external_id = user_data.get(source_id_column)
                source_domain = user_data.get(source_domain_column)

                if not (external_id and source_domain):
                    logger.warning(
                        'missing external id or source domain for user %r',
                        user_data)
                    continue

                ou = get_ou(configuration, user_data)

                try:
                    user_external_id = UserExternalId.objects.get(
                        external_id=external_id, source=source_domain)
                except UserExternalId.MultipleObjectsReturned:
                    logger.error(
                        "multiple user external ids returned for external "
                        "id value '%s' and source %s.",
                        external_id, source_domain)
                    results['nb_ignored'] += 1
                    continue
                except UserExternalId.DoesNotExist:
                    # reset explicitly so a record from a previous row is
                    # never reused
                    user_external_id = None

                if user_external_id is not None:
                    if not erase:
                        update_results(results, ou, source_domain, 'nb_ignored')
                        continue
                    # erase only users that actually exist
                    erased_user = user_external_id.user
                    erased_user_ou = erased_user.ou
                    erased_user.delete()
                    user_external_id.delete()
                    update_results(results, erased_user_ou, source_domain,
                                   'nb_erased')

                user = create_user_from_data(
                    user_data, attributes_configuration, ou)
                UserExternalId.objects.get_or_create(
                    user=user, external_id=external_id,
                    source=source_domain)
                update_results(results, ou, source_domain, 'nb_created')

            if dry_run:
                # leaving the atomic block through an exception cancels
                # every change made above
                raise UsersCsvImportDryRun()

    except UsersCsvImportDryRun:
        logger.info('Performed Import. Now cancelling the database transaction')
    except Exception as e:
        results['error'] = error = 'Error while performing the import: %s' % e
        logger.exception(error)

    return results
src/authentic2/manager/forms.py | ||
---|---|---|
36 | 36 |
from django_rbac.backends import DjangoRBACBackend |
37 | 37 | |
38 | 38 |
from authentic2.forms.profile import BaseUserForm |
39 |
from authentic2.models import PasswordReset |
|
39 |
from authentic2.models import PasswordReset, Attribute
|
|
40 | 40 |
from authentic2.utils import import_module_or_class |
41 | 41 |
from authentic2.a2_rbac.utils import get_default_ou |
42 | 42 |
from authentic2.utils import send_password_reset_mail, send_email_change_email |
... | ... | |
715 | 715 |
class SiteImportForm(forms.Form): |
716 | 716 |
site_json = forms.FileField( |
717 | 717 |
label=_('Site Export File')) |
718 | ||
719 | ||
720 |
class UsersCsvImportUploadFileForm(forms.Form):
    # Single-field form receiving the csv file of the users to import.
    csv_file = forms.FileField(
        help_text=_('CSV file for the import.'),
        label=_('CSV file'),
    )
|
724 | ||
725 | ||
726 |
class UsersCsvImportConfigureForm(forms.Form):
    # Organizational configuration of the users csv import: which csv
    # columns hold the OU, the source domain and the source identifier of
    # each user, and which OU to fall back to.

    # optional: without it every user lands in the fallback OU
    ou_column_name = forms.CharField(
        label=_('Organizational unit column name'),
        max_length=63, required=False)
    fallback_ou = forms.ChoiceField(
        label=_('Fallback organizational unit'),
        # single-line msgid: no newline or source indentation leaks into
        # the rendered help text and the translation catalog
        help_text=_('The newly created users will be registered to this '
                    'chosen fallback OU if their entry line does not '
                    'mention any OU.'),
        # callable: choices are computed when the form is instantiated
        choices=utils.get_ou_choices)
    source_domain_column_name = forms.CharField(
        label=_('Source domain column name'),
        max_length=63)
    source_id_column_name = forms.CharField(
        label=_('Source identifier column name'),
        max_length=63)
|
741 | ||
742 | ||
743 |
class UsersCsvImportConfigureAttributesForm(forms.Form):
    # Each field is named after a user attribute; its value is the name of
    # the csv column providing that attribute.
    first_name = forms.CharField(
        label=_('First name'), max_length=63, required=False)
    last_name = forms.CharField(
        label=_('Last name'), max_length=63, required=False)
    username = forms.CharField(
        label=_('Username'), max_length=63, required=False)
    email = forms.CharField(
        label=_('Email address'), max_length=63, required=False)
    email_verified = forms.CharField(
        label=_('Has a verified email address'), max_length=63, required=False)
    is_staff = forms.CharField(
        label=_('Is staff'), max_length=63, required=False)
    is_active = forms.CharField(
        label=_('Is active'), max_length=63, required=False)

    def __init__(self, *args, **kwargs):
        """
        Build the form, adding one field per extended profile attribute.

        The optional `header` keyword argument (list of csv column names)
        is used to pre-fill fields whose name matches a csv column.

        TODO: provide more subtlety in initial value retrieval
        """
        csv_columns = kwargs.pop('header', [])
        super(UsersCsvImportConfigureAttributesForm, self).__init__(*args, **kwargs)

        # standard attribute fields: pre-fill when a csv column has the
        # very same name
        for column in csv_columns:
            if column in self.fields:
                self.fields[column].initial = column

        # extended profile attribute fields
        for attribute in Attribute.objects.all():
            name = attribute.name
            if name in self.fields:
                # avoid attribute redundancy introduced by default in authentic
                continue
            extra_field = forms.CharField(
                label=attribute.label, max_length=63, required=False)
            self.fields[name] = extra_field
            # XXX too simplistic: exact name match only
            if name in csv_columns:
                extra_field.initial = name
src/authentic2/manager/templates/authentic2/manager/users.html | ||
---|---|---|
20 | 20 |
{% trans "Add user" %} |
21 | 21 |
</a> |
22 | 22 |
{% endif %} |
23 |
{% if import_users %} |
|
24 |
<a |
|
25 |
href="{% url "a2-manager-users-csv-import-upload-file" %}" |
|
26 |
id="import-users-btn"> |
|
27 |
{% trans "Import users from csv" %} |
|
28 |
</a> |
|
29 |
{% else %} |
|
30 |
<a |
|
31 |
href="#" |
|
32 |
class="disabled" |
|
33 |
id="import-users-btn"> |
|
34 |
{% trans "Import users from csv" %} |
|
35 |
</a> |
|
36 |
{% endif %} |
|
23 | 37 |
</span> |
24 | 38 |
{% endblock %} |
25 | 39 |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_configure.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block page-title %} |
|
5 |
{{ block.super }} - {{ view.title }} |
|
6 |
{% endblock %} |
|
7 | ||
8 |
{% block breadcrumb %} |
|
9 |
{{ block.super }} |
|
10 |
<a href="../..">{% trans "Users" %}</a> |
|
11 |
<a href="">{{ view.title }}</a> |
|
12 |
{% endblock %} |
|
13 | ||
14 |
{% block content %} |
|
15 |
<p>{% blocktrans %}{{ nb_lines }} user account data lines in input file.{% endblocktrans %}</p> |
|
16 |
<form enctype="multipart/form-data" method="post"> |
|
17 |
{% csrf_token %} |
|
18 |
<p>{% trans "Please fill the csv column names for the organization information:" %}</p> |
|
19 |
{{ form.as_p }} |
|
20 |
<button class="submit-button">{% trans "Submit" %}</button> |
|
21 |
</form> |
|
22 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_configure_attributes.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block page-title %} |
|
5 |
{{ block.super }} - {{ view.title }} |
|
6 |
{% endblock %} |
|
7 | ||
8 |
{% block breadcrumb %} |
|
9 |
{{ block.super }} |
|
10 |
<a href="../..">{% trans "Users" %}</a> |
|
11 |
<a href="">{{ view.title }}</a> |
|
12 |
{% endblock %} |
|
13 | ||
14 |
{% block content %} |
|
15 |
<form enctype="multipart/form-data" method="post"> |
|
16 |
{% csrf_token %} |
|
17 |
<p>{% trans "Please fill the column names for the following user attributes:" %}</p> |
|
18 |
{{ form.as_p }} |
|
19 |
<p>{% trans "An empty field means that the corresponding attribute won't be provisioned." %}</p> |
|
20 |
<button class="submit-button">{% trans "Submit" %}</button> |
|
21 |
</form> |
|
22 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_perform.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 |
{% load authentic2 %} |
|
4 | ||
5 |
{% block page-title %} |
|
6 |
{{ block.super }} - {{ view.title }} |
|
7 |
{% endblock %} |
|
8 | ||
9 |
{% block breadcrumb %} |
|
10 |
{{ block.super }} |
|
11 |
<a href="../..">{% trans "Users" %}</a> |
|
12 |
<a href="">{{ view.title }}</a> |
|
13 |
{% endblock %} |
|
14 | ||
15 |
{% block content %} |
|
16 |
{% if results|get_item:"nb_created" or results|get_item:"nb_erased" or results|get_item:"nb_ignored" %} |
|
17 |
<div id="global" class="import-results"> |
|
18 |
<h2>{% trans "Global results :" %}</h2> |
|
19 |
<ul> |
|
20 |
{% include "authentic2/manager/users_csv_import_perform_include.html" with results=results %} |
|
21 |
</ul> |
|
22 |
</div> |
|
23 |
{% if results.by_ou.items %} |
|
24 |
<div id="by-ou" class="import-results"> |
|
25 |
<p>{% trans "Results by organizational unit:" %}</p> |
|
26 |
<ul> |
|
27 |
{% for ou, values in results.by_ou.items %} |
|
28 |
<li>{{ values|get_item:"name" }} |
|
29 |
<ul> |
|
30 |
{% include "authentic2/manager/users_csv_import_perform_include.html" with results=values %} |
|
31 |
</ul> |
|
32 |
</li> |
|
33 |
{% endfor %} |
|
34 |
</ul> |
|
35 |
</div> |
|
36 |
{% endif %} |
|
37 |
{% if results.by_source.items %} |
|
38 |
<div id="by-source" class="import-results"> |
|
39 |
<span>{% trans "Results by source:" %}</span> |
|
40 |
<ul> |
|
41 |
{% for source, values in results.by_source.items %} |
|
42 |
<li>{{ source }} |
|
43 |
<ul> |
|
44 |
{% include "authentic2/manager/users_csv_import_perform_include.html" with results=values %} |
|
45 |
</ul> |
|
46 |
</li> |
|
47 |
{% endfor %} |
|
48 |
</ul> |
</div> |
|
49 |
{% endif %} |
|
50 |
<a class="button" href="{% url "a2-manager-users" %}">{% trans "Go back to the users page" %}</a> |
|
51 |
{% else %} |
|
52 |
<div id="global" class="import-results"> |
|
53 |
{% trans "No import operation has been performed." %} |
|
54 |
{% if results|get_item:"error" %}{% blocktrans with error=results|get_item:"error" %}Reason: {{ error }}{% endblocktrans %} {% endif %} |
</div> |
|
55 |
{% endif %} |
|
56 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_perform_include.html | ||
---|---|---|
1 |
{% load i18n %} |
|
2 |
{% load authentic2 %} |
|
3 | ||
4 |
{% if results|get_item:"nb_created" %} |
|
5 |
<li>{% blocktrans with counter=results|get_item:"nb_created" plural=results|get_item:"nb_created"|pluralize %}{{ counter }} user{{ plural }} have been created.{% endblocktrans %}</li> |
|
6 |
{% endif %} |
|
7 | ||
8 |
{% if results|get_item:"nb_erased" %} |
|
9 |
<li>{% blocktrans with counter=results|get_item:"nb_erased" plural=results|get_item:"nb_erased"|pluralize %}{{ counter }} user{{ plural }} have been erased.{% endblocktrans %}</li> |
|
10 |
{% endif %} |
|
11 | ||
12 |
{% if results|get_item:"nb_ignored" %} |
|
13 |
<li>{% blocktrans with counter=results|get_item:"nb_ignored" plural=results|get_item:"nb_ignored"|pluralize %}{{ counter }} user{{ plural }} have been ignored.{% endblocktrans %}</li> |
|
14 |
{% endif %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_simulate.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 |
{% load authentic2 %} |
|
4 | ||
5 |
{% block page-title %} |
|
6 |
{{ block.super }} - {{ view.title }} |
|
7 |
{% endblock %} |
|
8 | ||
9 |
{% block breadcrumb %} |
|
10 |
{{ block.super }} |
|
11 |
<a href="../..">{% trans "Users" %}</a> |
|
12 |
<a href="">{{ view.title }}</a> |
|
13 |
{% endblock %} |
|
14 | ||
15 |
{% block content %} |
|
16 |
{% if results|get_item:"nb_created" or results|get_item:"nb_erased" or results|get_item:"nb_ignored" %} |
|
17 |
<div id="global" class="import-results"> |
|
18 |
<h2>{% trans "Global results :" %}</h2> |
|
19 |
<ul> |
|
20 |
{% include "authentic2/manager/users_csv_import_simulate_include.html" with results=results %} |
|
21 |
</ul> |
|
22 |
</div> |
|
23 |
{% if results.by_ou.items %} |
|
24 |
<div id="by-ou" class="import-results"> |
|
25 |
<h2>{% trans "Results by organizational unit:" %}</h2> |
|
26 |
<ul> |
|
27 |
{% for ou, values in results.by_ou.items %} |
|
28 |
<li>{{ values|get_item:"name" }} |
|
29 |
<ul> |
|
30 |
{% include "authentic2/manager/users_csv_import_simulate_include.html" with results=values %} |
|
31 |
</ul> |
|
32 |
</li> |
|
33 |
{% endfor %} |
|
34 |
</ul> |
|
35 |
</div> |
|
36 |
{% endif %} |
|
37 |
{% if results.by_source.items %} |
|
38 |
<div id="by-source" class="import-results"> |
|
39 |
<h2>{% trans "Results by source:" %}</h2> |
|
40 |
<ul> |
|
41 |
{% for source, values in results.by_source.items %} |
|
42 |
<li>{{ source }} |
|
43 |
<ul> |
|
44 |
{% include "authentic2/manager/users_csv_import_simulate_include.html" with results=values %} |
|
45 |
</ul> |
|
46 |
</li> |
|
47 |
{% endfor %} |
|
48 |
</ul> |
</div> |
|
49 |
{% endif %} |
|
50 |
<a class="button" href="{% url "a2-manager-users-csv-import-perform" %}">{% trans "Perform the import" %}</a> |
|
51 |
<a class="button" href="{% url "a2-manager-users" %}">{% trans "Cancel" %}</a> |
|
52 |
{% else %} |
|
53 |
<div id="global" class="import-results"> |
|
54 |
{% trans "No import operation would be performed." %} |
|
55 |
{% if results|get_item:"error" %}{% blocktrans with error=results|get_item:"error" %}Reason: {{ error }}{% endblocktrans %} {% endif %} |
</div> |
|
56 |
{% endif %} |
|
57 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_simulate_include.html | ||
---|---|---|
1 |
{% load i18n %} |
|
2 |
{% load authentic2 %} |
|
3 | ||
4 |
{% if results|get_item:"nb_created" %} |
|
5 |
<li>{% blocktrans with counter=results|get_item:"nb_created" plural=results|get_item:"nb_created"|pluralize %}{{ counter }} user{{ plural }} would be created.{% endblocktrans %}</li> |
|
6 |
{% endif %} |
|
7 | ||
8 |
{% if results|get_item:"nb_erased" %} |
|
9 |
<li>{% blocktrans with counter=results|get_item:"nb_erased" plural=results|get_item:"nb_erased"|pluralize %}{{ counter }} user{{ plural }} would be erased.{% endblocktrans %}</li> |
|
10 |
{% endif %} |
|
11 | ||
12 |
{% if results|get_item:"nb_ignored" %} |
|
13 |
<li>{% blocktrans with counter=results|get_item:"nb_ignored" plural=results|get_item:"nb_ignored"|pluralize %}{{ counter }} user{{ plural }} would be ignored.{% endblocktrans %}</li> |
|
14 |
{% endif %} |
src/authentic2/manager/templates/authentic2/manager/users_csv_import_upload_file.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/form.html" %} |
|
2 |
{% load i18n %} |
|
3 | ||
4 |
{% block page-title %} |
|
5 |
{{ block.super }} - {{ view.title }} |
|
6 |
{% endblock %} |
|
7 | ||
8 |
{% block breadcrumb %} |
|
9 |
{{ block.super }} |
|
10 |
<a href="../..">{% trans "Users" %}</a> |
|
11 |
<a href="">{{ view.title }}</a> |
|
12 |
{% endblock %} |
|
13 | ||
14 |
{% block content %} |
|
15 |
<form enctype="multipart/form-data" method="post"> |
|
16 | ||
17 |
{% csrf_token %} |
|
18 |
{{ form.as_p }} |
|
19 |
<p><a href="{{ example_csv_href }}">{% trans "Download an example csv file." %}</a></p> |
|
20 |
<button class="submit-button">{% trans "Submit" %}</button> |
|
21 |
</form> |
|
22 |
{% endblock %} |
src/authentic2/manager/urls.py | ||
---|---|---|
70 | 70 |
url(r'^users/uuid:(?P<slug>[a-z0-9]+)/change-email/$', |
71 | 71 |
user_views.user_change_email, |
72 | 72 |
name='a2-manager-user-by-uuid-change-email'), |
73 |
url(r'^users/csv-import/upload-file/$', |
|
74 |
user_views.users_csv_import_upload_file, |
|
75 |
name='a2-manager-users-csv-import-upload-file'), |
|
76 |
url(r'^users/csv-import/configure/$', |
|
77 |
user_views.users_csv_import_configure, |
|
78 |
name='a2-manager-users-csv-import-configure'), |
|
79 |
url(r'^users/csv-import/configurea-attributes/$', |
|
80 |
user_views.users_csv_import_configure_attributes, |
|
81 |
name='a2-manager-users-csv-import-configure-attributes'), |
|
82 |
url(r'^users/csv-import/simulate/$', |
|
83 |
user_views.users_csv_import_simulate, |
|
84 |
name='a2-manager-users-csv-import-simulate'), |
|
85 |
url(r'^users/csv-import/perform/$', |
|
86 |
user_views.users_csv_import_perform, |
|
87 |
name='a2-manager-users-csv-import-perform'), |
|
73 | 88 | |
74 | 89 |
# Authentic2 roles |
75 | 90 |
url(r'^roles/$', role_views.listing, |
src/authentic2/manager/user_views.py | ||
---|---|---|
17 | 17 |
import datetime |
18 | 18 |
import collections |
19 | 19 | |
20 |
from django import forms |
|
20 | 21 |
from django.db import models |
21 | 22 |
from django.utils.translation import ugettext_lazy as _, ugettext |
22 | 23 |
from django.utils.html import format_html |
... | ... | |
26 | 27 |
from django.contrib.auth import get_user_model |
27 | 28 |
from django.contrib.contenttypes.models import ContentType |
28 | 29 |
from django.contrib import messages |
30 |
from django.views.generic import View, FormView, TemplateView |
|
29 | 31 | |
32 |
import csv |
|
30 | 33 |
import tablib |
31 | 34 | |
32 | 35 |
from authentic2.models import Attribute, AttributeValue, PasswordReset |
33 | 36 |
from authentic2.utils import switch_user, send_password_reset_mail, redirect, select_next_url |
34 | 37 |
from authentic2.a2_rbac.utils import get_default_ou |
38 |
from authentic2.data_transfer import import_users_from_csv_rows |
|
35 | 39 |
from authentic2 import hooks |
36 | 40 |
from django_rbac.utils import get_role_model, get_role_parenting_model, get_ou_model |
37 | 41 | |
... | ... | |
41 | 45 |
BaseSubTableView, HideOUColumnMixin, BaseDeleteView, BaseDetailView |
42 | 46 |
from .tables import UserTable, UserRolesTable, OuUserRolesTable |
43 | 47 |
from .forms import (UserSearchForm, UserAddForm, UserEditForm, |
44 |
UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm) |
|
48 |
UserChangePasswordForm, ChooseUserRoleForm, UserRoleSearchForm, UserChangeEmailForm, |
|
49 |
UsersCsvImportUploadFileForm, UsersCsvImportConfigureForm, |
|
50 |
UsersCsvImportConfigureAttributesForm) |
|
45 | 51 |
from .resources import UserResource |
46 | 52 |
from .utils import get_ou_count |
47 | 53 |
from . import app_settings |
... | ... | |
96 | 102 |
ou = self.search_form.cleaned_data.get('ou') |
97 | 103 |
if ou and self.request.user.has_ou_perm('custom_user.add_user', ou): |
98 | 104 |
ctx['add_ou'] = ou |
105 |
if self.request.user.has_perm('custom_user.add_user'): |
|
106 |
ctx['import_users'] = True |
|
99 | 107 |
return ctx |
100 | 108 | |
101 | 109 | |
... | ... | |
610 | 618 | |
611 | 619 | |
612 | 620 |
user_delete = UserDeleteView.as_view() |
621 | ||
622 | ||
623 |
class UsersCsvImportUploadFileView(FormView):
    # Upload step of the users csv import: the uploaded file is parsed and
    # its rows are kept in the session for the following steps.
    # NOTE(review): `permissions` is not read by Django's plain FormView;
    # confirm that something actually enforces it for this view.
    permissions = ['custom_user.add_user']
    title = _('Upload CSV file for user import')
    form_class = UsersCsvImportUploadFileForm
    template_name = 'authentic2/manager/users_csv_import_upload_file.html'
    example_csv_href = '/media/misc/users_csv_import_example.csv'

    def get_success_url(self):
        """Next step: organizational configuration."""
        return reverse('a2-manager-users-csv-import-configure')

    def get_context_data(self, **kwargs):
        """Expose the link to the example csv file to the template."""
        ctx = super(UsersCsvImportUploadFileView, self).get_context_data(
            **kwargs)
        ctx['example_csv_href'] = self.example_csv_href
        return ctx

    def form_valid(self, form):
        """Parse the uploaded csv file and store its rows in the session."""
        uploaded_file = form.cleaned_data.get('csv_file')
        self.request.session['rows'] = list(csv.reader(uploaded_file))
        return super(UsersCsvImportUploadFileView, self).form_valid(form)


users_csv_import_upload_file = UsersCsvImportUploadFileView.as_view()
|
648 | ||
649 | ||
650 |
class UsersCsvImportConfigureView(FormView):
    # Second step of the users csv import: asks which csv columns hold the
    # OU, the source identifier and the source domain, guessing initial
    # values from the csv header stored in the session.
    # NOTE(review): `permissions` is not read by Django's plain FormView;
    # confirm that something actually enforces it for this view.
    template_name = 'authentic2/manager/users_csv_import_configure.html'
    form_class = UsersCsvImportConfigureForm
    title = _('Organization configuration')
    permissions = ['custom_user.add_user']

    def _get_rows(self):
        """
        Return the csv rows stored in the session, or an empty list when
        no file has been uploaded yet (e.g. direct access to this page).
        """
        return self.request.session.get('rows') or []  # XXX dialect # XXX bom

    def get_initial(self):
        """Guess initial column names from the csv header, when possible."""
        initial = super(UsersCsvImportConfigureView, self).get_initial()
        rows = self._get_rows()
        # an empty csv file has no header at all
        header = rows[0] if rows else []
        # inspect csv header for ou column name
        for ou_column_name in ['ou__slug', 'ou_slug', 'ou__name', 'ou_name',
                               'ou__id', 'ou_id']:
            if ou_column_name in header:
                initial['ou_column_name'] = ou_column_name
                break
        # inspect csv header for source_id column name
        for source_id_column_name in ['source_id', 'app_id']:
            if source_id_column_name in header:
                initial['source_id_column_name'] = source_id_column_name
                break
        # inspect csv header for source_domain column name
        if 'source_domain' in header:
            initial['source_domain_column_name'] = 'source_domain'
        return initial

    def get_context_data(self, **kwargs):
        """Add the number of user data lines (header excluded) to the context."""
        context = super(UsersCsvImportConfigureView, self).get_context_data(**kwargs)
        rows = self._get_rows()
        # the first row is the header, not a user account data line
        context['nb_lines'] = max(len(rows) - 1, 0)
        return context

    def get_success_url(self):
        """Next step: user attribute configuration."""
        return reverse('a2-manager-users-csv-import-configure-attributes')

    def form_valid(self, form):
        """Store the validated configuration in the session."""
        configuration = self.request.session['configuration'] = {}
        configuration.update(form.cleaned_data)
        return super(UsersCsvImportConfigureView, self).form_valid(form)


users_csv_import_configure = UsersCsvImportConfigureView.as_view()
|
695 | ||
696 | ||
697 |
class UsersCsvImportConfigureAttributesView(FormView):
    # Third step of the users csv import: maps user attributes to csv
    # column names and stores that mapping in the session.
    # NOTE(review): `permissions` is not read by Django's plain FormView;
    # confirm that something actually enforces it for this view.
    permissions = ['custom_user.add_user']
    title = _('User attribute configuration')
    template_name = 'authentic2/manager/users_csv_import_configure_attributes.html'
    form_class = UsersCsvImportConfigureAttributesForm

    def get_success_url(self):
        """Next step: simulation of the import."""
        return reverse('a2-manager-users-csv-import-simulate')

    def get_form_kwargs(self):
        """On GET, hand the csv header over to the form."""
        form_kwargs = super(UsersCsvImportConfigureAttributesView, self).get_form_kwargs()
        if self.request.method == 'GET':
            csv_rows = self.request.session['rows']
            form_kwargs['header'] = csv_rows[0]
        return form_kwargs

    def form_valid(self, form):
        """Store the validated attribute configuration in the session."""
        self.request.session['attribute_configuration'] = dict(form.cleaned_data)
        return super(UsersCsvImportConfigureAttributesView, self).form_valid(form)


users_csv_import_configure_attributes = UsersCsvImportConfigureAttributesView.as_view()
|
722 | ||
723 | ||
724 |
class UsersCsvImportSimulateView(TemplateView):
    # Fourth step of the users csv import: runs the import as a dry run
    # and displays what would happen.
    # NOTE(review): `permissions` is not read by Django's plain
    # TemplateView; confirm that something actually enforces it here.
    permissions = ['custom_user.add_user']
    title = _('Simulation of the user import from CSV file')
    template_name = 'authentic2/manager/users_csv_import_simulate.html'
    # outcome of the simulated import, filled in by get()
    results = None

    def get(self, request, *args, **kwargs):
        """Simulate the import from the data kept in the session."""
        session = request.session
        # the session entries are only read: the perform step needs them
        csv_rows = session.get('rows')
        attribute_configuration = session.get('attribute_configuration')
        configuration = session.get('configuration')
        self.results = import_users_from_csv_rows(
            csv_rows, configuration, attribute_configuration, dry_run=True)
        return super(UsersCsvImportSimulateView, self).get(
            request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Expose the simulation results to the template."""
        ctx = super(UsersCsvImportSimulateView, self).get_context_data(
            **kwargs)
        ctx['results'] = self.results
        return ctx


users_csv_import_simulate = UsersCsvImportSimulateView.as_view()
|
748 | ||
749 | ||
750 |
class UsersCsvImportPerformView(TemplateView):
    # Last step of the users csv import: actually performs the import and
    # displays its results.
    # NOTE(review): `permissions` is not read by Django's plain
    # TemplateView; confirm that something actually enforces it here.
    # NOTE(review): the import modifies the database on a GET request;
    # consider switching to POST together with the simulate template.
    template_name = 'authentic2/manager/users_csv_import_perform.html'
    title = _('Perform the user import from CSV file')
    permissions = ['custom_user.add_user']
    # outcome of the import, filled in by get()
    results = None

    def get(self, request, *args, **kwargs):
        """Perform the import from the data kept in the session.

        The data are removed from the session, so that reloading the page
        does not run the same import twice. When they are missing (reload,
        direct access), None values are handed over to the import function,
        which is expected to report an error in its results instead of the
        view failing with a KeyError.
        """
        rows = request.session.pop('rows', None)
        # retrieve configuration from session
        attribute_configuration = request.session.pop(
            'attribute_configuration', None)
        configuration = request.session.pop('configuration', None)
        self.results = import_users_from_csv_rows(
            rows, configuration, attribute_configuration, dry_run=False)
        return super(UsersCsvImportPerformView, self).get(
            request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """Expose the import results to the template."""
        context = super(UsersCsvImportPerformView, self).get_context_data(
            **kwargs)
        context['results'] = self.results
        return context


users_csv_import_perform = UsersCsvImportPerformView.as_view()
src/authentic2/manager/utils.py | ||
---|---|---|
40 | 40 |
@GlobalCache(timeout=10)
def get_ou_count():
    """Return how many organizational units exist.

    The function is wrapped by ``GlobalCache(timeout=10)``.
    """
    ou_model = get_ou_model()
    return ou_model.objects.count()
43 | ||
44 | ||
45 |
@GlobalCache(timeout=10)
def get_ou_choices():
    """Return a list with one ``(id, name)`` pair per organizational unit.

    The function is wrapped by ``GlobalCache(timeout=10)``.
    """
    choices = []
    for unit in get_ou_model().objects.all():
        choices.append((unit.id, unit.name))
    return choices
src/authentic2/migrations/0024_auto_20190606_1412.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
from __future__ import unicode_literals |
|
3 | ||
4 |
from django.db import models, migrations |
|
5 | ||
6 | ||
7 |
class Migration(migrations.Migration):
    # Applies after the previous authentic2 migration.
    dependencies = [('authentic2', '0023_auto_20181031_0900')]

    # Declare two uniqueness constraints on the user external id model:
    # one on (user, source) and one on (external_id, source).
    operations = [
        migrations.AlterUniqueTogether(
            name='userexternalid',
            unique_together={
                ('user', 'source'),
                ('external_id', 'source'),
            },
        ),
    ]
src/authentic2/models.py | ||
---|---|---|
75 | 75 |
class Meta:
    verbose_name = _('user external id')
    verbose_name_plural = _('user external ids')
    # Two independent uniqueness constraints, each scoped to one source:
    # a user appears at most once, and an external id appears at most once.
    unique_together = [
        ['user', 'source'],
        ['external_id', 'source'],
    ]
|
78 | 79 | |
79 | 80 | |
80 | 81 |
@six.python_2_unicode_compatible |
src/authentic2/templatetags/authentic2.py | ||
---|---|---|
1 |
from django import template |
|
2 | ||
3 |
register = template.Library() |
|
4 | ||
5 | ||
6 |
@register.filter(name='get_item')
def get_item(dictionary, value):
    """Template filter returning ``dictionary[value]`` or ``None``.

    Usage in a template: ``{{ mapping|get_item:key }}``.

    Returns ``None`` when the key is absent and also when ``dictionary`` is
    not a mapping (for instance ``None``) or the key is unhashable, so that
    a missing value does not break template rendering.
    """
    try:
        return dictionary.get(value)
    except (AttributeError, TypeError):
        # AttributeError: no .get() on the object (None, list, ...).
        # TypeError: unhashable lookup key.
        return None
tests/csv/users_csv_import_testfile.csv | ||
---|---|---|
1 |
source_domain,source_id,first_name,last_name,username,email,ou_slug |
|
2 |
domain.nowhere.null,123,Hervé,Dupont,hdup,foo@bar.null,foo |
|
3 |
domain.nowhere.null,456,René,Durant,rdur,boo@far.null,bar |
tests/csv/users_csv_import_testfile2.csv | ||
---|---|---|
1 |
source_domain,app_id,first_name,last_name,nickname,is_a_spy,enrolled_on,secret_phone_number,headquarters_postcode,email |
|
2 |
domain.nowhere.null,123,Hervé,Dupont,Herbie,1,2006-06-06,+12345,75001,hdup@bar.null |
|
3 |
domain.nowhere.null,456,René,Durant,Ronnie,0,2007-07-07,+12356,13001,rdur@bar.null |
tests/csv/users_csv_import_testfile3.csv | ||
---|---|---|
1 |
source_domain,source_id,first_name,last_name,username,email |
|
2 |
domain.nowhere.null,123,Hervé,Dupont,hdup,foo@bar.null |
|
3 |
domain.nowhere.null,456,René,Durant,rdur,boo@far.null |
tests/csv/users_csv_import_testfile4.csv | ||
---|---|---|
1 |
source_domain,app_id,first_name,last_name,nickname,freemason |
|
2 |
whatever.nowhere,123,Hervé,Dupont,Herv,0 |
|
3 |
whatever.nowhere,456,René,Durant,RR,1 |
tests/csv/users_csv_import_testfile5.csv | ||
---|---|---|
1 |
app_id,first_name,last_name,email |
|
2 |
123,Hervé,Durant,foo@bar.null |
tests/test_data_transfer.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
from django.contrib.auth import get_user_model |
|
17 | 18 |
from django_rbac.utils import get_role_model, get_ou_model |
19 |
import csv |
|
18 | 20 |
import pytest |
19 | 21 | |
20 | 22 |
from authentic2.a2_rbac.models import RoleParenting |
23 |
from authentic2.a2_rbac.utils import get_default_ou |
|
21 | 24 |
from authentic2.data_transfer import ( |
22 | 25 |
export_site, |
23 | 26 |
ExportContext, |
... | ... | |
28 | 31 |
ImportContext, |
29 | 32 |
RoleDeserializer, |
30 | 33 |
search_role, |
31 |
import_ou) |
|
34 |
import_ou, |
|
35 |
import_users_from_csv_rows) |
|
36 |
from authentic2.models import UserExternalId, Attribute |
|
32 | 37 |
from authentic2.utils import get_hex_uuid |
33 | 38 | |
34 | 39 | |
... | ... | |
540 | 545 |
d = export_site(ExportContext(export_ous=False)) |
541 | 546 |
assert 'ous' not in d |
542 | 547 | |
548 | ||
549 |
def test_import_users_from_csv_rows(db):
    """Import the sample CSV for real and check the users and the report."""

    def as_text(value):
        # csv.reader yields byte strings on Python 2 and text on Python 3;
        # only byte strings have to be decoded before being compared with
        # model field values.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    # organizational units referenced by the ou_slug column of the CSV file
    OU.objects.create(name='Foo', slug='foo')
    OU.objects.create(name='Bar', slug='bar')

    User = get_user_model()

    with open('tests/csv/users_csv_import_testfile.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    data_rows = rows[1:]  # the first row holds the column names

    # a dictionary defining the column names of the organizational information:
    organization_configuration = {
        'ou_column_name': 'ou_slug',
        'fallback_ou': get_default_ou().id,
        'source_domain_column_name': 'source_domain',
        'source_id_column_name': 'source_id'
    }
    # a dictionary defining the column names of the user attributes:
    attributes_configuration = {
        'username': 'username',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
    }

    assert User.objects.count() == 0

    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)

    assert User.objects.count() == len(data_rows)
    # users are expected to come back in the same order as the CSV rows
    for user, row in zip(User.objects.all(), data_rows):
        expected = [as_text(cell) for cell in row]
        external_id = UserExternalId.objects.get(user=user)
        assert external_id.source == expected[0]
        assert external_id.external_id == expected[1]
        assert user.first_name == expected[2]
        assert user.last_name == expected[3]
        assert user.username == expected[4]
        assert user.email == expected[5]
        assert user.ou.slug == expected[6]

    assert not results['error']
    assert results['nb_created'] == 2
    assert results['nb_ignored'] == 0
    assert results['nb_erased'] == 0
    assert results['by_ou']['foo']['nb_created'] == 1
    assert 'nb_erased' not in results['by_ou']['foo']
    assert 'nb_ignored' not in results['by_ou']['foo']
    assert results['by_ou']['bar']['nb_created'] == 1
    assert 'nb_erased' not in results['by_ou']['bar']
    assert 'nb_ignored' not in results['by_ou']['bar']
    assert results['by_source']['domain.nowhere.null']['nb_created'] == 2
    assert 'nb_erased' not in results['by_source']['domain.nowhere.null']
    assert 'nb_ignored' not in results['by_source']['domain.nowhere.null']
|
605 | ||
606 | ||
607 |
def test_import_users_from_csv_rows_dry_run(db):
    """A dry run reports the would-be creations without creating any user."""
    # organizational units referenced by the ou_slug column of the CSV file
    OU.objects.create(name='Foo', slug='foo')
    OU.objects.create(name='Bar', slug='bar')

    User = get_user_model()

    with open('tests/csv/users_csv_import_testfile.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))

    # a dictionary defining the column names of the organizational information:
    organization_configuration = {
        'ou_column_name': 'ou_slug',
        'fallback_ou': get_default_ou().id,
        'source_domain_column_name': 'source_domain',
        'source_id_column_name': 'source_id'
    }
    # a dictionary defining the column names of the user attributes:
    attributes_configuration = {
        'username': 'username',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
    }

    assert User.objects.count() == 0

    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=True, erase=False)

    # nothing must have been written to the database
    assert User.objects.count() == 0

    assert not results['error']
    assert results['nb_created'] == 2
    assert results['nb_ignored'] == 0
    assert results['nb_erased'] == 0
    assert results['by_ou']['foo']['nb_created'] == 1
    assert 'nb_erased' not in results['by_ou']['foo']
    assert 'nb_ignored' not in results['by_ou']['foo']
    assert results['by_ou']['bar']['nb_created'] == 1
    assert 'nb_erased' not in results['by_ou']['bar']
    assert 'nb_ignored' not in results['by_ou']['bar']
    assert results['by_source']['domain.nowhere.null']['nb_created'] == 2
    assert 'nb_erased' not in results['by_source']['domain.nowhere.null']
    assert 'nb_ignored' not in results['by_source']['domain.nowhere.null']
|
653 | ||
654 | ||
655 |
def test_import_users_from_csv_profile_attributes(db):
    """Extended profile attributes of several kinds are filled from the CSV."""

    def as_text(value):
        # csv.reader yields byte strings on Python 2 and text on Python 3;
        # only byte strings have to be decoded before being compared with
        # attribute values.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    # one extended attribute per kind exercised by the CSV file
    for label, name, kind in [
            ('Nickname', 'nickname', 'string'),
            ('Is a spy', 'is_a_spy', 'boolean'),
            ('Enrolled on', 'enrolled_on', 'date'),
            ('Secret phone number', 'secret_phone_number', 'phone_number'),
            ('Headquarters postcode', 'headquarters_postcode', 'fr_postcode')]:
        Attribute.objects.create(
            label=label,
            name=name,
            kind=kind,
            user_visible=True,
            user_editable=True
        )

    User = get_user_model()

    # a dictionary defining the column names of the organizational information:
    organization_configuration = {
        'fallback_ou': get_default_ou().id,
        'source_domain_column_name': 'source_domain',
        'source_id_column_name': 'app_id'
    }
    # a dictionary defining the column names of the user attributes:
    attributes_configuration = {
        'username': 'username',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
        'nickname': 'nickname',
        'is_a_spy': 'is_a_spy',
        'enrolled_on': 'enrolled_on',
        'secret_phone_number': 'secret_phone_number',
        'headquarters_postcode': 'headquarters_postcode',
    }

    with open('tests/csv/users_csv_import_testfile2.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    data_rows = rows[1:]  # the first row holds the column names

    assert User.objects.count() == 0

    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)

    assert not results['error']
    assert User.objects.count() == len(data_rows)

    # users are expected to come back in the same order as the CSV rows
    for user, row in zip(User.objects.all(), data_rows):
        # columns 4 to 8 hold the extended attributes, in this order
        nickname, is_a_spy, enrolled_on, phone_number, postcode = [
            as_text(cell) for cell in row[4:9]]
        assert user.attributes.nickname == nickname
        assert user.attributes.is_a_spy == bool(int(is_a_spy))
        assert user.attributes.enrolled_on.isoformat() == enrolled_on
        assert user.attributes.secret_phone_number == phone_number
        assert user.attributes.headquarters_postcode == postcode
|
733 | ||
734 | ||
735 |
def test_import_users_from_csv_ignore(db):
    """Importing the same file twice without erase ignores the known users."""
    User = get_user_model()

    # a dictionary defining the column names of the organizational information:
    organization_configuration = {
        'fallback_ou': get_default_ou().id,
        'source_domain_column_name': 'source_domain',
        'source_id_column_name': 'source_id'
    }
    # a dictionary defining the column names of the user attributes:
    attributes_configuration = {
        'username': 'username',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
    }

    with open('tests/csv/users_csv_import_testfile3.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    nb_users = len(rows) - 1  # the first row holds the column names

    assert User.objects.count() == 0

    # first run: every row creates a user
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)

    assert User.objects.count() == nb_users
    assert not results['error']
    assert results['nb_created'] == 2
    assert results['by_ou']['default']['nb_created'] == 2
    assert results['by_source']['domain.nowhere.null']['nb_created'] == 2

    # second run: the users already exist, the rows are ignored
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)

    assert User.objects.count() == nb_users
    assert not results['error']
    assert results['nb_ignored'] == 2
    assert results['by_ou']['default']['nb_ignored'] == 2
    assert results['by_source']['domain.nowhere.null']['nb_ignored'] == 2

    # NOTE(review): the report of this third run is not asserted; the call
    # only checks that one more import does not raise.
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)
|
782 | ||
783 | ||
784 |
def test_import_users_from_csv_erase(db):
    """Importing the same file again with erase=True reports erased users."""
    User = get_user_model()

    # a dictionary defining the column names of the organizational information:
    organization_configuration = {
        'fallback_ou': get_default_ou().id,
        'source_domain_column_name': 'source_domain',
        'source_id_column_name': 'source_id'
    }
    # a dictionary defining the column names of the user attributes:
    attributes_configuration = {
        'username': 'username',
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
    }

    with open('tests/csv/users_csv_import_testfile3.csv', 'r') as csv_file:
        rows = list(csv.reader(csv_file))
    nb_users = len(rows) - 1  # the first row holds the column names

    assert User.objects.count() == 0

    # first run: every row creates a user
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)

    assert User.objects.count() == nb_users
    assert not results['error']
    assert results['nb_created'] == 2
    assert results['by_ou']['default']['nb_created'] == 2
    assert results['by_source']['domain.nowhere.null']['nb_created'] == 2

    # second run with erase: the number of users must not change
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=True)

    assert User.objects.count() == nb_users
    assert not results['error']
    assert results['nb_erased'] == 2
    assert results['by_ou']['default']['nb_erased'] == 2
    assert results['by_source']['domain.nowhere.null']['nb_erased'] == 2

    # NOTE(review): the report of this third run is not asserted; the call
    # only checks that one more import does not raise.
    results = import_users_from_csv_rows(
        rows, organization_configuration, attributes_configuration,
        dry_run=False, erase=False)
tests/test_manager.py | ||
---|---|---|
18 | 18 |
import pytest |
19 | 19 |
import json |
20 | 20 | |
21 |
from django import VERSION as django_version |
|
21 | 22 |
from django.core.urlresolvers import reverse |
22 | 23 |
from django.core import mail |
23 | 24 | |
24 | 25 |
from webtest import Upload |
25 | 26 | |
26 | 27 |
from authentic2.a2_rbac.utils import get_default_ou |
28 |
from authentic2.models import Attribute |
|
27 | 29 | |
28 | 30 |
from django_rbac.utils import get_ou_model, get_role_model |
29 | 31 |
from django.contrib.auth import get_user_model |
... | ... | |
904 | 906 | |
905 | 907 |
user = User.objects.get(id=simple_user.id) |
906 | 908 |
assert not user.email_verified |
909 | ||
910 | ||
911 |
@pytest.mark.skipif(
    django_version < (1, 11), reason='Requires Django v1.11 or higher.')
def test_manager_import_users_from_csv(admin, app, db):
    """Walk through the five pages of the manager's users CSV import."""
    Attribute.objects.create(
        label='Nickname',
        name='nickname',
        kind='string',
        user_visible=True,
        user_editable=True
    )
    Attribute.objects.create(
        label='Freemason',
        name='freemason',
        kind='boolean',
        user_visible=True,
        user_editable=True
    )

    User = get_user_model()
    # users existing before the import (at least the logged-in admin)
    nb_initial = User.objects.count()

    login(app, admin, '/manage/')
    url = u'/manage/users/csv-import/upload-file/'
    response = app.get(url)

    # page 1: upload file
    assert 'csv_file' in response.text
    form = response.form
    form.set('csv_file', Upload('tests/csv/users_csv_import_testfile4.csv'))
    response = form.submit().follow()

    # page 2: organizational columns configuration
    assert 'ou_column_name' in response.text
    assert 'fallback_ou' in response.text
    assert 'source_domain_column_name' in response.text
    assert 'source_id_column_name' in response.text
    form = response.form
    form.set('source_domain_column_name', 'source_domain')
    form.set('source_id_column_name', 'app_id')
    response = form.submit().follow()

    # page 3: profile attribute column configuration
    # # standard profile attributes
    assert 'first_name' in response.text
    assert 'last_name' in response.text
    assert 'username' in response.text
    assert 'email' in response.text
    assert 'email_verified' in response.text
    assert 'is_staff' in response.text
    assert 'is_active' in response.text
    # # extended profile attributes
    assert 'nickname' in response.text
    assert 'freemason' in response.text
    form = response.form
    # these form fields are mapped to the CSV columns bearing the same name
    for directly_mapped_element in [
            'first_name', 'last_name', 'nickname', 'freemason']:
        form.set(directly_mapped_element, directly_mapped_element)
    response = form.submit().follow()

    # page 4: simulated import results
    assert '2 user(s) would be created.' in response.text
    assert 'Global results' in response.text
    assert 'Results by organizational unit' in response.text
    assert 'Results by source' in response.text
    response = response.click(description='Perform the import')

    # page 5: import results
    assert '2 user(s) have been created.' in response.text
    assert 'Global results' in response.text
    assert 'Results by organizational unit' in response.text
    assert 'Results by source' in response.text
    assert User.objects.count() == nb_initial+2
|
907 |
- |