Projet

Général

Profil

0001-api-add-filter-operators-on-formdata-listing-60785.patch

Lauréline Guérin, 27 janvier 2022 09:06

Télécharger (39,6 ko)

Voir les différences:

Subject: [PATCH] api: add filter operators on formdata listing (#60785)

 tests/api/test_formdata.py   | 632 ++++++++++++++++++++++++++++++++++-
 wcs/backoffice/management.py |  97 ++++--
 wcs/qommon/storage.py        |  22 +-
 wcs/sql.py                   |  70 +++-
 4 files changed, 767 insertions(+), 54 deletions(-)
tests/api/test_formdata.py
766 766
    get_app(pub).get(sign_uri('/api/forms/test/list?filter=all&limit=plop', user=local_user), status=400)
767 767

  
768 768

  
769
def test_api_list_formdata_string_filter(pub, local_user):
    """Exercise the `filter-<varname>-operator` parameter on string fields.

    Three formdata are created: field `string` holds 'FOO 0'..'FOO 2' and
    field `string2` holds '9', '10', '11'.  Each (operator, value) pair is
    sent to the listing API and the number of returned rows is compared to
    the expected count.
    """
    pub.role_class.wipe()
    receiver_role = pub.role_class(name='test')
    receiver_role.store()

    local_user.roles = [receiver_role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [
        fields.StringField(id='0', label='String', type='string', varname='string'),
        fields.StringField(id='1', label='String2', type='string', varname='string2'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(3):
        formdata = data_class()
        formdata.data = {'0': 'FOO %s' % index, '1': '%s' % (9 + index)}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    def count_results(url):
        # number of formdata returned by a signed GET on the listing API
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    # without an explicit operator the filter matches on equality
    assert count_results('/api/forms/test/list?filter-string=FOO 2') == 1

    # non numeric values
    text_cases = [
        ('eq', 'FOO 2', 1),
        ('ne', 'FOO 2', 2),
        ('lt', 'FOO 2', 2),
        ('lte', 'FOO 2', 3),
        ('gt', 'FOO 2', 0),
        ('gt', '42', 0),
        ('gte', 'FOO 2', 1),
    ]
    for operator, value, expected in text_cases:
        url = '/api/forms/test/list?filter-string=%s&filter-string-operator=%s' % (value, operator)
        assert count_results(url) == expected

    # numeric looking values ('010' is expected to match the stored '10')
    numeric_cases = [
        ('eq', '10', 1),
        ('eq', '010', 1),
        ('ne', '10', 2),
        ('lt', '10', 1),
        ('lte', '10', 2),
        ('gt', '10', 1),
        ('gt', '9', 2),
        ('gte', '10', 2),
    ]
    for operator, value, expected in numeric_cases:
        url = '/api/forms/test/list?filter-string2=%s&filter-string2-operator=%s' % (value, operator)
        assert count_results(url) == expected
839

  
840

  
841
def test_api_list_formdata_item_filter(pub, local_user):
    """Exercise the filter operators on single-choice (item) fields.

    Field `item` is backed by the 'foobar' data source (ids '9', '10',
    '11'); field `item2` uses a plain list of labels.  Three formdata are
    stored, then each (operator, value) pair is checked against the number
    of rows returned by the listing API.
    """
    pub.role_class.wipe()
    receiver_role = pub.role_class(name='test')
    receiver_role.store()

    local_user.roles = [receiver_role.id]
    local_user.store()

    NamedDataSource.wipe()
    options = [{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'formula', 'value': repr(options)}
    data_source.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [
        fields.ItemField(id='0', label='Item', type='item', data_source={'type': 'foobar'}, varname='item'),
        fields.ItemField(id='1', label='Other Item', type='item', items=['foo', 'bar'], varname='item2'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(3):
        formdata = data_class()
        formdata.data = {
            '0': str(9 + index),
            '1': 'foo' if index % 2 else 'bar',
        }
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    def count_results(url):
        # number of formdata returned by a signed GET on the listing API
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    # item backed by a data source, filtered on its id
    assert count_results('/api/forms/test/list?filter-item=9') == 1
    id_cases = [
        ('eq', '10', 1),
        ('eq', '010', 1),
        ('ne', '10', 2),
        ('lt', '10', 1),
        ('lte', '10', 2),
        ('gt', '10', 1),
        ('gt', '9', 2),
        ('gte', '10', 2),
    ]
    for operator, value, expected in id_cases:
        url = '/api/forms/test/list?filter-item=%s&filter-item-operator=%s' % (value, operator)
        assert count_results(url) == expected

    # item with a plain list of choices, filtered on its label
    assert count_results('/api/forms/test/list?filter-item2=foo') == 1
    label_cases = [
        ('eq', 'foo', 1),
        ('ne', 'foo', 2),
        ('lt', 'foo', 2),
        ('lte', 'foo', 3),
        ('gt', 'foo', 0),
        ('gt', '42', 0),
        ('gte', 'foo', 1),
    ]
    for operator, value, expected in label_cases:
        url = '/api/forms/test/list?filter-item2=%s&filter-item2-operator=%s' % (value, operator)
        assert count_results(url) == expected
921

  
922

  
923
def test_api_list_formdata_items_filter(pub, local_user):
    """Exercise the filter operators on multiple-choice (items) fields.

    Four formdata are stored; the first three carry two selected values
    per field, the last one has no data at all.  Each (operator, value)
    pair is checked against the number of rows returned by the listing API.
    """
    pub.role_class.wipe()
    receiver_role = pub.role_class(name='test')
    receiver_role.store()

    local_user.roles = [receiver_role.id]
    local_user.store()

    NamedDataSource.wipe()
    options = [{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'formula', 'value': repr(options)}
    data_source.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [
        fields.ItemsField(
            id='0', label='Items', type='items', data_source={'type': 'foobar'}, varname='items'
        ),
        fields.ItemsField(
            id='1', label='Other Item', type='items', items=['foo', 'bar', 'baz'], varname='items2'
        ),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(4):
        formdata = data_class()
        if index < 3:
            formdata.data = {
                '0': ['9' if index % 2 else '11', '10'],
                '1': ['foo' if index % 2 else 'bar', 'baz'],
            }
        else:
            # last formdata is left without any value
            formdata.data = {}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    def count_results(url):
        # number of formdata returned by a signed GET on the listing API
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    # items backed by a data source, filtered on ids
    assert count_results('/api/forms/test/list?filter-items=11') == 2
    id_cases = [
        ('eq', '11', 2),
        ('eq', '011', 2),
        ('eq', '10', 3),
        ('ne', '9', 3),
        ('ne', '10', 1),
        ('lt', '10', 1),
        ('lte', '10', 3),
        ('gt', '10', 2),
        ('gt', '9', 3),
        ('gte', '11', 2),
    ]
    for operator, value, expected in id_cases:
        url = '/api/forms/test/list?filter-items=%s&filter-items-operator=%s' % (value, operator)
        assert count_results(url) == expected

    # items with a plain list of choices, filtered on labels
    label_cases = [
        ('eq', 'foo', 1),
        ('ne', 'foo', 3),
        ('lt', 'foo', 3),
        ('lte', 'foo', 3),
        ('gt', 'foo', 0),
        ('gt', '42', 0),
        ('gte', 'foo', 1),
    ]
    for operator, value, expected in label_cases:
        url = '/api/forms/test/list?filter-items2=%s&filter-items2-operator=%s' % (value, operator)
        assert count_results(url) == expected
1009

  
1010

  
1011
def test_api_list_formdata_bool_filter(pub, local_user):
    """Exercise the filter operators on boolean fields.

    Three formdata are stored with values False, True, False.  Only `eq`
    and `ne` are expected to be accepted; the ordering operators must be
    rejected with a 400 response and an explicit error description.
    """
    pub.role_class.wipe()
    receiver_role = pub.role_class(name='test')
    receiver_role.store()

    local_user.roles = [receiver_role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': receiver_role.id}
    formdef.fields = [fields.BoolField(id='0', label='Bool', type='bool', varname='bool')]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for index in range(3):
        formdata = data_class()
        formdata.data = {'0': bool(index % 2)}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    def count_results(url):
        # number of formdata returned by a signed GET on the listing API
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    # implicit equality
    assert count_results('/api/forms/test/list?filter-bool=false') == 2
    assert count_results('/api/forms/test/list?filter-bool=true') == 1

    # explicit operators accepted for booleans
    accepted_cases = [
        ('eq', 'true', 1),
        ('ne', 'true', 2),
    ]
    for operator, value, expected in accepted_cases:
        url = '/api/forms/test/list?filter-bool=%s&filter-bool-operator=%s' % (value, operator)
        assert count_results(url) == expected

    # ordering operators are refused
    for operator in ['lt', 'lte', 'gt', 'gte']:
        url = '/api/forms/test/list?filter-bool=true&filter-bool-operator=%s' % operator
        resp = get_app(pub).get(sign_uri(url, user=local_user), status=400)
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-bool"' % operator
1065

  
1066

  
769 1067
def test_api_list_formdata_date_filter(pub, local_user):
770 1068
    if not pub.is_using_postgresql():
771 1069
        pytest.skip('this requires SQL')
......
782 1080
    formdef.name = 'test'
783 1081
    formdef.workflow_roles = {'_receiver': role.id}
784 1082
    formdef.fields = [
785
        fields.DateField(id='0', label='foobar', varname='foobar', type='date'),
1083
        fields.DateField(id='0', label='Date', varname='date', type='date'),
786 1084
    ]
787 1085
    formdef.store()
788 1086

  
789 1087
    data_class = formdef.data_class()
790 1088
    data_class.wipe()
791 1089

  
792
    for i in range(30):
1090
    for i in range(3):
793 1091
        formdata = data_class()
794
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 1), '%Y-%m-%d')}
1092
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 10), '%Y-%m-%d')}
795 1093
        formdata.user_id = local_user.id
796 1094
        formdata.just_created()
797 1095
        formdata.jump_status('new')
798 1096
        formdata.store()
799 1097

  
800
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=2021-06-12', user=local_user))
801
    assert len(resp.json) == 1
1098
    for value in ['2021-06-11', '11/06/2021']:
1099
        resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-date=%s' % value, user=local_user))
1100
        assert len(resp.json) == 1
1101
        params = [
1102
            ('eq', 1),
1103
            ('ne', 2),
1104
            ('lt', 1),
1105
            ('lte', 2),
1106
            ('gt', 1),
1107
            ('gte', 2),
1108
        ]
1109
        for operator, result in params:
1110
            resp = get_app(pub).get(
1111
                sign_uri(
1112
                    '/api/forms/test/list?filter-date=%s&filter-date-operator=%s' % (value, operator),
1113
                    user=local_user,
1114
                )
1115
            )
1116
            assert len(resp.json) == result
1117

  
1118

  
1119
def test_api_list_formdata_email_filter(pub, local_user):
1120
    pub.role_class.wipe()
1121
    role = pub.role_class(name='test')
1122
    role.store()
1123

  
1124
    local_user.roles = [role.id]
1125
    local_user.store()
1126

  
1127
    FormDef.wipe()
1128
    formdef = FormDef()
1129
    formdef.name = 'test'
1130
    formdef.workflow_roles = {'_receiver': role.id}
1131
    formdef.fields = [
1132
        fields.EmailField(id='0', label='Email', type='email', varname='email'),
1133
    ]
1134
    formdef.store()
1135

  
1136
    data_class = formdef.data_class()
1137
    data_class.wipe()
802 1138

  
803
    # alternate date format
804
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=12/06/2021', user=local_user))
1139
    for i in range(3):
1140
        formdata = data_class()
1141
        formdata.data = {'0': 'a@localhost' if i % 2 else 'b@localhost'}
1142
        formdata.user_id = local_user.id
1143
        formdata.just_created()
1144
        formdata.jump_status('new')
1145
        formdata.store()
1146

  
1147
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-email=a@localhost', user=local_user))
805 1148
    assert len(resp.json) == 1
1149
    params = [
1150
        ('eq', 'a@localhost', 1),
1151
        ('ne', 'a@localhost', 2),
1152
    ]
1153
    for operator, value, result in params:
1154
        resp = get_app(pub).get(
1155
            sign_uri(
1156
                '/api/forms/test/list?filter-email=%s&filter-email-operator=%s' % (value, operator),
1157
                user=local_user,
1158
            )
1159
        )
1160
        assert len(resp.json) == result
1161
    for operator in ['lt', 'lte', 'gt', 'gte']:
1162
        resp = get_app(pub).get(
1163
            sign_uri(
1164
                '/api/forms/test/list?filter-email=a@localhost&filter-email-operator=%s' % operator,
1165
                user=local_user,
1166
            ),
1167
            status=400,
1168
        )
1169
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-email"' % operator
806 1170

  
807 1171

  
808 1172
def test_api_list_formdata_internal_id_filter(pub, local_user):
......
823 1187
    data_class = formdef.data_class()
824 1188
    data_class.wipe()
825 1189

  
826
    for i in range(2):
1190
    for i in range(11):
827 1191
        formdata = data_class()
828 1192
        formdata.data = {}
829 1193
        formdata.user_id = local_user.id
......
838 1202
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-internal-id=42', user=local_user))
839 1203
    assert len(resp.json) == 0
840 1204

  
1205
    params = [
1206
        ('eq', '1', 1),
1207
        ('eq', '01', 1),
1208
        ('ne', '1', 10),
1209
        ('lt', '1', 0),
1210
        ('lte', '1', 1),
1211
        ('gt', '1', 10),
1212
        ('gt', '10', 1),
1213
        ('gte', '1', 11),
1214
    ]
1215
    for operator, value, result in params:
1216
        resp = get_app(pub).get(
1217
            sign_uri(
1218
                '/api/forms/test/list?filter-internal-id=%s&filter-internal-id-operator=%s'
1219
                % (value, operator),
1220
                user=local_user,
1221
            )
1222
        )
1223
        assert len(resp.json) == result
1224
    resp = get_app(pub).get(
1225
        sign_uri('/api/forms/test/list?filter-internal-id=blabla', user=local_user), status=400
1226
    )
1227
    assert resp.json['err_desc'] == 'Invalid value "blabla" for "filter-internal-id-value"'
1228

  
1229

  
1230
def test_api_list_formdata_user_filter(pub, local_user):
    """Exercise the filter operators on the `filter-user-uuid` parameter.

    Two users with distinct name identifiers are created; three formdata
    are stored, owned alternately by userB, userA, userB.  Only `eq` and
    `ne` are expected to be accepted; the ordering operators must be
    rejected with a 400 response.
    """
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    user1 = pub.user_class(name='userA')
    user1.name_identifiers = ['ABCDEF']
    user1.store()
    user2 = pub.user_class(name='userB')
    # set the identifier on user2 (it was mistakenly assigned to user1 after
    # user1 had already been stored, leaving user2 without any identifier)
    user2.name_identifiers = ['GHIJKL']
    user2.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = []
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {}
        # odd formdata belong to user1, even ones to user2
        formdata.user_id = str(user1.id if i % 2 else user2.id)
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # implicit equality
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-user-uuid=ABCDEF', user=local_user))
    assert len(resp.json) == 1

    # explicit operators accepted for users
    params = [
        ('eq', 'ABCDEF', 1),
        ('ne', 'ABCDEF', 2),
    ]
    for operator, value, result in params:
        resp = get_app(pub).get(
            sign_uri(
                '/api/forms/test/list?filter-user-uuid=%s&filter-user-uuid-operator=%s' % (value, operator),
                user=local_user,
            )
        )
        assert len(resp.json) == result

    # ordering operators are refused
    for operator in ['lt', 'lte', 'gt', 'gte']:
        resp = get_app(pub).get(
            sign_uri(
                '/api/forms/test/list?filter-user-uuid=ABCDEF&filter-user-uuid-operator=%s' % operator,
                user=local_user,
            ),
            status=400,
        )
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-user-value"' % operator
1286

  
1287

  
1288
def test_api_list_formdata_submission_agent_filter(pub, local_user):
    """Exercise the filter operators on `filter-submission-agent-uuid`.

    Two users with distinct name identifiers are created; three formdata
    are stored with the submission agent alternating between userB, userA,
    userB.  Only `eq` and `ne` are expected to be accepted; the ordering
    operators must be rejected with a 400 response.
    """
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    user1 = pub.user_class(name='userA')
    user1.name_identifiers = ['ABCDEF']
    user1.store()
    user2 = pub.user_class(name='userB')
    # set the identifier on user2 (it was mistakenly assigned to user1 after
    # user1 had already been stored, leaving user2 without any identifier)
    user2.name_identifiers = ['GHIJKL']
    user2.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = []
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {}
        # odd formdata were submitted by user1, even ones by user2
        formdata.submission_agent_id = str(user1.id if i % 2 else user2.id)
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # implicit equality
    resp = get_app(pub).get(
        sign_uri('/api/forms/test/list?filter-submission-agent-uuid=ABCDEF', user=local_user)
    )
    assert len(resp.json) == 1

    # explicit operators accepted for submission agents
    params = [
        ('eq', 'ABCDEF', 1),
        ('ne', 'ABCDEF', 2),
    ]
    for operator, value, result in params:
        resp = get_app(pub).get(
            sign_uri(
                '/api/forms/test/list?filter-submission-agent-uuid=%s&filter-submission-agent-uuid-operator=%s'
                % (value, operator),
                user=local_user,
            )
        )
        assert len(resp.json) == result

    # ordering operators are refused
    for operator in ['lt', 'lte', 'gt', 'gte']:
        resp = get_app(pub).get(
            sign_uri(
                '/api/forms/test/list?filter-submission-agent-uuid=ABCDEF&filter-submission-agent-uuid-operator=%s'
                % operator,
                user=local_user,
            ),
            status=400,
        )
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-submission-agent-value"' % operator
1349

  
841 1350

  
842 1351
def test_api_list_formdata_block_field_filter(pub, local_user):
843 1352
    if not pub.is_using_postgresql():
......
847 1356
    data_source = NamedDataSource(name='foobar')
848 1357
    data_source.data_source = {
849 1358
        'type': 'formula',
850
        'value': repr([{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]),
1359
        'value': repr([{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]),
851 1360
    }
852 1361
    data_source.store()
853 1362

  
......
904 1413
        if i == 0:
905 1414
            formdata.data['0']['data'].append(
906 1415
                {
907
                    '1': 'plop%s' % i,
1416
                    '1': 'plop%s' % (i + 1),
908 1417
                    '2': '1',
909 1418
                    '2_display': 'foo',
910 1419
                    '2_structured': 'XXX',
......
925 1434
    assert len(resp.json) == 1
926 1435
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop10', user=local_user))
927 1436
    assert len(resp.json) == 0
1437
    params = [
1438
        ('eq', 'plop5', 1),
1439
        ('ne', 'plop5', 9),
1440
        ('ne', 'plop1', 8),
1441
        ('lt', 'plop5', 5),
1442
        ('lte', 'plop5', 6),
1443
        ('gt', 'plop5', 4),
1444
        ('gt', '42', 0),
1445
        ('gte', 'plop5', 5),
1446
    ]
1447
    for operator, value, result in params:
1448
        resp = get_app(pub).get(
1449
            sign_uri(
1450
                '/api/forms/test/list?filter-blockdata_string=%s&filter-blockdata_string-operator=%s'
1451
                % (value, operator),
1452
                user=local_user,
1453
            )
1454
        )
1455
        assert len(resp.json) == result
928 1456
    # item
929 1457
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=1', user=local_user))
930 1458
    assert len(resp.json) == 6
......
932 1460
    assert len(resp.json) == 5
933 1461
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=3', user=local_user))
934 1462
    assert len(resp.json) == 0
1463
    params = [
1464
        ('eq', '1', 6),
1465
        ('ne', '1', 4),
1466
        ('lt', '2', 6),
1467
        ('lte', '1', 6),
1468
        ('gt', '1', 5),
1469
        ('gte', '2', 5),
1470
    ]
1471
    for operator, value, result in params:
1472
        resp = get_app(pub).get(
1473
            sign_uri(
1474
                '/api/forms/test/list?filter-blockdata_item=%s&filter-blockdata_item-operator=%s'
1475
                % (value, operator),
1476
                user=local_user,
1477
            )
1478
        )
1479
        assert len(resp.json) == result
935 1480
    # bool
936 1481
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=true', user=local_user))
937 1482
    assert len(resp.json) == 6
......
941 1486
        sign_uri('/api/forms/test/list?filter-blockdata_bool=foobar', user=local_user), status=400
942 1487
    )
943 1488
    assert resp.json['err_desc'] == 'Invalid value "foobar" for "filter-blockdata_bool"'
1489
    params = [
1490
        ('eq', 'true', 6),
1491
        ('ne', 'true', 4),
1492
    ]
1493
    for operator, value, result in params:
1494
        resp = get_app(pub).get(
1495
            sign_uri(
1496
                '/api/forms/test/list?filter-blockdata_bool=%s&filter-blockdata_bool-operator=%s'
1497
                % (value, operator),
1498
                user=local_user,
1499
            )
1500
        )
1501
        assert len(resp.json) == result
1502
    for operator in ['lt', 'lte', 'gt', 'gte']:
1503
        resp = get_app(pub).get(
1504
            sign_uri(
1505
                '/api/forms/test/list?filter-blockdata_bool=true&filter-blockdata_bool-operator=%s'
1506
                % operator,
1507
                user=local_user,
1508
            ),
1509
            status=400,
1510
        )
1511
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_bool"' % operator
944 1512
    # date
945 1513
    resp = get_app(pub).get(
946 1514
        sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-01', user=local_user)
......
954 1522
        sign_uri('/api/forms/test/list?filter-blockdata_date=02/06/2021', user=local_user)
955 1523
    )
956 1524
    assert len(resp.json) == 2
1525
    params = [
1526
        ('eq', '2021-06-02', 2),
1527
        ('ne', '2021-06-02', 8),
1528
        ('lt', '2021-06-02', 1),
1529
        ('lte', '2021-06-02', 2),
1530
        ('gt', '2021-06-02', 8),
1531
        ('gte', '2021-06-02', 10),
1532
    ]
1533
    for operator, value, result in params:
1534
        resp = get_app(pub).get(
1535
            sign_uri(
1536
                '/api/forms/test/list?filter-blockdata_date=%s&filter-blockdata_date-operator=%s'
1537
                % (value, operator),
1538
                user=local_user,
1539
            )
1540
        )
1541
        assert len(resp.json) == result
957 1542
    # email
958 1543
    resp = get_app(pub).get(
959 1544
        sign_uri('/api/forms/test/list?filter-blockdata_email=a@localhost', user=local_user)
......
967 1552
        sign_uri('/api/forms/test/list?filter-blockdata_email=c@localhost', user=local_user)
968 1553
    )
969 1554
    assert len(resp.json) == 0
1555
    params = [
1556
        ('eq', 'a@localhost', 6),
1557
        ('ne', 'a@localhost', 4),
1558
    ]
1559
    for operator, value, result in params:
1560
        resp = get_app(pub).get(
1561
            sign_uri(
1562
                '/api/forms/test/list?filter-blockdata_email=%s&filter-blockdata_email-operator=%s'
1563
                % (value, operator),
1564
                user=local_user,
1565
            )
1566
        )
1567
        assert len(resp.json) == result
1568
    for operator in ['lt', 'lte', 'gt', 'gte']:
1569
        resp = get_app(pub).get(
1570
            sign_uri(
1571
                '/api/forms/test/list?filter-blockdata_email=plop0&filter-blockdata_email-operator=%s'
1572
                % operator,
1573
                user=local_user,
1574
            ),
1575
            status=400,
1576
        )
1577
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_email"' % operator
970 1578
    # mix
971 1579
    resp = get_app(pub).get(
972 1580
        sign_uri(
973 1581
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop1', user=local_user
974 1582
        )
975 1583
    )
976
    assert len(resp.json) == 1
1584
    assert len(resp.json) == 2
977 1585
    resp = get_app(pub).get(
978 1586
        sign_uri(
979 1587
            '/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop1', user=local_user
980 1588
        )
981 1589
    )
982
    assert len(resp.json) == 0
1590
    assert len(resp.json) == 1
983 1591
    resp = get_app(pub).get(
984 1592
        sign_uri(
985 1593
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop0', user=local_user
wcs/backoffice/management.py
69 69
    Contains,
70 70
    Equal,
71 71
    FtsMatch,
72
    Greater,
72 73
    GreaterOrEqual,
73 74
    Intersects,
75
    Less,
74 76
    LessOrEqual,
75 77
    NotEqual,
76 78
    NotNull,
......
1651 1653
        query_overrides = get_request().form
1652 1654
        return self.get_view_criterias(query_overrides, request=get_request())
1653 1655

  
1654
    def get_view_criterias(self, query_overrides=None, request=None):
1655
        from wcs import sql
1656
    def get_field_criteria(self, field, operator, field_key):
        """Return the criteria class matching a filter operator.

        `field` is the filtered field (only its `type` attribute is read),
        `operator` is the operator name taken from the query and
        `field_key` is the filter key, used only in the error message.

        Ordering operators are restricted to a fixed set of field types;
        `eq` and `ne` are accepted for every type.  Raises RequestError
        when the operator is unknown or not allowed for the field type.
        """
        criteria_by_operator = {
            'eq': Equal,
            'ne': NotEqual,
            'lt': Less,
            'lte': LessOrEqual,
            'gt': Greater,
            'gte': GreaterOrEqual,
        }
        if field.type in ['internal-id', 'date', 'item', 'items', 'string']:
            # these types support every known operator
            rejected = operator not in criteria_by_operator
        else:
            # eq and ne are always allowed
            rejected = operator not in ['eq', 'ne']
        if rejected:
            raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
        return criteria_by_operator[operator]
1656 1672

  
1673
    def get_view_criterias(self, query_overrides=None, request=None):
1657 1674
        fake_fields = [
1658 1675
            FakeField('internal-id', 'internal-id', _('Identifier')),
1659 1676
            FakeField('start', 'period-date', _('Start')),
......
1707 1724
            if filter_field.type == 'user-id':
1708 1725
                # convert uuid based filter into local id filter
1709 1726
                name_id = filters_dict.get('filter-user-uuid')
1727
                filters_dict['filter-user-operator'] = filters_dict.get('filter-user-uuid-operator')
1710 1728
                if name_id:
1711 1729
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
1712 1730
                    request_form['filter-user'] = filters_dict['filter-user'] = 'on'
......
1724 1742
            if filter_field.type == 'submission-agent-id':
1725 1743
                # convert uuid based filter into local id filter
1726 1744
                name_id = filters_dict.get('filter-submission-agent-uuid')
1745
                filters_dict['filter-submission-agent-operator'] = filters_dict.get(
1746
                    'filter-submission-agent-uuid-operator'
1747
                )
1727 1748
                if name_id:
1728 1749
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
1729 1750
                    request_form['filter-submission-agent'] = filters_dict['filter-submission-agent'] = 'on'
......
1749 1770
            if not filter_field_value:
1750 1771
                continue
1751 1772

  
1773
            # get operator and criteria
1774
            filter_field_operator_key = '%s-operator' % filter_field_key.replace('-value', '')
1775
            filter_field_operator = filters_dict.get(filter_field_operator_key) or 'eq'
1776
            criteria = self.get_field_criteria(filter_field, filter_field_operator, filter_field_key)
1777

  
1778
            # check value types
1752 1779
            if filter_field_value is None and filter_field.type in [
1753 1780
                'date',
1754 1781
                'bool',
......
1758 1785
                'email',
1759 1786
            ]:
1760 1787
                continue
1761
            if filter_field.type == 'date':
1788
            if filter_field.type == 'internal-id':
1789
                try:
1790
                    filter_field_value = int(filter_field_value)
1791
                except ValueError:
1792
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1793
            elif filter_field.type == 'period-date':
1794
                try:
1795
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1796
                except ValueError:
1797
                    continue
1798
            elif filter_field.type == 'date':
1762 1799
                try:
1763 1800
                    filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d')
1764 1801
                except ValueError:
......
1770 1807
                    filter_field_value = False
1771 1808
                else:
1772 1809
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1773

  
1774
            if is_in_block_field:
1775
                criterias.append(
1776
                    sql.ArrayContains(
1777
                        'f%s' % filter_field.block_field.id,
1778
                        json.dumps([{filter_field.id: filter_field_value}]),
1779
                        parent_field=filter_field.block_field,
1780
                    )
1781
                )
1782
            elif filter_field.type == 'internal-id':
1783
                criterias.append(Equal('id', str(filter_field_value)))
1784
            elif filter_field.type == 'period-date':
1810
            elif filter_field.type in ('item', 'items', 'string'):
1785 1811
                try:
1786
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1812
                    filter_field_value = int(filter_field_value)
1787 1813
                except ValueError:
1788
                    continue
1814
                    pass
1815

  
1816
            # add criteria
1817
            if filter_field.type == 'internal-id':
1818
                criterias.append(criteria('id', filter_field_value))
1819
            elif filter_field.type == 'period-date':
1789 1820
                if filter_field.id == 'start':
1790 1821
                    criterias.append(GreaterOrEqual('receipt_time', filter_date_value))
1791 1822
                elif filter_field.id == 'end':
......
1798 1829
            elif filter_field.type == 'user-id':
1799 1830
                if filter_field_value == '__current__' and get_request().user:
1800 1831
                    filter_field_value = str(get_request().user.id)
1801
                criterias.append(Equal('user_id', filter_field_value))
1832
                criterias.append(criteria('user_id', filter_field_value))
1802 1833
            elif filter_field.type == 'submission-agent-id':
1803
                criterias.append(Equal('submission_agent_id', filter_field_value))
1804
            elif filter_field.type in ('item', 'items'):
1805
                if filter_field.type == 'item':
1806
                    criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1807
                elif filter_field.type == 'items':
1808
                    criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value]))
1809
                field_options = filter_field.get_options()
1810
                if field_options and type(field_options[0]) in (list, tuple):
1811
                    for option in field_options:
1812
                        if filter_field_value in (option[0], option[-1]):
1813
                            filter_field_value = option[1]
1814
                            break
1815
                criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1816
            elif filter_field.type == 'bool':
1817
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1818
            elif filter_field.type in ('string', 'email'):
1819
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1820
            elif filter_field.type == 'date':
1821
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1834
                criterias.append(criteria('submission_agent_id', filter_field_value))
1835
            elif filter_field.type in ('item', 'items', 'bool', 'string', 'email', 'date'):
1836
                criterias.append(criteria('f%s' % filter_field.id, filter_field_value, field=filter_field))
1837
                if filter_field.type in ('item', 'items'):
1838
                    field_options = filter_field.get_options()
1839
                    if field_options and type(field_options[0]) in (list, tuple):
1840
                        for option in field_options:
1841
                            if filter_field_value in (option[0], option[-1]):
1842
                                filter_field_value = option[1]
1843
                                break
1844
                    criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1822 1845

  
1823 1846
        return criterias
1824 1847

  
wcs/qommon/storage.py
155 155
        self.field = kwargs.get('field')
156 156

  
157 157
    def build_lambda(self):
158
        return lambda x: self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
158
        def func(x):
159
            if self.field and self.field.key in ['item', 'string'] and isinstance(self.value, int):
160
                try:
161
                    _x = int(getattr(x, self.attribute, None))
162
                except (ValueError, TypeError):
163
                    _x = self.typed_none
164
                return self.op(_x, self.value)
165
            if self.field and self.field.key == 'items':
166
                _x = getattr(x, self.attribute, None) or []
167
                if isinstance(self.value, int):
168
                    try:
169
                        _x = [int(e) for e in _x]
170
                    except ValueError:
171
                        _x = []
172
                return getattr(self, 'array_op', any)(self.op(e, self.value) for e in _x)
173
            if isinstance(self.value, int):
174
                return self.op(int(getattr(x, self.attribute, None)) or self.typed_none, self.value)
175
            return self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
176

  
177
        return func
159 178

  
160 179
    def __repr__(self):
161 180
        return '<%s (attribute: %r%s)>' % (
......
179 198

  
180 199
class NotEqual(Criteria):
181 200
    op = operator.ne
201
    array_op = all
182 202

  
183 203

  
184 204
class LessOrEqual(Criteria):
wcs/sql.py
107 107
        self.attribute = attribute.replace('-', '_')
108 108
        self.value = value
109 109
        self.field = kwargs.get('field')
110
        self.parent_field = kwargs.get('parent_field')
111 110

  
112 111
    def as_sql(self):
112
        if self.field and getattr(self.field, 'block_field', None):
113
            # eq: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
114
            # lt: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' < 'value')
115
            # lte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' <= 'value')
116
            # gt: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' > 'value')
117
            # gte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' >= 'value')
118
            # with a NOT EXISTS and the opposite operator:
119
            # ne: NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
120
            # note: aa->>'FOOBAR' can be written with an integer or bool cast
121
            attribute = "aa->>'%s'" % self.field.id
122
            if self.field.key in ['item', 'string'] and isinstance(self.value, int):
123
                # integer cast of db values
124
                attribute = "(CASE WHEN %s~E'^\\\\d+$' THEN (%s)::int ELSE NULL END)" % (attribute, attribute)
125
            elif self.field.key == 'bool':
126
                # bool cast of db values
127
                attribute = '(%s)::bool' % attribute
128
            return "%s(SELECT 1 FROM jsonb_array_elements(%s->'data') AS datas(aa) WHERE %s %s %%(c%s)s)" % (
129
                getattr(self, 'sql_exists', 'EXISTS'),
130
                get_field_id(self.field.block_field),
131
                attribute,
132
                getattr(self, 'sql_op_exists', self.sql_op),
133
                id(self.value),
134
            )
135

  
113 136
        attribute = self.attribute
137

  
138
        if self.field and self.field.key == 'items':
139
            # eq: 'value' = ANY (ITEMS)
140
            # ne: 'value' != ALL (ITEMS)
141
            # with reversed operator:
142
            # lt: 'value' > ANY (ITEMS)
143
            # lte: 'value' >= ANY (ITEMS)
144
            # gt: 'value' < ANY (ITEMS)
145
            # gte: 'value' <= ANY (ITEMS)
146
            # note: ITEMS is written with an integer cast or with a COALESCE expression
147
            if isinstance(self.value, int):
148
                # integer cast of db values
149
                attribute = (
150
                    "CASE WHEN array_to_string(%s, '')~E'^\\\\d+$' THEN %s::int[] ELSE ARRAY[]::int[] END"
151
                    % (attribute, attribute)
152
                )
153
            else:
154
                # for none values
155
                attribute = "COALESCE(%s, ARRAY[]::text[])" % attribute
156
            return '%%(c%s)s %s %s (%s)' % (
157
                id(self.value),
158
                getattr(self, 'sql_reversed_op', self.sql_op),
159
                getattr(self, 'sql_array_op', 'ANY'),
160
                attribute,
161
            )
162

  
114 163
        if self.field and self.field.key == 'computed':
115 164
            attribute = "%s->>'data'" % self.attribute
116
        if self.parent_field and self.parent_field.key == 'block':
117
            attribute = "%s->'data'" % self.attribute
118

  
165
        elif self.field and self.field.key in ['item', 'string'] and isinstance(self.value, int):
166
            # integer cast of db values
167
            attribute = "(CASE WHEN %s~E'^\\\\d+$' THEN %s::int ELSE NULL END)" % (attribute, attribute)
119 168
        return '%s %s %%(c%s)s' % (attribute, self.sql_op, id(self.value))
120 169

  
121 170
    def as_sql_param(self):
......
130 179

  
131 180
class Less(Criteria):
132 181
    sql_op = '<'
182
    sql_reversed_op = '>'
133 183

  
134 184

  
135 185
class Greater(Criteria):
136 186
    sql_op = '>'
187
    sql_reversed_op = '<'
137 188

  
138 189

  
139 190
class Equal(Criteria):
......
147 198

  
148 199
class LessOrEqual(Criteria):
149 200
    sql_op = '<='
201
    sql_reversed_op = '>='
150 202

  
151 203

  
152 204
class GreaterOrEqual(Criteria):
153 205
    sql_op = '>='
206
    sql_reversed_op = '<='
154 207

  
155 208

  
156 209
class NotEqual(Criteria):
157 210
    sql_op = '!='
211
    # in case of items field, we want to write this clause:
212
    # 'value' != ALL (ITEMS)
213
    sql_array_op = 'ALL'
214
    # in case of block field, we want to write this clause:
215
    # NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
216
    # and not:
217
    # EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' != 'value')
218
    sql_exists = 'NOT EXISTS'
219
    sql_op_exists = '='
158 220

  
159 221

  
160 222
class Contains(Criteria):
161
-