0003-backoffice-add-filter-on-user-function-58881.patch
tests/api/test_carddef.py | ||
---|---|---|
1091 | 1091 |
'backoffice_url': 'http://example.net/backoffice/management/test/%s/' % formdata.id, |
1092 | 1092 |
'api_url': 'http://example.net/api/forms/test/%s/' % formdata.id, |
1093 | 1093 |
} |
1094 | ||
1095 | ||
1096 |
def test_cards_filter_function(pub, local_user):
    """Check that the card list API can be filtered on a workflow function.

    Three cards are created with the ``_foobar`` function given respectively
    to a role, to userA, and to both userA and userB.  The list endpoint is
    then queried with ``filter-user-function`` / ``filter-user-uuid`` and the
    number of returned cards is checked for each user, first directly and
    then through a custom view carrying the function filter.
    """
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.name_identifiers = ['0123456789']
    local_user.store()

    # two additional users, looked up later on by their name identifier
    user_a = pub.user_class(name='userA')
    user_a.name_identifiers = ['56789']
    user_a.store()
    user_b = pub.user_class(name='userB')
    user_b.name_identifiers = ['98765']
    user_b.store()

    CardDef.wipe()
    carddef = CardDef()
    carddef.name = 'test'
    carddef.fields = []
    carddef.workflow_roles = {'_viewer': role.id}
    carddef.digest_templates = {'default': 'bla {{ form_number }} xxx'}
    carddef.store()

    carddef.data_class().wipe()

    # one card per function assignment: role only, userA only, userA + userB
    function_assignments = [
        {'_foobar': [str(role.id)]},
        {'_foobar': ['_user:%s' % user_a.id]},
        {'_foobar': ['_user:%s' % user_a.id, '_user:%s' % user_b.id]},
    ]
    carddatas = [carddef.data_class()() for _ in range(3)]
    for carddata, workflow_roles in zip(carddatas, function_assignments):
        carddata.workflow_roles = workflow_roles
        carddata.just_created()
        carddata.jump_status('recorded')
        carddata.store()

    def count_cards(url):
        # sign the URL, fetch it with a fresh app, and count the returned cards
        response = get_app(pub).get(sign_uri(url))
        return len(response.json['data'])

    function_filter_url = '/api/cards/test/list?filter-user-function=_foobar&filter-user-uuid=%s'

    # no parameter -> get everything
    assert count_cards('/api/cards/test/list') == 3

    # filter on missing uuid
    assert count_cards('/api/cards/test/list?filter-user-function=_foobar&filter-user-uuid=XXX') == 0

    # userA holds the function on two cards, userB on a single one
    assert count_cards(function_filter_url % user_a.name_identifiers[0]) == 2
    assert count_cards(function_filter_url % user_b.name_identifiers[0]) == 1

    # filter on role
    assert count_cards(function_filter_url % local_user.name_identifiers[0]) == 1

    # via custom view
    pub.custom_view_class.wipe()
    custom_view = pub.custom_view_class()
    custom_view.title = 'shared carddef custom view'
    custom_view.formdef = carddef
    custom_view.columns = {'list': [{'id': '0'}]}
    custom_view.filters = {"filter-user-function": "on", "filter-user-function-value": "_foobar"}
    custom_view.visibility = 'any'
    custom_view.store()

    custom_view_url = '/api/cards/test/list/shared-carddef-custom-view?filter-user-uuid=%s'
    assert count_cards(custom_view_url % user_a.name_identifiers[0]) == 2

    # with function obtained via role set on carddef
    carddef.workflow_roles = {'_foobar': role.id}
    carddef.store()
    carddef.data_class().rebuild_security()

    assert count_cards(custom_view_url % local_user.name_identifiers[0]) == 3
tests/backoffice_pages/test_filters.py | ||
---|---|---|
896 | 896 |
assert resp.text.count('>userB<') == 0 |
897 | 897 | |
898 | 898 | |
899 |
def test_workflow_function_filter(pub):
    """Check the "current user function" filter of the backoffice listing.

    Three forms are created with the ``_foobar`` function given respectively
    to the logged-in superuser, to userA, and to both userA and userB.
    Enabling the filter without a value keeps every row; selecting
    ``_foobar`` only keeps the form where the logged-in user holds it.
    """
    pub.user_class.wipe()
    admin = create_superuser(pub)
    admin.name_identifiers = ['0123456789']
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    # workflow with an additional "_foobar" function
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.roles['_foobar'] = 'Foobar'
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form-title'
    formdef.fields = []
    formdef.workflow = workflow
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    user_a = pub.user_class(name='userA')
    user_a.name_identifiers = ['56789']
    user_a.store()
    user_b = pub.user_class(name='userB')
    user_b.name_identifiers = ['98765']
    user_b.store()

    data_class = formdef.data_class()
    data_class.wipe()

    # one form per function assignment: admin, userA, userA + userB
    function_assignments = [
        {'_foobar': ['_user:%s' % admin.id]},
        {'_foobar': ['_user:%s' % user_a.id]},
        {'_foobar': ['_user:%s' % user_a.id, '_user:%s' % user_b.id]},
    ]
    formdatas = [data_class() for _ in range(3)]
    for formdata, workflow_roles in zip(formdatas, function_assignments):
        formdata.workflow_roles = workflow_roles
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')

    # enable user-function column
    settings_form = resp.forms['listing-settings']
    settings_form['filter-user-function'].checked = True
    resp = settings_form.submit()
    # header row + the three forms, as no function has been selected yet
    assert resp.text.count('<tr') == 4

    # set a value in the select field
    settings_form = resp.forms['listing-settings']
    settings_form['filter-user-function-value'].value = '_foobar'
    resp = settings_form.submit()
    # header row + the single form where the logged-in user has the function
    assert resp.text.count('<tr') == 2
|
954 | ||
955 | ||
899 | 956 |
def test_backoffice_table_varname_filter(pub): |
900 | 957 |
pub.user_class.wipe() |
901 | 958 |
create_superuser(pub) |
wcs/backoffice/management.py | ||
---|---|---|
67 | 67 |
from ..qommon.misc import C_, ellipsize, get_type_name |
68 | 68 |
from ..qommon.storage import ( |
69 | 69 |
Contains, |
70 |
ElementIntersects, |
|
70 | 71 |
Equal, |
71 | 72 |
FtsMatch, |
72 | 73 |
GreaterOrEqual, |
... | ... | |
1024 | 1025 |
'number', |
1025 | 1026 |
'period-date', |
1026 | 1027 |
'user-id', |
1028 |
'user-function', |
|
1027 | 1029 |
'submission-agent-id', |
1028 | 1030 |
] |
1029 | 1031 |
if get_publisher().is_using_postgresql(): |
... | ... | |
1038 | 1040 |
FakeField('start', 'period-date', _('Start')), |
1039 | 1041 |
FakeField('end', 'period-date', _('End')), |
1040 | 1042 |
FakeField('user', 'user-id', _('User')), |
1043 |
FakeField('user-function', 'user-function', _('Current User Function')), |
|
1041 | 1044 |
FakeField('submission-agent', 'submission-agent-id', _('Submission Agent'), addable=False), |
1042 | 1045 |
] |
1043 | 1046 |
default_filters = self.get_default_filters(mode) |
... | ... | |
1175 | 1178 |
widget._parsed = True # make sure value is not replaced by request query |
1176 | 1179 |
r += widget.render() |
1177 | 1180 | |
1181 |
elif filter_field.type == 'user-function': |
|
1182 |
options = [('', '', '')] + [ |
|
1183 |
(x[0], x[1], x[0]) for x in self.formdef.workflow.get_sorted_functions() |
|
1184 |
] |
|
1185 |
r += SingleSelectWidget( |
|
1186 |
filter_field_key, |
|
1187 |
title=filter_field.label, |
|
1188 |
options=options, |
|
1189 |
value=filter_field_value, |
|
1190 |
render_br=False, |
|
1191 |
).render() |
|
1192 | ||
1178 | 1193 |
elif filter_field.type in ('item', 'items'): |
1179 | 1194 |
filter_field.required = False |
1180 | 1195 | |
... | ... | |
1647 | 1662 |
FakeField('start-mtime', 'period-date', _('Start (modification time)')), |
1648 | 1663 |
FakeField('end-mtime', 'period-date', _('End (modification time)')), |
1649 | 1664 |
FakeField('user', 'user-id', _('User')), |
1665 |
FakeField('user-function', 'user-function', _('Current User Function')), |
|
1650 | 1666 |
FakeField('submission-agent', 'submission-agent-id', _('Submission Agent')), |
1651 | 1667 |
] |
1652 | 1668 |
criterias = [] |
... | ... | |
1693 | 1709 |
if filter_field.type == 'number' and filters_dict.get('filter-number'): |
1694 | 1710 |
filters_dict['filter-number-value'] = filters_dict['filter-number'] |
1695 | 1711 | |
1696 |
if filter_field.type == 'user-id': |
|
1697 |
# convert uuid based filter into local id filter |
|
1712 |
if filter_field.type == 'user-id' and not filters_dict.get('filter-user-function'): |
|
1713 |
# convert uuid based filter into local id filter. |
|
1714 |
# do not apply if there's filter-user-function as it indicates the filtering |
|
1715 |
# should happen on function, not ownership. |
|
1698 | 1716 |
name_id = filters_dict.get('filter-user-uuid') |
1699 | 1717 |
if name_id: |
1700 | 1718 |
nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id) |
... | ... | |
1710 | 1728 |
'__current__' if name_id == '__current__' else '-1' |
1711 | 1729 |
) |
1712 | 1730 | |
1731 |
if filter_field.type == 'user-function' and filters_dict.get('filter-user-function'): |
|
1732 |
if filters_dict.get('filter-user-function') != 'on': |
|
1733 |
# allow for short form, with a single query parameter |
|
1734 |
filters_dict['filter-user-function-value'] = filters_dict.get('filter-user-function') |
|
1735 | ||
1713 | 1736 |
if filter_field.type == 'submission-agent-id': |
1714 | 1737 |
# convert uuid based filter into local id filter |
1715 | 1738 |
name_id = filters_dict.get('filter-submission-agent-uuid') |
... | ... | |
1741 | 1764 |
if not filter_field_value: |
1742 | 1765 |
continue |
1743 | 1766 | |
1767 |
if filter_field.type == 'user-function': |
|
1768 |
# .../list?filter-user-function=on&filter-user-function-value=_manager&filter-user-uuid=c3... |
|
1769 |
# .../list?filter-user-function=_manager&filter-user-uuid=c3... |
|
1770 |
name_id = filters_dict.get('filter-user-uuid') |
|
1771 |
if name_id: |
|
1772 |
nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id) |
|
1773 |
if nameid_users: |
|
1774 |
filter_field_value = '%s:%s' % ( |
|
1775 |
filters_dict['filter-user-function-value'], |
|
1776 |
nameid_users[0].id, |
|
1777 |
) |
|
1778 |
else: |
|
1779 |
# no user with this uuid, change to filter on nobody |
|
1780 |
filter_field_value = '%s:__none__' % (filters_dict['filter-user-function-value'],) |
|
1781 | ||
1744 | 1782 |
if filter_field.type == 'date': |
1745 | 1783 |
try: |
1746 | 1784 |
filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d') |
... | ... | |
1786 | 1824 |
criterias.append(Equal('user_id', filter_field_value)) |
1787 | 1825 |
elif filter_field.type == 'submission-agent-id': |
1788 | 1826 |
criterias.append(Equal('submission_agent_id', filter_field_value)) |
1827 |
elif filter_field.type == 'user-function': |
|
1828 |
if ':' in filter_field_value: |
|
1829 |
filter_field_value, user_id = filter_field_value.split(':', 1) |
|
1830 |
user_object = None if user_id == '__none__' else get_publisher().user_class().get(user_id) |
|
1831 |
else: |
|
1832 |
user_object = get_request().user |
|
1833 |
criterias.append( |
|
1834 |
ElementIntersects( |
|
1835 |
'workflow_merged_roles_dict', |
|
1836 |
filter_field_value, |
|
1837 |
user_object.get_roles() if user_object else None, |
|
1838 |
) |
|
1839 |
) |
|
1789 | 1840 |
elif filter_field.type in ('item', 'items'): |
1790 | 1841 |
if filter_field.type == 'item': |
1791 | 1842 |
criterias.append(Equal('f%s' % filter_field.id, filter_field_value)) |
wcs/qommon/storage.py | ||
---|---|---|
325 | 325 |
) |
326 | 326 | |
327 | 327 | |
328 |
class ElementIntersects(Criteria):
    """Criteria matching objects whose dict attribute has, under ``key``,
    at least one element in common with ``value``.

    ``attribute`` is the name of a dict-like attribute on the tested object,
    ``key`` the entry to look at in that dict, and ``value`` the collection
    of accepted elements (a single element or None are also accepted).
    """

    def __init__(self, attribute, key, value):
        self.attribute = attribute
        self.key = key
        # always keep the accepted elements as a set: empty/None -> empty set,
        # a single scalar -> one-element set.
        if not value:
            self.value = set()
        elif isinstance(value, (tuple, list, set)):
            self.value = set(value)
        else:
            self.value = {value}

    def build_lambda(self):
        """Return a callable giving the (possibly empty) set of elements
        shared between ``self.value`` and the object's stored elements."""

        def intersection(x):
            stored = (getattr(x, self.attribute, None) or {}).get(self.key)
            if not stored:
                # missing attribute, missing key, None or empty list: no match
                # (set(None) used to raise TypeError here)
                return set()
            if not isinstance(stored, (tuple, list, set)):
                # a single scalar (e.g. a role id string) is one element, it
                # must not be exploded into its characters by set()
                stored = [stored]
            return self.value.intersection(stored)

        return intersection
|
344 | ||
345 | ||
328 | 346 |
def parse_clause(clause): |
329 | 347 |
# creates a callable out of a clause |
330 | 348 |
# (attribute, operator, value) |
wcs/sql.py | ||
---|---|---|
313 | 313 |
return "%s->>'%s' ILIKE %%(c%s)s" % (self.attribute, self.key, id(self.value)) |
314 | 314 | |
315 | 315 | |
316 |
class ElementIntersects(ElementEqual):
    """SQL criteria matching rows where the JSONB array stored under ``key``
    of the ``attribute`` column shares at least one element with ``value``."""

    def as_sql(self):
        """Return the SQL fragment for this criteria.

        The accepted values are passed as a query parameter named after
        ``id(self.value)``; ``self.value`` is converted to a list for that
        purpose.  An empty value can never match and gives ``FALSE``.
        """
        if not self.value:
            return 'FALSE'
        if not isinstance(self.value, list):
            # convert only once: rebinding self.value on every call would
            # change id(self.value), hence the placeholder name, between calls
            if isinstance(self.value, (tuple, set)):
                self.value = list(self.value)
            else:
                self.value = [self.value]
        # the key is embedded in the statement as a quoted literal and can
        # originate from a request parameter: double single quotes so it
        # cannot terminate the literal, and double percent signs so it cannot
        # be mistaken for a pyformat placeholder.
        # NOTE(review): assumes standard_conforming_strings is on (PostgreSQL
        # default); passing the key as a bound parameter would be stricter.
        key = str(self.key).replace("'", "''").replace('%', '%%')
        return "EXISTS(SELECT 1 FROM jsonb_array_elements_text(%s->'%s') foo WHERE foo = ANY(%%(c%s)s))" % (
            self.attribute,
            key,
            id(self.value),
        )
|
329 | ||
330 | ||
316 | 331 |
def get_name_as_sql_identifier(name): |
317 | 332 |
name = qommon.misc.simplify(name) |
318 | 333 |
for char in '<>|{}!?^*+/=\'': # forbidden chars |
319 |
- |