0003-manager-import-roles-using-CSV-24921.patch
src/authentic2/manager/forms.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import csv |
|
17 | 18 |
import hashlib |
18 | 19 |
import json |
19 | 20 |
import smtplib |
20 | 21 |
import logging |
22 |
from collections import defaultdict |
|
23 |
from io import StringIO |
|
21 | 24 | |
22 | 25 |
from django.utils.translation import ugettext_lazy as _, pgettext, ugettext |
23 | 26 |
from django import forms |
... | ... | |
33 | 36 |
from authentic2.forms.fields import NewPasswordField, CheckPasswordField, ValidatedEmailField |
34 | 37 | |
35 | 38 |
from django_rbac.models import Operation |
36 |
from django_rbac.utils import get_ou_model, get_role_model, get_permission_model |
|
39 |
from django_rbac.utils import get_ou_model, get_role_model, get_permission_model, generate_slug
|
|
37 | 40 |
from django_rbac.backends import DjangoRBACBackend |
38 | 41 | |
39 | 42 |
from authentic2.forms.profile import BaseUserForm |
... | ... | |
47 | 50 | |
48 | 51 |
User = get_user_model() |
49 | 52 |
OU = get_ou_model() |
53 |
Role = get_role_model() |
|
50 | 54 | |
51 | 55 |
logger = logging.getLogger(__name__) |
52 | 56 | |
... | ... | |
780 | 784 |
with self.user_import.meta_update as meta: |
781 | 785 |
meta['ou'] = self.cleaned_data['ou'] |
782 | 786 |
meta['encoding'] = self.cleaned_data['encoding'] |
787 | ||
788 | ||
789 |
class RolesCsvImportForm(RolesImportForm):
    """Import form reading roles from an uploaded CSV file.

    Every data row must have at least three columns:

    1. role name (required);
    2. role slug (optional, generated from the name when empty);
    3. organizational unit slug (optional, the ``ou`` form field is used
       when empty).

    A first row whose first cell is ``name`` (or its translation, possibly
    prefixed with ``#``) is treated as a header and skipped.

    Within the target organizational unit, a row matching an existing role
    by name updates its slug (when one is given); otherwise a row matching
    an existing role by slug updates its name; otherwise a new role is
    built.  After validation ``self.roles`` holds the role instances to
    save; nothing is written to the database by the form itself.
    """

    file_field_label = _('Roles CSV File')

    def clean_import_file(self):
        # Hand the uploaded file through unchanged; it is decoded and parsed
        # in clean(), where the selected organizational unit is available.
        return self.cleaned_data['import_file']

    def clean(self):
        """Parse the uploaded CSV and populate ``self.roles``.

        Raises ValidationError when the file contains NUL bytes, when a row
        has fewer than three columns, when a name is missing or when a
        referenced organizational unit does not exist.
        """
        super().clean()

        self.roles = []

        # Fields which failed their own validation are absent from
        # cleaned_data; their errors are already recorded, so there is
        # nothing to parse.
        if 'import_file' not in self.cleaned_data or 'ou' not in self.cleaned_data:
            return

        content = self.cleaned_data['import_file'].read()
        if b'\0' in content:
            raise ValidationError(_('Invalid file format.'))

        for charset in ('utf-8-sig', 'iso-8859-15'):
            try:
                content = content.decode(charset)
                break
            except UnicodeDecodeError:
                continue
        # all byte-sequences are ok for iso-8859-15 so we will always reach
        # this line with content being a unicode string.

        try:
            dialect = csv.Sniffer().sniff(content)
        except csv.Error:
            # Sniffing failed: let csv.reader fall back to its defaults.
            dialect = None

        # Index existing roles per organizational unit, by slug and by name,
        # so rows can be matched without one query per row.
        roles_by_slugs = defaultdict(dict)
        roles_by_names = defaultdict(dict)
        for role in Role.objects.all():
            roles_by_slugs[role.ou][role.slug] = role
            if role.name:
                roles_by_names[role.ou][role.name] = role

        for i, csvline in enumerate(csv.reader(StringIO(content), dialect=dialect)):
            if not csvline:
                continue

            # Optional header row.
            if i == 0 and csvline[0].strip('#').lower() in ('name', _('name')):
                continue

            if len(csvline) < 3:
                raise ValidationError(_('Invalid file format. (line %d)') % (i + 1))

            name = csvline[0]
            if not name:
                raise ValidationError(_('Name is required. (line %d)') % (i + 1))

            slug = csvline[1] or None

            ou = self.cleaned_data['ou']
            if csvline[2]:
                try:
                    ou = OU.objects.get(slug=csvline[2])
                except OU.DoesNotExist:
                    # Interpolate after the gettext call so the untranslated
                    # template is what gets looked up in the catalogue.
                    raise ValidationError(
                        _('Organizational Unit %s does not exist. (line %d)') % (csvline[2], i + 1)
                    )

            if name in roles_by_names.get(ou, {}):
                role = roles_by_names[ou][name]
                role.slug = slug or role.slug
            elif slug in roles_by_slugs.get(ou, {}):
                role = roles_by_slugs[ou][slug]
                role.name = name
            else:
                role = Role(name=name, slug=slug)

            if not role.slug:
                role.slug = generate_slug(role.name, seen_slugs=roles_by_slugs[ou])

            # Register the role so later rows of the same file see it.
            roles_by_slugs[ou][role.slug] = role
            roles_by_names[ou][role.name] = role

            role.ou = ou
            self.roles.append(role)
src/authentic2/manager/role_views.py | ||
---|---|---|
590 | 590 |
roles_import = RolesImportView.as_view() |
591 | 591 | |
592 | 592 | |
593 |
class RolesCsvImportView(
    views.PermissionMixin, views.TitleMixin, views.MediaMixin, views.FormNeedsRequest, FormView
):
    """Form view importing roles from an uploaded CSV file.

    The submitted file is validated by ``forms.RolesCsvImportForm``; every
    role the form collected is saved once the form is valid.
    """

    form_class = forms.RolesCsvImportForm
    model = get_role_model()
    template_name = 'authentic2/manager/roles_csv_import_form.html'
    title = _('Roles CSV Import')

    def get_initial(self):
        """Preselect the organizational unit given by ``search-ou``, if any."""
        initial = super().get_initial()
        requested_ou = self.request.GET.get('search-ou')
        if not requested_ou:
            return initial
        initial['ou'] = requested_ou
        return initial

    def post(self, request, *args, **kwargs):
        """Process the submission, refusing it when ``can_add`` is false."""
        if self.can_add:
            return super().post(request, *args, **kwargs)
        raise PermissionDenied

    def form_valid(self, form):
        """Save every role gathered by the form, then redirect."""
        # Remembered for the success message and the redirect URL.
        self.ou = form.cleaned_data['ou']
        for imported_role in form.roles:
            imported_role.save()
        return super().form_valid(form)

    def get_success_url(self):
        """Queue the success message and return the filtered roles list URL."""
        success_message = (
            _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou
        )
        messages.success(self.request, success_message)
        return '%s?search-ou=%s' % (reverse('a2-manager-roles'), self.ou.pk)


roles_csv_import = RolesCsvImportView.as_view()
|
626 | ||
627 | ||
628 |
class RolesCsvImportSampleView(TemplateView):
    """Render the sample roles file template with a ``text/csv`` content type."""

    # The sample is a plain template so its header row can be translated.
    content_type = 'text/csv'
    template_name = 'authentic2/manager/sample_roles.txt'


roles_csv_import_sample = RolesCsvImportSampleView.as_view()
|
634 | ||
635 | ||
593 | 636 |
class RoleJournal(views.PermissionMixin, JournalViewWithContext, BaseJournalView): |
594 | 637 |
template_name = 'authentic2/manager/role_journal.html' |
595 | 638 |
permissions = ['a2_rbac.view_role'] |
src/authentic2/manager/templates/authentic2/manager/roles.html | ||
---|---|---|
19 | 19 |
<li><a download href="{% url 'a2-manager-roles-export' format="json" %}?{{ request.GET.urlencode }}">{% trans 'Export' %}</a></li> |
20 | 20 |
{% if view.can_add %} |
21 | 21 |
<li><a href="{% url 'a2-manager-roles-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'Import' %}</a></li> |
22 |
<li><a href="{% url 'a2-manager-roles-csv-import' %}?{{ request.GET.urlencode }}" rel="popup">{% trans 'CSV import' %}</a></li> |
|
22 | 23 |
{% endif %} |
23 | 24 |
</ul> |
24 | 25 |
</span> |
src/authentic2/manager/templates/authentic2/manager/roles_csv_import_form.html | ||
---|---|---|
1 |
{% extends "authentic2/manager/import_form.html" %} |
|
2 |
{% load i18n gadjo %} |
|
3 | ||
4 |
{% block content %} |
|
5 |
<form method="post" enctype="multipart/form-data"> |
|
6 |
{% csrf_token %} |
|
7 |
{{ form|with_template }} |
|
8 |
<p> |
|
9 |
<a href="{% url 'a2-manager-roles-csv-import-sample' %}">{% trans 'Download sample file' %}</a> |
|
10 |
</p> |
|
11 |
<div class="buttons"> |
|
12 |
<button>{% trans "Import" %}</button> |
|
13 |
<a class="cancel" href="{% url 'a2-manager-homepage' %}">{% trans 'Cancel' %}</a> |
|
14 |
</div> |
|
15 |
</form> |
|
16 |
{% endblock %} |
src/authentic2/manager/templates/authentic2/manager/sample_roles.txt | ||
---|---|---|
1 |
{% load i18n %}{% trans 'name' %},{% trans 'slug' %},{% trans 'organizational unit' %} |
|
2 |
Role Name,role_slug,ou_slug |
src/authentic2/manager/urls.py | ||
---|---|---|
98 | 98 |
name='a2-manager-roles'), |
99 | 99 |
url(r'^roles/import/$', role_views.roles_import, |
100 | 100 |
name='a2-manager-roles-import'), |
101 |
url(r'^roles/csv-import/$', role_views.roles_csv_import, |
|
102 |
name='a2-manager-roles-csv-import'), |
|
103 |
url(r'^roles/csv-import-sample/$', role_views.roles_csv_import_sample, |
|
104 |
name='a2-manager-roles-csv-import-sample'), |
|
101 | 105 |
url(r'^roles/add/$', role_views.add, |
102 | 106 |
name='a2-manager-role-add'), |
103 | 107 |
url(r'^roles/export/(?P<format>csv|json)/$', |
src/django_rbac/models.py | ||
---|---|---|
2 | 2 |
import hashlib |
3 | 3 | |
4 | 4 |
from django.utils import six |
5 |
from django.utils.text import slugify |
|
6 | 5 |
from django.utils.translation import ugettext_lazy as _ |
7 | 6 |
from django.db import models |
8 | 7 |
from django.conf import settings |
... | ... | |
52 | 51 |
def save(self, *args, **kwargs): |
53 | 52 |
# truncate slug and add a hash if it's too long |
54 | 53 |
if not self.slug: |
55 |
self.slug = slugify(six.text_type(self.name)).lstrip('_')
|
|
54 |
self.slug = utils.generate_slug(self.name)
|
|
56 | 55 |
if len(self.slug) > 256: |
57 | 56 |
self.slug = self.slug[:252] + \ |
58 | 57 |
hashlib.md5(self.slug).hexdigest()[:4] |
src/django_rbac/utils.py | ||
---|---|---|
3 | 3 |
from django.conf import settings |
4 | 4 |
from django.apps import apps |
5 | 5 |
from django.utils import six |
6 |
from django.utils.text import slugify |
|
6 | 7 | |
7 | 8 |
from . import constants |
8 | 9 | |
... | ... | |
80 | 81 |
from . import models |
81 | 82 |
operation, created = models.Operation.objects.get_or_create(slug=operation_tpl.slug) |
82 | 83 |
return operation |
84 | ||
85 | ||
86 |
def generate_slug(name, seen_slugs=None):
    """Build a slug from ``name``, avoiding those in ``seen_slugs``.

    The base slug is the slugified name with leading underscores removed.
    When ``seen_slugs`` (any container supporting ``in``) is given and
    already contains the base slug, ``-1``, ``-2``, ... are appended until
    a slug not present in ``seen_slugs`` is found.

    ``seen_slugs`` is only read, never modified.
    """
    slug = base_slug = slugify(six.text_type(name)).lstrip('_')
    if seen_slugs:
        i = 1
        while slug in seen_slugs:
            slug = '%s-%s' % (base_slug, i)
            # Move on to the next suffix; without this the loop never ends
            # once "<base>-1" is itself already taken.
            i += 1
    return slug
tests/test_role_manager.py | ||
---|---|---|
253 | 253 |
('role1', '', 'checked', None), |
254 | 254 |
('role2 (LDAP)', 'role1 ', None, 'disabled'), |
255 | 255 |
] |
256 | ||
257 | ||
258 |
def test_manager_role_csv_import(app, admin, ou1, ou2):
    """Exercise the roles CSV import form: creation, update and error cases."""

    def submit_csv(page, content, **kwargs):
        # Put `content` (text) in the import form of `page` and submit it.
        page.form['import_file'] = Upload('t.csv', content.encode(), 'text/csv')
        return page.form.submit(**kwargs)

    roles_count = Role.objects.count()
    resp = login(app, admin, '/manage/roles/')

    resp = resp.click('CSV import')

    # a header row is accepted and skipped
    submit_csv(resp, 'name,slug,ou\n' + 'Role Name,role_slug,%s' % ou1.slug, status=302)
    assert Role.objects.get(name='Role Name', slug='role_slug', ou=ou1)
    assert Role.objects.count() == roles_count + 1

    # empty ou falls back to the form's ou, empty slug is generated
    submit_csv(resp, 'Role 2,role2,\nRole 3,,%s' % ou2.slug)
    assert Role.objects.get(name='Role 2', slug='role2', ou=get_default_ou())
    assert Role.objects.get(name='Role 3', slug='role-3', ou=ou2)
    assert Role.objects.count() == roles_count + 3

    # slug can be updated using name, name can be updated using slug
    submit_csv(resp, 'Role two,role2,\nRole 3,role-three,%s' % ou2.slug)
    assert Role.objects.get(name='Role two', slug='role2', ou=get_default_ou())
    assert Role.objects.get(name='Role 3', slug='role-three', ou=ou2)
    assert Role.objects.count() == roles_count + 3

    # conflict in auto-generated slug is handled
    submit_csv(resp, 'Role!2,,')
    assert Role.objects.get(name='Role!2', slug='role2-1', ou=get_default_ou())
    assert Role.objects.count() == roles_count + 4

    # Identical roles are created only once
    submit_csv(resp, 'Role 4,role-4,\nRole 4,,\nRole 4,role-4,')
    assert Role.objects.get(name='Role 4', slug='role-4', ou=get_default_ou())
    assert Role.objects.count() == roles_count + 5

    # binary content is rejected
    resp = submit_csv(resp, 'xx\0xx')
    assert 'Invalid file format.' in resp.text

    # each invalid row is reported with its line number
    invalid_uploads = [
        (',slug-but-no-name,', 'Name is required. (line 1)'),
        ('Role,,unknown-ou', 'Organizational Unit unknown-ou does not exist. (line 1)'),
        ('Role,too few fields', 'Invalid file format. (line 1)'),
    ]
    for csv_content, expected_error in invalid_uploads:
        resp = app.get('/manage/roles/csv-import/')
        resp = submit_csv(resp, csv_content)
        assert expected_error in resp.text

    # the sample file is downloadable from the import form
    resp = app.get('/manage/roles/csv-import/')
    resp = resp.click('Download sample')
    assert 'name,slug,organizational unit' in resp.text
|
256 |
- |