29 |
29 |
return settings.HOBO_SKELETONS_DIR
|
30 |
30 |
|
31 |
31 |
|
32 |
|
def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir):
|
|
32 |
def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir, tmp_path):
|
33 |
33 |
from django.core.management import call_command
|
34 |
34 |
from django.conf import settings
|
35 |
35 |
from hobo.agent.authentic2.management.commands.hobo_deploy import Command as HoboDeployCommand
|
... | ... | |
122 |
122 |
side_effect_iter = iter([meta1, meta2, RequestException(), meta3])
|
123 |
123 |
|
124 |
124 |
def side_effect(*args, **kwargs):
|
125 |
|
v = next(side_effect_iter)
|
126 |
|
if isinstance(v, Exception):
|
127 |
|
raise v
|
128 |
|
m = mock.Mock()
|
129 |
|
m.text = v
|
130 |
|
return m
|
|
125 |
for v in side_effect_iter:
|
|
126 |
if isinstance(v, Exception):
|
|
127 |
raise v
|
|
128 |
m = mock.Mock()
|
|
129 |
m.text = v
|
|
130 |
return m
|
131 |
131 |
requests_get.side_effect = side_effect
|
132 |
132 |
env = {
|
133 |
133 |
'users': [
|
... | ... | |
230 |
230 |
'name': 'phone'
|
231 |
231 |
},
|
232 |
232 |
{
|
233 |
|
'kind': 'phone_number',
|
|
233 |
'kind': 'string',
|
234 |
234 |
'description': '',
|
235 |
235 |
'required': False,
|
236 |
236 |
'user_visible': True,
|
... | ... | |
308 |
308 |
},
|
309 |
309 |
]
|
310 |
310 |
}
|
311 |
|
hobo_json_content = json.dumps(env)
|
312 |
|
hobo_json = tempfile.NamedTemporaryFile(mode='w')
|
313 |
|
hobo_json.write(hobo_json_content)
|
314 |
|
hobo_json.flush()
|
|
311 |
|
|
312 |
def hobo_json():
|
|
313 |
with tempfile.NamedTemporaryFile(mode='w', dir=str(tmp_path), delete=False) as hobo_json:
|
|
314 |
hobo_json_content = json.dumps(env)
|
|
315 |
hobo_json.write(hobo_json_content)
|
|
316 |
return hobo_json.name
|
315 |
317 |
|
316 |
318 |
with mock.patch('hobo.agent.authentic2.provisionning.notify_agents') as mock_notify, \
|
317 |
319 |
mock.patch('hobo.agent.authentic2.management.commands.hobo_deploy.sleep', wraps=time.sleep) as sleep_mock:
|
318 |
|
call_command('hobo_deploy', 'http://sso.example.net', hobo_json.name)
|
|
320 |
call_command('hobo_deploy', 'http://sso.example.net', hobo_json())
|
319 |
321 |
assert sleep_mock.call_count == 1 # there is one retry, as the third service's metadata is temporarily unavailable
|
320 |
322 |
|
321 |
323 |
# check role mass provisionning to new services
|
... | ... | |
469 |
471 |
call_command('hobo_deploy', redeploy=True)
|
470 |
472 |
assert sleep_mock.call_count == 0
|
471 |
473 |
|
|
474 |
# test attribute kind update
|
|
475 |
with tenant_context(tenant):
|
|
476 |
assert Attribute.objects.filter(name='mobile', kind='string').count() == 1
|
|
477 |
field = env['profile']['fields'][8]
|
|
478 |
assert field['name'] == 'mobile'
|
|
479 |
field['kind'] = 'phone_number'
|
|
480 |
side_effect_iter = iter([meta1, meta2, RequestException(), meta3])
|
|
481 |
with mock.patch('hobo.agent.authentic2.provisionning.notify_agents') as mock_notify, \
|
|
482 |
mock.patch('hobo.agent.authentic2.management.commands.hobo_deploy.sleep', wraps=time.sleep) as sleep_mock:
|
|
483 |
call_command('hobo_deploy', 'http://sso.example.net', hobo_json(), ignore_timestamp=True)
|
|
484 |
with tenant_context(tenant):
|
|
485 |
assert Attribute.objects.filter(name='mobile', kind='string').count() == 0
|
|
486 |
assert Attribute.objects.filter(name='mobile', kind='phone_number').count() == 1
|
|
487 |
|
|
488 |
|
472 |
489 |
|
473 |
490 |
def test_import_template(db, tenant_base):
|
474 |
491 |
def listify(value):
|
475 |
|
-
|