From 52ec976f9f15f822051708e0ef028030ae7a0ff6 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Sat, 21 Sep 2019 11:06:39 +0200 Subject: [PATCH 2/2] misc: add after_timestamp to run Job later (#36215) The after_timestamp can be set: - when adding the job with: self.add_job(..., after_timestamp=datetime(...)) - when skipping a job with: raise SkipJob(after_timestamp=datetime(...)) --- .../migrations/0015_auto_20190921_0347.py | 21 ++++++++++++++ passerelle/base/models.py | 28 +++++++++++++------ tests/test_jobs.py | 21 +++++++++++++- 3 files changed, 60 insertions(+), 10 deletions(-) create mode 100644 passerelle/base/migrations/0015_auto_20190921_0347.py diff --git a/passerelle/base/migrations/0015_auto_20190921_0347.py b/passerelle/base/migrations/0015_auto_20190921_0347.py new file mode 100644 index 00000000..db7ab742 --- /dev/null +++ b/passerelle/base/migrations/0015_auto_20190921_0347.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.20 on 2019-09-21 08:47 +from __future__ import unicode_literals + +from django.db import migrations, models +import passerelle.base.models + + +class Migration(migrations.Migration): + + dependencies = [ + ('base', '0014_auto_20190820_0914'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='after_timestamp', + field=models.DateTimeField(null=True), + ), + ] diff --git a/passerelle/base/models.py b/passerelle/base/models.py index 9f38c038..e38efe3b 100644 --- a/passerelle/base/models.py +++ b/passerelle/base/models.py @@ -502,13 +502,18 @@ class BaseResource(models.Model): skipped_jobs = [] while True: with transaction.atomic(): - # lock a job - job = self.jobs_set().exclude( - pk__in=skipped_jobs - ).filter( + # lock an immediately runnable job + job = ( + self.jobs_set() + .exclude(pk__in=skipped_jobs) + .filter( + Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()), status='registered' - ).select_for_update(**skip_locked - ).order_by('pk')[:1].first() + ) + .select_for_update(**skip_locked) + .order_by('pk')[:1] + .first() + ) if not job: break job.status = 'running' @@ -516,8 +521,9 @@ class BaseResource(models.Model): # release lock try: getattr(self, job.method_name)(**job.parameters) - except SkipJob: + except SkipJob as e: job.status = 'registered' + job.after_timestamp = e.after_timestamp skipped_jobs.append(job.id) except Exception as e: self.handle_job_error(job, sys.exc_info()) @@ -526,12 +532,13 @@ class BaseResource(models.Model): job.done_timestamp = timezone.now() job.save() - def add_job(self, method_name, natural_id=None, **kwargs): + def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs): resource_type = ContentType.objects.get_for_model(self) job = Job(resource_type=resource_type, resource_pk=self.pk, method_name=method_name, natural_id=natural_id, + after_timestamp=after_timestamp, parameters=kwargs) job.save() return job @@ -645,7 +652,9 @@ class AvailabilityParameters(models.Model): class SkipJob(Exception): - pass + def __init__(self, after_timestamp=None): + self.after_timestamp = after_timestamp + super(SkipJob, self).__init__() class Job(models.Model): @@ -658,6 +667,7 @@ class Job(models.Model): creation_timestamp = models.DateTimeField(auto_now_add=True) update_timestamp = models.DateTimeField(auto_now=True) done_timestamp = models.DateTimeField(null=True) + after_timestamp = models.DateTimeField(null=True) status = models.CharField( max_length=20, default='registered', diff --git a/tests/test_jobs.py b/tests/test_jobs.py index 47a3a7e4..e932909f 100644 --- a/tests/test_jobs.py +++ b/tests/test_jobs.py @@ -12,7 +12,7 @@ from .test_base_adresse import base_adresse, StreetModel @mock.patch('passerelle.utils.Request.get') -def test_jobs(mocked_get, app, base_adresse): +def test_jobs(mocked_get, app, base_adresse, freezer): filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2') mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200) @@ -39,6 +39,25 @@ def test_jobs(mocked_get, app, base_adresse): base_adresse.jobs() assert Job.objects.get(id=job.id).status == 'registered' + # use after_timestamp with SkipJob + mocked_get.side_effect = None + freezer.move_to('2019-01-01 00:00:00') + Job.objects.filter(id=job.id).update(after_timestamp='2019-01-02 00:00:00') + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'registered' + freezer.move_to('2019-01-02 01:00:00') + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'completed' + + # use after_timestamp with add_job + freezer.move_to('2019-01-01 00:00:00') + job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00') + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'registered' + freezer.move_to('2019-01-02 01:00:00') + base_adresse.jobs() + assert Job.objects.get(id=job.id).status == 'completed' + # don't run jobs if connector is down StreetModel.objects.all().delete() with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down: -- 2.23.0