Projet

Général

Profil

0001-create-import_site-and-export_site-commands-16514.patch

Emmanuel Cazenave, 03 avril 2018 10:24

Télécharger (48,8 ko)

Voir les différences:

Subject: [PATCH] create 'import_site' and 'export_site' commands (#16514)

Handle only OU and Role.
 src/authentic2/a2_rbac/models.py                  |  90 ++++++
 src/authentic2/data_transfer.py                   | 294 +++++++++++++++++
 src/authentic2/management/commands/export_site.py |  24 ++
 src/authentic2/management/commands/import_site.py | 117 +++++++
 tests/test_a2_rbac.py                             | 167 +++++++++-
 tests/test_data_transfer.py                       | 378 ++++++++++++++++++++++
 tests/test_import_export_site_cmd.py              | 153 +++++++++
 7 files changed, 1222 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/data_transfer.py
 create mode 100644 src/authentic2/management/commands/export_site.py
 create mode 100644 src/authentic2/management/commands/import_site.py
 create mode 100644 tests/test_data_transfer.py
 create mode 100644 tests/test_import_export_site_cmd.py
src/authentic2/a2_rbac/models.py
22 22

  
23 23
from . import managers, fields
24 24

  
25
SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField,
26
                              models.URLField, models.BooleanField, fields.UniqueBooleanField,
27
                              models.IntegerField, models.CommaSeparatedIntegerField,
28
                              models.EmailField, models.IntegerField, models.PositiveIntegerField)
29

  
25 30

  
26 31
class OrganizationalUnit(OrganizationalUnitAbstractBase):
27 32
    username_is_unique = models.BooleanField(
......
92 97
    def cached(cls):
93 98
        return cls.objects.all()
94 99

  
100
    def export_json(self):
101
        d = {}
102
        concrete_fields = [f for f in self.__class__._meta.get_fields()
103
                           if f.concrete and not f.is_relation]
104
        for field in concrete_fields:
105
            if field.name == 'id':
106
                continue
107
            value = getattr(self, field.attname)
108
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS):
109
                d[field.name] = value
110
            else:
111
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
112
                    field, self.__class__))
113
        return d
114

  
115
    @classmethod
116
    def import_json(cls, d, to_update=None):
117
        if to_update is None:
118
            return cls.objects.create(**d)
119
        else:
120
            for attr, value in d.items():
121
                setattr(to_update, attr, value)
122
            to_update.save()
123
            return to_update
124

  
95 125

  
96 126
class Permission(PermissionAbstractBase):
97 127
    class Meta:
......
207 237
            'ou__slug': self.ou.slug if self.ou else None,
208 238
        }
209 239

  
240
    def export_json(self, attributes=False, parents=False, permissions=False):
241
        d = {}
242
        concrete_fields = [f for f in self.__class__._meta.get_fields()
243
                           if f.concrete and not f.is_relation]
244
        for field in concrete_fields:
245
            if field.name == 'id':
246
                continue
247
            value = getattr(self, field.attname)
248
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS):
249
                d[field.name] = value
250
            else:
251
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
252
                    field, self.__class__))
253

  
254
        d['ou'] = None
255
        if self.ou:
256
            d['ou'] = {'uuid': self.ou.uuid, 'slug': self.ou.slug, 'name': self.ou.name}
257
        d['service'] = None
258
        if self.service:
259
            d['service'] = {'slug': self.service.slug, 'name': self.service.name, 'ou': None}
260
            if self.service.ou:
261
                d['service']['ou'] = {'slug': self.service.ou.slug}
262

  
263
        if attributes:
264
            for attribute in self.attributes.all():
265
                d.setdefault('attributes', []).append(attribute.to_json())
266

  
267
        if parents:
268
            RoleParenting = rbac_utils.get_role_parenting_model()
269
            for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True):
270
                d.setdefault('parents', []).append(parenting.parent.export_json())
271

  
272
        if permissions:
273
            for perm in self.permissions.all():
274
                d.setdefault('permissions', []).append(perm.natural_key())
275

  
276
        return d
277

  
278
    @classmethod
279
    def import_json(cls, d, to_update=None):
280
        kwargs = {}
281
        concrete_fields = [f for f in cls._meta.get_fields()
282
                           if f.concrete and not f.is_relation]
283
        for field in concrete_fields:
284
            if field.name == 'id':
285
                continue
286
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d:
287
                kwargs[field.name] = d[field.name]
288
        if to_update is None:
289
            return cls.objects.create(**kwargs)
290
        else:
291
            for attr, value in kwargs.items():
292
                setattr(to_update, attr, value)
293
            to_update.save()
294
        return to_update
295

  
210 296

  
211 297
class RoleParenting(RoleParentingAbstractBase):
212 298
    class Meta(RoleParentingAbstractBase.Meta):
......
239 325
            ('role', 'name', 'kind', 'value'),
240 326
        )
241 327

  
328
    def to_json(self):
329
        return {'name': self.name, 'kind': self.kind, 'value': self.value}
330

  
331

  
242 332
GenericRelation(Permission,
243 333
                content_type_field='target_ct',
244 334
                object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms')
src/authentic2/data_transfer.py
1
from django.contrib.contenttypes.models import ContentType
2

  
3
from django_rbac.models import Operation
4
from django_rbac.utils import (
5
    get_ou_model,  get_role_model, get_role_parenting_model, get_permission_model)
6
from authentic2.a2_rbac.models import RoleAttribute
7

  
8

  
9
def export_site():
10
    return {
11
        'roles': export_roles(get_role_model().objects.all()),
12
        'ous': export_ou(get_ou_model().objects.all())
13
    }
14

  
15

  
16
def export_ou(ou_query_set):
17
    return [ou.export_json() for ou in ou_query_set]
18

  
19

  
20
def export_roles(role_queryset):
21
    """ Serialize roles in role_queryset
22
    """
23
    return [role.export_json(attributes=True, parents=True) for role in role_queryset]
24

  
25

  
26
def build_ou_natural_key(d):
27
    return None if d['ou'] is None else [d['ou']['slug']]
28

  
29

  
30
def build_role_natural_key(role_d):
31
    role_slug = role_d['slug']
32
    ou_nk = build_ou_natural_key(role_d)
33
    service_nk = None if role_d['service'] is None else [
34
        build_ou_natural_key(role_d['service']), role_d['service']['slug']]
35
    return [role_slug, ou_nk, service_nk]
36

  
37

  
38
def search_ou(ou_natural_key):
39
    """ ou_natural_key: ['ou-slug'] or None
40
    """
41
    if ou_natural_key:
42
        try:
43
            OU = get_ou_model()
44
            return OU.objects.get_by_natural_key(*ou_natural_key)
45
        except OU.DoesNotExist:
46
            pass
47
    return None
48

  
49

  
50
def search_role(role_d):
51
    Role = get_role_model()
52
    try:
53
        return Role.objects.get(uuid=role_d['uuid'])
54
    except Role.DoesNotExist:
55
        pass
56
    try:
57
        return Role.objects.get_by_natural_key(*build_role_natural_key(role_d))
58
    except Role.DoesNotExist:
59
        return None
60

  
61

  
62
class ImportContext(object):
63
    """ Holds information on how to perform the import.
64

  
65
    ou_delete_orphans: if True any existing ou that is not found in the export will
66
                       be deleted
67

  
68
    role_delete_orphans: if True any existing role that is not found in the export will
69
                         be deleted
70

  
71

  
72
    role_attributes_update: for each role in the import data,
73
                            attributes  will deleted and re-created
74

  
75

  
76
    role_parentings_update: for each role in the import data,
77
                            parentings will deleted and re-created
78

  
79
    role_permissions_update: for each role in the import data,
80
                             permissions  will deleted and re-created
81
    """
82

  
83
    def __init__(
84
            self, role_delete_orphans=False, role_parentings_update=True,
85
            role_permissions_update=True, role_attributes_update=True,
86
            ou_delete_orphans=False):
87
        self.role_delete_orphans = role_delete_orphans
88
        self.ou_delete_orphans = ou_delete_orphans
89
        self.role_parentings_update = role_parentings_update
90
        self.role_permissions_update = role_permissions_update
91
        self.role_attributes_update = role_attributes_update
92

  
93

  
94
class DataImportError(Exception):
95
    pass
96

  
97

  
98
class RoleDeserializer(object):
99

  
100
    def __init__(self, d, import_context):
101
        self._import_context = import_context
102
        self._obj = None
103
        self._ou = None
104
        self._parents = None
105
        self._attributes = None
106
        self._permissions = None
107

  
108
        self._role_d = dict()
109
        for key, value in d.items():
110
            if key == 'parents':
111
                self._parents = value
112
            elif key == 'attributes':
113
                self._attributes = value
114
            elif key == 'permissions':
115
                self._permissions = value
116
            else:
117
                self._role_d[key] = value
118

  
119
        ou_natural_key = build_ou_natural_key(self._role_d)
120
        if ou_natural_key:
121
            self._ou = search_ou(ou_natural_key)
122
            if self._ou is None:
123
                raise DataImportError(
124
                    "Can't import role because missing Organizational Unit : "
125
                    "%s" % ou_natural_key[0])
126

  
127
    def deserialize(self):
128
        kwargs = self._role_d.copy()
129
        obj = search_role(self._role_d)
130
        if obj:  # Role already exist
131
            self._obj = obj
132
            status = 'updated'
133
            get_role_model().import_json(kwargs, self._obj)
134
            if self._ou and (self._obj.ou != self._ou):
135
                # Need to update ou
136
                self._obj.ou = self._ou
137
                self._obj.save()
138
        else:  # Create role
139
            self._obj = get_role_model().import_json(kwargs)
140
            if self._ou:
141
                self._obj.ou = self._ou
142
                self._obj.save()
143
            status = 'created'
144

  
145
        # Ensure admin role is created
146
        self._obj.get_admin_role()
147
        return self._obj, status
148

  
149
    def attributes(self):
150
        """ Update attributes (delete everything then create)
151
        """
152
        created, deleted = [], []
153
        for attr in self._obj.attributes.all():
154
            attr.delete()
155
            deleted.append(attr)
156
        # Create attributes
157
        if self._attributes:
158
            for attr_dict in self._attributes:
159
                attr_dict['role'] = self._obj
160
                created.append(RoleAttribute.objects.create(**attr_dict))
161

  
162
        return created, deleted
163

  
164
    def parentings(self):
165
        """ Update parentings (delete everything then create)
166
        """
167
        created, deleted = [], []
168
        Parenting = get_role_parenting_model()
169
        for parenting in Parenting.objects.filter(child=self._obj, direct=True):
170
            parenting.delete()
171
            deleted.append(parenting)
172

  
173
        if self._parents:
174
            for parent_d in self._parents:
175
                parent = search_role(parent_d)
176
                if not parent:
177
                    raise DataImportError("Could not find role : %s" % parent_d)
178
                created.append(Parenting.objects.create(
179
                    child=self._obj, direct=True, parent=parent))
180

  
181
        return created, deleted
182

  
183
    def permissions(self):
184
        """ Update permissions (delete everything then create)
185
        """
186
        created, deleted = [], []
187
        for perm in self._obj.permissions.all():
188
            perm.delete()
189
            deleted.append(perm)
190
        self._obj.permissions.clear()
191
        if self._permissions:
192
            for perm in self._permissions:
193
                op = Operation.objects.get_by_natural_key(perm[0])
194
                ou = get_ou_model().objects.get_by_natural_key(perm[1]) if perm[1] else None
195
                ct = ContentType.objects.get_by_natural_key(*perm[2])
196
                target = ct.model_class().objects.get_by_natural_key(*perm[3])
197
                perm = get_permission_model().objects.create(
198
                    operation=op, ou=ou, target_ct=ct, target_id=target.pk)
199
                self._obj.permissions.add(perm)
200
                created.append(perm)
201

  
202
        return created, deleted
203

  
204

  
205
class ImportResult(object):
206

  
207
    def __init__(self):
208
        self.roles = {'created': [], 'updated': []}
209
        self.ous = {'created': [], 'updated': []}
210
        self.attributes = {'created': [], 'deleted': []}
211
        self.parentings = {'created': [], 'deleted': []}
212
        self.permissions = {'created': [], 'deleted': []}
213

  
214
    def update_roles(self, role, d_status):
215
        self.roles[d_status].append(role)
216

  
217
    def update_ous(self, ou, status):
218
        self.ous[status].append(ou)
219

  
220
    def _bulk_update(self, attrname, created, deleted):
221
        attr = getattr(self, attrname)
222
        attr['created'].extend(created)
223
        attr['deleted'].extend(deleted)
224

  
225
    def update_attributes(self, created, deleted):
226
        self._bulk_update('attributes', created, deleted)
227

  
228
    def update_parentings(self, created, deleted):
229
        self._bulk_update('parentings', created, deleted)
230

  
231
    def update_permissions(self, created, deleted):
232
        self._bulk_update('permissions', created, deleted)
233

  
234
    def to_str(self, verbose=False):
235
        res = ""
236
        for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'):
237
            data = getattr(self, attr)
238
            for status in ('created', 'updated', 'deleted'):
239
                if status in data:
240
                    s_data = data[status]
241
                    res += "%s %s %s\n" % (len(s_data), attr, status)
242
        return res
243

  
244

  
245
def import_ou(ou_d):
246
    OU = get_ou_model()
247
    ou = search_ou([ou_d['slug']])
248
    if ou is None:
249
        ou = OU.import_json(ou_d)
250
        status = 'created'
251
    else:
252
        OU.import_json(ou_d, ou)
253
        status = 'updated'
254
    # Ensure admin role is created
255
    ou.get_admin_role()
256
    return ou, status
257

  
258

  
259
def import_site(json_d, import_context):
260
    result = ImportResult()
261

  
262
    for ou_d in json_d.get('ous', []):
263
        result.update_ous(*import_ou(ou_d))
264

  
265
    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
266
                if not role_d['slug'].startswith('_')]
267

  
268
    for ds in roles_ds:
269
        result.update_roles(*ds.deserialize())
270

  
271
    if import_context.role_attributes_update:
272
        for ds in roles_ds:
273
            result.update_attributes(*ds.attributes())
274

  
275
    if import_context.role_parentings_update:
276
        for ds in roles_ds:
277
            result.update_parentings(*ds.parentings())
278

  
279
    if import_context.role_permissions_update:
280
        for ds in roles_ds:
281
            result.update_permissions(*ds.permissions())
282

  
283
    if import_context.ou_delete_orphans:
284
        raise DataImportError(
285
            "Unsupported context value for ou_delete_orphans : %s" % (
286
                import_context.ou_delete_orphans))
287

  
288
    if import_context.role_delete_orphans:
289
        # FIXME : delete each role that is in DB but not in the export
290
        raise DataImportError(
291
            "Unsupported context value for role_delete_orphans : %s" % (
292
                import_context.role_delete_orphans))
293

  
294
    return result
src/authentic2/management/commands/export_site.py
1
import json
2
import sys
3

  
4
from django.core.management.base import BaseCommand
5

  
6
from authentic2.data_transfer import export_site
7
from django_rbac.utils import get_role_model
8

  
9

  
10
class Command(BaseCommand):
11
    help = 'Export site'
12

  
13
    def add_arguments(self, parser):
14
        parser.add_argument('--output', metavar='FILE', default=None,
15
                            help='name of a file to write output to')
16

  
17
    def handle(self, *args, **options):
18
        if options['output']:
19
            output, close = open(options['output'], 'w'), True
20
        else:
21
            output, close = sys.stdout, False
22
        json.dump(export_site(), output, indent=4)
23
        if close:
24
            output.close()
src/authentic2/management/commands/import_site.py
1
import contextlib
2
import json
3
import sys
4

  
5
from django.conf import settings
6
from django.core.management.base import BaseCommand
7
from django.db import transaction
8
from django.utils import translation
9

  
10
from authentic2.data_transfer import import_site, ImportContext
11

  
12

  
13
class DryRunException(Exception):
14
    pass
15

  
16

  
17
# Borrowed from http://code.activestate.com/recipes/577058/
18
def query_yes_no(question, default="yes"):
19
    """Ask a yes/no question via raw_input() and return their answer.
20

  
21
    "question" is a string that is presented to the user.
22
    "default" is the presumed answer if the user just hits <Enter>.
23
        It must be "yes" (the default), "no" or None (meaning
24
        an answer is required of the user).
25

  
26
    The "answer" return value is True for "yes" or False for "no".
27
    """
28
    valid = {"yes": True, "y": True, "ye": True,
29
             "no": False, "n": False}
30
    if default is None:
31
        prompt = " [y/n] "
32
    elif default == "yes":
33
        prompt = " [Y/n] "
34
    elif default == "no":
35
        prompt = " [y/N] "
36
    else:
37
        raise ValueError("invalid default answer: '%s'" % default)
38

  
39
    while True:
40
        sys.stdout.write(question + prompt)
41
        choice = raw_input().lower()
42
        if default is not None and choice == '':
43
            return valid[default]
44
        elif choice in valid:
45
            return valid[choice]
46
        else:
47
            sys.stdout.write("Please respond with 'yes' or 'no' "
48
                             "(or 'y' or 'n').\n")
49

  
50

  
51
def create_context_args(options):
52
    kwargs = {}
53
    if options['option']:
54
        for context_op in options['option']:
55
            context_op = context_op.replace('-', '_')
56
            if context_op.startswith('no_'):
57
                kwargs[context_op[3:]] = False
58
            else:
59
                kwargs[context_op] = True
60
    return kwargs
61

  
62

  
63
#  Borrowed from https://bugs.python.org/issue10049#msg118599
64
@contextlib.contextmanager
65
def provision_contextm(dry_run):
66
    multitenant = False
67
    try:
68
        import hobo.agent.authentic2
69
        multitenant = True
70
    except ImportError:
71
        pass
72
    if dry_run and multitenant:
73
        with hobo.agent.authentic2.provisionning.Provisionning():
74
            yield
75
    else:
76
        yield
77

  
78

  
79
class Command(BaseCommand):
80
    help = 'Import site'
81

  
82
    def add_arguments(self, parser):
83
        parser.add_argument(
84
            'filename', metavar='FILENAME', type=str, help='name of file to import')
85
        parser.add_argument(
86
            '--dry-run', action='store_true', dest='dry_run', help='Really perform the import')
87
        parser.add_argument(
88
            '-y', '--yes', action='store_true', dest='skip_confirm',
89
            help='Skip confirmation prompt')
90
        parser.add_argument(
91
            '-o', '--option', action='append', help='Import context options',
92
            choices=[
93
                'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update',
94
                'no-role-attributes-update', 'no-role-parentings-update'])
95

  
96
    def handle(self, filename, **options):
97
        translation.activate(settings.LANGUAGE_CODE)
98
        dry_run = options['dry_run']
99
        skip_confirm = options['skip_confirm']
100
        if not (dry_run or skip_confirm):
101
            if not query_yes_no("Do you really want to perform the import ?"):
102
                sys.exit()
103
        msg = "Dry run\n" if dry_run else "Real run\n"
104
        c_kwargs = create_context_args(options)
105
        try:
106
            with open(filename, 'r') as f:
107
                with provision_contextm(dry_run):
108
                    with transaction.atomic():
109
                        sys.stdout.write(msg)
110
                        result = import_site(json.load(f), ImportContext(**c_kwargs))
111
                        if dry_run:
112
                            raise DryRunException()
113
        except DryRunException:
114
            pass
115
        sys.stdout.write(result.to_str())
116
        sys.stdout.write("Success\n")
117
        translation.deactivate()
tests/test_a2_rbac.py
1 1
import pytest
2 2

  
3
from django.contrib.contenttypes.models import ContentType
4
from django_rbac.utils import get_permission_model
5
from django_rbac.models import Operation
6
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute
3 7
from authentic2.models import Service
4
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
8
from authentic2.utils import get_hex_uuid
5 9

  
6 10

  
7 11
def test_role_natural_key(db):
......
24 28
        Role.objects.get_by_natural_key(*r2.natural_key())
25 29
    with pytest.raises(Role.DoesNotExist):
26 30
        Role.objects.get_by_natural_key(*r4.natural_key())
31

  
32

  
33
def test_basic_role_export_json(db):
34
    role = Role.objects.create(
35
        name='basic role', slug='basic-role', description='basic role description')
36
    role_dict = role.export_json()
37
    assert role_dict['name'] == role.name
38
    assert role_dict['slug'] == role.slug
39
    assert role_dict['uuid'] == role.uuid
40
    assert role_dict['description'] == role.description
41
    assert role_dict['admin_scope_id'] == role.admin_scope_id
42
    assert role_dict['external_id'] == role.external_id
43
    assert role_dict['ou'] is None
44
    assert role_dict['service'] is None
45

  
46

  
47
def test_role_with_ou_export_json(db):
48
    ou = OU.objects.create(name='ou', slug='ou')
49
    role = Role.objects.create(name='some role', ou=ou)
50
    role_dict = role.export_json()
51
    assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug,  'name': ou.name}
52

  
53

  
54
def test_role_with_service_export_json(db):
55
    service = Service.objects.create(name='service name', slug='service-name')
56
    role = Role.objects.create(name='some role', service=service)
57
    role_dict = role.export_json()
58
    assert role_dict['service'] == {'slug': service.slug,  'name': service.name, 'ou': None}
59

  
60

  
61
def test_role_with_service_with_ou_export_json(db):
62
    ou = OU.objects.create(name='ou', slug='ou')
63
    service = Service.objects.create(name='service name', slug='service-name', ou=ou)
64
    role = Role.objects.create(name='some role', service=service)
65
    role_dict = role.export_json()
66
    assert role_dict['service'] == {
67
        'slug': service.slug,  'name': service.name, 'ou': {'slug': 'ou'}}
68

  
69

  
70
def test_role_with_attributes_export_json(db):
71
    role = Role.objects.create(name='some role')
72
    attr1 = RoleAttribute.objects.create(
73
        role=role, name='attr1_name', kind='string', value='attr1_value')
74
    attr2 = RoleAttribute.objects.create(
75
        role=role, name='attr2_name', kind='string', value='attr2_value')
76

  
77
    role_dict = role.export_json(attributes=True)
78
    attributes = role_dict['attributes']
79
    assert len(attributes) == 2
80

  
81
    expected_attr_names = set([attr1.name, attr2.name])
82
    for attr_dict in attributes:
83
        assert attr_dict['name'] in expected_attr_names
84
        expected_attr_names.remove(attr_dict['name'])
85
        target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first()
86
        assert attr_dict['kind'] == target_attr.kind
87
        assert attr_dict['value'] == target_attr.value
88

  
89

  
90
def test_role_with_parents_export_json(db):
91
    grand_parent_role = Role.objects.create(
92
        name='test grand parent role', slug='test-grand-parent-role')
93
    parent_1_role = Role.objects.create(
94
        name='test parent 1 role', slug='test-parent-1-role')
95
    parent_1_role.add_parent(grand_parent_role)
96
    parent_2_role = Role.objects.create(
97
        name='test parent 2 role', slug='test-parent-2-role')
98
    parent_2_role.add_parent(grand_parent_role)
99
    child_role = Role.objects.create(
100
        name='test child role', slug='test-child-role')
101
    child_role.add_parent(parent_1_role)
102
    child_role.add_parent(parent_2_role)
103

  
104
    child_role_dict = child_role.export_json(parents=True)
105
    assert child_role_dict['slug'] == child_role.slug
106
    parents = child_role_dict['parents']
107
    assert len(parents) == 2
108
    expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
109
    for parent in parents:
110
        assert parent['slug'] in expected_slugs
111
        expected_slugs.remove(parent['slug'])
112

  
113
    grand_parent_role_dict = grand_parent_role.export_json(parents=True)
114
    assert grand_parent_role_dict['slug'] == grand_parent_role.slug
115
    assert 'parents' not in grand_parent_role_dict
116

  
117
    parent_1_role_dict = parent_1_role.export_json(parents=True)
118
    assert parent_1_role_dict['slug'] == parent_1_role.slug
119
    parents = parent_1_role_dict['parents']
120
    assert len(parents) == 1
121
    assert parents[0]['slug'] == grand_parent_role.slug
122

  
123
    parent_2_role_dict = parent_2_role.export_json(parents=True)
124
    assert parent_2_role_dict['slug'] == parent_2_role.slug
125
    parents = parent_2_role_dict['parents']
126
    assert len(parents) == 1
127
    assert parents[0]['slug'] == grand_parent_role.slug
128

  
129

  
130
def test_role_with_permission_export_json(db):
131
    some_ou = OU.objects.create(name='some ou', slug='some-ou')
132
    role = Role.objects.create(name='role name', slug='role-slug')
133
    other_role = Role.objects.create(
134
        name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou)
135
    ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description')
136
    Permission = get_permission_model()
137
    op = Operation.objects.first()
138
    perm_saml = Permission.objects.create(
139
        operation=op, ou=ou,
140
        target_ct=ContentType.objects.get_for_model(ContentType),
141
        target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk)
142
    role.permissions.add(perm_saml)
143
    perm_role = Permission.objects.create(
144
        operation=op, ou=None,
145
        target_ct=ContentType.objects.get_for_model(Role),
146
        target_id=other_role.pk)
147
    role.permissions.add(perm_role)
148

  
149
    export = role.export_json(permissions=True)
150
    permissions = export['permissions']
151
    assert len(permissions) == 2
152
    assert permissions[0] == [
153
        u'add', [u'basic-ou'], (u'contenttypes', u'contenttype'), (u'saml', u'libertyprovider')]
154
    assert permissions[1] == [
155
        u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]]
156

  
157

  
158
def test_basic_role_import_json(db):
159
    role_dict = dict(
160
        name='basic role', slug='basic-role', description='basic role description',
161
        uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id')
162
    role = Role.import_json(role_dict)
163
    assert role_dict['name'] == role.name
164
    assert role_dict['slug'] == role.slug
165
    assert role_dict['uuid'] == role.uuid
166
    assert role_dict['description'] == role.description
167
    assert role_dict['admin_scope_id'] == role.admin_scope_id
168
    assert role_dict['external_id'] == role.external_id
169

  
170

  
171
def test_ou_export_json(db):
172
    ou = OU.objects.create(
173
        name='basic ou', slug='basic-ou', description='basic ou description',
174
        username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
175
    ou_dict = ou.export_json()
176
    assert ou_dict['name'] == ou.name
177
    assert ou_dict['slug'] == ou.slug
178
    assert ou_dict['uuid'] == ou.uuid
179
    assert ou_dict['description'] == ou.description
180
    assert ou_dict['username_is_unique'] == ou.username_is_unique
181
    assert ou_dict['email_is_unique'] == ou.email_is_unique
182
    assert ou_dict['default'] == ou.default
183
    assert ou_dict['validate_emails'] == ou.validate_emails
184

  
185

  
186
def test_ou_import_json(db):
187
    ou_d = dict(name='basic ou', slug='basic-ou', description='basic ou description',
188
                username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
189
    ou = OU.import_json(ou_d)
190
    for field, value in ou_d.items():
191
        assert getattr(ou, field) == value
tests/test_data_transfer.py
1
from django_rbac.utils import get_role_model, get_ou_model
2
import pytest
3

  
4
from authentic2.a2_rbac.models import RoleParenting
5
from authentic2.data_transfer import (
6
    DataImportError, export_roles, import_site, export_ou, ImportContext,
7
    RoleDeserializer, search_role, import_ou, build_role_natural_key)
8
from authentic2.utils import get_hex_uuid
9

  
10

  
11
Role = get_role_model()
12
OU = get_ou_model()
13

  
14

  
15
def test_export_basic_role(db):
16
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
17
    query_set = Role.objects.filter(uuid=role.uuid)
18
    roles = export_roles(query_set)
19
    assert len(roles) == 1
20
    role_dict = roles[0]
21
    for key, value in role.export_json().items():
22
        assert role_dict[key] == value
23

  
24

  
25
def test_export_role_with_parents(db):
26
    grand_parent_role = Role.objects.create(
27
        name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid())
28
    parent_1_role = Role.objects.create(
29
        name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid())
30
    parent_1_role.add_parent(grand_parent_role)
31
    parent_2_role = Role.objects.create(
32
        name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid())
33
    parent_2_role.add_parent(grand_parent_role)
34
    child_role = Role.objects.create(
35
        name='test child role', slug='test-child-role', uuid=get_hex_uuid())
36
    child_role.add_parent(parent_1_role)
37
    child_role.add_parent(parent_2_role)
38

  
39
    query_set = Role.objects.filter(slug__startswith='test').order_by('slug')
40
    roles = export_roles(query_set)
41
    assert len(roles) == 4
42

  
43
    child_role_dict = roles[0]
44
    assert child_role_dict['slug'] == child_role.slug
45
    parents = child_role_dict['parents']
46
    assert len(parents) == 2
47
    expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
48
    for parent in parents:
49
        assert parent['slug'] in expected_slugs
50
        expected_slugs.remove(parent['slug'])
51

  
52
    grand_parent_role_dict = roles[1]
53
    assert grand_parent_role_dict['slug'] == grand_parent_role.slug
54

  
55
    parent_1_role_dict = roles[2]
56
    assert parent_1_role_dict['slug'] == parent_1_role.slug
57
    parents = parent_1_role_dict['parents']
58
    assert len(parents) == 1
59
    assert parents[0]['slug'] == grand_parent_role.slug
60

  
61
    parent_2_role_dict = roles[3]
62
    assert parent_2_role_dict['slug'] == parent_2_role.slug
63
    parents = parent_2_role_dict['parents']
64
    assert len(parents) == 1
65
    assert parents[0]['slug'] == grand_parent_role.slug
66

  
67

  
68
def test_export_ou(db):
69
    ou = OU.objects.create(name='ou name', slug='ou-slug')
70
    ous = export_ou(OU.objects.filter(name='ou name'))
71
    assert len(ous) == 1
72
    ou_d = ous[0]
73
    assert ou_d['name'] == ou.name
74
    assert ou_d['slug'] == ou.slug
75

  
76

  
77
def test_search_role_by_uuid(db):
78
    uuid = get_hex_uuid()
79
    role_d = {'uuid': uuid, 'slug': 'role-slug'}
80
    role = Role.objects.create(**role_d)
81
    assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'})
82

  
83

  
84
def test_search_role_by_slug(db):
85
    role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'}
86
    role = Role.objects.create(**role_d)
87
    assert role == search_role({
88
        'uuid': get_hex_uuid(), 'slug': 'role-slug',
89
        'ou': None, 'service': None})
90

  
91

  
92
def test_search_role_not_found(db):
93
    assert search_role(
94
        {
95
            'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name',
96
            'ou': None, 'service': None}) is None
97

  
98

  
99
def test_search_role_slug_not_unique(db):
100
    role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
101
    role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
102
    ou = OU.objects.create(name='some ou', slug='some-ou')
103
    role1 = Role.objects.create(ou=ou, **role1_d)
104
    Role.objects.create(**role2_d)
105
    assert role1 == search_role(role1.export_json())
106

  
107

  
108
def test_role_deserializer(db):
109
    rd = RoleDeserializer({
110
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
111
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}, ImportContext())
112
    assert rd._ou is None
113
    assert rd._parents is None
114
    assert rd._attributes is None
115
    assert rd._obj is None
116
    role, status = rd.deserialize()
117
    assert status == 'created'
118
    assert role.name == 'some role'
119
    assert role.description == 'some role description'
120
    assert role.slug == 'some-role'
121
    assert rd._obj == role
122

  
123

  
124
def test_role_deserializer_with_ou(db):
125
    ou = OU.objects.create(name='some ou', slug='some-ou')
126
    rd = RoleDeserializer({
127
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
128
        'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, ImportContext())
129
    role, status = rd.deserialize()
130
    assert role.ou == ou
131

  
132

  
133
def test_role_deserializer_missing_ou(db):
134
    with pytest.raises(DataImportError):
135
        RoleDeserializer({
136
            'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
137
            'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None},
138
            ImportContext())
139

  
140

  
141
def test_role_deserializer_update_ou(db):
142
    ou1 = OU.objects.create(name='ou 1', slug='ou-1')
143
    ou2 = OU.objects.create(name='ou 2', slug='ou-2')
144
    uuid = get_hex_uuid()
145
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1)
146
    rd = RoleDeserializer({
147
        'uuid': uuid, 'name': 'some-role', 'slug': 'some-role',
148
        'ou': {'slug': 'ou-2'}, 'service': None}, ImportContext())
149
    role, status = rd.deserialize()
150
    assert role == existing_role
151
    assert role.ou == ou2
152

  
153

  
154
def test_role_deserializer_update_fields(db):
155
    uuid = get_hex_uuid()
156
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role')
157
    rd = RoleDeserializer({
158
        'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed',
159
        'ou': None, 'service': None}, ImportContext())
160
    role, status = rd.deserialize()
161
    assert role == existing_role
162
    assert role.name == 'some role changed'
163

  
164

  
165
def test_role_deserializer_with_attributes(db):
166

  
167
    attributes_data = {
168
        'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
169
        'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value')
170
    }
171
    rd = RoleDeserializer({
172
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
173
        'slug': 'some-role', 'attributes': list(attributes_data.values()),
174
        'ou': None, 'service': None}, ImportContext())
175
    role, status = rd.deserialize()
176
    created, deleted = rd.attributes()
177
    assert role.attributes.count() == 2
178
    assert len(created) == 2
179

  
180
    for attr in created:
181
        attr_dict = attributes_data[attr.name]
182
        assert attr_dict['name'] == attr.name
183
        assert attr_dict['kind'] == attr.kind
184
        assert attr_dict['value'] == attr.value
185
        del attributes_data[attr.name]
186

  
187

  
188
def test_role_deserializer_creates_admin_role(db):
189
    role_dict = {
190
        'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
191
        'ou': None, 'service': None}
192
    rd = RoleDeserializer(role_dict, ImportContext())
193
    rd.deserialize()
194
    Role.objects.get(slug='_a2-managers-of-role-some-role')
195

  
196

  
197
def test_role_deserializer_parenting_existing_parent(db):
198
    parent_role_dict = {
199
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
200
        'ou': None, 'service': None}
201
    parent_role = Role.import_json(parent_role_dict)
202
    child_role_dict = {
203
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
204
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
205

  
206
    rd = RoleDeserializer(child_role_dict, ImportContext())
207
    child_role, status = rd.deserialize()
208
    created, deleted = rd.parentings()
209

  
210
    assert len(created) == 1
211
    parenting = created[0]
212
    assert parenting.direct is True
213
    assert parenting.parent == parent_role
214
    assert parenting.child == child_role
215

  
216

  
217
def test_role_deserializer_parenting_non_existing_parent(db):
218
    parent_role_dict = {
219
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
220
        'ou': None, 'service': None}
221
    child_role_dict = {
222
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
223
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
224
    rd = RoleDeserializer(child_role_dict, ImportContext())
225
    rd.deserialize()
226
    with pytest.raises(DataImportError) as excinfo:
227
        rd.parentings()
228

  
229
    assert "Could not find role" in str(excinfo.value)
230

  
231

  
232
def test_role_deserializer_permissions(db):
233
    ou = OU.objects.create(slug='some-ou')
234
    other_role_dict = {
235
        'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou}
236
    other_role = Role.objects.create(**other_role_dict)
237
    some_role_dict = {
238
        'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
239
        'ou': None, 'service': None}
240
    some_role_dict['permissions'] = [
241
        [u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]]
242
    ]
243

  
244
    import_context = ImportContext()
245
    rd = RoleDeserializer(some_role_dict, import_context)
246
    rd.deserialize()
247
    perm_created, perm_deleted = rd.permissions()
248

  
249
    assert len(perm_created) == 1
250
    assert len(perm_deleted) == 0
251
    del some_role_dict['permissions']
252
    role = Role.objects.get(slug=some_role_dict['slug'])
253
    assert role.permissions.count() == 1
254
    perm = role.permissions.first()
255
    assert perm.operation.slug == 'add'
256
    assert not perm.ou
257
    assert perm.target == other_role
258

  
259
    # that one should delete permissions
260
    rd = RoleDeserializer(some_role_dict, import_context)
261
    role, _ = rd.deserialize()
262
    perm_created, perm_deleted = rd.permissions()
263
    assert role.permissions.count() == 0
264
    assert len(perm_created) == 0
265
    assert len(perm_deleted) == 1
266

  
267

  
268
def import_ou_created(db):
269
    uuid = get_hex_uuid()
270
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
271
    ou, status = import_ou(ou_d)
272
    assert status == 'created'
273
    assert ou.uuid == ou_d['uuid']
274
    assert ou.slug == ou_d['slug']
275
    assert ou.name == ou_d['name']
276

  
277

  
278
def import_ou_updated(db):
279
    ou = OU.objects.create(slug='some-ou', name='ou name')
280
    ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'}
281
    ou_updated, status = import_ou(ou_d)
282
    assert status == 'updated'
283
    assert ou == ou_updated
284
    assert ou.name == 'new name'
285

  
286

  
287
def testi_import_site_empty():
288
    res = import_site({}, ImportContext())
289
    assert res.roles == {'created': [], 'updated': []}
290
    assert res.ous == {'created': [], 'updated': []}
291
    assert res.parentings == {'created': [], 'deleted': []}
292

  
293

  
294
def test_import_site_roles(db):
295
    parent_role_dict = {
296
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
297
        'ou': None, 'service': None}
298
    child_role_dict = {
299
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
300
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
301
    roles = [
302
        parent_role_dict,
303
        child_role_dict
304
    ]
305
    res = import_site({'roles': roles}, ImportContext())
306
    created_roles = res.roles['created']
307
    assert len(created_roles) == 2
308
    parent_role = Role.objects.get(**parent_role_dict)
309
    del child_role_dict['parents']
310
    child_role = Role.objects.get(**child_role_dict)
311
    assert created_roles[0] == parent_role
312
    assert created_roles[1] == child_role
313

  
314
    assert len(res.parentings['created']) == 1
315
    assert res.parentings['created'][0] == RoleParenting.objects.get(
316
        child=child_role, parent=parent_role, direct=True)
317

  
318

  
319
def test_roles_import_ignore_technical_role(db):
320
    roles = [{
321
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
322
    res = import_site({'roles': roles}, ImportContext())
323
    assert res.roles == {'created': [], 'updated': []}
324

  
325

  
326
def test_roles_import_ignore_technical_role_with_service(db):
327
    roles = [{
328
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
329
    res = import_site({'roles': roles}, ImportContext())
330
    assert res.roles == {'created': [], 'updated': []}
331

  
332

  
333
def test_import_role_handle_manager_role_parenting(db):
334
    parent_role_dict = {
335
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
336
        'ou': None, 'service': None}
337
    parent_role_manager_dict = {
338
        'name': 'Administrateur du role grand parent role',
339
        'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(),
340
        'ou': None, 'service': None}
341
    child_role_dict = {
342
        'name': 'child role', 'slug': 'child-role',
343
        'parents': [parent_role_dict, parent_role_manager_dict],
344
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
345
    import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext())
346
    child = Role.objects.get(slug='child-role')
347
    manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role')
348
    RoleParenting.objects.get(child=child, parent=manager, direct=True)
349

  
350

  
351
def test_import_roles_role_delete_orphans(db):
352
    roles = [{
353
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
354
    with pytest.raises(DataImportError):
355
        import_site({'roles': roles}, ImportContext(role_delete_orphans=True))
356

  
357

  
358
def test_import_ou(db):
359
    uuid = get_hex_uuid()
360
    name = 'ou name'
361
    ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}]
362
    res = import_site({'ous': ous}, ImportContext())
363
    assert len(res.ous['created']) == 1
364
    ou = res.ous['created'][0]
365
    assert ou.uuid == uuid
366
    assert ou.name == name
367
    Role.objects.get(slug='_a2-managers-of-ou-slug')
368

  
369

  
370
def test_import_ou_already_existing(db):
371
    uuid = get_hex_uuid()
372
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
373
    ou = OU.objects.create(**ou_d)
374
    num_ous = OU.objects.count()
375
    res = import_site({'ous': [ou_d]}, ImportContext())
376
    assert len(res.ous['created']) == 0
377
    assert num_ous == OU.objects.count()
378
    assert ou == OU.objects.get(uuid=uuid)
tests/test_import_export_site_cmd.py
1
import __builtin__
2
import json
3

  
4
from django.core import management
5
import pytest
6

  
7
from django_rbac.utils import get_role_model
8

  
9

  
10
def dummy_export_site(*args):
11
    return {'roles': [{'name': 'role1'}]}
12

  
13

  
14
def test_export_role_cmd_stdout(db, capsys, monkeypatch):
15
    import authentic2.management.commands.export_site
16
    monkeypatch.setattr(
17
        authentic2.management.commands.export_site, 'export_site', dummy_export_site)
18
    management.call_command('export_site')
19
    out, err = capsys.readouterr()
20
    assert json.loads(out) == dummy_export_site()
21

  
22

  
23
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir):
24
    import authentic2.management.commands.export_site
25
    monkeypatch.setattr(
26
        authentic2.management.commands.export_site, 'export_site', dummy_export_site)
27
    outfile = tmpdir.join('export.json')
28
    management.call_command('export_site', '--output', outfile.strpath)
29
    with outfile.open('r') as f:
30
        assert json.loads(f.read()) == dummy_export_site()
31

  
32

  
33
def test_import_site_cmd(db, tmpdir, monkeypatch):
34
    export_file = tmpdir.join('roles-export.json')
35
    with export_file.open('w'):
36
        export_file.write(json.dumps({'roles': []}))
37
    management.call_command('import_site', '-y', export_file.strpath)
38

  
39

  
40
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys):
41
    export_file = tmpdir.join('roles-export.json')
42
    with export_file.open('w'):
43
        export_file.write(json.dumps(
44
            {'roles': [{
45
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
46
                'ou': None, 'service': None}]}))
47

  
48
    management.call_command('import_site', '-y', export_file.strpath)
49

  
50
    out, err = capsys.readouterr()
51
    assert "Real run" in out
52
    assert "1 roles created" in out
53
    assert "0 roles updated" in out
54

  
55

  
56
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys):
57
    export_file = tmpdir.join('roles-export.json')
58
    with export_file.open('w'):
59
        export_file.write(json.dumps({'roles': []}))
60

  
61
    Role = get_role_model()
62

  
63
    def exception_import_site(*args):
64
        Role.objects.create(slug='role-slug')
65
        raise Exception()
66

  
67
    import authentic2.management.commands.import_site
68
    monkeypatch.setattr(
69
        authentic2.management.commands.import_site, 'import_site', exception_import_site)
70

  
71
    with pytest.raises(Exception):
72
        management.call_command('import_site', '--yes', export_file.strpath)
73

  
74
    with pytest.raises(Role.DoesNotExist):
75
        Role.objects.get(slug='role-slug')
76

  
77

  
78
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys):
79
    export_file = tmpdir.join('roles-export.json')
80
    with export_file.open('w'):
81
        export_file.write(json.dumps(
82
            {'roles': [{
83
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
84
                'ou': None, 'service': None}]}))
85

  
86
    Role = get_role_model()
87

  
88
    management.call_command('import_site', '--dry-run', export_file.strpath)
89

  
90
    with pytest.raises(Role.DoesNotExist):
91
        Role.objects.get(slug='role-slug')
92

  
93

  
94
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys):
95
    from authentic2.data_transfer import DataImportError
96

  
97
    export_file = tmpdir.join('roles-export.json')
98
    with export_file.open('w'):
99
        export_file.write(json.dumps(
100
            {'roles': [{
101
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
102
                'ou': None, 'service': None}]}))
103

  
104
    get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name')
105

  
106
    with pytest.raises(DataImportError):
107
        management.call_command(
108
            'import_site', '-y', '-o', 'role-delete-orphans', export_file.strpath)
109

  
110

  
111
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys):
112
    from django.core.management.base import CommandError
113
    export_file = tmpdir.join('roles-export.json')
114
    with pytest.raises(CommandError):
115
        management.call_command('import_site', '-y', '-o', 'unknown-option', export_file.strpath)
116

  
117

  
118
def test_import_site_confirm_prompt_yes(db, tmpdir, monkeypatch):
119
    export_file = tmpdir.join('roles-export.json')
120
    with export_file.open('w'):
121
        export_file.write(json.dumps(
122
            {'roles': [{
123
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
124
                'ou': None, 'service': None}]}))
125

  
126
    def yes_raw_input(*args, **kwargs):
127
        return 'yes'
128

  
129
    monkeypatch.setattr(__builtin__, 'raw_input', yes_raw_input)
130

  
131
    management.call_command('import_site', export_file.strpath, stdin='yes')
132
    assert get_role_model().objects.get(uuid='dqfewrvesvews2532')
133

  
134

  
135
def test_import_site_confirm_prompt_no(db, monkeypatch, tmpdir):
136
    export_file = tmpdir.join('roles-export.json')
137
    with export_file.open('w'):
138
        export_file.write(json.dumps(
139
            {'roles': [{
140
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
141
                'ou': None, 'service': None}]}))
142

  
143
    def no_raw_input(*args, **kwargs):
144
        return 'no'
145

  
146
    monkeypatch.setattr(__builtin__, 'raw_input', no_raw_input)
147

  
148
    with pytest.raises(SystemExit):
149
        management.call_command('import_site', export_file.strpath)
150

  
151
    Role = get_role_model()
152
    with pytest.raises(Role.DoesNotExist):
153
        Role.objects.get(uuid='dqfewrvesvews2532')
0
-