0002-backoffice-don-t-include-anonymised-formdata-in-list.patch
tests/test_backoffice_pages.py | ||
---|---|---|
475 | 475 |
ids = [x.strip('/') for x in re.findall(r'data-link="(.*?)"', resp.text)] |
476 | 476 |
assert ids == list(reversed(last_update_time_order)) |
477 | 477 | |
478 |
def test_backoffice_listing_anonymised(pub): |
|
479 |
if not pub.is_using_postgresql(): |
|
480 |
pytest.skip('this requires SQL') |
|
481 |
return |
|
482 |
create_superuser(pub) |
|
483 |
create_environment(pub) |
|
484 |
app = login(get_app(pub)) |
|
485 |
resp = app.get('/backoffice/management/form-title/?limit=500') |
|
486 |
assert resp.text.count('data-link') == 17 |
|
487 | ||
488 |
formdef = FormDef.get_by_urlname('form-title') |
|
489 |
for i, formdata in enumerate(formdef.data_class().select(order_by='id')): |
|
490 |
if i % 2: |
|
491 |
formdata.anonymise() |
|
492 | ||
493 |
resp = app.get('/backoffice/management/form-title/?limit=500') |
|
494 |
assert resp.text.count('data-link') == 9 |
|
495 | ||
478 | 496 |
def test_backoffice_legacy_urls(pub): |
479 | 497 |
create_superuser(pub) |
480 | 498 |
create_environment(pub) |
... | ... | |
3232 | 3250 |
resp = resp.forms['listing-settings'].submit() |
3233 | 3251 |
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 37 |
3234 | 3252 | |
3253 |
def test_global_listing_anonymised(pub): |
|
3254 |
if not pub.is_using_postgresql(): |
|
3255 |
pytest.skip('this requires SQL') |
|
3256 |
return |
|
3257 | ||
3258 |
create_user(pub) |
|
3259 |
create_environment(pub) |
|
3260 |
app = login(get_app(pub)) |
|
3261 |
resp = app.get('/backoffice/management/listing?limit=500&status=all') |
|
3262 |
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 70 |
|
3263 | ||
3264 |
formdef = FormDef.get_by_urlname('other-form') |
|
3265 |
for formdata in formdef.data_class().select(): |
|
3266 |
formdata.anonymise() |
|
3267 | ||
3268 |
resp = app.get('/backoffice/management/listing?limit=500&status=all') |
|
3269 |
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 50 |
|
3270 | ||
3271 |
resp = app.get('/backoffice/management/listing?limit=500&status=open') |
|
3272 |
assert resp.text[resp.text.index('<tbody'):].count('<tr') == 17 |
|
3273 | ||
3235 | 3274 |
def test_global_listing_geojson(pub): |
3236 | 3275 |
if not pub.is_using_postgresql(): |
3237 | 3276 |
pytest.skip('this requires SQL') |
wcs/backoffice/management.py | ||
---|---|---|
840 | 840 |
get_publisher().get_site_option('default-sort-order') or '-receipt_time') |
841 | 841 | |
842 | 842 |
criterias = self.get_global_listing_criterias() |
843 |
criterias.append(Null('anonymised')) # exclude anonymised forms |
|
843 | 844 |
total_count = sql.AnyFormData.count(criterias) |
844 | 845 |
if offset > total_count: |
845 | 846 |
get_request().form['offset'] = '0' |
wcs/forms/backoffice.py | ||
---|---|---|
23 | 23 |
from ..qommon import misc |
24 | 24 |
from ..qommon.form import * |
25 | 25 |
from ..qommon.backoffice.listing import pagination_links |
26 |
from wcs.qommon.storage import Null |
|
26 | 27 |
from wcs.roles import logged_users_role |
27 | 28 | |
28 | 29 |
class FormDefUI(object): |
... | ... | |
35 | 36 |
include_checkboxes=False): |
36 | 37 | |
37 | 38 |
partial_display = False |
39 |
using_postgresql = get_publisher().is_using_postgresql() |
|
40 | ||
38 | 41 | |
39 | 42 |
if not items: |
40 | 43 |
if offset and not limit: |
41 | 44 |
limit = int(get_publisher().get_site_option('default-page-size') or 20) |
45 |
if not criterias: |
|
46 |
criterias = [] |
|
47 |
if using_postgresql: |
|
48 |
criterias.append(Null('anonymised')) |
|
42 | 49 |
items, total_count = self.get_listing_items( |
43 | 50 |
selected_filter, offset, limit, query, order_by, |
44 | 51 |
criterias=criterias) |
... | ... | |
78 | 85 |
r += htmltext('<col />') |
79 | 86 |
r += htmltext('</colgroup>') |
80 | 87 | |
81 |
using_postgresql = get_publisher().is_using_postgresql() |
|
82 | ||
83 | 88 |
r += htmltext('<thead><tr>') |
84 | 89 |
if self.formdef.workflow.criticality_levels and using_postgresql: |
85 | 90 |
r += htmltext('<th class="criticality-level-cell" data-field-sort-key="criticality_level"><span></span></th>') |
... | ... | |
125 | 130 | |
126 | 131 |
def get_listing_item_ids(self, selected_filter='all', query=None, order_by=None, user=None, criterias=None, anonymise=False): |
127 | 132 |
formdata_class = self.formdef.data_class() |
133 |
clause_kwargs = {} |
|
134 |
if get_publisher().is_using_postgresql(): |
|
135 |
# pass criterias to all queries |
|
136 |
clause_kwargs = {'clause': criterias} |
|
128 | 137 |
if selected_filter == 'all': |
129 |
item_ids = formdata_class.keys() |
|
138 |
item_ids = formdata_class.keys(**clause_kwargs)
|
|
130 | 139 |
drafts = {x: True for x in formdata_class.get_ids_with_indexed_value('status', 'draft')} |
131 | 140 |
item_ids = [x for x in item_ids if x not in drafts] |
132 | 141 |
elif selected_filter == 'waiting': |
... | ... | |
145 | 154 |
item_ids = [] |
146 | 155 |
for status_id in applied_filters: |
147 | 156 |
item_ids.extend(formdata_class.get_ids_with_indexed_value( |
148 |
str('status'), status_id))
|
|
157 |
'status', status_id, **clause_kwargs))
|
|
149 | 158 | |
150 | 159 |
if query: |
151 | 160 |
query_ids = formdata_class.get_ids_from_query(query) |
... | ... | |
185 | 194 |
# get_sorted_ids is only implemented in the SQL backend |
186 | 195 |
order_by = None |
187 | 196 |
if order_by and not anonymise: |
188 |
ordered_ids = formdata_class.get_sorted_ids(order_by) |
|
197 |
ordered_ids = formdata_class.get_sorted_ids(order_by, clause=criterias)
|
|
189 | 198 |
item_ids_dict = {x: True for x in item_ids} |
190 | 199 |
item_ids = [x for x in ordered_ids if x in item_ids_dict] |
191 | 200 |
else: |
wcs/sql.py | ||
---|---|---|
951 | 951 | |
952 | 952 |
@classmethod |
953 | 953 |
@guard_postgres |
954 |
def keys(cls): |
|
954 |
def keys(cls, clause=None):
|
|
955 | 955 |
conn, cur = get_connection_and_cursor() |
956 |
where_clauses, parameters, func_clause = parse_clause(clause) |
|
957 |
assert not func_clause |
|
956 | 958 |
sql_statement = 'SELECT id FROM %s' % cls._table_name |
957 |
cur.execute(sql_statement) |
|
959 |
if where_clauses: |
|
960 |
sql_statement += ' WHERE ' + ' AND '.join(where_clauses) |
|
961 |
cur.execute(sql_statement, parameters) |
|
958 | 962 |
ids = [x[0] for x in cur.fetchall()] |
959 | 963 |
conn.commit() |
960 | 964 |
cur.close() |
... | ... | |
1235 | 1239 | |
1236 | 1240 |
@classmethod |
1237 | 1241 |
@guard_postgres |
1238 |
def get_sorted_ids(cls, order_by): |
|
1242 |
def get_sorted_ids(cls, order_by, clause=None):
|
|
1239 | 1243 |
conn, cur = get_connection_and_cursor() |
1240 | 1244 |
sql_statement = 'SELECT id FROM %s' % cls._table_name |
1245 |
where_clauses, parameters, func_clause = parse_clause(clause) |
|
1246 |
assert not func_clause |
|
1247 |
if where_clauses: |
|
1248 |
sql_statement += ' WHERE ' + ' AND '.join(where_clauses) |
|
1241 | 1249 |
if order_by.startswith('-'): |
1242 | 1250 |
order_by = order_by[1:] |
1243 | 1251 |
sql_statement += ' ORDER BY %s DESC' % order_by.replace('-', '_') |
1244 | 1252 |
else: |
1245 | 1253 |
sql_statement += ' ORDER BY %s' % order_by.replace('-', '_') |
1246 |
cur.execute(sql_statement) |
|
1254 |
cur.execute(sql_statement, parameters)
|
|
1247 | 1255 |
ids = [x[0] for x in cur.fetchall()] |
1248 | 1256 |
conn.commit() |
1249 | 1257 |
cur.close() |
... | ... | |
1604 | 1612 | |
1605 | 1613 |
@classmethod |
1606 | 1614 |
@guard_postgres |
1607 |
def get_ids_with_indexed_value(cls, index, value, auto_fallback=True): |
|
1615 |
def get_ids_with_indexed_value(cls, index, value, auto_fallback=True, clause=None):
|
|
1608 | 1616 |
conn, cur = get_connection_and_cursor() |
1609 | 1617 | |
1618 |
where_clauses, parameters, func_clause = parse_clause(clause) |
|
1619 |
assert not func_clause |
|
1620 | ||
1610 | 1621 |
if type(value) is int: |
1611 | 1622 |
value = str(value) |
1612 | 1623 | |
... | ... | |
1618 | 1629 |
sql_statement = '''SELECT id FROM %s WHERE %s = %%(value)s''' % ( |
1619 | 1630 |
cls._table_name, |
1620 | 1631 |
index) |
1621 |
cur.execute(sql_statement, {'value': value}) |
|
1632 | ||
1633 |
if where_clauses: |
|
1634 |
sql_statement += ' AND ' + ' AND '.join(where_clauses) |
|
1635 |
else: |
|
1636 |
parameters = {} |
|
1637 | ||
1638 |
parameters.update({'value': value}) |
|
1639 |
cur.execute(sql_statement, parameters) |
|
1622 | 1640 |
all_ids = [x[0] for x in cur.fetchall()] |
1623 | 1641 |
cur.close() |
1624 | 1642 |
return all_ids |
1625 |
- |