Projet

Général

Profil

0001-api-add-filter-operators-on-formdata-listing-60785.patch

Lauréline Guérin, 10 février 2022 09:25

Télécharger (34,7 ko)

Voir les différences:

Subject: [PATCH] api: add filter operators on formdata listing (#60785)

 tests/api/test_formdata.py   | 511 ++++++++++++++++++++++++++++++++++-
 wcs/backoffice/management.py |  91 ++++---
 wcs/qommon/storage.py        |  22 +-
 wcs/sql.py                   |  70 ++++-
 4 files changed, 641 insertions(+), 53 deletions(-)
tests/api/test_formdata.py
817 817
    assert resp.json['err_desc'] == 'Invalid filters "baz", "foobar"'
818 818

  
819 819

  
820
def test_api_list_formdata_string_filter(pub, local_user):
821
    pub.role_class.wipe()
822
    role = pub.role_class(name='test')
823
    role.store()
824

  
825
    local_user.roles = [role.id]
826
    local_user.store()
827

  
828
    FormDef.wipe()
829
    formdef = FormDef()
830
    formdef.name = 'test'
831
    formdef.workflow_roles = {'_receiver': role.id}
832
    formdef.fields = [
833
        fields.StringField(id='0', label='String', type='string', varname='string'),
834
        fields.StringField(id='1', label='String2', type='string', varname='string2'),
835
    ]
836
    formdef.store()
837

  
838
    data_class = formdef.data_class()
839
    data_class.wipe()
840

  
841
    for i in range(3):
842
        formdata = data_class()
843
        formdata.data = {
844
            '0': 'FOO %s' % i,
845
            '1': '%s' % (9 + i),
846
        }
847
        formdata.user_id = local_user.id
848
        formdata.just_created()
849
        formdata.jump_status('new')
850
        formdata.store()
851

  
852
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-string=FOO 2', user=local_user))
853
    assert len(resp.json) == 1
854
    params = [
855
        ('eq', 'FOO 2', 1),
856
        ('ne', 'FOO 2', 2),
857
        ('lt', 'FOO 2', 2),
858
        ('lte', 'FOO 2', 3),
859
        ('gt', 'FOO 2', 0),
860
        ('gt', '42', 0),
861
        ('gte', 'FOO 2', 1),
862
    ]
863
    for operator, value, result in params:
864
        resp = get_app(pub).get(
865
            sign_uri(
866
                '/api/forms/test/list?filter-string=%s&filter-string-operator=%s' % (value, operator),
867
                user=local_user,
868
            )
869
        )
870
        assert len(resp.json) == result
871

  
872
    params = [
873
        ('eq', '10', 1),
874
        ('eq', '010', 1),
875
        ('ne', '10', 2),
876
        ('lt', '10', 1),
877
        ('lte', '10', 2),
878
        ('gt', '10', 1),
879
        ('gt', '9', 2),
880
        ('gte', '10', 2),
881
    ]
882
    for operator, value, result in params:
883
        resp = get_app(pub).get(
884
            sign_uri(
885
                '/api/forms/test/list?filter-string2=%s&filter-string2-operator=%s' % (value, operator),
886
                user=local_user,
887
            )
888
        )
889
        assert len(resp.json) == result
890

  
891

  
892
def test_api_list_formdata_item_filter(pub, local_user):
893
    pub.role_class.wipe()
894
    role = pub.role_class(name='test')
895
    role.store()
896

  
897
    local_user.roles = [role.id]
898
    local_user.store()
899

  
900
    NamedDataSource.wipe()
901
    data_source = NamedDataSource(name='foobar')
902
    data_source.data_source = {
903
        'type': 'formula',
904
        'value': repr([{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]),
905
    }
906
    data_source.store()
907

  
908
    FormDef.wipe()
909
    formdef = FormDef()
910
    formdef.name = 'test'
911
    formdef.workflow_roles = {'_receiver': role.id}
912
    formdef.fields = [
913
        fields.ItemField(id='0', label='Item', type='item', data_source={'type': 'foobar'}, varname='item'),
914
        fields.ItemField(id='1', label='Other Item', type='item', items=['foo', 'bar'], varname='item2'),
915
    ]
916
    formdef.store()
917

  
918
    data_class = formdef.data_class()
919
    data_class.wipe()
920

  
921
    for i in range(3):
922
        formdata = data_class()
923
        formdata.data = {
924
            '0': str(9 + i),
925
            '1': 'foo' if i % 2 else 'bar',
926
        }
927
        formdata.user_id = local_user.id
928
        formdata.just_created()
929
        formdata.jump_status('new')
930
        formdata.store()
931

  
932
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-item=9', user=local_user))
933
    assert len(resp.json) == 1
934
    params = [
935
        ('eq', '10', 1),
936
        ('eq', '010', 1),
937
        ('ne', '10', 2),
938
        ('lt', '10', 1),
939
        ('lte', '10', 2),
940
        ('gt', '10', 1),
941
        ('gt', '9', 2),
942
        ('gte', '10', 2),
943
    ]
944
    for operator, value, result in params:
945
        resp = get_app(pub).get(
946
            sign_uri(
947
                '/api/forms/test/list?filter-item=%s&filter-item-operator=%s' % (value, operator),
948
                user=local_user,
949
            )
950
        )
951
        assert len(resp.json) == result
952

  
953
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-item2=foo', user=local_user))
954
    assert len(resp.json) == 1
955
    params = [
956
        ('eq', 'foo', 1),
957
        ('ne', 'foo', 2),
958
        ('lt', 'foo', 2),
959
        ('lte', 'foo', 3),
960
        ('gt', 'foo', 0),
961
        ('gt', '42', 0),
962
        ('gte', 'foo', 1),
963
    ]
964
    for operator, value, result in params:
965
        resp = get_app(pub).get(
966
            sign_uri(
967
                '/api/forms/test/list?filter-item2=%s&filter-item2-operator=%s' % (value, operator),
968
                user=local_user,
969
            )
970
        )
971
        assert len(resp.json) == result
972

  
973

  
974
def test_api_list_formdata_items_filter(pub, local_user):
975
    pub.role_class.wipe()
976
    role = pub.role_class(name='test')
977
    role.store()
978

  
979
    local_user.roles = [role.id]
980
    local_user.store()
981

  
982
    NamedDataSource.wipe()
983
    data_source = NamedDataSource(name='foobar')
984
    data_source.data_source = {
985
        'type': 'formula',
986
        'value': repr([{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]),
987
    }
988
    data_source.store()
989

  
990
    FormDef.wipe()
991
    formdef = FormDef()
992
    formdef.name = 'test'
993
    formdef.workflow_roles = {'_receiver': role.id}
994
    formdef.fields = [
995
        fields.ItemsField(
996
            id='0', label='Items', type='items', data_source={'type': 'foobar'}, varname='items'
997
        ),
998
        fields.ItemsField(
999
            id='1', label='Other Item', type='items', items=['foo', 'bar', 'baz'], varname='items2'
1000
        ),
1001
    ]
1002
    formdef.store()
1003

  
1004
    data_class = formdef.data_class()
1005
    data_class.wipe()
1006

  
1007
    for i in range(4):
1008
        formdata = data_class()
1009
        formdata.data = {}
1010
        if i < 3:
1011
            formdata.data = {
1012
                '0': ['9' if i % 2 else '11', '10'],
1013
                '1': ['foo' if i % 2 else 'bar', 'baz'],
1014
            }
1015
        formdata.user_id = local_user.id
1016
        formdata.just_created()
1017
        formdata.jump_status('new')
1018
        formdata.store()
1019

  
1020
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-items=11', user=local_user))
1021
    assert len(resp.json) == 2
1022
    params = [
1023
        ('eq', '11', 2),
1024
        ('eq', '011', 2),
1025
        ('eq', '10', 3),
1026
        ('ne', '9', 3),
1027
        ('ne', '10', 1),
1028
        ('lt', '10', 1),
1029
        ('lte', '10', 3),
1030
        ('gt', '10', 2),
1031
        ('gt', '9', 3),
1032
        ('gte', '11', 2),
1033
    ]
1034
    for operator, value, result in params:
1035
        resp = get_app(pub).get(
1036
            sign_uri(
1037
                '/api/forms/test/list?filter-items=%s&filter-items-operator=%s' % (value, operator),
1038
                user=local_user,
1039
            )
1040
        )
1041
        assert len(resp.json) == result
1042

  
1043
    params = [
1044
        ('eq', 'foo', 1),
1045
        ('ne', 'foo', 3),
1046
        ('lt', 'foo', 3),
1047
        ('lte', 'foo', 3),
1048
        ('gt', 'foo', 0),
1049
        ('gt', '42', 0),
1050
        ('gte', 'foo', 1),
1051
    ]
1052
    for operator, value, result in params:
1053
        resp = get_app(pub).get(
1054
            sign_uri(
1055
                '/api/forms/test/list?filter-items2=%s&filter-items2-operator=%s' % (value, operator),
1056
                user=local_user,
1057
            )
1058
        )
1059
        assert len(resp.json) == result
1060

  
1061

  
1062
def test_api_list_formdata_bool_filter(pub, local_user):
1063
    pub.role_class.wipe()
1064
    role = pub.role_class(name='test')
1065
    role.store()
1066

  
1067
    local_user.roles = [role.id]
1068
    local_user.store()
1069

  
1070
    FormDef.wipe()
1071
    formdef = FormDef()
1072
    formdef.name = 'test'
1073
    formdef.workflow_roles = {'_receiver': role.id}
1074
    formdef.fields = [
1075
        fields.BoolField(id='0', label='Bool', type='bool', varname='bool'),
1076
    ]
1077
    formdef.store()
1078

  
1079
    data_class = formdef.data_class()
1080
    data_class.wipe()
1081

  
1082
    for i in range(3):
1083
        formdata = data_class()
1084
        formdata.data = {
1085
            '0': bool(i % 2),
1086
        }
1087
        formdata.user_id = local_user.id
1088
        formdata.just_created()
1089
        formdata.jump_status('new')
1090
        formdata.store()
1091

  
1092
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=false', user=local_user))
1093
    assert len(resp.json) == 2
1094
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-bool=true', user=local_user))
1095
    assert len(resp.json) == 1
1096
    params = [
1097
        ('eq', 'true', 1),
1098
        ('ne', 'true', 2),
1099
    ]
1100
    for operator, value, result in params:
1101
        resp = get_app(pub).get(
1102
            sign_uri(
1103
                '/api/forms/test/list?filter-bool=%s&filter-bool-operator=%s' % (value, operator),
1104
                user=local_user,
1105
            )
1106
        )
1107
        assert len(resp.json) == result
1108
    for operator in ['lt', 'lte', 'gt', 'gte']:
1109
        resp = get_app(pub).get(
1110
            sign_uri(
1111
                '/api/forms/test/list?filter-bool=true&filter-bool-operator=%s' % operator, user=local_user
1112
            ),
1113
            status=400,
1114
        )
1115
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-bool"' % operator
1116

  
1117

  
820 1118
def test_api_list_formdata_date_filter(pub, local_user):
821 1119
    if not pub.is_using_postgresql():
822 1120
        pytest.skip('this requires SQL')
......
833 1131
    formdef.name = 'test'
834 1132
    formdef.workflow_roles = {'_receiver': role.id}
835 1133
    formdef.fields = [
836
        fields.DateField(id='0', label='foobar', varname='foobar', type='date'),
1134
        fields.DateField(id='0', label='Date', varname='date', type='date'),
837 1135
    ]
838 1136
    formdef.store()
839 1137

  
840 1138
    data_class = formdef.data_class()
841 1139
    data_class.wipe()
842 1140

  
843
    for i in range(30):
1141
    for i in range(3):
844 1142
        formdata = data_class()
845
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 1), '%Y-%m-%d')}
1143
        formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 10), '%Y-%m-%d')}
846 1144
        formdata.user_id = local_user.id
847 1145
        formdata.just_created()
848 1146
        formdata.jump_status('new')
849 1147
        formdata.store()
850 1148

  
851
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=2021-06-12', user=local_user))
852
    assert len(resp.json) == 1
1149
    for value in ['2021-06-11', '11/06/2021']:
1150
        resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-date=%s' % value, user=local_user))
1151
        assert len(resp.json) == 1
1152
        params = [
1153
            ('eq', 1),
1154
            ('ne', 2),
1155
            ('lt', 1),
1156
            ('lte', 2),
1157
            ('gt', 1),
1158
            ('gte', 2),
1159
        ]
1160
        for operator, result in params:
1161
            resp = get_app(pub).get(
1162
                sign_uri(
1163
                    '/api/forms/test/list?filter-date=%s&filter-date-operator=%s' % (value, operator),
1164
                    user=local_user,
1165
                )
1166
            )
1167
            assert len(resp.json) == result
1168

  
1169

  
1170
def test_api_list_formdata_email_filter(pub, local_user):
1171
    pub.role_class.wipe()
1172
    role = pub.role_class(name='test')
1173
    role.store()
1174

  
1175
    local_user.roles = [role.id]
1176
    local_user.store()
1177

  
1178
    FormDef.wipe()
1179
    formdef = FormDef()
1180
    formdef.name = 'test'
1181
    formdef.workflow_roles = {'_receiver': role.id}
1182
    formdef.fields = [
1183
        fields.EmailField(id='0', label='Email', type='email', varname='email'),
1184
    ]
1185
    formdef.store()
853 1186

  
854
    # alternate date format
855
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=12/06/2021', user=local_user))
1187
    data_class = formdef.data_class()
1188
    data_class.wipe()
1189

  
1190
    for i in range(3):
1191
        formdata = data_class()
1192
        formdata.data = {'0': 'a@localhost' if i % 2 else 'b@localhost'}
1193
        formdata.user_id = local_user.id
1194
        formdata.just_created()
1195
        formdata.jump_status('new')
1196
        formdata.store()
1197

  
1198
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-email=a@localhost', user=local_user))
856 1199
    assert len(resp.json) == 1
1200
    params = [
1201
        ('eq', 'a@localhost', 1),
1202
        ('ne', 'a@localhost', 2),
1203
    ]
1204
    for operator, value, result in params:
1205
        resp = get_app(pub).get(
1206
            sign_uri(
1207
                '/api/forms/test/list?filter-email=%s&filter-email-operator=%s' % (value, operator),
1208
                user=local_user,
1209
            )
1210
        )
1211
        assert len(resp.json) == result
1212
    for operator in ['lt', 'lte', 'gt', 'gte']:
1213
        resp = get_app(pub).get(
1214
            sign_uri(
1215
                '/api/forms/test/list?filter-email=a@localhost&filter-email-operator=%s' % operator,
1216
                user=local_user,
1217
            ),
1218
            status=400,
1219
        )
1220
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-email"' % operator
857 1221

  
858 1222

  
859 1223
def test_api_list_formdata_internal_id_filter(pub, local_user):
......
874 1238
    data_class = formdef.data_class()
875 1239
    data_class.wipe()
876 1240

  
877
    for i in range(2):
1241
    for i in range(11):
878 1242
        formdata = data_class()
879 1243
        formdata.data = {}
880 1244
        formdata.user_id = local_user.id
......
889 1253
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-internal-id=42', user=local_user))
890 1254
    assert len(resp.json) == 0
891 1255

  
1256
    params = [
1257
        ('eq', '1', 1),
1258
        ('eq', '01', 1),
1259
        ('ne', '1', 10),
1260
        ('lt', '1', 0),
1261
        ('lte', '1', 1),
1262
        ('gt', '1', 10),
1263
        ('gt', '10', 1),
1264
        ('gte', '1', 11),
1265
    ]
1266
    for operator, value, result in params:
1267
        resp = get_app(pub).get(
1268
            sign_uri(
1269
                '/api/forms/test/list?filter-internal-id=%s&filter-internal-id-operator=%s'
1270
                % (value, operator),
1271
                user=local_user,
1272
            )
1273
        )
1274
        assert len(resp.json) == result
1275
    resp = get_app(pub).get(
1276
        sign_uri('/api/forms/test/list?filter-internal-id=blabla', user=local_user), status=400
1277
    )
1278
    assert resp.json['err_desc'] == 'Invalid value "blabla" for "filter-internal-id-value"'
1279

  
892 1280

  
893 1281
def test_api_list_formdata_number_filter(pub, local_user):
894 1282
    pub.role_class.wipe()
......
934 1322
    data_source = NamedDataSource(name='foobar')
935 1323
    data_source.data_source = {
936 1324
        'type': 'formula',
937
        'value': repr([{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]),
1325
        'value': repr([{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]),
938 1326
    }
939 1327
    data_source.store()
940 1328

  
......
991 1379
        if i == 0:
992 1380
            formdata.data['0']['data'].append(
993 1381
                {
994
                    '1': 'plop%s' % i,
1382
                    '1': 'plop%s' % (i + 1),
995 1383
                    '2': '1',
996 1384
                    '2_display': 'foo',
997 1385
                    '2_structured': 'XXX',
......
1012 1400
    assert len(resp.json) == 1
1013 1401
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop10', user=local_user))
1014 1402
    assert len(resp.json) == 0
1403
    params = [
1404
        ('eq', 'plop5', 1),
1405
        ('ne', 'plop5', 9),
1406
        ('ne', 'plop1', 8),
1407
        ('lt', 'plop5', 5),
1408
        ('lte', 'plop5', 6),
1409
        ('gt', 'plop5', 4),
1410
        ('gt', '42', 0),
1411
        ('gte', 'plop5', 5),
1412
    ]
1413
    for operator, value, result in params:
1414
        resp = get_app(pub).get(
1415
            sign_uri(
1416
                '/api/forms/test/list?filter-blockdata_string=%s&filter-blockdata_string-operator=%s'
1417
                % (value, operator),
1418
                user=local_user,
1419
            )
1420
        )
1421
        assert len(resp.json) == result
1015 1422
    # item
1016 1423
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=1', user=local_user))
1017 1424
    assert len(resp.json) == 6
......
1019 1426
    assert len(resp.json) == 5
1020 1427
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=3', user=local_user))
1021 1428
    assert len(resp.json) == 0
1429
    params = [
1430
        ('eq', '1', 6),
1431
        ('ne', '1', 4),
1432
        ('lt', '2', 6),
1433
        ('lte', '1', 6),
1434
        ('gt', '1', 5),
1435
        ('gte', '2', 5),
1436
    ]
1437
    for operator, value, result in params:
1438
        resp = get_app(pub).get(
1439
            sign_uri(
1440
                '/api/forms/test/list?filter-blockdata_item=%s&filter-blockdata_item-operator=%s'
1441
                % (value, operator),
1442
                user=local_user,
1443
            )
1444
        )
1445
        assert len(resp.json) == result
1022 1446
    # bool
1023 1447
    resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=true', user=local_user))
1024 1448
    assert len(resp.json) == 6
......
1028 1452
        sign_uri('/api/forms/test/list?filter-blockdata_bool=foobar', user=local_user), status=400
1029 1453
    )
1030 1454
    assert resp.json['err_desc'] == 'Invalid value "foobar" for "filter-blockdata_bool"'
1455
    params = [
1456
        ('eq', 'true', 6),
1457
        ('ne', 'true', 4),
1458
    ]
1459
    for operator, value, result in params:
1460
        resp = get_app(pub).get(
1461
            sign_uri(
1462
                '/api/forms/test/list?filter-blockdata_bool=%s&filter-blockdata_bool-operator=%s'
1463
                % (value, operator),
1464
                user=local_user,
1465
            )
1466
        )
1467
        assert len(resp.json) == result
1468
    for operator in ['lt', 'lte', 'gt', 'gte']:
1469
        resp = get_app(pub).get(
1470
            sign_uri(
1471
                '/api/forms/test/list?filter-blockdata_bool=true&filter-blockdata_bool-operator=%s'
1472
                % operator,
1473
                user=local_user,
1474
            ),
1475
            status=400,
1476
        )
1477
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_bool"' % operator
1031 1478
    # date
1032 1479
    resp = get_app(pub).get(
1033 1480
        sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-01', user=local_user)
......
1041 1488
        sign_uri('/api/forms/test/list?filter-blockdata_date=02/06/2021', user=local_user)
1042 1489
    )
1043 1490
    assert len(resp.json) == 2
1491
    params = [
1492
        ('eq', '2021-06-02', 2),
1493
        ('ne', '2021-06-02', 8),
1494
        ('lt', '2021-06-02', 1),
1495
        ('lte', '2021-06-02', 2),
1496
        ('gt', '2021-06-02', 8),
1497
        ('gte', '2021-06-02', 10),
1498
    ]
1499
    for operator, value, result in params:
1500
        resp = get_app(pub).get(
1501
            sign_uri(
1502
                '/api/forms/test/list?filter-blockdata_date=%s&filter-blockdata_date-operator=%s'
1503
                % (value, operator),
1504
                user=local_user,
1505
            )
1506
        )
1507
        assert len(resp.json) == result
1044 1508
    # email
1045 1509
    resp = get_app(pub).get(
1046 1510
        sign_uri('/api/forms/test/list?filter-blockdata_email=a@localhost', user=local_user)
......
1054 1518
        sign_uri('/api/forms/test/list?filter-blockdata_email=c@localhost', user=local_user)
1055 1519
    )
1056 1520
    assert len(resp.json) == 0
1521
    params = [
1522
        ('eq', 'a@localhost', 6),
1523
        ('ne', 'a@localhost', 4),
1524
    ]
1525
    for operator, value, result in params:
1526
        resp = get_app(pub).get(
1527
            sign_uri(
1528
                '/api/forms/test/list?filter-blockdata_email=%s&filter-blockdata_email-operator=%s'
1529
                % (value, operator),
1530
                user=local_user,
1531
            )
1532
        )
1533
        assert len(resp.json) == result
1534
    for operator in ['lt', 'lte', 'gt', 'gte']:
1535
        resp = get_app(pub).get(
1536
            sign_uri(
1537
                '/api/forms/test/list?filter-blockdata_email=plop0&filter-blockdata_email-operator=%s'
1538
                % operator,
1539
                user=local_user,
1540
            ),
1541
            status=400,
1542
        )
1543
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_email"' % operator
1057 1544
    # mix
1058 1545
    resp = get_app(pub).get(
1059 1546
        sign_uri(
1060 1547
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop1', user=local_user
1061 1548
        )
1062 1549
    )
1063
    assert len(resp.json) == 1
1550
    assert len(resp.json) == 2
1064 1551
    resp = get_app(pub).get(
1065 1552
        sign_uri(
1066 1553
            '/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop1', user=local_user
1067 1554
        )
1068 1555
    )
1069
    assert len(resp.json) == 0
1556
    assert len(resp.json) == 1
1070 1557
    resp = get_app(pub).get(
1071 1558
        sign_uri(
1072 1559
            '/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop0', user=local_user
wcs/backoffice/management.py
69 69
    Contains,
70 70
    Equal,
71 71
    FtsMatch,
72
    Greater,
72 73
    GreaterOrEqual,
73 74
    Intersects,
75
    Less,
74 76
    LessOrEqual,
75 77
    NotEqual,
76 78
    NotNull,
......
1636 1638
        query_overrides = get_request().form
1637 1639
        return self.get_view_criterias(query_overrides, request=get_request())
1638 1640

  
1639
    def get_view_criterias(self, query_overrides=None, request=None, record_errors=False):
1640
        from wcs import sql
1641
    def get_field_criteria(self, field, operator, field_key):
1642
        mapping = {
1643
            'eq': Equal,
1644
            'ne': NotEqual,
1645
            'lt': Less,
1646
            'lte': LessOrEqual,
1647
            'gt': Greater,
1648
            'gte': GreaterOrEqual,
1649
        }
1650
        types_with_all_ops = ['internal-id', 'date', 'item', 'items', 'string']
1651
        if field.type not in types_with_all_ops and operator not in ['eq', 'ne']:
1652
            # eq and ne are always allowed
1653
            raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
1654
        if operator not in mapping:
1655
            raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
1656
        return mapping[operator]
1641 1657

  
1658
    def get_view_criterias(self, query_overrides=None, request=None, record_errors=False):
1642 1659
        fake_fields = [
1643 1660
            FakeField('internal-id', 'internal-id', _('Identifier')),
1644 1661
            FakeField('number', 'number', _('Number')),
......
1665 1682
        filters_in_request = {
1666 1683
            k.replace('filter-', '')
1667 1684
            for k in filters_dict
1668
            if k.startswith('filter-') and not k.endswith('-value')
1685
            if k.startswith('filter-') and not k.endswith('-value') and not k.endswith('-operator')
1669 1686
        }
1670 1687
        filters_in_request = {
1671 1688
            f
......
1741 1758
            if not filter_field_value:
1742 1759
                continue
1743 1760

  
1744
            if filter_field.type == 'date':
1761
            # get operator and criteria
1762
            filter_field_operator_key = '%s-operator' % filter_field_key.replace('-value', '')
1763
            filter_field_operator = filters_dict.get(filter_field_operator_key) or 'eq'
1764
            criteria = self.get_field_criteria(filter_field, filter_field_operator, filter_field_key)
1765

  
1766
            # check value types
1767
            if filter_field.type == 'internal-id':
1768
                try:
1769
                    filter_field_value = int(filter_field_value)
1770
                except ValueError:
1771
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1772
            elif filter_field.type == 'period-date':
1773
                try:
1774
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1775
                except ValueError:
1776
                    continue
1777
            elif filter_field.type == 'date':
1745 1778
                try:
1746 1779
                    filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d')
1747 1780
                except ValueError:
......
1753 1786
                    filter_field_value = False
1754 1787
                else:
1755 1788
                    raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key))
1789
            elif filter_field.type in ('item', 'items', 'string'):
1790
                try:
1791
                    filter_field_value = int(filter_field_value)
1792
                except ValueError:
1793
                    pass
1756 1794

  
1757
            if getattr(filter_field, 'block_field', None):
1758
                criterias.append(
1759
                    sql.ArrayContains(
1760
                        'f%s' % filter_field.block_field.id,
1761
                        json.dumps([{filter_field.id: filter_field_value}]),
1762
                        parent_field=filter_field.block_field,
1763
                    )
1764
                )
1765
            elif filter_field.type == 'internal-id':
1766
                criterias.append(Equal('id', str(filter_field_value)))
1795
            # add criteria
1796
            if filter_field.type == 'internal-id':
1797
                criterias.append(criteria('id', filter_field_value))
1767 1798
            elif filter_field.type == 'number':
1768 1799
                criterias.append(Equal('id_display', str(filter_field_value)))
1769 1800
            elif filter_field.type == 'period-date':
1770
                try:
1771
                    filter_date_value = misc.get_as_datetime(filter_field_value).timetuple()
1772
                except ValueError:
1773
                    continue
1774 1801
                if filter_field.id == 'start':
1775 1802
                    criterias.append(GreaterOrEqual('receipt_time', filter_date_value))
1776 1803
                elif filter_field.id == 'end':
......
1786 1813
                criterias.append(Equal('user_id', filter_field_value))
1787 1814
            elif filter_field.type == 'submission-agent-id':
1788 1815
                criterias.append(Equal('submission_agent_id', filter_field_value))
1789
            elif filter_field.type in ('item', 'items'):
1790
                if filter_field.type == 'item':
1791
                    criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1792
                elif filter_field.type == 'items':
1793
                    criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value]))
1794
                field_options = filter_field.get_options()
1795
                if field_options and type(field_options[0]) in (list, tuple):
1796
                    for option in field_options:
1797
                        if filter_field_value in (option[0], option[-1]):
1798
                            filter_field_value = option[1]
1799
                            break
1800
                criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1801
            elif filter_field.type == 'bool':
1802
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1803
            elif filter_field.type in ('string', 'email'):
1804
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1805
            elif filter_field.type == 'date':
1806
                criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
1816
            elif filter_field.type in ('item', 'items', 'bool', 'string', 'email', 'date'):
1817
                criterias.append(criteria('f%s' % filter_field.id, filter_field_value, field=filter_field))
1818
                if filter_field.type in ('item', 'items'):
1819
                    field_options = filter_field.get_options()
1820
                    if field_options and type(field_options[0]) in (list, tuple):
1821
                        for option in field_options:
1822
                            if filter_field_value in (option[0], option[-1]):
1823
                                filter_field_value = option[1]
1824
                                break
1825
                    criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value)
1807 1826

  
1808 1827
        unknown_filters = sorted(filters_in_request - known_filters)
1809 1828
        if unknown_filters:
wcs/qommon/storage.py
155 155
        self.field = kwargs.get('field')
156 156

  
157 157
    def build_lambda(self):
158
        return lambda x: self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
158
        def func(x):
159
            if self.field and self.field.key in ['item', 'string'] and isinstance(self.value, int):
160
                try:
161
                    _x = int(getattr(x, self.attribute, None))
162
                except (ValueError, TypeError):
163
                    _x = self.typed_none
164
                return self.op(_x, self.value)
165
            if self.field and self.field.key == 'items':
166
                _x = getattr(x, self.attribute, None) or []
167
                if isinstance(self.value, int):
168
                    try:
169
                        _x = [int(e) for e in _x]
170
                    except ValueError:
171
                        _x = []
172
                return getattr(self, 'array_op', any)(self.op(e, self.value) for e in _x)
173
            if isinstance(self.value, int):
174
                return self.op(int(getattr(x, self.attribute, None)) or self.typed_none, self.value)
175
            return self.op(getattr(x, self.attribute, None) or self.typed_none, self.value)
176

  
177
        return func
159 178

  
160 179
    def __repr__(self):
161 180
        return '<%s (attribute: %r%s)>' % (
......
179 198

  
180 199
class NotEqual(Criteria):
    """Criteria comparing the stored value to the given one with ``!=``."""

    # read by build_lambda() for "items" fields: per-element results are
    # combined with all() instead of the default any()
    array_op = all
    op = operator.ne
182 202

  
183 203

  
184 204
class LessOrEqual(Criteria):
wcs/sql.py
108 108
        self.attribute = attribute.replace('-', '_')
109 109
        self.value = value
110 110
        self.field = kwargs.get('field')
111
        self.parent_field = kwargs.get('parent_field')
112 111

  
113 112
    def as_sql(self):
        """Return the SQL clause for this criteria.

        The value is never inlined: the clause refers to it through the named
        placeholder ``%(c<id>)s``, ``<id>`` being ``id(self.value)``.

        Three shapes are produced, depending on ``self.field``:

        * field with a ``block_field``: an ``EXISTS`` (or ``sql_exists``)
          sub-select over the rows of the block,
          ``EXISTS(SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS
          datas(aa) WHERE aa->>'FIELD' <op> value)``;
        * ``items`` field: the value is written first and compared to the
          array, ``value <reversed op> ANY (ITEMS)`` (or ``sql_array_op``);
        * anything else: ``ATTRIBUTE <op> value``.

        When the value is an integer, ``item``/``string``/``items`` columns
        are cast to integers, non-numeric contents becoming NULL (or an empty
        array for ``items``).
        """
        field = self.field
        int_value = isinstance(self.value, int)
        placeholder_id = id(self.value)

        if field and getattr(field, 'block_field', None):
            # each row of the block is exposed as "aa" by the sub-select
            column = "aa->>'%s'" % field.id
            if field.key in ('item', 'string') and int_value:
                # integer cast of db values
                # NOTE(review): ::int presumably fails on digit strings
                # larger than a PostgreSQL integer -- confirm
                column = "(CASE WHEN %s~E'^\\\\d+$' THEN (%s)::int ELSE NULL END)" % (column, column)
            elif field.key == 'bool':
                # bool cast of db values
                column = '(%s)::bool' % column
            # subclasses may swap both the EXISTS keyword and the inner
            # operator (sql_exists / sql_op_exists)
            exists_keyword = getattr(self, 'sql_exists', 'EXISTS')
            block_column = get_field_id(field.block_field)
            inner_op = getattr(self, 'sql_op_exists', self.sql_op)
            return "%s(SELECT 1 FROM jsonb_array_elements(%s->'data') AS datas(aa) WHERE %s %s %%(c%s)s)" % (
                exists_keyword,
                block_column,
                column,
                inner_op,
                placeholder_id,
            )

        field_key = field.key if field else None

        if field_key == 'items':
            if int_value:
                # integer cast of db values; arrays with non-numeric content
                # are replaced by an empty array
                array_expr = (
                    "CASE WHEN array_to_string(%s, '')~E'^\\\\d+$' THEN %s::int[] ELSE ARRAY[]::int[] END"
                    % (self.attribute, self.attribute)
                )
            else:
                # for none values
                array_expr = "COALESCE(%s, ARRAY[]::text[])" % self.attribute
            # the value is the left operand here, hence the reversed operator:
            #   lt: 'value' > ANY (ITEMS), gte: 'value' <= ANY (ITEMS), ...
            return '%%(c%s)s %s %s (%s)' % (
                placeholder_id,
                getattr(self, 'sql_reversed_op', self.sql_op),
                getattr(self, 'sql_array_op', 'ANY'),
                array_expr,
            )

        if field_key == 'computed':
            column = "%s->>'data'" % self.attribute
        elif field_key in ('item', 'string') and int_value:
            # integer cast of db values
            column = "(CASE WHEN %s~E'^\\\\d+$' THEN %s::int ELSE NULL END)" % (self.attribute, self.attribute)
        else:
            column = self.attribute
        return '%s %s %%(c%s)s' % (column, self.sql_op, placeholder_id)
121 170

  
122 171
    def as_sql_param(self):
......
131 180

  
132 181
class Less(Criteria):
    """Criteria rendered with the SQL ``<`` operator."""

    # used by as_sql() for "items" fields, where the value is the left
    # operand: 'value' > ANY (ITEMS)
    sql_reversed_op = '>'
    sql_op = '<'
134 184

  
135 185

  
136 186
class Greater(Criteria):
    """Criteria rendered with the SQL ``>`` operator."""

    # used by as_sql() for "items" fields, where the value is the left
    # operand: 'value' < ANY (ITEMS)
    sql_reversed_op = '<'
    sql_op = '>'
138 189

  
139 190

  
140 191
class Equal(Criteria):
......
148 199

  
149 200
class LessOrEqual(Criteria):
    """Criteria rendered with the SQL ``<=`` operator."""

    # used by as_sql() for "items" fields, where the value is the left
    # operand: 'value' >= ANY (ITEMS)
    sql_reversed_op = '>='
    sql_op = '<='
151 203

  
152 204

  
153 205
class GreaterOrEqual(Criteria):
    """Criteria rendered with the SQL ``>=`` operator."""

    # used by as_sql() for "items" fields, where the value is the left
    # operand: 'value' <= ANY (ITEMS)
    sql_reversed_op = '<='
    sql_op = '>='
155 208

  
156 209

  
157 210
class NotEqual(Criteria):
    """Criteria rendered with the SQL ``!=`` operator."""

    # Block fields: "no row of the block has this value" is wanted, i.e.
    #   NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
    # rather than "some row of the block has another value":
    #   EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' != 'value')
    # so both the keyword and the inner operator are overridden.
    sql_op_exists = '='
    sql_exists = 'NOT EXISTS'
    # Items fields: every element has to differ from the value:
    #   'value' != ALL (ITEMS)
    sql_array_op = 'ALL'
    sql_op = '!='
159 221

  
160 222

  
161 223
class Contains(Criteria):
162
-