0001-cron-fork-each-job-in-each-tenant-with-a-lock-18519.patch
tests/test_publisher.py | ||
---|---|---|
166 | 166 |
assert pub.cfg['sp'] == {'what': 'ever'} |
167 | 167 |
assert not isinstance(pub.cfg['language'], unicode) |
168 | 168 |
assert not isinstance(pub.cfg['whatever2'][-1]['c'], unicode) |
169 | ||
170 |
def test_cron_command(): |
|
171 |
pub = create_temporary_pub() |
|
172 |
with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker: |
|
173 |
call_command('cron') |
|
174 |
assert cron_worker.call_count == 1 |
wcs/qommon/cron.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import os |
|
17 | 18 |
import sys |
18 | 19 | |
20 |
from qommon.vendor.locket import lock_file, LockError |
|
21 | ||
22 | ||
19 | 23 |
class CronJob(object): |
20 | 24 |
hours = None |
21 | 25 |
minutes = None |
22 | 26 |
weekdays = None |
23 | 27 |
days = None |
24 | 28 |
function = None |
29 |
name = None |
|
25 | 30 | |
26 |
def __init__(self, function, hours = None, minutes = None, weekdays = None, days = None):
|
|
31 |
def __init__(self, function, name=None, hours=None, minutes=None, weekdays=None, days=None):
|
|
27 | 32 |
self.function = function |
28 | 33 |
self.hours = hours |
29 | 34 |
self.minutes = minutes |
30 | 35 |
self.weekdays = weekdays |
31 | 36 |
self.days = days |
37 |
self.name = name or function.__name__ |
|
38 | ||
39 |
def run(self, publisher, now): |
|
40 |
if self.days and now[2] not in self.days: |
|
41 |
return |
|
42 |
if self.weekdays and now[6] not in self.weekdays: |
|
43 |
return |
|
44 |
if self.hours and not now[3] in self.hours: |
|
45 |
return |
|
46 |
if self.minutes and not now[4] in self.minutes: |
|
47 |
return |
|
48 | ||
49 |
logger = publisher.get_app_logger() |
|
50 |
logger.debug('cron: run %s on %s', self.name, publisher.app_dir) |
|
51 | ||
52 |
lock_filename = os.path.join(publisher.app_dir, 'cron-%s.lock' % self.name) |
|
53 |
lock = lock_file(lock_filename, timeout=0) |
|
54 | ||
55 |
try: |
|
56 |
lock.acquire() |
|
57 |
except LockError as e: |
|
58 |
logger.info('cron: locked by %s', lock_filename) |
|
59 |
return |
|
32 | 60 | |
33 |
def cron_worker(publisher, now): |
|
34 |
try: |
|
35 |
publisher.set_config() |
|
36 |
except: |
|
37 |
return |
|
38 |
for job in publisher.cronjobs: |
|
39 |
if job.days and now[2] not in job.days: |
|
40 |
continue |
|
41 |
if job.weekdays and now[6] not in job.weekdays: |
|
42 |
continue |
|
43 |
if job.hours and not now[3] in job.hours: |
|
44 |
continue |
|
45 |
if job.minutes and not now[4] in job.minutes: |
|
46 |
continue |
|
47 | ||
48 |
class FakeRequest(object): |
|
49 |
language = publisher.get_site_language() |
|
50 | ||
51 |
publisher.install_lang(FakeRequest()) |
|
52 |
publisher.substitutions.reset() |
|
53 |
publisher.substitutions.feed(publisher) |
|
54 |
for extra_source in publisher.extra_sources: |
|
55 |
publisher.substitutions.feed(extra_source(publisher, None)) |
|
56 | 61 |
try: |
57 |
job.function(publisher) |
|
58 |
except: |
|
59 |
publisher.notify_of_exception(sys.exc_info(), context='[CRON]') |
|
62 |
class FakeRequest(object): |
|
63 |
language = publisher.get_site_language() |
|
64 |
publisher.install_lang(FakeRequest()) |
|
65 |
publisher.substitutions.reset() |
|
66 |
publisher.substitutions.feed(publisher) |
|
67 |
for extra_source in publisher.extra_sources: |
|
68 |
publisher.substitutions.feed(extra_source(publisher, None)) |
|
69 |
try: |
|
70 |
self.function(publisher) |
|
71 |
except Exception as e: |
|
72 |
publisher.notify_of_exception(sys.exc_info(), context='[CRON]') |
|
73 |
finally: |
|
74 |
lock.release() |
wcs/qommon/management/commands/cron.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU General Public License |
15 | 15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import tempfile
|
|
17 |
import multiprocessing.pool
|
|
18 | 18 |
import time |
19 | 19 |
import os |
20 | 20 | |
21 | 21 |
from django.core.management.base import BaseCommand |
22 | 22 |
from qommon.publisher import get_publisher_class |
23 | 23 | |
24 |
from qommon.vendor import locket |
|
25 |
from qommon.cron import cron_worker |
|
26 | ||
27 | 24 | |
28 | 25 |
class Command(BaseCommand): |
29 | 26 |
help = 'Execute cronjobs' |
30 | 27 | |
31 | 28 |
def handle(self, verbosity, **options): |
32 |
with locket.lock_file(os.path.join(tempfile.gettempdir(), 'wcs-cron')): |
|
33 |
now = time.localtime() |
|
34 |
publisher_class = get_publisher_class() |
|
35 |
publisher_class.register_cronjobs() |
|
36 |
publisher = publisher_class.create_publisher() |
|
37 |
app_dir = publisher.app_dir |
|
38 |
for hostname in publisher.get_tenants(): |
|
39 |
publisher.app_dir = os.path.join(app_dir, hostname) |
|
40 |
cron_worker(publisher, now) |
|
29 |
now = time.localtime() |
|
30 |
publisher_class = get_publisher_class() |
|
31 |
publisher_class.register_cronjobs() |
|
32 |
publisher = publisher_class.create_publisher() |
|
33 |
cronjobs = [] |
|
34 |
for hostname in publisher.get_tenants(): |
|
35 |
try: |
|
36 |
publisher.set_config() |
|
37 |
except Exception as e: |
|
38 |
continue |
|
39 |
job_publisher_class = get_publisher_class() |
|
40 |
job_publisher_class.register_cronjobs() |
|
41 |
job_publisher = job_publisher_class.create_publisher() |
|
42 |
app_dir = job_publisher.app_dir |
|
43 |
job_publisher.app_dir = os.path.join(app_dir, hostname) |
|
44 |
for cronjob in job_publisher.cronjobs: |
|
45 |
cronjobs.append({ |
|
46 |
'cronjob': cronjob, |
|
47 |
'publisher': job_publisher |
|
48 |
}) |
|
49 | ||
50 |
pool = multiprocessing.pool.ThreadPool() |
|
51 |
try: |
|
52 |
list(pool.imap_unordered(lambda x: x['cronjob'].run(x['publisher'], now), cronjobs)) |
|
53 |
finally: |
|
54 |
pool.close() |
|
55 |
pool.terminate() |
|
41 |
- |