Projet

Général

Profil

0002-misc-add-after_timestamp-to-run-Job-later-36215.patch

Benjamin Dauvergne, 21 septembre 2019 11:17

Télécharger (6,04 ko)

Voir les différences:

Subject: [PATCH 2/2] misc: add after_timestamp to run Job later (#36215)

The after_timestamp can be set:
- when adding the job with:

    self.add_job(..., after_timestamp=datetime(...))

- when skipping a job with:

    raise SkipJob(after_timestamp=datetime(...))
 .../migrations/0015_auto_20190921_0347.py     | 21 ++++++++++++++
 passerelle/base/models.py                     | 28 +++++++++++++------
 tests/test_jobs.py                            | 21 +++++++++++++-
 3 files changed, 60 insertions(+), 10 deletions(-)
 create mode 100644 passerelle/base/migrations/0015_auto_20190921_0347.py
passerelle/base/migrations/0015_auto_20190921_0347.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.20 on 2019-09-21 08:47
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations, models
6
import passerelle.base.models
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    dependencies = [
12
        ('base', '0014_auto_20190820_0914'),
13
    ]
14

  
15
    operations = [
16
        migrations.AddField(
17
            model_name='job',
18
            name='after_timestamp',
19
            field=models.DateTimeField(null=True),
20
        ),
21
    ]
passerelle/base/models.py
502 502
        skipped_jobs = []
503 503
        while True:
504 504
            with transaction.atomic():
505
                # lock a job
506
                job = self.jobs_set().exclude(
507
                        pk__in=skipped_jobs
508
                        ).filter(
505
                # lock an immediately runnable job
506
                job = (
507
                    self.jobs_set()
508
                    .exclude(pk__in=skipped_jobs)
509
                    .filter(
510
                        Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()),
509 511
                        status='registered'
510
                        ).select_for_update(**skip_locked
511
                        ).order_by('pk')[:1].first()
512
                    )
513
                    .select_for_update(**skip_locked)
514
                    .order_by('pk')[:1]
515
                    .first()
516
                )
512 517
                if not job:
513 518
                    break
514 519
                job.status = 'running'
......
516 521
                # release lock
517 522
            try:
518 523
                getattr(self, job.method_name)(**job.parameters)
519
            except SkipJob:
524
            except SkipJob as e:
520 525
                job.status = 'registered'
526
                job.after_timestamp = e.after_timestamp
521 527
                skipped_jobs.append(job.id)
522 528
            except Exception as e:
523 529
                self.handle_job_error(job, sys.exc_info())
......
526 532
                job.done_timestamp = timezone.now()
527 533
            job.save()
528 534

  
529
    def add_job(self, method_name, natural_id=None, **kwargs):
535
    def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
530 536
        resource_type = ContentType.objects.get_for_model(self)
531 537
        job = Job(resource_type=resource_type,
532 538
                  resource_pk=self.pk,
533 539
                  method_name=method_name,
534 540
                  natural_id=natural_id,
541
                  after_timestamp=after_timestamp,
535 542
                  parameters=kwargs)
536 543
        job.save()
537 544
        return job
......
645 652

  
646 653

  
647 654
class SkipJob(Exception):
648
    pass
655
    def __init__(self, after_timestamp=None):
656
        self.after_timestamp = after_timestamp
657
        super(SkipJob, self).__init__()
649 658

  
650 659

  
651 660
class Job(models.Model):
......
658 667
    creation_timestamp = models.DateTimeField(auto_now_add=True)
659 668
    update_timestamp = models.DateTimeField(auto_now=True)
660 669
    done_timestamp = models.DateTimeField(null=True)
670
    after_timestamp = models.DateTimeField(null=True)
661 671
    status = models.CharField(
662 672
            max_length=20,
663 673
            default='registered',
tests/test_jobs.py
12 12

  
13 13

  
14 14
@mock.patch('passerelle.utils.Request.get')
15
def test_jobs(mocked_get, app, base_adresse):
15
def test_jobs(mocked_get, app, base_adresse, freezer):
16 16
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
17 17
    mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
18 18

  
......
39 39
    base_adresse.jobs()
40 40
    assert Job.objects.get(id=job.id).status == 'registered'
41 41

  
42
    # use after_timestamp with SkipJob
43
    mocked_get.side_effect = None
44
    freezer.move_to('2019-01-01 00:00:00')
45
    Job.objects.filter(id=job.id).update(after_timestamp='2019-01-02 00:00:00')
46
    base_adresse.jobs()
47
    assert Job.objects.get(id=job.id).status == 'registered'
48
    freezer.move_to('2019-01-02 01:00:00')
49
    base_adresse.jobs()
50
    assert Job.objects.get(id=job.id).status == 'completed'
51

  
52
    # use after_timestamp with add_job
53
    freezer.move_to('2019-01-01 00:00:00')
54
    job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00')
55
    base_adresse.jobs()
56
    assert Job.objects.get(id=job.id).status == 'registered'
57
    freezer.move_to('2019-01-02 01:00:00')
58
    base_adresse.jobs()
59
    assert Job.objects.get(id=job.id).status == 'completed'
60

  
42 61
    # don't run jobs if connector is down
43 62
    StreetModel.objects.all().delete()
44 63
    with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down:
45
-