Projet

Général

Profil

0001-tenant_command-convert-exception-to-UTF-8-30559.patch

Benjamin Dauvergne, 09 juillet 2019 18:47

Télécharger (7,35 ko)

Voir les différences:

Subject: [PATCH] tenant_command: convert exception to UTF-8 (#30559)

 .../management/commands/tenant_command.py     | 27 +++++-
 tests_multitenant/test_tenant_command.py      | 88 +++++++++++++++++--
 2 files changed, 107 insertions(+), 8 deletions(-)
hobo/multitenant/management/commands/tenant_command.py
9 9
import sys
10 10

  
11 11
import django
12
from django.utils import six
13
from django.utils.encoding import force_text
12 14
from django.conf import settings
13 15
from django.core.exceptions import ImproperlyConfigured
14 16
from django.core.management.base import (BaseCommand, CommandError,
15
        SystemCheckError, handle_default_options)
17
                                         SystemCheckError,
18
                                         handle_default_options)
16 19
from django.core.management import call_command, get_commands, load_command_class
17 20
from django.db import connection, connections
18 21

  
19 22
from hobo.multitenant.management.commands import InteractiveTenantOption
20 23
from hobo.multitenant.middleware import TenantMiddleware
21 24

  
25

  
26
def exception_to_text(e):
27
    try:
28
        return six.text_type(e)
29
    except Exception:
30
        pass
31

  
32
    try:
33
        return force_text(str(e), errors='ignore')
34
    except Exception:
35
        pass
36

  
37
    try:
38
        return force_text(repr(e), errors='ignore')
39
    except Exception:
40
        pass
41

  
42
    return 'Unrepresentable exception'
43

  
44

  
22 45
def run_command_from_argv(command, argv):
23 46
    # copied/adapted from Django run_from_argv
24 47
    command._called_from_command_line = True
......
44 67
            command.stderr.write(str(e), lambda x: x)
45 68
        else:
46 69
            command.stderr.write('%s: %s: %s' % (
47
                connection.get_tenant(), e.__class__.__name__, e))
70
                connection.get_tenant(), e.__class__.__name__, exception_to_text(e)))
48 71
        return e
49 72
    finally:
50 73
        try:
tests_multitenant/test_tenant_command.py
1
# -*- coding: utf-8 -*-
2

  
1 3
import pytest
2 4
import mock
3 5
import os
4 6

  
7
from django.core.management import BaseCommand, call_command, load_command_class
8

  
5 9
pytestmark = pytest.mark.django_db
6 10

  
11

  
7 12
class RecordTenant(object):
8 13
    def __init__(self):
9 14
        self.tenants = []
......
12 17
        from django.db import connection
13 18
        self.tenants.append(connection.tenant)
14 19

  
20

  
15 21
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
16 22
def test_all_tenants(handle, tenants):
17 23
    from django.core.management import execute_from_command_line
......
19 25
    execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants'])
20 26
    assert handle.call_count == 2
21 27
    assert len(handle.side_effect.tenants) == 2
22
    assert set(tenant.domain_url for tenant in handle.side_effect.tenants) == \
23
        set(['tenant1.example.net', 'tenant2.example.net'])
28
    assert (set(tenant.domain_url for tenant in handle.side_effect.tenants)
29
            == set(['tenant1.example.net', 'tenant2.example.net']))
30

  
24 31

  
25 32
@mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle')
26 33
def test_one_tenant(handle, tenants, tenant_in_call=None):
......
31 38
    assert len(handle.side_effect.tenants) == 1
32 39
    assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net'
33 40

  
41

  
34 42
def test_delete_tenant(tenants):
35 43
    from django.core.management import execute_from_command_line
36 44
    from hobo.multitenant.middleware import TenantMiddleware
37 45
    base = os.path.dirname(tenants[0].get_directory())
38 46
    if any('removed' in d for d in os.listdir(base)):
39 47
        assert False
48

  
40 49
    def get_schemas():
41 50
        from django.db import connection
42 51
        cursor = connection.cursor()
43 52
        cursor.execute('select schema_name from information_schema.schemata')
44 53
        return [x[0] for x in cursor.fetchall()]
54

  
45 55
    if any('removed' in x for x in get_schemas()):
46 56
        assert False
47 57
    all_tenants = list(TenantMiddleware.get_tenants())
48 58
    assert len(all_tenants) == 2
49 59
    execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net'])
50
    if not  any('removed' in d for d in os.listdir(base)):
51
        assert False
52
    if not any('removed' in x for x in get_schemas()):
53
        assert False 
60
    assert any('removed' in d for d in os.listdir(base))
61
    assert any('removed' in x for x in get_schemas())
54 62
    all_tenants = list(TenantMiddleware.get_tenants())
55 63
    assert len(all_tenants) == 1
64

  
65

  
66
def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys):
67
    from hobo.multitenant.management.commands import tenant_command
68

  
69
    get_commands = tenant_command.get_commands
70

  
71
    class UnicodeErrorCommand(BaseCommand):
72
        def handle(self, *args, **kwargs):
73
            raise Exception(u'héhé')
74

  
75
    class BytesErrorCommand(BaseCommand):
76
        def handle(self, *args, **kwargs):
77
            raise Exception(b'héhé')
78

  
79
    class MixOfBothCommand(BaseCommand):
80
        def handle(self, *args, **kwargs):
81
            raise Exception([b'héhé', u'hého'])
82

  
83
    class WtfExceptionCommand(BaseCommand):
84
        def handle(self, *args, **kwargs):
85
            class WTF(Exception):
86
                def __str__(self):
87
                    raise Exception
88

  
89
                def __repr__(self):
90
                    raise Exception
91

  
92
            raise WTF()
93

  
94
    def new_get_commands():
95
        d = get_commands().copy()
96
        d['uni-error'] = UnicodeErrorCommand()
97
        d['bytes-error'] = BytesErrorCommand()
98
        d['mix-error'] = MixOfBothCommand()
99
        d['wtf-error'] = WtfExceptionCommand()
100
        return d
101

  
102
    monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands)
103

  
104
    klass = get_commands()['tenant_command']
105
    if not hasattr(klass, '__call__'):
106
        klass = load_command_class(klass, 'tenant_command')
107

  
108
    capsys.readouterr()
109
    with pytest.raises(SystemExit):
110
        klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants'])
111
    captured = capsys.readouterr()
112
    assert u'héhé' in captured.err
113

  
114
    with pytest.raises(SystemExit):
115
        klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants'])
116

  
117
    captured = capsys.readouterr()
118
    assert u'héhé' in captured.err
119

  
120
    with pytest.raises(SystemExit):
121
        klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants'])
122

  
123
    captured = capsys.readouterr()
124
    assert repr(b'héhé') in captured.err
125
    assert repr(u'hého') in captured.err
126

  
127
    with pytest.raises(SystemExit):
128
        klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants'])
129

  
130
    captured = capsys.readouterr()
131
    assert 'Unrepresentable exception' in captured.err
56
-