From 8bc34947b1a65c71723560f356782d411c1f24cb Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 27 Apr 2018 19:25:48 +0200 Subject: [PATCH 2/2] use pg_advisory_lock to prevent running cronjobs during migrations and on different servers (fixes #15470) --- hobo/multitenant/locking.py | 47 +++++++++++++++++++ .../management/commands/migrate_schemas.py | 11 +++++ .../management/commands/tenant_command.py | 25 ++++++++-- tests_multitenant/test_tenant_command.py | 34 ++++++++++++++ 4 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 hobo/multitenant/locking.py diff --git a/hobo/multitenant/locking.py b/hobo/multitenant/locking.py new file mode 100644 index 0000000..a666767 --- /dev/null +++ b/hobo/multitenant/locking.py @@ -0,0 +1,47 @@ +from django.db import connection, Error + +MIGRATION_LOCK_ID = 9999999 + + +class Locked(RuntimeError): + pass + + +class PGLock(object): + def __init__(self, lock_id, wait=2, shared=False): + self.lock_id = lock_id + self.wait = wait + self.shared = shared + + def __enter__(self): + take_lock = 'PG_ADVISORY_LOCK%s' % ('_SHARED' if self.shared else '') + try_lock = 'PG_TRY_ADVISORY_LOCK%s' % ('_SHARED' if self.shared else '') + with connection.cursor() as c: + if self.wait == 0: + c.execute('SELECT %s(%%s)' % try_lock, [self.lock_id]) + if not c.fetchone()[0]: + raise Locked() + else: + if self.wait: + c.execute("SET lock_timeout = '%ss'" % self.wait) + try: + c.execute('SELECT %s(%%s)' % take_lock, [self.lock_id]) + except Error: + raise Locked() + + def __exit__(self, *args, **kwargs): + with connection.cursor() as c: + c.execute('SELECT PG_ADVISORY_UNLOCK(%s)', [self.lock_id]) + + +def take_global_lock(lock_id, wait=2, shared=False): + '''Prevent global actions, like cron or migration to run at the same time''' + return PGLock(lock_id, wait=wait, shared=shared) + + +def take_shared_migration_lock(): + return take_global_lock(wait=0, lock_id=MIGRATION_LOCK_ID, shared=True) + + +def take_exclusive_migration_lock(): + return take_global_lock(wait=None, lock_id=MIGRATION_LOCK_ID) diff --git a/hobo/multitenant/management/commands/migrate_schemas.py b/hobo/multitenant/management/commands/migrate_schemas.py index 4c81574..2ed6031 100644 --- a/hobo/multitenant/management/commands/migrate_schemas.py +++ b/hobo/multitenant/management/commands/migrate_schemas.py @@ -4,12 +4,14 @@ from optparse import NO_DEFAULT if django.VERSION >= (1, 7, 0): from django.core.management.commands.migrate import Command as MigrateCommand from django.db.migrations.recorder import MigrationRecorder +from django.core.management.base import CommandError from django.db import connection from django.conf import settings from tenant_schemas.utils import get_public_schema_name, schema_exists from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound from hobo.multitenant.management.commands import SyncCommon +from hobo.multitenant.locking import take_exclusive_migration_lock, Locked class MigrateSchemasCommand(SyncCommon): @@ -34,6 +36,15 @@ class MigrateSchemasCommand(SyncCommon): def handle(self, *args, **options): super(MigrateSchemasCommand, self).handle(*args, **options) + # migration job can wait for finishing cron jobs + + try: + with take_exclusive_migration_lock(): + self.handle_locked(*args, **options) + except Locked: + raise CommandError('global migration lock is taken') + + def handle_locked(self, *args, **options): self.PUBLIC_SCHEMA_NAME = get_public_schema_name() if self.sync_public and not self.schema_name: diff --git a/hobo/multitenant/management/commands/tenant_command.py b/hobo/multitenant/management/commands/tenant_command.py index e80c9be..1aba95d 100644 --- a/hobo/multitenant/management/commands/tenant_command.py +++ b/hobo/multitenant/management/commands/tenant_command.py @@ -10,6 +10,8 @@ from django.db import connection from hobo.multitenant.management.commands import InteractiveTenantOption from hobo.multitenant.middleware import TenantMiddleware +from hobo.multitenant.locking import take_global_lock, take_shared_migration_lock, Locked + class Command(InteractiveTenantOption, BaseCommand): help = "Wrapper around django commands for use with an individual tenant" @@ -40,12 +42,29 @@ class Command(InteractiveTenantOption, BaseCommand): args_parser.add_argument("--all-tenants", help="apply command to all tenants", action='store_true') args_parser.add_argument("-d", "--domain", dest="domain_name", help="specify tenant domain name") + args_parser.add_argument("--lock", dest="lock", type=int, + help="take a global lock id or fail", + default=False) args_namespace, args = args_parser.parse_known_args(argv) if args_namespace.all_tenants: - for tenant in TenantMiddleware.get_tenants(): - connection.set_tenant(tenant) - klass.run_from_argv(args) + try: + with take_shared_migration_lock(): + def run_for_all(): + for tenant in TenantMiddleware.get_tenants(): + connection.set_tenant(tenant) + klass.run_from_argv(args) + + if args_namespace.lock: + try: + with take_global_lock(wait=0, lock_id=args_namespace.lock): + run_for_all() + except Locked: + raise CommandError('lock %s is taken' % args_namespace.lock) + else: + run_for_all() + except Locked: + raise CommandError('migration lock is taken') else: tenant = self.get_tenant_from_options_or_interactive(domain=args_namespace.domain_name) connection.set_tenant(tenant) diff --git a/tests_multitenant/test_tenant_command.py b/tests_multitenant/test_tenant_command.py index d416d2f..e72cece 100644 --- a/tests_multitenant/test_tenant_command.py +++ b/tests_multitenant/test_tenant_command.py @@ -1,9 +1,12 @@ import pytest import mock import os +import threading from django.core.management import execute_from_command_line +from django.core.management.base import CommandError +from hobo.multitenant.locking import take_exclusive_migration_lock, take_global_lock from hobo.multitenant.middleware import TenantMiddleware pytestmark = pytest.mark.django_db @@ -58,3 +61,34 @@ def test_delete_tenant(tenants): assert False all_tenants = list(TenantMiddleware.get_tenants()) assert len(all_tenants) == 1 + + +def run_all_tenant_command(other_args=[]): + shared = [None] + + def thread_run(): + with pytest.raises(CommandError) as e: + execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'] + other_args) + shared[0] = e.value + t = threading.Thread(target=thread_run) + t.start() + t.join() + return shared[0] + + +@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle') +def test_all_tenants_migration_lock(handle, tenants): + with take_exclusive_migration_lock(): + result = run_all_tenant_command() + + assert str(result) == 'migration lock is taken' + assert handle.call_count == 0 + + +@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle') +def test_all_tenants_lock(handle, tenants): + with take_global_lock(lock_id=999): + result = run_all_tenant_command(['--lock', '999']) + + assert str(result) == 'lock 999 is taken' + assert handle.call_count == 0 -- 2.17.0