0002-forms-redisplay-page-when-datasource-is-unavailable-.patch
tests/form_pages/test_all.py | ||
---|---|---|
2775 | 2775 |
assert submit_item_data_source_field(ds) == {'0': '1', '0_display': 'un'} |
2776 | 2776 | |
2777 | 2777 | |
2778 |
@pytest.mark.parametrize('fail_after_count_page', range(2, 8)) |
|
2779 |
@pytest.mark.parametrize('fail_after_count_validation', range(0, 2)) |
|
2780 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
2781 |
def test_form_item_data_source_error( |
|
2782 |
urlopen, pub, monkeypatch, fail_after_count_page, fail_after_count_validation |
|
2783 |
): |
|
2784 |
data_source = NamedDataSource(name='foobar') |
|
2785 |
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
2786 |
data_source.id_parameter = 'id' |
|
2787 |
data_source.store() |
|
2788 | ||
2789 |
normal_get_structured_value = NamedDataSource.get_structured_value |
|
2790 | ||
2791 |
class failing_get_structured_value: |
|
2792 |
def __init__(self, fail_after_count): |
|
2793 |
self.fail_after_count = fail_after_count |
|
2794 |
self.count = 0 |
|
2795 | ||
2796 |
def __call__(self, *args): |
|
2797 |
import inspect |
|
2798 | ||
2799 |
for frame in inspect.stack(): |
|
2800 |
if frame.function in ['store_display_value', 'store_structured_value']: |
|
2801 |
count = self.count |
|
2802 |
self.count += 1 |
|
2803 |
if count >= self.fail_after_count: |
|
2804 |
return None |
|
2805 |
return normal_get_structured_value(*args) |
|
2806 | ||
2807 |
@property |
|
2808 |
def method(self): |
|
2809 |
def f(*args): |
|
2810 |
return self(*args) |
|
2811 | ||
2812 |
return f |
|
2813 | ||
2814 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
|
2815 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
2816 | ||
2817 |
formdef = create_formdef() |
|
2818 |
formdef.fields = [ |
|
2819 |
fields.PageField(id='0', label='1st page', type='page'), |
|
2820 |
fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}), |
|
2821 |
] |
|
2822 | ||
2823 |
formdef.store() |
|
2824 |
resp = get_app(pub).get('/test/') |
|
2825 |
formdef.data_class().wipe() |
|
2826 |
resp.forms[0]['f1'] = '1' |
|
2827 | ||
2828 |
# fail in get_structured_value |
|
2829 |
monkeypatch.setattr( |
|
2830 |
NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method |
|
2831 |
) |
|
2832 |
resp = resp.forms[0].submit('submit') |
|
2833 |
assert 'Technical error, please try again' in resp.text |
|
2834 | ||
2835 |
# fix transient failure |
|
2836 |
monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value) |
|
2837 |
resp = resp.forms[0].submit('submit') |
|
2838 |
assert 'Check values then click submit.' in resp.text |
|
2839 | ||
2840 |
# fail in get_structured_value |
|
2841 |
monkeypatch.setattr( |
|
2842 |
NamedDataSource, |
|
2843 |
'get_structured_value', |
|
2844 |
failing_get_structured_value(fail_after_count_validation).method, |
|
2845 |
) |
|
2846 |
resp = resp.forms[0].submit('submit') |
|
2847 |
assert 'Technical error, please try again' in resp.text |
|
2848 | ||
2849 |
# fix transient failure |
|
2850 |
monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value) |
|
2851 |
resp = resp.forms[0].submit('submit') |
|
2852 |
assert resp.status_int == 302 |
|
2853 |
resp = resp.follow() |
|
2854 |
assert 'The form has been recorded' in resp.text |
|
2855 |
assert formdef.data_class().count() == 1 |
|
2856 |
data_id = formdef.data_class().select()[0].id |
|
2857 |
return formdef.data_class().get(data_id).data |
|
2858 | ||
2859 | ||
2860 |
@pytest.mark.parametrize('fail_after_count_page', range(2, 8)) |
|
2861 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
2862 |
def test_form_item_data_source_error_no_confirmation(urlopen, pub, monkeypatch, fail_after_count_page): |
|
2863 |
data_source = NamedDataSource(name='foobar') |
|
2864 |
data_source.data_source = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
2865 |
data_source.id_parameter = 'id' |
|
2866 |
data_source.store() |
|
2867 | ||
2868 |
normal_get_structured_value = NamedDataSource.get_structured_value |
|
2869 | ||
2870 |
class failing_get_structured_value: |
|
2871 |
def __init__(self, fail_after_count): |
|
2872 |
self.fail_after_count = fail_after_count |
|
2873 |
self.count = 0 |
|
2874 | ||
2875 |
def __call__(self, *args): |
|
2876 |
import inspect |
|
2877 | ||
2878 |
for frame in inspect.stack(): |
|
2879 |
if frame.function in ['store_display_value', 'store_structured_value']: |
|
2880 |
count = self.count |
|
2881 |
self.count += 1 |
|
2882 |
if count >= self.fail_after_count: |
|
2883 |
return None |
|
2884 |
return normal_get_structured_value(*args) |
|
2885 | ||
2886 |
@property |
|
2887 |
def method(self): |
|
2888 |
def f(*args): |
|
2889 |
return self(*args) |
|
2890 | ||
2891 |
return f |
|
2892 | ||
2893 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
|
2894 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
2895 | ||
2896 |
formdef = create_formdef() |
|
2897 |
formdef.confirmation = False |
|
2898 |
formdef.fields = [ |
|
2899 |
fields.PageField(id='0', label='1st page', type='page'), |
|
2900 |
fields.ItemField(id='1', label='string', data_source={'type': 'foobar'}), |
|
2901 |
] |
|
2902 | ||
2903 |
formdef.store() |
|
2904 |
resp = get_app(pub).get('/test/') |
|
2905 |
formdef.data_class().wipe() |
|
2906 |
resp.forms[0]['f1'] = '1' |
|
2907 | ||
2908 |
# fail in get_structured_value |
|
2909 |
monkeypatch.setattr( |
|
2910 |
NamedDataSource, 'get_structured_value', failing_get_structured_value(fail_after_count_page).method |
|
2911 |
) |
|
2912 |
resp = resp.forms[0].submit('submit') |
|
2913 |
assert 'Technical error, please try again' in resp.text |
|
2914 | ||
2915 |
# fix transient failure |
|
2916 |
monkeypatch.setattr(NamedDataSource, 'get_structured_value', normal_get_structured_value) |
|
2917 |
resp = resp.forms[0].submit('submit') |
|
2918 |
assert resp.status_int == 302 |
|
2919 |
resp = resp.follow() |
|
2920 |
assert 'The form has been recorded' in resp.text |
|
2921 |
assert formdef.data_class().count() == 1 |
|
2922 |
data_id = formdef.data_class().select()[0].id |
|
2923 |
return formdef.data_class().get(data_id).data |
|
2924 | ||
2925 | ||
2778 | 2926 |
def test_form_items_data_source_field_submit(pub): |
2779 | 2927 |
def submit_items_data_source_field(ds): |
2780 | 2928 |
formdef = create_formdef() |
... | ... | |
4340 | 4488 |
assert resp.forms[1]['f1'].value == 'foobar' # not a valid email |
4341 | 4489 | |
4342 | 4490 | |
4491 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
4492 |
def test_form_autosave_item_field_data_source_error(urlopen, pub): |
|
4493 |
ds = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
4494 |
formdef = create_formdef() |
|
4495 |
formdef.fields = [ |
|
4496 |
fields.ItemField(id='1', label='string', data_source=ds), |
|
4497 |
] |
|
4498 |
formdef.enable_tracking_codes = True |
|
4499 |
formdef.store() |
|
4500 | ||
4501 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux'}]} |
|
4502 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
4503 | ||
4504 |
formdef.data_class().wipe() |
|
4505 |
app = get_app(pub) |
|
4506 |
resp = app.get('/test/') |
|
4507 |
resp.form['f1'] = '1' # id of the first item served by the data source |
|
4508 | ||
4509 |
# make the ds fail |
|
4510 |
with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None): |
|
4511 | ||
4512 |
autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields()) |
|
4513 |
assert autosave_resp.json == { |
|
4514 |
'reason': 'form deserialization failed: a datasource is unavailable', |
|
4515 |
'result': 'error', |
|
4516 |
} |
|
4517 | ||
4518 |
autosave_resp = app.post('/test/autosave', params=resp.form.submit_fields()) |
|
4519 |
assert autosave_resp.json['result'] == 'success' |
|
4520 | ||
4521 | ||
4343 | 4522 |
def test_form_autosave_with_parameterized_datasource(pub): |
4344 | 4523 |
formdef = create_formdef() |
4345 | 4524 |
formdef.fields = [ |
tests/form_pages/test_live.py | ||
---|---|---|
1264 | 1264 |
resp = resp.form.submit('submit') # -> 3rd page |
1265 | 1265 |
resp = resp.form.submit('previous') # -> 2nd page |
1266 | 1266 |
assert 'live value: attr1' in resp |
1267 | ||
1268 | ||
1269 |
@mock.patch('wcs.qommon.misc.urlopen') |
|
1270 |
def test_field_live_condition_data_source_error(urlopen, pub): |
|
1271 |
ds = {'type': 'json', 'value': 'http://www.example.net/plop'} |
|
1272 | ||
1273 |
FormDef.wipe() |
|
1274 |
formdef = FormDef() |
|
1275 |
formdef.name = 'Foo' |
|
1276 |
formdef.fields = [ |
|
1277 |
fields.ItemField(id='1', label='string', data_source=ds, varname='foo'), |
|
1278 |
fields.StringField( |
|
1279 |
id='2', |
|
1280 |
label='bar', |
|
1281 |
required=True, |
|
1282 |
varname='bar', |
|
1283 |
condition={'type': 'django', 'value': 'form_var_foo_x == "bye"'}, |
|
1284 |
), |
|
1285 |
] |
|
1286 |
formdef.store() |
|
1287 | ||
1288 |
data = {'data': [{'id': '1', 'text': 'un'}, {'id': '2', 'text': 'deux', 'x': 'bye'}]} |
|
1289 |
urlopen.side_effect = lambda *args: io.StringIO(json.dumps(data)) |
|
1290 | ||
1291 |
app = get_app(pub) |
|
1292 |
resp = app.get('/foo/') |
|
1293 |
assert 'f1' in resp.form.fields |
|
1294 |
assert 'f2' not in resp.form.fields |
|
1295 |
resp.form['f1'] = '2' |
|
1296 | ||
1297 |
with mock.patch.object(NamedDataSource, 'get_structured_value', lambda *args: None): |
|
1298 |
live_resp = app.post('/foo/live', params=resp.form.submit_fields()) |
|
1299 |
assert live_resp.json == { |
|
1300 |
'reason': 'form deserialization failed: a datasource is unavailable', |
|
1301 |
'result': 'error', |
|
1302 |
} |
|
1303 | ||
1304 |
live_resp = app.post('/foo/live', params=resp.form.submit_fields()) |
|
1305 |
assert live_resp.json == {'result': {'1': {'visible': True}, '2': {'visible': True}}} |
wcs/forms/root.py | ||
---|---|---|
35 | 35 |
from quixote.util import randbytes |
36 | 36 | |
37 | 37 |
from wcs.categories import Category |
38 |
from wcs.fields import SetValueError |
|
38 | 39 |
from wcs.formdata import Evolution, FormData |
39 | 40 |
from wcs.formdef import FormDef |
40 | 41 |
from wcs.forms.common import FormStatusPage, FormTemplateMixin |
... | ... | |
543 | 544 |
if had_prefill: |
544 | 545 |
# include prefilled data |
545 | 546 |
transient_formdata = self.get_transient_formdata(magictoken) |
547 |
# XXX: should we handle data source errors here? |
|
546 | 548 |
transient_formdata.data.update(self.formdef.get_data(form)) |
547 | 549 |
if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()): |
548 | 550 |
# save to get prefilling data in database |
... | ... | |
1061 | 1063 |
# reset locked data with newly submitted values, this allows |
1062 | 1064 |
# for templates referencing fields from the same page. |
1063 | 1065 |
self.reset_locked_data(form) |
1064 |
data = self.formdef.get_data(form) |
|
1066 |
try: |
|
1067 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1068 |
except SetValueError: |
|
1069 |
return self.page( |
|
1070 |
page, |
|
1071 |
page_change=False, |
|
1072 |
page_error_messages=[_('Technical error, please try again')], |
|
1073 |
transient_formdata=transient_formdata, |
|
1074 |
) |
|
1065 | 1075 |
computed_data = self.handle_computed_fields(magictoken, submitted_fields) |
1066 | 1076 | |
1067 | 1077 |
form_data.update(data) |
... | ... | |
1082 | 1092 |
# a new ConditionsVars will get added to the substitution |
1083 | 1093 |
# variables. |
1084 | 1094 |
form_data = copy.copy(session.get_by_magictoken(magictoken, {})) |
1085 |
data = self.formdef.get_data(form) |
|
1095 |
try: |
|
1096 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1097 |
except SetValueError: |
|
1098 |
return self.page( |
|
1099 |
page, |
|
1100 |
page_change=False, |
|
1101 |
page_error_messages=[_('Technical error, please try again')], |
|
1102 |
transient_formdata=transient_formdata, |
|
1103 |
) |
|
1086 | 1104 |
form_data.update(data) |
1087 | 1105 |
form_data.update(computed_data) |
1088 | 1106 |
for i, post_condition in enumerate(post_conditions): |
... | ... | |
1120 | 1138 | |
1121 | 1139 |
form_data = session.get_by_magictoken(magictoken, {}) |
1122 | 1140 |
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'): |
1123 |
data = self.formdef.get_data(form) |
|
1141 |
try: |
|
1142 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1143 |
except SetValueError: |
|
1144 |
return self.page( |
|
1145 |
page, |
|
1146 |
page_change=False, |
|
1147 |
page_error_messages=[_('Technical error, please try again')], |
|
1148 |
transient_formdata=transient_formdata, |
|
1149 |
) |
|
1124 | 1150 |
form_data.update(data) |
1125 | 1151 |
form_data.update(computed_data) |
1126 | 1152 | |
... | ... | |
1161 | 1187 |
v = field.convert_value_to_str(v) |
1162 | 1188 |
req.form['f%s' % k] = v |
1163 | 1189 |
if self.edit_mode: |
1164 |
form = self.create_view_form(form_data, use_tokens=False) |
|
1165 |
return self.submitted_existing(form) |
|
1190 |
view_form = self.create_view_form(form_data, use_tokens=False) |
|
1191 |
try: |
|
1192 |
return self.submitted_existing(view_form) |
|
1193 |
except SetValueError: |
|
1194 |
return self.page( |
|
1195 |
page, |
|
1196 |
page_change=False, |
|
1197 |
page_error_messages=[_('Technical error, please try again')], |
|
1198 |
transient_formdata=transient_formdata, |
|
1199 |
) |
|
1166 | 1200 |
if self.has_confirmation_page(): |
1167 | 1201 |
return self.validating(form_data) |
1168 | 1202 |
else: |
... | ... | |
1183 | 1217 |
self.reset_locked_data(form) |
1184 | 1218 |
if step == 1: |
1185 | 1219 |
form.add_submit('previous') |
1186 |
magictoken = form.get_widget('magictoken').parse() |
|
1187 | ||
1188 | 1220 |
if form.get_submit() == 'previous': |
1189 | 1221 |
return self.previous_page(len(self.pages), magictoken) |
1190 |
magictoken = form.get_widget('magictoken').parse() |
|
1191 |
form_data = session.get_by_magictoken(magictoken, {}) |
|
1192 |
data = self.formdef.get_data(form) |
|
1193 |
form_data.update(data) |
|
1194 |
session.add_magictoken(magictoken, form_data) |
|
1195 | ||
1196 | 1222 |
step = 2 # so it will flow to submit |
1197 |
form = Form() |
|
1198 |
form.add_hidden('step', '-1') |
|
1199 |
form.add_hidden('page', '-1') |
|
1200 |
form.add_hidden('magictoken', '-1') |
|
1201 |
form.add_submit('cancel') |
|
1202 | 1223 | |
1203 | 1224 |
if step == 2: |
1204 |
form.add_submit('previous') |
|
1225 |
if 'previous' not in form: |
|
1226 |
form.add_submit('previous') |
|
1205 | 1227 |
magictoken = form.get_widget('magictoken').parse() |
1206 | 1228 |
self.feed_current_data(magictoken) |
1207 | 1229 |
form_data = session.get_by_magictoken(magictoken, {}) |
... | ... | |
1228 | 1250 |
# submitted a second time |
1229 | 1251 |
return template.error_page(_('This form has already been submitted.')) |
1230 | 1252 | |
1231 |
return self.submitted(form, existing_formdata) |
|
1253 |
try: |
|
1254 |
return self.submitted(form, existing_formdata) |
|
1255 |
except SetValueError: |
|
1256 |
if get_request().form.get('step') == '2': |
|
1257 |
# submit came from the validation page |
|
1258 |
return self.validating( |
|
1259 |
form_data, page_error_messages=[_('Technical error, please try again')] |
|
1260 |
) |
|
1261 |
else: |
|
1262 |
# last page |
|
1263 |
return self.page( |
|
1264 |
page, |
|
1265 |
page_change=False, |
|
1266 |
page_error_messages=[_('Technical error, please try again')], |
|
1267 |
transient_formdata=transient_formdata, |
|
1268 |
) |
|
1232 | 1269 | |
1233 | 1270 |
def reset_locked_data(self, form): |
1234 | 1271 |
# reset locked fields, making sure the user cannot alter them. |
... | ... | |
1338 | 1375 |
# on webservice results, there can be (temporary?) inconsistencies. |
1339 | 1376 |
return result_error('ouf ot range page_no') |
1340 | 1377 |
form = self.create_form(page=page) |
1341 |
data = self.formdef.get_data(form) |
|
1378 |
try: |
|
1379 |
data = self.formdef.get_data(form, raise_on_error=True) |
|
1380 |
except SetValueError as e: |
|
1381 |
return result_error('form deserialization failed: %s' % e) |
|
1342 | 1382 |
if not data: |
1343 | 1383 |
return result_error('nothing to save') |
1344 | 1384 | |
... | ... | |
1428 | 1468 |
displayed_fields = [] |
1429 | 1469 |
with get_publisher().substitutions.temporary_feed(formdata, force_mode='lazy'): |
1430 | 1470 |
form = self.create_form(page=page, displayed_fields=displayed_fields, transient_formdata=formdata) |
1431 |
formdata.data.update(self.formdef.get_data(form)) |
|
1471 |
try: |
|
1472 |
formdata.data.update(self.formdef.get_data(form, raise_on_error=True)) |
|
1473 |
except SetValueError as e: |
|
1474 |
return result_error('form deserialization failed: %s' % e) |
|
1432 | 1475 |
return FormStatusPage.live_process_fields(form, formdata, displayed_fields) |
1433 | 1476 | |
1434 | 1477 |
def clean_submission_context(self): |
... | ... | |
1443 | 1486 |
filled = self.get_current_draft() or self.formdef.data_class()() |
1444 | 1487 |
filled.just_created() |
1445 | 1488 | |
1446 |
filled.data = self.formdef.get_data(form) |
|
1489 |
filled.data = self.formdef.get_data(form, raise_on_error=True)
|
|
1447 | 1490 |
magictoken = get_request().form['magictoken'] |
1448 | 1491 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {}) |
1449 | 1492 |
filled.data.update(computed_values) |
... | ... | |
1493 | 1536 |
code.formdata = formdata # this will .store() the code |
1494 | 1537 | |
1495 | 1538 |
def submitted_existing(self, form): |
1496 |
new_data = self.formdef.get_data(form) |
|
1539 |
new_data = self.formdef.get_data(form, raise_on_error=True)
|
|
1497 | 1540 |
magictoken = get_request().form['magictoken'] |
1498 | 1541 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {}) |
1499 | 1542 |
new_data.update(computed_values) |
... | ... | |
1565 | 1608 |
return thumbnail |
1566 | 1609 |
return get_session().get_tempfile_content(t).get_file_pointer().read() |
1567 | 1610 | |
1568 |
def validating(self, data): |
|
1611 |
def validating(self, data, page_error_messages=None):
|
|
1569 | 1612 |
self.on_validation_page = True |
1570 | 1613 |
get_request().view_name = 'validation' |
1571 | 1614 |
self.html_top(self.formdef.name) |
... | ... | |
1573 | 1616 |
# over in rendering. |
1574 | 1617 |
get_request().environ['REQUEST_METHOD'] = 'GET' |
1575 | 1618 |
form = self.create_view_form(data) |
1619 |
if page_error_messages: |
|
1620 |
form.add_global_errors(page_error_messages) |
|
1576 | 1621 |
token_widget = form.get_widget(form.TOKEN_NAME) |
1577 | 1622 |
token_widget._parsed = True |
1578 | 1623 |
if self.formdef.has_captcha and not (get_session().get_user() or get_session().won_captcha): |
... | ... | |
1604 | 1649 |
} |
1605 | 1650 |
if self.has_draft_support() and data: |
1606 | 1651 |
context['tracking_code_box'] = lambda: self.tracking_code_box(data, magictoken) |
1607 | ||
1608 | 1652 |
return template.QommonTemplateResponse( |
1609 | 1653 |
templates=list(self.get_formdef_template_variants(self.validation_templates)), context=context |
1610 | 1654 |
) |
1611 |
- |