37 |
37 |
apiuser = ApiUser.objects.create(username='me', keytype='API', key=API_KEY)
|
38 |
38 |
obj_type = ContentType.objects.get_for_model(OVHSMSGateway)
|
39 |
39 |
AccessRight.objects.create(
|
40 |
40 |
codename='can_access', apiuser=apiuser, resource_type=obj_type, resource_pk=connector.pk
|
41 |
41 |
)
|
42 |
42 |
return connector
|
43 |
43 |
|
44 |
44 |
|
45 |
|
@pytest.mark.xfail(run=False)
|
46 |
45 |
@mock.patch('passerelle.sms.models.SMSResource.send_job')
|
47 |
|
def test_api_jobs(mocked_send_job, app, connector, simple_user, admin_user):
|
|
46 |
def test_api_jobs(mocked_send_job, app, connector, simple_user, admin_user, freezer):
|
48 |
47 |
assert Job.objects.count() == 0
|
49 |
48 |
url = reverse('api-job', kwargs={'pk': 22})
|
|
49 |
freezer.move_to('2022-01-01T12:00:00Z')
|
50 |
50 |
|
51 |
51 |
# no job
|
52 |
52 |
resp = app.get(url, status=200)
|
53 |
53 |
assert resp.json['err'] == 1
|
54 |
54 |
assert resp.json['err_desc'] == 'No job found matching the query'
|
55 |
55 |
|
56 |
56 |
job = connector.add_job('send_job', my_parameter='my_value')
|
57 |
57 |
assert Job.objects.count() == 1
|
... | ... | |
79 |
79 |
assert resp.json['data']['id'] == job.id
|
80 |
80 |
assert resp.json['data']['resource'] == 'OVHSMSGateway'
|
81 |
81 |
assert resp.json['data']['parameters'] == {'my_parameter': 'my_value'}
|
82 |
82 |
assert resp.json['data']['status'] == 'registered'
|
83 |
83 |
assert resp.json['data']['done_timestamp'] is None
|
84 |
84 |
update_timestamp1 = resp.json['data']['update_timestamp']
|
85 |
85 |
|
86 |
86 |
# completed job
|
|
87 |
freezer.move_to('2022-01-01T12:01:00Z')
|
87 |
88 |
connector.jobs()
|
88 |
89 |
assert mocked_send_job.call_args == mock.call(my_parameter='my_value')
|
89 |
90 |
resp = app.get(url, status=200)
|
90 |
91 |
assert not resp.json['err']
|
91 |
92 |
assert resp.json['data']['id'] == job.id
|
92 |
93 |
assert resp.json['data']['status'] == 'completed'
|
93 |
94 |
assert resp.json['data']['done_timestamp'] is not None
|
94 |
|
assert resp.json['data']['update_timestamp'] < update_timestamp1
|
|
95 |
assert resp.json['data']['update_timestamp'] > update_timestamp1
|
95 |
96 |
|
96 |
97 |
# failed job
|
97 |
98 |
job = connector.add_job('send_job')
|
98 |
99 |
assert Job.objects.count() == 2
|
99 |
100 |
mocked_send_job.side_effect = Exception('my error message')
|
100 |
101 |
connector.jobs()
|
101 |
102 |
url = reverse('api-job', kwargs={'pk': job.id})
|
102 |
103 |
resp = app.get(url, status=200)
|
103 |
|
-
|