Projet

Général

Profil

0003-backoffice-add-filter-on-user-function-58881.patch

Frédéric Péters, 08 février 2022 09:56

Télécharger (14,3 ko)

Voir les différences:

Subject: [PATCH 3/3] backoffice: add filter on user function (#58881)

 tests/api/test_carddef.py              | 104 +++++++++++++++++++++++++
 tests/backoffice_pages/test_filters.py |  57 ++++++++++++++
 wcs/backoffice/management.py           |  55 ++++++++++++-
 wcs/qommon/storage.py                  |  18 +++++
 wcs/sql.py                             |  15 ++++
 5 files changed, 247 insertions(+), 2 deletions(-)
tests/api/test_carddef.py
1091 1091
        'backoffice_url': 'http://example.net/backoffice/management/test/%s/' % formdata.id,
1092 1092
        'api_url': 'http://example.net/api/forms/test/%s/' % formdata.id,
1093 1093
    }
1094

  
1095

  
1096
def test_cards_filter_function(pub, local_user):
    """Check the ``filter-user-function`` filter of the cards list API.

    Three cards are created with the ``_foobar`` function assigned
    respectively to a role, to a single user, and to two users.  The list
    API is then queried with ``filter-user-uuid`` for various users, first
    directly, then through a shared custom view carrying the function
    filter, and finally with the function obtained via a role set on the
    card definition itself.
    """
    # local_user only gets access to functions through this role
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()
    local_user.roles = [role.id]
    local_user.name_identifiers = ['0123456789']
    local_user.store()

    # two extra users that are given the function directly on cards
    user1 = pub.user_class(name='userA')
    user1.name_identifiers = ['56789']
    user1.store()
    user2 = pub.user_class(name='userB')
    user2.name_identifiers = ['98765']
    user2.store()

    CardDef.wipe()
    carddef = CardDef()
    carddef.name = 'test'
    carddef.fields = []
    carddef.workflow_roles = {'_viewer': role.id}
    carddef.digest_templates = {'default': 'bla {{ form_number }} xxx'}
    carddef.store()

    carddef.data_class().wipe()

    # one card per list of "_foobar" holders: a role, one user, two users
    foobar_holders = [
        [str(role.id)],
        ['_user:%s' % user1.id],
        ['_user:%s' % user1.id, '_user:%s' % user2.id],
    ]
    for holders in foobar_holders:
        carddata = carddef.data_class()()
        carddata.workflow_roles = {'_foobar': holders}
        carddata.just_created()
        carddata.jump_status('recorded')
        carddata.store()

    def result_count(url):
        # number of entries returned by a signed GET on the given API url
        return len(get_app(pub).get(sign_uri(url)).json['data'])

    # no parameter -> get everything
    assert result_count('/api/cards/test/list') == 3

    function_filter_url = '/api/cards/test/list?filter-user-function=_foobar&filter-user-uuid=%s'

    # filter on missing uuid
    assert result_count(function_filter_url % 'XXX') == 0

    # filter on users holding the function directly
    assert result_count(function_filter_url % user1.name_identifiers[0]) == 2
    assert result_count(function_filter_url % user2.name_identifiers[0]) == 1

    # filter on role
    assert result_count(function_filter_url % local_user.name_identifiers[0]) == 1

    # via custom view
    pub.custom_view_class.wipe()
    custom_view = pub.custom_view_class()
    custom_view.title = 'shared carddef custom view'
    custom_view.formdef = carddef
    custom_view.columns = {'list': [{'id': '0'}]}
    custom_view.filters = {"filter-user-function": "on", "filter-user-function-value": "_foobar"}
    custom_view.visibility = 'any'
    custom_view.store()

    custom_view_url = '/api/cards/test/list/shared-carddef-custom-view?filter-user-uuid=%s'
    assert result_count(custom_view_url % user1.name_identifiers[0]) == 2

    # with function obtained via role set on carddef
    carddef.workflow_roles = {'_foobar': role.id}
    carddef.store()
    carddef.data_class().rebuild_security()

    assert result_count(custom_view_url % local_user.name_identifiers[0]) == 3
tests/backoffice_pages/test_filters.py
896 896
    assert resp.text.count('>userB<') == 0
897 897

  
898 898

  
899
def test_workflow_function_filter(pub):
    """Check the "current user function" filter of the backoffice listing.

    Three forms are created with the ``_foobar`` function given to the
    logged-in superuser, to another user, and to two other users.  The
    filter is first enabled without a value (all rows are listed), then set
    to ``_foobar`` (only the form where the superuser holds the function
    remains).
    """
    pub.user_class.wipe()
    user = create_superuser(pub)
    # NOTE(review): this identifier is set but not stored within this test
    user.name_identifiers = ['0123456789']
    pub.role_class.wipe()
    role = pub.role_class(name='test')
    role.store()

    # workflow with an additional "_foobar" function
    workflow = Workflow.get_default_workflow()
    workflow.id = '2'
    workflow.roles['_foobar'] = 'Foobar'
    workflow.store()

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'form-title'
    formdef.fields = []
    formdef.workflow = workflow
    formdef.workflow_roles = {'_receiver': role.id}
    formdef.store()

    user1 = pub.user_class(name='userA')
    user1.name_identifiers = ['56789']
    user1.store()
    user2 = pub.user_class(name='userB')
    user2.name_identifiers = ['98765']
    user2.store()

    data_class = formdef.data_class()
    data_class.wipe()

    # one form per list of "_foobar" holders
    foobar_holders = [
        ['_user:%s' % user.id],
        ['_user:%s' % user1.id],
        ['_user:%s' % user1.id, '_user:%s' % user2.id],
    ]
    for holders in foobar_holders:
        formdata = data_class()
        formdata.workflow_roles = {'_foobar': holders}
        formdata.just_created()
        formdata.jump_status('new')
        formdata.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/management/form-title/')

    # enable the user-function filter, without any value yet
    settings_form = resp.forms['listing-settings']
    settings_form['filter-user-function'].checked = True
    resp = settings_form.submit()
    assert resp.text.count('<tr') == 4

    # set a value in the select field
    settings_form = resp.forms['listing-settings']
    settings_form['filter-user-function-value'].value = '_foobar'
    resp = settings_form.submit()
    assert resp.text.count('<tr') == 2
954

  
955

  
899 956
def test_backoffice_table_varname_filter(pub):
900 957
    pub.user_class.wipe()
901 958
    create_superuser(pub)
wcs/backoffice/management.py
67 67
from ..qommon.misc import C_, ellipsize, get_type_name
68 68
from ..qommon.storage import (
69 69
    Contains,
70
    ElementIntersects,
70 71
    Equal,
71 72
    FtsMatch,
72 73
    GreaterOrEqual,
......
1024 1025
            'number',
1025 1026
            'period-date',
1026 1027
            'user-id',
1028
            'user-function',
1027 1029
            'submission-agent-id',
1028 1030
        ]
1029 1031
        if get_publisher().is_using_postgresql():
......
1038 1040
            FakeField('start', 'period-date', _('Start')),
1039 1041
            FakeField('end', 'period-date', _('End')),
1040 1042
            FakeField('user', 'user-id', _('User')),
1043
            FakeField('user-function', 'user-function', _('Current User Function')),
1041 1044
            FakeField('submission-agent', 'submission-agent-id', _('Submission Agent'), addable=False),
1042 1045
        ]
1043 1046
        default_filters = self.get_default_filters(mode)
......
1175 1178
                    widget._parsed = True  # make sure value is not replaced by request query
1176 1179
                    r += widget.render()
1177 1180

  
1181
            elif filter_field.type == 'user-function':
1182
                options = [('', '', '')] + [
1183
                    (x[0], x[1], x[0]) for x in self.formdef.workflow.get_sorted_functions()
1184
                ]
1185
                r += SingleSelectWidget(
1186
                    filter_field_key,
1187
                    title=filter_field.label,
1188
                    options=options,
1189
                    value=filter_field_value,
1190
                    render_br=False,
1191
                ).render()
1192

  
1178 1193
            elif filter_field.type in ('item', 'items'):
1179 1194
                filter_field.required = False
1180 1195

  
......
1647 1662
            FakeField('start-mtime', 'period-date', _('Start (modification time)')),
1648 1663
            FakeField('end-mtime', 'period-date', _('End (modification time)')),
1649 1664
            FakeField('user', 'user-id', _('User')),
1665
            FakeField('user-function', 'user-function', _('Current User Function')),
1650 1666
            FakeField('submission-agent', 'submission-agent-id', _('Submission Agent')),
1651 1667
        ]
1652 1668
        criterias = []
......
1693 1709
            if filter_field.type == 'number' and filters_dict.get('filter-number'):
1694 1710
                filters_dict['filter-number-value'] = filters_dict['filter-number']
1695 1711

  
1696
            if filter_field.type == 'user-id':
1697
                # convert uuid based filter into local id filter
1712
            if filter_field.type == 'user-id' and not filters_dict.get('filter-user-function'):
1713
                # convert uuid based filter into local id filter.
1714
                # do not apply if there's filter-user-function as it indicates the filtering
1715
                # should happen on function, not ownership.
1698 1716
                name_id = filters_dict.get('filter-user-uuid')
1699 1717
                if name_id:
1700 1718
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
......
1710 1728
                            '__current__' if name_id == '__current__' else '-1'
1711 1729
                        )
1712 1730

  
1731
            if filter_field.type == 'user-function' and filters_dict.get('filter-user-function'):
1732
                if filters_dict.get('filter-user-function') != 'on':
1733
                    # allow for short form, with a single query parameter
1734
                    filters_dict['filter-user-function-value'] = filters_dict.get('filter-user-function')
1735

  
1713 1736
            if filter_field.type == 'submission-agent-id':
1714 1737
                # convert uuid based filter into local id filter
1715 1738
                name_id = filters_dict.get('filter-submission-agent-uuid')
......
1741 1764
            if not filter_field_value:
1742 1765
                continue
1743 1766

  
1767
            if filter_field.type == 'user-function':
1768
                # .../list?filter-user-function=on&filter-user-function-value=_manager&filter-user-uuid=c3...
1769
                # .../list?filter-user-function=_manager&filter-user-uuid=c3...
1770
                name_id = filters_dict.get('filter-user-uuid')
1771
                if name_id:
1772
                    nameid_users = get_publisher().user_class.get_users_with_name_identifier(name_id)
1773
                    if nameid_users:
1774
                        filter_field_value = '%s:%s' % (
1775
                            filters_dict['filter-user-function-value'],
1776
                            nameid_users[0].id,
1777
                        )
1778
                    else:
1779
                        # no user with this uuid, change to filter on nobody
1780
                        filter_field_value = '%s:__none__' % (filters_dict['filter-user-function-value'],)
1781

  
1744 1782
            if filter_field.type == 'date':
1745 1783
                try:
1746 1784
                    filter_field_value = misc.get_as_datetime(filter_field_value).date().strftime('%Y-%m-%d')
......
1786 1824
                criterias.append(Equal('user_id', filter_field_value))
1787 1825
            elif filter_field.type == 'submission-agent-id':
1788 1826
                criterias.append(Equal('submission_agent_id', filter_field_value))
1827
            elif filter_field.type == 'user-function':
1828
                if ':' in filter_field_value:
1829
                    filter_field_value, user_id = filter_field_value.split(':', 1)
1830
                    user_object = None if user_id == '__none__' else get_publisher().user_class().get(user_id)
1831
                else:
1832
                    user_object = get_request().user
1833
                criterias.append(
1834
                    ElementIntersects(
1835
                        'workflow_merged_roles_dict',
1836
                        filter_field_value,
1837
                        user_object.get_roles() if user_object else None,
1838
                    )
1839
                )
1789 1840
            elif filter_field.type in ('item', 'items'):
1790 1841
                if filter_field.type == 'item':
1791 1842
                    criterias.append(Equal('f%s' % filter_field.id, filter_field_value))
wcs/qommon/storage.py
325 325
        )
326 326

  
327 327

  
328
class ElementIntersects(Criteria):
    """Criteria on a dictionary attribute holding lists of values.

    An object matches when the list stored under ``key`` in its
    ``attribute`` dictionary has at least one element in common with
    ``value``.
    """

    def __init__(self, attribute, key, value):
        """Store the criteria, normalising ``value`` to a set.

        A falsy value becomes an empty set, a tuple/list/set is converted
        to a set, and any other value becomes a single-element set.
        """
        self.attribute = attribute
        self.key = key
        if not value:
            self.value = set()
        elif isinstance(value, (tuple, list, set)):
            self.value = set(value)
        else:
            self.value = {value}

    def build_lambda(self):
        """Return a predicate giving the elements shared with an object.

        The returned set is empty (falsy) when nothing is shared.
        """

        def shared_elements(x):
            # a missing or empty attribute is handled as an empty dictionary
            stored = getattr(x, self.attribute, None) or {}
            return self.value.intersection(set(stored.get(self.key, [])))

        return shared_elements
344

  
345

  
328 346
def parse_clause(clause):
329 347
    # creates a callable out of a clause
330 348
    #  (attribute, operator, value)
wcs/sql.py
313 313
        return "%s->>'%s' ILIKE %%(c%s)s" % (self.attribute, self.key, id(self.value))
314 314

  
315 315

  
316
class ElementIntersects(ElementEqual):
    """SQL criteria on a JSONB column holding arrays of text values.

    A row matches when the array found under ``key`` in the ``attribute``
    column has at least one element in common with ``value``.
    """

    def as_sql(self):
        """Return the SQL fragment for this criteria.

        Returns ``'FALSE'`` when there is no value to intersect with.
        Otherwise ``self.value`` is normalised to a list and referenced
        through a ``%(c<id>)s`` placeholder named after ``id(self.value)``.
        """
        if not self.value:
            # nothing to intersect with, no row can match
            return 'FALSE'
        if not isinstance(self.value, list):
            # convert only once, so id(self.value), hence the placeholder
            # name, stays the same if as_sql() is called again.
            # (presumably the parameters are keyed on the same id; confirm
            # against the as_sql_param implementation.)
            if isinstance(self.value, (tuple, set)):
                self.value = list(self.value)
            else:
                self.value = [self.value]
        # the key ends up inside a quoted SQL string literal and may come
        # from a query string (filter-user-function); double single quotes
        # so it cannot close the literal, and double percent signs so they
        # are not taken as placeholders by the %(name)s parameter style.
        key = str(self.key).replace("'", "''").replace('%', '%%')
        return "EXISTS(SELECT 1 FROM jsonb_array_elements_text(%s->'%s') foo WHERE foo = ANY(%%(c%s)s))" % (
            self.attribute,
            key,
            id(self.value),
        )
329

  
330

  
316 331
def get_name_as_sql_identifier(name):
317 332
    name = qommon.misc.simplify(name)
318 333
    for char in '<>|{}!?^*+/=\'':  # forbidden chars
319
-