Projet

Général

Profil

0002-keep-data-form-previous-run-in-labeled-table-30752.patch

Emmanuel Cazenave, 22 février 2019 15:41

Télécharger (5,03 ko)

Voir les différences:

Subject: [PATCH 2/2] keep data from previous run in labeled table (#30752)

 tests/test_wcs.py  | 41 +++++++++++++++++++++++++++++++++++
 wcs_olap/feeder.py | 53 +++++++++++++++++++++++++++++++++++-----------
 2 files changed, 82 insertions(+), 12 deletions(-)
tests/test_wcs.py
6 6
import pathlib2
7 7
import mock
8 8

  
9
import utils
10

  
9 11

  
10 12
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog):
11 13
    olap_cmd()
......
113 115
        with pytest.raises(SystemExit):
114 116
            olap_cmd(no_log_errors=False)
115 117
    assert 'Invalid JSON content' in caplog.text
118

  
119

  
120
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog):
121
    olap_cmd()
122

  
123
    with postgres_db.conn() as conn:
124
        with conn.cursor() as c:
125
            c.execute('SET search_path TO \'olap\'')
126
            c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
127
            refs = c.fetchall()
128
            assert len(refs) == 3
129

  
130
    # Change an item of the field
131
    script = u"""
132
from wcs.formdef import FormDef
133

  
134
formdef = FormDef.get_by_urlname('demande')
135
for field in formdef.fields:
136
    if field.label == '2nd field':
137
        ref_field = field
138
        break
139
ref_field.items = ['foo', 'bar', 'bazouka']
140
formdef.store()
141
"""
142
    utils.run_wcs_script(wcs_dir, script, 'toto')
143
    olap_cmd()
144

  
145
    # We expect to find in the new dimension table
146
    # the same records as before (including the one for the item that disappeared)
147
    # plus the new item
148
    with postgres_db.conn() as conn:
149
        with conn.cursor() as c:
150
            c.execute('SET search_path TO \'olap\'')
151
            c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id')
152
            new_refs = c.fetchall()
153
            assert len(new_refs) == 4
154
    for ref in refs:
155
        assert ref in new_refs
156
    assert new_refs[-1][1] == 'bazouka'
wcs_olap/feeder.py
340 340
            self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,))
341 341

  
342 342
    def create_labeled_table(self, name, labels, serial=False, comment=None):
343
        if serial:
344
            id_type = 'serial primary key'
343

  
344
        query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables
345
        WHERE  table_schema = '{schema}' AND table_name = '%s')""" % name
346
        self.ex(query)
347
        table_exists = self.cur.fetchone()[0]
348

  
349
        if table_exists:
350
            self.ex(
351
                'CREATE TABLE {schema_temp}.%(name)s (LIKE {schema}.%(name)s INCLUDING ALL)' %
352
                {'name': name}
353
            )
354
            self.ex(
355
                'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s' %
356
                {'name': name}
357
            )
358
            for _id, _label in labels:
359
                self.ex("SELECT * FROM %s WHERE label = '%s'" % (name, _label))
360
                if self.cur.fetchone() is None:
361
                    self.ex(
362
                        "SELECT nextval(pg_get_serial_sequence('%s', 'id')) AS new_id;" % name)
363
                    res = self.cur.fetchone()[0]
364
                    if res is None:
365
                        self.ex('SELECT MAX(id) FROM %s' % name)
366
                        next_id = self.cur.fetchone()[0] + 1
367
                    else:
368
                        next_id = res
369
                    values = self.cur.mogrify('(%s, %s)', [next_id, _label])
370
                    self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
345 371
        else:
346
            id_type = 'smallint primary key'
347
        self.create_table(name,
348
                          [
349
                              ['id', id_type],
350
                              ['label', 'varchar']
351
                          ], comment=comment)
352
        values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
353
        if not values:
354
            return
355
        self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
372
            if serial:
373
                id_type = 'serial primary key'
374
            else:
375
                id_type = 'smallint primary key'
376
            self.create_table(name,
377
                              [
378
                                  ['id', id_type],
379
                                  ['label', 'varchar']
380
                              ], comment=comment)
381
            values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels)
382
            if not values:
383
                return
384
            self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values))
356 385

  
357 386
    def tpl(self, o, ctx=None):
358 387
        ctx = ctx or {}
359
-