0002-forms-redisplay-page-when-datasource-is-unavailable-.patch
tests/form_pages/test_all.py | ||
---|---|---|
2775 | 2775 |
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'} |
2776 | 2776 | |
2777 | 2777 | |
2778 |
@pytest.mark.parametrize('fail_after_count_page', range(2, 8)) |
|
2779 |
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2)) |
|
2780 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
2781 |
def test_form_item_data_source_error( |
|
2782 |
urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation |
|
2783 |
): |
|
2784 |
data_source = NamedDataSource(name='foobar') |
|
2785 |
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
2786 |
data_source.id_parameter = 'id' |
|
2787 |
data_source.store() |
|
2788 | ||
2789 |
normal_get_structured_value = NamedDataSource.get_structured_value |
|
2790 | ||
2791 |
count = [0] |
|
2792 | ||
2793 |
class failing_get_structured_value: |
|
2794 |
def __init__(self, fail_after_count): |
|
2795 |
self.fail_after_count = fail_after_count |
|
2796 |
self.count = 0 |
|
2797 | ||
2798 |
def __call__(self, *args): |
|
2799 |
import inspect |
|
2800 | ||
2801 |
for frame in inspect.stack(): |
|
2802 |
if frame.function in ['store_display_value', 'store_structured_value']: |
|
2803 |
count = self.count |
|
2804 |
self.count += 1 |
|
2805 |
if count >= self.fail_after_count: |
|
2806 |
return None |
|
2807 |
return normal_get_structured_value(*args) |
|
2808 | ||
2809 |
@property |
|
2810 |
def method(self): |
|
2811 |
return lambda *args: self(*args) |
|
2812 | ||
2813 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
|
2814 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
2815 | ||
2816 |
formdef = create_formdef() |
|
2817 |
formdef.fields = [ |
|
2818 |
fields.PageField(id='0', label='1st page', type='page'), |
|
2819 |
fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}), |
|
2820 |
] |
|
2821 | ||
2822 |
formdef.store() |
|
2823 |
resp = get_app(pub).get('/test/') |
|
2824 |
formdef.data_class().wipe() |
|
2825 |
resp.forms[0]['f1'] = '1' |
|
2826 | ||
2827 |
# fail in get_structured_value |
|
2828 |
monkeypatch.setattr( |
|
2829 |
NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method |
|
2830 |
) |
|
2831 |
resp = resp.forms[0].submit('submit') |
|
2832 |
assert 'Technical error, please try again' in resp.text |
|
2833 | ||
2834 |
# fix transient failure |
|
2835 |
monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value) |
|
2836 |
resp = resp.forms[0].submit('submit') |
|
2837 |
assert 'Check values then click submit.' in resp.text |
|
2838 | ||
2839 |
# fail in get_structured_value |
|
2840 |
monkeypatch.setattr( |
|
2841 |
NamedDataSource, |
|
2842 |
'get_structured_value', |
|
2843 |
failing_get_structured_value(fail_after_count_validation).method, |
|
2844 |
) |
|
2845 |
resp = resp.forms[0].submit('submit') |
|
2846 |
assert 'Technical error, please try again' in resp.text |
|
2847 | ||
2848 |
# fix transient failure |
|
2849 |
monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value) |
|
2850 |
resp = resp.forms[0].submit('submit') |
|
2851 |
assert resp.status_int == 302 |
|
2852 |
resp = resp.follow() |
|
2853 |
assert 'The form has been recorded' in resp.text |
|
2854 |
assert formdef.data_class().count() == 1 |
|
2855 |
data_id = formdef.data_class().select()[0].id |
|
2856 |
return formdef.data_class().get(data_id).data |
|
2857 | ||
2858 | ||
2778 | 2859 |
def test_form_items_data_source_field_submit(pub): |
2779 | 2860 |
def submit_items_data_source_field(ds): |
2780 | 2861 |
formdef = create_formdef() |
... | ... | |
4340 | 4421 |
assert resp.forms[1]['f1'].value == 'foobar' # not a valid email |
4341 | 4422 | |
4342 | 4423 | |
4424 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
4425 |
def test_form_autosave_item_field_data_source_error(urlopen, pub): |
|
4426 |
ds = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
4427 |
formdef = create_formdef() |
|
4428 |
formdef.fields = [ |
|
4429 |
fields.ItemField(id='1', label='string', data_source=ds), |
|
4430 |
] |
|
4431 |
formdef.enable_tracking_codes = True |
|
4432 |
formdef.store() |
|
4433 | ||
4434 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
|
4435 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
4436 | ||
4437 |
formdef.data_class().wipe() |
|
4438 |
app = get_app(pub) |
|
4439 |
resp = app.get('/test/') |
|
4440 |
resp.form['f1'] = '1' # select the item with id '1' |
|
4441 | ||
4442 |
# make the data source fail |
|
4443 |
with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None): |
|
4444 | ||
4445 |
autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields()) |
|
4446 |
assert autosave_resp.json == { |
|
4447 |
'reason': 'form deserialization failed: a datasource is unavailable', |
|
4448 |
'result': 'error', |
|
4449 |
} |
|
4450 | ||
4451 |
autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields()) |
|
4452 |
assert autosave_resp.json['result'] == 'success' |
|
4453 | ||
4454 | ||
4343 | 4455 |
def test_form_autosave_with_parameterized_datasource(pub): |
4344 | 4456 |
formdef = create_formdef() |
4345 | 4457 |
formdef.fields = [ |
tests/form_pages/test_live.py | ||
---|---|---|
1264 | 1264 |
resp = resp.form.submit('submit') # -> 3rd page |
1265 | 1265 |
resp = resp.form.submit('previous') # -> 2nd page |
1266 | 1266 |
assert 'live value: attr1' in resp |
1267 | ||
1268 | ||
1269 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
1270 |
def test_field_live_condition_data_source_error(urlopen, pub): |
|
1271 |
ds = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
1272 | ||
1273 |
FormDef.wipe() |
|
1274 |
formdef = FormDef() |
|
1275 |
formdef.name = 'Foo' |
|
1276 |
formdef.fields = [ |
|
1277 |
fields.ItemField(id='1', label='string', data_source=ds, varname='foo'), |
|
1278 |
fields.StringField( |
|
1279 |
id='2', |
|
1280 |
label='bar', |
|
1281 |
required=True, |
|
1282 |
varname='bar', |
|
1283 |
condition={'type': 'django', 'value': 'form_var_foo_x == "bye"'}, |
|
1284 |
), |
|
1285 |
] |
|
1286 |
formdef.store() |
|
1287 | ||
1288 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]} |
|
1289 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
1290 | ||
1291 |
app = get_app(pub) |
|
1292 |
resp = app.get('/foo/') |
|
1293 |
assert 'f1' in resp.form.fields |
|
1294 |
assert 'f2' not in resp.form.fields |
|
1295 |
resp.form['f1'] = '2' |
|
1296 | ||
1297 |
with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None): |
|
1298 |
live_resp = app.post('/foo/live', params=resp.form.submit_fields()) |
|
1299 |
assert live_resp.json == { |
|
1300 |
'reason': 'form deserialization failed: a datasource is unavailable', |
|
1301 |
'result': 'error', |
|
1302 |
} |
|
1303 | ||
1304 |
live_resp = app.post('/foo/live', params=resp.form.submit_fields()) |
|
1305 |
assert live_resp.json == {'result': {'1': {'visible': True}, '2': {'visible': True}}} |
wcs/forms/root.py | ||
---|---|---|
35 | 35 |
from quixote.util import randbytes |
36 | 36 | |
37 | 37 |
from wcs.categories import Category |
38 |
from wcs.fields import SetValueError |
|
38 | 39 |
from wcs.formdata import Evolution, FormData |
39 | 40 |
from wcs.formdef import FormDef |
40 | 41 |
from wcs.forms.common import FormStatusPage, FormTemplateMixin |
... | ... | |
543 | 544 |
if had_prefill: |
544 | 545 |
# include prefilled data |
545 | 546 |
transient_formdata = self.get_transient_formdata(magictoken) |
547 |
# XXX: should we handle data source errors here? |
|
546 | 548 |
transient_formdata.data.update(self.formdef.get_data(form)) |
547 | 549 |
if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()): |
548 | 550 |
# save to get prefilling data in database |
... | ... | |
1061 | 1063 |
# reset locked data with newly submitted values, this allows |
1062 | 1064 |
# for templates referencing fields from the same page.
1063 | 1065 |
self.reset_locked_data(form) |
1064 |
data = self.formdef.get_data(form) |
|
1066 |
try: |
|
1067 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1068 |
except SetValueError: |
|
1069 |
return self.page( |
|
1070 |
page, |
|
1071 |
page_change=False, |
|
1072 |
page_error_messages=[_('Technical error, please try again')], |
|
1073 |
transient_formdata=transient_formdata, |
|
1074 |
) |
|
1065 | 1075 |
computed_data = self.handle_computed_fields(magictoken, submitted_fields) |
1066 | 1076 | |
1067 | 1077 |
form_data.update(data) |
... | ... | |
1082 | 1092 |
# a new ConditionsVars will get added to the substitution |
1083 | 1093 |
# variables. |
1084 | 1094 |
form_data = copy.copy(session.get_by_magictoken(magictoken, {})) |
1085 |
data = self.formdef.get_data(form) |
|
1095 |
try: |
|
1096 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1097 |
except SetValueError: |
|
1098 |
return self.page( |
|
1099 |
page, |
|
1100 |
page_change=False, |
|
1101 |
page_error_messages=[_('Technical error, please try again')], |
|
1102 |
transient_formdata=transient_formdata, |
|
1103 |
) |
|
1086 | 1104 |
form_data.update(data) |
1087 | 1105 |
form_data.update(computed_data) |
1088 | 1106 |
for i, post_condition in enumerate(post_conditions): |
... | ... | |
1120 | 1138 | |
1121 | 1139 |
form_data = session.get_by_magictoken(magictoken, {}) |
1122 | 1140 |
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'): |
1123 |
data = self.formdef.get_data(form) |
|
1141 |
try: |
|
1142 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1143 |
except SetValueError: |
|
1144 |
return self.page( |
|
1145 |
page, |
|
1146 |
page_change=False, |
|
1147 |
page_error_messages=[_('Technical error, please try again')], |
|
1148 |
transient_formdata=transient_formdata, |
|
1149 |
) |
|
1124 | 1150 |
form_data.update(data) |
1125 | 1151 |
form_data.update(computed_data) |
1126 | 1152 | |
... | ... | |
1161 | 1187 |
v = field.convert_value_to_str(v) |
1162 | 1188 |
req.form['f%s' % k] = v |
1163 | 1189 |
if self.edit_mode: |
1164 |
form = self.create_view_form(form_data, use_tokens=False) |
|
1165 |
return self.submitted_existing(form) |
|
1190 |
view_form = self.create_view_form(form_data, use_tokens=False) |
|
1191 |
try: |
|
1192 |
return self.submitted_existing(view_form) |
|
1193 |
except SetValueError: |
|
1194 |
return self.page( |
|
1195 |
page, |
|
1196 |
page_change=False, |
|
1197 |
page_error_messages=[_('Technical error, please try again')], |
|
1198 |
transient_formdata=transient_formdata, |
|
1199 |
) |
|
1166 | 1200 |
if self.has_confirmation_page(): |
1167 | 1201 |
return self.validating(form_data) |
1168 | 1202 |
else: |
... | ... | |
1189 | 1223 |
return self.previous_page(len(self.pages), magictoken) |
1190 | 1224 |
magictoken = form.get_widget('magictoken').parse() |
1191 | 1225 |
form_data = session.get_by_magictoken(magictoken, {}) |
1192 |
data = self.formdef.get_data(form) |
|
1226 |
try: |
|
1227 |
data = self.formdef.get_data(form, raise_on_error=True)  # raise so the SetValueError handler below can run |
|
1228 |
except SetValueError: |
|
1229 |
return self.page( |
|
1230 |
page, |
|
1231 |
page_change=False, |
|
1232 |
page_error_messages=[_('Technical error, please try again')], |
|
1233 |
transient_formdata=transient_formdata, |
|
1234 |
) |
|
1193 | 1235 |
form_data.update(data) |
1194 | 1236 |
session.add_magictoken(magictoken, form_data) |
1195 | 1237 | |
... | ... | |
1228 | 1270 |
# submitted a second time |
1229 | 1271 |
return template.error_page(_('This form has already been submitted.')) |
1230 | 1272 | |
1231 |
return self.submitted(form, existing_formdata) |
|
1273 |
try: |
|
1274 |
return self.submitted(form, existing_formdata) |
|
1275 |
except SetValueError: |
|
1276 |
if get_request().form.get('step') == '2': |
|
1277 |
# submit came from the validation page |
|
1278 |
return self.validating( |
|
1279 |
form_data, page_error_messages=[_('Technical error, please try again')] |
|
1280 |
) |
|
1281 |
else: |
|
1282 |
# last page |
|
1283 |
return self.page( |
|
1284 |
page, |
|
1285 |
page_change=False, |
|
1286 |
page_error_messages=[_('Technical error, please try again')], |
|
1287 |
transient_formdata=transient_formdata, |
|
1288 |
) |
|
1232 | 1289 | |
1233 | 1290 |
def reset_locked_data(self, form): |
1234 | 1291 |
# reset locked fields, making sure the user cannot alter them. |
... | ... | |
1338 | 1395 |
# on webservice results, there can be (temporary?) inconsistencies. |
1339 | 1396 |
return result_error('ouf ot range page_no') |
1340 | 1397 |
form = self.create_form(page=page) |
1341 |
data = self.formdef.get_data(form) |
|
1398 |
try: |
|
1399 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1400 |
except SetValueError as e: |
|
1401 |
return result_error('form deserialization failed: %s' % e) |
|
1342 | 1402 |
if not data: |
1343 | 1403 |
return result_error('nothing to save') |
1344 | 1404 | |
... | ... | |
1428 | 1488 |
displayed_fields = [] |
1429 | 1489 |
with get_publisher().substitutions.temporary_feed(formdata, force_mode='lazy'): |
1430 | 1490 |
form = self.create_form(page=page, displayed_fields=displayed_fields, transient_formdata=formdata) |
1431 |
formdata.data.update(self.formdef.get_data(form)) |
|
1491 |
try: |
|
1492 |
formdata.data.update(self.formdef.get_data(form, raise_on_error=True)) |
|
1493 |
except SetValueError as e: |
|
1494 |
return result_error('form deserialization failed: %s' % e) |
|
1432 | 1495 |
return FormStatusPage.live_process_fields(form, formdata, displayed_fields) |
1433 | 1496 | |
1434 | 1497 |
def clean_submission_context(self): |
... | ... | |
1443 | 1506 |
filled = self.get_current_draft() or self.formdef.data_class()() |
1444 | 1507 |
filled.just_created() |
1445 | 1508 | |
1446 |
filled.data = self.formdef.get_data(form) |
|
1509 |
filled.data = self.formdef.get_data(form, raise_on_error=True)
|
|
1447 | 1510 |
magictoken = get_request().form['magictoken'] |
1448 | 1511 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {}) |
1449 | 1512 |
filled.data.update(computed_values) |
... | ... | |
1493 | 1556 |
code.formdata = formdata # this will .store() the code |
1494 | 1557 | |
1495 | 1558 |
def submitted_existing(self, form): |
1496 |
new_data = self.formdef.get_data(form) |
|
1559 |
new_data = self.formdef.get_data(form, raise_on_error=True)
|
|
1497 | 1560 |
magictoken = get_request().form['magictoken'] |
1498 | 1561 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {}) |
1499 | 1562 |
new_data.update(computed_values) |
... | ... | |
1565 | 1628 |
return thumbnail |
1566 | 1629 |
return get_session().get_tempfile_content(t).get_file_pointer().read() |
1567 | 1630 | |
1568 |
def validating(self, data): |
|
1631 |
def validating(self, data, page_error_messages=None):
|
|
1569 | 1632 |
self.on_validation_page = True |
1570 | 1633 |
get_request().view_name = 'validation' |
1571 | 1634 |
self.html_top(self.formdef.name) |
... | ... | |
1573 | 1636 |
# over in rendering. |
1574 | 1637 |
get_request().environ['REQUEST_METHOD'] = 'GET' |
1575 | 1638 |
form = self.create_view_form(data) |
1639 |
if page_error_messages: |
|
1640 |
form.add_global_errors(page_error_messages) |
|
1576 | 1641 |
token_widget = form.get_widget(form.TOKEN_NAME) |
1577 | 1642 |
token_widget._parsed = True |
1578 | 1643 |
if self.formdef.has_captcha and not (get_session().get_user() or get_session().won_captcha): |
... | ... | |
1604 | 1669 |
} |
1605 | 1670 |
if self.has_draft_support() and data: |
1606 | 1671 |
context['tracking_code_box'] = lambda: self.tracking_code_box(data, magictoken) |
1607 | ||
1608 | 1672 |
return template.QommonTemplateResponse( |
1609 | 1673 |
templates=list(self.get_formdef_template_variants(self.validation_templates)), context=context |
1610 | 1674 |
) |
1611 |
- |