Projet

Général

Profil

0001-sql-switch-wcs_all_forms-to-a-table-maintained-by-tr.patch

Frédéric Péters, 25 mars 2022 14:53

Télécharger (21,9 ko)

Voir les différences:

Subject: [PATCH 1/9] sql: switch wcs_all_forms to a table maintained by
 triggers (#60552)

Instead of a view, wcs_all_forms is now a table.

Direct benefits:
- we will no longer exceed the collapse limit of the PostgreSQL optimizer
- since we won't read every table, queries can be **much** faster
  (they aren't yet: I have not created the indexes so far)
- the optimizer will have less work to do, so it will be faster (as already said above)
 tests/test_sql.py |  34 +----
 wcs/sql.py        | 333 ++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 299 insertions(+), 68 deletions(-)
tests/test_sql.py
1115 1115
def test_migration_2_formdef_id_in_views(formdef):
1116 1116
    conn, cur = sql.get_connection_and_cursor()
1117 1117
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
1118
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1118
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1119 1119

  
1120 1120
    # hack a formdef table the wrong way, to check it is reconstructed
1121 1121
    # properly before the views are created
......
1127 1127
    assert table_exists(cur, 'wcs_view_1_tests')
1128 1128
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
1129 1129

  
1130
    view_names = []
1131
    cur.execute(
1132
        '''SELECT table_name FROM information_schema.views
1133
                   WHERE table_name LIKE %s''',
1134
        ('wcs_view_%',),
1135
    )
1136
    while True:
1137
        row = cur.fetchone()
1138
        if row is None:
1139
            break
1140
        view_names.append(row[0])
1141

  
1142
    fake_formdef = FormDef()
1143
    common_fields = sql.get_view_fields(fake_formdef)
1144
    # remove formdef_id for the purpose of this test
1145
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
1146

  
1147
    union = ' UNION '.join(
1148
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1149
    )
1150
    assert 'formdef_id' not in union
1151
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1152

  
1153 1130
    sql.migrate()
1154 1131

  
1155 1132
    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
......
1162 1139
def test_migration_6_actions_roles(formdef):
1163 1140
    conn, cur = sql.get_connection_and_cursor()
1164 1141
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
1165
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1142
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1166 1143

  
1167 1144
    # hack a formdef table the wrong way, to check it is reconstructed
1168 1145
    # properly before the views are created
......
1187 1164
def test_migration_10_submission_channel(formdef):
1188 1165
    conn, cur = sql.get_connection_and_cursor()
1189 1166
    cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
1190
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1167
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1191 1168

  
1192 1169
    # hack a formdef table the wrong way, to check it is reconstructed
1193 1170
    # properly before the views are created
......
1338 1315
    for table_name in table_names:
1339 1316
        if table_name.startswith('formdata_'):
1340 1317
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1318
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1319

  
1342 1320

  
1343 1321
def test_is_at_endpoint(pub):
......
2017 1995
def test_migration_31_user_label(formdef):
2018 1996
    conn, cur = sql.get_connection_and_cursor()
2019 1997
    cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
2020
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1998
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
2021 1999

  
2022 2000
    cur.execute('DROP VIEW wcs_view_1_tests')
2023 2001
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label')
......
2047 2025

  
2048 2026
    conn, cur = sql.get_connection_and_cursor()
2049 2027
    cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
2050
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
2028
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
2051 2029

  
2052 2030
    cur.execute('DROP VIEW wcs_view_1_tests')
2053 2031
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_agent_id')
wcs/sql.py
513 513
    return formdef.table_name
514 514

  
515 515

  
516
def get_formdef_trigger_function_name(formdef):
517
    assert formdef.id is not None
518
    return '%s_%s_trigger_fn' % (formdef.data_sql_prefix, formdef.id)
519

  
520

  
521
def get_formdef_trigger_name(formdef):
522
    assert formdef.id is not None
523
    return '%s_%s_trigger' % (formdef.data_sql_prefix, formdef.id)
524

  
525

  
516 526
def get_formdef_new_id(id_start):
517 527
    new_id = id_start
518 528
    conn, cur = get_connection_and_cursor()
......
558 568
        ('formdata\\_%%\\_%%',),
559 569
    )
560 570
    for table_name in [x[0] for x in cur.fetchall()]:
571
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
561 572
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
573
    cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms'")
574
    row = cur.fetchone()
575
    # only do the delete if wcs_all_forms is a table and not still a view
576
    if row is not None and row[0] == 'r':
577
        cur.execute("DELETE FROM wcs_all_forms WHERE object_type = 'formdata'")
562 578
    conn.commit()
563 579
    cur.close()
564 580

  
......
572 588
        ('carddata\\_%%\\_%%',),
573 589
    )
574 590
    for table_name in [x[0] for x in cur.fetchall()]:
591
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
575 592
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
576 593
    conn.commit()
577 594
    cur.close()
......
775 792
    for field in existing_fields - needed_fields:
776 793
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
777 794

  
795
    # recreate the trigger function, so that it is always up to date
796
    category_value = formdef.category_id
797
    geoloc_base_x_query = "NULL"
798
    geoloc_base_y_query = "NULL"
799
    if formdef.geolocations and 'base' in formdef.geolocations:
800
        # default geolocation is in the 'base' key; we have to unstructure the
801
        # field as the POINT type of postgresql cannot be used directly as it
802
        # doesn't have an equality operator.
803
        geoloc_base_x_query = "NEW.geoloc_base[0]"
804
        geoloc_base_y_query = "NEW.geoloc_base[1]"
805
    if formdef.category_id is None:
806
        category_value = "NULL"
807
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
808
    endpoint_status = formdef.workflow.get_endpoint_status()
809
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
810
    if endpoint_status_filter == "":
811
        # not the prettiest in town, but will do fine for now.
812
        endpoint_status_filter = "'xxxx'"
813
    object_type = formdef.data_sql_prefix  # shortcut.
814
    formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
815
    formed_name_quotedstring.encoding = 'utf8'
816
    formdef_name = formed_name_quotedstring.getquoted().decode()
817
    cur.execute(
818
        '''
819
CREATE OR REPLACE FUNCTION {trg_fn_name}()
820
RETURNS trigger
821
LANGUAGE plpgsql
822
AS $$
823
BEGIN
824
    -- TODO : sync back from users change !
825
    IF TG_OP = 'DELETE' THEN
826
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id AND object_type = '{object_type}';
827
        RETURN OLD;
828
    ELSEIF TG_OP = 'INSERT' THEN
829
        INSERT INTO wcs_all_forms VALUES (
830
            {category_id},
831
            '{object_type}',
832
            {formdef_id},
833
            NEW.id,
834
            NEW.user_id,
835
            NEW.receipt_time,
836
            NEW.status,
837
            NEW.id_display,
838
            NEW.submission_agent_id,
839
            NEW.submission_channel,
840
            NEW.backoffice_submission,
841
            NEW.last_update_time,
842
            NEW.digests,
843
            NEW.user_label,
844
            NEW.concerned_roles_array,
845
            NEW.actions_roles_array,
846
            NEW.fts,
847
            NEW.status IN ({endpoint_status}),
848
            {formdef_name},
849
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
850
            NEW.criticality_level - {criticality_levels},
851
            {geoloc_base_x},
852
            {geoloc_base_y},
853
            NEW.anonymised);
854
        RETURN NEW;
855
    ELSE
856
        UPDATE wcs_all_forms SET
857
                user_id = NEW.user_id,
858
                receipt_time = NEW.receipt_time,
859
                status = NEW.status,
860
                id_display = NEW.id_display,
861
                submission_agent_id = NEW.submission_agent_id,
862
                submission_channel = NEW.submission_channel,
863
                backoffice_submission = NEW.backoffice_submission,
864
                last_update_time = NEW.last_update_time,
865
                digests = NEW.digests,
866
                user_label = NEW.user_label,
867
                concerned_roles_array = NEW.concerned_roles_array,
868
                actions_roles_array = NEW.actions_roles_array,
869
                fts = NEW.fts,
870
                is_at_endpoint = NEW.status IN ({endpoint_status}),
871
                formdef_name = {formdef_name},
872
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
873
                criticality_level = NEW.criticality_level - {criticality_levels},
874
                geoloc_base_x = {geoloc_base_x},
875
                geoloc_base_y = {geoloc_base_y},
876
                anonymised = NEW.anonymised
877
            WHERE formdef_id = {formdef_id}  AND id = OLD.id AND object_type = '{object_type}';
878
        RETURN NEW;
879
    END IF;
880
END;
881
$$;
882
    '''.format(
883
            trg_fn_name=get_formdef_trigger_function_name(formdef),
884
            category_id=category_value,  # always valued ? need to handle null otherwise.
885
            formdef_id=formdef.id,
886
            geoloc_base_x=geoloc_base_x_query,
887
            geoloc_base_y=geoloc_base_y_query,
888
            formdef_name=formdef_name,
889
            criticality_levels=criticality_levels,
890
            endpoint_status=endpoint_status_filter,
891
            object_type=object_type,
892
        )
893
    )
894

  
895
    trg_name = get_formdef_trigger_name(formdef)
896
    cur.execute(
897
        '''SELECT 1 FROM pg_trigger
898
            WHERE tgrelid = '%s'::regclass
899
              AND tgname = '%s'
900
        '''
901
        % (table_name, trg_name)
902
    )
903
    if len(cur.fetchall()) == 0:
904
        cur.execute(
905
            '''CREATE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
906
                ON {table_name}
907
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
908
                '''.format(
909
                trg_fn_name=get_formdef_trigger_function_name(formdef),
910
                table_name=table_name,
911
                trg_name=trg_name,
912
            )
913
        )
914

  
778 915
    # migrations on _evolutions table
779 916
    cur.execute(
780 917
        '''SELECT column_name FROM information_schema.columns
......
1524 1661
    for view_name in view_names:
1525 1662
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1526 1663

  
1527
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1528

  
1529 1664

  
1530 1665
def do_global_views(conn, cur):
1531 1666
    # recreate global views
1532
    from wcs.formdef import FormDef
1533

  
1534
    view_names = [get_formdef_view_name(x) for x in FormDef.select(ignore_migration=True)]
1667
    from .carddef import CardDef
1668
    from .formdef import FormDef
1535 1669

  
1670
    # XXX TODO: make me dynamic, please ?
1536 1671
    cur.execute(
1537
        '''SELECT table_name FROM information_schema.views
1538
                    WHERE table_schema = 'public'
1539
                      AND table_name LIKE %s''',
1540
        ('wcs\\_view\\_%',),
1672
        """CREATE TABLE IF NOT EXISTS wcs_all_forms (
1673
        category_id integer,
1674
        object_type character varying NOT NULL, -- formdef or carddef
1675
        formdef_id integer NOT NULL,
1676
        id integer NOT NULL,
1677
        user_id character varying,
1678
        receipt_time timestamp without time zone,
1679
        status character varying,
1680
        id_display character varying,
1681
        submission_agent_id character varying,
1682
        submission_channel character varying,
1683
        backoffice_submission boolean,
1684
        last_update_time timestamp without time zone,
1685
        digests jsonb,
1686
        user_label character varying,
1687
        concerned_roles_array text[],
1688
        actions_roles_array text[],
1689
        fts tsvector,
1690
        is_at_endpoint boolean,
1691
        formdef_name text,
1692
        user_name character varying,
1693
        criticality_level integer,
1694
        geoloc_base_x double precision,
1695
        geoloc_base_y double precision,
1696
        anonymised timestamp with time zone
1697
        , PRIMARY KEY(object_type, formdef_id, id)
1698
    )"""
1699
    )
1700
    cur.execute(
1701
        '''CREATE INDEX IF NOT EXISTS %s_fts ON %s USING gin(fts)''' % ("wcs_all_forms", "wcs_all_forms")
1541 1702
    )
1542
    existing_views = set()
1543
    while True:
1544
        row = cur.fetchone()
1545
        if row is None:
1546
            break
1547
        existing_views.add(row[0])
1548 1703

  
1549
    view_names = existing_views.intersection(view_names)
1550
    if not view_names:
1551
        return
1704
    for attr in ('receipt_time', 'anonymised', 'user_id', 'status'):
1705
        cur.execute(
1706
            '''CREATE INDEX IF NOT EXISTS %s_%s ON %s (%s)''' % ("wcs_all_forms", attr, "wcs_all_forms", attr)
1707
        )
1708
    for attr in ('concerned_roles_array', 'actions_roles_array'):
1709
        cur.execute(
1710
            '''CREATE INDEX IF NOT EXISTS %s_%s ON %s USING gin (%s)'''
1711
            % ("wcs_all_forms", attr, "wcs_all_forms", attr)
1712
        )
1552 1713

  
1553
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1554

  
1555
    fake_formdef = FormDef()
1556
    common_fields = get_view_fields(fake_formdef)
1557
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1558
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1559
    common_fields.append(('fts', 'fts'))
1560
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1561
    common_fields.append(('formdef_name', 'formdef_name'))
1562
    common_fields.append(('user_name', 'user_name'))
1563
    common_fields.append(('criticality_level', 'criticality_level'))
1564
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1565
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1566
    common_fields.append(('anonymised', 'anonymised'))
1567

  
1568
    union = ' UNION ALL '.join(
1569
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1570
    )
1571
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1714
    # Purge dead data: rows whose formdef_id is not among the known definitions
1715
    valid_data = {FormDef.data_sql_prefix: set(), CardDef.data_sql_prefix: set()}
1716
    for formdef in FormDef.select(ignore_migration=True):
1717
        valid_data[formdef.data_sql_prefix].add(formdef.id)
1718
    for object_type, valid_ids in valid_data.items():
1719
        if valid_ids:
1720
            cur.execute(
1721
                "DELETE FROM wcs_all_forms WHERE object_type = '%s' AND formdef_id NOT IN (%s)"
1722
                % (object_type, ", ".join(valid_ids))
1723
            )
1724
        else:
1725
            cur.execute("DELETE FROM wcs_all_forms WHERE object_type = '%s'" % object_type)
1726
    ###fake_formdef = FormDef()
1727
    ###common_fields = get_view_fields(fake_formdef)
1728
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1729
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1730
    ###common_fields.append(('fts', 'fts'))
1731
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1732
    ###common_fields.append(('formdef_name', 'formdef_name'))
1733
    ###common_fields.append(('user_name', 'user_name'))
1734
    ###common_fields.append(('criticality_level', 'criticality_level'))
1735
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1736
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1737
    ###common_fields.append(('anonymised', 'anonymised'))
1572 1738

  
1573 1739
    for category in wcs.categories.Category.select():
1574 1740
        name = get_name_as_sql_identifier(category.url_name)[:40]
1575 1741
        cur.execute(
1576
            '''CREATE VIEW wcs_category_%s AS SELECT * from wcs_all_forms
1742
            '''CREATE OR REPLACE VIEW wcs_category_%s AS SELECT * from wcs_all_forms
1577 1743
                        WHERE category_id = %s'''
1578 1744
            % (name, category.id)
1579 1745
        )
......
2588 2754
        conn, cur = get_connection_and_cursor()
2589 2755
        if drop:
2590 2756
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2757
            cur.execute('''DELETE FROM %s''' % cls._table_name)  # force trigger execution first.
2591 2758
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2592 2759
        else:
2593 2760
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3776 3943
# latest migration, number + description (description is not used
3777 3944
# programmatically but will make sure git conflicts if two migrations are
3778 3945
# separately added with the same number)
3779
SQL_LEVEL = (58, 'add workflow_merged_roles_dict')
3946
SQL_LEVEL = (59, 'switch wcs_all_forms to a trigger-maintained table')
3780 3947

  
3781 3948

  
3782 3949
def migrate_global_views(conn, cur):
......
3855 4022
    conn, cur = get_connection_and_cursor()
3856 4023
    sql_level = get_sql_level(conn, cur)
3857 4024
    if sql_level < 0:
3858
        # fake code to help in tetsting the error code path.
4025
        # fake code to help in testing the error code path.
3859 4026
        raise RuntimeError()
3860 4027
    if sql_level < 1:  # 1: introduction of tracking_code table
3861 4028
        do_tracking_code_table()
......
3982 4149
            do_formdef_tables(formdef, rebuild_views=False, rebuild_global_views=False)
3983 4150
        migrate_views(conn, cur)
3984 4151
        set_reindex('formdata', 'needed', conn=conn, cur=cur)
4152
    if sql_level < 59:
4153
        # switch wcs_all_forms to a trigger-maintained table
4154
        from wcs.carddef import CardDef
4155
        from wcs.formdef import FormDef
4156

  
4157
        cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms';")
4158
        rows = cur.fetchall()
4159
        if len(rows) != 0:
4160
            if rows[0][0] == 'v':
4161
                # force wcs_all_forms table creation
4162
                cur.execute('DROP VIEW IF EXISTS wcs_all_forms CASCADE;')
4163
                do_global_views(conn, cur)
4164
            else:
4165
                assert rows[0][0] == 'r'
4166
        else:
4167
            do_global_views(conn, cur)
4168

  
4169
        # should we assert that there is no row, i.e. that we are doing a clean migration (unit tests are a likely special case)?
4170
        # cur.execute("SELECT COUNT(*) FROM wcs_all_forms;")
4171
        # assert(cur.fetchone()[0] == 0)
4172

  
4173
        # now copy all data into the table
4174
        for formdef in FormDef.select() + CardDef.select():
4175
            category_value = formdef.category_id
4176
            if formdef.category_id is None:
4177
                category_value = "NULL"
4178
            geoloc_base_x_query = "NULL"
4179
            geoloc_base_y_query = "NULL"
4180
            if formdef.geolocations and 'base' in formdef.geolocations:
4181
                # default geolocation is in the 'base' key; we have to unstructure the
4182
                # field as the POINT type of postgresql cannot be used directly as it
4183
                # doesn't have an equality operator.
4184
                geoloc_base_x_query = "geoloc_base[0]"
4185
                geoloc_base_y_query = "geoloc_base[1]"
4186
            criticality_levels = len(formdef.workflow.criticality_levels or [0])
4187
            endpoint_status = formdef.workflow.get_endpoint_status()
4188
            endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
4189
            if endpoint_status_filter == "":
4190
                # not the prettiest in town, but will do fine for now.
4191
                endpoint_status_filter = "'xxxx'"
4192
            object_type = formdef.data_sql_prefix  # shortcut.
4193
            formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
4194
            formed_name_quotedstring.encoding = 'utf8'
4195
            formdef_name = formed_name_quotedstring.getquoted().decode()
4196
            cur.execute(
4197
                """
4198
                INSERT INTO wcs_all_forms
4199
                SELECT
4200
                    {category_id},
4201
                    '{object_type}',
4202
                    {formdef_id},
4203
                    id,
4204
                    user_id,
4205
                    receipt_time,
4206
                    status,
4207
                    id_display,
4208
                    submission_agent_id,
4209
                    submission_channel,
4210
                    backoffice_submission,
4211
                    last_update_time,
4212
                    digests,
4213
                    user_label,
4214
                    concerned_roles_array,
4215
                    actions_roles_array,
4216
                    fts,
4217
                    status IN ({endpoint_status}),
4218
                    {formdef_name},
4219
                    (SELECT name FROM users WHERE users.id = CAST(user_id AS INTEGER)),
4220
                    criticality_level - {criticality_levels},
4221
                    {geoloc_base_x},
4222
                    {geoloc_base_y},
4223
                    anonymised
4224
                FROM {table_name}
4225
                ON CONFLICT DO NOTHING;
4226
                    """.format(
4227
                    table_name=get_formdef_table_name(formdef),
4228
                    category_id=category_value,  # always valued ? need to handle null otherwise.
4229
                    formdef_id=formdef.id,
4230
                    geoloc_base_x=geoloc_base_x_query,
4231
                    geoloc_base_y=geoloc_base_y_query,
4232
                    formdef_name=formdef_name,
4233
                    criticality_levels=criticality_levels,
4234
                    endpoint_status=endpoint_status_filter,
4235
                    object_type=object_type,
4236
                )
4237
            )
3985 4238

  
3986 4239
    cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level'))
3987 4240

  
3988
-