From 6ac4de61b7d235d9b0386a067b45d41cf1356658 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 18 Dec 2018 12:27:16 +0100 Subject: [PATCH 1/3] data_transfer: add export context (#29162) --- src/authentic2/data_transfer.py | 121 ++++++++++++++++++++------------ tests/test_data_transfer.py | 50 ++++++++++--- 2 files changed, 117 insertions(+), 54 deletions(-) diff --git a/src/authentic2/data_transfer.py b/src/authentic2/data_transfer.py index 090c7b33..1bc72cef 100644 --- a/src/authentic2/data_transfer.py +++ b/src/authentic2/data_transfer.py @@ -2,28 +2,52 @@ from django.contrib.contenttypes.models import ContentType from django_rbac.models import Operation from django_rbac.utils import ( - get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) + get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) from authentic2.a2_rbac.models import RoleAttribute from authentic2.utils import update_model -def export_site(): - return { - 'roles': export_roles(get_role_model().objects.all()), - 'ous': export_ou(get_ou_model().objects.all()) - } +class ExportContext(object): + _role_qs = None + _ou_qs = None + export_roles = None + export_ous = None + def __init__(self, role_qs=None, ou_qs=None, export_roles=True, export_ous=True): + self._role_qs = role_qs + self._ou_qs = ou_qs + self.export_roles = export_roles + self.export_ous = export_ous -def export_ou(ou_query_set): - return [ou.export_json() for ou in ou_query_set] + @property + def role_qs(self): + return self._role_qs or get_role_model().objects.all() + @property + def ou_qs(self): + return self._ou_qs or get_ou_model().objects.all() -def export_roles(role_queryset): + +def export_site(context=None): + context = context or ExportContext() + d = {} + if context.export_roles: + d['roles'] = export_roles(context) + if context.export_ous: + d['ous'] = export_ous(context) + return d + + +def export_ous(context): + return [ou.export_json() for ou in context.ou_qs] + + +def export_roles(context): """ Serialize roles in role_queryset """ return [ role.export_json(attributes=True, parents=True, permissions=True) - for role in role_queryset + for role in context.role_qs ] @@ -66,9 +90,16 @@ class ImportContext(object): """ def __init__( - self, role_delete_orphans=False, role_parentings_update=True, - role_permissions_update=True, role_attributes_update=True, + self, + import_roles=True, + import_ous=True, + role_delete_orphans=False, + role_parentings_update=True, + role_permissions_update=True, + role_attributes_update=True, ou_delete_orphans=False): + self.import_roles = import_roles + self.import_ous = import_ous self.role_delete_orphans = role_delete_orphans self.ou_delete_orphans = ou_delete_orphans self.role_parentings_update = role_parentings_update @@ -81,7 +112,6 @@ class DataImportError(Exception): class RoleDeserializer(object): - def __init__(self, d, import_context): self._import_context = import_context self._obj = None @@ -106,12 +136,12 @@ class RoleDeserializer(object): ou = None if not has_ou else search_ou(ou_d) if has_ou and not ou: raise DataImportError( - "Can't import role because missing Organizational Unit : " - "%s" % ou_d) + "Can't import role because missing Organizational Unit : " + "%s" % ou_d) kwargs = self._role_d.copy() - del kwargs['ou'] - del kwargs['service'] + kwargs.pop('ou', None) + kwargs.pop('service', None) if has_ou: kwargs['ou'] = ou @@ -243,42 +273,45 @@ def import_ou(ou_d): return ou, status -def import_site(json_d, import_context): +def import_site(json_d, import_context=None): + import_context = import_context or ImportContext() result = ImportResult() if not isinstance(json_d, dict): raise DataImportError('Export file is invalid: not a dictionnary') - for ou_d in json_d.get('ous', []): - result.update_ous(*import_ou(ou_d)) - - roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', []) - if not role_d['slug'].startswith('_')] + if import_context.import_ous: + for ou_d in json_d.get('ous', []): + result.update_ous(*import_ou(ou_d)) - for ds in roles_ds: - result.update_roles(*ds.deserialize()) + if import_context.import_roles: + roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', []) + if not role_d['slug'].startswith('_')] - if import_context.role_attributes_update: for ds in roles_ds: - result.update_attributes(*ds.attributes()) + result.update_roles(*ds.deserialize()) - if import_context.role_parentings_update: - for ds in roles_ds: - result.update_parentings(*ds.parentings()) + if import_context.role_attributes_update: + for ds in roles_ds: + result.update_attributes(*ds.attributes()) - if import_context.role_permissions_update: - for ds in roles_ds: - result.update_permissions(*ds.permissions()) - - if import_context.ou_delete_orphans: - raise DataImportError( - "Unsupported context value for ou_delete_orphans : %s" % ( - import_context.ou_delete_orphans)) - - if import_context.role_delete_orphans: - # FIXME : delete each role that is in DB but not in the export - raise DataImportError( - "Unsupported context value for role_delete_orphans : %s" % ( - import_context.role_delete_orphans)) + if import_context.role_parentings_update: + for ds in roles_ds: + result.update_parentings(*ds.parentings()) + + if import_context.role_permissions_update: + for ds in roles_ds: + result.update_permissions(*ds.permissions()) + + if import_context.ou_delete_orphans: + raise DataImportError( + "Unsupported context value for ou_delete_orphans : %s" % ( + import_context.ou_delete_orphans)) + + if import_context.role_delete_orphans: + # FIXME : delete each role that is in DB but not in the export + raise DataImportError( + "Unsupported context value for role_delete_orphans : %s" % ( + import_context.role_delete_orphans)) return result diff --git a/tests/test_data_transfer.py b/tests/test_data_transfer.py index dc073ea4..b43746b0 100644 --- a/tests/test_data_transfer.py +++ b/tests/test_data_transfer.py @@ -1,13 +1,17 @@ -import json - from django_rbac.utils import get_role_model, get_ou_model -import py import pytest from authentic2.a2_rbac.models import RoleParenting from authentic2.data_transfer import ( - DataImportError, export_roles, import_site, export_ou, ImportContext, - RoleDeserializer, search_role, import_ou) + ExportContext, + DataImportError, + export_roles, + import_site, + export_ous, + ImportContext, + RoleDeserializer, + search_role, + import_ou) from authentic2.utils import get_hex_uuid @@ -18,7 +22,7 @@ OU = get_ou_model() def test_export_basic_role(db): role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) query_set = Role.objects.filter(uuid=role.uuid) - roles = export_roles(query_set) + roles = export_roles(ExportContext(role_qs=query_set)) assert len(roles) == 1 role_dict = roles[0] for key, value in role.export_json().items(): @@ -40,7 +44,7 @@ def test_export_role_with_parents(db): child_role.add_parent(parent_2_role) query_set = Role.objects.filter(slug__startswith='test').order_by('slug') - roles = export_roles(query_set) + roles = export_roles(ExportContext(role_qs=query_set)) assert len(roles) == 4 child_role_dict = roles[0] @@ -68,9 +72,9 @@ def test_export_role_with_parents(db): assert parents[0]['slug'] == grand_parent_role.slug -def test_export_ou(db): +def test_export_ous(db): ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description') - ous = export_ou(OU.objects.filter(name='ou name')) + ous = export_ous(ExportContext(ou_qs=OU.objects.filter(name='ou name'))) assert len(ous) == 1 ou_d = ous[0] assert ou_d['name'] == ou.name @@ -137,7 +141,7 @@ def test_role_deserializer_missing_ou(db): rd = RoleDeserializer({ 'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description', 'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, - ImportContext()) + ImportContext()) with pytest.raises(DataImportError): rd.deserialize() @@ -469,3 +473,29 @@ def test_import_ou_already_existing(db): assert len(res.ous['created']) == 0 assert num_ous == OU.objects.count() assert ou == OU.objects.get(uuid=uuid) + + +def test_import_context_flags(db): + ous = [{'uuid': get_hex_uuid(), 'slug': 'ou-slug', 'name': 'ou name'}] + roles = [{ + 'name': 'other role', + 'slug': 'other-role-slug', + 'uuid': get_hex_uuid(), + 'ou': {'slug': 'ou-slug'}, + }] + d = {'ous': ous, 'roles': roles} + import_site(d, ImportContext(import_roles=False, import_ous=False)) + assert Role.objects.exclude(slug__startswith='_').count() == 0 + assert OU.objects.exclude(slug='default').count() == 0 + with pytest.raises(DataImportError) as e: + import_site(d, ImportContext(import_roles=True, import_ous=False)) + assert 'missing Organizational' in e.value.args[0] + assert Role.objects.exclude(slug__startswith='_').count() == 0 + assert OU.objects.exclude(slug='default').count() == 0 + import_site(d, ImportContext(import_roles=False, import_ous=True)) + assert Role.objects.exclude(slug__startswith='_').count() == 0 + assert OU.objects.exclude(slug='default').count() == 1 + import_site(d, ImportContext(import_roles=True, import_ous=True)) + assert Role.objects.exclude(slug__startswith='_').count() == 1 + assert OU.objects.exclude(slug='default').count() == 1 + -- 2.18.0