Projet

Général

Profil

0004-agent-authentic2-add-hooks-on-signals-to-provision-u.patch

Benjamin Dauvergne, 06 octobre 2015 15:38

Télécharger (11,6 ko)

Voir les différences:

Subject: [PATCH 4/4] agent/authentic2: add hooks on signals to provision users
 (#8440)

Signals intercepted:
- post_save and post_delete on User
- m2m_changed on Role.members.through
 hobo/agent/authentic2/apps.py         |  86 +++++++++++++++++++++++--
 tests_authentic/test_provisionning.py | 116 +++++++++++++++++++++++++++++++++-
 2 files changed, 193 insertions(+), 9 deletions(-)
hobo/agent/authentic2/apps.py
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17 17
import json
18
from urlparse import urljoin
18 19

  
19 20
from django.apps import AppConfig
20
from django.db.models.signals import post_save, post_delete
21
from django.db.models.signals import post_save, post_delete, m2m_changed
21 22
from django.conf import settings
23
from django.contrib.auth import get_user_model
24
from django.db import connection
25
from django.core.urlresolvers import reverse
22 26

  
23 27
from django_rbac.utils import get_role_model
24 28

  
25 29
from hobo.agent.common import notify_agents
26
from authentic2.utils import to_list
27 30
from authentic2.saml.models import LibertyProvider
28 31
from authentic2.a2_rbac.models import OrganizationalUnit
29 32

  
......
57 60
        role.emails = []
58 61
        role.emails_to_members = False
59 62
        for attribute in role.attributes.all():
60
            if attribute.name in ('emails', 'emails_to_members') and attribute.kind == 'json':
63
            if attribute.name in ('emails', 'emails_to_members') \
64
               and attribute.kind == 'json':
61 65
                setattr(role, attribute.name, json.loads(attribute.value))
62 66
    return qs
63 67

  
......
86 90
        pass
87 91

  
88 92

  
93
def get_entity_id():
94
    tenant = getattr(connection, 'tenant', None)
95
    assert tenant
96
    base_url = tenant.get_base_url()
97
    return urljoin(base_url, reverse('a2-idp-saml-metadata'))
98

  
99

  
100
def provision_user(sender, instance, **kwargs):
101
    notify_agents({
102
        '@type': 'provision',
103
        'issuer': unicode(get_entity_id()),
104
        'audience': get_audience(instance),
105
        'full': True,
106
        'objects': {
107
            '@type': 'user',
108
            'data': [
109
                {
110
                    'uuid': instance.uuid,
111
                    'username': instance.username,
112
                    'first_name': instance.first_name,
113
                    'last_name': instance.last_name,
114
                    'email': instance.email,
115
                    'roles': [
116
                        {
117
                            'uuid': role.uuid,
118
                            'name': role.name,
119
                            'slug': role.slug,
120
                        } for role in instance.roles_and_parents()],
121
                }
122
            ],
123
        }
124
    })
125

  
126

  
127
def deprovision_user(sender, instance, **kwargs):
128
    notify_agents({
129
        '@type': 'deprovision',
130
        'issuer': unicode(get_entity_id()),
131
        'audience': get_audience(instance),
132
        'full': True,
133
        'objects': {
134
            '@type': 'instance',
135
            'data': [
136
                {
137
                    'uuid': instance.uuid,
138
                }
139
            ],
140
        }
141
    })
142

  
143

  
144
def provision_user_on_role_change(sender, action, instance, model, pk_set,
145
                                  reverse, **kwargs):
146
    if not action.startswith('post'):
147
        return
148
    if reverse:
149
        provision_user(sender, instance, **kwargs)
150
    else:
151
        for user in model.objects.filter(pk__in=pk_set):
152
            provision_user(sender, user, **kwargs)
153

  
154

  
89 155
class Authentic2AgentConfig(AppConfig):
90 156
    name = 'hobo.agent.authentic2'
91 157
    label = 'authentic2_agent'
......
96 162
            Role = get_role_model()
97 163
            post_save.connect(notify_roles, sender=Role)
98 164
            post_delete.connect(notify_roles, sender=Role)
99
            post_save.connect(notify_roles, sender=Role.members.through)
100
            post_delete.connect(notify_roles, sender=Role.members.through)
101
        settings.A2_MANAGER_ROLE_FORM_CLASS = 'hobo.agent.authentic2.role_forms.RoleForm'
165
            post_save.connect(notify_roles, Role)
166
            post_delete.connect(notify_roles, Role)
167
            post_save.connect(notify_roles, Role.members.through)
168
            post_delete.connect(notify_roles, Role.members.through)
169
            User = get_user_model()
170
            post_save.connect(provision_user, sender=User)
171
            post_delete.connect(deprovision_user, sender=User)
172
            m2m_changed.connect(provision_user_on_role_change,
173
                                sender=Role.members.through)
174
        settings.A2_MANAGER_ROLE_FORM_CLASS = \
175
            'hobo.agent.authentic2.role_forms.RoleForm'
tests_authentic/test_provisionning.py
1 1
# -*- coding: utf-8 -*-
2 2
import pytest
3 3

  
4
from mock import patch, MagicMock, call, ANY
4
from mock import patch, call, ANY
5 5

  
6 6
from django.contrib.auth import get_user_model
7 7

  
......
12 12

  
13 13
pytestmark = pytest.mark.django_db
14 14

  
15

  
15 16
def test_provision_role(tenant):
16 17
    with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents:
17 18
        with tenant_context(tenant):
......
25 26
                'audience', '@type', 'objects', 'full'])
26 27
            assert arg['audience'] == []
27 28
            assert arg['@type'] == 'provision'
28
            assert arg['full'] == True
29
            assert arg['full'] is True
29 30
            objects = arg['objects']
30 31
            assert isinstance(objects, dict)
31 32
            assert set(objects.keys()) == set(['data', '@type'])
......
38 39
                assert set(o.keys()) == set(['emails_to_members',
39 40
                                             'description', 'uuid', 'name',
40 41
                                             'slug', 'emails'])
41
                assert o['emails_to_members'] == False
42
                assert o['emails_to_members'] is False
42 43
                assert o['emails'] == []
43 44
                if o['uuid'] == role.uuid and o['name'] == role.name \
44 45
                   and o['description'] == role.description \
45 46
                   and o['slug'] == role.slug:
46 47
                    like_role += 1
47 48
            assert like_role == 1
49

  
50

  
51
def test_provision_user(tenant):
52
    with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents:
53
        with tenant_context(tenant):
54
            role = Role.objects.create(name='coin', ou=get_default_ou())
55
            notify_agents.reset_mock()
56
            User = get_user_model()
57
            user = User.objects.create(username=u'Étienne',
58
                                       email='etienne.dugenou@example.net',
59
                                       first_name=u'Étienne',
60
                                       last_name=u'Dugenou',
61
                                       ou=get_default_ou())
62
            assert notify_agents.call_count == 1
63
            arg = notify_agents.call_args
64
            assert arg == call(ANY)
65
            arg = arg[0][0]
66
            assert isinstance(arg, dict)
67
            assert set(arg.keys()) == set([
68
                'issuer', 'audience', '@type', 'objects', 'full'])
69
            assert arg['issuer'] == \
70
                'http://%s/idp/saml2/metadata' % tenant.domain_url
71
            assert arg['audience'] == []
72
            assert arg['@type'] == 'provision'
73
            assert arg['full'] is True
74
            objects = arg['objects']
75
            assert isinstance(objects, dict)
76
            assert set(objects.keys()) == set(['data', '@type'])
77
            assert objects['@type'] == 'user'
78
            data = objects['data']
79
            assert isinstance(data, list)
80
            assert len(data) == 1
81
            for o in data:
82
                assert set(o.keys()) == set(['uuid', 'username', 'first_name',
83
                                             'last_name', 'email', 'roles'])
84
                assert o['uuid'] == user.uuid
85
                assert o['username'] == user.username
86
                assert o['first_name'] == user.first_name
87
                assert o['last_name'] == user.last_name
88
                assert o['email'] == user.email
89
                assert o['roles'] == []
90

  
91
            notify_agents.reset_mock()
92
            role.members.add(user)
93

  
94
            assert notify_agents.call_count == 1
95
            arg = notify_agents.call_args
96
            assert arg == call(ANY)
97
            arg = arg[0][0]
98
            assert isinstance(arg, dict)
99
            assert set(arg.keys()) == set([
100
                'issuer', 'audience', '@type', 'objects', 'full'])
101
            assert arg['issuer'] == \
102
                'http://%s/idp/saml2/metadata' % tenant.domain_url
103
            assert arg['audience'] == []
104
            assert arg['@type'] == 'provision'
105
            assert arg['full'] is True
106
            objects = arg['objects']
107
            assert isinstance(objects, dict)
108
            assert set(objects.keys()) == set(['data', '@type'])
109
            assert objects['@type'] == 'user'
110
            data = objects['data']
111
            assert isinstance(data, list)
112
            assert len(data) == 1
113
            for o in data:
114
                assert set(o.keys()) == set(['uuid', 'username', 'first_name',
115
                                             'last_name', 'email', 'roles'])
116
                assert o['uuid'] == user.uuid
117
                assert o['username'] == user.username
118
                assert o['first_name'] == user.first_name
119
                assert o['last_name'] == user.last_name
120
                assert o['email'] == user.email
121
                assert o['roles'] == [{
122
                    'uuid': role.uuid,
123
                    'name': role.name,
124
                    'slug': role.slug
125
                }]
126

  
127
            notify_agents.reset_mock()
128
            user.roles.remove(role)
129

  
130
            assert notify_agents.call_count == 1
131
            arg = notify_agents.call_args
132
            assert arg == call(ANY)
133
            arg = arg[0][0]
134
            assert isinstance(arg, dict)
135
            assert set(arg.keys()) == set([
136
                'issuer', 'audience', '@type', 'objects', 'full'])
137
            assert arg['issuer'] == \
138
                'http://%s/idp/saml2/metadata' % tenant.domain_url
139
            assert arg['audience'] == []
140
            assert arg['@type'] == 'provision'
141
            assert arg['full'] is True
142
            objects = arg['objects']
143
            assert isinstance(objects, dict)
144
            assert set(objects.keys()) == set(['data', '@type'])
145
            assert objects['@type'] == 'user'
146
            data = objects['data']
147
            assert isinstance(data, list)
148
            assert len(data) == 1
149
            for o in data:
150
                assert set(o.keys()) == set(['uuid', 'username', 'first_name',
151
                                             'last_name', 'email', 'roles'])
152
                assert o['uuid'] == user.uuid
153
                assert o['username'] == user.username
154
                assert o['first_name'] == user.first_name
155
                assert o['last_name'] == user.last_name
156
                assert o['email'] == user.email
157
                assert o['roles'] == []
48
-