Projet

Général

Profil

0001-First-version-of-wcs_all_forms-as-a-table-maintained.patch

Pierre Ducroquet, 16 janvier 2022 20:52

Télécharger (10,6 ko)

Voir les différences:

Subject: [PATCH] First version of wcs_all_forms as a table maintained by
 triggers (#60552)

Instead of a view, wcs_all_forms is now a table.
Direct benefits:
- we will no longer explode the collapse limit of the PG optimizer
- since we won't read every table, we can be **much** faster
(we aren't yet, I have not done the indexes so far)
- the optimizer will have less work, thus it will be faster
(I said it already)
 tests/test_sql.py |   1 +
 wcs/sql.py        | 171 ++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 150 insertions(+), 22 deletions(-)
tests/test_sql.py
1338 1338
    for table_name in table_names:
1339 1339
        if table_name.startswith('formdata_'):
1340 1340
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1341
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1342

  
1342 1343

  
1343 1344
def test_is_at_endpoint(pub):
wcs/sql.py
431 431
    formdef.store(object_only=True)
432 432
    return formdef.table_name
433 433

  
434
def get_formdef_trigger_function_name(formdef):
435
    assert formdef.id is not None
436
    return '%s_%s_trigger_fn' % (formdef.data_sql_prefix, formdef.id)
437

  
438
def get_formdef_trigger_name(formdef):
439
    assert formdef.id is not None
440
    return '%s_%s_trigger' % (formdef.data_sql_prefix, formdef.id)
434 441

  
435 442
def get_formdef_new_id(id_start):
436 443
    new_id = id_start
......
478 485
    )
479 486
    for table_name in [x[0] for x in cur.fetchall()]:
480 487
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
488
    cur.execute('TRUNCATE wcs_all_forms;')
481 489
    conn.commit()
482 490
    cur.close()
483 491

  
......
689 697
    for field in existing_fields - needed_fields:
690 698
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
691 699

  
700

  
701
    # recreate the trigger function, just so it's uptodate
702
    category_value = formdef.category_id
703
    geoloc_base_x_query = "NULL"
704
    geoloc_base_y_query = "NULL"
705
    if formdef.geolocations and 'base' in formdef.geolocations:
706
        # default geolocation is in the 'base' key; we have to unstructure the
707
        # field is the POINT type of postgresql cannot be used directly as it
708
        # doesn't have an equality operator.
709
        geoloc_base_x_query = "NEW.geoloc_base[0]"
710
        geoloc_base_y_query = "NEW.geoloc_base[1]"
711
    if formdef.category_id is None:
712
        category_value = "NULL"
713
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
714
    endpoint_status = formdef.workflow.get_endpoint_status()
715
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
716
    if endpoint_status_filter == "":
717
        # not the prettiest in town, but will do fine for now.
718
        endpoint_status_filter = "'xxxx'"
719
    cur.execute('''
720
CREATE OR REPLACE FUNCTION {trg_fn_name}()
721
RETURNS trigger
722
LANGUAGE plpgsql
723
AS $$
724
BEGIN
725
    -- TODO : sync back from users change !
726
    IF TG_OP = 'DELETE' THEN
727
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id;
728
        RETURN OLD;
729
    ELSEIF TG_OP = 'INSERT' THEN
730
        INSERT INTO wcs_all_forms VALUES (
731
            {category_id},
732
            {formdef_id},
733
            NEW.id,
734
            NEW.user_id,
735
            NEW.receipt_time,
736
            NEW.status,
737
            NEW.id_display,
738
            NEW.submission_agent_id,
739
            NEW.submission_channel,
740
            NEW.backoffice_submission,
741
            NEW.last_update_time,
742
            NEW.digests,
743
            NEW.user_label,
744
            NEW.concerned_roles_array,
745
            NEW.actions_roles_array,
746
            NEW.fts,
747
            NEW.status IN ({endpoint_status}),
748
            {formdef_name},
749
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
750
            NEW.criticality_level - {criticality_levels},
751
            {geoloc_base_x},
752
            {geoloc_base_y},
753
            NEW.anonymised);
754
        RETURN NEW;
755
    ELSE
756
        UPDATE wcs_all_forms SET
757
                user_id = NEW.user_id,
758
                receipt_time = NEW.receipt_time,
759
                status = NEW.status,
760
                id_display = NEW.id_display,
761
                submission_agent_id = NEW.submission_agent_id,
762
                submission_channel = NEW.submission_channel,
763
                backoffice_submission = NEW.backoffice_submission,
764
                last_update_time = NEW.last_update_time,
765
                digests = NEW.digests,
766
                user_label = NEW.user_label,
767
                concerned_roles_array = NEW.concerned_roles_array,
768
                actions_roles_array = NEW.actions_roles_array,
769
                fts = NEW.fts,
770
                is_at_endpoint = NEW.status IN ({endpoint_status}),
771
                formdef_name = {formdef_name},
772
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
773
                criticality_level = NEW.criticality_level - {criticality_levels},
774
                geoloc_base_x = {geoloc_base_x},
775
                geoloc_base_y = {geoloc_base_y},
776
                anonymised = NEW.anonymised
777
            WHERE formdef_id = {formdef_id}  AND id = OLD.id;
778
        RETURN NEW;
779
    END IF;
780
END;
781
$$;
782
    '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
783
               category_id = category_value,                   # always valued ? need to handle null otherwise.
784
               formdef_id = formdef.id,
785
               geoloc_base_x = geoloc_base_x_query,
786
               geoloc_base_y = geoloc_base_y_query,
787
               formdef_name = psycopg2.extensions.QuotedString(formdef.name).getquoted().decode(),
788
               criticality_levels = criticality_levels,
789
               endpoint_status = endpoint_status_filter
790
        ))
791

  
792
    cur.execute('''CREATE OR REPLACE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
793
                ON {table_name}
794
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
795
                '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
796
                           table_name = table_name,
797
                           trg_name = get_formdef_trigger_name(formdef)))
798

  
692 799
    # migrations on _evolutions table
693 800
    cur.execute(
694 801
        '''SELECT column_name FROM information_schema.columns
......
1385 1492
    for view_name in view_names:
1386 1493
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1387 1494

  
1388
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1389

  
1390 1495

  
1391 1496
def do_global_views(conn, cur):
1392 1497
    # recreate global views
......
1411 1516
    if not view_names:
1412 1517
        return
1413 1518

  
1414
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1415

  
1416
    fake_formdef = FormDef()
1417
    common_fields = get_view_fields(fake_formdef)
1418
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1419
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1420
    common_fields.append(('fts', 'fts'))
1421
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1422
    common_fields.append(('formdef_name', 'formdef_name'))
1423
    common_fields.append(('user_name', 'user_name'))
1424
    common_fields.append(('criticality_level', 'criticality_level'))
1425
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1426
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1427
    common_fields.append(('anonymised', 'anonymised'))
1428

  
1429
    union = ' UNION ALL '.join(
1430
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1431
    )
1432
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1519
    # XXX TODO: make me dynamic, please ?
1520
    cur.execute("""CREATE TABLE IF NOT EXISTS wcs_all_forms (
1521
        category_id integer,
1522
        formdef_id integer NOT NULL,
1523
        id integer NOT NULL,
1524
        user_id character varying,
1525
        receipt_time timestamp without time zone,
1526
        status character varying,
1527
        id_display character varying,
1528
        submission_agent_id character varying,
1529
        submission_channel character varying,
1530
        backoffice_submission boolean,
1531
        last_update_time timestamp without time zone,
1532
        digests jsonb,
1533
        user_label character varying,
1534
        concerned_roles_array text[],
1535
        actions_roles_array text[],
1536
        fts tsvector,
1537
        is_at_endpoint boolean,
1538
        formdef_name text,
1539
        user_name character varying,
1540
        criticality_level integer,
1541
        geoloc_base_x double precision,
1542
        geoloc_base_y double precision,
1543
        anonymised timestamp with time zone
1544
        , PRIMARY KEY(formdef_id, id)
1545
    )""")
1546

  
1547
    ###fake_formdef = FormDef()
1548
    ###common_fields = get_view_fields(fake_formdef)
1549
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1550
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1551
    ###common_fields.append(('fts', 'fts'))
1552
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1553
    ###common_fields.append(('formdef_name', 'formdef_name'))
1554
    ###common_fields.append(('user_name', 'user_name'))
1555
    ###common_fields.append(('criticality_level', 'criticality_level'))
1556
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1557
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1558
    ###common_fields.append(('anonymised', 'anonymised'))
1433 1559

  
1434 1560
    for category in wcs.categories.Category.select():
1435 1561
        name = get_name_as_sql_identifier(category.url_name)[:40]
......
2423 2549
        conn, cur = get_connection_and_cursor()
2424 2550
        if drop:
2425 2551
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2552
            cur.execute('''DELETE FROM %s''' % cls._table_name) # force trigger execution first.
2426 2553
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2427 2554
        else:
2428 2555
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3588 3715
    conn, cur = get_connection_and_cursor()
3589 3716
    sql_level = get_sql_level(conn, cur)
3590 3717
    if sql_level < 0:
3591
        # fake code to help in tetsting the error code path.
3718
        # fake code to help in testing the error code path.
3592 3719
        raise RuntimeError()
3593 3720
    if sql_level < 1:  # 1: introduction of tracking_code table
3594 3721
        do_tracking_code_table()
3595
-