Projet

Général

Profil

0001-opengis-correctly-handle-XML-formatted-OpenGIS-error.patch

Benjamin Dauvergne, 15 décembre 2017 22:55

Télécharger (5,63 ko)

Voir les différences:

Subject: [PATCH] opengis: correctly handle XML formatted OpenGIS errors
 (#20754)

 passerelle/apps/opengis/models.py |   27 +++++++++++++++++++++++++++
 tests/test_opengis.py             |   24 ++++++++++++++++++++++++
 2 files changed, 51 insertions(+)
passerelle/apps/opengis/models.py
16 16

  
17 17
import math
18 18
import xml.etree.ElementTree as ET
19
from HTMLParser import HTMLParser
19 20

  
20 21
import pyproj
21 22

  
......
101 102
                params['CQL_FILTER'] += ' AND %s %s \'%%%s%%\'' % (filter_property_name, operator, q)
102 103
        response = self.requests.get(self.wfs_service_url, params=params)
103 104
        data = []
105
        try:
106
            json_result = response.json()
107
        except ValueError:
108
            self.handle_opengis_error(response)
109
            # if handle_opengis_error did not raise an error, we raise a generic one
110
            raise APIError(u'OpenGIS Error: unparsable error',
111
                           data={'content': repr(response.content[:1024])})
104 112
        for feature in response.json()['features']:
105 113
            feature['text'] = feature['properties'].get(property_name)
106 114
            data.append(feature)
107 115
        return {'data': data}
108 116

  
117
    def handle_opengis_error(self, response):
        """Raise an APIError when the response body is an OWS 1.1 ExceptionReport.

        The body of ``response`` is parsed as XML. If it is an
        ``ows:ExceptionReport`` containing an ``ows:Exception`` with an
        ``ows:ExceptionText`` child, an APIError is raised whose message
        carries the ``exceptionCode`` attribute (or ``unknown code`` when the
        attribute is absent or empty) and whose ``data['text']`` is the
        HTML-unescaped exception text.

        Returns None, without raising, when the body is not well-formed XML,
        when the root element is not an OWS 1.1 ExceptionReport, or when the
        Exception / ExceptionText elements are missing; the caller is then
        expected to report a generic error itself.
        """
        ows = '{http://www.opengis.net/ows/1.1}'
        try:
            root = ET.fromstring(response.content)
        except ET.ParseError:
            return None
        if root.tag != ows + 'ExceptionReport':
            return None
        exception = root.find(ows + 'Exception')
        # find() returns None when there is no match: check before touching
        # .attrib, otherwise a report without Exception raises AttributeError.
        if exception is None:
            return None
        exception_code = exception.attrib.get('exceptionCode')
        exception_text = exception.find(ows + 'ExceptionText')
        if exception_text is None:
            return None
        # .text is None for an empty element; unescape() needs a string.
        content = HTMLParser().unescape(exception_text.text or u'')
        # Parenthesize the fallback: `%` binds tighter than `or`, so without
        # the parentheses a missing code would be rendered as "None".
        raise APIError(u'OpenGIS Error: %s' % (exception_code or 'unknown code'),
                       data={'text': content})
109 136

  
110 137
    @endpoint(perm='can_access',
111 138
              description=_('Get feature info'),
tests/test_opengis.py
99 99
    "type": "FeatureCollection"
100 100
}'''
101 101

  
102
# Sample OWS 1.1 ExceptionReport body (XML) returned by the faked error
# responses below. Note that the ExceptionText content is escaped twice
# (e.g. ``&amp;quot;``), so one level of HTML entities remains after XML parsing.
FAKE_ERROR = u'<ows:ExceptionReport xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.0.0" xsi:schemaLocation="http://www.opengis.net/ows/1.1 https://sigmetropole.lametro.fr/geoserver/schemas/ows/1.1.0/owsAll.xsd">\n  <ows:Exception exceptionCode="NoApplicableCode">\n    <ows:ExceptionText>Could not parse CQL filter list.\nEncountered &amp;quot;BIS&amp;quot; at line 1, column 129.\nWas expecting one of:\n    &amp;lt;EOF&amp;gt; \n    &amp;quot;and&amp;quot; ...\n    &amp;quot;or&amp;quot; ...\n    &amp;quot;;&amp;quot; ...\n    &amp;quot;/&amp;quot; ...\n    &amp;quot;*&amp;quot; ...\n    &amp;quot;+&amp;quot; ...\n    &amp;quot;-&amp;quot; ...\n     Parsing : strEqualsIgnoreCase(nom_commune, &amp;apos;Grenoble&amp;apos;) = true AND strEqualsIgnoreCase(nom_voie, &amp;apos;rue albert recoura&amp;apos;) = true AND numero=8 BIS.</ows:ExceptionText>\n  </ows:Exception>\n</ows:ExceptionReport>\n'
103

  
102 104

  
103 105
@pytest.fixture
104 106
def connector(db):
......
112 114
        return utils.FakedResponse(status_code=200, content=FAKE_SERVICE_CAPABILITIES)
113 115
    return utils.FakedResponse(status_code=200, content=FAKE_FEATURES_JSON)
114 116

  
117
def geoserver_responses_errors(url, **kwargs):
    """Fake geoserver: serve capabilities normally, answer anything else with FAKE_ERROR.

    Both kinds of response are returned with HTTP status 200.
    """
    wants_capabilities = kwargs['params'].get('request') == 'GetCapabilities'
    payload = FAKE_SERVICE_CAPABILITIES if wants_capabilities else FAKE_ERROR
    return utils.FakedResponse(status_code=200, content=payload)
121

  
115 122
@mock.patch('passerelle.utils.LoggedRequest.get')
116 123
def test_feature_info(mocked_get, app, connector):
117 124
    endpoint = utils.generic_endpoint_url('opengis', 'feature_info', slug=connector.slug)
......
184 191
    params.pop('cql_filter')
185 192
    resp = app.get(endpoint, params=params)
186 193
    assert 'CQL_FILTER' not in mocked_get.call_args[1]['params']
194

  
195
@mock.patch('passerelle.utils.LoggedRequest.get')
def test_get_feature(mocked_get, app, connector):
    """A GetFeature call answered with an XML OpenGIS error yields an API error payload."""
    endpoint = utils.generic_endpoint_url('opengis', 'features', slug=connector.slug)
    assert endpoint == '/opengis/test/features'
    mocked_get.side_effect = geoserver_responses_errors
    query = {'type_names': 'ref_metro_limites_communales', 'property_name': 'nom'}
    resp = app.get(endpoint, params=query)

    # Parameters of the last request sent to the (faked) WFS service.
    sent_params = mocked_get.call_args[1]['params']
    expected_params = (
        ('REQUEST', 'GetFeature'),
        ('PROPERTYNAME', 'nom'),
        ('TYPENAMES', 'ref_metro_limites_communales'),
        ('OUTPUTFORMAT', 'json'),
        ('SERVICE', 'WFS'),
    )
    for key, expected in expected_params:
        assert sent_params[key] == expected
    # Checked last and separately, after the sent parameters were captured.
    assert sent_params['VERSION'] == connector.get_wfs_service_version()

    result = resp.json
    assert result['err'] == 1
    assert result['err_desc'] == 'OpenGIS Error: NoApplicableCode'
    assert 'Could not parse' in result['data']['text']
187
-