0004-agent-authentic2-add-hooks-on-signals-to-provision-u.patch
hobo/agent/authentic2/apps.py | ||
---|---|---|
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 | 17 |
import json |
18 |
from urlparse import urljoin |
|
18 | 19 | |
19 | 20 |
from django.apps import AppConfig |
20 |
from django.db.models.signals import post_save, post_delete |
|
21 |
from django.db.models.signals import post_save, post_delete, m2m_changed
|
|
21 | 22 |
from django.conf import settings |
23 |
from django.contrib.auth import get_user_model |
|
24 |
from django.db import connection |
|
25 |
from django.core.urlresolvers import reverse |
|
22 | 26 | |
23 | 27 |
from django_rbac.utils import get_role_model |
24 | 28 | |
25 | 29 |
from hobo.agent.common import notify_agents |
26 |
from authentic2.utils import to_list |
|
27 | 30 |
from authentic2.saml.models import LibertyProvider |
28 | 31 |
from authentic2.a2_rbac.models import OrganizationalUnit |
29 | 32 | |
... | ... | |
57 | 60 |
role.emails = [] |
58 | 61 |
role.emails_to_members = False |
59 | 62 |
for attribute in role.attributes.all(): |
60 |
if attribute.name in ('emails', 'emails_to_members') and attribute.kind == 'json': |
|
63 |
if attribute.name in ('emails', 'emails_to_members') \ |
|
64 |
and attribute.kind == 'json': |
|
61 | 65 |
setattr(role, attribute.name, json.loads(attribute.value)) |
62 | 66 |
return qs |
63 | 67 | |
... | ... | |
86 | 90 |
pass |
87 | 91 | |
88 | 92 | |
93 |
def get_entity_id():
    """Return the entity id used as ``issuer`` in agent notifications.

    The tenant is read from the ``tenant`` attribute of the current database
    connection. The result is the tenant base URL joined with the URL of the
    view named ``a2-idp-saml-metadata``.
    """
    current_tenant = getattr(connection, 'tenant', None)
    # Running without a tenant on the connection is treated as a
    # programming error.
    assert current_tenant
    root_url = current_tenant.get_base_url()
    metadata_path = reverse('a2-idp-saml-metadata')
    return urljoin(root_url, metadata_path)
|
98 | ||
99 | ||
100 |
def provision_user(sender, instance, **kwargs):
    """Send a ``provision`` notification describing one user to the agents.

    Written as a signal receiver: ``instance`` is the user to describe, while
    ``sender`` and ``kwargs`` are accepted but not used.

    The message carries the user's identifying fields and the roles returned
    by ``instance.roles_and_parents()``.
    """
    issuer = unicode(get_entity_id())
    audience = get_audience(instance)

    user_record = {
        'uuid': instance.uuid,
        'username': instance.username,
        'first_name': instance.first_name,
        'last_name': instance.last_name,
        'email': instance.email,
    }

    role_records = []
    for role in instance.roles_and_parents():
        role_records.append({
            'uuid': role.uuid,
            'name': role.name,
            'slug': role.slug,
        })
    user_record['roles'] = role_records

    message = {
        '@type': 'provision',
        'issuer': issuer,
        'audience': audience,
        # Only this user is described, not the whole set of users.
        'full': False,
        'objects': {
            '@type': 'user',
            'data': [user_record],
        },
    }
    notify_agents(message)
|
125 | ||
126 | ||
127 |
def deprovision_user(sender, instance, **kwargs):
    """Send a ``deprovision`` notification for one user to the agents.

    Written as a signal receiver: ``instance`` is the user being removed,
    while ``sender`` and ``kwargs`` are accepted but not used. Only the
    user's ``uuid`` is transmitted.
    """
    notify_agents({
        '@type': 'deprovision',
        'issuer': unicode(get_entity_id()),
        'audience': get_audience(instance),
        # NOTE(review): provision_user sends 'full': False for a single
        # user; confirm that True is really intended for a single-user
        # deprovision before relying on it.
        'full': True,
        'objects': {
            # The object type is 'user', matching what provision_user
            # emits; 'instance' is only the name of the signal argument.
            '@type': 'user',
            'data': [
                {
                    'uuid': instance.uuid,
                },
            ],
        },
    })
|
142 | ||
143 | ||
144 |
def provision_user_on_role_change(sender, action, instance, model, pk_set,
                                  reverse, **kwargs):
    """Re-provision users after a change of role membership.

    Receiver for ``m2m_changed``. Only ``post_*`` actions are handled, so
    that the notification reflects the state after the change.

    When ``reverse`` is true, ``instance`` is passed to ``provision_user``
    directly (presumably it is the user whose roles changed -- confirm
    against the relation definition). Otherwise every ``model`` object whose
    primary key is in ``pk_set`` is provisioned.

    Note that the ``reverse`` parameter shadows any module-level ``reverse``
    inside this function; the name is imposed by the signal.
    """
    if not action.startswith('post'):
        return

    if reverse:
        provision_user(sender, instance, **kwargs)
        return

    # Django passes pk_set=None for the clear actions; there is then nothing
    # to look up, and ``pk__in=None`` must not reach the ORM.
    if not pk_set:
        return

    for user in model.objects.filter(pk__in=pk_set):
        provision_user(sender, user, **kwargs)
|
153 | ||
154 | ||
89 | 155 |
class Authentic2AgentConfig(AppConfig): |
90 | 156 |
name = 'hobo.agent.authentic2' |
91 | 157 |
label = 'authentic2_agent' |
... | ... | |
96 | 162 |
Role = get_role_model() |
97 | 163 |
post_save.connect(notify_roles, sender=Role) |
98 | 164 |
post_delete.connect(notify_roles, sender=Role) |
99 |
post_save.connect(notify_roles, sender=Role.members.through) |
|
100 |
post_delete.connect(notify_roles, sender=Role.members.through) |
|
101 |
settings.A2_MANAGER_ROLE_FORM_CLASS = 'hobo.agent.authentic2.role_forms.RoleForm' |
|
165 |
post_save.connect(notify_roles, Role) |
|
166 |
post_delete.connect(notify_roles, Role) |
|
167 |
post_save.connect(notify_roles, Role.members.through) |
|
168 |
post_delete.connect(notify_roles, Role.members.through) |
|
169 |
User = get_user_model() |
|
170 |
post_save.connect(provision_user, sender=User) |
|
171 |
post_delete.connect(deprovision_user, sender=User) |
|
172 |
m2m_changed.connect(provision_user_on_role_change, |
|
173 |
sender=Role.members.through) |
|
174 |
settings.A2_MANAGER_ROLE_FORM_CLASS = \ |
|
175 |
'hobo.agent.authentic2.role_forms.RoleForm' |
tests_authentic/test_provisionning.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 |
import pytest |
3 | 3 | |
4 |
from mock import patch, MagicMock, call, ANY
|
|
4 |
from mock import patch, call, ANY |
|
5 | 5 | |
6 | 6 |
from django.contrib.auth import get_user_model |
7 | 7 | |
... | ... | |
12 | 12 | |
13 | 13 |
pytestmark = pytest.mark.django_db |
14 | 14 | |
15 | ||
15 | 16 |
def test_provision_role(tenant): |
16 | 17 |
with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents: |
17 | 18 |
with tenant_context(tenant): |
... | ... | |
25 | 26 |
'audience', '@type', 'objects', 'full']) |
26 | 27 |
assert arg['audience'] == [] |
27 | 28 |
assert arg['@type'] == 'provision' |
28 |
assert arg['full'] == True
|
|
29 |
assert arg['full'] is True
|
|
29 | 30 |
objects = arg['objects'] |
30 | 31 |
assert isinstance(objects, dict) |
31 | 32 |
assert set(objects.keys()) == set(['data', '@type']) |
... | ... | |
38 | 39 |
assert set(o.keys()) == set(['emails_to_members', |
39 | 40 |
'description', 'uuid', 'name', |
40 | 41 |
'slug', 'emails']) |
41 |
assert o['emails_to_members'] == False
|
|
42 |
assert o['emails_to_members'] is False
|
|
42 | 43 |
assert o['emails'] == [] |
43 | 44 |
if o['uuid'] == role.uuid and o['name'] == role.name \ |
44 | 45 |
and o['description'] == role.description \ |
45 | 46 |
and o['slug'] == role.slug: |
46 | 47 |
like_role += 1 |
47 | 48 |
assert like_role == 1 |
49 | ||
50 | ||
51 |
def _check_user_provisionning(notify_agents, tenant, user, expected_roles):
    """Check that exactly one user provisionning message was sent.

    ``notify_agents`` is the mock replacing the notification function,
    ``user`` the user expected in the message and ``expected_roles`` the
    roles expected to be listed for that user, in order.
    """
    assert notify_agents.call_count == 1
    arg = notify_agents.call_args
    assert arg == call(ANY)
    arg = arg[0][0]
    assert isinstance(arg, dict)
    assert set(arg.keys()) == set([
        'issuer', 'audience', '@type', 'objects', 'full'])
    assert arg['issuer'] == \
        'http://%s/idp/saml2/metadata' % tenant.domain_url
    assert arg['audience'] == []
    assert arg['@type'] == 'provision'
    # provision_user always sends a partial update for a single user.
    assert arg['full'] is False
    objects = arg['objects']
    assert isinstance(objects, dict)
    assert set(objects.keys()) == set(['data', '@type'])
    assert objects['@type'] == 'user'
    data = objects['data']
    assert isinstance(data, list)
    assert len(data) == 1
    for o in data:
        assert set(o.keys()) == set(['uuid', 'username', 'first_name',
                                     'last_name', 'email', 'roles'])
        assert o['uuid'] == user.uuid
        assert o['username'] == user.username
        assert o['first_name'] == user.first_name
        assert o['last_name'] == user.last_name
        assert o['email'] == user.email
        assert o['roles'] == [{
            'uuid': role.uuid,
            'name': role.name,
            'slug': role.slug,
        } for role in expected_roles]


def test_provision_user(tenant):
    """A user is provisioned on creation and on each role membership change."""
    with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents:
        with tenant_context(tenant):
            role = Role.objects.create(name='coin', ou=get_default_ou())

            # Creating the user sends it without any role.
            notify_agents.reset_mock()
            User = get_user_model()
            user = User.objects.create(username=u'Étienne',
                                       email='etienne.dugenou@example.net',
                                       first_name=u'Étienne',
                                       last_name=u'Dugenou',
                                       ou=get_default_ou())
            _check_user_provisionning(notify_agents, tenant, user, [])

            # Adding the user from the role side sends it with the role.
            notify_agents.reset_mock()
            role.members.add(user)
            _check_user_provisionning(notify_agents, tenant, user, [role])

            # Removing the role from the user side sends it without roles.
            notify_agents.reset_mock()
            user.roles.remove(role)
            _check_user_provisionning(notify_agents, tenant, user, [])
|
48 |
- |