0001-new-hobo_notify-command-to-handle-role-provisionning.patch
tests/test_hobo_notify.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
import shutil |
|
3 |
from quixote import cleanup |
|
4 | ||
5 |
from wcs.ctl.hobo_notify import CmdHoboNotify |
|
6 |
from wcs.roles import Role |
|
7 | ||
8 |
from utilities import create_temporary_pub |
|
9 | ||
10 |
pub = None |
|
11 | ||
12 | ||
13 |
def setup_module(module): |
|
14 |
cleanup() |
|
15 |
global pub |
|
16 |
pub = create_temporary_pub() |
|
17 |
pub.cfg['sp'] = {'saml2_providerid': 'test'} |
|
18 |
pub.write_cfg() |
|
19 | ||
20 | ||
21 |
def teardown_module(module): |
|
22 |
shutil.rmtree(pub.APP_DIR) |
|
23 | ||
24 | ||
25 |
def setup_function(function): |
|
26 |
r = Role(name='Service étt civil') |
|
27 |
r.slug = 'service-ett-civil' |
|
28 |
r.store() |
|
29 | ||
30 | ||
31 |
def teardown_function(function): |
|
32 |
Role.wipe() |
|
33 | ||
34 | ||
35 |
def test_process_notification_wrong_audience(): |
|
36 |
notification = { |
|
37 |
'@type': u'provision', |
|
38 |
'audience': [u'coin'], |
|
39 |
'full': True, |
|
40 |
'objects': [ |
|
41 |
{ |
|
42 |
'@type': 'role', |
|
43 |
'name': u'Service enfance', |
|
44 |
'slug': u'service-enfance', |
|
45 |
'uuid': u'12345', |
|
46 |
}, |
|
47 |
{ |
|
48 |
'@type': 'role', |
|
49 |
'name': u'Service état civil', |
|
50 |
'slug': u'service-etat-civil', |
|
51 |
'uuid': u'xyz', |
|
52 |
}, |
|
53 |
] |
|
54 |
} |
|
55 |
assert Role.count() == 1 |
|
56 |
assert Role.select()[0].name == 'Service étt civil' |
|
57 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
58 |
CmdHoboNotify.process_notification(notification) |
|
59 |
assert Role.count() == 1 |
|
60 |
assert Role.select()[0].name == 'Service étt civil' |
|
61 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
62 | ||
63 | ||
64 |
def test_process_notification(): |
|
65 |
notification = { |
|
66 |
'@type': u'provision', |
|
67 |
'audience': [u'test'], |
|
68 |
'full': True, |
|
69 |
'objects': [ |
|
70 |
{ |
|
71 |
'@type': 'role', |
|
72 |
'name': u'Service enfance', |
|
73 |
'slug': u'service-enfance', |
|
74 |
'uuid': u'12345', |
|
75 |
}, |
|
76 |
{ |
|
77 |
'@type': 'role', |
|
78 |
'name': u'Service état civil', |
|
79 |
'slug': u'service-ett-civil', |
|
80 |
'uuid': u'xyz', |
|
81 |
}, |
|
82 |
] |
|
83 |
} |
|
84 |
assert Role.count() == 1 |
|
85 |
assert Role.select()[0].name == 'Service étt civil' |
|
86 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
87 |
existing_role_id = Role.select()[0].id |
|
88 |
CmdHoboNotify.process_notification(notification) |
|
89 |
assert Role.count() == 2 |
|
90 |
old_role = Role.get(existing_role_id) |
|
91 |
assert old_role.name == 'Service état civil' |
|
92 |
assert old_role.slug == 'xyz' |
|
93 |
new_role = Role.get_on_index('12345', 'slug') |
|
94 |
assert new_role.name == 'Service enfance' |
|
95 |
notification = { |
|
96 |
'@type': u'provision', |
|
97 |
'audience': [u'test'], |
|
98 |
'full': True, |
|
99 |
'objects': [ |
|
100 |
{ |
|
101 |
'@type': 'role', |
|
102 |
'name': u'Service enfance', |
|
103 |
'slug': u'service-enfance', |
|
104 |
'uuid': u'12345', |
|
105 |
}, |
|
106 |
] |
|
107 |
} |
|
108 |
CmdHoboNotify.process_notification(notification) |
|
109 |
assert Role.count() == 1 |
|
110 |
assert Role.select()[0].id == new_role.id |
|
111 |
assert Role.select()[0].name == 'Service enfance' |
|
112 |
assert Role.select()[0].slug == '12345' |
|
113 | ||
114 |
def test_process_notification_deprovision(): |
|
115 |
notification = { |
|
116 |
'@type': u'deprovision', |
|
117 |
'audience': [u'test'], |
|
118 |
'full': True, |
|
119 |
'objects': [ |
|
120 |
{ |
|
121 |
'@type': 'role', |
|
122 |
'name': u'Service état civil', |
|
123 |
'slug': u'service-ett-civil', |
|
124 |
'uuid': u'xyz', |
|
125 |
}, |
|
126 |
] |
|
127 |
} |
|
128 |
assert Role.count() == 1 |
|
129 |
assert Role.select()[0].name == 'Service étt civil' |
|
130 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
131 |
CmdHoboNotify.process_notification(notification) |
|
132 |
assert Role.count() == 0 |
wcs/ctl/hobo_notify.py | ||
---|---|---|
1 |
# w.c.s. - web application for online forms |
|
2 |
# Copyright (C) 2005-2014 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software; you can redistribute it and/or modify |
|
5 |
# it under the terms of the GNU General Public License as published by |
|
6 |
# the Free Software Foundation; either version 2 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU General Public License |
|
15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import os |
|
18 |
import sys |
|
19 |
import json |
|
20 | ||
21 |
from quixote import get_publisher |
|
22 |
from wcs.roles import Role |
|
23 |
from qommon.ctl import Command |
|
24 |
from qommon.publisher import get_cfg |
|
25 | ||
26 | ||
27 |
class CmdHoboNotify(Command): |
|
28 |
name = 'hobo_notify' |
|
29 | ||
30 |
def execute(self, base_options, sub_options, args): |
|
31 |
self.base_options = base_options |
|
32 |
if sub_options.extra: |
|
33 |
if not self.config.has_section('extra'): |
|
34 |
self.config.add_section('extra') |
|
35 |
for i, extra in enumerate(sub_options.extra): |
|
36 |
self.config.set('extra', 'cmd_line_extra_%d' % i, extra) |
|
37 | ||
38 |
notification = self.load_notification(args) |
|
39 |
if not self.check_valid_notification(notification): |
|
40 |
sys.exit(1) |
|
41 |
import publisher |
|
42 | ||
43 |
publisher.WcsPublisher.configure(self.config) |
|
44 |
pub = publisher.WcsPublisher.create_publisher() |
|
45 |
global_app_dir = pub.app_dir |
|
46 |
for hostname in os.listdir(global_app_dir): |
|
47 |
app_dir = os.path.join(global_app_dir, hostname) |
|
48 |
if not os.path.isdir(app_dir): |
|
49 |
continue |
|
50 |
pub.app_dir = app_dir |
|
51 |
pub.set_config() |
|
52 |
self.process_notification(notification, pub) |
|
53 | ||
54 |
@classmethod |
|
55 |
def load_notification(cls, args): |
|
56 |
if args[0] == '-': |
|
57 |
# get environment definition from stdin |
|
58 |
return json.load(sys.stdin) |
|
59 |
else: |
|
60 |
return json.load(file(args[0])) |
|
61 | ||
62 |
@classmethod |
|
63 |
def check_valid_notification(cls, notification): |
|
64 |
return isinstance(notification, dict) \ |
|
65 |
and '@type' in notification \ |
|
66 |
and notification['@type'] in ['provision', 'deprovision'] \ |
|
67 |
and 'objects' in notification \ |
|
68 |
and 'audience' in notification \ |
|
69 |
and isinstance(notification['audience'], list) \ |
|
70 |
and isinstance(notification['objects'], list) |
|
71 | ||
72 |
@classmethod |
|
73 |
def process_notification(cls, notification, publisher=None): |
|
74 |
publisher = publisher or get_publisher() |
|
75 |
action = notification['@type'] |
|
76 |
audience = notification['audience'] |
|
77 |
full = notification['full'] if 'full' in notification else False |
|
78 | ||
79 |
# Verify tenant is in audience |
|
80 |
entity_id = get_cfg('sp', {}).get('saml2_providerid') |
|
81 |
if not entity_id or entity_id not in audience: |
|
82 |
return |
|
83 | ||
84 |
uuids = set() |
|
85 |
# Now provision/deprovision |
|
86 |
for o in notification['objects']: |
|
87 |
t = o['@type'] |
|
88 |
if t != 'role' \ |
|
89 |
or 'uuid' not in o \ |
|
90 |
or 'name' not in o \ |
|
91 |
or 'description' not in o \ |
|
92 |
or 'emails' not in o \ |
|
93 |
or 'emails_to_members' not in o \ |
|
94 |
or 'slug' not in o: |
|
95 |
continue |
|
96 |
uuid = o['uuid'].encode(publisher.site_charset) |
|
97 |
uuids.add(uuid) |
|
98 |
slug = o['slug'].encode(publisher.site_charset) |
|
99 |
name = o['name'].encode(publisher.site_charset) |
|
100 |
emails = [email.encode(publisher.site_charset) for email in o['emails']] |
|
101 |
emails_to_members = o['emails_to_members'] |
|
102 |
# Find existing role |
|
103 |
try: |
|
104 |
role = Role.get_on_index(uuid, 'slug') |
|
105 |
except KeyError: |
|
106 |
try: |
|
107 |
role = Role.get_on_index(slug, 'slug') |
|
108 |
except KeyError: |
|
109 |
# New role |
|
110 |
if action != 'provision': |
|
111 |
continue |
|
112 |
role = Role(name=name) |
|
113 |
if action == 'provision': |
|
114 |
# Provision/rename |
|
115 |
role.name = name |
|
116 |
role.slug = uuid |
|
117 |
role.emails = emails |
|
118 |
role.emails_to_members = emails_to_members |
|
119 |
role.store() |
|
120 |
elif action == 'deprovision': |
|
121 |
# Deprovision |
|
122 |
role.remove_self() |
|
123 |
# All roles have been sent |
|
124 |
if full: |
|
125 |
for role in Role.select(): |
|
126 |
if role.slug not in uuids: |
|
127 |
role.remove_self() |
|
128 | ||
129 |
CmdHoboNotify.register() |
|
0 |
- |