Projet

Général

Profil

0001-manager-import-roles-using-CSV-24921.patch

Valentin Deniaud, 31 mars 2021 17:28

Télécharger (16 ko)

Voir les différences:

Subject: [PATCH] manager: import roles using CSV (#24921)

 src/authentic2/manager/forms.py               | 103 +++++++++++++++++-
 src/authentic2/manager/role_views.py          |  45 ++++++++
 .../templates/authentic2/manager/roles.html   |   1 +
 .../manager/roles_csv_import_form.html        |  16 +++
 .../authentic2/manager/sample_roles.txt       |   2 +
 src/authentic2/manager/urls.py                |   6 +
 src/django_rbac/models.py                     |   3 +-
 src/django_rbac/utils.py                      |  10 ++
 tests/test_role_manager.py                    |  68 +++++++++++-
 9 files changed, 249 insertions(+), 5 deletions(-)
 create mode 100644 src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html
 create mode 100644 src/authentic2/manager/templates/authentic2/manager/sample_roles.txt
src/authentic2/manager/forms.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import csv
17 18
import hashlib
18 19
import json
19 20
import logging
20 21
import smtplib
22
from collections import defaultdict
23
from io import StringIO
21 24

  
22 25
from django import forms
23 26
from django.contrib.auth import get_user_model
......
40 43
)
41 44
from django_rbac.backends import DjangoRBACBackend
42 45
from django_rbac.models import Operation
43
from django_rbac.utils import get_ou_model, get_permission_model, get_role_model
46
from django_rbac.utils import generate_slug, get_ou_model, get_permission_model, get_role_model
44 47

  
45 48
from . import app_settings, fields, utils
46 49

  
47 50
User = get_user_model()
48 51
OU = get_ou_model()
52
Role = get_role_model()
49 53

  
50 54
logger = logging.getLogger(__name__)
51 55

  
......
756 760
        with self.user_import.meta_update as meta:
757 761
            meta['ou'] = self.cleaned_data['ou']
758 762
            meta['encoding'] = self.cleaned_data['encoding']
763

  
764

  
765
class RolesCsvImportForm(LimitQuerysetFormMixin, forms.Form):
766
    import_file = forms.FileField(
767
        label=_('Roles file'),
768
        required=True,
769
        help_text=_('CSV file with role name and optionnaly role slug and organizational unit.'),
770
    )
771

  
772
    ou = forms.ModelChoiceField(
773
        label=_('Organizational unit'), queryset=get_ou_model().objects, initial=lambda: get_default_ou().pk
774
    )
775

  
776
    def __init__(self, *args, **kwargs):
777
        super().__init__(*args, **kwargs)
778
        if utils.get_ou_count() < 2:
779
            self.fields['ou'].widget = forms.HiddenInput()
780

  
781
    def clean(self):
782
        super().clean()
783

  
784
        content = self.cleaned_data['import_file'].read()
785
        if b'\0' in content:
786
            raise ValidationError(_('Invalid file format.'))
787

  
788
        for charset in ('utf-8-sig', 'iso-8859-15'):
789
            try:
790
                content = content.decode(charset)
791
                break
792
            except UnicodeDecodeError:
793
                continue
794
        # all byte-sequences are ok for iso-8859-15 so we will always reach
795
        # this line with content being a unicode string.
796

  
797
        try:
798
            dialect = csv.Sniffer().sniff(content)
799
        except csv.Error:
800
            dialect = None
801

  
802
        all_roles = Role.objects.all()
803
        roles_by_slugs = defaultdict(dict)
804
        for role in all_roles:
805
            roles_by_slugs[role.ou][role.slug] = role
806
        roles_by_names = defaultdict(dict)
807
        for role in all_roles:
808
            if role.name:
809
                roles_by_names[role.ou][role.name] = role
810

  
811
        self.roles = []
812
        for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect, delimiter=',')):
813
            if not csvline:
814
                continue
815

  
816
            if i == 0:
817
                if csvline != ['name', 'slug', 'ou'][: len(csvline)]:
818
                    header = ','.join(csvline)
819
                    raise ValidationError(_('Invalid file header "%s", expected "name,slug,ou".') % header)
820
                continue
821

  
822
            name = csvline[0]
823
            if not name:
824
                self.add_line_error(_('Name is required.'), i)
825
                continue
826

  
827
            slug = ''
828
            if len(csvline) > 1:
829
                slug = csvline[1]
830

  
831
            ou = self.cleaned_data['ou']
832
            if len(csvline) > 2 and csvline[2]:
833
                try:
834
                    ou = OU.objects.get(slug=csvline[2])
835
                except OU.DoesNotExist:
836
                    self.add_line_error(_('Organizational Unit %s does not exist.') % csvline[2], i)
837
                    continue
838

  
839
            if name in roles_by_names.get(ou, {}):
840
                role = roles_by_names[ou][name]
841
                role.slug = slug or role.slug
842
            elif slug in roles_by_slugs.get(ou, {}):
843
                role = roles_by_slugs[ou][slug]
844
                role.name = name
845
            else:
846
                role = Role(name=name, slug=slug)
847

  
848
            if not role.slug:
849
                role.slug = generate_slug(role.name, seen_slugs=roles_by_slugs[ou])
850

  
851
            roles_by_slugs[ou][role.slug] = role
852
            roles_by_names[ou][role.name] = role
853

  
854
            role.ou = ou
855
            self.roles.append(role)
856

  
857
    def add_line_error(self, error, line):
858
        error = _('%s (line %d)') % (error, line + 1)
859
        self.add_error('import_file', error)
src/authentic2/manager/role_views.py
686 686
roles_import = RolesImportView.as_view()
687 687

  
688 688

  
689
class RolesCsvImportView(
690
    views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest, FormView
691
):
692
    form_class = forms.RolesCsvImportForm
693
    model = get_role_model()
694
    template_name = 'authentic2/manager/roles_csv_import_form.html'
695
    title = _('Roles CSV Import')
696

  
697
    def get_initial(self):
698
        initial = super().get_initial()
699
        search_ou = self.request.GET.get('search-ou')
700
        if search_ou:
701
            initial['ou'] = search_ou
702
        return initial
703

  
704
    def post(self, request, *args, **kwargs):
705
        if not self.can_add:
706
            raise PermissionDenied
707
        return super().post(request, *args, **kwargs)
708

  
709
    def form_valid(self, form):
710
        self.ou = form.cleaned_data['ou']
711
        for role in form.roles:
712
            role.save()
713
        return super().form_valid(form)
714

  
715
    def get_success_url(self):
716
        messages.success(
717
            self.request,
718
            _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou,
719
        )
720
        return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk
721

  
722

  
723
roles_csv_import = RolesCsvImportView.as_view()
724

  
725

  
726
class RolesCsvImportSampleView(TemplateView):
727
    template_name = 'authentic2/manager/sample_roles.txt'
728
    content_type = 'text/csv'
729

  
730

  
731
roles_csv_import_sample = RolesCsvImportSampleView.as_view()
732

  
733

  
689 734
class RoleJournal(views.PermissionMixin, JournalViewWithContext, BaseJournalView):
690 735
    template_name = 'authentic2/manager/role_journal.html'
691 736
    permissions = ['a2_rbac.view_role']
src/authentic2/manager/templates/authentic2/manager/roles.html
19 19
    <li><a download href="{% url 'a2-manager-roles-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li>
20 20
    {% if view.can_add %}
21 21
    <li><a href="{% url 'a2-manager-roles-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'Import' %}</a></li>
22
    <li><a href="{% url 'a2-manager-roles-csv-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'CSV import' %}</a></li>
22 23
    {% endif %}
23 24
  </ul>
24 25
  </span>
src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html
1
{% extends "authentic2/manager/import_form.html" %}
2
{% load i18n gadjo %}
3

  
4
{% block content %}
5
<form method="post" enctype="multipart/form-data">
6
{% csrf_token %}
7
{{ form|with_template }}
8
<p>
9
<a href="{% url 'a2-manager-roles-csv-import-sample' %}">{% trans 'Download sample file' %}</a>
10
</p>
11
<div class="buttons">
12
  <button>{% trans "Import" %}</button>
13
  <a class="cancel" href="{% url 'a2-manager-homepage' %}">{% trans 'Cancel' %}</a>
14
</div>
15
</form>
16
{% endblock %}
src/authentic2/manager/templates/authentic2/manager/sample_roles.txt
1
name,slug,ou
2
Role Name,role_slug,ou_slug
src/authentic2/manager/urls.py
106 106
        # Authentic2 roles
107 107
        url(r'^roles/$', role_views.listing, name='a2-manager-roles'),
108 108
        url(r'^roles/import/$', role_views.roles_import, name='a2-manager-roles-import'),
109
        url(r'^roles/csv-import/$', role_views.roles_csv_import, name='a2-manager-roles-csv-import'),
110
        url(
111
            r'^roles/csv-import-sample/$',
112
            role_views.roles_csv_import_sample,
113
            name='a2-manager-roles-csv-import-sample',
114
        ),
109 115
        url(r'^roles/add/$', role_views.add, name='a2-manager-role-add'),
110 116
        url(r'^roles/export/(?P<format>csv|json)/$', role_views.export, name='a2-manager-roles-export'),
111 117
        url(r'^roles/journal/$', role_views.roles_journal, name='a2-manager-roles-journal'),
src/django_rbac/models.py
5 5
from django.conf import settings
6 6
from django.db import models
7 7
from django.db.models.query import Prefetch, Q
8
from django.utils.text import slugify
9 8
from django.utils.translation import ugettext_lazy as _
10 9

  
11 10
try:
......
45 44
    def save(self, *args, **kwargs):
46 45
        # truncate slug and add a hash if it's too long
47 46
        if not self.slug:
48
            self.slug = slugify(str(self.name)).lstrip('_')
47
            self.slug = utils.generate_slug(self.name)
49 48
        if len(self.slug) > 256:
50 49
            self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4]
51 50
        if not self.uuid:
src/django_rbac/utils.py
2 2

  
3 3
from django.apps import apps
4 4
from django.conf import settings
5
from django.utils.text import slugify
5 6

  
6 7
from . import constants
7 8

  
......
80 81

  
81 82
    operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug)
82 83
    return operation
84

  
85

  
86
def generate_slug(name, seen_slugs=None):
87
    slug = base_slug = slugify(name).lstrip('_')
88
    if seen_slugs:
89
        i = 1
90
        while slug in seen_slugs:
91
            slug = '%s-%s' % (base_slug, i)
92
    return slug
tests/test_role_manager.py
38 38
    assert len(export['roles']) == 2
39 39
    assert set([role['slug'] for role in export['roles']]) == set(['role_ou1', 'role_ou2'])
40 40

  
41
    export_response = response.click('CSV')
41
    export_response = response.click('CSV', href='/export/')
42 42
    reader = csv.reader(
43 43
        [force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
44 44
    )
......
59 59
    assert len(export['roles']) == 1
60 60
    assert export['roles'][0]['slug'] == 'role_ou1'
61 61

  
62
    export_response = search_response.click('CSV')
62
    export_response = search_response.click('CSV', href='/export/')
63 63
    reader = csv.reader(
64 64
        [force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=','
65 65
    )
......
272 272
        ('role1', '', 'checked', None),
273 273
        ('role2 (LDAP)', 'role1 ', None, 'disabled'),
274 274
    ]
275

  
276

  
277
def test_manager_role_csv_import(app, admin, ou1, ou2):
278
    roles_count = Role.objects.count()
279
    resp = login(app, admin, '/manage/roles/')
280

  
281
    resp = resp.click('CSV import')
282
    csv_header = b'name,slug,ou\n'
283
    csv_content = 'Role Name,role_slug,%s' % ou1.slug
284
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
285
    resp.form.submit(status=302)
286
    assert Role.objects.get(name='Role Name', slug='role_slug', ou=ou1)
287
    assert Role.objects.count() == roles_count + 1
288

  
289
    csv_content = 'Role 2,role2,\nRole 3,,%s' % ou2.slug
290
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
291
    resp.form.submit(status=302)
292
    assert Role.objects.get(name='Role 2', slug='role2', ou=get_default_ou())
293
    assert Role.objects.get(name='Role 3', slug='role-3', ou=ou2)
294
    assert Role.objects.count() == roles_count + 3
295

  
296
    # slug can be updated using name, name can be updated using slug
297
    csv_content = 'Role two,role2,\nRole 3,role-three,%s' % ou2.slug
298
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
299
    resp.form.submit(status=302)
300
    assert Role.objects.get(name='Role two', slug='role2', ou=get_default_ou())
301
    assert Role.objects.get(name='Role 3', slug='role-three', ou=ou2)
302
    assert Role.objects.count() == roles_count + 3
303

  
304
    # conflict in auto-generated slug is handled
305
    csv_header = b'name\n'
306
    csv_content = 'Role!2'
307
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
308
    resp.form.submit(status=302)
309
    assert Role.objects.get(name='Role!2', slug='role2-1', ou=get_default_ou())
310
    assert Role.objects.count() == roles_count + 4
311

  
312
    # Identical roles are created only once
313
    csv_content = 'Role 4,role-4,\nRole 4,,\nRole 4,role-4,'
314
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
315
    resp.form.submit(status=302)
316
    assert Role.objects.get(name='Role 4', slug='role-4', ou=get_default_ou())
317
    assert Role.objects.count() == roles_count + 5
318

  
319
    csv_content = 'xx\0xx,,'
320
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
321
    resp = resp.form.submit()
322
    assert 'Invalid file format.' in resp.text
323

  
324
    wrong_header = b'a,b,c\n'
325
    resp.form['import_file'] = Upload('t.csv', wrong_header, 'text/csv')
326
    resp = resp.form.submit()
327
    assert 'Invalid file header' in resp.text
328

  
329
    csv_content = ',slug-but-no-name,\nRole,,unknown-ou'
330
    resp = app.get('/manage/roles/csv-import/')
331
    resp.form['import_file'] = Upload('t.csv', csv_header + csv_content.encode(), 'text/csv')
332
    resp = resp.form.submit()
333
    assert 'Name is required. (line 2)' in resp.text
334
    assert 'Organizational Unit unknown-ou does not exist. (line 3)' in resp.text
335

  
336
    resp = app.get('/manage/roles/csv-import/')
337
    resp = resp.click('Download sample')
338
    assert 'name,slug,ou' in resp.text
275
-