Projet

Général

Profil

0001-First-version-of-wcs_all_forms-as-a-table-maintained.patch

Pierre Ducroquet, 17 janvier 2022 09:04

Télécharger (10,9 ko)

Voir les différences:

Subject: [PATCH] First version of wcs_all_forms as a table maintained by
 triggers (#60552)

Instead of a view, wcs_all_forms is now a table.
Direct benefits:
- we will no longer explode the collapse limit of the PG optimizer
- since we won't read every table, we can be **much** faster
(we aren't yet, I have not done the indexes so far)
- the optimizer will have less work, thus it will be faster
(I said it already)
 tests/test_sql.py |   1 +
 wcs/sql.py        | 179 ++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 158 insertions(+), 22 deletions(-)
tests/test_sql.py
1338 1338
    for table_name in table_names:
1339 1339
        if table_name.startswith('formdata_'):
1340 1340
            cur.execute('DROP TABLE %s CASCADE' % table_name)
1341
    cur.execute('TRUNCATE wcs_all_forms;')
1341 1342

  
1342 1343

  
1343 1344
def test_is_at_endpoint(pub):
wcs/sql.py
431 431
    formdef.store(object_only=True)
432 432
    return formdef.table_name
433 433

  
434
def get_formdef_trigger_function_name(formdef):
435
    assert formdef.id is not None
436
    return '%s_%s_trigger_fn' % (formdef.data_sql_prefix, formdef.id)
437

  
438
def get_formdef_trigger_name(formdef):
439
    assert formdef.id is not None
440
    return '%s_%s_trigger' % (formdef.data_sql_prefix, formdef.id)
434 441

  
435 442
def get_formdef_new_id(id_start):
436 443
    new_id = id_start
......
478 485
    )
479 486
    for table_name in [x[0] for x in cur.fetchall()]:
480 487
        cur.execute('''DROP TABLE %s CASCADE''' % table_name)
488
    cur.execute('TRUNCATE wcs_all_forms;')
481 489
    conn.commit()
482 490
    cur.close()
483 491

  
......
689 697
    for field in existing_fields - needed_fields:
690 698
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
691 699

  
700

  
701
    # recreate the trigger function, just so it's uptodate
702
    category_value = formdef.category_id
703
    geoloc_base_x_query = "NULL"
704
    geoloc_base_y_query = "NULL"
705
    if formdef.geolocations and 'base' in formdef.geolocations:
706
        # default geolocation is in the 'base' key; we have to unstructure the
707
        # field is the POINT type of postgresql cannot be used directly as it
708
        # doesn't have an equality operator.
709
        geoloc_base_x_query = "NEW.geoloc_base[0]"
710
        geoloc_base_y_query = "NEW.geoloc_base[1]"
711
    if formdef.category_id is None:
712
        category_value = "NULL"
713
    criticality_levels = len(formdef.workflow.criticality_levels or [0])
714
    endpoint_status = formdef.workflow.get_endpoint_status()
715
    endpoint_status_filter = ", ".join(["'wf-%s'" % x.id for x in endpoint_status])
716
    if endpoint_status_filter == "":
717
        # not the prettiest in town, but will do fine for now.
718
        endpoint_status_filter = "'xxxx'"
719
    cur.execute('''
720
CREATE OR REPLACE FUNCTION {trg_fn_name}()
721
RETURNS trigger
722
LANGUAGE plpgsql
723
AS $$
724
BEGIN
725
    -- TODO : sync back from users change !
726
    IF TG_OP = 'DELETE' THEN
727
        DELETE FROM wcs_all_forms WHERE formdef_id = {formdef_id} AND id = OLD.id;
728
        RETURN OLD;
729
    ELSEIF TG_OP = 'INSERT' THEN
730
        INSERT INTO wcs_all_forms VALUES (
731
            {category_id},
732
            {formdef_id},
733
            NEW.id,
734
            NEW.user_id,
735
            NEW.receipt_time,
736
            NEW.status,
737
            NEW.id_display,
738
            NEW.submission_agent_id,
739
            NEW.submission_channel,
740
            NEW.backoffice_submission,
741
            NEW.last_update_time,
742
            NEW.digests,
743
            NEW.user_label,
744
            NEW.concerned_roles_array,
745
            NEW.actions_roles_array,
746
            NEW.fts,
747
            NEW.status IN ({endpoint_status}),
748
            {formdef_name},
749
            (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
750
            NEW.criticality_level - {criticality_levels},
751
            {geoloc_base_x},
752
            {geoloc_base_y},
753
            NEW.anonymised);
754
        RETURN NEW;
755
    ELSE
756
        UPDATE wcs_all_forms SET
757
                user_id = NEW.user_id,
758
                receipt_time = NEW.receipt_time,
759
                status = NEW.status,
760
                id_display = NEW.id_display,
761
                submission_agent_id = NEW.submission_agent_id,
762
                submission_channel = NEW.submission_channel,
763
                backoffice_submission = NEW.backoffice_submission,
764
                last_update_time = NEW.last_update_time,
765
                digests = NEW.digests,
766
                user_label = NEW.user_label,
767
                concerned_roles_array = NEW.concerned_roles_array,
768
                actions_roles_array = NEW.actions_roles_array,
769
                fts = NEW.fts,
770
                is_at_endpoint = NEW.status IN ({endpoint_status}),
771
                formdef_name = {formdef_name},
772
                user_name = (SELECT name FROM users WHERE users.id = CAST(NEW.user_id AS INTEGER)),
773
                criticality_level = NEW.criticality_level - {criticality_levels},
774
                geoloc_base_x = {geoloc_base_x},
775
                geoloc_base_y = {geoloc_base_y},
776
                anonymised = NEW.anonymised
777
            WHERE formdef_id = {formdef_id}  AND id = OLD.id;
778
        RETURN NEW;
779
    END IF;
780
END;
781
$$;
782
    '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
783
               category_id = category_value,                   # always valued ? need to handle null otherwise.
784
               formdef_id = formdef.id,
785
               geoloc_base_x = geoloc_base_x_query,
786
               geoloc_base_y = geoloc_base_y_query,
787
               formdef_name = psycopg2.extensions.QuotedString(formdef.name).getquoted().decode(),
788
               criticality_levels = criticality_levels,
789
               endpoint_status = endpoint_status_filter
790
        ))
791

  
792
    trg_name = get_formdef_trigger_name(formdef)
793
    cur.execute(
794
        '''SELECT 1 FROM pg_trigger
795
            WHERE tgrelid = '%s'::regclass
796
              AND tgname = '%s'
797
        ''' % (table_name, trg_name))
798
    if len(cur.fetchall()) == 0:
799
        cur.execute(
800
            '''CREATE TRIGGER {trg_name} AFTER INSERT OR UPDATE OR DELETE
801
                ON {table_name}
802
                FOR EACH ROW EXECUTE FUNCTION {trg_fn_name}();
803
                '''.format(trg_fn_name = get_formdef_trigger_function_name(formdef),
804
                           table_name = table_name,
805
                           trg_name = trg_name))
806

  
692 807
    # migrations on _evolutions table
693 808
    cur.execute(
694 809
        '''SELECT column_name FROM information_schema.columns
......
1385 1500
    for view_name in view_names:
1386 1501
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1387 1502

  
1388
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1389

  
1390 1503

  
1391 1504
def do_global_views(conn, cur):
1392 1505
    # recreate global views
......
1411 1524
    if not view_names:
1412 1525
        return
1413 1526

  
1414
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1415

  
1416
    fake_formdef = FormDef()
1417
    common_fields = get_view_fields(fake_formdef)
1418
    common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1419
    common_fields.append(('actions_roles_array', 'actions_roles_array'))
1420
    common_fields.append(('fts', 'fts'))
1421
    common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1422
    common_fields.append(('formdef_name', 'formdef_name'))
1423
    common_fields.append(('user_name', 'user_name'))
1424
    common_fields.append(('criticality_level', 'criticality_level'))
1425
    common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1426
    common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1427
    common_fields.append(('anonymised', 'anonymised'))
1428

  
1429
    union = ' UNION ALL '.join(
1430
        ['''SELECT %s FROM %s''' % (', '.join([y[1] for y in common_fields]), x) for x in view_names]
1431
    )
1432
    cur.execute('''CREATE VIEW wcs_all_forms AS %s''' % union)
1527
    # XXX TODO: make me dynamic, please ?
1528
    cur.execute("""CREATE TABLE IF NOT EXISTS wcs_all_forms (
1529
        category_id integer,
1530
        formdef_id integer NOT NULL,
1531
        id integer NOT NULL,
1532
        user_id character varying,
1533
        receipt_time timestamp without time zone,
1534
        status character varying,
1535
        id_display character varying,
1536
        submission_agent_id character varying,
1537
        submission_channel character varying,
1538
        backoffice_submission boolean,
1539
        last_update_time timestamp without time zone,
1540
        digests jsonb,
1541
        user_label character varying,
1542
        concerned_roles_array text[],
1543
        actions_roles_array text[],
1544
        fts tsvector,
1545
        is_at_endpoint boolean,
1546
        formdef_name text,
1547
        user_name character varying,
1548
        criticality_level integer,
1549
        geoloc_base_x double precision,
1550
        geoloc_base_y double precision,
1551
        anonymised timestamp with time zone
1552
        , PRIMARY KEY(formdef_id, id)
1553
    )""")
1554

  
1555
    ###fake_formdef = FormDef()
1556
    ###common_fields = get_view_fields(fake_formdef)
1557
    ###common_fields.append(('concerned_roles_array', 'concerned_roles_array'))
1558
    ###common_fields.append(('actions_roles_array', 'actions_roles_array'))
1559
    ###common_fields.append(('fts', 'fts'))
1560
    ###common_fields.append(('is_at_endpoint', 'is_at_endpoint'))
1561
    ###common_fields.append(('formdef_name', 'formdef_name'))
1562
    ###common_fields.append(('user_name', 'user_name'))
1563
    ###common_fields.append(('criticality_level', 'criticality_level'))
1564
    ###common_fields.append(('geoloc_base_x', 'geoloc_base_x'))
1565
    ###common_fields.append(('geoloc_base_y', 'geoloc_base_y'))
1566
    ###common_fields.append(('anonymised', 'anonymised'))
1433 1567

  
1434 1568
    for category in wcs.categories.Category.select():
1435 1569
        name = get_name_as_sql_identifier(category.url_name)[:40]
......
2423 2557
        conn, cur = get_connection_and_cursor()
2424 2558
        if drop:
2425 2559
            cur.execute('''DROP TABLE %s_evolutions CASCADE''' % cls._table_name)
2560
            cur.execute('''DELETE FROM %s''' % cls._table_name) # force trigger execution first.
2426 2561
            cur.execute('''DROP TABLE %s CASCADE''' % cls._table_name)
2427 2562
        else:
2428 2563
            cur.execute('''DELETE FROM %s_evolutions''' % cls._table_name)
......
3588 3723
    conn, cur = get_connection_and_cursor()
3589 3724
    sql_level = get_sql_level(conn, cur)
3590 3725
    if sql_level < 0:
3591
        # fake code to help in tetsting the error code path.
3726
        # fake code to help in testing the error code path.
3592 3727
        raise RuntimeError()
3593 3728
    if sql_level < 1:  # 1: introduction of tracking_code table
3594 3729
        do_tracking_code_table()
3595
-