12 |
12 |
from wcs.fields import StringField, DateField
|
13 |
13 |
from wcs.roles import Role
|
14 |
14 |
from wcs.workflows import (Workflow, WorkflowStatusItem,
|
15 |
|
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem)
|
|
15 |
SendmailWorkflowStatusItem, SendSMSWorkflowStatusItem,
|
|
16 |
DisplayMessageWorkflowStatusItem)
|
16 |
17 |
from wcs.wf.anonymise import AnonymiseWorkflowStatusItem
|
17 |
18 |
from wcs.wf.dispatch import DispatchWorkflowStatusItem
|
18 |
19 |
from wcs.wf.form import FormWorkflowStatusItem, WorkflowFormFieldsFormDef
|
... | ... | |
27 |
28 |
|
28 |
29 |
def setup_module(module):
|
29 |
30 |
cleanup()
|
30 |
|
global pub, req
|
|
31 |
|
|
32 |
def teardown_module(module):
|
|
33 |
clean_temporary_pub()
|
|
34 |
|
|
35 |
def pytest_generate_tests(metafunc):
|
|
36 |
if 'two_pubs' in metafunc.fixturenames:
|
|
37 |
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
|
|
38 |
|
|
39 |
@pytest.fixture
|
|
40 |
def pub(request):
|
31 |
41 |
pub = create_temporary_pub()
|
32 |
42 |
pub.cfg['language'] = {'language': 'en'}
|
33 |
43 |
pub.write_cfg()
|
... | ... | |
36 |
46 |
req.user = None
|
37 |
47 |
pub._set_request(req)
|
38 |
48 |
req.session = sessions.BasicSession(id=1)
|
39 |
|
|
40 |
|
def teardown_module(module):
|
41 |
|
clean_temporary_pub()
|
42 |
|
|
43 |
|
def pytest_generate_tests(metafunc):
|
44 |
|
if 'two_pubs' in metafunc.fixturenames:
|
45 |
|
metafunc.parametrize('two_pubs', ['pickle', 'sql'], indirect=True)
|
|
49 |
return pub
|
46 |
50 |
|
47 |
51 |
@pytest.fixture
|
48 |
52 |
def two_pubs(request):
|
... | ... | |
57 |
61 |
|
58 |
62 |
return pub
|
59 |
63 |
|
60 |
|
def test_jump_nothing():
|
|
64 |
def test_jump_nothing(pub):
|
61 |
65 |
FormDef.wipe()
|
62 |
66 |
formdef = FormDef()
|
63 |
67 |
formdef.name = 'foobar'
|
... | ... | |
66 |
70 |
item = JumpWorkflowStatusItem()
|
67 |
71 |
assert item.must_jump(formdata) is True
|
68 |
72 |
|
69 |
|
def test_jump_datetime_condition():
|
|
73 |
def test_jump_datetime_condition(pub):
|
70 |
74 |
FormDef.wipe()
|
71 |
75 |
formdef = FormDef()
|
72 |
76 |
formdef.name = 'foobar'
|
... | ... | |
83 |
87 |
tomorrow.timetuple()[:3]
|
84 |
88 |
assert item.must_jump(formdata) is False
|
85 |
89 |
|
86 |
|
def test_jump_count_condition():
|
|
90 |
def test_jump_count_condition(pub):
|
87 |
91 |
FormDef.wipe()
|
88 |
92 |
formdef = FormDef()
|
89 |
93 |
formdef.name = 'foobar'
|
... | ... | |
101 |
105 |
item.condition = 'form_objects.count < 2'
|
102 |
106 |
assert item.must_jump(formdata) is False
|
103 |
107 |
|
104 |
|
def test_check_auth():
|
|
108 |
def test_check_auth(pub):
|
105 |
109 |
user = pub.user_class(name='foo')
|
106 |
110 |
user.store()
|
107 |
111 |
|
... | ... | |
148 |
152 |
formdata.workflow_roles = None
|
149 |
153 |
assert status_item.check_auth(formdata, user) is True
|
150 |
154 |
|
151 |
|
def test_dispatch():
|
|
155 |
def test_dispatch(pub):
|
152 |
156 |
formdef = FormDef()
|
153 |
157 |
formdef.name = 'baz'
|
154 |
158 |
formdef.store()
|
... | ... | |
165 |
169 |
item.perform(formdata)
|
166 |
170 |
assert formdata.workflow_roles == {'_receiver': '1'}
|
167 |
171 |
|
168 |
|
def test_roles():
|
|
172 |
def test_roles(pub):
|
169 |
173 |
user = pub.user_class()
|
170 |
174 |
user.store()
|
171 |
175 |
|
... | ... | |
206 |
210 |
item.perform(formdata)
|
207 |
211 |
assert pub.user_class.get(user.id).roles == ['2']
|
208 |
212 |
|
209 |
|
def test_anonymise():
|
|
213 |
def test_anonymise(pub):
|
210 |
214 |
formdef = FormDef()
|
211 |
215 |
formdef.name = 'baz'
|
212 |
216 |
formdef.fields = []
|
... | ... | |
222 |
226 |
assert formdef.data_class().get(formdata.id).user_id is None
|
223 |
227 |
assert formdef.data_class().get(formdata.id).anonymised
|
224 |
228 |
|
225 |
|
def test_remove():
|
|
229 |
def test_remove(pub):
|
226 |
230 |
formdef = FormDef()
|
227 |
231 |
formdef.name = 'baz'
|
228 |
232 |
formdef.store()
|
... | ... | |
239 |
243 |
formdata.store()
|
240 |
244 |
|
241 |
245 |
item = RemoveWorkflowStatusItem()
|
|
246 |
req = pub.get_request()
|
242 |
247 |
req.response.filter['in_backoffice'] = True
|
243 |
248 |
assert formdef.data_class().has_key(formdata.id)
|
244 |
249 |
assert item.perform(formdata) == '..'
|
... | ... | |
246 |
251 |
req.response.filter = {}
|
247 |
252 |
assert req.session.message
|
248 |
253 |
|
249 |
|
def test_register_comment():
|
|
254 |
def test_register_comment(pub):
|
250 |
255 |
pub.substitutions.feed(MockSubstitutionVariables())
|
251 |
256 |
|
252 |
257 |
formdef = FormDef()
|
... | ... | |
286 |
291 |
item.perform(formdata)
|
287 |
292 |
assert formdata.evolution[-1].display_parts()[-1] == '<div>1 < 3</div>'
|
288 |
293 |
|
289 |
|
def test_email():
|
|
294 |
def test_email(pub):
|
290 |
295 |
pub.substitutions.feed(MockSubstitutionVariables())
|
291 |
296 |
|
292 |
297 |
formdef = FormDef()
|
... | ... | |
378 |
383 |
assert emails.count() == 1
|
379 |
384 |
assert emails.get('foobar')['email_rcpt'] == ['xyz@localhost']
|
380 |
385 |
|
381 |
|
def test_webservice_call():
|
|
386 |
def test_webservice_call(pub):
|
382 |
387 |
pub.substitutions.feed(MockSubstitutionVariables())
|
383 |
388 |
|
384 |
389 |
formdef = FormDef()
|
... | ... | |
425 |
430 |
item.perform(formdata)
|
426 |
431 |
assert 'signature=' in http_requests.get_last('url')
|
427 |
432 |
|
428 |
|
def test_timeout():
|
|
433 |
def test_timeout(pub):
|
429 |
434 |
workflow = Workflow(name='timeout')
|
430 |
435 |
st1 = workflow.add_status('Status1', 'st1')
|
431 |
436 |
st2 = workflow.add_status('Status2', 'st2')
|
... | ... | |
491 |
496 |
assert str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
|
492 |
497 |
|
493 |
498 |
time.sleep(0.2)
|
494 |
|
_apply_timeouts(pub)
|
|
499 |
_apply_timeouts(two_pubs)
|
495 |
500 |
|
496 |
501 |
assert not str(formdata_id) in [str(x) for x in formdef.data_class().keys()]
|
497 |
502 |
|
498 |
|
def test_sms():
|
|
503 |
def test_sms(pub):
|
499 |
504 |
formdef = FormDef()
|
500 |
505 |
formdef.name = 'baz'
|
501 |
506 |
formdef.fields = []
|
... | ... | |
572 |
577 |
assert formdata.get_substitution_variables()['xxx_var_date_raw'] == \
|
573 |
578 |
time.strptime('2015-05-12', '%Y-%m-%d')
|
574 |
579 |
|
575 |
|
pub.cfg['language'] = {'language': 'en'}
|
|
580 |
two_pubs.cfg['language'] = {'language': 'en'}
|
576 |
581 |
|
577 |
|
def test_workflow_role_type_migration():
|
|
582 |
def test_workflow_role_type_migration(pub):
|
578 |
583 |
workflow = Workflow(name='role migration')
|
579 |
584 |
st1 = workflow.add_status('Status1', 'st1')
|
580 |
585 |
|
... | ... | |
588 |
593 |
|
589 |
594 |
reloaded_workflow = Workflow.get(workflow.id)
|
590 |
595 |
assert reloaded_workflow.possible_status[0].items[0].by == ['1', '2']
|
|
596 |
|
|
597 |
def test_workflow_display_message(pub):
|
|
598 |
pub.substitutions.feed(MockSubstitutionVariables())
|
|
599 |
|
|
600 |
workflow = Workflow(name='display message')
|
|
601 |
st1 = workflow.add_status('Status1', 'st1')
|
|
602 |
|
|
603 |
FormDef.wipe()
|
|
604 |
formdef = FormDef()
|
|
605 |
formdef.name = 'foobar'
|
|
606 |
formdef.fields = []
|
|
607 |
formdef.store()
|
|
608 |
formdata = formdef.data_class()()
|
|
609 |
formdata.id = '1'
|
|
610 |
|
|
611 |
display_message = DisplayMessageWorkflowStatusItem()
|
|
612 |
display_message.parent = st1
|
|
613 |
|
|
614 |
display_message.message = 'test'
|
|
615 |
assert display_message.get_message(formdata) == 'test'
|
|
616 |
|
|
617 |
display_message.message = '[number]'
|
|
618 |
assert display_message.get_message(formdata) == str(formdata.id)
|
|
619 |
|
|
620 |
display_message.message = '[bar]'
|
|
621 |
assert display_message.get_message(formdata) == 'Foobar'
|
|
622 |
|
|
623 |
# makes sure the string is correctly escaped for HTML
|
|
624 |
display_message.message = '[foo]'
|
|
625 |
assert display_message.get_message(formdata) == '1 < 3'
|