0002-use-pg_advisory_lock-to-prevent-running-cronjobs-dur.patch
hobo/multitenant/locking.py | ||
---|---|---|
1 |
from django.db import connection, Error |
|
2 | ||
3 |
MIGRATION_LOCK_ID = 9999999 |
|
4 | ||
5 | ||
6 |
class Locked(RuntimeError):
    '''Raised by PGLock when the requested advisory lock cannot be obtained.'''
|
8 | ||
9 | ||
10 |
class PGLock(object):
    '''Context manager holding a PostgreSQL advisory lock on the Django connection.

    Parameters:
        lock_id: integer key handed to the PostgreSQL advisory lock functions.
        wait: how long to wait for the lock:
            * ``0``: try once and fail immediately if the lock is taken,
            * a positive number: block, with ``lock_timeout`` set to that
              many seconds,
            * ``None``: block without setting any ``lock_timeout``.
        shared: take the lock in shared mode instead of exclusive mode.

    Entering the context raises ``Locked`` when the lock cannot be obtained;
    leaving it releases the lock.
    '''

    def __init__(self, lock_id, wait=2, shared=False):
        self.lock_id = lock_id
        self.wait = wait
        self.shared = shared

    def _sql_function(self, base_name):
        '''Return the advisory lock function name matching this lock's mode.'''
        return base_name + ('_SHARED' if self.shared else '')

    def __enter__(self):
        '''Acquire the lock and return this object; raise Locked on failure.'''
        with connection.cursor() as c:
            if self.wait == 0:
                # non-blocking attempt: the function returns false when the
                # lock is already held in a conflicting mode
                c.execute('SELECT %s(%%s)' % self._sql_function('PG_TRY_ADVISORY_LOCK'),
                          [self.lock_id])
                if not c.fetchone()[0]:
                    raise Locked()
            else:
                if self.wait:
                    c.execute("SET lock_timeout = '%ss'" % self.wait)
                try:
                    c.execute('SELECT %s(%%s)' % self._sql_function('PG_ADVISORY_LOCK'),
                              [self.lock_id])
                except Error:
                    raise Locked()
                finally:
                    if self.wait:
                        # do not leak our timeout to the statements run later
                        # on this connection
                        try:
                            c.execute('RESET lock_timeout')
                        except Error:
                            # the lock attempt failed inside a transaction
                            # which is now aborted; its rollback will also
                            # discard the SET above
                            pass
        return self

    def __exit__(self, *args, **kwargs):
        '''Release the lock with the unlock function matching its mode.'''
        # a shared lock is only released by the _SHARED unlock function
        with connection.cursor() as c:
            c.execute('SELECT %s(%%s)' % self._sql_function('PG_ADVISORY_UNLOCK'),
                      [self.lock_id])
|
35 | ||
36 | ||
37 |
def take_global_lock(lock_id, wait=2, shared=False):
    '''Build a PGLock for lock_id, used to keep global actions (cron,
    migration) from running at the same time.

    The lock is only acquired when the returned object is entered as a
    context manager.
    '''
    global_lock = PGLock(lock_id=lock_id, wait=wait, shared=shared)
    return global_lock
|
40 | ||
41 | ||
42 |
def take_shared_migration_lock():
    '''Return a shared, non-waiting lock on MIGRATION_LOCK_ID.

    With wait=0 entering the lock fails immediately when it cannot be taken.
    '''
    return take_global_lock(MIGRATION_LOCK_ID, wait=0, shared=True)
|
44 | ||
45 | ||
46 |
def take_exclusive_migration_lock():
    '''Return an exclusive lock on MIGRATION_LOCK_ID.

    With wait=None entering the lock blocks and no lock timeout is set.
    '''
    return take_global_lock(MIGRATION_LOCK_ID, wait=None)
hobo/multitenant/management/commands/migrate_schemas.py | ||
---|---|---|
4 | 4 |
if django.VERSION >= (1, 7, 0): |
5 | 5 |
from django.core.management.commands.migrate import Command as MigrateCommand |
6 | 6 |
from django.db.migrations.recorder import MigrationRecorder |
7 |
from django.core.management.base import CommandError |
|
7 | 8 |
from django.db import connection |
8 | 9 |
from django.conf import settings |
9 | 10 | |
10 | 11 |
from tenant_schemas.utils import get_public_schema_name, schema_exists |
11 | 12 |
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound |
12 | 13 |
from hobo.multitenant.management.commands import SyncCommon |
14 |
from hobo.multitenant.locking import take_exclusive_migration_lock, Locked |
|
13 | 15 | |
14 | 16 | |
15 | 17 |
class MigrateSchemasCommand(SyncCommon): |
... | ... | |
34 | 36 | |
35 | 37 |
def handle(self, *args, **options):
    '''Run handle_locked() while holding the exclusive migration lock.

    A Locked error is reported as a CommandError.
    '''
    super(MigrateSchemasCommand, self).handle(*args, **options)

    # the migration is allowed to wait until running cron jobs are finished
    migration_lock = take_exclusive_migration_lock()
    try:
        with migration_lock:
            self.handle_locked(*args, **options)
    except Locked:
        raise CommandError('global migration lock is taken')
|
46 | ||
47 |
def handle_locked(self, *args, **options): |
|
37 | 48 |
self.PUBLIC_SCHEMA_NAME = get_public_schema_name() |
38 | 49 | |
39 | 50 |
if self.sync_public and not self.schema_name: |
hobo/multitenant/management/commands/tenant_command.py | ||
---|---|---|
10 | 10 | |
11 | 11 |
from hobo.multitenant.management.commands import InteractiveTenantOption |
12 | 12 |
from hobo.multitenant.middleware import TenantMiddleware |
13 |
from hobo.multitenant.locking import take_global_lock, take_shared_migration_lock, Locked |
|
14 | ||
13 | 15 | |
14 | 16 |
class Command(InteractiveTenantOption, BaseCommand): |
15 | 17 |
help = "Wrapper around django commands for use with an individual tenant" |
... | ... | |
40 | 42 |
args_parser.add_argument("--all-tenants", help="apply command to all tenants", |
41 | 43 |
action='store_true') |
42 | 44 |
args_parser.add_argument("-d", "--domain", dest="domain_name", help="specify tenant domain name") |
45 |
args_parser.add_argument("--lock", dest="lock", type=int, |
|
46 |
help="take a global lock id or fail", |
|
47 |
default=False) |
|
43 | 48 |
args_namespace, args = args_parser.parse_known_args(argv) |
44 | 49 | |
45 | 50 |
if args_namespace.all_tenants: |
46 |
for tenant in TenantMiddleware.get_tenants(): |
|
47 |
connection.set_tenant(tenant) |
|
48 |
klass.run_from_argv(args) |
|
51 |
try: |
|
52 |
with take_shared_migration_lock(): |
|
53 |
def run_for_all(): |
|
54 |
for tenant in TenantMiddleware.get_tenants(): |
|
55 |
connection.set_tenant(tenant) |
|
56 |
klass.run_from_argv(args) |
|
57 | ||
58 |
if args_namespace.lock: |
|
59 |
try: |
|
60 |
with take_global_lock(wait=0, lock_id=args_namespace.lock): |
|
61 |
run_for_all() |
|
62 |
except Locked: |
|
63 |
raise CommandError('lock %s is taken' % args_namespace.lock) |
|
64 |
else: |
|
65 |
run_for_all() |
|
66 |
except Locked: |
|
67 |
raise CommandError('migration lock is taken') |
|
49 | 68 |
else: |
50 | 69 |
tenant = self.get_tenant_from_options_or_interactive(domain=args_namespace.domain_name) |
51 | 70 |
connection.set_tenant(tenant) |
tests_multitenant/test_tenant_command.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 |
import mock |
3 | 3 |
import os |
4 |
import threading |
|
4 | 5 | |
5 | 6 |
from django.core.management import execute_from_command_line |
7 |
from django.core.management.base import CommandError |
|
6 | 8 | |
9 |
from hobo.multitenant.locking import take_exclusive_migration_lock, take_global_lock |
|
7 | 10 |
from hobo.multitenant.middleware import TenantMiddleware |
8 | 11 | |
9 | 12 |
pytestmark = pytest.mark.django_db |
... | ... | |
58 | 61 |
assert False |
59 | 62 |
all_tenants = list(TenantMiddleware.get_tenants()) |
60 | 63 |
assert len(all_tenants) == 1 |
64 | ||
65 | ||
66 |
def run_all_tenant_command(other_args=None):
    '''Run ``tenant_command clearsessions --all-tenants`` in another thread.

    The command is expected to fail with a CommandError.

    Parameters:
        other_args: optional list of extra command line arguments appended
            to the command.

    Returns the CommandError raised by the command, or None when the thread
    did not record one.
    '''
    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants']
    # None rather than a mutable [] default, which would be shared by all calls
    argv.extend(other_args or [])
    shared = [None]

    def thread_run():
        with pytest.raises(CommandError) as e:
            execute_from_command_line(argv)
        shared[0] = e.value

    # presumably run in a thread so that the command uses its own database
    # connection instead of the one already holding the lock -- TODO confirm
    t = threading.Thread(target=thread_run)
    t.start()
    t.join()
    return shared[0]
|
77 | ||
78 | ||
79 |
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_migration_lock(handle, tenants):
    '''An --all-tenants command must fail while the exclusive migration lock is held.'''
    with take_exclusive_migration_lock():
        error = run_all_tenant_command()

    # the command reported the lock and never ran for any tenant
    assert str(error) == 'migration lock is taken'
    assert handle.call_count == 0
|
86 | ||
87 | ||
88 |
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_lock(handle, tenants):
    '''An --all-tenants command given --lock 999 must fail while lock 999 is held.'''
    with take_global_lock(lock_id=999):
        error = run_all_tenant_command(['--lock', '999'])

    # the command reported the lock and never ran for any tenant
    assert str(error) == 'lock 999 is taken'
    assert handle.call_count == 0
|
61 |
- |