Projet

Général

Profil

0001-new-hobo_notify-command-to-handle-role-provisionning.patch

Benjamin Dauvergne, 15 septembre 2015 10:08

Télécharger (9,64 ko)

Voir les différences:

Subject: [PATCH 1/2] new hobo_notify command to handle role
 provisionning/deprovisionning messages (fixes #8219)

It gets wcs roles from uuid or slug of the authentic2 role; if role is
using a slug it's replaced by the uuid. It handles rename by using uuid
as the slug of roles.

Targetted tenants are identified through their SAML entity id.
 tests/test_hobo_notify.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++
 wcs/ctl/hobo_notify.py    | 129 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 261 insertions(+)
 create mode 100644 tests/test_hobo_notify.py
 create mode 100644 wcs/ctl/hobo_notify.py
tests/test_hobo_notify.py
1
# -*- coding: utf-8 -*-
2
import shutil
3
from quixote import cleanup
4

  
5
from wcs.ctl.hobo_notify import CmdHoboNotify
6
from wcs.roles import Role
7

  
8
from utilities import create_temporary_pub
9

  
10
pub = None
11

  
12

  
13
def setup_module(module):
14
    cleanup()
15
    global pub
16
    pub = create_temporary_pub()
17
    pub.cfg['sp'] = {'saml2_providerid': 'test'}
18
    pub.write_cfg()
19

  
20

  
21
def teardown_module(module):
22
    shutil.rmtree(pub.APP_DIR)
23

  
24

  
25
def setup_function(function):
26
    r = Role(name='Service étt civil')
27
    r.slug = 'service-ett-civil'
28
    r.store()
29

  
30

  
31
def teardown_function(function):
32
    Role.wipe()
33

  
34

  
35
def test_process_notification_wrong_audience():
36
    notification = {
37
        '@type': u'provision',
38
        'audience': [u'coin'],
39
        'full': True,
40
        'objects': [
41
            {
42
                '@type': 'role',
43
                'name': u'Service enfance',
44
                'slug': u'service-enfance',
45
                'uuid': u'12345',
46
            },
47
            {
48
                '@type': 'role',
49
                'name': u'Service état civil',
50
                'slug': u'service-etat-civil',
51
                'uuid': u'xyz',
52
            },
53
        ]
54
    }
55
    assert Role.count() == 1
56
    assert Role.select()[0].name == 'Service étt civil'
57
    assert Role.select()[0].slug == 'service-ett-civil'
58
    CmdHoboNotify.process_notification(notification)
59
    assert Role.count() == 1
60
    assert Role.select()[0].name == 'Service étt civil'
61
    assert Role.select()[0].slug == 'service-ett-civil'
62

  
63

  
64
def test_process_notification():
65
    notification = {
66
        '@type': u'provision',
67
        'audience': [u'test'],
68
        'full': True,
69
        'objects': [
70
            {
71
                '@type': 'role',
72
                'name': u'Service enfance',
73
                'slug': u'service-enfance',
74
                'uuid': u'12345',
75
            },
76
            {
77
                '@type': 'role',
78
                'name': u'Service état civil',
79
                'slug': u'service-ett-civil',
80
                'uuid': u'xyz',
81
            },
82
        ]
83
    }
84
    assert Role.count() == 1
85
    assert Role.select()[0].name == 'Service étt civil'
86
    assert Role.select()[0].slug == 'service-ett-civil'
87
    existing_role_id = Role.select()[0].id
88
    CmdHoboNotify.process_notification(notification)
89
    assert Role.count() == 2
90
    old_role = Role.get(existing_role_id)
91
    assert old_role.name == 'Service état civil'
92
    assert old_role.slug == 'xyz'
93
    new_role = Role.get_on_index('12345', 'slug')
94
    assert new_role.name == 'Service enfance'
95
    notification = {
96
        '@type': u'provision',
97
        'audience': [u'test'],
98
        'full': True,
99
        'objects': [
100
            {
101
                '@type': 'role',
102
                'name': u'Service enfance',
103
                'slug': u'service-enfance',
104
                'uuid': u'12345',
105
            },
106
        ]
107
    }
108
    CmdHoboNotify.process_notification(notification)
109
    assert Role.count() == 1
110
    assert Role.select()[0].id == new_role.id
111
    assert Role.select()[0].name == 'Service enfance'
112
    assert Role.select()[0].slug == '12345'
113

  
114
def test_process_notification_deprovision():
115
    notification = {
116
        '@type': u'deprovision',
117
        'audience': [u'test'],
118
        'full': True,
119
        'objects': [
120
            {
121
                '@type': 'role',
122
                'name': u'Service état civil',
123
                'slug': u'service-ett-civil',
124
                'uuid': u'xyz',
125
            },
126
        ]
127
    }
128
    assert Role.count() == 1
129
    assert Role.select()[0].name == 'Service étt civil'
130
    assert Role.select()[0].slug == 'service-ett-civil'
131
    CmdHoboNotify.process_notification(notification)
132
    assert Role.count() == 0
wcs/ctl/hobo_notify.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2014  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import os
18
import sys
19
import json
20

  
21
from quixote import get_publisher
22
from wcs.roles import Role
23
from qommon.ctl import Command
24
from qommon.publisher import get_cfg
25

  
26

  
27
class CmdHoboNotify(Command):
28
    name = 'hobo_notify'
29

  
30
    def execute(self, base_options, sub_options, args):
31
        self.base_options = base_options
32
        if sub_options.extra:
33
            if not self.config.has_section('extra'):
34
                self.config.add_section('extra')
35
            for i, extra in enumerate(sub_options.extra):
36
                self.config.set('extra', 'cmd_line_extra_%d' % i, extra)
37

  
38
        notification = self.load_notification(args)
39
        if not self.check_valid_notification(notification):
40
            sys.exit(1)
41
        import publisher
42

  
43
        publisher.WcsPublisher.configure(self.config)
44
        pub = publisher.WcsPublisher.create_publisher()
45
        global_app_dir = pub.app_dir
46
        for hostname in os.listdir(global_app_dir):
47
            app_dir = os.path.join(global_app_dir, hostname)
48
            if not os.path.isdir(app_dir):
49
                continue
50
            pub.app_dir = app_dir
51
            pub.set_config()
52
            self.process_notification(notification, pub)
53

  
54
    @classmethod
55
    def load_notification(cls, args):
56
        if args[0] == '-':
57
            # get environment definition from stdin
58
            return json.load(sys.stdin)
59
        else:
60
            return json.load(file(args[0]))
61

  
62
    @classmethod
63
    def check_valid_notification(cls, notification):
64
        return isinstance(notification, dict) \
65
            and '@type' in notification \
66
            and notification['@type'] in ['provision', 'deprovision'] \
67
            and 'objects' in notification \
68
            and 'audience' in notification \
69
            and isinstance(notification['audience'], list) \
70
            and isinstance(notification['objects'], list)
71

  
72
    @classmethod
73
    def process_notification(cls, notification, publisher=None):
74
        publisher = publisher or get_publisher()
75
        action = notification['@type']
76
        audience = notification['audience']
77
        full = notification['full'] if 'full' in notification else False
78

  
79
        # Verify tenant is in audience
80
        entity_id = get_cfg('sp', {}).get('saml2_providerid')
81
        if not entity_id or entity_id not in audience:
82
            return
83

  
84
        uuids = set()
85
        # Now provision/deprovision
86
        for o in notification['objects']:
87
            t = o['@type']
88
            if t != 'role' \
89
               or 'uuid' not in o \
90
               or 'name' not in o \
91
               or 'description' not in o \
92
               or 'emails' not in o \
93
               or 'emails_to_members' not in o \
94
               or 'slug' not in o:
95
                continue
96
            uuid = o['uuid'].encode(publisher.site_charset)
97
            uuids.add(uuid)
98
            slug = o['slug'].encode(publisher.site_charset)
99
            name = o['name'].encode(publisher.site_charset)
100
            emails = [email.encode(publisher.site_charset) for email in o['emails']]
101
            emails_to_members = o['emails_to_members']
102
            # Find existing role
103
            try:
104
                role = Role.get_on_index(uuid, 'slug')
105
            except KeyError:
106
                try:
107
                    role = Role.get_on_index(slug, 'slug')
108
                except KeyError:
109
                    # New role
110
                    if action != 'provision':
111
                        continue
112
                    role = Role(name=name)
113
            if action == 'provision':
114
                # Provision/rename
115
                role.name = name
116
                role.slug = uuid
117
                role.emails = emails
118
                role.emails_to_members = emails_to_members
119
                role.store()
120
            elif action == 'deprovision':
121
                # Deprovision
122
                role.remove_self()
123
        # All roles have been sent
124
        if full:
125
            for role in Role.select():
126
                if role.slug not in uuids:
127
                    role.remove_self()
128

  
129
CmdHoboNotify.register()
0
-