0001-general-introduce-a-new-computed-data-field-52110.patch
tests/admin_pages/test_form.py | ||
---|---|---|
2650 | 2650 |
app = login(get_app(pub)) |
2651 | 2651 |
resp = app.get('/backoffice/forms/%s/' % formdef.id) |
2652 | 2652 |
assert 'x [webservice.xxx.foobar] x' in resp.text |
2653 | ||
2654 | ||
2655 |
def test_form_new_computed_field(pub):
    """Add a field of type "computed" from the backoffice and check what gets stored."""
    create_superuser(pub)
    create_role(pub)

    # start from a single, empty form definition
    FormDef.wipe()
    empty_formdef = FormDef()
    empty_formdef.name = 'form title'
    empty_formdef.fields = []
    empty_formdef.store()

    # go from the form page to its fields page
    browser = login(get_app(pub))
    fields_page = browser.get('/backoffice/forms/1/').click(href='fields/')

    # fill and submit the "new field" form
    new_field_form = fields_page.forms[0]
    new_field_form['label'] = 'foobar'
    new_field_form['type'] = 'computed'
    new_field_form.submit().follow()

    # a single computed field must have been recorded; its identifier is
    # expected to match the label that was typed in
    stored_fields = FormDef.get(1).fields
    assert len(stored_fields) == 1
    created = stored_fields[0]
    assert created.key == 'computed'
    assert created.label == 'foobar'
    assert created.varname == 'foobar'
tests/form_pages/test_computed_field.py | ||
---|---|---|
1 |
import datetime |
|
2 | ||
3 |
import pytest |
|
4 |
from django.utils.timezone import make_aware |
|
5 | ||
6 |
from wcs import fields |
|
7 |
from wcs.formdef import FormDef |
|
8 |
from wcs.wscalls import NamedWsCall |
|
9 | ||
10 |
from ..utilities import clean_temporary_pub, create_temporary_pub, get_app, login |
|
11 |
from .test_all import create_user |
|
12 | ||
13 | ||
14 |
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture with the 'pickle' and 'sql' values.

    Tests that do not request ``pub`` are left untouched.  The values are
    passed indirectly, i.e. handed to the fixture as ``request.param``.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
|
17 | ||
18 | ||
19 |
@pytest.fixture
def pub(request, emails):
    """Create a temporary publisher set up according to ``request.param``.

    The parameter string selects the modes passed to ``create_temporary_pub``
    ('sql', 'templates', 'lazy'); password identification and the English
    language are then written to the configuration.
    """
    mode = request.param
    publisher = create_temporary_pub(
        sql_mode='sql' in mode,
        templates_mode='templates' in mode,
        lazy_mode='lazy' in mode,
    )
    publisher.cfg['identification'] = {'methods': ['password']}
    publisher.cfg['language'] = {'language': 'en'}
    publisher.write_cfg()
    return publisher
|
31 | ||
32 | ||
33 |
def teardown_module(module):
    """Module-level teardown: clean up the temporary publisher."""
    clean_temporary_pub()
|
35 | ||
36 | ||
37 |
def test_computed_field_simple(pub):
    """A computed field with a constant template is saved with the submitted form."""
    create_user(pub)

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    constant_field = fields.ComputedField(
        id='1', label='computed', varname='computed', value_template='{{ "xxx" }}'
    )
    formdef.fields = [constant_field]
    formdef.store()
    formdef.data_class().wipe()

    browser = login(get_app(pub), username='foo', password='foo')
    page = browser.get('/test/')
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text

    # the computed value is the only data stored, keyed by the field id
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'xxx'}
|
56 | ||
57 | ||
58 |
def test_computed_field_used_in_prefill(pub):
    """A string field can be prefilled from ``form_var_computed``."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    source_field = fields.ComputedField(
        id='1', label='computed', varname='computed', value_template='xxx'
    )
    prefilled_field = fields.StringField(
        id='2', label='string', prefill={'type': 'string', 'value': '{{ form_var_computed }}'}
    )
    formdef.fields = [source_field, prefilled_field]
    formdef.store()
    formdef.data_class().wipe()

    page = get_app(pub).get('/test/')
    # the computed value is already there when the first page is displayed
    assert page.forms[0]['f2'].value == 'xxx'
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text

    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'xxx', '2': 'xxx'}
|
79 | ||
80 | ||
81 |
def test_computed_field_used_in_comment(pub):
    """A comment field label can reference ``form_var_computed``."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    source_field = fields.ComputedField(
        id='1', label='computed', varname='computed', value_template='xxx'
    )
    comment_field = fields.CommentField(id='2', label='X{{ form_var_computed }}Y', type='comment')
    formdef.fields = [source_field, comment_field]
    formdef.store()
    formdef.data_class().wipe()

    page = get_app(pub).get('/test/')
    # the template in the comment is rendered with the computed value
    assert 'XxxxY' in page.text
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text

    # only the computed field holds data
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'xxx'}
|
100 | ||
101 | ||
102 |
def test_computed_field_freeze(pub, freezer):
    """Check the effect of ``freeze_on_initial_value`` on a time-dependent template.

    The form is displayed at 10:00 and submitted at 10:05: without freezing
    the stored value is the later time, with freezing it is the initial one.
    """
    display_time = make_aware(datetime.datetime(2021, 4, 6, 10, 0))
    submit_time = make_aware(datetime.datetime(2021, 4, 6, 10, 5))

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    clock_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{% now "H:i" %}',
        freeze_on_initial_value=False,
    )
    formdef.fields = [clock_field]
    formdef.store()
    formdef.data_class().wipe()

    # first run: value is not frozen
    freezer.move_to(display_time)
    page = get_app(pub).get('/test/')
    freezer.move_to(submit_time)
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': '10:05'}

    # second run: same scenario with the value frozen
    formdef.data_class().wipe()
    formdef.fields[0].freeze_on_initial_value = True
    formdef.store()

    freezer.move_to(display_time)
    page = get_app(pub).get('/test/')
    freezer.move_to(submit_time)
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': '10:00'}
|
141 | ||
142 | ||
143 |
def test_computed_field_from_request_get(pub):
    """A frozen computed field keeps the query string parameter of the first request."""
    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    query_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{{ request.GET.param }}',
        freeze_on_initial_value=True,
    )
    formdef.fields = [query_field]
    formdef.store()
    formdef.data_class().wipe()

    # the parameter is only present on the initial GET request
    page = get_app(pub).get('/test/?param=value')
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text

    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'value'}
|
166 | ||
167 | ||
168 |
def test_computed_field_usage_in_post_condition(pub):
    """A page post-condition can test a computed value taken from the query string."""
    must_be_xxx = {
        'condition': {
            'type': 'django',
            'value': 'form_var_computed == "xxx"',
        },
        'error_message': 'You shall not pass.',
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    first_page = fields.PageField(
        id='0',
        label='1st page',
        type='page',
        post_conditions=[must_be_xxx],
    )
    query_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{{ request.GET.param }}',
        freeze_on_initial_value=True,
    )
    formdef.fields = [first_page, query_field]
    formdef.store()
    formdef.data_class().wipe()

    # wrong parameter value: the post-condition rejects the page
    page = get_app(pub).get('/test/?param=test')
    page = page.forms[0].submit('submit')  # -> validation
    assert 'You shall not pass.' in page.text

    # expected parameter value: the form goes through
    page = get_app(pub).get('/test/?param=xxx')
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
|
205 | ||
206 | ||
207 |
def test_computed_field_usage_updated_in_post_condition(pub):
    """A computed value built from a field of the same page is refreshed on submit."""
    must_be_xxx = {
        'condition': {
            'type': 'django',
            'value': 'form_var_computed == "xxx"',
        },
        'error_message': 'You shall not pass.',
    }

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    first_page = fields.PageField(
        id='0',
        label='1st page',
        type='page',
        post_conditions=[must_be_xxx],
    )
    # the computed field mirrors the string field declared after it
    mirror_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{{ form_var_field }}',
    )
    typed_field = fields.StringField(id='2', label='string', varname='field')
    formdef.fields = [first_page, mirror_field, typed_field]
    formdef.store()
    formdef.data_class().wipe()

    page = get_app(pub).get('/test/')
    page.forms[0]['f2'].value = 'test'
    page = page.forms[0].submit('submit')  # -> validation
    assert 'You shall not pass.' in page.text

    # correct the value on the page that was displayed again
    page.forms[0]['f2'].value = 'xxx'
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
|
245 | ||
246 | ||
247 |
def test_computed_field_recall_draft(pub):
    """Computed values survive when a draft is recalled, with or without going back."""
    create_user(pub)
    credentials = {'username': 'foo', 'password': 'foo'}

    FormDef.wipe()
    formdef = FormDef()
    formdef.name = 'test'
    query_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{{ request.GET.param }}',
        freeze_on_initial_value=True,
    )
    formdef.fields = [query_field]
    formdef.store()
    formdef.data_class().wipe()

    page = login(get_app(pub), **credentials).get('/test/?param=value')
    page = page.forms[0].submit('submit')  # -> validation
    assert formdef.data_class().count() == 1
    draft = formdef.data_class().select()[0]
    assert draft.is_draft()

    # recall draft
    page = login(get_app(pub), **credentials).get(draft.get_url()).follow()
    assert 'form-validation' in page.text
    page = page.forms[1].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'value'}

    # retry, moving back to first page
    formdef.data_class().wipe()
    page = login(get_app(pub), **credentials).get('/test/?param=value')
    page = page.forms[0].submit('submit')  # -> validation
    assert formdef.data_class().count() == 1
    draft = formdef.data_class().select()[0]
    assert draft.is_draft()

    page = login(get_app(pub), **credentials).get(draft.get_url()).follow()
    assert 'form-validation' in page.text
    page = page.forms[1].submit('previous')  # -> first page
    page = page.forms[1].submit('submit')  # -> validation
    page = page.forms[1].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data == {'1': 'value'}
|
297 | ||
298 | ||
299 |
def test_computed_field_complex_data(pub, http_requests):
    """A computed field can hold a structured value coming from a named webservice call.

    NOTE(review): the ``http_requests`` fixture presumably answers the remote
    URL with ``{"foo": "bar"}`` -- confirm against the fixture definition.
    """
    FormDef.wipe()

    # webservice referenced as {{ webservice.hello_world }} in the template
    hello_call = NamedWsCall()
    hello_call.name = 'Hello world'
    hello_call.request = {'url': 'http://remote.example.net/json'}
    hello_call.store()

    formdef = FormDef()
    formdef.name = 'test'
    ws_field = fields.ComputedField(
        id='1',
        label='computed',
        varname='computed',
        value_template='{{ webservice.hello_world }}',
        freeze_on_initial_value=True,
    )
    # the comment reaches inside the structured value (its "foo" key)
    comment_field = fields.CommentField(id='2', label='X{{form_var_computed_foo}}Y', type='comment')
    formdef.fields = [ws_field, comment_field]
    formdef.store()
    formdef.data_class().wipe()

    page = get_app(pub).get('/test/')
    assert 'XbarY' in page.text
    page = page.forms[0].submit('submit')  # -> validation
    page = page.forms[0].submit('submit').follow()  # -> submit
    assert 'The form has been recorded' in page.text

    # the value is stored as a dictionary, not as its string rendering
    assert formdef.data_class().count() == 1
    submitted = formdef.data_class().select()[0]
    assert submitted.data['1'] == {'foo': 'bar'}
tests/test_fields.py | ||
---|---|---|
50 | 50 |
if klass is fields.PageField: |
51 | 51 |
with pytest.raises(AttributeError): |
52 | 52 |
klass(label='foo').add_to_form(form) |
53 |
elif klass is fields.ComputedField: |
|
54 |
# no ui |
|
55 |
continue |
|
53 | 56 |
else: |
54 | 57 |
klass(label='foo').add_to_form(form) |
55 | 58 |
wcs/admin/blocks.py | ||
---|---|---|
46 | 46 |
('history', 'snapshots_dir'), |
47 | 47 |
] |
48 | 48 |
field_def_page_class = BlockFieldDefPage |
49 |
blacklisted_types = ['page', 'table', 'table-select', 'tablerows', 'ranked-items', 'blocks'] |
|
49 |
blacklisted_types = ['page', 'table', 'table-select', 'tablerows', 'ranked-items', 'blocks', 'computed']
|
|
50 | 50 |
support_import = False |
51 | 51 |
readonly_message = N_('This block of fields is readonly.') |
52 | 52 |
wcs/admin/settings.py | ||
---|---|---|
141 | 141 |
section = 'settings' |
142 | 142 |
field_def_page_class = UserFieldDefPage |
143 | 143 |
support_import = False |
144 |
blacklisted_types = ['page'] |
|
144 |
blacklisted_types = ['page', 'computed']
|
|
145 | 145 |
field_var_prefix = '..._user_var_' |
146 | 146 | |
147 | 147 |
def index_bottom(self): |
wcs/admin/workflows.py | ||
---|---|---|
1026 | 1026 |
section = 'workflows' |
1027 | 1027 |
field_def_page_class = WorkflowVariablesFieldDefPage |
1028 | 1028 |
support_import = False |
1029 |
blacklisted_types = ['page', 'blocks'] |
|
1029 |
blacklisted_types = ['page', 'blocks', 'computed']
|
|
1030 | 1030 |
field_var_prefix = 'form_option_' |
1031 | 1031 |
readonly_message = N_('This workflow is readonly.') |
1032 | 1032 | |
... | ... | |
1048 | 1048 |
section = 'workflows' |
1049 | 1049 |
field_def_page_class = WorkflowBackofficeFieldDefPage |
1050 | 1050 |
support_import = False |
1051 |
blacklisted_types = ['page', 'blocks'] |
|
1051 |
blacklisted_types = ['page', 'blocks', 'computed']
|
|
1052 | 1052 |
blacklisted_attributes = ['condition'] |
1053 | 1053 |
field_var_prefix = 'form_var_' |
1054 | 1054 |
readonly_message = N_('This workflow is readonly.') |
wcs/fields.py | ||
---|---|---|
3389 | 3389 |
return node |
3390 | 3390 | |
3391 | 3391 | |
3392 |
class ComputedField(Field):
    """Field whose value is computed from a template instead of being typed in."""

    key = 'computed'
    description = _('Computed Data')

    # template used to compute the value
    value_template = None
    # when set, a value that has already been computed is kept as is
    freeze_on_initial_value = False

    # no user interface: callers check these attributes for truthiness
    # before trying to add the field to a form
    add_to_form = None
    add_to_view_form = None
    get_opendocument_node_value = None

    def __init__(self, *args, **kwargs):
        """Initialise the field, deriving a missing identifier from the label."""
        super().__init__(*args, **kwargs)
        if not self.varname:
            self.varname = misc.simplify(self.label, space='_')

    def get_admin_attributes(self):
        """Return the parent admin attributes without 'condition', plus the field's own."""
        inherited = super().get_admin_attributes()
        inherited.remove('condition')
        return inherited + ['varname', 'value_template', 'freeze_on_initial_value']

    def fill_admin_form(self, form):
        """Add the label, identifier, template and freeze widgets to the admin form."""
        form.add(StringWidget, 'label', title=_('Label'), value=self.label, required=True, size=50)
        # the identifier is mandatory for this field type
        form.add(
            VarnameWidget,
            'varname',
            title=_('Identifier'),
            required=True,
            value=self.varname,
            size=30,
            hint=_('This is used as suffix for variable names.'),
        )
        form.add(
            StringWidget,
            'value_template',
            title=_('Value Template'),
            required=True,
            size=80,
            value=self.value_template,
            validation_function=ComputedExpressionWidget.validate_template,
        )
        form.add(
            CheckboxWidget,
            'freeze_on_initial_value',
            title=_('Freeze on initial value'),
            value=self.freeze_on_initial_value,
        )
3439 | ||
3440 | ||
3441 |
register_field_class(ComputedField) |
|
3442 | ||
3443 | ||
3392 | 3444 |
def get_field_class_by_type(type): |
3393 | 3445 |
for k in field_classes: |
3394 | 3446 |
if k.key == type: |
wcs/formdef.py | ||
---|---|---|
612 | 612 |
) |
613 | 613 |
return form |
614 | 614 | |
615 |
def get_computed_fields_from_page(self, page):
    """Yield the computed fields that belong to ``page``.

    ``page`` is a page field of this form definition, or None.  With None
    the fields located before the first page field are considered (all of
    them when the form has no page).  Otherwise the fields located between
    the given page field and the next one are considered.

    A form definition without any field list (``fields`` is None) yields
    nothing instead of raising.
    """
    # without a page, collection starts from the very first field
    on_page = page is None
    for field in self.fields or []:
        if field.key == 'page':
            if on_page:
                # reached the next page, the requested one is over
                break
            if page.id == field.id:
                on_page = True
            continue
        if on_page and field.key == 'computed':
            yield field
|
628 | ||
615 | 629 |
def add_fields_to_form( |
616 | 630 |
self, |
617 | 631 |
form, |
... | ... | |
641 | 655 |
value = None |
642 | 656 |
if form_data: |
643 | 657 |
value = form_data.get(field.id) |
658 |
if not field.add_to_form: |
|
659 |
continue |
|
644 | 660 |
widget = field.add_to_form(form, value) |
645 | 661 |
widget.is_hidden = not (visible) |
646 | 662 |
widget.field = field |
... | ... | |
710 | 726 | |
711 | 727 |
for field in page['fields']: |
712 | 728 |
value = dict.get(field.id) |
729 |
if not field.add_to_view_form: |
|
730 |
continue |
|
713 | 731 |
if not field.include_in_validation_page: |
714 | 732 |
form.widgets.append(HtmlWidget(htmltext('<div style="display: none;">'))) |
715 | 733 |
field.add_to_view_form(form, value) |
... | ... | |
1298 | 1316 |
data = formdata.data |
1299 | 1317 |
for field in self.fields: |
1300 | 1318 |
if isinstance( |
1301 |
field, (fields.SubtitleField, fields.TitleField, fields.CommentField, fields.PageField) |
|
1319 |
field, |
|
1320 |
( |
|
1321 |
fields.SubtitleField, |
|
1322 |
fields.TitleField, |
|
1323 |
fields.CommentField, |
|
1324 |
fields.PageField, |
|
1325 |
fields.ComputedField, |
|
1326 |
), |
|
1302 | 1327 |
): |
1303 | 1328 |
continue |
1304 | 1329 |
if data is None: |
wcs/forms/common.py | ||
---|---|---|
310 | 310 |
form_data['draft_formdata_id'] = filled.id |
311 | 311 |
form_data['page_no'] = filled.page_no |
312 | 312 |
session.add_magictoken(magictoken, form_data) |
313 | ||
314 |
# restore computed fields data |
|
315 |
computed_data = {} |
|
316 |
for field in self.formdef.fields: |
|
317 |
if field.key != 'computed': |
|
318 |
continue |
|
319 |
if field.id in form_data: |
|
320 |
computed_data[field.id] = form_data[field.id] |
|
321 |
if computed_data: |
|
322 |
session.add_magictoken('%s-computed' % magictoken, computed_data) |
|
323 | ||
313 | 324 |
return redirect('../?mt=%s' % magictoken) |
314 | 325 | |
315 | 326 |
def get_workflow_form(self, user): |
wcs/forms/root.py | ||
---|---|---|
41 | 41 |
from wcs.qommon.form import get_selection_error_text |
42 | 42 |
from wcs.roles import logged_users_role |
43 | 43 |
from wcs.variables import LazyFormDef |
44 |
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef |
|
44 |
from wcs.workflows import Workflow, WorkflowBackofficeFieldsFormDef, WorkflowStatusItem
|
|
45 | 45 | |
46 | 46 |
from ..qommon import N_, _, emails, errors, get_cfg, get_logger, misc, template |
47 | 47 |
from ..qommon.admin.emails import EmailsDirectory |
48 | 48 |
from ..qommon.form import CheckboxWidget, EmailWidget, Form, HiddenErrorWidget, HtmlWidget, StringWidget |
49 |
from ..qommon.template import TemplateError |
|
49 | 50 | |
50 | 51 | |
51 | 52 |
class SubmittedDraftException(Exception): |
... | ... | |
320 | 321 |
from wcs.blocks import BlockSubWidget |
321 | 322 | |
322 | 323 |
for field in fields: |
324 |
if field.key == 'computed': |
|
325 |
continue |
|
323 | 326 |
field_key = '%s' % field.id |
324 | 327 |
widget = form.get_widget('f%s' % field_key) if form else None |
325 | 328 |
yield field, field_key, widget, None, None |
... | ... | |
458 | 461 | |
459 | 462 |
session = get_session() |
460 | 463 | |
464 |
magictoken = get_request().form.get('magictoken') |
|
461 | 465 |
if page and self.pages.index(page) > 0: |
462 |
magictoken = get_request().form['magictoken'] |
|
466 |
self.feed_current_data(magictoken) |
|
467 | ||
468 |
has_new_magictoken = False |
|
469 |
if magictoken: |
|
470 |
form_data = session.get_by_magictoken(magictoken, {}) |
|
471 |
else: |
|
472 |
form_data = {} |
|
473 | ||
474 |
if page == self.pages[0] and 'magictoken' not in get_request().form: |
|
475 |
magictoken = randbytes(8) |
|
476 |
has_new_magictoken = True |
|
477 | ||
478 |
computed_data = self.handle_computed_fields( |
|
479 |
magictoken, self.formdef.get_computed_fields_from_page(page) |
|
480 |
) |
|
481 |
if computed_data: |
|
482 |
form_data.update(computed_data) |
|
463 | 483 |
self.feed_current_data(magictoken) |
464 | 484 | |
465 | 485 |
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'): |
... | ... | |
478 | 498 |
form.action = self.action_url |
479 | 499 |
# include a data-has-draft attribute on the <form> element when a draft |
480 | 500 |
# already exists for the form; this will activate the autosave. |
481 |
magictoken = get_request().form.get('magictoken') |
|
482 |
if magictoken: |
|
483 |
form_data = session.get_by_magictoken(magictoken, {}) |
|
484 |
if self.has_draft_support(): |
|
485 |
form.attrs['data-has-draft'] = 'yes' |
|
486 |
else: |
|
487 |
form_data = {} |
|
501 |
if not has_new_magictoken and self.has_draft_support(): |
|
502 |
form.attrs['data-has-draft'] = 'yes' |
|
488 | 503 | |
489 |
if page == self.pages[0] and 'magictoken' not in get_request().form: |
|
490 |
magictoken = randbytes(8) |
|
491 |
else: |
|
492 |
magictoken = get_request().form['magictoken'] |
|
493 | 504 |
form.add_hidden('magictoken', magictoken) |
494 | 505 |
data = session.get_by_magictoken(magictoken, {}) |
495 | 506 | |
... | ... | |
530 | 541 | |
531 | 542 |
if had_prefill: |
532 | 543 |
# include prefilled data |
533 |
transient_formdata = self.get_transient_formdata() |
|
544 |
transient_formdata = self.get_transient_formdata(magictoken)
|
|
534 | 545 |
transient_formdata.data.update(self.formdef.get_data(form)) |
535 | 546 |
if self.has_draft_support() and not (req.is_from_application() or req.is_from_bot()): |
536 | 547 |
# save to get prefilling data in database |
... | ... | |
620 | 631 |
templates=list(self.get_formdef_template_variants(self.filling_templates)), context=context |
621 | 632 |
) |
622 | 633 | |
634 |
def handle_computed_fields(self, magictoken, fields):
    """Compute the values of the given computed fields and keep them in the session.

    Values live in the session under the "<magictoken>-computed" token.
    Only fields of type "computed" that have a value template are handled.
    A field marked as frozen keeps a value that was already computed, and a
    field whose template raises TemplateError is skipped.

    Returns the dictionary of computed values, keyed by field id.
    """
    session = get_session()
    token = '%s-computed' % magictoken

    candidates = [x for x in fields if x.key == 'computed' and x.value_template]
    computed_values = session.get_by_magictoken(token, {})
    if not candidates:
        return computed_values

    if not computed_values:
        # register the dictionary so the values set below are attached to the token
        session.add_magictoken(token, computed_values)

    for field in candidates:
        if field.freeze_on_initial_value and field.id in computed_values:
            continue
        with get_publisher().complex_data():
            try:
                raw_value = WorkflowStatusItem.compute(
                    field.value_template, raises=True, allow_complex=True
                )
            except TemplateError:
                continue
            # swap the computed result for the cached complex data, while
            # still inside the complex_data() context
            value = get_publisher().get_cached_complex_data(raw_value)
        computed_values[field.id] = value

    return computed_values
|
653 | ||
623 | 654 |
def modify_filling_context(self, context, page, data): |
624 | 655 |
pass |
625 | 656 | |
... | ... | |
687 | 718 |
formdata.user = get_request().user |
688 | 719 |
formdata.data = get_session().get_by_magictoken(magictoken, {}) |
689 | 720 |
formdata.prefilling_data = formdata.data.get('prefilling_data', {}) |
721 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken) or {} |
|
722 |
formdata.data.update(computed_values) |
|
690 | 723 | |
691 | 724 |
if self.edit_mode: |
692 | 725 |
if magictoken is None: |
... | ... | |
886 | 919 |
if 'mt' in get_request().form: |
887 | 920 |
magictoken = get_request().form['mt'] |
888 | 921 |
data = session.get_by_magictoken(magictoken, {}) |
922 |
computed_values = session.get_by_magictoken('%s-computed' % magictoken, {}) |
|
889 | 923 |
if not get_request().is_in_backoffice(): |
890 | 924 |
# don't remove magictoken as the backoffice agent may get |
891 | 925 |
# the page reloaded. |
892 | 926 |
session.remove_magictoken(magictoken) |
893 |
if data: |
|
927 |
if data or computed_values:
|
|
894 | 928 |
# create a new one since the other has been exposed in a url |
895 | 929 |
magictoken = randbytes(8) |
896 |
session.add_magictoken(magictoken, data) |
|
930 |
session.add_magictoken(magictoken, data or {}) |
|
931 |
session.add_magictoken('%s-computed' % magictoken, computed_values) |
|
932 | ||
897 | 933 |
get_request().form['magictoken'] = magictoken |
898 | 934 |
self.feed_current_data(magictoken) |
899 | 935 |
if 'page_no' in data and int(data['page_no']) != 0: |
... | ... | |
985 | 1021 |
# for templates referencing fields from the same page. |
986 | 1022 |
self.reset_locked_data(form) |
987 | 1023 |
data = self.formdef.get_data(form) |
1024 |
computed_data = self.handle_computed_fields(magictoken, submitted_fields) |
|
988 | 1025 | |
989 | 1026 |
form_data.update(data) |
990 | 1027 | |
991 | 1028 |
if self.has_draft_support() and form.get_submit() == 'savedraft': |
1029 |
form_data.update(computed_data) |
|
992 | 1030 |
filled = self.save_draft(form_data, page_no) |
993 | 1031 |
return redirect(filled.get_url()) |
994 | 1032 | |
... | ... | |
1005 | 1043 |
form_data = copy.copy(session.get_by_magictoken(magictoken, {})) |
1006 | 1044 |
data = self.formdef.get_data(form) |
1007 | 1045 |
form_data.update(data) |
1046 |
form_data.update(computed_data) |
|
1008 | 1047 |
for i, post_condition in enumerate(post_conditions): |
1009 | 1048 |
condition = post_condition.get('condition') |
1010 | 1049 |
error_message = post_condition.get('error_message') |
... | ... | |
1042 | 1081 |
with get_publisher().substitutions.temporary_feed(transient_formdata, force_mode='lazy'): |
1043 | 1082 |
data = self.formdef.get_data(form) |
1044 | 1083 |
form_data.update(data) |
1084 |
form_data.update(computed_data) |
|
1045 | 1085 | |
1046 | 1086 |
session.add_magictoken(magictoken, form_data) |
1047 | 1087 | |
... | ... | |
1363 | 1403 |
filled.just_created() |
1364 | 1404 | |
1365 | 1405 |
filled.data = self.formdef.get_data(form) |
1406 |
magictoken = get_request().form['magictoken'] |
|
1407 |
computed_values = get_session().get_by_magictoken('%s-computed' % magictoken, {}) |
|
1408 |
filled.data.update(computed_values) |
|
1366 | 1409 |
session = get_session() |
1367 | 1410 |
filled.user = get_request().user |
1368 | 1411 |
wcs/sql.py | ||
---|---|---|
80 | 80 |
'password': 'text[][]', |
81 | 81 |
# field block |
82 | 82 |
'block': 'jsonb', |
83 |
# computed data field |
|
84 |
'computed': 'jsonb', |
|
83 | 85 |
} |
84 | 86 | |
85 | 87 | |
... | ... | |
1674 | 1676 |
if field.key in ('ranked-items', 'password'): |
1675 | 1677 |
# turn {'poire': 2, 'abricot': 1, 'pomme': 3} into an array |
1676 | 1678 |
value = [[force_str(x), force_str(y)] for x, y in value.items()] |
1679 |
elif field.key == 'computed': |
|
1680 |
if value is not None: |
|
1681 |
# embed value in a dict, so it's never necessary to cast the |
|
1682 |
# value for postgresql |
|
1683 |
value = {'data': value, '@type': 'computed-data'} |
|
1677 | 1684 |
elif sql_type == 'varchar': |
1678 | 1685 |
assert isinstance(value, str) |
1679 | 1686 |
elif sql_type == 'date': |
... | ... | |
1730 | 1737 |
for fmt, val in value: |
1731 | 1738 |
d[fmt] = force_str(val) |
1732 | 1739 |
value = d |
1740 |
elif field.key == 'computed': |
|
1741 |
if isinstance(value, dict) and value.get('@type') == 'computed-data': |
|
1742 |
value = value.get('data') |
|
1733 | 1743 |
if sql_type == 'date': |
1734 | 1744 |
value = value.timetuple() |
1735 | 1745 |
elif sql_type == 'bytea': |
1736 | 1746 |
value = pickle_loads(value) |
1737 |
elif sql_type == 'jsonb' and value.get('schema'): |
|
1747 |
elif sql_type == 'jsonb' and isinstance(value, dict) and value.get('schema'):
|
|
1738 | 1748 |
# block field, adapt date/field values |
1739 | 1749 |
for field_id, field_type in value.get('schema').items(): |
1740 | 1750 |
if field_type not in ('date', 'file'): |
wcs/variables.py | ||
---|---|---|
687 | 687 |
'file': LazyFieldVarFile, |
688 | 688 |
'block': LazyFieldVarBlock, |
689 | 689 |
'bool': LazyFieldVarBool, |
690 |
'computed': LazyFieldVarComputed, |
|
690 | 691 |
}.get(field.key, klass) |
691 | 692 | |
692 | 693 |
return klass(**self.get_field_kwargs(field)) |
... | ... | |
783 | 784 |
raise AssertionError('lazy cannot be pickled') |
784 | 785 | |
785 | 786 | |
786 |
class LazyFieldVarStructured(LazyFieldVar): |
|
787 |
class LazyFieldVarComplex(LazyFieldVar):
    """Lazy field variable giving access to the content of a dict or list value.

    The value is obtained from ``get_field_var_value()``, which is not
    defined in this class (subclasses in this module provide it).
    """

    def inspect_keys(self):
        """Return the flattened key names found in the complex value.

        Nested dictionary keys are joined with "_"; keys rejected by
        ``CompatibilityNamesDict.valid_key_regex`` are ignored.  List items
        are named after their position.
        """
        found = []

        def walk(prefix, node):
            if not isinstance(node, dict):
                # leaf: the accumulated prefix is a key
                found.append(prefix)
                return
            for name, child in node.items():
                if not CompatibilityNamesDict.valid_key_regex.match(name):
                    continue
                walk(prefix + '_' + name if prefix else name, child)

        complex_value = self.get_field_var_value()
        if isinstance(complex_value, list):
            for position, item in enumerate(complex_value):
                walk(str(position), item)
        else:
            walk('', complex_value)

        return found

    def __getitem__(self, key):
        """Look ``key`` up in the parent class first, then in the complex value.

        For a dictionary the key is used as is; for a list it is compared,
        as a string, to the item positions.  Raises KeyError otherwise.
        """
        try:
            return super().__getitem__(key)
        except KeyError:
            pass

        complex_value = self.get_field_var_value()
        if complex_value:
            if isinstance(complex_value, dict):
                return complex_value[key]
            if isinstance(complex_value, list):
                wanted = str(key)
                for position, item in enumerate(complex_value):
                    if wanted == str(position):
                        return item
        raise KeyError(key)
|
823 | ||
824 | ||
825 |
class LazyFieldVarComputed(LazyFieldVarComplex):
    """Lazy variable for computed fields; the complex value is the field value itself."""

    def get_field_var_value(self):
        """Return the value exposed to key inspection and item access."""
        return self.get_value()
|
828 | ||
829 | ||
830 |
class LazyFieldVarStructured(LazyFieldVarComplex): |
|
787 | 831 |
def inspect_keys(self): |
788 | 832 |
structured_value = self._field.get_structured_value(self._data) |
789 | 833 |
if not structured_value: |
... | ... | |
801 | 845 |
else: |
802 | 846 |
keys.append('live') |
803 | 847 | |
804 |
def walk(base, value): |
|
805 |
if isinstance(value, dict): |
|
806 |
for k, v in value.items(): |
|
807 |
if CompatibilityNamesDict.valid_key_regex.match(k): |
|
808 |
walk(k if not base else base + '_' + k, v) |
|
809 |
else: |
|
810 |
keys.append(base) |
|
811 | ||
812 |
if isinstance(structured_value, list): |
|
813 |
for i, value in enumerate(structured_value): |
|
814 |
walk(str(i), value) |
|
815 |
else: |
|
816 |
walk('', structured_value) |
|
848 |
keys.extend(super().inspect_keys()) |
|
817 | 849 | |
818 | 850 |
return keys |
819 | 851 | |
... | ... | |
826 | 858 |
def structured(self): |
827 | 859 |
return self._field.get_structured_value(self._data) |
828 | 860 | |
861 |
def get_field_var_value(self): |
|
862 |
return self.structured |
|
863 | ||
829 | 864 |
@property |
830 | 865 |
def live(self): |
831 | 866 |
if not (self._field.data_source and self._field.data_source.get('type', '').startswith('carddef:')): |
... | ... | |
857 | 892 |
raise AttributeError('live') |
858 | 893 |
return LazyFormData(carddata) |
859 | 894 | |
860 |
def __getitem__(self, key): |
|
861 |
try: |
|
862 |
return super().__getitem__(key) |
|
863 |
except KeyError: |
|
864 |
pass |
|
865 |
structured_value = self._field.get_structured_value(self._data) |
|
866 |
if not structured_value: |
|
867 |
raise KeyError(key) |
|
868 |
if isinstance(structured_value, dict): |
|
869 |
return structured_value[key] |
|
870 |
if isinstance(structured_value, list): |
|
871 |
for i, struct_value in enumerate(structured_value): |
|
872 |
if str(key) == str(i): |
|
873 |
return struct_value |
|
874 |
raise KeyError(key) |
|
875 | ||
876 | 895 | |
877 | 896 |
class DateOperatorsMixin: |
878 | 897 |
def __eq__(self, other): |
wcs/wf/form.py | ||
---|---|---|
80 | 80 |
class WorkflowFormFieldsDirectory(FieldsDirectory): |
81 | 81 |
section = 'workflows' |
82 | 82 |
support_import = False |
83 |
blacklisted_types = ['page'] |
|
83 |
blacklisted_types = ['page', 'computed']
|
|
84 | 84 |
field_def_page_class = WorkflowFormFieldDefPage |
85 | 85 | |
86 | 86 | |
87 |
- |