Projet

Général

Profil

0001-add-roles-import-export-command-16514.patch

Josué Kouka, 04 octobre 2017 17:18

Télécharger (23,2 ko)

Voir les différences:

Subject: [PATCH] add roles import/export command (#16514)

 src/authentic2/a2_rbac/models.py                   |  36 ++-
 src/authentic2/management/commands/export-roles.py |  26 ++
 src/authentic2/management/commands/import-roles.py |  27 ++
 src/authentic2/models.py                           |   4 +-
 src/authentic2/utils.py                            | 133 ++++++++-
 tests/test_import_export.py                        | 304 +++++++++++++++++++++
 6 files changed, 525 insertions(+), 5 deletions(-)
 create mode 100644 src/authentic2/management/commands/export-roles.py
 create mode 100644 src/authentic2/management/commands/import-roles.py
 create mode 100644 tests/test_import_export.py
src/authentic2/a2_rbac/models.py
10 10
                                CHANGE_OP, Operation)
11 11
from django_rbac import utils as rbac_utils
12 12

  
13
from authentic2.utils import ImportExportMixin
14

  
13 15
try:
14 16
    from django.contrib.contenttypes.fields import GenericForeignKey, \
15 17
        GenericRelation
......
21 23
from . import managers, fields
22 24

  
23 25

  
24
class OrganizationalUnit(OrganizationalUnitAbstractBase):
26
class OrganizationalUnit(OrganizationalUnitAbstractBase, ImportExportMixin):
25 27
    username_is_unique = models.BooleanField(
26 28
        blank=True,
27 29
        default=False,
......
91 93
                                   object_id_field='admin_scope_id')
92 94

  
93 95

  
94
class Role(RoleAbstractBase):
96
class Role(RoleAbstractBase, ImportExportMixin):
95 97
    admin_scope_ct = models.ForeignKey(
96 98
        to='contenttypes.ContentType',
97 99
        null=True,
......
195 197
            'ou__slug': self.ou.slug if self.ou else None,
196 198
        }
197 199

  
200
    @classmethod
201
    def import_json(cls, data):
202
        role = super(Role, cls).import_json(data)
203
        # set attributes
204
        attributes_json = data.pop('attributes', {})
205
        for attr in attributes_json:
206
            attribute, created = RoleAttribute.objects.get_or_create(
207
                role=role, name=attr['name'], kind=attr['kind'], defaults={
208
                    'value': attr['value']})
209
            if created:
210
                attribute.value = attr['value']
211
                attribute.save()
212
        return role
213

  
214
    def export_json(self):
215
        data = super(Role, self).export_json()
216
        attributes = []
217
        for attr in self.attributes.all():
218
            attributes.append({
219
                'name': attr.name, 'kind': attr.kind,
220
                'value': attr.value})
221
        data['attributes'] = attributes
222

  
223
        parents = []
224
        for parent in self.parents(include_self=False):
225
            parents.append({
226
                'uuid': parent.uuid, 'slug': parent.slug, 'name': parent.name})
227
        data['parents'] = parents
228
        return data
229

  
198 230

  
199 231
class RoleParenting(RoleParentingAbstractBase):
200 232
    class Meta(RoleParentingAbstractBase.Meta):
src/authentic2/management/commands/export-roles.py
1
import json
2
from optparse import make_option
3
import sys
4

  
5
from django.core.management import BaseCommand
6

  
7
from authentic2.utils import export_roles
8

  
9

  
10
class Command(BaseCommand):
11
    help = 'Export roles as json'
12

  
13
    args = '<filename>'
14

  
15
    option_list = BaseCommand.option_list + (
16
        make_option('--ou', dest='ou', default=None, type=str,
17
                    help='restrict to the organizational unit slug'),)
18

  
19
    def handle(self, *args, **options):
20
        if len(args) < 1:
21
            output = sys.stdout
22
        else:
23
            output = open(args[0], 'w')
24

  
25
        data = export_roles(ou_slug=options['ou'])
26
        json.dump(data, output, encoding='utf-8', indent=4)
src/authentic2/management/commands/import-roles.py
1
import json
2
from optparse import make_option
3

  
4
from django.core.management.base import BaseCommand, CommandError
5

  
6
from authentic2.utils import import_roles, ImportExportError
7

  
8

  
9
class Command(BaseCommand):
10
    help = 'Import roles from json file'
11

  
12
    args = '<filename>'
13

  
14
    option_list = BaseCommand.option_list + (
15
        make_option('--ou', dest='ou', default=None, type=str,
16
                    help='restrict to the organizational unit slug'),
17
        make_option('--stop-on-absent-parent', action='store_true', dest='stop_absent_parent', default=False,
18
                    help='stop if parent is absent')
19
    )
20

  
21
    def handle(self, *args, **options):
22
        if args:
23
            fd = open(args[0])
24
            try:
25
                import_roles(json.load(fd), **options)
26
            except(ImportExportError,) as exc:
27
                raise CommandError(exc.message)
src/authentic2/models.py
23 23
from django.contrib.contenttypes.models import ContentType
24 24

  
25 25
from . import managers
26
from .utils import ServiceAccessDenied
26
from .utils import ServiceAccessDenied, ImportExportMixin
27 27

  
28 28

  
29 29
class DeletedUser(models.Model):
......
310 310
        return unicode(self.user)
311 311

  
312 312

  
313
class Service(models.Model):
313
class Service(models.Model, ImportExportMixin):
314 314
    name = models.CharField(
315 315
        verbose_name=_('name'),
316 316
        max_length=128)
src/authentic2/utils.py
16 16

  
17 17
import django
18 18
from django.conf import settings
19
from django.db import transaction
20
from django.db import models
19 21
from django.http import HttpResponseRedirect, HttpResponse
20
from django.core.exceptions import ImproperlyConfigured, PermissionDenied
22
from django.core.exceptions import ImproperlyConfigured, PermissionDenied, FieldError
21 23
from django.http.request import QueryDict
22 24
from django.contrib.auth import (REDIRECT_FIELD_NAME, login as auth_login, SESSION_KEY,
23 25
                                 HASH_SESSION_KEY, BACKEND_SESSION_KEY, authenticate,
......
41 43
from django.utils.encoding import force_bytes
42 44
from django.shortcuts import render
43 45

  
46
from jsonfield import JSONField
47

  
44 48

  
45 49
try:
46 50
    from django.core.exceptions import FieldDoesNotExist
......
54 58
from . import plugins, app_settings, constants
55 59

  
56 60

  
61
IMPORT_EXPORT_FIELDS = (
62
    models.TextField, models.CharField, models.SlugField,
63
    models.URLField, models.BooleanField, models.IntegerField,
64
    models.CommaSeparatedIntegerField, models.EmailField, models.NullBooleanField,
65
    models.IntegerField, models.PositiveIntegerField, JSONField)
66

  
67

  
68
class ImportExportError(Exception):
69
    pass
70

  
71

  
57 72
class CleanLogMessage(logging.Filter):
58 73
    def filter(self, record):
59 74
        record.msg = filter_attribute_private_key(record.msg)
......
966 981
    user = copy.deepcopy(user)
967 982
    user.backend = backend
968 983
    return login(request, user, method, **kwargs)
984

  
985

  
986
class ImportExportMixin(object):
987

  
988
    @classmethod
989
    def get_model_fields(cls):
990
        return cls._meta.get_concrete_fields_with_model()
991

  
992
    @classmethod
993
    def get_model_required_fields(cls):
994
        return [field for field, _ in cls.get_model_fields() if not field.null and field.name != 'id']
995

  
996
    @classmethod
997
    def get_instance(cls, data):
998
        fields = cls.get_model_required_fields()
999
        kwargs = {field.name: data[field.name] for field in fields}
1000
        instance, created = cls.objects.get_or_create(**kwargs)
1001
        return instance
1002

  
1003
    @classmethod
1004
    def import_json(cls, data):
1005
        instance = cls.get_instance(data)
1006
        for field, model in cls._meta.get_concrete_fields_with_model():
1007
            if field.name == 'id':
1008
                continue
1009
            value = data[field.name]
1010
            if isinstance(field, IMPORT_EXPORT_FIELDS):
1011
                setattr(instance, field.attname, value)
1012
            elif isinstance(field, models.ForeignKey):
1013
                if value:
1014
                    related_model = field.rel.to
1015
                    related_instance = instance.get_object_by_main_attr(related_model, **value)
1016
                    if not related_instance:
1017
                        raise ImportExportError(
1018
                            '%s %s related object %s %s does not exist.' % (
1019
                                cls.__name__, instance, related_model.__name__, value.values()))
1020
                    value = related_instance
1021
                setattr(instance, field.name, value)
1022
            else:
1023
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
1024
                                field, self.__class__))
1025
        instance.save()
1026
        return instance
1027

  
1028
    def export_json(self):
1029
        data = {}
1030
        for field, model in self.get_model_fields():
1031
            if field.name == 'id':
1032
                continue
1033
            value = getattr(self, field.attname)
1034
            if isinstance(field, IMPORT_EXPORT_FIELDS):
1035
                data[field.name] = value
1036
            elif isinstance(field, models.ForeignKey):
1037
                if value:
1038
                    value = getattr(self, field.name)
1039
                    data[field.name] = value.export_json()
1040
                else:
1041
                    data[field.name] = None
1042
            else:
1043
                raise Exception('export_json: field %s of ressource class %s is unsupported' % (
1044
                                field, self.__class__))
1045
        return data
1046

  
1047
    def get_object_by_main_attr(self, model, **kwargs):
1048
        """Sequentially try to get an object by uuid, slug, then name.
1049
        Creates one if <create> is True
1050
        """
1051
        uuid = kwargs.pop('uuid', None)
1052
        slug = kwargs.pop('slug', None)
1053
        name = kwargs.pop('name', None)
1054
        try:
1055
            return model.objects.get(uuid=uuid)
1056
        except (model.DoesNotExist, FieldError):
1057
            try:
1058
                return model.objects.get(slug=slug)
1059
            except (model.DoesNotExist, FieldError):
1060
                try:
1061
                    return model.objects.get(name=name)
1062
                except (model.DoesNotExist, FieldError):
1063
                    pass
1064
        return None
1065

  
1066

  
1067
def export_roles(ou_slug=None):
1068
    from django_rbac.utils import get_role_model
1069
    Role = get_role_model()
1070
    filters = {'slug__startswith': '_'}
1071
    if ou_slug:
1072
        roles = Role.objects.filter(ou__slug=ou_slug).exclude(**filters)
1073
    else:
1074
        roles = Role.objects.exclude(**filters)
1075
    return [role.export_json() for role in roles]
1076

  
1077

  
1078
def import_roles(data, **options):
1079
    from django_rbac.utils import get_role_model
1080

  
1081
    Role = get_role_model()
1082
    ou_slug = options.pop('ou')
1083

  
1084
    if ou_slug:
1085
        data = [datum for datum in data if datum['ou']['slug'] == ou_slug]
1086
    with transaction.atomic():
1087
        for role_json in data:
1088
            Role.import_json(role_json)
1089

  
1090
        # once all roles are created, set their relationships
1091
        for role_json in data:
1092
            role = Role.objects.get(uuid=role_json['uuid'])
1093
            for parent_json in role_json.get('parents', []):
1094
                parent = role.get_object_by_main_attr(Role, **parent_json)
1095
                if parent:
1096
                    role.add_parent(parent)
1097
                else:
1098
                    raise ImportExportError(
1099
                        'Parent role (%(uuid)s, %(slug)s, %(name)s does not exist.' % parent_json)
tests/test_import_export.py
1
# -*- coding: utf-8 -*-
2

  
3
import json
4

  
5
import pytest
6

  
7
from django.core.management import call_command, CommandError
8
from django.contrib.auth import get_user_model
9
from django_rbac.utils import get_role_model, get_ou_model
10

  
11
from authentic2.models import Attribute, UserExternalId, Service
12
from authentic2.a2_rbac.models import RoleAttribute
13

  
14
pytestmark = pytest.mark.django_db
15

  
16

  
17
def create_user(**kwargs):
18
    User = get_user_model()
19
    Role = get_role_model()
20

  
21
    attributes = kwargs.pop('attributes')
22
    roles = kwargs.pop('roles')
23
    password = kwargs.pop('password', None) or kwargs['username']
24
    external_ids = kwargs.pop('external_ids', [])
25

  
26
    user = User.objects.create(**kwargs)
27

  
28
    if password:
29
        user.set_password(password)
30
        user.save()
31

  
32
    for key in attributes:
33
        Attribute.objects.get(name=key).set_value(user, attributes[key])
34

  
35
    for role in roles:
36
        user.roles.add(Role.objects.get(slug=role))
37

  
38
    for eid in external_ids:
39
        UserExternalId.objects.create(user=user, source=eid['source'],
40
                                      external_id=eid['external_id'])
41

  
42
    return user
43

  
44

  
45
# FIXTURES
46

  
47
@pytest.fixture(scope='session')
48
def tmp_export_dir(tmpdir_factory):
49
    return tmpdir_factory.mktemp('export')
50

  
51

  
52
@pytest.fixture
53
def city_ou(db):
54
    OU = get_ou_model()
55

  
56
    return OU.objects.create(
57
        uuid='6b0d73622a694ca7a0440cbc809487f7',
58
        name='Narnia', slug='city')
59

  
60

  
61
@pytest.fixture
62
def weird_ou(db):
63
    OU = get_ou_model()
64

  
65
    return OU.objects.create(
66
        uuid='aca57fbf08e1406c86900c86b335d004',
67
        slug='weird',
68
        name='Weird'
69
    )
70

  
71

  
72
@pytest.fixture
73
def city_roles(db, city_ou, weird_ou):
74

  
75
    Role = get_role_model()
76

  
77
    city_service = Service.objects.create(name='f.i.b', slug='fib', ou=city_ou)
78

  
79
    emails = {
80
        'city-agents': json.dumps(['smith@city.fan', 'coolsen@city.fan', 'fury@city.fan']),
81
        'city-enfance': json.dumps(['alice@city.fan']),
82
        'city-vip': json.dumps(['kirk@city.fan']),
83
        'city-dsi': json.dumps(['sheldon@city.fan', 'eliot@city.fan']),
84
        'city-culture': json.dumps(['chelsea@city.fan']),
85
    }
86

  
87
    roles = [
88
        {'uuid': 'e0f821033649475abf70448350796676', 'slug': 'city-agents', 'name': 'Narnia Agents'},
89
        {'uuid': 'f61a77e094894bd089039afb4f5ce64b', 'slug': 'city-enfance', 'name': 'Narnia Enfance'},
90
        {'uuid': '63516d5f7d444e63b639325d7b53fd3a', 'slug': 'city-vip', 'name': 'Narnia VIP'},
91
        {'uuid': 'f091a140421949208b1c1ebf78d15b35', 'slug': 'city-dsi', 'name': 'Narnia DSI'},
92
        {'uuid': '535583de8f544c7f9eb46725c72acf0a', 'slug': 'city-culture', 'name': 'Narnia Culture'},
93
    ]
94

  
95
    for role in roles:
96
        ou = weird_ou if role['slug'] == 'city-vip' else city_ou
97
        Role.objects.create(ou=ou, **role)
98

  
99
    concerned_roles = Role.objects.exclude(slug__startswith='_a2')
100

  
101
    for role in concerned_roles:
102
        RoleAttribute.objects.create(
103
            role=role, name='emails', kind='json',
104
            value=emails[role.slug]
105
        )
106

  
107
        RoleAttribute.objects.create(
108
            role=role, name='emails_to_members', kind='json', value=False)
109

  
110
    agents = Role.objects.get(slug='city-agents')
111

  
112
    for slug in ['city-enfance', 'city-culture']:
113
        Role.objects.get(slug=slug).add_parent(agents)
114

  
115
    role_dsi = Role.objects.get(slug='city-dsi')
116
    role_dsi.service = city_service
117
    role_dsi.save()
118

  
119

  
120
@pytest.fixture
121
def city_attributes(db):
122
    attributes = [
123
        {'name': 'street', 'label': 'street', 'kind': 'string'},
124
        {'name': 'city', 'label': 'city', 'kind': 'string'},
125
        {'name': 'zip_code', 'label': 'zip_code', 'kind': 'string'},
126
    ]
127

  
128
    for attr in attributes:
129
        Attribute.objects.create(**attr)
130

  
131

  
132
@pytest.fixture
133
def city_users(city_ou, weird_ou, city_roles, city_attributes):
134

  
135
    users = [
136
        {'username': 'josh', 'email': 'josh@loking.fan',
137
            'attributes': {'street': '302 Main Street', 'city': 'Vancouver', 'zip_code': 'V5K 0A6'},
138
            'roles': ['city-vip', 'city-agents', 'city-dsi'],
139
            'external_ids': [{'source': 'dedsec', 'external_id': 'r3tr0'},
140
                             {'source': 'nuddle', 'external_id': 'horatio'}]},
141
        {'username': 'kim', 'email': 'kim@loking.fan',
142
            'attributes': {'street': '37 Bvd Henry Orion', 'city': 'Nantes', 'zip_code': '44000'},
143
            'roles': ['city-enfance']},
144
        {'username': 'chelsea', 'email': 'chelsea@loking.fan',
145
            'attributes': {'street': '740 Studebaker Dr.', 'city': 'Baton Rouge', 'zip_code': '70806'},
146
            'roles': ['city-enfance']},
147
        {'username': 'mandla', 'email': 'mandla@loking.fan',
148
            'attributes': {'street': '497 Jacob Mare Street', 'city': 'Pretoria', 'zip_code': '0001'},
149
            'roles': ['city-culture']},
150
        {'username': 'tux', 'email': 'tux@linux.org',
151
            'attributes': {},
152
            'roles': ['city-vip']},
153
    ]
154

  
155
    for user in users:
156
        create_user(ou=city_ou, **user)
157

  
158
    others = [
159
        {'username': 'éric', 'email': 'eric@loking.fan',
160
            'attributes': {'street': '342 Lincon Blvd', 'city': 'Los Angeles', 'zip_code': '90291'},
161
            'roles': ['city-vip', 'city-agents', 'city-dsi']},
162
        {'username': 'hannibal', 'email': 'hannibal@loking.fan',
163
            'attributes': {'street': '12 5th Ave', 'city': 'New York', 'zip_code': '10002'},
164
            'roles': ['city-enfance']},
165
    ]
166

  
167
    for other in others:
168
        create_user(ou=weird_ou, **other)
169

  
170

  
171
# TESTS
172

  
173
def test_roles_import_export(tmp_export_dir, city_roles):
174
    dest_file = tmp_export_dir.join("roles.json").strpath
175
    call_command('export-roles', dest_file)
176
    data = json.load(file(dest_file))
177

  
178
    assert len(data) == 5
179

  
180
    for datum in data:
181
        if datum['slug'] in ['city-enfance', 'city-culture']:
182
            assert len(datum['parents']) == 1
183
            assert datum['parents'][0]['uuid'] == 'e0f821033649475abf70448350796676'
184
            assert datum['parents'][0]['slug'] == 'city-agents'
185

  
186
        if datum['slug'] == 'city-agents':
187
            assert len(datum['parents']) == 0
188
            assert datum['uuid'] == 'e0f821033649475abf70448350796676'
189

  
190
            assert len(datum['attributes']) == 2
191
            for attr in datum['attributes']:
192
                if attr['name'] == 'emails_to_members':
193
                    continue
194

  
195
                assert attr['kind'] == 'json'
196
                assert attr["value"] == '["smith@city.fan", "coolsen@city.fan", "fury@city.fan"]'
197

  
198
        if datum['slug'] == 'city-vip':
199
            assert datum['ou']['slug'] == 'weird'
200
            assert len(datum['attributes']) == 2
201
            for attr in datum['attributes']:
202
                if attr['name'] == 'emails':
203
                    assert attr['value'] == '["kirk@city.fan"]'
204
                else:
205
                    assert attr['value'] == "False"
206

  
207
    # import roles
208
    Role = get_role_model()
209
    Role.objects.exclude(slug__startswith='_').delete()
210
    call_command('import-roles', dest_file)
211

  
212
    roles = Role.objects.exclude(slug__startswith='_')
213

  
214
    assert len(roles) == 5
215

  
216
    for role in roles:
217
        if role.slug == 'city-vip':
218
            continue
219

  
220
        assert role.ou.uuid == '6b0d73622a694ca7a0440cbc809487f7'
221
        assert role.ou.slug == 'city'
222
        assert role.ou.name == 'Narnia'
223

  
224
    agent_role = roles.get(slug='city-agents')
225

  
226
    assert agent_role.uuid == 'e0f821033649475abf70448350796676'
227
    assert agent_role.name == 'Narnia Agents'
228

  
229
    assert len(agent_role.parents(include_self=False)) == 0
230

  
231
    for attr in agent_role.attributes.all():
232
        assert attr.kind == 'json'
233
        if attr.name == 'emails':
234
            assert attr.value == u'["smith@city.fan", "coolsen@city.fan", "fury@city.fan"]'
235

  
236
    enfance_role = roles.get(slug='city-enfance')
237

  
238
    assert enfance_role.uuid == 'f61a77e094894bd089039afb4f5ce64b'
239
    assert enfance_role.name == 'Narnia Enfance'
240
    assert len(enfance_role.parents(include_self=False)) == 1
241
    assert agent_role.child_relation.filter(child__slug='city-enfance').exists() is True
242
    assert Role.objects.exclude(slug__startswith='_').count() == 5
243

  
244
    # re-import the same roles and make sure the number or role is the same
245
    call_command('import-roles', dest_file)
246
    assert Role.objects.exclude(slug__startswith='_').count() == 5
247

  
248

  
249
def test_roles_import_export_by_ou_slug(tmp_export_dir, city_roles):
250
    dest_file = tmp_export_dir.join("roles_ou.json").strpath
251
    call_command('export-roles', dest_file, ou='city')
252
    data = json.load(file(dest_file))
253
    assert len(data) == 4
254

  
255
    # test import
256
    call_command('export-roles', dest_file, ou='weird')
257
    Role = get_role_model()
258
    Role.objects.exclude(slug__startswith='_').delete()
259

  
260
    dest_file = tmp_export_dir.join('roles_ou.json').strpath
261
    call_command('import-roles', dest_file, ou='weird')
262

  
263
    roles = Role.objects.exclude(slug__startswith='_')
264
    assert len(roles) == 1
265
    for role in roles:
266
        role.uuid = 'aca57fbf08e1406c86900c86b335d004'
267
        role.slug = 'weird'
268
        role.name = 'Weird'
269
    assert Role.objects.exclude(slug__startswith='_').count() == 1
270

  
271

  
272
def test_roles_import_export_with_missing_ou(tmp_export_dir, city_roles):
273
    dest_file = tmp_export_dir.join("roles.json").strpath
274
    call_command('export-roles', dest_file)
275
    Role = get_role_model()
276
    Role.objects.exclude(slug__startswith='_').delete()
277
    get_ou_model().objects.get(slug='city').delete()
278
    with pytest.raises(CommandError) as excinfo:
279
        call_command('import-roles', dest_file)
280
    assert Role.objects.exclude(slug__startswith='_').count() == 0
281

  
282

  
283
def test_roles_import_export_with_missing_service(tmp_export_dir, city_roles):
284
    dest_file = tmp_export_dir.join("roles.json").strpath
285
    call_command('export-roles', dest_file)
286
    Role = get_role_model()
287
    Role.objects.exclude(slug__startswith='_').delete()
288
    Service.objects.first().delete()
289
    with pytest.raises(CommandError) as excinfo:
290
        call_command('import-roles', dest_file)
291
    assert Role.objects.exclude(slug__startswith='_').count() == 0
292

  
293

  
294
def test_roles_import_export_with_missing_parent_role(tmp_export_dir, city_roles, city_ou):
295
    Role = get_role_model()
296
    dest_file = tmp_export_dir.join("roles.json").strpath
297
    role_tmp = Role.objects.create(uuid='tmp', slug='_tmp', name='tmp', ou=city_ou)
298
    Role.objects.get(slug='city-agents').add_parent(role_tmp)
299
    call_command('export-roles', dest_file)
300
    Role.objects.exclude(slug__startswith='_').delete()
301
    role_tmp.delete()
302
    with pytest.raises(CommandError) as excinfo:
303
        call_command('import-roles', dest_file)
304
    assert Role.objects.exclude(slug__startswith='_').count() == 0
0
-