0001-testdef-add-new-table-to-hold-form-tests-71296.patch
tests/test_sql.py | ||
---|---|---|
15 | 15 |
from wcs.formdata import Evolution |
16 | 16 |
from wcs.formdef import FormDef |
17 | 17 |
from wcs.qommon import force_str |
18 |
from wcs.testdef import TestDef |
|
18 | 19 |
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem |
19 | 20 |
from wcs.workflows import Workflow, WorkflowCriticalityLevel |
20 | 21 | |
... | ... | |
2318 | 2319 |
# check it's no longer needed afterwards |
2319 | 2320 |
sql.migrate() |
2320 | 2321 |
assert sql.is_reindex_needed('anonymise', conn=conn, cur=cur) is False |
2322 | ||
2323 | ||
2324 |
def test_sql_testdef_unicity(pub):
    """Check that a (slug, object_type, object_id) triple can only be stored once."""
    TestDef.do_table()

    def store_new(object_id):
        # build a fresh, not yet stored, test definition and insert it
        new_testdef = TestDef()
        new_testdef.slug = 'test-1'
        new_testdef.object_type = 'formdef'
        new_testdef.object_id = object_id
        new_testdef.store()

    store_new('1')

    # same slug, different object_id
    store_new('2')

    # same slug, object_id and object_type
    with pytest.raises(psycopg2.errors.UniqueViolation):
        store_new('2')
wcs/sql.py | ||
---|---|---|
4282 | 4282 |
return catalog |
4283 | 4283 | |
4284 | 4284 | |
4285 |
class TestDef(SqlMixin):
    """SQL storage for test definitions.

    Rows live in the ``testdef`` table; the table enforces uniqueness of the
    (slug, object_type, object_id) triple.
    """

    _table_name = 'testdef'
    _table_static_fields = [
        ('id', 'serial'),
        ('name', 'varchar'),
        ('slug', 'varchar'),
        ('object_type', 'varchar'),
        ('object_id', 'varchar'),
        ('data', 'jsonb'),
    ]

    # None until the object has been inserted; store() then sets it to the
    # value returned by the database.
    id = None

    @classmethod
    @guard_postgres
    def do_table(cls, conn=None, cur=None):
        """Create the table if it does not exist yet.

        The ``conn`` and ``cur`` arguments are overwritten immediately: a
        connection and a cursor are always obtained from
        get_connection_and_cursor(), whatever the caller passes.
        """
        conn, cur = get_connection_and_cursor()
        table_name = cls._table_name

        try:
            cur.execute(
                '''SELECT COUNT(*) FROM information_schema.tables
                    WHERE table_schema = 'public'
                      AND table_name = %s''',
                (table_name,),
            )
            if cur.fetchone()[0] == 0:
                cur.execute(
                    '''CREATE TABLE %s (id SERIAL PRIMARY KEY,
                                        name varchar,
                                        slug varchar NOT NULL,
                                        object_type varchar NOT NULL,
                                        object_id varchar NOT NULL,
                                        data jsonb,
                                        UNIQUE(slug, object_type, object_id)
                                        )'''
                    % table_name
                )

            conn.commit()
        finally:
            # always release the cursor, including when a statement failed
            cur.close()

    @guard_postgres
    def store(self):
        """Insert the object (when it has no id) or update its existing row.

        On insertion ``self.id`` is set to the id returned by the database.
        Any exception raised while executing or committing the statement
        (e.g. a unique constraint violation) is re-raised after the
        transaction has been rolled back, so the connection is not left in
        an aborted state.
        """
        sql_dict = {x[0]: getattr(self, x[0], None) for x in self._table_static_fields if x[0] != 'id'}
        column_names = list(sql_dict.keys())

        conn, cur = get_connection_and_cursor()
        try:
            if not self.id:
                sql_statement = '''INSERT INTO %s (id, %s)
                                   VALUES (DEFAULT, %s)
                                   RETURNING id''' % (
                    self._table_name,
                    ', '.join(column_names),
                    ', '.join(['%%(%s)s' % x for x in column_names]),
                )
                cur.execute(sql_statement, sql_dict)
                self.id = cur.fetchone()[0]
            else:
                sql_dict['id'] = self.id
                sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % (
                    self._table_name,
                    ', '.join(['%s = %%(%s)s' % (x, x) for x in column_names]),
                )
                cur.execute(sql_statement, sql_dict)

            conn.commit()
        except Exception:
            # a failed statement aborts the current transaction; roll it back
            # before propagating the error to the caller
            conn.rollback()
            raise
        finally:
            cur.close()

    @classmethod
    def _row2ob(cls, row, **kwargs):
        """Build an object from a database row.

        Values of ``row`` are assigned, in order, to the attributes named in
        ``_table_static_fields``; ``__init__`` is not called.
        """
        o = cls.__new__(cls)
        for attr, value in zip([x[0] for x in cls._table_static_fields], row):
            setattr(o, attr, value)
        return o

    @classmethod
    def get_data_fields(cls):
        """Return an empty list: no data fields beyond the static ones are declared here."""
        return []
|
4363 | ||
4364 | ||
4285 | 4365 |
class classproperty: |
4286 | 4366 |
def __init__(self, f): |
4287 | 4367 |
self.f = f |
... | ... | |
4660 | 4740 |
# latest migration, number + description (description is not used |
4661 | 4741 |
# programmatically but will make sure git conflicts if two migrations are |
4662 | 4742 |
# separately added with the same number) |
4663 |
SQL_LEVEL = (70, 'repair anonymisation')
|
|
4743 |
SQL_LEVEL = (71, 'add testdef table')
|
|
4664 | 4744 | |
4665 | 4745 | |
4666 | 4746 |
def migrate_global_views(conn, cur): |
... | ... | |
4842 | 4922 |
if sql_level < 68: |
4843 | 4923 |
# 68: multilinguism |
4844 | 4924 |
TranslatableMessage.do_table() |
4925 |
if sql_level < 71: |
|
4926 |
# 71: create testdef table |
|
4927 |
TestDef.do_table() |
|
4845 | 4928 |
if sql_level < 52: |
4846 | 4929 |
# 2: introduction of formdef_id in views |
4847 | 4930 |
# 5: add concerned_roles_array, is_at_endpoint and fts to views |
wcs/testdef.py | ||
---|---|---|
1 |
# w.c.s. - web application for online forms |
|
2 |
# Copyright (C) 2005-2022 Entr'ouvert |
|
3 |
# |
|
4 |
# This program is free software; you can redistribute it and/or modify |
|
5 |
# it under the terms of the GNU General Public License as published by |
|
6 |
# the Free Software Foundation; either version 2 of the License, or |
|
7 |
# (at your option) any later version. |
|
8 |
# |
|
9 |
# This program is distributed in the hope that it will be useful, |
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
12 |
# GNU General Public License for more details. |
|
13 |
# |
|
14 |
# You should have received a copy of the GNU General Public License |
|
15 |
# along with this program; if not, see <http://www.gnu.org/licenses/>. |
|
16 | ||
17 |
from wcs import sql |
|
18 | ||
19 | ||
20 |
class TestDef(sql.TestDef):
    """Test definition, stored through sql.TestDef."""

    _names = 'testdef'

    name = ''
    slug = None

    # type of the related object (formdef, carddef, etc.)
    object_type = None
    object_id = None

    # json export of formdata, carddata, etc.
    data = None
|
0 |
- |