Projet

Général

Profil

0001-Switch-wcs_all_forms-to-a-table-maintained-by-trigge.patch

Pierre Ducroquet, 19 janvier 2022 23:49

Télécharger (21,8 ko)

Voir les différences:

Subject: [PATCH] Switch wcs_all_forms to a table maintained by triggers
 (#60552)

Instead of a view, wcs_all_forms is now a table maintained by triggers.
Direct benefits:
- we no longer exceed the collapse limit of the PostgreSQL planner
- since we no longer read every table, queries can be **much** faster
(they are not yet: the indexes have not been done so far)
- the planner has less work to do, so planning is faster as well
(as already noted above)
 tests/test_sql.py |  34 +----
 wcs/sql.py        | 332 ++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 298 insertions(+), 68 deletions(-)
tests/test_sql.py
1115 1115
def test_migration_2_formdef_id_in_views(formdef):
1116 1116
    conn, cur = sql.get_connection_and_cursor()
1117 1117
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
1118
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1118
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1119 1119

  
1120 1120
    # hack a formdef table the wrong way, to check it is reconstructed
1121 1121
    # properly before the views are created
......
1127 1127
    assert table_exists(cur, 'wcs_view_1_tests')
1128 1128
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
1129 1129

  
1130
    view_names = []
1131
    cur.execute(
1132
        '''SELECT table_name FROM information_schema.views
1133
                   WHERE table_name LIKE %s''',
1134
        ('wcs_view_%',),
1135
    )
1136
    while True:
1137
        row = cur.fetchone()
1138
        if row is None:
1139
            break
1140
        view_names.append(row[0])
1141

  
1142
    fake_formdef = FormDef()
1143
    common_fields = sql.get_view_fields(fake_formdef)
1144
    # remove formdef_id for the purpose of this test
1145
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
1146

  
1147
    union = ' UNION '.join(
1148
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1149
    )
1150
    assert 'formdef_id' not in union
1151
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1152

  
1153 1130
    sql.migrate()
1154 1131

  
1155 1132
    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
......
1162 1139
def test_migration_6_actions_roles(formdef):
1163 1140
    conn, cur = sql.get_connection_and_cursor()
1164 1141
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
1165
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1142
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1166 1143

  
1167 1144
    # hack a formdef table the wrong way, to check it is reconstructed
1168 1145
    # properly before the views are created
......
1187 1164
def test_migration_10_submission_channel(formdef):
1188 1165
    conn, cur = sql.get_connection_and_cursor()
1189 1166
    cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
1190
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1167
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1191 1168

  
1192 1169
    # hack a formdef table the wrong way, to check it is reconstructed
1193 1170
    # properly before the views are created
......
1338 1315
    for table_name in table_names:
1339 1316
        if table_name.startswith('formdata_'):
1340 1317
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1318
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1319

  
1342 1320

  
1343 1321
def test_is_at_endpoint(pub):
......
1990 1968
def test_migration_31_user_label(formdef):
1991 1969
    conn, cur = sql.get_connection_and_cursor()
1992 1970
    cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
1993
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1971
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1994 1972

  
1995 1973
    cur.execute('DROP VIEW wcs_view_1_tests')
1996 1974
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label')
......
2020 1998

  
2021 1999
    conn, cur = sql.get_connection_and_cursor()
2022 2000
    cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
2023
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
2001
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
2024 2002

  
2025 2003
    cur.execute('DROP VIEW wcs_view_1_tests')
2026 2004
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_agent_id')
wcs/sql.py
432 432
    return formdef.table_name
433 433

  
434 434

  
435
def get_formdef_trigger_function_name(formdef):  # name of the PL/pgSQL function executed by the formdef's data-table trigger
436
    assert formdef.id is not None  # the id is embedded in the name, so it must already be set
437
    return '%s_%s_trigger_fn' % (formdef.data_sql_prefix, formdef.id)  # "<data_sql_prefix>_<id>_trigger_fn"
438

  
439

  
440
def get_formdef_trigger_name(formdef):  # name of the FOR EACH ROW trigger created on the formdef's data table
441
    assert formdef.id is not None  # the id is embedded in the name, so it must already be set
442
    return '%s_%s_trigger' % (formdef.data_sql_prefix, formdef.id)  # "<data_sql_prefix>_<id>_trigger"
443

  
444

  
435 445
def get_formdef_new_id(id_start):
436 446
    new_id = id_start
437 447
    conn, cur = get_connection_and_cursor()
......
477 487
        ('formdata\\_%%\\_%%',),
478 488
    )
479 489
    for table_name in [x[0] for x in cur.fetchall()]:
490
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
480 491
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
492
    cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms'")
493
    row = cur.fetchone()
494
    # only do the delete if wcs_all_forms is a table and not still a view
495
    if row is not None and row[0] == 'r':
496
        cur.execute("DELETE FROM wcs_all_forms WHERE object_type = 'formdata'")
481 497
    conn.commit()
482 498
    cur.close()
483 499

  
......
491 507
        ('carddata\\_%%\\_%%',),
492 508
    )
493 509
    for table_name in [x[0] for x in cur.fetchall()]:
510
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
494 511
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
495 512
    conn.commit()
496 513
    cur.close()
......
689 706
    for field in existing_fields - needed_fields:
690 707
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
691 708

  
709
    # recreate the trigger function, just so it's uptodate
710
    category_value = formdef.category_id
711
    geoloc_base_x_query = "NULL"
712
    geoloc_base_y_query = "NULL"
713
    if formdef.geolocations and 'base' in formdef.geolocations:
714
        # default geolocation is in the 'base' key; we have to unstructure the
715
        # field is the POINT type of postgresql cannot be used directly as it
716
        # doesn't have an equality operator.
717
        geoloc_base_x_query = "NEW.geoloc_base[0]"
718
        geoloc_base_y_query = "NEW.geoloc_base[1]"
719
    if formdef.category_id is None:
720
        category_value = "NULL"
721
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
722
    endpoint_status = formdef.workflow.get_endpoint_status()
723
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
724
    if endpoint_status_filter == "":
725
        # not the prettiest in town, but will do fine for now.
726
        endpoint_status_filter = "'xxxx'"
727
    object_type = formdef.data_sql_prefix  # shortcut.
728
    formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
729
    formed_name_quotedstring.encoding = 'utf8'
730
    formdef_name = formed_name_quotedstring.getquoted().decode()
731
    cur.execute(
732
        '''
733
CREATE OR REPLACE FUNCTION {trg_fn_name}()
734
RETURNS trigger
735
LANGUAGE plpgsql
736
AS $$
737
BEGIN
738
    -- TODO : sync back from users change !
739
    IF TG_OP = 'DELETE' THEN
740
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id AND object_type = '{object_type}';
741
        RETURN OLD;
742
    ELSEIF TG_OP = 'INSERT' THEN
743
        INSERT INTO wcs_all_forms VALUES (
744
            {category_id},
745
            '{object_type}',
746
            {formdef_id},
747
            NEW.id,
748
            NEW.user_id,
749
            NEW.receipt_time,
750
            NEW.status,
751
            NEW.id_display,
752
            NEW.submission_agent_id,
753
            NEW.submission_channel,
754
            NEW.backoffice_submission,
755
            NEW.last_update_time,
756
            NEW.digests,
757
            NEW.user_label,
758
            NEW.concerned_roles_array,
759
            NEW.actions_roles_array,
760
            NEW.fts,
761
            NEW.status IN ({endpoint_status}),
762
            {formdef_name},
763
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
764
            NEW.criticality_level - {criticality_levels},
765
            {geoloc_base_x},
766
            {geoloc_base_y},
767
            NEW.anonymised);
768
        RETURN NEW;
769
    ELSE
770
        UPDATE wcs_all_forms SET
771
                user_id = NEW.user_id,
772
                receipt_time = NEW.receipt_time,
773
                status = NEW.status,
774
                id_display = NEW.id_display,
775
                submission_agent_id = NEW.submission_agent_id,
776
                submission_channel = NEW.submission_channel,
777
                backoffice_submission = NEW.backoffice_submission,
778
                last_update_time = NEW.last_update_time,
779
                digests = NEW.digests,
780
                user_label = NEW.user_label,
781
                concerned_roles_array = NEW.concerned_roles_array,
782
                actions_roles_array = NEW.actions_roles_array,
783
                fts = NEW.fts,
784
                is_at_endpoint = NEW.status IN ({endpoint_status}),
785
                formdef_name = {formdef_name},
786
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
787
                criticality_level = NEW.criticality_level - {criticality_levels},
788
                geoloc_base_x = {geoloc_base_x},
789
                geoloc_base_y = {geoloc_base_y},
790
                anonymised = NEW.anonymised
791
            WHERE formdef_id = {formdef_id}  AND id = OLD.id AND object_type = '{object_type}';
792
        RETURN NEW;
793
    END IF;
794
END;
795
$$;
796
    '''.format(
797
            trg_fn_name=get_formdef_trigger_function_name(formdef),
798
            category_id=category_value,  # always valued ? need to handle null otherwise.
799
            formdef_id=formdef.id,
800
            geoloc_base_x=geoloc_base_x_query,
801
            geoloc_base_y=geoloc_base_y_query,
802
            formdef_name=formdef_name,
803
            criticality_levels=criticality_levels,
804
            endpoint_status=endpoint_status_filter,
805
            object_type=object_type,
806
        )
807
    )
808

  
809
    trg_name = get_formdef_trigger_name(formdef)
810
    cur.execute(
811
        '''SELECT 1 FROM pg_trigger
812
            WHERE tgrelid = '%s'::regclass
813
              AND tgname = '%s'
814
        '''
815
        % (table_name, trg_name)
816
    )
817
    if len(cur.fetchall()) == 0:
818
        cur.execute(
819
            '''CREATE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
820
                ON {table_name}
821
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
822
                '''.format(
823
                trg_fn_name=get_formdef_trigger_function_name(formdef),
824
                table_name=table_name,
825
                trg_name=trg_name,
826
            )
827
        )
828

  
692 829
    # migrations on _evolutions table
693 830
    cur.execute(
694 831
        '''SELECT column_name FROM information_schema.columns
......
1385 1522
    for view_name in view_names:
1386 1523
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1387 1524

  
1388
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1389

  
1390 1525

  
1391 1526
def do_global_views(conn, cur):
1392 1527
    # recreate global views
1393
    from wcs.formdef import FormDef
1394

  
1395
    view_names = [get_formdef_view_name(x) for x in FormDef.select(ignore_migration=True)]
1528
    from .carddef import CardDef
1529
    from .formdef import FormDef
1396 1530

  
1531
    # XXX TODO: make me dynamic, please ?
1397 1532
    cur.execute(
1398
        '''SELECT table_name FROM information_schema.views
1399
                    WHERE table_schema = 'public'
1400
                      AND table_name LIKE %s''',
1401
        ('wcs\\_view\\_%',),
1533
        """CREATE TABLE IF NOT EXISTS wcs_all_forms (
1534
        category_id integer,
1535
        object_type character varying NOT NULL, -- formdef or carddef
1536
        formdef_id integer NOT NULL,
1537
        id integer NOT NULL,
1538
        user_id character varying,
1539
        receipt_time timestamp without time zone,
1540
        status character varying,
1541
        id_display character varying,
1542
        submission_agent_id character varying,
1543
        submission_channel character varying,
1544
        backoffice_submission boolean,
1545
        last_update_time timestamp without time zone,
1546
        digests jsonb,
1547
        user_label character varying,
1548
        concerned_roles_array text[],
1549
        actions_roles_array text[],
1550
        fts tsvector,
1551
        is_at_endpoint boolean,
1552
        formdef_name text,
1553
        user_name character varying,
1554
        criticality_level integer,
1555
        geoloc_base_x double precision,
1556
        geoloc_base_y double precision,
1557
        anonymised timestamp with time zone
1558
        , PRIMARY KEY(object_type, formdef_id, id)
1559
    )"""
1560
    )
1561
    cur.execute(
1562
        '''CREATE INDEX IF NOT EXISTS %s_fts ON %s USING gin(fts)''' % ("wcs_all_forms", "wcs_all_forms")
1402 1563
    )
1403
    existing_views = set()
1404
    while True:
1405
        row = cur.fetchone()
1406
        if row is None:
1407
            break
1408
        existing_views.add(row[0])
1409 1564

  
1410
    view_names = existing_views.intersection(view_names)
1411
    if not view_names:
1412
        return
1565
    for attr in ('receipt_time', 'anonymised', 'user_id', 'status'):
1566
        cur.execute(
1567
            '''CREATE INDEX IF NOT EXISTS %s_%s ON %s (%s)''' % ("wcs_all_forms", attr, "wcs_all_forms", attr)
1568
        )
1569
    for attr in ('concerned_roles_array', 'actions_roles_array'):
1570
        cur.execute(
1571
            '''CREATE INDEX IF NOT EXISTS %s_%s ON %s USING gin (%s)'''
1572
            % ("wcs_all_forms", attr, "wcs_all_forms", attr)
1573
        )
1413 1574

  
1414
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1415

  
1416
    fake_formdef = FormDef()
1417
    common_fields = get_view_fields(fake_formdef)
1418
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1419
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1420
    common_fields.append(('fts', 'fts'))
1421
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1422
    common_fields.append(('formdef_name', 'formdef_name'))
1423
    common_fields.append(('user_name', 'user_name'))
1424
    common_fields.append(('criticality_level', 'criticality_level'))
1425
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1426
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1427
    common_fields.append(('anonymised', 'anonymised'))
1428

  
1429
    union = ' UNION ALL '.join(
1430
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1431
    )
1432
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1575
    # Purge of any dead data
1576
    valid_data = {FormDef.data_sql_prefix: set(), CardDef.data_sql_prefix: set()}
1577
    for formdef in FormDef.select(ignore_migration=True):
1578
        valid_data[formdef.data_sql_prefix].add(formdef.id)
1579
    for object_type, valid_ids in valid_data.items():
1580
        if valid_ids:
1581
            cur.execute(
1582
                "DELETE FROM wcs_all_forms WHERE object_type = '%s' AND formdef_id NOT IN (%s)"
1583
                % (object_type, ", ".join(valid_ids))
1584
            )
1585
        else:
1586
            cur.execute("DELETE FROM wcs_all_forms WHERE object_type = '%s'" % object_type)
1587
    ###fake_formdef = FormDef()
1588
    ###common_fields = get_view_fields(fake_formdef)
1589
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1590
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1591
    ###common_fields.append(('fts', 'fts'))
1592
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1593
    ###common_fields.append(('formdef_name', 'formdef_name'))
1594
    ###common_fields.append(('user_name', 'user_name'))
1595
    ###common_fields.append(('criticality_level', 'criticality_level'))
1596
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1597
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1598
    ###common_fields.append(('anonymised', 'anonymised'))
1433 1599

  
1434 1600
    for category in wcs.categories.Category.select():
1435 1601
        name = get_name_as_sql_identifier(category.url_name)[:40]
1436 1602
        cur.execute(
1437
            '''CREATE VIEW wcs_category_%s AS SELECT * from wcs_all_forms
1603
            '''CREATE OR REPLACE VIEW wcs_category_%s AS SELECT * from wcs_all_forms
1438 1604
                        WHERE category_id = %s'''
1439 1605
            % (name, category.id)
1440 1606
        )
......
2423 2589
        conn, cur = get_connection_and_cursor()
2424 2590
        if drop:
2425 2591
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2592
            cur.execute('''DELETE FROM %s''' % cls._table_name)  # force trigger execution first.
2426 2593
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2427 2594
        else:
2428 2595
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3509 3676
# latest migration, number + description (description is not used
3510 3677
# programmaticaly but will make sure git conflicts if two migrations are
3511 3678
# separately added with the same number)
3512
SQL_LEVEL = (56, 'add gin indexes to concerned_roles_array and actions_roles_array')
3679
SQL_LEVEL = (57, 'switch wcs_all_forms to a trigger-maintained table')
3513 3680

  
3514 3681

  
3515 3682
def migrate_global_views(conn, cur):
......
3588 3755
    conn, cur = get_connection_and_cursor()
3589 3756
    sql_level = get_sql_level(conn, cur)
3590 3757
    if sql_level < 0:
3591
        # fake code to help in tetsting the error code path.
3758
        # fake code to help in testing the error code path.
3592 3759
        raise RuntimeError()
3593 3760
    if sql_level < 1:  # 1: introduction of tracking_code table
3594 3761
        do_tracking_code_table()
......
3701 3868
        # 50: switch role uuid column to varchar
3702 3869
        do_role_table()
3703 3870
        migrate_legacy_roles()
3871
    if sql_level < 57:
3872
        cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms';")
3873
        rows = cur.fetchall()
3874
        if len(rows) != 0:
3875
            if rows[0][0] == 'v':
3876
                # force wcs_all_forms table creation
3877
                cur.execute('DROP VIEW IF EXISTS wcs_all_forms CASCADE;')
3878
                do_global_views(conn, cur)
3879
            else:
3880
                assert rows[0][0] == 'r'
3881
        else:
3882
            do_global_views(conn, cur)
3883

  
3884
        # assert there is no row, ie we are doing a clean migration (special case with unit tests likely) ?
3885
        # cur.execute("SELECT COUNT(*) FROM wcs_all_forms;")
3886
        # assert(cur.fetchone()[0] == 0)
3887

  
3888
        # now copy all data into the table
3889
        from wcs.carddef import CardDef
3890
        from wcs.formdef import FormDef
3891

  
3892
        for formdef in FormDef.select() + CardDef.select():
3893
            category_value = formdef.category_id
3894
            if formdef.category_id is None:
3895
                category_value = "NULL"
3896
            geoloc_base_x_query = "NULL"
3897
            geoloc_base_y_query = "NULL"
3898
            if formdef.geolocations and 'base' in formdef.geolocations:
3899
                # default geolocation is in the 'base' key; we have to unstructure the
3900
                # field is the POINT type of postgresql cannot be used directly as it
3901
                # doesn't have an equality operator.
3902
                geoloc_base_x_query = "geoloc_base[0]"
3903
                geoloc_base_y_query = "geoloc_base[1]"
3904
            criticality_levels = len(formdef.workflow.criticality_levels or [0])
3905
            endpoint_status = formdef.workflow.get_endpoint_status()
3906
            endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
3907
            if endpoint_status_filter == "":
3908
                # not the prettiest in town, but will do fine for now.
3909
                endpoint_status_filter = "'xxxx'"
3910
            object_type = formdef.data_sql_prefix  # shortcut.
3911
            formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
3912
            formed_name_quotedstring.encoding = 'utf8'
3913
            formdef_name = formed_name_quotedstring.getquoted().decode()
3914
            cur.execute(
3915
                """
3916
                INSERT INTO wcs_all_forms
3917
                SELECT
3918
                    {category_id},
3919
                    '{object_type}',
3920
                    {formdef_id},
3921
                    id,
3922
                    user_id,
3923
                    receipt_time,
3924
                    status,
3925
                    id_display,
3926
                    submission_agent_id,
3927
                    submission_channel,
3928
                    backoffice_submission,
3929
                    last_update_time,
3930
                    digests,
3931
                    user_label,
3932
                    concerned_roles_array,
3933
                    actions_roles_array,
3934
                    fts,
3935
                    status IN ({endpoint_status}),
3936
                    {formdef_name},
3937
                    (SELECT name FROM users WHERE users.id = CAST(user_id AS INTEGER)),
3938
                    criticality_level - {criticality_levels},
3939
                    {geoloc_base_x},
3940
                    {geoloc_base_y},
3941
                    anonymised
3942
                FROM {table_name}
3943
                ON CONFLICT DO NOTHING;
3944
                    """.format(
3945
                    table_name=get_formdef_table_name(formdef),
3946
                    category_id=category_value,  # always valued ? need to handle null otherwise.
3947
                    formdef_id=formdef.id,
3948
                    geoloc_base_x=geoloc_base_x_query,
3949
                    geoloc_base_y=geoloc_base_y_query,
3950
                    formdef_name=formdef_name,
3951
                    criticality_levels=criticality_levels,
3952
                    endpoint_status=endpoint_status_filter,
3953
                    object_type=object_type,
3954
                )
3955
            )
3704 3956

  
3705 3957
    cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level'))
3706 3958

  
3707
-