Projet

Général

Profil

0005-tests-add-test-to-agent-worker-40098.patch

Nicolas Roche, 27 février 2020 01:10

Télécharger (6,69 ko)

Voir les différences:

Subject: [PATCH 5/6] tests: add test to agent worker (#40098)

 tests_multitenant/test_agent_worker.py | 163 +++++++++++++++++++++++++
 1 file changed, 163 insertions(+)
 create mode 100644 tests_multitenant/test_agent_worker.py
tests_multitenant/test_agent_worker.py
1
# -*- coding: utf-8 -*-
2
import json
3
import mock
4
import os
5
import pytest
6

  
7
from hobo.agent.worker.services import deploy, notify
8
from hobo.agent.worker import settings
9

  
10

  
11
ENVIRONMENT = {
12
    u'timestamp': '2022-02-22',
13
    u'services': [
14
        {
15
            u'service-id': u'hobo',
16
            u'slug': u'hobo',
17
            u'title': u'Hobo primary',
18
            u'base_url': u'https://hobo1.dev.publik.love/',
19
            u'this': True,
20
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
21
        },
22
        {
23
            u'service-id': u'combo',
24
            u'slug': u'portal',
25
            u'title': u'Compte citoyen',
26
            u'base_url': u'https://combo.dev.publik.love/',
27
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
28
        },
29
        {
30
            u'service-id': u'wcs',
31
            u'slug': u'eservices',
32
            u'title': u'Démarches',
33
            u'base_url': u'https://wcs.dev.publik.love/',
34
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
35
        },
36
        {
37
            u'service-id': u'hobo',
38
            u'slug': u'hobo',
39
            u'title': u'skip beacause of agent host patterns',
40
            u'base_url': u'https://hobo2.dev.publik.love/',
41
            u'secondary': True,
42
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
43
        },
44
        {
45
            u'service-id': u'passerelle',
46
            u'slug': u'passerelle',
47
            u'title': u'skipped because no secret_key',
48
            u'base_url': u'https://passerelle.dev.publik.love/',
49
        },
50
        {
51
            u'service-id': u'unknown',
52
            u'title': u'skipped because unknown service-id',
53
        }
54
    ]}
55

  
56
LOCAL_SETTINGS = """
57
AGENT_HOST_PATTERNS = {
58
    'wcs': ['*.dev.publik.love'],
59
    'hobo': ['*hobo1*', '!*hobo2*'],
60
}
61
"""
62

  
63
NOTIFICATION = {
64
    '@type': 'provision',
65
    'audience': ['https://combo.dev.publik.love/'],
66
    'objects': {
67
        '@type': 'role',
68
        'data': [
69
            {
70
                'uuid': '12345',
71
                'name': 'Service petite enfance',
72
                'slug': 'service-petite-enfance',
73
                'description': 'Role du service petite enfance',
74
            }]}}
75

  
76

  
77
@pytest.fixture
78
def local_settings(tmpdir):
79
    path = os.path.join(str(tmpdir), 'local_settings.py')
80
    open(path, 'w').write(LOCAL_SETTINGS)
81
    os.environ['HOBO_AGENT_SETTINGS_FILE'] = path
82

  
83
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
84
@mock.patch('hobo.agent.worker.services.subprocess')
85
def test_deploy(mocked_subprocess, mocked_exists):
86
    mocked_communicate = mock.Mock(return_value=('', ''))
87
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
88
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
89
    deploy(ENVIRONMENT)
90

  
91
    # process called
92
    mock_calls = set(x[1][0] for x in mocked_subprocess.Popen.mock_calls)
93
    assert len(mock_calls) == 4
94
    assert '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -' in mock_calls
95
    assert '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -' in mock_calls
96
    assert '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -' in mock_calls
97
    assert '/usr/bin/hobo-manage hobo_deploy https://hobo2.dev.publik.love/ -' in mock_calls
98

  
99
    # environment sent
100
    mock_calls = mocked_communicate.mock_calls
101
    assert len(mock_calls) == 4
102
    for i in range(0, len(mock_calls)):
103
        assert json.loads(mock_calls[0][2]['input']) == ENVIRONMENT
104

  
105
    mocked_opened.returncode = 1
106
    with pytest.raises(RuntimeError, match='failed: '):
107
        deploy(ENVIRONMENT)
108

  
109
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
110
@mock.patch('hobo.agent.worker.services.subprocess')
111
def test_deploy_host_with_agent_patterns(mocked_subprocess, mocked_exists, local_settings):
112
    mocked_communicate = mock.Mock(return_value=('', ''))
113
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
114
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
115

  
116
    reload(settings)
117
    deploy(ENVIRONMENT)
118
    # process called
119
    mock_calls = set(x[1][0] for x in mocked_subprocess.Popen.mock_calls)
120
    assert len(mock_calls) == 3
121
    assert '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -' in mock_calls
122
    assert '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -' in mock_calls
123
    assert '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -' in mock_calls
124

  
125
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
126
@mock.patch('hobo.agent.worker.services.os.listdir')
127
@mock.patch('hobo.agent.worker.services.subprocess')
128
def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
129
    def listdir(path):
130
        return '%s.dev.publik.love' % path.split('/')[3]
131
    mocked_listdir.side_effect = listdir
132
    mocked_communicate = mock.Mock(return_value=('', ''))
133
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
134
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
135
    notify(NOTIFICATION)
136

  
137
    # process called
138
    #assert mocked_subprocess.Popen.mock_calls == 'toto'
139

  
140
    # notification sent
141
    mock_calls = mocked_communicate.mock_calls
142
    assert len(mock_calls) == 1
143
    assert json.loads(mock_calls[0][2]['input'])['objects']['data'][0]['uuid'] == '12345'
144

  
145
    mocked_opened.returncode = 1
146
    with pytest.raises(RuntimeError, match='failed: '):
147
        notify(NOTIFICATION)
148

  
149
    # ignore subprocess error
150
    mocked_communicate.reset_mock()
151
    mocked_subprocess.Popen.side_effect = OSError
152
    notify(NOTIFICATION)
153

  
154
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=False)
155
@mock.patch('hobo.agent.worker.services.subprocess')
156
def test_notify_no_tenant_path(mocked_subprocess, mocked_exists):
157
    mocked_communicate = mock.Mock(return_value=('', ''))
158
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
159
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
160

  
161
    notify(NOTIFICATION)
162
    mock_calls = mocked_communicate.mock_calls
163
    assert len(mock_calls) == 0
0
-