Projet

Général

Profil

0002-create-import_site-and-export_site-commands-16514.patch

Emmanuel Cazenave, 13 avril 2018 17:17

Télécharger (47,5 ko)

Voir les différences:

Subject: [PATCH 2/2] create 'import_site' and 'export_site' commands (#16514)

 src/authentic2/a2_rbac/models.py                  |  36 ++
 src/authentic2/data_transfer.py                   | 281 +++++++++++++
 src/authentic2/management/commands/export_site.py |  24 ++
 src/authentic2/management/commands/import_site.py |  71 ++++
 src/authentic2/utils.py                           |   5 +
 src/django_rbac/models.py                         |  11 +
 tests/test_a2_rbac.py                             | 158 +++++++-
 tests/test_data_transfer.py                       | 471 ++++++++++++++++++++++
 tests/test_import_export_site_cmd.py              | 132 ++++++
 9 files changed, 1188 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/data_transfer.py
 create mode 100644 src/authentic2/management/commands/export_site.py
 create mode 100644 src/authentic2/management/commands/import_site.py
 create mode 100644 tests/test_data_transfer.py
 create mode 100644 tests/test_import_export_site_cmd.py
src/authentic2/a2_rbac/models.py
92 92
    def cached(cls):
93 93
        return cls.objects.all()
94 94

  
95
    def export_json(self):
        """Return a plain dict describing this organizational unit.

        The dict holds the uuid, slug, name and description together with
        the ``default``, ``email_is_unique``, ``username_is_unique`` and
        ``validate_emails`` settings, each read from the same-named attribute.
        """
        exported_fields = (
            'uuid', 'slug', 'name', 'description', 'default',
            'email_is_unique', 'username_is_unique', 'validate_emails',
        )
        return dict((field, getattr(self, field)) for field in exported_fields)
103

  
95 104

  
96 105
OrganizationalUnit._meta.natural_key = [['uuid'], ['slug'], ['name']]
97 106

  
......
213 222
            'ou__slug': self.ou.slug if self.ou else None,
214 223
        }
215 224

  
225
    def export_json(self, attributes=False, parents=False, permissions=False):
        """Return a dict describing this role, optionally with its relations.

        The base dict holds uuid, slug, name, description and external_id,
        plus the natural-key JSON of the ou and of the service (or the falsy
        value stored on the role when the relation is unset).

        attributes: when true, add an 'attributes' list built from
                    ``to_json()`` of each attribute of the role.
        parents: when true, add a 'parents' list holding the natural-key JSON
                 of each direct parent role.
        permissions: when true, add a 'permissions' list built from
                     ``export_json()`` of each permission of the role.

        An optional key is only present when it has at least one item.
        """
        exported = {
            'uuid': self.uuid,
            'slug': self.slug,
            'name': self.name,
            'description': self.description,
            'external_id': self.external_id,
            'ou': self.ou and self.ou.natural_key_json(),
            'service': self.service and self.service.natural_key_json(),
        }

        if attributes:
            attribute_dicts = [attr.to_json() for attr in self.attributes.all()]
            if attribute_dicts:
                exported['attributes'] = attribute_dicts

        if parents:
            parenting_model = rbac_utils.get_role_parenting_model()
            direct_parentings = parenting_model.objects.filter(
                child_id=self.id, direct=True)
            parent_keys = [
                parenting.parent.natural_key_json() for parenting in direct_parentings]
            if parent_keys:
                exported['parents'] = parent_keys

        if permissions:
            permission_dicts = [
                permission.export_json() for permission in self.permissions.all()]
            if permission_dicts:
                exported['permissions'] = permission_dicts

        return exported
247

  
216 248

  
217 249
Role._meta.natural_key = [
218 250
    ['uuid'], ['slug', 'ou'], ['name', 'ou'], ['slug', 'service'], ['name', 'service']
......
250 282
            ('role', 'name', 'kind', 'value'),
251 283
        )
252 284

  
285
    def to_json(self):
        """Return this attribute's name, kind and value as a plain dict."""
        return dict(name=self.name, kind=self.kind, value=self.value)
287

  
288

  
253 289
GenericRelation(Permission,
254 290
                content_type_field='target_ct',
255 291
                object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms')
src/authentic2/data_transfer.py
1
from django.contrib.contenttypes.models import ContentType
2

  
3
from django_rbac.models import Operation
4
from django_rbac.utils import (
5
    get_ou_model,  get_role_model, get_role_parenting_model, get_permission_model)
6
from authentic2.a2_rbac.models import RoleAttribute
7
from authentic2.utils import update_model
8

  
9

  
10
def export_site():
    """Export every role and every organizational unit.

    Returns a dict with a 'roles' list (see export_roles) and an 'ous'
    list (see export_ou).
    """
    exported_roles = export_roles(get_role_model().objects.all())
    exported_ous = export_ou(get_ou_model().objects.all())
    return {'roles': exported_roles, 'ous': exported_ous}
15

  
16

  
17
def export_ou(ou_query_set):
    """ Serialize organizational units in ou_query_set
    """
    exported = []
    for organizational_unit in ou_query_set:
        exported.append(organizational_unit.export_json())
    return exported
19

  
20

  
21
def export_roles(role_queryset):
    """ Serialize roles in role_queryset, including for each role its
    attributes, direct parents and permissions
    """
    exported = []
    for role in role_queryset:
        exported.append(
            role.export_json(attributes=True, parents=True, permissions=True))
    return exported
28

  
29

  
30
def search_ou(ou_d):
    """ Return the organizational unit matching the natural key dict ou_d,
    or None when no such organizational unit exists
    """
    # Resolve the model before entering the try block: the except clause
    # needs the name bound even if something inside the block raises.
    OU = get_ou_model()
    try:
        return OU.objects.get_by_natural_key_json(ou_d)
    except OU.DoesNotExist:
        return None
36

  
37

  
38
def search_role(role_d):
    """ Return the role matching the natural key dict role_d,
    or None when no such role exists
    """
    Role = get_role_model()
    try:
        return Role.objects.get_by_natural_key_json(role_d)
    except Role.DoesNotExist:
        return None
45

  
46

  
47
class ImportContext(object):
    """ Holds information on how to perform the import.

    ou_delete_orphans: if True, any existing ou that is not found in the
                       export should be deleted

    role_delete_orphans: if True, any existing role that is not found in the
                         export should be deleted

    role_attributes_update: if True, for each role in the import data,
                            attributes are deleted and re-created

    role_parentings_update: if True, for each role in the import data,
                            parentings are deleted and re-created

    role_permissions_update: if True, for each role in the import data,
                             permissions are deleted and re-created
    """

    def __init__(
            self, role_delete_orphans=False, role_parentings_update=True,
            role_permissions_update=True, role_attributes_update=True,
            ou_delete_orphans=False):
        # Orphan handling
        self.ou_delete_orphans = ou_delete_orphans
        self.role_delete_orphans = role_delete_orphans
        # Per-role relation refresh switches
        self.role_attributes_update = role_attributes_update
        self.role_parentings_update = role_parentings_update
        self.role_permissions_update = role_permissions_update
77

  
78

  
79
class DataImportError(Exception):
    """ Raised when the import data or the import options cannot be handled
    """
81

  
82

  
83
class RoleDeserializer(object):
    """ Create or update one role from its exported dict.

    The 'parents', 'attributes' and 'permissions' entries of the dict are
    kept aside at construction time; every other entry is treated as a role
    field. deserialize() must be called first: it sets the role that
    attributes(), parentings() and permissions() operate on.
    """

    def __init__(self, d, import_context):
        # Stored for later use; not read by any method of this class yet.
        self._import_context = import_context
        self._obj = None
        self._parents = d.get('parents')
        self._attributes = d.get('attributes')
        self._permissions = d.get('permissions')
        self._role_d = dict(
            (key, value) for key, value in d.items()
            if key not in ('parents', 'attributes', 'permissions'))

    def deserialize(self):
        """ Create the role, or update it when it already exists.

        Returns a (role, status) tuple where status is 'created' or
        'updated'. Raises DataImportError when the role refers to an
        organizational unit that cannot be found.
        """
        # A missing 'ou' entry is handled like an explicit None.
        ou_d = self._role_d.get('ou')
        has_ou = bool(ou_d)
        ou = search_ou(ou_d) if has_ou else None
        if has_ou and not ou:
            raise DataImportError(
                "Can't import role because missing Organizational Unit : "
                "%s" % (ou_d,))

        # 'ou' and 'service' hold natural keys, not model instances: drop
        # them from the field values and put back the resolved ou, if any.
        kwargs = self._role_d.copy()
        kwargs.pop('ou', None)
        kwargs.pop('service', None)
        if has_ou:
            kwargs['ou'] = ou

        obj = search_role(self._role_d)
        if obj:  # Role already exists
            self._obj = obj
            status = 'updated'
            update_model(self._obj, kwargs)
        else:  # Create role
            self._obj = get_role_model().objects.create(**kwargs)
            status = 'created'

        # Ensure admin role is created.
        # Absolutely necessary to create
        # parentings relationship later on,
        # since we don't deserialize technical role.
        self._obj.get_admin_role()
        return self._obj, status

    def attributes(self):
        """ Update attributes (delete everything then create)

        Returns a (created, deleted) tuple of attribute lists.
        """
        created, deleted = [], []
        for attr in self._obj.attributes.all():
            attr.delete()
            deleted.append(attr)

        for attr_dict in self._attributes or ():
            # Work on a copy: the import data belongs to the caller and must
            # not end up holding a model instance under 'role'.
            attr_kwargs = dict(attr_dict, role=self._obj)
            created.append(RoleAttribute.objects.create(**attr_kwargs))

        return created, deleted

    def parentings(self):
        """ Update parentings (delete everything then create)

        Only direct parentings are deleted and re-created. Returns a
        (created, deleted) tuple of parenting lists. Raises DataImportError
        when a parent role cannot be found.
        """
        created, deleted = [], []
        Parenting = get_role_parenting_model()
        for parenting in Parenting.objects.filter(child=self._obj, direct=True):
            parenting.delete()
            deleted.append(parenting)

        for parent_d in self._parents or ():
            parent = search_role(parent_d)
            if not parent:
                raise DataImportError("Could not find role : %s" % (parent_d,))
            created.append(Parenting.objects.create(
                child=self._obj, direct=True, parent=parent))

        return created, deleted

    def permissions(self):
        """ Update permissions (delete everything then create)

        Returns a (created, deleted) tuple of permission lists.
        """
        created, deleted = [], []
        # NOTE(review): this deletes the permission objects themselves, not
        # only their link to this role - confirm they are never shared with
        # another role.
        for old_perm in self._obj.permissions.all():
            old_perm.delete()
            deleted.append(old_perm)
        self._obj.permissions.clear()

        for perm_d in self._permissions or ():
            op = Operation.objects.get_by_natural_key_json(perm_d['operation'])
            ou = get_ou_model().objects.get_by_natural_key_json(
                perm_d['ou']) if perm_d['ou'] else None
            ct = ContentType.objects.get_by_natural_key_json(perm_d['target_ct'])
            target = ct.model_class().objects.get_by_natural_key_json(perm_d['target'])
            new_perm = get_permission_model().objects.create(
                operation=op, ou=ou, target_ct=ct, target_id=target.pk)
            self._obj.permissions.add(new_perm)
            created.append(new_perm)

        return created, deleted
189

  
190

  
191
class ImportResult(object):
    """ Collects the objects created, updated or deleted by an import.

    Roles and ous are tracked as 'created' / 'updated'; attributes,
    parentings and permissions are tracked as 'created' / 'deleted'.
    """

    def __init__(self):
        self.roles = dict(created=[], updated=[])
        self.ous = dict(created=[], updated=[])
        self.attributes = dict(created=[], deleted=[])
        self.parentings = dict(created=[], deleted=[])
        self.permissions = dict(created=[], deleted=[])

    def update_roles(self, role, d_status):
        """ Record role under the given status ('created' or 'updated')
        """
        self.roles[d_status].append(role)

    def update_ous(self, ou, status):
        """ Record ou under the given status ('created' or 'updated')
        """
        self.ous[status].append(ou)

    def _bulk_update(self, attrname, created, deleted):
        """ Append created and deleted items to the counters named attrname
        """
        counters = getattr(self, attrname)
        counters['created'].extend(created)
        counters['deleted'].extend(deleted)

    def update_attributes(self, created, deleted):
        self._bulk_update('attributes', created, deleted)

    def update_parentings(self, created, deleted):
        self._bulk_update('parentings', created, deleted)

    def update_permissions(self, created, deleted):
        self._bulk_update('permissions', created, deleted)

    def to_str(self, verbose=False):
        """ Return one "<count> <kind> <status>" line per tracked status.

        verbose is accepted but currently has no effect.
        """
        lines = []
        for kind in ('roles', 'ous', 'parentings', 'permissions', 'attributes'):
            counters = getattr(self, kind)
            lines.extend(
                "%s %s %s\n" % (len(counters[status]), kind, status)
                for status in ('created', 'updated', 'deleted')
                if status in counters)
        return "".join(lines)
229

  
230

  
231
def import_ou(ou_d):
    """ Create the organizational unit described by ou_d, or update it when
    it already exists.

    Returns an (ou, status) tuple where status is 'created' or 'updated'.
    """
    ou_model = get_ou_model()
    existing = search_ou(ou_d)
    if existing is not None:
        update_model(existing, ou_d)
        imported, status = existing, 'updated'
    else:
        imported, status = ou_model.objects.create(**ou_d), 'created'
    # Ensure admin role is created
    imported.get_admin_role()
    return imported, status
244

  
245

  
246
def import_site(json_d, import_context):
    """ Import organizational units and roles from an exported site dict.

    json_d: dict with optional 'ous' and 'roles' lists, as produced by
            export_site. Roles whose slug starts with '_' are skipped.
    import_context: ImportContext telling which role relations (attributes,
                    parentings, permissions) are deleted and re-created.

    Returns an ImportResult. Raises DataImportError when import_context asks
    for orphan deletion, which is not supported.
    """
    # Reject unsupported options up front, so that no change is made before
    # the import is refused.
    if import_context.ou_delete_orphans:
        raise DataImportError(
            "Unsupported context value for ou_delete_orphans : %s" % (
                import_context.ou_delete_orphans))

    if import_context.role_delete_orphans:
        # FIXME : delete each role that is in DB but not in the export
        raise DataImportError(
            "Unsupported context value for role_delete_orphans : %s" % (
                import_context.role_delete_orphans))

    result = ImportResult()

    for ou_d in json_d.get('ous', []):
        result.update_ous(*import_ou(ou_d))

    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
                if not role_d['slug'].startswith('_')]

    # Every role is created or updated before any relation is handled:
    # parentings and permissions may refer to other roles of the import.
    for ds in roles_ds:
        result.update_roles(*ds.deserialize())

    if import_context.role_attributes_update:
        for ds in roles_ds:
            result.update_attributes(*ds.attributes())

    if import_context.role_parentings_update:
        for ds in roles_ds:
            result.update_parentings(*ds.parentings())

    if import_context.role_permissions_update:
        for ds in roles_ds:
            result.update_permissions(*ds.permissions())

    return result
src/authentic2/management/commands/export_site.py
1
import json
2
import sys
3

  
4
from django.core.management.base import BaseCommand
5

  
6
from authentic2.data_transfer import export_site
7
from django_rbac.utils import get_role_model
8

  
9

  
10
class Command(BaseCommand):
    """ Dump the site export as indented JSON to a file or to stdout
    """
    help = 'Export site'

    def add_arguments(self, parser):
        parser.add_argument('--output', metavar='FILE', default=None,
                            help='name of a file to write output to')

    def handle(self, *args, **options):
        """ Write the export to the --output file, or to stdout when no
        file name was given
        """
        if options['output']:
            # The with block closes the file even when the export or the
            # JSON serialization fails.
            with open(options['output'], 'w') as output:
                json.dump(export_site(), output, indent=4)
        else:
            json.dump(export_site(), sys.stdout, indent=4)
src/authentic2/management/commands/import_site.py
1
import contextlib
2
import json
3
import sys
4

  
5
from django.conf import settings
6
from django.core.management.base import BaseCommand
7
from django.db import transaction
8
from django.utils import translation
9

  
10
from authentic2.data_transfer import import_site, ImportContext
11

  
12

  
13
class DryRunException(Exception):
    """ Raised inside the import transaction of a dry run and caught by the
    command right after
    """
15

  
16

  
17
def create_context_args(options):
    """ Turn the values of the repeated --option flag into keyword arguments.

    Dashes become underscores; a name starting with 'no_' maps the rest of
    the name to False, any other name maps to True.
    """
    context_kwargs = {}
    for raw_name in options['option'] or ():
        name = raw_name.replace('-', '_')
        enabled = not name.startswith('no_')
        context_kwargs[name if enabled else name[3:]] = enabled
    return context_kwargs
27

  
28

  
29
#  Borrowed from https://bugs.python.org/issue10049#msg118599
30
@contextlib.contextmanager
def provision_contextm(dry_run, settings):
    """ Context manager wrapping the import.

    For a dry run with 'hobo.agent.authentic2' listed in
    settings.INSTALLED_APPS, the body runs inside that application's
    Provisionning context manager; otherwise it runs unwrapped.
    """
    if not (dry_run and 'hobo.agent.authentic2' in settings.INSTALLED_APPS):
        yield
        return

    import hobo.agent.authentic2
    with hobo.agent.authentic2.provisionning.Provisionning():
        yield
38

  
39

  
40
class Command(BaseCommand):
    """ Import a site export (JSON file) inside a single transaction
    """
    help = 'Import site'

    def add_arguments(self, parser):
        parser.add_argument(
            'filename', metavar='FILENAME', type=str, help='name of file to import')
        parser.add_argument(
            '--dry-run', action='store_true', dest='dry_run',
            help='Do not actually perform the import')
        parser.add_argument(
            '-o', '--option', action='append', help='Import context options',
            choices=[
                'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update',
                'no-role-attributes-update', 'no-role-parentings-update'])

    def handle(self, filename, **options):
        """ Run the import and print a summary of what was done.

        With --dry-run the import is executed, then the transaction is
        rolled back by raising DryRunException inside the atomic block.
        """
        translation.activate(settings.LANGUAGE_CODE)
        try:
            dry_run = options['dry_run']
            msg = "Dry run\n" if dry_run else "Real run\n"
            c_kwargs = create_context_args(options)
            try:
                with open(filename, 'r') as f:
                    with provision_contextm(dry_run, settings):
                        with transaction.atomic():
                            sys.stdout.write(msg)
                            result = import_site(json.load(f), ImportContext(**c_kwargs))
                            if dry_run:
                                raise DryRunException()
            except DryRunException:
                pass
            sys.stdout.write(result.to_str())
            sys.stdout.write("Success\n")
        finally:
            # Restore the previous translation even when the import fails.
            translation.deactivate()
src/authentic2/utils.py
1047 1047
        context=ctx,
1048 1048
        legacy_subject_templates=legacy_subject_templates,
1049 1049
        legacy_body_templates=legacy_body_templates)
1050

  
1051

  
1052
def update_model(obj, d):
    """Set every key of d as an attribute on obj, then save obj.

    Without the final save() the new values only exist on the in-memory
    instance and the update is not persisted.
    """
    for attr, value in d.items():
        setattr(obj, attr, value)
    obj.save()
src/django_rbac/models.py
114 114
    def __unicode__(self):
115 115
        return unicode(_(self.name))
116 116

  
117
    def export_json(self):
        """Return the slug and name of this operation as a plain dict."""
        return dict(slug=self.slug, name=self.name)
119

  
117 120
    objects = managers.OperationManager()
118 121

  
119 122

  
......
145 148
                self.target and self.target_ct.natural_key(),
146 149
                self.target and self.target.natural_key()]
147 150

  
151
    def export_json(self):
        """Return a dict describing this permission through natural keys.

        The dict holds the natural-key JSON of the operation, of the ou (or
        the falsy value stored on the permission when unset), of the target
        content type and of the target object.
        """
        operation_key = self.operation.natural_key_json()
        ou_key = self.ou and self.ou.natural_key_json()
        target_ct_key = self.target_ct.natural_key_json()
        target_key = self.target.natural_key_json()
        return {
            "operation": operation_key,
            "ou": ou_key,
            'target_ct': target_ct_key,
            "target": target_key,
        }
158

  
148 159
    def __unicode__(self):
149 160
        ct = ContentType.objects.get_for_id(self.target_ct_id)
150 161
        ct_ct = ContentType.objects.get_for_model(ContentType)
tests/test_a2_rbac.py
1 1
import pytest
2 2

  
3
from django.contrib.contenttypes.models import ContentType
4
from django_rbac.utils import get_permission_model
5
from django_rbac.models import Operation
6
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute
3 7
from authentic2.models import Service
4
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
8
from authentic2.utils import get_hex_uuid
5 9

  
6 10

  
7 11
def test_role_natural_key(db):
......
24 28
        Role.objects.get_by_natural_key(*r2.natural_key())
25 29
    with pytest.raises(Role.DoesNotExist):
26 30
        Role.objects.get_by_natural_key(*r4.natural_key())
31

  
32

  
33
def test_basic_role_export_json(db):
    """A role without ou nor service exports its fields and None for both."""
    role = Role.objects.create(
        name='basic role', slug='basic-role', description='basic role description')
    exported = role.export_json()
    for field in ('name', 'slug', 'uuid', 'description', 'external_id'):
        assert exported[field] == getattr(role, field)
    assert exported['ou'] is None
    assert exported['service'] is None
44

  
45

  
46
def test_role_with_ou_export_json(db):
    """The ou of a role is exported as its uuid/slug/name natural key."""
    ou = OU.objects.create(name='ou', slug='ou')
    exported = Role.objects.create(name='some role', ou=ou).export_json()
    expected_ou = dict(uuid=ou.uuid, slug=ou.slug, name=ou.name)
    assert exported['ou'] == expected_ou
51

  
52

  
53
def test_role_with_service_export_json(db):
    """The service of a role is exported as its slug/ou natural key."""
    service = Service.objects.create(name='service name', slug='service-name')
    exported = Role.objects.create(name='some role', service=service).export_json()
    assert exported['service'] == dict(slug=service.slug, ou=None)
58

  
59

  
60
def test_role_with_service_with_ou_export_json(db):
    """The natural key of a service embeds the natural key of its ou."""
    ou = OU.objects.create(name='ou', slug='ou')
    service = Service.objects.create(name='service name', slug='service-name', ou=ou)
    exported = Role.objects.create(name='some role', service=service).export_json()
    expected_ou = {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}
    assert exported['service'] == {'slug': service.slug, 'ou': expected_ou}
67

  
68

  
69
def test_role_with_attributes_export_json(db):
    """Each attribute of a role is exported once with its kind and value."""
    role = Role.objects.create(name='some role')
    attr1 = RoleAttribute.objects.create(
        role=role, name='attr1_name', kind='string', value='attr1_value')
    attr2 = RoleAttribute.objects.create(
        role=role, name='attr2_name', kind='string', value='attr2_value')

    exported_attributes = role.export_json(attributes=True)['attributes']
    assert len(exported_attributes) == 2

    # Names are removed as they are seen, so a duplicate fails the test.
    remaining_names = {attr1.name, attr2.name}
    for exported in exported_attributes:
        name = exported['name']
        assert name in remaining_names
        remaining_names.remove(name)
        stored = RoleAttribute.objects.filter(name=name).first()
        assert exported['kind'] == stored.kind
        assert exported['value'] == stored.value
87

  
88

  
89
def test_role_with_parents_export_json(db):
    """Only direct parents are exported, and no key is set without parents."""
    grand_parent_role = Role.objects.create(
        name='test grand parent role', slug='test-grand-parent-role')
    parent_roles = []
    for index in (1, 2):
        parent_role = Role.objects.create(
            name='test parent %d role' % index, slug='test-parent-%d-role' % index)
        parent_role.add_parent(grand_parent_role)
        parent_roles.append(parent_role)
    child_role = Role.objects.create(
        name='test child role', slug='test-child-role')
    for parent_role in parent_roles:
        child_role.add_parent(parent_role)

    exported_child = child_role.export_json(parents=True)
    assert exported_child['slug'] == child_role.slug
    assert len(exported_child['parents']) == 2
    remaining_slugs = set(parent_role.slug for parent_role in parent_roles)
    for exported_parent in exported_child['parents']:
        assert exported_parent['slug'] in remaining_slugs
        remaining_slugs.remove(exported_parent['slug'])

    exported_grand_parent = grand_parent_role.export_json(parents=True)
    assert exported_grand_parent['slug'] == grand_parent_role.slug
    assert 'parents' not in exported_grand_parent

    for parent_role in parent_roles:
        exported = parent_role.export_json(parents=True)
        assert exported['slug'] == parent_role.slug
        assert len(exported['parents']) == 1
        assert exported['parents'][0]['slug'] == grand_parent_role.slug
127

  
128

  
129
def test_role_with_permission_export_json(db):
    """Permissions of a role are exported through natural keys only."""
    some_ou = OU.objects.create(name='some ou', slug='some-ou')
    role = Role.objects.create(name='role name', slug='role-slug')
    other_role = Role.objects.create(
        name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou)
    ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description')
    Permission = get_permission_model()
    op = Operation.objects.first()
    perm_saml = Permission.objects.create(
        operation=op, ou=ou,
        target_ct=ContentType.objects.get_for_model(ContentType),
        target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk)
    role.permissions.add(perm_saml)
    perm_role = Permission.objects.create(
        operation=op, ou=None,
        target_ct=ContentType.objects.get_for_model(Role),
        target_id=other_role.pk)
    role.permissions.add(perm_role)

    export = role.export_json(permissions=True)
    permissions = export['permissions']
    assert len(permissions) == 2

    expected_saml = {
        'operation': {'slug': 'add'},
        'ou': {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name},
        'target_ct': {'app_label': u'contenttypes', 'model': u'contenttype'},
        'target': {'model': u'libertyprovider', 'app_label': u'saml'}
    }
    expected_role = {
        'operation': {'slug': 'add'},
        'ou': None,
        'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
        'target': {
            'slug': u'other-role-slug', 'service': None, 'uuid': other_role.uuid,
            'ou': {
                'slug': u'some-ou', 'uuid': some_ou.uuid, 'name': u'some ou'
            },
            'name': u'other role name'}
    }
    # The export iterates role.permissions.all() without an explicit
    # ordering, so do not depend on the position of each permission.
    assert expected_saml in permissions
    assert expected_role in permissions
168

  
169

  
170
def test_ou_export_json(db):
    """Every exported ou field matches the value stored on the model."""
    ou = OU.objects.create(
        name='basic ou', slug='basic-ou', description='basic ou description',
        username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
    exported = ou.export_json()
    checked_fields = (
        'name', 'slug', 'uuid', 'description', 'username_is_unique',
        'email_is_unique', 'default', 'validate_emails')
    for field in checked_fields:
        assert exported[field] == getattr(ou, field)
tests/test_data_transfer.py
1
import json
2

  
3
from django_rbac.utils import get_role_model, get_ou_model
4
import py
5
import pytest
6

  
7
from authentic2.a2_rbac.models import RoleParenting
8
from authentic2.data_transfer import (
9
    DataImportError, export_roles, import_site, export_ou, ImportContext,
10
    RoleDeserializer, search_role, import_ou)
11
from authentic2.utils import get_hex_uuid
12

  
13

  
14
Role = get_role_model()
15
OU = get_ou_model()
16

  
17

  
18
def test_export_basic_role(db):
    """export_roles yields, for a single role, what its export_json gives."""
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
    exported_roles = export_roles(Role.objects.filter(uuid=role.uuid))
    assert len(exported_roles) == 1
    exported = exported_roles[0]
    expected = role.export_json()
    for key in expected:
        assert exported[key] == expected[key]
26

  
27

  
28
def test_export_role_with_parents(db):
    """export_roles includes the direct parents of each exported role."""
    grand_parent_role = Role.objects.create(
        name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid())
    parent_roles = []
    for index in (1, 2):
        parent_role = Role.objects.create(
            name='test parent %d role' % index, slug='test-parent-%d-role' % index,
            uuid=get_hex_uuid())
        parent_role.add_parent(grand_parent_role)
        parent_roles.append(parent_role)
    child_role = Role.objects.create(
        name='test child role', slug='test-child-role', uuid=get_hex_uuid())
    for parent_role in parent_roles:
        child_role.add_parent(parent_role)

    exported_roles = export_roles(
        Role.objects.filter(slug__startswith='test').order_by('slug'))
    assert len(exported_roles) == 4
    # Slug ordering: child, grand parent, parent 1, parent 2.
    exported_child, exported_grand_parent = exported_roles[0], exported_roles[1]
    exported_parents = exported_roles[2:]

    assert exported_child['slug'] == child_role.slug
    assert len(exported_child['parents']) == 2
    remaining_slugs = set(parent_role.slug for parent_role in parent_roles)
    for exported_parent in exported_child['parents']:
        assert exported_parent['slug'] in remaining_slugs
        remaining_slugs.remove(exported_parent['slug'])

    assert exported_grand_parent['slug'] == grand_parent_role.slug

    for parent_role, exported in zip(parent_roles, exported_parents):
        assert exported['slug'] == parent_role.slug
        assert len(exported['parents']) == 1
        assert exported['parents'][0]['slug'] == grand_parent_role.slug
69

  
70

  
71
def test_export_ou(db):
    """export_ou serializes the name, slug and description of an ou."""
    ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description')
    exported_ous = export_ou(OU.objects.filter(name='ou name'))
    assert len(exported_ous) == 1
    exported = exported_ous[0]
    for field in ('name', 'slug', 'description'):
        assert exported[field] == getattr(ou, field)
79

  
80

  
81
def test_search_role_by_uuid(db):
    """A matching uuid finds the role even when the slug differs."""
    uuid = get_hex_uuid()
    role = Role.objects.create(uuid=uuid, slug='role-slug')
    found = search_role({'uuid': uuid, 'slug': 'other-role-slug'})
    assert role == found
86

  
87

  
88
def test_search_role_by_slug(db):
    """With an unknown uuid, the role is found through its slug."""
    role = Role.objects.create(uuid=get_hex_uuid(), slug='role-slug')
    searched = {
        'uuid': get_hex_uuid(), 'slug': 'role-slug',
        'ou': None, 'service': None}
    assert role == search_role(searched)
94

  
95

  
96
def test_search_role_not_found(db):
    """search_role returns None when no role matches."""
    searched = {
        'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name',
        'ou': None, 'service': None}
    found = search_role(searched)
    assert found is None
101

  
102

  
103
def test_search_role_slug_not_unique(db):
    """Two roles may share slug and name; the exported one is still found."""
    ou = OU.objects.create(name='some ou', slug='some-ou')
    role_in_ou = Role.objects.create(
        ou=ou, uuid=get_hex_uuid(), slug='role-slug', name='role name')
    Role.objects.create(uuid=get_hex_uuid(), slug='role-slug', name='role name')
    found = search_role(role_in_ou.export_json())
    assert role_in_ou == found
110

  
111

  
112
def test_role_deserializer(db):
    """Deserializing an unknown role creates it with the given fields."""
    role_d = {
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
    rd = RoleDeserializer(role_d, ImportContext())
    # Nothing is set before deserialize() is called.
    assert rd._parents is None
    assert rd._attributes is None
    assert rd._obj is None

    role, status = rd.deserialize()
    assert status == 'created'
    assert (role.name, role.description, role.slug) == (
        'some role', 'some role description', 'some-role')
    assert rd._obj == role
125

  
126

  
127
def test_role_deserializer_with_ou(db):
    """The ou natural key of the import data is resolved to the existing ou."""
    ou = OU.objects.create(name='some ou', slug='some-ou')
    role_d = {
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
        'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}
    role, status = RoleDeserializer(role_d, ImportContext()).deserialize()
    assert role.ou == ou
134

  
135

  
136
def test_role_deserializer_missing_ou(db):
    """Referencing an OU that does not exist raises DataImportError."""
    role_d = {
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
        'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}
    deserializer = RoleDeserializer(role_d, ImportContext())
    with pytest.raises(DataImportError):
        deserializer.deserialize()
143

  
144

  
145
def test_role_deserializer_update_ou(db):
    """Re-importing an existing role with another OU moves the role to that OU."""
    ou1 = OU.objects.create(name='ou 1', slug='ou-1')
    ou2 = OU.objects.create(name='ou 2', slug='ou-2')
    uuid = get_hex_uuid()
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1)

    role_d = {
        'uuid': uuid, 'name': 'some-role', 'slug': 'some-role',
        'ou': {'slug': 'ou-2'}, 'service': None}
    role, status = RoleDeserializer(role_d, ImportContext()).deserialize()

    assert role == existing_role
    assert role.ou == ou2
156

  
157

  
158
def test_role_deserializer_update_fields(db):
    """Re-importing an existing role with a new name updates the name."""
    uuid = get_hex_uuid()
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role')

    role_d = {
        'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed',
        'ou': None, 'service': None}
    role, status = RoleDeserializer(role_d, ImportContext()).deserialize()

    assert role == existing_role
    assert role.name == 'some role changed'
167

  
168

  
169
def test_role_deserializer_with_attributes(db):
    """Both attributes of the payload are created with matching name, kind and value."""
    expected_by_name = {
        'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
        'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value')
    }
    role_d = {
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
        'slug': 'some-role', 'attributes': list(expected_by_name.values()),
        'ou': None, 'service': None}
    deserializer = RoleDeserializer(role_d, ImportContext())
    role, status = deserializer.deserialize()
    created, deleted = deserializer.attributes()

    assert role.attributes.count() == 2
    assert len(created) == 2

    for attr in created:
        # Each expected entry is consumed once: a repeated or unknown
        # attribute name raises KeyError here.
        expected = expected_by_name.pop(attr.name)
        assert expected['name'] == attr.name
        assert expected['kind'] == attr.kind
        assert expected['value'] == attr.value
190

  
191

  
192
def test_role_deserializer_creates_admin_role(db):
    """After deserializing a role, its '_a2-managers-of-role-…' role exists."""
    deserializer = RoleDeserializer(
        {
            'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
            'ou': None, 'service': None},
        ImportContext())
    deserializer.deserialize()
    # .get() raises DoesNotExist if the manager role is missing.
    Role.objects.get(slug='_a2-managers-of-role-some-role')
199

  
200

  
201
def test_role_deserializer_parenting_existing_parent(db):
    """A parent that already exists yields one direct parenting to the child."""
    parent_role_dict = {
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    parent_role = Role.objects.create(**parent_role_dict)
    child_role_dict = {
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}

    deserializer = RoleDeserializer(child_role_dict, ImportContext())
    child_role, status = deserializer.deserialize()
    created, deleted = deserializer.parentings()

    assert len(created) == 1
    link = created[0]
    assert link.direct is True
    assert link.parent == parent_role
    assert link.child == child_role
219

  
220

  
221
def test_role_deserializer_parenting_non_existing_parent(db):
    """parentings() raises DataImportError when the parent role was never created."""
    missing_parent = {
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    child_role_dict = {
        'name': 'child role', 'slug': 'child-role', 'parents': [missing_parent],
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}
    deserializer = RoleDeserializer(child_role_dict, ImportContext())
    deserializer.deserialize()

    with pytest.raises(DataImportError) as excinfo:
        deserializer.parentings()

    assert "Could not find role" in str(excinfo.value)
234

  
235

  
236
def test_role_deserializer_permissions(db):
    """Import a role with one permission, then re-import it without any.

    The first import must create a single 'add' permission, without OU,
    targeting ``other_role``. The second import uses the same payload minus
    its 'permissions' key and must report that permission as deleted.
    """
    ou = OU.objects.create(slug='some-ou')
    # Target of the permission carried by the imported role.
    other_role = Role.objects.create(
        name='other role', slug='other-role-slug', uuid=get_hex_uuid(), ou=ou)

    some_role_dict = {
        'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    some_role_dict['permissions'] = [
        {
            'operation': {'slug': 'add'},
            'ou': None,
            'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
            'target': {
                "slug": u'other-role-slug', 'ou': {'slug': 'some-ou'}, 'service': None}
        }
    ]

    import_context = ImportContext()
    rd = RoleDeserializer(some_role_dict, import_context)
    rd.deserialize()
    perm_created, perm_deleted = rd.permissions()

    assert len(perm_created) == 1
    assert len(perm_deleted) == 0
    del some_role_dict['permissions']
    role = Role.objects.get(slug=some_role_dict['slug'])
    assert role.permissions.count() == 1
    perm = role.permissions.first()
    assert perm.operation.slug == 'add'
    assert not perm.ou
    assert perm.target == other_role

    # that one should delete permissions
    rd = RoleDeserializer(some_role_dict, import_context)
    role, _ = rd.deserialize()
    perm_created, perm_deleted = rd.permissions()
    assert role.permissions.count() == 0
    assert len(perm_created) == 0
    assert len(perm_deleted) == 1
295

  
296

  
297
def test_permission_on_role(db):
    """An 'admin' permission scoped to an OU and targeting a role is created."""
    perm_ou = OU.objects.create(slug='perm-ou', name='perm ou')
    perm_role = Role.objects.create(slug='perm-role', ou=perm_ou, name='perm role')

    permission_d = {
        "operation": {
            "slug": "admin"
        },
        "ou": {
            "slug": "perm-ou",
            "name": "perm-ou"
        },
        'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
        "target": {
            "slug": "perm-role",
            "ou": {
                "slug": "perm-ou",
                "name": "perm ou"
            },
            "service": None,
            "name": "perm role"
        }
    }
    some_role_dict = {
        'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None,
        'permissions': [permission_d]}

    deserializer = RoleDeserializer(some_role_dict, ImportContext())
    deserializer.deserialize()
    perm_created, perm_deleted = deserializer.permissions()

    assert len(perm_created) == 1
    perm = perm_created[0]
    assert perm.target == perm_role
    assert perm.ou == perm_ou
    assert perm.operation.slug == 'admin'
332

  
333

  
334
def test_permission_on_contentype(db):
    """A permission whose target is a content type (admin.logentry) is created."""
    perm_ou = OU.objects.create(slug='perm-ou', name='perm ou')

    permission_d = {
        "operation": {
            "slug": "admin"
        },
        "ou": {
            "slug": "perm-ou",
            "name": "perm-ou"
        },
        'target_ct': {"model": "contenttype", "app_label": "contenttypes"},
        "target": {"model": "logentry", "app_label": "admin"}
    }
    some_role_dict = {
        'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None,
        'permissions': [permission_d]}

    deserializer = RoleDeserializer(some_role_dict, ImportContext())
    deserializer.deserialize()
    perm_created, perm_deleted = deserializer.permissions()

    assert len(perm_created) == 1
    perm = perm_created[0]
    assert perm.target.app_label == 'admin'
    assert perm.target.model == 'logentry'
    assert perm.ou == perm_ou
359

  
360

  
361
def test_import_ou_created(db):
    """import_ou reports 'created' and returns an OU matching the payload.

    The function was previously named ``import_ou_created``; without the
    ``test`` prefix pytest never collected it.
    """
    uuid = get_hex_uuid()
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
    ou, status = import_ou(ou_d)
    assert status == 'created'
    assert ou.uuid == ou_d['uuid']
    assert ou.slug == ou_d['slug']
    assert ou.name == ou_d['name']
369

  
370

  
371
def test_import_ou_updated(db):
    """import_ou reports 'updated' and returns the existing OU with its new name.

    The function was previously named ``import_ou_updated``; without the
    ``test`` prefix pytest never collected it.
    """
    ou = OU.objects.create(slug='some-ou', name='ou name')
    ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'}
    ou_updated, status = import_ou(ou_d)
    assert status == 'updated'
    assert ou == ou_updated
    # import_ou only receives the dict, so the local ``ou`` instance still
    # carries the old name; the change is visible on the returned object.
    assert ou_updated.name == 'new name'
378

  
379

  
380
def test_import_site_empty():
    """Importing an empty payload reports no role, OU or parenting change."""
    res = import_site({}, ImportContext())
    assert res.roles == {'created': [], 'updated': []}
    assert res.ous == {'created': [], 'updated': []}
    assert res.parentings == {'created': [], 'deleted': []}
385

  
386

  
387
def test_import_site_roles(db):
    """Importing a parent and its child creates both roles and one direct parenting."""
    parent_role_dict = {
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    child_role_dict = {
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}

    res = import_site({'roles': [parent_role_dict, child_role_dict]}, ImportContext())

    created_roles = res.roles['created']
    assert len(created_roles) == 2
    parent_role = Role.objects.get(**parent_role_dict)
    # 'parents' is not a model field: drop it before using the dict as a lookup.
    child_role_dict.pop('parents')
    child_role = Role.objects.get(**child_role_dict)
    assert created_roles[0] == parent_role
    assert created_roles[1] == child_role

    created_parentings = res.parentings['created']
    assert len(created_parentings) == 1
    assert created_parentings[0] == RoleParenting.objects.get(
        child=child_role, parent=parent_role, direct=True)
410

  
411

  
412
def test_roles_import_ignore_technical_role(db):
    """A role whose slug starts with '_' is neither created nor updated."""
    technical_role = {
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}
    res = import_site({'roles': [technical_role]}, ImportContext())
    assert res.roles == {'created': [], 'updated': []}
417

  
418

  
419
def test_roles_import_ignore_technical_role_with_service(db):
    """A role whose slug starts with '_' is neither created nor updated."""
    # NOTE(review): despite the test name, the payload carries no 'service'
    # entry and is identical to the one in
    # test_roles_import_ignore_technical_role -- confirm what was intended.
    technical_role = {
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}
    res = import_site({'roles': [technical_role]}, ImportContext())
    assert res.roles == {'created': [], 'updated': []}
424

  
425

  
426
def test_import_role_handle_manager_role_parenting(db):
    """A child listing another role's manager role as parent gets linked to it.

    The child is listed before its parent in the payload, and the manager
    role itself is not part of the imported roles.
    """
    parent_role_dict = {
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    parent_role_manager_dict = {
        'name': 'Administrateur du role grand parent role',
        'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(),
        'ou': None, 'service': None}
    child_role_dict = {
        'name': 'child role', 'slug': 'child-role',
        'parents': [parent_role_dict, parent_role_manager_dict],
        'uuid': get_hex_uuid(), 'ou': None, 'service': None}

    payload = {'roles': [child_role_dict, parent_role_dict]}
    import_site(payload, ImportContext())

    child = Role.objects.get(slug='child-role')
    manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role')
    # .get() raises DoesNotExist if the direct parenting is missing.
    RoleParenting.objects.get(child=child, parent=manager, direct=True)
442

  
443

  
444
def test_import_roles_role_delete_orphans(db):
    """Importing with role_delete_orphans=True raises DataImportError."""
    payload = {'roles': [{
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]}
    context = ImportContext(role_delete_orphans=True)
    with pytest.raises(DataImportError):
        import_site(payload, context)
449

  
450

  
451
def test_import_ou(db):
    """Importing an unknown OU creates it together with its manager role."""
    uuid = get_hex_uuid()
    name = 'ou name'
    payload = {'ous': [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}]}

    res = import_site(payload, ImportContext())

    created_ous = res.ous['created']
    assert len(created_ous) == 1
    ou = created_ous[0]
    assert ou.uuid == uuid
    assert ou.name == name
    # .get() raises DoesNotExist if the manager role is missing.
    Role.objects.get(slug='_a2-managers-of-ou-slug')
461

  
462

  
463
def test_import_ou_already_existing(db):
    """Importing an OU that already exists creates nothing new."""
    uuid = get_hex_uuid()
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
    ou = OU.objects.create(**ou_d)
    ou_count_before = OU.objects.count()

    res = import_site({'ous': [ou_d]}, ImportContext())

    assert len(res.ous['created']) == 0
    assert ou_count_before == OU.objects.count()
    assert ou == OU.objects.get(uuid=uuid)
tests/test_import_export_site_cmd.py
1
import __builtin__
2
import json
3

  
4
from django.core import management
5
import pytest
6

  
7
from django_rbac.utils import get_role_model
8

  
9

  
10
def dummy_export_site(*args):
    """Return a fixed export payload holding a single role; arguments are ignored."""
    only_role = {'name': 'role1'}
    return {'roles': [only_role]}
12

  
13

  
14
def test_export_role_cmd_stdout(db, capsys, monkeypatch):
    """Without --output, export_site prints the exported JSON on stdout."""
    from authentic2.management.commands import export_site as export_site_cmd

    # Replace the real exporter by the fixed payload.
    monkeypatch.setattr(export_site_cmd, 'export_site', dummy_export_site)
    management.call_command('export_site')

    out, err = capsys.readouterr()
    assert json.loads(out) == dummy_export_site()
21

  
22

  
23
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir):
    """With --output, export_site writes the exported JSON to the given file."""
    from authentic2.management.commands import export_site as export_site_cmd

    # Replace the real exporter by the fixed payload.
    monkeypatch.setattr(export_site_cmd, 'export_site', dummy_export_site)
    outfile = tmpdir.join('export.json')
    management.call_command('export_site', '--output', outfile.strpath)

    with outfile.open('r') as f:
        exported = json.load(f)
    assert exported == dummy_export_site()
31

  
32

  
33
def test_import_site_cmd(db, tmpdir, monkeypatch):
    """The import_site command runs on a file holding an empty role list."""
    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps({'roles': []}))
    management.call_command('import_site', export_file.strpath)
38

  
39

  
40
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys):
    """The import_site command prints the run mode and role counters on stdout."""
    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps(
        {'roles': [{
            'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
            'ou': None, 'service': None}]}))

    management.call_command('import_site', export_file.strpath)

    out, err = capsys.readouterr()
    assert "Real run" in out
    assert "1 roles created" in out
    assert "0 roles updated" in out
54

  
55

  
56
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys):
    """A role created before import_site raises must not survive the command."""
    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps({'roles': []}))

    Role = get_role_model()

    def exception_import_site(*args):
        # Write to the database, then fail: the write has to be rolled back.
        Role.objects.create(slug='role-slug')
        raise Exception()

    import authentic2.management.commands.import_site
    monkeypatch.setattr(
        authentic2.management.commands.import_site, 'import_site', exception_import_site)

    with pytest.raises(Exception):
        management.call_command('import_site', export_file.strpath)

    with pytest.raises(Role.DoesNotExist):
        Role.objects.get(slug='role-slug')
76

  
77

  
78
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys):
    """With --dry-run, the role described in the file is not left in the database."""
    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps(
        {'roles': [{
            'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
            'ou': None, 'service': None}]}))

    Role = get_role_model()

    management.call_command('import_site', '--dry-run', export_file.strpath)

    with pytest.raises(Role.DoesNotExist):
        Role.objects.get(slug='role-slug')
92

  
93

  
94
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys):
    """Running with '-o role-delete-orphans' on this payload raises DataImportError."""
    from authentic2.data_transfer import DataImportError

    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps(
        {'roles': [{
            'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
            'ou': None, 'service': None}]}))

    # The role described in the file already exists before the import.
    get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name')

    with pytest.raises(DataImportError):
        management.call_command(
            'import_site', '-o', 'role-delete-orphans', export_file.strpath)
109

  
110

  
111
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys):
    """An option name the command does not know raises CommandError."""
    from django.core.management.base import CommandError

    # The file is never written: only its path is handed to the command.
    export_path = tmpdir.join('roles-export.json').strpath
    with pytest.raises(CommandError):
        management.call_command('import_site', '-o', 'unknown-option', export_path)
116

  
117

  
118
def test_import_site_confirm_prompt_yes(db, tmpdir, monkeypatch):
    """With raw_input patched to answer 'yes', the role from the file is imported."""
    export_file = tmpdir.join('roles-export.json')
    # write() opens and closes the file itself; no surrounding open() needed.
    export_file.write(json.dumps(
        {'roles': [{
            'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
            'ou': None, 'service': None}]}))

    def yes_raw_input(*args, **kwargs):
        return 'yes'

    # Any prompt issued through raw_input is answered with 'yes'.
    monkeypatch.setattr(__builtin__, 'raw_input', yes_raw_input)

    management.call_command('import_site', export_file.strpath, stdin='yes')
    assert get_role_model().objects.get(uuid='dqfewrvesvews2532')
0
-