From 40ba9f339086f94b0610d53d1bdbe3044f01dbb3 Mon Sep 17 00:00:00 2001 From: Thomas NOEL Date: Tue, 26 Sep 2017 17:13:47 +0200 Subject: [PATCH] workflows: allow attachments in emails (#8274) --- tests/test_workflows.py | 111 ++++++++++++++++++++++++++++++++++++++++++++++++ wcs/workflows.py | 56 +++++++++++++++++++++++- 2 files changed, 166 insertions(+), 1 deletion(-) diff --git a/tests/test_workflows.py b/tests/test_workflows.py index d71e3bb9..c551d1fa 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1,4 +1,5 @@ import datetime +import os import pytest import shutil import StringIO @@ -765,6 +766,116 @@ def test_email(pub, emails): assert emails.count() == 1 assert emails.get('foobar').get('from') == 'foobar@localhost' + +def test_email_attachments(pub, emails): + formdef = FormDef() + formdef.name = 'baz' + formdef.fields = [ + FileField(id='3', label='File', type='file', varname='file'), + ] + formdef.store() + + upload = PicklableUpload('test.jpeg', 'image/jpeg') + jpg = open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg')).read() + upload.receive([jpg]) + formdata = formdef.data_class()() + formdata.data = {'3': upload} + formdata.just_created() + formdata.store() + pub.substitutions.feed(formdata) + + sendmail = SendmailWorkflowStatusItem() + sendmail.subject = 'foobar' + sendmail.body = '

force html

' + sendmail.to = ['to@example.net'] + sendmail.attachments = ['form_var_file_raw'] + sendmail.perform(formdata) + get_response().process_after_jobs() + assert emails.count() == 1 + assert emails.emails['foobar']['msg'].is_multipart() + assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed' + assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html' + assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg' + + # build a backoffice field + Workflow.wipe() + FormDef.wipe() + wf = Workflow(name='email with attachments') + wf.backoffice_fields_formdef = WorkflowBackofficeFieldsFormDef(wf) + wf.backoffice_fields_formdef.fields = [ + FileField(id='bo1', label='bo field 1', type='file', varname='backoffice_file1'), + FileField(id='bo2', label='bo field 2', type='file', varname='backoffice_file2'), + ] + st1 = wf.add_status('Status1') + wf.store() + formdef = FormDef() + formdef.name = 'baz' + formdef.fields = [ + FileField(id='1', label='File', type='file', varname='frontoffice_file'), + ] + formdef.workflow_id = wf.id + formdef.store() + formdata = formdef.data_class()() + formdata.data = {'1': upload} + formdata.just_created() + formdata.store() + pub.substitutions.feed(formdata) + # store file in backoffice field form_fbo1 / form_var_backoffice_file_raw + setbo = SetBackofficeFieldsWorkflowStatusItem() + setbo.parent = st1 + setbo.fields = [{'field_id': 'bo1', 'value': '=form_var_frontoffice_file_raw'}] + setbo.perform(formdata) + + emails.empty() + sendmail.attachments = ['form_fbo1'] + sendmail.perform(formdata) + get_response().process_after_jobs() + assert emails.count() == 1 + assert emails.emails['foobar']['msg'].is_multipart() + assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed' + assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html' + assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg' + + emails.empty() + sendmail.attachments = ['form_var_backoffice_file1_raw'] + sendmail.perform(formdata) + get_response().process_after_jobs() + assert emails.count() == 1 + assert emails.emails['foobar']['msg'].is_multipart() + assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed' + assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html' + assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg' + + emails.empty() + sendmail.attachments = ['form_var_backoffice_file1_raw', 'form_var_backoffice_file2_raw'] + sendmail.perform(formdata) + get_response().process_after_jobs() + assert emails.count() == 1 + assert emails.emails['foobar']['msg'].is_multipart() + assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed' + assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html' + assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg' + # backoffice_file2 is unset, no more parts : + assert len(emails.emails['foobar']['msg'].get_payload()) == 2 + + # set backoffice_file2 and retry + setbo.fields = [{'field_id': 'bo2', + 'value': '={"content": "blah", "filename": "hello.txt", ' + '"content_type": "text/plain"}'}] + setbo.perform(formdata) + emails.empty() + sendmail.perform(formdata) + get_response().process_after_jobs() + assert emails.count() == 1 + assert emails.emails['foobar']['msg'].is_multipart() + assert emails.emails['foobar']['msg'].get_content_subtype() == 'mixed' + assert emails.emails['foobar']['msg'].get_payload()[0].get_content_type() == 'text/html' + assert emails.emails['foobar']['msg'].get_payload()[1].get_content_type() == 'image/jpeg' + assert emails.emails['foobar']['msg'].get_payload()[2].get_content_type() == 'text/plain' + assert emails.emails['foobar']['msg'].get_payload()[2].get_payload() == 'blah' + assert len(emails.emails['foobar']['msg'].get_payload()) == 3 + + def test_webservice_call(pub): pub.substitutions.feed(MockSubstitutionVariables()) diff --git a/wcs/workflows.py b/wcs/workflows.py index 191aada7..588d5421 100644 --- a/wcs/workflows.py +++ b/wcs/workflows.py @@ -38,6 +38,7 @@ from quixote.html import htmltext import qommon.errors from wcs.roles import Role, logged_users_role, get_user_roles +from wcs.fields import FileField from wcs.formdef import FormDef from wcs.formdata import Evolution @@ -2015,6 +2016,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem): subject = None body = None custom_from = None + attachments = None comment = None @@ -2053,7 +2055,7 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem): return _('Send mail (not completed)') def get_parameters(self): - return ('to', 'subject', 'body', 'custom_from') + return ('to', 'subject', 'body', 'attachments', 'custom_from') def fill_admin_form(self, form): self.add_parameters_widgets(form, self.get_parameters()) @@ -2076,6 +2078,38 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem): value=self.body, cols=80, rows=10, validation_function=ComputedExpressionWidget.validate_ezt, hint=_('Available variables: url, url_status, details, name, number, comment, field_NAME')) + + attachments_fields = [(None, '---', None)] + attachments_varnameless_fields = [] + for field in self.parent.parent.get_backoffice_fields(): + if field.key != 'file': + continue + if field.varname: + codename = 'form_var_%s_raw' % field.varname + else: + codename = 'form_f%s' % field.id # = form_fbo + attachments_varnameless_fields.append(codename) + attachments_fields.append((codename, field.label, codename)) + # filter: don't show removed fields without varname + attachments = [attachment for attachment in self.attachments or [] + if ((not attachment.startswith('form_fbo')) or + (attachment in attachments_varnameless_fields))] + if 'attachments' in parameters: + if len(attachments_fields) > 1: + form.add(WidgetList, '%sattachments' % prefix, title=_('Attachments'), + element_type=SingleSelectWidgetWithOther, + value=attachments, + add_element_label=_('Add attachment'), + element_kwargs={'render_br': False, 'options': attachments_fields}) + else: + form.add(WidgetList, '%sattachments' % prefix, + title=_('Attachments (Python expressions)'), + element_type=StringWidget, + value=attachments, + add_element_label=_('Add attachment'), + element_kwargs={'render_br': False, 'size': 50}, + advanced=not(bool(attachments))) + if 'custom_from' in parameters: form.add(ComputedExpressionWidget, '%scustom_from' % prefix, title=_('Custom From Address'), value=self.custom_from, @@ -2152,14 +2186,34 @@ class SendmailWorkflowStatusItem(WorkflowStatusItem): if self.custom_from: email_from = self.compute(self.custom_from) + attachments = [] + if self.attachments: + global_eval_dict = get_publisher().get_global_eval_dict() + local_eval_dict = get_publisher().substitutions.get_context_variables() + for attachment in self.attachments: + try: + picklableupload = eval(attachment, global_eval_dict, local_eval_dict) + except: + get_publisher().notify_of_exception(sys.exc_info(), + context='[Sendmail/Attachment]') + continue + if picklableupload: + try: + picklableupload = FileField.convert_value_from_anything(picklableupload) + except ValueError: + continue + attachments.append(picklableupload) + if len(addresses) > 1: emails.email(mail_subject, mail_body, email_rcpt=None, bcc=addresses, email_from=email_from, exclude_current_user=False, + attachments=attachments, fire_and_forget=True) else: emails.email(mail_subject, mail_body, email_rcpt=addresses, email_from=email_from, exclude_current_user=False, + attachments=attachments, fire_and_forget=True) register_item_class(SendmailWorkflowStatusItem) -- 2.14.2