Projet

Général

Profil

0001-cron-allow-some-cron-workers-to-be-run-in-parallel-3.patch

Frédéric Péters, 09 septembre 2022 13:38

Télécharger (29,4 ko)

Voir les différences:

Subject: [PATCH] cron: allow some cron workers to be run in parallel (#33280)

 tests/test_publisher.py                | 427 ++++++++++++++-----------
 wcs/qommon/cron.py                     |  13 +-
 wcs/qommon/management/commands/cron.py | 113 ++++---
 wcs/settings.py                        |   3 +
 wcs/sql.py                             |  29 ++
 5 files changed, 344 insertions(+), 241 deletions(-)
tests/test_publisher.py
1 1
import datetime
2
import glob
2 3
import io
3 4
import json
4 5
import os
......
6 7
import re
7 8
import shutil
8 9
import sys
9
import tempfile
10 10
import time
11 11
import xml.etree.ElementTree as ET
12 12
import zipfile
......
14 14

  
15 15
import pytest
16 16
from django.core.management import call_command
17
from django.core.management.base import CommandError
18 17
from django.http import Http404
19 18
from django.test import override_settings
20 19
from django.utils.timezone import localtime
21
from quixote import cleanup
20
from quixote import cleanup, get_publisher
22 21
from quixote.http_request import Upload
23 22

  
23
from wcs import sql
24 24
from wcs.qommon import get_publisher_class
25 25
from wcs.qommon.afterjobs import AfterJob
26 26
from wcs.qommon.cron import CronJob
......
214 214
    assert pub.cfg['sp'] == {'what': 'ever'}
215 215

  
216 216

  
217
def test_cron_command(settings):
218
    pub = create_temporary_pub()
219

  
220
    def clear_log_file():
221
        now = localtime()
222
        with open(os.path.join(pub.APP_DIR, 'cron.log-%s' % now.strftime('%Y%m%d')), 'w'):
217
def clear_log_files():
218
    now = localtime()
219
    with open(os.path.join(get_publisher().APP_DIR, 'cron.log-%s' % now.strftime('%Y%m%d')), 'w'):
220
        pass
221
    for filepath in glob.glob(os.path.join(get_publisher().APP_DIR, '*', 'cron.log-*')):
222
        with open(filepath, 'w'):
223 223
            pass
224 224

  
225
    def get_logs():
226
        now = localtime()
227
        with open(os.path.join(pub.APP_DIR, 'cron.log-%s' % now.strftime('%Y%m%d'))) as fd:
228
            lines = fd.readlines()
229
        lines = [line[33:].strip() for line in lines]  # 33 chars for date & time
230
        return lines
231

  
232
    offset = ord(settings.SECRET_KEY[-1]) % 60
233
    with mock.patch('tempfile.gettempdir') as gettempdir:
234
        gettempdir.side_effect = lambda: pub.app_dir
235 225

  
236
        hostnames = ['example.net', 'foo.bar', 'something.com']
237
        for hostname in hostnames:
238
            if not os.path.exists(os.path.join(pub.APP_DIR, hostname)):
239
                os.mkdir(os.path.join(pub.APP_DIR, hostname))
240
                # add a config.pck with postgresql configuration
241
                with open(os.path.join(pub.APP_DIR, hostname, 'config.pck'), 'wb') as fd:
242
                    pickle.dump(pub.cfg, file=fd)
226
def get_logs(hostname=None):
227
    pub = get_publisher()
228
    now = localtime()
229
    if hostname:
230
        base_dir = os.path.join(pub.APP_DIR, hostname)
231
    else:
232
        base_dir = pub.APP_DIR
233
    with open(os.path.join(base_dir, 'cron.log-%s' % now.strftime('%Y%m%d'))) as fd:
234
        lines = fd.readlines()
235
    lines = [line[33:].strip() for line in lines]  # 33 chars for date & time
236
    return lines
243 237

  
244
        with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
245
            with mock.patch('wcs.qommon.publisher.QommonPublisher.get_tenants') as mock_tenants:
246
                mock_tenants.return_value = [
247
                    Tenant(os.path.join(pub.app_dir, x)) for x in ('example.net', 'foo.bar', 'something.com')
248
                ]
249
                clear_log_file()
250
                call_command('cron')
251
                assert cron_worker.call_count == 3
252
                assert get_logs() == [
253
                    'starting cron (minutes offset is %s)' % offset,
254
                    '[tenant example.net] start',
255
                    '[tenant foo.bar] start',
256
                    '[tenant something.com] start',
257
                ]
258
                cron_worker.reset_mock()
259
                clear_log_file()
260
                call_command('cron', domain='example.net')
261
                assert cron_worker.call_count == 1
262
                assert get_logs() == [
263
                    'starting cron (minutes offset is %s)' % offset,
264
                    '[tenant example.net] start',
265
                ]
266
                cron_worker.reset_mock()
267

  
268
                # disable cron on something.com
269
                site_options_path = os.path.join(pub.APP_DIR, 'something.com', 'site-options.cfg')
270
                with open(site_options_path, 'w') as fd:
271
                    fd.write(
272
                        '''\
273
                        [variables]
274
                        disable_cron_jobs = True
275
                        '''
276
                    )
277

  
278
                clear_log_file()
279
                call_command('cron')
280
                assert cron_worker.call_count == 2
281
                assert get_logs() == [
282
                    'starting cron (minutes offset is %s)' % offset,
283
                    '[tenant example.net] start',
284
                    '[tenant foo.bar] start',
285
                ]
286
                cron_worker.reset_mock()
287
                os.unlink(site_options_path)
288

  
289
        # simulate another locked cron
290
        from wcs.qommon.vendor import locket
291

  
292
        lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
293
        with locket.lock_file(lockfile, timeout=0):
294
            with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
295
                call_command('cron')  # silent by default (verbosity=0)
296
                assert cron_worker.call_count == 0
297
                call_command('cron', verbosity=2)  # same if verbosity>0
298
                assert cron_worker.call_count == 0
299
                with mock.patch('wcs.qommon.management.commands.cron.JUMP_TIMEOUT_INTERVAL', -1):
300
                    with pytest.raises(CommandError, match='can not start cron job.*seems old'):
301
                        call_command('cron')
302
                assert cron_worker.call_count == 0
303

  
304
        # verify that the lock is released
305
        with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
306
            call_command('cron', domain='example.net')
307
            assert cron_worker.call_count == 1
308 238

  
309
        # simulate a cron crash
310
        with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
311
            cron_worker.side_effect = NotImplementedError
312
            with pytest.raises(NotImplementedError):
313
                call_command('cron', domain='example.net')
314
            assert cron_worker.call_count == 1
315
        # verify that the lock is released
316
        with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
317
            call_command('cron', domain='example.net')
318
            assert cron_worker.call_count == 1
239
def get_sql_cron_statuses():
240
    conn, cur = sql.get_connection_and_cursor()
241
    cur.execute("SELECT key, value FROM wcs_meta WHERE key LIKE 'cron-status-%'")
242
    rows = cur.fetchall()
243
    conn.commit()
244
    cur.close()
245
    return dict(rows)
319 246

  
320
        # disable cron system
321
        with override_settings(DISABLE_CRON_JOBS=True):
322
            with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
323
                call_command('cron', domain='example.net')
324
                assert cron_worker.call_count == 0
325 247

  
326
        # run a specific job
327
        jobs = []
248
def test_cron_command(settings):
249
    pub = create_temporary_pub()
250
    offset = ord(settings.SECRET_KEY[-1]) % 60
328 251

  
329
        def job1(pub, job=None):
330
            jobs.append('job1')
331

  
332
        def job2(pub, job=None):
333
            jobs.append('job2')
334

  
335
        def job3(pub, job=None):
336
            jobs.append('job3')
337
            for key in ['foo', 'bar', 'blah']:
338
                with job.log_long_job(key):
339
                    pass
340

  
341
        @classmethod
342
        def register_test_cronjobs(cls):
343
            cls.register_cronjob(CronJob(job1, days=[10]))
344
            cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
345
            cls.register_cronjob(CronJob(job3, name='job3', days=[10]))
346

  
347
        with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
348
            get_publisher_class().cronjobs = []
349
            call_command('cron', job_name='job1', domain='example.net')
350
            assert jobs == []
351
            get_publisher_class().cronjobs = []
352
            clear_log_file()
353
            call_command('cron', job_name='job2', domain='example.net')
354
            assert jobs == ['job2']
252
    hostnames = ['example.net', 'foo.bar', 'something.com']
253
    for hostname in hostnames:
254
        if not os.path.exists(os.path.join(pub.APP_DIR, hostname)):
255
            os.mkdir(os.path.join(pub.APP_DIR, hostname))
256
            # add a config.pck with postgresql configuration
257
            with open(os.path.join(pub.APP_DIR, hostname, 'config.pck'), 'wb') as fd:
258
                pickle.dump(pub.cfg, file=fd)
259

  
260
    with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
261
        with mock.patch('wcs.qommon.publisher.QommonPublisher.get_tenants') as mock_tenants:
262
            mock_tenants.return_value = [
263
                Tenant(os.path.join(pub.app_dir, x)) for x in ('example.net', 'foo.bar', 'something.com')
264
            ]
265
            clear_log_files()
266
            call_command('cron')
267
            assert cron_worker.call_count == 3
355 268
            assert get_logs() == [
356 269
                'starting cron (minutes offset is %s)' % offset,
357
                '[tenant example.net] start',
358 270
            ]
359
            get_publisher_class().cronjobs = []
360
            jobs = []
361
            clear_log_file()
362
            with mock.patch('wcs.qommon.cron.CronJob.LONG_JOB_DURATION', 0):
363
                call_command('cron', job_name='job2', domain='example.net')
271
            cron_worker.reset_mock()
272
            clear_log_files()
273
            call_command('cron', domain='example.net')
274
            assert cron_worker.call_count == 1
364 275
            assert get_logs() == [
365 276
                'starting cron (minutes offset is %s)' % offset,
366
                '[tenant example.net] start',
367
                '[tenant example.net] long job: job2 (took 0 minutes)',
368 277
            ]
369
            assert jobs == ['job2']
370
            get_publisher_class().cronjobs = []
371
            jobs = []
372
            clear_log_file()
373
            with mock.patch('wcs.qommon.cron.CronJob.LONG_JOB_DURATION', 0):
374
                call_command('cron', job_name='job3', domain='example.net')
278
            cron_worker.reset_mock()
279

  
280
            # check we're still running them all
281
            call_command('cron')
282
            assert cron_worker.call_count == 3
283
            cron_worker.reset_mock()
284

  
285
            assert get_sql_cron_statuses() == {
286
                'cron-status-example.net': 'needed',
287
                'cron-status-foo.bar': 'needed',
288
                'cron-status-something.com': 'needed',
289
            }
290

  
291
            # disable cron on something.com
292
            site_options_path = os.path.join(pub.APP_DIR, 'something.com', 'site-options.cfg')
293
            with open(site_options_path, 'w') as fd:
294
                fd.write(
295
                    '''\
296
                    [variables]
297
                    disable_cron_jobs = True
298
                    '''
299
                )
300

  
301
            clear_log_files()
302
            call_command('cron')
303
            assert cron_worker.call_count == 2
375 304
            assert get_logs() == [
376 305
                'starting cron (minutes offset is %s)' % offset,
377
                '[tenant example.net] start',
378
                '[tenant example.net] job3: running on "foo" took 0 minutes',
379
                '[tenant example.net] job3: running on "bar" took 0 minutes',
380
                '[tenant example.net] job3: running on "blah" took 0 minutes',
381
                '[tenant example.net] long job: job3 (took 0 minutes)',
382 306
            ]
383
            assert jobs == ['job3']
307
            os.unlink(site_options_path)
308

  
309
            assert get_sql_cron_statuses() == {
310
                'cron-status-example.net': 'needed',
311
                'cron-status-foo.bar': 'needed',
312
                'cron-status-something.com': 'needed',
313
            }
314

  
315
            # simulate a running cron
316
            cron_worker.reset_mock()
317
            settings.CRON_WORKERS = 1
318
            get_publisher().set_tenant_by_hostname('example.net')
319
            sql.mark_cron_status('running')
320
            call_command('cron')
321
            assert cron_worker.call_count == 0
322

  
323
            assert get_sql_cron_statuses() == {
324
                'cron-status-example.net': 'running',
325
                'cron-status-foo.bar': 'needed',
326
                'cron-status-something.com': 'needed',
327
            }
328

  
329
            # with one more worker, the two other tenants can be run sequentially
330
            cron_worker.reset_mock()
331
            settings.CRON_WORKERS = 2
332
            call_command('cron')
333
            assert cron_worker.call_count == 2
334

  
335
            assert get_sql_cron_statuses() == {
336
                'cron-status-example.net': 'running',
337
                'cron-status-foo.bar': 'done',
338
                'cron-status-something.com': 'done',
339
            }
340

  
341
            get_publisher().set_tenant_by_hostname('example.net')
342
            sql.mark_cron_status('running')
343
            get_publisher().set_tenant_by_hostname('foo.bar')
344
            sql.mark_cron_status('running')
345
            get_publisher().set_tenant_by_hostname('something.com')
346
            sql.mark_cron_status('needed')
347

  
348
            assert get_sql_cron_statuses() == {
349
                'cron-status-example.net': 'running',
350
                'cron-status-foo.bar': 'running',
351
                'cron-status-something.com': 'needed',
352
            }
353
            cron_worker.reset_mock()
354
            call_command('cron')
355
            assert cron_worker.call_count == 0
356

  
357
            shutil.rmtree(os.path.join(pub.APP_DIR, 'foo.bar'))
358
            shutil.rmtree(os.path.join(pub.APP_DIR, 'something.com'))
359

  
360
    get_publisher().set_tenant_by_hostname('example.net')
361
    sql.mark_cron_status('needed')
362
    # simulate a cron crash
363
    with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
364
        cron_worker.side_effect = NotImplementedError
365
        with pytest.raises(NotImplementedError):
366
            call_command('cron')
367
        assert cron_worker.call_count == 1
368

  
369
    # verify that the job is not marked as running
370
    assert get_sql_cron_statuses().get('cron-status-example.net') == 'done'
371

  
372
    # disable cron system
373
    with override_settings(DISABLE_CRON_JOBS=True):
374
        with mock.patch('wcs.qommon.management.commands.cron.cron_worker') as cron_worker:
375
            call_command('cron', domain='example.net')
376
            assert cron_worker.call_count == 0
377

  
378

  
379
def test_cron_command_jobs(settings):
380
    create_temporary_pub()
381

  
382
    # run a specific job
383
    jobs = []
384

  
385
    def job1(pub, job=None):
386
        jobs.append('job1')
387

  
388
    def job2(pub, job=None):
389
        jobs.append('job2')
390

  
391
    def job3(pub, job=None):
392
        jobs.append('job3')
393
        for key in ['foo', 'bar', 'blah']:
394
            with job.log_long_job(key):
395
                pass
396

  
397
    @classmethod
398
    def register_test_cronjobs(cls):
399
        cls.register_cronjob(CronJob(job1, days=[10]))
400
        cls.register_cronjob(CronJob(job2, name='job2', days=[10]))
401
        cls.register_cronjob(CronJob(job3, name='job3', days=[10]))
402

  
403
    get_publisher().set_tenant_by_hostname('example.net')
404
    sql.mark_cron_status('needed')
405

  
406
    with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
407
        get_publisher_class().cronjobs = []
408
        call_command('cron', job_name='job1', domain='example.net')
409
        assert jobs == []
410
        get_publisher_class().cronjobs = []
411
        clear_log_files()
412
        call_command('cron', job_name='job2', domain='example.net')
413
        assert jobs == ['job2']
414
        assert get_logs('example.net') == ['start']
415
        get_publisher_class().cronjobs = []
416
        jobs = []
417
        clear_log_files()
418
        with mock.patch('wcs.qommon.cron.CronJob.LONG_JOB_DURATION', 0):
419
            call_command('cron', job_name='job2', domain='example.net')
420
        assert get_logs('example.net') == ['start', 'long job: job2 (took 0 minutes)']
421
        assert jobs == ['job2']
422
        get_publisher_class().cronjobs = []
423
        jobs = []
424
        clear_log_files()
425
        with mock.patch('wcs.qommon.cron.CronJob.LONG_JOB_DURATION', 0):
426
            call_command('cron', job_name='job3', domain='example.net')
427
        assert get_logs('example.net') == [
428
            'start',
429
            'job3: running on "foo" took 0 minutes',
430
            'job3: running on "bar" took 0 minutes',
431
            'job3: running on "blah" took 0 minutes',
432
            'long job: job3 (took 0 minutes)',
433
        ]
434
        assert jobs == ['job3']
384 435

  
385 436

  
386 437
def test_cron_command_delayed_jobs(settings, freezer):
387 438
    pub = create_temporary_pub()
388 439

  
389 440
    offset = ord(settings.SECRET_KEY[-1]) % 60
390
    with mock.patch('tempfile.gettempdir') as gettempdir:
391
        gettempdir.side_effect = lambda: pub.app_dir
392 441

  
393
        jobs = []
442
    jobs = []
394 443

  
395
        def job1(pub, job=None):
396
            jobs.append('job1')
444
    def job1(pub, job=None):
445
        jobs.append('job1')
397 446

  
398
        def job2(pub, job=None):
399
            jobs.append('job2')
447
    def job2(pub, job=None):
448
        jobs.append('job2')
400 449

  
401
        def job3(pub, job=None):
402
            jobs.append('job3')
450
    def job3(pub, job=None):
451
        jobs.append('job3')
403 452

  
404
        start_time = datetime.datetime(2021, 4, 6, 2, offset)
453
    start_time = datetime.datetime(2021, 4, 6, 2, offset)
405 454

  
406
        @classmethod
407
        def register_test_cronjobs(cls):
408
            cls.register_cronjob(CronJob(job1, minutes=[0, 3]))
409
            cls.register_cronjob(CronJob(job2, minutes=[2]))
410
            cls.register_cronjob(CronJob(job3, minutes=[10]))
455
    @classmethod
456
    def register_test_cronjobs(cls):
457
        cls.register_cronjob(CronJob(job1, minutes=[0, 3]))
458
        cls.register_cronjob(CronJob(job2, minutes=[2]))
459
        cls.register_cronjob(CronJob(job3, minutes=[10]))
411 460

  
412
        freezer.move_to(start_time)
413
        with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
414
            get_publisher_class().cronjobs = []
415
            call_command('cron', domain='example.net')
416
            assert jobs == ['job1']
461
    freezer.move_to(start_time)
462
    with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs):
463
        get_publisher_class().cronjobs = []
464
        call_command('cron', domain='example.net')
465
        assert jobs == ['job1']
417 466

  
418
        # reset cronjobs
419
        pub.cronjobs = []
467
    # reset cronjobs
468
    pub.cronjobs = []
420 469

  
421
        jobs = []
470
    jobs = []
422 471

  
423
        def job1_delay(pub, job=None):
424
            jobs.append('job1')
425
            freezer.move_to(datetime.timedelta(minutes=5))
472
    def job1_delay(pub, job=None):
473
        jobs.append('job1')
474
        freezer.move_to(datetime.timedelta(minutes=5))
426 475

  
427
        @classmethod
428
        def register_test_cronjobs_2(cls):
429
            cls.register_cronjob(CronJob(job1_delay, minutes=[0, 3]))
430
            cls.register_cronjob(CronJob(job2, minutes=[2]))
431
            cls.register_cronjob(CronJob(job3, minutes=[10]))
476
    @classmethod
477
    def register_test_cronjobs_2(cls):
478
        cls.register_cronjob(CronJob(job1_delay, minutes=[0, 3]))
479
        cls.register_cronjob(CronJob(job2, minutes=[2]))
480
        cls.register_cronjob(CronJob(job3, minutes=[10]))
432 481

  
433
        freezer.move_to(start_time)
434
        with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs_2):
435
            get_publisher_class().cronjobs = []
436
            call_command('cron', domain='example.net')
437
            assert jobs == ['job1', 'job2']
482
    freezer.move_to(start_time)
483
    with mock.patch('wcs.publisher.WcsPublisher.register_cronjobs', register_test_cronjobs_2):
484
        get_publisher_class().cronjobs = []
485
        call_command('cron', domain='example.net')
486
        assert jobs == ['job1', 'job2']
438 487

  
439 488

  
440 489
def test_clean_afterjobs():
wcs/qommon/cron.py
54 54
                self.log('long job: %s (took %s minutes)' % (self.name, minutes))
55 55

  
56 56
    @staticmethod
57
    def log(message, with_tenant=True):
58
        tenant_prefix = ''
57
    def log(message, in_tenant=True):
59 58
        now = localtime()
60
        if with_tenant:
61
            tenant_prefix = '[tenant %s] ' % get_publisher().tenant.hostname
62
        with open(os.path.join(get_publisher().APP_DIR, 'cron.log-%s' % now.strftime('%Y%m%d')), 'a+') as fd:
63
            fd.write('%s %s%s\n' % (now.isoformat(), tenant_prefix, message))
59
        if in_tenant:
60
            base_dir = get_publisher().tenant.directory
61
        else:
62
            base_dir = get_publisher().APP_DIR
63
        with open(os.path.join(base_dir, 'cron.log-%s' % now.strftime('%Y%m%d')), 'a+') as fd:
64
            fd.write('%s %s\n' % (now.isoformat(), message))
64 65

  
65 66
    def is_time(self, timetuple):
66 67
        minutes = self.minutes
wcs/qommon/management/commands/cron.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import os
18
import tempfile
19 17
import time
20 18

  
21 19
from django.conf import settings
22
from django.core.management.base import BaseCommand, CommandError
20
from django.core.management.base import BaseCommand
23 21

  
22
from wcs import sql
24 23
from wcs.qommon.cron import CronJob, cron_worker
25 24
from wcs.qommon.publisher import get_publisher_class
26
from wcs.qommon.vendor import locket
27
from wcs.wf.jump import JUMP_TIMEOUT_INTERVAL
28 25

  
29 26

  
30 27
class Command(BaseCommand):
......
41 38
        )
42 39
        parser.add_argument('--job', dest='job_name', metavar='NAME')
43 40

  
44
    def handle(self, verbosity, **options):
41
    def handle(self, verbosity, domain=None, job_name=None, **options):
45 42
        if getattr(settings, 'DISABLE_CRON_JOBS', False) and not options['force_job']:
46 43
            if verbosity > 1:
47 44
                print('Command is ignored because DISABLE_CRON_JOBS is set in settings')
48 45
            return
49
        lockfile = os.path.join(tempfile.gettempdir(), 'wcs-cron-in-progress.lock')
50
        if options.get('domain'):
51
            domains = [options.get('domain')]
46
        if domain:
47
            domains = [domain]
52 48
        else:
53 49
            domains = [x.hostname for x in get_publisher_class().get_tenants()]
54
        try:
55
            with locket.lock_file(lockfile, timeout=0):
56
                if verbosity > 2:
57
                    print('cron start (lock %s)' % lockfile)
58
                now = time.localtime()
59
                publisher_class = get_publisher_class()
60
                publisher_class.register_cronjobs()
61
                publisher = publisher_class.create_publisher()
62
                offset = ord(settings.SECRET_KEY[-1]) % 60
63
                CronJob.log('starting cron (minutes offset is %s)' % offset, with_tenant=False)
64
                for hostname in domains:
65
                    publisher.set_tenant_by_hostname(hostname)
66
                    if publisher.get_site_option('disable_cron_jobs', 'variables'):
67
                        if verbosity > 1:
68
                            print('cron ignored on %s because DISABLE_CRON_JOBS is set' % hostname)
69
                        continue
70
                    if not publisher.has_postgresql_config():
71
                        if verbosity > 1:
72
                            print('cron ignored on %s because it has no PostgreSQL configuration' % hostname)
73
                        continue
50
        if not job_name and verbosity > 2:
51
            print('cron start')
52
        now = time.localtime()
53
        publisher_class = get_publisher_class()
54
        publisher_class.register_cronjobs()
55
        publisher = publisher_class.create_publisher()
56
        offset = ord(settings.SECRET_KEY[-1]) % 60
57
        if not job_name:
58
            CronJob.log('starting cron (minutes offset is %s)' % offset, in_tenant=False)
59
        tenant_status = {
60
            'needed': [],
61
            'running': [],
62
            'done': [],
63
        }
64
        for hostname in domains:
65
            publisher.set_tenant_by_hostname(hostname)
66
            if publisher.get_site_option('disable_cron_jobs', 'variables'):
67
                if verbosity > 1:
68
                    print('cron ignored on %s because DISABLE_CRON_JOBS is set' % hostname)
69
                continue
70
            if not publisher.has_postgresql_config():
71
                if verbosity > 1:
72
                    print('cron ignored on %s because it has no PostgreSQL configuration' % hostname)
73
                continue
74
            if domain:
75
                cron_status = 'ignored'
76
            else:
77
                cron_status = sql.get_and_update_cron_status()
78
                tenant_status[cron_status].append(hostname)
79
            if not options['force_job']:
80
                if len(tenant_status['running']) >= settings.CRON_WORKERS:
81
                    if cron_status == 'needed':
82
                        # unmark current tenant as being running
83
                        sql.mark_cron_status('needed')
74 84
                    if verbosity > 1:
75
                        print('cron work on %s' % hostname)
76
                    CronJob.log('start')
77
                    try:
78
                        cron_worker(publisher, now, job_name=options.get('job_name'))
79
                    except Exception as e:
80
                        CronJob.log('aborted (%r)' % e)
81
                        raise e
82
            if verbosity > 2:
83
                print('cron end (release lock %s)' % lockfile)
84
        except locket.LockError:
85
            age = int(time.time() - os.stat(lockfile).st_mtime)
86
            if age > JUMP_TIMEOUT_INTERVAL * 10:
87
                raise CommandError(
88
                    'can not start cron job, locked by %s for %s seconds (seems old)' % (lockfile, age)
89
                )
90
            if verbosity > 0:
91
                print('can not start cron job, locked by %s for %s seconds' % (lockfile, age))
85
                        print(hostname, 'skip running, too many workers')
86
                    break
87
                if cron_status in ('running', 'done'):
88
                    if verbosity > 1:
89
                        print(hostname, 'skip running, already handled')
90
                    continue
91
            if verbosity > 1:
92
                print('cron work on %s' % hostname)
93
            CronJob.log('start')
94
            try:
95
                cron_worker(publisher, now, job_name=job_name)
96
            except Exception as e:
97
                CronJob.log('aborted (%r)' % e)
98
                raise e
99
            finally:
100
                if not domain:
101
                    tenant_status['needed'].remove(hostname)
102
                    tenant_status['done'].append(hostname)
103
                    sql.mark_cron_status('done')
104

  
105
        if tenant_status['done'] and not (tenant_status['needed'] or tenant_status['running']):
106
            # reset everything if no hostnames are marked as needed or running
107
            for hostname in tenant_status['done']:
108
                publisher.set_tenant_by_hostname(hostname)
109
                sql.mark_cron_status('needed')
110

  
111
        if verbosity > 1:
112
            print('cron end')
wcs/settings.py
193 193
# workalendar config
194 194
WORKING_DAY_CALENDAR = 'workalendar.europe.France'
195 195

  
196
# CRON_WORKERS
197
# Maximum number of tenants whose cron jobs may be marked as running at the
# same time. os.cpu_count() returns None when the CPU count cannot be
# determined; fall back to 1 so settings import never fails.
CRON_WORKERS = (os.cpu_count() or 1) // 2 + 1
198

  
196 199
local_settings_file = os.environ.get(
197 200
    'WCS_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')
198 201
)
wcs/sql.py
4416 4416
    return sql_level
4417 4417

  
4418 4418

  
4419
@guard_postgres
def get_and_update_cron_status():
    """Try to claim the current tenant for a cron run.

    Returns the tenant's cron status as it was *before* this call:

    - 'needed' when this call claimed the tenant (its wcs_meta row is now
      'running', or a new 'running' row was inserted);
    - the stored value (e.g. 'running' or 'done') otherwise, in which case
      nothing is modified.
    """
    conn, cur = get_connection_and_cursor()
    do_meta_table(conn, cur, insert_current_sql_level=False)
    key = 'cron-status-%s' % get_publisher().tenant.hostname
    # Claim the row with a single conditional UPDATE rather than SELECT then
    # UPDATE: concurrent cron processes then cannot both observe 'needed' and
    # both start working on the same tenant (the second UPDATE re-checks the
    # WHERE clause after the first one commits and matches no row).
    cur.execute(
        "UPDATE wcs_meta SET value = 'running', updated_at = NOW() "
        "WHERE key = %s AND value = 'needed' RETURNING value",
        (key,),
    )
    if cur.fetchone() is not None:
        status = 'needed'
    else:
        cur.execute("SELECT value FROM wcs_meta WHERE key = %s", (key,))
        row = cur.fetchone()
        if row is None:
            # first run for this tenant; NOTE(review): two processes doing this
            # simultaneously could still both insert, unless wcs_meta.key is
            # unique — confirm schema.
            cur.execute("INSERT INTO wcs_meta (id, key, value) VALUES (DEFAULT, %s, 'running')", (key,))
            status = 'needed'
        else:
            status = row[0]
    conn.commit()
    cur.close()
    return status
4437

  
4438

  
4439
@guard_postgres
def mark_cron_status(status):
    """Store *status* as the cron status of the current tenant.

    Only an existing 'cron-status-<hostname>' row of wcs_meta is updated;
    when no such row exists the UPDATE matches nothing and nothing is written.
    """
    conn, cur = get_connection_and_cursor()
    hostname = get_publisher().tenant.hostname
    params = (status, 'cron-status-%s' % hostname)
    cur.execute("UPDATE wcs_meta SET value = %s, updated_at = NOW() WHERE key = %s", params)
    conn.commit()
    cur.close()
4446

  
4447

  
4419 4448
@guard_postgres
4420 4449
def is_reindex_needed(index, conn, cur):
4421 4450
    do_meta_table(conn, cur, insert_current_sql_level=False)
4422
-