0001-new-notify-command-to-handle-role-provisionning-depr.patch
tests/test_notify.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
import shutil |
|
3 |
from quixote import cleanup |
|
4 | ||
5 |
from wcs.ctl.notify import CmdNotify |
|
6 |
from wcs.roles import Role |
|
7 | ||
8 |
from utilities import create_temporary_pub |
|
9 | ||
10 |
pub = None |
|
11 | ||
12 | ||
13 |
def setup_module(module): |
|
14 |
cleanup() |
|
15 |
global pub |
|
16 |
pub = create_temporary_pub() |
|
17 |
pub.cfg['sp'] = {'saml2_providerid': 'test'} |
|
18 |
pub.write_cfg() |
|
19 |
r = Role(name='Service étt civil') |
|
20 |
r.slug = 'service-ett-civil' |
|
21 |
r.store() |
|
22 | ||
23 | ||
24 |
def teardown_module(module): |
|
25 |
shutil.rmtree(pub.APP_DIR) |
|
26 | ||
27 | ||
28 |
def test_process_notification_wrong_audience(): |
|
29 |
notification = { |
|
30 |
'@type': u'provision', |
|
31 |
'audience': [u'coin'], |
|
32 |
'full': True, |
|
33 |
'objects': [ |
|
34 |
{ |
|
35 |
'@type': 'role', |
|
36 |
'name': u'Service enfance', |
|
37 |
'slug': u'service-enfance', |
|
38 |
'uuid': u'12345', |
|
39 |
}, |
|
40 |
{ |
|
41 |
'@type': 'role', |
|
42 |
'name': u'Service état civil', |
|
43 |
'slug': u'service-etat-civil', |
|
44 |
'uuid': u'12345', |
|
45 |
}, |
|
46 |
] |
|
47 |
} |
|
48 |
assert Role.count() == 1 |
|
49 |
assert Role.select()[0].name == 'Service étt civil' |
|
50 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
51 |
CmdNotify.process_notification(notification) |
|
52 |
assert Role.count() == 1 |
|
53 |
assert Role.select()[0].name == 'Service étt civil' |
|
54 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
55 | ||
56 | ||
57 |
def test_process_notification(): |
|
58 |
notification = { |
|
59 |
'@type': u'provision', |
|
60 |
'audience': [u'test'], |
|
61 |
'full': True, |
|
62 |
'objects': [ |
|
63 |
{ |
|
64 |
'@type': 'role', |
|
65 |
'name': u'Service enfance', |
|
66 |
'slug': u'service-enfance', |
|
67 |
'uuid': u'12345', |
|
68 |
}, |
|
69 |
{ |
|
70 |
'@type': 'role', |
|
71 |
'name': u'Service état civil', |
|
72 |
'slug': u'service-ett-civil', |
|
73 |
'uuid': u'xyz', |
|
74 |
}, |
|
75 |
] |
|
76 |
} |
|
77 |
assert Role.count() == 1 |
|
78 |
assert Role.select()[0].name == 'Service étt civil' |
|
79 |
assert Role.select()[0].slug == 'service-ett-civil' |
|
80 |
existing_role_id = Role.select()[0].id |
|
81 |
CmdNotify.process_notification(notification) |
|
82 |
assert Role.count() == 2 |
|
83 |
old_role = Role.get(existing_role_id) |
|
84 |
assert old_role.name == 'Service état civil' |
|
85 |
assert old_role.slug == 'xyz' |
|
86 |
new_role = Role.get_on_index('12345', 'slug') |
|
87 |
assert new_role.name == 'Service enfance' |
|
88 |
notification = { |
|
89 |
'@type': u'provision', |
|
90 |
'audience': [u'test'], |
|
91 |
'full': True, |
|
92 |
'objects': [ |
|
93 |
{ |
|
94 |
'@type': 'role', |
|
95 |
'name': u'Service enfance', |
|
96 |
'slug': u'service-enfance', |
|
97 |
'uuid': u'12345', |
|
98 |
}, |
|
99 |
] |
|
100 |
} |
|
101 |
CmdNotify.process_notification(notification) |
|
102 |
assert Role.count() == 1 |
|
103 |
assert Role.select()[0].id == new_role.id |
|
104 |
assert Role.select()[0].name == 'Service enfance' |
|
105 |
assert Role.select()[0].slug == '12345' |
wcs/ctl/notify.py | ||
---|---|---|
1 |
# w.c.s. - web application for online forms |
|
2 |
# Copyright (C) 2005-2014 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software; you can redistribute it and/or modify |
|
5 |
# it under the terms of the GNU General Public License as published by |
|
6 |
# the Free Software Foundation; either version 2 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU General Public License |
|
15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
import os |
|
18 |
import sys |
|
19 |
import json |
|
20 | ||
21 |
from quixote import get_publisher |
|
22 |
from wcs.roles import Role |
|
23 |
from qommon.ctl import Command |
|
24 |
from qommon.publisher import get_cfg |
|
25 | ||
26 | ||
27 |
class CmdNotify(Command): |
|
28 |
name = 'notify' |
|
29 | ||
30 |
def execute(self, base_options, sub_options, args): |
|
31 |
self.base_options = base_options |
|
32 |
if sub_options.extra: |
|
33 |
if not self.config.has_section('extra'): |
|
34 |
self.config.add_section('extra') |
|
35 |
for i, extra in enumerate(sub_options.extra): |
|
36 |
self.config.set('extra', 'cmd_line_extra_%d' % i, extra) |
|
37 | ||
38 |
notification = self.load_notification(args) |
|
39 |
if not self.check_valid_notification(notification): |
|
40 |
sys.exit(1) |
|
41 |
import publisher |
|
42 | ||
43 |
publisher.WcsPublisher.configure(self.config) |
|
44 |
pub = publisher.WcsPublisher.create_publisher() |
|
45 |
global_app_dir = pub.app_dir |
|
46 |
for hostname in os.listdir(global_app_dir): |
|
47 |
pub.app_dir = os.path.join(global_app_dir, hostname) |
|
48 |
pub.set_config() |
|
49 |
self.process_notification(notification) |
|
50 | ||
51 |
@classmethod |
|
52 |
def load_notification(cls, args): |
|
53 |
if args[1] == '-': |
|
54 |
# get environment definition from stdin |
|
55 |
return json.load(sys.stdin) |
|
56 |
else: |
|
57 |
return json.load(file(args[1])) |
|
58 | ||
59 |
@classmethod |
|
60 |
def check_valid_notification(cls, notification): |
|
61 |
return isinstance(notification, dict) \ |
|
62 |
and '@type' in notification \ |
|
63 |
and notification['@type'] in ['provision', 'deprovision'] \ |
|
64 |
and 'objects' in notification \ |
|
65 |
and 'audience' in notification \ |
|
66 |
and isinstance(notification['audience'], list) \ |
|
67 |
and isinstance(notification['objects'], list) |
|
68 | ||
69 |
@classmethod |
|
70 |
def process_notification(cls, notification): |
|
71 |
publisher = get_publisher() |
|
72 |
action = notification['@type'] |
|
73 |
audience = notification['audience'] |
|
74 |
full = notification['full'] if 'full' in notification else False |
|
75 | ||
76 |
# Verify tenant is in audience |
|
77 |
entity_id = get_cfg('sp', {}).get('saml2_providerid') |
|
78 |
if not entity_id or entity_id not in audience: |
|
79 |
return |
|
80 | ||
81 |
uuids = set() |
|
82 |
# Now provision/deprovision |
|
83 |
for o in notification['objects']: |
|
84 |
t = o['@type'] |
|
85 |
if t != 'role' \ |
|
86 |
or 'uuid' not in o \ |
|
87 |
or 'name' not in o \ |
|
88 |
or 'slug' not in o: |
|
89 |
continue |
|
90 |
uuid = o['uuid'].encode(publisher.site_charset) |
|
91 |
uuids.add(uuid) |
|
92 |
slug = o['slug'].encode(publisher.site_charset) |
|
93 |
name = o['name'].encode(publisher.site_charset) |
|
94 |
# Find existing role |
|
95 |
try: |
|
96 |
role = Role.get_on_index(uuid, 'slug') |
|
97 |
except KeyError: |
|
98 |
try: |
|
99 |
role = Role.get_on_index(slug, 'slug') |
|
100 |
except KeyError: |
|
101 |
# New role |
|
102 |
if action != 'provision': |
|
103 |
continue |
|
104 |
role = Role(name=name) |
|
105 |
if action == 'provision': |
|
106 |
# Provision/rename |
|
107 |
role.name = name |
|
108 |
role.slug = uuid |
|
109 |
role.store() |
|
110 |
elif action == 'deprovision': |
|
111 |
# Deprovision |
|
112 |
role.remove_self() |
|
113 |
# All roles have been sent |
|
114 |
if full: |
|
115 |
for role in Role.select(): |
|
116 |
if role.slug not in uuids: |
|
117 |
role.remove_self() |
|
0 |
- |