Projet

Général

Profil

0002-data_transfer-add-export-context-29162.patch

Benjamin Dauvergne, 18 décembre 2018 15:56

Télécharger (11,1 ko)

Voir les différences:

Subject: [PATCH 2/5] data_transfer: add export context (#29162)

 src/authentic2/data_transfer.py | 121 ++++++++++++++++++++------------
 tests/test_data_transfer.py     |  67 +++++++++++++++---
 2 files changed, 134 insertions(+), 54 deletions(-)
src/authentic2/data_transfer.py
2 2

  
3 3
from django_rbac.models import Operation
4 4
from django_rbac.utils import (
5
    get_ou_model,  get_role_model, get_role_parenting_model, get_permission_model)
5
    get_ou_model, get_role_model, get_role_parenting_model, get_permission_model)
6 6
from authentic2.a2_rbac.models import RoleAttribute
7 7
from authentic2.utils import update_model
8 8

  
9 9

  
10
def export_site():
11
    return {
12
        'roles': export_roles(get_role_model().objects.all()),
13
        'ous': export_ou(get_ou_model().objects.all())
14
    }
10
class ExportContext(object):
11
    _role_qs = None
12
    _ou_qs = None
13
    export_roles = None
14
    export_ous = None
15 15

  
16
    def __init__(self, role_qs=None, ou_qs=None, export_roles=True, export_ous=True):
17
        self._role_qs = role_qs
18
        self._ou_qs = ou_qs
19
        self.export_roles = export_roles
20
        self.export_ous = export_ous
16 21

  
17
def export_ou(ou_query_set):
18
    return [ou.export_json() for ou in ou_query_set]
22
    @property
23
    def role_qs(self):
24
        return self._role_qs or get_role_model().objects.all()
19 25

  
26
    @property
27
    def ou_qs(self):
28
        return self._ou_qs or get_ou_model().objects.all()
20 29

  
21
def export_roles(role_queryset):
30

  
31
def export_site(context=None):
32
    context = context or ExportContext()
33
    d = {}
34
    if context.export_roles:
35
        d['roles'] = export_roles(context)
36
    if context.export_ous:
37
        d['ous'] = export_ous(context)
38
    return d
39

  
40

  
41
def export_ous(context):
42
    return [ou.export_json() for ou in context.ou_qs]
43

  
44

  
45
def export_roles(context):
22 46
    """ Serialize roles in role_queryset
23 47
    """
24 48
    return [
25 49
        role.export_json(attributes=True, parents=True, permissions=True)
26
        for role in role_queryset
50
        for role in context.role_qs
27 51
    ]
28 52

  
29 53

  
......
66 90
    """
67 91

  
68 92
    def __init__(
69
            self, role_delete_orphans=False, role_parentings_update=True,
70
            role_permissions_update=True, role_attributes_update=True,
93
            self,
94
            import_roles=True,
95
            import_ous=True,
96
            role_delete_orphans=False,
97
            role_parentings_update=True,
98
            role_permissions_update=True,
99
            role_attributes_update=True,
71 100
            ou_delete_orphans=False):
101
        self.import_roles = import_roles
102
        self.import_ous = import_ous
72 103
        self.role_delete_orphans = role_delete_orphans
73 104
        self.ou_delete_orphans = ou_delete_orphans
74 105
        self.role_parentings_update = role_parentings_update
......
81 112

  
82 113

  
83 114
class RoleDeserializer(object):
84

  
85 115
    def __init__(self, d, import_context):
86 116
        self._import_context = import_context
87 117
        self._obj = None
......
106 136
        ou = None if not has_ou else search_ou(ou_d)
107 137
        if has_ou and not ou:
108 138
            raise DataImportError(
109
                    "Can't import role because missing Organizational Unit : "
110
                    "%s" % ou_d)
139
                "Can't import role because missing Organizational Unit : "
140
                "%s" % ou_d)
111 141

  
112 142
        kwargs = self._role_d.copy()
113
        del kwargs['ou']
114
        del kwargs['service']
143
        kwargs.pop('ou', None)
144
        kwargs.pop('service', None)
115 145
        if has_ou:
116 146
            kwargs['ou'] = ou
117 147

  
......
243 273
    return ou, status
244 274

  
245 275

  
246
def import_site(json_d, import_context):
276
def import_site(json_d, import_context=None):
277
    import_context = import_context or ImportContext()
247 278
    result = ImportResult()
248 279

  
249 280
    if not isinstance(json_d, dict):
250 281
        raise DataImportError('Export file is invalid: not a dictionnary')
251 282

  
252
    for ou_d in json_d.get('ous', []):
253
        result.update_ous(*import_ou(ou_d))
254

  
255
    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
256
                if not role_d['slug'].startswith('_')]
283
    if import_context.import_ous:
284
        for ou_d in json_d.get('ous', []):
285
            result.update_ous(*import_ou(ou_d))
257 286

  
258
    for ds in roles_ds:
259
        result.update_roles(*ds.deserialize())
287
    if import_context.import_roles:
288
        roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
289
                    if not role_d['slug'].startswith('_')]
260 290

  
261
    if import_context.role_attributes_update:
262 291
        for ds in roles_ds:
263
            result.update_attributes(*ds.attributes())
292
            result.update_roles(*ds.deserialize())
264 293

  
265
    if import_context.role_parentings_update:
266
        for ds in roles_ds:
267
            result.update_parentings(*ds.parentings())
294
        if import_context.role_attributes_update:
295
            for ds in roles_ds:
296
                result.update_attributes(*ds.attributes())
268 297

  
269
    if import_context.role_permissions_update:
270
        for ds in roles_ds:
271
            result.update_permissions(*ds.permissions())
272

  
273
    if import_context.ou_delete_orphans:
274
        raise DataImportError(
275
            "Unsupported context value for ou_delete_orphans : %s" % (
276
                import_context.ou_delete_orphans))
277

  
278
    if import_context.role_delete_orphans:
279
        # FIXME : delete each role that is in DB but not in the export
280
        raise DataImportError(
281
            "Unsupported context value for role_delete_orphans : %s" % (
282
                import_context.role_delete_orphans))
298
        if import_context.role_parentings_update:
299
            for ds in roles_ds:
300
                result.update_parentings(*ds.parentings())
301

  
302
        if import_context.role_permissions_update:
303
            for ds in roles_ds:
304
                result.update_permissions(*ds.permissions())
305

  
306
        if import_context.ou_delete_orphans:
307
            raise DataImportError(
308
                "Unsupported context value for ou_delete_orphans : %s" % (
309
                    import_context.ou_delete_orphans))
310

  
311
        if import_context.role_delete_orphans:
312
            # FIXME : delete each role that is in DB but not in the export
313
            raise DataImportError(
314
                "Unsupported context value for role_delete_orphans : %s" % (
315
                    import_context.role_delete_orphans))
283 316

  
284 317
    return result
tests/test_data_transfer.py
1
import json
2

  
3 1
from django_rbac.utils import get_role_model, get_ou_model
4
import py
5 2
import pytest
6 3

  
7 4
from authentic2.a2_rbac.models import RoleParenting
8 5
from authentic2.data_transfer import (
9
    DataImportError, export_roles, import_site, export_ou, ImportContext,
10
    RoleDeserializer, search_role, import_ou)
6
    export_site,
7
    ExportContext,
8
    DataImportError,
9
    export_roles,
10
    import_site,
11
    export_ous,
12
    ImportContext,
13
    RoleDeserializer,
14
    search_role,
15
    import_ou)
11 16
from authentic2.utils import get_hex_uuid
12 17

  
13 18

  
......
18 23
def test_export_basic_role(db):
19 24
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
20 25
    query_set = Role.objects.filter(uuid=role.uuid)
21
    roles = export_roles(query_set)
26
    roles = export_roles(ExportContext(role_qs=query_set))
22 27
    assert len(roles) == 1
23 28
    role_dict = roles[0]
24 29
    for key, value in role.export_json().items():
......
40 45
    child_role.add_parent(parent_2_role)
41 46

  
42 47
    query_set = Role.objects.filter(slug__startswith='test').order_by('slug')
43
    roles = export_roles(query_set)
48
    roles = export_roles(ExportContext(role_qs=query_set))
44 49
    assert len(roles) == 4
45 50

  
46 51
    child_role_dict = roles[0]
......
68 73
    assert parents[0]['slug'] == grand_parent_role.slug
69 74

  
70 75

  
71
def test_export_ou(db):
76
def test_export_ous(db):
72 77
    ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description')
73
    ous = export_ou(OU.objects.filter(name='ou name'))
78
    ous = export_ous(ExportContext(ou_qs=OU.objects.filter(name='ou name')))
74 79
    assert len(ous) == 1
75 80
    ou_d = ous[0]
76 81
    assert ou_d['name'] == ou.name
......
137 142
    rd = RoleDeserializer({
138 143
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
139 144
        'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None},
140
            ImportContext())
145
        ImportContext())
141 146
    with pytest.raises(DataImportError):
142 147
        rd.deserialize()
143 148

  
......
469 474
    assert len(res.ous['created']) == 0
470 475
    assert num_ous == OU.objects.count()
471 476
    assert ou == OU.objects.get(uuid=uuid)
477

  
478

  
479
def test_import_context_flags(db):
480
    ous = [{'uuid': get_hex_uuid(), 'slug': 'ou-slug', 'name': 'ou name'}]
481
    roles = [{
482
        'name': 'other role',
483
        'slug': 'other-role-slug',
484
        'uuid': get_hex_uuid(),
485
        'ou': {'slug': 'ou-slug'},
486
    }]
487
    d = {'ous': ous, 'roles': roles}
488
    import_site(d, ImportContext(import_roles=False, import_ous=False))
489
    assert Role.objects.exclude(slug__startswith='_').count() == 0
490
    assert OU.objects.exclude(slug='default').count() == 0
491
    with pytest.raises(DataImportError) as e:
492
        import_site(d, ImportContext(import_roles=True, import_ous=False))
493
    assert 'missing Organizational' in e.value.args[0]
494
    assert Role.objects.exclude(slug__startswith='_').count() == 0
495
    assert OU.objects.exclude(slug='default').count() == 0
496
    import_site(d, ImportContext(import_roles=False, import_ous=True))
497
    assert Role.objects.exclude(slug__startswith='_').count() == 0
498
    assert OU.objects.exclude(slug='default').count() == 1
499
    import_site(d, ImportContext(import_roles=True, import_ous=True))
500
    assert Role.objects.exclude(slug__startswith='_').count() == 1
501
    assert OU.objects.exclude(slug='default').count() == 1
502

  
503

  
504
def test_export_site(db):
505
    ou = OU.objects.create(name='ou')
506
    role = Role.objects.create(name='role', ou=ou)
507
    d = export_site()
508
    assert len([ou for ou in d['ous'] if ou['slug'] != 'default']) == 1
509
    assert len([role for role in d['roles'] if role['slug'][0] != '_']) == 1
510
    d = export_site(ExportContext(ou_qs=OU.objects.filter(name='ou')))
511
    assert len(d['ous']) == 1
512
    d = export_site(ExportContext(role_qs=Role.objects.filter(name='role')))
513
    assert len(d['roles']) == 1
514
    d = export_site(ExportContext(export_roles=False))
515
    assert 'roles' not in d
516
    d = export_site(ExportContext(export_ous=False))
517
    assert 'ous' not in d
518

  
472
-