Projet

Général

Profil

0001-data_transfer-add-export-context-29162.patch

Benjamin Dauvergne, 18 décembre 2018 15:23

Télécharger (10,4 ko)

Voir les différences:

Subject: [PATCH 1/3] data_transfer: add export context (#29162)

 src/authentic2/data_transfer.py | 121 ++++++++++++++++++++------------
 tests/test_data_transfer.py     |  50 ++++++++++---
 2 files changed, 117 insertions(+), 54 deletions(-)
src/authentic2/data_transfer.py
2 2

  
3 3
from django_rbac.models import Operation
4 4
from django_rbac.utils import (
5
    get_ou_model,  get_role_model, get_role_parenting_model, get_permission_model)
5
    get_ou_model, get_role_model, get_role_parenting_model, get_permission_model)
6 6
from authentic2.a2_rbac.models import RoleAttribute
7 7
from authentic2.utils import update_model
8 8

  
9 9

  
10
def export_site():
11
    return {
12
        'roles': export_roles(get_role_model().objects.all()),
13
        'ous': export_ou(get_ou_model().objects.all())
14
    }
10
class ExportContext(object):
11
    _role_qs = None
12
    _ou_qs = None
13
    export_roles = None
14
    export_ous = None
15 15

  
16
    def __init__(self, role_qs=None, ou_qs=None, export_roles=True, export_ous=True):
17
        self._role_qs = role_qs
18
        self._ou_qs = ou_qs
19
        self.export_roles = export_roles
20
        self.export_ous = export_ous
16 21

  
17
def export_ou(ou_query_set):
18
    return [ou.export_json() for ou in ou_query_set]
22
    @property
23
    def role_qs(self):
24
        return self._role_qs or get_role_model().objects.all()
19 25

  
26
    @property
27
    def ou_qs(self):
28
        return self._ou_qs or get_ou_model().objects.all()
20 29

  
21
def export_roles(role_queryset):
30

  
31
def export_site(context=None):
32
    context = context or ExportContext()
33
    d = {}
34
    if context.export_roles:
35
        d['roles'] = export_roles(context)
36
    if context.export_ous:
37
        d['ous'] = export_ous(context)
38
    return d
39

  
40

  
41
def export_ous(context):
42
    return [ou.export_json() for ou in context.ou_qs]
43

  
44

  
45
def export_roles(context):
22 46
    """ Serialize roles in role_queryset
23 47
    """
24 48
    return [
25 49
        role.export_json(attributes=True, parents=True, permissions=True)
26
        for role in role_queryset
50
        for role in context.role_qs
27 51
    ]
28 52

  
29 53

  
......
66 90
    """
67 91

  
68 92
    def __init__(
69
            self, role_delete_orphans=False, role_parentings_update=True,
70
            role_permissions_update=True, role_attributes_update=True,
93
            self,
94
            import_roles=True,
95
            import_ous=True,
96
            role_delete_orphans=False,
97
            role_parentings_update=True,
98
            role_permissions_update=True,
99
            role_attributes_update=True,
71 100
            ou_delete_orphans=False):
101
        self.import_roles = import_roles
102
        self.import_ous = import_ous
72 103
        self.role_delete_orphans = role_delete_orphans
73 104
        self.ou_delete_orphans = ou_delete_orphans
74 105
        self.role_parentings_update = role_parentings_update
......
81 112

  
82 113

  
83 114
class RoleDeserializer(object):
84

  
85 115
    def __init__(self, d, import_context):
86 116
        self._import_context = import_context
87 117
        self._obj = None
......
106 136
        ou = None if not has_ou else search_ou(ou_d)
107 137
        if has_ou and not ou:
108 138
            raise DataImportError(
109
                    "Can't import role because missing Organizational Unit : "
110
                    "%s" % ou_d)
139
                "Can't import role because missing Organizational Unit : "
140
                "%s" % ou_d)
111 141

  
112 142
        kwargs = self._role_d.copy()
113
        del kwargs['ou']
114
        del kwargs['service']
143
        kwargs.pop('ou', None)
144
        kwargs.pop('service', None)
115 145
        if has_ou:
116 146
            kwargs['ou'] = ou
117 147

  
......
243 273
    return ou, status
244 274

  
245 275

  
246
def import_site(json_d, import_context):
276
def import_site(json_d, import_context=None):
277
    import_context = import_context or ImportContext()
247 278
    result = ImportResult()
248 279

  
249 280
    if not isinstance(json_d, dict):
250 281
        raise DataImportError('Export file is invalid: not a dictionnary')
251 282

  
252
    for ou_d in json_d.get('ous', []):
253
        result.update_ous(*import_ou(ou_d))
254

  
255
    roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
256
                if not role_d['slug'].startswith('_')]
283
    if import_context.import_ous:
284
        for ou_d in json_d.get('ous', []):
285
            result.update_ous(*import_ou(ou_d))
257 286

  
258
    for ds in roles_ds:
259
        result.update_roles(*ds.deserialize())
287
    if import_context.import_roles:
288
        roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
289
                    if not role_d['slug'].startswith('_')]
260 290

  
261
    if import_context.role_attributes_update:
262 291
        for ds in roles_ds:
263
            result.update_attributes(*ds.attributes())
292
            result.update_roles(*ds.deserialize())
264 293

  
265
    if import_context.role_parentings_update:
266
        for ds in roles_ds:
267
            result.update_parentings(*ds.parentings())
294
        if import_context.role_attributes_update:
295
            for ds in roles_ds:
296
                result.update_attributes(*ds.attributes())
268 297

  
269
    if import_context.role_permissions_update:
270
        for ds in roles_ds:
271
            result.update_permissions(*ds.permissions())
272

  
273
    if import_context.ou_delete_orphans:
274
        raise DataImportError(
275
            "Unsupported context value for ou_delete_orphans : %s" % (
276
                import_context.ou_delete_orphans))
277

  
278
    if import_context.role_delete_orphans:
279
        # FIXME : delete each role that is in DB but not in the export
280
        raise DataImportError(
281
            "Unsupported context value for role_delete_orphans : %s" % (
282
                import_context.role_delete_orphans))
298
        if import_context.role_parentings_update:
299
            for ds in roles_ds:
300
                result.update_parentings(*ds.parentings())
301

  
302
        if import_context.role_permissions_update:
303
            for ds in roles_ds:
304
                result.update_permissions(*ds.permissions())
305

  
306
        if import_context.ou_delete_orphans:
307
            raise DataImportError(
308
                "Unsupported context value for ou_delete_orphans : %s" % (
309
                    import_context.ou_delete_orphans))
310

  
311
        if import_context.role_delete_orphans:
312
            # FIXME : delete each role that is in DB but not in the export
313
            raise DataImportError(
314
                "Unsupported context value for role_delete_orphans : %s" % (
315
                    import_context.role_delete_orphans))
283 316

  
284 317
    return result
tests/test_data_transfer.py
1
import json
2

  
3 1
from django_rbac.utils import get_role_model, get_ou_model
4
import py
5 2
import pytest
6 3

  
7 4
from authentic2.a2_rbac.models import RoleParenting
8 5
from authentic2.data_transfer import (
9
    DataImportError, export_roles, import_site, export_ou, ImportContext,
10
    RoleDeserializer, search_role, import_ou)
6
    ExportContext,
7
    DataImportError,
8
    export_roles,
9
    import_site,
10
    export_ous,
11
    ImportContext,
12
    RoleDeserializer,
13
    search_role,
14
    import_ou)
11 15
from authentic2.utils import get_hex_uuid
12 16

  
13 17

  
......
18 22
def test_export_basic_role(db):
19 23
    role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
20 24
    query_set = Role.objects.filter(uuid=role.uuid)
21
    roles = export_roles(query_set)
25
    roles = export_roles(ExportContext(role_qs=query_set))
22 26
    assert len(roles) == 1
23 27
    role_dict = roles[0]
24 28
    for key, value in role.export_json().items():
......
40 44
    child_role.add_parent(parent_2_role)
41 45

  
42 46
    query_set = Role.objects.filter(slug__startswith='test').order_by('slug')
43
    roles = export_roles(query_set)
47
    roles = export_roles(ExportContext(role_qs=query_set))
44 48
    assert len(roles) == 4
45 49

  
46 50
    child_role_dict = roles[0]
......
68 72
    assert parents[0]['slug'] == grand_parent_role.slug
69 73

  
70 74

  
71
def test_export_ou(db):
75
def test_export_ous(db):
72 76
    ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description')
73
    ous = export_ou(OU.objects.filter(name='ou name'))
77
    ous = export_ous(ExportContext(ou_qs=OU.objects.filter(name='ou name')))
74 78
    assert len(ous) == 1
75 79
    ou_d = ous[0]
76 80
    assert ou_d['name'] == ou.name
......
137 141
    rd = RoleDeserializer({
138 142
        'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
139 143
        'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None},
140
            ImportContext())
144
        ImportContext())
141 145
    with pytest.raises(DataImportError):
142 146
        rd.deserialize()
143 147

  
......
469 473
    assert len(res.ous['created']) == 0
470 474
    assert num_ous == OU.objects.count()
471 475
    assert ou == OU.objects.get(uuid=uuid)
476

  
477

  
478
def test_import_context_flags(db):
479
    ous = [{'uuid': get_hex_uuid(), 'slug': 'ou-slug', 'name': 'ou name'}]
480
    roles = [{
481
        'name': 'other role',
482
        'slug': 'other-role-slug',
483
        'uuid': get_hex_uuid(),
484
        'ou': {'slug': 'ou-slug'},
485
    }]
486
    d = {'ous': ous, 'roles': roles}
487
    import_site(d, ImportContext(import_roles=False, import_ous=False))
488
    assert Role.objects.exclude(slug__startswith='_').count() == 0
489
    assert OU.objects.exclude(slug='default').count() == 0
490
    with pytest.raises(DataImportError) as e:
491
        import_site(d, ImportContext(import_roles=True, import_ous=False))
492
    assert 'missing Organizational' in e.value.args[0]
493
    assert Role.objects.exclude(slug__startswith='_').count() == 0
494
    assert OU.objects.exclude(slug='default').count() == 0
495
    import_site(d, ImportContext(import_roles=False, import_ous=True))
496
    assert Role.objects.exclude(slug__startswith='_').count() == 0
497
    assert OU.objects.exclude(slug='default').count() == 1
498
    import_site(d, ImportContext(import_roles=True, import_ous=True))
499
    assert Role.objects.exclude(slug__startswith='_').count() == 1
500
    assert OU.objects.exclude(slug='default').count() == 1
501

  
472
-