Projet

Général

Profil

0003-agent-common-add-user-provisionning-and-tests-for-co.patch

Benjamin Dauvergne, 30 septembre 2015 18:06

Télécharger (13,1 ko)

Voir les différences:

Subject: [PATCH 3/3] agent/common: add user provisionning and tests for common
 agent (#8440)

 .../common/management/commands/hobo_notify.py      |  66 +++++++-
 tests_multitenant/settings.py                      |   4 +-
 tests_multitenant/test_hobo_notify.py              | 169 +++++++++++++++++++++
 3 files changed, 235 insertions(+), 4 deletions(-)
hobo/agent/common/management/commands/hobo_notify.py
16 16

  
17 17
import json
18 18
import sys
19
import random
19 20

  
20 21
from django.core.management.base import BaseCommand
22
from django.db.transaction import atomic
23
from django.db import IntegrityError
21 24

  
22 25
from tenant_schemas.utils import tenant_context
23 26
from hobo.multitenant.middleware import TenantMiddleware
24 27

  
25 28
from hobo.agent.common.models import Role
26 29

  
30
class TryAgain(Exception):
31
    pass
27 32

  
28 33
class Command(BaseCommand):
29 34
    @classmethod
......
57 62
               and 'description' in o
58 63

  
59 64
    @classmethod
60
    def provision_role(cls, action, data, full=False):
65
    def check_valid_user(cls, o):
66
        return 'uuid' in o \
67
               and 'email' in o \
68
               and 'first_name' in o \
69
               and 'last_name' in o \
70
               and 'roles' in o
71

  
72
    @classmethod
73
    def provision_user(cls, issuer, action, data, full=False):
74
        from django.contrib.auth import get_user_model
75
        from mellon.models import UserSAMLIdentifier
76
        User = get_user_model()
77

  
78

  
79
        assert not full  # provisioning all users is dangerous, we prefer deprovisioning
80
        uuids = set()
81
        for o in data:
82
            uuids.add(o['uuid'])
83
            try:
84
                with atomic():
85
                    assert cls.check_valid_user(o)
86
                    if action == 'provision':
87
                        try:
88
                            mellon_user = UserSAMLIdentifier.objects.get(
89
                                issuer=issuer, name_id=o['uuid'])
90
                            user = mellon_user.user
91
                        except UserSAMLIdentifier.DoesNotExist:
92
                            # temp user object
93
                            random_uid = str(random.randint(1,10000000000000))
94
                            user = User.objects.create(
95
                                username=random_uid)
96
                            mellon_user = UserSAMLIdentifier.objects.create(
97
                                user=user, issuer=issuer, name_id=o['uuid'])
98
                        user.first_name = o['first_name']
99
                        user.last_name = o['last_name']
100
                        user.email = o['email']
101
                        user.username = o['uuid'][:30]
102
                        user.save()
103
                        role_uuids = [role['uuid'] for role in o.get('roles', [])]
104
                        user.groups = Role.objects.filter(uuid__in=role_uuids)
105
            except IntegrityError:
106
                raise TryAgain
107
        if full and action == 'provision':
108
            for usi in UserSAMLIdentifier.objects.exclude(name_id__in=uuids):
109
                usi.user.delete()
110
        elif action == 'deprovision':
111
            for user in User.objects.filter(saml_identifiers__name_id__in=uuids):
112
                user.delete()
113

  
114

  
115
    @classmethod
116
    def provision_role(cls, issuer, action, data, full=False):
61 117
        uuids = set()
62 118
        for o in data:
63 119
            assert cls.check_valid_role(o)
......
93 149
        audience = notification['audience']
94 150
        full = notification['full'] if 'full' in notification else False
95 151
        entity_id = service.get('saml-sp-metadata-url')
152
        issuer = notification.get('issuer')
96 153
        assert entity_id, 'service has no saml-sp-metadat-url field'
97 154
        if entity_id not in audience:
98 155
            return
99 156
        uuids = set()
100 157
        object_type = notification['objects']['@type']
101
        getattr(cls, 'provision_' + object_type)(action, notification['objects']['data'], full=full)
158
        while True:
159
            try:
160
                getattr(cls, 'provision_' + object_type)(issuer, action, notification['objects']['data'], full=full)
161
            except TryAgain:
162
                continue
163
            break
tests_multitenant/settings.py
4 4

  
5 5
LANGUAGE_CODE = 'en-us'
6 6

  
7
INSTALLED_APPS = ('django.contrib.auth', 'django.contrib.sessions', 'django.contrib.contenttypes')
7
INSTALLED_APPS = ('django.contrib.auth', 'django.contrib.sessions', 'django.contrib.contenttypes', 'mellon')
8 8

  
9 9
PROJECT_NAME = 'fake-agent'
10 10

  
11 11
with patch.object(builtin, 'file', mock_open(read_data='xxx')):
12 12
    execfile(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py'))
13 13

  
14
TENANT_APPS = ('django.contrib.auth','django.contrib.sessions', 'django.contrib.contenttypes', 'hobo.agent.common')
14
TENANT_APPS = ('django.contrib.auth','django.contrib.sessions', 'django.contrib.contenttypes', 'hobo.agent.common', 'mellon')
15 15

  
16 16
ROOT_URLCONF = 'hobo.agent.test_urls'
17 17
CACHES = {
tests_multitenant/test_hobo_notify.py
107 107
            Command.process_notification(tenant, notification)
108 108
            assert Group.objects.count() == 0
109 109
            assert Role.objects.count() == 0
110

  
111
def test_provision_users(tenants):
112
    from hobo.agent.common.management.commands.hobo_notify import Command
113
    from tenant_schemas.utils import tenant_context
114
    from django.contrib.auth import get_user_model
115
    from django.contrib.auth.models import Group
116
    from hobo.agent.common.models import Role
117

  
118
    User = get_user_model()
119

  
120
    # provision a role
121
    for tenant in tenants:
122
        with tenant_context(tenant):
123
            notification = {
124
                u'@type': u'provision',
125
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
126
                u'objects': {
127
                    u'@type': 'role',
128
                    u'data': [
129
                        {
130
                            u'uuid': u'12345',
131
                            u'name': u'Service petite enfance',
132
                            u'slug': u'service-petite-enfance',
133
                            u'description': u'Role du service petite enfance %s' % tenant.domain_url,
134
                        }
135
                    ]
136
                }
137
            }
138
            Command.process_notification(tenant, notification)
139
            assert Group.objects.count() == 1
140
            assert Role.objects.count() == 1
141
            role = Role.objects.get()
142
            assert role.uuid == u'12345'
143
            assert role.name == u'Service petite enfance'
144
            assert role.description == u'Role du service petite enfance %s' % tenant.domain_url
145

  
146
    # test user provisioning
147
    for tenant in tenants:
148
        with tenant_context(tenant):
149
            notification = {
150
                u'@type': u'provision',
151
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
152
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
153
                u'objects': {
154
                    u'@type': 'user',
155
                    u'data': [
156
                        {
157
                            u'uuid': u'a' * 32,
158
                            u'first_name': u'John',
159
                            u'last_name': u'Doe',
160
                            u'email': u'john.doe@example.net',
161
                            u'roles': [
162
                                {
163
                                    u'uuid': u'12345',
164
                                    u'name': u'Service petite enfance',
165
                                    u'description': u'etc.',
166
                                },
167
                                {
168
                                    u'uuid': u'xyz',
169
                                    u'name': u'Service état civil',
170
                                    u'description': u'etc.',
171
                                },
172
                            ],
173
                        }
174
                    ]
175
                }
176
            }
177
            Command.process_notification(tenant, notification)
178
            assert User.objects.count() == 1
179
            assert Role.objects.count() == 1
180
            assert Group.objects.count() == 1
181
            user = User.objects.get()
182
            assert user.username == 'a' * 30
183
            assert user.first_name == 'John'
184
            assert user.last_name == 'Doe'
185
            assert user.email == 'john.doe@example.net'
186
            assert user.saml_identifiers.count() == 1
187
            usi = user.saml_identifiers.get()
188
            assert usi.issuer == 'http://idp.example.net/idp/saml/metadata'
189
            assert usi.name_id == 'a' * 32
190
            assert user.groups.count() == 1
191
            group = user.groups.get()
192
            assert group.name == 'Service petite enfance'
193
            role = Role.objects.get(group_ptr=group.pk)
194
            assert role.uuid == '12345'
195

  
196
    # test that nothing changes when run a second time
197
    for tenant in tenants:
198
        with tenant_context(tenant):
199
            notification = {
200
                u'@type': u'provision',
201
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
202
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
203
                u'objects': {
204
                    u'@type': 'user',
205
                    u'data': [
206
                        {
207
                            u'uuid': u'a' * 32,
208
                            u'first_name': u'John',
209
                            u'last_name': u'Doe',
210
                            u'email': u'john.doe@example.net',
211
                            u'roles': [
212
                                {
213
                                    u'uuid': u'12345',
214
                                    u'name': u'Service petite enfance',
215
                                    u'description': u'etc.',
216
                                },
217
                                {
218
                                    u'uuid': u'xyz',
219
                                    u'name': u'Service état civil',
220
                                    u'description': u'etc.',
221
                                },
222
                            ],
223
                        }
224
                    ]
225
                }
226
            }
227
            Command.process_notification(tenant, notification)
228
            assert User.objects.count() == 1
229
            assert Role.objects.count() == 1
230
            assert Group.objects.count() == 1
231
            user = User.objects.get()
232
            assert user.username == 'a' * 30
233
            assert user.first_name == 'John'
234
            assert user.last_name == 'Doe'
235
            assert user.email == 'john.doe@example.net'
236
            assert user.saml_identifiers.count() == 1
237
            usi = user.saml_identifiers.get()
238
            assert usi.issuer == 'http://idp.example.net/idp/saml/metadata'
239
            assert usi.name_id == 'a' * 32
240
            assert user.groups.count() == 1
241
            group = user.groups.get()
242
            assert group.name == 'Service petite enfance'
243
            role = Role.objects.get(group_ptr=group.pk)
244
            assert role.uuid == '12345'
245

  
246
    # test deprovision works
247
    for tenant in tenants:
248
        with tenant_context(tenant):
249
            notification = {
250
                u'@type': u'deprovision',
251
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
252
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
253
                u'objects': {
254
                    u'@type': 'user',
255
                    u'data': [
256
                        {
257
                            u'uuid': u'a' * 32,
258
                            u'first_name': u'John',
259
                            u'last_name': u'Doe',
260
                            u'email': u'john.doe@example.net',
261
                            u'roles': [
262
                                {
263
                                    u'uuid': u'12345',
264
                                    u'name': u'Service petite enfance',
265
                                    u'description': u'etc.',
266
                                },
267
                                {
268
                                    u'uuid': u'xyz',
269
                                    u'name': u'Service état civil',
270
                                    u'description': u'etc.',
271
                                },
272
                            ],
273
                        }
274
                    ]
275
                }
276
            }
277
            Command.process_notification(tenant, notification)
278
            assert User.objects.count() == 0
110
-