Projet

Général

Profil

0001-import_site-export_site-commands-16514.patch

Emmanuel Cazenave, 06 mars 2018 15:05

Télécharger (35,9 ko)

Voir les différences:

Subject: [PATCH] import_site, export_site commands (#16514)

Only import and export roles.
 src/authentic2/a2_rbac/models.py                  |  56 +++++
 src/authentic2/data_transfer.py                   | 257 +++++++++++++++++++
 src/authentic2/management/commands/export_site.py |  25 ++
 src/authentic2/management/commands/import_site.py |  31 +++
 tests/test_a2_rbac.py                             | 105 +++++++-
 tests/test_data_transfer.py                       | 289 ++++++++++++++++++++++
 tests/test_import_export_site_cmd.py              | 119 +++++++++
 7 files changed, 881 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/data_transfer.py
 create mode 100644 src/authentic2/management/commands/export_site.py
 create mode 100644 src/authentic2/management/commands/import_site.py
 create mode 100644 tests/test_data_transfer.py
 create mode 100644 tests/test_import_export_site_cmd.py
src/authentic2/a2_rbac/models.py
22 22

  
23 23
from . import managers, fields
24 24

  
25
# Field classes whose values export_json() / import_json() copy verbatim
# between a model instance and a JSON-compatible dict.
SIMPLE_SERIALIZABLE_FIELDS = (
    models.TextField, models.CharField, models.SlugField, models.URLField,
    models.BooleanField, models.IntegerField, models.CommaSeparatedIntegerField,
    models.EmailField, models.PositiveIntegerField)
29

  
25 30

  
26 31
class OrganizationalUnit(OrganizationalUnitAbstractBase):
27 32
    username_is_unique = models.BooleanField(
......
207 212
            'ou__slug': self.ou.slug if self.ou else None,
208 213
        }
209 214

  
215
    def export_json(self, attributes=False, parents=False, relations=False):
        """Serialize this object into a JSON-compatible dict.

        Every concrete, non-relational field except ``id`` is exported under
        its own name.

        attributes: if true, add an 'attributes' list built from the
                    to_json() of each object in self.attributes (the key is
                    absent when there is none).
        parents: if true, add a 'parents' list holding the basic export of
                 each direct parent (the key is absent when there is none).
        relations: if true, add the slug, uuid and name (when the related
                   object has them) of ``ou`` and ``service`` under keys such
                   as 'ou_slug' or 'service_name'.

        Raises Exception if a field to export is not one of
        SIMPLE_SERIALIZABLE_FIELDS.
        """
        d = {}
        for field in self.__class__._meta.get_fields():
            if not field.concrete or field.is_relation or field.name == 'id':
                continue
            if not isinstance(field, SIMPLE_SERIALIZABLE_FIELDS):
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
                    field, self.__class__))
            d[field.name] = getattr(self, field.attname)

        if relations:
            for attr_name in ('ou', 'service'):
                related_obj = getattr(self, attr_name)
                if not related_obj:
                    continue
                for related_attr in ('slug', 'uuid', 'name'):
                    if hasattr(related_obj, related_attr):
                        d['%s_%s' % (attr_name, related_attr)] = getattr(
                            related_obj, related_attr)

        if attributes:
            exported_attributes = [attribute.to_json() for attribute in self.attributes.all()]
            if exported_attributes:
                d['attributes'] = exported_attributes

        if parents:
            RoleParenting = rbac_utils.get_role_parenting_model()
            # select_related: fetch the parents together with the parenting
            # rows instead of issuing one extra query per parent.
            direct_parentings = RoleParenting.objects.filter(
                child_id=self.id, direct=True).select_related('parent')
            exported_parents = [parenting.parent.export_json()
                                for parenting in direct_parentings]
            if exported_parents:
                d['parents'] = exported_parents

        return d
248

  
249
    @classmethod
    def import_json(cls, d):
        """Create and return a new object from the dict ``d``.

        Only the keys of ``d`` naming a concrete, non-relational field (other
        than ``id``) whose class is one of SIMPLE_SERIALIZABLE_FIELDS are
        used; every other key is ignored.
        """
        field_values = {
            field.name: d[field.name]
            for field in cls._meta.get_fields()
            if field.concrete and not field.is_relation
            and field.name != 'id'
            and isinstance(field, SIMPLE_SERIALIZABLE_FIELDS)
            and field.name in d
        }
        return cls.objects.create(**field_values)
261

  
210 262

  
211 263
class RoleParenting(RoleParentingAbstractBase):
212 264
    class Meta(RoleParentingAbstractBase.Meta):
......
239 291
            ('role', 'name', 'kind', 'value'),
240 292
        )
241 293

  
294
    def to_json(self):
        """Return the name, kind and value of this attribute as a dict."""
        exported = {}
        for key in ('name', 'kind', 'value'):
            exported[key] = getattr(self, key)
        return exported
296

  
297

  
242 298
GenericRelation(Permission,
243 299
                content_type_field='target_ct',
244 300
                object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms')
src/authentic2/data_transfer.py
1
from django_rbac.utils import get_ou_model,  get_role_model, get_role_parenting_model
2
from authentic2.a2_rbac.models import RoleAttribute
3
from authentic2.models import Service
4

  
5

  
6
def export_roles(role_queryset):
    """Return ``{'roles': [...]}`` with the full export (attributes, parents
    and relations included) of every role yielded by ``role_queryset``.
    """
    return {
        'roles': [
            role.export_json(attributes=True, parents=True, relations=True)
            for role in role_queryset
        ],
    }
13

  
14

  
15
class DBSearchStrategy(object):
    """Look up ``model_cls`` objects in the database.

    The lookup tries each attribute of ``search_attrs`` in turn, one
    attribute per query, and stops at the first match.
    """

    def __init__(self, model_cls, search_attrs):
        self.search_attrs = search_attrs
        self._model_cls = model_cls

    @property
    def model_name(self):
        """Model name, as given by the model ``_meta``."""
        meta = self._model_cls._meta
        return meta.model_name

    def search(self, d):
        """Return the first object matching one of the search attributes
        found in the dict ``d``, or None when nothing matches.
        """
        model = self._model_cls
        for attr in self.search_attrs:
            if attr not in d:
                continue
            lookup = {attr: d[attr]}
            try:
                return model.objects.get(**lookup)
            except model.DoesNotExist:
                # no match on this attribute, try the next one
                continue
        return None
36

  
37

  
38
class RoleRelatedObj(object):
    """Description of an object related to a role (service or ou).

    ``data`` is the dict describing the object and ``searcher`` the strategy
    used to find it in the database. Two instances are equal, and hash alike,
    when they have the same model name and the same values for every search
    attribute of their searcher; an attribute missing from ``data`` counts as
    None.
    """

    def __init__(self, data, searcher):
        self._data = data
        self._searcher = searcher
        self._obj = None
        self._checked = False

    def _identity(self):
        # .get(): the description may lack some search attributes (e.g. an
        # OU given only by slug and name); that must not raise KeyError.
        return (self.model_name,) + tuple(
            (attr, self._data.get(attr)) for attr in self._searcher.search_attrs)

    def __eq__(self, other):
        if not isinstance(other, RoleRelatedObj):
            return NotImplemented
        # Compare the identities themselves, not their hashes, so that a
        # hash collision cannot make two different objects equal.
        return self._identity() == other._identity()

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__.
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        return hash(self._identity())

    def __str__(self):
        parts = [self.model_name]
        parts.extend(
            "<%s: %s>" % (attr, self._data[attr])
            for attr in self._searcher.search_attrs if attr in self._data)
        return " ".join(parts)

    @property
    def model_name(self):
        return self._searcher.model_name

    @property
    def obj(self):
        """ Return the matching db obj (None if there is no match)

        The search is run once; its result, even None, is then reused.
        """
        if not self._checked:
            self._obj = self._searcher.search(self._data)
            self._checked = True
        return self._obj
77

  
78

  
79
class ImportContext(object):
    """ Options describing how an import must be performed.

    role_update : if True, a role that already exists is meant to be checked
                  and, when needed, updated to match exactly what the json
                  export describes (service and OU relations, RoleAttributes
                  and RoleParentings added, changed or deleted).
                  Not supported yet: the import raises DataImportError when
                  it meets an existing role while this flag is set.

    role_delete_orphans: if True, any existing role that is not found in the
                         export is meant to be deleted.
                         Not supported yet: import_roles raises
                         DataImportError instead of deleting anything.
    """

    def __init__(self, role_update=False, role_delete_orphans=False):
        self.role_delete_orphans = role_delete_orphans
        self.role_update = role_update
95

  
96

  
97
class DataImportError(Exception):
    """Base class of the errors raised while importing data."""
99

  
100

  
101
class MissingRelated(DataImportError):
    """Import error about missing related objects.

    NOTE(review): nothing in the code visible here raises it -- confirm it is
    still needed.
    """
103

  
104

  
105
class RoleDeserializer(object):
    """Import one role described by an item of the 'roles' list of an export.

    The description is split at construction time:

    * keys starting with ``ou_`` / ``service_`` describe the related OU /
      service, which are only looked up in the database;
    * ``parents`` and ``attributes`` are kept aside for parentings() and
      deserialize();
    * every other key is role data, used to find or create the role.
    """

    def __init__(self, d, import_context):
        """
        d: dict describing the role.
        import_context: the ImportContext driving the import.
        """
        self._role_data = {}
        self._import_context = import_context
        self.ou = None
        self.service = None
        self.obj = None
        self._role_created = False
        self._role_updated = False
        self._role_noops = False
        self.parents = None
        self.attributes = None
        self._role_cls = get_role_model()
        self._searcher = DBSearchStrategy(self._role_cls, ('slug', 'uuid', 'name'))

        ou_d = {}
        service_d = {}
        for key, value in d.items():
            if key.startswith('ou_'):
                ou_d[key[len('ou_'):]] = value
            elif key.startswith('service_'):
                service_d[key[len('service_'):]] = value
            elif key == 'parents':
                self.parents = value
            elif key == 'attributes':
                self.attributes = value
            else:
                self._role_data[key] = value
        if ou_d:
            self.ou = RoleRelatedObj(
                ou_d, DBSearchStrategy(get_ou_model(), ('slug', 'uuid', 'name')))
        if service_d:
            self.service = RoleRelatedObj(service_d, DBSearchStrategy(Service, ('slug', 'name')))

    def _deserialize_status(self):
        """Return 'created', 'updated' or 'noops' (None before deserialize())."""
        if self._role_created:
            return 'created'
        if self._role_updated:
            return 'updated'
        if self._role_noops:
            return 'noops'

    def check_related(self):
        """Return the list of the related objects (ou, service) that are
        described in the export but not found in the database.
        """
        return [related for related in (self.ou, self.service)
                if related is not None and related.obj is None]

    def deserialize(self):
        """Find the role in the database or create it.

        A new role gets its ou, service and attributes from the export. An
        existing role is left untouched.

        Returns the tuple (role, status), status being 'created' or 'noops'.
        Raises DataImportError if the role already exists and the import
        context asks for role updates, which is not supported yet.
        """
        obj = self._searcher.search(self._role_data)
        if obj:  # Role already exists
            self._role_noops = True
            self.obj = obj
            if self._import_context.role_update:
                # FIXME : do the job instead of crashing
                # check ou and service
                # check role attributes
                # Nothing was updated, so the role is not flagged as updated.
                raise DataImportError(
                    "Unsupported context value for role_update : %s" % (
                        self._import_context.role_update))
        else:  # Create role
            self.obj = self._role_cls.import_json(self._role_data.copy())

            if self.ou or self.service:
                if self.ou:
                    self.obj.ou = self.ou.obj
                if self.service:
                    self.obj.service = self.service.obj
                self.obj.save()

            # Create attributes; build a new dict for each of them so that
            # the imported data handed over by the caller is not modified.
            for attr_dict in self.attributes or ():
                RoleAttribute.objects.create(**dict(attr_dict, role=self.obj))
            self._role_created = True

        return self.obj, self._deserialize_status()

    def parentings(self):
        """Create the direct parentings of a role created by deserialize().

        Parents whose slug starts with '_' are skipped.

        Returns the tuple of lists (created, updated, deleted); only
        ``created`` is ever filled for now.
        Raises DataImportError if a parent role cannot be found, or if the
        role already existed and the import context asks for role updates.
        """
        created, updated, deleted = [], [], []
        if not self.parents:
            return created, updated, deleted

        if self._role_created:
            parenting_cls = get_role_parenting_model()
            for parent_d in self.parents:
                # a parent may be described without slug (uuid or name only)
                if (parent_d.get('slug') or '').startswith('_'):
                    continue
                parent = self._searcher.search(parent_d)
                if not parent:
                    raise DataImportError("Could not find role : %s" % parent_d)
                created.append(parenting_cls.objects.create(
                    child=self.obj, direct=True, parent=parent))
        elif self._import_context.role_update:
            # FIXME : update parenting instead of crashing
            raise DataImportError(
                "Unsupported context value for role_update : %s" % (
                    self._import_context.role_update))

        return created, updated, deleted
208

  
209

  
210
class ImportResult(object):
    """Outcome of an import.

    roles and parentings map a status to the list of objects that got it;
    missing_related maps a model name to the set of related objects that
    were not found.
    """

    def __init__(self):
        self.missing_related = {}
        self.parentings = {'created': [], 'updated': [], 'deleted': []}
        self.roles = {'created': [], 'updated': [], 'noops': []}

    def update_roles(self, role, d_status):
        """Record ``role`` under the status ``d_status``."""
        recorded = self.roles[d_status]
        recorded.append(role)

    def update_parentings(self, created, updated, deleted):
        """Record the created, updated and deleted parentings."""
        for status, parentings in (
                ('created', created), ('updated', updated), ('deleted', deleted)):
            self.parentings[status].extend(parentings)

    def update_missing_related(self, related_l):
        """Record every object of ``related_l`` as missing, by model name."""
        for related in related_l:
            missing = self.missing_related.setdefault(related.model_name, set())
            missing.add(related)

    @property
    def success(self):
        """True as long as no missing related object has been recorded."""
        if self.missing_related:
            return False
        return True
232

  
233

  
234
def import_roles(roles_list, import_context):
    """Import the roles described by ``roles_list``.

    Roles whose slug starts with '_' are ignored. Nothing is written if a
    related object (ou, service) of any role is missing: the returned
    result then lists the missing objects and its ``success`` is False.
    Otherwise roles are created first, then their parentings.

    roles_list: list of dicts, as found under 'roles' in an export.
    import_context: the ImportContext driving the import.

    Returns an ImportResult.
    Raises DataImportError for unsupported import context values, or when a
    parent role cannot be found.
    """
    result = ImportResult()
    # .get(): a role may be described without slug (uuid or name only).
    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in roles_list
                if not (role_d.get('slug') or '').startswith('_')]

    for ds in roles_ds:
        result.update_missing_related(ds.check_related())

    if not result.success:  # Some related objs are missing
        return result

    if import_context.role_delete_orphans:
        # FIXME : delete each role that is in DB but not in the export
        # Rejected before any write, so that a caller running outside of a
        # transaction is not left with a partial import.
        raise DataImportError(
            "Unsupported context value for role_delete_orphans : %s" % (
                import_context.role_delete_orphans))

    for ds in roles_ds:
        result.update_roles(*ds.deserialize())

    # Parentings come last: a parent may be defined after its child.
    for ds in roles_ds:
        result.update_parentings(*ds.parentings())

    return result
src/authentic2/management/commands/export_site.py
1
import json
2
import sys
3

  
4
from django.core.management.base import BaseCommand
5

  
6
from authentic2.data_transfer import export_roles
7
from django_rbac.utils import get_role_model
8

  
9

  
10
class Command(BaseCommand):
    """Dump the roles whose slug does not start with '_' as JSON, to the
    file given by --output or to the standard output.
    """
    help = 'Export site'

    def add_arguments(self, parser):
        parser.add_argument('--output', metavar='FILE', default=None,
                            help='name of a file to write output to')

    def handle(self, *args, **options):
        roles = get_role_model().objects.exclude(slug__startswith='_').all()
        # Build the export before opening the output file, so that a failing
        # export does not truncate an existing file.
        export = export_roles(roles)
        if options['output']:
            # "with" closes the file even if the dump fails
            with open(options['output'], 'w') as output:
                json.dump(export, output, indent=4)
        else:
            json.dump(export, sys.stdout, indent=4)
src/authentic2/management/commands/import_site.py
1
import json
2
import sys
3

  
4
from django.core.management.base import BaseCommand
5
from django.db import transaction
6

  
7

  
8
from authentic2.data_transfer import import_roles, ImportContext
9

  
10

  
11
class Command(BaseCommand):
    """Import the roles found under 'roles' in a JSON file."""
    help = 'Import site'

    def add_arguments(self, parser):
        parser.add_argument('filename', metavar='FILENAME', type=str,
                            help='name of file to import')

    def handle(self, filename, **options):
        """Run the import in one transaction and report on it.

        Missing related objects are listed on stderr and the process exits
        with status 1; otherwise the role count per status goes to stdout.
        """
        with open(filename, 'r') as f:
            with transaction.atomic():
                roles_data = json.load(f)['roles']
                result = import_roles(roles_data, ImportContext())

            if result.success:
                sys.stdout.write("Success\n")
                for ops, roles in result.roles.items():
                    sys.stdout.write("%s roles %s \n" % (len(roles), ops))
            else:
                sys.stderr.write("Error : could not import roles due to missing related objects\n")
                for objs in result.missing_related.values():
                    for obj in objs:
                        sys.stderr.write("Missing %s \n" % obj)
                sys.exit(1)
tests/test_a2_rbac.py
1 1
import pytest
2 2

  
3
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute
3 4
from authentic2.models import Service
4
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
5
from authentic2.utils import get_hex_uuid
5 6

  
6 7

  
7 8
def test_role_natural_key(db):
......
24 25
        Role.objects.get_by_natural_key(*r2.natural_key())
25 26
    with pytest.raises(Role.DoesNotExist):
26 27
        Role.objects.get_by_natural_key(*r4.natural_key())
28

  
29

  
30
def test_basic_role_export_json(db):
    """The basic export holds the scalar fields of the role."""
    role = Role.objects.create(
        name='basic role', slug='basic-role', description='basic role description')
    exported = role.export_json()
    for field_name in ('name', 'slug', 'uuid', 'description', 'admin_scope_id',
                       'external_id'):
        assert exported[field_name] == getattr(role, field_name)
40

  
41

  
42
def test_role_with_ou_export_json(db):
    """With relations=True the export describes the OU of the role."""
    ou = OU.objects.create(name='ou', slug='ou')
    exported = Role.objects.create(name='some role', ou=ou).export_json(relations=True)
    for ou_attr in ('uuid', 'name', 'slug'):
        assert exported['ou_%s' % ou_attr] == getattr(ou, ou_attr)
49

  
50

  
51
def test_role_with_service_export_json(db):
    """With relations=True the export describes the service of the role."""
    service = Service.objects.create(name='service name', slug='service-name')
    exported = Role.objects.create(
        name='some role', service=service).export_json(relations=True)
    for service_attr in ('name', 'slug'):
        assert exported['service_%s' % service_attr] == getattr(service, service_attr)
57

  
58

  
59
def test_role_with_attributes_export_json(db):
    """With attributes=True the export lists every attribute of the role."""
    role = Role.objects.create(name='some role')
    created = [
        RoleAttribute.objects.create(role=role, name=name, kind='string', value=value)
        for name, value in (('attr1_name', 'attr1_value'), ('attr2_name', 'attr2_value'))
    ]

    exported = role.export_json(attributes=True)['attributes']
    assert len(exported) == 2

    # each attribute must be exported exactly once
    remaining_names = {attribute.name for attribute in created}
    for attr_dict in exported:
        assert attr_dict['name'] in remaining_names
        remaining_names.remove(attr_dict['name'])
        stored = RoleAttribute.objects.filter(name=attr_dict['name']).first()
        assert attr_dict['kind'] == stored.kind
        assert attr_dict['value'] == stored.value
77

  
78

  
79
def test_role_with_parents_export_json(db):
    """With parents=True the export lists the direct parents only."""
    grand_parent = Role.objects.create(
        name='test grand parent role', slug='test-grand-parent-role')
    parent_1 = Role.objects.create(
        name='test parent 1 role', slug='test-parent-1-role')
    parent_1.add_parent(grand_parent)
    parent_2 = Role.objects.create(
        name='test parent 2 role', slug='test-parent-2-role')
    parent_2.add_parent(grand_parent)
    child = Role.objects.create(
        name='test child role', slug='test-child-role')
    child.add_parent(parent_1)
    child.add_parent(parent_2)

    # the child has its two parents, in any order, but not its grand parent
    child_dict = child.export_json(parents=True)
    assert child_dict['slug'] == child.slug
    exported_parents = child_dict['parents']
    assert len(exported_parents) == 2
    remaining_slugs = {parent_1.slug, parent_2.slug}
    for parent_dict in exported_parents:
        assert parent_dict['slug'] in remaining_slugs
        remaining_slugs.remove(parent_dict['slug'])

    # no 'parents' key at all for a role without parent
    grand_parent_dict = grand_parent.export_json(parents=True)
    assert grand_parent_dict['slug'] == grand_parent.slug
    assert 'parents' not in grand_parent_dict

    for parent in (parent_1, parent_2):
        parent_dict = parent.export_json(parents=True)
        assert parent_dict['slug'] == parent.slug
        assert [p['slug'] for p in parent_dict['parents']] == [grand_parent.slug]
117

  
118

  
119
def test_basic_role_import_json(db):
    """import_json creates a role carrying every value of the dict."""
    role_dict = {
        'name': 'basic role',
        'slug': 'basic-role',
        'description': 'basic role description',
        'uuid': get_hex_uuid(),
        'admin_scope_id': 1,
        'external_id': 'some id',
    }
    role = Role.import_json(role_dict)
    for field_name in ('name', 'slug', 'uuid', 'description', 'admin_scope_id',
                       'external_id'):
        assert role_dict[field_name] == getattr(role, field_name)
tests/test_data_transfer.py
1
from django_rbac.utils import get_role_model, get_ou_model
2
import pytest
3

  
4
from authentic2.a2_rbac.models import RoleParenting
5
from authentic2.data_transfer import (
6
    DataImportError, DBSearchStrategy, export_roles, import_roles, ImportContext, ImportResult,
7
    RoleDeserializer, RoleRelatedObj)
8
from authentic2.models import Service
9
from authentic2.utils import get_hex_uuid
10

  
11

  
12
# Model classes used by the tests, obtained through the django_rbac helpers.
OU = get_ou_model()
Role = get_role_model()
14

  
15

  
16
def test_export_basic_role(db):
    """export_roles exports at least what the basic role export holds."""
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
    exported_roles = export_roles(Role.objects.filter(uuid=role.uuid))['roles']
    assert len(exported_roles) == 1
    exported = exported_roles[0]
    expected = role.export_json()
    for key in expected:
        assert exported[key] == expected[key]
25

  
26

  
27
def test_export_role_with_parents(db):
    """export_roles exports the direct parents of every role."""
    grand_parent = Role.objects.create(
        name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid())
    parent_1 = Role.objects.create(
        name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid())
    parent_1.add_parent(grand_parent)
    parent_2 = Role.objects.create(
        name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid())
    parent_2.add_parent(grand_parent)
    child = Role.objects.create(
        name='test child role', slug='test-child-role', uuid=get_hex_uuid())
    child.add_parent(parent_1)
    child.add_parent(parent_2)

    ordered_roles = Role.objects.filter(slug__startswith='test').order_by('slug')
    exported_roles = export_roles(ordered_roles)['roles']
    assert len(exported_roles) == 4
    # slug order: child, grand parent, parent 1, parent 2
    child_dict, grand_parent_dict, parent_1_dict, parent_2_dict = exported_roles

    assert child_dict['slug'] == child.slug
    exported_parents = child_dict['parents']
    assert len(exported_parents) == 2
    remaining_slugs = {parent_1.slug, parent_2.slug}
    for parent_dict in exported_parents:
        assert parent_dict['slug'] in remaining_slugs
        remaining_slugs.remove(parent_dict['slug'])

    assert grand_parent_dict['slug'] == grand_parent.slug

    for parent, parent_dict in ((parent_1, parent_1_dict), (parent_2, parent_2_dict)):
        assert parent_dict['slug'] == parent.slug
        assert [p['slug'] for p in parent_dict['parents']] == [grand_parent.slug]
69

  
70

  
71
def test_db_search(db):
    """The search falls back from slug to uuid to name."""
    searcher = DBSearchStrategy(Role, ('slug', 'uuid', 'name'))

    assert searcher.model_name == Role._meta.model_name

    # match on the slug
    found_by_slug = Role.objects.create(slug='slug1')
    assert found_by_slug == searcher.search({'slug': 'slug1'})

    # match on the uuid
    found_by_uuid = Role.objects.create(slug='slug2')
    assert found_by_uuid == searcher.search(
        {'slug': 'non-matching-slug', 'uuid': found_by_uuid.uuid})

    # match on the name
    found_by_name = Role.objects.create(slug='slug3', name='some name')
    assert found_by_name == searcher.search(
        {'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'some name'})

    # no match at all
    Role.objects.create(slug='slug4', name='some name')
    not_found = searcher.search(
        {'slug': 'non-matching-slug', 'uuid': get_hex_uuid(), 'name': 'non matching name'})
    assert not_found is None
90

  
91

  
92
def test_db_search_accept_missing_data(db):
    """Searching with an empty description finds nothing."""
    Role.objects.create(slug='slug1')
    searcher = DBSearchStrategy(Role, ('slug', 'uuid', 'name'))
    found = searcher.search({})
    assert found is None
96

  
97

  
98
def test_role_related_obj(db):
    """RoleRelatedObj finds its db object and is hashable and printable."""
    ou = OU.objects.create(slug='some-ou', name='some ou', uuid=get_hex_uuid())
    ou_search = DBSearchStrategy(OU, ('slug', 'uuid', 'name'))
    ou_data = {'slug': ou.slug, 'uuid': ou.uuid, 'name': ou.name}
    rro = RoleRelatedObj(ou_data, ou_search)
    assert rro.model_name == OU._meta.model_name
    assert rro.obj == ou
    # Compare with a distinct instance holding the same data: comparing the
    # hash of an object with itself can never fail.
    assert hash(rro) == hash(RoleRelatedObj(dict(ou_data), ou_search))
    assert str(rro) == "%s <slug: some-ou> <uuid: %s> <name: some ou>" % (rro.model_name, ou.uuid)

    rro = RoleRelatedObj(
        {'slug': 'unkown-slug', 'uuid': get_hex_uuid(), 'name': 'unkown name'}, ou_search)
    assert rro.obj is None
110

  
111

  
112
def test_import_result(db):
    """Missing related objects are recorded under their model name."""
    missing_ou = RoleRelatedObj(
        {'slug': 'ou-slug', 'uuid': 'asefdvsdvdsv', 'name': 'ou name'},
        DBSearchStrategy(OU, ('slug', 'uuid', 'name')))
    result = ImportResult()
    result.update_missing_related([missing_ou])
    recorded = result.missing_related[missing_ou.model_name]
    assert missing_ou in recorded
118

  
119

  
120
def test_role_deserializer(db):
    """A role unknown to the db is created from its description."""
    role_data = {
        'name': 'some role',
        'description': 'some role description',
        'slug': 'some-role',
        'uuid': get_hex_uuid(),
    }
    rd = RoleDeserializer(role_data, ImportContext())
    # nothing but plain role data in the description
    for attr_name in ('ou', 'service', 'parents', 'attributes', 'obj'):
        assert getattr(rd, attr_name) is None

    role, status = rd.deserialize()
    assert status == 'created'
    for field_name in ('name', 'description', 'slug'):
        assert getattr(role, field_name) == role_data[field_name]
    assert rd.obj == role
135

  
136

  
137
def test_role_deserializer_with_ou(db):
    """The created role is attached to the OU described in the export."""
    expected_ou = get_ou_model().objects.create(name='some ou', slug='some-ou')
    role_data = {
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
        'ou_name': 'some ou', 'ou_slug': 'some-ou',
    }
    role, status = RoleDeserializer(role_data, ImportContext()).deserialize()
    assert role.ou == expected_ou
144

  
145

  
146
def test_role_deserializer_missing_ou(db):
    """An OU described in the export but absent from the db is reported."""
    role_data = {
        'name': 'some role', 'description': 'role description', 'slug': 'some-role',
        'ou_name': 'some ou', 'ou_slug': 'some-ou',
    }
    missing = RoleDeserializer(role_data, ImportContext()).check_related()
    assert [related._data for related in missing] == [
        {'slug': 'some-ou', 'name': 'some ou'}]
153

  
154

  
155
def test_role_deserializer_with_service(db):
    """The created role is attached to the service described in the export."""
    service = Service.objects.create(name='some service', slug='some-service')
    role_data = {
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
        'service_name': service.name, 'service_slug': service.slug,
    }
    role, status = RoleDeserializer(role_data, ImportContext()).deserialize()
    assert role.service == service
162

  
163

  
164
def test_role_deserializer_missing_service(db):
    """A service described in the export but absent from the db is reported."""
    role_data = {
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
        'service_name': 'some service', 'service_slug': 'some-service',
    }
    missing = RoleDeserializer(role_data, ImportContext()).check_related()
    assert [related._data for related in missing] == [
        {'slug': 'some-service', 'name': 'some service'}]
171

  
172

  
173
def test_role_deserializer_with_attributes(db):
    """Every attribute described in the export is created with the role."""
    attributes_data = {
        'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
        'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value')
    }
    rd = RoleDeserializer({
        'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
        'attributes': list(attributes_data.values())}, ImportContext())
    role, status = rd.deserialize()
    attributes = role.attributes.all()
    assert len(attributes) == 2

    for attr in attributes:
        attr_dict = attributes_data[attr.name]
        assert attr_dict['name'] == attr.name
        assert attr_dict['kind'] == attr.kind
        assert attr_dict['value'] == attr.value
        del attributes_data[attr.name]

    # every expected attribute has been matched by a created one
    assert not attributes_data
192

  
193

  
194
def test_role_deserializer_role_update(db):
    """Updating an existing role is refused with a DataImportError."""
    role_dict = {'name': 'some role', 'slug': 'some-role'}
    Role.objects.create(**role_dict)
    update_context = ImportContext(role_update=True)
    rd = RoleDeserializer(role_dict, update_context)
    with pytest.raises(DataImportError):
        rd.deserialize()
200

  
201

  
202
def test_role_deserializer_parenting_existing_parent(db):
    """A direct parenting is created towards a parent found in the db."""
    parent_role_dict = {'name': 'grand parent role', 'slug': 'grand-parent-role'}
    parent_role = Role.objects.create(**parent_role_dict)
    child_role_dict = {'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict]}

    rd = RoleDeserializer(child_role_dict, ImportContext())
    child_role, status = rd.deserialize()
    created, updated, deleted = rd.parentings()

    assert len(created) == 1
    new_parenting = created[0]
    assert new_parenting.direct is True
    assert new_parenting.parent == parent_role
    assert new_parenting.child == child_role
216

  
217

  
218
def test_role_deserializer_parenting_non_existing_parent(db):
    """A parent that is not in the db makes parentings() fail."""
    child_role_dict = {
        'name': 'child role',
        'slug': 'child-role',
        'parents': [{'name': 'grand parent role', 'slug': 'grand-parent-role'}],
    }
    rd = RoleDeserializer(child_role_dict, ImportContext())
    rd.deserialize()
    with pytest.raises(DataImportError) as excinfo:
        rd.parentings()

    message = str(excinfo.value)
    assert "Could not find role" in message
227

  
228

  
229
def test_role_deserializer_parenting_role_update(db):
    """Updating the parentings of an existing role is refused."""
    child_role_dict = {'name': 'child role', 'slug': 'child-role'}
    Role.objects.create(**child_role_dict)
    child_role_dict['parents'] = [{'name': 'grand parent role', 'slug': 'grand-parent-role'}]

    # deserialize without role_update, so that only parentings() can raise
    import_context = ImportContext(role_update=False)
    rd = RoleDeserializer(child_role_dict, import_context)
    rd.deserialize()

    import_context.role_update = True
    with pytest.raises(DataImportError) as excinfo:
        rd.parentings()

    message = str(excinfo.value)
    assert "Unsupported context value" in message
244

  
245

  
246
def test_empty_roles_import():
    """Importing an empty role list succeeds and reports nothing at all."""
    result = import_roles([], ImportContext())

    no_roles = {'created': [], 'updated': [], 'noops': []}
    no_parentings = {'created': [], 'updated': [], 'deleted': []}

    assert result.success
    assert result.roles == no_roles
    assert result.parentings == no_parentings
    assert result.missing_related == {}
252

  
253

  
254
def test_import_roles(db):
    """Importing a parent and its child creates both roles and one direct parenting."""
    parent_spec = {'name': 'grand parent role', 'slug': 'grand-parent-role'}
    child_spec = {
        'name': 'child role',
        'slug': 'child-role',
        'parents': [parent_spec],
    }

    result = import_roles([parent_spec, child_spec], ImportContext())
    assert result.success is True
    created_roles = result.roles['created']
    assert len(created_roles) == 2

    parent_role = Role.objects.get(**parent_spec)
    # Look the child up by its own attributes only; the 'parents' entry is
    # left out of the query.
    child_lookup = {key: value for key, value in child_spec.items() if key != 'parents'}
    child_role = Role.objects.get(**child_lookup)
    # Roles are reported in the order they were given: parent first, then child.
    assert created_roles[0] == parent_role
    assert created_roles[1] == child_role

    created_parentings = result.parentings['created']
    assert len(created_parentings) == 1
    expected_parenting = RoleParenting.objects.get(
        child=child_role, parent=parent_role, direct=True)
    assert created_parentings[0] == expected_parenting
275

  
276

  
277
def test_roles_import_ignore_technical_role(db):
    """A technical role in the input is skipped: the import succeeds and reports nothing.

    NOTE(review): the role is presumably treated as technical because its slug
    starts with an underscore -- confirm against import_roles.
    """
    technical_role = {
        'name': 'some role',
        'description': 'some role description',
        'slug': '_some-role',
    }
    result = import_roles([technical_role], ImportContext())

    assert result.success is True
    assert result.roles == {'created': [], 'updated': [], 'noops': []}
283

  
284

  
285
def test_import_roles_role_delete_orphans(db, tmpdir, monkeypatch):
    """Importing with ``role_delete_orphans=True`` is expected to raise DataImportError."""
    payload = [{
        'name': 'some role',
        'description': 'some role description',
        'slug': '_some-role',
    }]
    # The context is built inside the ``raises`` block, as in the original test.
    with pytest.raises(DataImportError):
        import_roles(payload, ImportContext(role_delete_orphans=True))
tests/test_import_export_site_cmd.py
1
import json
2
from mock import Mock
3

  
4
from django.core import management
5
import pytest
6

  
7
from django_rbac.utils import get_role_model
8

  
9

  
10
def dummy_export_roles(*args):
    """Stand-in for ``export_roles``: ignore any arguments and return a fixed payload."""
    single_role = {'name': 'role1'}
    return {'roles': [single_role]}
12

  
13

  
14
class DummyImportResult(object):
    """Minimal import-result stand-in exposing ``success``, ``missing_related`` and ``roles``."""

    def __init__(self, success, missing_related, roles):
        # Plain attribute bag: the values are stored exactly as given.
        self.roles = roles
        self.missing_related = missing_related
        self.success = success
20

  
21

  
22
def test_export_role_cmd_stdout(db, capsys, monkeypatch):
    """``export_site`` without ``--output`` prints the export as JSON on stdout."""
    from authentic2.management.commands import export_site as export_site_module

    # Swap the real exporter for the canned payload so only the command's
    # output handling is exercised.
    monkeypatch.setattr(export_site_module, 'export_roles', dummy_export_roles)
    management.call_command('export_site')

    captured_out, _captured_err = capsys.readouterr()
    assert json.loads(captured_out) == dummy_export_roles()
29

  
30

  
31
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir):
    """``export_site --output PATH`` writes the export as JSON to the given file."""
    from authentic2.management.commands import export_site as export_site_module

    monkeypatch.setattr(export_site_module, 'export_roles', dummy_export_roles)

    target = tmpdir.join('export.json')
    management.call_command('export_site', '--output', target.strpath)

    # Read the file back and compare it with the canned payload.
    with target.open('r') as handle:
        exported = json.loads(handle.read())
    assert exported == dummy_export_roles()
39

  
40

  
41
def test_import_role_cmd(db, tmpdir, monkeypatch):
    """``import_site`` runs without raising on a file holding an empty role list."""
    export_file = tmpdir.join('roles-export.json')
    # py.path.local.write() opens, writes and closes the file by itself, so
    # no surrounding open() is needed.
    export_file.write(json.dumps({'roles': []}))
    management.call_command('import_site', export_file.strpath)
46

  
47

  
48
def test_import_role_cmd_error_on_stderr(db, tmpdir, monkeypatch, capsys):
    """A failed import lists the missing related objects on stderr and calls ``sys.exit(1)``."""
    import sys

    import authentic2.management.commands.import_site

    export_file = tmpdir.join('roles-export.json')
    # py.path.local.write() opens, writes and closes the file by itself, so
    # no surrounding open() is needed.
    export_file.write(json.dumps({'roles': []}))

    def error_import_roles(*args):
        # Canned failure: two organizational units are reported as missing.
        return DummyImportResult(
            success=False, missing_related={'ou': ['ou1', 'ou2']},
            roles={'created': [], 'updated': [], 'deleted': []})

    monkeypatch.setattr(
        authentic2.management.commands.import_site, 'import_roles', error_import_roles)

    # Replace sys.exit with a no-op mock so the exit call can be inspected
    # instead of terminating the test run.
    sys_exit = Mock(return_value=None)
    monkeypatch.setattr(sys, 'exit', sys_exit)

    management.call_command('import_site', export_file.strpath)

    out, err = capsys.readouterr()
    assert "could not import roles due to missing" in err
    assert "Missing ou" in err
    assert "ou1" in err
    assert "ou2" in err
    sys_exit.assert_called_with(1)
77

  
78

  
79
def test_import_role_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys):
    """A successful import prints a success message and the created-role count on stdout."""
    import authentic2.management.commands.import_site

    export_file = tmpdir.join('roles-export.json')
    # py.path.local.write() opens, writes and closes the file by itself, so
    # no surrounding open() is needed.
    export_file.write(json.dumps({'roles': []}))

    def dummy_import_roles(*args):
        # Canned success: a single role reported as created.
        return DummyImportResult(
            success=True, missing_related={},
            roles={'created': ['role1'], 'updated': [], 'deleted': []})

    monkeypatch.setattr(
        authentic2.management.commands.import_site, 'import_roles', dummy_import_roles)

    management.call_command('import_site', export_file.strpath)

    out, err = capsys.readouterr()
    assert "Success" in out
    assert "1 roles created" in out
98

  
99

  
100
def test_import_role_transaction_rollback(db, tmpdir, monkeypatch, capsys):
    """A role created before ``import_roles`` raises must be gone after the command fails."""
    import authentic2.management.commands.import_site

    export_file = tmpdir.join('roles-export.json')
    # py.path.local.write() opens, writes and closes the file by itself, so
    # no surrounding open() is needed.
    export_file.write(json.dumps({'roles': []}))

    Role = get_role_model()

    def exception_import_roles(*args):
        # Write to the database first, then fail, so that a rollback is
        # observable from the outside.
        Role.objects.create(slug='role-slug')
        raise ValueError()

    monkeypatch.setattr(
        authentic2.management.commands.import_site, 'import_roles', exception_import_roles)

    # The exception must propagate out of the command...
    with pytest.raises(ValueError):
        management.call_command('import_site', export_file.strpath)

    # ...and the role created before the failure must no longer exist.
    with pytest.raises(Role.DoesNotExist):
        Role.objects.get(slug='role-slug')
0
-