Projet

Général

Profil

0002-forms-redisplay-page-when-datasource-is-unavailable-.patch

Benjamin Dauvergne, 12 novembre 2021 16:06

Télécharger (19,1 ko)

Voir les différences:

Subject: [PATCH 2/2] forms: redisplay page when datasource is unavailable
 (#56824)

 tests/form_pages/test_all.py  | 179 ++++++++++++++++++++++++++++++++++
 tests/form_pages/test_live.py |  39 ++++++++
 wcs/forms/root.py             |  95 +++++++++++++-----
 3 files changed, 287 insertions(+), 26 deletions(-)
tests/form_pages/test_all.py
2775 2775
        assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
2776 2776

  
2777 2777

  
2778
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
2779
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2))
2780
@mock.patch('wcs.qommon.misc.urlopen')
2781
def test_form_item_data_source_error(
2782
    urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation
2783
):
2784
    data_source = NamedDataSource(name='foobar')
2785
    data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
2786
    data_source.id_parameter = 'id'
2787
    data_source.store()
2788

  
2789
    normal_get_structured_value = NamedDataSource.get_structured_value
2790

  
2791
    class failing_get_structured_value:
2792
        def __init__(self, fail_after_count):
2793
            self.fail_after_count = fail_after_count
2794
            self.count = 0
2795

  
2796
        def __call__(self, *args):
2797
            import inspect
2798

  
2799
            for frame in inspect.stack():
2800
                if frame.function in ['store_display_value', 'store_structured_value']:
2801
                    count = self.count
2802
                    self.count += 1
2803
                    if count >= self.fail_after_count:
2804
                        return None
2805
            return normal_get_structured_value(*args)
2806

  
2807
        @property
2808
        def method(self):
2809
            def f(*args):
2810
                return self(*args)
2811

  
2812
            return f
2813

  
2814
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
2815
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
2816

  
2817
    formdef = create_formdef()
2818
    formdef.fields = [
2819
        fields.PageField(id='0', label='1st page', type='page'),
2820
        fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}),
2821
    ]
2822

  
2823
    formdef.store()
2824
    resp = get_app(pub).get('/test/')
2825
    formdef.data_class().wipe()
2826
    resp.forms[0]['f1'] = '1'
2827

  
2828
    # fail in get_structured_value
2829
    monkeypatch.setattr(
2830
        NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method
2831
    )
2832
    resp = resp.forms[0].submit('submit')
2833
    assert 'Technical error, please try again' in resp.text
2834

  
2835
    # fix transient failure
2836
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2837
    resp = resp.forms[0].submit('submit')
2838
    assert 'Check values then click submit.' in resp.text
2839

  
2840
    # fail in get_structured_value
2841
    monkeypatch.setattr(
2842
        NamedDataSource,
2843
        'get_structured_value',
2844
        failing_get_structured_value(fail_after_count_validation).method,
2845
    )
2846
    resp = resp.forms[0].submit('submit')
2847
    assert 'Technical error, please try again' in resp.text
2848

  
2849
    # fix transient failure
2850
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2851
    resp = resp.forms[0].submit('submit')
2852
    assert resp.status_int == 302
2853
    resp = resp.follow()
2854
    assert 'The form has been recorded' in resp.text
2855
    assert formdef.data_class().count() == 1
2856
    data_id = formdef.data_class().select()[0].id
2857
    return formdef.data_class().get(data_id).data
2858

  
2859

  
2860
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
2861
@mock.patch('wcs.qommon.misc.urlopen')
2862
def test_form_item_data_source_error_no_confirmation(urlopen, pub, monkeypatch, fail_after_count_page):
2863
    data_source = NamedDataSource(name='foobar')
2864
    data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
2865
    data_source.id_parameter = 'id'
2866
    data_source.store()
2867

  
2868
    normal_get_structured_value = NamedDataSource.get_structured_value
2869

  
2870
    class failing_get_structured_value:
2871
        def __init__(self, fail_after_count):
2872
            self.fail_after_count = fail_after_count
2873
            self.count = 0
2874

  
2875
        def __call__(self, *args):
2876
            import inspect
2877

  
2878
            for frame in inspect.stack():
2879
                if frame.function in ['store_display_value', 'store_structured_value']:
2880
                    count = self.count
2881
                    self.count += 1
2882
                    if count >= self.fail_after_count:
2883
                        return None
2884
            return normal_get_structured_value(*args)
2885

  
2886
        @property
2887
        def method(self):
2888
            def f(*args):
2889
                return self(*args)
2890

  
2891
            return f
2892

  
2893
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
2894
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
2895

  
2896
    formdef = create_formdef()
2897
    formdef.confirmation = False
2898
    formdef.fields = [
2899
        fields.PageField(id='0', label='1st page', type='page'),
2900
        fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}),
2901
    ]
2902

  
2903
    formdef.store()
2904
    resp = get_app(pub).get('/test/')
2905
    formdef.data_class().wipe()
2906
    resp.forms[0]['f1'] = '1'
2907

  
2908
    # fail in get_structured_value
2909
    monkeypatch.setattr(
2910
        NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method
2911
    )
2912
    resp = resp.forms[0].submit('submit')
2913
    assert 'Technical error, please try again' in resp.text
2914

  
2915
    # fix transient failure
2916
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2917
    resp = resp.forms[0].submit('submit')
2918
    assert resp.status_int == 302
2919
    resp = resp.follow()
2920
    assert 'The form has been recorded' in resp.text
2921
    assert formdef.data_class().count() == 1
2922
    data_id = formdef.data_class().select()[0].id
2923
    return formdef.data_class().get(data_id).data
2924

  
2925

  
2778 2926
def test_form_items_data_source_field_submit(pub):
2779 2927
    def submit_items_data_source_field(ds):
2780 2928
        formdef = create_formdef()
......
4340 4488
    assert resp.forms[1]['f1'].value == 'foobar'  # not a valid email
4341 4489

  
4342 4490

  
4491
@mock.patch('wcs.qommon.misc.urlopen')
4492
def test_form_autosave_item_field_data_source_error(urlopen, pub):
4493
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
4494
    formdef = create_formdef()
4495
    formdef.fields = [
4496
        fields.ItemField(id='1', label='string', data_source=ds),
4497
    ]
4498
    formdef.enable_tracking_codes = True
4499
    formdef.store()
4500

  
4501
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
4502
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
4503

  
4504
    formdef.data_class().wipe()
4505
    app = get_app(pub)
4506
    resp = app.get('/test/')
4507
    resp.form['f1'] = '1'  # not a valid email
4508

  
4509
    # make the ds fails
4510
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
4511

  
4512
        autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4513
    assert autosave_resp.json == {
4514
        'reason': 'form deserialization failed: a datasource is unavailable',
4515
        'result': 'error',
4516
    }
4517

  
4518
    autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4519
    assert autosave_resp.json['result'] == 'success'
4520

  
4521

  
4343 4522
def test_form_autosave_with_parameterized_datasource(pub):
4344 4523
    formdef = create_formdef()
4345 4524
    formdef.fields = [
tests/form_pages/test_live.py
1264 1264
    resp = resp.form.submit('submit')  # -> 3rd page
1265 1265
    resp = resp.form.submit('previous')  # -> 2nd page
1266 1266
    assert 'live value: attr1' in resp
1267

  
1268

  
1269
@mock.patch('wcs.qommon.misc.urlopen')
1270
def test_field_live_condition_data_source_error(urlopen, pub):
1271
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
1272

  
1273
    FormDef.wipe()
1274
    formdef = FormDef()
1275
    formdef.name = 'Foo'
1276
    formdef.fields = [
1277
        fields.ItemField(id='1', label='string', data_source=ds, varname='foo'),
1278
        fields.StringField(
1279
            id='2',
1280
            label='bar',
1281
            required=True,
1282
            varname='bar',
1283
            condition={'type': 'django', 'value': 'form_var_foo_x == "bye"'},
1284
        ),
1285
    ]
1286
    formdef.store()
1287

  
1288
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
1289
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
1290

  
1291
    app = get_app(pub)
1292
    resp = app.get('/foo/')
1293
    assert 'f1' in resp.form.fields
1294
    assert 'f2' not in resp.form.fields
1295
    resp.form['f1'] = '2'
1296

  
1297
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
1298
        live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1299
        assert live_resp.json == {
1300
            'reason': 'form deserialization failed: a datasource is unavailable',
1301
            'result': 'error',
1302
        }
1303

  
1304
    live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1305
    assert live_resp.json == {'result': {'1': {'visible': True}, '2': {'visible': True}}}
wcs/forms/root.py
35 35
from quixote.util import randbytes
36 36

  
37 37
from wcs.categories import Category
38
from wcs.fields import SetValueError
38 39
from wcs.formdata import Evolution, FormData
39 40
from wcs.formdef import FormDef
40 41
from wcs.forms.common import FormStatusPage, FormTemplateMixin
......
1061 1062
                # reset locked data with newly submitted values, this allows
1062 1063
                # for templates referencing fields from the sampe page.
1063 1064
                self.reset_locked_data(form)
1064
                data = self.formdef.get_data(form)
1065
                try:
1066
                    data = self.formdef.get_data(form, raise_on_error=True)
1067
                except SetValueError:
1068
                    return self.page(
1069
                        page,
1070
                        page_change=False,
1071
                        page_error_messages=[_('Technical error, please try again')],
1072
                        transient_formdata=transient_formdata,
1073
                    )
1065 1074
                computed_data = self.handle_computed_fields(magictoken, submitted_fields)
1066 1075

  
1067 1076
            form_data.update(data)
......
1082 1091
                # a new ConditionsVars will get added to the substitution
1083 1092
                # variables.
1084 1093
                form_data = copy.copy(session.get_by_magictoken(magictoken, {}))
1085
                data = self.formdef.get_data(form)
1094
                try:
1095
                    data = self.formdef.get_data(form, raise_on_error=True)
1096
                except SetValueError:
1097
                    return self.page(
1098
                        page,
1099
                        page_change=False,
1100
                        page_error_messages=[_('Technical error, please try again')],
1101
                        transient_formdata=transient_formdata,
1102
                    )
1086 1103
                form_data.update(data)
1087 1104
                form_data.update(computed_data)
1088 1105
                for i, post_condition in enumerate(post_conditions):
......
1120 1137

  
1121 1138
            form_data = session.get_by_magictoken(magictoken, {})
1122 1139
            with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'):
1123
                data = self.formdef.get_data(form)
1140
                try:
1141
                    data = self.formdef.get_data(form, raise_on_error=True)
1142
                except SetValueError:
1143
                    return self.page(
1144
                        page,
1145
                        page_change=False,
1146
                        page_error_messages=[_('Technical error, please try again')],
1147
                        transient_formdata=transient_formdata,
1148
                    )
1124 1149
            form_data.update(data)
1125 1150
            form_data.update(computed_data)
1126 1151

  
......
1161 1186
                            v = field.convert_value_to_str(v)
1162 1187
                        req.form['f%s' % k] = v
1163 1188
                if self.edit_mode:
1164
                    form = self.create_view_form(form_data, use_tokens=False)
1165
                    return self.submitted_existing(form)
1189
                    view_form = self.create_view_form(form_data, use_tokens=False)
1190
                    try:
1191
                        return self.submitted_existing(view_form)
1192
                    except SetValueError:
1193
                        return self.page(
1194
                            page,
1195
                            page_change=False,
1196
                            page_error_messages=[_('Technical error, please try again')],
1197
                            transient_formdata=transient_formdata,
1198
                        )
1166 1199
                if self.has_confirmation_page():
1167 1200
                    return self.validating(form_data)
1168 1201
                else:
......
1183 1216
        self.reset_locked_data(form)
1184 1217
        if step == 1:
1185 1218
            form.add_submit('previous')
1186
            magictoken = form.get_widget('magictoken').parse()
1187

  
1188 1219
            if form.get_submit() == 'previous':
1189 1220
                return self.previous_page(len(self.pages), magictoken)
1190
            magictoken = form.get_widget('magictoken').parse()
1191
            form_data = session.get_by_magictoken(magictoken, {})
1192
            data = self.formdef.get_data(form)
1193
            form_data.update(data)
1194
            session.add_magictoken(magictoken, form_data)
1195

  
1196 1221
            step = 2  # so it will flow to submit
1197
            form = Form()
1198
            form.add_hidden('step', '-1')
1199
            form.add_hidden('page', '-1')
1200
            form.add_hidden('magictoken', '-1')
1201
            form.add_submit('cancel')
1202 1222

  
1203 1223
        if step == 2:
1204
            form.add_submit('previous')
1224
            if 'previous' not in form:
1225
                form.add_submit('previous')
1205 1226
            magictoken = form.get_widget('magictoken').parse()
1206 1227
            self.feed_current_data(magictoken)
1207 1228
            form_data = session.get_by_magictoken(magictoken, {})
......
1228 1249
                # submitted a second time
1229 1250
                return template.error_page(_('This form has already been submitted.'))
1230 1251

  
1231
            return self.submitted(form, existing_formdata)
1252
            try:
1253
                return self.submitted(form, existing_formdata)
1254
            except SetValueError:
1255
                if get_request().form.get('step') == '2':
1256
                    # submit came from the validation page
1257
                    return self.validating(
1258
                        form_data, page_error_messages=[_('Technical error, please try again')]
1259
                    )
1260
                else:
1261
                    # last page
1262
                    return self.page(
1263
                        page,
1264
                        page_change=False,
1265
                        page_error_messages=[_('Technical error, please try again')],
1266
                        transient_formdata=transient_formdata,
1267
                    )
1232 1268

  
1233 1269
    def reset_locked_data(self, form):
1234 1270
        # reset locked fields, making sure the user cannot alter them.
......
1338 1374
            # on webservice results, there can be (temporary?) inconsistencies.
1339 1375
            return result_error('ouf ot range page_no')
1340 1376
        form = self.create_form(page=page)
1341
        data = self.formdef.get_data(form)
1377
        try:
1378
            data = self.formdef.get_data(form, raise_on_error=True)
1379
        except SetValueError as e:
1380
            return result_error('form deserialization failed: %s' % e)
1342 1381
        if not data:
1343 1382
            return result_error('nothing to save')
1344 1383

  
......
1428 1467
        displayed_fields = []
1429 1468
        with get_publisher().substitutions.temporary_feed(formdata, force_mode='lazy'):
1430 1469
            form = self.create_form(page=page, displayed_fields=displayed_fields, transient_formdata=formdata)
1431
        formdata.data.update(self.formdef.get_data(form))
1470
        try:
1471
            formdata.data.update(self.formdef.get_data(form, raise_on_error=True))
1472
        except SetValueError as e:
1473
            return result_error('form deserialization failed: %s' % e)
1432 1474
        return FormStatusPage.live_process_fields(form, formdata, displayed_fields)
1433 1475

  
1434 1476
    def clean_submission_context(self):
......
1443 1485
            filled = self.get_current_draft() or self.formdef.data_class()()
1444 1486
            filled.just_created()
1445 1487

  
1446
        filled.data = self.formdef.get_data(form)
1488
        filled.data = self.formdef.get_data(form, raise_on_error=True)
1447 1489
        magictoken = get_request().form['magictoken']
1448 1490
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1449 1491
        filled.data.update(computed_values)
......
1493 1535
        code.formdata = formdata  # this will .store() the code
1494 1536

  
1495 1537
    def submitted_existing(self, form):
1496
        new_data = self.formdef.get_data(form)
1538
        new_data = self.formdef.get_data(form, raise_on_error=True)
1497 1539
        magictoken = get_request().form['magictoken']
1498 1540
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1499 1541
        new_data.update(computed_values)
......
1565 1607
                return thumbnail
1566 1608
        return get_session().get_tempfile_content(t).get_file_pointer().read()
1567 1609

  
1568
    def validating(self, data):
1610
    def validating(self, data, page_error_messages=None):
1569 1611
        self.on_validation_page = True
1570 1612
        get_request().view_name = 'validation'
1571 1613
        self.html_top(self.formdef.name)
......
1573 1615
        # over in rendering.
1574 1616
        get_request().environ['REQUEST_METHOD'] = 'GET'
1575 1617
        form = self.create_view_form(data)
1618
        if page_error_messages:
1619
            form.add_global_errors(page_error_messages)
1576 1620
        token_widget = form.get_widget(form.TOKEN_NAME)
1577 1621
        token_widget._parsed = True
1578 1622
        if self.formdef.has_captcha and not (get_session().get_user() or get_session().won_captcha):
......
1604 1648
        }
1605 1649
        if self.has_draft_support() and data:
1606 1650
            context['tracking_code_box'] = lambda: self.tracking_code_box(data, magictoken)
1607

  
1608 1651
        return template.QommonTemplateResponse(
1609 1652
            templates=list(self.get_formdef_template_variants(self.validation_templates)), context=context
1610 1653
        )
1611
-