0003-misc-add-after_timestamp-to-run-Job-later-36215.patch
passerelle/base/migrations/0015_auto_20190921_0347.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.20 on 2019-09-21 08:47 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
from django.db import migrations, models |
|
6 |
import passerelle.base.models |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration): |
|
10 | ||
11 |
dependencies = [ |
|
12 |
('base', '0014_auto_20190820_0914'), |
|
13 |
] |
|
14 | ||
15 |
operations = [ |
|
16 |
migrations.AddField( |
|
17 |
model_name='job', |
|
18 |
name='after_timestamp', |
|
19 |
field=models.DateTimeField(null=True), |
|
20 |
), |
|
21 |
] |
passerelle/base/models.py | ||
---|---|---|
501 | 501 |
skipped_jobs = [] |
502 | 502 |
while True: |
503 | 503 |
with transaction.atomic(): |
504 |
# lock a runnable job |
|
504 |
# lock an immediately runnable job
|
|
505 | 505 |
job = self.jobs_set().exclude( |
506 | 506 |
pk__in=skipped_jobs |
507 | 507 |
).filter( |
508 |
Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()), |
|
508 | 509 |
status='registered' |
509 | 510 |
).select_for_update(**skip_locked |
510 | 511 |
).order_by('pk')[:1].first() |
... | ... | |
515 | 516 |
# release lock |
516 | 517 |
try: |
517 | 518 |
getattr(self, job.method_name)(**job.parameters) |
518 |
except SkipJob: |
|
519 |
except SkipJob as e:
|
|
519 | 520 |
job.status = 'registered' |
521 |
job.set_after_timestamp(e.after_timestamp) |
|
520 | 522 |
skipped_jobs.append(job.id) |
521 | 523 |
except Exception as e: |
522 | 524 |
self.handle_job_error(job, sys.exc_info()) |
... | ... | |
525 | 527 |
job.done_timestamp = timezone.now() |
526 | 528 |
job.save() |
527 | 529 | |
528 |
def add_job(self, method_name, natural_id=None, **kwargs): |
|
530 |
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
|
|
529 | 531 |
resource_type = ContentType.objects.get_for_model(self) |
530 | 532 |
job = Job(resource_type=resource_type, |
531 | 533 |
resource_pk=self.pk, |
532 | 534 |
method_name=method_name, |
533 | 535 |
natural_id=natural_id, |
534 | 536 |
parameters=kwargs) |
537 |
job.set_after_timestamp(after_timestamp) |
|
535 | 538 |
job.save() |
536 | 539 |
return job |
537 | 540 | |
... | ... | |
644 | 647 | |
645 | 648 | |
646 | 649 |
class SkipJob(Exception): |
647 |
pass |
|
650 |
def __init__(self, after_timestamp=None): |
|
651 |
self.after_timestamp = after_timestamp |
|
652 |
super(SkipJob, self).__init__() |
|
648 | 653 | |
649 | 654 | |
650 | 655 |
class Job(models.Model): |
... | ... | |
657 | 662 |
creation_timestamp = models.DateTimeField(auto_now_add=True) |
658 | 663 |
update_timestamp = models.DateTimeField(auto_now=True) |
659 | 664 |
done_timestamp = models.DateTimeField(null=True) |
665 |
after_timestamp = models.DateTimeField(null=True) |
|
660 | 666 |
status = models.CharField( |
661 | 667 |
max_length=20, |
662 | 668 |
default='registered', |
... | ... | |
668 | 674 |
) |
669 | 675 |
status_details = jsonfield.JSONField(default={}) |
670 | 676 | |
677 |
def set_after_timestamp(self, value): |
|
678 |
if isinstance(value, datetime.datetime): |
|
679 |
self.after_timestamp = value |
|
680 |
elif isinstance(value, six.integer_types + (float,)): |
|
681 |
self.after_timestamp = timezone.now() + datetime.timedelta(seconds=value) |
|
682 |
elif isinstance(value, datetime.timedelta): |
|
683 |
self.after_timestamp = timezone.now() + value |
|
684 |
else: |
|
685 |
self.after_timestamp = value |
|
686 | ||
671 | 687 | |
672 | 688 |
class ResourceLog(models.Model): |
673 | 689 |
timestamp = models.DateTimeField(auto_now_add=True) |
tests/test_jobs.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import datetime |
|
3 | 4 |
import os |
4 | 5 | |
5 | 6 |
import mock |
... | ... | |
12 | 13 | |
13 | 14 | |
14 | 15 |
@mock.patch('passerelle.utils.Request.get') |
15 |
def test_jobs(mocked_get, app, base_adresse): |
|
16 |
def test_jobs(mocked_get, app, base_adresse, freezer):
|
|
16 | 17 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2') |
17 | 18 |
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200) |
18 | 19 | |
... | ... | |
39 | 40 |
base_adresse.jobs() |
40 | 41 |
assert Job.objects.get(id=job.id).status == 'registered' |
41 | 42 | |
43 |
# use after_timestamp with SkipJob |
|
44 |
freezer.move_to('2019-01-01 00:00:00') |
|
45 |
mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00') |
|
46 |
base_adresse.jobs() |
|
47 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
48 |
mocked_get.side_effect = None |
|
49 |
freezer.move_to('2019-01-01 12:00:00') |
|
50 |
base_adresse.jobs() |
|
51 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
52 |
freezer.move_to('2019-01-02 01:00:00') |
|
53 |
base_adresse.jobs() |
|
54 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
55 | ||
56 |
# use after_timestamp with SkipJob and seconds |
|
57 |
job = base_adresse.add_job('update_streets_data') |
|
58 |
freezer.move_to('2019-01-01 00:00:00') |
|
59 |
mocked_get.side_effect = SkipJob(after_timestamp=3600) |
|
60 |
base_adresse.jobs() |
|
61 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
62 |
mocked_get.side_effect = None |
|
63 |
freezer.move_to('2019-01-01 00:30:00') |
|
64 |
base_adresse.jobs() |
|
65 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
66 |
freezer.move_to('2019-01-01 01:01:00') |
|
67 |
base_adresse.jobs() |
|
68 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
69 | ||
70 |
# use after_timestamp with SkipJob and timedelta |
|
71 |
job = base_adresse.add_job('update_streets_data') |
|
72 |
freezer.move_to('2019-01-01 00:00:00') |
|
73 |
mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600)) |
|
74 |
base_adresse.jobs() |
|
75 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
76 |
mocked_get.side_effect = None |
|
77 |
freezer.move_to('2019-01-01 00:30:00') |
|
78 |
base_adresse.jobs() |
|
79 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
80 |
freezer.move_to('2019-01-01 01:01:00') |
|
81 |
base_adresse.jobs() |
|
82 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
83 | ||
84 |
# use after_timestamp with add_job |
|
85 |
freezer.move_to('2019-01-01 00:00:00') |
|
86 |
job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00') |
|
87 |
base_adresse.jobs() |
|
88 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
89 |
freezer.move_to('2019-01-02 01:00:00') |
|
90 |
base_adresse.jobs() |
|
91 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
92 | ||
93 |
# use after_timestamp with add_job and seconds |
|
94 |
freezer.move_to('2019-01-01 00:00:00') |
|
95 |
job = base_adresse.add_job('update_streets_data', after_timestamp=3600) |
|
96 |
base_adresse.jobs() |
|
97 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
98 |
freezer.move_to('2019-01-01 01:01:00') |
|
99 |
base_adresse.jobs() |
|
100 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
101 | ||
102 |
# use after_timestamp with add_job and seconds |
|
103 |
freezer.move_to('2019-01-01 00:00:00') |
|
104 |
job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600)) |
|
105 |
base_adresse.jobs() |
|
106 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
107 |
freezer.move_to('2019-01-01 01:01:00') |
|
108 |
base_adresse.jobs() |
|
109 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
110 | ||
42 | 111 |
# don't run jobs if connector is down |
43 | 112 |
StreetModel.objects.all().delete() |
44 | 113 |
with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: |
45 |
- |