1 |
1 |
# -*- coding: utf-8 -*-
|
2 |
2 |
|
3 |
3 |
import pytest
|
4 |
4 |
import mock
|
5 |
5 |
import os
|
|
6 |
import sys
|
6 |
7 |
|
7 |
8 |
from django.core.management import BaseCommand, call_command, load_command_class
|
|
9 |
from django.core.management.base import CommandError
|
|
10 |
|
|
11 |
from hobo.multitenant.models import Tenant
|
8 |
12 |
|
9 |
13 |
pytestmark = pytest.mark.django_db
|
10 |
14 |
|
11 |
15 |
|
12 |
16 |
class RecordTenant(object):
|
13 |
17 |
def __init__(self):
|
14 |
18 |
self.tenants = []
|
15 |
19 |
|
16 |
20 |
def __call__(self, *args, **kwargs):
|
17 |
21 |
from django.db import connection
|
18 |
22 |
self.tenants.append(connection.tenant)
|
19 |
23 |
|
20 |
24 |
|
|
25 |
def get_schemas():
|
|
26 |
from django.db import connection
|
|
27 |
cursor = connection.cursor()
|
|
28 |
cursor.execute('select schema_name from information_schema.schemata')
|
|
29 |
return [x[0] for x in cursor.fetchall()]
|
|
30 |
|
|
31 |
|
21 |
32 |
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
|
22 |
33 |
def test_all_tenants(handle, tenants):
|
23 |
34 |
from django.core.management import execute_from_command_line
|
24 |
35 |
handle.side_effect = RecordTenant()
|
25 |
36 |
execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])
|
26 |
37 |
assert handle.call_count == 2
|
27 |
38 |
assert len(handle.side_effect.tenants) == 2
|
28 |
39 |
assert (set(tenant.domain_url for tenant in handle.side_effect.tenants)
|
... | ... | |
41 |
52 |
|
42 |
53 |
def test_delete_tenant(tenants):
|
43 |
54 |
from django.core.management import execute_from_command_line
|
44 |
55 |
from hobo.multitenant.middleware import TenantMiddleware
|
45 |
56 |
base = os.path.dirname(tenants[0].get_directory())
|
46 |
57 |
if any('removed' in d for d in os.listdir(base)):
|
47 |
58 |
assert False
|
48 |
59 |
|
49 |
|
def get_schemas():
|
50 |
|
from django.db import connection
|
51 |
|
cursor = connection.cursor()
|
52 |
|
cursor.execute('select schema_name from information_schema.schemata')
|
53 |
|
return [x[0] for x in cursor.fetchall()]
|
54 |
|
|
55 |
60 |
if any('removed' in x for x in get_schemas()):
|
56 |
61 |
assert False
|
57 |
62 |
all_tenants = list(TenantMiddleware.get_tenants())
|
58 |
63 |
assert len(all_tenants) == 2
|
59 |
64 |
execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
|
60 |
65 |
assert any('removed' in d for d in os.listdir(base))
|
61 |
66 |
assert any('removed' in x for x in get_schemas())
|
62 |
67 |
all_tenants = list(TenantMiddleware.get_tenants())
|
... | ... | |
124 |
129 |
assert repr(b'héhé') in captured.err
|
125 |
130 |
assert repr(u'hého') in captured.err
|
126 |
131 |
|
127 |
132 |
with pytest.raises(SystemExit):
|
128 |
133 |
klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])
|
129 |
134 |
|
130 |
135 |
captured = capsys.readouterr()
|
131 |
136 |
assert 'Unrepresentable exception' in captured.err
|
|
137 |
|
|
138 |
|
|
139 |
def test_list_tenants_command(db, tenants, capsys):
|
|
140 |
call_command('list_tenants')
|
|
141 |
captured = capsys.readouterr()
|
|
142 |
assert captured.out.split('\n') == [
|
|
143 |
'tenant1_example_net tenant1.example.net',
|
|
144 |
'tenant2_example_net tenant2.example.net',
|
|
145 |
''
|
|
146 |
]
|
|
147 |
|
|
148 |
|
|
149 |
def test_list_tenants_command_empty(db, capsys):
|
|
150 |
call_command('list_tenants')
|
|
151 |
captured = capsys.readouterr()
|
|
152 |
assert captured.out == ''
|
|
153 |
|
|
154 |
|
|
155 |
@mock.patch('hobo.multitenant.management.commands.create_schemas.TenantMiddleware.get_tenants')
|
|
156 |
def test_create_schema_command(mocked_get_tenants):
|
|
157 |
tenant = Tenant(domain_url='my-tenant-url', schema_name='my_schema_name')
|
|
158 |
mocked_get_tenants.return_value = [tenant]
|
|
159 |
nb_schemas = len(get_schemas())
|
|
160 |
call_command('create_schemas')
|
|
161 |
assert 'my_schema_name' in get_schemas()[nb_schemas]
|
|
162 |
|
|
163 |
|
|
164 |
def test_shell_command_empty():
|
|
165 |
with pytest.raises(CommandError, match="There are no tenants in the system."):
|
|
166 |
call_command('shell')
|
132 |
|
-
|