0001-manager-import-roles-using-CSV-24921.patch
src/authentic2/manager/forms.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import csv |
|
17 | 18 |
import hashlib |
18 | 19 |
import json |
19 | 20 |
import logging |
20 | 21 |
import smtplib |
22 |
from collections import defaultdict |
|
23 |
from io import StringIO |
|
21 | 24 | |
22 | 25 |
from django import forms |
23 | 26 |
from django.contrib.auth import get_user_model |
... | ... | |
40 | 43 |
) |
41 | 44 |
from django_rbac.backends import DjangoRBACBackend |
42 | 45 |
from django_rbac.models import Operation |
43 |
from django_rbac.utils import get_ou_model, get_permission_model, get_role_model |
|
46 |
from django_rbac.utils import generate_slug, get_ou_model, get_permission_model, get_role_model
|
|
44 | 47 | |
45 | 48 |
from . import app_settings, fields, utils |
46 | 49 | |
47 | 50 |
User = get_user_model() |
48 | 51 |
OU = get_ou_model() |
52 |
Role = get_role_model() |
|
49 | 53 | |
50 | 54 |
logger = logging.getLogger(__name__) |
51 | 55 | |
... | ... | |
756 | 760 |
with self.user_import.meta_update as meta: |
757 | 761 |
meta['ou'] = self.cleaned_data['ou'] |
758 | 762 |
meta['encoding'] = self.cleaned_data['encoding'] |
763 | ||
764 | ||
765 |
class RolesCsvImportForm(LimitQuerysetFormMixin, forms.Form):
    """Upload form turning a CSV file into a list of roles to create or update.

    The first CSV record must be a header that is a prefix of ``name,slug,ou``.
    After a successful validation ``self.roles`` holds the ``Role`` instances
    (new or modified, not saved) described by the file; saving them is left to
    the caller.
    """

    import_file = forms.FileField(
        label=_('Roles file'),
        required=True,
        # NOTE(review): "optionnaly" is misspelled; left untouched here because
        # the string is a translatable msgid.
        help_text=_('CSV file with role name and optionnaly role slug and organizational unit.'),
    )

    ou = forms.ModelChoiceField(
        label=_('Organizational unit'), queryset=get_ou_model().objects, initial=lambda: get_default_ou().pk
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Hide the organizational unit selector when fewer than two exist.
        if utils.get_ou_count() < 2:
            self.fields['ou'].widget = forms.HiddenInput()

    def clean(self):
        """Parse the uploaded CSV and fill ``self.roles``.

        Raises ``ValidationError`` when the content contains NUL bytes or the
        header is invalid; per-line problems are attached to ``import_file``
        through ``add_line_error``.
        """
        super().clean()

        self.roles = []

        # Fields which failed their own validation are absent from
        # cleaned_data; their errors are already reported, so stop here
        # instead of failing with a KeyError.
        import_file = self.cleaned_data.get('import_file')
        default_ou = self.cleaned_data.get('ou')
        if import_file is None or default_ou is None:
            return

        content = import_file.read()
        if b'\0' in content:
            raise ValidationError(_('Invalid file format.'))

        for charset in ('utf-8-sig', 'iso-8859-15'):
            try:
                content = content.decode(charset)
                break
            except UnicodeDecodeError:
                continue
        # all byte-sequences are ok for iso-8859-15 so we will always reach
        # this line with content being a unicode string.

        try:
            dialect = csv.Sniffer().sniff(content)
        except csv.Error:
            dialect = None

        # Index existing roles per organizational unit, by slug and by name.
        # select_related avoids one extra query per role when reading role.ou.
        roles_by_slugs = defaultdict(dict)
        roles_by_names = defaultdict(dict)
        for role in Role.objects.select_related('ou'):
            roles_by_slugs[role.ou][role.slug] = role
            if role.name:
                roles_by_names[role.ou][role.name] = role

        for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect, delimiter=',')):
            if not csvline:
                continue

            if i == 0:
                # A header with fewer columns is accepted as long as it is a
                # prefix of the full one.
                if csvline != ['name', 'slug', 'ou'][: len(csvline)]:
                    header = ','.join(csvline)
                    raise ValidationError(_('Invalid file header "%s", expected "name,slug,ou".') % header)
                continue

            name = csvline[0]
            if not name:
                self.add_line_error(_('Name is required.'), i)
                continue

            slug = ''
            if len(csvline) > 1:
                slug = csvline[1]

            # The third column, when filled, overrides the form's OU.
            ou = default_ou
            if len(csvline) > 2 and csvline[2]:
                try:
                    ou = OU.objects.get(slug=csvline[2])
                except OU.DoesNotExist:
                    self.add_line_error(_('Organizational Unit %s does not exist.') % csvline[2], i)
                    continue

            if name in roles_by_names.get(ou, {}):
                # Known name: the slug may be updated.
                role = roles_by_names[ou][name]
                role.slug = slug or role.slug
            elif slug in roles_by_slugs.get(ou, {}):
                # Known slug: the name is updated.
                role = roles_by_slugs[ou][slug]
                role.name = name
            else:
                role = Role(name=name, slug=slug)

            if not role.slug:
                role.slug = generate_slug(role.name, seen_slugs=roles_by_slugs[ou])

            # Register the role so later lines of the same file see it.
            roles_by_slugs[ou][role.slug] = role
            roles_by_names[ou][role.name] = role

            role.ou = ou
            self.roles.append(role)

    def add_line_error(self, error, line):
        """Attach ``error`` to the file field, suffixed with the 1-based record number."""
        error = _('%s (line %d)') % (error, line + 1)
        self.add_error('import_file', error)
src/authentic2/manager/role_views.py | ||
---|---|---|
686 | 686 |
roles_import = RolesImportView.as_view() |
687 | 687 | |
688 | 688 | |
689 |
class RolesCsvImportView(
    views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest, FormView
):
    """Manager view creating or updating roles from an uploaded CSV file."""

    form_class = forms.RolesCsvImportForm
    model = get_role_model()
    template_name = 'authentic2/manager/roles_csv_import_form.html'
    title = _('Roles CSV Import')

    def get_initial(self):
        """Preselect the organizational unit given by the ``search-ou`` query parameter."""
        initial = super().get_initial()
        search_ou = self.request.GET.get('search-ou')
        if search_ou:
            initial['ou'] = search_ou
        return initial

    def post(self, request, *args, **kwargs):
        """Reject the submission with ``PermissionDenied`` unless ``self.can_add`` is true."""
        if not self.can_add:
            raise PermissionDenied
        return super().post(request, *args, **kwargs)

    def form_valid(self, form):
        """Save every role prepared by the form, then redirect."""
        from django.db import transaction

        self.ou = form.cleaned_data['ou']
        # All or nothing: a failure on one role must not leave the previous
        # ones of the same file imported.
        with transaction.atomic():
            for role in form.roles:
                role.save()
        return super().form_valid(form)

    def get_success_url(self):
        """Queue the success message and return the roles listing URL filtered on the OU."""
        messages.success(
            self.request,
            _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou,
        )
        return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk


roles_csv_import = RolesCsvImportView.as_view()
|
724 | ||
725 | ||
726 |
class RolesCsvImportSampleView(TemplateView):
    """Render the sample roles file template with a CSV content type."""

    content_type = 'text/csv'
    template_name = 'authentic2/manager/sample_roles.txt'


roles_csv_import_sample = RolesCsvImportSampleView.as_view()
|
732 | ||
733 | ||
689 | 734 |
class RoleJournal(views.PermissionMixin, JournalViewWithContext, BaseJournalView): |
690 | 735 |
template_name = 'authentic2/manager/role_journal.html' |
691 | 736 |
permissions = ['a2_rbac.view_role'] |
src/authentic2/manager/templates/authentic2/manager/roles.html | ||
---|---|---|
19 | 19 |
<li><a download href="{% url 'a2-manager-roles-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li> |
20 | 20 |
{% if view.can_add %} |
21 | 21 |
<li><a href="{% url 'a2-manager-roles-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'Import' %}</a></li> |
22 |
<li><a href="{% url 'a2-manager-roles-csv-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'CSV import' %}</a></li> |
|
22 | 23 |
{% endif %} |
23 | 24 |
</ul> |
24 | 25 |
</span> |
src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/import_form.html" %} |
|
2 |
{% load i18n gadjo %} |
|
3 | ||
4 |
{% block content %} |
|
5 |
<form method="post" enctype="multipart/form-data"> |
|
6 |
{% csrf_token %} |
|
7 |
{{ form|with_template }} |
|
8 |
<p> |
|
9 |
<a href="{% url 'a2-manager-roles-csv-import-sample' %}">{% trans 'Download sample file' %}</a> |
|
10 |
</p> |
|
11 |
<div class="buttons"> |
|
12 |
<button>{% trans "Import" %}</button> |
|
13 |
<a class="cancel" href="{% url 'a2-manager-homepage' %}">{% trans 'Cancel' %}</a> |
|
14 |
</div> |
|
15 |
</form> |
|
16 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/sample_roles.txt | ||
---|---|---|
1 |
name,slug,ou |
|
2 |
Role Name,role_slug,ou_slug |
src/authentic2/manager/urls.py | ||
---|---|---|
106 | 106 |
# Authentic2 roles |
107 | 107 |
url(r'^roles/$', role_views.listing, name='a2-manager-roles'), |
108 | 108 |
url(r'^roles/import/$', role_views.roles_import, name='a2-manager-roles-import'), |
109 |
url(r'^roles/csv-import/$', role_views.roles_csv_import, name='a2-manager-roles-csv-import'), |
|
110 |
url( |
|
111 |
r'^roles/csv-import-sample/$', |
|
112 |
role_views.roles_csv_import_sample, |
|
113 |
name='a2-manager-roles-csv-import-sample', |
|
114 |
), |
|
109 | 115 |
url(r'^roles/add/$', role_views.add, name='a2-manager-role-add'), |
110 | 116 |
url(r'^roles/export/(?P<format>csv|json)/$', role_views.export, name='a2-manager-roles-export'), |
111 | 117 |
url(r'^roles/journal/$', role_views.roles_journal, name='a2-manager-roles-journal'), |
src/django_rbac/models.py | ||
---|---|---|
5 | 5 |
from django.conf import settings |
6 | 6 |
from django.db import models |
7 | 7 |
from django.db.models.query import Prefetch, Q |
8 |
from django.utils.text import slugify |
|
9 | 8 |
from django.utils.translation import ugettext_lazy as _ |
10 | 9 | |
11 | 10 |
try: |
... | ... | |
45 | 44 |
def save(self, *args, **kwargs): |
46 | 45 |
# truncate slug and add a hash if it's too long |
47 | 46 |
if not self.slug: |
48 |
self.slug = slugify(str(self.name)).lstrip('_')
|
|
47 |
self.slug = utils.generate_slug(self.name)
|
|
49 | 48 |
if len(self.slug) > 256: |
50 | 49 |
self.slug = self.slug[:252] + hashlib.md5(self.slug).hexdigest()[:4] |
51 | 50 |
if not self.uuid: |
src/django_rbac/utils.py | ||
---|---|---|
2 | 2 | |
3 | 3 |
from django.apps import apps |
4 | 4 |
from django.conf import settings |
5 |
from django.utils.text import slugify |
|
5 | 6 | |
6 | 7 |
from . import constants |
7 | 8 | |
... | ... | |
80 | 81 | |
81 | 82 |
operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug) |
82 | 83 |
return operation |
84 | ||
85 | ||
86 |
def generate_slug(name, seen_slugs=None):
    """Return a slug built from ``name``, made unique against ``seen_slugs``.

    The base slug is ``slugify(name)`` with leading underscores removed. When
    ``seen_slugs`` is a non-empty container already holding that slug, the
    suffixes ``-1``, ``-2``, ... are tried in order until an unused slug is
    found.
    """
    slug = base_slug = slugify(name).lstrip('_')
    if seen_slugs:
        i = 1
        while slug in seen_slugs:
            slug = '%s-%s' % (base_slug, i)
            # Move on to the next suffix; without this the loop never ends
            # once "<base_slug>-1" is taken too.
            i += 1
    return slug
tests/test_role_manager.py | ||
---|---|---|
38 | 38 |
assert len(export['roles']) == 2 |
39 | 39 |
assert set([role['slug'] for role in export['roles']]) == set(['role_ou1', 'role_ou2']) |
40 | 40 | |
41 |
export_response = response.click('CSV') |
|
41 |
export_response = response.click('CSV', href='/export/')
|
|
42 | 42 |
reader = csv.reader( |
43 | 43 |
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=',' |
44 | 44 |
) |
... | ... | |
59 | 59 |
assert len(export['roles']) == 1 |
60 | 60 |
assert export['roles'][0]['slug'] == 'role_ou1' |
61 | 61 | |
62 |
export_response = search_response.click('CSV') |
|
62 |
export_response = search_response.click('CSV', href='/export/')
|
|
63 | 63 |
reader = csv.reader( |
64 | 64 |
[force_text(line) for line in export_response.body.split(force_bytes('\r\n'))], delimiter=',' |
65 | 65 |
) |
... | ... | |
272 | 272 |
('role1', '', 'checked', None), |
273 | 273 |
('role2 (LDAP)', 'role1 ', None, 'disabled'), |
274 | 274 |
] |
275 | ||
276 | ||
277 |
def test_manager_role_csv_import(app, admin, ou1, ou2):
    """Exercise the roles CSV import: creation, update, slug conflicts and error reporting."""

    def attach(page, payload):
        # Put an in-memory CSV upload in the import form of `page` and return that form.
        page.form['import_file'] = Upload('t.csv', payload, 'text/csv')
        return page.form

    initial_count = Role.objects.count()
    listing = login(app, admin, '/manage/roles/')
    import_page = listing.click('CSV import')

    # full header, explicit slug and organizational unit
    header = b'name,slug,ou\n'
    rows = 'Role Name,role_slug,%s' % ou1.slug
    attach(import_page, header + rows.encode()).submit(status=302)
    assert Role.objects.get(name='Role Name', slug='role_slug', ou=ou1)
    assert Role.objects.count() == initial_count + 1

    # missing ou falls back on the form's one, missing slug is generated
    rows = 'Role 2,role2,\nRole 3,,%s' % ou2.slug
    attach(import_page, header + rows.encode()).submit(status=302)
    assert Role.objects.get(name='Role 2', slug='role2', ou=get_default_ou())
    assert Role.objects.get(name='Role 3', slug='role-3', ou=ou2)
    assert Role.objects.count() == initial_count + 3

    # slug can be updated using name, name can be updated using slug
    rows = 'Role two,role2,\nRole 3,role-three,%s' % ou2.slug
    attach(import_page, header + rows.encode()).submit(status=302)
    assert Role.objects.get(name='Role two', slug='role2', ou=get_default_ou())
    assert Role.objects.get(name='Role 3', slug='role-three', ou=ou2)
    assert Role.objects.count() == initial_count + 3

    # conflict in auto-generated slug is handled
    header = b'name\n'
    rows = 'Role!2'
    attach(import_page, header + rows.encode()).submit(status=302)
    assert Role.objects.get(name='Role!2', slug='role2-1', ou=get_default_ou())
    assert Role.objects.count() == initial_count + 4

    # Identical roles are created only once
    rows = 'Role 4,role-4,\nRole 4,,\nRole 4,role-4,'
    attach(import_page, header + rows.encode()).submit(status=302)
    assert Role.objects.get(name='Role 4', slug='role-4', ou=get_default_ou())
    assert Role.objects.count() == initial_count + 5

    # NUL bytes are rejected
    rows = 'xx\0xx,,'
    error_page = attach(import_page, header + rows.encode()).submit()
    assert 'Invalid file format.' in error_page.text

    # unknown header, submitted from the error page
    wrong_header = b'a,b,c\n'
    error_page = attach(error_page, wrong_header).submit()
    assert 'Invalid file header' in error_page.text

    # per-line errors carry the record number
    rows = ',slug-but-no-name,\nRole,,unknown-ou'
    fresh_page = app.get('/manage/roles/csv-import/')
    error_page = attach(fresh_page, header + rows.encode()).submit()
    assert 'Name is required. (line 2)' in error_page.text
    assert 'Organizational Unit unknown-ou does not exist. (line 3)' in error_page.text

    # the sample file is downloadable from the import page
    fresh_page = app.get('/manage/roles/csv-import/')
    sample = fresh_page.click('Download sample')
    assert 'name,slug,ou' in sample.text
|
275 |
- |