Projet

Général

Profil

0001-new-notify-command-to-handle-role-provisionning-depr.patch

Benjamin Dauvergne, 09 septembre 2015 13:42

Télécharger (8,34 ko)

Voir les différences:

Subject: [PATCH] new notify command to handle role
 provisioning/deprovisioning messages (fixes #8219)

It looks up wcs roles by the uuid or the slug of the authentic2 role; if a
role is found by its slug, that slug is replaced by the uuid. Renames are
handled by using the uuid as the slug of roles.

Targeted tenants are identified through their SAML entity ID.
 tests/test_notify.py | 105 +++++++++++++++++++++++++++++++++++++++++++++
 wcs/ctl/notify.py    | 117 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 222 insertions(+)
 create mode 100644 tests/test_notify.py
 create mode 100644 wcs/ctl/notify.py
tests/test_notify.py
1
# -*- coding: utf-8 -*-
2
import shutil
3
from quixote import cleanup
4

  
5
from wcs.ctl.notify import CmdNotify
6
from wcs.roles import Role
7

  
8
from utilities import create_temporary_pub
9

  
10
# Module-level publisher shared by the tests; assigned in setup_module().
pub = None
11

  
12

  
13
def setup_module(module):
    """Prepare the module-wide publisher and one pre-existing role.

    The publisher is configured with the SAML provider id ``'test'`` and a
    single role (slug ``'service-ett-civil'``) is stored before any test runs.
    """
    global pub

    cleanup()
    pub = create_temporary_pub()
    pub.cfg['sp'] = {'saml2_providerid': 'test'}
    pub.write_cfg()

    # The name and slug are intentionally different from the values sent in
    # the notifications so the tests can observe the rename.
    initial_role = Role(name='Service étt civil')
    initial_role.slug = 'service-ett-civil'
    initial_role.store()
22

  
23

  
24
def teardown_module(module):
    """Delete the directory tree designated by the publisher's APP_DIR."""
    app_dir = pub.APP_DIR
    shutil.rmtree(app_dir)
26

  
27

  
28
def test_process_notification_wrong_audience():
    """A notification addressed to another audience must not touch roles."""
    role_objects = [
        {
            '@type': 'role',
            'name': u'Service enfance',
            'slug': u'service-enfance',
            'uuid': u'12345',
        },
        {
            '@type': 'role',
            'name': u'Service état civil',
            'slug': u'service-etat-civil',
            'uuid': u'12345',
        },
    ]
    notification = {
        '@type': u'provision',
        # 'coin' differs from the provider id set in setup_module ('test').
        'audience': [u'coin'],
        'full': True,
        'objects': role_objects,
    }

    def check_only_initial_role():
        # Only the role stored by setup_module exists, unchanged.
        assert Role.count() == 1
        assert Role.select()[0].name == 'Service étt civil'
        assert Role.select()[0].slug == 'service-ett-civil'

    check_only_initial_role()
    CmdNotify.process_notification(notification)
    check_only_initial_role()
55

  
56

  
57
def test_process_notification():
    """Provision two roles, then send a full list holding only one of them.

    The first notification creates one role and renames the pre-existing one
    (matched by slug, then re-slugged with its uuid).  The second, being
    ``full``, leaves only the role it lists.
    """
    enfance = {
        '@type': 'role',
        'name': u'Service enfance',
        'slug': u'service-enfance',
        'uuid': u'12345',
    }
    etat_civil = {
        '@type': 'role',
        'name': u'Service état civil',
        'slug': u'service-ett-civil',
        'uuid': u'xyz',
    }

    # Starting point: only the role stored by setup_module.
    assert Role.count() == 1
    assert Role.select()[0].name == 'Service étt civil'
    assert Role.select()[0].slug == 'service-ett-civil'
    existing_role_id = Role.select()[0].id

    first_notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': [enfance, etat_civil],
    }
    CmdNotify.process_notification(first_notification)

    assert Role.count() == 2
    renamed_role = Role.get(existing_role_id)
    assert renamed_role.name == 'Service état civil'
    assert renamed_role.slug == 'xyz'
    created_role = Role.get_on_index('12345', 'slug')
    assert created_role.name == 'Service enfance'

    second_notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': [enfance],
    }
    CmdNotify.process_notification(second_notification)

    assert Role.count() == 1
    assert Role.select()[0].id == created_role.id
    assert Role.select()[0].name == 'Service enfance'
    assert Role.select()[0].slug == '12345'
wcs/ctl/notify.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2014  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
import os
18
import sys
19
import json
20

  
21
from quixote import get_publisher
22
from wcs.roles import Role
23
from qommon.ctl import Command
24
from qommon.publisher import get_cfg
25

  
26

  
27
class CmdNotify(Command):
    """``notify`` command: apply a role (de)provisioning notification.

    The notification is a JSON document read from a file or from stdin.  It
    is applied to every entry found in the publisher's application directory;
    an entry is only affected when its configured SAML provider id is listed
    in the notification's ``audience``.
    """

    name = 'notify'

    def execute(self, base_options, sub_options, args):
        """Load the notification and process it for each application dir.

        Exits with status 1 when the loaded document is not a valid
        notification (see ``check_valid_notification``).
        """
        self.base_options = base_options
        if sub_options.extra:
            if not self.config.has_section('extra'):
                self.config.add_section('extra')
            for i, extra in enumerate(sub_options.extra):
                self.config.set('extra', 'cmd_line_extra_%d' % i, extra)

        notification = self.load_notification(args)
        if not self.check_valid_notification(notification):
            # Say why we stop instead of exiting silently.
            sys.stderr.write('invalid notification\n')
            sys.exit(1)
        import publisher

        publisher.WcsPublisher.configure(self.config)
        pub = publisher.WcsPublisher.create_publisher()
        global_app_dir = pub.app_dir
        # NOTE(review): every directory entry is treated as a tenant; confirm
        # whether plain files or special directories should be skipped.
        for hostname in os.listdir(global_app_dir):
            pub.app_dir = os.path.join(global_app_dir, hostname)
            pub.set_config()
            self.process_notification(notification)

    @classmethod
    def load_notification(cls, args):
        """Return the JSON notification named by ``args[1]``.

        ``'-'`` means "read from standard input"; any other value is taken as
        the path of a file, which is closed once parsed.
        """
        source = args[1]
        if source == '-':
            return json.load(sys.stdin)
        with open(source) as fd:
            return json.load(fd)

    @classmethod
    def check_valid_notification(cls, notification):
        """Tell whether ``notification`` has the expected overall shape.

        It must be a dict whose ``@type`` is ``'provision'`` or
        ``'deprovision'`` and whose ``audience`` and ``objects`` are lists.
        """
        if not isinstance(notification, dict):
            return False
        if notification.get('@type') not in ('provision', 'deprovision'):
            return False
        return isinstance(notification.get('audience'), list) \
            and isinstance(notification.get('objects'), list)

    @classmethod
    def process_notification(cls, notification):
        """Apply ``notification`` to the roles of the current publisher.

        Nothing happens unless the configured ``saml2_providerid`` is part of
        the notification's ``audience``.  Roles are looked up by slug, first
        with the object's uuid, then with its slug.  On ``provision`` the
        role is created if needed, renamed, and its slug set to the uuid; on
        ``deprovision`` an existing role is removed.  When a ``provision``
        notification is flagged ``full``, every role whose slug is not one of
        the received uuids is removed afterwards.
        """
        publisher = get_publisher()
        action = notification['@type']
        audience = notification['audience']
        full = notification.get('full', False)

        # Verify tenant is in audience
        entity_id = get_cfg('sp', {}).get('saml2_providerid')
        if not entity_id or entity_id not in audience:
            return

        charset = publisher.site_charset
        uuids = set()
        # Now provision/deprovision
        for o in notification['objects']:
            # Ignore anything that is not a complete role description,
            # including entries without '@type' or that are not dicts.
            if not isinstance(o, dict) or o.get('@type') != 'role':
                continue
            if 'uuid' not in o or 'name' not in o or 'slug' not in o:
                continue
            uuid = o['uuid'].encode(charset)
            uuids.add(uuid)
            slug = o['slug'].encode(charset)
            name = o['name'].encode(charset)
            # Find existing role: by uuid first, then by its former slug
            try:
                role = Role.get_on_index(uuid, 'slug')
            except KeyError:
                try:
                    role = Role.get_on_index(slug, 'slug')
                except KeyError:
                    # Unknown role: nothing to remove on deprovision
                    if action != 'provision':
                        continue
                    role = Role(name=name)
            if action == 'provision':
                # Provision/rename
                role.name = name
                role.slug = uuid
                role.store()
            elif action == 'deprovision':
                role.remove_self()

        # A full provision lists all roles: drop those that were not sent.
        # This must not run for 'deprovision', where ``uuids`` holds the
        # roles just removed and every remaining role would be deleted.
        if full and action == 'provision':
            for role in Role.select():
                if role.slug not in uuids:
                    role.remove_self()
0
-