Projet

Général

Profil

0001-Switch-wcs_all_forms-to-a-table-maintained-by-trigge.patch

Pierre Ducroquet, 19 janvier 2022 14:14

Télécharger (21,4 ko)

Voir les différences:

Subject: [PATCH] Switch wcs_all_forms to a table maintained by triggers
 (#60552)

Instead of a view, wcs_all_forms is now a table.
Direct benefits:
- we will no longer exceed the collapse limit of the PG optimizer
- since we won't read every table, we can be **much** faster
(we aren't yet; I have not created the indexes so far)
- the optimizer will have less work, thus it will be faster
(I said it already)
 tests/test_sql.py |  34 +----
 wcs/sql.py        | 316 +++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 278 insertions(+), 72 deletions(-)
tests/test_sql.py
1115 1115
def test_migration_2_formdef_id_in_views(formdef):
1116 1116
    conn, cur = sql.get_connection_and_cursor()
1117 1117
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
1118
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1118
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1119 1119

  
1120 1120
    # hack a formdef table the wrong way, to check it is reconstructed
1121 1121
    # properly before the views are created
......
1127 1127
    assert table_exists(cur, 'wcs_view_1_tests')
1128 1128
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
1129 1129

  
1130
    view_names = []
1131
    cur.execute(
1132
        '''SELECT table_name FROM information_schema.views
1133
                   WHERE table_name LIKE %s''',
1134
        ('wcs_view_%',),
1135
    )
1136
    while True:
1137
        row = cur.fetchone()
1138
        if row is None:
1139
            break
1140
        view_names.append(row[0])
1141

  
1142
    fake_formdef = FormDef()
1143
    common_fields = sql.get_view_fields(fake_formdef)
1144
    # remove formdef_id for the purpose of this test
1145
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
1146

  
1147
    union = ' UNION '.join(
1148
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1149
    )
1150
    assert 'formdef_id' not in union
1151
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1152

  
1153 1130
    sql.migrate()
1154 1131

  
1155 1132
    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
......
1162 1139
def test_migration_6_actions_roles(formdef):
1163 1140
    conn, cur = sql.get_connection_and_cursor()
1164 1141
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
1165
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1142
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1166 1143

  
1167 1144
    # hack a formdef table the wrong way, to check it is reconstructed
1168 1145
    # properly before the views are created
......
1187 1164
def test_migration_10_submission_channel(formdef):
1188 1165
    conn, cur = sql.get_connection_and_cursor()
1189 1166
    cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
1190
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1167
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1191 1168

  
1192 1169
    # hack a formdef table the wrong way, to check it is reconstructed
1193 1170
    # properly before the views are created
......
1338 1315
    for table_name in table_names:
1339 1316
        if table_name.startswith('formdata_'):
1340 1317
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1318
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1319

  
1342 1320

  
1343 1321
def test_is_at_endpoint(pub):
......
1990 1968
def test_migration_31_user_label(formdef):
1991 1969
    conn, cur = sql.get_connection_and_cursor()
1992 1970
    cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
1993
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1971
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1994 1972

  
1995 1973
    cur.execute('DROP VIEW wcs_view_1_tests')
1996 1974
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label')
......
2020 1998

  
2021 1999
    conn, cur = sql.get_connection_and_cursor()
2022 2000
    cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
2023
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
2001
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
2024 2002

  
2025 2003
    cur.execute('DROP VIEW wcs_view_1_tests')
2026 2004
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_agent_id')
wcs/sql.py
431 431
    formdef.store(object_only=True)
432 432
    return formdef.table_name
433 433

  
434
def get_formdef_trigger_function_name(formdef):
435
    assert formdef.id is not None
436
    return '%s_%s_trigger_fn' % (formdef.data_sql_prefix, formdef.id)
437

  
438
def get_formdef_trigger_name(formdef):
439
    assert formdef.id is not None
440
    return '%s_%s_trigger' % (formdef.data_sql_prefix, formdef.id)
434 441

  
435 442
def get_formdef_new_id(id_start):
436 443
    new_id = id_start
......
477 484
        ('formdata\\_%%\\_%%',),
478 485
    )
479 486
    for table_name in [x[0] for x in cur.fetchall()]:
487
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
480 488
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
489
    cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms'")
490
    row = cur.fetchone()
491
    # only do the delete if wcs_all_forms is a table and not still a view
492
    if row is not None and row[0] == 'r':
493
        cur.execute("DELETE FROM wcs_all_forms WHERE object_type = 'formdata'")
481 494
    conn.commit()
482 495
    cur.close()
483 496

  
......
491 504
        ('carddata\\_%%\\_%%',),
492 505
    )
493 506
    for table_name in [x[0] for x in cur.fetchall()]:
507
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
494 508
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
495 509
    conn.commit()
496 510
    cur.close()
......
689 703
    for field in existing_fields - needed_fields:
690 704
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
691 705

  
706

  
707
    # recreate the trigger function, just so it's up to date
708
    category_value = formdef.category_id
709
    geoloc_base_x_query = "NULL"
710
    geoloc_base_y_query = "NULL"
711
    if formdef.geolocations and 'base' in formdef.geolocations:
712
        # default geolocation is in the 'base' key; we have to unstructure the
713
        # field as the POINT type of postgresql cannot be used directly as it
714
        # doesn't have an equality operator.
715
        geoloc_base_x_query = "NEW.geoloc_base[0]"
716
        geoloc_base_y_query = "NEW.geoloc_base[1]"
717
    if formdef.category_id is None:
718
        category_value = "NULL"
719
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
720
    endpoint_status = formdef.workflow.get_endpoint_status()
721
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
722
    if endpoint_status_filter == "":
723
        # not the prettiest in town, but will do fine for now.
724
        endpoint_status_filter = "'xxxx'"
725
    object_type = formdef.data_sql_prefix   # shortcut.
726
    formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
727
    formed_name_quotedstring.encoding = 'utf8'
728
    formdef_name = formed_name_quotedstring.getquoted().decode()
729
    cur.execute('''
730
CREATE OR REPLACE FUNCTION {trg_fn_name}()
731
RETURNS trigger
732
LANGUAGE plpgsql
733
AS $$
734
BEGIN
735
    -- TODO : sync back from users change !
736
    IF TG_OP = 'DELETE' THEN
737
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id AND object_type = '{object_type}';
738
        RETURN OLD;
739
    ELSEIF TG_OP = 'INSERT' THEN
740
        INSERT INTO wcs_all_forms VALUES (
741
            {category_id},
742
            '{object_type}',
743
            {formdef_id},
744
            NEW.id,
745
            NEW.user_id,
746
            NEW.receipt_time,
747
            NEW.status,
748
            NEW.id_display,
749
            NEW.submission_agent_id,
750
            NEW.submission_channel,
751
            NEW.backoffice_submission,
752
            NEW.last_update_time,
753
            NEW.digests,
754
            NEW.user_label,
755
            NEW.concerned_roles_array,
756
            NEW.actions_roles_array,
757
            NEW.fts,
758
            NEW.status IN ({endpoint_status}),
759
            {formdef_name},
760
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
761
            NEW.criticality_level - {criticality_levels},
762
            {geoloc_base_x},
763
            {geoloc_base_y},
764
            NEW.anonymised);
765
        RETURN NEW;
766
    ELSE
767
        UPDATE wcs_all_forms SET
768
                user_id = NEW.user_id,
769
                receipt_time = NEW.receipt_time,
770
                status = NEW.status,
771
                id_display = NEW.id_display,
772
                submission_agent_id = NEW.submission_agent_id,
773
                submission_channel = NEW.submission_channel,
774
                backoffice_submission = NEW.backoffice_submission,
775
                last_update_time = NEW.last_update_time,
776
                digests = NEW.digests,
777
                user_label = NEW.user_label,
778
                concerned_roles_array = NEW.concerned_roles_array,
779
                actions_roles_array = NEW.actions_roles_array,
780
                fts = NEW.fts,
781
                is_at_endpoint = NEW.status IN ({endpoint_status}),
782
                formdef_name = {formdef_name},
783
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
784
                criticality_level = NEW.criticality_level - {criticality_levels},
785
                geoloc_base_x = {geoloc_base_x},
786
                geoloc_base_y = {geoloc_base_y},
787
                anonymised = NEW.anonymised
788
            WHERE formdef_id = {formdef_id}  AND id = OLD.id AND object_type = '{object_type}';
789
        RETURN NEW;
790
    END IF;
791
END;
792
$$;
793
    '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
794
               category_id = category_value,                   # always valued ? need to handle null otherwise.
795
               formdef_id = formdef.id,
796
               geoloc_base_x = geoloc_base_x_query,
797
               geoloc_base_y = geoloc_base_y_query,
798
               formdef_name = formdef_name,
799
               criticality_levels = criticality_levels,
800
               endpoint_status = endpoint_status_filter,
801
               object_type = object_type
802
        ))
803

  
804
    trg_name = get_formdef_trigger_name(formdef)
805
    cur.execute(
806
        '''SELECT 1 FROM pg_trigger
807
            WHERE tgrelid = '%s'::regclass
808
              AND tgname = '%s'
809
        ''' % (table_name, trg_name))
810
    if len(cur.fetchall()) == 0:
811
        cur.execute(
812
            '''CREATE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
813
                ON {table_name}
814
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
815
                '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
816
                           table_name = table_name,
817
                           trg_name = trg_name))
818

  
692 819
    # migrations on _evolutions table
693 820
    cur.execute(
694 821
        '''SELECT column_name FROM information_schema.columns
......
1385 1512
    for view_name in view_names:
1386 1513
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1387 1514

  
1388
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1389

  
1390

  
1391 1515
def do_global_views(conn, cur):
1392 1516
    # recreate global views
1393
    from wcs.formdef import FormDef
1394

  
1395
    view_names = [get_formdef_view_name(x) for x in FormDef.select(ignore_migration=True)]
1396

  
1397
    cur.execute(
1398
        '''SELECT table_name FROM information_schema.views
1399
                    WHERE table_schema = 'public'
1400
                      AND table_name LIKE %s''',
1401
        ('wcs\\_view\\_%',),
1402
    )
1403
    existing_views = set()
1404
    while True:
1405
        row = cur.fetchone()
1406
        if row is None:
1407
            break
1408
        existing_views.add(row[0])
1517
    from .formdef import FormDef
1518
    from .carddef import CardDef
1519

  
1520
    # XXX TODO: make me dynamic, please ?
1521
    cur.execute("""CREATE TABLE IF NOT EXISTS wcs_all_forms (
1522
        category_id integer,
1523
        object_type character varying NOT NULL, -- formdef or carddef
1524
        formdef_id integer NOT NULL,
1525
        id integer NOT NULL,
1526
        user_id character varying,
1527
        receipt_time timestamp without time zone,
1528
        status character varying,
1529
        id_display character varying,
1530
        submission_agent_id character varying,
1531
        submission_channel character varying,
1532
        backoffice_submission boolean,
1533
        last_update_time timestamp without time zone,
1534
        digests jsonb,
1535
        user_label character varying,
1536
        concerned_roles_array text[],
1537
        actions_roles_array text[],
1538
        fts tsvector,
1539
        is_at_endpoint boolean,
1540
        formdef_name text,
1541
        user_name character varying,
1542
        criticality_level integer,
1543
        geoloc_base_x double precision,
1544
        geoloc_base_y double precision,
1545
        anonymised timestamp with time zone
1546
        , PRIMARY KEY(object_type, formdef_id, id)
1547
    )""")
1548
    cur.execute('''CREATE INDEX IF NOT EXISTS %s_fts ON %s USING gin(fts)''' % ("wcs_all_forms", "wcs_all_forms"))
1409 1549

  
1410
    view_names = existing_views.intersection(view_names)
1411
    if not view_names:
1412
        return
1413

  
1414
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1415

  
1416
    fake_formdef = FormDef()
1417
    common_fields = get_view_fields(fake_formdef)
1418
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1419
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1420
    common_fields.append(('fts', 'fts'))
1421
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1422
    common_fields.append(('formdef_name', 'formdef_name'))
1423
    common_fields.append(('user_name', 'user_name'))
1424
    common_fields.append(('criticality_level', 'criticality_level'))
1425
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1426
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1427
    common_fields.append(('anonymised', 'anonymised'))
1428

  
1429
    union = ' UNION ALL '.join(
1430
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1431
    )
1432
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1550
    for attr in ('receipt_time', 'anonymised', 'user_id', 'status'):
1551
        cur.execute('''CREATE INDEX IF NOT EXISTS %s_%s ON %s (%s)''' % ("wcs_all_forms", attr, "wcs_all_forms", attr))
1552
    for attr in ('concerned_roles_array', 'actions_roles_array'):
1553
        cur.execute('''CREATE INDEX IF NOT EXISTS %s_%s ON %s USING gin (%s)''' % ("wcs_all_forms", attr, "wcs_all_forms", attr))
1554

  
1555
    # Purge any dead data
1556
    valid_data = {FormDef.data_sql_prefix: set(), CardDef.data_sql_prefix: set()}
1557
    for formdef in FormDef.select(ignore_migration=True):
1558
        valid_data[formdef.data_sql_prefix].add(formdef.id)
1559
    for object_type, valid_ids in valid_data.items():
1560
        if valid_ids:
1561
            cur.execute("DELETE FROM wcs_all_forms WHERE object_type = '%s' AND formdef_id NOT IN (%s)" % (object_type, ", ".join(valid_ids)))
1562
        else:
1563
            cur.execute("DELETE FROM wcs_all_forms WHERE object_type = '%s'" % object_type)
1564
    ###fake_formdef = FormDef()
1565
    ###common_fields = get_view_fields(fake_formdef)
1566
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1567
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1568
    ###common_fields.append(('fts', 'fts'))
1569
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1570
    ###common_fields.append(('formdef_name', 'formdef_name'))
1571
    ###common_fields.append(('user_name', 'user_name'))
1572
    ###common_fields.append(('criticality_level', 'criticality_level'))
1573
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1574
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1575
    ###common_fields.append(('anonymised', 'anonymised'))
1433 1576

  
1434 1577
    for category in wcs.categories.Category.select():
1435 1578
        name = get_name_as_sql_identifier(category.url_name)[:40]
......
2423 2566
        conn, cur = get_connection_and_cursor()
2424 2567
        if drop:
2425 2568
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2569
            cur.execute('''DELETE FROM %s''' % cls._table_name) # force trigger execution first.
2426 2570
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2427 2571
        else:
2428 2572
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3509 3653
# latest migration, number + description (description is not used
3510 3654
# programmatically but will make sure git conflicts if two migrations are
3511 3655
# separately added with the same number)
3512
SQL_LEVEL = (56, 'add gin indexes to concerned_roles_array and actions_roles_array')
3656
SQL_LEVEL = (57, 'switch wcs_all_forms to a trigger-maintained table')
3513 3657

  
3514 3658

  
3515 3659
def migrate_global_views(conn, cur):
......
3588 3732
    conn, cur = get_connection_and_cursor()
3589 3733
    sql_level = get_sql_level(conn, cur)
3590 3734
    if sql_level < 0:
3591
        # fake code to help in tetsting the error code path.
3735
        # fake code to help in testing the error code path.
3592 3736
        raise RuntimeError()
3593 3737
    if sql_level < 1:  # 1: introduction of tracking_code table
3594 3738
        do_tracking_code_table()
......
3701 3845
        # 50: switch role uuid column to varchar
3702 3846
        do_role_table()
3703 3847
        migrate_legacy_roles()
3848
    if sql_level < 57:
3849
        cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms';")
3850
        rows = cur.fetchall()
3851
        if len(rows) != 0:
3852
            if rows[0][0] == 'v':
3853
                # force wcs_all_forms table creation
3854
                cur.execute('DROP VIEW IF EXISTS wcs_all_forms CASCADE;')
3855
                do_global_views()
3856
            else:
3857
                assert(rows[0][0] == 'r')
3858
        else:
3859
            do_global_views()
3860

  
3861
        # assert there is no row, i.e. we are doing a clean migration (likely a special case for unit tests)?
3862
        #cur.execute("SELECT COUNT(*) FROM wcs_all_forms;")
3863
        #assert(cur.fetchone()[0] == 0)
3864

  
3865
        # now copy all data into the table
3866
        from wcs.carddef import CardDef
3867
        from wcs.formdef import FormDef
3868

  
3869
        for formdef in FormDef.select() + CardDef.select():
3870
            category_value = formdef.category_id
3871
            if formdef.category_id is None:
3872
                category_value = "NULL"
3873
            geoloc_base_x_query = "NULL"
3874
            geoloc_base_y_query = "NULL"
3875
            if formdef.geolocations and 'base' in formdef.geolocations:
3876
                # default geolocation is in the 'base' key; we have to unstructure the
3877
                # field as the POINT type of postgresql cannot be used directly as it
3878
                # doesn't have an equality operator.
3879
                geoloc_base_x_query = "geoloc_base[0]"
3880
                geoloc_base_y_query = "geoloc_base[1]"
3881
            criticality_levels = len(formdef.workflow.criticality_levels or [0])
3882
            endpoint_status = formdef.workflow.get_endpoint_status()
3883
            endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
3884
            if endpoint_status_filter == "":
3885
                # not the prettiest in town, but will do fine for now.
3886
                endpoint_status_filter = "'xxxx'"
3887
            object_type = formdef.data_sql_prefix   # shortcut.
3888
            formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
3889
            formed_name_quotedstring.encoding = 'utf8'
3890
            formdef_name = formed_name_quotedstring.getquoted().decode()
3891
            cur.execute("""
3892
                INSERT INTO wcs_all_forms
3893
                SELECT
3894
                    {category_id},
3895
                    '{object_type}',
3896
                    {formdef_id},
3897
                    id,
3898
                    user_id,
3899
                    receipt_time,
3900
                    status,
3901
                    id_display,
3902
                    submission_agent_id,
3903
                    submission_channel,
3904
                    backoffice_submission,
3905
                    last_update_time,
3906
                    digests,
3907
                    user_label,
3908
                    concerned_roles_array,
3909
                    actions_roles_array,
3910
                    fts,
3911
                    status IN ({endpoint_status}),
3912
                    {formdef_name},
3913
                    (SELECT name FROM users WHERE users.id = CAST(user_id AS INTEGER)),
3914
                    criticality_level - {criticality_levels},
3915
                    {geoloc_base_x},
3916
                    {geoloc_base_y},
3917
                    anonymised
3918
                FROM {table_name}
3919
                ON CONFLICT DO NOTHING;
3920
                    """.format(
3921
                        table_name = get_formdef_table_name(formdef),
3922
                        category_id = category_value,                   # always valued ? need to handle null otherwise.
3923
                        formdef_id = formdef.id,
3924
                        geoloc_base_x = geoloc_base_x_query,
3925
                        geoloc_base_y = geoloc_base_y_query,
3926
                        formdef_name = formdef_name,
3927
                        criticality_levels = criticality_levels,
3928
                        endpoint_status = endpoint_status_filter,
3929
                        object_type = object_type
3930
                    ))
3931

  
3704 3932

  
3705 3933
    cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level'))
3706 3934

  
3707
-