Projet

Général

Profil

0002-forms-redisplay-page-when-datasource-is-unavailable-.patch

Benjamin Dauvergne, 09 novembre 2021 23:23

Télécharger (16,8 ko)

Voir les différences:

Subject: [PATCH 2/2] forms: redisplay page when datasource is unavailable
 (#56824)

 tests/form_pages/test_all.py  | 112 ++++++++++++++++++++++++++++++++++
 tests/form_pages/test_live.py |  39 ++++++++++++
 wcs/forms/root.py             |  90 +++++++++++++++++++++++----
 3 files changed, 228 insertions(+), 13 deletions(-)
tests/form_pages/test_all.py
2775 2775
        assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'}
2776 2776

  
2777 2777

  
2778
@pytest.mark.parametrize('fail_after_count_page', range(2, 8))
2779
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2))
2780
@mock.patch('wcs.qommon.misc.urlopen')
2781
def test_form_item_data_source_error(
2782
    urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation
2783
):
2784
    data_source = NamedDataSource(name='foobar')
2785
    data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'}
2786
    data_source.id_parameter = 'id'
2787
    data_source.store()
2788

  
2789
    normal_get_structured_value = NamedDataSource.get_structured_value
2790

  
2791
    count = [0]
2792

  
2793
    class failing_get_structured_value:
2794
        def __init__(self, fail_after_count):
2795
            self.fail_after_count = fail_after_count
2796
            self.count = 0
2797

  
2798
        def __call__(self, *args):
2799
            import inspect
2800

  
2801
            for frame in inspect.stack():
2802
                if frame.function in ['store_display_value', 'store_structured_value']:
2803
                    count = self.count
2804
                    self.count += 1
2805
                    if count >= self.fail_after_count:
2806
                        return None
2807
            return normal_get_structured_value(*args)
2808

  
2809
        @property
2810
        def method(self):
2811
            return lambda *args: self(*args)
2812

  
2813
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
2814
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
2815

  
2816
    formdef = create_formdef()
2817
    formdef.fields = [
2818
        fields.PageField(id='0', label='1st page', type='page'),
2819
        fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}),
2820
    ]
2821

  
2822
    formdef.store()
2823
    resp = get_app(pub).get('/test/')
2824
    formdef.data_class().wipe()
2825
    resp.forms[0]['f1'] = '1'
2826

  
2827
    # fail in get_structured_value
2828
    monkeypatch.setattr(
2829
        NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method
2830
    )
2831
    resp = resp.forms[0].submit('submit')
2832
    assert 'Technical error, please try again' in resp.text
2833

  
2834
    # fix transient failure
2835
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2836
    resp = resp.forms[0].submit('submit')
2837
    assert 'Check values then click submit.' in resp.text
2838

  
2839
    # fail in get_structured_value
2840
    monkeypatch.setattr(
2841
        NamedDataSource,
2842
        'get_structured_value',
2843
        failing_get_structured_value(fail_after_count_validation).method,
2844
    )
2845
    resp = resp.forms[0].submit('submit')
2846
    assert 'Technical error, please try again' in resp.text
2847

  
2848
    # fix transient failure
2849
    monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value)
2850
    resp = resp.forms[0].submit('submit')
2851
    assert resp.status_int == 302
2852
    resp = resp.follow()
2853
    assert 'The form has been recorded' in resp.text
2854
    assert formdef.data_class().count() == 1
2855
    data_id = formdef.data_class().select()[0].id
2856
    return formdef.data_class().get(data_id).data
2857

  
2858

  
2778 2859
def test_form_items_data_source_field_submit(pub):
2779 2860
    def submit_items_data_source_field(ds):
2780 2861
        formdef = create_formdef()
......
4340 4421
    assert resp.forms[1]['f1'].value == 'foobar'  # not a valid email
4341 4422

  
4342 4423

  
4424
@mock.patch('wcs.qommon.misc.urlopen')
4425
def test_form_autosave_item_field_data_source_error(urlopen, pub):
4426
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
4427
    formdef = create_formdef()
4428
    formdef.fields = [
4429
        fields.ItemField(id='1', label='string', data_source=ds),
4430
    ]
4431
    formdef.enable_tracking_codes = True
4432
    formdef.store()
4433

  
4434
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]}
4435
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
4436

  
4437
    formdef.data_class().wipe()
4438
    app = get_app(pub)
4439
    resp = app.get('/test/')
4440
    resp.form['f1'] = '1'  # select the first data source option
4441

  
4442
    # make the data source fail
4443
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
4444

  
4445
        autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4446
    assert autosave_resp.json == {
4447
        'reason': 'form deserialization failed: a datasource is unavailable',
4448
        'result': 'error',
4449
    }
4450

  
4451
    autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields())
4452
    assert autosave_resp.json['result'] == 'success'
4453

  
4454

  
4343 4455
def test_form_autosave_with_parameterized_datasource(pub):
4344 4456
    formdef = create_formdef()
4345 4457
    formdef.fields = [
tests/form_pages/test_live.py
1264 1264
    resp = resp.form.submit('submit')  # -> 3rd page
1265 1265
    resp = resp.form.submit('previous')  # -> 2nd page
1266 1266
    assert 'live value: attr1' in resp
1267

  
1268

  
1269
@mock.patch('wcs.qommon.misc.urlopen')
1270
def test_field_live_condition_data_source_error(urlopen, pub):
1271
    ds = {'type': 'json', 'value': 'http://www.example.net/plop'}
1272

  
1273
    FormDef.wipe()
1274
    formdef = FormDef()
1275
    formdef.name = 'Foo'
1276
    formdef.fields = [
1277
        fields.ItemField(id='1', label='string', data_source=ds, varname='foo'),
1278
        fields.StringField(
1279
            id='2',
1280
            label='bar',
1281
            required=True,
1282
            varname='bar',
1283
            condition={'type': 'django', 'value': 'form_var_foo_x == "bye"'},
1284
        ),
1285
    ]
1286
    formdef.store()
1287

  
1288
    data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]}
1289
    urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data))
1290

  
1291
    app = get_app(pub)
1292
    resp = app.get('/foo/')
1293
    assert 'f1' in resp.form.fields
1294
    assert 'f2' not in resp.form.fields
1295
    resp.form['f1'] = '2'
1296

  
1297
    with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None):
1298
        live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1299
        assert live_resp.json == {
1300
            'reason': 'form deserialization failed: a datasource is unavailable',
1301
            'result': 'error',
1302
        }
1303

  
1304
    live_resp = app.post('/foo/live', params=resp.form.submit_fields())
1305
    assert live_resp.json == {'result': {'1': {'visible': True}, '2': {'visible': True}}}
wcs/forms/root.py
35 35
from quixote.util import randbytes
36 36

  
37 37
from wcs.categories import Category
38
from wcs.fields import SetValueError
38 39
from wcs.formdata import Evolution, FormData
39 40
from wcs.formdef import FormDef
40 41
from wcs.forms.common import FormStatusPage, FormTemplateMixin
......
543 544
            if had_prefill:
544 545
                # include prefilled data
545 546
                transient_formdata = self.get_transient_formdata(magictoken)
547
                # XXX: should we handle data source errors here?
546 548
                transient_formdata.data.update(self.formdef.get_data(form))
547 549
                if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()):
548 550
                    # save to get prefilling data in database
......
1061 1063
                # reset locked data with newly submitted values, this allows
1062 1064
                # for templates referencing fields from the same page.
1063 1065
                self.reset_locked_data(form)
1064
                data = self.formdef.get_data(form)
1066
                try:
1067
                    data = self.formdef.get_data(form, raise_on_error=True)
1068
                except SetValueError:
1069
                    return self.page(
1070
                        page,
1071
                        page_change=False,
1072
                        page_error_messages=[_('Technical error, please try again')],
1073
                        transient_formdata=transient_formdata,
1074
                    )
1065 1075
                computed_data = self.handle_computed_fields(magictoken, submitted_fields)
1066 1076

  
1067 1077
            form_data.update(data)
......
1082 1092
                # a new ConditionsVars will get added to the substitution
1083 1093
                # variables.
1084 1094
                form_data = copy.copy(session.get_by_magictoken(magictoken, {}))
1085
                data = self.formdef.get_data(form)
1095
                try:
1096
                    data = self.formdef.get_data(form, raise_on_error=True)
1097
                except SetValueError:
1098
                    return self.page(
1099
                        page,
1100
                        page_change=False,
1101
                        page_error_messages=[_('Technical error, please try again')],
1102
                        transient_formdata=transient_formdata,
1103
                    )
1086 1104
                form_data.update(data)
1087 1105
                form_data.update(computed_data)
1088 1106
                for i, post_condition in enumerate(post_conditions):
......
1120 1138

  
1121 1139
            form_data = session.get_by_magictoken(magictoken, {})
1122 1140
            with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'):
1123
                data = self.formdef.get_data(form)
1141
                try:
1142
                    data = self.formdef.get_data(form, raise_on_error=True)
1143
                except SetValueError:
1144
                    return self.page(
1145
                        page,
1146
                        page_change=False,
1147
                        page_error_messages=[_('Technical error, please try again')],
1148
                        transient_formdata=transient_formdata,
1149
                    )
1124 1150
            form_data.update(data)
1125 1151
            form_data.update(computed_data)
1126 1152

  
......
1161 1187
                            v = field.convert_value_to_str(v)
1162 1188
                        req.form['f%s' % k] = v
1163 1189
                if self.edit_mode:
1164
                    form = self.create_view_form(form_data, use_tokens=False)
1165
                    return self.submitted_existing(form)
1190
                    view_form = self.create_view_form(form_data, use_tokens=False)
1191
                    try:
1192
                        return self.submitted_existing(view_form)
1193
                    except SetValueError:
1194
                        return self.page(
1195
                            page,
1196
                            page_change=False,
1197
                            page_error_messages=[_('Technical error, please try again')],
1198
                            transient_formdata=transient_formdata,
1199
                        )
1166 1200
                if self.has_confirmation_page():
1167 1201
                    return self.validating(form_data)
1168 1202
                else:
......
1189 1223
                return self.previous_page(len(self.pages), magictoken)
1190 1224
            magictoken = form.get_widget('magictoken').parse()
1191 1225
            form_data = session.get_by_magictoken(magictoken, {})
1192
            data = self.formdef.get_data(form)
1226
            try:
1227
                data = self.formdef.get_data(form)
1228
            except SetValueError:
1229
                return self.page(
1230
                    page,
1231
                    page_change=False,
1232
                    page_error_messages=[_('Technical error, please try again')],
1233
                    transient_formdata=transient_formdata,
1234
                )
1193 1235
            form_data.update(data)
1194 1236
            session.add_magictoken(magictoken, form_data)
1195 1237

  
......
1228 1270
                # submitted a second time
1229 1271
                return template.error_page(_('This form has already been submitted.'))
1230 1272

  
1231
            return self.submitted(form, existing_formdata)
1273
            try:
1274
                return self.submitted(form, existing_formdata)
1275
            except SetValueError:
1276
                if get_request().form.get('step') == '2':
1277
                    # submit came from the validation page
1278
                    return self.validating(
1279
                        form_data, page_error_messages=[_('Technical error, please try again')]
1280
                    )
1281
                else:
1282
                    # last page
1283
                    return self.page(
1284
                        page,
1285
                        page_change=False,
1286
                        page_error_messages=[_('Technical error, please try again')],
1287
                        transient_formdata=transient_formdata,
1288
                    )
1232 1289

  
1233 1290
    def reset_locked_data(self, form):
1234 1291
        # reset locked fields, making sure the user cannot alter them.
......
1338 1395
            # on webservice results, there can be (temporary?) inconsistencies.
1339 1396
            return result_error('ouf ot range page_no')
1340 1397
        form = self.create_form(page=page)
1341
        data = self.formdef.get_data(form)
1398
        try:
1399
            data = self.formdef.get_data(form, raise_on_error=True)
1400
        except SetValueError as e:
1401
            return result_error('form deserialization failed: %s' % e)
1342 1402
        if not data:
1343 1403
            return result_error('nothing to save')
1344 1404

  
......
1428 1488
        displayed_fields = []
1429 1489
        with get_publisher().substitutions.temporary_feed(formdata, force_mode='lazy'):
1430 1490
            form = self.create_form(page=page, displayed_fields=displayed_fields, transient_formdata=formdata)
1431
        formdata.data.update(self.formdef.get_data(form))
1491
        try:
1492
            formdata.data.update(self.formdef.get_data(form, raise_on_error=True))
1493
        except SetValueError as e:
1494
            return result_error('form deserialization failed: %s' % e)
1432 1495
        return FormStatusPage.live_process_fields(form, formdata, displayed_fields)
1433 1496

  
1434 1497
    def clean_submission_context(self):
......
1443 1506
            filled = self.get_current_draft() or self.formdef.data_class()()
1444 1507
            filled.just_created()
1445 1508

  
1446
        filled.data = self.formdef.get_data(form)
1509
        filled.data = self.formdef.get_data(form, raise_on_error=True)
1447 1510
        magictoken = get_request().form['magictoken']
1448 1511
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1449 1512
        filled.data.update(computed_values)
......
1493 1556
        code.formdata = formdata  # this will .store() the code
1494 1557

  
1495 1558
    def submitted_existing(self, form):
1496
        new_data = self.formdef.get_data(form)
1559
        new_data = self.formdef.get_data(form, raise_on_error=True)
1497 1560
        magictoken = get_request().form['magictoken']
1498 1561
        computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {})
1499 1562
        new_data.update(computed_values)
......
1565 1628
                return thumbnail
1566 1629
        return get_session().get_tempfile_content(t).get_file_pointer().read()
1567 1630

  
1568
    def validating(self, data):
1631
    def validating(self, data, page_error_messages=None):
1569 1632
        self.on_validation_page = True
1570 1633
        get_request().view_name = 'validation'
1571 1634
        self.html_top(self.formdef.name)
......
1573 1636
        # over in rendering.
1574 1637
        get_request().environ['REQUEST_METHOD'] = 'GET'
1575 1638
        form = self.create_view_form(data)
1639
        if page_error_messages:
1640
            form.add_global_errors(page_error_messages)
1576 1641
        token_widget = form.get_widget(form.TOKEN_NAME)
1577 1642
        token_widget._parsed = True
1578 1643
        if self.formdef.has_captcha and not (get_session().get_user() or get_session().won_captcha):
......
1604 1669
        }
1605 1670
        if self.has_draft_support() and data:
1606 1671
            context['tracking_code_box'] = lambda: self.tracking_code_box(data, magictoken)
1607

  
1608 1672
        return template.QommonTemplateResponse(
1609 1673
            templates=list(self.get_formdef_template_variants(self.validation_templates)), context=context
1610 1674
        )
1611
-