From a3b1968868660256b8eae227e53a054cd58488cc Mon Sep 17 00:00:00 2001 From: Thomas NOEL Date: Wed, 21 Mar 2018 16:50:55 +0100 Subject: [PATCH] cron: fork each job, in each tenant, with a lock (#18519) --- tests/test_publisher.py | 6 --- wcs/qommon/cron.py | 69 +++++++++++++++++++++------------- wcs/qommon/management/commands/cron.py | 41 +++++++++++++------- 3 files changed, 70 insertions(+), 46 deletions(-) diff --git a/tests/test_publisher.py b/tests/test_publisher.py index c4184df0..20290959 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -166,9 +166,3 @@ def test_import_config_zip(): assert pub.cfg['sp'] == {'what': 'ever'} assert not isinstance(pub.cfg['language'], unicode) assert not isinstance(pub.cfg['whatever2'][-1]['c'], unicode) - -def test_cron_command(): - pub = create_temporary_pub() - with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker: - call_command('cron') - assert cron_worker.call_count == 1 diff --git a/wcs/qommon/cron.py b/wcs/qommon/cron.py index 15a46366..37742c3e 100644 --- a/wcs/qommon/cron.py +++ b/wcs/qommon/cron.py @@ -14,46 +14,61 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, see . +import os import sys +from qommon.vendor.locket import lock_file, LockError + + class CronJob(object): hours = None minutes = None weekdays = None days = None function = None + name = None - def __init__(self, function, hours = None, minutes = None, weekdays = None, days = None): + def __init__(self, function, name=None, hours=None, minutes=None, weekdays=None, days=None): self.function = function self.hours = hours self.minutes = minutes self.weekdays = weekdays self.days = days + self.name = name or function.__name__ + + def run(self, publisher, now): + if self.days and now[2] not in self.days: + return + if self.weekdays and now[6] not in self.weekdays: + return + if self.hours and not now[3] in self.hours: + return + if self.minutes and not now[4] in self.minutes: + return + + logger = publisher.get_app_logger() + logger.debug('cron: run %s on %s', self.name, publisher.app_dir) + + lock_filename = os.path.join(publisher.app_dir, 'cron-%s.lock' % self.name) + lock = lock_file(lock_filename, timeout=0) + + try: + lock.acquire() + except LockError as e: + logger.info('cron: locked by %s', lock_filename) + return -def cron_worker(publisher, now): - try: - publisher.set_config() - except: - return - for job in publisher.cronjobs: - if job.days and now[2] not in job.days: - continue - if job.weekdays and now[6] not in job.weekdays: - continue - if job.hours and not now[3] in job.hours: - continue - if job.minutes and not now[4] in job.minutes: - continue - - class FakeRequest(object): - language = publisher.get_site_language() - - publisher.install_lang(FakeRequest()) - publisher.substitutions.reset() - publisher.substitutions.feed(publisher) - for extra_source in publisher.extra_sources: - publisher.substitutions.feed(extra_source(publisher, None)) try: - job.function(publisher) - except: - publisher.notify_of_exception(sys.exc_info(), context='[CRON]') + class FakeRequest(object): + language = publisher.get_site_language() + publisher.install_lang(FakeRequest()) + publisher.substitutions.reset() + publisher.substitutions.feed(publisher) + for extra_source in publisher.extra_sources: + publisher.substitutions.feed(extra_source(publisher, None)) + try: + self.function(publisher) + except Exception as e: + publisher.notify_of_exception(sys.exc_info(), context='[CRON]') + finally: + lock.release() diff --git a/wcs/qommon/management/commands/cron.py b/wcs/qommon/management/commands/cron.py index da778bbc..478e639e 100644 --- a/wcs/qommon/management/commands/cron.py +++ b/wcs/qommon/management/commands/cron.py @@ -14,27 +14,42 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, see . -import tempfile +import multiprocessing.pool import time import os from django.core.management.base import BaseCommand from qommon.publisher import get_publisher_class -from qommon.vendor import locket -from qommon.cron import cron_worker - class Command(BaseCommand): help = 'Execute cronjobs' def handle(self, verbosity, **options): - with locket.lock_file(os.path.join(tempfile.gettempdir(), 'wcs-cron')): - now = time.localtime() - publisher_class = get_publisher_class() - publisher_class.register_cronjobs() - publisher = publisher_class.create_publisher() - app_dir = publisher.app_dir - for hostname in publisher.get_tenants(): - publisher.app_dir = os.path.join(app_dir, hostname) - cron_worker(publisher, now) + now = time.localtime() + publisher_class = get_publisher_class() + publisher_class.register_cronjobs() + publisher = publisher_class.create_publisher() + cronjobs = [] + for hostname in publisher.get_tenants(): + try: + publisher.set_config() + except Exception as e: + continue + job_publisher_class = get_publisher_class() + job_publisher_class.register_cronjobs() + job_publisher = job_publisher_class.create_publisher() + app_dir = job_publisher.app_dir + job_publisher.app_dir = os.path.join(app_dir, hostname) + for cronjob in job_publisher.cronjobs: + cronjobs.append({ + 'cronjob': cronjob, + 'publisher': job_publisher + }) + + pool = multiprocessing.pool.ThreadPool() + try: + list(pool.imap_unordered(lambda x: x['cronjob'].run(x['publisher'], now), cronjobs)) + finally: + pool.close() + pool.terminate() -- 2.16.2