Projet

Général

Profil

0001-create-import_site-and-export_site-commands-16514.patch

Emmanuel Cazenave, 28 mars 2018 16:10

Télécharger (45,1 ko)

Voir les différences:

Subject: [PATCH] create 'import_site' and 'export_site' commands (#16514)

Handle only OU and Role
 src/authentic2/a2_rbac/models.py                  |  83 +++++
 src/authentic2/data_transfer.py                   | 282 ++++++++++++++++
 src/authentic2/management/commands/export_site.py |  24 ++
 src/authentic2/management/commands/import_site.py |  76 +++++
 tests/test_a2_rbac.py                             | 157 ++++++++-
 tests/test_data_transfer.py                       | 379 ++++++++++++++++++++++
 tests/test_import_export_site_cmd.py              | 113 +++++++
 7 files changed, 1113 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/data_transfer.py
 create mode 100644 src/authentic2/management/commands/export_site.py
 create mode 100644 src/authentic2/management/commands/import_site.py
 create mode 100644 tests/test_data_transfer.py
 create mode 100644 tests/test_import_export_site_cmd.py
src/authentic2/a2_rbac/models.py
22 22

  
23 23
from . import managers, fields
24 24

  
25
SIMPLE_SERIALIZABLE_FIELDS = (models.TextField, models.CharField, models.SlugField,
26
                              models.URLField, models.BooleanField, fields.UniqueBooleanField,
27
                              models.IntegerField, models.CommaSeparatedIntegerField,
28
                              models.EmailField, models.IntegerField, models.PositiveIntegerField)
29

  
25 30

  
26 31
class OrganizationalUnit(OrganizationalUnitAbstractBase):
27 32
    username_is_unique = models.BooleanField(
......
92 97
    def cached(cls):
93 98
        return cls.objects.all()
94 99

  
100
    def export_json(self):
101
        d = {}
102
        concrete_fields = [f for f in self.__class__._meta.get_fields()
103
                           if f.concrete and not f.is_relation]
104
        for field in concrete_fields:
105
            if field.name == 'id':
106
                continue
107
            value = getattr(self, field.attname)
108
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS):
109
                d[field.name] = value
110
            else:
111
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
112
                    field, self.__class__))
113
        return d
114

  
115
    @classmethod
116
    def import_json(cls, d, to_update=None):
117
        if to_update is None:
118
            return cls.objects.create(**d)
119
        else:
120
            for attr, value in d.items():
121
                setattr(to_update, attr, value)
122
            to_update.save()
123
            return to_update
124

  
95 125

  
96 126
class Permission(PermissionAbstractBase):
97 127
    class Meta:
......
207 237
            'ou__slug': self.ou.slug if self.ou else None,
208 238
        }
209 239

  
240
    def export_json(self, attributes=False, parents=False, permissions=False):
241
        d = {}
242
        concrete_fields = [f for f in self.__class__._meta.get_fields()
243
                           if f.concrete and not f.is_relation]
244
        for field in concrete_fields:
245
            if field.name == 'id':
246
                continue
247
            value = getattr(self, field.attname)
248
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS):
249
                d[field.name] = value
250
            else:
251
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
252
                    field, self.__class__))
253

  
254
        d['natural_key'] = self.natural_key()
255

  
256
        if attributes:
257
            for attribute in self.attributes.all():
258
                d.setdefault('attributes', []).append(attribute.to_json())
259

  
260
        if parents:
261
            RoleParenting = rbac_utils.get_role_parenting_model()
262
            for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True):
263
                d.setdefault('parents', []).append(parenting.parent.export_json())
264

  
265
        if permissions:
266
            for perm in self.permissions.all():
267
                d.setdefault('permissions', []).append(perm.natural_key())
268

  
269
        return d
270

  
271
    @classmethod
272
    def import_json(cls, d, to_update=None):
273
        kwargs = {}
274
        concrete_fields = [f for f in cls._meta.get_fields()
275
                           if f.concrete and not f.is_relation]
276
        for field in concrete_fields:
277
            if field.name == 'id':
278
                continue
279
            if isinstance(field, SIMPLE_SERIALIZABLE_FIELDS) and field.name in d:
280
                kwargs[field.name] = d[field.name]
281
        if to_update is None:
282
            return cls.objects.create(**kwargs)
283
        else:
284
            for attr, value in kwargs.items():
285
                setattr(to_update, attr, value)
286
            to_update.save()
287
        return to_update
288

  
210 289

  
211 290
class RoleParenting(RoleParentingAbstractBase):
212 291
    class Meta(RoleParentingAbstractBase.Meta):
......
239 318
            ('role', 'name', 'kind', 'value'),
240 319
        )
241 320

  
321
    def to_json(self):
322
        return {'name': self.name, 'kind': self.kind, 'value': self.value}
323

  
324

  
242 325
GenericRelation(Permission,
243 326
                content_type_field='target_ct',
244 327
                object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms')
src/authentic2/data_transfer.py
1
from django.contrib.contenttypes.models import ContentType
2

  
3
from django_rbac.models import Operation
4
from django_rbac.utils import (
5
    get_ou_model,  get_role_model, get_role_parenting_model, get_permission_model)
6
from authentic2.a2_rbac.models import RoleAttribute
7

  
8

  
9
def export_site():
10
    return {
11
        'roles': export_roles(get_role_model().objects.all()),
12
        'ous': export_ou(get_ou_model().objects.all())
13
    }
14

  
15

  
16
def export_ou(ou_query_set):
17
    return [ou.export_json() for ou in ou_query_set]
18

  
19

  
20
def export_roles(role_queryset):
21
    """ Serialize roles in role_queryset
22
    """
23
    return [role.export_json(attributes=True, parents=True) for role in role_queryset]
24

  
25

  
26
def search_ou(ou_natural_key):
27
    """ ou_natural_key: ['ou-slug'] or None
28
    """
29
    if ou_natural_key:
30
        try:
31
            OU = get_ou_model()
32
            return OU.objects.get_by_natural_key(*ou_natural_key)
33
        except OU.DoesNotExist:
34
            pass
35
    return None
36

  
37

  
38
def search_role(role_d):
39
    Role = get_role_model()
40
    try:
41
        return Role.objects.get(uuid=role_d['uuid'])
42
    except Role.DoesNotExist:
43
        pass
44
    try:
45
        return Role.objects.get_by_natural_key(*role_d['natural_key'])
46
    except Role.DoesNotExist:
47
        return None
48

  
49

  
50
class ImportContext(object):
51
    """ Holds information on how to perform the import.
52

  
53
    ou_delete_orphans: if True any existing ou that is not found in the export will
54
                       be deleted
55

  
56
    role_delete_orphans: if True any existing role that is not found in the export will
57
                         be deleted
58

  
59

  
60
    role_attributes_update: for each role in the import data,
61
                            attributes  will deleted and re-created
62

  
63

  
64
    role_parentings_update: for each role in the import data,
65
                            parentings will deleted and re-created
66

  
67
    role_permissions_update: for each role in the import data,
68
                             permissions  will deleted and re-created
69
    """
70

  
71
    def __init__(
72
            self, role_delete_orphans=False, role_parentings_update=True,
73
            role_permissions_update=True, role_attributes_update=True,
74
            ou_delete_orphans=False):
75
        self.role_delete_orphans = role_delete_orphans
76
        self.ou_delete_orphans = ou_delete_orphans
77
        self.role_parentings_update = role_parentings_update
78
        self.role_permissions_update = role_permissions_update
79
        self.role_attributes_update = role_attributes_update
80

  
81

  
82
class DataImportError(Exception):
83
    pass
84

  
85

  
86
class RoleDeserializer(object):
87

  
88
    def __init__(self, d, import_context):
89
        self._import_context = import_context
90
        self._obj = None
91
        self._ou = None
92
        self._parents = None
93
        self._attributes = None
94
        self._permissions = None
95

  
96
        self._role_d = dict()
97
        for key, value in d.items():
98
            if key == 'parents':
99
                self._parents = value
100
            elif key == 'attributes':
101
                self._attributes = value
102
            elif key == 'permissions':
103
                self._permissions = value
104
            else:
105
                self._role_d[key] = value
106

  
107
        ou_natural_key = self._role_d['natural_key'][1]
108
        if ou_natural_key:
109
            self._ou = search_ou(ou_natural_key)
110
            if self._ou is None:
111
                raise DataImportError(
112
                    "Can't import role because missing Organizational Unit : "
113
                    "%s" % ou_natural_key[0])
114

  
115
    def deserialize(self):
116
        kwargs = self._role_d.copy()
117
        obj = search_role(self._role_d)
118
        if obj:  # Role already exist
119
            self._obj = obj
120
            status = 'updated'
121
            get_role_model().import_json(kwargs, self._obj)
122
            if self._ou and (self._obj.ou != self._ou):
123
                # Need to update ou
124
                self._obj.ou = self._ou
125
                self._obj.save()
126
        else:  # Create role
127
            self._obj = get_role_model().import_json(kwargs)
128
            if self._ou:
129
                self._obj.ou = self._ou
130
                self._obj.save()
131
            status = 'created'
132

  
133
        # Ensure admin role is created
134
        self._obj.get_admin_role()
135
        return self._obj, status
136

  
137
    def attributes(self):
138
        """ Update attributes (delete everything then create)
139
        """
140
        created, deleted = [], []
141
        for attr in self._obj.attributes.all():
142
            attr.delete()
143
            deleted.append(attr)
144
        # Create attributes
145
        if self._attributes:
146
            for attr_dict in self._attributes:
147
                attr_dict['role'] = self._obj
148
                created.append(RoleAttribute.objects.create(**attr_dict))
149

  
150
        return created, deleted
151

  
152
    def parentings(self):
153
        """ Update parentings (delete everything then create)
154
        """
155
        created, deleted = [], []
156
        Parenting = get_role_parenting_model()
157
        for parenting in Parenting.objects.filter(child=self._obj, direct=True):
158
            parenting.delete()
159
            deleted.append(parenting)
160

  
161
        if self._parents:
162
            for parent_d in self._parents:
163
                parent = search_role(parent_d)
164
                if not parent:
165
                    raise DataImportError("Could not find role : %s" % parent_d)
166
                created.append(Parenting.objects.create(
167
                    child=self._obj, direct=True, parent=parent))
168

  
169
        return created, deleted
170

  
171
    def permissions(self):
172
        """ Update permissions (delete everything then create)
173
        """
174
        created, deleted = [], []
175
        for perm in self._obj.permissions.all():
176
            perm.delete()
177
            deleted.append(perm)
178
        self._obj.permissions.clear()
179
        if self._permissions:
180
            for perm in self._permissions:
181
                op = Operation.objects.get_by_natural_key(perm[0])
182
                ou = get_ou_model().objects.get_by_natural_key(perm[1]) if perm[1] else None
183
                ct = ContentType.objects.get_by_natural_key(*perm[2])
184
                target = ct.model_class().objects.get_by_natural_key(*perm[3])
185
                perm = get_permission_model().objects.create(
186
                    operation=op, ou=ou, target_ct=ct, target_id=target.pk)
187
                self._obj.permissions.add(perm)
188
                created.append(perm)
189

  
190
        return created, deleted
191

  
192

  
193
class ImportResult(object):
194

  
195
    def __init__(self):
196
        self.roles = {'created': [], 'updated': []}
197
        self.ous = {'created': [], 'updated': []}
198
        self.attributes = {'created': [], 'deleted': []}
199
        self.parentings = {'created': [], 'deleted': []}
200
        self.permissions = {'created': [], 'deleted': []}
201

  
202
    def update_roles(self, role, d_status):
203
        self.roles[d_status].append(role)
204

  
205
    def update_ous(self, ou, status):
206
        self.ous[status].append(ou)
207

  
208
    def _bulk_update(self, attrname, created, deleted):
209
        attr = getattr(self, attrname)
210
        attr['created'].extend(created)
211
        attr['deleted'].extend(deleted)
212

  
213
    def update_attributes(self, created, deleted):
214
        self._bulk_update('attributes', created, deleted)
215

  
216
    def update_parentings(self, created, deleted):
217
        self._bulk_update('parentings', created, deleted)
218

  
219
    def update_permissions(self, created, deleted):
220
        self._bulk_update('permissions', created, deleted)
221

  
222
    def to_str(self, verbose=False):
223
        res = ""
224
        for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'):
225
            data = getattr(self, attr)
226
            for status in ('created', 'updated', 'deleted'):
227
                if status in data:
228
                    s_data = data[status]
229
                    res += "%s %s %s\n" % (len(s_data), attr, status)
230
        return res
231

  
232

  
233
def import_ou(ou_d):
234
    OU = get_ou_model()
235
    ou = search_ou([ou_d['slug']])
236
    if ou is None:
237
        ou = OU.import_json(ou_d)
238
        status = 'created'
239
    else:
240
        OU.import_json(ou_d, ou)
241
        status = 'updated'
242
    # Ensure admin role is created
243
    ou.get_admin_role()
244
    return ou, status
245

  
246

  
247
def import_site(json_d, import_context):
248
    result = ImportResult()
249

  
250
    for ou_d in json_d.get('ous', []):
251
        result.update_ous(*import_ou(ou_d))
252

  
253
    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
254
                if not role_d['slug'].startswith('_')]
255

  
256
    for ds in roles_ds:
257
        result.update_roles(*ds.deserialize())
258

  
259
    if import_context.role_attributes_update:
260
        for ds in roles_ds:
261
            result.update_attributes(*ds.attributes())
262

  
263
    if import_context.role_parentings_update:
264
        for ds in roles_ds:
265
            result.update_parentings(*ds.parentings())
266

  
267
    if import_context.role_permissions_update:
268
        for ds in roles_ds:
269
            result.update_permissions(*ds.permissions())
270

  
271
    if import_context.ou_delete_orphans:
272
        raise DataImportError(
273
            "Unsupported context value for ou_delete_orphans : %s" % (
274
                import_context.ou_delete_orphans))
275

  
276
    if import_context.role_delete_orphans:
277
        # FIXME : delete each role that is in DB but not in the export
278
        raise DataImportError(
279
            "Unsupported context value for role_delete_orphans : %s" % (
280
                import_context.role_delete_orphans))
281

  
282
    return result
src/authentic2/management/commands/export_site.py
1
import json
2
import sys
3

  
4
from django.core.management.base import BaseCommand
5

  
6
from authentic2.data_transfer import export_site
7
from django_rbac.utils import get_role_model
8

  
9

  
10
class Command(BaseCommand):
11
    help = 'Export site'
12

  
13
    def add_arguments(self, parser):
14
        parser.add_argument('--output', metavar='FILE', default=None,
15
                            help='name of a file to write output to')
16

  
17
    def handle(self, *args, **options):
18
        if options['output']:
19
            output, close = open(options['output'], 'w'), True
20
        else:
21
            output, close = sys.stdout, False
22
        json.dump(export_site(), output, indent=4)
23
        if close:
24
            output.close()
src/authentic2/management/commands/import_site.py
1
import contextlib
2
import json
3
import sys
4

  
5
from django.conf import settings
6
from django.core.management.base import BaseCommand
7
from django.db import transaction
8
from django.utils import translation
9

  
10
from authentic2.data_transfer import import_site, ImportContext
11

  
12

  
13
class DryRunException(Exception):
14
    pass
15

  
16

  
17
def create_context_args(options):
18
    kwargs = {}
19
    if options['option']:
20
        for context_op in options['option']:
21
            context_op = context_op.replace('-', '_')
22
            if context_op.startswith('no_'):
23
                kwargs[context_op[3:]] = False
24
            else:
25
                kwargs[context_op] = True
26
    return kwargs
27

  
28

  
29
#  Borrowed from https://bugs.python.org/issue10049#msg118599
30
@contextlib.contextmanager
31
def provision_contextm(dry_run):
32
    multitenant = False
33
    try:
34
        import hobo.agent.authentic2
35
        multitenant = True
36
    except ImportError:
37
        pass
38
    if dry_run and multitenant:
39
        with hobo.agent.authentic2.provisionning.Provisionning():
40
            yield
41
    else:
42
        yield
43

  
44

  
45
class Command(BaseCommand):
46
    help = 'Import site'
47

  
48
    def add_arguments(self, parser):
49
        parser.add_argument(
50
            'filename', metavar='FILENAME', type=str, help='name of file to import')
51
        parser.add_argument(
52
            '--no-dry-run', action='store_false', dest='dry_run', help='Really perform the import')
53
        parser.add_argument(
54
            '-o', '--option', action='append', help='Import context options',
55
            choices=[
56
                'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update',
57
                'no-role-attributes-update', 'no-role-parentings-update'])
58

  
59
    def handle(self, filename, **options):
60
        translation.activate(settings.LANGUAGE_CODE)
61
        dry_run = options['dry_run']
62
        msg = "Dry run\n" if dry_run else "Real run\n"
63
        c_kwargs = create_context_args(options)
64
        try:
65
            with open(filename, 'r') as f:
66
                with provision_contextm(dry_run):
67
                    with transaction.atomic():
68
                        sys.stdout.write(msg)
69
                        result = import_site(json.load(f), ImportContext(**c_kwargs))
70
                        if dry_run:
71
                            raise DryRunException()
72
        except DryRunException:
73
            pass
74
        sys.stdout.write(result.to_str())
75
        sys.stdout.write("Success\n")
76
        translation.deactivate()
tests/test_a2_rbac.py
1 1
import pytest
2 2

  
3
from django.contrib.contenttypes.models import ContentType
4
from django_rbac.utils import get_permission_model
5
from django_rbac.models import Operation
6
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute
3 7
from authentic2.models import Service
4
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
8
from authentic2.utils import get_hex_uuid
5 9

  
6 10

  
7 11
def test_role_natural_key(db):
......
24 28
        Role.objects.get_by_natural_key(*r2.natural_key())
25 29
    with pytest.raises(Role.DoesNotExist):
26 30
        Role.objects.get_by_natural_key(*r4.natural_key())
31

  
32

  
33
def test_basic_role_export_json(db):
34
    role = Role.objects.create(
35
        name='basic role', slug='basic-role', description='basic role description')
36
    role_dict = role.export_json()
37
    assert role_dict['name'] == role.name
38
    assert role_dict['slug'] == role.slug
39
    assert role_dict['uuid'] == role.uuid
40
    assert role_dict['description'] == role.description
41
    assert role_dict['admin_scope_id'] == role.admin_scope_id
42
    assert role_dict['external_id'] == role.external_id
43
    assert role_dict['natural_key'] == ['basic-role', None, None]
44

  
45

  
46
def test_role_with_ou_export_json(db):
47
    ou = OU.objects.create(name='ou', slug='ou')
48
    role = Role.objects.create(name='some role', ou=ou)
49
    role_dict = role.export_json()
50
    assert role_dict['natural_key'] == ['some-role', ['ou'], None]
51

  
52

  
53
def test_role_with_service_export_json(db):
54
    service = Service.objects.create(name='service name', slug='service-name')
55
    role = Role.objects.create(name='some role', service=service)
56
    role_dict = role.export_json()
57
    assert role_dict['natural_key'] == [u'some-role', None, [None, 'service-name']]
58

  
59

  
60
def test_role_with_attributes_export_json(db):
61
    role = Role.objects.create(name='some role')
62
    attr1 = RoleAttribute.objects.create(
63
        role=role, name='attr1_name', kind='string', value='attr1_value')
64
    attr2 = RoleAttribute.objects.create(
65
        role=role, name='attr2_name', kind='string', value='attr2_value')
66

  
67
    role_dict = role.export_json(attributes=True)
68
    attributes = role_dict['attributes']
69
    assert len(attributes) == 2
70

  
71
    expected_attr_names = set([attr1.name, attr2.name])
72
    for attr_dict in attributes:
73
        assert attr_dict['name'] in expected_attr_names
74
        expected_attr_names.remove(attr_dict['name'])
75
        target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first()
76
        assert attr_dict['kind'] == target_attr.kind
77
        assert attr_dict['value'] == target_attr.value
78

  
79

  
80
def test_role_with_parents_export_json(db):
81
    grand_parent_role = Role.objects.create(
82
        name='test grand parent role', slug='test-grand-parent-role')
83
    parent_1_role = Role.objects.create(
84
        name='test parent 1 role', slug='test-parent-1-role')
85
    parent_1_role.add_parent(grand_parent_role)
86
    parent_2_role = Role.objects.create(
87
        name='test parent 2 role', slug='test-parent-2-role')
88
    parent_2_role.add_parent(grand_parent_role)
89
    child_role = Role.objects.create(
90
        name='test child role', slug='test-child-role')
91
    child_role.add_parent(parent_1_role)
92
    child_role.add_parent(parent_2_role)
93

  
94
    child_role_dict = child_role.export_json(parents=True)
95
    assert child_role_dict['slug'] == child_role.slug
96
    parents = child_role_dict['parents']
97
    assert len(parents) == 2
98
    expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
99
    for parent in parents:
100
        assert parent['slug'] in expected_slugs
101
        expected_slugs.remove(parent['slug'])
102

  
103
    grand_parent_role_dict = grand_parent_role.export_json(parents=True)
104
    assert grand_parent_role_dict['slug'] == grand_parent_role.slug
105
    assert 'parents' not in grand_parent_role_dict
106

  
107
    parent_1_role_dict = parent_1_role.export_json(parents=True)
108
    assert parent_1_role_dict['slug'] == parent_1_role.slug
109
    parents = parent_1_role_dict['parents']
110
    assert len(parents) == 1
111
    assert parents[0]['slug'] == grand_parent_role.slug
112

  
113
    parent_2_role_dict = parent_2_role.export_json(parents=True)
114
    assert parent_2_role_dict['slug'] == parent_2_role.slug
115
    parents = parent_2_role_dict['parents']
116
    assert len(parents) == 1
117
    assert parents[0]['slug'] == grand_parent_role.slug
118

  
119

  
120
def test_role_with_permission_export_json(db):
121
    some_ou = OU.objects.create(name='some ou', slug='some-ou')
122
    role = Role.objects.create(name='role name', slug='role-slug')
123
    other_role = Role.objects.create(
124
        name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou)
125
    ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description')
126
    Permission = get_permission_model()
127
    op = Operation.objects.first()
128
    perm_saml = Permission.objects.create(
129
        operation=op, ou=ou,
130
        target_ct=ContentType.objects.get_for_model(ContentType),
131
        target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk)
132
    role.permissions.add(perm_saml)
133
    perm_role = Permission.objects.create(
134
        operation=op, ou=None,
135
        target_ct=ContentType.objects.get_for_model(Role),
136
        target_id=other_role.pk)
137
    role.permissions.add(perm_role)
138

  
139
    export = role.export_json(permissions=True)
140
    permissions = export['permissions']
141
    assert len(permissions) == 2
142
    assert permissions[0] == [
143
        u'add', [u'basic-ou'], (u'contenttypes', u'contenttype'), (u'saml', u'libertyprovider')]
144
    assert permissions[1] == [
145
        u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]]
146

  
147

  
148
def test_basic_role_import_json(db):
149
    role_dict = dict(
150
        name='basic role', slug='basic-role', description='basic role description',
151
        uuid=get_hex_uuid(), admin_scope_id=1, external_id='some id')
152
    role = Role.import_json(role_dict)
153
    assert role_dict['name'] == role.name
154
    assert role_dict['slug'] == role.slug
155
    assert role_dict['uuid'] == role.uuid
156
    assert role_dict['description'] == role.description
157
    assert role_dict['admin_scope_id'] == role.admin_scope_id
158
    assert role_dict['external_id'] == role.external_id
159

  
160

  
161
def test_ou_export_json(db):
162
    ou = OU.objects.create(
163
        name='basic ou', slug='basic-ou', description='basic ou description',
164
        username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
165
    ou_dict = ou.export_json()
166
    assert ou_dict['name'] == ou.name
167
    assert ou_dict['slug'] == ou.slug
168
    assert ou_dict['uuid'] == ou.uuid
169
    assert ou_dict['description'] == ou.description
170
    assert ou_dict['username_is_unique'] == ou.username_is_unique
171
    assert ou_dict['email_is_unique'] == ou.email_is_unique
172
    assert ou_dict['default'] == ou.default
173
    assert ou_dict['validate_emails'] == ou.validate_emails
174

  
175

  
176
def test_ou_import_json(db):
177
    ou_d = dict(name='basic ou', slug='basic-ou', description='basic ou description',
178
                username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
179
    ou = OU.import_json(ou_d)
180
    for field, value in ou_d.items():
181
        assert getattr(ou, field) == value
tests/test_data_transfer.py
1
from django_rbac.utils import get_role_model, get_ou_model
2
import pytest
3

  
4
from authentic2.a2_rbac.models import RoleParenting
5
from authentic2.data_transfer import (
6
    DataImportError, export_roles, import_site, export_ou, ImportContext,
7
    RoleDeserializer, search_role, import_ou)
8
from authentic2.utils import get_hex_uuid
9

  
10

  
11
Role = get_role_model()
12
OU = get_ou_model()
13

  
14

  
15
def test_export_basic_role(db):
16
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
17
    query_set = Role.objects.filter(uuid=role.uuid)
18
    roles = export_roles(query_set)
19
    assert len(roles) == 1
20
    role_dict = roles[0]
21
    for key, value in role.export_json().items():
22
        assert role_dict[key] == value
23

  
24

  
25
def test_export_role_with_parents(db):
26
    grand_parent_role = Role.objects.create(
27
        name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid())
28
    parent_1_role = Role.objects.create(
29
        name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid())
30
    parent_1_role.add_parent(grand_parent_role)
31
    parent_2_role = Role.objects.create(
32
        name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid())
33
    parent_2_role.add_parent(grand_parent_role)
34
    child_role = Role.objects.create(
35
        name='test child role', slug='test-child-role', uuid=get_hex_uuid())
36
    child_role.add_parent(parent_1_role)
37
    child_role.add_parent(parent_2_role)
38

  
39
    query_set = Role.objects.filter(slug__startswith='test').order_by('slug')
40
    roles = export_roles(query_set)
41
    assert len(roles) == 4
42

  
43
    child_role_dict = roles[0]
44
    assert child_role_dict['slug'] == child_role.slug
45
    parents = child_role_dict['parents']
46
    assert len(parents) == 2
47
    expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
48
    for parent in parents:
49
        assert parent['slug'] in expected_slugs
50
        expected_slugs.remove(parent['slug'])
51

  
52
    grand_parent_role_dict = roles[1]
53
    assert grand_parent_role_dict['slug'] == grand_parent_role.slug
54

  
55
    parent_1_role_dict = roles[2]
56
    assert parent_1_role_dict['slug'] == parent_1_role.slug
57
    parents = parent_1_role_dict['parents']
58
    assert len(parents) == 1
59
    assert parents[0]['slug'] == grand_parent_role.slug
60

  
61
    parent_2_role_dict = roles[3]
62
    assert parent_2_role_dict['slug'] == parent_2_role.slug
63
    parents = parent_2_role_dict['parents']
64
    assert len(parents) == 1
65
    assert parents[0]['slug'] == grand_parent_role.slug
66

  
67

  
68
def test_export_ou(db):
69
    ou = OU.objects.create(name='ou name', slug='ou-slug')
70
    ous = export_ou(OU.objects.filter(name='ou name'))
71
    assert len(ous) == 1
72
    ou_d = ous[0]
73
    assert ou_d['name'] == ou.name
74
    assert ou_d['slug'] == ou.slug
75

  
76

  
77
def test_search_role_by_uuid(db):
78
    uuid = get_hex_uuid()
79
    role_d = {'uuid': uuid, 'slug': 'role-slug'}
80
    role = Role.objects.create(**role_d)
81
    assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'})
82

  
83

  
84
def test_search_role_by_slug(db):
85
    role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'}
86
    role = Role.objects.create(**role_d)
87
    assert role == search_role({
88
        'uuid': get_hex_uuid(), 'slug': 'role-slug', 'natural_key': ['role-slug', None, None]})
89

  
90

  
91
def test_search_role_not_found(db):
92
    assert search_role(
93
        {
94
            'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name',
95
            'natural_key': ['role-slug', None, None]}) is None
96

  
97

  
98
def test_search_role_slug_not_unique(db):
99
    role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
100
    role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
101
    ou = OU.objects.create(name='some ou', slug='some-ou')
102
    role1 = Role.objects.create(ou=ou, **role1_d)
103
    Role.objects.create(**role2_d)
104
    assert role1 == search_role(role1.export_json())
105

  
106

  
107
def test_role_deserializer(db):
108
    rd = RoleDeserializer({
109
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
110
        'uuid': get_hex_uuid(), 'natural_key': ['some-role', None, None]}, ImportContext())
111
    assert rd._ou is None
112
    assert rd._parents is None
113
    assert rd._attributes is None
114
    assert rd._obj is None
115
    role, status = rd.deserialize()
116
    assert status == 'created'
117
    assert role.name == 'some role'
118
    assert role.description == 'some role description'
119
    assert role.slug == 'some-role'
120
    assert rd._obj == role
121

  
122

  
123
def test_role_deserializer_with_ou(db):
124
    ou = OU.objects.create(name='some ou', slug='some-ou')
125
    rd = RoleDeserializer({
126
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
127
        'slug': 'some-role', 'natural_key': ['some-role', ['some-ou'], None]}, ImportContext())
128
    role, status = rd.deserialize()
129
    assert role.ou == ou
130

  
131

  
132
def test_role_deserializer_missing_ou(db):
133
    with pytest.raises(DataImportError):
134
        RoleDeserializer({
135
            'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
136
            'slug': 'some-role', 'natural_key': ['role-slug', ['some-ou'], None]},
137
            ImportContext())
138

  
139

  
140
def test_role_deserializer_update_ou(db):
141
    ou1 = OU.objects.create(name='ou 1', slug='ou-1')
142
    ou2 = OU.objects.create(name='ou 2', slug='ou-2')
143
    uuid = get_hex_uuid()
144
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1)
145
    rd = RoleDeserializer({
146
        'uuid': uuid, 'name': 'some-role', 'slug': 'some-role',
147
        'natural_key': ['some-role', ['ou-2'], None]}, ImportContext())
148
    role, status = rd.deserialize()
149
    assert role == existing_role
150
    assert role.ou == ou2
151

  
152

  
153
def test_role_deserializer_update_fields(db):
154
    uuid = get_hex_uuid()
155
    existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role')
156
    rd = RoleDeserializer({
157
        'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed',
158
        'natural_key': ['some-role', None, None]}, ImportContext())
159
    role, status = rd.deserialize()
160
    assert role == existing_role
161
    assert role.name == 'some role changed'
162

  
163

  
164
def test_role_deserializer_with_attributes(db):
165

  
166
    attributes_data = {
167
        'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
168
        'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value')
169
    }
170
    rd = RoleDeserializer({
171
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
172
        'slug': 'some-role', 'attributes': list(attributes_data.values()),
173
        'natural_key': ['some-role', None, None]}, ImportContext())
174
    role, status = rd.deserialize()
175
    created, deleted = rd.attributes()
176
    assert role.attributes.count() == 2
177
    assert len(created) == 2
178

  
179
    for attr in created:
180
        attr_dict = attributes_data[attr.name]
181
        assert attr_dict['name'] == attr.name
182
        assert attr_dict['kind'] == attr.kind
183
        assert attr_dict['value'] == attr.value
184
        del attributes_data[attr.name]
185

  
186

  
187
def test_role_deserializer_creates_admin_role(db):
188
    role_dict = {
189
        'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
190
        'natural_key': ['some-role', None, None]}
191
    rd = RoleDeserializer(role_dict, ImportContext())
192
    rd.deserialize()
193
    Role.objects.get(slug='_a2-managers-of-role-some-role')
194

  
195

  
196
def test_role_deserializer_parenting_existing_parent(db):
197
    parent_role_dict = {
198
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
199
        'natural_key': ['grand-parent-role', None, None]}
200
    parent_role = Role.import_json(parent_role_dict)
201
    child_role_dict = {
202
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
203
        'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]}
204

  
205
    rd = RoleDeserializer(child_role_dict, ImportContext())
206
    child_role, status = rd.deserialize()
207
    created, deleted = rd.parentings()
208

  
209
    assert len(created) == 1
210
    parenting = created[0]
211
    assert parenting.direct is True
212
    assert parenting.parent == parent_role
213
    assert parenting.child == child_role
214

  
215

  
216
def test_role_deserializer_parenting_non_existing_parent(db):
217
    parent_role_dict = {
218
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
219
        'natural_key': ['grand-parent-role', None, None]}
220
    child_role_dict = {
221
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
222
        'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]}
223
    rd = RoleDeserializer(child_role_dict, ImportContext())
224
    rd.deserialize()
225
    with pytest.raises(DataImportError) as excinfo:
226
        rd.parentings()
227

  
228
    assert "Could not find role" in str(excinfo.value)
229

  
230

  
231
def test_role_deserializer_permissions(db):
232
    ou = OU.objects.create(slug='some-ou')
233
    other_role_dict = {
234
        'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou}
235
    other_role = Role.objects.create(**other_role_dict)
236
    some_role_dict = {
237
        'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
238
        'natural_key': ['some-role', None, None]}
239
    some_role_dict['permissions'] = [
240
        [u'add', None, (u'a2_rbac', u'role'), [u'other-role-slug', ['some-ou'], None]]
241
    ]
242

  
243
    import_context = ImportContext()
244
    rd = RoleDeserializer(some_role_dict, import_context)
245
    rd.deserialize()
246
    perm_created, perm_deleted = rd.permissions()
247

  
248
    assert len(perm_created) == 1
249
    assert len(perm_deleted) == 0
250
    del some_role_dict['permissions']
251
    role = Role.objects.get(slug=some_role_dict['slug'])
252
    assert role.permissions.count() == 1
253
    perm = role.permissions.first()
254
    assert perm.operation.slug == 'add'
255
    assert not perm.ou
256
    assert perm.target == other_role
257

  
258
    # that one should delete permissions
259
    rd = RoleDeserializer(some_role_dict, import_context)
260
    role, _ = rd.deserialize()
261
    perm_created, perm_deleted = rd.permissions()
262
    assert role.permissions.count() == 0
263
    assert len(perm_created) == 0
264
    assert len(perm_deleted) == 1
265

  
266

  
267
def import_ou_created(db):
268
    uuid = get_hex_uuid()
269
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
270
    ou, status = import_ou(ou_d)
271
    assert status == 'created'
272
    assert ou.uuid == ou_d['uuid']
273
    assert ou.slug == ou_d['slug']
274
    assert ou.name == ou_d['name']
275

  
276

  
277
def import_ou_updated(db):
278
    ou = OU.objects.create(slug='some-ou', name='ou name')
279
    ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'}
280
    ou_updated, status = import_ou(ou_d)
281
    assert status == 'updated'
282
    assert ou == ou_updated
283
    assert ou.name == 'new name'
284

  
285

  
286
def testi_import_site_empty():
287
    res = import_site({}, ImportContext())
288
    assert res.roles == {'created': [], 'updated': []}
289
    assert res.ous == {'created': [], 'updated': []}
290
    assert res.parentings == {'created': [], 'deleted': []}
291

  
292

  
293
def test_import_site_roles(db):
294
    parent_role_dict = {
295
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
296
        'natural_key': ['grand-parent-role', None, None]}
297
    child_role_dict = {
298
        'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
299
        'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]}
300
    roles = [
301
        parent_role_dict,
302
        child_role_dict
303
    ]
304
    res = import_site({'roles': roles}, ImportContext())
305
    created_roles = res.roles['created']
306
    assert len(created_roles) == 2
307
    del parent_role_dict['natural_key']
308
    parent_role = Role.objects.get(**parent_role_dict)
309
    del child_role_dict['parents']
310
    del child_role_dict['natural_key']
311
    child_role = Role.objects.get(**child_role_dict)
312
    assert created_roles[0] == parent_role
313
    assert created_roles[1] == child_role
314

  
315
    assert len(res.parentings['created']) == 1
316
    assert res.parentings['created'][0] == RoleParenting.objects.get(
317
        child=child_role, parent=parent_role, direct=True)
318

  
319

  
320
def test_roles_import_ignore_technical_role(db):
321
    roles = [{
322
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
323
    res = import_site({'roles': roles}, ImportContext())
324
    assert res.roles == {'created': [], 'updated': []}
325

  
326

  
327
def test_roles_import_ignore_technical_role_with_service(db):
328
    roles = [{
329
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
330
    res = import_site({'roles': roles}, ImportContext())
331
    assert res.roles == {'created': [], 'updated': []}
332

  
333

  
334
def test_import_role_handle_manager_role_parenting(db):
335
    parent_role_dict = {
336
        'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
337
        'natural_key': ['grand-parent-role', None, None]}
338
    parent_role_manager_dict = {
339
        'name': 'Administrateur du role grand parent role',
340
        'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(),
341
        'natural_key': ['_a2-managers-of-role-grand-parent-role', None, None]}
342
    child_role_dict = {
343
        'name': 'child role', 'slug': 'child-role',
344
        'parents': [parent_role_dict, parent_role_manager_dict],
345
        'uuid': get_hex_uuid(), 'natural_key': ['child-role', None, None]}
346
    import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext())
347
    child = Role.objects.get(slug='child-role')
348
    manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role')
349
    RoleParenting.objects.get(child=child, parent=manager, direct=True)
350

  
351

  
352
def test_import_roles_role_delete_orphans(db):
353
    roles = [{
354
        'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
355
    with pytest.raises(DataImportError):
356
        import_site({'roles': roles}, ImportContext(role_delete_orphans=True))
357

  
358

  
359
def test_import_ou(db):
360
    uuid = get_hex_uuid()
361
    name = 'ou name'
362
    ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}]
363
    res = import_site({'ous': ous}, ImportContext())
364
    assert len(res.ous['created']) == 1
365
    ou = res.ous['created'][0]
366
    assert ou.uuid == uuid
367
    assert ou.name == name
368
    Role.objects.get(slug='_a2-managers-of-ou-slug')
369

  
370

  
371
def test_import_ou_already_existing(db):
372
    uuid = get_hex_uuid()
373
    ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
374
    ou = OU.objects.create(**ou_d)
375
    num_ous = OU.objects.count()
376
    res = import_site({'ous': [ou_d]}, ImportContext())
377
    assert len(res.ous['created']) == 0
378
    assert num_ous == OU.objects.count()
379
    assert ou == OU.objects.get(uuid=uuid)
tests/test_import_export_site_cmd.py
1
import json
2

  
3
from django.core import management
4
import pytest
5

  
6
from django_rbac.utils import get_role_model
7

  
8

  
9
def dummy_export_site(*args):
10
    return {'roles': [{'name': 'role1'}]}
11

  
12

  
13
def test_export_role_cmd_stdout(db, capsys, monkeypatch):
14
    import authentic2.management.commands.export_site
15
    monkeypatch.setattr(
16
        authentic2.management.commands.export_site, 'export_site', dummy_export_site)
17
    management.call_command('export_site')
18
    out, err = capsys.readouterr()
19
    assert json.loads(out) == dummy_export_site()
20

  
21

  
22
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir):
23
    import authentic2.management.commands.export_site
24
    monkeypatch.setattr(
25
        authentic2.management.commands.export_site, 'export_site', dummy_export_site)
26
    outfile = tmpdir.join('export.json')
27
    management.call_command('export_site', '--output', outfile.strpath)
28
    with outfile.open('r') as f:
29
        assert json.loads(f.read()) == dummy_export_site()
30

  
31

  
32
def test_import_site_cmd(db, tmpdir, monkeypatch):
33
    export_file = tmpdir.join('roles-export.json')
34
    with export_file.open('w'):
35
        export_file.write(json.dumps({'roles': []}))
36
    management.call_command('import_site', export_file.strpath)
37

  
38

  
39
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys):
40
    export_file = tmpdir.join('roles-export.json')
41
    with export_file.open('w'):
42
        export_file.write(json.dumps(
43
            {'roles': [{
44
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
45
                'natural_key': ['role-slug', None, None]}]}))
46

  
47
    management.call_command('import_site', export_file.strpath)
48

  
49
    out, err = capsys.readouterr()
50
    assert "Dry run" in out
51
    assert "1 roles created" in out
52
    assert "0 roles updated" in out
53

  
54

  
55
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys):
56
    export_file = tmpdir.join('roles-export.json')
57
    with export_file.open('w'):
58
        export_file.write(json.dumps({'roles': []}))
59

  
60
    Role = get_role_model()
61

  
62
    def exception_import_site(*args):
63
        Role.objects.create(slug='role-slug')
64
        raise Exception()
65

  
66
    import authentic2.management.commands.import_site
67
    monkeypatch.setattr(
68
        authentic2.management.commands.import_site, 'import_site', exception_import_site)
69

  
70
    with pytest.raises(Exception):
71
        management.call_command('import_site', '--no-dry-run', export_file.strpath)
72

  
73
    with pytest.raises(Role.DoesNotExist):
74
        Role.objects.get(slug='role-slug')
75

  
76

  
77
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys):
78
    export_file = tmpdir.join('roles-export.json')
79
    with export_file.open('w'):
80
        export_file.write(json.dumps(
81
            {'roles': [{
82
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
83
                'natural_key': ['role-slug', None, None]}]}))
84

  
85
    Role = get_role_model()
86

  
87
    management.call_command('import_site', export_file.strpath)
88

  
89
    with pytest.raises(Role.DoesNotExist):
90
        Role.objects.get(slug='role-slug')
91

  
92

  
93
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys):
94
    from authentic2.data_transfer import DataImportError
95

  
96
    export_file = tmpdir.join('roles-export.json')
97
    with export_file.open('w'):
98
        export_file.write(json.dumps(
99
            {'roles': [{
100
                'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
101
                'natural_key': ['role-slug', None, None]}]}))
102

  
103
    get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name')
104

  
105
    with pytest.raises(DataImportError):
106
        management.call_command('import_site', '-o', 'role-delete-orphans', export_file.strpath)
107

  
108

  
109
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys):
110
    from django.core.management.base import CommandError
111
    export_file = tmpdir.join('roles-export.json')
112
    with pytest.raises(CommandError):
113
        management.call_command('import_site', '-o', 'unknown-option', export_file.strpath)
0
-