From b28633b37be220f5b8c8edffe31e6b22d3412ba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20P=C3=A9ters?= Date: Tue, 20 Nov 2018 18:01:56 +0100 Subject: [PATCH] api: add possibility of ignoring roles to /api/forms/ (#28201) --- tests/test_api.py | 44 ++++++++++++++++++++++++++++++++---- wcs/api.py | 28 ++++++++++++++++++++--- wcs/backoffice/management.py | 14 ++++++++---- 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 0337e7ee9..f7f704871 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1742,6 +1742,11 @@ def test_api_global_geojson(pub, local_user): assert len(resp.json['features']) == 20 def test_api_global_listing(pub, local_user): + if not pub.is_using_postgresql(): + resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404) + pytest.skip('this requires SQL') + return + Role.wipe() role = Role(name='test') role.store() @@ -1772,11 +1777,6 @@ def test_api_global_listing(pub, local_user): formdata.jump_status('finished') formdata.store() - if not pub.is_using_postgresql(): - resp = get_app(pub).get(sign_uri('/api/forms/geojson', user=local_user), status=404) - pytest.skip('this requires SQL') - return - # check empty content if user doesn't have the appropriate role resp = get_app(pub).get(sign_uri('/api/forms/', user=local_user)) assert len(resp.json['data']) == 0 @@ -1793,6 +1793,40 @@ def test_api_global_listing(pub, local_user): resp = get_app(pub).get(sign_uri('/api/forms/?status=done', user=local_user)) assert len(resp.json['data']) == 20 +def test_api_global_listing_ignored_roles(pub, local_user): + test_api_global_listing(pub, local_user) + + role = Role(name='test2') + role.store() + + formdef = FormDef() + formdef.name = 'test2' + formdef.workflow_roles = {'_receiver': role.id} + formdef.fields = [ + fields.StringField(id='0', label='foobar', varname='foobar'), + ] + formdef.store() + + data_class = formdef.data_class() + data_class.wipe() + + for i in range(10): + formdata = data_class() + date = time.strptime('2014-01-20', '%Y-%m-%d') + formdata.data = {'0': 'FOO BAR'} + formdata.user_id = local_user.id + formdata.just_created() + formdata.jump_status('new') + formdata.store() + + # considering roles + resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100', user=local_user)) + assert len(resp.json['data']) == 30 + + # ignore roles + resp = get_app(pub).get(sign_uri('/api/forms/?status=all&limit=100&ignore-roles=on', user=local_user)) + assert len(resp.json['data']) == 40 + @pytest.fixture def ics_data(local_user): Role.wipe() diff --git a/wcs/api.py b/wcs/api.py index 246787f1f..056710c6c 100644 --- a/wcs/api.py +++ b/wcs/api.py @@ -195,6 +195,8 @@ class ApiFormsDirectory(Directory): # grant access to admins, to ease debug if not (get_request().user and get_request().user.is_admin): raise AccessForbiddenError('user not authenticated') + if get_request().form.get('ignore-roles') == 'on' and not get_request().user.can_go_in_backoffice(): + raise AccessForbiddenError('user not allowed to ignore roles') def _q_index(self): if not get_publisher().is_using_postgresql(): @@ -207,6 +209,9 @@ class ApiFormsDirectory(Directory): management_directory = ManagementDirectory() criterias = management_directory.get_global_listing_criterias() + if get_request().form.get('ignore-roles') == 'on': + roles_criterias = criterias + criterias = management_directory.get_global_listing_criterias(ignore_user_roles=True) limit = int(get_request().form.get('limit', get_publisher().get_site_option('default-page-size') or 20)) @@ -214,9 +219,26 @@ class ApiFormsDirectory(Directory): order_by = get_request().form.get('order_by', get_publisher().get_site_option('default-sort-order') or '-receipt_time') - output = [get_formdata_dict(x, user=get_request().user, consider_status_visibility=False) - for x in sql.AnyFormData.select( - criterias, order_by=order_by, limit=limit, offset=offset)] + formdatas = sql.AnyFormData.select(criterias, order_by=order_by, limit=limit, offset=offset) + if get_request().form.get('ignore-roles') == 'on': + # When ignoring roles formdatas will be returned even if they are + # not readable by the user, an additional attribute (readable) is + # added to differentiate readable and non-readable formdatas. + # + # A full SQL query is run as it will benefit from cached + # concerned_roles/action_roles. + limited_formdatas = [(x.formdef.id, x.id) for x in sql.AnyFormData.select( + roles_criterias, order_by=order_by, limit=limit, offset=offset)] + output = [] + for formdata in formdatas: + formdata_dict = get_formdata_dict(formdata, + user=get_request().user, + consider_status_visibility=False) + formdata_dict['readable'] = bool((formdata.formdef.id, formdata.id) in limited_formdatas) + output.append(formdata_dict) + else: + output = [get_formdata_dict(x, user=get_request().user, consider_status_visibility=False) + for x in formdatas] get_response().set_content_type('application/json') return json.dumps({'data': output}, diff --git a/wcs/backoffice/management.py b/wcs/backoffice/management.py index de7b18f5c..09734b200 100644 --- a/wcs/backoffice/management.py +++ b/wcs/backoffice/management.py @@ -765,7 +765,7 @@ class ManagementDirectory(Directory): r += htmltext('') return r.getvalue() - def get_global_listing_criterias(self): + def get_global_listing_criterias(self, ignore_user_roles=False): parsed_values = {} user_roles = [logged_users_role().id] if get_request().user and get_request().user.roles: @@ -782,15 +782,19 @@ class ManagementDirectory(Directory): status = 'open' if status == 'waiting': criterias.append(Equal('is_at_endpoint', False)) - criterias.append(Intersects('actions_roles_array', user_roles)) + if not ignore_user_roles: + criterias.append(Intersects('actions_roles_array', user_roles)) elif status == 'open': criterias.append(Equal('is_at_endpoint', False)) - criterias.append(Intersects('concerned_roles_array', user_roles)) + if not ignore_user_roles: + criterias.append(Intersects('concerned_roles_array', user_roles)) elif status == 'done': criterias.append(Equal('is_at_endpoint', True)) - criterias.append(Intersects('concerned_roles_array', user_roles)) + if not ignore_user_roles: + criterias.append(Intersects('concerned_roles_array', user_roles)) elif status == 'all': - criterias.append(Intersects('concerned_roles_array', user_roles)) + if not ignore_user_roles: + criterias.append(Intersects('concerned_roles_array', user_roles)) if get_request().form.get('submission_channel'): if get_request().form.get('submission_channel') == 'web': criterias.append(Null('submission_channel')) -- 2.19.1