0001-datasource-duplicate-and-configure-agenda-datasource.patch
tests/admin_pages/test_datasource.py | ||
---|---|---|
271 | 271 |
] |
272 | 272 | |
273 | 273 | |
274 |
def test_data_sources_agenda_manual_qs_data_type_options(pub): |
|
275 |
create_superuser(pub) |
|
276 | ||
277 |
data_source = NamedDataSource(name='foobar') |
|
278 |
data_source.external = 'agenda_manual' |
|
279 |
data_source.store() |
|
280 | ||
281 |
if not pub.site_options.has_section('options'): |
|
282 |
pub.site_options.add_section('options') |
|
283 |
pub.site_options.set('options', 'disable-python-expressions', 'false') |
|
284 |
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: |
|
285 |
pub.site_options.write(fd) |
|
286 | ||
287 |
app = login(get_app(pub)) |
|
288 |
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id) |
|
289 |
assert resp.form['qs_data$element0value$type'].options == [ |
|
290 |
('text', False, 'Text'), |
|
291 |
('template', False, 'Template'), |
|
292 |
] |
|
293 | ||
294 |
pub.site_options.set('options', 'disable-python-expressions', 'true') |
|
295 |
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: |
|
296 |
pub.site_options.write(fd) |
|
297 | ||
298 |
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id) |
|
299 |
assert resp.form['qs_data$element0value$type'].options == [ |
|
300 |
('text', False, 'Text'), |
|
301 |
('template', False, 'Template'), |
|
302 |
] |
|
303 | ||
304 | ||
274 | 305 |
def test_data_sources_category(pub): |
275 | 306 |
create_superuser(pub) |
276 | 307 | |
... | ... | |
357 | 388 |
data_source.data_source = {'type': 'formula', 'value': '[]'} |
358 | 389 |
data_source.store() |
359 | 390 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
391 |
assert 'This data source is readonly.' not in resp |
|
392 |
assert 'href="edit"' in resp |
|
393 |
assert 'href="delete"' in resp |
|
394 |
assert 'href="duplicate"' not in resp |
|
360 | 395 |
assert 'Type of source: Python Expression' in resp.text |
361 | 396 |
assert 'Python Expression' in resp.text |
362 | 397 |
assert 'Preview' not in resp.text |
... | ... | |
543 | 578 |
app = login(get_app(pub)) |
544 | 579 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
545 | 580 |
assert 'This data source is readonly.' in resp |
546 |
assert '/backoffice/settings/data-sources/%s/edit' % data_source.id not in resp |
|
547 |
assert '/backoffice/settings/data-sources/%s/delete' % data_source.id not in resp |
|
581 |
assert 'href="edit"' not in resp |
|
582 |
assert 'href="delete"' not in resp |
|
583 |
assert 'href="duplicate"' in resp |
|
584 | ||
585 | ||
586 |
def test_data_sources_agenda_duplicate(pub): |
|
587 |
create_superuser(pub) |
|
588 |
NamedDataSource.wipe() |
|
589 | ||
590 |
data_source = NamedDataSource(name='foobar') |
|
591 |
data_source.data_source = {'type': 'json', 'value': 'http://some.url'} |
|
592 |
data_source.external = 'agenda' |
|
593 |
data_source.store() |
|
594 | ||
595 |
app = login(get_app(pub)) |
|
596 |
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id) |
|
597 |
resp = resp.forms[0].submit('cancel') |
|
598 |
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s/' % data_source.id |
|
599 |
assert NamedDataSource.count() == 1 |
|
600 | ||
601 |
resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id) |
|
602 |
resp = resp.click(href='duplicate') |
|
603 |
resp = resp.forms[0].submit() |
|
604 |
assert NamedDataSource.count() == 2 |
|
605 |
new_data_source = NamedDataSource.select(order_by='id')[1] |
|
606 |
assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id |
|
607 |
assert new_data_source.data_source == {'type': 'json', 'value': 'http://some.url'} |
|
608 |
assert new_data_source.external == 'agenda_manual' |
|
609 |
assert new_data_source.qs_data is None |
|
610 | ||
611 | ||
612 |
def test_data_sources_agenda_manual_view(pub): |
|
613 |
create_superuser(pub) |
|
614 |
NamedDataSource.wipe() |
|
615 | ||
616 |
data_source = NamedDataSource(name='foobar') |
|
617 |
data_source.data_source = {'type': 'json', 'value': 'http://some.url'} |
|
618 |
data_source.external = 'agenda_manual' |
|
619 |
data_source.qs_data = {'var1': 'value1', 'var2': 'value2'} |
|
620 |
data_source.store() |
|
621 | ||
622 |
app = login(get_app(pub)) |
|
623 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
|
624 |
assert 'This data source is readonly.' not in resp |
|
625 |
assert 'href="edit"' in resp |
|
626 |
assert 'href="delete"' in resp |
|
627 |
assert 'href="duplicate"' not in resp |
|
628 |
assert 'Query string data' in resp |
|
629 |
assert '<li>var1: value1</li>' in resp |
|
630 |
assert '<li>var2: value2</li> in resp' |
|
631 | ||
632 |
data_source.qs_data = None |
|
633 |
data_source.store() |
|
634 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
|
635 |
assert 'Query string data' not in resp |
|
636 | ||
637 | ||
638 |
def test_data_sources_user_view(pub): |
|
639 |
create_superuser(pub) |
|
640 |
NamedDataSource.wipe() |
|
641 | ||
642 |
data_source = NamedDataSource(name='foobar') |
|
643 |
data_source.data_source = {'type': 'wcs:users'} |
|
644 |
data_source.store() |
|
645 | ||
646 |
app = login(get_app(pub)) |
|
647 |
resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id) |
|
648 |
assert 'This data source is readonly.' not in resp |
|
649 |
assert 'href="edit"' in resp |
|
650 |
assert 'href="delete"' in resp |
|
651 |
assert 'href="duplicate"' not in resp |
|
548 | 652 | |
549 | 653 | |
550 | 654 |
def test_data_sources_edit(pub): |
... | ... | |
612 | 716 |
assert resp.location == 'http://example.net/backoffice/settings/data-sources/1/' |
613 | 717 | |
614 | 718 | |
719 |
def test_data_sources_agenda_manual_edit(pub): |
|
720 |
create_superuser(pub) |
|
721 |
NamedDataSource.wipe() |
|
722 | ||
723 |
data_source = NamedDataSource(name='foobar') |
|
724 |
data_source.external = 'agenda_manual' |
|
725 |
data_source.store() |
|
726 | ||
727 |
app = login(get_app(pub)) |
|
728 |
resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id) |
|
729 |
resp.forms[0]['qs_data$element0key'] = 'arg1' |
|
730 |
resp.forms[0]['qs_data$element0value$value_template'] = '{{ foobar }}' |
|
731 |
resp.forms[0]['qs_data$element0value$type'] = 'template' |
|
732 |
resp = resp.forms[0].submit('submit') |
|
733 | ||
734 |
data_source = NamedDataSource.get(data_source.id) |
|
735 |
assert data_source.qs_data == {'arg1': '{{ foobar }}'} |
|
736 | ||
737 | ||
615 | 738 |
def test_data_sources_delete(pub): |
616 | 739 |
create_superuser(pub) |
617 | 740 |
NamedDataSource.wipe() |
tests/test_datasource.py | ||
---|---|---|
399 | 399 |
] |
400 | 400 | |
401 | 401 | |
402 |
def test_json_datasource_bad_url(pub, error_email, http_requests, emails, caplog):
|
|
402 |
def test_json_datasource_bad_url(pub, error_email, http_requests, emails): |
|
403 | 403 |
datasource = {'type': 'json', 'value': 'http://remote.example.net/404'} |
404 | 404 |
assert data_sources.get_items(datasource) == [] |
405 | 405 |
assert emails.count() == 0 |
... | ... | |
505 | 505 |
) |
506 | 506 | |
507 | 507 | |
508 |
@pytest.mark.parametrize('notify', [True, False]) |
|
509 |
@pytest.mark.parametrize('record', [True, False]) |
|
510 |
def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record): |
|
511 |
datasource = { |
|
512 |
'type': 'json', |
|
513 |
'value': "https://whatever.com/json", |
|
514 |
'qs_data': {'foo': '{% for invalid %}', 'bar': '{{ valid }}'}, |
|
515 |
'notify_on_errors': notify, |
|
516 |
'record_on_errors': record, |
|
517 |
} |
|
518 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
|
519 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': [{'id': '1', 'text': 'foo'}]})) |
|
520 |
assert data_sources.get_items(datasource) == [('1', 'foo', '1', {'id': '1', 'text': 'foo'})] |
|
521 |
url = urlopen.call_args[0][0] |
|
522 |
assert url == 'https://whatever.com/json?bar=' |
|
523 |
message = '[DATASOURCE] Failed to compute value "{% for invalid %}" for "foo" query parameter' |
|
524 |
if notify: |
|
525 |
assert emails.count() == 1 |
|
526 |
assert message in emails.get_latest('subject') |
|
527 |
else: |
|
528 |
assert emails.count() == 0 |
|
529 |
if pub.is_using_postgresql(): |
|
530 |
if record: |
|
531 |
assert pub.loggederror_class.count() == 1 |
|
532 |
logged_error = pub.loggederror_class.select(order_by='id')[0] |
|
533 |
assert logged_error.summary == message |
|
534 |
else: |
|
535 |
assert pub.loggederror_class.count() == 0 |
|
536 | ||
537 | ||
508 | 538 |
def test_geojson_datasource(pub, requests_pub, http_requests): |
509 | 539 |
get_request() |
510 | 540 |
get_request().datasources_cache = {} |
... | ... | |
1040 | 1070 |
] |
1041 | 1071 | |
1042 | 1072 | |
1043 |
def test_data_source_signed(no_request_pub): |
|
1073 |
@pytest.mark.parametrize('qs_data', [{}, {'arg1': 'val1', 'arg2': 'val2'}]) |
|
1074 |
def test_data_source_signed(no_request_pub, qs_data): |
|
1044 | 1075 |
NamedDataSource.wipe() |
1045 | 1076 |
data_source = NamedDataSource(name='foobar') |
1046 | 1077 |
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"} |
1078 |
data_source.qs_data = qs_data |
|
1047 | 1079 |
data_source.store() |
1048 | 1080 | |
1049 | 1081 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
... | ... | |
1059 | 1091 |
assert querystring['nonce'][0] |
1060 | 1092 |
assert querystring['timestamp'][0] |
1061 | 1093 |
assert querystring['signature'][0] |
1094 |
if qs_data: |
|
1095 |
assert querystring['arg1'][0] == 'val1' |
|
1096 |
assert querystring['arg2'][0] == 'val2' |
|
1062 | 1097 | |
1063 | 1098 |
data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"} |
1064 | 1099 |
data_source.store() |
... | ... | |
1075 | 1110 |
assert querystring['timestamp'][0] |
1076 | 1111 |
assert querystring['signature'][0] |
1077 | 1112 |
assert querystring['foo'][0] == 'bar' |
1113 |
if qs_data: |
|
1114 |
assert querystring['arg1'][0] == 'val1' |
|
1115 |
assert querystring['arg2'][0] == 'val2' |
|
1078 | 1116 | |
1079 | 1117 |
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"} |
1080 | 1118 |
data_source.store() |
... | ... | |
1082 | 1120 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
1083 | 1121 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
1084 | 1122 |
unsigned_url = urlopen.call_args[0][0] |
1085 |
assert unsigned_url == 'https://no-secret.example.com/json' |
|
1123 |
if qs_data: |
|
1124 |
assert unsigned_url == 'https://no-secret.example.com/json?arg1=val1&arg2=val2' |
|
1125 |
else: |
|
1126 |
assert unsigned_url == 'https://no-secret.example.com/json' |
|
1127 | ||
1128 |
data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json?foo=bar"} |
|
1129 |
data_source.store() |
|
1130 |
with mock.patch('wcs.qommon.misc.urlopen') as urlopen: |
|
1131 |
urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}') |
|
1132 |
assert len(data_sources.get_items({'type': 'foobar'})) == 1 |
|
1133 |
unsigned_url = urlopen.call_args[0][0] |
|
1134 |
if qs_data: |
|
1135 |
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar&arg1=val1&arg2=val2' |
|
1136 |
else: |
|
1137 |
assert unsigned_url == 'https://no-secret.example.com/json?foo=bar' |
|
1086 | 1138 | |
1087 | 1139 | |
1088 | 1140 |
def test_named_datasource_json_cache(requests_pub): |
... | ... | |
1301 | 1353 |
export = ET.tostring(data_source.export_to_xml(include_id=True)) |
1302 | 1354 |
data_source3 = NamedDataSource.import_from_xml_tree(ET.fromstring(export), include_id=True) |
1303 | 1355 |
assert data_source3.category_id is None |
1356 | ||
1357 | ||
1358 |
def test_data_source_with_qs_data(pub): |
|
1359 |
data_source = NamedDataSource(name='test') |
|
1360 |
data_source.qs_data = {'arg1': 'val1', 'arg2': 'val2'} |
|
1361 |
data_source.store() |
|
1362 |
data_source2 = assert_import_export_works(data_source, include_id=True) |
|
1363 |
assert data_source2.qs_data == {'arg1': 'val1', 'arg2': 'val2'} |
tests/test_datasource_chrono.py | ||
---|---|---|
208 | 208 |
pub.load_site_options() |
209 | 209 |
NamedDataSource.wipe() |
210 | 210 | |
211 |
# create some datasource, with same urls, but not external
|
|
211 |
# create some datasource, with same urls, but external != 'agenda'
|
|
212 | 212 |
ds = NamedDataSource(name='Foo A') |
213 | 213 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'} |
214 | 214 |
ds.store() |
215 | 215 |
ds = NamedDataSource(name='Foo B') |
216 | 216 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'} |
217 | 217 |
ds.store() |
218 |
ds = NamedDataSource(name='Foo A') |
|
219 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'} |
|
220 |
ds.external = 'agenda_manual' |
|
221 |
ds.store() |
|
222 |
ds = NamedDataSource(name='Foo B') |
|
223 |
ds.external = 'agenda_manual' |
|
224 |
ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'} |
|
225 |
ds.store() |
|
218 | 226 | |
219 | 227 |
# error during collect |
220 | 228 |
mock_collect.return_value = None |
221 | 229 |
build_agenda_datasources(pub) |
222 |
assert NamedDataSource.count() == 2 # no changes
|
|
230 |
assert NamedDataSource.count() == 4 # no changes
|
|
223 | 231 | |
224 | 232 |
# no agenda datasource found in chrono |
225 | 233 |
mock_collect.return_value = [] |
226 | 234 |
build_agenda_datasources(pub) |
227 |
assert NamedDataSource.count() == 2 # no changes
|
|
235 |
assert NamedDataSource.count() == 4 # no changes
|
|
228 | 236 | |
229 | 237 |
# 2 agenda datasources found |
230 | 238 |
mock_collect.return_value = [ |
... | ... | |
242 | 250 | |
243 | 251 |
# agenda datasources does not exist, create them |
244 | 252 |
build_agenda_datasources(pub) |
245 |
assert NamedDataSource.count() == 2 + 2
|
|
246 |
datasource1 = NamedDataSource.get(2 + 1)
|
|
247 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
253 |
assert NamedDataSource.count() == 4 + 2
|
|
254 |
datasource1 = NamedDataSource.get(4 + 1)
|
|
255 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
248 | 256 |
assert datasource1.name == 'Events A' |
249 | 257 |
assert datasource1.slug == 'chrono_ds_sluga' |
250 | 258 |
assert datasource1.external == 'agenda' |
... | ... | |
270 | 278 |
datasource2.slug = 'wrong_again' |
271 | 279 |
datasource2.store() |
272 | 280 |
build_agenda_datasources(pub) |
273 |
assert NamedDataSource.count() == 2 + 2
|
|
274 |
datasource1 = NamedDataSource.get(2 + 1)
|
|
275 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
281 |
assert NamedDataSource.count() == 4 + 2
|
|
282 |
datasource1 = NamedDataSource.get(4 + 1)
|
|
283 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
276 | 284 |
assert datasource1.name == 'Events A' |
277 | 285 |
assert datasource1.slug == 'wrong' |
278 | 286 |
assert datasource2.name == 'Events B' |
... | ... | |
283 | 291 |
datasource1.store() |
284 | 292 | |
285 | 293 |
build_agenda_datasources(pub) |
286 |
assert NamedDataSource.count() == 2 + 2
|
|
294 |
assert NamedDataSource.count() == 4 + 2
|
|
287 | 295 |
# first datasource was deleted, because not found and not used |
288 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
289 |
datasource3 = NamedDataSource.get(2 + 3)
|
|
296 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
297 |
datasource3 = NamedDataSource.get(4 + 3)
|
|
290 | 298 |
assert datasource2.name == 'Events B' |
291 | 299 |
assert datasource2.external == 'agenda' |
292 | 300 |
assert datasource2.external_status is None |
... | ... | |
314 | 322 |
datasource3.data_source['value'] = 'http://chrono.example.net/api/agenda/events-FOOBAR/datetimes/' |
315 | 323 |
datasource3.store() |
316 | 324 |
build_agenda_datasources(pub) |
317 |
assert NamedDataSource.count() == 2 + 3
|
|
318 |
datasource2 = NamedDataSource.get(2 + 2)
|
|
319 |
datasource3 = NamedDataSource.get(2 + 3)
|
|
320 |
datasource4 = NamedDataSource.get(2 + 4)
|
|
325 |
assert NamedDataSource.count() == 4 + 3
|
|
326 |
datasource2 = NamedDataSource.get(4 + 2)
|
|
327 |
datasource3 = NamedDataSource.get(4 + 3)
|
|
328 |
datasource4 = NamedDataSource.get(4 + 4)
|
|
321 | 329 |
assert datasource2.name == 'Events B' |
322 | 330 |
assert datasource2.slug == 'wrong_again' |
323 | 331 |
assert datasource2.external == 'agenda' |
... | ... | |
347 | 355 |
datasource4.external_status = 'not-found' |
348 | 356 |
datasource4.store() |
349 | 357 |
build_agenda_datasources(pub) |
350 |
assert NamedDataSource.count() == 2 + 3
|
|
351 |
datasource4 = NamedDataSource.get(2 + 4)
|
|
358 |
assert NamedDataSource.count() == 4 + 3
|
|
359 |
datasource4 = NamedDataSource.get(4 + 4)
|
|
352 | 360 |
assert datasource4.external_status is None |
tests/test_snapshots.py | ||
---|---|---|
496 | 496 |
assert '<p>%s</p>' % localstrftime(snapshot.timestamp) in resp.text |
497 | 497 |
with pytest.raises(IndexError): |
498 | 498 |
resp = resp.click('Edit') |
499 |
with pytest.raises(IndexError): |
|
500 |
resp = resp.click('Duplicate') |
|
499 | 501 | |
500 | 502 | |
501 | 503 |
def test_form_snapshot_browse(pub, formdef_with_history): |
wcs/admin/data_sources.py | ||
---|---|---|
36 | 36 |
from wcs.qommon.errors import AccessForbiddenError |
37 | 37 |
from wcs.qommon.form import ( |
38 | 38 |
CheckboxWidget, |
39 |
ComputedExpressionWidget, |
|
39 | 40 |
DurationWidget, |
40 | 41 |
FileWidget, |
41 | 42 |
Form, |
... | ... | |
44 | 45 |
SlugWidget, |
45 | 46 |
StringWidget, |
46 | 47 |
TextWidget, |
48 |
WidgetDict, |
|
47 | 49 |
WidgetList, |
48 | 50 |
get_response, |
49 | 51 |
get_session, |
... | ... | |
78 | 80 |
rows=5, |
79 | 81 |
value=self.datasource.description, |
80 | 82 |
) |
81 |
if not self.datasource or self.datasource.type != 'wcs:users': |
|
83 |
if not self.datasource or ( |
|
84 |
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual' |
|
85 |
): |
|
82 | 86 |
form.add( |
83 | 87 |
DataSourceSelectionWidget, |
84 | 88 |
'data_source', |
... | ... | |
105 | 109 |
'data-dynamic-display-value-in': 'json|geojson', |
106 | 110 |
}, |
107 | 111 |
) |
108 |
if not self.datasource or self.datasource.type != 'wcs:users': |
|
112 |
if not self.datasource or ( |
|
113 |
self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual' |
|
114 |
): |
|
109 | 115 |
form.add( |
110 | 116 |
StringWidget, |
111 | 117 |
'query_parameter', |
... | ... | |
243 | 249 |
title=_('Record on errors'), |
244 | 250 |
value=self.datasource.record_on_errors, |
245 | 251 |
) |
252 |
if self.datasource.external == 'agenda_manual': |
|
253 |
form.add( |
|
254 |
WidgetDict, |
|
255 |
'qs_data', |
|
256 |
title=_('Query string data'), |
|
257 |
value=self.datasource.qs_data or {}, |
|
258 |
element_value_type=ComputedExpressionWidget, |
|
259 |
element_value_kwargs={'allow_python': False}, |
|
260 |
allow_empty_values=True, |
|
261 |
value_for_empty_value='', |
|
262 |
) |
|
246 | 263 | |
247 | 264 |
if not self.datasource.is_readonly(): |
248 | 265 |
form.add_submit('submit', _('Submit')) |
... | ... | |
283 | 300 |
'edit', |
284 | 301 |
'delete', |
285 | 302 |
'export', |
303 |
'duplicate', |
|
286 | 304 |
('history', 'snapshots_dir'), |
287 | 305 |
] |
288 | 306 |
do_not_call_in_templates = True |
... | ... | |
305 | 323 |
if hasattr(self.datasource, 'snapshot_object'): |
306 | 324 |
r += utils.snapshot_info_block(snapshot=self.datasource.snapshot_object) |
307 | 325 |
r += htmltext('<ul id="sidebar-actions">') |
308 |
if not self.datasource.is_readonly(): |
|
326 |
if self.datasource.external == 'agenda': |
|
327 |
r += htmltext('<li><a href="duplicate" rel="popup">%s</a></li>') % _('Duplicate') |
|
328 |
elif not self.datasource.is_readonly(): |
|
309 | 329 |
r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete') |
310 | 330 |
r += htmltext('<li><a href="export">%s</a></li>') % _('Export') |
311 | 331 |
if get_publisher().snapshot_class: |
... | ... | |
422 | 442 |
content_type='application/x-wcs-datasource', |
423 | 443 |
) |
424 | 444 | |
445 |
def duplicate(self): |
|
446 |
if not self.datasource.external == 'agenda' or hasattr(self.datasource, 'snapshot_object'): |
|
447 |
return redirect('.') |
|
448 | ||
449 |
form = Form(enctype='multipart/form-data') |
|
450 |
form.add_submit('duplicate', _('Submit')) |
|
451 |
form.add_submit('cancel', _('Cancel')) |
|
452 |
if form.get_widget('cancel').parse(): |
|
453 |
return redirect('.') |
|
454 |
if not form.is_submitted() or form.has_errors(): |
|
455 |
get_response().breadcrumb.append(('duplicate', _('Duplicate'))) |
|
456 |
html_top('datasources', title=_('Duplicate Data Source')) |
|
457 |
r = TemplateIO(html=True) |
|
458 |
r += htmltext('<h2>%s %s</h2>') % (_('Duplicating Data Source:'), self.datasource.name) |
|
459 |
r += form.render() |
|
460 |
return r.getvalue() |
|
461 | ||
462 |
tree = self.datasource.export_to_xml(include_id=True) |
|
463 |
new_datasource = NamedDataSource.import_from_xml_tree(tree) |
|
464 |
new_datasource.name = _('Copy of %s' % new_datasource.name) |
|
465 |
new_datasource.slug = new_datasource.get_new_slug(new_datasource.slug) |
|
466 |
new_datasource.external = 'agenda_manual' |
|
467 |
new_datasource.store() |
|
468 |
return redirect('../%s' % new_datasource.id) |
|
469 | ||
425 | 470 | |
426 | 471 |
class NamedDataSourcesDirectory(Directory): |
427 | 472 |
_q_exports = [ |
wcs/data_sources.py | ||
---|---|---|
388 | 388 |
if Template.is_template_string(url): |
389 | 389 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
390 | 390 |
url = get_variadic_url(url, vars) |
391 |
if data_source.get('qs_data'): # merge qs_data into url |
|
392 |
from wcs.workflows import WorkflowStatusItem |
|
393 | ||
394 |
parsed = urllib.parse.urlparse(url) |
|
395 |
qs = list(urllib.parse.parse_qsl(parsed.query)) |
|
396 |
for key, value in data_source['qs_data'].items(): |
|
397 |
try: |
|
398 |
value = WorkflowStatusItem.compute(value, raises=True, record_errors=False) |
|
399 |
value = str(value) if value is not None else '' |
|
400 |
except Exception as e: |
|
401 |
get_publisher().record_error( |
|
402 |
_( |
|
403 |
'Failed to compute value "%(value)s" for "%(query)s" query parameter' |
|
404 |
% {'value': value, 'query': key} |
|
405 |
), |
|
406 |
context='[DATASOURCE]', |
|
407 |
exception=e, |
|
408 |
notify=data_source.get('notify_on_errors'), |
|
409 |
record=data_source.get('record_on_errors'), |
|
410 |
) |
|
411 |
else: |
|
412 |
key = force_str(key) |
|
413 |
value = force_str(value) |
|
414 |
qs.append((key, value)) |
|
415 |
qs = urllib.parse.urlencode(qs) |
|
416 |
url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6]) |
|
391 | 417 | |
392 | 418 |
request = get_request() |
393 | 419 |
if hasattr(request, 'datasources_cache') and url in request.datasources_cache: |
... | ... | |
464 | 490 |
id_attribute = None |
465 | 491 |
text_attribute = None |
466 | 492 |
id_property = None |
493 |
qs_data = None |
|
467 | 494 |
label_template_property = None |
468 | 495 |
external = None |
469 | 496 |
external_status = None |
... | ... | |
487 | 514 |
('id_attribute', 'str'), |
488 | 515 |
('text_attribute', 'str'), |
489 | 516 |
('id_property', 'str'), |
517 |
('qs_data', 'qs_data'), |
|
490 | 518 |
('label_template_property', 'str'), |
491 | 519 |
('external', 'str'), |
492 | 520 |
('external_status', 'str'), |
... | ... | |
543 | 571 |
'data_attribute': self.data_attribute, |
544 | 572 |
'id_attribute': self.id_attribute, |
545 | 573 |
'text_attribute': self.text_attribute, |
574 |
'qs_data': self.qs_data, |
|
546 | 575 |
'notify_on_errors': notify_on_errors, |
547 | 576 |
'record_on_errors': record_on_errors, |
548 | 577 |
} |
... | ... | |
612 | 641 |
'value': force_str(element.find('value').text or ''), |
613 | 642 |
} |
614 | 643 | |
644 |
def export_qs_data_to_xml(self, element, attribute_name, *args, **kwargs): |
|
645 |
if not self.qs_data: |
|
646 |
return |
|
647 |
for (key, value) in self.qs_data.items(): |
|
648 |
item = ET.SubElement(element, 'item') |
|
649 |
if isinstance(key, str): |
|
650 |
ET.SubElement(item, 'name').text = force_text(key) |
|
651 |
else: |
|
652 |
raise AssertionError('unknown type for key (%r)' % key) |
|
653 |
if isinstance(value, str): |
|
654 |
ET.SubElement(item, 'value').text = force_text(value) |
|
655 |
else: |
|
656 |
raise AssertionError('unknown type for value (%r)' % key) |
|
657 | ||
658 |
def import_qs_data_from_xml(self, element, **kwargs): |
|
659 |
if element is None: |
|
660 |
return |
|
661 |
qs_data = {} |
|
662 |
for item in element.findall('item'): |
|
663 |
key = force_str(item.find('name').text) |
|
664 |
value = force_str(item.find('value').text or '') |
|
665 |
qs_data[key] = value |
|
666 |
return qs_data |
|
667 | ||
615 | 668 |
def get_dependencies(self): |
616 | 669 |
yield self.category |
617 | 670 | |
... | ... | |
635 | 688 |
return data_source |
636 | 689 | |
637 | 690 |
def get_json_query_url(self): |
638 |
url = self.data_source.get('value').strip() |
|
639 |
if Template.is_template_string(url): |
|
640 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
641 |
url = get_variadic_url(url, vars) |
|
691 |
url = self.get_variadic_url() |
|
642 | 692 |
if not url: |
643 | 693 |
return '' |
644 | 694 |
if '?' not in url: |
... | ... | |
702 | 752 |
def get_geojson_url(self): |
703 | 753 |
assert self.type == 'geojson' |
704 | 754 |
url = self.data_source.get('value').strip() |
705 |
if Template.is_template_string(url): |
|
706 |
context = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
707 |
new_url = get_variadic_url(url, context) |
|
708 |
if new_url != url: |
|
709 |
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug} |
|
710 |
token, created = get_publisher().token_class.get_or_create( |
|
711 |
type='autocomplete', context=token_context |
|
712 |
) |
|
713 |
if created: |
|
714 |
token.store() |
|
715 |
return '/api/geojson/%s' % token.id |
|
755 |
new_url = self.get_variadic_url() |
|
756 |
if new_url != url: |
|
757 |
token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug} |
|
758 |
token, created = get_publisher().token_class.get_or_create( |
|
759 |
type='autocomplete', context=token_context |
|
760 |
) |
|
761 |
if created: |
|
762 |
token.store() |
|
763 |
return '/api/geojson/%s' % token.id |
|
716 | 764 |
return '/api/geojson/%s' % self.slug |
717 | 765 | |
718 | 766 |
def get_geojson_data(self, force_url=None): |
719 | 767 |
if force_url: |
720 | 768 |
url = force_url |
721 | 769 |
else: |
722 |
url = self.data_source.get('value').strip() |
|
723 |
if Template.is_template_string(url): |
|
724 |
context = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
725 |
url = get_variadic_url(url, context) |
|
770 |
url = self.get_variadic_url() |
|
726 | 771 | |
727 | 772 |
request = get_request() |
728 | 773 |
if hasattr(request, 'datasources_cache') and url in request.datasources_cache: |
... | ... | |
763 | 808 |
return data |
764 | 809 | |
765 | 810 |
def get_value_by_id(self, param_name, param_value): |
766 |
url = self.data_source.get('value').strip() |
|
767 |
if Template.is_template_string(url): |
|
768 |
vars = get_publisher().substitutions.get_context_variables(mode='lazy') |
|
769 |
url = get_variadic_url(url, vars) |
|
811 |
url = self.get_variadic_url() |
|
770 | 812 | |
771 | 813 |
if '?' not in url: |
772 | 814 |
url += '?' |
wcs/templates/wcs/backoffice/data-source.html | ||
---|---|---|
45 | 45 |
</li> |
46 | 46 |
{% endspaceless %} |
47 | 47 |
{% endif %} |
48 |
{% if datasource.qs_data %} |
|
49 |
<li>{% trans "Query string data:" %} |
|
50 |
<ul> |
|
51 |
{% for k, v in datasource.qs_data.items %} |
|
52 |
<li>{% blocktrans %}{{ k }}:{% endblocktrans %} {{ v }}</li> |
|
53 |
{% endfor %} |
|
54 |
</ul> |
|
55 |
</li> |
|
56 |
{% endif %} |
|
48 | 57 |
{% if datasource.cache_duration %} |
49 | 58 |
<li>{% trans "Cache Duration:" %} {{ datasource.humanized_cache_duration }} |
50 | 59 |
{% endif %} |
51 |
- |