Projet

Général

Profil

0001-testdef-add-new-table-to-hold-form-tests-71296.patch

Valentin Deniaud, 14 décembre 2022 18:09

Télécharger (6,29 ko)

Voir les différences:

Subject: [PATCH 1/3] testdef: add new table to hold form tests (#71296)

 tests/test_sql.py | 21 ++++++++++++
 wcs/sql.py        | 85 ++++++++++++++++++++++++++++++++++++++++++++++-
 wcs/testdef.py    | 28 ++++++++++++++++
 3 files changed, 133 insertions(+), 1 deletion(-)
 create mode 100644 wcs/testdef.py
tests/test_sql.py
15 15
from wcs.formdata import Evolution
16 16
from wcs.formdef import FormDef
17 17
from wcs.qommon import force_str
18
from wcs.testdef import TestDef
18 19
from wcs.wf.register_comment import RegisterCommenterWorkflowStatusItem
19 20
from wcs.workflows import Workflow, WorkflowCriticalityLevel
20 21

  
......
2318 2319
    # check it's no longer needed afterwards
2319 2320
    sql.migrate()
2320 2321
    assert sql.is_reindex_needed('anonymise', conn=conn, cur=cur) is False
2322

  
2323

  
2324
def test_sql_testdef_unicity(pub):
    """Check unicity of the (slug, object_type, object_id) triple in testdef."""
    TestDef.do_table()

    testdef = TestDef()
    testdef.slug, testdef.object_type, testdef.object_id = 'test-1', 'formdef', '1'
    testdef.store()

    # resetting the id makes store() insert a new row; the slug is reused
    # but the row targets another object_id, so it is accepted
    testdef.id = None
    testdef.object_id = '2'
    testdef.store()

    # inserting once more with the very same slug, object_type and object_id
    # is expected to be refused
    testdef.id = None
    with pytest.raises(psycopg2.errors.UniqueViolation):
        testdef.store()
wcs/sql.py
4282 4282
        return catalog
4283 4283

  
4284 4284

  
4285
class TestDef(SqlMixin):
    """Object stored as a row of the ``testdef`` table.

    Rows have a serial ``id``; the table also enforces unicity of the
    (slug, object_type, object_id) triple.
    """

    _table_name = 'testdef'
    # (column name, column type) pairs; _row2ob() relies on this order to map
    # row values to attributes.
    _table_static_fields = [
        ('id', 'serial'),
        ('name', 'varchar'),
        ('slug', 'varchar'),
        ('object_type', 'varchar'),
        ('object_id', 'varchar'),
        ('data', 'jsonb'),
    ]

    # a falsy id makes store() insert a new row and record the generated id
    id = None

    @classmethod
    @guard_postgres
    def do_table(cls, conn=None, cur=None):
        """Create the table if it does not exist yet.

        The ``conn`` and ``cur`` arguments are accepted but not used: a
        connection and a cursor are always obtained from
        get_connection_and_cursor().
        """
        conn, cur = get_connection_and_cursor()
        table_name = cls._table_name

        try:
            cur.execute(
                '''SELECT COUNT(*) FROM information_schema.tables
                        WHERE table_schema = 'public'
                          AND table_name = %s''',
                (table_name,),
            )
            if cur.fetchone()[0] == 0:
                cur.execute(
                    '''CREATE TABLE %s (id SERIAL PRIMARY KEY,
                                            name varchar,
                                            slug varchar NOT NULL,
                                            object_type varchar NOT NULL,
                                            object_id varchar NOT NULL,
                                            data jsonb,
                                            UNIQUE(slug, object_type, object_id)
                                            )'''
                    % table_name
                )

            conn.commit()
        finally:
            # release the cursor even when a statement or the commit fails
            cur.close()

    @guard_postgres
    def store(self):
        """Insert the object as a new row, or update the row matching its id.

        When ``self.id`` is falsy a row is inserted and ``self.id`` is set to
        the id generated by the database; otherwise the row with that id is
        updated.  Errors raised by the database are propagated to the caller.
        """
        sql_dict = {x[0]: getattr(self, x[0], None) for x in self._table_static_fields if x[0] != 'id'}
        # computed before 'id' is possibly added to sql_dict, so that it only
        # lists the columns that are actually written
        column_names = list(sql_dict)

        is_new = not self.id
        if is_new:
            sql_statement = '''INSERT INTO %s (id, %s)
                               VALUES (DEFAULT, %s)
                               RETURNING id''' % (
                self._table_name,
                ', '.join(column_names),
                ', '.join(['%%(%s)s' % x for x in column_names]),
            )
        else:
            sql_dict['id'] = self.id
            # NOTE(review): the value returned by RETURNING is never read, so
            # an update that matches no row goes unnoticed.
            sql_statement = '''UPDATE %s SET %s WHERE id = %%(id)s RETURNING id''' % (
                self._table_name,
                ', '.join(['%s = %%(%s)s' % (x, x) for x in column_names]),
            )

        conn, cur = get_connection_and_cursor()
        try:
            cur.execute(sql_statement, sql_dict)
            if is_new:
                self.id = cur.fetchone()[0]

            conn.commit()
        finally:
            # release the cursor even when the statement or the commit fails
            # (e.g. on a unicity violation)
            cur.close()

    @classmethod
    def _row2ob(cls, row, **kwargs):
        """Build an instance from a database row, bypassing ``__init__``.

        Values of ``row`` are matched positionally against
        ``_table_static_fields``; extra keyword arguments are ignored.
        """
        o = cls.__new__(cls)
        for (attr, _column_type), value in zip(cls._table_static_fields, row):
            setattr(o, attr, value)
        return o

    @classmethod
    def get_data_fields(cls):
        """Return an empty list."""
        return []
4363

  
4364

  
4285 4365
class classproperty:
4286 4366
    def __init__(self, f):
4287 4367
        self.f = f
......
4660 4740
# latest migration, number + description (description is not used
4661 4741
# programmatically but will make sure git conflicts if two migrations are
4662 4742
# separately added with the same number)
4663
SQL_LEVEL = (70, 'repair anonymisation')
4743
SQL_LEVEL = (71, 'add testdef table')
4664 4744

  
4665 4745

  
4666 4746
def migrate_global_views(conn, cur):
......
4842 4922
    if sql_level < 68:
4843 4923
        # 68: multilinguism
4844 4924
        TranslatableMessage.do_table()
4925
    if sql_level < 71:
4926
        # 71: create testdef table
4927
        TestDef.do_table()
4845 4928
    if sql_level < 52:
4846 4929
        # 2: introduction of formdef_id in views
4847 4930
        # 5: add concerned_roles_array, is_at_endpoint and fts to views
wcs/testdef.py
1
# w.c.s. - web application for online forms
2
# Copyright (C) 2005-2022  Entr'ouvert
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16

  
17
from wcs import sql
18

  
19

  
20
class TestDef(sql.TestDef):
    """Test definition, with default values for its stored attributes."""

    _names = 'testdef'

    name = ''
    slug = None

    # kind of object the test is attached to (formdef, carddef, etc.)
    object_type = None
    object_id = None

    # json export of formdata, carddata, etc.
    data = None
0
-