0004-misc-use-Django-s-EmailBackend-36977.patch
tests/form_pages/test_all.py | ||
---|---|---|
1395 | 1395 |
assert formdef.data_class().count() == 1 |
1396 | 1396 |
assert '<div class="section foldable folded" id="summary">' in next_page.text |
1397 | 1397 |
# check the user received a copy by email |
1398 |
assert emails.emails.get('New form (test)')
|
|
1399 |
assert emails.emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
|
|
1398 |
assert emails.get('New form (test)') |
|
1399 |
assert emails.get('New form (test)')['email_rcpt'] == ['foo@localhost'] |
|
1400 | 1400 | |
1401 | 1401 | |
1402 | 1402 |
def test_form_submit_with_just_disabled_user(pub, emails): |
... | ... | |
1908 | 1908 |
assert '<h2>Keep your tracking code</h2>' in resp.text |
1909 | 1909 |
resp.forms[0]['email'] = 'foo@localhost' |
1910 | 1910 |
resp = resp.forms[0].submit() |
1911 |
assert emails.emails.get('Tracking Code reminder')
|
|
1912 |
assert tracking_code in list(emails.emails.values())[0]['payload']
|
|
1911 |
assert emails.get('Tracking Code reminder') |
|
1912 |
assert tracking_code in emails.get('Tracking Code reminder')['payload']
|
|
1913 | 1913 |
assert resp.location == 'http://example.net/test/code/%s/load' % tracking_code |
1914 | 1914 |
resp = resp.follow() |
1915 | 1915 |
resp = resp.follow() |
... | ... | |
1938 | 1938 |
resp.forms[0]['email'] = 'foo@localhost' |
1939 | 1939 |
resp.forms[0]['validation'].checked = True # stupit bot will do that |
1940 | 1940 |
resp = resp.forms[0].submit() |
1941 |
assert not emails.emails.values()
|
|
1941 |
assert not emails.count()
|
|
1942 | 1942 | |
1943 | 1943 | |
1944 | 1944 |
def test_form_tracking_code_remove_draft(pub, nocache): |
... | ... | |
3711 | 3711 |
resp = resp.follow() |
3712 | 3712 |
assert 'The form has been recorded' in resp.text |
3713 | 3713 |
# check rst2html didn't fail |
3714 |
assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
|
|
3714 |
assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
|
|
3715 | 3715 | |
3716 | 3716 | |
3717 | 3717 |
def test_form_table_rows_field_submit(pub, emails): |
... | ... | |
3783 | 3783 |
resp = resp.form.submit('submit') |
3784 | 3784 |
resp = resp.follow() |
3785 | 3785 |
assert 'The form has been recorded' in resp.text |
3786 |
assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
|
|
3786 |
assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
|
|
3787 | 3787 | |
3788 | 3788 | |
3789 | 3789 |
def test_form_new_table_rows_field_draft_recall(pub): |
tests/test_emails.py | ||
---|---|---|
4 | 4 |
import socket |
5 | 5 | |
6 | 6 |
import pytest |
7 |
from quixote import cleanup |
|
8 | 7 | |
9 | 8 |
from wcs.qommon.emails import docutils # noqa pylint: disable=unused-import |
10 | 9 |
from wcs.qommon.emails import email as send_email |
11 | 10 |
from wcs.qommon.upload_storage import PicklableUpload |
12 | 11 | |
13 |
from .utilities import clean_temporary_pub, create_temporary_pub |
|
12 |
from .utilities import clean_temporary_pub, cleanup, create_temporary_pub
|
|
14 | 13 | |
15 | 14 | |
16 | 15 |
def setup_module(module): |
... | ... | |
36 | 35 |
assert emails.count() == 1 |
37 | 36 |
assert emails.emails['test']['from'] == '%s@%s' % (pwd.getpwuid(os.getuid())[0], socket.getfqdn()) |
38 | 37 | |
38 |
emails.empty() |
|
39 | 39 |
pub.cfg['emails'] = {'from': 'foo@localhost'} |
40 | 40 |
send_email('test', mail_body='Hello', email_rcpt='test@localhost', want_html=False) |
41 | 41 |
assert emails.count() == 1 |
42 | 42 |
assert emails.emails['test']['from'] == 'foo@localhost' |
43 | 43 |
assert emails.emails['test']['msg']['From'] == 'foo@localhost' |
44 | 44 | |
45 |
emails.empty() |
|
45 | 46 |
if not pub.site_options.has_section('variables'): |
46 | 47 |
pub.site_options.add_section('variables') |
47 | 48 |
pub.site_options.set('variables', 'global_title', 'HELLO') |
... | ... | |
125 | 126 | |
126 | 127 |
def test_email_plain_with_attachments(emails): |
127 | 128 |
create_temporary_pub() |
128 | ||
129 | 129 |
jpg = PicklableUpload('test.jpeg', 'image/jpeg') |
130 | 130 |
with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd: |
131 | 131 |
jpg_content = fd.read() |
tests/test_mail_templates.py | ||
---|---|---|
244 | 244 |
pub.get_request().response.process_after_jobs() |
245 | 245 |
assert emails.count() == 1 |
246 | 246 |
assert emails.get('test subject')['email_rcpt'] == ['xyz@localhost'] |
247 |
assert b'test body' in base64.decodebytes( |
|
248 |
force_bytes(emails.get('test subject')['msg'].get_payload(0).get_payload()) |
|
249 |
) |
|
247 |
assert 'test body' in emails.get('test subject')['msg'].get_payload(0).get_payload() |
|
250 | 248 | |
251 | 249 |
# check nothing is sent and an error is logged if the mail template is |
252 | 250 |
# missing |
tests/test_register.py | ||
---|---|---|
144 | 144 |
do_user_registration(pub) |
145 | 145 | |
146 | 146 |
assert emails.get('New Registration') |
147 |
assert emails.get('New Registration').get('email_rcpt') == ['admin@localhost']
|
|
147 |
assert emails.get('New Registration')['email_rcpt'] == ['admin@localhost']
|
|
148 | 148 | |
149 | 149 | |
150 | 150 |
def test_user_notification(pub, emails): |
... | ... | |
164 | 164 |
account = PasswordAccount.get('foo@localhost') |
165 | 165 | |
166 | 166 |
assert emails.get('Welcome to example.net') |
167 |
assert emails.get('Welcome to example.net').get('to') == 'foo@localhost'
|
|
168 |
assert account.password in emails.get('Welcome to example.net').get('payload')
|
|
167 |
assert emails.get('Welcome to example.net')['to'] == 'foo@localhost'
|
|
168 |
assert account.password in emails.get('Welcome to example.net')['payload']
|
|
169 | 169 | |
170 | 170 | |
171 | 171 |
def test_user_login(pub): |
... | ... | |
241 | 241 |
assert 'The token you submitted does not exist' in resp.text |
242 | 242 | |
243 | 243 |
# new forgotten request |
244 |
emails.empty() |
|
244 | 245 |
resp = app.get('/ident/password/forgotten') |
245 | 246 |
resp.forms[0]['username'] = 'foo' |
246 | 247 |
resp = resp.forms[0].submit() |
... | ... | |
267 | 268 |
pub.cfg['passwords'] = {'generate': False, 'can_change': True} |
268 | 269 |
pub.write_cfg() |
269 | 270 | |
271 |
emails.empty() |
|
270 | 272 |
resp = app.get('/ident/password/forgotten') |
271 | 273 |
resp.forms[0]['username'] = 'foo' |
272 | 274 |
resp = resp.forms[0].submit() |
tests/test_workflows.py | ||
---|---|---|
1664 | 1664 |
item.perform(formdata) |
1665 | 1665 |
get_response().process_after_jobs() |
1666 | 1666 |
assert emails.count() == 1 |
1667 |
assert emails.get('foobar').get('from') == 'foobar@localhost'
|
|
1667 |
assert emails.get('foobar')['from'] == 'foobar@localhost'
|
|
1668 | 1668 | |
1669 | 1669 |
# custom from email (computed) |
1670 | 1670 |
emails.empty() |
... | ... | |
1673 | 1673 |
item.perform(formdata) |
1674 | 1674 |
get_response().process_after_jobs() |
1675 | 1675 |
assert emails.count() == 1 |
1676 |
assert emails.get('foobar').get('from') == 'foobar@localhost'
|
|
1676 |
assert emails.get('foobar')['from'] == 'foobar@localhost'
|
|
1677 | 1677 | |
1678 | 1678 |
# custom sender name defined from site-options variable |
1679 | 1679 |
pub.load_site_options() |
tests/utilities.py | ||
---|---|---|
1 |
import email.header |
|
2 |
import email.parser |
|
3 | 1 |
import http.cookies |
4 | 2 |
import json |
5 | 3 |
import os |
6 | 4 |
import random |
7 | 5 |
import shutil |
8 |
import sys |
|
9 | 6 |
import tempfile |
10 | 7 |
import urllib.parse |
11 | 8 | |
12 | 9 |
import psycopg2 |
13 | 10 |
from django.conf import settings |
14 |
from django.utils.encoding import force_bytes, force_text |
|
11 |
from django.core import mail |
|
12 |
from django.utils.encoding import force_bytes |
|
15 | 13 |
from quixote import cleanup, get_publisher |
16 | 14 |
from webtest import TestApp |
17 | 15 | |
... | ... | |
245 | 243 |
return app |
246 | 244 | |
247 | 245 | |
248 |
class EmailsMocking: |
|
249 |
def create_smtp_server(self, *args, **kwargs): |
|
250 |
class MockSmtplibSMTP: |
|
251 |
def __init__(self, mocking): |
|
252 |
self.mocking = mocking |
|
253 | ||
254 |
def send_message(self, msg, msg_from, rcpts): |
|
255 |
return self.sendmail(msg_from, rcpts, msg.as_string()) |
|
256 | ||
257 |
def sendmail(self, msg_from, rcpts, msg): |
|
258 |
msg = email.parser.Parser().parsestr(msg) |
|
259 |
subject = email.header.decode_header(msg['Subject'])[0][0] |
|
260 |
if msg.is_multipart(): |
|
261 |
payloads = [x.get_payload(decode=True) for x in msg.get_payload()] |
|
262 |
payload = payloads[0] |
|
263 |
else: |
|
264 |
payload = msg.get_payload(decode=True) |
|
265 |
payloads = [payload] |
|
266 |
self.mocking.emails[force_text(subject)] = { |
|
267 |
'from': msg_from, |
|
268 |
'to': email.header.decode_header(msg['To'])[0][0], |
|
269 |
'payload': force_str(payload if payload else ''), |
|
270 |
'payloads': payloads, |
|
271 |
'msg': msg, |
|
272 |
'subject': force_text(subject), |
|
273 |
} |
|
274 |
self.mocking.emails[force_text(subject)]['email_rcpt'] = rcpts |
|
275 |
self.mocking.latest_subject = force_text(subject) |
|
276 | ||
277 |
def quit(self): |
|
278 |
pass |
|
279 | ||
280 |
return MockSmtplibSMTP(self) |
|
246 |
class Email:
    """Read-only, dict-like view over one sent message.

    Wraps an object exposing ``message()``, ``recipients()`` and plain
    attributes such as ``subject`` / ``from_email`` (the items stored in
    ``mail.outbox``), and answers the keys the tests historically used on
    the old dictionary records: ``msg``, ``email_rcpt``, ``payload``,
    ``payloads``, ``to`` and ``from``.
    """

    # Keys computed by this wrapper instead of being read from the message.
    _COMPUTED_KEYS = ('msg', 'email_rcpt', 'payload', 'payloads', 'to')

    def __init__(self, email):
        self.email = email

    @property
    def msg(self):
        """MIME message built by the wrapped object (rebuilt on each access)."""
        return self.email.message()

    @property
    def email_rcpt(self):
        """Recipients as reported by the wrapped object's ``recipients()``."""
        return self.email.recipients()

    @property
    def payload(self):
        """First decoded payload as text, or ``''`` when it is empty."""
        first = self.payloads[0]
        if isinstance(first, bytes):
            # Decode locally so the property does not rely on a module-level
            # ``force_str`` name being in scope.
            return first.decode('utf-8')
        return first or ''

    @property
    def payloads(self):
        """Decoded payload of every part (a single item when not multipart)."""
        message = self.msg  # build the MIME message only once
        if message.is_multipart():
            return [part.get_payload(decode=True) for part in message.get_payload()]
        return [message.get_payload(decode=True)]

    @property
    def to(self):
        """Value of the ``To`` header of the generated message."""
        return self.msg['To']

    def get(self, key, default=None):
        """Dict-style lookup: same keys as ``self[key]``, ``default`` if unknown.

        Delegating to ``__getitem__`` keeps ``get('payload')``,
        ``get('from')`` and ``get('to')`` consistent with subscript access
        instead of failing on (or differing from) the raw message attributes.
        """
        try:
            return self[key]
        except AttributeError:
            return default

    def __getitem__(self, key):
        """Return a computed key, or the matching attribute of the message."""
        if key in self._COMPUTED_KEYS:
            return getattr(self, key)
        if key == 'from':
            # the wrapped message stores the sender as ``from_email``
            key = 'from_email'
        return getattr(self.email, key)
|
281 | 281 | |
282 | ||
283 |
class Emails:
    """Subject-keyed, read-only lookup over ``mail.outbox``."""

    def __getitem__(self, key):
        """Return the first outbox message whose subject equals ``key``.

        The message is wrapped in ``Email``; ``None`` is returned (no
        exception raised) when no subject matches.
        """
        found = next((sent for sent in mail.outbox if sent.subject == key), None)
        if found is None:
            return None
        return Email(found)

    def __contains__(self, value):
        """Tell whether a message with subject ``value`` is in the outbox."""
        return self[value] is not None
|
291 | ||
292 | ||
293 |
class EmailsMocking:
    """Test helper exposing the messages collected in ``mail.outbox``.

    Usable as a context manager; entering and leaving it does nothing beyond
    returning the helper itself, the messages live in ``mail.outbox``.
    """

    def get(self, subject):
        """Return the wrapped message with this subject, or ``None``."""
        return self.emails[subject]

    def get_latest(self, part=None):
        """Return the most recent message, or one ``part`` of it.

        With an empty outbox this returns ``{}`` (or ``None`` when ``part``
        is given) instead of raising ``IndexError``.  ``part`` accepts the
        same keys as subscripting the wrapped message; an unknown key gives
        ``None``.
        """
        if not mail.outbox:
            return None if part else {}
        latest = Email(mail.outbox[-1])
        if not part:
            return latest
        try:
            # Subscript access understands the computed keys ('payload',
            # 'from', 'to', ...) as well as plain message attributes.
            return latest[part]
        except AttributeError:
            return None

    def empty(self):
        """Forget every message collected so far."""
        mail.outbox = []

    def count(self):
        """Number of messages currently in the outbox."""
        return len(mail.outbox)

    @property
    def latest_subject(self):
        """Subject of the most recent message, ``None`` if nothing was sent."""
        if not mail.outbox:
            return None
        return mail.outbox[-1].subject

    @property
    def emails(self):
        """Subject-indexed view over the outbox."""
        return Emails()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        pass
|
307 | 322 | |
308 | 323 | |
309 | 324 |
class MockSubstitutionVariables: |
wcs/qommon/emails.py | ||
---|---|---|
38 | 38 |
except ImportError: |
39 | 39 |
docutils = None |
40 | 40 | |
41 |
from django.core.mail import EmailMessage, EmailMultiAlternatives |
|
41 |
from django.core.mail import EmailMessage, EmailMultiAlternatives, get_connection
|
|
42 | 42 |
from django.template.loader import render_to_string |
43 | 43 |
from django.utils.safestring import mark_safe |
44 | 44 |
from quixote import get_publisher, get_request, get_response |
... | ... | |
341 | 341 |
get_response().add_after_job('sending email', email_to_send, fire_and_forget=True) |
342 | 342 | |
343 | 343 | |
344 |
def create_smtp_server(emails_cfg, smtp_timeout=None): |
|
345 |
publisher = get_publisher() |
|
346 |
try: |
|
347 |
s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost', timeout=smtp_timeout) |
|
348 |
except socket.timeout as e: |
|
349 |
publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e) |
|
350 |
raise errors.EmailError('Failed to connect to SMTP server (timeout)') |
|
351 |
except OSError as e: |
|
352 |
publisher.record_error(_('Failed to connect to SMTP server'), exception=e) |
|
353 |
raise errors.EmailError('Failed to connect to SMTP server') |
|
354 |
if not s.sock: |
|
355 |
publisher.record_error(_('Failed to connect to SMTP server')) |
|
356 |
raise errors.EmailError('Failed to connect to SMTP server') |
|
357 |
rc_code, ehlo_answer = s.ehlo() |
|
358 |
if rc_code != 250: |
|
359 |
publisher.record_error(_('Failed to EHLO to SMTP server (%s)') % rc_code) |
|
360 |
raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code) |
|
361 |
if b'STARTTLS' in ehlo_answer: |
|
362 |
rc_code = s.starttls()[0] |
|
363 |
if rc_code != 220: |
|
364 |
publisher.record_error(_('Failed to STARTTLS to SMTP server (%s)') % rc_code) |
|
365 |
raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code) |
|
366 |
if emails_cfg.get('smtp_login'): |
|
367 |
try: |
|
368 |
s.login(emails_cfg.get('smtp_login') or '', emails_cfg.get('smtp_password') or '') |
|
369 |
except smtplib.SMTPAuthenticationError as e: |
|
370 |
publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e) |
|
371 |
raise errors.EmailError('Failed to authenticate to SMTP server') |
|
372 |
except smtplib.SMTPException as e: |
|
373 |
publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e) |
|
374 |
raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.') |
|
375 |
return s |
|
376 | ||
377 | ||
378 | 344 |
class EmailToSend:
    """Callable that delivers a prepared email message.

    ``email_msg`` is the message object to send (it must provide ``send()``
    and accept a ``connection`` attribute); ``smtp_timeout`` is forwarded to
    the SMTP backend when a dedicated server is configured.
    """

    def __init__(self, email_msg, smtp_timeout):
        self.email_msg = email_msg
        self.smtp_timeout = smtp_timeout

    def __call__(self, job=None):
        """Send the message, recording and re-raising delivery failures.

        When the ``emails`` configuration names an ``smtp_server`` a
        dedicated SMTP connection is built from the configured host,
        credentials and timeout; otherwise the message is sent through its
        default connection.

        Raises:
            errors.EmailError: on timeout, connection, authentication or
                other SMTP failures (each one is also recorded on the
                publisher).  Refused recipients, unsupported commands and
                data errors are deliberately ignored.
        """
        publisher = get_publisher()
        emails_cfg = get_cfg('emails', {})

        try:
            if emails_cfg.get('smtp_server', None):
                kwargs = {
                    'host': emails_cfg['smtp_server'],
                    'username': emails_cfg.get('smtp_login') or '',
                    'password': emails_cfg.get('smtp_password') or '',
                    'timeout': self.smtp_timeout,
                }
                backend = get_connection(backend='django.core.mail.backends.smtp.EmailBackend', **kwargs)
                self.email_msg.connection = backend

            self.email_msg.send()
        except socket.timeout as e:
            publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e)
            raise errors.EmailError('Failed to connect to SMTP server (timeout)') from e
        # smtplib exceptions derive from OSError, so every SMTP-specific
        # handler has to come before the generic OSError one; the ignored
        # ones also have to come before the SMTPException catch-all.
        except (smtplib.SMTPRecipientsRefused, smtplib.SMTPNotSupportedError, smtplib.SMTPDataError):
            pass
        except smtplib.SMTPAuthenticationError as e:
            publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e)
            raise errors.EmailError('Failed to authenticate to SMTP server') from e
        except smtplib.SMTPException as e:
            publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e)
            raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.') from e
        except OSError as e:
            publisher.record_error(_('Failed to connect to SMTP server'), exception=e)
            raise errors.EmailError('Failed to connect to SMTP server') from e
|
392 |
- |