Projet

Général

Profil

0002-misc-add-after_timestamp-to-run-Job-later-36215.patch

Benjamin Dauvergne, 21 septembre 2019 14:04

Télécharger (8,88 ko)

Voir les différences:

Subject: [PATCH 2/2] misc: add after_timestamp to run Job later (#36215)

The after_timestamp can be set (as a datetime, a timedelta, or a number of seconds from now):
- when adding the job with:

    self.add_job(..., after_timestamp=datetime(...))

- when skipping a job with:

    raise SkipJob(after_timestamp=datetime(...))
 .../migrations/0015_auto_20190921_0347.py     | 21 ++++++
 passerelle/base/models.py                     | 38 +++++++---
 tests/test_jobs.py                            | 71 ++++++++++++++++++-
 3 files changed, 120 insertions(+), 10 deletions(-)
 create mode 100644 passerelle/base/migrations/0015_auto_20190921_0347.py
passerelle/base/migrations/0015_auto_20190921_0347.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.20 on 2019-09-21 08:47
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations, models
6
import passerelle.base.models
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    dependencies = [
12
        ('base', '0014_auto_20190820_0914'),
13
    ]
14

  
15
    operations = [
16
        migrations.AddField(
17
            model_name='job',
18
            name='after_timestamp',
19
            field=models.DateTimeField(null=True),
20
        ),
21
    ]
passerelle/base/models.py
502 502
        skipped_jobs = []
503 503
        while True:
504 504
            with transaction.atomic():
505
                # lock a job
506
                job = self.jobs_set().exclude(
507
                        pk__in=skipped_jobs
508
                        ).filter(
505
                # lock an immediately runnable job
506
                job = (
507
                    self.jobs_set()
508
                    .exclude(pk__in=skipped_jobs)
509
                    .filter(
510
                        Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()),
509 511
                        status='registered'
510
                        ).select_for_update(**skip_locked
511
                        ).order_by('pk')[:1].first()
512
                    )
513
                    .select_for_update(**skip_locked)
514
                    .order_by('pk')[:1]
515
                    .first()
516
                )
512 517
                if not job:
513 518
                    break
514 519
                job.status = 'running'
......
516 521
                # release lock
517 522
            try:
518 523
                getattr(self, job.method_name)(**job.parameters)
519
            except SkipJob:
524
            except SkipJob as e:
520 525
                job.status = 'registered'
526
                job.set_after_timestamp(e.after_timestamp)
521 527
                skipped_jobs.append(job.id)
522 528
            except Exception as e:
523 529
                self.handle_job_error(job, sys.exc_info())
......
526 532
                job.done_timestamp = timezone.now()
527 533
            job.save()
528 534

  
529
    def add_job(self, method_name, natural_id=None, **kwargs):
535
    def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
530 536
        resource_type = ContentType.objects.get_for_model(self)
531 537
        job = Job(resource_type=resource_type,
532 538
                  resource_pk=self.pk,
533 539
                  method_name=method_name,
534 540
                  natural_id=natural_id,
535 541
                  parameters=kwargs)
542
        job.set_after_timestamp(after_timestamp)
536 543
        job.save()
537 544
        return job
538 545

  
......
645 652

  
646 653

  
647 654
class SkipJob(Exception):
648
    pass
655
    def __init__(self, after_timestamp=None):
656
        self.after_timestamp = after_timestamp
657
        super(SkipJob, self).__init__()
649 658

  
650 659

  
651 660
class Job(models.Model):
......
658 667
    creation_timestamp = models.DateTimeField(auto_now_add=True)
659 668
    update_timestamp = models.DateTimeField(auto_now=True)
660 669
    done_timestamp = models.DateTimeField(null=True)
670
    after_timestamp = models.DateTimeField(null=True)
661 671
    status = models.CharField(
662 672
            max_length=20,
663 673
            default='registered',
......
669 679
            )
670 680
    status_details = jsonfield.JSONField(default={})
671 681

  
682
    def set_after_timestamp(self, value):
683
        if isinstance(value, datetime.datetime):
684
            self.after_timestamp = value
685
        elif isinstance(value, six.integer_types + (float,)):
686
            self.after_timestamp = timezone.now() + datetime.timedelta(seconds=value)
687
        elif isinstance(value, datetime.timedelta):
688
            self.after_timestamp = timezone.now() + value
689
        else:
690
            self.after_timestamp = value
691

  
672 692

  
673 693
class ResourceLog(models.Model):
674 694
    timestamp = models.DateTimeField(auto_now_add=True)
tests/test_jobs.py
1 1
# -*- coding: utf-8 -*-
2 2

  
3
import datetime
3 4
import os
4 5

  
5 6
import mock
......
12 13

  
13 14

  
14 15
@mock.patch('passerelle.utils.Request.get')
15
def test_jobs(mocked_get, app, base_adresse):
16
def test_jobs(mocked_get, app, base_adresse, freezer):
16 17
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
17 18
    mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
18 19

  
......
39 40
    base_adresse.jobs()
40 41
    assert Job.objects.get(id=job.id).status == 'registered'
41 42

  
43
    # use after_timestamp with SkipJob
44
    freezer.move_to('2019-01-01 00:00:00')
45
    mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00')
46
    base_adresse.jobs()
47
    assert Job.objects.get(id=job.id).status == 'registered'
48
    mocked_get.side_effect = None
49
    freezer.move_to('2019-01-01 12:00:00')
50
    base_adresse.jobs()
51
    assert Job.objects.get(id=job.id).status == 'registered'
52
    freezer.move_to('2019-01-02 01:00:00')
53
    base_adresse.jobs()
54
    assert Job.objects.get(id=job.id).status == 'completed'
55

  
56
    # use after_timestamp with SkipJob and seconds
57
    job = base_adresse.add_job('update_streets_data')
58
    freezer.move_to('2019-01-01 00:00:00')
59
    mocked_get.side_effect = SkipJob(after_timestamp=3600)
60
    base_adresse.jobs()
61
    assert Job.objects.get(id=job.id).status == 'registered'
62
    mocked_get.side_effect = None
63
    freezer.move_to('2019-01-01 00:30:00')
64
    base_adresse.jobs()
65
    assert Job.objects.get(id=job.id).status == 'registered'
66
    freezer.move_to('2019-01-01 01:01:00')
67
    base_adresse.jobs()
68
    assert Job.objects.get(id=job.id).status == 'completed'
69

  
70
    # use after_timestamp with SkipJob and timedelta
71
    job = base_adresse.add_job('update_streets_data')
72
    freezer.move_to('2019-01-01 00:00:00')
73
    mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600))
74
    base_adresse.jobs()
75
    assert Job.objects.get(id=job.id).status == 'registered'
76
    mocked_get.side_effect = None
77
    freezer.move_to('2019-01-01 00:30:00')
78
    base_adresse.jobs()
79
    assert Job.objects.get(id=job.id).status == 'registered'
80
    freezer.move_to('2019-01-01 01:01:00')
81
    base_adresse.jobs()
82
    assert Job.objects.get(id=job.id).status == 'completed'
83

  
84
    # use after_timestamp with add_job
85
    freezer.move_to('2019-01-01 00:00:00')
86
    job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00')
87
    base_adresse.jobs()
88
    assert Job.objects.get(id=job.id).status == 'registered'
89
    freezer.move_to('2019-01-02 01:00:00')
90
    base_adresse.jobs()
91
    assert Job.objects.get(id=job.id).status == 'completed'
92

  
93
    # use after_timestamp with add_job and seconds
94
    freezer.move_to('2019-01-01 00:00:00')
95
    job = base_adresse.add_job('update_streets_data', after_timestamp=3600)
96
    base_adresse.jobs()
97
    assert Job.objects.get(id=job.id).status == 'registered'
98
    freezer.move_to('2019-01-01 01:01:00')
99
    base_adresse.jobs()
100
    assert Job.objects.get(id=job.id).status == 'completed'
101

  
102
    # use after_timestamp with add_job and timedelta
103
    freezer.move_to('2019-01-01 00:00:00')
104
    job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600))
105
    base_adresse.jobs()
106
    assert Job.objects.get(id=job.id).status == 'registered'
107
    freezer.move_to('2019-01-01 01:01:00')
108
    base_adresse.jobs()
109
    assert Job.objects.get(id=job.id).status == 'completed'
110

  
42 111
    # don't run jobs if connector is down
43 112
    StreetModel.objects.all().delete()
44 113
    with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down:
45
-