Projet

Général

Profil

0002-forms-redisplay-page-when-datasource-is-unavailable-.patch

Benjamin Dauvergne, 11 novembre 2021 09:58

Télécharger (19,7 ko)

Voir les différences:

Subject: [PATCH 2/2] forms: redisplay page when datasource is unavailable
 (#56824)

 tests/form_pages/test_all.py  | 179 ++++++++++++++++++++++++++++++++++
 tests/form_pages/test_live.py |  39 ++++++++
 wcs/forms/root.py             |  96 +++++++++++++-----
 3 files changed, 288 insertions(+), 26 deletions(-)
tests/form_pages/test_all.py
2775 2775
        assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
2776 2776

  
2777 2777

  
2778
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
2779
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2))
2780
@mock.patch('wcs.qommon.misc.urlopen')
2781
def test_form_item_data_source_error(
2782
    urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation
2783
):
2784
    data_source = NamedDataSource(name='foobar')
2785
    data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
2786
    data_source.id_parameter = 'id'
2787
    data_source.store()
2788

  
2789
    normal_get_structured_value = NamedDataSource.get_structured_value
2790

  
2791
    class failing_get_structured_value:
2792
        def __init__(self, fail_after_count):
2793
            self.fail_after_count = fail_after_count
2794
            self.count = 0
2795

  
2796
        def __call__(self, *args):
2797
            import inspect
2798

  
2799
            for frame in inspect.stack():
2800
                if frame.function in ['store_display_value', 'store_structured_value']:
2801
                    count = self.count
2802
                    self.count += 1
2803
                    if count >= self.fail_after_count:
2804
                        return None
2805
            return normal_get_structured_value(*args)
2806

  
2807
        @property
2808
        def method(self):
2809
            def f(*args):
2810
                return self(*args)
2811

  
2812
            return f
2813

  
2814
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
2815
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
2816

  
2817
    formdef = create_formdef()
2818
    formdef.fields = [
2819
        fields.PageField(id='0', label='1st page', type='page'),
2820
        fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}),
2821
    ]
2822

  
2823
    formdef.store()
2824
    resp = get_app(pub).get('/test/')
2825
    formdef.data_class().wipe()
2826
    resp.forms[0]['f1'] = '1'
2827

  
2828
    # fail in get_structured_value
2829
    monkeypatch.setattr(
2830
        NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method
2831
    )
2832
    resp = resp.forms[0].submit('submit')
2833
    assert 'Technical error, please try again' in resp.text
2834

  
2835
    # fix transient failure
2836
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2837
    resp = resp.forms[0].submit('submit')
2838
    assert 'Check values then click submit.' in resp.text
2839

  
2840
    # fail in get_structured_value
2841
    monkeypatch.setattr(
2842
        NamedDataSource,
2843
        'get_structured_value',
2844
        failing_get_structured_value(fail_after_count_validation).method,
2845
    )
2846
    resp = resp.forms[0].submit('submit')
2847
    assert 'Technical error, please try again' in resp.text
2848

  
2849
    # fix transient failure
2850
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2851
    resp = resp.forms[0].submit('submit')
2852
    assert resp.status_int == 302
2853
    resp = resp.follow()
2854
    assert 'The form has been recorded' in resp.text
2855
    assert formdef.data_class().count() == 1
2856
    data_id = formdef.data_class().select()[0].id
2857
    return formdef.data_class().get(data_id).data
2858

  
2859

  
2860
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
2861
@mock.patch('wcs.qommon.misc.urlopen')
2862
def test_form_item_data_source_error_no_confirmation(urlopen, pub, monkeypatch, fail_after_count_page):
2863
    data_source = NamedDataSource(name='foobar')
2864
    data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
2865
    data_source.id_parameter = 'id'
2866
    data_source.store()
2867

  
2868
    normal_get_structured_value = NamedDataSource.get_structured_value
2869

  
2870
    class failing_get_structured_value:
2871
        def __init__(self, fail_after_count):
2872
            self.fail_after_count = fail_after_count
2873
            self.count = 0
2874

  
2875
        def __call__(self, *args):
2876
            import inspect
2877

  
2878
            for frame in inspect.stack():
2879
                if frame.function in ['store_display_value', 'store_structured_value']:
2880
                    count = self.count
2881
                    self.count += 1
2882
                    if count >= self.fail_after_count:
2883
                        return None
2884
            return normal_get_structured_value(*args)
2885

  
2886
        @property
2887
        def method(self):
2888
            def f(*args):
2889
                return self(*args)
2890

  
2891
            return f
2892

  
2893
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
2894
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
2895

  
2896
    formdef = create_formdef()
2897
    formdef.confirmation = False
2898
    formdef.fields = [
2899
        fields.PageField(id='0', label='1st page', type='page'),
2900
        fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}),
2901
    ]
2902

  
2903
    formdef.store()
2904
    resp = get_app(pub).get('/test/')
2905
    formdef.data_class().wipe()
2906
    resp.forms[0]['f1'] = '1'
2907

  
2908
    # fail in get_structured_value
2909
    monkeypatch.setattr(
2910
        NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method
2911
    )
2912
    resp = resp.forms[0].submit('submit')
2913
    assert 'Technical error, please try again' in resp.text
2914

  
2915
    # fix transient failure
2916
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2917
    resp = resp.forms[0].submit('submit')
2918
    assert resp.status_int == 302
2919
    resp = resp.follow()
2920
    assert 'The form has been recorded' in resp.text
2921
    assert formdef.data_class().count() == 1
2922
    data_id = formdef.data_class().select()[0].id
2923
    return formdef.data_class().get(data_id).data
2924

  
2925

  
2778 2926
def test_form_items_data_source_field_submit(pub):
2779 2927
    def submit_items_data_source_field(ds):
2780 2928
        formdef = create_formdef()
......
4340 4488
    assert resp.forms[1]['f1'].value == 'foobar'  # not a valid email
4341 4489

  
4342 4490

  
4491
@mock.patch('wcs.qommon.misc.urlopen')
4492
def test_form_autosave_item_field_data_source_error(urlopen, pub):
4493
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
4494
    formdef = create_formdef()
4495
    formdef.fields = [
4496
        fields.ItemField(id='1', label='string', data_source=ds),
4497
    ]
4498
    formdef.enable_tracking_codes = True
4499
    formdef.store()
4500

  
4501
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
4502
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
4503

  
4504
    formdef.data_class().wipe()
4505
    app = get_app(pub)
4506
    resp = app.get('/test/')
4507
    resp.form['f1'] = '1'  # select the first data source item ('un')
4508

  
4509
    # make the data source fail
4510
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
4511

  
4512
        autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4513
    assert autosave_resp.json == {
4514
        'reason': 'form deserialization failed: a datasource is unavailable',
4515
        'result': 'error',
4516
    }
4517

  
4518
    autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4519
    assert autosave_resp.json['result'] == 'success'
4520

  
4521

  
4343 4522
def test_form_autosave_with_parameterized_datasource(pub):
4344 4523
    formdef = create_formdef()
4345 4524
    formdef.fields = [
tests/form_pages/test_live.py
1264 1264
    resp = resp.form.submit('submit')  # -> 3rd page
1265 1265
    resp = resp.form.submit('previous')  # -> 2nd page
1266 1266
    assert 'live value: attr1' in resp
1267

  
1268

  
1269
@mock.patch('wcs.qommon.misc.urlopen')
1270
def test_field_live_condition_data_source_error(urlopen, pub):
1271
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
1272

  
1273
    FormDef.wipe()
1274
    formdef = FormDef()
1275
    formdef.name = 'Foo'
1276
    formdef.fields = [
1277
        fields.ItemField(id='1', label='string', data_source=ds, varname='foo'),
1278
        fields.StringField(
1279
            id='2',
1280
            label='bar',
1281
            required=True,
1282
            varname='bar',
1283
            condition={'type': 'django', 'value': 'form_var_foo_x == "bye"'},
1284
        ),
1285
    ]
1286
    formdef.store()
1287

  
1288
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
1289
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
1290

  
1291
    app = get_app(pub)
1292
    resp = app.get('/foo/')
1293
    assert 'f1' in resp.form.fields
1294
    assert 'f2' not in resp.form.fields
1295
    resp.form['f1'] = '2'
1296

  
1297
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
1298
        live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1299
        assert live_resp.json == {
1300
            'reason': 'form deserialization failed: a datasource is unavailable',
1301
            'result': 'error',
1302
        }
1303

  
1304
    live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1305
    assert live_resp.json == {'result': {'1': {'visible': True}, '2': {'visible': True}}}
wcs/forms/root.py
35 35
from quixote.util import randbytes
36 36

  
37 37
from wcs.categories import Category
38
from wcs.fields import SetValueError
38 39
from wcs.formdata import Evolution, FormData
39 40
from wcs.formdef import FormDef
40 41
from wcs.forms.common import FormStatusPage, FormTemplateMixin
......
543 544
            if had_prefill:
544 545
                # include prefilled data
545 546
                transient_formdata = self.get_transient_formdata(magictoken)
547
                # XXX: should we handle data source errors here?
546 548
                transient_formdata.data.update(self.formdef.get_data(form))
547 549
                if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()):
548 550
                    # save to get prefilling data in database
......
1061 1063
                # reset locked data with newly submitted values, this allows
1062 1064
                # for templates referencing fields from the same page.
1063 1065
                self.reset_locked_data(form)
1064
                data = self.formdef.get_data(form)
1066
                try:
1067
                    data = self.formdef.get_data(form, raise_on_error=True)
1068
                except SetValueError:
1069
                    return self.page(
1070
                        page,
1071
                        page_change=False,
1072
                        page_error_messages=[_('Technical error, please try again')],
1073
                        transient_formdata=transient_formdata,
1074
                    )
1065 1075
                computed_data = self.handle_computed_fields(magictoken, submitted_fields)
1066 1076

  
1067 1077
            form_data.update(data)
......
1082 1092
                # a new ConditionsVars will get added to the substitution
1083 1093
                # variables.
1084 1094
                form_data = copy.copy(session.get_by_magictoken(magictoken, {}))
1085
                data = self.formdef.get_data(form)
1095
                try:
1096
                    data = self.formdef.get_data(form, raise_on_error=True)
1097
                except SetValueError:
1098
                    return self.page(
1099
                        page,
1100
                        page_change=False,
1101
                        page_error_messages=[_('Technical error, please try again')],
1102
                        transient_formdata=transient_formdata,
1103
                    )
1086 1104
                form_data.update(data)
1087 1105
                form_data.update(computed_data)
1088 1106
                for i, post_condition in enumerate(post_conditions):
......
1120 1138

  
1121 1139
            form_data = session.get_by_magictoken(magictoken, {})
1122 1140
            with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'):
1123
                data = self.formdef.get_data(form)
1141
                try:
1142
                    data = self.formdef.get_data(form, raise_on_error=True)
1143
                except SetValueError:
1144
                    return self.page(
1145
                        page,
1146
                        page_change=False,
1147
                        page_error_messages=[_('Technical error, please try again')],
1148
                        transient_formdata=transient_formdata,
1149
                    )
1124 1150
            form_data.update(data)
1125 1151
            form_data.update(computed_data)
1126 1152

  
......
1161 1187
                            v = field.convert_value_to_str(v)
1162 1188
                        req.form['f%s' % k] = v
1163 1189
                if self.edit_mode:
1164
                    form = self.create_view_form(form_data, use_tokens=False)
1165
                    return self.submitted_existing(form)
1190
                    view_form = self.create_view_form(form_data, use_tokens=False)
1191
                    try:
1192
                        return self.submitted_existing(view_form)
1193
                    except SetValueError:
1194
                        return self.page(
1195
                            page,
1196
                            page_change=False,
1197
                            page_error_messages=[_('Technical error, please try again')],
1198
                            transient_formdata=transient_formdata,
1199
                        )
1166 1200
                if self.has_confirmation_page():
1167 1201
                    return self.validating(form_data)
1168 1202
                else:
......
1183 1217
        self.reset_locked_data(form)
1184 1218
        if step == 1:
1185 1219
            form.add_submit('previous')
1186
            magictoken = form.get_widget('magictoken').parse()
1187

  
1188 1220
            if form.get_submit() == 'previous':
1189 1221
                return self.previous_page(len(self.pages), magictoken)
1190
            magictoken = form.get_widget('magictoken').parse()
1191
            form_data = session.get_by_magictoken(magictoken, {})
1192
            data = self.formdef.get_data(form)
1193
            form_data.update(data)
1194
            session.add_magictoken(magictoken, form_data)
1195

  
1196 1222
            step = 2  # so it will flow to submit
1197
            form = Form()
1198
            form.add_hidden('step', '-1')
1199
            form.add_hidden('page', '-1')
1200
            form.add_hidden('magictoken', '-1')
1201
            form.add_submit('cancel')
1202 1223

  
1203 1224
        if step == 2:
1204
            form.add_submit('previous')
1225
            if 'previous' not in form:
1226
                form.add_submit('previous')
1205 1227
            magictoken = form.get_widget('magictoken').parse()
1206 1228
            self.feed_current_data(magictoken)
1207 1229
            form_data = session.get_by_magictoken(magictoken, {})
......
1228 1250
                # submitted a second time
1229 1251
                return template.error_page(_('This form has already been submitted.'))
1230 1252

  
1231
            return self.submitted(form, existing_formdata)
1253
            try:
1254
                return self.submitted(form, existing_formdata)
1255
            except SetValueError:
1256
                if get_request().form.get('step') == '2':
1257
                    # submit came from the validation page
1258
                    return self.validating(
1259
                        form_data, page_error_messages=[_('Technical error, please try again')]
1260
                    )
1261
                else:
1262
                    # last page
1263
                    return self.page(
1264
                        page,
1265
                        page_change=False,
1266
                        page_error_messages=[_('Technical error, please try again')],
1267
                        transient_formdata=transient_formdata,
1268
                    )
1232 1269

  
1233 1270
    def reset_locked_data(self, form):
1234 1271
        # reset locked fields, making sure the user cannot alter them.
......
1338 1375
            # on webservice results, there can be (temporary?) inconsistencies.
1339 1376
            return result_error('ouf ot range page_no')
1340 1377
        form = self.create_form(page=page)
1341
        data = self.formdef.get_data(form)
1378
        try:
1379
            data = self.formdef.get_data(form, raise_on_error=True)
1380
        except SetValueError as e:
1381
            return result_error('form deserialization failed: %s' % e)
1342 1382
        if not data:
1343 1383
            return result_error('nothing to save')
1344 1384

  
......
1428 1468
        displayed_fields = []
1429 1469
        with get_publisher().substitutions.temporary_feed(formdata, force_mode='lazy'):
1430 1470
            form = self.create_form(page=page, displayed_fields=displayed_fields, transient_formdata=formdata)
1431
        formdata.data.update(self.formdef.get_data(form))
1471
        try:
1472
            formdata.data.update(self.formdef.get_data(form, raise_on_error=True))
1473
        except SetValueError as e:
1474
            return result_error('form deserialization failed: %s' % e)
1432 1475
        return FormStatusPage.live_process_fields(form, formdata, displayed_fields)
1433 1476

  
1434 1477
    def clean_submission_context(self):
......
1443 1486
            filled = self.get_current_draft() or self.formdef.data_class()()
1444 1487
            filled.just_created()
1445 1488

  
1446
        filled.data = self.formdef.get_data(form)
1489
        filled.data = self.formdef.get_data(form, raise_on_error=True)
1447 1490
        magictoken = get_request().form['magictoken']
1448 1491
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1449 1492
        filled.data.update(computed_values)
......
1493 1536
        code.formdata = formdata  # this will .store() the code
1494 1537

  
1495 1538
    def submitted_existing(self, form):
1496
        new_data = self.formdef.get_data(form)
1539
        new_data = self.formdef.get_data(form, raise_on_error=True)
1497 1540
        magictoken = get_request().form['magictoken']
1498 1541
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1499 1542
        new_data.update(computed_values)
......
1565 1608
                return thumbnail
1566 1609
        return get_session().get_tempfile_content(t).get_file_pointer().read()
1567 1610

  
1568
    def validating(self, data):
1611
    def validating(self, data, page_error_messages=None):
1569 1612
        self.on_validation_page = True
1570 1613
        get_request().view_name = 'validation'
1571 1614
        self.html_top(self.formdef.name)
......
1573 1616
        # over in rendering.
1574 1617
        get_request().environ['REQUEST_METHOD'] = 'GET'
1575 1618
        form = self.create_view_form(data)
1619
        if page_error_messages:
1620
            form.add_global_errors(page_error_messages)
1576 1621
        token_widget = form.get_widget(form.TOKEN_NAME)
1577 1622
        token_widget._parsed = True
1578 1623
        if self.formdef.has_captcha and not (get_session().get_user() or get_session().won_captcha):
......
1604 1649
        }
1605 1650
        if self.has_draft_support() and data:
1606 1651
            context['tracking_code_box'] = lambda: self.tracking_code_box(data, magictoken)
1607

  
1608 1652
        return template.QommonTemplateResponse(
1609 1653
            templates=list(self.get_formdef_template_variants(self.validation_templates)), context=context
1610 1654
        )
1611
-