0001-multitenant-add-delete_tenant-command-15513.patch
hobo/multitenant/management/commands/delete_tenant.py | ||
---|---|---|
1 |
import sys |
|
2 |
from optparse import make_option |
|
3 | ||
4 |
from django.core.management.base import CommandError, BaseCommand |
|
5 |
from hobo.multitenant.middleware import TenantMiddleware |
|
6 | ||
7 | ||
8 |
class Command(BaseCommand): |
|
9 |
help = "Delete tenant(s) by hostname(s)" |
|
10 |
args = ['...'] |
|
11 |
option_list = BaseCommand.option_list + ( |
|
12 |
make_option('--force-drop', action='store_true', default=False, |
|
13 |
help='If you want the schema to be deleted from database'), |
|
14 |
) |
|
15 | ||
16 |
def handle(self, *args, **options): |
|
17 | ||
18 |
if not args: |
|
19 |
raise CommandError("you must give at least one tenant hostname") |
|
20 | ||
21 |
# if - is given on the command line, get list of hostnames from stdin |
|
22 |
if '-' in args: |
|
23 |
args = list(args) |
|
24 |
args.remove('-') |
|
25 |
args.extend([x.strip() for x in sys.stdin.readlines()]) |
|
26 |
for hostname in args: |
|
27 |
tenant = TenantMiddleware.get_tenant_by_hostname(hostname) |
|
28 |
tenant.delete(force_drop=options['force_drop']) |
|
29 |
hobo/multitenant/models.py | ||
---|---|---|
1 | 1 |
import os |
2 | 2 |
from urlparse import urljoin |
3 | 3 |
import json |
4 |
from shutil import rmtree |
|
4 | 5 | |
5 | 6 |
from django.conf import settings |
6 | 7 |
from django.db import connection |
8 |
from django.utils import timezone |
|
7 | 9 | |
8 | 10 |
from tenant_schemas.utils import get_public_schema_name |
9 | 11 |
from tenant_schemas.models import TenantMixin |
... | ... | |
64 | 66 |
raise Exception("Can't delete tenant outside it's own schema or " |
65 | 67 |
"the public schema. Current schema is %s." |
66 | 68 |
% connection.schema_name) |
67 | ||
68 |
os.rename(self.get_directory(), self.get_directory() + '.invalid') |
|
69 |
if force_drop: |
|
70 |
rmtree(self.get_directory()) |
|
71 |
else: |
|
72 |
deletion_date = timezone.now().strftime('%Y%m%d_%H%M%S_%f') |
|
73 |
os.rename(self.get_directory(), self.get_directory() + '.removed_%s.invalid' % deletion_date) |
|
69 | 74 | |
70 | 75 |
if schema_exists(self.schema_name) and (self.auto_drop_schema or force_drop): |
71 | 76 |
cursor = connection.cursor() |
72 | 77 |
cursor.execute('DROP SCHEMA %s CASCADE' % self.schema_name) |
78 | ||
79 |
if schema_exists(self.schema_name) and (not self.auto_drop_schema and not force_drop): |
|
80 |
cursor = connection.cursor() |
|
81 |
schema_new_name = 'removed_%s_%s' % (deletion_date, self.schema_name) |
|
82 |
cursor.execute('ALTER SCHEMA %s RENAME TO %s' % (self.schema_name, schema_new_name[:63])) |
tests_multitenant/conftest.py | ||
---|---|---|
54 | 54 |
tenants = [make_tenant('tenant1.example.net'), make_tenant('tenant2.example.net')] |
55 | 55 |
def fin(): |
56 | 56 |
from django.db import connection |
57 |
from hobo.multitenant.middleware import TenantMiddleware |
|
57 | 58 |
connection.set_schema_to_public() |
58 |
for t in tenants:
|
|
59 |
for t in TenantMiddleware.get_tenants():
|
|
59 | 60 |
t.delete(True) |
60 | 61 |
shutil.rmtree(base) |
61 | 62 |
request.addfinalizer(fin) |
tests_multitenant/test_tenant_command.py | ||
---|---|---|
1 | 1 |
import pytest |
2 | 2 |
import mock |
3 |
import os |
|
3 | 4 | |
4 | 5 |
pytestmark = pytest.mark.django_db |
5 | 6 | |
... | ... | |
29 | 30 |
assert handle.call_count == 1 |
30 | 31 |
assert len(handle.side_effect.tenants) == 1 |
31 | 32 |
assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net' |
33 | ||
34 |
def test_delete_tenant(tenants): |
|
35 |
from django.core.management import execute_from_command_line |
|
36 |
from hobo.multitenant.middleware import TenantMiddleware |
|
37 |
base = os.path.dirname(tenants[0].get_directory()) |
|
38 |
if any('removed' in d for d in os.listdir(base)): |
|
39 |
assert False |
|
40 |
def get_schemas(): |
|
41 |
from django.db import connection |
|
42 |
cursor = connection.cursor() |
|
43 |
cursor.execute('select schema_name from information_schema.schemata') |
|
44 |
return [x[0] for x in cursor.fetchall()] |
|
45 |
if any('removed' in x for x in get_schemas()): |
|
46 |
assert False |
|
47 |
all_tenants = list(TenantMiddleware.get_tenants()) |
|
48 |
assert len(all_tenants) == 2 |
|
49 |
execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net']) |
|
50 |
if not any('removed' in d for d in os.listdir(base)): |
|
51 |
assert False |
|
52 |
if not any('removed' in x for x in get_schemas()): |
|
53 |
assert False |
|
54 |
all_tenants = list(TenantMiddleware.get_tenants()) |
|
55 |
assert len(all_tenants) == 1 |
|
32 |
- |