Projet

Général

Profil

0001-add-roles-import-export-command-16514.patch

Josué Kouka, 29 septembre 2017 09:58

Télécharger (21,1 ko)

Voir les différences:

Subject: [PATCH] add roles import/export command (#16514)

 src/authentic2/a2_rbac/models.py                   |  65 +++++
 src/authentic2/management/commands/export-roles.py |  26 ++
 src/authentic2/management/commands/import-roles.py |  27 ++
 src/authentic2/models.py                           |  13 +
 src/authentic2/utils.py                            |  63 ++++-
 tests/test_import_export.py                        | 310 +++++++++++++++++++++
 6 files changed, 503 insertions(+), 1 deletion(-)
 create mode 100644 src/authentic2/management/commands/export-roles.py
 create mode 100644 src/authentic2/management/commands/import-roles.py
 create mode 100644 tests/test_import_export.py
src/authentic2/a2_rbac/models.py
10 10
                                CHANGE_OP, Operation)
11 11
from django_rbac import utils as rbac_utils
12 12

  
13
from authentic2.utils import get_object_by_main_attr, ImportExportError
14

  
13 15
try:
14 16
    from django.contrib.contenttypes.fields import GenericForeignKey, \
15 17
        GenericRelation
......
195 197
            'ou__slug': self.ou.slug if self.ou else None,
196 198
        }
197 199

  
200
    @classmethod
    def import_json(cls, data):
        """Create (or fetch) a role from one entry of a JSON role export.

        ``data`` is a dict shaped like the output of ``export_json``.  The
        ``ou``, ``service`` and ``attributes`` keys are popped from it, so the
        caller's dict is modified; ``uuid``, ``slug`` and ``name`` are
        required.  Other keys (e.g. ``parents``) are not read here.

        The organizational unit and the service are only looked up with
        ``get_object_by_main_attr``; they are never created.

        Raises ImportExportError when the organizational unit or the service
        cannot be found.
        """
        from authentic2.models import Service
        ou_json = data.pop('ou', None) or {}
        service_json = data.pop('service', None) or {}
        attributes_json = data.pop('attributes', None) or []

        # if not ou stop import
        ou = get_object_by_main_attr(OrganizationalUnit, **ou_json)
        if not ou:
            # use .get() so that a missing or partial "ou" entry is reported
            # as an import error instead of a KeyError while formatting
            raise ImportExportError(
                'OU (%(uuid)s, %(slug)s, %(name)s) does not exist.'
                % dict((key, ou_json.get(key)) for key in ('uuid', 'slug', 'name')))
        # get role's service or raise an exception if none is found
        service = None
        if service_json:
            # the service's own OU is not one of the lookup attributes
            service_json.pop('ou', None)
            service = get_object_by_main_attr(Service, **service_json)
            if not service:
                raise ImportExportError(
                    'Service (%(slug)s, %(name)s) does not exist.'
                    % dict((key, service_json.get(key)) for key in ('slug', 'name')))

        kwargs = {'uuid': data['uuid'], 'slug': data['slug'], 'name': data['name']}
        role, created = cls.objects.get_or_create(ou=ou, **kwargs)
        # set service
        if service:
            service.roles.add(role)
        # set attributes
        for attr in attributes_json:
            # "defaults" already stores the value on creation, so no extra
            # save is needed; an attribute that already exists is left as is
            RoleAttribute.objects.get_or_create(
                role=role, name=attr['name'], kind=attr['kind'],
                defaults={'value': attr['value']})
232

  
233
    def export_json(self):
        """Return a JSON-serializable dict describing this role.

        The dict always holds ``uuid``, ``slug``, ``name``, ``attributes``
        and ``parents``; ``ou`` and ``service`` are only present when the
        role has one.
        """
        exported = {'uuid': self.uuid, 'slug': self.slug, 'name': self.name}

        ou = self.ou
        if ou:
            exported['ou'] = {'uuid': ou.uuid, 'name': ou.name, 'slug': ou.slug}

        service = self.service
        if service:
            exported['service'] = service.export_json()

        exported['attributes'] = [
            {'name': attribute.name, 'kind': attribute.kind, 'value': attribute.value}
            for attribute in self.attributes.all()]

        # whatever self.parents() yields, minus the role itself
        exported['parents'] = [
            {'uuid': parent.uuid, 'slug': parent.slug, 'name': parent.name}
            for parent in self.parents(include_self=False)]
        return exported
262

  
198 263

  
199 264
class RoleParenting(RoleParentingAbstractBase):
200 265
    class Meta(RoleParentingAbstractBase.Meta):
src/authentic2/management/commands/export-roles.py
1
import json
2
from optparse import make_option
3
import sys
4

  
5
from django.core.management import BaseCommand
6

  
7
from authentic2.utils import export_roles
8

  
9

  
10
class Command(BaseCommand):
    """Management command dumping roles as JSON to a file or to stdout."""
    help = 'Export roles as json'

    args = '<filename>'

    option_list = BaseCommand.option_list + (
        make_option('--ou', dest='ou', default=None, type=str,
                    help='restrict to the organizational unit slug'),)

    def handle(self, *args, **options):
        """Write the exported roles to ``args[0]``, or to stdout if absent.

        The ``ou`` option, when set, is passed to ``export_roles`` as the
        organizational unit slug to restrict the export to.
        """
        # build the export first so that a failure does not leave a
        # truncated output file behind
        data = export_roles(ou_slug=options['ou'])

        # no explicit encoding: UTF-8 is already the json default on
        # Python 2 and the keyword does not exist on Python 3
        if not args:
            json.dump(data, sys.stdout, indent=4)
            return

        # the context manager closes (and flushes) the file in every case
        with open(args[0], 'w') as output:
            json.dump(data, output, indent=4)
src/authentic2/management/commands/import-roles.py
1
import json
2
from optparse import make_option
3

  
4
from django.core.management.base import BaseCommand, CommandError
5

  
6
from authentic2.utils import import_roles, ImportExportError
7

  
8

  
9
class Command(BaseCommand):
    """Management command loading roles from a JSON export file."""
    help = 'Import roles from json file'

    args = '<filename>'

    option_list = BaseCommand.option_list + (
        make_option('--ou', dest='ou', default=None, type=str,
                    help='restrict to the organizational unit slug'),
        make_option('--stop-on-absent-parent', action='store_true', dest='stop_absent_parent', default=False,
                    help='stop if parent is absent')
    )

    def handle(self, *args, **options):
        """Import the roles described in the JSON file ``args[0]``.

        All command options are forwarded to ``import_roles``.

        Raises CommandError when no filename is given or when
        ``import_roles`` raises an ImportExportError.
        """
        if not args:
            # a missing filename used to be a silent no-op
            raise CommandError('missing filename')

        # read the whole file first; the context manager closes it
        with open(args[0]) as fd:
            data = json.load(fd)

        try:
            import_roles(data, **options)
        except ImportExportError as exc:
            # exc.message only exists on Python 2; args[0] is the same text
            raise CommandError(exc.args[0] if exc.args else str(exc))
src/authentic2/models.py
399 399
            'roles': [role.to_json() for role in roles],
400 400
        }
401 401

  
402
    @classmethod
    def import_json(cls, data):
        """Get or create a service from its exported JSON description.

        ``data`` holds the service fields plus an ``ou`` entry.  When ``ou``
        is a dict (the shape produced by ``export_json``) it is resolved to
        an existing organizational unit first; any other value is passed to
        the ORM unchanged.  ``data`` itself is not modified.

        Raises KeyError when ``data`` has no ``ou`` entry and
        ImportExportError when the described organizational unit cannot be
        found.
        """
        # local imports, presumably safer against circular imports -- the
        # role model imports this module the same way
        from authentic2.utils import get_object_by_main_attr, ImportExportError
        from django_rbac.utils import get_ou_model

        fields = dict(data)
        ou = fields.pop('ou')
        if isinstance(ou, dict):
            # a dict cannot be used as a foreign key value: look the OU up
            ou_json = ou
            ou = get_object_by_main_attr(get_ou_model(), **ou_json)
            if not ou:
                raise ImportExportError(
                    'OU (%(uuid)s, %(slug)s, %(name)s) does not exist.'
                    % dict((key, ou_json.get(key)) for key in ('uuid', 'slug', 'name')))
        cls.objects.get_or_create(ou=ou, **fields)
406

  
407
    def export_json(self):
        """Return a JSON-serializable dict with this service's name, slug
        and a description (uuid, slug, name) of its organizational unit.
        """
        ou = self.ou
        ou_json = {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name}
        return {'name': self.name, 'slug': self.slug, 'ou': ou_json}
414

  
402 415

  
403 416
class AuthorizedRole(models.Model):
404 417
    service = models.ForeignKey(Service, on_delete=models.CASCADE)
src/authentic2/utils.py
16 16

  
17 17
import django
18 18
from django.conf import settings
19
from django.db import transaction
19 20
from django.http import HttpResponseRedirect, HttpResponse
20
from django.core.exceptions import ImproperlyConfigured, PermissionDenied
21
from django.core.exceptions import ImproperlyConfigured, PermissionDenied, FieldError
21 22
from django.http.request import QueryDict
22 23
from django.contrib.auth import (REDIRECT_FIELD_NAME, login as auth_login, SESSION_KEY,
23 24
                                 HASH_SESSION_KEY, BACKEND_SESSION_KEY, authenticate)
......
942 943
    user = copy.deepcopy(user)
943 944
    user.backend = backend
944 945
    return login(request, user, method, **kwargs)
946

  
947

  
948
class ImportExportError(Exception):
    """Raised when an import cannot proceed, e.g. because a referenced
    organizational unit, service or parent role does not exist.
    """
950

  
951

  
952
def get_object_by_main_attr(obj, uuid=None, slug=None, name=None, create=False):
    """Sequentially try to get an object by uuid, slug, then name.

    ``obj`` is a model class.  Only the attributes actually given (not
    None) are tried, each through ``obj.objects.get``; a lookup that finds
    nothing, or that targets a field the model does not have, falls through
    to the next attribute.  Any other exception raised by ``get``
    propagates.

    When nothing matches, a new object is created from the given attributes
    if ``create`` is true; otherwise None is returned.
    """
    candidates = [('uuid', uuid), ('slug', slug), ('name', name)]
    # skip attributes left at None: querying e.g. uuid=None is an IS NULL
    # lookup which could match an unrelated object
    given = [(field, value) for field, value in candidates if value is not None]

    for field, value in given:
        try:
            return obj.objects.get(**{field: value})
        except (obj.DoesNotExist, FieldError):
            continue
    if create:
        # only pass what was provided, so that field defaults apply and
        # models lacking one of the three fields can still be created
        return obj.objects.create(**dict(given))
    return None
969

  
970

  
971
def export_roles(ou_slug=None):
    """Return the ``export_json()`` dicts of the exportable roles.

    Roles whose slug starts with ``_`` are left out.  When ``ou_slug`` is
    given, only roles of the organizational unit with that slug are kept.
    """
    from django_rbac.utils import get_role_model

    roles = get_role_model().objects
    if ou_slug:
        roles = roles.filter(ou__slug=ou_slug)
    roles = roles.exclude(slug__startswith='_')
    return [role.export_json() for role in roles]
980

  
981

  
982
def import_roles(data, **options):
    """Import a list of exported roles, then restore their parent links.

    ``data`` is a list of dicts as produced by ``export_roles``.  Recognized
    options:

    * ``ou``: when set, only entries whose ``ou`` slug equals it are imported;
    * ``stop_absent_parent``: when true, a parent role that cannot be found
      aborts the import instead of being skipped.

    Other options are ignored.  Everything runs in a single transaction.

    Raises ImportExportError when ``Role.import_json`` does, or when a
    parent is missing and ``stop_absent_parent`` is set.
    """
    from django_rbac.utils import get_role_model

    Role = get_role_model()
    # defaults let the function be called without going through the command
    ou_slug = options.pop('ou', None)
    stop_absent_parent = options.pop('stop_absent_parent', False)

    if ou_slug:
        # entries without an "ou" simply do not match the requested slug
        data = [datum for datum in data
                if (datum.get('ou') or {}).get('slug') == ou_slug]
    with transaction.atomic():
        for role_json in data:
            Role.import_json(role_json)

        # once all roles are created, set their relationships
        for role_json in data:
            role = Role.objects.get(uuid=role_json['uuid'])
            for parent_json in role_json.get('parents', []):
                parent = get_object_by_main_attr(Role, **parent_json)
                if parent:
                    role.add_parent(parent)
                elif stop_absent_parent:
                    raise ImportExportError(
                        'Parent role (%(uuid)s, %(slug)s, %(name)s) does not exist.' % parent_json)
tests/test_import_export.py
1
# -*- coding: utf-8 -*-
2

  
3
import json
4

  
5
import pytest
6

  
7
from django.core.management import call_command, CommandError
8
from django.contrib.auth import get_user_model
9
from django_rbac.utils import get_role_model, get_ou_model
10

  
11
from authentic2.models import Attribute, UserExternalId, Service
12
from authentic2.a2_rbac.models import RoleAttribute
13

  
14
# apply the django_db marker to every test of this module
pytestmark = pytest.mark.django_db
15

  
16

  
17
def create_user(**kwargs):
    """Create a user along with its attributes, roles and external ids.

    ``attributes`` (attribute name -> value) and ``roles`` (role slugs) are
    required keywords; ``password`` defaults to the username and
    ``external_ids`` to no entry.  Remaining keywords are given to
    ``User.objects.create``.  Returns the created user.
    """
    user_model = get_user_model()
    role_model = get_role_model()

    attribute_values = kwargs.pop('attributes')
    role_slugs = kwargs.pop('roles')
    password = kwargs.pop('password', None) or kwargs['username']
    external_ids = kwargs.pop('external_ids', [])

    user = user_model.objects.create(**kwargs)

    if password:
        user.set_password(password)
        user.save()

    for attribute_name, value in attribute_values.items():
        Attribute.objects.get(name=attribute_name).set_value(user, value)

    for role_slug in role_slugs:
        user.roles.add(role_model.objects.get(slug=role_slug))

    for external_id in external_ids:
        UserExternalId.objects.create(
            user=user,
            source=external_id['source'],
            external_id=external_id['external_id'])

    return user
43

  
44

  
45
# FIXTURES
46

  
47
@pytest.fixture(scope='session')
def tmp_export_dir(tmpdir_factory):
    """Session-scoped temporary directory where tests write export files."""
    export_dir = tmpdir_factory.mktemp('export')
    return export_dir
50

  
51

  
52
@pytest.fixture
def narnia_ou(db):
    """Create and return the "Narnia" organizational unit."""
    fields = {
        'uuid': '6b0d73622a694ca7a0440cbc809487f7',
        'name': 'Narnia',
        'slug': 'narnia',
    }
    return get_ou_model().objects.create(**fields)
59

  
60

  
61
@pytest.fixture
def weird_ou(db):
    """Create and return the "Weird" organizational unit."""
    fields = {
        'uuid': 'aca57fbf08e1406c86900c86b335d004',
        'slug': 'weird',
        'name': 'Weird',
    }
    return get_ou_model().objects.create(**fields)
70

  
71

  
72
@pytest.fixture
def narnia_roles(db, narnia_ou, weird_ou):
    """Create the five "narnia-*" roles used by the import/export tests.

    ``narnia-vip`` belongs to the Weird OU, the others to the Narnia OU.
    Roles get two JSON attributes each, ``narnia-enfance`` and
    ``narnia-culture`` get ``narnia-agents`` as parent, and ``narnia-dsi``
    is bound to the "f.i.b" service.
    """
    Role = get_role_model()

    fib_service = Service.objects.create(name='f.i.b', slug='fib', ou=narnia_ou)

    emails_by_slug = {
        'narnia-agents': json.dumps(['smith@narnia.fan', 'coolsen@narnia.fan', 'fury@narnia.fan']),
        'narnia-enfance': json.dumps(['alice@narnia.fan']),
        'narnia-vip': json.dumps(['kirk@narnia.fan']),
        'narnia-dsi': json.dumps(['sheldon@narnia.fan', 'eliot@narnia.fan']),
        'narnia-culture': json.dumps(['chelsea@narnia.fan']),
    }

    role_specs = [
        {'uuid': 'e0f821033649475abf70448350796676', 'slug': 'narnia-agents', 'name': 'Narnia Agents'},
        {'uuid': 'f61a77e094894bd089039afb4f5ce64b', 'slug': 'narnia-enfance', 'name': 'Narnia Enfance'},
        {'uuid': '63516d5f7d444e63b639325d7b53fd3a', 'slug': 'narnia-vip', 'name': 'Narnia VIP'},
        {'uuid': 'f091a140421949208b1c1ebf78d15b35', 'slug': 'narnia-dsi', 'name': 'Narnia DSI'},
        {'uuid': '535583de8f544c7f9eb46725c72acf0a', 'slug': 'narnia-culture', 'name': 'Narnia Culture'},
    ]

    for spec in role_specs:
        if spec['slug'] == 'narnia-vip':
            Role.objects.create(ou=weird_ou, **spec)
        else:
            Role.objects.create(ou=narnia_ou, **spec)

    # every role whose slug does not start with "_a2" gets both attributes
    for role in Role.objects.exclude(slug__startswith='_a2'):
        RoleAttribute.objects.create(
            role=role, name='emails', kind='json', value=emails_by_slug[role.slug])
        RoleAttribute.objects.create(
            role=role, name='emails_to_members', kind='json', value=False)

    agents = Role.objects.get(slug='narnia-agents')
    for child_slug in ('narnia-enfance', 'narnia-culture'):
        Role.objects.get(slug=child_slug).add_parent(agents)

    dsi = Role.objects.get(slug='narnia-dsi')
    dsi.service = fib_service
    dsi.save()
118

  
119

  
120
@pytest.fixture
def narnia_attributes(db):
    """Create the string attributes "street", "city" and "zip_code"."""
    for attribute_name in ('street', 'city', 'zip_code'):
        # each attribute uses its name as label too
        Attribute.objects.create(
            name=attribute_name, label=attribute_name, kind='string')
130

  
131

  
132
@pytest.fixture
def narnia_users(narnia_ou, weird_ou, narnia_roles, narnia_attributes):
    """Create five users in the Narnia OU, then two in the Weird OU."""
    narnia_members = [
        {'username': 'josh', 'email': 'josh@loking.fan',
            'attributes': {'street': '302 Main Street', 'city': 'Vancouver', 'zip_code': 'V5K 0A6'},
            'roles': ['narnia-vip', 'narnia-agents', 'narnia-dsi'],
            'external_ids': [{'source': 'dedsec', 'external_id': 'r3tr0'},
                             {'source': 'nuddle', 'external_id': 'horatio'}]},
        {'username': 'kim', 'email': 'kim@loking.fan',
            'attributes': {'street': '37 Bvd Henry Orion', 'city': 'Nantes', 'zip_code': '44000'},
            'roles': ['narnia-enfance']},
        {'username': 'chelsea', 'email': 'chelsea@loking.fan',
            'attributes': {'street': '740 Studebaker Dr.', 'city': 'Baton Rouge', 'zip_code': '70806'},
            'roles': ['narnia-enfance']},
        {'username': 'mandla', 'email': 'mandla@loking.fan',
            'attributes': {'street': '497 Jacob Mare Street', 'city': 'Pretoria', 'zip_code': '0001'},
            'roles': ['narnia-culture']},
        {'username': 'tux', 'email': 'tux@linux.org',
            'attributes': {},
            'roles': ['narnia-vip']},
    ]

    weird_members = [
        {'username': 'éric', 'email': 'eric@loking.fan',
            'attributes': {'street': '342 Lincon Blvd', 'city': 'Los Angeles', 'zip_code': '90291'},
            'roles': ['narnia-vip', 'narnia-agents', 'narnia-dsi']},
        {'username': 'hannibal', 'email': 'hannibal@loking.fan',
            'attributes': {'street': '12 5th Ave', 'city': 'New York', 'zip_code': '10002'},
            'roles': ['narnia-enfance']},
    ]

    # same creation order as before: Narnia users first, then Weird ones
    for ou, members in ((narnia_ou, narnia_members), (weird_ou, weird_members)):
        for member in members:
            create_user(ou=ou, **member)
169

  
170

  
171
# TESTS
172

  
173
def test_roles_import_export(tmp_export_dir, narnia_roles):
    """Export every role, check the JSON, then delete and re-import them."""
    dest_file = tmp_export_dir.join("roles.json").strpath
    call_command('export-roles', dest_file)
    # open() works on Python 2 and 3 (file() is Python 2 only) and the
    # context manager closes the file
    with open(dest_file) as fd:
        data = json.load(fd)

    assert len(data) == 5

    for datum in data:
        if datum['slug'] in ['narnia-enfance', 'narnia-culture']:
            assert len(datum['parents']) == 1
            assert datum['parents'][0]['uuid'] == 'e0f821033649475abf70448350796676'
            assert datum['parents'][0]['slug'] == 'narnia-agents'

        if datum['slug'] == 'narnia-agents':
            assert len(datum['parents']) == 0
            assert datum['uuid'] == 'e0f821033649475abf70448350796676'

            assert len(datum['attributes']) == 2
            for attr in datum['attributes']:
                if attr['name'] == 'emails_to_members':
                    continue

                assert attr['kind'] == 'json'
                assert attr["value"] == '["smith@narnia.fan", "coolsen@narnia.fan", "fury@narnia.fan"]'

        if datum['slug'] == 'narnia-vip':
            assert datum['ou']['slug'] == 'weird'
            assert len(datum['attributes']) == 2
            for attr in datum['attributes']:
                if attr['name'] == 'emails':
                    assert attr['value'] == '["kirk@narnia.fan"]'
                else:
                    assert attr['value'] == "False"

    # import roles
    Role = get_role_model()
    Role.objects.exclude(slug__startswith='_').delete()
    call_command('import-roles', dest_file)

    roles = Role.objects.exclude(slug__startswith='_')

    assert len(roles) == 5

    for role in roles:
        # narnia-vip is the only role living in the "weird" OU
        if role.slug == 'narnia-vip':
            continue

        assert role.ou.uuid == '6b0d73622a694ca7a0440cbc809487f7'
        assert role.ou.slug == 'narnia'
        assert role.ou.name == 'Narnia'

    agent_role = roles.get(slug='narnia-agents')

    assert agent_role.uuid == 'e0f821033649475abf70448350796676'
    assert agent_role.name == 'Narnia Agents'

    assert len(agent_role.parents(include_self=False)) == 0

    for attr in agent_role.attributes.all():
        assert attr.kind == 'json'
        if attr.name == 'emails':
            assert attr.value == u'["smith@narnia.fan", "coolsen@narnia.fan", "fury@narnia.fan"]'

    enfance_role = roles.get(slug='narnia-enfance')

    assert enfance_role.uuid == 'f61a77e094894bd089039afb4f5ce64b'
    assert enfance_role.name == 'Narnia Enfance'
    assert len(enfance_role.parents(include_self=False)) == 1
    assert agent_role.child_relation.filter(child__slug='narnia-enfance').exists() is True
    assert Role.objects.exclude(slug__startswith='_').count() == 5

    # re-import the same roles and make sure the number of roles is the same
    call_command('import-roles', dest_file)
    assert Role.objects.exclude(slug__startswith='_').count() == 5
247

  
248

  
249
def test_roles_import_export_by_ou_slug(tmp_export_dir, narnia_roles):
    """Export and import roles restricted to one organizational unit."""
    dest_file = tmp_export_dir.join("roles_ou.json").strpath
    call_command('export-roles', dest_file, ou='narnia')
    # open() works on Python 2 and 3 (file() is Python 2 only) and the
    # context manager closes the file
    with open(dest_file) as fd:
        data = json.load(fd)
    assert len(data) == 4

    # test import
    call_command('export-roles', dest_file, ou='weird')
    Role = get_role_model()
    Role.objects.exclude(slug__startswith='_').delete()

    call_command('import-roles', dest_file, ou='weird')

    roles = Role.objects.exclude(slug__startswith='_')
    assert len(roles) == 1
    for role in roles:
        # the imported role must belong to the "weird" OU; these lines used
        # to assign the values to the role instead of checking anything
        assert role.ou.uuid == 'aca57fbf08e1406c86900c86b335d004'
        assert role.ou.slug == 'weird'
        assert role.ou.name == 'Weird'
    assert Role.objects.exclude(slug__startswith='_').count() == 1
270

  
271

  
272
def test_roles_import_export_with_missing_ou(tmp_export_dir, narnia_roles):
    """Importing roles whose organizational unit is gone must fail cleanly."""
    dest_file = tmp_export_dir.join("roles.json").strpath
    call_command('export-roles', dest_file)
    Role = get_role_model()
    Role.objects.exclude(slug__startswith='_').delete()
    get_ou_model().objects.get(slug='narnia').delete()
    with pytest.raises(CommandError) as excinfo:
        call_command('import-roles', dest_file)
    # the comparison was a bare expression before, so it checked nothing;
    # str() replaces the Python 2 only ".message" attribute
    assert str(excinfo.value) == 'OU (6b0d73622a694ca7a0440cbc809487f7, narnia, Narnia) does not exist.'
    # nothing must have been imported
    assert Role.objects.exclude(slug__startswith='_').count() == 0
282

  
283

  
284
def test_roles_import_export_with_missing_service(tmp_export_dir, narnia_roles):
    """Importing a role whose service is gone must fail cleanly."""
    dest_file = tmp_export_dir.join("roles.json").strpath
    call_command('export-roles', dest_file)
    Role = get_role_model()
    Role.objects.exclude(slug__startswith='_').delete()
    Service.objects.first().delete()
    with pytest.raises(CommandError) as excinfo:
        call_command('import-roles', dest_file)
    # the comparison was a bare expression before, so it checked nothing;
    # the importer reports the slug first, then the name
    assert str(excinfo.value) == 'Service (fib, f.i.b) does not exist.'
    # nothing must have been imported
    assert Role.objects.exclude(slug__startswith='_').count() == 0
294

  
295

  
296
def test_roles_import_export_with_missing_parent_role(tmp_export_dir, narnia_roles, narnia_ou):
    """A missing parent role aborts the import only when asked to."""
    Role = get_role_model()
    dest_file = tmp_export_dir.join("roles.json").strpath
    # "_tmp" is referenced as a parent; its slug starts with "_", which is
    # the prefix the export and the deletion below both exclude
    role_tmp = Role.objects.create(uuid='tmp', slug='_tmp', name='tmp', ou=narnia_ou)
    Role.objects.get(slug='narnia-agents').add_parent(role_tmp)
    call_command('export-roles', dest_file)
    Role.objects.exclude(slug__startswith='_').delete()
    role_tmp.delete()
    with pytest.raises(CommandError) as excinfo:
        call_command('import-roles', dest_file, stop_absent_parent=True)
    # the comparison was a bare expression before, so it checked nothing;
    # only the identifying part of the message is checked
    assert str(excinfo.value).startswith('Parent role (tmp, _tmp, tmp')
    assert Role.objects.exclude(slug__startswith='_').count() == 0
    # while ignoring absent parent
    call_command('import-roles', dest_file)
    assert Role.objects.exclude(slug__startswith='_').count() == 5
0
-