Projet

Général

Profil

0001-cron-fork-each-job-in-each-tenant-with-a-lock-18519.patch

Thomas Noël, 21 mars 2018 16:56

Télécharger (6,48 ko)

Voir les différences:

Subject: [PATCH] cron: fork each job, in each tenant, with a lock (#18519)

 tests/test_publisher.py                |  6 ---
 wcs/qommon/cron.py                     | 69 +++++++++++++++++++++-------------
 wcs/qommon/management/commands/cron.py | 41 +++++++++++++-------
 3 files changed, 70 insertions(+), 46 deletions(-)
tests/test_publisher.py
166 166
    assert pub.cfg['sp'] == {'what': 'ever'}
167 167
    assert not isinstance(pub.cfg['language'], unicode)
168 168
    assert not isinstance(pub.cfg['whatever2'][-1]['c'], unicode)
169

  
170
def test_cron_command():
171
    pub = create_temporary_pub()
172
    with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
173
        call_command('cron')
174
        assert cron_worker.call_count == 1
wcs/qommon/cron.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import os
17 18
import sys
18 19

  
20
from qommon.vendor.locket import lock_file, LockError
21

  
22

  
19 23
class CronJob(object):
20 24
    hours = None
21 25
    minutes = None
22 26
    weekdays = None
23 27
    days = None
24 28
    function = None
29
    name = None
25 30

  
26
    def __init__(self, function, hours = None, minutes = None, weekdays = None, days = None):
31
    def __init__(self, function, name=None, hours=None, minutes=None, weekdays=None, days=None):
27 32
        self.function = function
28 33
        self.hours = hours
29 34
        self.minutes = minutes
30 35
        self.weekdays = weekdays
31 36
        self.days = days
37
        self.name = name or function.__name__
38

  
39
    def run(self, publisher, now):
40
        if self.days and now[2] not in self.days:
41
            return
42
        if self.weekdays and now[6] not in self.weekdays:
43
            return
44
        if self.hours and not now[3] in self.hours:
45
            return
46
        if self.minutes and not now[4] in self.minutes:
47
            return
48

  
49
        logger = publisher.get_app_logger()
50
        logger.debug('cron: run %s on %s', self.name, publisher.app_dir)
51

  
52
        lock_filename = os.path.join(publisher.app_dir, 'cron-%s.lock' % self.name)
53
        lock = lock_file(lock_filename, timeout=0)
54

  
55
        try:
56
            lock.acquire()
57
        except LockError as e:
58
            logger.info('cron: locked by %s', lock_filename)
59
            return
32 60

  
33
def cron_worker(publisher, now):
34
    try:
35
        publisher.set_config()
36
    except:
37
        return
38
    for job in publisher.cronjobs:
39
        if job.days and now[2] not in job.days:
40
            continue
41
        if job.weekdays and now[6] not in job.weekdays:
42
            continue
43
        if job.hours and not now[3] in job.hours:
44
            continue
45
        if job.minutes and not now[4] in job.minutes:
46
            continue
47

  
48
        class FakeRequest(object):
49
            language = publisher.get_site_language()
50

  
51
        publisher.install_lang(FakeRequest())
52
        publisher.substitutions.reset()
53
        publisher.substitutions.feed(publisher)
54
        for extra_source in publisher.extra_sources:
55
            publisher.substitutions.feed(extra_source(publisher, None))
56 61
        try:
57
            job.function(publisher)
58
        except:
59
            publisher.notify_of_exception(sys.exc_info(), context='[CRON]')
62
            class FakeRequest(object):
63
                language = publisher.get_site_language()
64
            publisher.install_lang(FakeRequest())
65
            publisher.substitutions.reset()
66
            publisher.substitutions.feed(publisher)
67
            for extra_source in publisher.extra_sources:
68
                publisher.substitutions.feed(extra_source(publisher, None))
69
            try:
70
                self.function(publisher)
71
            except Exception as e:
72
                publisher.notify_of_exception(sys.exc_info(), context='[CRON]')
73
        finally:
74
            lock.release()
wcs/qommon/management/commands/cron.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import tempfile
17
import multiprocessing.pool
18 18
import time
19 19
import os
20 20

  
21 21
from django.core.management.base import BaseCommand
22 22
from qommon.publisher import get_publisher_class
23 23

  
24
from qommon.vendor import locket
25
from qommon.cron import cron_worker
26

  
27 24

  
28 25
class Command(BaseCommand):
29 26
    help = 'Execute cronjobs'
30 27

  
31 28
    def handle(self, verbosity, **options):
32
        with locket.lock_file(os.path.join(tempfile.gettempdir(), 'wcs-cron')):
33
            now = time.localtime()
34
            publisher_class = get_publisher_class()
35
            publisher_class.register_cronjobs()
36
            publisher = publisher_class.create_publisher()
37
            app_dir = publisher.app_dir
38
            for hostname in publisher.get_tenants():
39
                publisher.app_dir = os.path.join(app_dir, hostname)
40
                cron_worker(publisher, now)
29
        now = time.localtime()
30
        publisher_class = get_publisher_class()
31
        publisher_class.register_cronjobs()
32
        publisher = publisher_class.create_publisher()
33
        cronjobs = []
34
        for hostname in publisher.get_tenants():
35
            try:
36
                publisher.set_config()
37
            except Exception as e:
38
                continue
39
            job_publisher_class = get_publisher_class()
40
            job_publisher_class.register_cronjobs()
41
            job_publisher = job_publisher_class.create_publisher()
42
            app_dir = job_publisher.app_dir
43
            job_publisher.app_dir = os.path.join(app_dir, hostname)
44
            for cronjob in job_publisher.cronjobs:
45
                cronjobs.append({
46
                    'cronjob': cronjob,
47
                    'publisher': job_publisher
48
                })
49

  
50
        pool = multiprocessing.pool.ThreadPool()
51
        try:
52
            list(pool.imap_unordered(lambda x: x['cronjob'].run(x['publisher'], now), cronjobs))
53
        finally:
54
            pool.close()
55
            pool.terminate()
41
-