0001-re-use-data-from-previous-run-in-dimension-tables-30.patch
tests/conftest.py | ||
---|---|---|
11 | 11 |
from collections import namedtuple |
12 | 12 | |
13 | 13 |
import psycopg2 |
14 | ||
15 | 14 |
import pytest |
16 | 15 | |
16 |
import utils |
|
17 | ||
18 | ||
17 | 19 |
Wcs = namedtuple('Wcs', ['url', 'appdir', 'pid']) |
18 | 20 | |
19 | 21 | |
... | ... | |
92 | 94 |
fields.ItemField(id='2', label='2nd field', type='item', |
93 | 95 |
items=['foo', 'bar', 'baz'], varname='item'), |
94 | 96 |
fields.BoolField(id='3', label='3rd field', type='bool', varname='bool'), |
97 |
fields.ItemField(id='4', label='4rth field', type='item', varname='item_open'), |
|
95 | 98 |
] |
96 | 99 |
formdef.store() |
97 | 100 | |
... | ... | |
105 | 108 |
if i%4 == 0: |
106 | 109 |
formdata.data['2'] = 'foo' |
107 | 110 |
formdata.data['2_display'] = 'foo' |
111 |
formdata.data['4'] = 'open_one' |
|
112 |
formdata.data['4_display'] = 'open_one' |
|
108 | 113 |
elif i%4 == 1: |
109 | 114 |
formdata.data['2'] = 'bar' |
110 | 115 |
formdata.data['2_display'] = 'bar' |
116 |
formdata.data['4'] = 'open_two' |
|
117 |
formdata.data['4_display'] = 'open_two' |
|
111 | 118 |
else: |
112 | 119 |
formdata.data['2'] = 'baz' |
113 | 120 |
formdata.data['2_display'] = 'baz' |
121 |
formdata.data['4'] = 'open_three' |
|
122 |
formdata.data['4_display'] = 'open_three' |
|
123 | ||
114 | 124 |
formdata.data['3'] = bool(i % 2) |
115 | 125 |
if i%3 == 0: |
116 | 126 |
formdata.jump_status('new') |
... | ... | |
123 | 133 |
} |
124 | 134 | |
125 | 135 | |
126 |
@pytest.fixture(scope='session') |
|
127 |
def wcs(tmp_path_factory): |
|
136 |
@pytest.fixture |
|
137 |
def wcs_dir(tmp_path_factory): |
|
138 |
return tmp_path_factory.mktemp('wcs') |
|
139 | ||
140 | ||
141 |
@pytest.fixture |
|
142 |
def wcs(tmp_path_factory, wcs_dir): |
|
128 | 143 |
'''Session scoped wcs fixture, so read-only.''' |
129 |
if 'WCSCTL' not in os.environ or not os.path.exists(os.environ['WCSCTL']): |
|
130 |
pytest.skip('WCSCTL not defined in environment') |
|
131 |
WCSCTL = os.environ.get('WCSCTL') |
|
132 |
WCS_DIR = tmp_path_factory.mktemp('wcs') |
|
133 |
HOSTNAME = '127.0.0.1' |
|
134 | 144 |
PORT = 8899 |
135 | 145 |
ADDRESS = '0.0.0.0' |
136 | 146 |
WCS_PID = None |
137 | 147 | |
138 |
def run_wcs_script(script, hostname): |
|
139 |
'''Run python script inside w.c.s. environment''' |
|
140 | ||
141 |
script_path = WCS_DIR / (script + '.py') |
|
142 |
with script_path.open('w') as fd: |
|
143 |
fd.write(WCS_SCRIPTS[script]) |
|
144 | ||
145 |
subprocess.check_call( |
|
146 |
[WCSCTL, 'runscript', '--app-dir', str(WCS_DIR), '--vhost', hostname, |
|
147 |
str(script_path)]) |
|
148 | ||
149 |
tenant_dir = WCS_DIR / HOSTNAME |
|
148 |
tenant_dir = wcs_dir / utils.HOSTNAME |
|
150 | 149 |
tenant_dir.mkdir() |
151 | 150 | |
152 |
run_wcs_script('setup-auth', HOSTNAME)
|
|
153 |
run_wcs_script('create-user', HOSTNAME)
|
|
154 |
run_wcs_script('create-data', HOSTNAME)
|
|
151 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['setup-auth'], 'setup-auth')
|
|
152 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-user'], 'create-user')
|
|
153 |
utils.run_wcs_script(wcs_dir, WCS_SCRIPTS['create-data'], 'create-data')
|
|
155 | 154 | |
156 | 155 |
with (tenant_dir / 'site-options.cfg').open('w') as fd: |
157 | 156 |
fd.write(u'''[api-secrets] |
158 | 157 |
olap = olap |
159 | 158 |
''') |
160 | 159 | |
161 |
with (WCS_DIR / 'wcs.cfg').open('w') as fd:
|
|
160 |
with (wcs_dir / 'wcs.cfg').open('w') as fd:
|
|
162 | 161 |
fd.write(u'''[main] |
163 |
app_dir = %s\n''' % WCS_DIR)
|
|
162 |
app_dir = %s\n''' % wcs_dir)
|
|
164 | 163 | |
165 |
with (WCS_DIR / 'local_settings.py').open('w') as fd:
|
|
164 |
with (wcs_dir / 'local_settings.py').open('w') as fd:
|
|
166 | 165 |
fd.write(u''' |
167 | 166 |
WCS_LEGACY_CONFIG_FILE = '%s/wcs.cfg' |
168 | 167 |
THEMES_DIRECTORY = '/' |
169 | 168 |
ALLOWED_HOSTS = ['%s'] |
170 |
''' % (WCS_DIR, HOSTNAME))
|
|
169 |
''' % (wcs_dir, utils.HOSTNAME))
|
|
171 | 170 | |
172 | 171 |
# launch a Django worker for running w.c.s. |
173 | 172 |
WCS_PID = os.fork() |
174 | 173 |
if not WCS_PID: |
175 |
os.chdir(os.path.dirname(WCSCTL)) |
|
174 |
os.chdir(os.path.dirname(utils.WCSCTL))
|
|
176 | 175 |
os.environ['DJANGO_SETTINGS_MODULE'] = 'wcs.settings' |
177 |
os.environ['WCS_SETTINGS_FILE'] = str(WCS_DIR / 'local_settings.py')
|
|
176 |
os.environ['WCS_SETTINGS_FILE'] = str(wcs_dir / 'local_settings.py')
|
|
178 | 177 |
os.execvp('python', ['python', 'manage.py', 'runserver', '--noreload', '%s:%s' % (ADDRESS, PORT)]) |
179 | 178 |
sys.exit(0) |
180 | 179 | |
... | ... | |
197 | 196 |
if pid: |
198 | 197 |
assert False, 'w.c.s. stopped with exit-code %s' % exit_code |
199 | 198 | |
200 |
yield Wcs(url='http://%s:%s/' % (HOSTNAME, PORT), appdir=WCS_DIR, pid=WCS_PID)
|
|
199 |
yield Wcs(url='http://%s:%s/' % (utils.HOSTNAME, PORT), appdir=wcs_dir, pid=WCS_PID)
|
|
201 | 200 |
os.kill(WCS_PID, 9) |
202 |
shutil.rmtree(str(WCS_DIR))
|
|
201 |
shutil.rmtree(str(wcs_dir))
|
|
203 | 202 | |
204 | 203 | |
205 | 204 |
@pytest.fixture |
tests/olap.model | ||
---|---|---|
264 | 264 |
"type": "bool", |
265 | 265 |
"value": "\"field_bool\"", |
266 | 266 |
"value_label": "(CASE WHEN \"field_bool\" IS NULL THEN NULL WHEN \"field_bool\" THEN 'Oui' ELSE 'Non' END)" |
267 |
}, |
|
268 |
{ |
|
269 |
"filter" : true, |
|
270 |
"join" : [ |
|
271 |
"item_open" |
|
272 |
], |
|
273 |
"label" : "4rth field", |
|
274 |
"name" : "item_open", |
|
275 |
"type" : "integer", |
|
276 |
"value" : "\"item_open\".id", |
|
277 |
"value_label" : "\"item_open\".label" |
|
267 | 278 |
} |
268 | 279 |
], |
269 | 280 |
"fact_table" : "formdata_demande", |
... | ... | |
329 | 340 |
"master" : "field_item", |
330 | 341 |
"name" : "item", |
331 | 342 |
"table" : "formdata_demande_field_item" |
343 |
}, |
|
344 |
{ |
|
345 |
"detail" : "id", |
|
346 |
"master" : "field_item_open", |
|
347 |
"name" : "item_open", |
|
348 |
"table" : "formdata_demande_field_item_open" |
|
332 | 349 |
} |
333 | 350 |
], |
334 | 351 |
"key" : "id", |
tests/test_wcs.py | ||
---|---|---|
6 | 6 |
import pathlib2 |
7 | 7 |
import mock |
8 | 8 | |
9 |
import utils |
|
10 | ||
9 | 11 | |
10 | 12 |
def test_wcs_fixture(wcs, postgres_db, tmpdir, olap_cmd, caplog): |
11 | 13 |
olap_cmd() |
... | ... | |
55 | 57 |
('formdata_demande', 'field_string'), |
56 | 58 |
('formdata_demande', 'field_item'), |
57 | 59 |
('formdata_demande', 'field_bool'), |
60 |
('formdata_demande', 'field_item_open'), |
|
58 | 61 |
('formdata_demande', 'function__receiver'), |
59 | 62 |
('formdata_demande_field_item', 'id'), |
60 | 63 |
('formdata_demande_field_item', 'label'), |
64 |
('formdata_demande_field_item_open', 'id'), |
|
65 |
('formdata_demande_field_item_open', 'label'), |
|
61 | 66 |
('formdef', 'id'), |
62 | 67 |
('formdef', 'category_id'), |
63 | 68 |
('formdef', 'label'), |
... | ... | |
77 | 82 |
c.execute('SELECT table_name, column_name ' |
78 | 83 |
'FROM information_schema.columns ' |
79 | 84 |
'WHERE table_schema = \'olap\' ORDER BY table_name, ordinal_position') |
80 | ||
81 | 85 |
assert list(c.fetchall()) == expected_schema |
82 | 86 | |
83 | 87 |
# verify JSON schema |
... | ... | |
113 | 117 |
with pytest.raises(SystemExit): |
114 | 118 |
olap_cmd(no_log_errors=False) |
115 | 119 |
assert 'Invalid JSON content' in caplog.text |
120 | ||
121 | ||
122 |
def test_dimension_stability(wcs, wcs_dir, postgres_db, tmpdir, olap_cmd, caplog): |
|
123 | ||
124 |
olap_cmd() |
|
125 | ||
126 |
with postgres_db.conn() as conn: |
|
127 |
with conn.cursor() as c: |
|
128 |
c.execute('SET search_path TO \'olap\'') |
|
129 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
130 |
refs = c.fetchall() |
|
131 |
assert len(refs) == 3 |
|
132 |
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') |
|
133 |
open_refs = c.fetchall() |
|
134 |
assert len(open_refs) == 3 |
|
135 | ||
136 |
# Change an item of the field |
|
137 |
script = u""" |
|
138 |
import datetime |
|
139 |
import random |
|
140 |
from quixote import get_publisher |
|
141 |
from wcs.formdef import FormDef |
|
142 |
formdef = FormDef.get_by_urlname('demande') |
|
143 | ||
144 |
for field in formdef.fields: |
|
145 |
if field.label == '2nd field': |
|
146 |
ref_field = field |
|
147 |
break |
|
148 | ||
149 |
ref_field.items = ['foo', 'bar', 'bazouka'] |
|
150 |
formdef.store() |
|
151 | ||
152 |
user = get_publisher().user_class.select()[0] |
|
153 | ||
154 |
formdata = formdef.data_class()() |
|
155 |
formdata.just_created() |
|
156 |
formdata.receipt_time = datetime.datetime(2018, random.randrange(1, 13), random.randrange(1, 29)).timetuple() |
|
157 |
formdata.data = {'1': 'FOO BAR 1'} |
|
158 |
formdata.data['2'] = 'bazouka' |
|
159 |
formdata.data['2_display'] = 'bazouka' |
|
160 |
formdata.data['4'] = 'open_new_value' |
|
161 |
formdata.data['4_display'] = 'open_new_value' |
|
162 |
formdata.jump_status('new') |
|
163 |
formdata.store() |
|
164 |
""" |
|
165 |
utils.run_wcs_script(wcs_dir, script, 'toto') |
|
166 |
olap_cmd() |
|
167 | ||
168 |
# We expect to find in the new dimension table |
|
169 |
# the same records as before (including the one of the item that disappeared) |
|
170 |
# plus the new item |
|
171 |
with postgres_db.conn() as conn: |
|
172 |
with conn.cursor() as c: |
|
173 | ||
174 |
c.execute('SET search_path TO \'olap\'') |
|
175 |
c.execute('SELECT * FROM formdata_demande_field_item ORDER BY id') |
|
176 |
new_refs = c.fetchall() |
|
177 |
assert len(new_refs) == 4 |
|
178 |
for ref in refs: |
|
179 |
assert ref in new_refs |
|
180 |
assert new_refs[-1][1] == 'bazouka' |
|
181 |
bazouka_id = new_refs[-1][0] |
|
182 | ||
183 |
c.execute('SELECT * FROM formdata_demande_field_item_open ORDER BY id') |
|
184 |
new_open_refs = c.fetchall() |
|
185 |
assert len(new_open_refs) == 4 |
|
186 |
for ref in open_refs: |
|
187 |
assert ref in new_open_refs |
|
188 |
assert new_open_refs[-1][1] == 'open_new_value' |
|
189 |
open_new_id = new_open_refs[-1][0] |
|
190 | ||
191 |
c.execute('''SELECT field_item, field_item_open |
|
192 |
FROM formdata_demande ORDER BY id''') |
|
193 |
formdata = c.fetchone() |
|
194 |
assert formdata[0] == bazouka_id |
|
195 |
assert formdata[1] == open_new_id |
tests/utils.py | ||
---|---|---|
1 |
import os |
|
2 |
import subprocess |
|
3 | ||
4 | ||
5 |
HOSTNAME = '127.0.0.1' |
|
6 |
WCSCTL = os.environ.get('WCSCTL') |
|
7 | ||
8 | ||
9 |
def run_wcs_script(wcs_dir, script, script_name): |
|
10 |
'''Run python script inside w.c.s. environment''' |
|
11 |
script_path = wcs_dir / (script_name + '.py') |
|
12 |
with script_path.open('w') as fd: |
|
13 |
fd.write(script) |
|
14 | ||
15 |
subprocess.check_call( |
|
16 |
[WCSCTL, 'runscript', '--app-dir', str(wcs_dir), '--vhost', HOSTNAME, |
|
17 |
str(script_path)]) |
wcs_olap/feeder.py | ||
---|---|---|
45 | 45 | |
46 | 46 | |
47 | 47 |
class WcsOlapFeeder(object): |
48 | ||
49 |
channels = [ |
|
50 |
[1, 'web', u'web'], |
|
51 |
[2, 'mail', u'courrier'], |
|
52 |
[3, 'phone', u'téléphone'], |
|
53 |
[4, 'counter', u'guichet'], |
|
54 |
[5, 'backoffice', u'backoffice'], |
|
55 |
[6, 'email', u'email'], |
|
56 |
[7, 'fax', u'fax'], |
|
57 |
] |
|
58 |
channel_to_id = dict((c[1], c[0]) for c in channels) |
|
59 |
id_to_channel = dict((c[0], c[1]) for c in channels) |
|
60 | ||
61 |
status = [ |
|
62 |
[1, 'Nouveau'], |
|
63 |
[2, 'En cours'], |
|
64 |
[3, 'Terminé'], |
|
65 |
] |
|
66 |
status_to_id = dict((c[1], c[0]) for c in channels) |
|
67 |
id_to_status = dict((c[0], c[1]) for c in channels) |
|
68 | ||
48 | 69 |
def __init__(self, api, pg_dsn, schema, logger=None, config=None, do_feed=True, fake=False): |
49 | 70 |
self.api = api |
50 | 71 |
self.fake = fake |
... | ... | |
239 | 260 |
self.connection = psycopg2.connect(dsn=pg_dsn) |
240 | 261 |
self.connection.autocommit = True |
241 | 262 |
self.cur = self.connection.cursor() |
263 |
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.cur) |
|
264 | ||
242 | 265 |
try: |
243 | 266 |
self.has_jsonb = self.detect_jsonb() |
244 | 267 |
if self.has_jsonb: |
... | ... | |
310 | 333 |
generate_series('2010-01-01'::date, '2020-01-01'::date, '1 day'::interval) |
311 | 334 |
AS the_date(the_date));''') |
312 | 335 | |
313 |
channels = [ |
|
314 |
[1, 'web', u'web'], |
|
315 |
[2, 'mail', u'courrier'], |
|
316 |
[3, 'phone', u'téléphone'], |
|
317 |
[4, 'counter', u'guichet'], |
|
318 |
[5, 'backoffice', u'backoffice'], |
|
319 |
[6, 'email', u'email'], |
|
320 |
[7, 'fax', u'fax'], |
|
321 |
] |
|
322 |
channel_to_id = dict((c[1], c[0]) for c in channels) |
|
323 |
id_to_channel = dict((c[0], c[1]) for c in channels) |
|
324 | ||
325 |
status = [ |
|
326 |
[1, 'Nouveau'], |
|
327 |
[2, 'En cours'], |
|
328 |
[3, 'Terminé'], |
|
329 |
] |
|
330 |
status_to_id = dict((c[1], c[0]) for c in channels) |
|
331 |
id_to_status = dict((c[0], c[1]) for c in channels) |
|
332 | ||
333 | 336 |
def create_table(self, name, columns, inherits=None, comment=None): |
334 | 337 |
sql = 'CREATE TABLE %s' % name |
335 | 338 |
sql += '(' + ', '.join('%s %s' % (n, t) for n, t in columns) + ')' |
... | ... | |
339 | 342 |
if comment: |
340 | 343 |
self.ex('COMMENT ON TABLE %s IS %%s' % name, vars=(comment,)) |
341 | 344 | |
342 |
def create_labeled_table(self, name, labels, serial=False, comment=None): |
|
343 |
if serial: |
|
344 |
id_type = 'serial primary key' |
|
345 |
else: |
|
346 |
id_type = 'smallint primary key' |
|
347 |
self.create_table(name, |
|
348 |
[ |
|
349 |
['id', id_type], |
|
350 |
['label', 'varchar'] |
|
351 |
], comment=comment) |
|
352 |
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) |
|
353 |
if not values: |
|
354 |
return |
|
355 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
345 |
def prev_table_exists(self, name): |
|
346 |
query = """SELECT EXISTS (SELECT 1 FROM information_schema.tables |
|
347 |
WHERE table_schema = '{schema}' AND table_name = '%s')""" % name |
|
348 |
self.ex(query) |
|
349 |
return self.cur.fetchone()[0] |
|
350 | ||
351 |
def create_labeled_table_serial(self, name, comment): |
|
352 |
self.create_table( |
|
353 |
name, [['id', 'serial primary key'], ['label', 'varchar']], comment=comment) |
|
354 | ||
355 |
if self.prev_table_exists(name): |
|
356 |
# Insert data from previous table |
|
357 |
self.ex( |
|
358 |
'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s' |
|
359 |
% {'name': name} |
|
360 |
) |
|
361 |
# Update sequence |
|
362 |
self.ex("""SELECT setval(pg_get_serial_sequence('%(name)s', 'id'), |
|
363 |
(SELECT MAX(id) FROM %(name)s))""" % {'name': name}) |
|
364 | ||
365 |
def create_labeled_table(self, name, labels, comment=None): |
|
366 |
self.create_table( |
|
367 |
name, |
|
368 |
[ |
|
369 |
['id', 'smallint primary key'], |
|
370 |
['label', 'varchar'] |
|
371 |
], comment=comment) |
|
372 | ||
373 |
if self.prev_table_exists(name): |
|
374 |
# Insert data from previous table |
|
375 |
self.ex( |
|
376 |
'INSERT INTO {schema_temp}.%(name)s select * FROM {schema}.%(name)s' |
|
377 |
% {'name': name} |
|
378 |
) |
|
379 |
# Find what is missing |
|
380 |
to_insert = [] |
|
381 |
for _id, _label in labels: |
|
382 |
self.ex("SELECT * FROM %s WHERE label = '%s'" % (name, _label)) |
|
383 |
if self.cur.fetchone() is None: |
|
384 |
to_insert.append(_label) |
|
385 | ||
386 |
labels = None |
|
387 |
if to_insert: |
|
388 |
self.ex('SELECT MAX(id) FROM %s' % name) |
|
389 |
next_id = self.cur.fetchone()[0] + 1 |
|
390 |
ids = range(next_id, next_id + len(to_insert)) |
|
391 |
labels = zip(ids, to_insert) |
|
392 | ||
393 |
if labels: |
|
394 |
values = ', '.join(self.cur.mogrify('(%s, %s)', [_id, _label]) for _id, _label in labels) |
|
395 |
self.ex('INSERT INTO %s (id, label) VALUES %s' % (str(name), values)) |
|
396 | ||
397 |
res = {} |
|
398 |
self.ex("SELECT id, label FROM %s" % str(name)) |
|
399 |
for id_, label in self.cur.fetchall(): |
|
400 |
res[label] = id_ |
|
401 |
return res |
|
356 | 402 | |
357 | 403 |
def tpl(self, o, ctx=None): |
358 | 404 |
ctx = ctx or {} |
... | ... | |
381 | 427 | |
382 | 428 |
# roles |
383 | 429 |
roles = dict((i, role.name) for i, role in enumerate(self.roles)) |
384 |
self.create_labeled_table('{role_table}', roles.items(), comment=u'role') |
|
385 |
self.role_mapping = dict((role.id, i) for i, role in enumerate(self.roles)) |
|
430 |
tmp_role_map = self.create_labeled_table('{role_table}', roles.items(), comment=u'role') |
|
431 |
self.role_mapping = dict( |
|
432 |
(role.id, tmp_role_map[role.name]) for role in self.roles) |
|
386 | 433 | |
387 | 434 |
# categories |
388 |
self.create_labeled_table('{category_table}', enumerate(c.name for c in self.categories),
|
|
389 |
comment=u'catégorie')
|
|
390 |
self.categories_mapping = dict((c.id, i) for i, c in enumerate(self.categories))
|
|
435 |
tmp_cat_map = self.create_labeled_table(
|
|
436 |
'{category_table}', enumerate(c.name for c in self.categories), comment=u'catégorie')
|
|
437 |
self.categories_mapping = dict((c.id, tmp_cat_map[c.name]) for c in self.categories)
|
|
391 | 438 | |
392 | 439 |
self.create_labeled_table('{hour_table}', zip(range(0, 24), map(str, range(0, 24))), |
393 | 440 |
comment=u'heures') |
... | ... | |
399 | 446 |
' label varchar)') |
400 | 447 |
self.ex('COMMENT ON TABLE {form_table} IS %s', vars=(u'types de formulaire',)) |
401 | 448 |
# agents |
402 |
self.create_labeled_table('{agent_table}', [], serial=True, comment=u'agents')
|
|
449 |
self.create_labeled_table_serial('{agent_table}', comment=u'agents')
|
|
403 | 450 | |
404 | 451 |
self.columns = [ |
405 | 452 |
['id', 'serial primary key'], |
... | ... | |
473 | 520 |
self.connection.close() |
474 | 521 | |
475 | 522 |
def insert_agent(self, name): |
523 |
self.ex("SELECT id FROM {agent_table} WHERE label = '%s'" % name) |
|
524 |
res = self.cur.fetchone() |
|
525 |
if res: |
|
526 |
return res[0] |
|
476 | 527 |
self.ex('INSERT INTO {agent_table} (label) VALUES (%s) RETURNING (id)', vars=[name]) |
477 | 528 |
return self.cur.fetchone()[0] |
478 | 529 | |
... | ... | |
520 | 571 | |
521 | 572 |
def do_statuses(self): |
522 | 573 |
statuses = self.formdef.schema.workflow.statuses |
523 |
self.olap_feeder.create_labeled_table(self.status_table_name, |
|
524 |
enumerate([s.name for s in statuses]), |
|
525 |
comment=u'statuts du formulaire « %s »' % |
|
526 |
self.formdef.schema.name) |
|
527 |
self.status_mapping = dict((s.id, i) for i, s in enumerate(statuses)) |
|
574 |
tmp_status_map = self.olap_feeder.create_labeled_table( |
|
575 |
self.status_table_name, enumerate([s.name for s in statuses]), |
|
576 |
comment=u'statuts du formulaire « %s »' % self.formdef.schema.name) |
|
577 |
self.status_mapping = dict((s.id, tmp_status_map[s.name]) for s in statuses) |
|
528 | 578 | |
529 | 579 |
def do_data_table(self): |
530 | 580 |
self.ex('INSERT INTO {form_table} (category_id, label) VALUES (%s, %s) RETURNING (id)', |
... | ... | |
557 | 607 |
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) |
558 | 608 |
# create table and mapping |
559 | 609 |
if field.items: |
560 |
self.create_labeled_table(table_name, enumerate(field.items), |
|
561 |
comment=comment) |
|
562 |
self.items_mappings[field.varname] = dict( |
|
563 |
(item, i) for i, item in enumerate(field.items)) |
|
610 |
self.items_mappings[field.varname] = self.create_labeled_table( |
|
611 |
table_name, enumerate(field.items), comment=comment) |
|
564 | 612 |
elif field.options: |
565 | 613 |
options = enumerate(field.options) |
566 |
self.create_labeled_table(table_name, [(i, o['label']) for i, o in options], |
|
567 |
comment=comment) |
|
568 |
self.items_mappings[field.varname] = dict((o['value'], i) for i, o in options) |
|
614 |
tmp_options_map = self.create_labeled_table( |
|
615 |
table_name, [(i, o['label']) for i, o in options], comment=comment) |
|
616 |
self.items_mappings[field.varname] = dict( |
|
617 |
(o['value'], tmp_options_map[o['value']]) for i, o in options) |
|
569 | 618 |
else: |
570 | 619 |
# open item field, from data sources... |
571 |
self.create_labeled_table(table_name, [], serial=True, comment=comment)
|
|
620 |
self.create_labeled_table_serial(table_name, comment=comment)
|
|
572 | 621 |
field_def = 'smallint REFERENCES %s (id)' % table_name |
573 | 622 |
elif field.type == 'bool': |
574 | 623 |
field_def = 'boolean' |
... | ... | |
631 | 680 | |
632 | 681 |
def insert_item_value(self, field, value): |
633 | 682 |
table_name = self.hash_table_name('{formdata_table}_field_%s' % field.varname) |
683 |
self.ex("SELECT id FROM {item_table} WHERE label = '%s'" % value, |
|
684 |
ctx={'item_table': table_name}) |
|
685 |
res = self.cur.fetchone() |
|
686 |
if res: |
|
687 |
return res[0] |
|
634 | 688 |
self.ex('INSERT INTO {item_table} (label) VALUES (%s) RETURNING (id)', vars=[value], |
635 | 689 |
ctx={'item_table': table_name}) |
636 | 690 |
return self.cur.fetchone()[0] |
637 |
- |