Projet

Général

Profil

0001-misc-add-special-Nothing-criteria-to-bypass-query-an.patch

Frédéric Péters, 12 août 2022 15:05

Télécharger (8,78 ko)

Voir les différences:

Subject: [PATCH] misc: add special "Nothing" criteria to bypass query and
 return nothing (#67884)

 wcs/backoffice/management.py | 15 ++++++++-------
 wcs/carddef.py               |  4 +++-
 wcs/custom_views.py          |  4 ++--
 wcs/qommon/storage.py        |  9 +++++++++
 wcs/sql.py                   |  8 ++++++++
 wcs/variables.py             | 23 ++++++++++++++++++-----
 6 files changed, 48 insertions(+), 15 deletions(-)
wcs/backoffice/management.py
80 80
    Not,
81 81
    NotContains,
82 82
    NotEqual,
83
    Nothing,
83 84
    NotNull,
84 85
    Null,
85 86
    StrictNotEqual,
......
917 918
        criterias.append(StrictNotEqual('status', 'draft'))
918 919
        if selected_filter == 'all':
919 920
            if selected_filter_operator == 'ne':
920
                criterias.append(Equal('status', '_none'))
921
                criterias.append(Nothing())
921 922
        elif selected_filter in ('waiting', 'pending'):
922 923
            statuses = ['wf-%s' % x.id for x in self.formdef.workflow.get_not_endpoint_status()]
923 924
            _criterias = [Contains('status', statuses)]
......
1782 1783
                raise RequestError(error_message)
1783 1784
            if self.view_type == 'table':
1784 1785
                get_session().message = ('warning', error_message)
1785
            criterias.append(Equal('status', '_none'))
1786
            criterias.append(Nothing())
1786 1787

  
1787 1788
        for filter_field in fake_fields + list(self.get_formdef_fields()):
1788 1789
            if filter_field.type not in self.get_filterable_field_types():
......
1883 1884
                        criterias.append(criteria('id', filter_field_value))
1884 1885
                        continue
1885 1886
                    if not compile_templates:
1886
                        criterias.append(Equal('status', '_none'))
1887
                        criterias.append(Nothing())
1887 1888
                        continue
1888 1889
                    with get_publisher().complex_data():
1889 1890
                        value = WorkflowStatusItem.compute(filter_field_value, allow_complex=True)
1890 1891
                        filter_field_value = get_publisher().get_cached_complex_data(value)
1891 1892
                    if filter_field_value is None:
1892
                        criterias.append(Equal('status', '_none'))
1893
                        criterias.append(Nothing())
1893 1894
                        continue
1894 1895

  
1895 1896
                    def record_error(value, operator):
......
1910 1911
                            [int(v) for v in filter_field_value]
1911 1912
                        except ValueError:
1912 1913
                            record_error(filter_field_value, filter_field_operator)
1913
                            criterias.append(Equal('status', '_none'))
1914
                            criterias.append(Nothing())
1914 1915
                            continue
1915 1916
                        if filter_field_operator == 'eq':
1916 1917
                            criterias.append(Contains('id', filter_field_value))
......
1918 1919
                            criterias.append(NotContains('id', filter_field_value))
1919 1920
                        else:
1920 1921
                            record_error(filter_field_value, filter_field_operator)
1921
                            criterias.append(Equal('status', '_none'))
1922
                            criterias.append(Nothing())
1922 1923
                        continue
1923 1924
                try:
1924 1925
                    filter_field_value = int(filter_field_value)
......
1948 1949
                        criterias.append(criteria(filter_field.id, filter_field_value))
1949 1950
                        continue
1950 1951
                    if not compile_templates:
1951
                        criterias.append(Equal('status', '_none'))
1952
                        criterias.append(Nothing())
1952 1953
                        continue
1953 1954
                    filter_field_value = WorkflowStatusItem.compute(filter_field_value)
1954 1955
                try:
wcs/carddef.py
208 208
                        user = variables['form_user']
209 209
                    except KeyError:
210 210
                        user = None
211
                    criterias.append(Equal('user_id', str(user.id) if user else '-1'))
211
                    if not user:
212
                        return []
213
                    criterias.append(Equal('user_id', str(user.id)))
212 214
            else:
213 215
                if custom_view is None:
214 216
                    custom_view = cls.get_data_source_custom_view(data_source_id, carddef=carddef)
wcs/custom_views.py
24 24
from wcs.carddef import CardDef
25 25
from wcs.formdef import FormDef
26 26
from wcs.qommon.misc import simplify
27
from wcs.qommon.storage import Contains, Equal, NotContains, StorableObject
27
from wcs.qommon.storage import Contains, Equal, NotContains, Nothing, StorableObject
28 28

  
29 29
from .qommon.misc import xml_node_text
30 30

  
......
177 177
        selected_status_filter_operator = self.get_status_filter_operator()
178 178
        if selected_filter and selected_filter == 'all':
179 179
            if selected_status_filter_operator == 'ne':
180
                criterias.append(Equal('status', '_none'))
180
                criterias.append(Nothing())
181 181
        elif selected_filter:
182 182
            if selected_filter == 'pending':
183 183
                applied_filters = ['wf-%s' % x.id for x in formdef.workflow.get_not_endpoint_status()]
wcs/qommon/storage.py
393 393
        )
394 394

  
395 395

  
396
class Nothing(Criteria):
397
    def __init__(self, *args, **kwargs):
398
        self.attribute = None
399
        self.value = None
400

  
401
    def build_lambda(self):
402
        return lambda x: False
403

  
404

  
396 405
def parse_clause(clause):
397 406
    # creates a callable out of a clause
398 407
    #  (attribute, operator, value)
wcs/sql.py
403 403
        )
404 404

  
405 405

  
406
class Nothing(wcs.qommon.storage.Nothing):
407
    def as_sql(self):
408
        return 'FALSE'
409

  
410
    def as_sql_param(self):
411
        return {}
412

  
413

  
406 414
def get_name_as_sql_identifier(name):
407 415
    name = qommon.misc.simplify(name)
408 416
    for char in '<>|{}!?^*+/=\'':  # forbidden chars
wcs/variables.py
26 26
from .formdef import FormDef, FormDefDoesNotExist
27 27
from .qommon import _, force_str, misc
28 28
from .qommon.evalutils import make_datetime
29
from .qommon.storage import And, Contains, Equal, Intersects, Not, NotEqual, NotNull, Null, Or, StrictNotEqual
29
from .qommon.storage import (
30
    And,
31
    Contains,
32
    Equal,
33
    Intersects,
34
    Not,
35
    NotEqual,
36
    Nothing,
37
    NotNull,
38
    Null,
39
    Or,
40
    StrictNotEqual,
41
)
30 42
from .qommon.substitution import CompatibilityNamesDict
31 43
from .qommon.templatetags.qommon import parse_datetime
32 44

  
......
89 101
        return self._clone(None)
90 102

  
91 103
    def none(self):
92
        return self._clone([Equal('status', '_none')])
104
        return self._clone([Nothing()])
93 105

  
94 106
    def pending(self):
95 107
        status_filters = ['wf-%s' % x.id for x in self._formdef.workflow.get_not_endpoint_status()]
......
97 109
        return self._clone(self._criterias + criterias)
98 110

  
99 111
    def current_user(self):  # filter on current user
100
        user = get_request().user
101
        return self._clone(self._criterias + [Equal('user_id', str(user.id) if user else '-1')])
112
        return self.filter_by_user(get_request().user)
102 113

  
103 114
    def filter_by_user(self, user):
104 115
        if isinstance(user, str):
105 116
            user = get_publisher().user_class.lookup_by_string(user)
106
        return self._clone(self._criterias + [Equal('user_id', str(user.id) if user else '-1')])
117
        if not user:
118
            return self.none()
119
        return self._clone(self._criterias + [Equal('user_id', str(user.id))])
107 120

  
108 121
    def filter_by_status(self, status):
109 122
        for wfs in self._formdef.workflow.possible_status:
110
-