0002-misc-add-after_timestamp-to-run-Job-later-36215.patch
passerelle/base/migrations/0015_auto_20190921_0347.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.20 on 2019-09-21 08:47 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
from django.db import migrations, models |
|
6 |
import passerelle.base.models |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration): |
|
10 | ||
11 |
dependencies = [ |
|
12 |
('base', '0014_auto_20190820_0914'), |
|
13 |
] |
|
14 | ||
15 |
operations = [ |
|
16 |
migrations.AddField( |
|
17 |
model_name='job', |
|
18 |
name='after_timestamp', |
|
19 |
field=models.DateTimeField(null=True), |
|
20 |
), |
|
21 |
] |
passerelle/base/models.py | ||
---|---|---|
502 | 502 |
skipped_jobs = [] |
503 | 503 |
while True: |
504 | 504 |
with transaction.atomic(): |
505 |
# lock a job |
|
506 |
job = self.jobs_set().exclude( |
|
507 |
pk__in=skipped_jobs |
|
508 |
).filter( |
|
505 |
# lock an immediately runnable job |
|
506 |
job = ( |
|
507 |
self.jobs_set() |
|
508 |
.exclude(pk__in=skipped_jobs) |
|
509 |
.filter( |
|
510 |
Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()), |
|
509 | 511 |
status='registered' |
510 |
).select_for_update(**skip_locked |
|
511 |
).order_by('pk')[:1].first() |
|
512 |
) |
|
513 |
.select_for_update(**skip_locked) |
|
514 |
.order_by('pk')[:1] |
|
515 |
.first() |
|
516 |
) |
|
512 | 517 |
if not job: |
513 | 518 |
break |
514 | 519 |
job.status = 'running' |
... | ... | |
516 | 521 |
# release lock |
517 | 522 |
try: |
518 | 523 |
getattr(self, job.method_name)(**job.parameters) |
519 |
except SkipJob: |
|
524 |
except SkipJob as e:
|
|
520 | 525 |
job.status = 'registered' |
526 |
job.set_after_timestamp(e.after_timestamp) |
|
521 | 527 |
skipped_jobs.append(job.id) |
522 | 528 |
except Exception as e: |
523 | 529 |
self.handle_job_error(job, sys.exc_info()) |
... | ... | |
526 | 532 |
job.done_timestamp = timezone.now() |
527 | 533 |
job.save() |
528 | 534 | |
529 |
def add_job(self, method_name, natural_id=None, **kwargs): |
|
535 |
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
|
|
530 | 536 |
resource_type = ContentType.objects.get_for_model(self) |
531 | 537 |
job = Job(resource_type=resource_type, |
532 | 538 |
resource_pk=self.pk, |
533 | 539 |
method_name=method_name, |
534 | 540 |
natural_id=natural_id, |
535 | 541 |
parameters=kwargs) |
542 |
job.set_after_timestamp(after_timestamp) |
|
536 | 543 |
job.save() |
537 | 544 |
return job |
538 | 545 | |
... | ... | |
645 | 652 | |
646 | 653 | |
647 | 654 |
class SkipJob(Exception): |
648 |
pass |
|
655 |
def __init__(self, after_timestamp=None): |
|
656 |
self.after_timestamp = after_timestamp |
|
657 |
super(SkipJob, self).__init__() |
|
649 | 658 | |
650 | 659 | |
651 | 660 |
class Job(models.Model): |
... | ... | |
658 | 667 |
creation_timestamp = models.DateTimeField(auto_now_add=True) |
659 | 668 |
update_timestamp = models.DateTimeField(auto_now=True) |
660 | 669 |
done_timestamp = models.DateTimeField(null=True) |
670 |
after_timestamp = models.DateTimeField(null=True) |
|
661 | 671 |
status = models.CharField( |
662 | 672 |
max_length=20, |
663 | 673 |
default='registered', |
... | ... | |
669 | 679 |
) |
670 | 680 |
status_details = jsonfield.JSONField(default={}) |
671 | 681 | |
682 |
def set_after_timestamp(self, value): |
|
683 |
if isinstance(value, datetime.datetime): |
|
684 |
self.after_timestamp = value |
|
685 |
elif isinstance(value, six.integer_types + (float,)): |
|
686 |
self.after_timestamp = timezone.now() + datetime.timedelta(seconds=value) |
|
687 |
elif isinstance(value, datetime.timedelta): |
|
688 |
self.after_timestamp = timezone.now() + value |
|
689 |
else: |
|
690 |
self.after_timestamp = value |
|
691 | ||
672 | 692 | |
673 | 693 |
class ResourceLog(models.Model): |
674 | 694 |
timestamp = models.DateTimeField(auto_now_add=True) |
tests/test_jobs.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 |
import datetime |
|
3 | 4 |
import os |
4 | 5 | |
5 | 6 |
import mock |
... | ... | |
12 | 13 | |
13 | 14 | |
14 | 15 |
@mock.patch('passerelle.utils.Request.get') |
15 |
def test_jobs(mocked_get, app, base_adresse): |
|
16 |
def test_jobs(mocked_get, app, base_adresse, freezer):
|
|
16 | 17 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2') |
17 | 18 |
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200) |
18 | 19 | |
... | ... | |
39 | 40 |
base_adresse.jobs() |
40 | 41 |
assert Job.objects.get(id=job.id).status == 'registered' |
41 | 42 | |
43 |
# use after_timestamp with SkipJob |
|
44 |
freezer.move_to('2019-01-01 00:00:00') |
|
45 |
mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00') |
|
46 |
base_adresse.jobs() |
|
47 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
48 |
mocked_get.side_effect = None |
|
49 |
freezer.move_to('2019-01-01 12:00:00') |
|
50 |
base_adresse.jobs() |
|
51 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
52 |
freezer.move_to('2019-01-02 01:00:00') |
|
53 |
base_adresse.jobs() |
|
54 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
55 | ||
56 |
# use after_timestamp with SkipJob and seconds |
|
57 |
job = base_adresse.add_job('update_streets_data') |
|
58 |
freezer.move_to('2019-01-01 00:00:00') |
|
59 |
mocked_get.side_effect = SkipJob(after_timestamp=3600) |
|
60 |
base_adresse.jobs() |
|
61 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
62 |
mocked_get.side_effect = None |
|
63 |
freezer.move_to('2019-01-01 00:30:00') |
|
64 |
base_adresse.jobs() |
|
65 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
66 |
freezer.move_to('2019-01-01 01:01:00') |
|
67 |
base_adresse.jobs() |
|
68 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
69 | ||
70 |
# use after_timestamp with SkipJob and timedelta |
|
71 |
job = base_adresse.add_job('update_streets_data') |
|
72 |
freezer.move_to('2019-01-01 00:00:00') |
|
73 |
mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600)) |
|
74 |
base_adresse.jobs() |
|
75 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
76 |
mocked_get.side_effect = None |
|
77 |
freezer.move_to('2019-01-01 00:30:00') |
|
78 |
base_adresse.jobs() |
|
79 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
80 |
freezer.move_to('2019-01-01 01:01:00') |
|
81 |
base_adresse.jobs() |
|
82 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
83 | ||
84 |
# use after_timestamp with add_job |
|
85 |
freezer.move_to('2019-01-01 00:00:00') |
|
86 |
job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00') |
|
87 |
base_adresse.jobs() |
|
88 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
89 |
freezer.move_to('2019-01-02 01:00:00') |
|
90 |
base_adresse.jobs() |
|
91 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
92 | ||
93 |
# use after_timestamp with add_job and seconds |
|
94 |
freezer.move_to('2019-01-01 00:00:00') |
|
95 |
job = base_adresse.add_job('update_streets_data', after_timestamp=3600) |
|
96 |
base_adresse.jobs() |
|
97 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
98 |
freezer.move_to('2019-01-01 01:01:00') |
|
99 |
base_adresse.jobs() |
|
100 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
101 | ||
102 |
# use after_timestamp with add_job and seconds |
|
103 |
freezer.move_to('2019-01-01 00:00:00') |
|
104 |
job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600)) |
|
105 |
base_adresse.jobs() |
|
106 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
107 |
freezer.move_to('2019-01-01 01:01:00') |
|
108 |
base_adresse.jobs() |
|
109 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
110 | ||
42 | 111 |
# don't run jobs if connector is down |
43 | 112 |
StreetModel.objects.all().delete() |
44 | 113 |
with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: |
45 |
- |