Projet

Général

Profil

0001-sql-lock-wcs_meta-during-schema-updates-57017.patch

Benjamin Dauvergne, 16 septembre 2021 23:14

Télécharger (18,4 ko)

Voir les différences:

Subject: [PATCH] sql: lock wcs_meta during schema updates (#57017)

 wcs/sql.py | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)
wcs/sql.py
14 14
# You should have received a copy of the GNU General Public License
15 15
# along with this program; if not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import contextvars
17 18
import copy
18 19
import datetime
20
import functools
19 21
import io
20 22
import json
21 23
import re
......
388 390
    return get_publisher().pgconn
389 391

  
390 392

  
393
def lock_wcs_meta():
394
    conn, cur = get_connection_and_cursor()
395
    do_meta_table(conn, cur, insert_current_sql_level=False)
396
    cur.execute('LOCK wcs_meta')
397

  
398

  
391 399
def cleanup_connection():
392 400
    if hasattr(get_publisher(), 'pgconn') and get_publisher().pgconn is not None:
393 401
        get_publisher().pgconn.close()
......
510 518
    return f
511 519

  
512 520

  
521
class RedoWithLock(Exception):
522
    pass
523

  
524

  
525
locked = contextvars.ContextVar('locked', default=False)
526
guard_lock_depth = contextvars.ContextVar('guard_lock_depth', default=0)
527

  
528

  
529
def lock():
530
    if not locked.get():
531
        raise RedoWithLock
532

  
533

  
534
def guard_lock(func):
535
    @functools.wraps(func)
536
    def wrapper(*args, **kwargs):
537
        depth = guard_lock_depth.get()
538
        depth_token = guard_lock_depth.set(depth + 1)
539
        try:
540
            try:
541
                return func(*args, **kwargs)
542
            except RedoWithLock:
543
                if depth > 0:
544
                    raise
545
                token = locked.set(True)
546
                try:
547
                    lock_wcs_meta()
548
                    return func(*args, **kwargs)
549
                finally:
550
                    locked.reset(token)
551
        finally:
552
            guard_lock_depth.reset(depth_token)
553

  
554
    return wrapper
555

  
556

  
513 557
@guard_postgres
558
@guard_lock
514 559
def do_formdef_tables(formdef, conn=None, cur=None, rebuild_views=False, rebuild_global_views=True):
515 560
    if formdef.id is None:
516 561
        return []
......
537 582
        (table_name,),
538 583
    )
539 584
    if cur.fetchone()[0] == 0:
585
        lock()
540 586
        cur.execute(
541 587
            '''CREATE TABLE %s (id serial PRIMARY KEY,
542 588
                                    user_id varchar,
......
601 647
    # migrations
602 648
    if 'fts' not in existing_fields:
603 649
        # full text search
650
        lock()
604 651
        cur.execute('''ALTER TABLE %s ADD COLUMN fts tsvector''' % table_name)
605 652
        cur.execute('''CREATE INDEX %s_fts ON %s USING gin(fts)''' % (table_name, table_name))
606 653

  
607 654
    if 'workflow_roles' not in existing_fields:
655
        lock()
608 656
        cur.execute('''ALTER TABLE %s ADD COLUMN workflow_roles bytea''' % table_name)
609 657
        cur.execute('''ALTER TABLE %s ADD COLUMN workflow_roles_array text[]''' % table_name)
610 658
    if 'concerned_roles_array' not in existing_fields:
659
        lock()
611 660
        cur.execute('''ALTER TABLE %s ADD COLUMN concerned_roles_array text[]''' % table_name)
612 661
    if 'actions_roles_array' not in existing_fields:
662
        lock()
613 663
        cur.execute('''ALTER TABLE %s ADD COLUMN actions_roles_array text[]''' % table_name)
614 664

  
615 665
    if 'page_no' not in existing_fields:
666
        lock()
616 667
        cur.execute('''ALTER TABLE %s ADD COLUMN page_no varchar''' % table_name)
617 668

  
618 669
    if 'anonymised' not in existing_fields:
670
        lock()
619 671
        cur.execute('''ALTER TABLE %s ADD COLUMN anonymised timestamptz''' % table_name)
620 672

  
621 673
    if 'tracking_code' not in existing_fields:
674
        lock()
622 675
        cur.execute('''ALTER TABLE %s ADD COLUMN tracking_code varchar''' % table_name)
623 676

  
624 677
    if 'backoffice_submission' not in existing_fields:
678
        lock()
625 679
        cur.execute('''ALTER TABLE %s ADD COLUMN backoffice_submission boolean''' % table_name)
626 680

  
627 681
    if 'submission_context' not in existing_fields:
682
        lock()
628 683
        cur.execute('''ALTER TABLE %s ADD COLUMN submission_context bytea''' % table_name)
629 684

  
630 685
    if 'submission_agent_id' not in existing_fields:
686
        lock()
631 687
        cur.execute('''ALTER TABLE %s ADD COLUMN submission_agent_id varchar''' % table_name)
632 688

  
633 689
    if 'submission_channel' not in existing_fields:
690
        lock()
634 691
        cur.execute('''ALTER TABLE %s ADD COLUMN submission_channel varchar''' % table_name)
635 692

  
636 693
    if 'criticality_level' not in existing_fields:
694
        lock()
637 695
        cur.execute(
638 696
            '''ALTER TABLE %s ADD COLUMN criticality_level integer NOT NULL DEFAULT(0)''' % table_name
639 697
        )
640 698

  
641 699
    if 'last_update_time' not in existing_fields:
700
        lock()
642 701
        cur.execute('''ALTER TABLE %s ADD COLUMN last_update_time timestamp''' % table_name)
643 702

  
644 703
    if 'digests' not in existing_fields:
704
        lock()
645 705
        cur.execute('''ALTER TABLE %s ADD COLUMN digests jsonb''' % table_name)
646 706

  
647 707
    if 'user_label' not in existing_fields:
708
        lock()
648 709
        cur.execute('''ALTER TABLE %s ADD COLUMN user_label varchar''' % table_name)
649 710

  
650 711
    if 'prefilling_data' not in existing_fields:
712
        lock()
651 713
        cur.execute('''ALTER TABLE %s ADD COLUMN prefilling_data bytea''' % table_name)
652 714

  
653 715
    # add new fields
......
658 720
            continue
659 721
        needed_fields.add(get_field_id(field))
660 722
        if get_field_id(field) not in existing_fields:
723
            lock()
661 724
            cur.execute('''ALTER TABLE %s ADD COLUMN %s %s''' % (table_name, get_field_id(field), sql_type))
662 725
        if field.store_display_value:
663 726
            needed_fields.add('%s_display' % get_field_id(field))
664 727
            if '%s_display' % get_field_id(field) not in existing_fields:
728
                lock()
665 729
                cur.execute(
666 730
                    '''ALTER TABLE %s ADD COLUMN %s varchar'''
667 731
                    % (table_name, '%s_display' % get_field_id(field))
......
669 733
        if field.store_structured_value:
670 734
            needed_fields.add('%s_structured' % get_field_id(field))
671 735
            if '%s_structured' % get_field_id(field) not in existing_fields:
736
                lock()
672 737
                cur.execute(
673 738
                    '''ALTER TABLE %s ADD COLUMN %s bytea'''
674 739
                    % (table_name, '%s_structured' % get_field_id(field))
......
678 743
        column_name = 'geoloc_%s' % field
679 744
        needed_fields.add(column_name)
680 745
        if column_name not in existing_fields:
746
            lock()
681 747
            cur.execute('ALTER TABLE %s ADD COLUMN %s %s' '' % (table_name, column_name, 'POINT'))
682 748

  
683 749
    # delete obsolete fields
684 750
    for field in existing_fields - needed_fields:
751
        lock()
685 752
        cur.execute('''ALTER TABLE %s DROP COLUMN %s CASCADE''' % (table_name, field))
686 753

  
687 754
    # migrations on _evolutions table
......
694 761
    )
695 762
    evo_existing_fields = {x[0] for x in cur.fetchall()}
696 763
    if 'last_jump_datetime' not in evo_existing_fields:
764
        lock()
697 765
        cur.execute('''ALTER TABLE %s_evolutions ADD COLUMN last_jump_datetime timestamp''' % table_name)
698 766

  
699 767
    if rebuild_views or len(existing_fields - needed_fields):
700 768
        # views may have been dropped when dropping columns, so we recreate
701 769
        # them even if not asked to.
770
        lock()
702 771
        redo_views(conn, cur, formdef, rebuild_global_views=rebuild_global_views)
703 772

  
704 773
    if own_conn:
......
718 787
    return actions
719 788

  
720 789

  
790
@guard_lock
721 791
def do_formdef_indexes(formdef, created, conn, cur, concurrently=False):
722 792
    table_name = get_formdef_table_name(formdef)
723 793
    evolutions_table_name = table_name + '_evolutions'
......
737 807
        create_index = 'CREATE INDEX CONCURRENTLY'
738 808

  
739 809
    if evolutions_table_name + '_fid' not in existing_indexes:
810
        lock()
740 811
        cur.execute(
741 812
            '''%s %s_fid ON %s (formdata_id)''' % (create_index, evolutions_table_name, evolutions_table_name)
742 813
        )
743 814

  
744 815
    for attr in ('receipt_time', 'anonymised', 'user_id', 'status'):
745 816
        if table_name + '_' + attr + '_idx' not in existing_indexes:
817
            lock()
746 818
            cur.execute(
747 819
                '%(create_index)s %(table_name)s_%(attr)s_idx ON %(table_name)s (%(attr)s)'
748 820
                % {'create_index': create_index, 'table_name': table_name, 'attr': attr}
......
750 822

  
751 823

  
752 824
@guard_postgres
825
@guard_lock
753 826
def do_user_table():
754 827
    conn, cur = get_connection_and_cursor()
755 828
    table_name = 'users'
......
761 834
        (table_name,),
762 835
    )
763 836
    if cur.fetchone()[0] == 0:
837
        lock()
764 838
        cur.execute(
765 839
            '''CREATE TABLE %s (id serial PRIMARY KEY,
766 840
                                    name varchar,
......
813 887
            continue
814 888
        needed_fields.add(get_field_id(field))
815 889
        if get_field_id(field) not in existing_fields:
890
            lock()
816 891
            cur.execute('''ALTER TABLE %s ADD COLUMN %s %s''' % (table_name, get_field_id(field), sql_type))
817 892
        if field.store_display_value:
818 893
            needed_fields.add('%s_display' % get_field_id(field))
819 894
            if '%s_display' % get_field_id(field) not in existing_fields:
895
                lock()
820 896
                cur.execute(
821 897
                    '''ALTER TABLE %s ADD COLUMN %s varchar'''
822 898
                    % (table_name, '%s_display' % get_field_id(field))
......
824 900
        if field.store_structured_value:
825 901
            needed_fields.add('%s_structured' % get_field_id(field))
826 902
            if '%s_structured' % get_field_id(field) not in existing_fields:
903
                lock()
827 904
                cur.execute(
828 905
                    '''ALTER TABLE %s ADD COLUMN %s bytea'''
829 906
                    % (table_name, '%s_structured' % get_field_id(field))
......
832 909
    # migrations
833 910
    if 'fts' not in existing_fields:
834 911
        # full text search
912
        lock()
835 913
        cur.execute('''ALTER TABLE %s ADD COLUMN fts tsvector''' % table_name)
836 914
        cur.execute('''CREATE INDEX %s_fts ON %s USING gin(fts)''' % (table_name, table_name))
837 915

  
838 916
    if 'verified_fields' not in existing_fields:
917
        lock()
839 918
        cur.execute('ALTER TABLE %s ADD COLUMN verified_fields text[]' % table_name)
840 919

  
841 920
    if 'ascii_name' not in existing_fields:
921
        lock()
842 922
        cur.execute('ALTER TABLE %s ADD COLUMN ascii_name varchar' % table_name)
843 923

  
844 924
    if 'deleted_timestamp' not in existing_fields:
925
        lock()
845 926
        cur.execute('ALTER TABLE %s ADD COLUMN deleted_timestamp timestamp' % table_name)
846 927

  
847 928
    if 'is_active' not in existing_fields:
929
        lock()
848 930
        cur.execute('ALTER TABLE %s ADD COLUMN is_active bool DEFAULT TRUE' % table_name)
849 931
        cur.execute('UPDATE %s SET is_active = FALSE WHERE deleted_timestamp IS NOT NULL' % table_name)
850 932

  
851 933
    # delete obsolete fields
852 934
    for field in existing_fields - needed_fields:
935
        lock()
853 936
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
854 937

  
855 938
    conn.commit()
856 939

  
857 940
    try:
941
        lock()
858 942
        cur.execute('CREATE INDEX users_name_idx ON users (name)')
859 943
        conn.commit()
860 944
    except psycopg2.ProgrammingError:
......
863 947
    cur.close()
864 948

  
865 949

  
950
@guard_lock
866 951
def do_role_table(concurrently=False):
867 952
    conn, cur = get_connection_and_cursor()
868 953
    table_name = 'roles'
......
874 959
        (table_name,),
875 960
    )
876 961
    if cur.fetchone()[0] == 0:
962
        lock()
877 963
        cur.execute(
878 964
            '''CREATE TABLE %s (id VARCHAR PRIMARY KEY,
879 965
                                name VARCHAR,
......
899 985

  
900 986
    # delete obsolete fields
901 987
    for field in existing_fields - needed_fields:
988
        lock()
902 989
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
903 990

  
904 991
    conn.commit()
......
913 1000
        role.store()
914 1001

  
915 1002

  
1003
@guard_lock
916 1004
def do_tracking_code_table():
917 1005
    conn, cur = get_connection_and_cursor()
918 1006
    table_name = 'tracking_codes'
......
924 1012
        (table_name,),
925 1013
    )
926 1014
    if cur.fetchone()[0] == 0:
1015
        lock()
927 1016
        cur.execute(
928 1017
            '''CREATE TABLE %s (id varchar PRIMARY KEY,
929 1018
                                    formdef_id varchar,
......
942 1031

  
943 1032
    # delete obsolete fields
944 1033
    for field in existing_fields - needed_fields:
1034
        lock()
945 1035
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
946 1036

  
947 1037
    conn.commit()
948 1038
    cur.close()
949 1039

  
950 1040

  
1041
@guard_lock
951 1042
def do_session_table():
952 1043
    conn, cur = get_connection_and_cursor()
953 1044
    table_name = 'sessions'
......
959 1050
        (table_name,),
960 1051
    )
961 1052
    if cur.fetchone()[0] == 0:
1053
        lock()
962 1054
        cur.execute(
963 1055
            '''CREATE TABLE %s (id varchar PRIMARY KEY,
964 1056
                                        session_data bytea,
......
979 1071

  
980 1072
    # migrations
981 1073
    if 'last_update_time' not in existing_fields:
1074
        lock()
982 1075
        cur.execute('''ALTER TABLE %s ADD COLUMN last_update_time timestamp DEFAULT NOW()''' % table_name)
983 1076
        cur.execute('''CREATE INDEX %s_ts ON %s (last_update_time)''' % (table_name, table_name))
984 1077

  
985 1078
    # delete obsolete fields
986 1079
    for field in existing_fields - needed_fields:
1080
        lock()
987 1081
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
988 1082

  
989 1083
    conn.commit()
990 1084
    cur.close()
991 1085

  
992 1086

  
1087
@guard_lock
993 1088
def do_custom_views_table():
994 1089
    conn, cur = get_connection_and_cursor()
995 1090
    table_name = 'custom_views'
......
1001 1096
        (table_name,),
1002 1097
    )
1003 1098
    if cur.fetchone()[0] == 0:
1099
        lock()
1004 1100
        cur.execute(
1005 1101
            '''CREATE TABLE %s (id varchar PRIMARY KEY,
1006 1102
                                        title varchar,
......
1028 1124

  
1029 1125
    # migrations
1030 1126
    if 'is_default' not in existing_fields:
1127
        lock()
1031 1128
        cur.execute('''ALTER TABLE %s ADD COLUMN is_default boolean DEFAULT FALSE''' % table_name)
1032 1129

  
1033 1130
    # delete obsolete fields
1034 1131
    for field in existing_fields - needed_fields:
1132
        lock()
1035 1133
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
1036 1134

  
1037 1135
    conn.commit()
1038 1136
    cur.close()
1039 1137

  
1040 1138

  
1139
@guard_lock
1041 1140
def do_snapshots_table():
1042 1141
    conn, cur = get_connection_and_cursor()
1043 1142
    table_name = 'snapshots'
......
1049 1148
        (table_name,),
1050 1149
    )
1051 1150
    if cur.fetchone()[0] == 0:
1151
        lock()
1052 1152
        cur.execute(
1053 1153
            '''CREATE TABLE %s (id SERIAL,
1054 1154
                                        object_type VARCHAR,
......
1073 1173

  
1074 1174
    # delete obsolete fields
1075 1175
    for field in existing_fields - needed_fields:
1176
        lock()
1076 1177
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
1077 1178

  
1078 1179
    conn.commit()
1079 1180
    cur.close()
1080 1181

  
1081 1182

  
1183
@guard_lock
1082 1184
def do_loggederrors_table(concurrently=False):
1083 1185
    conn, cur = get_connection_and_cursor()
1084 1186
    table_name = 'loggederrors'
......
1090 1192
        (table_name,),
1091 1193
    )
1092 1194
    if cur.fetchone()[0] == 0:
1195
        lock()
1093 1196
        cur.execute(
1094 1197
            '''CREATE TABLE %s (id SERIAL PRIMARY KEY,
1095 1198
                                        tech_id VARCHAR UNIQUE,
......
1123 1226

  
1124 1227
    # delete obsolete fields
1125 1228
    for field in existing_fields - needed_fields:
1229
        lock()
1126 1230
        cur.execute('''ALTER TABLE %s DROP COLUMN %s''' % (table_name, field))
1127 1231

  
1128 1232
    create_index = 'CREATE INDEX'
......
1142 1246

  
1143 1247
    for attr in ('formdef_id', 'workflow_id'):
1144 1248
        if table_name + '_' + attr + '_idx' not in existing_indexes:
1249
            lock()
1145 1250
            cur.execute(
1146 1251
                '%(create_index)s %(table_name)s_%(attr)s_idx ON %(table_name)s (%(attr)s)'
1147 1252
                % {'create_index': create_index, 'table_name': table_name, 'attr': attr}
......
1198 1303

  
1199 1304

  
1200 1305
@guard_postgres
1306
@guard_lock
1201 1307
def drop_views(formdef, conn, cur):
1202 1308
    # remove the global views
1203 1309
    drop_global_views(conn, cur)
......
1229 1335
        view_names.append(row[0])
1230 1336

  
1231 1337
    for view_name in view_names:
1338
        lock()
1232 1339
        cur.execute('''DROP VIEW IF EXISTS %s''' % view_name)
1233 1340

  
1234 1341

  
......
1254 1361

  
1255 1362

  
1256 1363
@guard_postgres
1364
@guard_lock
1257 1365
def do_views(formdef, conn, cur, rebuild_global_views=True):
1258 1366
    # create new view
1259 1367
    table_name = get_formdef_table_name(formdef)
......
1338 1446

  
1339 1447
    fields_list = ', '.join(['%s AS %s' % (force_text(x), force_text(y)) for (x, y) in view_fields])
1340 1448

  
1449
    lock()
1341 1450
    cur.execute('''CREATE VIEW %s AS SELECT %s FROM %s''' % (view_name, fields_list, table_name))
1342 1451

  
1343 1452
    if rebuild_global_views:
1344 1453
        do_global_views(conn, cur)  # recreate global views
1345 1454

  
1346 1455

  
1456
@guard_lock
1347 1457
def drop_global_views(conn, cur):
1458
    lock()
1348 1459
    cur.execute(
1349 1460
        '''SELECT table_name FROM information_schema.views
1350 1461
                    WHERE table_schema = 'public'
......
1364 1475
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms''')
1365 1476

  
1366 1477

  
1478
@guard_lock
1367 1479
def do_global_views(conn, cur):
1368 1480
    # recreate global views
1369 1481
    from wcs.formdef import FormDef
......
1387 1499
    if not view_names:
1388 1500
        return
1389 1501

  
1502
    lock()
1390 1503
    cur.execute('''DROP VIEW IF EXISTS wcs_all_forms CASCADE''')
1391 1504

  
1392 1505
    fake_formdef = FormDef()
......
3510 3623

  
3511 3624

  
3512 3625
@guard_postgres
3626
@guard_lock
3513 3627
def migrate():
3514 3628
    conn, cur = get_connection_and_cursor()
3515 3629
    sql_level = get_sql_level(conn, cur)
3516
-