From 515ffef05263e3b4cf9f43f34e0bbc57eda1ea48 Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Wed, 8 Dec 2021 12:19:54 +0100 Subject: [PATCH] manager: allow importing roles from different OUs (#58826) --- src/authentic2/data_transfer.py | 11 +++++- src/authentic2/manager/forms.py | 9 +---- src/authentic2/manager/role_views.py | 18 ++++++--- tests/test_role_manager.py | 56 ++++++++++++++++++++-------- 4 files changed, 64 insertions(+), 30 deletions(-) diff --git a/src/authentic2/data_transfer.py b/src/authentic2/data_transfer.py index 4ef5d672..af1825c1 100644 --- a/src/authentic2/data_transfer.py +++ b/src/authentic2/data_transfer.py @@ -158,6 +158,7 @@ class ImportContext: role_attributes_update=True, ou_delete_orphans=False, set_ou=None, + allowed_ous=None, ): self.import_roles = import_roles self.import_ous = import_ous @@ -167,6 +168,7 @@ class ImportContext: self.role_permissions_update = role_permissions_update self.role_attributes_update = role_attributes_update self.set_ou = set_ou + self.allowed_ous = allowed_ous def wraps_validationerror(func): @@ -210,12 +212,19 @@ class RoleDeserializer: if self._import_context.set_ou: ou = self._import_context.set_ou has_ou = True - else: + elif 'ou' in self._role_d: ou_d = self._role_d['ou'] has_ou = bool(ou_d) ou = None if not has_ou else search_ou(ou_d) if has_ou and not ou: raise ValidationError(_("Can't import role because missing Organizational Unit: %s") % ou_d) + if self._import_context.allowed_ous and ou not in self._import_context.allowed_ous: + raise ValidationError( + _("Can't import role because missing permissions on Organizational Unit: %s") % ou_d + ) + else: + name = self._role_d.get('name') or self._role_d.get('slug') or self._role_d.get('uuid') + raise ValidationError(_("Missing Organizational Unit for role: %s") % name) obj = search_role(self._role_d, ou=self._import_context.set_ou) diff --git a/src/authentic2/manager/forms.py b/src/authentic2/manager/forms.py index 781df100..e1e729fa 100644 --- a/src/authentic2/manager/forms.py +++ b/src/authentic2/manager/forms.py @@ -691,15 +691,10 @@ class OusImportForm(SiteImportForm): class RolesImportForm(LimitQuerysetFormMixin, SiteImportForm): file_field_label = _('Roles Export File') - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if utils.get_ou_count() < 2: - self.fields['ou'].widget = forms.HiddenInput() - ou = forms.ModelChoiceField( - label=_('Organizational unit'), + required=False, + label=_('Force organizational unit'), queryset=OrganizationalUnit.objects, - initial=lambda: get_default_ou().pk, ) diff --git a/src/authentic2/manager/role_views.py b/src/authentic2/manager/role_views.py index e274b41e..c5fe3444 100644 --- a/src/authentic2/manager/role_views.py +++ b/src/authentic2/manager/role_views.py @@ -664,7 +664,9 @@ class RolesImportView( def form_valid(self, form): self.ou = form.cleaned_data['ou'] try: - context = data_transfer.ImportContext(import_ous=False, set_ou=self.ou) + context = data_transfer.ImportContext( + import_ous=False, set_ou=self.ou, allowed_ous=set(form.fields['ou'].queryset) + ) with transaction.atomic(): data_transfer.import_site(form.cleaned_data['site_json'], context) except ValidationError as e: @@ -674,11 +676,15 @@ class RolesImportView( return super().form_valid(form) def get_success_url(self): - messages.success( - self.request, - _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou, - ) - return reverse('a2-manager-roles') + '?search-ou=%s' % self.ou.pk + if self.ou: + message = _('Roles have been successfully imported inside "%s" organizational unit.') % self.ou + querystring = '?search-ou=%s' % self.ou.pk + else: + message = _('Roles have been successfully imported.') + querystring = '' + + messages.success(self.request, message) + return reverse('a2-manager-roles') + querystring roles_import = RolesImportView.as_view() diff --git a/tests/test_role_manager.py b/tests/test_role_manager.py index f8d5d687..717190c0 100644 --- a/tests/test_role_manager.py +++ b/tests/test_role_manager.py @@ -138,11 +138,23 @@ def test_manager_role_import(app, admin, ou1, role_ou1, ou2, role_ou2): assert not 'ous' in export Role.objects.filter(ou__in=[ou1, ou2]).delete() + # import in OUs specified in export file resp = app.get('/manage/roles/') resp = resp.click('Import') resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') resp = resp.form.submit().follow() + assert Role.objects.filter(name=role_ou1.name, ou=ou1).exists() + assert Role.objects.filter(name=role_ou2.name, ou=ou2).exists() + Role.objects.filter(ou__in=[ou1, ou2]).delete() + + # import in custom OU + resp = app.get('/manage/roles/') + resp = resp.click('Import') + resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp.form['ou'] = get_default_ou().pk + resp = resp.form.submit().follow() + assert Role.objects.filter(name=role_ou1.name, ou=get_default_ou()).exists() assert Role.objects.filter(name=role_ou2.name, ou=get_default_ou()).exists() @@ -155,6 +167,7 @@ def test_manager_role_import(app, admin, ou1, role_ou1, ou2, role_ou2): assert new_export['roles'][0]['uuid'] == export['roles'][0]['uuid'] assert new_export['roles'][1]['uuid'] == export['roles'][1]['uuid'] + # import in custom OU while roles exist in another OU resp = app.get('/manage/roles/') resp = resp.click('Import') resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') @@ -176,31 +189,23 @@ def test_manager_role_import(app, admin, ou1, role_ou1, ou2, role_ou2): resp = app.get('/manage/roles/') # unselect ou1 resp = resp.click('Import') resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp.form['ou'] = get_default_ou().pk resp = resp.form.submit().follow() assert not OrganizationalUnit.objects.filter(slug="should_not_exist").exists() - -def test_manager_role_import_single_ou(app, admin, simple_role): - assert OrganizationalUnit.objects.count() == 1 - response = login(app, admin, 'a2-manager-roles') - - export_response = response.click('Export') - export = export_response.json - - assert len(export['roles']) == 1 - simple_role.delete() - + # missing ou in export file + export = {"roles": [{"slug": "agent", "name": "Agent"}]} resp = app.get('/manage/roles/') resp = resp.click('Import') - assert not 'Organizational unit' in resp.text - assert resp.form['ou'].attrs['type'] == 'hidden' + resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + assert 'Missing Organizational Unit for role: Agent' in resp.text + resp.form['ou'] = get_default_ou().pk resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') resp = resp.form.submit().follow() - - imported_role = Role.objects.get(slug=simple_role.slug) - assert imported_role.ou == simple_role.ou + assert Role.objects.get(slug='agent') def test_manager_role_import_selected_ou(app, admin, ou1, ou2): @@ -211,6 +216,25 @@ def test_manager_role_import_selected_ou(app, admin, ou1, ou2): assert response.pyquery.find('select#id_ou option[selected]')[0].text == 'OU2' +def test_manager_role_import_ou_permission(app, admin, ou1, role_ou1, ou2, role_ou2, admin_ou1): + resp = login(app, admin, 'a2-manager-roles') + resp = resp.click('Export') + export = resp.json + assert len(export['roles']) == 2 + role_ou1.delete() + role_ou2.delete() + app.session.flush() # logout + + resp = login(app, admin_ou1, 'a2-manager-roles') + resp = resp.click('Import') + resp.form['ou'] = '' + resp.form['site_json'] = Upload('export.json', json.dumps(export).encode(), 'application/json') + resp = resp.form.submit() + + # importing fails because user has no permission on ou2 + assert 'missing permissions on Organizational Unit' in resp.text + + def test_manager_role_add_selected_ou(app, admin, ou1, ou2): response = login(app, admin, '/manage/roles/') response = response.click('Add role') -- 2.30.2