From ddf9ad2f0b21fd2f0c32515d458324a1ee990100 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Wed, 9 Sep 2015 09:32:38 +0200 Subject: [PATCH 1/2] new hobo_notify command to handle role provisionning/deprovisionning messages (fixes #8219) It gets wcs roles from uuid or slug of the authentic2 role; if role is using a slug it's replaced by the uuid. It handles rename by using uuid as the slug of roles. Targetted tenants are identified through their SAML entity id. --- tests/test_hobo_notify.py | 168 ++++++++++++++++++++++++++++++++++++++++++++++ wcs/ctl/hobo_notify.py | 131 ++++++++++++++++++++++++++++++++++++ 2 files changed, 299 insertions(+) create mode 100644 tests/test_hobo_notify.py create mode 100644 wcs/ctl/hobo_notify.py diff --git a/tests/test_hobo_notify.py b/tests/test_hobo_notify.py new file mode 100644 index 0000000..5192815 --- /dev/null +++ b/tests/test_hobo_notify.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +import shutil +from quixote import cleanup + +from wcs.ctl.hobo_notify import CmdHoboNotify +from wcs.roles import Role + +from utilities import create_temporary_pub + +pub = None + + +def setup_module(module): + cleanup() + global pub + pub = create_temporary_pub() + pub.cfg['sp'] = {'saml2_providerid': 'test'} + pub.write_cfg() + + +def teardown_module(module): + shutil.rmtree(pub.APP_DIR) + + +def setup_function(function): + r = Role(name='Service étt civil') + r.slug = 'service-ett-civil' + r.store() + + +def teardown_function(function): + Role.wipe() + + +def test_process_notification_wrong_audience(): + notification = { + '@type': u'provision', + 'audience': [u'coin'], + 'full': True, + 'objects': [ + { + '@type': 'role', + 'name': u'Service enfance', + 'slug': u'service-enfance', + 'description': u'Rôle du service petite enfance', + 'uuid': u'12345', + 'emails': [u'petite-enfance@example.com'], + 'emails_to_members': False, + }, + { + '@type': 'role', + 'name': u'Service état civil', + 'slug': u'service-etat-civil', + 'description': u'Rôle du service état civil', + 'uuid': u'xyz', + 'emails': [u'etat-civil@example.com'], + 'emails_to_members': True, + }, + ] + } + assert Role.count() == 1 + assert Role.select()[0].name == 'Service étt civil' + assert Role.select()[0].slug == 'service-ett-civil' + assert Role.select()[0].details is None + assert Role.select()[0].emails is None + assert Role.select()[0].emails_to_members is False + CmdHoboNotify.process_notification(notification) + assert Role.count() == 1 + assert Role.select()[0].name == 'Service étt civil' + assert Role.select()[0].slug == 'service-ett-civil' + assert Role.select()[0].details is None + assert Role.select()[0].emails is None + assert Role.select()[0].emails_to_members is False + + +def test_process_notification(): + notification = { + '@type': u'provision', + 'audience': [u'test'], + 'full': True, + 'objects': [ + { + '@type': 'role', + 'name': u'Service enfance', + 'slug': u'service-enfance', + 'description': u'Rôle du service petite enfance', + 'uuid': u'12345', + 'emails': [u'petite-enfance@example.com'], + 'emails_to_members': False, + }, + { + '@type': 'role', + 'name': u'Service état civil', + 'slug': u'service-ett-civil', + 'description': u'Rôle du service état civil', + 'uuid': u'xyz', + 'emails': [u'etat-civil@example.com'], + 'emails_to_members': True, + }, + ] + } + assert Role.count() == 1 + assert Role.select()[0].name == 'Service étt civil' + assert Role.select()[0].slug == 'service-ett-civil' + assert Role.select()[0].details is None + assert Role.select()[0].emails is None + assert Role.select()[0].emails_to_members is False + existing_role_id = Role.select()[0].id + CmdHoboNotify.process_notification(notification) + assert Role.count() == 2 + old_role = Role.get(existing_role_id) + assert old_role.name == 'Service état civil' + assert old_role.slug == 'xyz' + assert old_role.details == 'Rôle du service état civil' + assert old_role.emails == ['etat-civil@example.com'] + assert old_role.emails_to_members is True + new_role = Role.get_on_index('12345', 'slug') + assert new_role.name == 'Service enfance' + assert new_role.details == 'Rôle du service petite enfance' + assert new_role.emails == ['petite-enfance@example.com'] + assert new_role.emails_to_members is False + notification = { + '@type': u'provision', + 'audience': [u'test'], + 'full': True, + 'objects': [ + { + '@type': 'role', + 'name': u'Service enfance', + 'slug': u'service-enfance', + 'description': u'Rôle du service petite enfance', + 'uuid': u'12345', + 'emails': [u'petite-enfance@example.com'], + 'emails_to_members': True, + }, + ] + } + CmdHoboNotify.process_notification(notification) + assert Role.count() == 1 + assert Role.select()[0].id == new_role.id + assert Role.select()[0].name == 'Service enfance' + assert Role.select()[0].slug == '12345' + assert Role.select()[0].details == 'Rôle du service petite enfance' + assert Role.select()[0].emails == ['petite-enfance@example.com'] + assert Role.select()[0].emails_to_members is True + +def test_process_notification_deprovision(): + notification = { + '@type': u'deprovision', + 'audience': [u'test'], + 'full': True, + 'objects': [ + { + '@type': 'role', + 'name': u'Service état civil', + 'slug': u'service-ett-civil', + 'description': u'Rôle du service état civil', + 'uuid': u'xyz', + 'emails': [u'etat-civil@example.com'], + 'emails_to_members': True, + }, + ] + } + assert Role.count() == 1 + assert Role.select()[0].name == 'Service étt civil' + assert Role.select()[0].slug == 'service-ett-civil' + CmdHoboNotify.process_notification(notification) + assert Role.count() == 0 diff --git a/wcs/ctl/hobo_notify.py b/wcs/ctl/hobo_notify.py new file mode 100644 index 0000000..4c5d2c3 --- /dev/null +++ b/wcs/ctl/hobo_notify.py @@ -0,0 +1,131 @@ +# w.c.s. - web application for online forms +# Copyright (C) 2005-2014 Entr'ouvert +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see . + +import os +import sys +import json + +from quixote import get_publisher +from wcs.roles import Role +from qommon.ctl import Command +from qommon.publisher import get_cfg + + +class CmdHoboNotify(Command): + name = 'hobo_notify' + + def execute(self, base_options, sub_options, args): + self.base_options = base_options + if sub_options.extra: + if not self.config.has_section('extra'): + self.config.add_section('extra') + for i, extra in enumerate(sub_options.extra): + self.config.set('extra', 'cmd_line_extra_%d' % i, extra) + + notification = self.load_notification(args) + if not self.check_valid_notification(notification): + sys.exit(1) + import publisher + + publisher.WcsPublisher.configure(self.config) + pub = publisher.WcsPublisher.create_publisher() + global_app_dir = pub.app_dir + for hostname in os.listdir(global_app_dir): + app_dir = os.path.join(global_app_dir, hostname) + if not os.path.isdir(app_dir): + continue + pub.app_dir = app_dir + pub.set_config() + self.process_notification(notification, pub) + + @classmethod + def load_notification(cls, args): + if args[0] == '-': + # get environment definition from stdin + return json.load(sys.stdin) + else: + return json.load(file(args[0])) + + @classmethod + def check_valid_notification(cls, notification): + return isinstance(notification, dict) \ + and '@type' in notification \ + and notification['@type'] in ['provision', 'deprovision'] \ + and 'objects' in notification \ + and 'audience' in notification \ + and isinstance(notification['audience'], list) \ + and isinstance(notification['objects'], list) + + @classmethod + def process_notification(cls, notification, publisher=None): + publisher = publisher or get_publisher() + action = notification['@type'] + audience = notification['audience'] + full = notification['full'] if 'full' in notification else False + + # Verify tenant is in audience + entity_id = get_cfg('sp', {}).get('saml2_providerid') + if not entity_id or entity_id not in audience: + return + + uuids = set() + # Now provision/deprovision + for o in notification['objects']: + t = o['@type'] + if t != 'role' \ + or 'uuid' not in o \ + or 'name' not in o \ + or 'description' not in o \ + or 'emails' not in o \ + or 'emails_to_members' not in o \ + or 'slug' not in o: + continue + uuid = o['uuid'].encode(publisher.site_charset) + uuids.add(uuid) + slug = o['slug'].encode(publisher.site_charset) + details = o.get('description', '').encode(publisher.site_charset) + name = o['name'].encode(publisher.site_charset) + emails = [email.encode(publisher.site_charset) for email in o['emails']] + emails_to_members = o['emails_to_members'] + # Find existing role + try: + role = Role.get_on_index(uuid, 'slug') + except KeyError: + try: + role = Role.get_on_index(slug, 'slug') + except KeyError: + # New role + if action != 'provision': + continue + role = Role(name=name) + if action == 'provision': + # Provision/rename + role.name = name + role.slug = uuid + role.emails = emails + role.details = details + role.emails_to_members = emails_to_members + role.store() + elif action == 'deprovision': + # Deprovision + role.remove_self() + # All roles have been sent + if full: + for role in Role.select(): + if role.slug not in uuids: + role.remove_self() + +CmdHoboNotify.register() -- 2.1.4