From 94665f08b20362a8d8af6bf0e23bca6a2487e951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20P=C3=A9ters?= Date: Fri, 7 Dec 2018 18:34:10 +0100 Subject: [PATCH] api: filter user forms when requested by another user (#28732) --- tests/test_api.py | 52 +++++++++++++++++++++++++++++++++++++++++++++++ wcs/api.py | 32 +++++++++++++++++++++++++---- wcs/sql.py | 1 + 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 7904c6ca7..f6e0d73d5 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1321,6 +1321,58 @@ def test_user_forms(pub, local_user): draft_formdata = [x for x in resp.json['data'] if x['status'] == 'Draft'][0] assert draft_formdata.get('url')[-1] != '/' +def test_user_forms_from_agent(pub, local_user): + Role.wipe() + role = Role(name='Foo bar') + role.store() + + agent_user = get_publisher().user_class() + agent_user.name = 'Agent' + agent_user.email = 'agent@example.com' + agent_user.name_identifiers = ['ABCDE'] + agent_user.roles = [role.id] + agent_user.store() + + FormDef.wipe() + formdef = FormDef() + formdef.name = 'test' + formdef.fields = [ + fields.StringField(id='0', label='foobar', varname='foobar'), + fields.StringField(id='1', label='foobar2'),] + formdef.store() + formdef.data_class().wipe() + + formdata = formdef.data_class()() + formdata.data = {'0': 'foo@localhost', '1': 'xxx'} + formdata.user_id = local_user.id + formdata.just_created() + formdata.jump_status('new') + formdata.store() + + resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user)) + assert resp.json['err'] == 0 + assert len(resp.json['data']) == 1 + assert resp.json['data'][0]['form_name'] == 'test' + assert resp.json['data'][0]['form_slug'] == 'test' + assert resp.json['data'][0]['form_status'] == 'New' + assert resp.json['data'][0]['readable'] is False + + formdef.skip_from_360_view = True + formdef.store() + + resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user)) + assert len(resp.json['data']) == 0 + + formdef.workflow_roles = {'_receiver': str(role.id)} + formdef.store() + formdef.data_class().rebuild_security() + resp = get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user)) + assert len(resp.json['data']) == 1 + + agent_user.roles = [] + agent_user.store() + get_app(pub).get(sign_uri('/api/users/%s/forms' % local_user.id, user=agent_user), status=403) + def test_user_drafts(pub, local_user): FormDef.wipe() formdef = FormDef() diff --git a/wcs/api.py b/wcs/api.py index 930096b33..b8a4ab5c7 100644 --- a/wcs/api.py +++ b/wcs/api.py @@ -613,9 +613,32 @@ class ApiUserDirectory(Directory): return json.dumps({'err': 1, 'err_desc': 'unknown NameID', 'data': []}) if not user: return json.dumps({'err': 1, 'err_desc': 'no user specified', 'data': []}) - forms = [] + + forms = self.get_user_forms(user) + + if self.user: + query_user = get_user_from_api_query_string() or get_request().user + if query_user and query_user.id != self.user.id: + if not query_user.can_go_in_backoffice(): + raise AccessForbiddenError('user not allowed to query data from others') + # mark forms that are readable by querying user + user_roles = set(query_user.roles or []) + if get_publisher().is_using_postgresql(): + # use concerned_roles_array attribute that was saved in the + # table. + for form in forms: + form.readable = bool(set(form.concerned_roles_array).intersection(user_roles)) + else: + # recomputed concerned roles. + for form in forms: + concerned_roles_array = [str(x) for x in form.concerned_roles if x] + form.readable = bool(set(concerned_roles_array).intersection(user_roles)) + # ignore confidential forms + forms = [x for x in forms if x.readable or not x.formdef.skip_from_360_view] + include_drafts = include_drafts or get_request().form.get('include-drafts') == 'true' - for form in self.get_user_forms(user): + result = [] + for form in forms: if form.is_draft(): if not include_drafts: continue @@ -628,9 +651,10 @@ class ApiUserDirectory(Directory): if not formdata_dict: # skip hidden forms continue - forms.append(formdata_dict) + formdata_dict['readable'] = getattr(form, 'readable', True) + result.append(formdata_dict) - return json.dumps({'err': 0, 'data': forms}, + return json.dumps({'err': 0, 'data': result}, cls=misc.JSONEncoder, encoding=get_publisher().site_charset) diff --git a/wcs/sql.py b/wcs/sql.py index 79d80b254..c45e448c4 100644 --- a/wcs/sql.py +++ b/wcs/sql.py @@ -1960,6 +1960,7 @@ class AnyFormData(SqlMixin): cls.__table_static_fields.append(('criticality_level', 'criticality_level')) cls.__table_static_fields.append(('geoloc_base_x', 'geoloc_base_x')) cls.__table_static_fields.append(('geoloc_base_y', 'geoloc_base_y')) + cls.__table_static_fields.append(('concerned_roles_array', 'concerned_roles_array')) return cls.__table_static_fields @classmethod -- 2.20.0.rc2