Projet

Général

Profil

0003-misc-add-after_timestamp-to-run-Job-later-36215.patch

Benjamin Dauvergne, 01 octobre 2019 10:41

Télécharger (8,49 ko)

Voir les différences:

Subject: [PATCH 3/3] misc: add after_timestamp to run Job later (#36215)

The after_timestamp can be set:
- when adding the job with:

    self.add_job(..., after_timestamp=datetime(...))

- when skipping a job with:

    raise SkipJob(after_timestamp=datetime(...))
 .../migrations/0015_auto_20190921_0347.py     | 21 ++++++
 passerelle/base/models.py                     | 24 +++++--
 tests/test_jobs.py                            | 71 ++++++++++++++++++-
 3 files changed, 111 insertions(+), 5 deletions(-)
 create mode 100644 passerelle/base/migrations/0015_auto_20190921_0347.py
passerelle/base/migrations/0015_auto_20190921_0347.py
1
# -*- coding: utf-8 -*-
2
# Generated by Django 1.11.20 on 2019-09-21 08:47
3
from __future__ import unicode_literals
4

  
5
from django.db import migrations, models
6
import passerelle.base.models
7

  
8

  
9
class Migration(migrations.Migration):
10

  
11
    dependencies = [
12
        ('base', '0014_auto_20190820_0914'),
13
    ]
14

  
15
    operations = [
16
        migrations.AddField(
17
            model_name='job',
18
            name='after_timestamp',
19
            field=models.DateTimeField(null=True),
20
        ),
21
    ]
passerelle/base/models.py
501 501
        skipped_jobs = []
502 502
        while True:
503 503
            with transaction.atomic():
504
                # lock a runnable job
504
                # lock an immediately runnable job
505 505
                job = self.jobs_set().exclude(
506 506
                        pk__in=skipped_jobs
507 507
                        ).filter(
508
                        Q(after_timestamp__isnull=True) | Q(after_timestamp__lt=timezone.now()),
508 509
                        status='registered'
509 510
                        ).select_for_update(**skip_locked
510 511
                        ).order_by('pk')[:1].first()
......
515 516
                # release lock
516 517
            try:
517 518
                getattr(self, job.method_name)(**job.parameters)
518
            except SkipJob:
519
            except SkipJob as e:
519 520
                job.status = 'registered'
521
                job.set_after_timestamp(e.after_timestamp)
520 522
                skipped_jobs.append(job.id)
521 523
            except Exception as e:
522 524
                self.handle_job_error(job, sys.exc_info())
......
525 527
                job.done_timestamp = timezone.now()
526 528
            job.save()
527 529

  
528
    def add_job(self, method_name, natural_id=None, **kwargs):
530
    def add_job(self, method_name, natural_id=None, after_timestamp=None, **kwargs):
529 531
        resource_type = ContentType.objects.get_for_model(self)
530 532
        job = Job(resource_type=resource_type,
531 533
                  resource_pk=self.pk,
532 534
                  method_name=method_name,
533 535
                  natural_id=natural_id,
534 536
                  parameters=kwargs)
537
        job.set_after_timestamp(after_timestamp)
535 538
        job.save()
536 539
        return job
537 540

  
......
644 647

  
645 648

  
646 649
class SkipJob(Exception):
647
    pass
650
    def __init__(self, after_timestamp=None):
651
        self.after_timestamp = after_timestamp
652
        super(SkipJob, self).__init__()
648 653

  
649 654

  
650 655
class Job(models.Model):
......
657 662
    creation_timestamp = models.DateTimeField(auto_now_add=True)
658 663
    update_timestamp = models.DateTimeField(auto_now=True)
659 664
    done_timestamp = models.DateTimeField(null=True)
665
    after_timestamp = models.DateTimeField(null=True)
660 666
    status = models.CharField(
661 667
            max_length=20,
662 668
            default='registered',
......
668 674
            )
669 675
    status_details = jsonfield.JSONField(default={})
670 676

  
677
    def set_after_timestamp(self, value):
678
        if isinstance(value, datetime.datetime):
679
            self.after_timestamp = value
680
        elif isinstance(value, six.integer_types + (float,)):
681
            self.after_timestamp = timezone.now() + datetime.timedelta(seconds=value)
682
        elif isinstance(value, datetime.timedelta):
683
            self.after_timestamp = timezone.now() + value
684
        else:
685
            self.after_timestamp = value
686

  
671 687

  
672 688
class ResourceLog(models.Model):
673 689
    timestamp = models.DateTimeField(auto_now_add=True)
tests/test_jobs.py
1 1
# -*- coding: utf-8 -*-
2 2

  
3
import datetime
3 4
import os
4 5

  
5 6
import mock
......
12 13

  
13 14

  
14 15
@mock.patch('passerelle.utils.Request.get')
15
def test_jobs(mocked_get, app, base_adresse):
16
def test_jobs(mocked_get, app, base_adresse, freezer):
16 17
    filepath = os.path.join(os.path.dirname(__file__), 'data', 'update_streets_test.bz2')
17 18
    mocked_get.return_value = utils.FakedResponse(content=open(filepath).read(), status_code=200)
18 19

  
......
39 40
    base_adresse.jobs()
40 41
    assert Job.objects.get(id=job.id).status == 'registered'
41 42

  
43
    # use after_timestamp with SkipJob
44
    freezer.move_to('2019-01-01 00:00:00')
45
    mocked_get.side_effect = SkipJob(after_timestamp='2019-01-02 00:00:00')
46
    base_adresse.jobs()
47
    assert Job.objects.get(id=job.id).status == 'registered'
48
    mocked_get.side_effect = None
49
    freezer.move_to('2019-01-01 12:00:00')
50
    base_adresse.jobs()
51
    assert Job.objects.get(id=job.id).status == 'registered'
52
    freezer.move_to('2019-01-02 01:00:00')
53
    base_adresse.jobs()
54
    assert Job.objects.get(id=job.id).status == 'completed'
55

  
56
    # use after_timestamp with SkipJob and seconds
57
    job = base_adresse.add_job('update_streets_data')
58
    freezer.move_to('2019-01-01 00:00:00')
59
    mocked_get.side_effect = SkipJob(after_timestamp=3600)
60
    base_adresse.jobs()
61
    assert Job.objects.get(id=job.id).status == 'registered'
62
    mocked_get.side_effect = None
63
    freezer.move_to('2019-01-01 00:30:00')
64
    base_adresse.jobs()
65
    assert Job.objects.get(id=job.id).status == 'registered'
66
    freezer.move_to('2019-01-01 01:01:00')
67
    base_adresse.jobs()
68
    assert Job.objects.get(id=job.id).status == 'completed'
69

  
70
    # use after_timestamp with SkipJob and timedelta
71
    job = base_adresse.add_job('update_streets_data')
72
    freezer.move_to('2019-01-01 00:00:00')
73
    mocked_get.side_effect = SkipJob(after_timestamp=datetime.timedelta(seconds=3600))
74
    base_adresse.jobs()
75
    assert Job.objects.get(id=job.id).status == 'registered'
76
    mocked_get.side_effect = None
77
    freezer.move_to('2019-01-01 00:30:00')
78
    base_adresse.jobs()
79
    assert Job.objects.get(id=job.id).status == 'registered'
80
    freezer.move_to('2019-01-01 01:01:00')
81
    base_adresse.jobs()
82
    assert Job.objects.get(id=job.id).status == 'completed'
83

  
84
    # use after_timestamp with add_job
85
    freezer.move_to('2019-01-01 00:00:00')
86
    job = base_adresse.add_job('update_streets_data', after_timestamp='2019-01-02 00:00:00')
87
    base_adresse.jobs()
88
    assert Job.objects.get(id=job.id).status == 'registered'
89
    freezer.move_to('2019-01-02 01:00:00')
90
    base_adresse.jobs()
91
    assert Job.objects.get(id=job.id).status == 'completed'
92

  
93
    # use after_timestamp with add_job and seconds
94
    freezer.move_to('2019-01-01 00:00:00')
95
    job = base_adresse.add_job('update_streets_data', after_timestamp=3600)
96
    base_adresse.jobs()
97
    assert Job.objects.get(id=job.id).status == 'registered'
98
    freezer.move_to('2019-01-01 01:01:00')
99
    base_adresse.jobs()
100
    assert Job.objects.get(id=job.id).status == 'completed'
101

  
102
    # use after_timestamp with add_job and seconds
103
    freezer.move_to('2019-01-01 00:00:00')
104
    job = base_adresse.add_job('update_streets_data', after_timestamp=datetime.timedelta(seconds=3600))
105
    base_adresse.jobs()
106
    assert Job.objects.get(id=job.id).status == 'registered'
107
    freezer.move_to('2019-01-01 01:01:00')
108
    base_adresse.jobs()
109
    assert Job.objects.get(id=job.id).status == 'completed'
110

  
42 111
    # don't run jobs if connector is down
43 112
    StreetModel.objects.all().delete()
44 113
    with mock.patch('passerelle.apps.base_adresse.models.BaseAdresse.down') as down:
45
-