0001-api-add-filter-operators-on-formdata-listing-60785.patch
tests/api/test_formdata.py | ||
---|---|---|
817 | 817 |
assert resp.json['err_desc'] == 'Invalid filters "baz", "foobar"' |
818 | 818 | |
819 | 819 | |
820 |
def test_api_list_formdata_string_filter(pub, local_user):
    """Exercise the filter operators of the listing API on string fields.

    Field "string" holds the texts 'FOO 0'..'FOO 2'; field "string2" holds the
    digit strings '9', '10' and '11', which are also queried with a
    zero-padded value ('010').
    """

    def listing_size(url):
        # every request goes through a freshly built app, signed for local_user
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.StringField(id='0', label='String', type='string', varname='string'),
        fields.StringField(id='1', label='String2', type='string', varname='string2'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {'0': 'FOO %s' % i, '1': '%s' % (9 + i)}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # without an explicit operator
    assert listing_size('/api/forms/test/list?filter-string=FOO 2') == 1

    # textual values
    text_cases = (
        ('eq', 'FOO 2', 1),
        ('ne', 'FOO 2', 2),
        ('lt', 'FOO 2', 2),
        ('lte', 'FOO 2', 3),
        ('gt', 'FOO 2', 0),
        ('gt', '42', 0),
        ('gte', 'FOO 2', 1),
    )
    for op_name, value, expected in text_cases:
        found = listing_size(
            '/api/forms/test/list?filter-string=%s&filter-string-operator=%s' % (value, op_name)
        )
        assert found == expected

    # digit-only values
    numeric_cases = (
        ('eq', '10', 1),
        ('eq', '010', 1),
        ('ne', '10', 2),
        ('lt', '10', 1),
        ('lte', '10', 2),
        ('gt', '10', 1),
        ('gt', '9', 2),
        ('gte', '10', 2),
    )
    for op_name, value, expected in numeric_cases:
        found = listing_size(
            '/api/forms/test/list?filter-string2=%s&filter-string2-operator=%s' % (value, op_name)
        )
        assert found == expected
|
890 | ||
891 | ||
892 |
def test_api_list_formdata_item_filter(pub, local_user):
    """Exercise the filter operators of the listing API on item fields.

    Field "item" is backed by the "foobar" named data source (ids '9', '10',
    '11'); field "item2" uses a plain list of items ('foo', 'bar').
    """

    def listing_size(url):
        # every request goes through a freshly built app, signed for local_user
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    NamedDataSource.wipe()
    options = [{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'formula', 'value': repr(options)}
    data_source.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.ItemField(id='0', label='Item', type='item', data_source={'type': 'foobar'}, varname='item'),
        fields.ItemField(id='1', label='Other Item', type='item', items=['foo', 'bar'], varname='item2'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {'0': str(9 + i), '1': 'foo' if i % 2 else 'bar'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # data source backed item, without an explicit operator
    assert listing_size('/api/forms/test/list?filter-item=9') == 1

    numeric_cases = (
        ('eq', '10', 1),
        ('eq', '010', 1),
        ('ne', '10', 2),
        ('lt', '10', 1),
        ('lte', '10', 2),
        ('gt', '10', 1),
        ('gt', '9', 2),
        ('gte', '10', 2),
    )
    for op_name, value, expected in numeric_cases:
        found = listing_size(
            '/api/forms/test/list?filter-item=%s&filter-item-operator=%s' % (value, op_name)
        )
        assert found == expected

    # plain list item, without an explicit operator
    assert listing_size('/api/forms/test/list?filter-item2=foo') == 1

    text_cases = (
        ('eq', 'foo', 1),
        ('ne', 'foo', 2),
        ('lt', 'foo', 2),
        ('lte', 'foo', 3),
        ('gt', 'foo', 0),
        ('gt', '42', 0),
        ('gte', 'foo', 1),
    )
    for op_name, value, expected in text_cases:
        found = listing_size(
            '/api/forms/test/list?filter-item2=%s&filter-item2-operator=%s' % (value, op_name)
        )
        assert found == expected
|
972 | ||
973 | ||
974 |
def test_api_list_formdata_items_filter(pub, local_user):
    """Exercise the filter operators of the listing API on multi-valued items fields.

    Four formdata are stored: three with two selected values in each field and
    a last one with no data at all.
    """

    def listing_size(url):
        # every request goes through a freshly built app, signed for local_user
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    NamedDataSource.wipe()
    options = [{'id': '9', 'text': 'foo'}, {'id': '10', 'text': 'bar'}, {'id': '11', 'text': 'baz'}]
    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'formula', 'value': repr(options)}
    data_source.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.ItemsField(
            id='0', label='Items', type='items', data_source={'type': 'foobar'}, varname='items'
        ),
        fields.ItemsField(
            id='1', label='Other Item', type='items', items=['foo', 'bar', 'baz'], varname='items2'
        ),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(4):
        formdata = data_class()
        if i < 3:
            formdata.data = {
                '0': ['9' if i % 2 else '11', '10'],
                '1': ['foo' if i % 2 else 'bar', 'baz'],
            }
        else:
            # last formdata is left without any value
            formdata.data = {}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # data source backed items, without an explicit operator
    assert listing_size('/api/forms/test/list?filter-items=11') == 2

    numeric_cases = (
        ('eq', '11', 2),
        ('eq', '011', 2),
        ('eq', '10', 3),
        ('ne', '9', 3),
        ('ne', '10', 1),
        ('lt', '10', 1),
        ('lte', '10', 3),
        ('gt', '10', 2),
        ('gt', '9', 3),
        ('gte', '11', 2),
    )
    for op_name, value, expected in numeric_cases:
        found = listing_size(
            '/api/forms/test/list?filter-items=%s&filter-items-operator=%s' % (value, op_name)
        )
        assert found == expected

    text_cases = (
        ('eq', 'foo', 1),
        ('ne', 'foo', 3),
        ('lt', 'foo', 3),
        ('lte', 'foo', 3),
        ('gt', 'foo', 0),
        ('gt', '42', 0),
        ('gte', 'foo', 1),
    )
    for op_name, value, expected in text_cases:
        found = listing_size(
            '/api/forms/test/list?filter-items2=%s&filter-items2-operator=%s' % (value, op_name)
        )
        assert found == expected
|
1060 | ||
1061 | ||
1062 |
def test_api_list_formdata_bool_filter(pub, local_user):
    """Exercise the filter operators of the listing API on a bool field.

    Only "eq" and "ne" are expected to be accepted; ordering operators must be
    rejected with a 400 response.
    """

    def listing_size(url):
        # every request goes through a freshly built app, signed for local_user
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.BoolField(id='0', label='Bool', type='bool', varname='bool'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {'0': bool(i % 2)}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # without an explicit operator
    assert listing_size('/api/forms/test/list?filter-bool=false') == 2
    assert listing_size('/api/forms/test/list?filter-bool=true') == 1

    for op_name, value, expected in (('eq', 'true', 1), ('ne', 'true', 2)):
        found = listing_size(
            '/api/forms/test/list?filter-bool=%s&filter-bool-operator=%s' % (value, op_name)
        )
        assert found == expected

    # ordering operators are refused for this field type
    for op_name in ['lt', 'lte', 'gt', 'gte']:
        signed_url = sign_uri(
            '/api/forms/test/list?filter-bool=true&filter-bool-operator=%s' % op_name, user=local_user
        )
        resp = get_app(pub).get(signed_url, status=400)
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-bool"' % op_name
|
1116 | ||
1117 | ||
820 | 1118 |
def test_api_list_formdata_date_filter(pub, local_user): |
821 | 1119 |
if not pub.is_using_postgresql(): |
822 | 1120 |
pytest.skip('this requires SQL') |
... | ... | |
833 | 1131 |
formdef.name = 'test' |
834 | 1132 |
formdef.workflow_roles = {'_receiver': role.id} |
835 | 1133 |
formdef.fields = [ |
836 |
fields.DateField(id='0', label='foobar', varname='foobar', type='date'),
|
|
1134 |
fields.DateField(id='0', label='Date', varname='date', type='date'),
|
|
837 | 1135 |
] |
838 | 1136 |
formdef.store() |
839 | 1137 | |
840 | 1138 |
data_class = formdef.data_class() |
841 | 1139 |
data_class.wipe() |
842 | 1140 | |
843 |
for i in range(30):
|
|
1141 |
for i in range(3): |
|
844 | 1142 |
formdata = data_class() |
845 |
formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 1), '%Y-%m-%d')} |
|
1143 |
formdata.data = {'0': time.strptime('2021-06-%02d' % (i + 10), '%Y-%m-%d')}
|
|
846 | 1144 |
formdata.user_id = local_user.id |
847 | 1145 |
formdata.just_created() |
848 | 1146 |
formdata.jump_status('new') |
849 | 1147 |
formdata.store() |
850 | 1148 | |
851 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-foobar=2021-06-12', user=local_user)) |
|
852 |
assert len(resp.json) == 1 |
|
1149 |
for value in ['2021-06-11', '11/06/2021']: |
|
1150 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-date=%s' % value, user=local_user)) |
|
1151 |
assert len(resp.json) == 1 |
|
1152 |
params = [ |
|
1153 |
('eq', 1), |
|
1154 |
('ne', 2), |
|
1155 |
('lt', 1), |
|
1156 |
('lte', 2), |
|
1157 |
('gt', 1), |
|
1158 |
('gte', 2), |
|
1159 |
] |
|
1160 |
for operator, result in params: |
|
1161 |
resp = get_app(pub).get( |
|
1162 |
sign_uri( |
|
1163 |
'/api/forms/test/list?filter-date=%s&filter-date-operator=%s' % (value, operator), |
|
1164 |
user=local_user, |
|
1165 |
) |
|
1166 |
) |
|
1167 |
assert len(resp.json) == result |
|
1168 | ||
1169 | ||
1170 |
def test_api_list_formdata_email_filter(pub, local_user):
    """Exercise the filter operators of the listing API on an email field.

    Only "eq" and "ne" are expected to be accepted; ordering operators must be
    rejected with a 400 response.
    """

    def listing_size(url):
        # every request goes through a freshly built app, signed for local_user
        return len(get_app(pub).get(sign_uri(url, user=local_user)).json)

    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    local_user.roles = [role.id]
    local_user.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.fields = [
        fields.EmailField(id='0', label='Email', type='email', varname='email'),
    ]
    formdef.store()

    data_class = formdef.data_class()
    data_class.wipe()

    for i in range(3):
        formdata = data_class()
        formdata.data = {'0': 'a@localhost' if i % 2 else 'b@localhost'}
        formdata.user_id = local_user.id
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    # without an explicit operator
    assert listing_size('/api/forms/test/list?filter-email=a@localhost') == 1

    for op_name, value, expected in (('eq', 'a@localhost', 1), ('ne', 'a@localhost', 2)):
        found = listing_size(
            '/api/forms/test/list?filter-email=%s&filter-email-operator=%s' % (value, op_name)
        )
        assert found == expected

    # ordering operators are refused for this field type
    for op_name in ['lt', 'lte', 'gt', 'gte']:
        signed_url = sign_uri(
            '/api/forms/test/list?filter-email=a@localhost&filter-email-operator=%s' % op_name,
            user=local_user,
        )
        resp = get_app(pub).get(signed_url, status=400)
        assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-email"' % op_name
|
857 | 1221 | |
858 | 1222 | |
859 | 1223 |
def test_api_list_formdata_internal_id_filter(pub, local_user): |
... | ... | |
874 | 1238 |
data_class = formdef.data_class() |
875 | 1239 |
data_class.wipe() |
876 | 1240 | |
877 |
for i in range(2):
|
|
1241 |
for i in range(11):
|
|
878 | 1242 |
formdata = data_class() |
879 | 1243 |
formdata.data = {} |
880 | 1244 |
formdata.user_id = local_user.id |
... | ... | |
889 | 1253 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-internal-id=42', user=local_user)) |
890 | 1254 |
assert len(resp.json) == 0 |
891 | 1255 | |
1256 |
params = [ |
|
1257 |
('eq', '1', 1), |
|
1258 |
('eq', '01', 1), |
|
1259 |
('ne', '1', 10), |
|
1260 |
('lt', '1', 0), |
|
1261 |
('lte', '1', 1), |
|
1262 |
('gt', '1', 10), |
|
1263 |
('gt', '10', 1), |
|
1264 |
('gte', '1', 11), |
|
1265 |
] |
|
1266 |
for operator, value, result in params: |
|
1267 |
resp = get_app(pub).get( |
|
1268 |
sign_uri( |
|
1269 |
'/api/forms/test/list?filter-internal-id=%s&filter-internal-id-operator=%s' |
|
1270 |
% (value, operator), |
|
1271 |
user=local_user, |
|
1272 |
) |
|
1273 |
) |
|
1274 |
assert len(resp.json) == result |
|
1275 |
resp = get_app(pub).get( |
|
1276 |
sign_uri('/api/forms/test/list?filter-internal-id=blabla', user=local_user), status=400 |
|
1277 |
) |
|
1278 |
assert resp.json['err_desc'] == 'Invalid value "blabla" for "filter-internal-id-value"' |
|
1279 | ||
892 | 1280 | |
893 | 1281 |
def test_api_list_formdata_number_filter(pub, local_user): |
894 | 1282 |
pub.role_class.wipe() |
... | ... | |
934 | 1322 |
data_source = NamedDataSource(name='foobar') |
935 | 1323 |
data_source.data_source = { |
936 | 1324 |
'type': 'formula', |
937 |
'value': repr([{'id': '1', 'text': 'foo', 'more': 'XXX'}, {'id': '2', 'text': 'bar', 'more': 'YYY'}]),
|
|
1325 |
'value': repr([{'id': '1', 'text': 'foo'}, {'id': '2', 'text': 'bar'}]),
|
|
938 | 1326 |
} |
939 | 1327 |
data_source.store() |
940 | 1328 | |
... | ... | |
991 | 1379 |
if i == 0: |
992 | 1380 |
formdata.data['0']['data'].append( |
993 | 1381 |
{ |
994 |
'1': 'plop%s' % i,
|
|
1382 |
'1': 'plop%s' % (i + 1),
|
|
995 | 1383 |
'2': '1', |
996 | 1384 |
'2_display': 'foo', |
997 | 1385 |
'2_structured': 'XXX', |
... | ... | |
1012 | 1400 |
assert len(resp.json) == 1 |
1013 | 1401 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_string=plop10', user=local_user)) |
1014 | 1402 |
assert len(resp.json) == 0 |
1403 |
params = [ |
|
1404 |
('eq', 'plop5', 1), |
|
1405 |
('ne', 'plop5', 9), |
|
1406 |
('ne', 'plop1', 8), |
|
1407 |
('lt', 'plop5', 5), |
|
1408 |
('lte', 'plop5', 6), |
|
1409 |
('gt', 'plop5', 4), |
|
1410 |
('gt', '42', 0), |
|
1411 |
('gte', 'plop5', 5), |
|
1412 |
] |
|
1413 |
for operator, value, result in params: |
|
1414 |
resp = get_app(pub).get( |
|
1415 |
sign_uri( |
|
1416 |
'/api/forms/test/list?filter-blockdata_string=%s&filter-blockdata_string-operator=%s' |
|
1417 |
% (value, operator), |
|
1418 |
user=local_user, |
|
1419 |
) |
|
1420 |
) |
|
1421 |
assert len(resp.json) == result |
|
1015 | 1422 |
# item |
1016 | 1423 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=1', user=local_user)) |
1017 | 1424 |
assert len(resp.json) == 6 |
... | ... | |
1019 | 1426 |
assert len(resp.json) == 5 |
1020 | 1427 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_item=3', user=local_user)) |
1021 | 1428 |
assert len(resp.json) == 0 |
1429 |
params = [ |
|
1430 |
('eq', '1', 6), |
|
1431 |
('ne', '1', 4), |
|
1432 |
('lt', '2', 6), |
|
1433 |
('lte', '1', 6), |
|
1434 |
('gt', '1', 5), |
|
1435 |
('gte', '2', 5), |
|
1436 |
] |
|
1437 |
for operator, value, result in params: |
|
1438 |
resp = get_app(pub).get( |
|
1439 |
sign_uri( |
|
1440 |
'/api/forms/test/list?filter-blockdata_item=%s&filter-blockdata_item-operator=%s' |
|
1441 |
% (value, operator), |
|
1442 |
user=local_user, |
|
1443 |
) |
|
1444 |
) |
|
1445 |
assert len(resp.json) == result |
|
1022 | 1446 |
# bool |
1023 | 1447 |
resp = get_app(pub).get(sign_uri('/api/forms/test/list?filter-blockdata_bool=true', user=local_user)) |
1024 | 1448 |
assert len(resp.json) == 6 |
... | ... | |
1028 | 1452 |
sign_uri('/api/forms/test/list?filter-blockdata_bool=foobar', user=local_user), status=400 |
1029 | 1453 |
) |
1030 | 1454 |
assert resp.json['err_desc'] == 'Invalid value "foobar" for "filter-blockdata_bool"' |
1455 |
params = [ |
|
1456 |
('eq', 'true', 6), |
|
1457 |
('ne', 'true', 4), |
|
1458 |
] |
|
1459 |
for operator, value, result in params: |
|
1460 |
resp = get_app(pub).get( |
|
1461 |
sign_uri( |
|
1462 |
'/api/forms/test/list?filter-blockdata_bool=%s&filter-blockdata_bool-operator=%s' |
|
1463 |
% (value, operator), |
|
1464 |
user=local_user, |
|
1465 |
) |
|
1466 |
) |
|
1467 |
assert len(resp.json) == result |
|
1468 |
for operator in ['lt', 'lte', 'gt', 'gte']: |
|
1469 |
resp = get_app(pub).get( |
|
1470 |
sign_uri( |
|
1471 |
'/api/forms/test/list?filter-blockdata_bool=true&filter-blockdata_bool-operator=%s' |
|
1472 |
% operator, |
|
1473 |
user=local_user, |
|
1474 |
), |
|
1475 |
status=400, |
|
1476 |
) |
|
1477 |
assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_bool"' % operator |
|
1031 | 1478 |
# date |
1032 | 1479 |
resp = get_app(pub).get( |
1033 | 1480 |
sign_uri('/api/forms/test/list?filter-blockdata_date=2021-06-01', user=local_user) |
... | ... | |
1041 | 1488 |
sign_uri('/api/forms/test/list?filter-blockdata_date=02/06/2021', user=local_user) |
1042 | 1489 |
) |
1043 | 1490 |
assert len(resp.json) == 2 |
1491 |
params = [ |
|
1492 |
('eq', '2021-06-02', 2), |
|
1493 |
('ne', '2021-06-02', 8), |
|
1494 |
('lt', '2021-06-02', 1), |
|
1495 |
('lte', '2021-06-02', 2), |
|
1496 |
('gt', '2021-06-02', 8), |
|
1497 |
('gte', '2021-06-02', 10), |
|
1498 |
] |
|
1499 |
for operator, value, result in params: |
|
1500 |
resp = get_app(pub).get( |
|
1501 |
sign_uri( |
|
1502 |
'/api/forms/test/list?filter-blockdata_date=%s&filter-blockdata_date-operator=%s' |
|
1503 |
% (value, operator), |
|
1504 |
user=local_user, |
|
1505 |
) |
|
1506 |
) |
|
1507 |
assert len(resp.json) == result |
|
1044 | 1508 |
|
1045 | 1509 |
resp = get_app(pub).get( |
1046 | 1510 |
sign_uri('/api/forms/test/list?filter-blockdata_email=a@localhost', user=local_user) |
... | ... | |
1054 | 1518 |
sign_uri('/api/forms/test/list?filter-blockdata_email=c@localhost', user=local_user) |
1055 | 1519 |
) |
1056 | 1520 |
assert len(resp.json) == 0 |
1521 |
params = [ |
|
1522 |
('eq', 'a@localhost', 6), |
|
1523 |
('ne', 'a@localhost', 4), |
|
1524 |
] |
|
1525 |
for operator, value, result in params: |
|
1526 |
resp = get_app(pub).get( |
|
1527 |
sign_uri( |
|
1528 |
'/api/forms/test/list?filter-blockdata_email=%s&filter-blockdata_email-operator=%s' |
|
1529 |
% (value, operator), |
|
1530 |
user=local_user, |
|
1531 |
) |
|
1532 |
) |
|
1533 |
assert len(resp.json) == result |
|
1534 |
for operator in ['lt', 'lte', 'gt', 'gte']: |
|
1535 |
resp = get_app(pub).get( |
|
1536 |
sign_uri( |
|
1537 |
'/api/forms/test/list?filter-blockdata_email=plop0&filter-blockdata_email-operator=%s' |
|
1538 |
% operator, |
|
1539 |
user=local_user, |
|
1540 |
), |
|
1541 |
status=400, |
|
1542 |
) |
|
1543 |
assert resp.json['err_desc'] == 'Invalid operator "%s" for "filter-blockdata_email"' % operator |
|
1057 | 1544 |
# mix |
1058 | 1545 |
resp = get_app(pub).get( |
1059 | 1546 |
sign_uri( |
1060 | 1547 |
'/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop1', user=local_user |
1061 | 1548 |
) |
1062 | 1549 |
) |
1063 |
assert len(resp.json) == 1
|
|
1550 |
assert len(resp.json) == 2
|
|
1064 | 1551 |
resp = get_app(pub).get( |
1065 | 1552 |
sign_uri( |
1066 | 1553 |
'/api/forms/test/list?filter-blockdata_item=2&filter-blockdata_string=plop1', user=local_user |
1067 | 1554 |
) |
1068 | 1555 |
) |
1069 |
assert len(resp.json) == 0
|
|
1556 |
assert len(resp.json) == 1
|
|
1070 | 1557 |
resp = get_app(pub).get( |
1071 | 1558 |
sign_uri( |
1072 | 1559 |
'/api/forms/test/list?filter-blockdata_item=1&filter-blockdata_string=plop0', user=local_user |
wcs/backoffice/management.py | ||
---|---|---|
69 | 69 |
Contains, |
70 | 70 |
Equal, |
71 | 71 |
FtsMatch, |
72 |
Greater, |
|
72 | 73 |
GreaterOrEqual, |
73 | 74 |
Intersects, |
75 |
Less, |
|
74 | 76 |
LessOrEqual, |
75 | 77 |
NotEqual, |
76 | 78 |
NotNull, |
... | ... | |
1636 | 1638 |
query_overrides = get_request().form |
1637 | 1639 |
return self.get_view_criterias(query_overrides, request=get_request()) |
1638 | 1640 | |
1639 |
def get_view_criterias(self, query_overrides=None, request=None, record_errors=False): |
|
1640 |
from wcs import sql |
|
1641 |
def get_field_criteria(self, field, operator, field_key):
    """Return the criteria class matching a filter operator name.

    ``field`` is the filtered field (only its ``type`` is read), ``operator``
    is the operator name taken from the request and ``field_key`` is the name
    of the filter parameter, used in the error message.

    Raises RequestError when the operator is unknown, or when it is an
    ordering operator and the field type only supports "eq" and "ne".
    """
    criteria_by_operator = {
        'eq': Equal,
        'ne': NotEqual,
        'lt': Less,
        'lte': LessOrEqual,
        'gt': Greater,
        'gte': GreaterOrEqual,
    }
    # field types accepting the ordering operators; eq and ne are always allowed
    supports_ordering = field.type in ['internal-id', 'date', 'item', 'items', 'string']
    if (not supports_ordering and operator not in ['eq', 'ne']) or operator not in criteria_by_operator:
        raise RequestError('Invalid operator "%s" for "%s"' % (operator, field_key))
    return criteria_by_operator[operator]
|
1641 | 1657 | |
1658 |
def get_view_criterias(self, query_overrides=None, request=None, record_errors=False): |
|
1642 | 1659 |
fake_fields = [ |
1643 | 1660 |
FakeField('internal-id', 'internal-id', _('Identifier')), |
1644 | 1661 |
FakeField('number', 'number', _('Number')), |
... | ... | |
1665 | 1682 |
filters_in_request = { |
1666 | 1683 |
k.replace('filter-', '') |
1667 | 1684 |
for k in filters_dict |
1668 |
if k.startswith('filter-') and not k.endswith('-value') |
|
1685 |
if k.startswith('filter-') and not k.endswith('-value') and not k.endswith('-operator')
|
|
1669 | 1686 |
} |
1670 | 1687 |
filters_in_request = { |
1671 | 1688 |
f |
... | ... | |
1741 | 1758 |
if not filter_field_value: |
1742 | 1759 |
continue |
1743 | 1760 | |
1744 |
if filter_field.type == 'date': |
|
1761 |
# get operator and criteria |
|
1762 |
filter_field_operator_key = '%s-operator' % filter_field_key.replace('-value', '') |
|
1763 |
filter_field_operator = filters_dict.get(filter_field_operator_key) or 'eq' |
|
1764 |
criteria = self.get_field_criteria(filter_field, filter_field_operator, filter_field_key) |
|
1765 | ||
1766 |
# check value types |
|
1767 |
if filter_field.type == 'internal-id': |
|
1768 |
try: |
|
1769 |
filter_field_value = int(filter_field_value) |
|
1770 |
except ValueError: |
|
1771 |
raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key)) |
|
1772 |
elif filter_field.type == 'period-date': |
|
1773 |
try: |
|
1774 |
filter_date_value = misc.get_as_datetime(filter_field_value).timetuple() |
|
1775 |
except ValueError: |
|
1776 |
continue |
|
1777 |
elif filter_field.type == 'date': |
|
1745 | 1778 |
try: |
1746 | 1779 |
filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d') |
1747 | 1780 |
except ValueError: |
... | ... | |
1753 | 1786 |
filter_field_value = False |
1754 | 1787 |
else: |
1755 | 1788 |
raise RequestError('Invalid value "%s" for "%s"' % (filter_field_value, filter_field_key)) |
1789 |
elif filter_field.type in ('item', 'items', 'string'): |
|
1790 |
try: |
|
1791 |
filter_field_value = int(filter_field_value) |
|
1792 |
except ValueError: |
|
1793 |
pass |
|
1756 | 1794 | |
1757 |
if getattr(filter_field, 'block_field', None): |
|
1758 |
criterias.append( |
|
1759 |
sql.ArrayContains( |
|
1760 |
'f%s' % filter_field.block_field.id, |
|
1761 |
json.dumps([{filter_field.id: filter_field_value}]), |
|
1762 |
parent_field=filter_field.block_field, |
|
1763 |
) |
|
1764 |
) |
|
1765 |
elif filter_field.type == 'internal-id': |
|
1766 |
criterias.append(Equal('id', str(filter_field_value))) |
|
1795 |
# add criteria |
|
1796 |
if filter_field.type == 'internal-id': |
|
1797 |
criterias.append(criteria('id', filter_field_value)) |
|
1767 | 1798 |
elif filter_field.type == 'number': |
1768 | 1799 |
criterias.append(Equal('id_display', str(filter_field_value))) |
1769 | 1800 |
elif filter_field.type == 'period-date': |
1770 |
try: |
|
1771 |
filter_date_value = misc.get_as_datetime(filter_field_value).timetuple() |
|
1772 |
except ValueError: |
|
1773 |
continue |
|
1774 | 1801 |
if filter_field.id == 'start': |
1775 | 1802 |
criterias.append(GreaterOrEqual('receipt_time', filter_date_value)) |
1776 | 1803 |
elif filter_field.id == 'end': |
... | ... | |
1786 | 1813 |
criterias.append(Equal('user_id', filter_field_value)) |
1787 | 1814 |
elif filter_field.type == 'submission-agent-id': |
1788 | 1815 |
criterias.append(Equal('submission_agent_id', filter_field_value)) |
1789 |
elif filter_field.type in ('item', 'items'): |
|
1790 |
if filter_field.type == 'item': |
|
1791 |
criterias.append(Equal('f%s' % filter_field.id, filter_field_value)) |
|
1792 |
elif filter_field.type == 'items': |
|
1793 |
criterias.append(Intersects('f%s' % filter_field.id, [filter_field_value])) |
|
1794 |
field_options = filter_field.get_options() |
|
1795 |
if field_options and type(field_options[0]) in (list, tuple): |
|
1796 |
for option in field_options: |
|
1797 |
if filter_field_value in (option[0], option[-1]): |
|
1798 |
filter_field_value = option[1] |
|
1799 |
break |
|
1800 |
criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value) |
|
1801 |
elif filter_field.type == 'bool': |
|
1802 |
criterias.append(Equal('f%s' % filter_field.id, filter_field_value)) |
|
1803 |
elif filter_field.type in ('string', 'email'): |
|
1804 |
criterias.append(Equal('f%s' % filter_field.id, filter_field_value)) |
|
1805 |
elif filter_field.type == 'date': |
|
1806 |
criterias.append(Equal('f%s' % filter_field.id, filter_field_value)) |
|
1816 |
elif filter_field.type in ('item', 'items', 'bool', 'string', 'email', 'date'): |
|
1817 |
criterias.append(criteria('f%s' % filter_field.id, filter_field_value, field=filter_field)) |
|
1818 |
if filter_field.type in ('item', 'items'): |
|
1819 |
field_options = filter_field.get_options() |
|
1820 |
if field_options and type(field_options[0]) in (list, tuple): |
|
1821 |
for option in field_options: |
|
1822 |
if filter_field_value in (option[0], option[-1]): |
|
1823 |
filter_field_value = option[1] |
|
1824 |
break |
|
1825 |
criterias[-1]._label = '%s: %s' % (filter_field.label, filter_field_value) |
|
1807 | 1826 | |
1808 | 1827 |
unknown_filters = sorted(filters_in_request - known_filters) |
1809 | 1828 |
if unknown_filters: |
wcs/qommon/storage.py | ||
---|---|---|
155 | 155 |
self.field = kwargs.get('field') |
156 | 156 | |
157 | 157 |
def build_lambda(self):
    """Return a one-argument predicate applying this criteria to an object.

    The predicate reads ``self.attribute`` on the object and compares it to
    ``self.value`` with ``self.op``:

    * "item" and "string" fields compared to an integer value: the stored
      value is converted to an integer, ``self.typed_none`` being used when
      it cannot be converted;
    * "items" fields: the comparison is applied to every stored element and
      the results are combined with ``self.array_op`` (``any`` when the
      criteria does not define one);
    * other comparisons to an integer value: same integer conversion, a
      falsy result being replaced by ``self.typed_none``;
    * anything else: the raw value, or ``self.typed_none`` when it is falsy.

    Values that cannot be converted to an integer never raise: they are
    handled as missing values.
    """

    def func(x):
        stored = getattr(x, self.attribute, None)
        field_key = self.field.key if self.field else None
        # NOTE: also true for bool values, as bool is a subclass of int
        integer_value = isinstance(self.value, int)

        if field_key in ['item', 'string'] and integer_value:
            try:
                _x = int(stored)
            except (ValueError, TypeError):
                _x = self.typed_none
            return self.op(_x, self.value)

        if field_key == 'items':
            _x = stored or []
            if integer_value:
                try:
                    _x = [int(e) for e in _x]
                except (ValueError, TypeError):
                    # a single non-numeric (or None) element discards the whole list
                    _x = []
            return getattr(self, 'array_op', any)(self.op(e, self.value) for e in _x)

        if integer_value:
            try:
                _x = int(stored)
            except (ValueError, TypeError):
                # missing or non-numeric stored value: compare with the typed none
                # instead of failing the whole listing
                _x = None
            return self.op(_x or self.typed_none, self.value)

        return self.op(stored or self.typed_none, self.value)

    return func
|
159 | 178 | |
160 | 179 |
def __repr__(self): |
161 | 180 |
return '<%s (attribute: %r%s)>' % ( |
... | ... | |
179 | 198 | |
180 | 199 |
class NotEqual(Criteria): |
181 | 200 |
op = operator.ne |
201 |
array_op = all |
|
182 | 202 | |
183 | 203 | |
184 | 204 |
class LessOrEqual(Criteria): |
wcs/sql.py | ||
---|---|---|
108 | 108 |
self.attribute = attribute.replace('-', '_') |
109 | 109 |
self.value = value |
110 | 110 |
self.field = kwargs.get('field') |
111 |
self.parent_field = kwargs.get('parent_field') |
|
112 | 111 | |
113 | 112 |
def as_sql(self):
    """Return the SQL condition for this criteria.

    The compared value is never inlined: it is referenced through a
    ``%(c<id(self.value)>)s`` named placeholder.

    Three shapes are produced:

    * field inside a block: a (NOT) EXISTS subquery over the elements of the
      block column,
        eq:  EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
        lt:  EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' < 'value')
        lte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' <= 'value')
        gt:  EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' > 'value')
        gte: EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' >= 'value')
      and, with a NOT EXISTS and the opposite operator,
        ne:  NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value')
      (aa->>'FOOBAR' can be written with an integer or bool cast);
    * "items" field: the value is compared to the array column,
        eq:  'value' = ANY (ITEMS)
        ne:  'value' != ALL (ITEMS)
      and, with the reversed operator,
        lt:  'value' > ANY (ITEMS)
        lte: 'value' >= ANY (ITEMS)
        gt:  'value' < ANY (ITEMS)
        gte: 'value' <= ANY (ITEMS)
      (ITEMS is written with an integer cast or with a COALESCE expression);
    * anything else: ``column operator value``.
    """
    field = self.field
    value_ref = id(self.value)
    integer_value = isinstance(self.value, int)

    if field and getattr(field, 'block_field', None):
        column = "aa->>'%s'" % field.id
        if field.key in ['item', 'string'] and integer_value:
            # integer cast of db values
            column = "(CASE WHEN %s~E'^\\\\d+$' THEN (%s)::int ELSE NULL END)" % (column, column)
        elif field.key == 'bool':
            # bool cast of db values
            column = '(%s)::bool' % column
        exists_keyword = getattr(self, 'sql_exists', 'EXISTS')
        block_column = get_field_id(field.block_field)
        comparison = getattr(self, 'sql_op_exists', self.sql_op)
        return "%s(SELECT 1 FROM jsonb_array_elements(%s->'data') AS datas(aa) WHERE %s %s %%(c%s)s)" % (
            exists_keyword,
            block_column,
            column,
            comparison,
            value_ref,
        )

    column = self.attribute
    field_key = field.key if field else None

    if field_key == 'items':
        if integer_value:
            # integer cast of db values
            column = (
                "CASE WHEN array_to_string(%s, '')~E'^\\\\d+$' THEN %s::int[] ELSE ARRAY[]::int[] END"
                % (column, column)
            )
        else:
            # for none values
            column = "COALESCE(%s, ARRAY[]::text[])" % column
        comparison = getattr(self, 'sql_reversed_op', self.sql_op)
        quantifier = getattr(self, 'sql_array_op', 'ANY')
        return '%%(c%s)s %s %s (%s)' % (value_ref, comparison, quantifier, column)

    if field_key == 'computed':
        column = "%s->>'data'" % self.attribute
    elif field_key in ['item', 'string'] and integer_value:
        # integer cast of db values
        column = "(CASE WHEN %s~E'^\\\\d+$' THEN %s::int ELSE NULL END)" % (column, column)
    return '%s %s %%(c%s)s' % (column, self.sql_op, value_ref)
121 | 170 | |
122 | 171 |
def as_sql_param(self): |
... | ... | |
131 | 180 | |
132 | 181 |
class Less(Criteria): |
133 | 182 |
sql_op = '<' |
183 |
sql_reversed_op = '>' |
|
134 | 184 | |
135 | 185 | |
136 | 186 |
class Greater(Criteria): |
137 | 187 |
sql_op = '>' |
188 |
sql_reversed_op = '<' |
|
138 | 189 | |
139 | 190 | |
140 | 191 |
class Equal(Criteria): |
... | ... | |
148 | 199 | |
149 | 200 |
class LessOrEqual(Criteria): |
150 | 201 |
sql_op = '<=' |
202 |
sql_reversed_op = '>=' |
|
151 | 203 | |
152 | 204 | |
153 | 205 |
class GreaterOrEqual(Criteria): |
154 | 206 |
sql_op = '>=' |
207 |
sql_reversed_op = '<=' |
|
155 | 208 | |
156 | 209 | |
157 | 210 |
class NotEqual(Criteria): |
158 | 211 |
sql_op = '!=' |
212 |
# in case of items field, we want to write this clause: |
|
213 |
# 'value' != ALL (ITEMS) |
|
214 |
sql_array_op = 'ALL' |
|
215 |
# in case of block field, we want to write this clause: |
|
216 |
# NOT EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' = 'value') |
|
217 |
# and not: |
|
218 |
# EXISTS (SELECT 1 FROM jsonb_array_elements(BLOCK->'data') AS datas(aa) WHERE aa->>'FOOBAR' != 'value') |
|
219 |
sql_exists = 'NOT EXISTS' |
|
220 |
sql_op_exists = '=' |
|
159 | 221 | |
160 | 222 | |
161 | 223 |
class Contains(Criteria): |
162 |
- |