Project

General

Profile

0002-misc-turn-urlopen-into-a-_http_request-wrapper-19437.patch

Frédéric Péters, 29 November 2017 11:37 PM

Download (27.1 KB)

View differences:

Subject: [PATCH 2/2] misc: turn urlopen into a _http_request wrapper (#19437)

 tests/test_admin_pages.py |  2 +-
 tests/test_api.py         |  2 +-
 tests/test_datasource.py  | 31 ++++++++++++++++---------------
 tests/test_form_pages.py  | 16 ++++++++--------
 tests/test_hobo.py        |  8 +-------
 tests/utilities.py        | 15 ++++++++++++++-
 wcs/admin/forms.py        | 15 ++++-----------
 wcs/admin/settings.py     |  9 +++------
 wcs/admin/workflows.py    |  8 ++------
 wcs/api.py                |  3 +--
 wcs/ctl/check_hobos.py    |  8 +++-----
 wcs/data_sources.py       |  7 ++-----
 wcs/portfolio.py          |  4 ++--
 wcs/qommon/ident/idp.py   | 27 ++++++++++-----------------
 wcs/qommon/misc.py        | 16 ++++++++++++++--
 wcs/qommon/sms.py         | 27 +++++++++++++--------------
 16 files changed, 95 insertions(+), 103 deletions(-)
tests/test_admin_pages.py
resp = resp.forms[0].submit() # confirm delete
assert len(pub.cfg['idp']) == 0
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')
urlopen.side_effect = lambda *args: open(idp_metadata_filename)
resp = app.get('/backoffice/settings/identification/idp/idp/')
tests/test_api.py
assert orig == 'example.net'
def test_reverse_geocoding(pub):
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO('xxx')
get_app(pub).get('/api/reverse-geocoding', status=400)
resp = get_app(pub).get('/api/reverse-geocoding?lat=0&lon=0')
tests/test_datasource.py
# -*- coding: utf-8 -*-
from cStringIO import StringIO
import pytest
import os
import json
......
data_source2 = NamedDataSource.select()[0]
assert data_source2.data_source == data_source.data_source
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zéro"}, {"id": 1, "text": "uné"}, {"id": 2, "text": "deux"}]}')
assert data_sources.get_items({'type': 'foobar'}) == [
('0', 'zéro', '0', {"id": 0, "text": "zéro"}),
('1', 'uné', '1', {"id": 1, "text": "uné"}),
......
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urllib2.urlopen.call_args[0][0]
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
......
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
signed_url = urllib2.urlopen.call_args[0][0]
signed_url = urlopen.call_args[0][0]
assert signed_url.startswith('https://api.example.com/json?')
parsed = urlparse.urlparse(signed_url)
querystring = urlparse.parse_qs(parsed.query)
......
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
data_source.store()
with mock.patch('wcs.data_sources.urllib2') as urllib2:
urllib2.urlopen.return_value.read.return_value = \
'{"data": [{"id": 0, "text": "zero"}]}'
with mock.patch('qommon.misc.urlopen') as urlopen:
urlopen.side_effect = lambda *args: StringIO(
'{"data": [{"id": 0, "text": "zero"}]}')
assert len(data_sources.get_items({'type': 'foobar'})) == 1
unsigned_url = urllib2.urlopen.call_args[0][0]
unsigned_url = urlopen.call_args[0][0]
assert unsigned_url == 'https://no-secret.example.com/json'
tests/test_form_pages.py
data_source=ds, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
display_disabled_items=False)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
data_source=ds, show_as_radio=True, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
data_source=ds, display_disabled_items=True)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello'}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
formdef.data_class().wipe()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
......
data_source=ds, display_disabled_items=False)]
formdef.store()
with mock.patch('urllib2.urlopen') as urlopen:
with mock.patch('qommon.misc.urlopen') as urlopen:
data = {'data': [{'id': '1', 'text': 'hello', 'disabled': True}, {'id': '2', 'text': 'world'}]}
urlopen.side_effect = lambda *args: StringIO.StringIO(json.dumps(data))
resp = get_app(pub).get('/test/')
tests/test_hobo.py
pub.cfg['idp'] = {}
service = [x for x in HOBO_JSON.get('services', []) if x.get('service-id') == 'wcs'][0]
# with an error retrieving metadata
hobo_cmd.configure_authentication_methods(service, pub)
# with real metadata
with mock.patch('urllib2.urlopen') as urlopen:
idp_metadata_filename = os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')
urlopen.side_effect = lambda *args: open(idp_metadata_filename)
hobo_cmd.configure_authentication_methods(service, pub)
hobo_cmd.configure_authentication_methods(service, pub)
assert len(pub.cfg['idp'].keys()) == 1
assert pub.cfg['saml_identities']['registration-url']
tests/utilities.py
wcs.qommon.misc._http_request = self.http_request
qommon.misc._http_request = self.http_request
def http_request(self, url, method='GET', body=None, headers={}, timeout=None):
def http_request(self, url, method='GET', body=None, headers={},
cert_file=None, timeout=None, raise_on_http_errors=False):
self.requests.append(
{'url': url,
'method': method,
......
'http://remote.example.net/xml-errheader': (200, '<?xml version="1.0"><foo/>',
{'content-type': 'text/xml', 'x-error-code': '1'}),
'http://remote.example.net/connection-error': (None, None, None),
'http://authentic.example.net/idp/saml2/metadata': (
200, open(os.path.join(os.path.dirname(__file__), 'idp_metadata.xml')).read(), None),
}.get(base_url, (200, '', {}))
if url.startswith('file://'):
try:
status, data = 200, open(url[7:]).read()
except IOError:
status = 404
class FakeResponse(object):
def __init__(self, status, data, headers):
self.status = status
......
if status is None:
raise ConnectionError('error')
if raise_on_http_errors and not (200 <= status < 300):
raise ConnectionError('error in HTTP request to (status: %s)' % status)
return FakeResponse(status, data, headers), status, data, None
def get_last(self, attribute):
wcs/admin/forms.py
import difflib
import tarfile
import time
import urllib2
from cStringIO import StringIO
from quixote import get_response, redirect
......
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
......
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
wcs/admin/settings.py
import hashlib
import mimetypes
import os
import urllib2
try:
import lasso
except ImportError:
......
from qommon import _
from qommon import get_cfg
from qommon import errors
from qommon import misc
from qommon.form import *
from qommon.sms import SMS
......
def install_theme_from_url(self, url):
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
return redirect('themes')
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
get_session().message = ('error', _('Error loading theme (%s).') % str(e))
return redirect('themes')
wcs/admin/workflows.py
from subprocess import Popen, PIPE
import textwrap
import xml.etree.ElementTree as ET
import urllib2
from quixote import redirect, get_publisher
from quixote.directory import Directory
......
elif form.get_widget('url').parse():
url = form.get_widget('url').parse()
try:
fp = urllib2.urlopen(url)
except urllib2.HTTPError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
except urllib2.URLError as e:
fp = misc.urlopen(url)
except misc.ConnectionError as e:
form.set_error('url', _('Error loading form (%s).') % str(e))
raise ValueError()
else:
wcs/api.py
import json
import re
import time
import urllib2
import sys
from quixote import get_request, get_publisher, get_response
......
url += '&accept-language=%s' % (get_publisher().get_site_language() or 'en')
if get_publisher().get_site_option('nominatim_key'):
url += '&key=' + get_publisher().get_site_option('nominatim_key')
return urllib2.urlopen(url).read()
return misc.urlopen(url).read()
def roles(self):
get_response().set_content_type('application/json')
wcs/ctl/check_hobos.py
import hashlib
from quixote import cleanup
from qommon import misc
from qommon.ctl import Command, make_option
from qommon.storage import atomic_write
......
idp['base_url'] = idp['base_url'] + '/'
metadata_url = '%sidp/saml2/metadata' % idp['base_url']
try:
rfd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
print >> sys.stderr, 'failed to get metadata URL', metadata_url, e
continue
except Exception, e:
rfd = misc.urlopen(metadata_url)
except misc.ConnectionError as e:
print >> sys.stderr, 'failed to get metadata URL', metadata_url, e
continue
wcs/data_sources.py
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import collections
import urllib2
import urllib
import urlparse
import xml.etree.ElementTree as ET
......
url = urlparse.urlunparse(parsed[:4] + (querystring,) + parsed[5:6])
url = sign_url(url, signature_key)
try:
entries = qommon.misc.json_loads(urllib2.urlopen(url).read())
entries = qommon.misc.json_loads(qommon.misc.urlopen(url).read())
if type(entries) is not dict:
raise ValueError('not a json dict')
if type(entries.get('data')) is not list:
......
item['text'] = item['id']
items.append(item)
return items
except urllib2.HTTPError as e:
get_logger().warn('Error loading JSON data source (%s)' % str(e))
except urllib2.URLError as e:
except qommon.misc.ConnectionError as e:
get_logger().warn('Error loading JSON data source (%s)' % str(e))
except ValueError as e:
get_logger().warn('Error reading JSON data source output (%s)' % str(e))
wcs/portfolio.py
import base64
from qommon import get_logger
from qommon.misc import http_get_page, json_loads, http_post_request
from qommon.misc import http_get_page, json_loads, http_post_request, urlopen
import qommon.form
from quixote import get_publisher, get_request, get_response, redirect, get_session
from quixote.directory import Directory
......
# Download file
# FIXME: handle error cases
url = request.form['url']
document = urllib.urlopen(request.form['url']).read()
document = urlopen(request.form['url']).read()
scheme, netloc, path, qs, frag = urlparse.urlsplit(url)
path = map(None, path.split('/'))
name = urllib.unquote(path[-1])
wcs/qommon/ident/idp.py
lasso = None
import urllib
import urllib2
import urlparse
from quixote.directory import Directory
......
metadata_url = form.get_widget('metadata_url').parse()
publickey_url = None
try:
rfd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
if e.code == 404:
form.set_error('metadata_url', _('Resource not found'))
else:
form.set_error('metadata_url', _('HTTP error on retrieval: %s') % e.code)
rfd = misc.urlopen(metadata_url)
except misc.ConnectionError as e:
form.set_error('metadata_url', _('Failed to retrieve file (%s)') % e)
except:
form.set_error('metadata_url', _('Failed to retrieve file'))
else:
......
form.add(StringWidget, 'publickey_url', title = _('URL to public key'),
required=True, size = 60)
try:
rfd = urllib2.urlopen(publickey_url)
except (urllib2.HTTPError, urllib2.URLError), e:
if e.code == 404:
form.set_error('publickey_url', _('Resource not found'))
else:
form.set_error('publickey_url', _('HTTP error on retrieval: %s') % e.code)
rfd = misc.urlopen(publickey_url)
except misc.ConnectionError as e:
form.set_error('metadata_url', _('Failed to retrieve file (%s)') % e)
except:
form.set_error('publickey_url', _('Failed to retrieve file'))
else:
......
metadata_url = self.idp.get('metadata_url')
try:
metadata_fd = urllib2.urlopen(metadata_url)
except (urllib2.HTTPError, urllib2.URLError), e:
metadata_fd = misc.urlopen(metadata_url)
except misc.ConnectionError:
return template.error_page('failed to download')
metadata = metadata_fd.read()
publickey_url = self.idp.get('publickey_url')
if publickey_url:
try:
publickey_fd = urllib2.urlopen(publickey_url)
except (urllib2.HTTPError, urllib2.URLError), e:
publickey_fd = misc.urlopen(publickey_url)
except misc.ConnectionError:
return template.error_page('failed to download')
publickey = publickey_fd.read()
else:
wcs/qommon/misc.py
return formatstring % locals()
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None):
def _http_request(url, method='GET', body=None, headers={}, cert_file=None, timeout=None,
raise_on_http_errors=False):
get_publisher().reload_cfg()
if url.startswith('http://'):
......
except requests.Timeout:
raise ConnectionError('connection timed out while fetching the page')
except requests.RequestException as err:
raise ConnectionError('error in http request to to %s (%s)' % (hostname, err))
raise ConnectionError('error in HTTP request to %s (%s)' % (hostname, err))
else:
data = response.content
status = response.status_code
auth_header = response.headers.get('WWW-Authenticate')
if raise_on_http_errors and not (200 <= status < 300):
raise ConnectionError('error in HTTP request to (status: %s)' % status)
return response, status, data, auth_header
def urlopen(url, data=None):
response, status, data, auth_header = _http_request(
url, 'GET' if data is None else 'POST',
body=data,
raise_on_http_errors=True)
return StringIO(data)
def http_get_page(url, **kwargs):
return _http_request(url, **kwargs)
wcs/qommon/sms.py
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import re
import urllib2
import urllib
from qommon import errors
from qommon import errors, misc
from qommon import get_cfg, get_logger
from qommon.form import StringWidget, PasswordWidget
from wcs.wscalls import call_webservice
......
'qty': quality
})
try:
r = urllib2.urlopen("http://multilevel.mobyt.fr/sms/batch.php", params)
except Exception, e:
raise errors.SMSError("urlopen mobyt.fr failed (%s)" % e)
r = misc.urlopen("http://multilevel.mobyt.fr/sms/batch.php", params)
except misc.ConnectionError as e:
raise errors.SMSError("failed to POST to mobyt.fr (%s)" % e)
answer = r.read()
r.close()
if answer[:2] == "KO":
......
'type': type
})
try:
r = urllib2.urlopen("http://multilevel.mobyt.fr/sms/credit.php", params)
except Exception, e:
raise errors.SMSError("urlopen mobyt.fr failed (%s)" % e)
r = misc.urlopen("http://multilevel.mobyt.fr/sms/credit.php", params)
except misc.ConnectionError as e:
raise errors.SMSError("failed to POST to mobyt.fr (%s)" % e)
answer = r.read()
r.close()
if answer[:2] == "KO":
......
'flash': '0'
})
try:
r = urllib2.urlopen('http://sms.oxyd.fr/send.php', params)
except Exception, e:
r = misc.urlopen('http://sms.oxyd.fr/send.php', params)
except misc.ConnectionError as e:
# XXX: add proper handling of errors
raise errors.SMSError('urlopen oxyd.fr failed (%s)' % e)
raise errors.SMSError("failed to POST to oxyd.fr (%s)" % e)
r.close()
def get_sms_left(self, type='standard'):
......
'content': text[:160],
})
try:
r = urllib2.urlopen('http://sms.choosit.com/webservice', params)
except Exception, e:
r = misc.urlopen('http://sms.choosit.com/webservice', params)
except misc.ConnectionError as e:
# XXX: add proper handling of errors
raise errors.SMSError('urlopen choosit.com failed (%s)' % e)
raise errors.SMSError("failed to POST to choosit.com (%s)" % e)
r.close()
def get_sms_left(self, type='standard'):