4 |
4 |
from email.mime.text import MIMEText
|
5 |
5 |
from email.mime.multipart import MIMEMultipart
|
6 |
6 |
|
|
7 |
from wcs.formdef import FormDef
|
|
8 |
from wcs.workflows import Workflow
|
|
9 |
from wcs.wf.jump import JumpWorkflowStatusItem
|
|
10 |
from wcs.fields import StringField, EmailField
|
7 |
11 |
import wcs.qommon.ctl
|
8 |
12 |
from wcs.ctl.collectstatic import CmdCollectStatic
|
9 |
13 |
from wcs.ctl.process_bounce import CmdProcessBounce
|
|
14 |
from wcs.ctl.wipe_data import CmdWipeData
|
|
15 |
from wcs.ctl.trigger_jumps import CmdTriggerJumps, select_and_jump_formdata
|
10 |
16 |
|
11 |
17 |
from utilities import create_temporary_pub, clean_temporary_pub
|
12 |
18 |
|
... | ... | |
53 |
59 |
assert CmdProcessBounce.get_bounce_addrs(msg) == ['foobar@localhost']
|
54 |
60 |
|
55 |
61 |
def test_wipe_formdata(pub):
|
56 |
|
from wcs import formdef, fields
|
57 |
|
from wcs.ctl.wipe_data import CmdWipeData
|
58 |
|
|
59 |
|
form_1 = formdef.FormDef()
|
|
62 |
form_1 = FormDef()
|
60 |
63 |
form_1.name = 'example'
|
61 |
|
form_1.fields = [fields.StringField(id='0', label='Your Name'),
|
62 |
|
fields.EmailField(id='1', label='Email')]
|
|
64 |
form_1.fields = [StringField(id='0', label='Your Name'),
|
|
65 |
EmailField(id='1', label='Email')]
|
63 |
66 |
form_1.store()
|
64 |
67 |
form_1.data_class().wipe()
|
65 |
68 |
formdata_1 = form_1.data_class()()
|
... | ... | |
69 |
72 |
|
70 |
73 |
assert form_1.data_class().count() == 1
|
71 |
74 |
|
72 |
|
form_2 = formdef.FormDef()
|
|
75 |
form_2 = FormDef()
|
73 |
76 |
form_2.name = 'example2'
|
74 |
|
form_2.fields = [fields.StringField(id='0', label='First Name'),
|
75 |
|
fields.StringField(id='1', label='Last Name')]
|
|
77 |
form_2.fields = [StringField(id='0', label='First Name'),
|
|
78 |
StringField(id='1', label='Last Name')]
|
76 |
79 |
form_2.store()
|
77 |
80 |
form_2.data_class().wipe()
|
78 |
81 |
formdata_2 = form_2.data_class()()
|
... | ... | |
108 |
111 |
wipe_cmd.wipe(pub, sub_options, [])
|
109 |
112 |
assert form_1.data_class().count() == 0
|
110 |
113 |
assert form_2.data_class().count() == 0
|
|
114 |
|
|
115 |
def test_trigger_jumps(pub):
|
|
116 |
Workflow.wipe()
|
|
117 |
workflow = Workflow(name='test')
|
|
118 |
st1 = workflow.add_status('Status1', 'st1')
|
|
119 |
jump = JumpWorkflowStatusItem()
|
|
120 |
jump.trigger = 'goto2'
|
|
121 |
jump.status = 'st2'
|
|
122 |
st1.items.append(jump)
|
|
123 |
jump.parent = st1
|
|
124 |
st2 = workflow.add_status('Status2', 'st2')
|
|
125 |
workflow.store()
|
|
126 |
|
|
127 |
FormDef.wipe()
|
|
128 |
formdef = FormDef()
|
|
129 |
formdef.name = 'test'
|
|
130 |
formdef.fields = [StringField(id='0', label='Your Name', varname='name'),
|
|
131 |
EmailField(id='1', label='Email', varname='email')]
|
|
132 |
formdef.workflow_id = workflow.id
|
|
133 |
formdef.store()
|
|
134 |
|
|
135 |
def run_trigger(trigger, rows):
|
|
136 |
formdef.data_class().wipe()
|
|
137 |
formdata = formdef.data_class()()
|
|
138 |
formdata.data = {'0': 'Alice', '1': 'alice@example.net'}
|
|
139 |
formdata.status = 'wf-%s' % st1.id
|
|
140 |
formdata.store()
|
|
141 |
id1 = formdata.id
|
|
142 |
formdata = formdef.data_class()()
|
|
143 |
formdata.data = {'0': 'Bob', '1': 'bob@example.net'}
|
|
144 |
formdata.status = 'wf-%s' % st1.id
|
|
145 |
formdata.store()
|
|
146 |
id2 = formdata.id
|
|
147 |
select_and_jump_formdata(formdef, trigger, rows)
|
|
148 |
return formdef.data_class().get(id1), formdef.data_class().get(id2)
|
|
149 |
|
|
150 |
f1, f2 = run_trigger('goto2', '__all__')
|
|
151 |
assert f1.status == f2.status == 'wf-%s' % st2.id
|
|
152 |
|
|
153 |
# check publisher substitutions vars after the last jump_and_perform (#13964)
|
|
154 |
assert pub in pub.substitutions.sources
|
|
155 |
assert formdef in pub.substitutions.sources
|
|
156 |
# we cannot know which formdata is the last one, test each possibility
|
|
157 |
if f1 in pub.substitutions.sources:
|
|
158 |
assert f2 not in pub.substitutions.sources
|
|
159 |
if f2 in pub.substitutions.sources:
|
|
160 |
assert f1 not in pub.substitutions.sources
|
|
161 |
|
|
162 |
f1, f2 = run_trigger('goto2', [{'select': {}}])
|
|
163 |
assert f1.status == f2.status == 'wf-%s' % st2.id
|
|
164 |
|
|
165 |
f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'}}])
|
|
166 |
assert f1.status == 'wf-%s' % st2.id
|
|
167 |
assert f2.status == 'wf-%s' % st1.id
|
|
168 |
|
|
169 |
f1, f2 = run_trigger('goto2', [{'select': {'form_var_email': 'bob@example.net'}}])
|
|
170 |
assert f1.status == 'wf-%s' % st1.id
|
|
171 |
assert f2.status == 'wf-%s' % st2.id
|
|
172 |
|
|
173 |
f1, f2 = run_trigger('goto2', [{'select': {},
|
|
174 |
'data': {'foo': 'bar'}}])
|
|
175 |
assert f1.status == f2.status == 'wf-%s' % st2.id
|
|
176 |
assert f1.workflow_data['foo'] == f2.workflow_data['foo'] == 'bar'
|
|
177 |
|
|
178 |
f1, f2 = run_trigger('goto2', [{'select': {'form_number_raw': '1'},
|
|
179 |
'data': {'foo': 'bar'}}])
|
|
180 |
assert f1.status == 'wf-%s' % st2.id
|
|
181 |
assert f1.workflow_data['foo'] == 'bar'
|
|
182 |
assert f2.status == 'wf-%s' % st1.id
|
|
183 |
assert not f2.workflow_data
|
|
184 |
|
|
185 |
f1, f2 = run_trigger('badtrigger', '__all__')
|
|
186 |
assert f1.status == f2.status == 'wf-%s' % st1.id
|
|
187 |
assert not f1.workflow_data
|
|
188 |
assert not f2.workflow_data
|