Projet

Général

Profil

0004-re-use-dimension-tables-from-previous-run-30752.patch

Emmanuel Cazenave, 01 mars 2019 14:30

Télécharger (19,3 ko)

Voir les différences:

Subject: [PATCH 4/4] re-use dimension tables from previous run (#30752)

 tests/conftest.py  |  62 +++++++++++------------
 tests/olap.model   |  17 +++++++
 tests/test_wcs.py  |  81 +++++++++++++++++++++++++++++
 tests/utils.py     |  17 +++++++
 wcs_olap/feeder.py | 124 ++++++++++++++++++++++++++++++++-------------
 5 files changed, 234 insertions(+), 67 deletions(-)
 create mode 100644 tests/utils.py
tests/conftest.py
1 1
# -*- coding: utf-8 -*-
2 2

  
3 3
import sys
4
import subprocess
5 4
import time
6 5
import os
7 6
import shutil
......
11 10
from collections import namedtuple
12 11

  
13 12
import psycopg2
14

  
15 13
import pytest
16 14

  
15
import utils
16

  
17

  
17 18
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
18 19

  
19 20

  
......
92 93
    fields.ItemField(id='2', label='2nd field', type='item',
93 94
                     items=['foo', 'bar', 'baz'], varname='item'),
94 95
    fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'),
96
    fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'),
95 97
]
96 98
formdef.store()
97 99

  
......
105 107
    if i%4 == 0:
106 108
        formdata.data['2'] = 'foo'
107 109
        formdata.data['2_display'] = 'foo'
110
        formdata.data['4'] = 'open_one'
111
        formdata.data['4_display'] = 'open_one'
108 112
    elif i%4 == 1:
109 113
        formdata.data['2'] = 'bar'
110 114
        formdata.data['2_display'] = 'bar'
115
        formdata.data['4'] = 'open_two'
116
        formdata.data['4_display'] = 'open_two'
111 117
    else:
112 118
        formdata.data['2'] = 'baz'
113 119
        formdata.data['2_display'] = 'baz'
120
        formdata.data['4'] = "open'three"
121
        formdata.data['4_display'] = "open'three"
122

  
114 123
    formdata.data['3'] = bool(i % 2)
115 124
    if i%3 == 0:
116 125
        formdata.jump_status('new')
......
123 132
}
124 133

  
125 134

  
126
@pytest.fixture(scope='session')
127
def wcs(tmp_path_factory):
135
@pytest.fixture
136
def wcs_dir(tmp_path_factory):
137
    return tmp_path_factory.mktemp('wcs')
138

  
139

  
140
@pytest.fixture
141
def wcs(tmp_path_factory, wcs_dir):
128 142
    '''Session scoped wcs fixture, so read-only.'''
129
    if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']):
130
        pytest.skip('WCSCTL not defined in environment')
131
    WCSCTL = os.environ.get('WCSCTL')
132
    WCS_DIR = tmp_path_factory.mktemp('wcs')
133
    HOSTNAME = '127.0.0.1'
134 143
    PORT = 8899
135 144
    ADDRESS = '0.0.0.0'
136 145
    WCS_PID = None
137 146

  
138
    def run_wcs_script(script, hostname):
139
        '''Run python script inside w.c.s. environment'''
140

  
141
        script_path = WCS_DIR / (script + '.py')
142
        with script_path.open('w') as fd:
143
            fd.write(WCS_SCRIPTS[script])
144

  
145
        subprocess.check_call(
146
            [WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname,
147
             str(script_path)])
148

  
149
    tenant_dir = WCS_DIR / HOSTNAME
147
    tenant_dir = wcs_dir / utils.HOSTNAME
150 148
    tenant_dir.mkdir()
151 149

  
152
    run_wcs_script('setup-auth', HOSTNAME)
153
    run_wcs_script('create-user', HOSTNAME)
154
    run_wcs_script('create-data', HOSTNAME)
150
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
151
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
152
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
155 153

  
156 154
    with (tenant_dir / 'site-options.cfg').open('w') as fd:
157 155
        fd.write(u'''[api-secrets]
158 156
olap = olap
159 157
''')
160 158

  
161
    with (WCS_DIR / 'wcs.cfg').open('w') as fd:
159
    with (wcs_dir / 'wcs.cfg').open('w') as fd:
162 160
        fd.write(u'''[main]
163
app_dir = %s\n''' % WCS_DIR)
161
app_dir = %s\n''' % wcs_dir)
164 162

  
165
    with (WCS_DIR / 'local_settings.py').open('w') as fd:
163
    with (wcs_dir / 'local_settings.py').open('w') as fd:
166 164
        fd.write(u'''
167 165
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
168 166
THEMES_DIRECTORY = '/'
169 167
ALLOWED_HOSTS = ['%s']
170
''' % (WCS_DIR, HOSTNAME))
168
''' % (wcs_dir, utils.HOSTNAME))
171 169

  
172 170
    # launch a Django worker for running w.c.s.
173 171
    WCS_PID = os.fork()
174 172
    if not WCS_PID:
175
        os.chdir(os.path.dirname(WCSCTL))
173
        os.chdir(os.path.dirname(utils.WCSCTL))
176 174
        os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
177
        os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
175
        os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
178 176
        os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
179 177
        sys.exit(0)
180 178

  
......
197 195
    if pid:
198 196
        assert False, 'w.c.s. stopped with exit-code %s' % exit_code
199 197

  
200
    yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
198
    yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID)
201 199
    os.kill(WCS_PID, 9)
202
    shutil.rmtree(str(WCS_DIR))
200
    shutil.rmtree(str(wcs_dir))
203 201

  
204 202

  
205 203
@pytest.fixture
tests/olap.model
264 264
             "type": "bool",
265 265
             "value": "\"field_bool\"",
266 266
             "value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)"
267
            },
268
            {
269
               "filter" : true,
270
               "join" : [
271
                  "item_open"
272
               ],
273
               "label" : "4rth field",
274
               "name" : "item_open",
275
               "type" : "integer",
276
               "value" : "\"item_open\".id",
277
               "value_label" : "\"item_open\".label"
267 278
            }
268 279
         ],
269 280
         "fact_table" : "formdata_demande",
......
329 340
               "master" : "field_item",
330 341
               "name" : "item",
331 342
               "table" : "formdata_demande_field_item"
343
            },
344
            {
345
               "detail" : "id",
346
               "master" : "field_item_open",
347
               "name" : "item_open",
348
               "table" : "formdata_demande_field_item_open"
332 349
            }
333 350
         ],
334 351
         "key" : "id",
tests/test_wcs.py
6 6
import pathlib2
7 7
import mock
8 8

  
9
import utils
10

  
9 11

  
10 12
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
11 13
    olap_cmd()
......
55 57
        ('formdata_demande', 'field_string'),
56 58
        ('formdata_demande', 'field_item'),
57 59
        ('formdata_demande', 'field_bool'),
60
        ('formdata_demande', 'field_item_open'),
58 61
        ('formdata_demande', 'function__receiver'),
59 62
        ('formdata_demande_field_item', 'id'),
60 63
        ('formdata_demande_field_item', 'label'),
64
        ('formdata_demande_field_item_open', 'id'),
65
        ('formdata_demande_field_item_open', 'label'),
61 66
        ('formdef', 'id'),
62 67
        ('formdef', 'category_id'),
63 68
        ('formdef', 'label'),
......
113 118
        with pytest.raises(SystemExit):
114 119
            olap_cmd(no_log_errors=False)
115 120
    assert 'Invalid JSON content' in caplog.text
121

  
122

  
123
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog):
124

  
125
    olap_cmd()
126

  
127
    with postgres_db.conn() as conn:
128
        with conn.cursor() as c:
129
            c.execute('SET search_path TO \'olap\'')
130
            c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
131
            refs = c.fetchall()
132
            assert len(refs) == 3
133
            c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id')
134
            open_refs = c.fetchall()
135
            assert len(open_refs) == 3
136

  
137
    # Change an item of the field
138
    script = u"""
139
import datetime
140
import random
141
from quixote import get_publisher
142
from wcs.formdef import FormDef
143
formdef = FormDef.get_by_urlname('demande')
144

  
145
for field in formdef.fields:
146
    if field.label == '2nd field':
147
        ref_field = field
148
        break
149

  
150
ref_field.items = ['foo', 'bar', 'bazouka']
151
formdef.store()
152

  
153
user = get_publisher().user_class.select()[0]
154

  
155
formdata = formdef.data_class()()
156
formdata.just_created()
157
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
158
formdata.data = {'1': 'FOO BAR 1'}
159
formdata.data['2'] = 'bazouka'
160
formdata.data['2_display'] = 'bazouka'
161
formdata.data['4'] = 'open_new_value'
162
formdata.data['4_display'] = 'open_new_value'
163
formdata.jump_status('new')
164
formdata.store()
165
"""
166
    utils.run_wcs_script(wcs_dir, script, 'toto')
167
    olap_cmd()
168

  
169
    # We expect to find in the new dimension table
170
    # the same  records as before (including the one of the item that disappeared)
171
    # plus the new item
172
    with postgres_db.conn() as conn:
173
        with conn.cursor() as c:
174

  
175
            c.execute('SET search_path TO \'olap\'')
176
            c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
177
            new_refs = c.fetchall()
178
            assert len(new_refs) == 4
179
            for ref in refs:
180
                assert ref in new_refs
181
            assert new_refs[-1][1] == 'bazouka'
182
            bazouka_id = new_refs[-1][0]
183

  
184
            c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id')
185
            new_open_refs = c.fetchall()
186
            assert len(new_open_refs) == 4
187
            for ref in open_refs:
188
                assert ref in new_open_refs
189
            assert new_open_refs[-1][1] == 'open_new_value'
190
            open_new_id = new_open_refs[-1][0]
191

  
192
            c.execute('''SELECT field_item, field_item_open
193
            FROM formdata_demande ORDER BY id''')
194
            formdata = c.fetchone()
195
            assert formdata[0] == bazouka_id
196
            assert formdata[1] == open_new_id
tests/utils.py
1
import os
2
import subprocess
3

  
4

  
5
HOSTNAME = '127.0.0.1'
6
WCSCTL = os.environ.get('WCSCTL')
7

  
8

  
9
def run_wcs_script(wcs_dir, script, script_name):
10
    '''Run python script inside w.c.s. environment'''
11
    script_path = wcs_dir / (script_name + '.py')
12
    with script_path.open('w') as fd:
13
        fd.write(script)
14

  
15
    subprocess.check_call(
16
        [WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME,
17
         str(script_path)])
wcs_olap/feeder.py
2 2

  
3 3
import six
4 4
import copy
5
import itertools
5 6
import os
6 7
import json
7 8
import hashlib
......
342 343
        if comment:
343 344
            self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
344 345

  
345
    def create_labeled_table(self, name, labels, serial=False, comment=None):
346
        if serial:
347
            id_type = 'serial primary key'
348
        else:
349
            id_type = 'smallint primary key'
350
        self.create_table(name,
351
                          [
352
                              ['id', id_type],
353
                              ['label', 'varchar']
354
                          ], comment=comment)
355
        values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
356
        if not values:
357
            return
358
        self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
346
    def prev_table_exists(self, name):
347
        query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
348
        WHERE  table_schema = '{schema}' AND table_name = %s)"""
349
        self.ex(query, vars=(name,))
350
        return self.cur.fetchone()[0]
351

  
352
    def create_labeled_table_serial(self, name, comment):
353
        self.create_table(
354
            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)
355

  
356
        if self.prev_table_exists(name):
357
            # Insert data from previous table
358
            self.ex(
359
                'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}',
360
                ctx={'name': name}
361
            )
362
            # Update sequence
363
            self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'),
364
            (SELECT MAX(id) FROM {name}))""", ctx={'name': name})
365

  
366
    def create_labeled_table(self, name, labels, comment=None):
367
        self.create_table(
368
            name,
369
            [
370
                ['id', 'smallint primary key'],
371
                ['label', 'varchar']
372
            ], comment=comment)
373

  
374
        if self.prev_table_exists(name):
375
            # Insert data from previous table
376
            self.ex(
377
                'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
378
                ctx={'name': name}
379
            )
380
            # Find what is missing
381
            to_insert = []
382
            for _id, _label in labels:
383
                self.ex(
384
                    'SELECT * FROM {name} WHERE label = %s', ctx={'name': name}, vars=(_label,))
385
                if self.cur.fetchone() is None:
386
                    to_insert.append(_label)
387

  
388
            labels = None
389
            if to_insert:
390
                self.ex('SELECT MAX(id) FROM {name}', ctx={'name': name})
391
                next_id = self.cur.fetchone()[0] + 1
392
                ids = range(next_id, next_id + len(to_insert))
393
                labels = zip(ids, to_insert)
394

  
395
        if labels:
396
            labels = list(labels)
397
            tmpl = ', '.join(['(%s, %s)'] * len(labels))
398
            query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
399
            self.ex(query_str, ctx={'name': name}, vars=list(itertools.chain(*labels)))
400

  
401
        res = {}
402
        self.ex("SELECT id, label FROM %s" % str(name))
403
        for id_, label in self.cur.fetchall():
404
            res[label] = id_
405
        return res
359 406

  
360 407
    def tpl(self, o, ctx=None):
361 408
        ctx = ctx or {}
......
379 426

  
380 427
    def do_base_table(self):
381 428
        # channels
382
        self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels],
429
        self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
383 430
                                  comment=u'canal')
384 431

  
385 432
        # roles
386 433
        roles = dict((i, role.name) for i, role in enumerate(self.roles))
387
        self.create_labeled_table('{role_table}', roles.items(), comment=u'role')
388
        self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
434
        tmp_role_map = self.create_labeled_table('role', roles.items(), comment=u'role')
435
        self.role_mapping = dict(
436
            (role.id, tmp_role_map[role.name]) for role in self.roles)
389 437

  
390 438
        # categories
391
        self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
392
                                  comment=u'catégorie')
393
        self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
439
        tmp_cat_map = self.create_labeled_table(
440
            'category', enumerate(c.name for c in self.categories), comment=u'catégorie')
441
        self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
394 442

  
395
        self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))),
443
        self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
396 444
                                  comment=u'heures')
397 445

  
398
        self.create_labeled_table('{generic_status_table}', self.status,
446
        self.create_labeled_table('status', self.status,
399 447
                                  comment=u'statuts simplifiés')
400 448
        self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,'
401 449
                ' category_id integer REFERENCES {category_table} (id),'
402 450
                ' label varchar)')
403 451
        self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',))
404 452
        # agents
405
        self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents')
453
        self.create_labeled_table_serial('agent', comment=u'agents')
406 454

  
407 455
        self.columns = [
408 456
            ['id', 'serial primary key'],
......
476 524
            self.connection.close()
477 525

  
478 526
    def insert_agent(self, name):
479
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name])
527
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,))
528
        res = self.cur.fetchone()
529
        if res:
530
            return res[0]
531
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,))
480 532
        return self.cur.fetchone()[0]
481 533

  
482 534
    def get_agent(self, user):
......
523 575

  
524 576
    def do_statuses(self):
525 577
        statuses = self.formdef.schema.workflow.statuses
526
        self.olap_feeder.create_labeled_table(self.status_table_name,
527
                                              enumerate([s.name for s in statuses]),
528
                                              comment=u'statuts du formulaire « %s »' %
529
                                              self.formdef.schema.name)
530
        self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
578
        tmp_status_map = self.olap_feeder.create_labeled_table(
579
            self.status_table_name, enumerate([s.name for s in statuses]),
580
            comment=u'statuts du formulaire « %s »' % self.formdef.schema.name)
581
        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
531 582

  
532 583
    def do_data_table(self):
533 584
        self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
......
560 611
                table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
561 612
                # create table and mapping
562 613
                if field.items:
563
                    self.create_labeled_table(table_name, enumerate(field.items),
564
                                              comment=comment)
565
                    self.items_mappings[field.varname] = dict(
566
                        (item, i) for i, item in enumerate(field.items))
614
                    self.items_mappings[field.varname] = self.create_labeled_table(
615
                        table_name, enumerate(field.items), comment=comment)
567 616
                else:
568 617
                    # open item field, from data sources...
569
                    self.create_labeled_table(table_name, [], serial=True, comment=comment)
618
                    self.create_labeled_table_serial(table_name, comment=comment)
570 619
                field_def = 'smallint REFERENCES %s (id)' % table_name
571 620
            elif field.type == 'bool':
572 621
                field_def = 'boolean'
......
629 678

  
630 679
    def insert_item_value(self, field, value):
631 680
        table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
632
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value],
681
        self.ex("SELECT id FROM {item_table} WHERE label = %s",
682
                ctx={'item_table': table_name}, vars=(value,))
683
        res = self.cur.fetchone()
684
        if res:
685
            return res[0]
686
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,),
633 687
                ctx={'item_table': table_name})
634 688
        return self.cur.fetchone()[0]
635 689

  
636
-