1 |
1 |
import mock
|
2 |
2 |
import pytest
|
3 |
3 |
|
4 |
|
from passerelle.apps.opengis.models import OpenGIS
|
|
4 |
from django.core.management import call_command
|
|
5 |
|
|
6 |
from passerelle.apps.opengis.models import OpenGIS, Query, FeatureCache
|
|
7 |
from passerelle.base.models import Job
|
|
8 |
from passerelle.utils import import_site
|
5 |
9 |
|
6 |
10 |
import utils
|
7 |
11 |
|
... | ... | |
233 |
237 |
wfs_service_url='http://example.net/wfs'))
|
234 |
238 |
|
235 |
239 |
|
|
240 |
@pytest.fixture
|
|
241 |
def query(connector):
|
|
242 |
return Query.objects.create(
|
|
243 |
resource=connector,
|
|
244 |
name='Test Query',
|
|
245 |
slug='test_query',
|
|
246 |
description='Test query.',
|
|
247 |
typename='pvo_patrimoine_voirie.pvoparking',
|
|
248 |
filter_expression=('<Filter><PropertyIsEqualTo><PropertyName>typeparking'
|
|
249 |
'</PropertyName></PropertyIsEqualTo></Filter>')
|
|
250 |
)
|
|
251 |
|
|
252 |
|
236 |
253 |
def geoserver_responses(url, **kwargs):
|
237 |
254 |
if kwargs['params'].get('request') == 'GetCapabilities':
|
238 |
255 |
return utils.FakedResponse(status_code=200, content=FAKE_SERVICE_CAPABILITIES)
|
... | ... | |
255 |
272 |
return utils.FakedResponse(status_code=200, content=FAKE_ERROR[:10])
|
256 |
273 |
|
257 |
274 |
|
|
275 |
def geoserver_geolocated_responses(url, **kwargs):
|
|
276 |
if kwargs['params'].get('request') == 'GetCapabilities':
|
|
277 |
return utils.FakedResponse(status_code=200, content=FAKE_SERVICE_CAPABILITIES)
|
|
278 |
return utils.FakedResponse(status_code=200, content=FAKE_GEOLOCATED_FEATURE)
|
|
279 |
|
|
280 |
|
258 |
281 |
@mock.patch('passerelle.utils.Request.get')
|
259 |
282 |
def test_feature_info(mocked_get, app, connector):
|
260 |
283 |
endpoint = utils.generic_endpoint_url('opengis', 'feature_info', slug=connector.slug)
|
... | ... | |
474 |
497 |
})
|
475 |
498 |
assert resp.json['err'] == 1
|
476 |
499 |
assert resp.json['err_desc'] == 'Webservice returned status code 404'
|
|
500 |
|
|
501 |
|
|
502 |
@mock.patch('passerelle.utils.Request.get')
|
|
503 |
def test_opengis_query_cache_update(mocked_get, app, connector, query):
|
|
504 |
mocked_get.side_effect = geoserver_geolocated_responses
|
|
505 |
query.update_cache()
|
|
506 |
|
|
507 |
assert mocked_get.call_args[1]['params']['FILTER'] == query.filter_expression
|
|
508 |
assert mocked_get.call_args[1]['params']['TYPENAMES'] == query.typename
|
|
509 |
assert FeatureCache.objects.count() == 4
|
|
510 |
|
|
511 |
feature = FeatureCache.objects.get(lon=1914059.51, lat=4224699.2)
|
|
512 |
assert feature.data['properties']['code_post'] == 38000
|
|
513 |
|
|
514 |
|
|
515 |
@mock.patch('passerelle.utils.Request.get')
|
|
516 |
def test_opengis_query_q_endpoint(mocked_get, app, connector, query):
|
|
517 |
endpoint = utils.generic_endpoint_url('opengis', 'q/test_query/', slug=connector.slug)
|
|
518 |
assert endpoint == '/opengis/test/q/test_query/'
|
|
519 |
mocked_get.side_effect = geoserver_geolocated_responses
|
|
520 |
query.update_cache()
|
|
521 |
feature = FeatureCache.objects.get(lon=1914059.51, lat=4224699.2)
|
|
522 |
resp = app.get(endpoint)
|
|
523 |
|
|
524 |
assert len(resp.json['features']) == FeatureCache.objects.count()
|
|
525 |
|
|
526 |
feature_data = next(feature for feature in resp.json['features']
|
|
527 |
if feature['geometry']['coordinates'][0] == 1914059.51)
|
|
528 |
assert feature_data == feature.data
|
|
529 |
|
|
530 |
|
|
531 |
@mock.patch('passerelle.utils.Request.get')
|
|
532 |
def test_opengis_query_cache_update_change(mocked_get, app, connector, query):
|
|
533 |
mocked_get.side_effect = geoserver_geolocated_responses
|
|
534 |
query.update_cache()
|
|
535 |
assert FeatureCache.objects.count() == 4
|
|
536 |
|
|
537 |
def new_response(url, **kwargs):
|
|
538 |
if kwargs['params'].get('request') == 'GetCapabilities':
|
|
539 |
return utils.FakedResponse(status_code=200, content=FAKE_SERVICE_CAPABILITIES)
|
|
540 |
return utils.FakedResponse(
|
|
541 |
content='{"features": [{"properties": {}, "geometry": {"coordinates": [1, 1]}}]}',
|
|
542 |
status_code=200
|
|
543 |
)
|
|
544 |
mocked_get.side_effect = new_response
|
|
545 |
query.update_cache()
|
|
546 |
assert FeatureCache.objects.count() == 1
|
|
547 |
|
|
548 |
|
|
549 |
@mock.patch('passerelle.utils.Request.get')
|
|
550 |
def test_opengis_query_q_endpoint_cache_empty(mocked_get, app, connector, query):
|
|
551 |
endpoint = utils.generic_endpoint_url('opengis', 'q/test_query/', slug=connector.slug)
|
|
552 |
assert not FeatureCache.objects.exists()
|
|
553 |
resp = app.get(endpoint)
|
|
554 |
|
|
555 |
assert resp.json['err'] == 1
|
|
556 |
assert 'not synchronized' in resp.json['err_desc']
|
|
557 |
assert not FeatureCache.objects.exists()
|
|
558 |
assert mocked_get.call_count == 0
|
|
559 |
|
|
560 |
|
|
561 |
@mock.patch('passerelle.utils.Request.get')
|
|
562 |
def test_opengis_query_cache_update_jobs(mocked_get, app, connector, query):
|
|
563 |
mocked_get.side_effect = geoserver_geolocated_responses
|
|
564 |
|
|
565 |
# fixtures created one query
|
|
566 |
job = Job.objects.get(method_name='update_queries')
|
|
567 |
assert not FeatureCache.objects.exists()
|
|
568 |
|
|
569 |
connector.jobs()
|
|
570 |
assert FeatureCache.objects.count() == 4
|
|
571 |
job.refresh_from_db()
|
|
572 |
assert job.status == 'completed'
|
|
573 |
|
|
574 |
# modifying a query triggers an update
|
|
575 |
query.save()
|
|
576 |
assert Job.objects.filter(method_name='update_queries', status='registered').count() == 1
|
|
577 |
connector.jobs()
|
|
578 |
|
|
579 |
# modifying the connector triggers an update
|
|
580 |
connector.save()
|
|
581 |
assert Job.objects.filter(method_name='update_queries', status='registered').count() == 1
|
|
582 |
connector.jobs()
|
|
583 |
|
|
584 |
# two queries to update
|
|
585 |
query.save()
|
|
586 |
query.pk = None
|
|
587 |
query.slug = query.name = 'test2'
|
|
588 |
query.save()
|
|
589 |
with mock.patch.object(Query, 'update_cache') as mocked:
|
|
590 |
connector.jobs()
|
|
591 |
assert mocked.call_count == 2
|
|
592 |
|
|
593 |
# now only one
|
|
594 |
query.save()
|
|
595 |
with mock.patch.object(Query, 'update_cache') as mocked:
|
|
596 |
connector.jobs()
|
|
597 |
assert mocked.call_count == 1
|
|
598 |
|
|
599 |
|
|
600 |
@mock.patch('passerelle.utils.Request.get')
|
|
601 |
def test_opengis_query_cache_update_daily(mocked_get, app, connector, query):
|
|
602 |
mocked_get.side_effect = geoserver_geolocated_responses
|
|
603 |
assert not FeatureCache.objects.exists()
|
|
604 |
call_command('cron', 'daily')
|
|
605 |
assert FeatureCache.objects.count() == 4
|
|
606 |
|
|
607 |
|
|
608 |
@mock.patch('passerelle.utils.Request.get')
|
|
609 |
def test_opengis_query_endpoint_documentation(mocked_get, app, connector, query):
|
|
610 |
resp = app.get(connector.get_absolute_url())
|
|
611 |
assert query.name in resp.text
|
|
612 |
assert query.description in resp.text
|
|
613 |
assert '/opengis/test/q/test_query/' in resp.text
|
|
614 |
|
|
615 |
|
|
616 |
def test_opengis_export_import(query):
|
|
617 |
assert OpenGIS.objects.count() == 1
|
|
618 |
assert Query.objects.count() == 1
|
|
619 |
serialization = {'resources': [query.resource.export_json()]}
|
|
620 |
OpenGIS.objects.all().delete()
|
|
621 |
assert OpenGIS.objects.count() == 0
|
|
622 |
assert Query.objects.count() == 0
|
|
623 |
import_site(serialization)
|
|
624 |
assert OpenGIS.objects.count() == 1
|
|
625 |
assert Query.objects.count() == 1
|
477 |
|
-
|