Projet

Général

Profil

0001-re-use-data-from-previous-run-in-dimension-tables-30.patch

Emmanuel Cazenave, 28 février 2019 20:07

Télécharger (21,3 ko)

Voir les différences:

Subject: [PATCH] re-use data from previous run in dimension tables (#30752)

 tests/conftest.py  |  61 +++++++++--------
 tests/olap.model   |  17 +++++
 tests/test_wcs.py  |  82 ++++++++++++++++++++++-
 tests/utils.py     |  17 +++++
 wcs_olap/feeder.py | 160 ++++++++++++++++++++++++++++++---------------
 5 files changed, 252 insertions(+), 85 deletions(-)
 create mode 100644 tests/utils.py
tests/conftest.py
11 11
from collections import namedtuple
12 12

  
13 13
import psycopg2
14

  
15 14
import pytest
16 15

  
16
import utils
17

  
18

  
17 19
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid'])
18 20

  
19 21

  
......
92 94
    fields.ItemField(id='2', label='2nd field', type='item',
93 95
                     items=['foo', 'bar', 'baz'], varname='item'),
94 96
    fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'),
97
    fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'),
95 98
]
96 99
formdef.store()
97 100

  
......
105 108
    if i%4 == 0:
106 109
        formdata.data['2'] = 'foo'
107 110
        formdata.data['2_display'] = 'foo'
111
        formdata.data['4'] = 'open_one'
112
        formdata.data['4_display'] = 'open_one'
108 113
    elif i%4 == 1:
109 114
        formdata.data['2'] = 'bar'
110 115
        formdata.data['2_display'] = 'bar'
116
        formdata.data['4'] = 'open_two'
117
        formdata.data['4_display'] = 'open_two'
111 118
    else:
112 119
        formdata.data['2'] = 'baz'
113 120
        formdata.data['2_display'] = 'baz'
121
        formdata.data['4'] = 'open_three'
122
        formdata.data['4_display'] = 'open_three'
123

  
114 124
    formdata.data['3'] = bool(i % 2)
115 125
    if i%3 == 0:
116 126
        formdata.jump_status('new')
......
123 133
}
124 134

  
125 135

  
126
@pytest.fixture(scope='session')
127
def wcs(tmp_path_factory):
136
@pytest.fixture
137
def wcs_dir(tmp_path_factory):
138
    return tmp_path_factory.mktemp('wcs')
139

  
140

  
141
@pytest.fixture
142
def wcs(tmp_path_factory, wcs_dir):
128 143
    '''Session scoped wcs fixture, so read-only.'''
129
    if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']):
130
        pytest.skip('WCSCTL not defined in environment')
131
    WCSCTL = os.environ.get('WCSCTL')
132
    WCS_DIR = tmp_path_factory.mktemp('wcs')
133
    HOSTNAME = '127.0.0.1'
134 144
    PORT = 8899
135 145
    ADDRESS = '0.0.0.0'
136 146
    WCS_PID = None
137 147

  
138
    def run_wcs_script(script, hostname):
139
        '''Run python script inside w.c.s. environment'''
140

  
141
        script_path = WCS_DIR / (script + '.py')
142
        with script_path.open('w') as fd:
143
            fd.write(WCS_SCRIPTS[script])
144

  
145
        subprocess.check_call(
146
            [WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname,
147
             str(script_path)])
148

  
149
    tenant_dir = WCS_DIR / HOSTNAME
148
    tenant_dir = wcs_dir / utils.HOSTNAME
150 149
    tenant_dir.mkdir()
151 150

  
152
    run_wcs_script('setup-auth', HOSTNAME)
153
    run_wcs_script('create-user', HOSTNAME)
154
    run_wcs_script('create-data', HOSTNAME)
151
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
152
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
153
    utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
155 154

  
156 155
    with (tenant_dir / 'site-options.cfg').open('w') as fd:
157 156
        fd.write(u'''[api-secrets]
158 157
olap = olap
159 158
''')
160 159

  
161
    with (WCS_DIR / 'wcs.cfg').open('w') as fd:
160
    with (wcs_dir / 'wcs.cfg').open('w') as fd:
162 161
        fd.write(u'''[main]
163
app_dir = %s\n''' % WCS_DIR)
162
app_dir = %s\n''' % wcs_dir)
164 163

  
165
    with (WCS_DIR / 'local_settings.py').open('w') as fd:
164
    with (wcs_dir / 'local_settings.py').open('w') as fd:
166 165
        fd.write(u'''
167 166
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg'
168 167
THEMES_DIRECTORY = '/'
169 168
ALLOWED_HOSTS = ['%s']
170
''' % (WCS_DIR, HOSTNAME))
169
''' % (wcs_dir, utils.HOSTNAME))
171 170

  
172 171
    # launch a Django worker for running w.c.s.
173 172
    WCS_PID = os.fork()
174 173
    if not WCS_PID:
175
        os.chdir(os.path.dirname(WCSCTL))
174
        os.chdir(os.path.dirname(utils.WCSCTL))
176 175
        os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings'
177
        os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
176
        os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
178 177
        os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)])
179 178
        sys.exit(0)
180 179

  
......
197 196
    if pid:
198 197
        assert False, 'w.c.s. stopped with exit-code %s' % exit_code
199 198

  
200
    yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
199
    yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID)
201 200
    os.kill(WCS_PID, 9)
202
    shutil.rmtree(str(WCS_DIR))
201
    shutil.rmtree(str(wcs_dir))
203 202

  
204 203

  
205 204
@pytest.fixture
tests/olap.model
264 264
             "type": "bool",
265 265
             "value": "\"field_bool\"",
266 266
             "value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)"
267
            },
268
            {
269
               "filter" : true,
270
               "join" : [
271
                  "item_open"
272
               ],
273
               "label" : "4rth field",
274
               "name" : "item_open",
275
               "type" : "integer",
276
               "value" : "\"item_open\".id",
277
               "value_label" : "\"item_open\".label"
267 278
            }
268 279
         ],
269 280
         "fact_table" : "formdata_demande",
......
329 340
               "master" : "field_item",
330 341
               "name" : "item",
331 342
               "table" : "formdata_demande_field_item"
343
            },
344
            {
345
               "detail" : "id",
346
               "master" : "field_item_open",
347
               "name" : "item_open",
348
               "table" : "formdata_demande_field_item_open"
332 349
            }
333 350
         ],
334 351
         "key" : "id",
tests/test_wcs.py
6 6
import pathlib2
7 7
import mock
8 8

  
9
import utils
10

  
9 11

  
10 12
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
11 13
    olap_cmd()
......
55 57
        ('formdata_demande', 'field_string'),
56 58
        ('formdata_demande', 'field_item'),
57 59
        ('formdata_demande', 'field_bool'),
60
        ('formdata_demande', 'field_item_open'),
58 61
        ('formdata_demande', 'function__receiver'),
59 62
        ('formdata_demande_field_item', 'id'),
60 63
        ('formdata_demande_field_item', 'label'),
64
        ('formdata_demande_field_item_open', 'id'),
65
        ('formdata_demande_field_item_open', 'label'),
61 66
        ('formdef', 'id'),
62 67
        ('formdef', 'category_id'),
63 68
        ('formdef', 'label'),
......
77 82
            c.execute('SELECT table_name, column_name '
78 83
                      'FROM information_schema.columns '
79 84
                      'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position')
80

  
81 85
            assert list(c.fetchall()) == expected_schema
82 86

  
83 87
    # verify JSON schema
......
113 117
        with pytest.raises(SystemExit):
114 118
            olap_cmd(no_log_errors=False)
115 119
    assert 'Invalid JSON content' in caplog.text
120

  
121

  
122
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog):
    '''Dimension rows keep their ids from one olap run to the next.

    After a first run, the items of the "2nd field" item field are changed and
    a new formdata using the new item and a new open-item value is created.
    After a second run every row seen before must still be present, the new
    labels must come last, and the fact table must reference their ids.
    '''

    def rows_of(cursor, table):
        # Whole dimension table, ordered by id so runs compare stably.
        cursor.execute('SELECT * FROM %s ORDER BY id' % table)
        return cursor.fetchall()

    olap_cmd()

    with postgres_db.conn() as conn:
        with conn.cursor() as c:
            c.execute('SET search_path TO \'olap\'')
            item_rows_before = rows_of(c, 'formdata_demande_field_item')
            assert len(item_rows_before) == 3
            open_rows_before = rows_of(c, 'formdata_demande_field_item_open')
            assert len(open_rows_before) == 3

    # Replace an item of the field and create a formdata using the new values.
    script = u"""
import datetime
import random
from quixote import get_publisher
from wcs.formdef import FormDef
formdef = FormDef.get_by_urlname('demande')

for field in formdef.fields:
    if field.label == '2nd field':
        ref_field = field
        break

ref_field.items = ['foo', 'bar', 'bazouka']
formdef.store()

user = get_publisher().user_class.select()[0]

formdata = formdef.data_class()()
formdata.just_created()
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple()
formdata.data = {'1': 'FOO BAR 1'}
formdata.data['2'] = 'bazouka'
formdata.data['2_display'] = 'bazouka'
formdata.data['4'] = 'open_new_value'
formdata.data['4_display'] = 'open_new_value'
formdata.jump_status('new')
formdata.store()
"""
    utils.run_wcs_script(wcs_dir, script, 'toto')
    olap_cmd()

    # The new dimension tables must contain every row seen before (including
    # the item that disappeared from the form definition) plus the new item.
    with postgres_db.conn() as conn:
        with conn.cursor() as c:

            c.execute('SET search_path TO \'olap\'')
            item_rows_after = rows_of(c, 'formdata_demande_field_item')
            assert len(item_rows_after) == 4
            for previous in item_rows_before:
                assert previous in item_rows_after
            newest_item = item_rows_after[-1]
            assert newest_item[1] == 'bazouka'
            bazouka_id = newest_item[0]

            open_rows_after = rows_of(c, 'formdata_demande_field_item_open')
            assert len(open_rows_after) == 4
            for previous in open_rows_before:
                assert previous in open_rows_after
            newest_open = open_rows_after[-1]
            assert newest_open[1] == 'open_new_value'
            open_new_id = newest_open[0]

            c.execute('''SELECT field_item, field_item_open
            FROM formdata_demande ORDER BY id''')
            first_fact = c.fetchone()
            assert first_fact[0] == bazouka_id
            assert first_fact[1] == open_new_id
tests/utils.py
1
import os
2
import subprocess
3

  
4

  
5
HOSTNAME = '127.0.0.1'
WCSCTL = os.environ.get('WCSCTL')


def run_wcs_script(wcs_dir, script, script_name):
    '''Run python script inside w.c.s. environment.

    ``script`` (source text) is written to ``<wcs_dir>/<script_name>.py`` and
    executed with ``WCSCTL runscript`` for the ``HOSTNAME`` vhost of the
    application directory ``wcs_dir`` (a path object supporting ``/``).

    Raises RuntimeError when WCSCTL is not set or does not exist, and
    subprocess.CalledProcessError when the script fails.
    '''
    if not WCSCTL or not os.path.exists(WCSCTL):
        # Without this guard subprocess fails with an obscure TypeError on a
        # None executable; the wcs fixture no longer skips in that case.
        raise RuntimeError('WCSCTL not defined in environment')

    script_path = wcs_dir / (script_name + '.py')
    with script_path.open('w') as fd:
        fd.write(script)

    subprocess.check_call(
        [WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME,
         str(script_path)])
wcs_olap/feeder.py
45 45

  
46 46

  
47 47
class WcsOlapFeeder(object):
    # Fixed channel dimension: rows are [id, code, label].
    channels = [
        [1, 'web', u'web'],
        [2, 'mail', u'courrier'],
        [3, 'phone', u'téléphone'],
        [4, 'counter', u'guichet'],
        [5, 'backoffice', u'backoffice'],
        [6, 'email', u'email'],
        [7, 'fax', u'fax'],
    ]
    channel_to_id = dict((c[1], c[0]) for c in channels)
    id_to_channel = dict((c[0], c[1]) for c in channels)

    # Fixed generic status dimension: rows are [id, label].
    status = [
        [1, 'Nouveau'],
        [2, 'En cours'],
        [3, 'Terminé'],
    ]
    # Built from ``status``; these were previously built from ``channels`` by
    # mistake, so they described channels instead of statuses.
    status_to_id = dict((s[1], s[0]) for s in status)
    id_to_status = dict((s[0], s[1]) for s in status)
68

  
48 69
    def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False):
49 70
        self.api = api
50 71
        self.fake = fake
......
239 260
        self.connection = psycopg2.connect(dsn=pg_dsn)
240 261
        self.connection.autocommit = True
241 262
        self.cur = self.connection.cursor()
263
        psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.cur)
264

  
242 265
        try:
243 266
            self.has_jsonb = self.detect_jsonb()
244 267
            if self.has_jsonb:
......
310 333
        generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval)
311 334
    AS the_date(the_date));''')
312 335

  
313
    channels = [
314
        [1, 'web', u'web'],
315
        [2, 'mail', u'courrier'],
316
        [3, 'phone', u'téléphone'],
317
        [4, 'counter', u'guichet'],
318
        [5, 'backoffice', u'backoffice'],
319
        [6, 'email', u'email'],
320
        [7, 'fax', u'fax'],
321
    ]
322
    channel_to_id = dict((c[1], c[0]) for c in channels)
323
    id_to_channel = dict((c[0], c[1]) for c in channels)
324

  
325
    status = [
326
        [1, 'Nouveau'],
327
        [2, 'En cours'],
328
        [3, 'Terminé'],
329
    ]
330
    status_to_id = dict((c[1], c[0]) for c in channels)
331
    id_to_status = dict((c[0], c[1]) for c in channels)
332

  
333 336
    def create_table(self, name, columns, inherits=None, comment=None):
334 337
        sql = 'CREATE TABLE %s' % name
335 338
        sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')'
......
339 342
        if comment:
340 343
            self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
341 344

  
342
    def create_labeled_table(self, name, labels, serial=False, comment=None):
343
        if serial:
344
            id_type = 'serial primary key'
345
        else:
346
            id_type = 'smallint primary key'
347
        self.create_table(name,
348
                          [
349
                              ['id', id_type],
350
                              ['label', 'varchar']
351
                          ], comment=comment)
352
        values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
353
        if not values:
354
            return
355
        self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
345
    def prev_table_exists(self, name):
        '''Tell whether a table called ``name`` exists in ``{schema}``.

        ``name`` is pasted into the query text, so it may itself be a
        placeholder such as ``'{role_table}'``. Placeholders are left for
        ``self.ex`` to fill in, as with the other queries of this class.
        Returns the boolean read back from the EXISTS query.
        '''
        sql = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
        WHERE  table_schema = '{schema}' AND table_name = '%s')""" % name
        self.ex(sql)
        row = self.cur.fetchone()
        return row[0]
350

  
351
    def create_labeled_table_serial(self, name, comment=None):
        '''Create an ``(id serial, label)`` dimension table, reusing old rows.

        Used for dimensions whose labels are only discovered while feeding
        (agents, open item fields). If a table of the same name exists in
        ``{schema}`` from a previous run, its rows are copied into the new
        table in ``{schema_temp}`` so that ids stay the same across runs. The
        id sequence is then advanced past the copied ids so later inserts do
        not collide with them.

        ``comment`` defaults to None, like ``create_table`` and
        ``create_labeled_table``.
        '''
        self.create_table(
            name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment)

        if not self.prev_table_exists(name):
            return

        # Copy rows, ids included, from the previous run's table.
        self.ex(
            'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s'
            % {'name': name}
        )
        # Explicit ids bypassed the sequence: move it to the largest copied id.
        self.ex("""SELECT setval(pg_get_serial_sequence('%(name)s', 'id'),
            (SELECT MAX(id) FROM %(name)s))""" % {'name': name})
364

  
365
    def create_labeled_table(self, name, labels, comment=None):
        '''Create an ``(id smallint, label)`` dimension table and return its mapping.

        ``labels`` is an iterable of ``(id, label)`` pairs (possibly empty, and
        possibly a one-shot iterator such as ``enumerate(...)``).

        - On a first run, the pairs are inserted as given.
        - If a table of the same name exists in ``{schema}`` from a previous
          run, its rows are copied first so existing labels keep their ids.
          Labels not yet present are then appended with ids following the
          current maximum; the ids given in ``labels`` are ignored in that case.

        Returns a dict mapping every label present in the new table to its id.
        '''
        self.create_table(
            name,
            [
                ['id', 'smallint primary key'],
                ['label', 'varchar']
            ], comment=comment)

        if self.prev_table_exists(name):
            # Insert data from previous table
            self.ex(
                'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s'
                % {'name': name}
            )
            # Find what is missing. The label is passed as a query parameter:
            # formatting it into the SQL text broke on labels containing quotes.
            to_insert = []
            for _id, _label in labels:
                if _label in to_insert:
                    # Same label listed twice: insert it only once.
                    continue
                self.ex('SELECT 1 FROM %s WHERE label = %%s' % name, vars=[_label])
                if self.cur.fetchone() is None:
                    to_insert.append(_label)

            labels = None
            if to_insert:
                self.ex('SELECT MAX(id) FROM %s' % name)
                max_id = self.cur.fetchone()[0]
                # MAX(id) is NULL when the previous table was empty; start at 0
                # like the enumerate()-based ids used by callers.
                next_id = 0 if max_id is None else max_id + 1
                labels = zip(range(next_id, next_id + len(to_insert)), to_insert)

        values = ', '.join(
            self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in (labels or []))
        # Test the rendered VALUES list, not ``labels``: an empty enumerate()
        # is truthy and would produce an invalid "INSERT ... VALUES " statement.
        if values:
            self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))

        self.ex("SELECT id, label FROM %s" % str(name))
        return dict((label, id_) for id_, label in self.cur.fetchall())
356 402

  
357 403
    def tpl(self, o, ctx=None):
358 404
        ctx = ctx or {}
......
381 427

  
382 428
        # roles
383 429
        roles = dict((i, role.name) for i, role in enumerate(self.roles))
384
        self.create_labeled_table('{role_table}', roles.items(), comment=u'role')
385
        self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles))
430
        tmp_role_map = self.create_labeled_table('{role_table}', roles.items(), comment=u'role')
431
        self.role_mapping = dict(
432
            (role.id, tmp_role_map[role.name]) for role in self.roles)
386 433

  
387 434
        # categories
388
        self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
389
                                  comment=u'catégorie')
390
        self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
435
        tmp_cat_map = self.create_labeled_table(
436
            '{category_table}', enumerate(c.name for c in self.categories), comment=u'catégorie')
437
        self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
391 438

  
392 439
        self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))),
393 440
                                  comment=u'heures')
......
399 446
                ' label varchar)')
400 447
        self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',))
401 448
        # agents
402
        self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents')
449
        self.create_labeled_table_serial('{agent_table}', comment=u'agents')
403 450

  
404 451
        self.columns = [
405 452
            ['id', 'serial primary key'],
......
473 520
            self.connection.close()
474 521

  
475 522
    def insert_agent(self, name):
        '''Return the id of agent ``name`` in the agent table, adding it if absent.

        The agent table may already contain rows copied from a previous run, so
        an existing label is reused instead of being inserted again.
        '''
        # The name is passed as a query parameter (as in the INSERT below):
        # formatting it into the SQL text broke on names containing quotes.
        self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=[name])
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name])
        return self.cur.fetchone()[0]
478 529

  
......
520 571

  
521 572
    def do_statuses(self):
522 573
        statuses = self.formdef.schema.workflow.statuses
523
        self.olap_feeder.create_labeled_table(self.status_table_name,
524
                                              enumerate([s.name for s in statuses]),
525
                                              comment=u'statuts du formulaire « %s »' %
526
                                              self.formdef.schema.name)
527
        self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses))
574
        tmp_status_map = self.olap_feeder.create_labeled_table(
575
            self.status_table_name, enumerate([s.name for s in statuses]),
576
            comment=u'statuts du formulaire « %s »' % self.formdef.schema.name)
577
        self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses)
528 578

  
529 579
    def do_data_table(self):
530 580
        self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)',
......
557 607
                table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
558 608
                # create table and mapping
559 609
                if field.items:
560
                    self.create_labeled_table(table_name, enumerate(field.items),
561
                                              comment=comment)
562
                    self.items_mappings[field.varname] = dict(
563
                        (item, i) for i, item in enumerate(field.items))
610
                    self.items_mappings[field.varname] = self.create_labeled_table(
611
                        table_name, enumerate(field.items), comment=comment)
564 612
                elif field.options:
565 613
                    options = enumerate(field.options)
566
                    self.create_labeled_table(table_name, [(i, o['label']) for i, o in options],
567
                                              comment=comment)
568
                    self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options)
614
                    tmp_options_map = self.create_labeled_table(
615
                        table_name, [(i, o['label']) for i, o in options], comment=comment)
616
                    self.items_mappings[field.varname] = dict(
617
                        (o['value'], tmp_options_map[o['value']]) for i, o in options)
569 618
                else:
570 619
                    # open item field, from data sources...
571
                    self.create_labeled_table(table_name, [], serial=True, comment=comment)
620
                    self.create_labeled_table_serial(table_name, comment=comment)
572 621
                field_def = 'smallint REFERENCES %s (id)' % table_name
573 622
            elif field.type == 'bool':
574 623
                field_def = 'boolean'
......
631 680

  
632 681
    def insert_item_value(self, field, value):
        '''Return the id of ``value`` in the dimension table of item ``field``.

        The value is inserted when not already present. The table may contain
        rows copied from a previous run, so existing labels keep their ids.
        '''
        table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname)
        # The value is passed as a query parameter (as in the INSERT below):
        # formatting it into the SQL text broke on values containing quotes.
        self.ex('SELECT id FROM {item_table} WHERE label = %s', vars=[value],
                ctx={'item_table': table_name})
        res = self.cur.fetchone()
        if res:
            return res[0]
        self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value],
                ctx={'item_table': table_name})
        return self.cur.fetchone()[0]
637
-