From 2ce719438dd8326b501880687197c68632bd0b13 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Mon, 5 Oct 2015 16:30:02 +0200 Subject: [PATCH 4/4] agent/authentic2: add hooks on signals to provision users (#8440) Signals intercepted: - post_save and post_delete on User - m2m_changed on Role.members.through --- hobo/agent/authentic2/apps.py | 86 +++++++++++++++++++++++-- tests_authentic/test_provisionning.py | 116 +++++++++++++++++++++++++++++++++- 2 files changed, 193 insertions(+), 9 deletions(-) diff --git a/hobo/agent/authentic2/apps.py b/hobo/agent/authentic2/apps.py index 337d0d8..d8eccd1 100644 --- a/hobo/agent/authentic2/apps.py +++ b/hobo/agent/authentic2/apps.py @@ -15,15 +15,18 @@ # along with this program. If not, see . import json +from urlparse import urljoin from django.apps import AppConfig -from django.db.models.signals import post_save, post_delete +from django.db.models.signals import post_save, post_delete, m2m_changed from django.conf import settings +from django.contrib.auth import get_user_model +from django.db import connection +from django.core.urlresolvers import reverse from django_rbac.utils import get_role_model from hobo.agent.common import notify_agents -from authentic2.utils import to_list from authentic2.saml.models import LibertyProvider from authentic2.a2_rbac.models import OrganizationalUnit @@ -57,7 +60,8 @@ def get_related_roles(role_or_through): role.emails = [] role.emails_to_members = False for attribute in role.attributes.all(): - if attribute.name in ('emails', 'emails_to_members') and attribute.kind == 'json': + if attribute.name in ('emails', 'emails_to_members') \ + and attribute.kind == 'json': setattr(role, attribute.name, json.loads(attribute.value)) return qs @@ -86,6 +90,68 @@ def notify_roles(sender, instance, **kwargs): pass +def get_entity_id(): + tenant = getattr(connection, 'tenant', None) + assert tenant + base_url = tenant.get_base_url() + return urljoin(base_url, reverse('a2-idp-saml-metadata')) + + +def provision_user(sender, instance, **kwargs): + notify_agents({ + '@type': 'provision', + 'issuer': unicode(get_entity_id()), + 'audience': get_audience(instance), + 'full': False, + 'objects': { + '@type': 'user', + 'data': [ + { + 'uuid': instance.uuid, + 'username': instance.username, + 'first_name': instance.first_name, + 'last_name': instance.last_name, + 'email': instance.email, + 'roles': [ + { + 'uuid': role.uuid, + 'name': role.name, + 'slug': role.slug, + } for role in instance.roles_and_parents()], + } + ], + } + }) + + +def deprovision_user(sender, instance, **kwargs): + notify_agents({ + '@type': 'deprovision', + 'issuer': unicode(get_entity_id()), + 'audience': get_audience(instance), + 'full': True, + 'objects': { + '@type': 'instance', + 'data': [ + { + 'uuid': instance.uuid, + } + ], + } + }) + + +def provision_user_on_role_change(sender, action, instance, model, pk_set, + reverse, **kwargs): + if not action.startswith('post'): + return + if reverse: + provision_user(sender, instance, **kwargs) + else: + for user in model.objects.filter(pk__in=pk_set): + provision_user(sender, user, **kwargs) + + class Authentic2AgentConfig(AppConfig): name = 'hobo.agent.authentic2' label = 'authentic2_agent' @@ -96,6 +162,14 @@ class Authentic2AgentConfig(AppConfig): Role = get_role_model() post_save.connect(notify_roles, sender=Role) post_delete.connect(notify_roles, sender=Role) - post_save.connect(notify_roles, sender=Role.members.through) - post_delete.connect(notify_roles, sender=Role.members.through) - settings.A2_MANAGER_ROLE_FORM_CLASS = 'hobo.agent.authentic2.role_forms.RoleForm' + post_save.connect(notify_roles, Role) + post_delete.connect(notify_roles, Role) + post_save.connect(notify_roles, Role.members.through) + post_delete.connect(notify_roles, Role.members.through) + User = get_user_model() + post_save.connect(provision_user, sender=User) + post_delete.connect(deprovision_user, sender=User) + m2m_changed.connect(provision_user_on_role_change, + sender=Role.members.through) + settings.A2_MANAGER_ROLE_FORM_CLASS = \ + 'hobo.agent.authentic2.role_forms.RoleForm' diff --git a/tests_authentic/test_provisionning.py b/tests_authentic/test_provisionning.py index 59050da..9aed935 100644 --- a/tests_authentic/test_provisionning.py +++ b/tests_authentic/test_provisionning.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import pytest -from mock import patch, MagicMock, call, ANY +from mock import patch, call, ANY from django.contrib.auth import get_user_model @@ -12,6 +12,7 @@ from authentic2.a2_rbac.utils import get_default_ou pytestmark = pytest.mark.django_db + def test_provision_role(tenant): with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents: with tenant_context(tenant): @@ -25,7 +26,7 @@ def test_provision_role(tenant): 'audience', '@type', 'objects', 'full']) assert arg['audience'] == [] assert arg['@type'] == 'provision' - assert arg['full'] == True + assert arg['full'] is True objects = arg['objects'] assert isinstance(objects, dict) assert set(objects.keys()) == set(['data', '@type']) @@ -38,10 +39,119 @@ def test_provision_role(tenant): assert set(o.keys()) == set(['emails_to_members', 'description', 'uuid', 'name', 'slug', 'emails']) - assert o['emails_to_members'] == False + assert o['emails_to_members'] is False assert o['emails'] == [] if o['uuid'] == role.uuid and o['name'] == role.name \ and o['description'] == role.description \ and o['slug'] == role.slug: like_role += 1 assert like_role == 1 + + +def test_provision_user(tenant): + with patch('hobo.agent.authentic2.apps.notify_agents') as notify_agents: + with tenant_context(tenant): + role = Role.objects.create(name='coin', ou=get_default_ou()) + notify_agents.reset_mock() + User = get_user_model() + user = User.objects.create(username=u'Étienne', + email='etienne.dugenou@example.net', + first_name=u'Étienne', + last_name=u'Dugenou', + ou=get_default_ou()) + assert notify_agents.call_count == 1 + arg = notify_agents.call_args + assert arg == call(ANY) + arg = arg[0][0] + assert isinstance(arg, dict) + assert set(arg.keys()) == set([ + 'issuer', 'audience', '@type', 'objects', 'full']) + assert arg['issuer'] == \ + 'http://%s/idp/saml2/metadata' % tenant.domain_url + assert arg['audience'] == [] + assert arg['@type'] == 'provision' + assert arg['full'] is False + objects = arg['objects'] + assert isinstance(objects, dict) + assert set(objects.keys()) == set(['data', '@type']) + assert objects['@type'] == 'user' + data = objects['data'] + assert isinstance(data, list) + assert len(data) == 1 + for o in data: + assert set(o.keys()) == set(['uuid', 'username', 'first_name', + 'last_name', 'email', 'roles']) + assert o['uuid'] == user.uuid + assert o['username'] == user.username + assert o['first_name'] == user.first_name + assert o['last_name'] == user.last_name + assert o['email'] == user.email + assert o['roles'] == [] + + notify_agents.reset_mock() + role.members.add(user) + + assert notify_agents.call_count == 1 + arg = notify_agents.call_args + assert arg == call(ANY) + arg = arg[0][0] + assert isinstance(arg, dict) + assert set(arg.keys()) == set([ + 'issuer', 'audience', '@type', 'objects', 'full']) + assert arg['issuer'] == \ + 'http://%s/idp/saml2/metadata' % tenant.domain_url + assert arg['audience'] == [] + assert arg['@type'] == 'provision' + assert arg['full'] is True + objects = arg['objects'] + assert isinstance(objects, dict) + assert set(objects.keys()) == set(['data', '@type']) + assert objects['@type'] == 'user' + data = objects['data'] + assert isinstance(data, list) + assert len(data) == 1 + for o in data: + assert set(o.keys()) == set(['uuid', 'username', 'first_name', + 'last_name', 'email', 'roles']) + assert o['uuid'] == user.uuid + assert o['username'] == user.username + assert o['first_name'] == user.first_name + assert o['last_name'] == user.last_name + assert o['email'] == user.email + assert o['roles'] == [{ + 'uuid': role.uuid, + 'name': role.name, + 'slug': role.slug + }] + + notify_agents.reset_mock() + user.roles.remove(role) + + assert notify_agents.call_count == 1 + arg = notify_agents.call_args + assert arg == call(ANY) + arg = arg[0][0] + assert isinstance(arg, dict) + assert set(arg.keys()) == set([ + 'issuer', 'audience', '@type', 'objects', 'full']) + assert arg['issuer'] == \ + 'http://%s/idp/saml2/metadata' % tenant.domain_url + assert arg['audience'] == [] + assert arg['@type'] == 'provision' + assert arg['full'] is True + objects = arg['objects'] + assert isinstance(objects, dict) + assert set(objects.keys()) == set(['data', '@type']) + assert objects['@type'] == 'user' + data = objects['data'] + assert isinstance(data, list) + assert len(data) == 1 + for o in data: + assert set(o.keys()) == set(['uuid', 'username', 'first_name', + 'last_name', 'email', 'roles']) + assert o['uuid'] == user.uuid + assert o['username'] == user.username + assert o['first_name'] == user.first_name + assert o['last_name'] == user.last_name + assert o['email'] == user.email + assert o['roles'] == [] -- 2.1.4