From a22829dd2296120fdf5ec1973710c9e31722e39a Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Thu, 17 Nov 2022 15:22:15 +0100 Subject: [PATCH 1/3] testdef: add new table to hold form tests (#71296) --- tests/test_sql.py | 21 ++++++++++++ wcs/sql.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++- wcs/testdef.py | 28 ++++++++++++++++ 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 wcs/testdef.py diff --git a/tests/test_sql.py b/tests/test_sql.py index 7c39f9bee..8d48661e7 100644 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -15,6 +15,7 @@ from wcs.blocks import BlockDef from wcs.formdata import Evolution from wcs.formdef import FormDef from wcs.qommon import force_str +from wcs.testdef import TestDef from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem from wcs.workflows import Workflow, WorkflowCriticalityLevel @@ -2318,3 +2319,23 @@ def test_repair_anonymisation(pub): # check it's no longer needed afterwards sql.migrate() assert sql.is_reindex_needed('anonymise', conn=conn, cur=cur) is False + + +def test_sql_testdef_unicity(pub): + TestDef.do_table() + + testdef = TestDef() + testdef.slug = 'test-1' + testdef.object_type = 'formdef' + testdef.object_id = '1' + testdef.store() + + # same slug, different object_id + testdef.id = None + testdef.object_id = '2' + testdef.store() + + # same slug, object_id and object_type + testdef.id = None + with pytest.raises(psycopg2.errors.UniqueViolation): + testdef.store() diff --git a/wcs/sql.py b/wcs/sql.py index d7d6a1f26..66aaeff85 100644 --- a/wcs/sql.py +++ b/wcs/sql.py @@ -4282,6 +4282,86 @@ class TranslatableMessage(SqlMixin): return catalog +class TestDef(SqlMixin): + _table_name = 'testdef' + _table_static_fields = [ + ('id', 'serial'), + ('name', 'varchar'), + ('slug', 'varchar'), + ('object_type', 'varchar'), + ('object_id', 'varchar'), + ('data', 'jsonb'), + ] + + id = None + + @classmethod + @guard_postgres + def do_table(cls, conn=None, cur=None): + conn, cur = get_connection_and_cursor() + table_name = cls._table_name + + cur.execute( + '''SELECT COUNT(*) FROM information_schema.tables + WHERE table_schema = 'public' + AND table_name = %s''', + (table_name,), + ) + if cur.fetchone()[0] == 0: + cur.execute( + '''CREATE TABLE %s (id SERIAL PRIMARY KEY, + name varchar, + slug varchar NOT NULL, + object_type varchar NOT NULL, + object_id varchar NOT NULL, + data jsonb, + UNIQUE(slug, object_type, object_id) + )''' + % table_name + ) + + conn.commit() + cur.close() + + @guard_postgres + def store(self): + sql_dict = {x[0]: getattr(self, x[0], None) for x in self._table_static_fields if x[0] != 'id'} + + conn, cur = get_connection_and_cursor() + column_names = list(sql_dict.keys()) + if not self.id: + sql_statement = '''INSERT INTO %s (id, %s) + VALUES (DEFAULT, %s) + RETURNING id''' % ( + self._table_name, + ', '.join(column_names), + ', '.join(['%%(%s)s' % x for x in column_names]), + ) + cur.execute(sql_statement, sql_dict) + self.id = cur.fetchone()[0] + else: + sql_dict['id'] = self.id + sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % ( + self._table_name, + ', '.join(['%s = %%(%s)s' % (x, x) for x in column_names]), + ) + cur.execute(sql_statement, sql_dict) + + conn.commit() + cur.close() + + @classmethod + def _row2ob(cls, row, **kwargs): + o = cls.__new__(cls) + for attr, value in zip([x[0] for x in cls._table_static_fields], row): + setattr(o, attr, value) + return o + + @classmethod + def get_data_fields(cls): + return [] + + class classproperty: def __init__(self, f): self.f = f @@ -4660,7 +4740,7 @@ def get_period_total( # latest migration, number + description (description is not used # programmaticaly but will make sure git conflicts if two migrations are # separately added with the same number) -SQL_LEVEL = (70, 'repair anonymisation') +SQL_LEVEL = (71, 'add testdef table') def migrate_global_views(conn, cur): @@ -4842,6 +4922,9 @@ def migrate(): if sql_level < 68: # 68: multilinguism TranslatableMessage.do_table() + if sql_level < 71: + # 71: create testdef table + TestDef.do_table() if sql_level < 52: # 2: introduction of formdef_id in views # 5: add concerned_roles_array, is_at_endpoint and fts to views diff --git a/wcs/testdef.py b/wcs/testdef.py new file mode 100644 index 000000000..0e34f0428 --- /dev/null +++ b/wcs/testdef.py @@ -0,0 +1,28 @@ +# w.c.s. - web application for online forms +# Copyright (C) 2005-2022 Entr'ouvert +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see . + +from wcs import sql + + +class TestDef(sql.TestDef): + _names = 'testdef' + + name = '' + slug = None + object_type = None # (formdef, carddef, etc.) + object_id = None + + data = None # (json export of formdata, carddata, etc.) -- 2.35.1