|
1 |
# -*- coding: utf-8 -*-
|
|
2 |
|
1 |
3 |
import pytest
|
2 |
4 |
import mock
|
3 |
5 |
import os
|
4 |
6 |
|
|
7 |
from django.core.management import BaseCommand, call_command, load_command_class
|
|
8 |
|
5 |
9 |
pytestmark = pytest.mark.django_db
|
6 |
10 |
|
|
11 |
|
7 |
12 |
class RecordTenant(object):
|
8 |
13 |
def __init__(self):
|
9 |
14 |
self.tenants = []
|
... | ... | |
12 |
17 |
from django.db import connection
|
13 |
18 |
self.tenants.append(connection.tenant)
|
14 |
19 |
|
|
20 |
|
15 |
21 |
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
|
16 |
22 |
def test_all_tenants(handle, tenants):
|
17 |
23 |
from django.core.management import execute_from_command_line
|
... | ... | |
19 |
25 |
execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])
|
20 |
26 |
assert handle.call_count == 2
|
21 |
27 |
assert len(handle.side_effect.tenants) == 2
|
22 |
|
assert set(tenant.domain_url for tenant in handle.side_effect.tenants) == \
|
23 |
|
set(['tenant1.example.net', 'tenant2.example.net'])
|
|
28 |
assert (set(tenant.domain_url for tenant in handle.side_effect.tenants)
|
|
29 |
== set(['tenant1.example.net', 'tenant2.example.net']))
|
|
30 |
|
24 |
31 |
|
25 |
32 |
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
|
26 |
33 |
def test_one_tenant(handle, tenants, tenant_in_call=None):
|
... | ... | |
31 |
38 |
assert len(handle.side_effect.tenants) == 1
|
32 |
39 |
assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net'
|
33 |
40 |
|
|
41 |
|
34 |
42 |
def test_delete_tenant(tenants):
|
35 |
43 |
from django.core.management import execute_from_command_line
|
36 |
44 |
from hobo.multitenant.middleware import TenantMiddleware
|
37 |
45 |
base = os.path.dirname(tenants[0].get_directory())
|
38 |
46 |
if any('removed' in d for d in os.listdir(base)):
|
39 |
47 |
assert False
|
|
48 |
|
40 |
49 |
def get_schemas():
|
41 |
50 |
from django.db import connection
|
42 |
51 |
cursor = connection.cursor()
|
43 |
52 |
cursor.execute('select schema_name from information_schema.schemata')
|
44 |
53 |
return [x[0] for x in cursor.fetchall()]
|
|
54 |
|
45 |
55 |
if any('removed' in x for x in get_schemas()):
|
46 |
56 |
assert False
|
47 |
57 |
all_tenants = list(TenantMiddleware.get_tenants())
|
48 |
58 |
assert len(all_tenants) == 2
|
49 |
59 |
execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
|
50 |
|
if not any('removed' in d for d in os.listdir(base)):
|
51 |
|
assert False
|
52 |
|
if not any('removed' in x for x in get_schemas()):
|
53 |
|
assert False
|
|
60 |
assert any('removed' in d for d in os.listdir(base))
|
|
61 |
assert any('removed' in x for x in get_schemas())
|
54 |
62 |
all_tenants = list(TenantMiddleware.get_tenants())
|
55 |
63 |
assert len(all_tenants) == 1
|
|
64 |
|
|
65 |
|
|
66 |
def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
|
|
67 |
from hobo.multitenant.management.commands import tenant_command
|
|
68 |
|
|
69 |
get_commands = tenant_command.get_commands
|
|
70 |
|
|
71 |
class UnicodeErrorCommand(BaseCommand):
|
|
72 |
def handle(self, *args, **kwargs):
|
|
73 |
raise Exception(u'héhé')
|
|
74 |
|
|
75 |
class BytesErrorCommand(BaseCommand):
|
|
76 |
def handle(self, *args, **kwargs):
|
|
77 |
raise Exception(b'héhé')
|
|
78 |
|
|
79 |
class MixOfBothCommand(BaseCommand):
|
|
80 |
def handle(self, *args, **kwargs):
|
|
81 |
raise Exception([b'héhé', u'hého'])
|
|
82 |
|
|
83 |
class WtfExceptionCommand(BaseCommand):
|
|
84 |
def handle(self, *args, **kwargs):
|
|
85 |
class WTF(Exception):
|
|
86 |
def __str__(self):
|
|
87 |
raise Exception
|
|
88 |
|
|
89 |
def __repr__(self):
|
|
90 |
raise Exception
|
|
91 |
|
|
92 |
raise WTF()
|
|
93 |
|
|
94 |
def new_get_commands():
|
|
95 |
d = get_commands().copy()
|
|
96 |
d['uni-error'] = UnicodeErrorCommand()
|
|
97 |
d['bytes-error'] = BytesErrorCommand()
|
|
98 |
d['mix-error'] = MixOfBothCommand()
|
|
99 |
d['wtf-error'] = WtfExceptionCommand()
|
|
100 |
return d
|
|
101 |
|
|
102 |
monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)
|
|
103 |
|
|
104 |
klass = get_commands()['tenant_command']
|
|
105 |
if not hasattr(klass, '__call__'):
|
|
106 |
klass = load_command_class(klass, 'tenant_command')
|
|
107 |
|
|
108 |
capsys.readouterr()
|
|
109 |
with pytest.raises(SystemExit):
|
|
110 |
klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants'])
|
|
111 |
captured = capsys.readouterr()
|
|
112 |
assert u'héhé' in captured.err
|
|
113 |
|
|
114 |
with pytest.raises(SystemExit):
|
|
115 |
klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants'])
|
|
116 |
|
|
117 |
captured = capsys.readouterr()
|
|
118 |
assert u'héhé' in captured.err
|
|
119 |
|
|
120 |
with pytest.raises(SystemExit):
|
|
121 |
klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants'])
|
|
122 |
|
|
123 |
captured = capsys.readouterr()
|
|
124 |
assert repr(b'héhé') in captured.err
|
|
125 |
assert repr(u'hého') in captured.err
|
|
126 |
|
|
127 |
with pytest.raises(SystemExit):
|
|
128 |
klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])
|
|
129 |
|
|
130 |
captured = capsys.readouterr()
|
|
131 |
assert 'Unrepresentable exception' in captured.err
|
56 |
|
-
|