From e087e3a8b9469a8e30a0b1f40b2fbc4c6cdcfbb2 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 8 Apr 2016 12:23:06 +0200 Subject: [PATCH 5/7] implement URL signatures in the file validation web-service calls (#10444) --- tests/test_backoffice_pages.py | 30 +++++++++++++++--------------- wcs/file_validation.py | 41 +++++++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 29 deletions(-) diff --git a/tests/test_backoffice_pages.py b/tests/test_backoffice_pages.py index 311282a..6a57e6f 100644 --- a/tests/test_backoffice_pages.py +++ b/tests/test_backoffice_pages.py @@ -1703,9 +1703,9 @@ def test_backoffice_file_field_fargo_no_metadata(pub, fargo_url): assert not '0_structured' in formdata.data resp = app.get('/backoffice/management/form-title/%s/' % form_id) assert not 'Validate' in resp.body - with mock.patch('wcs.file_validation.http_post_request') as http_post_request: + with mock.patch('wcs.file_validation.fargo_post_json') as fargo_post_json: resp = app.get('/backoffice/management/form-title/%s/validate?field_id=0' % form_id) - assert http_post_request.call_count == 0 + assert fargo_post_json.call_count == 0 resp = resp.follow() assert not 'Valid ' in resp.body assert not 'Validate' in resp.body @@ -1780,7 +1780,7 @@ def test_backoffice_file_field_validation(pub, fargo_url): fragment = '%s : %s' % (metadata_field['label'], metadata[metadata_field['varname']]) assert fragment in resp.body - with mock.patch('wcs.file_validation.http_post_request') as http_post_request: + with mock.patch('wcs.file_validation.fargo_post_json') as fargo_post_json: payload = { 'user_nameid': '12345', 'origin': 'example.net', @@ -1799,18 +1799,17 @@ def test_backoffice_file_field_validation(pub, fargo_url): 'end': '1978-01-01', 'display': 'John Doe, 169 rue du château, 75014 PARIS' }) - http_post_request.return_value = None, 201, json.dumps(result), None + fargo_post_json.return_value = 201, result resp = app.get('/backoffice/management/form-title/%s/validate?field_id=0' % form_id) - assert http_post_request.call_count == 1 - assert http_post_request.call_args[0][0] == 'http://fargo.example.net/api/validation/justificatif-de-domicile/' - assert json_loads(http_post_request.call_args[0][1]) == payload - assert http_post_request.call_args[1] == {'headers': {'Content-Type': 'application/json'}} + assert fargo_post_json.call_count == 1 + assert fargo_post_json.call_args[0][0] == '/api/validation/justificatif-de-domicile/' + assert fargo_post_json.call_args[0][1] == payload resp = resp.follow() assert 'Valid from 1970-01-01 to 1978-01-01' in resp.body -def test_backoffice_file_validation_no_upload(pub, fargo_url): +def test_backoffice_file_validation_no_upload(pub, fargo_url, fargo_secret): document_type = { 'id': 'justificatif-de-domicile', 'fargo': True, @@ -1859,7 +1858,8 @@ def test_backoffice_file_validation_no_upload(pub, fargo_url): with mock.patch('wcs.file_validation.http_get_page') as http_get_page: http_get_page.return_value = None, 200, json.dumps(return_value), None resp = app.get('/form-title/') - http_get_page.assert_called_once_with( + assert http_get_page.call_count == 1 + assert http_get_page.call_args[0][0].startswith( 'http://fargo.example.net/api/validation/justificatif-de-domicile/?user_nameid=12345') assert validation['display'] in resp.body resp.forms[0]['f0$validation_url'] = 'zob' @@ -1872,16 +1872,16 @@ def test_backoffice_file_validation_no_upload(pub, fargo_url): } def side_effect(url): - if url == 'zob': + if url.startswith('http://fargo.example.net/zob'): return None, 200, json.dumps(return_value2), None else: return None, 200, json.dumps(return_value), None http_get_page.side_effect = side_effect resp = resp.forms[0].submit('submit') - assert http_get_page.call_args_list[0][0][0].endswith( - 'api/validation/justificatif-de-domicile/?user_nameid=12345') - assert http_get_page.call_args_list[1][0][0] == 'zob' - assert http_get_page.call_args_list[2][0][0] == 'zob' + assert http_get_page.call_count == 3 + assert 'api/validation/justificatif-de-domicile/?user_nameid=12345' in http_get_page.call_args_list[0][0][0] + assert http_get_page.call_args_list[1][0][0].startswith('http://fargo.example.net/zob') + assert http_get_page.call_args_list[2][0][0].startswith('http://fargo.example.net/zob') for key in metadata: assert 'value="%s"' % metadata[key] in resp.body assert 'Check values then click submit.' in resp.body diff --git a/wcs/file_validation.py b/wcs/file_validation.py index 1bbe5b9..db5c0d9 100644 --- a/wcs/file_validation.py +++ b/wcs/file_validation.py @@ -23,20 +23,40 @@ from qommon import get_logger from qommon.misc import http_get_page, json_loads, http_post_request, ConnectionError from quixote import get_publisher, get_request +from wcs.api_utils import get_secret_and_orig, sign_url + def has_file_validation(): return get_publisher().get_site_option('fargo_url') is not None -def fargo_get(path): +def fargo_url(url): fargo_url = get_publisher().get_site_option('fargo_url') - url = urlparse.urljoin(fargo_url, path) + url = urlparse.urljoin(fargo_url, url) + secret, orig = get_secret_and_orig(url) + if '?' in url: + url += '&orig=%s' % orig + else: + url += '?orig=%s' % orig + return sign_url(url, secret) + + +def fargo_get(url): + url = fargo_url(url) response, status, data, auth_header = http_get_page(url) if status == 200: return json_loads(data) return None +def fargo_post_json(url, payload): + url = fargo_url(url) + headers = {'Content-Type': 'application/json'} + response, status, response_payload, auth_header = http_post_request( + url, json.dumps(payload), headers=headers) + return status, json_loads(response_payload) + + def sha256_of_upload(upload): return hashlib.sha256(upload.get_content()).hexdigest() @@ -67,13 +87,11 @@ def get_document_types(): def get_validation(url): try: - response, status, data, auth_header = http_get_page(url) + result = fargo_get(url) except ConnectionError: get_logger().warning('unable to retrieve validation from fargo') return None - if status == 200: - return json_loads(data)['data'] - return None + return result['data'] if result else None def get_validations(document_type): @@ -107,9 +125,7 @@ def is_valid(filled, field, upload): def validate(filled, field, upload): '''Compute link to Fargo to validate the given document''' document_type_id = field.document_type['id'] - path = 'api/validation/%s/' % urllib.quote(document_type_id) - fargo_url = get_publisher().get_site_option('fargo_url') - url = urlparse.urljoin(fargo_url, path) + url = '/api/validation/%s/' % urllib.quote(document_type_id) payload = {} if filled.user: if filled.user.name_identifiers: @@ -122,15 +138,12 @@ def validate(filled, field, upload): for meta_field in field.metadata: if 'varname' in meta_field: payload[meta_field['varname']] = upload.metadata.get(meta_field['varname'], '') - headers = {'Content-Type': 'application/json'} try: - response, status, response_payload, auth_header = http_post_request(url, - json.dumps(payload), - headers=headers) + status, response = fargo_post_json(url, payload) except ConnectionError: get_logger().warning('unable to validate document on fargo for %s', filled.get_display_id()) return if status == 201: - upload.metadata = json_loads(response_payload)['data'] + upload.metadata = response['data'] filled.data['%s_structured' % field.id] = upload.metadata filled.store() -- 2.1.4