From 4abdf7bb7101bee2d6a2211ab488121f24f9e5a1 Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Tue, 4 Oct 2022 18:30:19 +0200 Subject: [PATCH] authenticators: add import/export (#65360) --- src/authentic2/apps/authenticators/forms.py | 13 ++ .../apps/authenticators/manager_urls.py | 2 + src/authentic2/apps/authenticators/models.py | 102 ++++++++++- .../authenticators/authenticator_detail.html | 1 + .../authenticators/authenticators.html | 4 + src/authentic2/apps/authenticators/views.py | 55 +++++- tests/test_manager_authenticators.py | 169 +++++++++++++++++- 7 files changed, 339 insertions(+), 7 deletions(-) diff --git a/src/authentic2/apps/authenticators/forms.py b/src/authentic2/apps/authenticators/forms.py index 69f47f6c5..bf0f6724b 100644 --- a/src/authentic2/apps/authenticators/forms.py +++ b/src/authentic2/apps/authenticators/forms.py @@ -14,7 +14,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import json + from django import forms +from django.core.exceptions import ValidationError from django.db.models import Max from django.utils.translation import ugettext as _ @@ -54,6 +57,16 @@ class AuthenticatorAddForm(SlugMixin, forms.ModelForm): return super().save() +class AuthenticatorImportForm(forms.Form): + authenticator_json = forms.FileField(label=_('Authenticator export file')) + + def clean_authenticator_json(self): + try: + return json.loads(self.cleaned_data['authenticator_json'].read().decode()) + except ValueError: + raise ValidationError(_('File is not in the expected JSON format.')) + + class LoginPasswordAuthenticatorEditForm(forms.ModelForm): class Meta: model = LoginPasswordAuthenticator diff --git a/src/authentic2/apps/authenticators/manager_urls.py b/src/authentic2/apps/authenticators/manager_urls.py index c0352f538..3a27ec862 100644 --- a/src/authentic2/apps/authenticators/manager_urls.py +++ b/src/authentic2/apps/authenticators/manager_urls.py @@ -76,6 +76,8 @@ urlpatterns = required( views.journal, name='a2-manager-authenticator-journal', ), + path('authenticators//export/', views.export_json, name='a2-manager-authenticator-export'), + path('authenticators/import/', views.import_json, name='a2-manager-authenticator-import'), path( 'authenticators/order/', views.order, diff --git a/src/authentic2/apps/authenticators/models.py b/src/authentic2/apps/authenticators/models.py index 8d67d07f5..26dad709b 100644 --- a/src/authentic2/apps/authenticators/models.py +++ b/src/authentic2/apps/authenticators/models.py @@ -18,8 +18,10 @@ import datetime import logging import uuid +from django.apps import apps from django.core.exceptions import ValidationError from django.db import models +from django.db.models import Max from django.shortcuts import render, reverse from django.utils.formats import date_format from django.utils.text import capfirst @@ -28,6 +30,7 @@ from django.utils.translation import ugettext_lazy as _ from authentic2 import views from authentic2.a2_rbac.models import Role +from authentic2.data_transfer import search_ou, search_role from authentic2.manager.utils import label_from_role from authentic2.utils.evaluate import condition_validator, evaluate_condition @@ -36,6 +39,10 @@ from .query import AuthenticatorManager logger = logging.getLogger(__name__) +class AuthenticatorImportError(Exception): + pass + + class BaseAuthenticator(models.Model): uuid = models.CharField(max_length=255, unique=True, default=uuid.uuid4, editable=False) name = models.CharField(_('Name'), blank=True, max_length=128) @@ -76,7 +83,7 @@ class BaseAuthenticator(models.Model): authenticators = AuthenticatorManager() type = '' - related_models = [] + related_models = {} related_object_form_class = None manager_view_template_name = 'authentic2/authenticators/authenticator_detail.html' unique = False @@ -142,6 +149,66 @@ class BaseAuthenticator(models.Model): return False return True + def export_json(self): + data = { + 'authenticator_type': '%s.%s' % (self._meta.app_label, self._meta.model_name), + } + + fields = [ + f for f in self._meta.get_fields() if not f.is_relation and not f.auto_created and f.editable + ] + data.update({field.name: getattr(self, field.attname) for field in fields}) + + data['ou'] = self.ou and self.ou.natural_key_json() + data['related_objects'] = [obj.export_json() for qs in self.related_models.values() for obj in qs] + + return data + + @staticmethod + def import_json(data): + def get_model_from_dict(data, key): + try: + model_name = data.pop(key) + except KeyError: + raise AuthenticatorImportError(_('Missing "%s" key.') % key) + + try: + return apps.get_model(model_name) + except LookupError: + raise AuthenticatorImportError( + _('Unknown %(key)s: %(value)s.') % {'key': key, 'value': model_name} + ) + except ValueError: + raise AuthenticatorImportError( + _('Invalid %(key)s: %(value)s.') % {'key': key, 'value': model_name} + ) + + related_objects = data.pop('related_objects', []) + + ou = data.pop('ou', None) + if ou: + data['ou'] = search_ou(ou) + if not data['ou']: + raise AuthenticatorImportError(_('Organization unit not found: %s.') % ou) + + model = get_model_from_dict(data, 'authenticator_type') + try: + slug = data.pop('slug') + except KeyError: + raise AuthenticatorImportError(_('Missing slug.')) + authenticator, created = model.objects.update_or_create(slug=slug, defaults=data) + + for obj in related_objects: + model = get_model_from_dict(obj, 'object_type') + model.import_json(obj, authenticator) + + if created: + max_order = BaseAuthenticator.objects.aggregate(max=Max('order'))['max'] or 0 + authenticator.order = max_order + 1 + authenticator.save() + + return authenticator, created + class AuthenticatorRelatedObjectBase(models.Model): authenticator = models.ForeignKey(BaseAuthenticator, on_delete=models.CASCADE) @@ -160,6 +227,21 @@ class AuthenticatorRelatedObjectBase(models.Model): def verbose_name_plural(self): return self._meta.verbose_name_plural + def export_json(self): + data = { + 'object_type': '%s.%s' % (self._meta.app_label, self._meta.model_name), + } + + fields = [ + f for f in self._meta.get_fields() if not f.is_relation and not f.auto_created and f.editable + ] + data.update({field.name: getattr(self, field.attname) for field in fields}) + return data + + @classmethod + def import_json(cls, data, authenticator): + cls.objects.update_or_create(authenticator=authenticator, **data) + class AddRoleAction(AuthenticatorRelatedObjectBase): role = models.ForeignKey(Role, verbose_name=_('Role'), on_delete=models.CASCADE) @@ -176,6 +258,24 @@ class AddRoleAction(AuthenticatorRelatedObjectBase): def __str__(self): return label_from_role(self.role) + def export_json(self): + data = super().export_json() + data['role'] = self.role.natural_key_json() + return data + + @classmethod + def import_json(cls, data, authenticator): + try: + role = data.pop('role') + except KeyError: + raise AuthenticatorImportError(_('Missing "role" key in add role action.')) + + data['role'] = search_role(role) + if not data['role']: + raise AuthenticatorImportError(_('Role not found: %s.') % role) + + super().import_json(data, authenticator) + class LoginPasswordAuthenticator(BaseAuthenticator): remember_me = models.PositiveIntegerField( diff --git a/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticator_detail.html b/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticator_detail.html index 38df449dd..8300fe82c 100644 --- a/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticator_detail.html +++ b/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticator_detail.html @@ -11,6 +11,7 @@ {% endif %} {% trans "Edit" %}
    +
  • {% trans "Export" %}
  • {% trans "Journal" %}
  • {% if not object.protected %}
  • {% trans "Delete" %}
  • diff --git a/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticators.html b/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticators.html index c9e81ae46..3a1884777 100644 --- a/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticators.html +++ b/src/authentic2/apps/authenticators/templates/authentic2/authenticators/authenticators.html @@ -4,8 +4,12 @@ {% block appbar %} {{ block.super }} + {% trans "Edit order" %} {% trans "Add new authenticator" %} + {% endblock %} diff --git a/src/authentic2/apps/authenticators/views.py b/src/authentic2/apps/authenticators/views.py index cbf073d59..9affd72bb 100644 --- a/src/authentic2/apps/authenticators/views.py +++ b/src/authentic2/apps/authenticators/views.py @@ -14,11 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import datetime +import json + from django.apps import apps from django.contrib import messages from django.core.exceptions import PermissionDenied from django.forms.models import modelform_factory -from django.http import Http404, HttpResponseRedirect +from django.http import Http404, HttpResponse, HttpResponseRedirect from django.shortcuts import get_object_or_404 from django.urls import reverse, reverse_lazy from django.utils.functional import cached_property @@ -27,12 +30,13 @@ from django.utils.translation import ugettext as _ from django.views.generic import CreateView, DeleteView, DetailView, FormView, UpdateView from django.views.generic.list import ListView -from authentic2.apps.authenticators import forms -from authentic2.apps.authenticators.models import BaseAuthenticator from authentic2.apps.journal.views import JournalViewWithContext from authentic2.manager.journal_views import BaseJournalView from authentic2.manager.views import MediaMixin, TitleMixin +from . import forms +from .models import AuthenticatorImportError, BaseAuthenticator + class AuthenticatorsMixin(MediaMixin, TitleMixin): model = BaseAuthenticator @@ -190,6 +194,51 @@ class AuthenticatorJournal(JournalViewWithContext, BaseJournalView): journal = AuthenticatorJournal.as_view() +class AuthenticatorExportView(AuthenticatorsMixin, DetailView): + def get(self, request, *args, **kwargs): + authenticator = self.get_object() + + response = HttpResponse(content_type='application/json') + today = datetime.date.today() + attachment = 'attachment; filename="export_{}_{}.json"'.format( + authenticator.slug.replace('-', '_'), today.strftime('%Y%m%d') + ) + response['Content-Disposition'] = attachment + json.dump(authenticator.export_json(), response, indent=4) + return response + + +export_json = AuthenticatorExportView.as_view() + + +class AuthenticatorImportView(AuthenticatorsMixin, FormView): + form_class = forms.AuthenticatorImportForm + template_name = 'authentic2/manager/import_form.html' + title = _('Authenticator Import') + + def form_valid(self, form): + try: + self.authenticator, created = BaseAuthenticator.import_json( + form.cleaned_data['authenticator_json'] + ) + except AuthenticatorImportError as e: + form.add_error('authenticator_json', e) + return self.form_invalid(form) + + if created: + messages.success(self.request, _('Authenticator has been created.')) + else: + messages.success(self.request, _('Authenticator has been updated.')) + + return super().form_valid(form) + + def get_success_url(self): + return reverse('a2-manager-authenticator-detail', kwargs={'pk': self.authenticator.pk}) + + +import_json = AuthenticatorImportView.as_view() + + class AuthenticatorsOrderView(AuthenticatorsMixin, FormView): template_name = 'authentic2/authenticators/authenticators_order_form.html' title = _('Configure display order') diff --git a/tests/test_manager_authenticators.py b/tests/test_manager_authenticators.py index 4774902b7..58d6fbc25 100644 --- a/tests/test_manager_authenticators.py +++ b/tests/test_manager_authenticators.py @@ -14,16 +14,19 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import json + import pytest from django import VERSION as DJ_VERSION from django.utils.html import escape +from webtest import Upload from authentic2.a2_rbac.utils import get_default_ou -from authentic2.apps.authenticators.models import BaseAuthenticator, LoginPasswordAuthenticator +from authentic2.apps.authenticators.models import AddRoleAction, BaseAuthenticator, LoginPasswordAuthenticator from authentic2.models import Attribute from authentic2_auth_fc.models import FcAuthenticator -from authentic2_auth_oidc.models import OIDCProvider -from authentic2_auth_saml.models import SAMLAuthenticator +from authentic2_auth_oidc.models import OIDCClaimMapping, OIDCProvider +from authentic2_auth_saml.models import SAMLAuthenticator, SetAttributeAction from .utils import assert_event, login, logout @@ -101,6 +104,44 @@ def test_authenticators_password(app, superuser): assert 'Password' not in resp.text +@pytest.mark.freeze_time('2022-04-19 14:00') +def test_authenticators_password_export(app, superuser): + resp = login(app, superuser, path='/manage/authenticators/') + assert LoginPasswordAuthenticator.objects.count() == 1 + + resp = resp.click('Configure') + resp = resp.click('Export') + assert resp.headers['content-type'] == 'application/json' + assert ( + resp.headers['content-disposition'] + == 'attachment; filename="export_password_authenticator_20220419.json"' + ) + + authenticator_json = json.loads(resp.text) + assert authenticator_json == { + 'authenticator_type': 'authenticators.loginpasswordauthenticator', + 'button_description': '', + 'button_label': 'Login', + 'include_ou_selector': False, + 'name': '', + 'ou': None, + 'related_objects': [], + 'remember_me': None, + 'show_condition': '', + 'slug': 'password-authenticator', + } + + resp = app.get('/manage/authenticators/') + resp = resp.click('Import') + authenticator_json['button_description'] = 'test' + resp.form['authenticator_json'] = Upload( + 'export.json', json.dumps(authenticator_json).encode(), 'application/json' + ) + resp = resp.form.submit() + assert LoginPasswordAuthenticator.objects.count() == 1 + assert LoginPasswordAuthenticator.objects.get().button_description == 'test' + + @pytest.mark.freeze_time('2022-04-19 14:00') def test_authenticators_oidc(app, superuser, ou1, ou2): resp = login(app, superuser, path='/manage/authenticators/') @@ -233,6 +274,103 @@ def test_authenticators_oidc_add_role(app, superuser, role_ou1): assert 'role_ou1' in resp.text +def test_authenticators_oidc_export(app, superuser, simple_role): + authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True) + OIDCClaimMapping.objects.create(authenticator=authenticator, claim='test', attribute='hop') + AddRoleAction.objects.create(authenticator=authenticator, role=simple_role) + + resp = login(app, superuser, path=authenticator.get_absolute_url()) + export_resp = resp.click('Export') + + resp = app.get('/manage/authenticators/import/') + resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json') + resp = resp.form.submit() + assert '/authenticators/%s/' % authenticator.pk in resp.location + + resp = resp.follow() + assert 'Authenticator has been updated.' in resp.text + assert OIDCProvider.objects.count() == 1 + assert OIDCClaimMapping.objects.count() == 1 + assert AddRoleAction.objects.count() == 1 + + OIDCProvider.objects.all().delete() + OIDCClaimMapping.objects.all().delete() + AddRoleAction.objects.all().delete() + + resp = app.get('/manage/authenticators/import/') + resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json') + resp = resp.form.submit().follow() + assert 'Authenticator has been created.' in resp.text + + authenticator = OIDCProvider.objects.get() + assert authenticator.slug == 'idp1' + assert authenticator.order == 1 + assert authenticator.ou == get_default_ou() + assert authenticator.enabled is False + assert OIDCClaimMapping.objects.filter( + authenticator=authenticator, claim='test', attribute='hop' + ).exists() + assert AddRoleAction.objects.filter(authenticator=authenticator, role=simple_role).exists() + + +def test_authenticators_oidc_import_errors(app, superuser, simple_role): + resp = login(app, superuser, path='/manage/authenticators/import/') + resp.form['authenticator_json'] = Upload('export.json', b'not-json', 'application/json') + resp = resp.form.submit() + assert 'File is not in the expected JSON format.' in resp.text + + resp.form['authenticator_json'] = Upload('export.json', b'{}', 'application/json') + resp = resp.form.submit() + assert escape('Missing "authenticator_type" key.') in resp.text + + resp.form['authenticator_json'] = Upload( + 'export.json', b'{"authenticator_type": "xxx"}', 'application/json' + ) + resp = resp.form.submit() + assert 'Invalid authenticator_type: xxx.' in resp.text + + resp.form['authenticator_json'] = Upload( + 'export.json', b'{"authenticator_type": "x.y"}', 'application/json' + ) + resp = resp.form.submit() + assert 'Unknown authenticator_type: x.y.' in resp.text + + authenticator = OIDCProvider.objects.create(slug='idp1', order=42, ou=get_default_ou(), enabled=True) + AddRoleAction.objects.create(authenticator=authenticator, role=simple_role) + + export_resp = app.get('/manage/authenticators/%s/export/' % authenticator.pk) + + export = json.loads(export_resp.text) + del export['slug'] + resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert 'Missing slug.' in resp.text + + export = json.loads(export_resp.text) + export['ou'] = {'slug': 'xxx'} + resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert escape("Organization unit not found: {'slug': 'xxx'}.") in resp.text + + export = json.loads(export_resp.text) + del export['related_objects'][0]['object_type'] + resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert escape('Missing "object_type" key.') in resp.text + + export = json.loads(export_resp.text) + del export['related_objects'][0]['role'] + resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert escape('Missing "role" key in add role action.') in resp.text + + export = json.loads(export_resp.text) + export['related_objects'][0]['role'] = {'slug': 'xxx'} + resp.form['authenticator_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert escape("Role not found: {'slug': 'xxx'}.") in resp.text + + def test_authenticators_fc(app, superuser): resp = login(app, superuser, path='/manage/authenticators/') @@ -423,6 +561,31 @@ def test_authenticators_saml_add_role(app, superuser, role_ou1, role_ou2): assert 'role_ou2' in resp.text +def test_authenticators_saml_export(app, superuser, simple_role): + authenticator = SAMLAuthenticator.objects.create(metadata='meta1.xml', slug='idp1') + SetAttributeAction.objects.create(authenticator=authenticator, user_field='test', saml_attribute='hop') + AddRoleAction.objects.create(authenticator=authenticator, role=simple_role) + + resp = login(app, superuser, path=authenticator.get_absolute_url()) + export_resp = resp.click('Export') + + SAMLAuthenticator.objects.all().delete() + SetAttributeAction.objects.all().delete() + AddRoleAction.objects.all().delete() + + resp = app.get('/manage/authenticators/import/') + resp.form['authenticator_json'] = Upload('export.json', export_resp.body, 'application/json') + resp = resp.form.submit().follow() + + authenticator = SAMLAuthenticator.objects.get() + assert authenticator.slug == 'idp1' + assert authenticator.metadata == 'meta1.xml' + assert SetAttributeAction.objects.filter( + authenticator=authenticator, user_field='test', saml_attribute='hop' + ).exists() + assert AddRoleAction.objects.filter(authenticator=authenticator, role=simple_role).exists() + + def test_authenticators_order(app, superuser): resp = login(app, superuser, path='/manage/authenticators/') -- 2.35.1