0003-workflow-to-configuration-for-notification-action-an.patch
tests/admin_pages/test_workflow.py | ||
---|---|---|
2783 | 2783 |
) |
2784 | 2784 | |
2785 | 2785 | |
2786 |
def test_workflows_user_notification_action(pub): |
|
2787 |
create_superuser(pub) |
|
2788 |
Workflow.wipe() |
|
2789 | ||
2790 |
wf = Workflow(name='notif') |
|
2791 |
st = wf.add_status('New', 'st') |
|
2792 |
wf.store() |
|
2793 | ||
2794 |
if not pub.site_options.has_section('variables'): |
|
2795 |
pub.site_options.add_section('variables') |
|
2796 |
pub.site_options.set('variables', 'portal_url', 'https://www.example.net/') |
|
2797 |
with open(os.path.join(pub.app_dir, 'site-options.cfg'), 'w') as fd: |
|
2798 |
pub.site_options.write(fd) |
|
2799 | ||
2800 |
app = login(get_app(pub)) |
|
2801 | ||
2802 |
resp = app.get('/backoffice/workflows/%s/status/%s/' % (wf.id, st.id)) |
|
2803 |
assert 'User Notification' in [o[0] for o in resp.forms[0]['action-interaction'].options] |
|
2804 | ||
2805 |
resp.forms[0]['action-interaction'] = 'User Notification' |
|
2806 |
resp = resp.forms[0].submit().follow() |
|
2807 |
assert 'User Notification' in resp.text |
|
2808 |
assert Workflow.get(wf.id).possible_status[0].items[0].to == ['_submitter'] |
|
2809 |
assert Workflow.get(wf.id).possible_status[0].items[0].users_template is None |
|
2810 | ||
2811 |
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id)) |
|
2812 |
assert resp.forms[0]['to'].value == '_submitter' |
|
2813 |
resp.forms[0]['to'] = '_receiver' |
|
2814 |
resp = resp.forms[0].submit('submit') |
|
2815 |
assert Workflow.get(wf.id).possible_status[0].items[0].to == ['_receiver'] |
|
2816 |
assert Workflow.get(wf.id).possible_status[0].items[0].users_template is None |
|
2817 | ||
2818 |
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id)) |
|
2819 |
assert resp.forms[0]['to'].value == '_receiver' |
|
2820 |
resp.forms[0]['to'] = '__other' |
|
2821 |
resp.forms[0]['users_template$value_text'] = 'foobar' |
|
2822 |
resp = resp.forms[0].submit('submit') |
|
2823 |
assert Workflow.get(wf.id).possible_status[0].items[0].to == [] |
|
2824 |
assert Workflow.get(wf.id).possible_status[0].items[0].users_template == 'foobar' |
|
2825 | ||
2826 |
resp = app.get('/backoffice/workflows/%s/status/%s/items/1/' % (wf.id, st.id)) |
|
2827 |
assert resp.forms[0]['to'].value == '__other' |
|
2828 |
assert resp.forms[0]['users_template$value_text'].value == 'foobar' |
|
2829 | ||
2830 | ||
2786 | 2831 |
def test_workflows_criticality_levels(pub): |
2787 | 2832 |
create_superuser(pub) |
2788 | 2833 |
tests/utilities.py | ||
---|---|---|
427 | 427 |
'http://remote.example.net/connection-error': (None, None, None), |
428 | 428 |
'http://authentic.example.net/idp/saml2/metadata': (200, metadata, None), |
429 | 429 |
'http://authentic2.example.net/idp/saml2/metadata': (200, metadata2, None), |
430 |
'https://portal/api/notification/add/': (200, {}, {}), |
|
431 |
'https://interco-portal/api/notification/add/': (200, {}, {}), |
|
430 | 432 |
}.get(base_url, (200, '', {})) |
431 | 433 | |
432 | 434 |
if url.startswith('file://'): |
tests/workflow/test_notification.py | ||
---|---|---|
20 | 20 |
clean_temporary_pub() |
21 | 21 | |
22 | 22 | |
23 |
@pytest.fixture |
|
24 |
def pub(request): |
|
25 |
pub = create_temporary_pub() |
|
23 |
def pub_fixture(**kwargs): |
|
24 |
pub = create_temporary_pub(**kwargs) |
|
26 | 25 |
pub.cfg['language'] = {'language': 'en'} |
27 | 26 |
pub.write_cfg() |
28 | 27 |
req = HTTPRequest(None, {'SERVER_NAME': 'example.net', 'SCRIPT_NAME': ''}) |
... | ... | |
34 | 33 |
return pub |
35 | 34 | |
36 | 35 | |
36 |
@pytest.fixture |
|
37 |
def pub(request): |
|
38 |
return pub_fixture() |
|
39 | ||
40 | ||
41 |
@pytest.fixture |
|
42 |
def sql_pub(request): |
|
43 |
return pub_fixture(sql_mode=True) |
|
44 | ||
45 | ||
37 | 46 |
def test_notifications(pub, http_requests): |
47 |
pub.user_class.wipe() |
|
48 |
FormDef.wipe() |
|
49 | ||
38 | 50 |
formdef = FormDef() |
39 | 51 |
formdef.name = 'baz' |
40 | 52 |
formdef.fields = [] |
41 | 53 |
formdef.store() |
54 |
formdef.data_class().wipe() |
|
42 | 55 | |
43 | 56 |
formdata = formdef.data_class()() |
44 | 57 |
formdata.just_created() |
... | ... | |
137 | 150 |
assert http_requests.count() == 1 |
138 | 151 |
assert http_requests.get_last('url') == 'https://interco-portal/api/notification/add/' |
139 | 152 |
assert set(json.loads(http_requests.get_last('body'))['name_ids']) == {'xxy1'} |
153 | ||
154 | ||
155 |
def test_notifications_to_users_template(sql_pub, http_requests): |
|
156 |
sql_pub.user_class.wipe() |
|
157 |
FormDef.wipe() |
|
158 | ||
159 |
user1 = sql_pub.user_class(name='userA') |
|
160 |
user1.name_identifiers = ['xxy1'] |
|
161 |
user1.email = 'user1@example.com' |
|
162 |
user1.store() |
|
163 |
user2 = sql_pub.user_class(name='userB') |
|
164 |
user2.name_identifiers = ['xxy2'] |
|
165 |
user2.email = 'user2@example.com' |
|
166 |
user2.store() |
|
167 | ||
168 |
formdef = FormDef() |
|
169 |
formdef.name = 'foo' |
|
170 |
formdef.fields = [] |
|
171 |
formdef.store() |
|
172 |
formdef.data_class().wipe() |
|
173 | ||
174 |
formdatas = [] |
|
175 |
for i in range(2): |
|
176 |
formdatas.append(formdef.data_class()()) |
|
177 | ||
178 |
formdatas[0].user_id = user1.id |
|
179 |
formdatas[1].user_id = user2.id |
|
180 | ||
181 |
for formdata in formdatas: |
|
182 |
formdata.just_created() |
|
183 |
formdata.store() |
|
184 | ||
185 |
formdef = FormDef() |
|
186 |
formdef.name = 'baz' |
|
187 |
formdef.fields = [] |
|
188 |
formdef.store() |
|
189 |
formdef.data_class().wipe() |
|
190 | ||
191 |
formdata = formdef.data_class()() |
|
192 |
formdata.just_created() |
|
193 |
formdata.store() |
|
194 | ||
195 |
assert not SendNotificationWorkflowStatusItem.is_available() |
|
196 |
if not sql_pub.site_options.has_section('variables'): |
|
197 |
sql_pub.site_options.add_section('variables') |
|
198 |
sql_pub.site_options.set('variables', 'portal_url', 'https://portal/') |
|
199 |
assert SendNotificationWorkflowStatusItem.is_available() |
|
200 | ||
201 |
item = SendNotificationWorkflowStatusItem() |
|
202 |
item.to = [] |
|
203 |
item.title = 'xxx' |
|
204 |
item.body = 'XXX' |
|
205 | ||
206 |
# no user template defined |
|
207 |
http_requests.empty() |
|
208 |
item.perform(formdata) |
|
209 |
assert http_requests.count() == 0 |
|
210 | ||
211 |
for users_template in [ |
|
212 |
'xxy1, , xxy2', |
|
213 |
'user1@example.com,user2@example.com', |
|
214 |
'{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_nameid" }},{% endfor %}', |
|
215 |
'{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_email" }},{% endfor %}', |
|
216 |
'{{ forms|objects:"foo"|getlist:"form_user_nameid" }}', |
|
217 |
'{{ forms|objects:"foo"|getlist:"form_user_nameid"|list }}', |
|
218 |
'{{ forms|objects:"foo"|getlist:"form_user_email" }}', |
|
219 |
'{{ forms|objects:"foo"|getlist:"form_user_email"|list }}', |
|
220 |
'{{ forms|objects:"foo"|getlist:"form_user" }}', |
|
221 |
'{{ forms|objects:"foo"|getlist:"form_user"|list }}', |
|
222 |
]: |
|
223 |
item.users_template = users_template |
|
224 |
http_requests.empty() |
|
225 |
item.perform(formdata) |
|
226 |
assert http_requests.count() == 1 |
|
227 |
assert http_requests.get_last('url') == 'https://portal/api/notification/add/' |
|
228 |
assert set(json.loads(http_requests.get_last('body'))['name_ids']) == {'xxy1', 'xxy2'} |
|
229 | ||
230 |
formdatas[1].user_id = user1.id |
|
231 |
formdatas[1].store() |
|
232 |
for users_template in [ |
|
233 |
'xxy1,', |
|
234 |
'user1@example.com', |
|
235 |
'{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_nameid" }},{% endfor %}', |
|
236 |
'{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_email" }},{% endfor %}', |
|
237 |
'{{ forms|objects:"foo"|getlist:"form_user_nameid" }}', |
|
238 |
'{{ forms|objects:"foo"|getlist:"form_user_nameid"|list }}', |
|
239 |
'{{ forms|objects:"foo"|getlist:"form_user_email" }}', |
|
240 |
'{{ forms|objects:"foo"|getlist:"form_user_email"|list }}', |
|
241 |
'{{ forms|objects:"foo"|getlist:"form_user" }}', |
|
242 |
'{{ forms|objects:"foo"|getlist:"form_user"|list }}', |
|
243 |
]: |
|
244 |
item.users_template = users_template |
|
245 |
http_requests.empty() |
|
246 |
item.perform(formdata) |
|
247 |
assert http_requests.count() == 1 |
|
248 |
assert http_requests.get_last('url') == 'https://portal/api/notification/add/' |
|
249 |
assert set(json.loads(http_requests.get_last('body'))['name_ids']) == {'xxy1'} |
|
250 | ||
251 |
# unknown user |
|
252 |
item.users_template = 'xxy1, foobar' |
|
253 |
http_requests.empty() |
|
254 |
item.perform(formdata) |
|
255 |
assert http_requests.count() == 1 |
|
256 |
assert set(json.loads(http_requests.get_last('body'))['name_ids']) == {'xxy1'} |
|
257 |
assert sql_pub.loggederror_class.count() == 1 |
|
258 |
logged_error = sql_pub.loggederror_class.select()[0] |
|
259 |
assert logged_error.summary == 'Failed to notify user (not found: "foobar")' |
|
260 |
assert logged_error.formdata_id == str(formdata.id) |
|
261 | ||
262 |
# result bad format |
|
263 |
item.users_template = '{{ forms|objects:"foo"|get:"foobarbaz" }}' # None |
|
264 |
http_requests.empty() |
|
265 |
item.perform(formdata) |
|
266 |
assert http_requests.count() == 0 |
|
267 |
assert sql_pub.loggederror_class.count() == 2 |
|
268 |
logged_error = sql_pub.loggederror_class.select()[1] |
|
269 |
assert logged_error.summary == 'Failed to notify users, bad template result (None)' |
|
270 |
assert logged_error.formdata_id == str(formdata.id) |
|
271 | ||
272 |
# template error |
|
273 |
item.users_template = '{% for obj in forms|objects:"foo" %}{{ obj|get:"form_user_nameid" }},' |
|
274 |
http_requests.empty() |
|
275 |
item.perform(formdata) |
|
276 |
assert http_requests.count() == 0 |
|
277 |
assert sql_pub.loggederror_class.count() == 3 |
|
278 |
logged_error = sql_pub.loggederror_class.select()[2] |
|
279 |
assert logged_error.summary == 'Failed to compute template' |
|
280 |
assert logged_error.formdata_id == str(formdata.id) |
wcs/variables.py | ||
---|---|---|
28 | 28 |
from .qommon.evalutils import make_datetime |
29 | 29 |
from .qommon.storage import And, Equal, Intersects, Not, NotEqual, Null, Or |
30 | 30 |
from .qommon.substitution import CompatibilityNamesDict |
31 |
from .qommon.templatetags.qommon import parse_datetime |
|
31 |
from .qommon.templatetags.qommon import parse_datetime, unlazy
|
|
32 | 32 | |
33 | 33 | |
34 | 34 |
class LazyFormDefObjectsManager: |
... | ... | |
310 | 310 |
if hasattr(value, 'timetuple'): |
311 | 311 |
value = value.timetuple() |
312 | 312 |
else: |
313 |
value = value.get_value()
|
|
313 |
value = unlazy(value)
|
|
314 | 314 |
self._cached_resultset.append(value) |
315 | 315 | |
316 | 316 |
def __len__(self): |
wcs/wf/notification.py | ||
---|---|---|
18 | 18 | |
19 | 19 |
from quixote import get_publisher |
20 | 20 | |
21 |
from wcs.variables import LazyList, LazyUser |
|
21 | 22 |
from wcs.workflows import WorkflowStatusItem, register_item_class, template_on_formdata |
22 | 23 | |
23 | 24 |
from ..qommon import _ |
24 |
from ..qommon.form import ComputedExpressionWidget, SingleSelectWidget, StringWidget, TextWidget, WidgetList
|
|
25 |
from ..qommon.form import ComputedExpressionWidget, SingleSelectWidget, StringWidget, TextWidget |
|
25 | 26 |
from ..qommon.template import TemplateError |
26 | 27 |
from .wscall import WebserviceCallStatusItem |
27 | 28 | |
... | ... | |
34 | 35 | |
35 | 36 |
# parameters |
36 | 37 |
to = ['_submitter'] |
38 |
users_template = None |
|
37 | 39 |
title = None |
38 | 40 |
body = None |
39 | 41 |
origin = None |
... | ... | |
68 | 70 |
return _(self.description) |
69 | 71 | |
70 | 72 |
def get_parameters(self): |
71 |
return ('title', 'body', 'origin', 'condition') |
|
73 |
return ('to', 'users_template', 'title', 'body', 'origin', 'condition')
|
|
72 | 74 | |
73 | 75 |
def add_parameters_widgets(self, form, parameters, prefix='', formdef=None, **kwargs): |
74 | 76 |
if 'to' in parameters: |
75 | 77 |
# never displayed in the current UI (no 'to' in get_parameters) |
78 |
options = [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False) |
|
79 |
options.append(('__other', _('Other (from template)'), '__other')) |
|
76 | 80 |
form.add( |
77 |
WidgetList,
|
|
81 |
SingleSelectWidget,
|
|
78 | 82 |
'%sto' % prefix, |
79 | 83 |
title=_('To'), |
80 |
element_type=SingleSelectWidget, |
|
81 |
value=self.to, |
|
82 |
add_element_label=self.get_add_role_label(), |
|
83 |
element_kwargs={ |
|
84 |
'render_br': False, |
|
85 |
'options': [(None, '---', None)] + self.get_list_of_roles(include_logged_in_users=False), |
|
84 |
value=self.to[0] if self.to else '__other', |
|
85 |
options=options, |
|
86 |
attrs={'data-dynamic-display-parent': 'true'}, |
|
87 |
) |
|
88 |
if 'users_template' in parameters: |
|
89 |
form.add( |
|
90 |
ComputedExpressionWidget, |
|
91 |
'%susers_template' % prefix, |
|
92 |
title=_('Users template'), |
|
93 |
value=self.users_template, |
|
94 |
attrs={ |
|
95 |
'data-dynamic-display-child-of': '%sto' % prefix, |
|
96 |
'data-dynamic-display-value': '__other', |
|
86 | 97 |
}, |
98 |
allow_python=False, |
|
87 | 99 |
) |
88 | 100 |
if 'title' in parameters: |
89 | 101 |
form.add( |
... | ... | |
117 | 129 |
self, form, parameters, prefix=prefix, formdef=formdef, **kwargs |
118 | 130 |
) |
119 | 131 | |
132 |
def submit_admin_form(self, form): |
|
133 |
super().submit_admin_form(form) |
|
134 |
if not form.has_errors(): |
|
135 |
if self.to == '__other': |
|
136 |
self.to = [] |
|
137 |
elif self.to and isinstance(self.to, str): |
|
138 |
self.to = [self.to] |
|
139 |
self.users_template = None |
|
140 | ||
120 | 141 |
def perform(self, formdata): |
121 |
if not (self.is_available() and self.to and self.title and self.body):
|
|
142 |
if not (self.is_available() and (self.to or self.users_template) and self.title and self.body):
|
|
122 | 143 |
return |
123 | 144 | |
124 | 145 |
try: |
... | ... | |
138 | 159 |
return |
139 | 160 | |
140 | 161 |
users = [] |
141 |
for dest in self.to: |
|
142 |
if dest == '_submitter': |
|
143 |
users.append(formdata.get_user()) |
|
144 |
continue |
|
162 |
if self.to: |
|
163 |
for dest in self.to: |
|
164 |
if dest == '_submitter': |
|
165 |
users.append(formdata.get_user()) |
|
166 |
continue |
|
145 | 167 | |
146 |
for dest_id in formdata.get_function_roles(dest): |
|
168 |
for dest_id in formdata.get_function_roles(dest): |
|
169 |
try: |
|
170 |
role = get_publisher().role_class.get(dest_id) |
|
171 |
except KeyError: |
|
172 |
continue |
|
173 |
users.extend(get_publisher().user_class.get_users_with_role(role.id)) |
|
174 |
else: |
|
175 |
with get_publisher().complex_data(): |
|
147 | 176 |
try: |
148 |
role = get_publisher().role_class.get(dest_id) |
|
149 |
except KeyError: |
|
150 |
continue |
|
151 |
users.extend(get_publisher().user_class.get_users_with_role(role.id)) |
|
177 |
to = self.compute(self.users_template, allow_complex=True, raises=True, formdata=formdata) |
|
178 |
except Exception: |
|
179 |
return |
|
180 |
else: |
|
181 |
to = get_publisher().get_cached_complex_data(to) |
|
182 | ||
183 |
if isinstance(to, LazyList): |
|
184 |
to = list(to) |
|
185 |
elif isinstance(to, str): |
|
186 |
to = [x.strip() for x in to.split(',')] |
|
187 |
if not isinstance(to, list): |
|
188 |
get_publisher().record_error( |
|
189 |
_('Failed to notify users, bad template result (%s)') % to, |
|
190 |
formdata=formdata, |
|
191 |
status_item=self, |
|
192 |
) |
|
193 |
return |
|
194 |
to = [v for v in to if v] |
|
195 |
for value in to: |
|
196 |
if isinstance(value, LazyUser): |
|
197 |
value = value._user |
|
198 |
if isinstance(value, get_publisher().user_class): |
|
199 |
users.append(value) |
|
200 |
else: |
|
201 |
user = get_publisher().user_class.lookup_by_string(str(value)) |
|
202 |
if not user: |
|
203 |
get_publisher().record_error( |
|
204 |
_('Failed to notify user (not found: "%s")') % value, |
|
205 |
formdata=formdata, |
|
206 |
status_item=self, |
|
207 |
) |
|
208 |
continue |
|
209 |
users.append(user) |
|
152 | 210 | |
153 | 211 |
name_ids = set() |
154 | 212 |
for user in users: |
... | ... | |
157 | 215 |
for name_id in user.name_identifiers or []: |
158 | 216 |
name_ids.add(name_id) |
159 | 217 | |
160 |
if name_ids: |
|
161 |
self.post_data = { |
|
162 |
'summary': title, |
|
163 |
'body': body, |
|
164 |
'url': formdata.get_url(), |
|
165 |
'origin': self.origin or '', |
|
166 |
'id': 'formdata:%s' % formdata.get_display_id(), |
|
167 |
'name_ids': list(name_ids), |
|
168 |
} |
|
169 |
self.url = self.get_api_url() |
|
170 | ||
171 |
super().perform(formdata) |
|
218 |
if not name_ids: |
|
219 |
return |
|
220 | ||
221 |
self.post_data = { |
|
222 |
'summary': title, |
|
223 |
'body': body, |
|
224 |
'url': formdata.get_url(), |
|
225 |
'origin': self.origin or '', |
|
226 |
'id': 'formdata:%s' % formdata.get_display_id(), |
|
227 |
'name_ids': list(name_ids), |
|
228 |
} |
|
229 |
self.url = self.get_api_url() |
|
230 | ||
231 |
super().perform(formdata) |
|
172 | 232 | |
173 | 233 | |
174 | 234 |
register_item_class(SendNotificationWorkflowStatusItem) |
175 |
- |