1 |
1 |
import mock
|
2 |
2 |
import pytest
|
3 |
3 |
|
4 |
|
from passerelle.apps.opengis.models import OpenGIS
|
|
4 |
from django.core.management import call_command
|
|
5 |
|
|
6 |
from passerelle.apps.opengis.models import OpenGIS, Query, FeatureCache
|
|
7 |
from passerelle.base.models import Job
|
|
8 |
from passerelle.utils import import_site
|
5 |
9 |
|
6 |
10 |
import utils
|
7 |
11 |
|
... | ... | |
253 |
257 |
wfs_service_url='http://example.net/wfs'))
|
254 |
258 |
|
255 |
259 |
|
|
260 |
@pytest.fixture
def query(connector):
    """Create and return a ``Query`` bound to the ``connector`` fixture.

    The query targets the ``pvo_patrimoine_voirie.pvoparking`` typename and
    carries an XML ``<Filter>`` expression on the ``typeparking`` property.
    """
    # Adjacent string literals are concatenated into a single filter string.
    parking_filter = (
        '<Filter><PropertyIsEqualTo><PropertyName>typeparking'
        '</PropertyName></PropertyIsEqualTo></Filter>'
    )
    query_fields = {
        'resource': connector,
        'name': 'Test Query',
        'slug': 'test_query',
        'description': 'Test query.',
        'typename': 'pvo_patrimoine_voirie.pvoparking',
        'filter_expression': parking_filter,
    }
    return Query.objects.create(**query_fields)
|
|
271 |
|
|
272 |
|
256 |
273 |
def geoserver_responses(url, **kwargs):
|
257 |
274 |
if kwargs['params'].get('request') == 'GetCapabilities':
|
258 |
275 |
assert kwargs['params'].get('service')
|
... | ... | |
277 |
294 |
return utils.FakedResponse(status_code=200, content=FAKE_ERROR[:10])
|
278 |
295 |
|
279 |
296 |
|
|
297 |
def geoserver_geolocated_responses(url, **kwargs):
    """Side effect for the mocked ``Request.get`` used by the query tests.

    Returns a 200 ``utils.FakedResponse`` whose content is
    ``FAKE_SERVICE_CAPABILITIES`` when the ``request`` entry of
    ``kwargs['params']`` is ``'GetCapabilities'``, and
    ``FAKE_GEOLOCATED_FEATURE`` for any other request. ``url`` is ignored.
    """
    wants_capabilities = kwargs['params'].get('request') == 'GetCapabilities'
    body = FAKE_SERVICE_CAPABILITIES if wants_capabilities else FAKE_GEOLOCATED_FEATURE
    return utils.FakedResponse(status_code=200, content=body)
|
|
301 |
|
|
302 |
|
280 |
303 |
@mock.patch('passerelle.utils.Request.get')
|
281 |
304 |
def test_feature_info(mocked_get, app, connector):
|
282 |
305 |
endpoint = utils.generic_endpoint_url('opengis', 'feature_info', slug=connector.slug)
|
... | ... | |
499 |
522 |
})
|
500 |
523 |
assert resp.json['err'] == 1
|
501 |
524 |
assert resp.json['err_desc'] == 'Webservice returned status code 404'
|
|
525 |
|
|
526 |
|
|
527 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_cache_update(mocked_get, app, connector, query):
    """``Query.update_cache`` sends the query's filter/typename and stores features."""
    mocked_get.side_effect = geoserver_geolocated_responses
    query.update_cache()

    # Keyword arguments of the most recent mocked GET call.
    sent_params = mocked_get.call_args[1]['params']
    assert sent_params['filter'] == query.filter_expression
    assert sent_params['typenames'] == query.typename
    assert FeatureCache.objects.count() == 4

    # One cached feature is looked up by its coordinates and spot-checked.
    cached = FeatureCache.objects.get(lon=1914059.51, lat=4224699.2)
    assert cached.data['properties']['code_post'] == 38000
|
|
538 |
|
|
539 |
|
|
540 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_q_endpoint(mocked_get, app, connector, query):
    """The query endpoint returns every cached feature with its stored data."""
    endpoint = utils.generic_endpoint_url('opengis', 'query/test_query/', slug=connector.slug)
    # NOTE(review): test_opengis_query_endpoint_documentation expects
    # '/opengis/test/q/test_query/' in the documentation page, while this test
    # uses '/query/' -- confirm which path is current.
    assert endpoint == '/opengis/test/query/test_query/'

    # Fill the cache first so the endpoint has something to serve.
    mocked_get.side_effect = geoserver_geolocated_responses
    query.update_cache()
    expected = FeatureCache.objects.get(lon=1914059.51, lat=4224699.2)
    resp = app.get(endpoint)

    returned_features = resp.json['features']
    assert len(returned_features) == FeatureCache.objects.count()

    # Pick the returned feature whose first coordinate matches the cached one.
    matching = next(
        item for item in returned_features
        if item['geometry']['coordinates'][0] == 1914059.51
    )
    assert matching == expected.data
|
|
554 |
|
|
555 |
|
|
556 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_cache_update_change(mocked_get, app, connector, query):
    """A second ``update_cache`` leaves only the features of the latest response."""
    mocked_get.side_effect = geoserver_geolocated_responses
    query.update_cache()
    assert FeatureCache.objects.count() == 4

    single_feature_payload = (
        '{"features": [{"properties": {}, "geometry": {"coordinates": [1, 1]}}]}'
    )

    def single_feature_response(url, **kwargs):
        """Serve capabilities as before, but a one-feature collection otherwise."""
        if kwargs['params'].get('request') == 'GetCapabilities':
            return utils.FakedResponse(status_code=200, content=FAKE_SERVICE_CAPABILITIES)
        return utils.FakedResponse(content=single_feature_payload, status_code=200)

    # Swap the remote answer, refresh, and expect the cache to shrink to one entry.
    mocked_get.side_effect = single_feature_response
    query.update_cache()
    assert FeatureCache.objects.count() == 1
|
|
572 |
|
|
573 |
|
|
574 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_q_endpoint_cache_empty(mocked_get, app, connector, query):
    """With an empty cache the endpoint reports an error and makes no remote call."""
    endpoint = utils.generic_endpoint_url('opengis', 'query/test_query/', slug=connector.slug)
    assert not FeatureCache.objects.exists()
    resp = app.get(endpoint)

    payload = resp.json
    assert payload['err'] == 1
    assert 'not synchronized' in payload['err_desc']
    # The request must neither populate the cache nor hit the mocked GET.
    assert not FeatureCache.objects.exists()
    assert mocked_get.call_count == 0
|
|
584 |
|
|
585 |
|
|
586 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_cache_update_jobs(mocked_get, app, connector, query):
    """Saving queries or the connector registers ``update_queries`` jobs.

    Running ``connector.jobs()`` then refreshes the cache of the queries
    concerned; the number of ``Query.update_cache`` calls is checked with a
    patched method.
    """
    mocked_get.side_effect = geoserver_geolocated_responses

    def registered_update_jobs():
        """Count the ``update_queries`` jobs still in ``registered`` status."""
        return Job.objects.filter(method_name='update_queries', status='registered').count()

    # Creating the query fixture already produced one job.
    initial_job = Job.objects.get(method_name='update_queries')
    assert not FeatureCache.objects.exists()

    connector.jobs()
    assert FeatureCache.objects.count() == 4
    initial_job.refresh_from_db()
    assert initial_job.status == 'completed'

    # Saving a query registers a new update job.
    query.save()
    assert registered_update_jobs() == 1
    connector.jobs()

    # Saving the connector registers one as well.
    connector.save()
    assert registered_update_jobs() == 1
    connector.jobs()

    # Two queries pending: the saved one plus a copy stored under a new pk.
    query.save()
    query.pk = None
    query.slug = 'test2'
    query.name = 'test2'
    query.save()
    with mock.patch.object(Query, 'update_cache') as patched_update:
        connector.jobs()
        assert patched_update.call_count == 2

    # Only the query saved last is pending now.
    query.save()
    with mock.patch.object(Query, 'update_cache') as patched_update:
        connector.jobs()
        assert patched_update.call_count == 1
|
|
623 |
|
|
624 |
|
|
625 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_cache_update_daily(mocked_get, app, connector, query):
    """Running the ``cron daily`` management command fills the feature cache."""
    mocked_get.side_effect = geoserver_geolocated_responses

    # Nothing is cached before the command runs.
    assert not FeatureCache.objects.exists()

    call_command('cron', 'daily')

    # The faked geolocated response is expected to yield four cached features.
    assert FeatureCache.objects.count() == 4
|
|
631 |
|
|
632 |
|
|
633 |
@mock.patch('passerelle.utils.Request.get')
def test_opengis_query_endpoint_documentation(mocked_get, app, connector, query):
    """The connector page lists the query's name, description and endpoint URL."""
    page_text = app.get(connector.get_absolute_url()).text

    assert query.name in page_text
    assert query.description in page_text
    assert '/opengis/test/q/test_query/' in page_text
|
|
639 |
|
|
640 |
|
|
641 |
def test_opengis_export_import(query):
    """Exporting a connector and importing it back restores it with its query."""

    def object_counts():
        """Return ``(number of OpenGIS connectors, number of queries)``."""
        return OpenGIS.objects.count(), Query.objects.count()

    assert object_counts() == (1, 1)
    exported_site = {'resources': [query.resource.export_json()]}

    # After deleting every connector the test expects no query to remain either.
    OpenGIS.objects.all().delete()
    assert object_counts() == (0, 0)

    import_site(exported_site)
    assert object_counts() == (1, 1)
|
502 |
|
-
|