Projet

Général

Profil

0003-add-management-command-configure_connection.patch

Christophe Siraut, 21 juin 2018 12:28

Télécharger (6,11 ko)

Voir les différences:

Subject: [PATCH 3/4] add management command configure_connection

 tests/test_configure_connection.py                 | 71 ++++++++++++++++++++++
 wcs/ctl/management/commands/__init__.py            | 24 ++++++++
 .../management/commands/configure_connection.py    | 49 +++++++++++++++
 3 files changed, 144 insertions(+)
 create mode 100644 tests/test_configure_connection.py
 create mode 100644 wcs/ctl/management/commands/configure_connection.py
tests/test_configure_connection.py
1
import os
2
import psycopg2
3
import pytest
4
import random
5
from django.core.management import get_commands
6
from django.core.management import call_command
7
from django.core.management.base import CommandError
8
from utilities import clean_temporary_pub
9
from utilities import create_temporary_pub
10
from utilities import force_connections_close
11
from wcs.sql import cleanup_connection
12

  
13

  
14
@pytest.fixture(scope='module')
def cursor():
    """Yield a module-wide autocommit cursor on the PostgreSQL server.

    The connection is opened as the user named by the ``USER`` environment
    variable; both the cursor and the connection are closed on teardown.
    """
    conn = psycopg2.connect(user=os.environ['USER'])
    # Autocommit mode: statements run through this cursor are not wrapped in
    # a transaction (the `database` fixture uses it for CREATE/DROP DATABASE).
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    cur = conn.cursor()
    yield cur
    cur.close()
    # Closing the cursor alone leaves the server connection open.
    conn.close()
21

  
22

  
23
@pytest.fixture
def database(cursor):
    """Create a randomly named scratch database and drop it on teardown.

    Yields the name of the freshly created database.
    """
    suffix = random.randint(0, 100000)
    dbname = 'wcstests%d' % suffix
    create_statement = 'CREATE DATABASE %s' % dbname
    cursor.execute(create_statement)

    yield dbname

    # NOTE(review): presumably releases the connection wcs holds on the
    # scratch database so that it can be dropped -- confirm in wcs.sql.
    cleanup_connection()
    drop_statement = 'DROP DATABASE %s' % dbname
    cursor.execute(drop_statement)
30

  
31

  
32
@pytest.fixture(params=['pickle', 'sql'])
def pub(request):
    """Yield a temporary publisher, once for each storage parameter.

    ``sql_mode`` is enabled only for the ``'sql'`` parameter. Teardown
    cleans up connections and removes the temporary publisher.
    """
    use_sql = request.param == 'sql'
    publisher = create_temporary_pub(sql_mode=use_sql)

    yield publisher

    cleanup_connection()
    force_connections_close()
    clean_temporary_pub()
39

  
40

  
41
def test_command_exists():
    """The command must be listed among Django's management commands."""
    available_commands = get_commands()
    assert 'configure_connection' in available_commands
43

  
44

  
45
def test_unknown_publisher():
    """A domain that is not a known tenant must raise CommandError."""
    with pytest.raises(CommandError) as excinfo:
        call_command('configure_connection', '-d', 'unknown.net',
                     '--database', 'foobar')
    # str() works on Python 2 and 3; the `.message` attribute of exceptions
    # only exists on Python 2.
    assert str(excinfo.value) == 'unknown tenant'
50

  
51

  
52
def test_failing_connection(pub):
    """Connection parameters that cannot be reached must raise an error."""
    with pytest.raises(psycopg2.OperationalError) as excinfo:
        call_command('configure_connection', '-d', 'example.net',
                     '--database', 'foobar', '--port', '666')
    # str() works on Python 2 and 3; the `.message` attribute of exceptions
    # only exists on Python 2.
    assert 'could not connect' in str(excinfo.value)
57

  
58

  
59
def test_database_does_not_exist(pub):
    """Pointing the command at a database that was never created must fail."""
    new_database = 'test_{}'.format(random.randint(1000, 9999))
    with pytest.raises(psycopg2.OperationalError) as excinfo:
        call_command('configure_connection', '-d', 'example.net',
                     '--database', new_database)
    # str() works on Python 2 and 3; the `.message` attribute of exceptions
    # only exists on Python 2.
    assert 'does not exist' in str(excinfo.value)
65

  
66

  
67
def test_setup_database(pub, database):
    """A successful run must store the database name in the tenant config."""
    command_args = ['-d', 'example.net', '--database', database]
    call_command('configure_connection', *command_args)

    # Reload the configuration from storage before inspecting it.
    pub.set_config()
    postgresql_cfg = pub.cfg['postgresql']
    assert postgresql_cfg.get('database') == database
wcs/ctl/management/commands/__init__.py
1
import os
2
from django.core.management.base import BaseCommand, CommandError
3
from qommon.publisher import get_publisher_class
4

  
5

  
6
class WcsTenantCommand(BaseCommand):
    """Base class for management commands that operate on a single tenant.

    Adds a mandatory ``-d/--domain`` option and, before the command runs,
    stores a publisher configured for that domain in ``self.publisher``.
    """

    def create_parser(self, *args, **kwargs):
        """Return the base parser extended with the ``-d/--domain`` option.

        Positional and keyword arguments are forwarded untouched, so the
        override stays compatible with base signatures that accept keyword
        arguments.
        """
        parser = super(WcsTenantCommand, self).create_parser(*args, **kwargs)
        parser.add_argument('-d', '--domain', required=True)
        return parser

    def execute(self, *args, **kwargs):
        """Resolve the tenant publisher, then run the command.

        Raises CommandError (from get_publisher) when the domain is unknown.
        """
        self.publisher = self.get_publisher(kwargs['domain'])
        # Hand the base implementation's return value back to the caller
        # instead of discarding it.
        return super(WcsTenantCommand, self).execute(*args, **kwargs)

    def get_publisher(self, domain):
        """Return a publisher whose app_dir and config target `domain`.

        Raises CommandError('unknown tenant') when `domain` is not among the
        publisher's tenants.
        """
        publisher_class = get_publisher_class()
        publisher = publisher_class.create_publisher()
        if domain not in publisher.get_tenants():
            raise CommandError('unknown tenant')
        # Point the publisher at the tenant's own directory before loading
        # its configuration.
        publisher.app_dir = os.path.join(publisher.app_dir, domain)
        publisher.set_config()
        return publisher
wcs/ctl/management/commands/configure_connection.py
1
import os
2
from wcs.ctl.management.commands import WcsTenantCommand
3
from wcs.sql import get_connection
4
from psycopg2 import OperationalError
5

  
6

  
7
class Command(WcsTenantCommand):
    """Store PostgreSQL connection parameters in a tenant's configuration."""

    help = 'Configure postgresql connection parameters.'

    def add_arguments(self, parser):
        """Declare the connection options; only ``--database`` is required."""
        parser.add_argument('--host')
        parser.add_argument('--port', type=int)
        parser.add_argument('--database', required=True)
        parser.add_argument('--user')
        parser.add_argument('--password')
        # nargs='?' lets a bare -f/--force act as a flag (value True) while
        # still accepting an explicit value, as the option previously required.
        parser.add_argument('-f', '--force', nargs='?', const=True)

    def handle(self, **kwargs):
        """Record the connection settings, persist them and enable postgresql."""
        self.setup_connection(**kwargs)
        self.publisher.write_cfg()
        self.publisher.load_site_options()
        self.enable()
        self.publisher.cleanup()

    def enable(self):
        """Set ``postgresql = true`` in site-options.cfg unless already present."""
        if not self.publisher.site_options.has_option('options', 'postgresql'):
            self.publisher.site_options.set('options', 'postgresql', 'true')
            options_file = os.path.join(self.publisher.app_dir,
                                        'site-options.cfg')
            # Context manager so the file is flushed and closed
            # deterministically instead of being left open.
            with open(options_file, 'w') as fd:
                self.publisher.site_options.write(fd)

    def setup_connection(self, **kwargs):
        """Copy the connection options into the publisher configuration.

        Unless ``force`` is set, the new settings are tried immediately and
        the resulting psycopg2 OperationalError, if any, is raised.
        """
        options = {}
        for k in ['host', 'port', 'database', 'user', 'password']:
            if k in kwargs:
                options[k] = kwargs.get(k)
        self.publisher.cfg['postgresql'] = options

        if not kwargs.get('force'):
            conn = self.test_connection()
            # test_connection hands a failure back as a value; raise it here.
            if isinstance(conn, OperationalError):
                raise conn

    def test_connection(self):
        """Return a new connection, or the OperationalError raised opening it."""
        try:
            return get_connection(new=True)
        except OperationalError as e:
            return e
0
-