From 89c7c3d8c75ea7a2683c0f1dcd495247fc5d20f8 Mon Sep 17 00:00:00 2001 From: Emmanuel Cazenave Date: Tue, 3 Apr 2018 10:21:36 +0200 Subject: [PATCH] create 'import_site' and 'export_site' commands (#16514) Handle only OU and Role. --- src/authentic2/a2_rbac/models.py | 90 ++++++ src/authentic2/data_transfer.py | 294 +++++++++++++++++ src/authentic2/management/commands/export_site.py | 24 ++ src/authentic2/management/commands/import_site.py | 117 +++++++ tests/test_a2_rbac.py | 167 +++++++++- tests/test_data_transfer.py | 378 ++++++++++++++++++++++ tests/test_import_export_site_cmd.py | 153 +++++++++ 7 files changed, 1222 insertions(+), 1 deletion(-) create mode 100644 src/authentic2/data_transfer.py create mode 100644 src/authentic2/management/commands/export_site.py create mode 100644 src/authentic2/management/commands/import_site.py create mode 100644 tests/test_data_transfer.py create mode 100644 tests/test_import_export_site_cmd.py diff --git a/src/authentic2/a2_rbac/models.py b/src/authentic2/a2_rbac/models.py index cf912b48..db25fe24 100644 --- a/src/authentic2/a2_rbac/models.py +++ b/src/authentic2/a2_rbac/models.py @@ -22,6 +22,11 @@ from authentic2.decorators import GlobalCache from . import managers, fields +SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField, + models.URLField, models.BooleanField, fields.UniqueBooleanField, + models.IntegerField, models.CommaSeparatedIntegerField, + models.EmailField, models.IntegerField, models.PositiveIntegerField) + class OrganizationalUnit(OrganizationalUnitAbstractBase): username_is_unique = models.BooleanField( @@ -92,6 +97,31 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase): def cached(cls): return cls.objects.all() + def export_json(self): + d = {} + concrete_fields = [f for f in self.__class__._meta.get_fields() + if f.concrete and not f.is_relation] + for field in concrete_fields: + if field.name == 'id': + continue + value = getattr(self, field.attname) + if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): + d[field.name] = value + else: + raise Exception('export_json: field %s of ressource class %s is unsupported' % ( + field, self.__class__)) + return d + + @classmethod + def import_json(cls, d, to_update=None): + if to_update is None: + return cls.objects.create(**d) + else: + for attr, value in d.items(): + setattr(to_update, attr, value) + to_update.save() + return to_update + class Permission(PermissionAbstractBase): class Meta: @@ -207,6 +237,62 @@ class Role(RoleAbstractBase): 'ou__slug': self.ou.slug if self.ou else None, } + def export_json(self, attributes=False, parents=False, permissions=False): + d = {} + concrete_fields = [f for f in self.__class__._meta.get_fields() + if f.concrete and not f.is_relation] + for field in concrete_fields: + if field.name == 'id': + continue + value = getattr(self, field.attname) + if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS): + d[field.name] = value + else: + raise Exception('export_json: field %s of ressource class %s is unsupported' % ( + field, self.__class__)) + + d['ou'] = None + if self.ou: + d['ou'] = {'uuid': self.ou.uuid, 'slug': self.ou.slug, 'name': self.ou.name} + d['service'] = None + if self.service: + d['service'] = {'slug': self.service.slug, 'name': self.service.name, 'ou': None} + if self.service.ou: + d['service']['ou'] = {'slug': self.service.ou.slug} + + if attributes: + for attribute in self.attributes.all(): + d.setdefault('attributes', []).append(attribute.to_json()) + + if parents: + RoleParenting = rbac_utils.get_role_parenting_model() + for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True): + d.setdefault('parents', []).append(parenting.parent.export_json()) + + if permissions: + for perm in self.permissions.all(): + d.setdefault('permissions', []).append(perm.natural_key()) + + return d + + @classmethod + def import_json(cls, d, to_update=None): + kwargs = {} + concrete_fields = [f for f in cls._meta.get_fields() + if f.concrete and not f.is_relation] + for field in concrete_fields: + if field.name == 'id': + continue + if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d: + kwargs[field.name] = d[field.name] + if to_update is None: + return cls.objects.create(**kwargs) + else: + for attr, value in kwargs.items(): + setattr(to_update, attr, value) + to_update.save() + return to_update + class RoleParenting(RoleParentingAbstractBase): class Meta(RoleParentingAbstractBase.Meta): @@ -239,6 +325,10 @@ class RoleAttribute(models.Model): ('role', 'name', 'kind', 'value'), ) + def to_json(self): + return {'name': self.name, 'kind': self.kind, 'value': self.value} + + GenericRelation(Permission, content_type_field='target_ct', object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms') diff --git a/src/authentic2/data_transfer.py b/src/authentic2/data_transfer.py new file mode 100644 index 00000000..155cea56 --- /dev/null +++ b/src/authentic2/data_transfer.py @@ -0,0 +1,294 @@ +from django.contrib.contenttypes.models import ContentType + +from django_rbac.models import Operation +from django_rbac.utils import ( + get_ou_model, get_role_model, get_role_parenting_model, get_permission_model) +from authentic2.a2_rbac.models import RoleAttribute + + +def export_site(): + return { + 'roles': export_roles(get_role_model().objects.all()), + 'ous': export_ou(get_ou_model().objects.all()) + } + + +def export_ou(ou_query_set): + return [ou.export_json() for ou in ou_query_set] + + +def export_roles(role_queryset): + """ Serialize roles in role_queryset + """ + return [role.export_json(attributes=True, parents=True) for role in role_queryset] + + +def build_ou_natural_key(d): + return None if d['ou'] is None else [d['ou']['slug']] + + +def build_role_natural_key(role_d): + role_slug = role_d['slug'] + ou_nk = build_ou_natural_key(role_d) + service_nk = None if role_d['service'] is None else [ + build_ou_natural_key(role_d['service']), role_d['service']['slug']] + return [role_slug, ou_nk, service_nk] + + +def search_ou(ou_natural_key): + """ ou_natural_key: ['ou-slug'] or None + """ + if ou_natural_key: + try: + OU = get_ou_model() + return OU.objects.get_by_natural_key(*ou_natural_key) + except OU.DoesNotExist: + pass + return None + + +def search_role(role_d): + Role = get_role_model() + try: + return Role.objects.get(uuid=role_d['uuid']) + except Role.DoesNotExist: + pass + try: + return Role.objects.get_by_natural_key(*build_role_natural_key(role_d)) + except Role.DoesNotExist: + return None + + +class ImportContext(object): + """ Holds information on how to perform the import. + + ou_delete_orphans: if True any existing ou that is not found in the export will + be deleted + + role_delete_orphans: if True any existing role that is not found in the export will + be deleted + + + role_attributes_update: for each role in the import data, + attributes will deleted and re-created + + + role_parentings_update: for each role in the import data, + parentings will deleted and re-created + + role_permissions_update: for each role in the import data, + permissions will deleted and re-created + """ + + def __init__( + self, role_delete_orphans=False, role_parentings_update=True, + role_permissions_update=True, role_attributes_update=True, + ou_delete_orphans=False): + self.role_delete_orphans = role_delete_orphans + self.ou_delete_orphans = ou_delete_orphans + self.role_parentings_update = role_parentings_update + self.role_permissions_update = role_permissions_update + self.role_attributes_update = role_attributes_update + + +class DataImportError(Exception): + pass + + +class RoleDeserializer(object): + + def __init__(self, d, import_context): + self._import_context = import_context + self._obj = None + self._ou = None + self._parents = None + self._attributes = None + self._permissions = None + + self._role_d = dict() + for key, value in d.items(): + if key == 'parents': + self._parents = value + elif key == 'attributes': + self._attributes = value + elif key == 'permissions': + self._permissions = value + else: + self._role_d[key] = value + + ou_natural_key = build_ou_natural_key(self._role_d) + if ou_natural_key: + self._ou = search_ou(ou_natural_key) + if self._ou is None: + raise DataImportError( + "Can't import role because missing Organizational Unit : " + "%s" % ou_natural_key[0]) + + def deserialize(self): + kwargs = self._role_d.copy() + obj = search_role(self._role_d) + if obj: # Role already exist + self._obj = obj + status = 'updated' + get_role_model().import_json(kwargs, self._obj) + if self._ou and (self._obj.ou != self._ou): + # Need to update ou + self._obj.ou = self._ou + self._obj.save() + else: # Create role + self._obj = get_role_model().import_json(kwargs) + if self._ou: + self._obj.ou = self._ou + self._obj.save() + status = 'created' + + # Ensure admin role is created + self._obj.get_admin_role() + return self._obj, status + + def attributes(self): + """ Update attributes (delete everything then create) + """ + created, deleted = [], [] + for attr in self._obj.attributes.all(): + attr.delete() + deleted.append(attr) + # Create attributes + if self._attributes: + for attr_dict in self._attributes: + attr_dict['role'] = self._obj + created.append(RoleAttribute.objects.create(**attr_dict)) + + return created, deleted + + def parentings(self): + """ Update parentings (delete everything then create) + """ + created, deleted = [], [] + Parenting = get_role_parenting_model() + for parenting in Parenting.objects.filter(child=self._obj, direct=True): + parenting.delete() + deleted.append(parenting) + + if self._parents: + for parent_d in self._parents: + parent = search_role(parent_d) + if not parent: + raise DataImportError("Could not find role : %s" % parent_d) + created.append(Parenting.objects.create( + child=self._obj, direct=True, parent=parent)) + + return created, deleted + + def permissions(self): + """ Update permissions (delete everything then create) + """ + created, deleted = [], [] + for perm in self._obj.permissions.all(): + perm.delete() + deleted.append(perm) + self._obj.permissions.clear() + if self._permissions: + for perm in self._permissions: + op = Operation.objects.get_by_natural_key(perm[0]) + ou = get_ou_model().objects.get_by_natural_key(perm[1]) if perm[1] else None + ct = ContentType.objects.get_by_natural_key(*perm[2]) + target = ct.model_class().objects.get_by_natural_key(*perm[3]) + perm = get_permission_model().objects.create( + operation=op, ou=ou, target_ct=ct, target_id=target.pk) + self._obj.permissions.add(perm) + created.append(perm) + + return created, deleted + + +class ImportResult(object): + + def __init__(self): + self.roles = {'created': [], 'updated': []} + self.ous = {'created': [], 'updated': []} + self.attributes = {'created': [], 'deleted': []} + self.parentings = {'created': [], 'deleted': []} + self.permissions = {'created': [], 'deleted': []} + + def update_roles(self, role, d_status): + self.roles[d_status].append(role) + + def update_ous(self, ou, status): + self.ous[status].append(ou) + + def _bulk_update(self, attrname, created, deleted): + attr = getattr(self, attrname) + attr['created'].extend(created) + attr['deleted'].extend(deleted) + + def update_attributes(self, created, deleted): + self._bulk_update('attributes', created, deleted) + + def update_parentings(self, created, deleted): + self._bulk_update('parentings', created, deleted) + + def update_permissions(self, created, deleted): + self._bulk_update('permissions', created, deleted) + + def to_str(self, verbose=False): + res = "" + for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'): + data = getattr(self, attr) + for status in ('created', 'updated', 'deleted'): + if status in data: + s_data = data[status] + res += "%s %s %s\n" % (len(s_data), attr, status) + return res + + +def import_ou(ou_d): + OU = get_ou_model() + ou = search_ou([ou_d['slug']]) + if ou is None: + ou = OU.import_json(ou_d) + status = 'created' + else: + OU.import_json(ou_d, ou) + status = 'updated' + # Ensure admin role is created + ou.get_admin_role() + return ou, status + + +def import_site(json_d, import_context): + result = ImportResult() + + for ou_d in json_d.get('ous', []): + result.update_ous(*import_ou(ou_d)) + + roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', []) + if not role_d['slug'].startswith('_')] + + for ds in roles_ds: + result.update_roles(*ds.deserialize()) + + if import_context.role_attributes_update: + for ds in roles_ds: + result.update_attributes(*ds.attributes()) + + if import_context.role_parentings_update: + for ds in roles_ds: + result.update_parentings(*ds.parentings()) + + if import_context.role_permissions_update: + for ds in roles_ds: + result.update_permissions(*ds.permissions()) + + if import_context.ou_delete_orphans: + raise DataImportError( + "Unsupported context value for ou_delete_orphans : %s" % ( + import_context.ou_delete_orphans)) + + if import_context.role_delete_orphans: + # FIXME : delete each role that is in DB but not in the export + raise DataImportError( + "Unsupported context value for role_delete_orphans : %s" % ( + import_context.role_delete_orphans)) + + return result diff --git a/src/authentic2/management/commands/export_site.py b/src/authentic2/management/commands/export_site.py new file mode 100644 index 00000000..58c6ddda --- /dev/null +++ b/src/authentic2/management/commands/export_site.py @@ -0,0 +1,24 @@ +import json +import sys + +from django.core.management.base import BaseCommand + +from authentic2.data_transfer import export_site +from django_rbac.utils import get_role_model + + +class Command(BaseCommand): + help = 'Export site' + + def add_arguments(self, parser): + parser.add_argument('--output', metavar='FILE', default=None, + help='name of a file to write output to') + + def handle(self, *args, **options): + if options['output']: + output, close = open(options['output'], 'w'), True + else: + output, close = sys.stdout, False + json.dump(export_site(), output, indent=4) + if close: + output.close() diff --git a/src/authentic2/management/commands/import_site.py b/src/authentic2/management/commands/import_site.py new file mode 100644 index 00000000..9d4f755a --- /dev/null +++ b/src/authentic2/management/commands/import_site.py @@ -0,0 +1,117 @@ +import contextlib +import json +import sys + +from django.conf import settings +from django.core.management.base import BaseCommand +from django.db import transaction +from django.utils import translation + +from authentic2.data_transfer import import_site, ImportContext + + +class DryRunException(Exception): + pass + + +# Borrowed from http://code.activestate.com/recipes/577058/ +def query_yes_no(question, default="yes"): + """Ask a yes/no question via raw_input() and return their answer. + + "question" is a string that is presented to the user. + "default" is the presumed answer if the user just hits . + It must be "yes" (the default), "no" or None (meaning + an answer is required of the user). + + The "answer" return value is True for "yes" or False for "no". + """ + valid = {"yes": True, "y": True, "ye": True, + "no": False, "n": False} + if default is None: + prompt = " [y/n] " + elif default == "yes": + prompt = " [Y/n] " + elif default == "no": + prompt = " [y/N] " + else: + raise ValueError("invalid default answer: '%s'" % default) + + while True: + sys.stdout.write(question + prompt) + choice = raw_input().lower() + if default is not None and choice == '': + return valid[default] + elif choice in valid: + return valid[choice] + else: + sys.stdout.write("Please respond with 'yes' or 'no' " + "(or 'y' or 'n').\n") + + +def create_context_args(options): + kwargs = {} + if options['option']: + for context_op in options['option']: + context_op = context_op.replace('-', '_') + if context_op.startswith('no_'): + kwargs[context_op[3:]] = False + else: + kwargs[context_op] = True + return kwargs + + +# Borrowed from https://bugs.python.org/issue10049#msg118599 +@contextlib.contextmanager +def provision_contextm(dry_run): + multitenant = False + try: + import hobo.agent.authentic2 + multitenant = True + except ImportError: + pass + if dry_run and multitenant: + with hobo.agent.authentic2.provisionning.Provisionning(): + yield + else: + yield + + +class Command(BaseCommand): + help = 'Import site' + + def add_arguments(self, parser): + parser.add_argument( + 'filename', metavar='FILENAME', type=str, help='name of file to import') + parser.add_argument( + '--dry-run', action='store_true', dest='dry_run', help='Really perform the import') + parser.add_argument( + '-y', '--yes', action='store_true', dest='skip_confirm', + help='Skip confirmation prompt') + parser.add_argument( + '-o', '--option', action='append', help='Import context options', + choices=[ + 'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update', + 'no-role-attributes-update', 'no-role-parentings-update']) + + def handle(self, filename, **options): + translation.activate(settings.LANGUAGE_CODE) + dry_run = options['dry_run'] + skip_confirm = options['skip_confirm'] + if not (dry_run or skip_confirm): + if not query_yes_no("Do you really want to perform the import ?"): + sys.exit() + msg = "Dry run\n" if dry_run else "Real run\n" + c_kwargs = create_context_args(options) + try: + with open(filename, 'r') as f: + with provision_contextm(dry_run): + with transaction.atomic(): + sys.stdout.write(msg) + result = import_site(json.load(f), ImportContext(**c_kwargs)) + if dry_run: + raise DryRunException() + except DryRunException: + pass + sys.stdout.write(result.to_str()) + sys.stdout.write("Success\n") + translation.deactivate() diff --git a/tests/test_a2_rbac.py b/tests/test_a2_rbac.py index f619f26f..e3eed653 100644 --- a/tests/test_a2_rbac.py +++ b/tests/test_a2_rbac.py @@ -1,7 +1,11 @@ import pytest +from django.contrib.contenttypes.models import ContentType +from django_rbac.utils import get_permission_model +from django_rbac.models import Operation +from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute from authentic2.models import Service -from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU +from authentic2.utils import get_hex_uuid def test_role_natural_key(db): @@ -24,3 +28,164 @@ def test_role_natural_key(db): Role.objects.get_by_natural_key(*r2.natural_key()) with pytest.raises(Role.DoesNotExist): Role.objects.get_by_natural_key(*r4.natural_key()) + + +def test_basic_role_export_json(db): + role = Role.objects.create( + name='basic role', slug='basic-role', description='basic role description') + role_dict = role.export_json() + assert role_dict['name'] == role.name + assert role_dict['slug'] == role.slug + assert role_dict['uuid'] == role.uuid + assert role_dict['description'] == role.description + assert role_dict['admin_scope_id'] == role.admin_scope_id + assert role_dict['external_id'] == role.external_id + assert role_dict['ou'] is None + assert role_dict['service'] is None + + +def test_role_with_ou_export_json(db): + ou = OU.objects.create(name='ou', slug='ou') + role = Role.objects.create(name='some role', ou=ou) + role_dict = role.export_json() + assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name} + + +def test_role_with_service_export_json(db): + service = Service.objects.create(name='service name', slug='service-name') + role = Role.objects.create(name='some role', service=service) + role_dict = role.export_json() + assert role_dict['service'] == {'slug': service.slug, 'name': service.name, 'ou': None} + + +def test_role_with_service_with_ou_export_json(db): + ou = OU.objects.create(name='ou', slug='ou') + service = Service.objects.create(name='service name', slug='service-name', ou=ou) + role = Role.objects.create(name='some role', service=service) + role_dict = role.export_json() + assert role_dict['service'] == { + 'slug': service.slug, 'name': service.name, 'ou': {'slug': 'ou'}} + + +def test_role_with_attributes_export_json(db): + role = Role.objects.create(name='some role') + attr1 = RoleAttribute.objects.create( + role=role, name='attr1_name', kind='string', value='attr1_value') + attr2 = RoleAttribute.objects.create( + role=role, name='attr2_name', kind='string', value='attr2_value') + + role_dict = role.export_json(attributes=True) + attributes = role_dict['attributes'] + assert len(attributes) == 2 + + expected_attr_names = set([attr1.name, attr2.name]) + for attr_dict in attributes: + assert attr_dict['name'] in expected_attr_names + expected_attr_names.remove(attr_dict['name']) + target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first() + assert attr_dict['kind'] == target_attr.kind + assert attr_dict['value'] == target_attr.value + + +def test_role_with_parents_export_json(db): + grand_parent_role = Role.objects.create( + name='test grand parent role', slug='test-grand-parent-role') + parent_1_role = Role.objects.create( + name='test parent 1 role', slug='test-parent-1-role') + parent_1_role.add_parent(grand_parent_role) + parent_2_role = Role.objects.create( + name='test parent 2 role', slug='test-parent-2-role') + parent_2_role.add_parent(grand_parent_role) + child_role = Role.objects.create( + name='test child role', slug='test-child-role') + child_role.add_parent(parent_1_role) + child_role.add_parent(parent_2_role) + + child_role_dict = child_role.export_json(parents=True) + assert child_role_dict['slug'] == child_role.slug + parents = child_role_dict['parents'] + assert len(parents) == 2 + expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) + for parent in parents: + assert parent['slug'] in expected_slugs + expected_slugs.remove(parent['slug']) + + grand_parent_role_dict = grand_parent_role.export_json(parents=True) + assert grand_parent_role_dict['slug'] == grand_parent_role.slug + assert 'parents' not in grand_parent_role_dict + + parent_1_role_dict = parent_1_role.export_json(parents=True) + assert parent_1_role_dict['slug'] == parent_1_role.slug + parents = parent_1_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + parent_2_role_dict = parent_2_role.export_json(parents=True) + assert parent_2_role_dict['slug'] == parent_2_role.slug + parents = parent_2_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + +def test_role_with_permission_export_json(db): + some_ou = OU.objects.create(name='some ou', slug='some-ou') + role = Role.objects.create(name='role name', slug='role-slug') + other_role = Role.objects.create( + name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou) + ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description') + Permission = get_permission_model() + op = Operation.objects.first() + perm_saml = Permission.objects.create( + operation=op, ou=ou, + target_ct=ContentType.objects.get_for_model(ContentType), + target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk) + role.permissions.add(perm_saml) + perm_role = Permission.objects.create( + operation=op, ou=None, + target_ct=ContentType.objects.get_for_model(Role), + target_id=other_role.pk) + role.permissions.add(perm_role) + + export = role.export_json(permissions=True) + permissions = export['permissions'] + assert len(permissions) == 2 + assert permissions[0] == [ + u'add', [u'basic-ou'], (u'contenttypes', u'contenttype'), (u'saml', u'libertyprovider')] + assert permissions[1] == [ + u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]] + + +def test_basic_role_import_json(db): + role_dict = dict( + name='basic role', slug='basic-role', description='basic role description', + uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id') + role = Role.import_json(role_dict) + assert role_dict['name'] == role.name + assert role_dict['slug'] == role.slug + assert role_dict['uuid'] == role.uuid + assert role_dict['description'] == role.description + assert role_dict['admin_scope_id'] == role.admin_scope_id + assert role_dict['external_id'] == role.external_id + + +def test_ou_export_json(db): + ou = OU.objects.create( + name='basic ou', slug='basic-ou', description='basic ou description', + username_is_unique=True, email_is_unique=True, default=False, validate_emails=True) + ou_dict = ou.export_json() + assert ou_dict['name'] == ou.name + assert ou_dict['slug'] == ou.slug + assert ou_dict['uuid'] == ou.uuid + assert ou_dict['description'] == ou.description + assert ou_dict['username_is_unique'] == ou.username_is_unique + assert ou_dict['email_is_unique'] == ou.email_is_unique + assert ou_dict['default'] == ou.default + assert ou_dict['validate_emails'] == ou.validate_emails + + +def test_ou_import_json(db): + ou_d = dict(name='basic ou', slug='basic-ou', description='basic ou description', + username_is_unique=True, email_is_unique=True, default=False, validate_emails=True) + ou = OU.import_json(ou_d) + for field, value in ou_d.items(): + assert getattr(ou, field) == value diff --git a/tests/test_data_transfer.py b/tests/test_data_transfer.py new file mode 100644 index 00000000..e489725b --- /dev/null +++ b/tests/test_data_transfer.py @@ -0,0 +1,378 @@ +from django_rbac.utils import get_role_model, get_ou_model +import pytest + +from authentic2.a2_rbac.models import RoleParenting +from authentic2.data_transfer import ( + DataImportError, export_roles, import_site, export_ou, ImportContext, + RoleDeserializer, search_role, import_ou, build_role_natural_key) +from authentic2.utils import get_hex_uuid + + +Role = get_role_model() +OU = get_ou_model() + + +def test_export_basic_role(db): + role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid()) + query_set = Role.objects.filter(uuid=role.uuid) + roles = export_roles(query_set) + assert len(roles) == 1 + role_dict = roles[0] + for key, value in role.export_json().items(): + assert role_dict[key] == value + + +def test_export_role_with_parents(db): + grand_parent_role = Role.objects.create( + name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid()) + parent_1_role = Role.objects.create( + name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid()) + parent_1_role.add_parent(grand_parent_role) + parent_2_role = Role.objects.create( + name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid()) + parent_2_role.add_parent(grand_parent_role) + child_role = Role.objects.create( + name='test child role', slug='test-child-role', uuid=get_hex_uuid()) + child_role.add_parent(parent_1_role) + child_role.add_parent(parent_2_role) + + query_set = Role.objects.filter(slug__startswith='test').order_by('slug') + roles = export_roles(query_set) + assert len(roles) == 4 + + child_role_dict = roles[0] + assert child_role_dict['slug'] == child_role.slug + parents = child_role_dict['parents'] + assert len(parents) == 2 + expected_slugs = set([parent_1_role.slug, parent_2_role.slug]) + for parent in parents: + assert parent['slug'] in expected_slugs + expected_slugs.remove(parent['slug']) + + grand_parent_role_dict = roles[1] + assert grand_parent_role_dict['slug'] == grand_parent_role.slug + + parent_1_role_dict = roles[2] + assert parent_1_role_dict['slug'] == parent_1_role.slug + parents = parent_1_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + parent_2_role_dict = roles[3] + assert parent_2_role_dict['slug'] == parent_2_role.slug + parents = parent_2_role_dict['parents'] + assert len(parents) == 1 + assert parents[0]['slug'] == grand_parent_role.slug + + +def test_export_ou(db): + ou = OU.objects.create(name='ou name', slug='ou-slug') + ous = export_ou(OU.objects.filter(name='ou name')) + assert len(ous) == 1 + ou_d = ous[0] + assert ou_d['name'] == ou.name + assert ou_d['slug'] == ou.slug + + +def test_search_role_by_uuid(db): + uuid = get_hex_uuid() + role_d = {'uuid': uuid, 'slug': 'role-slug'} + role = Role.objects.create(**role_d) + assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'}) + + +def test_search_role_by_slug(db): + role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'} + role = Role.objects.create(**role_d) + assert role == search_role({ + 'uuid': get_hex_uuid(), 'slug': 'role-slug', + 'ou': None, 'service': None}) + + +def test_search_role_not_found(db): + assert search_role( + { + 'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name', + 'ou': None, 'service': None}) is None + + +def test_search_role_slug_not_unique(db): + role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} + role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'} + ou = OU.objects.create(name='some ou', slug='some-ou') + role1 = Role.objects.create(ou=ou, **role1_d) + Role.objects.create(**role2_d) + assert role1 == search_role(role1.export_json()) + + +def test_role_deserializer(db): + rd = RoleDeserializer({ + 'name': 'some role', 'description': 'some role description', 'slug': 'some-role', + 'uuid': get_hex_uuid(), 'ou': None, 'service': None}, ImportContext()) + assert rd._ou is None + assert rd._parents is None + assert rd._attributes is None + assert rd._obj is None + role, status = rd.deserialize() + assert status == 'created' + assert role.name == 'some role' + assert role.description == 'some role description' + assert role.slug == 'some-role' + assert rd._obj == role + + +def test_role_deserializer_with_ou(db): + ou = OU.objects.create(name='some ou', slug='some-ou') + rd = RoleDeserializer({ + 'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', + 'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, ImportContext()) + role, status = rd.deserialize() + assert role.ou == ou + + +def test_role_deserializer_missing_ou(db): + with pytest.raises(DataImportError): + RoleDeserializer({ + 'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description', + 'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, + ImportContext()) + + +def test_role_deserializer_update_ou(db): + ou1 = OU.objects.create(name='ou 1', slug='ou-1') + ou2 = OU.objects.create(name='ou 2', slug='ou-2') + uuid = get_hex_uuid() + existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1) + rd = RoleDeserializer({ + 'uuid': uuid, 'name': 'some-role', 'slug': 'some-role', + 'ou': {'slug': 'ou-2'}, 'service': None}, ImportContext()) + role, status = rd.deserialize() + assert role == existing_role + assert role.ou == ou2 + + +def test_role_deserializer_update_fields(db): + uuid = get_hex_uuid() + existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role') + rd = RoleDeserializer({ + 'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed', + 'ou': None, 'service': None}, ImportContext()) + role, status = rd.deserialize() + assert role == existing_role + assert role.name == 'some role changed' + + +def test_role_deserializer_with_attributes(db): + + attributes_data = { + 'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'), + 'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value') + } + rd = RoleDeserializer({ + 'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description', + 'slug': 'some-role', 'attributes': list(attributes_data.values()), + 'ou': None, 'service': None}, ImportContext()) + role, status = rd.deserialize() + created, deleted = rd.attributes() + assert role.attributes.count() == 2 + assert len(created) == 2 + + for attr in created: + attr_dict = attributes_data[attr.name] + assert attr_dict['name'] == attr.name + assert attr_dict['kind'] == attr.kind + assert attr_dict['value'] == attr.value + del attributes_data[attr.name] + + +def test_role_deserializer_creates_admin_role(db): + role_dict = { + 'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + rd = RoleDeserializer(role_dict, ImportContext()) + rd.deserialize() + Role.objects.get(slug='_a2-managers-of-role-some-role') + + +def test_role_deserializer_parenting_existing_parent(db): + parent_role_dict = { + 'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + parent_role = Role.import_json(parent_role_dict) + child_role_dict = { + 'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], + 'uuid': get_hex_uuid(), 'ou': None, 'service': None} + + rd = RoleDeserializer(child_role_dict, ImportContext()) + child_role, status = rd.deserialize() + created, deleted = rd.parentings() + + assert len(created) == 1 + parenting = created[0] + assert parenting.direct is True + assert parenting.parent == parent_role + assert parenting.child == child_role + + +def test_role_deserializer_parenting_non_existing_parent(db): + parent_role_dict = { + 'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + child_role_dict = { + 'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], + 'uuid': get_hex_uuid(), 'ou': None, 'service': None} + rd = RoleDeserializer(child_role_dict, ImportContext()) + rd.deserialize() + with pytest.raises(DataImportError) as excinfo: + rd.parentings() + + assert "Could not find role" in str(excinfo.value) + + +def test_role_deserializer_permissions(db): + ou = OU.objects.create(slug='some-ou') + other_role_dict = { + 'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou} + other_role = Role.objects.create(**other_role_dict) + some_role_dict = { + 'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + some_role_dict['permissions'] = [ + [u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]] + ] + + import_context = ImportContext() + rd = RoleDeserializer(some_role_dict, import_context) + rd.deserialize() + perm_created, perm_deleted = rd.permissions() + + assert len(perm_created) == 1 + assert len(perm_deleted) == 0 + del some_role_dict['permissions'] + role = Role.objects.get(slug=some_role_dict['slug']) + assert role.permissions.count() == 1 + perm = role.permissions.first() + assert perm.operation.slug == 'add' + assert not perm.ou + assert perm.target == other_role + + # that one should delete permissions + rd = RoleDeserializer(some_role_dict, import_context) + role, _ = rd.deserialize() + perm_created, perm_deleted = rd.permissions() + assert role.permissions.count() == 0 + assert len(perm_created) == 0 + assert len(perm_deleted) == 1 + + +def import_ou_created(db): + uuid = get_hex_uuid() + ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} + ou, status = import_ou(ou_d) + assert status == 'created' + assert ou.uuid == ou_d['uuid'] + assert ou.slug == ou_d['slug'] + assert ou.name == ou_d['name'] + + +def import_ou_updated(db): + ou = OU.objects.create(slug='some-ou', name='ou name') + ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'} + ou_updated, status = import_ou(ou_d) + assert status == 'updated' + assert ou == ou_updated + assert ou.name == 'new name' + + +def testi_import_site_empty(): + res = import_site({}, ImportContext()) + assert res.roles == {'created': [], 'updated': []} + assert res.ous == {'created': [], 'updated': []} + assert res.parentings == {'created': [], 'deleted': []} + + +def test_import_site_roles(db): + parent_role_dict = { + 'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + child_role_dict = { + 'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict], + 'uuid': get_hex_uuid(), 'ou': None, 'service': None} + roles = [ + parent_role_dict, + child_role_dict + ] + res = import_site({'roles': roles}, ImportContext()) + created_roles = res.roles['created'] + assert len(created_roles) == 2 + parent_role = Role.objects.get(**parent_role_dict) + del child_role_dict['parents'] + child_role = Role.objects.get(**child_role_dict) + assert created_roles[0] == parent_role + assert created_roles[1] == child_role + + assert len(res.parentings['created']) == 1 + assert res.parentings['created'][0] == RoleParenting.objects.get( + child=child_role, parent=parent_role, direct=True) + + +def test_roles_import_ignore_technical_role(db): + roles = [{ + 'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] + res = import_site({'roles': roles}, ImportContext()) + assert res.roles == {'created': [], 'updated': []} + + +def test_roles_import_ignore_technical_role_with_service(db): + roles = [{ + 'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] + res = import_site({'roles': roles}, ImportContext()) + assert res.roles == {'created': [], 'updated': []} + + +def test_import_role_handle_manager_role_parenting(db): + parent_role_dict = { + 'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + parent_role_manager_dict = { + 'name': 'Administrateur du role grand parent role', + 'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(), + 'ou': None, 'service': None} + child_role_dict = { + 'name': 'child role', 'slug': 'child-role', + 'parents': [parent_role_dict, parent_role_manager_dict], + 'uuid': get_hex_uuid(), 'ou': None, 'service': None} + import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext()) + child = Role.objects.get(slug='child-role') + manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role') + RoleParenting.objects.get(child=child, parent=manager, direct=True) + + +def test_import_roles_role_delete_orphans(db): + roles = [{ + 'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}] + with pytest.raises(DataImportError): + import_site({'roles': roles}, ImportContext(role_delete_orphans=True)) + + +def test_import_ou(db): + uuid = get_hex_uuid() + name = 'ou name' + ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}] + res = import_site({'ous': ous}, ImportContext()) + assert len(res.ous['created']) == 1 + ou = res.ous['created'][0] + assert ou.uuid == uuid + assert ou.name == name + Role.objects.get(slug='_a2-managers-of-ou-slug') + + +def test_import_ou_already_existing(db): + uuid = get_hex_uuid() + ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'} + ou = OU.objects.create(**ou_d) + num_ous = OU.objects.count() + res = import_site({'ous': [ou_d]}, ImportContext()) + assert len(res.ous['created']) == 0 + assert num_ous == OU.objects.count() + assert ou == OU.objects.get(uuid=uuid) diff --git a/tests/test_import_export_site_cmd.py b/tests/test_import_export_site_cmd.py new file mode 100644 index 00000000..b04b133b --- /dev/null +++ b/tests/test_import_export_site_cmd.py @@ -0,0 +1,153 @@ +import __builtin__ +import json + +from django.core import management +import pytest + +from django_rbac.utils import get_role_model + + +def dummy_export_site(*args): + return {'roles': [{'name': 'role1'}]} + + +def test_export_role_cmd_stdout(db, capsys, monkeypatch): + import authentic2.management.commands.export_site + monkeypatch.setattr( + authentic2.management.commands.export_site, 'export_site', dummy_export_site) + management.call_command('export_site') + out, err = capsys.readouterr() + assert json.loads(out) == dummy_export_site() + + +def test_export_role_cmd_to_file(db, monkeypatch, tmpdir): + import authentic2.management.commands.export_site + monkeypatch.setattr( + authentic2.management.commands.export_site, 'export_site', dummy_export_site) + outfile = tmpdir.join('export.json') + management.call_command('export_site', '--output', outfile.strpath) + with outfile.open('r') as f: + assert json.loads(f.read()) == dummy_export_site() + + +def test_import_site_cmd(db, tmpdir, monkeypatch): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + management.call_command('import_site', '-y', export_file.strpath) + + +def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps( + {'roles': [{ + 'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', + 'ou': None, 'service': None}]})) + + management.call_command('import_site', '-y', export_file.strpath) + + out, err = capsys.readouterr() + assert "Real run" in out + assert "1 roles created" in out + assert "0 roles updated" in out + + +def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps({'roles': []})) + + Role = get_role_model() + + def exception_import_site(*args): + Role.objects.create(slug='role-slug') + raise Exception() + + import authentic2.management.commands.import_site + monkeypatch.setattr( + authentic2.management.commands.import_site, 'import_site', exception_import_site) + + with pytest.raises(Exception): + management.call_command('import_site', '--yes', export_file.strpath) + + with pytest.raises(Role.DoesNotExist): + Role.objects.get(slug='role-slug') + + +def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps( + {'roles': [{ + 'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', + 'ou': None, 'service': None}]})) + + Role = get_role_model() + + management.call_command('import_site', '--dry-run', export_file.strpath) + + with pytest.raises(Role.DoesNotExist): + Role.objects.get(slug='role-slug') + + +def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys): + from authentic2.data_transfer import DataImportError + + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps( + {'roles': [{ + 'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', + 'ou': None, 'service': None}]})) + + get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name') + + with pytest.raises(DataImportError): + management.call_command( + 'import_site', '-y', '-o', 'role-delete-orphans', export_file.strpath) + + +def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys): + from django.core.management.base import CommandError + export_file = tmpdir.join('roles-export.json') + with pytest.raises(CommandError): + management.call_command('import_site', '-y', '-o', 'unknown-option', export_file.strpath) + + +def test_import_site_confirm_prompt_yes(db, tmpdir, monkeypatch): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps( + {'roles': [{ + 'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', + 'ou': None, 'service': None}]})) + + def yes_raw_input(*args, **kwargs): + return 'yes' + + monkeypatch.setattr(__builtin__, 'raw_input', yes_raw_input) + + management.call_command('import_site', export_file.strpath, stdin='yes') + assert get_role_model().objects.get(uuid='dqfewrvesvews2532') + + +def test_import_site_confirm_prompt_no(db, monkeypatch, tmpdir): + export_file = tmpdir.join('roles-export.json') + with export_file.open('w'): + export_file.write(json.dumps( + {'roles': [{ + 'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name', + 'ou': None, 'service': None}]})) + + def no_raw_input(*args, **kwargs): + return 'no' + + monkeypatch.setattr(__builtin__, 'raw_input', no_raw_input) + + with pytest.raises(SystemExit): + management.call_command('import_site', export_file.strpath) + + Role = get_role_model() + with pytest.raises(Role.DoesNotExist): + Role.objects.get(uuid='dqfewrvesvews2532') -- 2.16.3