From ee8d83f2d853edd68083fa840bf32d498d413abf Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Tue, 16 Mar 2021 11:17:12 +0100 Subject: [PATCH] manager: import roles using CSV (#24921) --- src/authentic2/manager/forms.py | 102 +++++++++++++++++- src/authentic2/manager/role_views.py | 43 ++++++++ .../templates/authentic2/manager/roles.html | 1 + .../manager/roles_csv_import_form.html | 16 +++ .../authentic2/manager/sample_roles.txt | 2 + src/authentic2/manager/urls.py | 4 + src/django_rbac/models.py | 3 +- src/django_rbac/utils.py | 10 ++ tests/test_role_manager.py | 62 ++++++++++- 9 files changed, 238 insertions(+), 5 deletions(-) create mode 100644 src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html create mode 100644 src/authentic2/manager/templates/authentic2/manager/sample_roles.txt diff --git a/src/authentic2/manager/forms.py b/src/authentic2/manager/forms.py index 0a212ac3..e75f74bf 100644 --- a/src/authentic2/manager/forms.py +++ b/src/authentic2/manager/forms.py @@ -14,10 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import csv import hashlib import json import smtplib import logging +from collections import defaultdict +from io import StringIO from django.utils.translation import ugettext_lazy as _, pgettext, ugettext from django import forms @@ -33,7 +36,7 @@ from authentic2.utils import send_templated_mail from authentic2.forms.fields import NewPasswordField, CheckPasswordField, ValidatedEmailField from django_rbac.models import Operation -from django_rbac.utils import get_ou_model, get_role_model, get_permission_model +from django_rbac.utils import get_ou_model, get_role_model, get_permission_model, generate_slug from django_rbac.backends import DjangoRBACBackend from authentic2.forms.profile import BaseUserForm @@ -47,6 +50,7 @@ from . import fields, app_settings, utils User = get_user_model() OU = get_ou_model() +Role = get_role_model() logger = logging.getLogger(__name__) @@ -776,3 +780,99 @@ class UserEditImportForm(UserImportForm): with self.user_import.meta_update as meta: meta['ou'] = self.cleaned_data['ou'] meta['encoding'] = self.cleaned_data['encoding'] + + +class RolesCsvImportForm(LimitQuerysetFormMixin, forms.Form): + import_file = forms.FileField( + label=_('Roles file'), + required=True, + help_text=_('CSV file with role name and optionnaly role slug and organizational unit.') + ) + + ou = forms.ModelChoiceField( + label=_('Organizational unit'), + queryset=get_ou_model().objects, + initial=lambda: get_default_ou().pk + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if utils.get_ou_count() < 2: + self.fields['ou'].widget = forms.HiddenInput() + + def clean(self): + super().clean() + + content = self.cleaned_data['import_file'].read() + if b'\0' in content: + raise ValidationError(_('Invalid file format.')) + + for charset in ('utf-8-sig', 'iso-8859-15'): + try: + content = content.decode(charset) + break + except UnicodeDecodeError: + continue + # all byte-sequences are ok for iso-8859-15 so we will always reach + # this line with content being a unicode string. + + try: + dialect = csv.Sniffer().sniff(content) + except csv.Error: + dialect = None + + all_roles = Role.objects.all() + roles_by_slugs = defaultdict(dict) + for role in all_roles: + roles_by_slugs[role.ou][role.slug] = role + roles_by_names = defaultdict(dict) + for role in all_roles: + if role.name: + roles_by_names[role.ou][role.name] = role + + self.roles = [] + for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect, delimiter=',')): + if not csvline: + continue + + if i == 0 and csvline[0].strip('#').lower() == 'name': + continue + + name = csvline[0] + if not name: + self.add_line_error(_('Name is required.'), i) + continue + + slug = '' + if len(csvline) > 1: + slug = csvline[1] + + ou = self.cleaned_data['ou'] + if len(csvline) > 2 and csvline[2]: + try: + ou = OU.objects.get(slug=csvline[2]) + except OU.DoesNotExist: + self.add_line_error(_('Organizational Unit %s does not exist.') % csvline[2], i) + continue + + if name in roles_by_names.get(ou, {}): + role = roles_by_names[ou][name] + role.slug = slug or role.slug + elif slug in roles_by_slugs.get(ou, {}): + role = roles_by_slugs[ou][slug] + role.name = name + else: + role = Role(name=name, slug=slug) + + if not role.slug: + role.slug = generate_slug(role.name, seen_slugs=roles_by_slugs[ou]) + + roles_by_slugs[ou][role.slug] = role + roles_by_names[ou][role.name] = role + + role.ou = ou + self.roles.append(role) + + def add_line_error(self, error, line): + error = _('%s (line %d)') % (error, line + 1) + self.add_error('import_file', error) diff --git a/src/authentic2/manager/role_views.py b/src/authentic2/manager/role_views.py index bba2dbca..9f24f946 100644 --- a/src/authentic2/manager/role_views.py +++ b/src/authentic2/manager/role_views.py @@ -590,6 +590,49 @@ class RolesImportView(views.PermissionMixin, views.TitleMixin, views.MediaMixin, roles_import = RolesImportView.as_view() +class RolesCsvImportView(views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest, FormView): + form_class = forms.RolesCsvImportForm + model = get_role_model() + template_name = 'authentic2/manager/roles_csv_import_form.html' + title = _('Roles CSVĀ Import') + + def get_initial(self): + initial = super().get_initial() + search_ou = self.request.GET.get('search-ou') + if search_ou: + initial['ou'] = search_ou + return initial + + def post(self, request, *args, **kwargs): + if not self.can_add: + raise PermissionDenied + return super().post(request, *args, **kwargs) + + def form_valid(self, form): + self.ou = form.cleaned_data['ou'] + for role in form.roles: + role.save() + return super().form_valid(form) + + def get_success_url(self): + messages.success( + self.request, + _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou + ) + return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk + + +roles_csv_import = RolesCsvImportView.as_view() + + +class RolesCsvImportSampleView(TemplateView): + template_name = 'authentic2/manager/sample_roles.txt' + content_type = 'text/csv' + + +roles_csv_import_sample = RolesCsvImportSampleView.as_view() + + class RoleJournal(views.PermissionMixin, JournalViewWithContext, BaseJournalView): template_name = 'authentic2/manager/role_journal.html' permissions = ['a2_rbac.view_role'] diff --git a/src/authentic2/manager/templates/authentic2/manager/roles.html b/src/authentic2/manager/templates/authentic2/manager/roles.html index 9cc1c9ac..f4c5f670 100644 --- a/src/authentic2/manager/templates/authentic2/manager/roles.html +++ b/src/authentic2/manager/templates/authentic2/manager/roles.html @@ -19,6 +19,7 @@
  • {% trans 'Export' %}
  • {% if view.can_add %}
  • {% trans 'Import' %}
  • +
  • {% trans 'CSV import' %}
  • {% endif %} diff --git a/src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html b/src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html new file mode 100644 index 00000000..c86695bf --- /dev/null +++ b/src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html @@ -0,0 +1,16 @@ +{% extends "authentic2/manager/import_form.html" %} +{% load i18n gadjo %} + +{% block content %} +
    +{% csrf_token %} +{{ form|with_template }} +

    +{% trans 'Download sample file' %} +

    +
    + + {% trans 'Cancel' %} +
    +
    +{% endblock %} diff --git a/src/authentic2/manager/templates/authentic2/manager/sample_roles.txt b/src/authentic2/manager/templates/authentic2/manager/sample_roles.txt new file mode 100644 index 00000000..14f5939e --- /dev/null +++ b/src/authentic2/manager/templates/authentic2/manager/sample_roles.txt @@ -0,0 +1,2 @@ +name,slug,ou +Role Name,role_slug,ou_slug diff --git a/src/authentic2/manager/urls.py b/src/authentic2/manager/urls.py index 244f242d..03ae643a 100644 --- a/src/authentic2/manager/urls.py +++ b/src/authentic2/manager/urls.py @@ -98,6 +98,10 @@ urlpatterns = required( name='a2-manager-roles'), url(r'^roles/import/$', role_views.roles_import, name='a2-manager-roles-import'), + url(r'^roles/csv-import/$', role_views.roles_csv_import, + name='a2-manager-roles-csv-import'), + url(r'^roles/csv-import-sample/$', role_views.roles_csv_import_sample, + name='a2-manager-roles-csv-import-sample'), url(r'^roles/add/$', role_views.add, name='a2-manager-role-add'), url(r'^roles/export/(?Pcsv|json)/$', diff --git a/src/django_rbac/models.py b/src/django_rbac/models.py index dcb7e9c5..008d43bd 100644 --- a/src/django_rbac/models.py +++ b/src/django_rbac/models.py @@ -2,7 +2,6 @@ import operator import hashlib from django.utils import six -from django.utils.text import slugify from django.utils.translation import ugettext_lazy as _ from django.db import models from django.conf import settings @@ -52,7 +51,7 @@ class AbstractBase(models.Model): def save(self, *args, **kwargs): # truncate slug and add a hash if it's too long if not self.slug: - self.slug = slugify(six.text_type(self.name)).lstrip('_') + self.slug = utils.generate_slug(self.name) if len(self.slug) > 256: self.slug = self.slug[:252] + \ hashlib.md5(self.slug).hexdigest()[:4] diff --git a/src/django_rbac/utils.py b/src/django_rbac/utils.py index 5128d06e..a271b41f 100644 --- a/src/django_rbac/utils.py +++ b/src/django_rbac/utils.py @@ -3,6 +3,7 @@ import uuid from django.conf import settings from django.apps import apps from django.utils import six +from django.utils.text import slugify from . import constants @@ -80,3 +81,12 @@ def get_operation(operation_tpl): from . import models operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug) return operation + + +def generate_slug(name, seen_slugs=None): + slug = base_slug = slugify(six.text_type(name)).lstrip('_') + if seen_slugs: + i = 1 + while slug in seen_slugs: + slug = '%s-%s' % (base_slug, i) + return slug diff --git a/tests/test_role_manager.py b/tests/test_role_manager.py index be869077..64cc8145 100644 --- a/tests/test_role_manager.py +++ b/tests/test_role_manager.py @@ -39,7 +39,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2): assert len(export['roles']) == 2 assert set([role['slug'] for role in export['roles']]) == set(['role_ou1', 'role_ou2']) - export_response = response.click('CSV') + export_response = response.click('CSV', href='/export/') reader = csv.reader([force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=',') rows = [row for row in reader] @@ -57,7 +57,7 @@ def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2): assert len(export['roles']) == 1 assert export['roles'][0]['slug'] == 'role_ou1' - export_response = search_response.click('CSV') + export_response = search_response.click('CSV', href='/export/') reader = csv.reader([force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=',') rows = [row for row in reader] @@ -253,3 +253,61 @@ def test_roles_displayed_fields(app, admin, ou1, ou2): ('role1', '', 'checked', None), ('role2 (LDAP)', 'role1 ', None, 'disabled'), ] + + +def test_manager_role_csv_import(app, admin, ou1, ou2): + roles_count = Role.objects.count() + resp = login(app, admin, '/manage/roles/') + + resp = resp.click('CSV import') + csv_header = b'name,slug,ou\n' + csv_content = 'Role Name,role_slug,%s' % ou1.slug + resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv') + resp.form.submit(status=302) + assert Role.objects.get(name='Role Name', slug='role_slug', ou=ou1) + assert Role.objects.count() == roles_count + 1 + + csv_content = 'Role 2,role2,\nRole 3,,%s' % ou2.slug + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp.form.submit(status=302) + assert Role.objects.get(name='Role 2', slug='role2', ou=get_default_ou()) + assert Role.objects.get(name='Role 3', slug='role-3', ou=ou2) + assert Role.objects.count() == roles_count + 3 + + # slug can be updated using name, name can be updated using slug + csv_content = 'Role two,role2,\nRole 3,role-three,%s' % ou2.slug + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp.form.submit(status=302) + assert Role.objects.get(name='Role two', slug='role2', ou=get_default_ou()) + assert Role.objects.get(name='Role 3', slug='role-three', ou=ou2) + assert Role.objects.count() == roles_count + 3 + + # conflict in auto-generated slug is handled + csv_content = 'Role!2' + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp.form.submit(status=302) + assert Role.objects.get(name='Role!2', slug='role2-1', ou=get_default_ou()) + assert Role.objects.count() == roles_count + 4 + + # Identical roles are created only once + csv_content = 'Role 4,role-4\nRole 4\nRole 4,role-4' + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp.form.submit(status=302) + assert Role.objects.get(name='Role 4', slug='role-4', ou=get_default_ou()) + assert Role.objects.count() == roles_count + 5 + + csv_content = 'xx\0xx' + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp = resp.form.submit() + assert 'Invalid file format.' in resp.text + + csv_content = ',slug-but-no-name,\nRole,,unknown-ou' + resp = app.get('/manage/roles/csv-import/') + resp.form['import_file'] = Upload('t.csv', csv_content.encode(), 'text/csv') + resp = resp.form.submit() + assert 'Name is required. (line 1)' in resp.text + assert 'Organizational Unit unknown-ou does not exist. (line 2)' in resp.text + + resp = app.get('/manage/roles/csv-import/') + resp = resp.click('Download sample') + assert 'name,slug,ou' in resp.text -- 2.20.1