Projet

Général

Profil

0001-new-hobo_notify-command-to-handle-role-provisionning.patch

Benjamin Dauvergne, 15 septembre 2015 10:35

Télécharger (11,7 ko)

Voir les différences:

Subject: [PATCH 1/2] new hobo_notify command to handle role
 provisioning/deprovisioning messages (fixes #8219)

It looks up w.c.s. roles by the uuid or the slug of the authentic2 role;
when a role is found through its slug, that slug is replaced by the uuid.
Renames are handled because the uuid is used as the slug of roles.

Targeted tenants are identified through their SAML entity id.
 tests/test_hobo_notify.py | 168 ++++++++++++++++++++++++++++++++++++++++++++++
 wcs/ctl/hobo_notify.py    | 131 ++++++++++++++++++++++++++++++++++++
 2 files changed, 299 insertions(+)
 create mode 100644 tests/test_hobo_notify.py
 create mode 100644 wcs/ctl/hobo_notify.py
tests/test_hobo_notify.py
1
# -*- coding: utf-8 -*-
2
import shutil
3
from quixote import cleanup
4

  
5
from wcs.ctl.hobo_notify import CmdHoboNotify
6
from wcs.roles import Role
7

  
8
from utilities import create_temporary_pub
9

  
10
pub = None
11

  
12

  
13
def setup_module(module):
    """Create the temporary publisher shared by the tests of this module.

    The 'sp' configuration is given 'test' as its 'saml2_providerid' and
    the configuration is written back.
    """
    global pub
    cleanup()
    pub = create_temporary_pub()
    sp_config = {'saml2_providerid': 'test'}
    pub.cfg['sp'] = sp_config
    pub.write_cfg()
19

  
20

  
21
def teardown_module(module):
    """Delete the directory tree found at pub.APP_DIR."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
23

  
24

  
25
def setup_function(function):
    """Store the single role every test of this module starts from."""
    initial_role = Role(name='Service étt civil')
    initial_role.slug = 'service-ett-civil'
    initial_role.store()
29

  
30

  
31
def teardown_function(function):
    """Wipe the Role storage once a test is done."""
    Role.wipe()
33

  
34

  
35
def test_process_notification_wrong_audience():
    """A notification whose audience is not 'test' must change no role."""

    def check_roles_untouched():
        # the only role is still the one stored by setup_function
        assert Role.count() == 1
        role = Role.select()[0]
        assert role.name == 'Service étt civil'
        assert role.slug == 'service-ett-civil'
        assert role.details is None
        assert role.emails is None
        assert role.emails_to_members is False

    service_enfance = {
        '@type': 'role',
        'name': u'Service enfance',
        'slug': u'service-enfance',
        'description': u'Rôle du service petite enfance',
        'uuid': u'12345',
        'emails': [u'petite-enfance@example.com'],
        'emails_to_members': False,
    }
    service_etat_civil = {
        '@type': 'role',
        'name': u'Service état civil',
        'slug': u'service-etat-civil',
        'description': u'Rôle du service état civil',
        'uuid': u'xyz',
        'emails': [u'etat-civil@example.com'],
        'emails_to_members': True,
    }
    notification = {
        '@type': u'provision',
        'audience': [u'coin'],
        'full': True,
        'objects': [service_enfance, service_etat_civil],
    }

    check_roles_untouched()
    CmdHoboNotify.process_notification(notification)
    check_roles_untouched()
74

  
75

  
76
def test_process_notification():
    """Provision two roles, then send a full notification with only one.

    The first notification renames the existing role (matched on its slug)
    and creates a second one; the second notification carries a single
    role and is expected to leave only that one.
    """
    service_enfance = {
        '@type': 'role',
        'name': u'Service enfance',
        'slug': u'service-enfance',
        'description': u'Rôle du service petite enfance',
        'uuid': u'12345',
        'emails': [u'petite-enfance@example.com'],
        'emails_to_members': False,
    }
    service_etat_civil = {
        '@type': 'role',
        'name': u'Service état civil',
        'slug': u'service-ett-civil',
        'description': u'Rôle du service état civil',
        'uuid': u'xyz',
        'emails': [u'etat-civil@example.com'],
        'emails_to_members': True,
    }
    first_notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': [service_enfance, service_etat_civil],
    }

    # starting point: the single role stored by setup_function
    assert Role.count() == 1
    initial_role = Role.select()[0]
    assert initial_role.name == 'Service étt civil'
    assert initial_role.slug == 'service-ett-civil'
    assert initial_role.details is None
    assert initial_role.emails is None
    assert initial_role.emails_to_members is False
    existing_role_id = initial_role.id

    CmdHoboNotify.process_notification(first_notification)
    assert Role.count() == 2

    # the existing role kept its id but took the notified values
    old_role = Role.get(existing_role_id)
    assert old_role.name == 'Service état civil'
    assert old_role.slug == 'xyz'
    assert old_role.details == 'Rôle du service état civil'
    assert old_role.emails == ['etat-civil@example.com']
    assert old_role.emails_to_members is True

    # the other role was created with its uuid as slug
    new_role = Role.get_on_index('12345', 'slug')
    assert new_role.name == 'Service enfance'
    assert new_role.details == 'Rôle du service petite enfance'
    assert new_role.emails == ['petite-enfance@example.com']
    assert new_role.emails_to_members is False

    # second notification: same role, emails_to_members switched on
    service_enfance_updated = dict(service_enfance)
    service_enfance_updated['emails_to_members'] = True
    second_notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': [service_enfance_updated],
    }

    CmdHoboNotify.process_notification(second_notification)
    assert Role.count() == 1
    remaining_role = Role.select()[0]
    assert remaining_role.id == new_role.id
    assert remaining_role.name == 'Service enfance'
    assert remaining_role.slug == '12345'
    assert remaining_role.details == 'Rôle du service petite enfance'
    assert remaining_role.emails == ['petite-enfance@example.com']
    assert remaining_role.emails_to_members is True
146

  
147
def test_process_notification_deprovision():
    """Deprovisioning a role matched on its slug removes it."""
    service_etat_civil = {
        '@type': 'role',
        'name': u'Service état civil',
        'slug': u'service-ett-civil',
        'description': u'Rôle du service état civil',
        'uuid': u'xyz',
        'emails': [u'etat-civil@example.com'],
        'emails_to_members': True,
    }
    notification = {
        '@type': u'deprovision',
        'audience': [u'test'],
        'full': True,
        'objects': [service_etat_civil],
    }

    # starting point: the single role stored by setup_function
    assert Role.count() == 1
    initial_role = Role.select()[0]
    assert initial_role.name == 'Service étt civil'
    assert initial_role.slug == 'service-ett-civil'

    CmdHoboNotify.process_notification(notification)
    assert Role.count() == 0
wcs/ctl/hobo_notify.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2014  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import os
18
import sys
19
import json
20

  
21
from quixote import get_publisher
22
from wcs.roles import Role
23
from qommon.ctl import Command
24
from qommon.publisher import get_cfg
25

  
26

  
27
class CmdHoboNotify(Command):
    """Command applying a role provisioning notification to the tenants.

    A notification is a JSON object holding an '@type' ('provision' or
    'deprovision'), an 'audience' list, an 'objects' list and an optional
    'full' flag.
    """
    name = 'hobo_notify'

    # keys an entry of notification['objects'] must carry to be handled
    _role_required_keys = ('uuid', 'name', 'description', 'emails',
                           'emails_to_members', 'slug')

    def execute(self, base_options, sub_options, args):
        """Load the notification and process it for each tenant directory.

        The process exits with status 1 when the notification does not pass
        check_valid_notification().
        """
        self.base_options = base_options
        if sub_options.extra:
            if not self.config.has_section('extra'):
                self.config.add_section('extra')
            for i, extra in enumerate(sub_options.extra):
                self.config.set('extra', 'cmd_line_extra_%d' % i, extra)

        notification = self.load_notification(args)
        if not self.check_valid_notification(notification):
            sys.exit(1)
        # NOTE(review): imported at call time rather than at module level;
        # presumably to avoid an import cycle -- confirm.
        import publisher

        publisher.WcsPublisher.configure(self.config)
        pub = publisher.WcsPublisher.create_publisher()
        global_app_dir = pub.app_dir
        # every sub-directory of the global application directory is
        # processed as one tenant
        for hostname in os.listdir(global_app_dir):
            app_dir = os.path.join(global_app_dir, hostname)
            if not os.path.isdir(app_dir):
                continue
            pub.app_dir = app_dir
            pub.set_config()
            self.process_notification(notification, pub)

    @classmethod
    def load_notification(cls, args):
        """Return the notification decoded from JSON.

        args[0] is either '-', meaning standard input, or the path of the
        file to read.
        """
        if args[0] == '-':
            # get notification from stdin
            return json.load(sys.stdin)
        # the with block closes the file even when decoding fails
        with open(args[0]) as fd:
            return json.load(fd)

    @classmethod
    def check_valid_notification(cls, notification):
        """Tell whether notification has the expected top-level structure."""
        return isinstance(notification, dict) \
            and '@type' in notification \
            and notification['@type'] in ['provision', 'deprovision'] \
            and 'objects' in notification \
            and 'audience' in notification \
            and isinstance(notification['audience'], list) \
            and isinstance(notification['objects'], list)

    @classmethod
    def _is_role_object(cls, o):
        """Tell whether o is a role description carrying all required keys."""
        return isinstance(o, dict) \
            and o.get('@type') == 'role' \
            and all(key in o for key in cls._role_required_keys)

    @classmethod
    def _find_role(cls, uuid, slug):
        """Return the role whose slug is uuid, else slug, else None."""
        for value in (uuid, slug):
            try:
                return Role.get_on_index(value, 'slug')
            except KeyError:
                continue
        return None

    @classmethod
    def process_notification(cls, notification, publisher=None):
        """Apply a notification to the roles of the current tenant.

        Nothing is done unless the 'saml2_providerid' of the 'sp'
        configuration is listed in the notification audience.  Objects that
        are not complete role descriptions are ignored.  On 'provision'
        roles are created or updated and get the uuid as slug; on
        'deprovision' matching roles are removed.  When a 'provision'
        notification is flagged 'full', roles whose slug is not one of the
        notified uuids are removed.
        """
        publisher = publisher or get_publisher()
        action = notification['@type']
        audience = notification['audience']
        full = notification.get('full', False)

        # Verify tenant is in audience
        entity_id = get_cfg('sp', {}).get('saml2_providerid')
        if not entity_id or entity_id not in audience:
            return

        charset = publisher.site_charset
        uuids = set()
        # Now provision/deprovision
        for o in notification['objects']:
            if not cls._is_role_object(o):
                continue
            uuid = o['uuid'].encode(charset)
            uuids.add(uuid)
            slug = o['slug'].encode(charset)
            details = o.get('description', '').encode(charset)
            name = o['name'].encode(charset)
            emails = [email.encode(charset) for email in o['emails']]
            emails_to_members = o['emails_to_members']
            # Find existing role, by uuid first then by the original slug
            role = cls._find_role(uuid, slug)
            if role is None:
                # New role
                if action != 'provision':
                    continue
                role = Role(name=name)
            if action == 'provision':
                # Provision/rename
                role.name = name
                role.slug = uuid
                role.emails = emails
                role.details = details
                role.emails_to_members = emails_to_members
                role.store()
            elif action == 'deprovision':
                # Deprovision
                role.remove_self()
        # All roles have been sent: drop the ones that were not listed.
        # This only makes sense for a provision; on a deprovision the
        # listed objects are the ones to remove, and sweeping the others
        # would delete every role of the tenant.
        if full and action == 'provision':
            for role in Role.select():
                if role.slug not in uuids:
                    role.remove_self()
130

  
131
CmdHoboNotify.register()
0
-