Projet

Général

Profil

0001-datasource-duplicate-and-configure-agenda-datasource.patch

Lauréline Guérin, 19 juin 2022 17:01

Télécharger (35,6 ko)

Voir les différences:

Subject: [PATCH] datasource: duplicate and configure agenda datasource
 (#63173)

 tests/admin_pages/test_datasource.py          | 200 +++++++++++++++++-
 tests/test_datasource.py                      |  66 +++++-
 tests/test_datasource_chrono.py               |  44 ++--
 tests/test_snapshots.py                       |   2 +
 wcs/admin/data_sources.py                     |  50 ++++-
 wcs/data_sources.py                           | 102 +++++++--
 wcs/templates/wcs/backoffice/data-source.html |  78 ++++---
 7 files changed, 463 insertions(+), 79 deletions(-)
tests/admin_pages/test_datasource.py
271 271
    ]
272 272

  
273 273

  
274
def test_data_sources_agenda_manual_qs_data_type_options(pub):
275
    create_superuser(pub)
276

  
277
    data_source = NamedDataSource(name='foobar')
278
    data_source.external = 'agenda_manual'
279
    data_source.store()
280

  
281
    if not pub.site_options.has_section('options'):
282
        pub.site_options.add_section('options')
283
    pub.site_options.set('options', 'disable-python-expressions', 'false')
284
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
285
        pub.site_options.write(fd)
286

  
287
    app = login(get_app(pub))
288
    resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
289
    assert resp.form['qs_data$element0value$type'].options == [
290
        ('text', False, 'Text'),
291
        ('template', False, 'Template'),
292
    ]
293

  
294
    pub.site_options.set('options', 'disable-python-expressions', 'true')
295
    with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd:
296
        pub.site_options.write(fd)
297

  
298
    resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
299
    assert resp.form['qs_data$element0value$type'].options == [
300
        ('text', False, 'Text'),
301
        ('template', False, 'Template'),
302
    ]
303

  
304

  
274 305
def test_data_sources_category(pub):
275 306
    create_superuser(pub)
276 307

  
......
357 388
    data_source.data_source = {'type': 'formula', 'value': '[]'}
358 389
    data_source.store()
359 390
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
391
    assert 'This data source is readonly.' not in resp
392
    assert 'href="edit"' in resp
393
    assert 'href="delete"' in resp
394
    assert 'href="duplicate"' in resp
360 395
    assert 'Type of source: Python Expression' in resp.text
361 396
    assert 'Python Expression' in resp.text
362 397
    assert 'Preview' not in resp.text
......
541 576
    assert 'Unexpected fatal error getting items for preview' in resp.text
542 577

  
543 578

  
579
def test_data_sources_duplicate(pub):
580
    create_superuser(pub)
581
    NamedDataSource.wipe()
582

  
583
    data_source = NamedDataSource(name='foobar')
584
    data_source.data_source = {'type': 'json', 'value': '{{data_source.foobar}}'}
585
    data_source.store()
586

  
587
    app = login(get_app(pub))
588
    resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
589
    resp = resp.click(href='duplicate')
590
    resp = resp.forms[0].submit()
591
    assert NamedDataSource.count() == 2
592
    new_data_source = NamedDataSource.select(order_by='id')[1]
593
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
594
    assert new_data_source.data_source == {'type': 'json', 'value': '{{data_source.foobar}}'}
595
    assert new_data_source.external is None
596

  
597

  
544 598
def test_data_sources_agenda_view(pub):
545 599
    create_superuser(pub)
546 600
    NamedDataSource.wipe()
......
553 607
    app = login(get_app(pub))
554 608
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
555 609
    assert 'This data source is readonly.' in resp
556
    assert '/backoffice/settings/data-sources/%s/edit' % data_source.id not in resp
557
    assert '/backoffice/settings/data-sources/%s/delete' % data_source.id not in resp
610
    assert 'href="edit"' not in resp
611
    assert 'href="delete"' not in resp
612
    assert 'href="duplicate"' in resp
613

  
614

  
615
def test_data_sources_agenda_duplicate(pub):
616
    create_superuser(pub)
617
    NamedDataSource.wipe()
618

  
619
    data_source = NamedDataSource(name='foobar')
620
    data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
621
    data_source.external = 'agenda'
622
    data_source.store()
623

  
624
    app = login(get_app(pub))
625
    resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
626
    resp = resp.forms[0].submit('cancel')
627
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s/' % data_source.id
628
    assert NamedDataSource.count() == 1
629

  
630
    resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
631
    resp = resp.click(href='duplicate')
632
    resp = resp.forms[0].submit()
633
    assert NamedDataSource.count() == 2
634
    new_data_source = NamedDataSource.select(order_by='id')[1]
635
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
636
    assert new_data_source.data_source == {'type': 'json', 'value': 'http://some.url'}
637
    assert new_data_source.external == 'agenda_manual'
638
    assert new_data_source.qs_data is None
639

  
640

  
641
def test_data_sources_agenda_manual_view(pub):
642
    create_superuser(pub)
643
    NamedDataSource.wipe()
644

  
645
    data_source = NamedDataSource(name='foobar')
646
    data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
647
    data_source.external = 'agenda_manual'
648
    data_source.qs_data = {'var1': 'value1', 'var2': 'value2'}
649
    data_source.store()
650

  
651
    app = login(get_app(pub))
652
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
653
    assert 'This data source is readonly.' not in resp
654
    assert 'href="edit"' in resp
655
    assert 'href="delete"' in resp
656
    assert 'href="duplicate"' in resp
657
    assert 'Type of source: Agenda data' in resp
658
    assert 'Copy of' not in resp
659
    assert 'Extra query string data' in resp
660
    assert '<li>var1: value1</li>' in resp
661
    assert '<li>var2: value2</li>' in resp
662

  
663
    data_source.qs_data = None
664
    data_source.store()
665
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
666
    assert 'Extra Query string data' not in resp
667

  
668
    data_source2 = NamedDataSource(name='foobar')
669
    data_source2.data_source = {'type': 'json', 'value': 'http://some.url'}
670
    data_source2.external = 'agenda'
671
    data_source2.store()
672
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
673
    assert (
674
        'Copy of: <a href="http://example.net/backoffice/settings/data-sources/%s/">foobar</a>'
675
        % data_source2.id
676
        in resp
677
    )
678

  
679

  
680
def test_data_sources_agenda_manual_duplicate(pub):
681
    create_superuser(pub)
682
    NamedDataSource.wipe()
683

  
684
    data_source = NamedDataSource(name='foobar')
685
    data_source.data_source = {'type': 'json', 'value': 'http://some.url'}
686
    data_source.external = 'agenda_manual'
687
    data_source.qs_data = {'var1': 'value1', 'var2': 'value2'}
688
    data_source.store()
689

  
690
    app = login(get_app(pub))
691
    resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
692
    resp = resp.click(href='duplicate')
693
    resp = resp.forms[0].submit()
694
    assert NamedDataSource.count() == 2
695
    new_data_source = NamedDataSource.select(order_by='id')[1]
696
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
697
    assert new_data_source.data_source == {'type': 'json', 'value': 'http://some.url'}
698
    assert new_data_source.external == 'agenda_manual'
699
    assert new_data_source.qs_data == {'var1': 'value1', 'var2': 'value2'}
700

  
701

  
702
def test_data_sources_user_view(pub):
703
    create_superuser(pub)
704
    NamedDataSource.wipe()
705

  
706
    data_source = NamedDataSource(name='foobar')
707
    data_source.data_source = {'type': 'wcs:users'}
708
    data_source.store()
709

  
710
    app = login(get_app(pub))
711
    resp = app.get('/backoffice/settings/data-sources/%s/' % data_source.id)
712
    assert 'This data source is readonly.' not in resp
713
    assert 'href="edit"' in resp
714
    assert 'href="delete"' in resp
715
    assert 'href="duplicate"' in resp
716

  
717

  
718
def test_data_sources_user_duplicate(pub):
719
    create_superuser(pub)
720
    NamedDataSource.wipe()
721

  
722
    data_source = NamedDataSource(name='foobar')
723
    data_source.data_source = {'type': 'wcs:users'}
724
    data_source.store()
725

  
726
    app = login(get_app(pub))
727
    resp = app.get('/backoffice/settings/data-sources/%s/duplicate' % data_source.id)
728
    resp = resp.click(href='duplicate')
729
    resp = resp.forms[0].submit()
730
    assert NamedDataSource.count() == 2
731
    new_data_source = NamedDataSource.select(order_by='id')[1]
732
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/%s' % new_data_source.id
733
    assert new_data_source.data_source == {'type': 'wcs:users', 'value': ''}
734
    assert new_data_source.external is None
558 735

  
559 736

  
560 737
def test_data_sources_edit(pub):
......
605 782
    assert resp.location == 'http://example.net/backoffice/settings/data-sources/1/'
606 783

  
607 784

  
785
def test_data_sources_agenda_manual_edit(pub):
786
    create_superuser(pub)
787
    NamedDataSource.wipe()
788

  
789
    data_source = NamedDataSource(name='foobar')
790
    data_source.external = 'agenda_manual'
791
    data_source.store()
792

  
793
    app = login(get_app(pub))
794
    resp = app.get('/backoffice/settings/data-sources/%s/edit' % data_source.id)
795
    resp.forms[0]['qs_data$element0key'] = 'arg1'
796
    resp.forms[0]['qs_data$element0value$value_template'] = '{{ foobar }}'
797
    resp.forms[0]['qs_data$element0value$type'] = 'template'
798
    resp = resp.forms[0].submit('submit')
799

  
800
    data_source = NamedDataSource.get(data_source.id)
801
    assert data_source.qs_data == {'arg1': '{{ foobar }}'}
802

  
803

  
608 804
def test_data_sources_delete(pub):
609 805
    create_superuser(pub)
610 806
    NamedDataSource.wipe()
tests/test_datasource.py
424 424
    ]
425 425

  
426 426

  
427
def test_json_datasource_bad_url(pub, error_email, http_requests, emails, caplog):
427
def test_json_datasource_bad_url(pub, error_email, http_requests, emails):
428 428
    datasource = {'type': 'json', 'value': 'http://remote.example.net/404'}
429 429
    assert data_sources.get_items(datasource) == []
430 430
    assert emails.count() == 0
......
530 530
        )
531 531

  
532 532

  
533
@pytest.mark.parametrize('notify', [True, False])
534
@pytest.mark.parametrize('record', [True, False])
535
def test_json_datasource_bad_qs_data(pub, error_email, emails, notify, record):
536
    datasource = {
537
        'type': 'json',
538
        'value': "https://whatever.com/json",
539
        'qs_data': {'foo': '{% for invalid %}', 'bar': '{{ valid }}'},
540
        'notify_on_errors': notify,
541
        'record_on_errors': record,
542
    }
543
    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
544
        urlopen.side_effect = lambda *args: io.StringIO(json.dumps({'data': [{'id': '1', 'text': 'foo'}]}))
545
        assert data_sources.get_items(datasource) == [('1', 'foo', '1', {'id': '1', 'text': 'foo'})]
546
    url = urlopen.call_args[0][0]
547
    assert url == 'https://whatever.com/json?bar='
548
    message = '[DATASOURCE] Failed to compute value "{% for invalid %}" for "foo" query parameter'
549
    if notify:
550
        assert emails.count() == 1
551
        assert message in emails.get_latest('subject')
552
    else:
553
        assert emails.count() == 0
554
    if pub.is_using_postgresql():
555
        if record:
556
            assert pub.loggederror_class.count() == 1
557
            logged_error = pub.loggederror_class.select(order_by='id')[0]
558
            assert logged_error.summary == message
559
        else:
560
            assert pub.loggederror_class.count() == 0
561

  
562

  
533 563
def test_geojson_datasource(pub, requests_pub, http_requests):
534 564
    get_request()
535 565
    get_request().datasources_cache = {}
......
1065 1095
        ]
1066 1096

  
1067 1097

  
1068
def test_data_source_signed(no_request_pub):
1098
@pytest.mark.parametrize('qs_data', [{}, {'arg1': 'val1', 'arg2': 'val2'}])
1099
def test_data_source_signed(no_request_pub, qs_data):
1069 1100
    NamedDataSource.wipe()
1070 1101
    data_source = NamedDataSource(name='foobar')
1071 1102
    data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json"}
1103
    data_source.qs_data = qs_data
1072 1104
    data_source.store()
1073 1105

  
1074 1106
    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
......
1084 1116
    assert querystring['nonce'][0]
1085 1117
    assert querystring['timestamp'][0]
1086 1118
    assert querystring['signature'][0]
1119
    if qs_data:
1120
        assert querystring['arg1'][0] == 'val1'
1121
        assert querystring['arg2'][0] == 'val2'
1087 1122

  
1088 1123
    data_source.data_source = {'type': 'json', 'value': "https://api.example.com/json?foo=bar"}
1089 1124
    data_source.store()
......
1100 1135
    assert querystring['timestamp'][0]
1101 1136
    assert querystring['signature'][0]
1102 1137
    assert querystring['foo'][0] == 'bar'
1138
    if qs_data:
1139
        assert querystring['arg1'][0] == 'val1'
1140
        assert querystring['arg2'][0] == 'val2'
1103 1141

  
1104 1142
    data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json"}
1105 1143
    data_source.store()
......
1107 1145
        urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
1108 1146
        assert len(data_sources.get_items({'type': 'foobar'})) == 1
1109 1147
        unsigned_url = urlopen.call_args[0][0]
1110
    assert unsigned_url == 'https://no-secret.example.com/json'
1148
    if qs_data:
1149
        assert unsigned_url == 'https://no-secret.example.com/json?arg1=val1&arg2=val2'
1150
    else:
1151
        assert unsigned_url == 'https://no-secret.example.com/json'
1152

  
1153
    data_source.data_source = {'type': 'json', 'value': "https://no-secret.example.com/json?foo=bar"}
1154
    data_source.store()
1155
    with mock.patch('wcs.qommon.misc.urlopen') as urlopen:
1156
        urlopen.side_effect = lambda *args: io.StringIO('{"data": [{"id": 0, "text": "zero"}]}')
1157
        assert len(data_sources.get_items({'type': 'foobar'})) == 1
1158
        unsigned_url = urlopen.call_args[0][0]
1159
    if qs_data:
1160
        assert unsigned_url == 'https://no-secret.example.com/json?foo=bar&arg1=val1&arg2=val2'
1161
    else:
1162
        assert unsigned_url == 'https://no-secret.example.com/json?foo=bar'
1111 1163

  
1112 1164

  
1113 1165
def test_named_datasource_json_cache(requests_pub):
......
1326 1378
    export = ET.tostring(data_source.export_to_xml(include_id=True))
1327 1379
    data_source3 = NamedDataSource.import_from_xml_tree(ET.fromstring(export), include_id=True)
1328 1380
    assert data_source3.category_id is None
1381

  
1382

  
1383
def test_data_source_with_qs_data(pub):
1384
    data_source = NamedDataSource(name='test')
1385
    data_source.qs_data = {'arg1': 'val1', 'arg2': 'val2'}
1386
    data_source.store()
1387
    data_source2 = assert_import_export_works(data_source, include_id=True)
1388
    assert data_source2.qs_data == {'arg1': 'val1', 'arg2': 'val2'}
tests/test_datasource_chrono.py
208 208
    pub.load_site_options()
209 209
    NamedDataSource.wipe()
210 210

  
211
    # create some datasource, with same urls, but not external
211
    # create some datasource, with same urls, but external != 'agenda'
212 212
    ds = NamedDataSource(name='Foo A')
213 213
    ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'}
214 214
    ds.store()
215 215
    ds = NamedDataSource(name='Foo B')
216 216
    ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'}
217 217
    ds.store()
218
    ds = NamedDataSource(name='Foo A')
219
    ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-A/datetimes/'}
220
    ds.external = 'agenda_manual'
221
    ds.store()
222
    ds = NamedDataSource(name='Foo B')
223
    ds.external = 'agenda_manual'
224
    ds.data_source = {'type': 'json', 'value': 'http://chrono.example.net/api/agenda/events-B/datetimes/'}
225
    ds.store()
218 226

  
219 227
    # error during collect
220 228
    mock_collect.return_value = None
221 229
    build_agenda_datasources(pub)
222
    assert NamedDataSource.count() == 2  # no changes
230
    assert NamedDataSource.count() == 4  # no changes
223 231

  
224 232
    # no agenda datasource found in chrono
225 233
    mock_collect.return_value = []
226 234
    build_agenda_datasources(pub)
227
    assert NamedDataSource.count() == 2  # no changes
235
    assert NamedDataSource.count() == 4  # no changes
228 236

  
229 237
    # 2 agenda datasources found
230 238
    mock_collect.return_value = [
......
242 250

  
243 251
    # agenda datasources does not exist, create them
244 252
    build_agenda_datasources(pub)
245
    assert NamedDataSource.count() == 2 + 2
246
    datasource1 = NamedDataSource.get(2 + 1)
247
    datasource2 = NamedDataSource.get(2 + 2)
253
    assert NamedDataSource.count() == 4 + 2
254
    datasource1 = NamedDataSource.get(4 + 1)
255
    datasource2 = NamedDataSource.get(4 + 2)
248 256
    assert datasource1.name == 'Events A'
249 257
    assert datasource1.slug == 'chrono_ds_sluga'
250 258
    assert datasource1.external == 'agenda'
......
270 278
    datasource2.slug = 'wrong_again'
271 279
    datasource2.store()
272 280
    build_agenda_datasources(pub)
273
    assert NamedDataSource.count() == 2 + 2
274
    datasource1 = NamedDataSource.get(2 + 1)
275
    datasource2 = NamedDataSource.get(2 + 2)
281
    assert NamedDataSource.count() == 4 + 2
282
    datasource1 = NamedDataSource.get(4 + 1)
283
    datasource2 = NamedDataSource.get(4 + 2)
276 284
    assert datasource1.name == 'Events A'
277 285
    assert datasource1.slug == 'wrong'
278 286
    assert datasource2.name == 'Events B'
......
283 291
    datasource1.store()
284 292

  
285 293
    build_agenda_datasources(pub)
286
    assert NamedDataSource.count() == 2 + 2
294
    assert NamedDataSource.count() == 4 + 2
287 295
    # first datasource was deleted, because not found and not used
288
    datasource2 = NamedDataSource.get(2 + 2)
289
    datasource3 = NamedDataSource.get(2 + 3)
296
    datasource2 = NamedDataSource.get(4 + 2)
297
    datasource3 = NamedDataSource.get(4 + 3)
290 298
    assert datasource2.name == 'Events B'
291 299
    assert datasource2.external == 'agenda'
292 300
    assert datasource2.external_status is None
......
314 322
    datasource3.data_source['value'] = 'http://chrono.example.net/api/agenda/events-FOOBAR/datetimes/'
315 323
    datasource3.store()
316 324
    build_agenda_datasources(pub)
317
    assert NamedDataSource.count() == 2 + 3
318
    datasource2 = NamedDataSource.get(2 + 2)
319
    datasource3 = NamedDataSource.get(2 + 3)
320
    datasource4 = NamedDataSource.get(2 + 4)
325
    assert NamedDataSource.count() == 4 + 3
326
    datasource2 = NamedDataSource.get(4 + 2)
327
    datasource3 = NamedDataSource.get(4 + 3)
328
    datasource4 = NamedDataSource.get(4 + 4)
321 329
    assert datasource2.name == 'Events B'
322 330
    assert datasource2.slug == 'wrong_again'
323 331
    assert datasource2.external == 'agenda'
......
347 355
    datasource4.external_status = 'not-found'
348 356
    datasource4.store()
349 357
    build_agenda_datasources(pub)
350
    assert NamedDataSource.count() == 2 + 3
351
    datasource4 = NamedDataSource.get(2 + 4)
358
    assert NamedDataSource.count() == 4 + 3
359
    datasource4 = NamedDataSource.get(4 + 4)
352 360
    assert datasource4.external_status is None
tests/test_snapshots.py
496 496
    assert '<p>%s</p>' % localstrftime(snapshot.timestamp) in resp.text
497 497
    with pytest.raises(IndexError):
498 498
        resp = resp.click('Edit')
499
    with pytest.raises(IndexError):
500
        resp = resp.click('Duplicate')
499 501

  
500 502

  
501 503
def test_form_snapshot_browse(pub, formdef_with_history):
wcs/admin/data_sources.py
36 36
from wcs.qommon.errors import AccessForbiddenError
37 37
from wcs.qommon.form import (
38 38
    CheckboxWidget,
39
    ComputedExpressionWidget,
39 40
    DurationWidget,
40 41
    FileWidget,
41 42
    Form,
......
44 45
    SlugWidget,
45 46
    StringWidget,
46 47
    TextWidget,
48
    WidgetDict,
47 49
    WidgetList,
48 50
    get_response,
49 51
    get_session,
......
78 80
            rows=5,
79 81
            value=self.datasource.description,
80 82
        )
81
        if not self.datasource or self.datasource.type != 'wcs:users':
83
        if not self.datasource or (
84
            self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual'
85
        ):
82 86
            form.add(
83 87
                DataSourceSelectionWidget,
84 88
                'data_source',
......
105 109
                'data-dynamic-display-value-in': 'json|geojson',
106 110
            },
107 111
        )
108
        if not self.datasource or self.datasource.type != 'wcs:users':
112
        if not self.datasource or (
113
            self.datasource.type != 'wcs:users' and self.datasource.external != 'agenda_manual'
114
        ):
109 115
            form.add(
110 116
                StringWidget,
111 117
                'query_parameter',
......
249 255
                title=_('Record on errors'),
250 256
                value=self.datasource.record_on_errors,
251 257
            )
258
        if self.datasource.external == 'agenda_manual':
259
            form.add(
260
                WidgetDict,
261
                'qs_data',
262
                title=_('Query string data'),
263
                value=self.datasource.qs_data or {},
264
                element_value_type=ComputedExpressionWidget,
265
                element_value_kwargs={'allow_python': False},
266
                allow_empty_values=True,
267
                value_for_empty_value='',
268
            )
252 269

  
253 270
        if not self.datasource.is_readonly():
254 271
            form.add_submit('submit', _('Submit'))
......
289 306
        'edit',
290 307
        'delete',
291 308
        'export',
309
        'duplicate',
292 310
        ('history', 'snapshots_dir'),
293 311
    ]
294 312
    do_not_call_in_templates = True
......
311 329
            if hasattr(self.datasource, 'snapshot_object'):
312 330
                r += utils.snapshot_info_block(snapshot=self.datasource.snapshot_object)
313 331
        r += htmltext('<ul id="sidebar-actions">')
332
        if self.datasource.external == 'agenda' or not self.datasource.is_readonly():
333
            r += htmltext('<li><a href="duplicate" rel="popup">%s</a></li>') % _('Duplicate')
314 334
        if not self.datasource.is_readonly():
315 335
            r += htmltext('<li><a href="delete" rel="popup">%s</a></li>') % _('Delete')
316 336
            r += htmltext('<li><a href="export">%s</a></li>') % _('Export')
......
444 464
            content_type='application/x-wcs-datasource',
445 465
        )
446 466

  
467
    def duplicate(self):
468
        if hasattr(self.datasource, 'snapshot_object'):
469
            return redirect('.')
470

  
471
        form = Form(enctype='multipart/form-data')
472
        form.add_submit('duplicate', _('Submit'))
473
        form.add_submit('cancel', _('Cancel'))
474
        if form.get_widget('cancel').parse():
475
            return redirect('.')
476
        if not form.is_submitted() or form.has_errors():
477
            get_response().breadcrumb.append(('duplicate', _('Duplicate')))
478
            html_top('datasources', title=_('Duplicate Data Source'))
479
            r = TemplateIO(html=True)
480
            r += htmltext('<h2>%s %s</h2>') % (_('Duplicating Data Source:'), self.datasource.name)
481
            r += form.render()
482
            return r.getvalue()
483

  
484
        tree = self.datasource.export_to_xml(include_id=True)
485
        new_datasource = NamedDataSource.import_from_xml_tree(tree)
486
        new_datasource.name = _('Copy of %s' % new_datasource.name)
487
        new_datasource.slug = new_datasource.get_new_slug(new_datasource.slug)
488
        if self.datasource.agenda_ds:
489
            new_datasource.external = 'agenda_manual'
490
        new_datasource.store()
491
        return redirect('../%s' % new_datasource.id)
492

  
447 493

  
448 494
class NamedDataSourcesDirectory(Directory):
449 495
    _q_exports = [
wcs/data_sources.py
397 397
        if Template.is_template_string(url):
398 398
            vars = get_publisher().substitutions.get_context_variables(mode='lazy')
399 399
            url = get_variadic_url(url, vars)
400
        if data_source.get('qs_data'):  # merge qs_data into url
401
            from wcs.workflows import WorkflowStatusItem
402

  
403
            parsed = urllib.parse.urlparse(url)
404
            qs = list(urllib.parse.parse_qsl(parsed.query))
405
            for key, value in data_source['qs_data'].items():
406
                try:
407
                    value = WorkflowStatusItem.compute(value, raises=True, record_errors=False)
408
                    value = str(value) if value is not None else ''
409
                except Exception as e:
410
                    get_publisher().record_error(
411
                        _(
412
                            'Failed to compute value "%(value)s" for "%(query)s" query parameter'
413
                            % {'value': value, 'query': key}
414
                        ),
415
                        context='[DATASOURCE]',
416
                        exception=e,
417
                        notify=data_source.get('notify_on_errors'),
418
                        record=data_source.get('record_on_errors'),
419
                    )
420
                else:
421
                    key = force_str(key)
422
                    value = force_str(value)
423
                    qs.append((key, value))
424
            qs = urllib.parse.urlencode(qs)
425
            url = urllib.parse.urlunparse(parsed[:4] + (qs,) + parsed[5:6])
400 426

  
401 427
        request = get_request()
402 428
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
......
473 499
    id_attribute = None
474 500
    text_attribute = None
475 501
    id_property = None
502
    qs_data = None
476 503
    label_template_property = None
477 504
    external = None
478 505
    external_status = None
......
497 524
        ('id_attribute', 'str'),
498 525
        ('text_attribute', 'str'),
499 526
        ('id_property', 'str'),
527
        ('qs_data', 'qs_data'),
500 528
        ('label_template_property', 'str'),
501 529
        ('external', 'str'),
502 530
        ('external_status', 'str'),
......
554 582
                    'data_attribute': self.data_attribute,
555 583
                    'id_attribute': self.id_attribute,
556 584
                    'text_attribute': self.text_attribute,
585
                    'qs_data': self.qs_data,
557 586
                    'notify_on_errors': notify_on_errors,
558 587
                    'record_on_errors': record_on_errors,
559 588
                }
......
583 612
    def maybe_datetimes(self):
584 613
        return self.type == 'json' and 'datetimes' in (self.data_source.get('value') or '')
585 614

  
615
    @property
616
    def agenda_ds(self):
617
        return self.external in ['agenda', 'agenda_manual']
618

  
619
    @property
620
    def agenda_ds_origin(self):
621
        if self.external != 'agenda_manual':
622
            return
623
        for datasource in NamedDataSource.select():
624
            if datasource.external != 'agenda':
625
                continue
626
            if datasource.data_source.get('value') == self.data_source.get('value'):
627
                return datasource
628

  
586 629
    def migrate(self):
587 630
        changed = False
588 631

  
......
631 674
            'value': force_str(element.find('value').text or ''),
632 675
        }
633 676

  
677
    def export_qs_data_to_xml(self, element, attribute_name, *args, **kwargs):
678
        if not self.qs_data:
679
            return
680
        for (key, value) in self.qs_data.items():
681
            item = ET.SubElement(element, 'item')
682
            if isinstance(key, str):
683
                ET.SubElement(item, 'name').text = force_text(key)
684
            else:
685
                raise AssertionError('unknown type for key (%r)' % key)
686
            if isinstance(value, str):
687
                ET.SubElement(item, 'value').text = force_text(value)
688
            else:
689
                raise AssertionError('unknown type for value (%r)' % key)
690

  
691
    def import_qs_data_from_xml(self, element, **kwargs):
692
        if element is None:
693
            return
694
        qs_data = {}
695
        for item in element.findall('item'):
696
            key = force_str(item.find('name').text)
697
            value = force_str(item.find('value').text or '')
698
            qs_data[key] = value
699
        return qs_data
700

  
634 701
    def get_dependencies(self):
635 702
        yield self.category
636 703

  
......
654 721
        return data_source
655 722

  
656 723
    def get_json_query_url(self):
657
        url = self.data_source.get('value').strip()
658
        if Template.is_template_string(url):
659
            vars = get_publisher().substitutions.get_context_variables(mode='lazy')
660
            url = get_variadic_url(url, vars)
724
        url = self.get_variadic_url()
661 725
        if not url:
662 726
            return ''
663 727
        if '?' not in url:
......
721 785
    def get_geojson_url(self):
722 786
        assert self.type == 'geojson'
723 787
        url = self.data_source.get('value').strip()
724
        if Template.is_template_string(url):
725
            context = get_publisher().substitutions.get_context_variables(mode='lazy')
726
            new_url = get_variadic_url(url, context)
727
            if new_url != url:
728
                token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
729
                token, created = get_publisher().token_class.get_or_create(
730
                    type='autocomplete', context=token_context
731
                )
732
                if created:
733
                    token.store()
734
                return '/api/geojson/%s' % token.id
788
        new_url = self.get_variadic_url()
789
        if new_url != url:
790
            token_context = {'session_id': get_session().id, 'url': new_url, 'slug': self.slug}
791
            token, created = get_publisher().token_class.get_or_create(
792
                type='autocomplete', context=token_context
793
            )
794
            if created:
795
                token.store()
796
            return '/api/geojson/%s' % token.id
735 797
        return '/api/geojson/%s' % self.slug
736 798

  
737 799
    def get_geojson_data(self, force_url=None):
738 800
        if force_url:
739 801
            url = force_url
740 802
        else:
741
            url = self.data_source.get('value').strip()
742
            if Template.is_template_string(url):
743
                context = get_publisher().substitutions.get_context_variables(mode='lazy')
744
                url = get_variadic_url(url, context)
803
            url = self.get_variadic_url()
745 804

  
746 805
        request = get_request()
747 806
        if hasattr(request, 'datasources_cache') and url in request.datasources_cache:
......
782 841
        return data
783 842

  
784 843
    def get_value_by_id(self, param_name, param_value):
785
        url = self.data_source.get('value').strip()
786
        if Template.is_template_string(url):
787
            vars = get_publisher().substitutions.get_context_variables(mode='lazy')
788
            url = get_variadic_url(url, vars)
844
        url = self.get_variadic_url()
789 845

  
790 846
        if '?' not in url:
791 847
            url += '?'
wcs/templates/wcs/backoffice/data-source.html
18 18
<div class="section">
19 19
<h3>{% trans "Configuration" %}</h3>
20 20
<ul>
21
  <li>{% trans "Type of source:" %} {{ datasource.type_label }}</li>
22
  {% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %}
23
  <li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
24
  {% elif datasource.data_source.type == 'formula' %}
25
  <li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li>
26
  {% elif datasource.data_source.type == 'wcs:users' %}
27
  {% spaceless %}
28
  <li>{% trans "Users with roles:" %}
29
      <ul>
30
          {% for role in roles %}
31
          {% if role.0 in datasource.users_included_roles %}
32
          <li>{{ role.1 }}</li>
33
          {% endif %}
21
  {% if datasource.agenda_ds %}
22
   {% with datasource.agenda_ds_origin as origin %}
23
   <li>{% trans "Type of source:" %} {% trans "Agenda data" %}{% if origin %} ({% trans "Copy of:" %} <a href="{{ origin.get_admin_url }}">{{ origin.name }}</a>){% endif %}</li>
24
    <li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
25
    {% if datasource.qs_data %}
26
    <li>{% trans "Extra query string data:" %}
27
        <ul>
28
          {% for k, v in datasource.qs_data.items %}
29
            <li>{% blocktrans %}{{ k }}:{% endblocktrans %} {{ v }}</li>
34 30
          {% endfor %}
35
      </ul>
36
  </li>
37
  <li>{% trans "Users without roles:" %}
38
      <ul>
39
          {% for role in roles %}
40
          {% if role.0 in datasource.users_excluded_roles %}
41
          <li>{{ role.1 }}</li>
42
          {% endif %}
43
          {% endfor %}
44
      </ul>
45
  </li>
46
  <li>{% trans "Include disabled users:" %}
47
    {% if datasource.include_disabled_users %}
48
      {% trans "Yes" %}
49
    {% else %}
50
      {% trans "No" %}
31
        </ul>
32
      </li>
33
    {% endif %}
34
    {% endwith %}
35
  {% else %}
36
    <li>{% trans "Type of source:" %} {{ datasource.type_label }}</li>
37
    {% if datasource.data_source.type == 'json' or datasource.data_source.type == 'jsonp' or datasource.data_source.type == 'geojson' %}
38
    <li>{% trans "URL:" %} <a href="{{ url }}">{{ datasource.data_source.value }}</a></li>
39
    {% elif datasource.data_source.type == 'formula' %}
40
    <li>{% trans "Python Expression:" %} {{ datasource.data_source.value }}</li>
41
    {% elif datasource.data_source.type == 'wcs:users' %}
42
    {% spaceless %}
43
    <li>{% trans "Users with roles:" %}
44
        <ul>
45
            {% for role in roles %}
46
            {% if role.0 in datasource.users_included_roles %}
47
            <li>{{ role.1 }}</li>
48
            {% endif %}
49
            {% endfor %}
50
        </ul>
51
    </li>
52
    <li>{% trans "Users without roles:" %}
53
        <ul>
54
            {% for role in roles %}
55
            {% if role.0 in datasource.users_excluded_roles %}
56
            <li>{{ role.1 }}</li>
57
            {% endif %}
58
            {% endfor %}
59
        </ul>
60
    </li>
61
    <li>{% trans "Include disabled users:" %}
62
      {% if datasource.include_disabled_users %}
63
        {% trans "Yes" %}
64
      {% else %}
65
        {% trans "No" %}
66
      {% endif %}
67
    </li>
68
    {% endspaceless %}
51 69
    {% endif %}
52
  </li>
53
  {% endspaceless %}
54 70
  {% endif %}
55 71
  {% if datasource.cache_duration %}
56 72
  <li>{% trans "Cache Duration:" %} {{ datasource.humanized_cache_duration }}
57
-