0001-datasource-duplicate-and-configure-agenda-datasource.patch
tests/admin_pages/test_datasource.py | ||
---|---|---|
271 | 271 |
] |
272 | 272 | |
273 | 273 | |
274 |
def test_data_sources_agenda_manual_qs_data_type_options(pub):
    """The qs_data value type selector only lists text and template.

    The edit form is checked twice, with the 'disable-python-expressions'
    site option set to 'false' and then to 'true'; the offered options are
    expected to be the same in both cases.
    """
    create_superuser(pub)

    source = NamedDataSource(name='foobar')
    source.external = 'agenda_manual'
    source.store()

    expected_options = [
        ('text', False, 'Text'),
        ('template', False, 'Template'),
    ]

    def write_python_option(flag):
        # set the option and write the whole configuration to site-options.cfg
        pub.site_options.set('options', 'disable-python-expressions', flag)
        with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as cfg:
            pub.site_options.write(cfg)

    if not pub.site_options.has_section('options'):
        pub.site_options.add_section('options')
    write_python_option('false')

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/edit' % source.id)
    assert page.form['qs_data$element0value$type'].options == expected_options

    write_python_option('true')

    page = app.get('/backoffice/settings/data-sources/%s/edit' % source.id)
    assert page.form['qs_data$element0value$type'].options == expected_options
|
303 | ||
304 | ||
274 | 305 |
def test_data_sources_category(pub): |
275 | 306 |
create_superuser(pub) |
276 | 307 | |
... | ... | |
357 | 388 |
data_source.data_source = {'type': 'formula', 'value': '[]'} |
358 | 389 |
data_source.store() |
359 | 390 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
391 |
assert 'This data source is readonly.' not in resp |
|
392 |
assert 'href="edit"' in resp |
|
393 |
assert 'href="delete"' in resp |
|
394 |
assert 'href="duplicate"' in resp |
|
360 | 395 |
assert 'Type of source: Python Expression' in resp.text |
361 | 396 |
assert 'Python Expression' in resp.text |
362 | 397 |
assert 'Preview' not in resp.text |
... | ... | |
541 | 576 |
assert 'Unexpected fatal error getting items for preview' in resp.text |
542 | 577 | |
543 | 578 | |
579 |
def test_data_sources_duplicate(pub):
    """Duplicating a JSON data source stores a second, non-external source."""
    create_superuser(pub)
    NamedDataSource.wipe()

    original = NamedDataSource(name='foobar')
    original.data_source = {'type': 'json', 'value': '{{data_source.foobar}}'}
    original.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/duplicate' % original.id)
    page = page.click(href='duplicate')
    submitted = page.forms[0].submit()

    assert NamedDataSource.count() == 2
    # sources are ordered by id, the second one is the new copy
    duplicated = NamedDataSource.select(order_by='id')[1]
    expected_location = 'http://example.net/backoffice/settings/data-sources/%s' % duplicated.id
    assert submitted.location == expected_location
    assert duplicated.data_source == {'type': 'json', 'value': '{{data_source.foobar}}'}
    assert duplicated.external is None
|
596 | ||
597 | ||
544 | 598 |
def test_data_sources_agenda_view(pub): |
545 | 599 |
create_superuser(pub) |
546 | 600 |
NamedDataSource.wipe() |
... | ... | |
553 | 607 |
app = login(get_app(pub)) |
554 | 608 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
555 | 609 |
assert 'This data source is readonly.' in resp |
556 |
assert '/backoffice/settings/data-sources/%s/edit' % data_source.id not in resp |
|
557 |
assert '/backoffice/settings/data-sources/%s/delete' % data_source.id not in resp |
|
610 |
assert 'href="edit"' not in resp |
|
611 |
assert 'href="delete"' not in resp |
|
612 |
assert 'href="duplicate"' in resp |
|
613 | ||
614 | ||
615 |
def test_data_sources_agenda_duplicate(pub):
    """Duplicating an 'agenda' data source gives an 'agenda_manual' copy.

    Cancelling the form first must leave the stored data sources untouched.
    """
    create_superuser(pub)
    NamedDataSource.wipe()

    original = NamedDataSource(name='foobar')
    original.data_source = {'type': 'json', 'value': 'http://some.url'}
    original.external = 'agenda'
    original.store()

    app = login(get_app(pub))

    # cancel: back to the data source page, nothing created
    page = app.get('/backoffice/settings/data-sources/%s/duplicate' % original.id)
    cancelled = page.forms[0].submit('cancel')
    assert cancelled.location == 'http://example.net/backoffice/settings/data-sources/%s/' % original.id
    assert NamedDataSource.count() == 1

    # submit: a second data source is stored
    page = app.get('/backoffice/settings/data-sources/%s/duplicate' % original.id)
    page = page.click(href='duplicate')
    submitted = page.forms[0].submit()
    assert NamedDataSource.count() == 2

    duplicated = NamedDataSource.select(order_by='id')[1]
    assert submitted.location == 'http://example.net/backoffice/settings/data-sources/%s' % duplicated.id
    assert duplicated.data_source == {'type': 'json', 'value': 'http://some.url'}
    assert duplicated.external == 'agenda_manual'
    assert duplicated.qs_data is None
|
639 | ||
640 | ||
641 |
def test_data_sources_agenda_manual_view(pub):
    """Check the backoffice page of an 'agenda_manual' data source.

    Covers three states of the page: with extra query string data, without
    it, and once an 'agenda' data source with the same URL exists (the page
    must then show a "Copy of" link to that source).
    """
    create_superuser(pub)
    NamedDataSource.wipe()

    data_source = NamedDataSource(name='foobar')
    data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
    data_source.external = 'agenda_manual'
    data_source.qs_data = {'var1': 'value1', 'var2': 'value2'}
    data_source.store()

    app = login(get_app(pub))
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
    assert 'This data source is readonly.' not in resp
    assert 'href="edit"' in resp
    assert 'href="delete"' in resp
    assert 'href="duplicate"' in resp
    assert 'Type of source: Agenda data' in resp
    assert 'Copy of' not in resp
    assert 'Extra query string data' in resp
    assert '<li>var1: value1</li>' in resp
    assert '<li>var2: value2</li>' in resp

    data_source.qs_data = None
    data_source.store()
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
    # same label (and same case) as the positive check above; a different
    # capitalisation would make this assertion pass unconditionally
    assert 'Extra query string data' not in resp

    data_source2 = NamedDataSource(name='foobar')
    data_source2.data_source = {'type': 'json', 'value': 'http://some.url'}
    data_source2.external = 'agenda'
    data_source2.store()
    # the page of the manual source is fetched again, it must now link to data_source2
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
    assert (
        'Copy of: <a href="http://example.net/backoffice/settings/data-sources/%s/">foobar</a>'
        % data_source2.id
        in resp
    )
|
678 | ||
679 | ||
680 |
def test_data_sources_agenda_manual_duplicate(pub):
    """Duplicating an 'agenda_manual' data source keeps its type and qs_data."""
    create_superuser(pub)
    NamedDataSource.wipe()

    original = NamedDataSource(name='foobar')
    original.data_source = {'type': 'json', 'value': 'http://some.url'}
    original.external = 'agenda_manual'
    original.qs_data = {'var1': 'value1', 'var2': 'value2'}
    original.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/duplicate' % original.id)
    page = page.click(href='duplicate')
    submitted = page.forms[0].submit()

    assert NamedDataSource.count() == 2
    # sources are ordered by id, the second one is the new copy
    duplicated = NamedDataSource.select(order_by='id')[1]
    expected_location = 'http://example.net/backoffice/settings/data-sources/%s' % duplicated.id
    assert submitted.location == expected_location
    assert duplicated.data_source == {'type': 'json', 'value': 'http://some.url'}
    assert duplicated.external == 'agenda_manual'
    assert duplicated.qs_data == {'var1': 'value1', 'var2': 'value2'}
|
700 | ||
701 | ||
702 |
def test_data_sources_user_view(pub):
    """A 'wcs:users' data source page is not readonly and offers all actions."""
    create_superuser(pub)
    NamedDataSource.wipe()

    users_source = NamedDataSource(name='foobar')
    users_source.data_source = {'type': 'wcs:users'}
    users_source.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/' % users_source.id)

    assert 'This data source is readonly.' not in page
    # edit, delete and duplicate links must all be present
    for action_link in ('href="edit"', 'href="delete"', 'href="duplicate"'):
        assert action_link in page
|
716 | ||
717 | ||
718 |
def test_data_sources_user_duplicate(pub):
    """Duplicating a 'wcs:users' data source stores a non-external copy."""
    create_superuser(pub)
    NamedDataSource.wipe()

    original = NamedDataSource(name='foobar')
    original.data_source = {'type': 'wcs:users'}
    original.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/duplicate' % original.id)
    page = page.click(href='duplicate')
    submitted = page.forms[0].submit()

    assert NamedDataSource.count() == 2
    duplicated = NamedDataSource.select(order_by='id')[1]
    expected_location = 'http://example.net/backoffice/settings/data-sources/%s' % duplicated.id
    assert submitted.location == expected_location
    # the copy comes back with an explicit empty 'value' entry
    assert duplicated.data_source == {'type': 'wcs:users', 'value': ''}
    assert duplicated.external is None
|
558 | 735 | |
559 | 736 | |
560 | 737 |
def test_data_sources_edit(pub): |
... | ... | |
605 | 782 |
assert resp.location == 'http://example.net/backoffice/settings/data-sources/1/' |
606 | 783 | |
607 | 784 | |
785 |
def test_data_sources_agenda_manual_edit(pub):
    """Query string data entered in the edit form is stored on the source."""
    create_superuser(pub)
    NamedDataSource.wipe()

    source = NamedDataSource(name='foobar')
    source.external = 'agenda_manual'
    source.store()

    app = login(get_app(pub))
    page = app.get('/backoffice/settings/data-sources/%s/edit' % source.id)
    edit_form = page.forms[0]
    # fill the first qs_data row: key, template value, then value type
    edit_form['qs_data$element0key'] = 'arg1'
    edit_form['qs_data$element0value$value_template'] = '{{ foobar }}'
    edit_form['qs_data$element0value$type'] = 'template'
    edit_form.submit('submit')

    reloaded = NamedDataSource.get(source.id)
    assert reloaded.qs_data == {'arg1': '{{ foobar }}'}
|
802 | ||
803 | ||
608 | 804 |
def test_data_sources_delete(pub): |
609 | 805 |
create_superuser(pub) |
610 | 806 |
NamedDataSource.wipe() |
tests/test_datasource.py | ||
---|---|---|
424 | 424 |
] |
425 | 425 | |
426 | 426 | |
427 |
def test_json_datasource_bad_url(pub, error_email, http_requests, emails, caplog):
|
|
427 |
def test_json_datasource_bad_url(pub, error_email, http_requests, emails): |
|
428 | 428 |
datasource = {'type': 'json', 'value': 'http://remote.example.net/404'} |
429 | 429 |
assert data_sources.get_items(datasource) == [] |
430 | 430 |
assert emails.count() == 0 |
... | ... | |
530 | 530 |
) |
531 | 531 | |
532 | 532 | |
533 |
@pytest.mark.parametrize('notify', [True, False])
@pytest.mark.parametrize('record', [True, False])
def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
    """An invalid qs_data template is dropped from the URL and reported.

    The items are still fetched; the error is mailed and/or recorded
    according to the 'notify_on_errors' / 'record_on_errors' flags.
    """
    items_payload = json.dumps({'data': [{'id': '1', 'text': 'foo'}]})
    datasource = {
        'type': 'json',
        'value': "https://whatever.com/json",
        'qs_data': {'foo': '{% for invalid %}', 'bar': '{{ valid }}'},
        'notify_on_errors': notify,
        'record_on_errors': record,
    }

    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
        urlopen.side_effect = lambda *args: io.StringIO(items_payload)
        assert data_sources.get_items(datasource) == [('1', 'foo', '1', {'id': '1', 'text': 'foo'})]

    # only the valid parameter is part of the requested URL
    requested_url = urlopen.call_args[0][0]
    assert requested_url == 'https://whatever.com/json?bar='

    message = '[DATASOURCE] Failed to compute value "{% for invalid %}" for "foo" query parameter'
    if not notify:
        assert emails.count() == 0
    else:
        assert emails.count() == 1
        assert message in emails.get_latest('subject')

    if not pub.is_using_postgresql():
        return
    if not record:
        assert pub.loggederror_class.count() == 0
    else:
        assert pub.loggederror_class.count() == 1
        logged_error = pub.loggederror_class.select(order_by='id')[0]
        assert logged_error.summary == message
|
561 | ||
562 | ||
533 | 563 |
def test_geojson_datasource(pub, requests_pub, http_requests): |
534 | 564 |
get_request() |
535 | 565 |
get_request().datasources_cache = {} |
... | ... | |
1065 | 1095 |
] |
1066 | 1096 | |
1067 | 1097 | |
1068 |
def test_data_source_signed(no_request_pub): |
|
1098 |
@pytest.mark.parametrize('qs_data', [{}, {'arg1': 'val1', 'arg2': 'val2'}]) |
|
1099 |
def test_data_source_signed(no_request_pub, qs_data): |
|
1069 | 1100 |
NamedDataSource.wipe() |
1070 | 1101 |
data_source = NamedDataSource(name='foobar') |
1071 | 1102 |
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"} |
1103 |
data_source.qs_data = qs_data |
|
1072 | 1104 |
data_source.store() |
1073 | 1105 | |
1074 | 1106 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
... | ... | |
1084 | 1116 |
assert querystring['nonce'][0] |
1085 | 1117 |
assert querystring['timestamp'][0] |
1086 | 1118 |
assert querystring['signature'][0] |
1119 |
if qs_data: |
|
1120 |
assert querystring['arg1'][0] == 'val1' |
|
1121 |
assert querystring['arg2'][0] == 'val2' |
|
1087 | 1122 | |
1088 | 1123 |
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"} |
1089 | 1124 |
data_source.store() |
... | ... | |
1100 | 1135 |
assert querystring['timestamp'][0] |
1101 | 1136 |
assert querystring['signature'][0] |
1102 | 1137 |
assert querystring['foo'][0] == 'bar' |
1138 |
if qs_data: |
|
1139 |
assert querystring['arg1'][0] == 'val1' |
|
1140 |
assert querystring['arg2'][0] == 'val2' |
|
1103 | 1141 | |
1104 | 1142 |
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"} |
1105 | 1143 |
data_source.store() |
... | ... | |
1107 | 1145 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
1108 | 1146 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
1109 | 1147 |
unsigned_url = urlopen.call_args[0][0] |
1110 |
assert unsigned_url == 'https://no-secret.example.com/json' |
|
1148 |
if qs_data: |
|
1149 |
assert unsigned_url == 'https://no-secret.example.com/json?arg1=val1&arg2=val2' |
|
1150 |
else: |
|
1151 |
assert unsigned_url == 'https://no-secret.example.com/json' |
|
1152 | ||
1153 |
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json?foo=bar"} |
|
1154 |
data_source.store() |
|
1155 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
|
1156 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
|
1157 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
|
1158 |
unsigned_url = urlopen.call_args[0][0] |
|
1159 |
if qs_data: |
|
1160 |
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar&arg1=val1&arg2=val2' |
|
1161 |
else: |
|
1162 |
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar' |
|
1111 | 1163 | |
1112 | 1164 | |
1113 | 1165 |
def test_named_datasource_json_cache(requests_pub): |
... | ... | |
1326 | 1378 |
export = ET.tostring(data_source.export_to_xml(include_id=True)) |
1327 | 1379 |
data_source3 = NamedDataSource.import_from_xml_tree(ET.fromstring(export), include_id=True) |
1328 | 1380 |
assert data_source3.category_id is None |
1381 | ||
1382 | ||
1383 |
def test_data_source_with_qs_data(pub):
    """qs_data is still there after an XML export followed by an import."""
    query_data = {'arg1': 'val1', 'arg2': 'val2'}

    source = NamedDataSource(name='test')
    source.qs_data = dict(query_data)
    source.store()

    reimported = assert_import_export_works(source, include_id=True)
    assert reimported.qs_data == {'arg1': 'val1', 'arg2': 'val2'}
tests/test_datasource_chrono.py | ||
---|---|---|
208 | 208 |
pub.load_site_options() |
209 | 209 |
NamedDataSource.wipe() |
210 | 210 | |
211 |
# create some datasource, with same urls, but not external
|
|
211 |
# create some datasource, with same urls, but external != 'agenda'
|
|
212 | 212 |
ds = NamedDataSource(name='Foo A') |
213 | 213 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'} |
214 | 214 |
ds.store() |
215 | 215 |
ds = NamedDataSource(name='Foo B') |
216 | 216 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'} |
217 | 217 |
ds.store() |
218 |
ds = NamedDataSource(name='Foo A') |
|
219 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'} |
|
220 |
ds.external = 'agenda_manual' |
|
221 |
ds.store() |
|
222 |
ds = NamedDataSource(name='Foo B') |
|
223 |
ds.external = 'agenda_manual' |
|
224 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'} |
|
225 |
ds.store() |
|
218 | 226 | |
219 | 227 |
# error during collect |
220 | 228 |
mock_collect.return_value = None |
221 | 229 |
build_agenda_datasources(pub) |
222 |
assert NamedDataSource.count() == 2 # no changes
|
|
230 |
assert NamedDataSource.count() == 4 # no changes
|
|
223 | 231 | |
224 | 232 |
# no agenda datasource found in chrono |
225 | 233 |
mock_collect.return_value = [] |
226 | 234 |
build_agenda_datasources(pub) |
227 |
assert NamedDataSource.count() == 2 # no changes
|
|
235 |
assert NamedDataSource.count() == 4 # no changes
|
|
228 | 236 | |
229 | 237 |
# 2 agenda datasources found |
230 | 238 |
mock_collect.return_value = [ |
... | ... | |
242 | 250 | |
243 | 251 |
# agenda datasources does not exist, create them |
244 | 252 |
build_agenda_datasources(pub) |
245 |
assert NamedDataSource.count() == 2 + 2
|
|
246 |
datasource1 = NamedDataSource.get(2 + 1)
|
|
247 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
253 |
assert NamedDataSource.count() == 4 + 2
|
|
254 |
datasource1 = NamedDataSource.get(4 + 1)
|
|
255 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
248 | 256 |
assert datasource1.name == 'Events A' |
249 | 257 |
assert datasource1.slug == 'chrono_ds_sluga' |
250 | 258 |
assert datasource1.external == 'agenda' |
... | ... | |
270 | 278 |
datasource2.slug = 'wrong_again' |
271 | 279 |
datasource2.store() |
272 | 280 |
build_agenda_datasources(pub) |
273 |
assert NamedDataSource.count() == 2 + 2
|
|
274 |
datasource1 = NamedDataSource.get(2 + 1)
|
|
275 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
281 |
assert NamedDataSource.count() == 4 + 2
|
|
282 |
datasource1 = NamedDataSource.get(4 + 1)
|
|
283 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
276 | 284 |
assert datasource1.name == 'Events A' |
277 | 285 |
assert datasource1.slug == 'wrong' |
278 | 286 |
assert datasource2.name == 'Events B' |
... | ... | |
283 | 291 |
datasource1.store() |
284 | 292 | |
285 | 293 |
build_agenda_datasources(pub) |
286 |
assert NamedDataSource.count() == 2 + 2
|
|
294 |
assert NamedDataSource.count() == 4 + 2
|
|
287 | 295 |
# first datasource was deleted, because not found and not used |
288 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
289 |
datasource3 = NamedDataSource.get(2 + 3)
|
|
296 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
297 |
datasource3 = NamedDataSource.get(4 + 3)
|
|
290 | 298 |
assert datasource2.name == 'Events B' |
291 | 299 |
assert datasource2.external == 'agenda' |
292 | 300 |
assert datasource2.external_status is None |
... | ... | |
314 | 322 |
datasource3.data_source['value'] = 'http://chrono.example.net/api/agenda/events-FOOBAR/datetimes/' |
315 | 323 |
datasource3.store() |
316 | 324 |
build_agenda_datasources(pub) |
317 |
assert NamedDataSource.count() == 2 + 3
|
|
318 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
319 |
datasource3 = NamedDataSource.get(2 + 3)
|
|
320 |
datasource4 = NamedDataSource.get(2 + 4)
|
|
325 |
assert NamedDataSource.count() == 4 + 3
|
|
326 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
327 |
datasource3 = NamedDataSource.get(4 + 3)
|
|
328 |
datasource4 = NamedDataSource.get(4 + 4)
|
|
321 | 329 |
assert datasource2.name == 'Events B' |
322 | 330 |
assert datasource2.slug == 'wrong_again' |
323 | 331 |
assert datasource2.external == 'agenda' |
... | ... | |
347 | 355 |
datasource4.external_status = 'not-found' |
348 | 356 |
datasource4.store() |
349 | 357 |
build_agenda_datasources(pub) |
350 |
assert NamedDataSource.count() == 2 + 3
|
|
351 |
datasource4 = NamedDataSource.get(2 + 4)
|
|
358 |
assert NamedDataSource.count() == 4 + 3
|
|
359 |
datasource4 = NamedDataSource.get(4 + 4)
|
|
352 | 360 |
assert datasource4.external_status is None |
tests/test_snapshots.py | ||
---|---|---|
496 | 496 |
assert '<p>%s</p>' % localstrftime(snapshot.timestamp) in resp.text |
497 | 497 |
with pytest.raises(IndexError): |
498 | 498 |
resp = resp.click('Edit') |
499 |
with pytest.raises(IndexError): |
|
500 |
resp = resp.click('Duplicate') |
|
499 | 501 | |
500 | 502 | |
501 | 503 |
def test_form_snapshot_browse(pub, formdef_with_history): |
wcs/admin/data_sources.py | ||
---|---|---|
36 | 36 |
from wcs.qommon.errors import AccessForbiddenError |
37 | 37 |
from wcs.qommon.form import ( |
38 | 38 |
CheckboxWidget, |
39 |
ComputedExpressionWidget, |
|
39 | 40 |
DurationWidget, |
40 | 41 |
FileWidget, |
41 | 42 |
Form, |
... | ... | |
44 | 45 |
SlugWidget, |
45 | 46 |
StringWidget, |
46 | 47 |
TextWidget, |
48 |
WidgetDict, |
|
47 | 49 |
WidgetList, |
48 | 50 |
get_response, |
49 | 51 |
get_session, |
... | ... | |
78 | 80 |
rows=5, |
79 | 81 |
value=self.datasource.description, |
80 | 82 |
) |
81 |
if not self.datasource or self.datasource.type != 'wcs:users': |
|
83 |
if not self.datasource or ( |
|
84 |
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual' |
|
85 |
): |
|
82 | 86 |
form.add( |
83 | 87 |
DataSourceSelectionWidget, |
84 | 88 |
'data_source', |
... | ... | |
105 | 109 |
'data-dynamic-display-value-in': 'json|geojson', |
106 | 110 |
}, |
107 | 111 |
) |
108 |
if not self.datasource or self.datasource.type != 'wcs:users': |
|
112 |
if not self.datasource or ( |
|
113 |
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual' |
|
114 |
): |
|
109 | 115 |
form.add( |
110 | 116 |
StringWidget, |
111 | 117 |
'query_parameter', |
... | ... | |
249 | 255 |
title=_('Record on errors'), |
250 | 256 |
value=self.datasource.record_on_errors, |
251 | 257 |
) |
258 |
if self.datasource.external == 'agenda_manual': |
|
259 |
form.add( |
|
260 |
WidgetDict, |
|
261 |
'qs_data', |
|
262 |
title=_('Query string data'), |
|
263 |
value=self.datasource.qs_data or {}, |
|
264 |
element_value_type=ComputedExpressionWidget, |
|
265 |
element_value_kwargs={'allow_python': False}, |
|
266 |
allow_empty_values=True, |
|
267 |
value_for_empty_value='', |
|
268 |
) |
|
252 | 269 | |
253 | 270 |
if not self.datasource.is_readonly(): |
254 | 271 |
form.add_submit('submit', _('Submit')) |
... | ... | |
289 | 306 |
'edit', |
290 | 307 |
'delete', |
291 | 308 |
'export', |
309 |
'duplicate', |
|
292 | 310 |
('history', 'snapshots_dir'), |
293 | 311 |
] |
294 | 312 |
do_not_call_in_templates = True |
... | ... | |
311 | 329 |
if hasattr(self.datasource, 'snapshot_object'): |
312 | 330 |
r += utils.snapshot_info_block(snapshot=self.datasource.snapshot_object) |
313 | 331 |
r += htmltext('<ul id="sidebar-actions">') |
332 |
if self.datasource.external == 'agenda' or not self.datasource.is_readonly(): |
|
333 |
r += htmltext('<li><a href="duplicate" rel="popup">%s</a></li>') % _('Duplicate') |
|
314 | 334 |
if not self.datasource.is_readonly(): |
315 | 335 |
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete') |
316 | 336 |
r += htmltext('<li><a href="export">%s</a></li>') % _('Export') |
... | ... | |
444 | 464 |
content_type='application/x-wcs-datasource', |
445 | 465 |
) |
446 | 466 | |
467 |
def duplicate(self):
    """Display the duplication confirmation form and create the copy.

    Returns a redirect to the data source page when a snapshot is being
    viewed or when the form is cancelled, the rendered form while it has
    not been (validly) submitted, and a redirect to the new data source
    once the copy is stored.
    """
    if hasattr(self.datasource, 'snapshot_object'):
        # a snapshot is being viewed: nothing to duplicate, go back to the page
        return redirect('.')

    form = Form(enctype='multipart/form-data')
    form.add_submit('duplicate', _('Submit'))
    form.add_submit('cancel', _('Cancel'))
    if form.get_widget('cancel').parse():
        return redirect('.')
    if not form.is_submitted() or form.has_errors():
        get_response().breadcrumb.append(('duplicate', _('Duplicate')))
        html_top('datasources', title=_('Duplicate Data Source'))
        r = TemplateIO(html=True)
        r += htmltext('<h2>%s %s</h2>') % (_('Duplicating Data Source:'), self.datasource.name)
        r += form.render()
        return r.getvalue()

    # the copy is built from an XML export of the current data source
    tree = self.datasource.export_to_xml(include_id=True)
    new_datasource = NamedDataSource.import_from_xml_tree(tree)
    # translate the constant msgid first, interpolate the name afterwards;
    # formatting inside _() would look up a string absent from the catalog
    new_datasource.name = _('Copy of %s') % new_datasource.name
    new_datasource.slug = new_datasource.get_new_slug(new_datasource.slug)
    if self.datasource.agenda_ds:
        # copies of agenda data sources are always stored as 'agenda_manual'
        new_datasource.external = 'agenda_manual'
    new_datasource.store()
    return redirect('../%s' % new_datasource.id)
|
492 | ||
447 | 493 | |
448 | 494 |
class NamedDataSourcesDirectory(Directory): |
449 | 495 |
_q_exports = [ |
wcs/data_sources.py | ||
---|---|---|
397 | 397 |
if Template.is_template_string(url): |
398 | 398 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
399 | 399 |
url = get_variadic_url(url, vars) |
400 |
if data_source.get('qs_data'): # merge qs_data into url |
|
401 |
from wcs.workflows import WorkflowStatusItem |
|
402 | ||
403 |
parsed = urllib.parse.urlparse(url) |
|
404 |
qs = list(urllib.parse.parse_qsl(parsed.query)) |
|
405 |
for key, value in data_source['qs_data'].items(): |
|
406 |
try: |
|
407 |
value = WorkflowStatusItem.compute(value, raises=True, record_errors=False) |
|
408 |
value = str(value) if value is not None else '' |
|
409 |
except Exception as e: |
|
410 |
get_publisher().record_error( |
|
411 |
_( |
|
412 |
'Failed to compute value "%(value)s" for "%(query)s" query parameter' |
|
413 |
% {'value': value, 'query': key} |
|
414 |
), |
|
415 |
context='[DATASOURCE]', |
|
416 |
exception=e, |
|
417 |
notify=data_source.get('notify_on_errors'), |
|
418 |
record=data_source.get('record_on_errors'), |
|
419 |
) |
|
420 |
else: |
|
421 |
key = force_str(key) |
|
422 |
value = force_str(value) |
|
423 |
qs.append((key, value)) |
|
424 |
qs = urllib.parse.urlencode(qs) |
|
425 |
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6]) |
|
400 | 426 | |
401 | 427 |
request = get_request() |
402 | 428 |
if hasattr(request, 'datasources_cache') and url in request.datasources_cache: |
... | ... | |
473 | 499 |
id_attribute = None |
474 | 500 |
text_attribute = None |
475 | 501 |
id_property = None |
502 |
qs_data = None |
|
476 | 503 |
label_template_property = None |
477 | 504 |
external = None |
478 | 505 |
external_status = None |
... | ... | |
497 | 524 |
('id_attribute', 'str'), |
498 | 525 |
('text_attribute', 'str'), |
499 | 526 |
('id_property', 'str'), |
527 |
('qs_data', 'qs_data'), |
|
500 | 528 |
('label_template_property', 'str'), |
501 | 529 |
('external', 'str'), |
502 | 530 |
('external_status', 'str'), |
... | ... | |
554 | 582 |
'data_attribute': self.data_attribute, |
555 | 583 |
'id_attribute': self.id_attribute, |
556 | 584 |
'text_attribute': self.text_attribute, |
585 |
'qs_data': self.qs_data, |
|
557 | 586 |
'notify_on_errors': notify_on_errors, |
558 | 587 |
'record_on_errors': record_on_errors, |
559 | 588 |
} |
... | ... | |
583 | 612 |
def maybe_datetimes(self): |
584 | 613 |
return self.type == 'json' and 'datetimes' in (self.data_source.get('value') or '') |
585 | 614 | |
615 |
@property
def agenda_ds(self):
    """Tell whether `external` is one of 'agenda' or 'agenda_manual'."""
    agenda_kinds = ('agenda', 'agenda_manual')
    return self.external in agenda_kinds
|
618 | ||
619 |
@property
def agenda_ds_origin(self):
    """Return the 'agenda' data source sharing this source's URL value.

    Only meaningful for 'agenda_manual' data sources; None is returned for
    any other kind, or when no 'agenda' data source has the same value.
    """
    if self.external != 'agenda_manual':
        return None
    # first stored 'agenda' data source whose 'value' equals ours, if any
    candidates = (
        other
        for other in NamedDataSource.select()
        if other.external == 'agenda'
        and other.data_source.get('value') == self.data_source.get('value')
    )
    return next(candidates, None)
|
628 | ||
586 | 629 |
def migrate(self): |
587 | 630 |
changed = False |
588 | 631 | |
... | ... | |
631 | 674 |
'value': force_str(element.find('value').text or ''), |
632 | 675 |
} |
633 | 676 | |
677 |
def export_qs_data_to_xml(self, element, attribute_name, *args, **kwargs):
    """Serialize `self.qs_data` as <item> children of `element`.

    Each entry becomes <item><name>key</name><value>value</value></item>.
    Nothing is added when qs_data is empty or None. `attribute_name` and
    the extra arguments are accepted but not used here.

    Raises AssertionError when a key or a value is not a string.
    """
    if not self.qs_data:
        return
    for key, value in self.qs_data.items():
        # check both parts before touching the tree, so that a bad entry
        # does not leave a half-built <item> behind
        if not isinstance(key, str):
            raise AssertionError('unknown type for key (%r)' % key)
        if not isinstance(value, str):
            # report the offending value, not its key
            raise AssertionError('unknown type for value (%r)' % (value,))
        item = ET.SubElement(element, 'item')
        # both are known to be str at this point, no text coercion is needed
        ET.SubElement(item, 'name').text = key
        ET.SubElement(item, 'value').text = value
|
690 | ||
691 |
def import_qs_data_from_xml(self, element, **kwargs):
    """Build the qs_data dictionary from the <item> children of `element`.

    Each <item> provides a <name> (the key) and a <value>; an empty value
    is read as ''. Returns None when `element` is None.
    """
    if element is None:
        return None
    return {
        force_str(item.find('name').text): force_str(item.find('value').text or '')
        for item in element.findall('item')
    }
|
700 | ||
634 | 701 |
def get_dependencies(self): |
635 | 702 |
yield self.category |
636 | 703 | |
... | ... | |
654 | 721 |
return data_source |
655 | 722 | |
656 | 723 |
def get_json_query_url(self): |
657 |
url = self.data_source.get('value').strip() |
|
658 |
if Template.is_template_string(url): |
|
659 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
660 |
url = get_variadic_url(url, vars) |
|
724 |
url = self.get_variadic_url() |
|
661 | 725 |
if not url: |
662 | 726 |
return '' |
663 | 727 |
if '?' not in url: |
... | ... | |
721 | 785 |
def get_geojson_url(self): |
722 | 786 |
assert self.type == 'geojson' |
723 | 787 |
url = self.data_source.get('value').strip() |
724 |
if Template.is_template_string(url): |
|
725 |
context = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
726 |
new_url = get_variadic_url(url, context) |
|
727 |
if new_url != url: |
|
728 |
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug} |
|
729 |
token, created = get_publisher().token_class.get_or_create( |
|
730 |
type='autocomplete', context=token_context |
|
731 |
) |
|
732 |
if created: |
|
733 |
token.store() |
|
734 |
return '/api/geojson/%s' % token.id |
|
788 |
new_url = self.get_variadic_url() |
|
789 |
if new_url != url: |
|
790 |
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug} |
|
791 |
token, created = get_publisher().token_class.get_or_create( |
|
792 |
type='autocomplete', context=token_context |
|
793 |
) |
|
794 |
if created: |
|
795 |
token.store() |
|
796 |
return '/api/geojson/%s' % token.id |
|
735 | 797 |
return '/api/geojson/%s' % self.slug |
736 | 798 | |
737 | 799 |
def get_geojson_data(self, force_url=None): |
738 | 800 |
if force_url: |
739 | 801 |
url = force_url |
740 | 802 |
else: |
741 |
url = self.data_source.get('value').strip() |
|
742 |
if Template.is_template_string(url): |
|
743 |
context = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
744 |
url = get_variadic_url(url, context) |
|
803 |
url = self.get_variadic_url() |
|
745 | 804 | |
746 | 805 |
request = get_request() |
747 | 806 |
if hasattr(request, 'datasources_cache') and url in request.datasources_cache: |
... | ... | |
782 | 841 |
return data |
783 | 842 | |
784 | 843 |
def get_value_by_id(self, param_name, param_value): |
785 |
url = self.data_source.get('value').strip() |
|
786 |
if Template.is_template_string(url): |
|
787 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
788 |
url = get_variadic_url(url, vars) |
|
844 |
url = self.get_variadic_url() |
|
789 | 845 | |
790 | 846 |
if '?' not in url: |
791 | 847 |
url += '?' |
wcs/templates/wcs/backoffice/data-source.html | ||
---|---|---|
18 | 18 |
<div class="section"> |
19 | 19 |
<h3>{% trans "Configuration" %}</h3> |
20 | 20 |
<ul> |
21 |
<li>{% trans "Type of source:" %} {{ datasource.type_label }}</li> |
|
22 |
{% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %} |
|
23 |
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li> |
|
24 |
{% elif datasource.data_source.type == 'formula' %} |
|
25 |
<li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li> |
|
26 |
{% elif datasource.data_source.type == 'wcs:users' %} |
|
27 |
{% spaceless %} |
|
28 |
<li>{% trans "Users with roles:" %} |
|
29 |
<ul> |
|
30 |
{% for role in roles %} |
|
31 |
{% if role.0 in datasource.users_included_roles %} |
|
32 |
<li>{{ role.1 }}</li> |
|
33 |
{% endif %} |
|
21 |
{% if datasource.agenda_ds %} |
|
22 |
{% with datasource.agenda_ds_origin as origin %} |
|
23 |
<li>{% trans "Type of source:" %} {% trans "Agenda data" %}{% if origin %} ({% trans "Copy of:" %} <a href="{{ origin.get_admin_url }}">{{ origin.name }}</a>){% endif %}</li> |
|
24 |
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li> |
|
25 |
{% if datasource.qs_data %} |
|
26 |
<li>{% trans "Extra query string data:" %} |
|
27 |
<ul> |
|
28 |
{% for k, v in datasource.qs_data.items %} |
|
29 |
<li>{% blocktrans %}{{ k }}:{% endblocktrans %} {{ v }}</li> |
|
34 | 30 |
{% endfor %} |
35 |
</ul> |
|
36 |
</li> |
|
37 |
<li>{% trans "Users without roles:" %} |
|
38 |
<ul> |
|
39 |
{% for role in roles %} |
|
40 |
{% if role.0 in datasource.users_excluded_roles %} |
|
41 |
<li>{{ role.1 }}</li> |
|
42 |
{% endif %} |
|
43 |
{% endfor %} |
|
44 |
</ul> |
|
45 |
</li> |
|
46 |
<li>{% trans "Include disabled users:" %} |
|
47 |
{% if datasource.include_disabled_users %} |
|
48 |
{% trans "Yes" %} |
|
49 |
{% else %} |
|
50 |
{% trans "No" %} |
|
31 |
</ul> |
|
32 |
</li> |
|
33 |
{% endif %} |
|
34 |
{% endwith %} |
|
35 |
{% else %} |
|
36 |
<li>{% trans "Type of source:" %} {{ datasource.type_label }}</li> |
|
37 |
{% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %} |
|
38 |
<li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li> |
|
39 |
{% elif datasource.data_source.type == 'formula' %} |
|
40 |
<li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li> |
|
41 |
{% elif datasource.data_source.type == 'wcs:users' %} |
|
42 |
{% spaceless %} |
|
43 |
<li>{% trans "Users with roles:" %} |
|
44 |
<ul> |
|
45 |
{% for role in roles %} |
|
46 |
{% if role.0 in datasource.users_included_roles %} |
|
47 |
<li>{{ role.1 }}</li> |
|
48 |
{% endif %} |
|
49 |
{% endfor %} |
|
50 |
</ul> |
|
51 |
</li> |
|
52 |
<li>{% trans "Users without roles:" %} |
|
53 |
<ul> |
|
54 |
{% for role in roles %} |
|
55 |
{% if role.0 in datasource.users_excluded_roles %} |
|
56 |
<li>{{ role.1 }}</li> |
|
57 |
{% endif %} |
|
58 |
{% endfor %} |
|
59 |
</ul> |
|
60 |
</li> |
|
61 |
<li>{% trans "Include disabled users:" %} |
|
62 |
{% if datasource.include_disabled_users %} |
|
63 |
{% trans "Yes" %} |
|
64 |
{% else %} |
|
65 |
{% trans "No" %} |
|
66 |
{% endif %} |
|
67 |
</li> |
|
68 |
{% endspaceless %} |
|
51 | 69 |
{% endif %} |
52 |
</li> |
|
53 |
{% endspaceless %} |
|
54 | 70 |
{% endif %} |
55 | 71 |
{% if datasource.cache_duration %} |
56 | 72 |
<li>{% trans "Cache Duration:" %} {{ datasource.humanized_cache_duration }} |
57 |
- |