Projet

Général

Profil

0002-use-pg_advisory_lock-to-prevent-running-cronjobs-dur.patch

Benjamin Dauvergne, 30 avril 2018 12:51

Télécharger (8 ko)

Voir les différences:

Subject: [PATCH 2/2] use pg_advisory_lock to prevent running cronjobs during
 migrations and on different servers (fixes #15470)

 hobo/multitenant/locking.py                   | 47 +++++++++++++++++++
 .../management/commands/migrate_schemas.py    | 11 +++++
 .../management/commands/tenant_command.py     | 25 ++++++++--
 tests_multitenant/test_tenant_command.py      | 34 ++++++++++++++
 4 files changed, 114 insertions(+), 3 deletions(-)
 create mode 100644 hobo/multitenant/locking.py
hobo/multitenant/locking.py
1
from django.db import connection, Error
2

  
3
# Advisory lock id used by take_shared_migration_lock() and
# take_exclusive_migration_lock() below.
MIGRATION_LOCK_ID = 9999999
4

  
5

  
6
class Locked(RuntimeError):
    '''Raised by PGLock when the advisory lock cannot be obtained.'''
8

  
9

  
10
class PGLock(object):
    '''Context manager holding a session-level PostgreSQL advisory lock.

    lock_id -- integer key of the advisory lock.
    wait -- 0 to fail immediately when the lock is unavailable, a positive
            number of seconds to wait at most that long, or None to wait
            without any timeout set by this class.
    shared -- take the shared variant of the lock instead of the exclusive one.

    Entering the context raises Locked when the lock cannot be obtained.
    '''

    def __init__(self, lock_id, wait=2, shared=False):
        self.lock_id = lock_id
        self.wait = wait
        self.shared = shared

    def _sql_function(self, name):
        '''Return the shared or exclusive variant of an advisory lock function.'''
        return (name + '_SHARED') if self.shared else name

    @staticmethod
    def _reset_lock_timeout(cursor):
        '''Restore the session default lock_timeout after a timed lock attempt.'''
        try:
            cursor.execute('RESET lock_timeout')
        except Error:
            # Best effort: this fails when the lock attempt aborted an open
            # transaction; rolling that transaction back also undoes the SET.
            pass

    def __enter__(self):
        with connection.cursor() as c:
            if self.wait == 0:
                # Non-blocking attempt: the function returns false when the
                # lock is already held.
                c.execute('SELECT %s(%%s)' % self._sql_function('PG_TRY_ADVISORY_LOCK'),
                          [self.lock_id])
                if not c.fetchone()[0]:
                    raise Locked()
            else:
                if self.wait:
                    c.execute("SET lock_timeout = '%ss'" % self.wait)
                try:
                    c.execute('SELECT %s(%%s)' % self._sql_function('PG_ADVISORY_LOCK'),
                              [self.lock_id])
                except Error:
                    raise Locked()
                finally:
                    # Do not leak the timeout to later statements of this
                    # session (e.g. a subsequent lock taken with wait=None).
                    if self.wait:
                        self._reset_lock_timeout(c)

    def __exit__(self, *args, **kwargs):
        # A shared lock must be released with the _SHARED unlock function;
        # the plain one only releases exclusive locks.
        with connection.cursor() as c:
            c.execute('SELECT %s(%%s)' % self._sql_function('PG_ADVISORY_UNLOCK'),
                      [self.lock_id])
35

  
36

  
37
def take_global_lock(lock_id, wait=2, shared=False):
    '''Prevent global actions, like cron or migration, from running at the same time.

    Returns a PGLock built from the given arguments, to be used as a context
    manager.
    '''
    return PGLock(lock_id, wait=wait, shared=shared)
40

  
41

  
42
def take_shared_migration_lock():
    '''Return a shared, non-waiting (wait=0) lock on MIGRATION_LOCK_ID.'''
    return take_global_lock(wait=0, lock_id=MIGRATION_LOCK_ID, shared=True)
44

  
45

  
46
def take_exclusive_migration_lock():
    '''Return an exclusive lock on MIGRATION_LOCK_ID taken without timeout (wait=None).'''
    return take_global_lock(wait=None, lock_id=MIGRATION_LOCK_ID)
hobo/multitenant/management/commands/migrate_schemas.py
4 4
if django.VERSION >= (1, 7, 0):
5 5
    from django.core.management.commands.migrate import Command as MigrateCommand
6 6
    from django.db.migrations.recorder import MigrationRecorder
7
from django.core.management.base import CommandError
7 8
from django.db import connection
8 9
from django.conf import settings
9 10

  
10 11
from tenant_schemas.utils import get_public_schema_name, schema_exists
11 12
from hobo.multitenant.middleware import TenantMiddleware, TenantNotFound
12 13
from hobo.multitenant.management.commands import SyncCommon
14
from hobo.multitenant.locking import take_exclusive_migration_lock, Locked
13 15

  
14 16

  
15 17
class MigrateSchemasCommand(SyncCommon):
......
34 36

  
35 37
    def handle(self, *args, **options):
        '''Run handle_locked() while holding the exclusive migration lock.

        Raises CommandError when the lock cannot be obtained.
        '''
        super(MigrateSchemasCommand, self).handle(*args, **options)
        # migration job can wait for finishing cron jobs
        try:
            with take_exclusive_migration_lock():
                self.handle_locked(*args, **options)
        except Locked:
            raise CommandError('global migration lock is taken')
46

  
47
    def handle_locked(self, *args, **options):
37 48
        self.PUBLIC_SCHEMA_NAME = get_public_schema_name()
38 49

  
39 50
        if self.sync_public and not self.schema_name:
hobo/multitenant/management/commands/tenant_command.py
10 10

  
11 11
from hobo.multitenant.management.commands import InteractiveTenantOption
12 12
from hobo.multitenant.middleware import TenantMiddleware
13
from hobo.multitenant.locking import take_global_lock, take_shared_migration_lock, Locked
14

  
13 15

  
14 16
class Command(InteractiveTenantOption, BaseCommand):
15 17
    help = "Wrapper around django commands for use with an individual tenant"
......
40 42
        args_parser.add_argument("--all-tenants", help="apply command to all tenants",
41 43
                                 action='store_true')
42 44
        args_parser.add_argument("-d", "--domain", dest="domain_name", help="specify tenant domain name")
45
        args_parser.add_argument("--lock", dest="lock", type=int,
46
                                 help="take a global lock id or fail",
47
                                 default=False)
43 48
        args_namespace, args = args_parser.parse_known_args(argv)
44 49

  
45 50
        if args_namespace.all_tenants:
46
            for tenant in TenantMiddleware.get_tenants():
47
                connection.set_tenant(tenant)
48
                klass.run_from_argv(args)
51
            try:
52
                with take_shared_migration_lock():
53
                    def run_for_all():
54
                        for tenant in TenantMiddleware.get_tenants():
55
                            connection.set_tenant(tenant)
56
                            klass.run_from_argv(args)
57

  
58
                    if args_namespace.lock:
59
                        try:
60
                            with take_global_lock(wait=0, lock_id=args_namespace.lock):
61
                                run_for_all()
62
                        except Locked:
63
                            raise CommandError('lock %s is taken' % args_namespace.lock)
64
                    else:
65
                        run_for_all()
66
            except Locked:
67
                raise CommandError('migration lock is taken')
49 68
        else:
50 69
            tenant = self.get_tenant_from_options_or_interactive(domain=args_namespace.domain_name)
51 70
            connection.set_tenant(tenant)
tests_multitenant/test_tenant_command.py
1 1
import pytest
2 2
import mock
3 3
import os
4
import threading
4 5

  
5 6
from django.core.management import execute_from_command_line
7
from django.core.management.base import CommandError
6 8

  
9
from hobo.multitenant.locking import take_exclusive_migration_lock, take_global_lock
7 10
from hobo.multitenant.middleware import TenantMiddleware
8 11

  
9 12
pytestmark = pytest.mark.django_db
......
58 61
        assert False
59 62
    all_tenants = list(TenantMiddleware.get_tenants())
60 63
    assert len(all_tenants) == 1
64

  
65

  
66
def run_all_tenant_command(other_args=()):
    '''Run "tenant_command clearsessions --all-tenants" in a separate thread.

    other_args -- extra command-line arguments appended to the command.

    The command is expected to fail; the CommandError it raised is returned.
    '''
    argv = ['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'] + list(other_args)
    shared = [None]

    # NOTE(review): presumably run in a thread so the command gets its own
    # database connection, distinct from the one holding the lock - confirm.
    def thread_run():
        with pytest.raises(CommandError) as e:
            execute_from_command_line(argv)
        shared[0] = e.value

    t = threading.Thread(target=thread_run)
    t.start()
    t.join()
    return shared[0]
77

  
78

  
79
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_migration_lock(handle, tenants):
    '''An --all-tenants command fails while the exclusive migration lock is held.'''
    with take_exclusive_migration_lock():
        result = run_all_tenant_command()

    assert str(result) == 'migration lock is taken'
    assert handle.call_count == 0
86

  
87

  
88
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
def test_all_tenants_lock(handle, tenants):
    '''An --all-tenants command run with --lock fails while that lock id is held.'''
    with take_global_lock(lock_id=999):
        result = run_all_tenant_command(['--lock', '999'])

    assert str(result) == 'lock 999 is taken'
    assert handle.call_count == 0
61
-