Projet

Général

Profil

0002-backoffice-don-t-include-anonymised-formdata-in-list.patch

Frédéric Péters, 25 novembre 2019 09:17

Télécharger (9,08 ko)

Voir les différences:

Subject: [PATCH 2/2] backoffice: don't include anonymised formdata in listings
 (#33707)

 tests/test_backoffice_pages.py | 39 ++++++++++++++++++++++++++++++++++
 wcs/backoffice/management.py   |  1 +
 wcs/forms/backoffice.py        | 19 ++++++++++++-----
 wcs/sql.py                     | 30 ++++++++++++++++++++------
 4 files changed, 78 insertions(+), 11 deletions(-)
tests/test_backoffice_pages.py
475 475
    ids = [x.strip('/') for x in re.findall(r'data-link="(.*?)"', resp.text)]
476 476
    assert ids == list(reversed(last_update_time_order))
477 477

  
478
def test_backoffice_listing_anonymised(pub):
479
    if not pub.is_using_postgresql():
480
        pytest.skip('this requires SQL')
481
        return
482
    create_superuser(pub)
483
    create_environment(pub)
484
    app = login(get_app(pub))
485
    resp = app.get('/backoffice/management/form-title/?limit=500')
486
    assert resp.text.count('data-link') == 17
487

  
488
    formdef = FormDef.get_by_urlname('form-title')
489
    for i, formdata in enumerate(formdef.data_class().select(order_by='id')):
490
        if i % 2:
491
            formdata.anonymise()
492

  
493
    resp = app.get('/backoffice/management/form-title/?limit=500')
494
    assert resp.text.count('data-link') == 9
495

  
478 496
def test_backoffice_legacy_urls(pub):
479 497
    create_superuser(pub)
480 498
    create_environment(pub)
......
3232 3250
    resp = resp.forms['listing-settings'].submit()
3233 3251
    assert resp.text[resp.text.index('<tbody'):].count('<tr') == 37
3234 3252

  
3253
def test_global_listing_anonymised(pub):
3254
    if not pub.is_using_postgresql():
3255
        pytest.skip('this requires SQL')
3256
        return
3257

  
3258
    create_user(pub)
3259
    create_environment(pub)
3260
    app = login(get_app(pub))
3261
    resp = app.get('/backoffice/management/listing?limit=500&status=all')
3262
    assert resp.text[resp.text.index('<tbody'):].count('<tr') == 70
3263

  
3264
    formdef = FormDef.get_by_urlname('other-form')
3265
    for formdata in formdef.data_class().select():
3266
        formdata.anonymise()
3267

  
3268
    resp = app.get('/backoffice/management/listing?limit=500&status=all')
3269
    assert resp.text[resp.text.index('<tbody'):].count('<tr') == 50
3270

  
3271
    resp = app.get('/backoffice/management/listing?limit=500&status=open')
3272
    assert resp.text[resp.text.index('<tbody'):].count('<tr') == 17
3273

  
3235 3274
def test_global_listing_geojson(pub):
3236 3275
    if not pub.is_using_postgresql():
3237 3276
        pytest.skip('this requires SQL')
wcs/backoffice/management.py
840 840
            get_publisher().get_site_option('default-sort-order') or '-receipt_time')
841 841

  
842 842
        criterias = self.get_global_listing_criterias()
843
        criterias.append(Null('anonymised'))  # exclude anonymised forms
843 844
        total_count = sql.AnyFormData.count(criterias)
844 845
        if offset > total_count:
845 846
            get_request().form['offset'] = '0'
wcs/forms/backoffice.py
23 23
from ..qommon import misc
24 24
from ..qommon.form import *
25 25
from ..qommon.backoffice.listing import pagination_links
26
from wcs.qommon.storage import Null
26 27
from wcs.roles import logged_users_role
27 28

  
28 29
class FormDefUI(object):
......
35 36
                    include_checkboxes=False):
36 37

  
37 38
        partial_display = False
39
        using_postgresql = get_publisher().is_using_postgresql()
40

  
38 41

  
39 42
        if not items:
40 43
            if offset and not limit:
41 44
                limit = int(get_publisher().get_site_option('default-page-size') or 20)
45
            if not criterias:
46
                criterias = []
47
            if using_postgresql:
48
                criterias.append(Null('anonymised'))
42 49
            items, total_count = self.get_listing_items(
43 50
                            selected_filter, offset, limit, query, order_by,
44 51
                            criterias=criterias)
......
78 85
            r += htmltext('<col />')
79 86
        r += htmltext('</colgroup>')
80 87

  
81
        using_postgresql = get_publisher().is_using_postgresql()
82

  
83 88
        r += htmltext('<thead><tr>')
84 89
        if self.formdef.workflow.criticality_levels and using_postgresql:
85 90
            r += htmltext('<th class="criticality-level-cell" data-field-sort-key="criticality_level"><span></span></th>')
......
125 130

  
126 131
    def get_listing_item_ids(self, selected_filter='all', query=None, order_by=None, user=None, criterias=None, anonymise=False):
127 132
        formdata_class = self.formdef.data_class()
133
        clause_kwargs = {}
134
        if get_publisher().is_using_postgresql():
135
            # pass criterias to all queries
136
            clause_kwargs = {'clause': criterias}
128 137
        if selected_filter == 'all':
129
            item_ids = formdata_class.keys()
138
            item_ids = formdata_class.keys(**clause_kwargs)
130 139
            drafts = {x: True for x in formdata_class.get_ids_with_indexed_value('status', 'draft')}
131 140
            item_ids = [x for x in item_ids if x not in drafts]
132 141
        elif selected_filter == 'waiting':
......
145 154
            item_ids = []
146 155
            for status_id in applied_filters:
147 156
                item_ids.extend(formdata_class.get_ids_with_indexed_value(
148
                                        str('status'), status_id))
157
                    'status', status_id, **clause_kwargs))
149 158

  
150 159
        if query:
151 160
            query_ids = formdata_class.get_ids_from_query(query)
......
185 194
            # get_sorted_ids is only implemented in the SQL backend
186 195
            order_by = None
187 196
        if order_by and not anonymise:
188
            ordered_ids = formdata_class.get_sorted_ids(order_by)
197
            ordered_ids = formdata_class.get_sorted_ids(order_by, clause=criterias)
189 198
            item_ids_dict = {x: True for x in item_ids}
190 199
            item_ids = [x for x in ordered_ids if x in item_ids_dict]
191 200
        else:
wcs/sql.py
951 951

  
952 952
    @classmethod
953 953
    @guard_postgres
954
    def keys(cls):
954
    def keys(cls, clause=None):
955 955
        conn, cur = get_connection_and_cursor()
956
        where_clauses, parameters, func_clause = parse_clause(clause)
957
        assert not func_clause
956 958
        sql_statement = 'SELECT id FROM %s' % cls._table_name
957
        cur.execute(sql_statement)
959
        if where_clauses:
960
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
961
        cur.execute(sql_statement, parameters)
958 962
        ids = [x[0] for x in cur.fetchall()]
959 963
        conn.commit()
960 964
        cur.close()
......
1235 1239

  
1236 1240
    @classmethod
1237 1241
    @guard_postgres
1238
    def get_sorted_ids(cls, order_by):
1242
    def get_sorted_ids(cls, order_by, clause=None):
1239 1243
        conn, cur = get_connection_and_cursor()
1240 1244
        sql_statement = 'SELECT id FROM %s' % cls._table_name
1245
        where_clauses, parameters, func_clause = parse_clause(clause)
1246
        assert not func_clause
1247
        if where_clauses:
1248
            sql_statement += ' WHERE ' + ' AND '.join(where_clauses)
1241 1249
        if order_by.startswith('-'):
1242 1250
            order_by = order_by[1:]
1243 1251
            sql_statement += ' ORDER BY %s DESC' % order_by.replace('-', '_')
1244 1252
        else:
1245 1253
            sql_statement += ' ORDER BY %s' % order_by.replace('-', '_')
1246
        cur.execute(sql_statement)
1254
        cur.execute(sql_statement, parameters)
1247 1255
        ids = [x[0] for x in cur.fetchall()]
1248 1256
        conn.commit()
1249 1257
        cur.close()
......
1604 1612

  
1605 1613
    @classmethod
1606 1614
    @guard_postgres
1607
    def get_ids_with_indexed_value(cls, index, value, auto_fallback=True):
1615
    def get_ids_with_indexed_value(cls, index, value, auto_fallback=True, clause=None):
1608 1616
        conn, cur = get_connection_and_cursor()
1609 1617

  
1618
        where_clauses, parameters, func_clause = parse_clause(clause)
1619
        assert not func_clause
1620

  
1610 1621
        if type(value) is int:
1611 1622
            value = str(value)
1612 1623

  
......
1618 1629
            sql_statement = '''SELECT id FROM %s WHERE %s = %%(value)s''' % (
1619 1630
                            cls._table_name,
1620 1631
                            index)
1621
        cur.execute(sql_statement, {'value': value})
1632

  
1633
        if where_clauses:
1634
            sql_statement += ' AND ' + ' AND '.join(where_clauses)
1635
        else:
1636
            parameters = {}
1637

  
1638
        parameters.update({'value': value})
1639
        cur.execute(sql_statement, parameters)
1622 1640
        all_ids = [x[0] for x in cur.fetchall()]
1623 1641
        cur.close()
1624 1642
        return all_ids
1625
-