Projet

Général

Profil

0001-api-add-filter-operators-on-formdata-listing-60785.patch

Lauréline Guérin, 31 janvier 2022 14:00

Télécharger (40,2 ko)

Voir les différences:

Subject: [PATCH] api: add filter operators on formdata listing (#60785)

 tests/api/test_formdata.py   | 632 ++++++++++++++++++++++++++++++++++-
 wcs/backoffice/management.py | 108 +++---
 wcs/qommon/storage.py        |  22 +-
 wcs/sql.py                   |  70 +++-
 4 files changed, 777 insertions(+), 55 deletions(-)
tests/api/test_formdata.py
817 817
    assert resp.json['err_desc'] == 'Invalid filters "baz", "foobar"'
818 818

  
819 819

  
820
def test_api_list_formdata_string_filter(pub, local_user):
821
    pub.role_class.wipe()
822
    role = pub.role_class(name='test')
823
    role.store()
824

  
825
    local_user.roles = [role.id]
826
    local_user.store()
827

  
828
    FormDef.wipe()
829
    formdef = FormDef()
830
    formdef.name = 'test'
831
    formdef.workflow_roles = {'_receiver': role.id}
832
    formdef.fields = [
833
        fields.StringField(id='0', label='String', type='string', varname='string'),
834
        fields.StringField(id='1', label='String2', type='string', varname='string2'),
835
    ]
836
    formdef.store()
837

  
838
    data_class = formdef.data_class()
839
    data_class.wipe()
840

  
841
    for i in range(3):
842
        formdata = data_class()
843
        formdata.data = {
844
            '0': 'FOO %s' % i,
845
            '1': '%s' % (9 + i),
846
        }
847
        formdata.user_id = local_user.id
848
        formdata.just_created()
849
        formdata.jump_status('new')
850
        formdata.store()
851

  
852
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-string=FOO 2', user=local_user))
853
    assert len(resp.json) == 1
854
    params = [
855
        ('eq', 'FOO 2', 1),
856
        ('ne', 'FOO 2', 2),
857
        ('lt', 'FOO 2', 2),
858
        ('lte', 'FOO 2', 3),
859
        ('gt', 'FOO 2', 0),
860
        ('gt', '42', 0),
861
        ('gte', 'FOO 2', 1),
862
    ]
863
    for operator, value, result in params:
864
        resp = get_app(pub).get(
865
            sign_uri(
866
                '/api/forms/test/list?filter-string=%s&filter-string-operator=%s' % (value, operator),
867
                user=local_user,
868
            )
869
        )
870
        assert len(resp.json) == result
871

  
872
    params = [
873
        ('eq', '10', 1),
874
        ('eq', '010', 1),
875
        ('ne', '10', 2),
876
        ('lt', '10', 1),
877
        ('lte', '10', 2),
878
        ('gt', '10', 1),
879
        ('gt', '9', 2),
880
        ('gte', '10', 2),
881
    ]
882
    for operator, value, result in params:
883
        resp = get_app(pub).get(
884
            sign_uri(
885
                '/api/forms/test/list?filter-string2=%s&filter-string2-operator=%s' % (value, operator),
886
                user=local_user,
887
            )
888
        )
889
        assert len(resp.json) == result
890

  
891

  
892
def test_api_list_formdata_item_filter(pub, local_user):
893
    pub.role_class.wipe()
894
    role = pub.role_class(name='test')
895
    role.store()
896

  
897
    local_user.roles = [role.id]
898
    local_user.store()
899

  
900
    NamedDataSource.wipe()
901
    data_source = NamedDataSource(name='foobar')
902
    data_source.data_source = {
903
        'type': 'formula',
904
        'value': repr([{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]),
905
    }
906
    data_source.store()
907

  
908
    FormDef.wipe()
909
    formdef = FormDef()
910
    formdef.name = 'test'
911
    formdef.workflow_roles = {'_receiver': role.id}
912
    formdef.fields = [
913
        fields.ItemField(id='0', label='Item', type='item', data_source={'type': 'foobar'}, varname='item'),
914
        fields.ItemField(id='1', label='Other Item', type='item', items=['foo', 'bar'], varname='item2'),
915
    ]
916
    formdef.store()
917

  
918
    data_class = formdef.data_class()
919
    data_class.wipe()
920

  
921
    for i in range(3):
922
        formdata = data_class()
923
        formdata.data = {
924
            '0': str(9 + i),
925
            '1': 'foo' if i % 2 else 'bar',
926
        }
927
        formdata.user_id = local_user.id
928
        formdata.just_created()
929
        formdata.jump_status('new')
930
        formdata.store()
931

  
932
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-item=9', user=local_user))
933
    assert len(resp.json) == 1
934
    params = [
935
        ('eq', '10', 1),
936
        ('eq', '010', 1),
937
        ('ne', '10', 2),
938
        ('lt', '10', 1),
939
        ('lte', '10', 2),
940
        ('gt', '10', 1),
941
        ('gt', '9', 2),
942
        ('gte', '10', 2),
943
    ]
944
    for operator, value, result in params:
945
        resp = get_app(pub).get(
946
            sign_uri(
947
                '/api/forms/test/list?filter-item=%s&filter-item-operator=%s' % (value, operator),
948
                user=local_user,
949
            )
950
        )
951
        assert len(resp.json) == result
952

  
953
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-item2=foo', user=local_user))
954
    assert len(resp.json) == 1
955
    params = [
956
        ('eq', 'foo', 1),
957
        ('ne', 'foo', 2),
958
        ('lt', 'foo', 2),
959
        ('lte', 'foo', 3),
960
        ('gt', 'foo', 0),
961
        ('gt', '42', 0),
962
        ('gte', 'foo', 1),
963
    ]
964
    for operator, value, result in params:
965
        resp = get_app(pub).get(
966
            sign_uri(
967
                '/api/forms/test/list?filter-item2=%s&filter-item2-operator=%s' % (value, operator),
968
                user=local_user,
969
            )
970
        )
971
        assert len(resp.json) == result
972

  
973

  
974
def test_api_list_formdata_items_filter(pub, local_user):
975
    pub.role_class.wipe()
976
    role = pub.role_class(name='test')
977
    role.store()
978

  
979
    local_user.roles = [role.id]
980
    local_user.store()
981

  
982
    NamedDataSource.wipe()
983
    data_source = NamedDataSource(name='foobar')
984
    data_source.data_source = {
985
        'type': 'formula',
986
        'value': repr([{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]),
987
    }
988
    data_source.store()
989

  
990
    FormDef.wipe()
991
    formdef = FormDef()
992
    formdef.name = 'test'
993
    formdef.workflow_roles = {'_receiver': role.id}
994
    formdef.fields = [
995
        fields.ItemsField(
996
            id='0', label='Items', type='items', data_source={'type': 'foobar'}, varname='items'
997
        ),
998
        fields.ItemsField(
999
            id='1', label='Other Item', type='items', items=['foo', 'bar', 'baz'], varname='items2'
1000
        ),
1001
    ]
1002
    formdef.store()
1003

  
1004
    data_class = formdef.data_class()
1005
    data_class.wipe()
1006

  
1007
    for i in range(4):
1008
        formdata = data_class()
1009
        formdata.data = {}
1010
        if i < 3:
1011
            formdata.data = {
1012
                '0': ['9' if i % 2 else '11', '10'],
1013
                '1': ['foo' if i % 2 else 'bar', 'baz'],
1014
            }
1015
        formdata.user_id = local_user.id
1016
        formdata.just_created()
1017
        formdata.jump_status('new')
1018
        formdata.store()
1019

  
1020
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-items=11', user=local_user))
1021
    assert len(resp.json) == 2
1022
    params = [
1023
        ('eq', '11', 2),
1024
        ('eq', '011', 2),
1025
        ('eq', '10', 3),
1026
        ('ne', '9', 3),
1027
        ('ne', '10', 1),
1028
        ('lt', '10', 1),
1029
        ('lte', '10', 3),
1030
        ('gt', '10', 2),
1031
        ('gt', '9', 3),
1032
        ('gte', '11', 2),
1033
    ]
1034
    for operator, value, result in params:
1035
        resp = get_app(pub).get(
1036
            sign_uri(
1037
                '/api/forms/test/list?filter-items=%s&filter-items-operator=%s' % (value, operator),
1038
                user=local_user,
1039
            )
1040
        )
1041
        assert len(resp.json) == result
1042

  
1043
    params = [
1044
        ('eq', 'foo', 1),
1045
        ('ne', 'foo', 3),
1046
        ('lt', 'foo', 3),
1047
        ('lte', 'foo', 3),
1048
        ('gt', 'foo', 0),
1049
        ('gt', '42', 0),
1050
        ('gte', 'foo', 1),
1051
    ]
1052
    for operator, value, result in params:
1053
        resp = get_app(pub).get(
1054
            sign_uri(
1055
                '/api/forms/test/list?filter-items2=%s&filter-items2-operator=%s' % (value, operator),
1056
                user=local_user,
1057
            )
1058
        )
1059
        assert len(resp.json) == result
1060

  
1061

  
1062
def test_api_list_formdata_bool_filter(pub, local_user):
1063
    pub.role_class.wipe()
1064
    role = pub.role_class(name='test')
1065
    role.store()
1066

  
1067
    local_user.roles = [role.id]
1068
    local_user.store()
1069

  
1070
    FormDef.wipe()
1071
    formdef = FormDef()
1072
    formdef.name = 'test'
1073
    formdef.workflow_roles = {'_receiver': role.id}
1074
    formdef.fields = [
1075
        fields.BoolField(id='0', label='Bool', type='bool', varname='bool'),
1076
    ]
1077
    formdef.store()
1078

  
1079
    data_class = formdef.data_class()
1080
    data_class.wipe()
1081

  
1082
    for i in range(3):
1083
        formdata = data_class()
1084
        formdata.data = {
1085
            '0': bool(i % 2),
1086
        }
1087
        formdata.user_id = local_user.id
1088
        formdata.just_created()
1089
        formdata.jump_status('new')
1090
        formdata.store()
1091

  
1092
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=false', user=local_user))
1093
    assert len(resp.json) == 2
1094
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=true', user=local_user))
1095
    assert len(resp.json) == 1
1096
    params = [
1097
        ('eq', 'true', 1),
1098
        ('ne', 'true', 2),
1099
    ]
1100
    for operator, value, result in params:
1101
        resp = get_app(pub).get(
1102
            sign_uri(
1103
                '/api/forms/test/list?filter-bool=%s&filter-bool-operator=%s' % (value, operator),
1104
                user=local_user,
1105
            )
1106
        )
1107
        assert len(resp.json) == result
1108
    for operator in ['lt', 'lte', 'gt', 'gte']:
1109
        resp = get_app(pub).get(
1110
            sign_uri(
1111
                '/api/forms/test/list?filter-bool=true&filter-bool-operator=%s' % operator, user=local_user
1112
            ),
1113
            status=400,
1114
        )
1115
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-bool"' % operator
1116

  
1117

  
820 1118
def test_api_list_formdata_date_filter(pub, local_user):
821 1119
    if not pub.is_using_postgresql():
822 1120
        pytest.skip('this requires SQL')
......
833 1131
    formdef.name = 'test'
834 1132
    formdef.workflow_roles = {'_receiver': role.id}
835 1133
    formdef.fields = [
836
        fields.DateField(id='0', label='foobar', varname='foobar', type='date'),
1134
        fields.DateField(id='0', label='Date', varname='date', type='date'),
837 1135
    ]
838 1136
    formdef.store()
839 1137

  
840 1138
    data_class = formdef.data_class()
841 1139
    data_class.wipe()
842 1140

  
843
    for i in range(30):
1141
    for i in range(3):
844 1142
        formdata = data_class()
845
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 1), '%Y-%m-%d')}
1143
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 10), '%Y-%m-%d')}
846 1144
        formdata.user_id = local_user.id
847 1145
        formdata.just_created()
848 1146
        formdata.jump_status('new')
849 1147
        formdata.store()
850 1148

  
851
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=2021-06-12', user=local_user))
852
    assert len(resp.json) == 1
1149
    for value in ['2021-06-11', '11/06/2021']:
1150
        resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-date=%s' % value, user=local_user))
1151
        assert len(resp.json) == 1
1152
        params = [
1153
            ('eq', 1),
1154
            ('ne', 2),
1155
            ('lt', 1),
1156
            ('lte', 2),
1157
            ('gt', 1),
1158
            ('gte', 2),
1159
        ]
1160
        for operator, result in params:
1161
            resp = get_app(pub).get(
1162
                sign_uri(
1163
                    '/api/forms/test/list?filter-date=%s&filter-date-operator=%s' % (value, operator),
1164
                    user=local_user,
1165
                )
1166
            )
1167
            assert len(resp.json) == result
853 1168

  
854
    # alternate date format
855
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=12/06/2021', user=local_user))
1169

  
1170
def test_api_list_formdata_email_filter(pub, local_user):
1171
    pub.role_class.wipe()
1172
    role = pub.role_class(name='test')
1173
    role.store()
1174

  
1175
    local_user.roles = [role.id]
1176
    local_user.store()
1177

  
1178
    FormDef.wipe()
1179
    formdef = FormDef()
1180
    formdef.name = 'test'
1181
    formdef.workflow_roles = {'_receiver': role.id}
1182
    formdef.fields = [
1183
        fields.EmailField(id='0', label='Email', type='email', varname='email'),
1184
    ]
1185
    formdef.store()
1186

  
1187
    data_class = formdef.data_class()
1188
    data_class.wipe()
1189

  
1190
    for i in range(3):
1191
        formdata = data_class()
1192
        formdata.data = {'0': 'a@localhost' if i % 2 else 'b@localhost'}
1193
        formdata.user_id = local_user.id
1194
        formdata.just_created()
1195
        formdata.jump_status('new')
1196
        formdata.store()
1197

  
1198
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-email=a@localhost', user=local_user))
856 1199
    assert len(resp.json) == 1
1200
    params = [
1201
        ('eq', 'a@localhost', 1),
1202
        ('ne', 'a@localhost', 2),
1203
    ]
1204
    for operator, value, result in params:
1205
        resp = get_app(pub).get(
1206
            sign_uri(
1207
                '/api/forms/test/list?filter-email=%s&filter-email-operator=%s' % (value, operator),
1208
                user=local_user,
1209
            )
1210
        )
1211
        assert len(resp.json) == result
1212
    for operator in ['lt', 'lte', 'gt', 'gte']:
1213
        resp = get_app(pub).get(
1214
            sign_uri(
1215
                '/api/forms/test/list?filter-email=a@localhost&filter-email-operator=%s' % operator,
1216
                user=local_user,
1217
            ),
1218
            status=400,
1219
        )
1220
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-email"' % operator
857 1221

  
858 1222

  
859 1223
def test_api_list_formdata_internal_id_filter(pub, local_user):
......
874 1238
    data_class = formdef.data_class()
875 1239
    data_class.wipe()
876 1240

  
877
    for i in range(2):
1241
    for i in range(11):
878 1242
        formdata = data_class()
879 1243
        formdata.data = {}
880 1244
        formdata.user_id = local_user.id
......
889 1253
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-internal-id=42', user=local_user))
890 1254
    assert len(resp.json) == 0
891 1255

  
1256
    params = [
1257
        ('eq', '1', 1),
1258
        ('eq', '01', 1),
1259
        ('ne', '1', 10),
1260
        ('lt', '1', 0),
1261
        ('lte', '1', 1),
1262
        ('gt', '1', 10),
1263
        ('gt', '10', 1),
1264
        ('gte', '1', 11),
1265
    ]
1266
    for operator, value, result in params:
1267
        resp = get_app(pub).get(
1268
            sign_uri(
1269
                '/api/forms/test/list?filter-internal-id=%s&filter-internal-id-operator=%s'
1270
                % (value, operator),
1271
                user=local_user,
1272
            )
1273
        )
1274
        assert len(resp.json) == result
1275
    resp = get_app(pub).get(
1276
        sign_uri('/api/forms/test/list?filter-internal-id=blabla', user=local_user), status=400
1277
    )
1278
    assert resp.json['err_desc'] == 'Invalid value "blabla" for "filter-internal-id-value"'
1279

  
1280

  
1281
def test_api_list_formdata_user_filter(pub, local_user):
1282
    pub.role_class.wipe()
1283
    role = pub.role_class(name='test')
1284
    role.store()
1285

  
1286
    local_user.roles = [role.id]
1287
    local_user.store()
1288

  
1289
    user1 = pub.user_class(name='userA')
1290
    user1.name_identifiers = ['ABCDEF']
1291
    user1.store()
1292
    user2 = pub.user_class(name='userB')
1293
    user1.name_identifiers = ['GHIJKL']
1294
    user2.store()
1295

  
1296
    FormDef.wipe()
1297
    formdef = FormDef()
1298
    formdef.name = 'test'
1299
    formdef.workflow_roles = {'_receiver': role.id}
1300
    formdef.fields = []
1301
    formdef.store()
1302

  
1303
    data_class = formdef.data_class()
1304
    data_class.wipe()
1305

  
1306
    for i in range(3):
1307
        formdata = data_class()
1308
        formdata.data = {}
1309
        formdata.user_id = str(user1.id if i % 2 else user2.id)
1310
        formdata.just_created()
1311
        formdata.jump_status('new')
1312
        formdata.store()
1313

  
1314
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-user-uuid=ABCDEF', user=local_user))
1315
    assert len(resp.json) == 1
1316
    params = [
1317
        ('eq', 'ABCDEF', 1),
1318
        ('ne', 'ABCDEF', 2),
1319
    ]
1320
    for operator, value, result in params:
1321
        resp = get_app(pub).get(
1322
            sign_uri(
1323
                '/api/forms/test/list?filter-user-uuid=%s&filter-user-uuid-operator=%s' % (value, operator),
1324
                user=local_user,
1325
            )
1326
        )
1327
        assert len(resp.json) == result
1328
    for operator in ['lt', 'lte', 'gt', 'gte']:
1329
        resp = get_app(pub).get(
1330
            sign_uri(
1331
                '/api/forms/test/list?filter-user-uuid=ABCDEF&filter-user-uuid-operator=%s' % operator,
1332
                user=local_user,
1333
            ),
1334
            status=400,
1335
        )
1336
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-user-value"' % operator
1337

  
1338

  
1339
def test_api_list_formdata_submission_agent_filter(pub, local_user):
1340
    pub.role_class.wipe()
1341
    role = pub.role_class(name='test')
1342
    role.store()
1343

  
1344
    local_user.roles = [role.id]
1345
    local_user.store()
1346

  
1347
    user1 = pub.user_class(name='userA')
1348
    user1.name_identifiers = ['ABCDEF']
1349
    user1.store()
1350
    user2 = pub.user_class(name='userB')
1351
    user1.name_identifiers = ['GHIJKL']
1352
    user2.store()
1353

  
1354
    FormDef.wipe()
1355
    formdef = FormDef()
1356
    formdef.name = 'test'
1357
    formdef.workflow_roles = {'_receiver': role.id}
1358
    formdef.fields = []
1359
    formdef.store()
1360

  
1361
    data_class = formdef.data_class()
1362
    data_class.wipe()
1363

  
1364
    for i in range(3):
1365
        formdata = data_class()
1366
        formdata.data = {}
1367
        formdata.submission_agent_id = str(user1.id if i % 2 else user2.id)
1368
        formdata.user_id = local_user.id
1369
        formdata.just_created()
1370
        formdata.jump_status('new')
1371
        formdata.store()
1372

  
1373
    resp = get_app(pub).get(
1374
        sign_uri('/api/forms/test/list?filter-submission-agent-uuid=ABCDEF', user=local_user)
1375
    )
1376
    assert len(resp.json) == 1
1377
    params = [
1378
        ('eq', 'ABCDEF', 1),
1379
        ('ne', 'ABCDEF', 2),
1380
    ]
1381
    for operator, value, result in params:
1382
        resp = get_app(pub).get(
1383
            sign_uri(
1384
                '/api/forms/test/list?filter-submission-agent-uuid=%s&filter-submission-agent-uuid-operator=%s'
1385
                % (value, operator),
1386
                user=local_user,
1387
            )
1388
        )
1389
        assert len(resp.json) == result
1390
    for operator in ['lt', 'lte', 'gt', 'gte']:
1391
        resp = get_app(pub).get(
1392
            sign_uri(
1393
                '/api/forms/test/list?filter-submission-agent-uuid=ABCDEF&filter-submission-agent-uuid-operator=%s'
1394
                % operator,
1395
                user=local_user,
1396
            ),
1397
            status=400,
1398
        )
1399
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-submission-agent-value"' % operator
1400

  
892 1401

  
893 1402
def test_api_list_formdata_number_filter(pub, local_user):
894 1403
    pub.role_class.wipe()
......
934 1443
    data_source = NamedDataSource(name='foobar')
935 1444
    data_source.data_source = {
936 1445
        'type': 'formula',
937
        'value': repr([{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]),
1446
        'value': repr([{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]),
938 1447
    }
939 1448
    data_source.store()
940 1449

  
......
991 1500
        if i == 0:
992 1501
            formdata.data['0']['data'].append(
993 1502
                {
994
                    '1': 'plop%s' % i,
1503
                    '1': 'plop%s' % (i + 1),
995 1504
                    '2': '1',
996 1505
                    '2_display': 'foo',
997 1506
                    '2_structured': 'XXX',
......
1012 1521
    assert len(resp.json) == 1
1013 1522
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop10', user=local_user))
1014 1523
    assert len(resp.json) == 0
1524
    params = [
1525
        ('eq', 'plop5', 1),
1526
        ('ne', 'plop5', 9),
1527
        ('ne', 'plop1', 8),
1528
        ('lt', 'plop5', 5),
1529
        ('lte', 'plop5', 6),
1530
        ('gt', 'plop5', 4),
1531
        ('gt', '42', 0),
1532
        ('gte', 'plop5', 5),
1533
    ]
1534
    for operator, value, result in params:
1535
        resp = get_app(pub).get(
1536
            sign_uri(
1537
                '/api/forms/test/list?filter-blockdata_string=%s&filter-blockdata_string-operator=%s'
1538
                % (value, operator),
1539
                user=local_user,
1540
            )
1541
        )
1542
        assert len(resp.json) == result
1015 1543
    # item
1016 1544
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=1', user=local_user))
1017 1545
    assert len(resp.json) == 6
......
1019 1547
    assert len(resp.json) == 5
1020 1548
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=3', user=local_user))
1021 1549
    assert len(resp.json) == 0
1550
    params = [
1551
        ('eq', '1', 6),
1552
        ('ne', '1', 4),
1553
        ('lt', '2', 6),
1554
        ('lte', '1', 6),
1555
        ('gt', '1', 5),
1556
        ('gte', '2', 5),
1557
    ]
1558
    for operator, value, result in params:
1559
        resp = get_app(pub).get(
1560
            sign_uri(
1561
                '/api/forms/test/list?filter-blockdata_item=%s&filter-blockdata_item-operator=%s'
1562
                % (value, operator),
1563
                user=local_user,
1564
            )
1565
        )
1566
        assert len(resp.json) == result
1022 1567
    # bool
1023 1568
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=true', user=local_user))
1024 1569
    assert len(resp.json) == 6
......
1028 1573
        sign_uri('/api/forms/test/list?filter-blockdata_bool=foobar', user=local_user), status=400
1029 1574
    )
1030 1575
    assert resp.json['err_desc'] == 'Invalid value "foobar" for "filter-blockdata_bool"'
1576
    params = [
1577
        ('eq', 'true', 6),
1578
        ('ne', 'true', 4),
1579
    ]
1580
    for operator, value, result in params:
1581
        resp = get_app(pub).get(
1582
            sign_uri(
1583
                '/api/forms/test/list?filter-blockdata_bool=%s&filter-blockdata_bool-operator=%s'
1584
                % (value, operator),
1585
                user=local_user,
1586
            )
1587
        )
1588
        assert len(resp.json) == result
1589
    for operator in ['lt', 'lte', 'gt', 'gte']:
1590
        resp = get_app(pub).get(
1591
            sign_uri(
1592
                '/api/forms/test/list?filter-blockdata_bool=true&filter-blockdata_bool-operator=%s'
1593
                % operator,
1594
                user=local_user,
1595
            ),
1596
            status=400,
1597
        )
1598
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_bool"' % operator
1031 1599
    # date
1032 1600
    resp = get_app(pub).get(
1033 1601
        sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-01', user=local_user)
......
1041 1609
        sign_uri('/api/forms/test/list?filter-blockdata_date=02/06/2021', user=local_user)
1042 1610
    )
1043 1611
    assert len(resp.json) == 2
1612
    params = [
1613
        ('eq', '2021-06-02', 2),
1614
        ('ne', '2021-06-02', 8),
1615
        ('lt', '2021-06-02', 1),
1616
        ('lte', '2021-06-02', 2),
1617
        ('gt', '2021-06-02', 8),
1618
        ('gte', '2021-06-02', 10),
1619
    ]
1620
    for operator, value, result in params:
1621
        resp = get_app(pub).get(
1622
            sign_uri(
1623
                '/api/forms/test/list?filter-blockdata_date=%s&filter-blockdata_date-operator=%s'
1624
                % (value, operator),
1625
                user=local_user,
1626
            )
1627
        )
1628
        assert len(resp.json) == result
1044 1629
    # email
1045 1630
    resp = get_app(pub).get(
1046 1631
        sign_uri('/api/forms/test/list?filter-blockdata_email=a@localhost', user=local_user)
......
1054 1639
        sign_uri('/api/forms/test/list?filter-blockdata_email=c@localhost', user=local_user)
1055 1640
    )
1056 1641
    assert len(resp.json) == 0
1642
    params = [
1643
        ('eq', 'a@localhost', 6),
1644
        ('ne', 'a@localhost', 4),
1645
    ]
1646
    for operator, value, result in params:
1647
        resp = get_app(pub).get(
1648
            sign_uri(
1649
                '/api/forms/test/list?filter-blockdata_email=%s&filter-blockdata_email-operator=%s'
1650
                % (value, operator),
1651
                user=local_user,
1652
            )
1653
        )
1654
        assert len(resp.json) == result
1655
    for operator in ['lt', 'lte', 'gt', 'gte']:
1656
        resp = get_app(pub).get(
1657
            sign_uri(
1658
                '/api/forms/test/list?filter-blockdata_email=plop0&filter-blockdata_email-operator=%s'
1659
                % operator,
1660
                user=local_user,
1661
            ),
1662
            status=400,
1663
        )
1664
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_email"' % operator
1057 1665
    # mix
1058 1666
    resp = get_app(pub).get(
1059 1667
        sign_uri(
1060 1668
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop1', user=local_user
1061 1669
        )
1062 1670
    )
1063
    assert len(resp.json) == 1
1671
    assert len(resp.json) == 2
1064 1672
    resp = get_app(pub).get(
1065 1673
        sign_uri(
1066 1674
            '/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop1', user=local_user
1067 1675
        )
1068 1676
    )
1069
    assert len(resp.json) == 0
1677
    assert len(resp.json) == 1
1070 1678
    resp = get_app(pub).get(
1071 1679
        sign_uri(
1072 1680
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop0', user=local_user
wcs/backoffice/management.py
69 69
    Contains,
70 70
    Equal,
71 71
    FtsMatch,
72
    Greater,
72 73
    GreaterOrEqual,
73 74
    Intersects,
75
    Less,
74 76
    LessOrEqual,
75 77
    NotEqual,
76 78
    NotNull,
......
1636 1638
        query_overrides = get_request().form
1637 1639
        return self.get_view_criterias(query_overrides, request=get_request())
1638 1640

  
1639
    def get_view_criterias(self, query_overrides=None, request=None, record_errors=False):
1640
        from wcs import sql
1641
    def get_field_criteria(self, field, operator, field_key):
1642
        mapping = {
1643
            'eq': Equal,
1644
            'ne': NotEqual,
1645
            'lt': Less,
1646
            'lte': LessOrEqual,
1647
            'gt': Greater,
1648
            'gte': GreaterOrEqual,
1649
        }
1650
        types_with_all_ops = ['internal-id', 'date', 'item', 'items', 'string']
1651
        if field.type not in types_with_all_ops and operator not in ['eq', 'ne']:
1652
            # eq and ne are always allowed
1653
            raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
1654
        if operator not in mapping:
1655
            raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
1656
        return mapping[operator]
1641 1657

  
1658
    def get_view_criterias(self, query_overrides=None, request=None, record_errors=False):
1642 1659
        fake_fields = [
1643 1660
            FakeField('internal-id', 'internal-id', _('Identifier')),
1644 1661
            FakeField('number', 'number', _('Number')),
......
1665 1682
        filters_in_request = {
1666 1683
            k.replace('filter-', '')
1667 1684
            for k in filters_dict
1668
            if k.startswith('filter-') and not k.endswith('-value')
1685
            if k.startswith('filter-') and not k.endswith('-value') and not k.endswith('-operator')
1669 1686
        }
1670 1687
        filters_in_request = {
1671 1688
            f
......
1696 1713
            if filter_field.type == 'user-id':
1697 1714
                # convert uuid based filter into local id filter
1698 1715
                name_id = filters_dict.get('filter-user-uuid')
1716
                filters_dict['filter-user-operator'] = filters_dict.get('filter-user-uuid-operator')
1699 1717
                if name_id:
1700 1718
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
1701 1719
                    request_form['filter-user'] = filters_dict['filter-user'] = 'on'
......
1713 1731
            if filter_field.type == 'submission-agent-id':
1714 1732
                # convert uuid based filter into local id filter
1715 1733
                name_id = filters_dict.get('filter-submission-agent-uuid')
1734
                filters_dict['filter-submission-agent-operator'] = filters_dict.get(
1735
                    'filter-submission-agent-uuid-operator'
1736
                )
1716 1737
                if name_id:
1717 1738
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
1718 1739
                    request_form['filter-submission-agent'] = filters_dict['filter-submission-agent'] = 'on'
......
1741 1762
            if not filter_field_value:
1742 1763
                continue
1743 1764

  
1744
            if filter_field.type == 'date':
1765
            # get operator and criteria
1766
            filter_field_operator_key = '%s-operator' % filter_field_key.replace('-value', '')
1767
            filter_field_operator = filters_dict.get(filter_field_operator_key) or 'eq'
1768
            criteria = self.get_field_criteria(filter_field, filter_field_operator, filter_field_key)
1769

  
1770
            # check value types
1771
            if filter_field_value is None and filter_field.type in [
1772
                'date',
1773
                'bool',
1774
                'item',
1775
                'items',
1776
                'string',
1777
                'email',
1778
            ]:
1779
                continue
1780
            if filter_field.type == 'internal-id':
1781
                try:
1782
                    filter_field_value = int(filter_field_value)
1783
                except ValueError:
1784
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1785
            elif filter_field.type == 'period-date':
1786
                try:
1787
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1788
                except ValueError:
1789
                    continue
1790
            elif filter_field.type == 'date':
1745 1791
                try:
1746 1792
                    filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d')
1747 1793
                except ValueError:
......
1753 1799
                    filter_field_value = False
1754 1800
                else:
1755 1801
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1802
            elif filter_field.type in ('item', 'items', 'string'):
1803
                try:
1804
                    filter_field_value = int(filter_field_value)
1805
                except ValueError:
1806
                    pass
1756 1807

  
1757
            if getattr(filter_field, 'block_field', None):
1758
                criterias.append(
1759
                    sql.ArrayContains(
1760
                        'f%s' % filter_field.block_field.id,
1761
                        json.dumps([{filter_field.id: filter_field_value}]),
1762
                        parent_field=filter_field.block_field,
1763
                    )
1764
                )
1765
            elif filter_field.type == 'internal-id':
1766
                criterias.append(Equal('id', str(filter_field_value)))
1808
            # add criteria
1809
            if filter_field.type == 'internal-id':
1810
                criterias.append(criteria('id', filter_field_value))
1767 1811
            elif filter_field.type == 'number':
1768 1812
                criterias.append(Equal('id_display', str(filter_field_value)))
1769 1813
            elif filter_field.type == 'period-date':
1770
                try:
1771
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1772
                except ValueError:
1773
                    continue
1774 1814
                if filter_field.id == 'start':
1775 1815
                    criterias.append(GreaterOrEqual('receipt_time', filter_date_value))
1776 1816
                elif filter_field.id == 'end':
......
1783 1823
            elif filter_field.type == 'user-id':
1784 1824
                if filter_field_value == '__current__' and get_request().user:
1785 1825
                    filter_field_value = str(get_request().user.id)
1786
                criterias.append(Equal('user_id', filter_field_value))
1826
                criterias.append(criteria('user_id', filter_field_value))
1787 1827
            elif filter_field.type == 'submission-agent-id':
1788
                criterias.append(Equal('submission_agent_id', filter_field_value))
1789
            elif filter_field.type in ('item', 'items'):
1790
                if filter_field.type == 'item':
1791
                    criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1792
                elif filter_field.type == 'items':
1793
                    criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value]))
1794
                field_options = filter_field.get_options()
1795
                if field_options and type(field_options[0]) in (list, tuple):
1796
                    for option in field_options:
1797
                        if filter_field_value in (option[0], option[-1]):
1798
                            filter_field_value = option[1]
1799
                            break
1800
                criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1801
            elif filter_field.type == 'bool':
1802
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1803
            elif filter_field.type in ('string', 'email'):
1804
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1805
            elif filter_field.type == 'date':
1806
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1828
                criterias.append(criteria('submission_agent_id', filter_field_value))
1829
            elif filter_field.type in ('item', 'items', 'bool', 'string', 'email', 'date'):
1830
                criterias.append(criteria('f%s' % filter_field.id, filter_field_value, field=filter_field))
1831
                if filter_field.type in ('item', 'items'):
1832
                    field_options = filter_field.get_options()
1833
                    if field_options and type(field_options[0]) in (list, tuple):
1834
                        for option in field_options:
1835
                            if filter_field_value in (option[0], option[-1]):
1836
                                filter_field_value = option[1]
1837
                                break
1838
                    criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1807 1839

  
1808 1840
        unknown_filters = sorted(filters_in_request - known_filters)
1809 1841
        if unknown_filters:
wcs/qommon/storage.py
155 155
        self.field = kwargs.get('field')
156 156

  
157 157
    def build_lambda(self):
158
        return lambda x: self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
158
        def func(x):
159
            if self.field and self.field.key in ['item', 'string'] and isinstance(self.value, int):
160
                try:
161
                    _x = int(getattr(x, self.attribute, None))
162
                except (ValueError, TypeError):
163
                    _x = self.typed_none
164
                return self.op(_x, self.value)
165
            if self.field and self.field.key == 'items':
166
                _x = getattr(x, self.attribute, None) or []
167
                if isinstance(self.value, int):
168
                    try:
169
                        _x = [int(e) for e in _x]
170
                    except ValueError:
171
                        _x = []
172
                return getattr(self, 'array_op', any)(self.op(e, self.value) for e in _x)
173
            if isinstance(self.value, int):
174
                return self.op(int(getattr(x, self.attribute, None)) or self.typed_none, self.value)
175
            return self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
176

  
177
        return func
159 178

  
160 179
    def __repr__(self):
161 180
        return '<%s (attribute: %r%s)>' % (
......
179 198

  
180 199
class NotEqual(Criteria):
181 200
    op = operator.ne
201
    array_op = all
182 202

  
183 203

  
184 204
class LessOrEqual(Criteria):
wcs/sql.py
108 108
        self.attribute = attribute.replace('-', '_')
109 109
        self.value = value
110 110
        self.field = kwargs.get('field')
111
        self.parent_field = kwargs.get('parent_field')
112 111

  
113 112
    def as_sql(self):
113
        if self.field and getattr(self.field, 'block_field', None):
114
            # eq: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
115
            # lt: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' < 'value')
116
            # lte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' <= 'value')
117
            # gt: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' > 'value')
118
            # gte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' >= 'value')
119
            # with a NOT EXISTS and the opposite operator:
120
            # ne: NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
121
            # note: aa->>'FOOBAR' can be written with an integer or bool cast
122
            attribute = "aa->>'%s'" % self.field.id
123
            if self.field.key in ['item', 'string'] and isinstance(self.value, int):
124
                # integer cast of db values
125
                attribute = "(CASE WHEN %s~E'^\\\\d+$' THEN (%s)::int ELSE NULL END)" % (attribute, attribute)
126
            elif self.field.key == 'bool':
127
                # bool cast of db values
128
                attribute = '(%s)::bool' % attribute
129
            return "%s(SELECT 1 FROM jsonb_array_elements(%s->'data') AS datas(aa) WHERE %s %s %%(c%s)s)" % (
130
                getattr(self, 'sql_exists', 'EXISTS'),
131
                get_field_id(self.field.block_field),
132
                attribute,
133
                getattr(self, 'sql_op_exists', self.sql_op),
134
                id(self.value),
135
            )
136

  
114 137
        attribute = self.attribute
138

  
139
        if self.field and self.field.key == 'items':
140
            # eq: 'value' = ANY (ITEMS)
141
            # ne: 'value' != ALL (ITEMS)
142
            # with reversed operator:
143
            # lt: 'value' > ANY (ITEMS)
144
            # lte: 'value' >= ANY (ITEMS)
145
            # gt: 'value' < ANY (ITEMS)
146
            # gte: 'value' <= ANY (ITEMS)
147
            # note: ITEMS is written with an integer cast or with a COALESCE expression
148
            if isinstance(self.value, int):
149
                # integer cast of db values
150
                attribute = (
151
                    "CASE WHEN array_to_string(%s, '')~E'^\\\\d+$' THEN %s::int[] ELSE ARRAY[]::int[] END"
152
                    % (attribute, attribute)
153
                )
154
            else:
155
                # for none values
156
                attribute = "COALESCE(%s, ARRAY[]::text[])" % attribute
157
            return '%%(c%s)s %s %s (%s)' % (
158
                id(self.value),
159
                getattr(self, 'sql_reversed_op', self.sql_op),
160
                getattr(self, 'sql_array_op', 'ANY'),
161
                attribute,
162
            )
163

  
115 164
        if self.field and self.field.key == 'computed':
116 165
            attribute = "%s->>'data'" % self.attribute
117
        if self.parent_field and self.parent_field.key == 'block':
118
            attribute = "%s->'data'" % self.attribute
119

  
166
        elif self.field and self.field.key in ['item', 'string'] and isinstance(self.value, int):
167
            # integer cast of db values
168
            attribute = "(CASE WHEN %s~E'^\\\\d+$' THEN %s::int ELSE NULL END)" % (attribute, attribute)
120 169
        return '%s %s %%(c%s)s' % (attribute, self.sql_op, id(self.value))
121 170

  
122 171
    def as_sql_param(self):
......
131 180

  
132 181
class Less(Criteria):
133 182
    sql_op = '<'
183
    sql_reversed_op = '>'
134 184

  
135 185

  
136 186
class Greater(Criteria):
137 187
    sql_op = '>'
188
    sql_reversed_op = '<'
138 189

  
139 190

  
140 191
class Equal(Criteria):
......
148 199

  
149 200
class LessOrEqual(Criteria):
150 201
    sql_op = '<='
202
    sql_reversed_op = '>='
151 203

  
152 204

  
153 205
class GreaterOrEqual(Criteria):
154 206
    sql_op = '>='
207
    sql_reversed_op = '<='
155 208

  
156 209

  
157 210
class NotEqual(Criteria):
158 211
    sql_op = '!='
212
    # in case of items field, we want to write this clause:
213
    # 'value' != ALL (ITEMS)
214
    sql_array_op = 'ALL'
215
    # in case of block field, we want to write this clause:
216
    # NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
217
    # and not:
218
    # EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' != 'value')
219
    sql_exists = 'NOT EXISTS'
220
    sql_op_exists = '='
159 221

  
160 222

  
161 223
class Contains(Criteria):
162
-