0002-keep-data-form-previous-run-in-labeled-table-30752.patch
tests/test_wcs.py | ||
---|---|---|
6 | 6 |
import pathlib2 |
7 | 7 |
import mock |
8 | 8 | |
9 |
import utils |
|
10 | ||
9 | 11 | |
10 | 12 |
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): |
11 | 13 |
olap_cmd() |
... | ... | |
113 | 115 |
with pytest.raises(SystemExit): |
114 | 116 |
olap_cmd(no_log_errors=False) |
115 | 117 |
assert 'Invalid JSON content' in caplog.text |
118 | ||
119 | ||
120 |
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog): |
|
121 |
olap_cmd() |
|
122 | ||
123 |
with postgres_db.conn() as conn: |
|
124 |
with conn.cursor() as c: |
|
125 |
c.execute('SET search_path TO \'olap\'') |
|
126 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
127 |
refs = c.fetchall() |
|
128 |
assert len(refs) == 3 |
|
129 | ||
130 |
# Change an item of the field |
|
131 |
script = u""" |
|
132 |
from wcs.formdef import FormDef |
|
133 | ||
134 |
formdef = FormDef.get_by_urlname('demande') |
|
135 |
for field in formdef.fields: |
|
136 |
if field.label == '2nd field': |
|
137 |
ref_field = field |
|
138 |
break |
|
139 |
ref_field.items = ['foo', 'bar', 'bazouka'] |
|
140 |
formdef.store() |
|
141 |
""" |
|
142 |
utils.run_wcs_script(wcs_dir, script, 'toto') |
|
143 |
olap_cmd() |
|
144 | ||
145 |
# We expect to find in the new dimension table |
|
146 |
# the same records as before (including the one of the item that disappeared) |
|
147 |
# plus the new item |
|
148 |
with postgres_db.conn() as conn: |
|
149 |
with conn.cursor() as c: |
|
150 |
c.execute('SET search_path TO \'olap\'') |
|
151 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
152 |
new_refs = c.fetchall() |
|
153 |
assert len(new_refs) == 4 |
|
154 |
for ref in refs: |
|
155 |
assert ref in new_refs |
|
156 |
assert new_refs[-1][1] == 'bazouka' |
wcs_olap/feeder.py | ||
---|---|---|
340 | 340 |
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,)) |
341 | 341 | |
342 | 342 |
def create_labeled_table(self, name, labels, serial=False, comment=None): |
343 |
if serial: |
|
344 |
id_type = 'serial primary key' |
|
343 | ||
344 |
query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables |
|
345 |
WHERE table_schema = '{schema}' AND table_name = '%s')""" % name |
|
346 |
self.ex(query) |
|
347 |
table_exists = self.cur.fetchone()[0] |
|
348 | ||
349 |
if table_exists: |
|
350 |
self.ex( |
|
351 |
'CREATE TABLE {schema_temp}.%(name)s (LIKE {schema}.%(name)s INCLUDING ALL)' % |
|
352 |
{'name': name} |
|
353 |
) |
|
354 |
self.ex( |
|
355 |
'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s' % |
|
356 |
{'name': name} |
|
357 |
) |
|
358 |
for _id, _label in labels: |
|
359 |
self.ex("SELECT * FROM %s WHERE label = '%s'" % (name, _label)) |
|
360 |
if self.cur.fetchone() is None: |
|
361 |
self.ex( |
|
362 |
"SELECT nextval(pg_get_serial_sequence('%s', 'id')) AS new_id;" % name) |
|
363 |
res = self.cur.fetchone()[0] |
|
364 |
if res is None: |
|
365 |
self.ex('SELECT MAX(id) FROM %s' % name) |
|
366 |
next_id = self.cur.fetchone()[0] + 1 |
|
367 |
else: |
|
368 |
next_id = res |
|
369 |
values = self.cur.mogrify('(%s, %s)', [next_id, _label]) |
|
370 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
345 | 371 |
else: |
346 |
id_type = 'smallint primary key' |
|
347 |
self.create_table(name, |
|
348 |
[ |
|
349 |
['id', id_type], |
|
350 |
['label', 'varchar'] |
|
351 |
], comment=comment) |
|
352 |
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) |
|
353 |
if not values: |
|
354 |
return |
|
355 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
372 |
if serial: |
|
373 |
id_type = 'serial primary key' |
|
374 |
else: |
|
375 |
id_type = 'smallint primary key' |
|
376 |
self.create_table(name, |
|
377 |
[ |
|
378 |
['id', id_type], |
|
379 |
['label', 'varchar'] |
|
380 |
], comment=comment) |
|
381 |
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) |
|
382 |
if not values: |
|
383 |
return |
|
384 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
356 | 385 | |
357 | 386 |
def tpl(self, o, ctx=None): |
358 | 387 |
ctx = ctx or {} |
359 |
- |