0004-re-use-dimension-tables-from-previous-run-30752.patch
tests/conftest.py | ||
---|---|---|
1 | 1 |
# -*- coding: utf-8 -*- |
2 | 2 | |
3 | 3 |
import sys |
4 |
import subprocess |
|
5 | 4 |
import time |
6 | 5 |
import os |
7 | 6 |
import shutil |
... | ... | |
11 | 10 |
from collections import namedtuple |
12 | 11 | |
13 | 12 |
import psycopg2 |
14 | ||
15 | 13 |
import pytest |
16 | 14 | |
15 |
import utils |
|
16 | ||
17 | ||
17 | 18 |
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid']) |
18 | 19 | |
19 | 20 | |
... | ... | |
92 | 93 |
fields.ItemField(id='2', label='2nd field', type='item', |
93 | 94 |
items=['foo', 'bar', 'baz'], varname='item'), |
94 | 95 |
fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'), |
96 |
fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'), |
|
95 | 97 |
] |
96 | 98 |
formdef.store() |
97 | 99 | |
... | ... | |
105 | 107 |
if i%4 == 0: |
106 | 108 |
formdata.data['2'] = 'foo' |
107 | 109 |
formdata.data['2_display'] = 'foo' |
110 |
formdata.data['4'] = 'open_one' |
|
111 |
formdata.data['4_display'] = 'open_one' |
|
108 | 112 |
elif i%4 == 1: |
109 | 113 |
formdata.data['2'] = 'bar' |
110 | 114 |
formdata.data['2_display'] = 'bar' |
115 |
formdata.data['4'] = 'open_two' |
|
116 |
formdata.data['4_display'] = 'open_two' |
|
111 | 117 |
else: |
112 | 118 |
formdata.data['2'] = 'baz' |
113 | 119 |
formdata.data['2_display'] = 'baz' |
120 |
formdata.data['4'] = "open'three" |
|
121 |
formdata.data['4_display'] = "open'three" |
|
122 | ||
114 | 123 |
formdata.data['3'] = bool(i % 2) |
115 | 124 |
if i%3 == 0: |
116 | 125 |
formdata.jump_status('new') |
... | ... | |
123 | 132 |
} |
124 | 133 | |
125 | 134 | |
126 |
@pytest.fixture(scope='session') |
|
127 |
def wcs(tmp_path_factory): |
|
135 |
@pytest.fixture |
|
136 |
def wcs_dir(tmp_path_factory): |
|
137 |
return tmp_path_factory.mktemp('wcs') |
|
138 | ||
139 | ||
140 |
@pytest.fixture |
|
141 |
def wcs(tmp_path_factory, wcs_dir): |
|
128 | 142 |
'''Function scoped wcs fixture: w.c.s. is set up again for each test, so tests may modify its data.''' |
129 |
if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']): |
|
130 |
pytest.skip('WCSCTL not defined in environment') |
|
131 |
WCSCTL = os.environ.get('WCSCTL') |
|
132 |
WCS_DIR = tmp_path_factory.mktemp('wcs') |
|
133 |
HOSTNAME = '127.0.0.1' |
|
134 | 143 |
PORT = 8899 |
135 | 144 |
ADDRESS = '0.0.0.0' |
136 | 145 |
WCS_PID = None |
137 | 146 | |
138 |
def run_wcs_script(script, hostname): |
|
139 |
'''Run python script inside w.c.s. environment''' |
|
140 | ||
141 |
script_path = WCS_DIR / (script + '.py') |
|
142 |
with script_path.open('w') as fd: |
|
143 |
fd.write(WCS_SCRIPTS[script]) |
|
144 | ||
145 |
subprocess.check_call( |
|
146 |
[WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname, |
|
147 |
str(script_path)]) |
|
148 | ||
149 |
tenant_dir = WCS_DIR / HOSTNAME |
|
147 |
tenant_dir = wcs_dir / utils.HOSTNAME |
|
150 | 148 |
tenant_dir.mkdir() |
151 | 149 | |
152 |
run_wcs_script('setup-auth', HOSTNAME)
|
|
153 |
run_wcs_script('create-user', HOSTNAME)
|
|
154 |
run_wcs_script('create-data', HOSTNAME)
|
|
150 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
|
|
151 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
|
|
152 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
|
|
155 | 153 | |
156 | 154 |
with (tenant_dir / 'site-options.cfg').open('w') as fd: |
157 | 155 |
fd.write(u'''[api-secrets] |
158 | 156 |
olap = olap |
159 | 157 |
''') |
160 | 158 | |
161 |
with (WCS_DIR / 'wcs.cfg').open('w') as fd:
|
|
159 |
with (wcs_dir / 'wcs.cfg').open('w') as fd:
|
|
162 | 160 |
fd.write(u'''[main] |
163 |
app_dir = %s\n''' % WCS_DIR)
|
|
161 |
app_dir = %s\n''' % wcs_dir)
|
|
164 | 162 | |
165 |
with (WCS_DIR / 'local_settings.py').open('w') as fd:
|
|
163 |
with (wcs_dir / 'local_settings.py').open('w') as fd:
|
|
166 | 164 |
fd.write(u''' |
167 | 165 |
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg' |
168 | 166 |
THEMES_DIRECTORY = '/' |
169 | 167 |
ALLOWED_HOSTS = ['%s'] |
170 |
''' % (WCS_DIR, HOSTNAME))
|
|
168 |
''' % (wcs_dir, utils.HOSTNAME))
|
|
171 | 169 | |
172 | 170 |
# launch a Django worker for running w.c.s. |
173 | 171 |
WCS_PID = os.fork() |
174 | 172 |
if not WCS_PID: |
175 |
os.chdir(os.path.dirname(WCSCTL)) |
|
173 |
os.chdir(os.path.dirname(utils.WCSCTL))
|
|
176 | 174 |
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings' |
177 |
os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
|
|
175 |
os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
|
|
178 | 176 |
os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]) |
179 | 177 |
sys.exit(0) |
180 | 178 | |
... | ... | |
197 | 195 |
if pid: |
198 | 196 |
assert False, 'w.c.s. stopped with exit-code %s' % exit_code |
199 | 197 | |
200 |
yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
|
|
198 |
yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID)
|
|
201 | 199 |
os.kill(WCS_PID, 9) |
202 |
shutil.rmtree(str(WCS_DIR))
|
|
200 |
shutil.rmtree(str(wcs_dir))
|
|
203 | 201 | |
204 | 202 | |
205 | 203 |
@pytest.fixture |
tests/olap.model | ||
---|---|---|
264 | 264 |
"type": "bool", |
265 | 265 |
"value": "\"field_bool\"", |
266 | 266 |
"value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)" |
267 |
}, |
|
268 |
{ |
|
269 |
"filter" : true, |
|
270 |
"join" : [ |
|
271 |
"item_open" |
|
272 |
], |
|
273 |
"label" : "4rth field", |
|
274 |
"name" : "item_open", |
|
275 |
"type" : "integer", |
|
276 |
"value" : "\"item_open\".id", |
|
277 |
"value_label" : "\"item_open\".label" |
|
267 | 278 |
} |
268 | 279 |
], |
269 | 280 |
"fact_table" : "formdata_demande", |
... | ... | |
329 | 340 |
"master" : "field_item", |
330 | 341 |
"name" : "item", |
331 | 342 |
"table" : "formdata_demande_field_item" |
343 |
}, |
|
344 |
{ |
|
345 |
"detail" : "id", |
|
346 |
"master" : "field_item_open", |
|
347 |
"name" : "item_open", |
|
348 |
"table" : "formdata_demande_field_item_open" |
|
332 | 349 |
} |
333 | 350 |
], |
334 | 351 |
"key" : "id", |
tests/test_wcs.py | ||
---|---|---|
6 | 6 |
import pathlib2 |
7 | 7 |
import mock |
8 | 8 | |
9 |
import utils |
|
10 | ||
9 | 11 | |
10 | 12 |
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): |
11 | 13 |
olap_cmd() |
... | ... | |
55 | 57 |
('formdata_demande', 'field_string'), |
56 | 58 |
('formdata_demande', 'field_item'), |
57 | 59 |
('formdata_demande', 'field_bool'), |
60 |
('formdata_demande', 'field_item_open'), |
|
58 | 61 |
('formdata_demande', 'function__receiver'), |
59 | 62 |
('formdata_demande_field_item', 'id'), |
60 | 63 |
('formdata_demande_field_item', 'label'), |
64 |
('formdata_demande_field_item_open', 'id'), |
|
65 |
('formdata_demande_field_item_open', 'label'), |
|
61 | 66 |
('formdef', 'id'), |
62 | 67 |
('formdef', 'category_id'), |
63 | 68 |
('formdef', 'label'), |
... | ... | |
113 | 118 |
with pytest.raises(SystemExit): |
114 | 119 |
olap_cmd(no_log_errors=False) |
115 | 120 |
assert 'Invalid JSON content' in caplog.text |
121 | ||
122 | ||
123 |
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog): |
|
124 | ||
125 |
olap_cmd() |
|
126 | ||
127 |
with postgres_db.conn() as conn: |
|
128 |
with conn.cursor() as c: |
|
129 |
c.execute('SET search_path TO \'olap\'') |
|
130 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
131 |
refs = c.fetchall() |
|
132 |
assert len(refs) == 3 |
|
133 |
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') |
|
134 |
open_refs = c.fetchall() |
|
135 |
assert len(open_refs) == 3 |
|
136 | ||
137 |
# Change an item of the field |
|
138 |
script = u""" |
|
139 |
import datetime |
|
140 |
import random |
|
141 |
from quixote import get_publisher |
|
142 |
from wcs.formdef import FormDef |
|
143 |
formdef = FormDef.get_by_urlname('demande') |
|
144 | ||
145 |
for field in formdef.fields: |
|
146 |
if field.label == '2nd field': |
|
147 |
ref_field = field |
|
148 |
break |
|
149 | ||
150 |
ref_field.items = ['foo', 'bar', 'bazouka'] |
|
151 |
formdef.store() |
|
152 | ||
153 |
user = get_publisher().user_class.select()[0] |
|
154 | ||
155 |
formdata = formdef.data_class()() |
|
156 |
formdata.just_created() |
|
157 |
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple() |
|
158 |
formdata.data = {'1': 'FOO BAR 1'} |
|
159 |
formdata.data['2'] = 'bazouka' |
|
160 |
formdata.data['2_display'] = 'bazouka' |
|
161 |
formdata.data['4'] = 'open_new_value' |
|
162 |
formdata.data['4_display'] = 'open_new_value' |
|
163 |
formdata.jump_status('new') |
|
164 |
formdata.store() |
|
165 |
""" |
|
166 |
utils.run_wcs_script(wcs_dir, script, 'toto') |
|
167 |
olap_cmd() |
|
168 | ||
169 |
# We expect to find in the new dimension table |
|
170 |
# the same records as before (including the one of the item that disappeared) |
|
171 |
# plus the new item |
|
172 |
with postgres_db.conn() as conn: |
|
173 |
with conn.cursor() as c: |
|
174 | ||
175 |
c.execute('SET search_path TO \'olap\'') |
|
176 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
177 |
new_refs = c.fetchall() |
|
178 |
assert len(new_refs) == 4 |
|
179 |
for ref in refs: |
|
180 |
assert ref in new_refs |
|
181 |
assert new_refs[-1][1] == 'bazouka' |
|
182 |
bazouka_id = new_refs[-1][0] |
|
183 | ||
184 |
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') |
|
185 |
new_open_refs = c.fetchall() |
|
186 |
assert len(new_open_refs) == 4 |
|
187 |
for ref in open_refs: |
|
188 |
assert ref in new_open_refs |
|
189 |
assert new_open_refs[-1][1] == 'open_new_value' |
|
190 |
open_new_id = new_open_refs[-1][0] |
|
191 | ||
192 |
c.execute('''SELECT field_item, field_item_open |
|
193 |
FROM formdata_demande ORDER BY id''') |
|
194 |
formdata = c.fetchone() |
|
195 |
assert formdata[0] == bazouka_id |
|
196 |
assert formdata[1] == open_new_id |
tests/utils.py | ||
---|---|---|
1 |
import os |
|
2 |
import subprocess |
|
3 | ||
4 | ||
5 |
HOSTNAME = '127.0.0.1' |
|
6 |
WCSCTL = os.environ.get('WCSCTL') |
|
7 | ||
8 | ||
9 |
def run_wcs_script(wcs_dir, script, script_name):
    '''Run python script inside w.c.s. environment

    The source is written to <wcs_dir>/<script_name>.py, then executed with
    the `runscript` sub-command of the WCSCTL program, using wcs_dir as
    application directory and HOSTNAME as virtual host.

    :param wcs_dir: path object of the w.c.s. application directory (must
                    support the `/` operator and `.open()`)
    :param script: source code of the script to run
    :param script_name: name of the script file, without the `.py` extension
    :raises RuntimeError: if WCSCTL is unset or is not an existing file
    :raises subprocess.CalledProcessError: if the command exits with a
                                           non-zero status
    '''
    # WCSCTL comes from os.environ.get() and is None when the variable is
    # unset; fail with an explicit message instead of letting subprocess choke
    # on a None argument.
    if not WCSCTL or not os.path.exists(WCSCTL):
        raise RuntimeError(
            'WCSCTL is not defined in environment or does not point to an existing file')

    script_path = wcs_dir / (script_name + '.py')
    with script_path.open('w') as fd:
        fd.write(script)

    subprocess.check_call(
        [WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME,
         str(script_path)])
wcs_olap/feeder.py | ||
---|---|---|
2 | 2 | |
3 | 3 |
import six |
4 | 4 |
import copy |
5 |
import itertools |
|
5 | 6 |
import os |
6 | 7 |
import json |
7 | 8 |
import hashlib |
... | ... | |
342 | 343 |
if comment: |
343 | 344 |
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,)) |
344 | 345 | |
345 |
def create_labeled_table(self, name, labels, serial=False, comment=None): |
|
346 |
if serial: |
|
347 |
id_type = 'serial primary key' |
|
348 |
else: |
|
349 |
id_type = 'smallint primary key' |
|
350 |
self.create_table(name, |
|
351 |
[ |
|
352 |
['id', id_type], |
|
353 |
['label', 'varchar'] |
|
354 |
], comment=comment) |
|
355 |
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) |
|
356 |
if not values: |
|
357 |
return |
|
358 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
346 |
def prev_table_exists(self, name): |
|
347 |
query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables |
|
348 |
WHERE table_schema = '{schema}' AND table_name = %s)""" |
|
349 |
self.ex(query, vars=(name,)) |
|
350 |
return self.cur.fetchone()[0] |
|
351 | ||
352 |
def create_labeled_table_serial(self, name, comment): |
|
353 |
self.create_table( |
|
354 |
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment) |
|
355 | ||
356 |
if self.prev_table_exists(name): |
|
357 |
# Insert data from previous table |
|
358 |
self.ex( |
|
359 |
'INSERT INTO {schema_temp}.{name} SELECT * FROM {schema}.{name}', |
|
360 |
ctx={'name': name} |
|
361 |
) |
|
362 |
# Update sequence |
|
363 |
self.ex("""SELECT setval(pg_get_serial_sequence('{name}', 'id'), |
|
364 |
(SELECT MAX(id) FROM {name}))""", ctx={'name': name}) |
|
365 | ||
366 |
def create_labeled_table(self, name, labels, comment=None):
    '''Create the labeled table `name` and return its label -> id mapping.

    The table gets an `id smallint primary key` column and a `label varchar`
    column. If a table with the same name exists from the previous run, its
    rows are copied first so that ids already attributed do not change; the
    labels which are not in the table yet are then appended, numbered after
    the highest existing id. Otherwise the (id, label) pairs are inserted as
    given.

    :param name: name of the table to create
    :param labels: iterable of (id, label) pairs, possibly a one-shot
                   iterator like enumerate() or zip()
    :param comment: optional comment, passed to create_table()
    :return: dict mapping every label stored in the table to its id
    '''
    self.create_table(
        name,
        [
            ['id', 'smallint primary key'],
            ['label', 'varchar']
        ], comment=comment)

    # Callers pass enumerate() or zip() objects: those are truthy even when
    # they yield nothing and can be walked only once, so build a list before
    # testing for emptiness.
    labels = list(labels)

    if self.prev_table_exists(name):
        # Insert data from previous table
        self.ex(
            'INSERT INTO {schema_temp}.{name} select * FROM {schema}.{name}',
            ctx={'name': name}
        )
        # Find what is missing with a single query instead of one per label.
        # Labels read back from the database are compared with == to the
        # given ones, like callers do when they use the returned mapping.
        self.ex('SELECT label FROM {name}', ctx={'name': name})
        known = set(row[0] for row in self.cur.fetchall())

        # If the previous table was empty there is no id to keep stable (and
        # MAX(id) would be NULL): the given pairs are inserted unchanged.
        if known:
            missing = [_label for _id, _label in labels if _label not in known]
            labels = []
            if missing:
                self.ex('SELECT MAX(id) FROM {name}', ctx={'name': name})
                next_id = self.cur.fetchone()[0] + 1
                labels = list(zip(range(next_id, next_id + len(missing)), missing))

    if labels:
        tmpl = ', '.join(['(%s, %s)'] * len(labels))
        query_str = 'INSERT INTO {name} (id, label) VALUES %s' % tmpl
        self.ex(query_str, ctx={'name': name},
                vars=list(itertools.chain.from_iterable(labels)))

    self.ex("SELECT id, label FROM %s" % str(name))
    # if a label is stored several times, the last row read wins
    return dict((label, id_) for id_, label in self.cur.fetchall())
|
359 | 406 | |
360 | 407 |
def tpl(self, o, ctx=None): |
361 | 408 |
ctx = ctx or {} |
... | ... | |
379 | 426 | |
380 | 427 |
def do_base_table(self): |
381 | 428 |
# channels |
382 |
self.create_labeled_table('{channel_table}', [[c[0], c[2]] for c in self.channels],
|
|
429 |
self.create_labeled_table('channel', [[c[0], c[2]] for c in self.channels],
|
|
383 | 430 |
comment=u'canal') |
384 | 431 | |
385 | 432 |
# roles |
386 | 433 |
roles = dict((i, role.name) for i, role in enumerate(self.roles)) |
387 |
self.create_labeled_table('{role_table}', roles.items(), comment=u'role') |
|
388 |
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles)) |
|
434 |
tmp_role_map = self.create_labeled_table('role', roles.items(), comment=u'role') |
|
435 |
self.role_mapping = dict( |
|
436 |
(role.id, tmp_role_map[role.name]) for role in self.roles) |
|
389 | 437 | |
390 | 438 |
# categories |
391 |
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
|
|
392 |
comment=u'catégorie')
|
|
393 |
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
|
|
439 |
tmp_cat_map = self.create_labeled_table(
|
|
440 |
'category', enumerate(c.name for c in self.categories), comment=u'catégorie')
|
|
441 |
self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
|
|
394 | 442 | |
395 |
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))),
|
|
443 |
self.create_labeled_table('hour', zip(range(0, 24), map(str, range(0, 24))),
|
|
396 | 444 |
comment=u'heures') |
397 | 445 | |
398 |
self.create_labeled_table('{generic_status_table}', self.status,
|
|
446 |
self.create_labeled_table('status', self.status,
|
|
399 | 447 |
comment=u'statuts simplifiés') |
400 | 448 |
self.ex('CREATE TABLE {form_table} (id serial PRIMARY KEY,' |
401 | 449 |
' category_id integer REFERENCES {category_table} (id),' |
402 | 450 |
' label varchar)') |
403 | 451 |
self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',)) |
404 | 452 |
# agents |
405 |
self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents')
|
|
453 |
self.create_labeled_table_serial('agent', comment=u'agents')
|
|
406 | 454 | |
407 | 455 |
self.columns = [ |
408 | 456 |
['id', 'serial primary key'], |
... | ... | |
476 | 524 |
self.connection.close() |
477 | 525 | |
478 | 526 |
def insert_agent(self, name): |
479 |
self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name]) |
|
527 |
self.ex('SELECT id FROM {agent_table} WHERE label = %s', vars=(name,)) |
|
528 |
res = self.cur.fetchone() |
|
529 |
if res: |
|
530 |
return res[0] |
|
531 |
self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=(name,)) |
|
480 | 532 |
return self.cur.fetchone()[0] |
481 | 533 | |
482 | 534 |
def get_agent(self, user): |
... | ... | |
523 | 575 | |
524 | 576 |
def do_statuses(self): |
525 | 577 |
statuses = self.formdef.schema.workflow.statuses |
526 |
self.olap_feeder.create_labeled_table(self.status_table_name, |
|
527 |
enumerate([s.name for s in statuses]), |
|
528 |
comment=u'statuts du formulaire « %s »' % |
|
529 |
self.formdef.schema.name) |
|
530 |
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses)) |
|
578 |
tmp_status_map = self.olap_feeder.create_labeled_table( |
|
579 |
self.status_table_name, enumerate([s.name for s in statuses]), |
|
580 |
comment=u'statuts du formulaire « %s »' % self.formdef.schema.name) |
|
581 |
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses) |
|
531 | 582 | |
532 | 583 |
def do_data_table(self): |
533 | 584 |
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)', |
... | ... | |
560 | 611 |
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) |
561 | 612 |
# create table and mapping |
562 | 613 |
if field.items: |
563 |
self.create_labeled_table(table_name, enumerate(field.items), |
|
564 |
comment=comment) |
|
565 |
self.items_mappings[field.varname] = dict( |
|
566 |
(item, i) for i, item in enumerate(field.items)) |
|
614 |
self.items_mappings[field.varname] = self.create_labeled_table( |
|
615 |
table_name, enumerate(field.items), comment=comment) |
|
567 | 616 |
else: |
568 | 617 |
# open item field, from data sources... |
569 |
self.create_labeled_table(table_name, [], serial=True, comment=comment)
|
|
618 |
self.create_labeled_table_serial(table_name, comment=comment)
|
|
570 | 619 |
field_def = 'smallint REFERENCES %s (id)' % table_name |
571 | 620 |
elif field.type == 'bool': |
572 | 621 |
field_def = 'boolean' |
... | ... | |
629 | 678 | |
630 | 679 |
def insert_item_value(self, field, value): |
631 | 680 |
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) |
632 |
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value], |
|
681 |
self.ex("SELECT id FROM {item_table} WHERE label = %s", |
|
682 |
ctx={'item_table': table_name}, vars=(value,)) |
|
683 |
res = self.cur.fetchone() |
|
684 |
if res: |
|
685 |
return res[0] |
|
686 |
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=(value,), |
|
633 | 687 |
ctx={'item_table': table_name}) |
634 | 688 |
return self.cur.fetchone()[0] |
635 | 689 | |
636 |
- |