From e3fddaffea0a19b9dbd8d677f54af484cc35e46a Mon Sep 17 00:00:00 2001 From: Emmanuel Cazenave Date: Tue, 6 Mar 2018 15:00:34 +0100 Subject: [PATCH] import_site, export_site commands (#16514) Only import and export roles. --- src/authentic2/a2_rbac/models.py | 56 +++++ src/authentic2/data_transfer.py | 257 +++++++++++++++++++ src/authentic2/management/commands/export_site.py | 25 ++ src/authentic2/management/commands/import_site.py | 31 +++ tests/test_a2_rbac.py | 105 +++++++- tests/test_data_transfer.py | 289 ++++++++++++++++++++++ tests/test_import_export_site_cmd.py | 119 +++++++++ 7 files changed, 881 insertions(+), 1 deletion(-) create mode 100644 src/authentic2/data_transfer.py create mode 100644 src/authentic2/management/commands/export_site.py create mode 100644 src/authentic2/management/commands/import_site.py create mode 100644 tests/test_data_transfer.py create mode 100644 tests/test_import_export_site_cmd.py diff --git a/src/authentic2/a2_rbac/models.py b/src/authentic2/a2_rbac/models.py index cf912b48..16b1258d 100644 --- a/src/authentic2/a2_rbac/models.py +++ b/src/authentic2/a2_rbac/models.py @@ -22,6 +22,11 @@ from authentic2.decorators import GlobalCache from . import managers, fields +SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField, + models.URLField, models.BooleanField, models.IntegerField, + models.CommaSeparatedIntegerField, models.EmailField, + models.IntegerField, models.PositiveIntegerField) + class OrganizationalUnit(OrganizationalUnitAbstractBase): username_is_unique = models.BooleanField( @@ -207,6 +212,53 @@ class Role(RoleAbstractBase): 'ou__slug': self.ou.slug if self.ou else None, } + def export_json(self, attributes=False, parents=False, relations=False): + d = {} + concrete_fields = [f for f in self.__class__._meta.get_fields() + if f.concrete and not f.is_relation] + for field in concrete_fields: + if field.name == 'id': + continue + value = getattr(self, field.attname) + if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): + d[field.name] = value + else: + raise Exception('export_json: field %s of ressource class %s is unsupported' % ( + field, self.__class__)) + + if relations: + for attr_name in ('ou', 'service'): + related_obj = getattr(self, attr_name) + if related_obj: + for related_attr in ('slug', 'uuid', 'name'): + if hasattr(related_obj, related_attr): + d['%s_%s' % (attr_name, related_attr)] = getattr( + related_obj, related_attr) + + if attributes: + for attribute in self.attributes.all(): + d.setdefault('attributes', []).append(attribute.to_json()) + + if parents: + RoleParenting = rbac_utils.get_role_parenting_model() + for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True): + d.setdefault('parents', []).append(parenting.parent.export_json()) + + return d + + @classmethod + def import_json(cls, d): + kwargs = {} + concrete_fields = [f for f in cls._meta.get_fields() + if f.concrete and not f.is_relation] + for field in concrete_fields: + if field.name == 'id': + continue + if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d: + kwargs[field.name] = d[field.name] + + return cls.objects.create(**kwargs) + class RoleParenting(RoleParentingAbstractBase): class Meta(RoleParentingAbstractBase.Meta): @@ -239,6 +291,10 @@ class RoleAttribute(models.Model): ('role', 'name', 'kind', 'value'), ) + def to_json(self): + return {'name': self.name, 'kind': self.kind, 'value': self.value} + + GenericRelation(Permission, content_type_field='target_ct', object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms') diff --git a/src/authentic2/data_transfer.py b/src/authentic2/data_transfer.py new file mode 100644 index 00000000..eb93c411 --- /dev/null +++ b/src/authentic2/data_transfer.py @@ -0,0 +1,257 @@ +from django_rbac.utils import get_ou_model, get_role_model, get_role_parenting_model +from authentic2.a2_rbac.models import RoleAttribute +from authentic2.models import Service + + +def export_roles(role_queryset): + """ Serialize every role in queryset + """ + roles = [] + for role in role_queryset: + roles.append(role.export_json(attributes=True, parents=True, relations=True)) + return {'roles': roles} + + +class DBSearchStrategy(object): + """DB search strategy. + model_cls objects will be searched using the specified search_attrs, + one at a time. + """ + def __init__(self, model_cls, search_attrs): + self._model_cls = model_cls + self.search_attrs = search_attrs + + @property + def model_name(self): + return self._model_cls._meta.model_name + + def search(self, d): + for attr in self.search_attrs: + if attr in d: + try: + return self._model_cls.objects.get(**{attr: d[attr]}) + except self._model_cls.DoesNotExist: + pass + return None + + +class RoleRelatedObj(object): + """ + service or ou + """ + + def __init__(self, data, searcher): + self._data = data + self._searcher = searcher + self._obj = None + self._checked = False + + def __eq__(self, other): + return hash(self) == hash(other) + + def __hash__(self): + to_hash = [self.model_name] + for attr in self._searcher.search_attrs: + to_hash.extend([attr, self._data[attr]]) + return hash(tuple(to_hash)) + + def __str__(self): + res = self.model_name + for attr in self._searcher.search_attrs: + res += " <%s: %s>" % (attr, self._data[attr]) + return res + + @property + def model_name(self): + return self._searcher.model_name + + @property + def obj(self): + """ Return the matching db obj (None if there is no match) + """ + if self._checked: + return self._obj + self._obj = self._searcher.search(self._data) + self._checked = True + return self._obj + + +class ImportContext(object): + """ Holds information on how to perform the import. + + role_update : if True, an already existing role will be checked and potentially + updated to match exactly what is described in the json export. + Service and OU relation could be added, changed or deleted + RoleAttributes could be added, changed our deleted + RoleParenting could be added, changed our deleted + + role_delete_orphans: if True any existing role that is not found in the export will + be deleted + """ + + def __init__(self, role_update=False, role_delete_orphans=False): + self.role_update = role_update + self.role_delete_orphans = role_delete_orphans + + +class DataImportError(Exception): + pass + + +class MissingRelated(DataImportError): + pass + + +class RoleDeserializer(object): + + def __init__(self, d, import_context): + self._role_data = {} + self._import_context = import_context + self.ou = None + self.service = None + self.obj = None + self._role_created = False + self._role_updated = False + self._role_noops = False + self.parents = None + self.attributes = None + self._role_cls = get_role_model() + self._searcher = DBSearchStrategy(self._role_cls, ('slug', 'uuid', 'name')) + + ou_d = {} + service_d = {} + for key, value in d.items(): + if key.startswith('ou_'): + ou_d[key[3:]] = value + elif key.startswith('service_'): + service_d[key[8:]] = value + elif key == 'parents': + self.parents = value + elif key == 'attributes': + self.attributes = value + else: + self._role_data[key] = value + if ou_d: + self.ou = RoleRelatedObj( + ou_d, DBSearchStrategy(get_ou_model(), ('slug', 'uuid', 'name'))) + if service_d: + self.service = RoleRelatedObj(service_d, DBSearchStrategy(Service, ('slug', 'name'))) + + def _deserialize_status(self): + if self._role_created: + return 'created' + if self._role_updated: + return 'updated' + if self._role_noops: + return 'noops' + + def check_related(self): + missing_related = [] + for related in (self.ou, self.service): + if related is not None and related.obj is None: + missing_related.append(related) + return missing_related + + def deserialize(self): + obj = self._searcher.search(self._role_data) + if obj: # Role already exist + self._role_noops = True + self.obj = obj + if self._import_context.role_update: + # FIXME : do the job instead of crashing + # check ou and service + # check role attributes + self._role_updated = True + raise DataImportError( + "Unsupported context value for role_update : %s" % ( + self._import_context.role_update)) + else: # Create role + kwargs = self._role_data.copy() + self.obj = self._role_cls.import_json(kwargs) + + if self.ou or self.service: + if self.ou: + self.obj.ou = self.ou.obj + if self.service: + self.obj.service = self.service.obj + self.obj.save() + + # Create attributes + if self.attributes: + for attr_dict in self.attributes: + attr_dict['role'] = self.obj + RoleAttribute.objects.create(**attr_dict) + self._role_created = True + + return self.obj, self._deserialize_status() + + def parentings(self): + created, updated, deleted = [], [], [] + if self.parents: + if self._role_created: + # Create parenting + for parent_d in self.parents: + if not parent_d['slug'].startswith('_'): + parent = self._searcher.search(parent_d) + if not parent: + raise DataImportError("Could not find role : %s" % parent_d) + created.append(get_role_parenting_model().objects.create( + child=self.obj, direct=True, parent=parent)) + else: + if self._import_context.role_update: + # FIXME : update parenting instead of crashing + raise DataImportError( + "Unsupported context value for role_update : %s" % ( + self._import_context.role_update)) + + return created, updated, deleted + + +class ImportResult(object): + + def __init__(self): + self.roles = {'created': [], 'updated': [], 'noops': []} + self.parentings = {'created': [], 'updated': [], 'deleted': []} + self.missing_related = {} + + def update_roles(self, role, d_status): + self.roles[d_status].append(role) + + def update_parentings(self, created, updated, deleted): + self.parentings['created'].extend(created) + self.parentings['updated'].extend(updated) + self.parentings['deleted'].extend(deleted) + + def update_missing_related(self, related_l): + for related in related_l: + self.missing_related.setdefault(related.model_name, set()).add(related) + + @property + def success(self): + return not self.missing_related + + +def import_roles(roles_list, import_context): + result = ImportResult() + roles_ds = [RoleDeserializer(role_d, import_context) for role_d in roles_list + if not role_d['slug'].startswith('_')] + + for ds in roles_ds: + result.update_missing_related(ds.check_related()) + + if not result.success: # Some related objs are missing + return result + + for ds in roles_ds: + result.update_roles(*ds.deserialize()) + + for ds in roles_ds: + result.update_parentings(*ds.parentings()) + + if import_context.role_delete_orphans: + # FIXME : delete each role that is in DB but not in the export + raise DataImportError( + "Unsupported context value for role_delete_orphans : %s" % ( + import_context.role_delete_orphans)) + + return result diff --git a/src/authentic2/management/commands/export_site.py b/src/authentic2/management/commands/export_site.py new file mode 100644 index 00000000..56d4205d --- /dev/null +++ b/src/authentic2/management/commands/export_site.py @@ -0,0 +1,25 @@ +import json +import sys + +from django.core.management.base import BaseCommand + +from authentic2.data_transfer import export_roles +from django_rbac.utils import get_role_model + + +class Command(BaseCommand): + help = 'Export site' + + def add_arguments(self, parser): + parser.add_argument('--output', metavar='FILE', default=None, + help='name of a file to write output to') + + def handle(self, *args, **options): + if options['output']: + output, close = open(options['output'], 'w'), True + else: + output, close = sys.stdout, False + json.dump(export_roles( + get_role_model().objects.exclude(slug__startswith='_').all()), output, indent=4) + if close: + output.close() diff --git a/src/authentic2/management/commands/import_site.py b/src/authentic2/management/commands/import_site.py new file mode 100644 index 00000000..d70bf31c --- /dev/null +++ b/src/authentic2/management/commands/import_site.py @@ -0,0 +1,31 @@ +import json +import sys + +from django.core.management.base import BaseCommand +from django.db import transaction + + +from authentic2.data_transfer import import_roles, ImportContext + + +class Command(BaseCommand): + help = 'Import site' + + def add_arguments(self, parser): + parser.add_argument('filename', metavar='FILENAME', type=str, + help='name of file to import') + + def handle(self, filename, **options): + with open(filename, 'r') as f: + with transaction.atomic(): + result = import_roles(json.load(f)['roles'], ImportContext()) + if not result.success: + sys.stderr.write("Error : could not import roles due to missing related objects\n") + for relation, objs in result.missing_related.items(): + for obj in objs: + sys.stderr.write("Missing %s \n" % obj) + sys.exit(1) + + sys.stdout.write("Success\n") + for ops, roles in result.roles.items(): + sys.stdout.write("%s roles %s \n" % (len(roles), ops)) diff --git a/tests/test_a2_rbac.py b/tests/test_a2_rbac.py index f619f26f..c50c9ee1 100644 --- a/tests/test_a2_rbac.py +++ b/tests/test_a2_rbac.py @@ -1,7 +1,8 @@ import pytest +from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute from authentic2.models import Service -from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU +from authentic2.utils import get_hex_uuid def test_role_natural_key(db): @@ -24,3 +25,105 @@ def test_role_natural_key(db): Role.objects.get_by_natural_key(*r2.natural_key()) with pytest.raises(Role.DoesNotExist): Role.objects.get_by_natural_key(*r4.natural_key()) + + +def test_basic_role_export_json(db): + role = Role.objects.create( + name='basic role', slug='basic-role', description='basic role description') + role_dict = role.export_json() + assert role_dict['name'] == role.name + assert role_dict['slug'] == role.slug + assert role_dict['uuid'] == role.uuid + assert role_dict['description'] == role.description + assert role_dict['admin_scope_id'] == role.admin_scope_id + assert role_dict['external_id'] == role.external_id + + +def test_role_with_ou_export_json(db): + ou = OU.objects.create(name='ou', slug='ou') + role = Role.objects.create(name='some role', ou=ou) + role_dict = role.export_json(relations=True) + assert role_dict['ou_uuid'] == ou.uuid + assert role_dict['ou_name'] == ou.name + assert role_dict['ou_slug'] == ou.slug + + +def test_role_with_service_export_json(db): + service = Service.objects.create(name='service name', slug='service-name') + role = Role.objects.create(name='some role', service=service) + role_dict = role.export_json(relations=True) + assert role_dict['service_name'] == service.name + assert role_dict['service_slug'] == service.slug + + +def test_role_with_attributes_export_json(db): + role = Role.objects.create(name='some role') + attr1 = RoleAttribute.objects.create( + role=role, name='attr1_name', kind='string', value='attr1_value') + attr2 = RoleAttribute.objects.create( + role=role, name='attr2_name', kind='string', value='attr2_value') + + role_dict = role.export_json(attributes=True) + attributes = role_dict['attributes'] + assert len(attributes) == 2 + + expected_attr_names = set([attr1.name, attr2.name]) + for attr_dict in attributes: + assert attr_dict['name'] in expected_attr_names + expected_attr_names.remove(attr_dict['name']) + target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first() + assert attr_dict['kind'] == target_attr.kind + assert attr_dict['value'] == target_attr.value + + +def test_role_with_parents_export_json(db): + grand_parent_role = Role.objects.create( + name='test grand parent role', slug='test-grand-parent-role') + parent_1_role = Role.objects.create( + name='test parent 1 role', slug='test-parent-1-role') + parent_1_role.add_parent(grand_parent_role) + parent_2_role = Role.objects.create( + name='test parent 2 role', slug='test-parent-2-role') + parent_2_role.add_parent(grand_parent_role) + child_role = Role.objects.create( + name='test child role', slug='test-child-role') + child_role.add_parent(parent_1_role) + child_role.add_parent(parent_2_role) + + child_role_dict = child_role.export_json(parents=True) + assert child_role_dict['slug'] == child_role.slug + parents = child_role_dict['parents'] + assert len(parents) == 2 + expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) + for parent in parents: + assert parent['slug'] in expected_slugs + expected_slugs.remove(parent['slug']) + + grand_parent_role_dict = grand_parent_role.export_json(parents=True) + assert grand_parent_role_dict['slug'] == grand_parent_role.slug + assert 'parents' not in grand_parent_role_dict + + parent_1_role_dict = parent_1_role.export_json(parents=True) + assert parent_1_role_dict['slug'] == parent_1_role.slug + parents = parent_1_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + parent_2_role_dict = parent_2_role.export_json(parents=True) + assert parent_2_role_dict['slug'] == parent_2_role.slug + parents = parent_2_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + +def test_basic_role_import_json(db): + role_dict = dict( + name='basic role', slug='basic-role', description='basic role description', + uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id') + role = Role.import_json(role_dict) + assert role_dict['name'] == role.name + assert role_dict['slug'] == role.slug + assert role_dict['uuid'] == role.uuid + assert role_dict['description'] == role.description + assert role_dict['admin_scope_id'] == role.admin_scope_id + assert role_dict['external_id'] == role.external_id diff --git a/tests/test_data_transfer.py b/tests/test_data_transfer.py new file mode 100644 index 00000000..10efa50c --- /dev/null +++ b/tests/test_data_transfer.py @@ -0,0 +1,289 @@ +from django_rbac.utils import get_role_model, get_ou_model +import pytest + +from authentic2.a2_rbac.models import RoleParenting +from authentic2.data_transfer import ( + DataImportError, DBSearchStrategy, export_roles, import_roles, ImportContext, ImportResult, + RoleDeserializer, RoleRelatedObj) +from authentic2.models import Service +from authentic2.utils import get_hex_uuid + + +Role = get_role_model() +OU = get_ou_model() + + +def test_export_basic_role(db): + role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) + query_set = Role.objects.filter(uuid=role.uuid) + export = export_roles(query_set) + roles = export['roles'] + assert len(roles) == 1 + role_dict = roles[0] + for key, value in role.export_json().items(): + assert role_dict[key] == value + + +def test_export_role_with_parents(db): + grand_parent_role = Role.objects.create( + name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid()) + parent_1_role = Role.objects.create( + name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid()) + parent_1_role.add_parent(grand_parent_role) + parent_2_role = Role.objects.create( + name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid()) + parent_2_role.add_parent(grand_parent_role) + child_role = Role.objects.create( + name='test child role', slug='test-child-role', uuid=get_hex_uuid()) + child_role.add_parent(parent_1_role) + child_role.add_parent(parent_2_role) + + query_set = Role.objects.filter(slug__startswith='test').order_by('slug') + export = export_roles(query_set) + roles = export['roles'] + assert len(roles) == 4 + + child_role_dict = roles[0] + assert child_role_dict['slug'] == child_role.slug + parents = child_role_dict['parents'] + assert len(parents) == 2 + expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) + for parent in parents: + assert parent['slug'] in expected_slugs + expected_slugs.remove(parent['slug']) + + grand_parent_role_dict = roles[1] + assert grand_parent_role_dict['slug'] == grand_parent_role.slug + + parent_1_role_dict = roles[2] + assert parent_1_role_dict['slug'] == parent_1_role.slug + parents = parent_1_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + parent_2_role_dict = roles[3] + assert parent_2_role_dict['slug'] == parent_2_role.slug + parents = parent_2_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + +def test_db_search(db): + role_search = DBSearchStrategy(Role, ('slug', 'uuid', 'name')) + + assert role_search.model_name == Role._meta.model_name + + role = Role.objects.create(slug='slug1') + assert role == role_search.search({'slug': 'slug1'}) + + role = Role.objects.create(slug='slug2') + assert role == role_search.search({'slug': 'non-matching-slug', 'uuid': role.uuid}) + + role = Role.objects.create(slug='slug3', name='some name') + assert role == role_search.search( + {'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'some name'}) + + role = Role.objects.create(slug='slug4', name='some name') + obj = role_search.search( + {'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'non matching name'}) + assert obj is None + + +def test_db_search_accept_missing_data(db): + role_search = DBSearchStrategy(Role, ('slug', 'uuid', 'name')) + Role.objects.create(slug='slug1') + assert role_search.search({}) is None + + +def test_role_related_obj(db): + ou = OU.objects.create(slug='some-ou', name='some ou', uuid=get_hex_uuid()) + ou_search = DBSearchStrategy(OU, ('slug', 'uuid', 'name')) + rro = RoleRelatedObj({'slug': ou.slug, 'uuid': ou.uuid, 'name': ou.name}, ou_search) + assert rro.model_name == OU._meta.model_name + assert rro.obj == ou + assert hash(rro) == hash(rro) + assert str(rro) == "%s " % (rro.model_name, ou.uuid) + + rro = RoleRelatedObj( + {'slug': 'unkown-slug', 'uuid': get_hex_uuid(), 'name': 'unkown name'}, ou_search) + assert rro.obj is None + + +def test_import_result(db): + ou_search = DBSearchStrategy(OU, ('slug', 'uuid', 'name')) + rro = RoleRelatedObj({'slug': 'ou-slug', 'uuid': 'asefdvsdvdsv', 'name': 'ou name'}, ou_search) + imp_res = ImportResult() + imp_res.update_missing_related([rro]) + assert rro in imp_res.missing_related[rro.model_name] + + +def test_role_deserializer(db): + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'uuid': get_hex_uuid()}, ImportContext()) + assert rd.ou is None + assert rd.service is None + assert rd.parents is None + assert rd.attributes is None + assert rd.obj is None + role, status = rd.deserialize() + assert status == 'created' + assert role.name == 'some role' + assert role.description == 'some role description' + assert role.slug == 'some-role' + assert rd.obj == role + + +def test_role_deserializer_with_ou(db): + ou = get_ou_model().objects.create(name='some ou', slug='some-ou') + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'ou_name': 'some ou', 'ou_slug': 'some-ou'}, ImportContext()) + role, status = rd.deserialize() + assert role.ou == ou + + +def test_role_deserializer_missing_ou(db): + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'role description', 'slug': 'some-role', + 'ou_name': 'some ou', 'ou_slug': 'some-ou'}, ImportContext()) + missing_related = rd.check_related() + assert len(missing_related) == 1 + assert missing_related[0]._data == {'slug': 'some-ou', 'name': 'some ou'} + + +def test_role_deserializer_with_service(db): + service = Service.objects.create(name='some service', slug='some-service') + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'service_name': service.name, 'service_slug': service.slug}, ImportContext()) + role, status = rd.deserialize() + assert role.service == service + + +def test_role_deserializer_missing_service(db): + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'service_name': 'some service', 'service_slug': 'some-service'}, ImportContext()) + missing_related = rd.check_related() + assert len(missing_related) == 1 + assert missing_related[0]._data == {'slug': 'some-service', 'name': 'some service'} + + +def test_role_deserializer_with_attributes(db): + + attributes_data = { + 'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'), + 'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value') + } + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'attributes': list(attributes_data.values())}, ImportContext()) + role, status = rd.deserialize() + attributes = role.attributes.all() + assert len(attributes) == 2 + + for attr in attributes: + attr_dict = attributes_data[attr.name] + assert attr_dict['name'] == attr.name + assert attr_dict['kind'] == attr.kind + assert attr_dict['value'] == attr.value + del attributes_data[attr.name] + + +def test_role_deserializer_role_update(db): + role_dict = {'name': 'some role', 'slug': 'some-role'} + Role.objects.create(**role_dict) + rd = RoleDeserializer(role_dict, ImportContext(role_update=True)) + with pytest.raises(DataImportError): + rd.deserialize() + + +def test_role_deserializer_parenting_existing_parent(db): + parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} + parent_role = Role.objects.create(**parent_role_dict) + child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} + + rd = RoleDeserializer(child_role_dict, ImportContext()) + child_role, status = rd.deserialize() + created, updated, deleted = rd.parentings() + + assert len(created) == 1 + parenting = created[0] + assert parenting.direct is True + assert parenting.parent == parent_role + assert parenting.child == child_role + + +def test_role_deserializer_parenting_non_existing_parent(db): + parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} + child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} + rd = RoleDeserializer(child_role_dict, ImportContext()) + rd.deserialize() + with pytest.raises(DataImportError) as excinfo: + rd.parentings() + + assert "Could not find role" in str(excinfo.value) + + +def test_role_deserializer_parenting_role_update(db): + parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} + child_role_dict = {'name': 'child role', 'slug': 'child-role'} + Role.objects.create(**child_role_dict) + child_role_dict['parents'] = [parent_role_dict] + + import_context = ImportContext(role_update=False) + rd = RoleDeserializer(child_role_dict, import_context) + rd.deserialize() + + with pytest.raises(DataImportError) as excinfo: + import_context.role_update = True + rd.parentings() + + assert "Unsupported context value" in str(excinfo.value) + + +def test_empty_roles_import(): + res = import_roles([], ImportContext()) + assert res.success + assert res.roles == {'created': [], 'updated': [], 'noops': []} + assert res.parentings == {'created': [], 'updated': [], 'deleted': []} + assert res.missing_related == {} + + +def test_import_roles(db): + parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'} + child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]} + roles = [ + parent_role_dict, + child_role_dict + ] + + res = import_roles(roles, ImportContext()) + assert res.success is True + created_roles = res.roles['created'] + assert len(created_roles) == 2 + parent_role = Role.objects.get(**parent_role_dict) + del child_role_dict['parents'] + child_role = Role.objects.get(**child_role_dict) + assert created_roles[0] == parent_role + assert created_roles[1] == child_role + + assert len(res.parentings['created']) == 1 + assert res.parentings['created'][0] == RoleParenting.objects.get( + child=child_role, parent=parent_role, direct=True) + + +def test_roles_import_ignore_technical_role(db): + roles = [{ + 'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] + res = import_roles(roles, ImportContext()) + assert res.success is True + assert res.roles == {'created': [], 'updated': [], 'noops': []} + + +def test_import_roles_role_delete_orphans(db, tmpdir, monkeypatch): + roles = [{ + 'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] + with pytest.raises(DataImportError): + import_roles(roles, ImportContext(role_delete_orphans=True)) diff --git a/tests/test_import_export_site_cmd.py b/tests/test_import_export_site_cmd.py new file mode 100644 index 00000000..f8676a7c --- /dev/null +++ b/tests/test_import_export_site_cmd.py @@ -0,0 +1,119 @@ +import json +from mock import Mock + +from django.core import management +import pytest + +from django_rbac.utils import get_role_model + + +def dummy_export_roles(*args): + return {'roles': [{'name': 'role1'}]} + + +class DummyImportResult(object): + + def __init__(self, success, missing_related, roles): + self.success = success + self.missing_related = missing_related + self.roles = roles + + +def test_export_role_cmd_stdout(db, capsys, monkeypatch): + import authentic2.management.commands.export_site + monkeypatch.setattr( + authentic2.management.commands.export_site, 'export_roles', dummy_export_roles) + management.call_command('export_site') + out, err = capsys.readouterr() + assert json.loads(out) == dummy_export_roles() + + +def test_export_role_cmd_to_file(db, monkeypatch, tmpdir): + import authentic2.management.commands.export_site + monkeypatch.setattr( + authentic2.management.commands.export_site, 'export_roles', dummy_export_roles) + outfile = tmpdir.join('export.json') + management.call_command('export_site', '--output', outfile.strpath) + with outfile.open('r') as f: + assert json.loads(f.read()) == dummy_export_roles() + + +def test_import_role_cmd(db, tmpdir, monkeypatch): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + management.call_command('import_site', export_file.strpath) + + +def test_import_role_cmd_error_on_stderr(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + + def error_import_roles(*args): + return DummyImportResult( + success=False, missing_related={'ou': ['ou1', 'ou2']}, + roles={'created': [], 'updated': [], 'deleted': []}) + + import authentic2.management.commands.import_site + monkeypatch.setattr( + authentic2.management.commands.import_site, 'import_roles', error_import_roles) + + def sys_exit_side_effect(arg): + return + + sys_exit = Mock(side_effect=sys_exit_side_effect) + import sys + monkeypatch.setattr(sys, 'exit', sys_exit) + + management.call_command('import_site', export_file.strpath) + + out, err = capsys.readouterr() + assert "could not import roles due to missing" in err + assert "Missing ou" in err + assert "ou1" in err + assert "ou2" in err + sys_exit.assert_called_with(1) + + +def test_import_role_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + + def dummy_import_roles(*args): + return DummyImportResult( + success=True, missing_related={}, + roles={'created': ['role1'], 'updated': [], 'deleted': []}) + + import authentic2.management.commands.import_site + monkeypatch.setattr( + authentic2.management.commands.import_site, 'import_roles', dummy_import_roles) + + management.call_command('import_site', export_file.strpath) + + out, err = capsys.readouterr() + assert "Success" in out + assert "1 roles created" in out + + +def test_import_role_transaction_rollback(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + + Role = get_role_model() + + def exception_import_roles(*args): + Role.objects.create(slug='role-slug') + raise ValueError() + + import authentic2.management.commands.import_site + monkeypatch.setattr( + authentic2.management.commands.import_site, 'import_roles', exception_import_roles) + + with pytest.raises(ValueError): + management.call_command('import_site', export_file.strpath) + + with pytest.raises(Role.DoesNotExist): + Role.objects.get(slug='role-slug') -- 2.16.1