Projet

Général

Profil

0001-Switch-wcs_all_forms-to-a-table-maintained-by-trigge.patch

Pierre Ducroquet, 18 janvier 2022 10:09

Télécharger (20 ko)

Voir les différences:

Subject: [PATCH] Switch wcs_all_forms to a table maintained by triggers
 (#60552)

Instead of a view, wcs_all_forms is now a table.
Direct benefits:
- queries no longer exceed the collapse limits of the PostgreSQL planner
(the view was a UNION ALL over every per-form view)
- since we no longer read every table, queries can be **much** faster
(they are not yet: the indexes have not been added so far)
- the planner has less work to do, so query planning is faster too
(as already mentioned)
 tests/test_sql.py |  34 +-----
 wcs/sql.py        | 292 +++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 256 insertions(+), 70 deletions(-)
tests/test_sql.py
1115 1115
def test_migration_2_formdef_id_in_views(formdef):
1116 1116
    conn, cur = sql.get_connection_and_cursor()
1117 1117
    cur.execute('UPDATE wcs_meta SET value = 1 WHERE key = %s', ('sql_level',))
1118
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1118
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1119 1119

  
1120 1120
    # hack a formdef table the wrong way, to check it is reconstructed
1121 1121
    # properly before the views are created
......
1127 1127
    assert table_exists(cur, 'wcs_view_1_tests')
1128 1128
    assert not column_exists_in_table(cur, 'wcs_view_1_tests', 'f4_display')
1129 1129

  
1130
    view_names = []
1131
    cur.execute(
1132
        '''SELECT table_name FROM information_schema.views
1133
                   WHERE table_name LIKE %s''',
1134
        ('wcs_view_%',),
1135
    )
1136
    while True:
1137
        row = cur.fetchone()
1138
        if row is None:
1139
            break
1140
        view_names.append(row[0])
1141

  
1142
    fake_formdef = FormDef()
1143
    common_fields = sql.get_view_fields(fake_formdef)
1144
    # remove formdef_id for the purpose of this test
1145
    common_fields.remove([x for x in common_fields if x[1] == 'formdef_id'][0])
1146

  
1147
    union = ' UNION '.join(
1148
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1149
    )
1150
    assert 'formdef_id' not in union
1151
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1152

  
1153 1130
    sql.migrate()
1154 1131

  
1155 1132
    assert column_exists_in_table(cur, 'wcs_all_forms', 'formdef_id')
......
1162 1139
def test_migration_6_actions_roles(formdef):
1163 1140
    conn, cur = sql.get_connection_and_cursor()
1164 1141
    cur.execute('UPDATE wcs_meta SET value = 5 WHERE key = %s', ('sql_level',))
1165
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1142
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1166 1143

  
1167 1144
    # hack a formdef table the wrong way, to check it is reconstructed
1168 1145
    # properly before the views are created
......
1187 1164
def test_migration_10_submission_channel(formdef):
1188 1165
    conn, cur = sql.get_connection_and_cursor()
1189 1166
    cur.execute('UPDATE wcs_meta SET value = 9 WHERE key = %s', ('sql_level',))
1190
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1167
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1191 1168

  
1192 1169
    # hack a formdef table the wrong way, to check it is reconstructed
1193 1170
    # properly before the views are created
......
1338 1315
    for table_name in table_names:
1339 1316
        if table_name.startswith('formdata_'):
1340 1317
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1318
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1319

  
1342 1320

  
1343 1321
def test_is_at_endpoint(pub):
......
1990 1968
def test_migration_31_user_label(formdef):
1991 1969
    conn, cur = sql.get_connection_and_cursor()
1992 1970
    cur.execute('UPDATE wcs_meta SET value = 30 WHERE key = %s', ('sql_level',))
1993
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
1971
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
1994 1972

  
1995 1973
    cur.execute('DROP VIEW wcs_view_1_tests')
1996 1974
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN user_label')
......
2020 1998

  
2021 1999
    conn, cur = sql.get_connection_and_cursor()
2022 2000
    cur.execute('UPDATE wcs_meta SET value = 37 WHERE key = %s', ('sql_level',))
2023
    cur.execute('DROP VIEW wcs_all_forms CASCADE')
2001
    cur.execute('DROP TABLE wcs_all_forms CASCADE')
2024 2002

  
2025 2003
    cur.execute('DROP VIEW wcs_view_1_tests')
2026 2004
    cur.execute('ALTER TABLE formdata_1_tests DROP COLUMN submission_agent_id')
wcs/sql.py
431 431
    formdef.store(object_only=True)
432 432
    return formdef.table_name
433 433

  
434
def get_formdef_trigger_function_name(formdef):
    """Return the name of the PL/pgSQL trigger function for *formdef*'s data table.

    The name has the form ``<data_sql_prefix>_<id>_trigger_fn``.  *formdef*
    must already have been assigned an id.
    """
    assert formdef.id is not None
    return '_'.join([str(formdef.data_sql_prefix), str(formdef.id), 'trigger_fn'])
437

  
438
def get_formdef_trigger_name(formdef):
    """Return the name of the row-level trigger installed on *formdef*'s data table.

    The name has the form ``<data_sql_prefix>_<id>_trigger``.  *formdef*
    must already have been assigned an id.
    """
    assert formdef.id is not None
    return '_'.join([str(formdef.data_sql_prefix), str(formdef.id), 'trigger'])
434 441

  
435 442
def get_formdef_new_id(id_start):
436 443
    new_id = id_start
......
477 484
        ('formdata\\_%%\\_%%',),
478 485
    )
479 486
    for table_name in [x[0] for x in cur.fetchall()]:
487
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
480 488
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
481 489
    conn.commit()
482 490
    cur.close()
......
491 499
        ('carddata\\_%%\\_%%',),
492 500
    )
493 501
    for table_name in [x[0] for x in cur.fetchall()]:
502
        cur.execute('DELETE FROM %s' % table_name)  # Force trigger execution
494 503
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
495 504
    conn.commit()
496 505
    cur.close()
......
689 698
    for field in existing_fields - needed_fields:
690 699
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
691 700

  
701

  
702
    # recreate the trigger function, just so it's uptodate
703
    category_value = formdef.category_id
704
    geoloc_base_x_query = "NULL"
705
    geoloc_base_y_query = "NULL"
706
    if formdef.geolocations and 'base' in formdef.geolocations:
707
        # default geolocation is in the 'base' key; we have to unstructure the
708
        # field is the POINT type of postgresql cannot be used directly as it
709
        # doesn't have an equality operator.
710
        geoloc_base_x_query = "NEW.geoloc_base[0]"
711
        geoloc_base_y_query = "NEW.geoloc_base[1]"
712
    if formdef.category_id is None:
713
        category_value = "NULL"
714
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
715
    endpoint_status = formdef.workflow.get_endpoint_status()
716
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
717
    if endpoint_status_filter == "":
718
        # not the prettiest in town, but will do fine for now.
719
        endpoint_status_filter = "'xxxx'"
720
    object_type = formdef.data_sql_prefix   # shortcut.
721
    formed_name_quotedstring = psycopg2.extensions.QuotedString(formdef.name)
722
    formed_name_quotedstring.encoding = 'utf8'
723
    formdef_name = formed_name_quotedstring.getquoted().decode()
724
    cur.execute('''
725
CREATE OR REPLACE FUNCTION {trg_fn_name}()
726
RETURNS trigger
727
LANGUAGE plpgsql
728
AS $$
729
BEGIN
730
    -- TODO : sync back from users change !
731
    IF TG_OP = 'DELETE' THEN
732
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id AND object_type = '{object_type}';
733
        RETURN OLD;
734
    ELSEIF TG_OP = 'INSERT' THEN
735
        INSERT INTO wcs_all_forms VALUES (
736
            {category_id},
737
            '{object_type}',
738
            {formdef_id},
739
            NEW.id,
740
            NEW.user_id,
741
            NEW.receipt_time,
742
            NEW.status,
743
            NEW.id_display,
744
            NEW.submission_agent_id,
745
            NEW.submission_channel,
746
            NEW.backoffice_submission,
747
            NEW.last_update_time,
748
            NEW.digests,
749
            NEW.user_label,
750
            NEW.concerned_roles_array,
751
            NEW.actions_roles_array,
752
            NEW.fts,
753
            NEW.status IN ({endpoint_status}),
754
            {formdef_name},
755
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
756
            NEW.criticality_level - {criticality_levels},
757
            {geoloc_base_x},
758
            {geoloc_base_y},
759
            NEW.anonymised);
760
        RETURN NEW;
761
    ELSE
762
        UPDATE wcs_all_forms SET
763
                user_id = NEW.user_id,
764
                receipt_time = NEW.receipt_time,
765
                status = NEW.status,
766
                id_display = NEW.id_display,
767
                submission_agent_id = NEW.submission_agent_id,
768
                submission_channel = NEW.submission_channel,
769
                backoffice_submission = NEW.backoffice_submission,
770
                last_update_time = NEW.last_update_time,
771
                digests = NEW.digests,
772
                user_label = NEW.user_label,
773
                concerned_roles_array = NEW.concerned_roles_array,
774
                actions_roles_array = NEW.actions_roles_array,
775
                fts = NEW.fts,
776
                is_at_endpoint = NEW.status IN ({endpoint_status}),
777
                formdef_name = {formdef_name},
778
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
779
                criticality_level = NEW.criticality_level - {criticality_levels},
780
                geoloc_base_x = {geoloc_base_x},
781
                geoloc_base_y = {geoloc_base_y},
782
                anonymised = NEW.anonymised
783
            WHERE formdef_id = {formdef_id}  AND id = OLD.id AND object_type = '{object_type}';
784
        RETURN NEW;
785
    END IF;
786
END;
787
$$;
788
    '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
789
               category_id = category_value,                   # always valued ? need to handle null otherwise.
790
               formdef_id = formdef.id,
791
               geoloc_base_x = geoloc_base_x_query,
792
               geoloc_base_y = geoloc_base_y_query,
793
               formdef_name = formdef_name,
794
               criticality_levels = criticality_levels,
795
               endpoint_status = endpoint_status_filter,
796
               object_type = object_type
797
        ))
798

  
799
    trg_name = get_formdef_trigger_name(formdef)
800
    cur.execute(
801
        '''SELECT 1 FROM pg_trigger
802
            WHERE tgrelid = '%s'::regclass
803
              AND tgname = '%s'
804
        ''' % (table_name, trg_name))
805
    if len(cur.fetchall()) == 0:
806
        cur.execute(
807
            '''CREATE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
808
                ON {table_name}
809
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
810
                '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
811
                           table_name = table_name,
812
                           trg_name = trg_name))
813

  
692 814
    # migrations on _evolutions table
693 815
    cur.execute(
694 816
        '''SELECT column_name FROM information_schema.columns
......
1385 1507
    for view_name in view_names:
1386 1508
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1387 1509

  
1388
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1389

  
1390 1510

  
1391 1511
def do_global_views(conn, cur):
1392 1512
    # recreate global views
1393 1513
    from wcs.formdef import FormDef
1394 1514

  
1395
    view_names = [get_formdef_view_name(x) for x in FormDef.select(ignore_migration=True)]
1396

  
1397
    cur.execute(
1398
        '''SELECT table_name FROM information_schema.views
1399
                    WHERE table_schema = 'public'
1400
                      AND table_name LIKE %s''',
1401
        ('wcs\\_view\\_%',),
1402
    )
1403
    existing_views = set()
1404
    while True:
1405
        row = cur.fetchone()
1406
        if row is None:
1407
            break
1408
        existing_views.add(row[0])
1409

  
1410
    view_names = existing_views.intersection(view_names)
1411
    if not view_names:
1412
        return
1413

  
1414
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1415

  
1416
    fake_formdef = FormDef()
1417
    common_fields = get_view_fields(fake_formdef)
1418
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1419
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1420
    common_fields.append(('fts', 'fts'))
1421
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1422
    common_fields.append(('formdef_name', 'formdef_name'))
1423
    common_fields.append(('user_name', 'user_name'))
1424
    common_fields.append(('criticality_level', 'criticality_level'))
1425
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1426
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1427
    common_fields.append(('anonymised', 'anonymised'))
1428

  
1429
    union = ' UNION ALL '.join(
1430
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1431
    )
1432
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1515
    # XXX TODO: make me dynamic, please ?
1516
    cur.execute("""CREATE TABLE IF NOT EXISTS wcs_all_forms (
1517
        category_id integer,
1518
        object_type character varying NOT NULL, -- formdef or carddef
1519
        formdef_id integer NOT NULL,
1520
        id integer NOT NULL,
1521
        user_id character varying,
1522
        receipt_time timestamp without time zone,
1523
        status character varying,
1524
        id_display character varying,
1525
        submission_agent_id character varying,
1526
        submission_channel character varying,
1527
        backoffice_submission boolean,
1528
        last_update_time timestamp without time zone,
1529
        digests jsonb,
1530
        user_label character varying,
1531
        concerned_roles_array text[],
1532
        actions_roles_array text[],
1533
        fts tsvector,
1534
        is_at_endpoint boolean,
1535
        formdef_name text,
1536
        user_name character varying,
1537
        criticality_level integer,
1538
        geoloc_base_x double precision,
1539
        geoloc_base_y double precision,
1540
        anonymised timestamp with time zone
1541
        , PRIMARY KEY(object_type, formdef_id, id)
1542
    )""")
1543

  
1544
    ###fake_formdef = FormDef()
1545
    ###common_fields = get_view_fields(fake_formdef)
1546
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1547
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1548
    ###common_fields.append(('fts', 'fts'))
1549
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1550
    ###common_fields.append(('formdef_name', 'formdef_name'))
1551
    ###common_fields.append(('user_name', 'user_name'))
1552
    ###common_fields.append(('criticality_level', 'criticality_level'))
1553
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1554
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1555
    ###common_fields.append(('anonymised', 'anonymised'))
1433 1556

  
1434 1557
    for category in wcs.categories.Category.select():
1435 1558
        name = get_name_as_sql_identifier(category.url_name)[:40]
......
2423 2546
        conn, cur = get_connection_and_cursor()
2424 2547
        if drop:
2425 2548
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2549
            cur.execute('''DELETE FROM %s''' % cls._table_name) # force trigger execution first.
2426 2550
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2427 2551
        else:
2428 2552
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3509 3633
# latest migration, number + description (description is not used
3510 3634
# programmaticaly but will make sure git conflicts if two migrations are
3511 3635
# separately added with the same number)
3512
SQL_LEVEL = (56, 'add gin indexes to concerned_roles_array and actions_roles_array')
3636
SQL_LEVEL = (57, 'switch wcs_all_forms to a trigger-maintained table')
3513 3637

  
3514 3638

  
3515 3639
def migrate_global_views(conn, cur):
......
3588 3712
    conn, cur = get_connection_and_cursor()
3589 3713
    sql_level = get_sql_level(conn, cur)
3590 3714
    if sql_level < 0:
3591
        # fake code to help in tetsting the error code path.
3715
        # fake code to help in testing the error code path.
3592 3716
        raise RuntimeError()
3593 3717
    if sql_level < 1:  # 1: introduction of tracking_code table
3594 3718
        do_tracking_code_table()
......
3701 3825
        # 50: switch role uuid column to varchar
3702 3826
        do_role_table()
3703 3827
        migrate_legacy_roles()
3828
    if sql_level < 57:
        # 57: wcs_all_forms switches from a UNION view to a table maintained
        # by per-formdef triggers. Make sure the table exists, then copy the
        # rows already stored in every formdata/carddata table into it.
        cur.execute("SELECT relkind FROM pg_class WHERE relname = 'wcs_all_forms';")
        rows = cur.fetchall()
        relkind = rows[0][0] if rows else None
        if relkind == 'v':
            # legacy view: drop it so that the table can be created instead
            cur.execute('DROP VIEW IF EXISTS wcs_all_forms CASCADE;')
        elif relkind not in (None, 'r'):
            # neither missing, a view, nor an ordinary table: refuse to guess
            raise RuntimeError('unexpected relkind %r for wcs_all_forms' % relkind)
        if relkind != 'r':
            # (re)create the wcs_all_forms table; do_global_views needs the
            # current connection and cursor.
            do_global_views(conn, cur)

        # Now copy all existing data into the table. Rows that are already
        # present (same object_type/formdef_id/id primary key) are kept as-is
        # thanks to ON CONFLICT DO NOTHING.
        # NOTE(review): this is a one-shot copy; keeping the table in sync
        # afterwards relies on the per-table triggers being installed for
        # pre-existing formdata/carddata tables — confirm that happens.
        from wcs.carddef import CardDef
        from wcs.formdef import FormDef

        for formdef in FormDef.select() + CardDef.select():
            category_value = 'NULL' if formdef.category_id is None else formdef.category_id
            geoloc_base_x_query = 'NULL'
            geoloc_base_y_query = 'NULL'
            if formdef.geolocations and 'base' in formdef.geolocations:
                # the default geolocation is stored under the 'base' key; it is
                # split into x/y values as the PostgreSQL POINT type cannot be
                # used directly (it doesn't have an equality operator).
                geoloc_base_x_query = 'geoloc_base[0]'
                geoloc_base_y_query = 'geoloc_base[1]'
            criticality_levels = len(formdef.workflow.criticality_levels or [0])
            endpoint_status = formdef.workflow.get_endpoint_status()
            endpoint_status_filter = ', '.join("'wf-%s'" % x.id for x in endpoint_status)
            if not endpoint_status_filter:
                # no endpoint status: use a placeholder that no real status is
                # expected to equal (an empty IN () list is invalid SQL).
                endpoint_status_filter = "'xxxx'"
            object_type = formdef.data_sql_prefix  # shortcut.
            quoted_name = psycopg2.extensions.QuotedString(formdef.name)
            quoted_name.encoding = 'utf8'
            formdef_name = quoted_name.getquoted().decode()
            cur.execute(
                """
                INSERT INTO wcs_all_forms
                SELECT
                    {category_id},
                    '{object_type}',
                    {formdef_id},
                    id,
                    user_id,
                    receipt_time,
                    status,
                    id_display,
                    submission_agent_id,
                    submission_channel,
                    backoffice_submission,
                    last_update_time,
                    digests,
                    user_label,
                    concerned_roles_array,
                    actions_roles_array,
                    fts,
                    status IN ({endpoint_status}),
                    {formdef_name},
                    (SELECT name FROM users WHERE users.id = CAST(user_id AS INTEGER)),
                    criticality_level - {criticality_levels},
                    {geoloc_base_x},
                    {geoloc_base_y},
                    anonymised
                FROM {table_name}
                ON CONFLICT DO NOTHING;
                    """.format(
                    table_name=get_formdef_table_name(formdef),
                    category_id=category_value,
                    formdef_id=formdef.id,
                    geoloc_base_x=geoloc_base_x_query,
                    geoloc_base_y=geoloc_base_y_query,
                    formdef_name=formdef_name,
                    criticality_levels=criticality_levels,
                    endpoint_status=endpoint_status_filter,
                    object_type=object_type,
                )
            )
3911

  
3704 3912

  
3705 3913
    cur.execute('''UPDATE wcs_meta SET value = %s WHERE key = %s''', (str(SQL_LEVEL[0]), 'sql_level'))
3706 3914

  
3707
-