0002-misc-add-after_timestamp-to-run-Job-later-36215.patch
passerelle/base/migrations/0015_auto_20190921_0347.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
2 |
# Generated by Django 1.11.20 on 2019-09-21 08:47 |
|
3 |
from __future__ import unicode_literals |
|
4 | ||
5 |
from django.db import migrations, models |
|
6 |
import passerelle.base.models |
|
7 | ||
8 | ||
9 |
class Migration(migrations.Migration): |
|
10 | ||
11 |
dependencies = [ |
|
12 |
('base', '0014_auto_20190820_0914'), |
|
13 |
] |
|
14 | ||
15 |
operations = [ |
|
16 |
migrations.AddField( |
|
17 |
model_name='job', |
|
18 |
name='after_timestamp', |
|
19 |
field=models.DateTimeField(null=True), |
|
20 |
), |
|
21 |
] |
passerelle/base/models.py | ||
---|---|---|
502 | 502 |
skipped_jobs = [] |
503 | 503 |
while True: |
504 | 504 |
with transaction.atomic(): |
505 |
# lock a job |
|
506 |
job = self.jobs_set().exclude( |
|
507 |
pk__in=skipped_jobs |
|
508 |
).filter( |
|
505 |
# lock an immediately runnable job |
|
506 |
job = ( |
|
507 |
self.jobs_set() |
|
508 |
.exclude(pk__in=skipped_jobs) |
|
509 |
.filter( |
|
510 |
Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()), |
|
509 | 511 |
status='registered' |
510 |
).select_for_update(**skip_locked |
|
511 |
).order_by('pk')[:1].first() |
|
512 |
) |
|
513 |
.select_for_update(**skip_locked) |
|
514 |
.order_by('pk')[:1] |
|
515 |
.first() |
|
516 |
) |
|
512 | 517 |
if not job: |
513 | 518 |
break |
514 | 519 |
job.status = 'running' |
... | ... | |
516 | 521 |
# release lock |
517 | 522 |
try: |
518 | 523 |
getattr(self, job.method_name)(**job.parameters) |
519 |
except SkipJob: |
|
524 |
except SkipJob as e:
|
|
520 | 525 |
job.status = 'registered' |
526 |
job.after_timestamp = e.after_timestamp |
|
521 | 527 |
skipped_jobs.append(job.id) |
522 | 528 |
except Exception as e: |
523 | 529 |
self.handle_job_error(job, sys.exc_info()) |
... | ... | |
526 | 532 |
job.done_timestamp = timezone.now() |
527 | 533 |
job.save() |
528 | 534 | |
529 |
def add_job(self, method_name, natural_id=None, **kwargs): |
|
535 |
def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
|
|
530 | 536 |
resource_type = ContentType.objects.get_for_model(self) |
531 | 537 |
job = Job(resource_type=resource_type, |
532 | 538 |
resource_pk=self.pk, |
533 | 539 |
method_name=method_name, |
534 | 540 |
natural_id=natural_id, |
541 |
after_timestamp=after_timestamp, |
|
535 | 542 |
parameters=kwargs) |
536 | 543 |
job.save() |
537 | 544 |
return job |
... | ... | |
645 | 652 | |
646 | 653 | |
647 | 654 |
class SkipJob(Exception): |
648 |
pass |
|
655 |
def __init__(self, after_timestamp=None): |
|
656 |
self.after_timestamp = after_timestamp |
|
657 |
super(SkipJob, self).__init__() |
|
649 | 658 | |
650 | 659 | |
651 | 660 |
class Job(models.Model): |
... | ... | |
658 | 667 |
creation_timestamp = models.DateTimeField(auto_now_add=True) |
659 | 668 |
update_timestamp = models.DateTimeField(auto_now=True) |
660 | 669 |
done_timestamp = models.DateTimeField(null=True) |
670 |
after_timestamp = models.DateTimeField(null=True) |
|
661 | 671 |
status = models.CharField( |
662 | 672 |
max_length=20, |
663 | 673 |
default='registered', |
tests/test_jobs.py | ||
---|---|---|
12 | 12 | |
13 | 13 | |
14 | 14 |
@mock.patch('passerelle.utils.Request.get') |
15 |
def test_jobs(mocked_get, app, base_adresse): |
|
15 |
def test_jobs(mocked_get, app, base_adresse, freezer):
|
|
16 | 16 |
filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2') |
17 | 17 |
mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200) |
18 | 18 | |
... | ... | |
39 | 39 |
base_adresse.jobs() |
40 | 40 |
assert Job.objects.get(id=job.id).status == 'registered' |
41 | 41 | |
42 |
# use after_timestamp with SkipJob |
|
43 |
mocked_get.side_effect = None |
|
44 |
freezer.move_to('2019-01-01 00:00:00') |
|
45 |
Job.objects.filter(id=job.id).update(after_timestamp='2019-01-02 00:00:00') |
|
46 |
base_adresse.jobs() |
|
47 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
48 |
freezer.move_to('2019-01-02 01:00:00') |
|
49 |
base_adresse.jobs() |
|
50 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
51 | ||
52 |
# use after_timestamp with add_job |
|
53 |
freezer.move_to('2019-01-01 00:00:00') |
|
54 |
job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00') |
|
55 |
base_adresse.jobs() |
|
56 |
assert Job.objects.get(id=job.id).status == 'registered' |
|
57 |
freezer.move_to('2019-01-02 01:00:00') |
|
58 |
base_adresse.jobs() |
|
59 |
assert Job.objects.get(id=job.id).status == 'completed' |
|
60 | ||
42 | 61 |
# don't run jobs if connector is down |
43 | 62 |
StreetModel.objects.all().delete() |
44 | 63 |
with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: |
45 |
- |