Projet

Général

Profil

0003-agent-common-add-user-provisionning-and-tests-for-co.patch

Benjamin Dauvergne, 13 octobre 2015 11:47

Télécharger (12,5 ko)

Voir les différences:

Subject: [PATCH 3/4] agent/common: add user provisionning and tests for common
 agent (#8440)

 .../common/management/commands/hobo_notify.py      |  68 ++++++++-
 tests_multitenant/settings.py                      |   4 +-
 tests_multitenant/test_hobo_notify.py              | 154 +++++++++++++++++++++
 3 files changed, 222 insertions(+), 4 deletions(-)
hobo/agent/common/management/commands/hobo_notify.py
16 16

  
17 17
import json
18 18
import sys
19
import random
19 20

  
20 21
from django.core.management.base import BaseCommand
22
from django.db.transaction import atomic
23
from django.db import IntegrityError
21 24

  
22 25
from tenant_schemas.utils import tenant_context
23 26
from hobo.multitenant.middleware import TenantMiddleware
24 27

  
25 28
from hobo.agent.common.models import Role
26 29

  
30
class TryAgain(Exception):
31
    pass
27 32

  
28 33
class Command(BaseCommand):
29 34
    @classmethod
......
57 62
               and 'description' in o
58 63

  
59 64
    @classmethod
60
    def provision_role(cls, action, data, full=False):
65
    def check_valid_user(cls, o):
66
        return 'uuid' in o \
67
               and 'email' in o \
68
               and 'first_name' in o \
69
               and 'last_name' in o \
70
               and 'roles' in o
71

  
72
    @classmethod
73
    def provision_user(cls, issuer, action, data, full=False):
74
        from django.contrib.auth import get_user_model
75
        from mellon.models import UserSAMLIdentifier
76
        User = get_user_model()
77

  
78

  
79
        assert not full  # provisionning all users is dangerous, we prefer deprovision
80
        uuids = set()
81
        for o in data:
82
            try:
83
                with atomic():
84
                    if action == 'provision':
85
                        assert cls.check_valid_user(o)
86
                        try:
87
                            mellon_user = UserSAMLIdentifier.objects.get(
88
                                issuer=issuer, name_id=o['uuid'])
89
                            user = mellon_user.user
90
                        except UserSAMLIdentifier.DoesNotExist:
91
                            # temp user object
92
                            random_uid = str(random.randint(1,10000000000000))
93
                            user = User.objects.create(
94
                                username=random_uid)
95
                            mellon_user = UserSAMLIdentifier.objects.create(
96
                                user=user, issuer=issuer, name_id=o['uuid'])
97
                        user.first_name = o['first_name']
98
                        user.last_name = o['last_name']
99
                        user.email = o['email']
100
                        user.username = o['uuid'][:30]
101
                        user.save()
102
                        role_uuids = [role['uuid'] for role in o.get('roles', [])]
103
                        user.groups = Role.objects.filter(uuid__in=role_uuids)
104
                    elif action == 'deprovision':
105
                        assert 'uuid' in o
106
                uuids.add(o['uuid'])
107
            except IntegrityError:
108
                raise TryAgain
109
        if full and action == 'provision':
110
            for usi in UserSAMLIdentifier.objects.exclude(name_id__in=uuids):
111
                usi.user.delete()
112
        elif action == 'deprovision':
113
            for user in User.objects.filter(saml_identifiers__name_id__in=uuids):
114
                user.delete()
115

  
116

  
117
    @classmethod
118
    def provision_role(cls, issuer, action, data, full=False):
61 119
        uuids = set()
62 120
        for o in data:
63 121
            assert cls.check_valid_role(o)
......
93 151
        audience = notification['audience']
94 152
        full = notification['full'] if 'full' in notification else False
95 153
        entity_id = service.get('saml-sp-metadata-url')
154
        issuer = notification.get('issuer')
96 155
        assert entity_id, 'service has no saml-sp-metadat-url field'
97 156
        if entity_id not in audience:
98 157
            return
99 158
        uuids = set()
100 159
        object_type = notification['objects']['@type']
101
        getattr(cls, 'provision_' + object_type)(action, notification['objects']['data'], full=full)
160
        while True:
161
            try:
162
                getattr(cls, 'provision_' + object_type)(issuer, action, notification['objects']['data'], full=full)
163
            except TryAgain:
164
                continue
165
            break
tests_multitenant/settings.py
4 4

  
5 5
LANGUAGE_CODE = 'en-us'
6 6

  
7
INSTALLED_APPS = ('django.contrib.auth', 'django.contrib.sessions', 'django.contrib.contenttypes')
7
INSTALLED_APPS = ('django.contrib.auth', 'django.contrib.sessions', 'django.contrib.contenttypes', 'mellon')
8 8

  
9 9
PROJECT_NAME = 'fake-agent'
10 10

  
11 11
with patch.object(builtin, 'file', mock_open(read_data='xxx')):
12 12
    execfile(os.path.join(os.path.dirname(__file__), '../debian/debian_config_common.py'))
13 13

  
14
TENANT_APPS = ('django.contrib.auth','django.contrib.sessions', 'django.contrib.contenttypes', 'hobo.agent.common')
14
TENANT_APPS = ('django.contrib.auth','django.contrib.sessions', 'django.contrib.contenttypes', 'hobo.agent.common', 'mellon')
15 15

  
16 16
ROOT_URLCONF = 'hobo.agent.test_urls'
17 17
CACHES = {
tests_multitenant/test_hobo_notify.py
107 107
            Command.process_notification(tenant, notification)
108 108
            assert Group.objects.count() == 0
109 109
            assert Role.objects.count() == 0
110

  
111
def test_provision_users(tenants):
112
    from hobo.agent.common.management.commands.hobo_notify import Command
113
    from tenant_schemas.utils import tenant_context
114
    from django.contrib.auth import get_user_model
115
    from django.contrib.auth.models import Group
116
    from hobo.agent.common.models import Role
117

  
118
    User = get_user_model()
119

  
120
    # provision a role
121
    for tenant in tenants:
122
        with tenant_context(tenant):
123
            notification = {
124
                u'@type': u'provision',
125
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
126
                u'objects': {
127
                    u'@type': 'role',
128
                    u'data': [
129
                        {
130
                            u'uuid': u'12345',
131
                            u'name': u'Service petite enfance',
132
                            u'slug': u'service-petite-enfance',
133
                            u'description': u'Role du service petite enfance %s' % tenant.domain_url,
134
                        }
135
                    ]
136
                }
137
            }
138
            Command.process_notification(tenant, notification)
139
            assert Group.objects.count() == 1
140
            assert Role.objects.count() == 1
141
            role = Role.objects.get()
142
            assert role.uuid == u'12345'
143
            assert role.name == u'Service petite enfance'
144
            assert role.description == u'Role du service petite enfance %s' % tenant.domain_url
145

  
146
    # test user provisionning
147
    for tenant in tenants:
148
        with tenant_context(tenant):
149
            notification = {
150
                u'@type': u'provision',
151
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
152
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
153
                u'objects': {
154
                    u'@type': 'user',
155
                    u'data': [
156
                        {
157
                            u'uuid': u'a' * 32,
158
                            u'first_name': u'John',
159
                            u'last_name': u'Doe',
160
                            u'email': u'john.doe@example.net',
161
                            u'roles': [
162
                                {
163
                                    u'uuid': u'12345',
164
                                    u'name': u'Service petite enfance',
165
                                    u'description': u'etc.',
166
                                },
167
                                {
168
                                    u'uuid': u'xyz',
169
                                    u'name': u'Service état civil',
170
                                    u'description': u'etc.',
171
                                },
172
                            ],
173
                        }
174
                    ]
175
                }
176
            }
177
            Command.process_notification(tenant, notification)
178
            assert User.objects.count() == 1
179
            assert Role.objects.count() == 1
180
            assert Group.objects.count() == 1
181
            user = User.objects.get()
182
            assert user.username == 'a' * 30
183
            assert user.first_name == 'John'
184
            assert user.last_name == 'Doe'
185
            assert user.email == 'john.doe@example.net'
186
            assert user.saml_identifiers.count() == 1
187
            usi = user.saml_identifiers.get()
188
            assert usi.issuer == 'http://idp.example.net/idp/saml/metadata'
189
            assert usi.name_id == 'a' * 32
190
            assert user.groups.count() == 1
191
            group = user.groups.get()
192
            assert group.name == 'Service petite enfance'
193
            role = Role.objects.get(group_ptr=group.pk)
194
            assert role.uuid == '12345'
195

  
196
    # test nothing change if run a second time
197
    for tenant in tenants:
198
        with tenant_context(tenant):
199
            notification = {
200
                u'@type': u'provision',
201
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
202
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
203
                u'objects': {
204
                    u'@type': 'user',
205
                    u'data': [
206
                        {
207
                            u'uuid': u'a' * 32,
208
                            u'first_name': u'John',
209
                            u'last_name': u'Doe',
210
                            u'email': u'john.doe@example.net',
211
                            u'roles': [
212
                                {
213
                                    u'uuid': u'12345',
214
                                    u'name': u'Service petite enfance',
215
                                    u'description': u'etc.',
216
                                },
217
                                {
218
                                    u'uuid': u'xyz',
219
                                    u'name': u'Service état civil',
220
                                    u'description': u'etc.',
221
                                },
222
                            ],
223
                        }
224
                    ]
225
                }
226
            }
227
            Command.process_notification(tenant, notification)
228
            assert User.objects.count() == 1
229
            assert Role.objects.count() == 1
230
            assert Group.objects.count() == 1
231
            user = User.objects.get()
232
            assert user.username == 'a' * 30
233
            assert user.first_name == 'John'
234
            assert user.last_name == 'Doe'
235
            assert user.email == 'john.doe@example.net'
236
            assert user.saml_identifiers.count() == 1
237
            usi = user.saml_identifiers.get()
238
            assert usi.issuer == 'http://idp.example.net/idp/saml/metadata'
239
            assert usi.name_id == 'a' * 32
240
            assert user.groups.count() == 1
241
            group = user.groups.get()
242
            assert group.name == 'Service petite enfance'
243
            role = Role.objects.get(group_ptr=group.pk)
244
            assert role.uuid == '12345'
245

  
246
    # test deprovision works
247
    for tenant in tenants:
248
        with tenant_context(tenant):
249
            notification = {
250
                u'@type': u'deprovision',
251
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
252
                u'audience': [u'%s/saml/metadata' % tenant.get_base_url()],
253
                u'objects': {
254
                    u'@type': 'user',
255
                    u'data': [
256
                        {
257
                            u'uuid': u'a' * 32,
258
                        }
259
                    ]
260
                }
261
            }
262
            Command.process_notification(tenant, notification)
263
            assert User.objects.count() == 0
110
-