From 2bd6eebbe9026ef02f31b914a02d305f69404a67 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Tue, 9 Jul 2019 17:32:11 +0200 Subject: [PATCH] tenant_command: convert exception to UTF-8 (#30559) --- .../management/commands/tenant_command.py | 27 +++++- tests_multitenant/test_tenant_command.py | 88 +++++++++++++++++-- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/hobo/multitenant/management/commands/tenant_command.py b/hobo/multitenant/management/commands/tenant_command.py index 954568d..09e23ab 100644 --- a/hobo/multitenant/management/commands/tenant_command.py +++ b/hobo/multitenant/management/commands/tenant_command.py @@ -9,16 +9,39 @@ import argparse import sys import django +from django.utils import six +from django.utils.encoding import force_text from django.conf import settings from django.core.exceptions import ImproperlyConfigured from django.core.management.base import (BaseCommand, CommandError, - SystemCheckError, handle_default_options) + SystemCheckError, + handle_default_options) from django.core.management import call_command, get_commands, load_command_class from django.db import connection, connections from hobo.multitenant.management.commands import InteractiveTenantOption from hobo.multitenant.middleware import TenantMiddleware + +def exception_to_text(e): + try: + return six.text_type(e) + except Exception: + pass + + try: + return force_text(str(e), errors='ignore') + except Exception: + pass + + try: + return force_text(repr(e), errors='ignore') + except Exception: + pass + + return 'Unrepresentable exception' + + def run_command_from_argv(command, argv): # copied/adapted from Django run_from_argv command._called_from_command_line = True @@ -44,7 +67,7 @@ def run_command_from_argv(command, argv): command.stderr.write(str(e), lambda x: x) else: command.stderr.write('%s: %s: %s' % ( - connection.get_tenant(), e.__class__.__name__, e)) + connection.get_tenant(), e.__class__.__name__, exception_to_text(e))) return e finally: try: diff --git a/tests_multitenant/test_tenant_command.py b/tests_multitenant/test_tenant_command.py index bd7b473..b258f72 100644 --- a/tests_multitenant/test_tenant_command.py +++ b/tests_multitenant/test_tenant_command.py @@ -1,9 +1,14 @@ +# -*- coding: utf-8 -*- + import pytest import mock import os +from django.core.management import BaseCommand, call_command, load_command_class + pytestmark = pytest.mark.django_db + class RecordTenant(object): def __init__(self): self.tenants = [] @@ -12,6 +17,7 @@ class RecordTenant(object): from django.db import connection self.tenants.append(connection.tenant) + @mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle') def test_all_tenants(handle, tenants): from django.core.management import execute_from_command_line @@ -19,8 +25,9 @@ def test_all_tenants(handle, tenants): execute_from_command_line(['manage.py', 'tenant_command', 'clearsessions', '--all-tenants']) assert handle.call_count == 2 assert len(handle.side_effect.tenants) == 2 - assert set(tenant.domain_url for tenant in handle.side_effect.tenants) == \ - set(['tenant1.example.net', 'tenant2.example.net']) + assert (set(tenant.domain_url for tenant in handle.side_effect.tenants) + == set(['tenant1.example.net', 'tenant2.example.net'])) + @mock.patch('django.contrib.sessions.management.commands.clearsessions.Command.handle') def test_one_tenant(handle, tenants, tenant_in_call=None): @@ -31,25 +38,94 @@ def test_one_tenant(handle, tenants, tenant_in_call=None): assert len(handle.side_effect.tenants) == 1 assert handle.side_effect.tenants[0].domain_url == 'tenant2.example.net' + def test_delete_tenant(tenants): from django.core.management import execute_from_command_line from hobo.multitenant.middleware import TenantMiddleware base = os.path.dirname(tenants[0].get_directory()) if any('removed' in d for d in os.listdir(base)): assert False + def get_schemas(): from django.db import connection cursor = connection.cursor() cursor.execute('select schema_name from information_schema.schemata') return [x[0] for x in cursor.fetchall()] + if any('removed' in x for x in get_schemas()): assert False all_tenants = list(TenantMiddleware.get_tenants()) assert len(all_tenants) == 2 execute_from_command_line(['manage.py', 'delete_tenant', 'tenant2.example.net']) - if not any('removed' in d for d in os.listdir(base)): - assert False - if not any('removed' in x for x in get_schemas()): - assert False + assert any('removed' in d for d in os.listdir(base)) + assert any('removed' in x for x in get_schemas()) all_tenants = list(TenantMiddleware.get_tenants()) assert len(all_tenants) == 1 + + +def test_tenant_command_all_tenants_errors(tenants, monkeypatch, capsys): + from hobo.multitenant.management.commands import tenant_command + + get_commands = tenant_command.get_commands + + class UnicodeErrorCommand(BaseCommand): + def handle(self, *args, **kwargs): + raise Exception(u'héhé') + + class BytesErrorCommand(BaseCommand): + def handle(self, *args, **kwargs): + raise Exception(b'héhé') + + class MixOfBothCommand(BaseCommand): + def handle(self, *args, **kwargs): + raise Exception([b'héhé', u'hého']) + + class WtfExceptionCommand(BaseCommand): + def handle(self, *args, **kwargs): + class WTF(Exception): + def __str__(self): + raise Exception + + def __repr__(self): + raise Exception + + raise WTF() + + def new_get_commands(): + d = get_commands().copy() + d['uni-error'] = UnicodeErrorCommand() + d['bytes-error'] = BytesErrorCommand() + d['mix-error'] = MixOfBothCommand() + d['wtf-error'] = WtfExceptionCommand() + return d + + monkeypatch.setattr(tenant_command, 'get_commands', new_get_commands) + + klass = get_commands()['tenant_command'] + if not hasattr(klass, '__call__'): + klass = load_command_class(klass, 'tenant_command') + + capsys.readouterr() + with pytest.raises(SystemExit): + klass.run_from_argv(['manage.py', 'tenant_command', 'uni-error', '--all-tenants']) + captured = capsys.readouterr() + assert u'héhé' in captured.err + + with pytest.raises(SystemExit): + klass.run_from_argv(['manage.py', 'tenant_command', 'bytes-error', '--all-tenants']) + + captured = capsys.readouterr() + assert u'héhé' in captured.err + + with pytest.raises(SystemExit): + klass.run_from_argv(['manage.py', 'tenant_command', 'mix-error', '--all-tenants']) + + captured = capsys.readouterr() + assert repr(b'héhé') in captured.err + assert repr(u'hého') in captured.err + + with pytest.raises(SystemExit): + klass.run_from_argv(['manage.py', 'tenant_command', 'wtf-error', '--all-tenants']) + + captured = capsys.readouterr() + assert 'Unrepresentable exception' in captured.err -- 2.20.1