Projet

Général

Profil

0005-tests-add-test-to-agent-worker-40098.patch

Nicolas Roche, 28 février 2020 12:37

Télécharger (6,87 ko)

Voir les différences:

Subject: [PATCH 5/6] tests: add test to agent worker (#40098)

 tests_multitenant/test_agent_worker.py | 169 +++++++++++++++++++++++++
 1 file changed, 169 insertions(+)
 create mode 100644 tests_multitenant/test_agent_worker.py
tests_multitenant/test_agent_worker.py
1
# -*- coding: utf-8 -*-
2
import importlib
3
import json
4
import mock
5
import os
6
import pytest
7
import six
8

  
9
from hobo.agent.worker.services import deploy, notify
10
from hobo.agent.worker import settings
11

  
12

  
13
# Deployment environment handed to deploy() by the tests below.  It mixes
# services that should be deployed with entries that are expected to be
# skipped (see each entry's title).
ENVIRONMENT = {
    u'timestamp': '2022-02-22',
    u'services': [
        # Primary hobo, flagged as "this" service.
        {
            u'service-id': u'hobo',
            u'slug': u'hobo',
            u'title': u'Hobo primary',
            u'base_url': u'https://hobo1.dev.publik.love/',
            u'this': True,
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
        },
        # Combo portal.
        {
            u'service-id': u'combo',
            u'slug': u'portal',
            u'title': u'Compte citoyen',
            u'base_url': u'https://combo.dev.publik.love/',
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
        },
        # w.c.s. instance.
        {
            u'service-id': u'wcs',
            u'slug': u'eservices',
            u'title': u'Démarches',
            u'base_url': u'https://wcs.dev.publik.love/',
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
        },
        # Secondary hobo: deployed by test_deploy, but excluded by
        # test_deploy_host_with_agent_patterns once host patterns are loaded.
        {
            u'service-id': u'hobo',
            u'slug': u'hobo',
            u'title': u'skip beacause of agent host patterns',
            u'base_url': u'https://hobo2.dev.publik.love/',
            u'secondary': True,
            u'secret_key': u'1nesüper5Cr!eteKAaY~',
        },
        # No secret_key: per its title this entry is expected to be skipped.
        {
            u'service-id': u'passerelle',
            u'slug': u'passerelle',
            u'title': u'skipped because no secret_key',
            u'base_url': u'https://passerelle.dev.publik.love/',
        },
        # Unknown service-id: per its title this entry is expected to be
        # skipped.
        {
            u'service-id': u'unknown',
            u'title': u'skipped because unknown service-id',
        },
    ],
}
57

  
58
# Source of the temporary settings file written by the local_settings
# fixture.  It only defines AGENT_HOST_PATTERNS, for the wcs and hobo
# services.
# NOTE(review): the '!' prefix presumably negates a pattern -- confirm against
# the agent's pattern matching code.
LOCAL_SETTINGS = """
AGENT_HOST_PATTERNS = {
    'wcs': ['*.dev.publik.love'],
    'hobo': ['*hobo1*', '!*hobo2*'],
}
"""
64

  
65
# Provisioning notification handed to notify() by the tests below: a single
# role object, with the combo service as its only audience.
NOTIFICATION = {
    '@type': 'provision',
    'audience': ['https://combo.dev.publik.love/'],
    'objects': {
        '@type': 'role',
        'data': [
            {
                'uuid': '12345',
                'name': 'Service petite enfance',
                'slug': 'service-petite-enfance',
                'description': 'Role du service petite enfance',
            },
        ],
    },
}
77

  
78

  
79
@pytest.fixture
def local_settings(tmpdir):
    """Write LOCAL_SETTINGS to a temporary file and point the agent at it.

    The file path is exported through the HOBO_AGENT_SETTINGS_FILE
    environment variable for the duration of the test.  On teardown the
    variable is restored to its previous state so the override does not leak
    into tests that run afterwards.
    """
    path = os.path.join(str(tmpdir), 'local_settings.py')
    # Use a context manager so the file is flushed and closed before the
    # settings module is (re)loaded by the test.
    with open(path, 'w') as settings_file:
        settings_file.write(LOCAL_SETTINGS)

    previous = os.environ.get('HOBO_AGENT_SETTINGS_FILE')
    os.environ['HOBO_AGENT_SETTINGS_FILE'] = path
    yield
    # Teardown: put the environment back the way it was found.
    if previous is None:
        os.environ.pop('HOBO_AGENT_SETTINGS_FILE', None)
    else:
        os.environ['HOBO_AGENT_SETTINGS_FILE'] = previous
84

  
85
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy(mocked_subprocess, mocked_exists):
    """Check the commands deploy() spawns and the payload it sends to them.

    subprocess is mocked in the services module, so no real command runs.
    The test verifies that four distinct hobo_deploy commands are started,
    that every one of them receives ENVIRONMENT as JSON input, and that a
    non-zero return code makes deploy() raise RuntimeError.
    """
    mocked_communicate = mock.Mock(return_value=('', ''))
    mocked_opened = mock.Mock(communicate=mocked_communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=mocked_opened)
    deploy(ENVIRONMENT)

    # process called: the first positional argument of each Popen call is the
    # command line
    commands = {call[1][0] for call in mocked_subprocess.Popen.mock_calls}
    assert commands == {
        '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -',
        '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -',
        '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -',
        '/usr/bin/hobo-manage hobo_deploy https://hobo2.dev.publik.love/ -',
    }

    # environment sent: check the payload of *every* communicate() call, not
    # only the first one (call[2] holds the keyword arguments)
    communicate_calls = mocked_communicate.mock_calls
    assert len(communicate_calls) == 4
    for call in communicate_calls:
        assert json.loads(call[2]['input']) == ENVIRONMENT

    # a failing command must be reported
    mocked_opened.returncode = 1
    with pytest.raises(RuntimeError, match='failed: '):
        deploy(ENVIRONMENT)
110

  
111
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_deploy_host_with_agent_patterns(mocked_subprocess, mocked_exists, local_settings):
    """Check that deploy() spawns only three commands with host patterns set.

    The local_settings fixture exports a settings file defining
    AGENT_HOST_PATTERNS; the settings module is reloaded before calling
    deploy().  The hobo2 command seen in test_deploy must no longer appear.
    """
    communicate = mock.Mock(return_value=('', ''))
    process = mock.Mock(communicate=communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=process)

    # Python 2 has reload() as a builtin, Python 3 moved it to importlib.
    reload_module = reload if six.PY2 else importlib.reload
    reload_module(settings)
    deploy(ENVIRONMENT)

    # process called
    commands = set(call[1][0] for call in mocked_subprocess.Popen.mock_calls)
    expected = {
        '/usr/bin/hobo-manage hobo_deploy https://hobo1.dev.publik.love/ -',
        '/usr/bin/wcsctl hobo_deploy https://wcs.dev.publik.love/ -',
        '/usr/lib/combo/manage.py hobo_deploy https://combo.dev.publik.love/ -',
    }
    assert commands == expected
129

  
130
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=True)
@mock.patch('hobo.agent.worker.services.os.listdir')
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify(mocked_subprocess, mocked_listdir, mocked_exists):
    """Check the command notify() spawns, its payload and its error handling.

    subprocess, os.listdir and os.path.exists are mocked in the services
    module.  Only the combo hobo_notify command is expected, fed with the
    notification as JSON.  A non-zero return code must raise RuntimeError,
    whereas an OSError raised by Popen must not propagate.
    """
    # Fake directory listing derived from index 3 of the '/'-split path.
    # NOTE(review): this returns a str where os.listdir returns a list --
    # confirm this is what notify() expects.
    mocked_listdir.side_effect = (
        lambda path: '%s.dev.publik.love' % path.split('/')[3])
    communicate = mock.Mock(return_value=('', ''))
    process = mock.Mock(communicate=communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=process)
    notify(NOTIFICATION)

    # process called
    commands = set(call[1][0] for call in mocked_subprocess.Popen.mock_calls)
    assert commands == {'/usr/lib/combo/manage.py hobo_notify -'}

    # notification sent
    sent = communicate.mock_calls
    assert len(sent) == 1
    payload = json.loads(sent[0][2]['input'])
    assert payload['objects']['data'][0]['uuid'] == '12345'

    # a failing command must be reported
    process.returncode = 1
    with pytest.raises(RuntimeError, match='failed: '):
        notify(NOTIFICATION)

    # ignore subprocess error
    communicate.reset_mock()
    mocked_subprocess.Popen.side_effect = OSError
    notify(NOTIFICATION)
159

  
160
@mock.patch('hobo.agent.worker.services.os.path.exists', return_value=False)
@mock.patch('hobo.agent.worker.services.subprocess')
def test_notify_no_tenant_path(mocked_subprocess, mocked_exists):
    """Check that notify() sends nothing when os.path.exists reports False."""
    communicate = mock.Mock(return_value=('', ''))
    process = mock.Mock(communicate=communicate, returncode=0)
    mocked_subprocess.Popen = mock.Mock(return_value=process)

    notify(NOTIFICATION)

    # no command may have been fed any input
    assert not communicate.mock_calls
0
-