Projet

Général

Profil

0004-misc-use-Django-s-EmailBackend-36977.patch

Lauréline Guérin, 21 septembre 2021 18:44

Télécharger (16,7 ko)

Voir les différences:

Subject: [PATCH 4/4] misc: use Django's EmailBackend (#36977)

 tests/form_pages/test_all.py |  14 ++---
 tests/test_emails.py         |   6 +-
 tests/test_mail_templates.py |   4 +-
 tests/test_register.py       |   8 ++-
 tests/test_workflows.py      |   4 +-
 tests/utilities.py           | 109 ++++++++++++++++++++---------------
 wcs/qommon/emails.py         |  63 ++++++++------------
 7 files changed, 105 insertions(+), 103 deletions(-)
tests/form_pages/test_all.py
1395 1395
    assert formdef.data_class().count() == 1
1396 1396
    assert '<div class="section foldable folded" id="summary">' in next_page.text
1397 1397
    # check the user received a copy by email
1398
    assert emails.emails.get('New form (test)')
1399
    assert emails.emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
1398
    assert emails.get('New form (test)')
1399
    assert emails.get('New form (test)')['email_rcpt'] == ['foo@localhost']
1400 1400

  
1401 1401

  
1402 1402
def test_form_submit_with_just_disabled_user(pub, emails):
......
1908 1908
    assert '<h2>Keep your tracking code</h2>' in resp.text
1909 1909
    resp.forms[0]['email'] = 'foo@localhost'
1910 1910
    resp = resp.forms[0].submit()
1911
    assert emails.emails.get('Tracking Code reminder')
1912
    assert tracking_code in list(emails.emails.values())[0]['payload']
1911
    assert emails.get('Tracking Code reminder')
1912
    assert tracking_code in emails.get('Tracking Code reminder')['payload']
1913 1913
    assert resp.location == 'http://example.net/test/code/%s/load' % tracking_code
1914 1914
    resp = resp.follow()
1915 1915
    resp = resp.follow()
......
1938 1938
    resp.forms[0]['email'] = 'foo@localhost'
1939 1939
    resp.forms[0]['validation'].checked = True  # stupid bot will do that
1940 1940
    resp = resp.forms[0].submit()
1941
    assert not emails.emails.values()
1941
    assert not emails.count()
1942 1942

  
1943 1943

  
1944 1944
def test_form_tracking_code_remove_draft(pub, nocache):
......
3711 3711
    resp = resp.follow()
3712 3712
    assert 'The form has been recorded' in resp.text
3713 3713
    # check rst2html didn't fail
3714
    assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
3714
    assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
3715 3715

  
3716 3716

  
3717 3717
def test_form_table_rows_field_submit(pub, emails):
......
3783 3783
    resp = resp.form.submit('submit')
3784 3784
    resp = resp.follow()
3785 3785
    assert 'The form has been recorded' in resp.text
3786
    assert b'ee' in emails.emails['New form (test)']['msg'].get_payload()[1].get_payload(decode=True)
3786
    assert b'ee' in emails.get('New form (test)')['msg'].get_payload()[1].get_payload(decode=True)
3787 3787

  
3788 3788

  
3789 3789
def test_form_new_table_rows_field_draft_recall(pub):
tests/test_emails.py
4 4
import socket
5 5

  
6 6
import pytest
7
from quixote import cleanup
8 7

  
9 8
from wcs.qommon.emails import docutils  # noqa pylint: disable=unused-import
10 9
from wcs.qommon.emails import email as send_email
11 10
from wcs.qommon.upload_storage import PicklableUpload
12 11

  
13
from .utilities import clean_temporary_pub, create_temporary_pub
12
from .utilities import clean_temporary_pub, cleanup, create_temporary_pub
14 13

  
15 14

  
16 15
def setup_module(module):
......
36 35
    assert emails.count() == 1
37 36
    assert emails.emails['test']['from'] == '%s@%s' % (pwd.getpwuid(os.getuid())[0], socket.getfqdn())
38 37

  
38
    emails.empty()
39 39
    pub.cfg['emails'] = {'from': 'foo@localhost'}
40 40
    send_email('test', mail_body='Hello', email_rcpt='test@localhost', want_html=False)
41 41
    assert emails.count() == 1
42 42
    assert emails.emails['test']['from'] == 'foo@localhost'
43 43
    assert emails.emails['test']['msg']['From'] == 'foo@localhost'
44 44

  
45
    emails.empty()
45 46
    if not pub.site_options.has_section('variables'):
46 47
        pub.site_options.add_section('variables')
47 48
    pub.site_options.set('variables', 'global_title', 'HELLO')
......
125 126

  
126 127
def test_email_plain_with_attachments(emails):
127 128
    create_temporary_pub()
128

  
129 129
    jpg = PicklableUpload('test.jpeg', 'image/jpeg')
130 130
    with open(os.path.join(os.path.dirname(__file__), 'image-with-gps-data.jpeg'), 'rb') as fd:
131 131
        jpg_content = fd.read()
tests/test_mail_templates.py
244 244
    pub.get_request().response.process_after_jobs()
245 245
    assert emails.count() == 1
246 246
    assert emails.get('test subject')['email_rcpt'] == ['xyz@localhost']
247
    assert b'test body' in base64.decodebytes(
248
        force_bytes(emails.get('test subject')['msg'].get_payload(0).get_payload())
249
    )
247
    assert 'test body' in emails.get('test subject')['msg'].get_payload(0).get_payload()
250 248

  
251 249
    # check nothing is sent and an error is logged if the mail template is
252 250
    # missing
tests/test_register.py
144 144
    do_user_registration(pub)
145 145

  
146 146
    assert emails.get('New Registration')
147
    assert emails.get('New Registration').get('email_rcpt') == ['admin@localhost']
147
    assert emails.get('New Registration')['email_rcpt'] == ['admin@localhost']
148 148

  
149 149

  
150 150
def test_user_notification(pub, emails):
......
164 164
    account = PasswordAccount.get('foo@localhost')
165 165

  
166 166
    assert emails.get('Welcome to example.net')
167
    assert emails.get('Welcome to example.net').get('to') == 'foo@localhost'
168
    assert account.password in emails.get('Welcome to example.net').get('payload')
167
    assert emails.get('Welcome to example.net')['to'] == 'foo@localhost'
168
    assert account.password in emails.get('Welcome to example.net')['payload']
169 169

  
170 170

  
171 171
def test_user_login(pub):
......
241 241
    assert 'The token you submitted does not exist' in resp.text
242 242

  
243 243
    # new forgotten request
244
    emails.empty()
244 245
    resp = app.get('/ident/password/forgotten')
245 246
    resp.forms[0]['username'] = 'foo'
246 247
    resp = resp.forms[0].submit()
......
267 268
    pub.cfg['passwords'] = {'generate': False, 'can_change': True}
268 269
    pub.write_cfg()
269 270

  
271
    emails.empty()
270 272
    resp = app.get('/ident/password/forgotten')
271 273
    resp.forms[0]['username'] = 'foo'
272 274
    resp = resp.forms[0].submit()
tests/test_workflows.py
1664 1664
    item.perform(formdata)
1665 1665
    get_response().process_after_jobs()
1666 1666
    assert emails.count() == 1
1667
    assert emails.get('foobar').get('from') == 'foobar@localhost'
1667
    assert emails.get('foobar')['from'] == 'foobar@localhost'
1668 1668

  
1669 1669
    # custom from email (computed)
1670 1670
    emails.empty()
......
1673 1673
    item.perform(formdata)
1674 1674
    get_response().process_after_jobs()
1675 1675
    assert emails.count() == 1
1676
    assert emails.get('foobar').get('from') == 'foobar@localhost'
1676
    assert emails.get('foobar')['from'] == 'foobar@localhost'
1677 1677

  
1678 1678
    # custom sender name defined from site-options variable
1679 1679
    pub.load_site_options()
tests/utilities.py
1
import email.header
2
import email.parser
3 1
import http.cookies
4 2
import json
5 3
import os
6 4
import random
7 5
import shutil
8
import sys
9 6
import tempfile
10 7
import urllib.parse
11 8

  
12 9
import psycopg2
13 10
from django.conf import settings
14
from django.utils.encoding import force_bytes, force_text
11
from django.core import mail
12
from django.utils.encoding import force_bytes
15 13
from quixote import cleanup, get_publisher
16 14
from webtest import TestApp
17 15

  
......
245 243
    return app
246 244

  
247 245

  
248
class EmailsMocking:
249
    def create_smtp_server(self, *args, **kwargs):
250
        class MockSmtplibSMTP:
251
            def __init__(self, mocking):
252
                self.mocking = mocking
253

  
254
            def send_message(self, msg, msg_from, rcpts):
255
                return self.sendmail(msg_from, rcpts, msg.as_string())
256

  
257
            def sendmail(self, msg_from, rcpts, msg):
258
                msg = email.parser.Parser().parsestr(msg)
259
                subject = email.header.decode_header(msg['Subject'])[0][0]
260
                if msg.is_multipart():
261
                    payloads = [x.get_payload(decode=True) for x in msg.get_payload()]
262
                    payload = payloads[0]
263
                else:
264
                    payload = msg.get_payload(decode=True)
265
                    payloads = [payload]
266
                self.mocking.emails[force_text(subject)] = {
267
                    'from': msg_from,
268
                    'to': email.header.decode_header(msg['To'])[0][0],
269
                    'payload': force_str(payload if payload else ''),
270
                    'payloads': payloads,
271
                    'msg': msg,
272
                    'subject': force_text(subject),
273
                }
274
                self.mocking.emails[force_text(subject)]['email_rcpt'] = rcpts
275
                self.mocking.latest_subject = force_text(subject)
276

  
277
            def quit(self):
278
                pass
279

  
280
        return MockSmtplibSMTP(self)
246
class Email:
247
    def __init__(self, email):
248
        self.email = email
249

  
250
    @property
251
    def msg(self):
252
        return self.email.message()
253

  
254
    @property
255
    def email_rcpt(self):
256
        return self.email.recipients()
257

  
258
    @property
259
    def payload(self):
260
        return force_str(self.payloads[0])
261

  
262
    @property
263
    def payloads(self):
264
        if self.msg.is_multipart():
265
            return [x.get_payload(decode=True) for x in self.msg.get_payload()]
266
        return [self.msg.get_payload(decode=True)]
267

  
268
    @property
269
    def to(self):
270
        return self.email.message()['To']
271

  
272
    def get(self, key):
273
        return getattr(self.email, key)
274

  
275
    def __getitem__(self, key):
276
        if key in ['msg', 'email_rcpt', 'payload', 'payloads', 'to']:
277
            return getattr(self, key)
278
        if key == 'from':
279
            key = 'from_email'
280
        return getattr(self.email, key)
281 281

  
282

  
283
class Emails:
284
    def __contains__(self, value):
285
        return self[value] is not None
286

  
287
    def __getitem__(self, key):
288
        for em in mail.outbox:
289
            if em.subject == key:
290
                return Email(em)
291

  
292

  
293
class EmailsMocking:
282 294
    def get(self, subject):
283
        return self.emails.get(subject)
295
        return self.emails[subject]
284 296

  
285 297
    def get_latest(self, part=None):
286
        email = self.emails.get(self.latest_subject, {})
298
        email = Email(mail.outbox[-1])
287 299
        if part:
288 300
            return email.get(part) if email else None
289 301
        return email
290 302

  
291 303
    def empty(self):
292
        self.emails.clear()
304
        mail.outbox = []
293 305

  
294 306
    def count(self):
295
        return len(self.emails)
307
        return len(mail.outbox)
308

  
309
    @property
310
    def latest_subject(self):
311
        return mail.outbox[-1].subject
312

  
313
    @property
314
    def emails(self):
315
        return Emails()
296 316

  
297 317
    def __enter__(self):
298
        self.wcs_create_smtp_server = sys.modules['wcs.qommon.emails'].create_smtp_server
299
        sys.modules['wcs.qommon.emails'].create_smtp_server = self.create_smtp_server
300
        self.emails = {}
301
        self.latest_subject = None
302 318
        return self
303 319

  
304 320
    def __exit__(self, exc_type, exc_value, tb):
305
        del self.emails
306
        sys.modules['wcs.qommon.emails'].create_smtp_server = self.wcs_create_smtp_server
321
        pass
307 322

  
308 323

  
309 324
class MockSubstitutionVariables:
wcs/qommon/emails.py
38 38
except ImportError:
39 39
    docutils = None
40 40

  
41
from django.core.mail import EmailMessage, EmailMultiAlternatives
41
from django.core.mail import EmailMessage, EmailMultiAlternatives, get_connection
42 42
from django.template.loader import render_to_string
43 43
from django.utils.safestring import mark_safe
44 44
from quixote import get_publisher, get_request, get_response
......
341 341
        get_response().add_after_job('sending email', email_to_send, fire_and_forget=True)
342 342

  
343 343

  
344
def create_smtp_server(emails_cfg, smtp_timeout=None):
345
    publisher = get_publisher()
346
    try:
347
        s = smtplib.SMTP(emails_cfg.get('smtp_server', None) or 'localhost', timeout=smtp_timeout)
348
    except socket.timeout as e:
349
        publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e)
350
        raise errors.EmailError('Failed to connect to SMTP server (timeout)')
351
    except OSError as e:
352
        publisher.record_error(_('Failed to connect to SMTP server'), exception=e)
353
        raise errors.EmailError('Failed to connect to SMTP server')
354
    if not s.sock:
355
        publisher.record_error(_('Failed to connect to SMTP server'))
356
        raise errors.EmailError('Failed to connect to SMTP server')
357
    rc_code, ehlo_answer = s.ehlo()
358
    if rc_code != 250:
359
        publisher.record_error(_('Failed to EHLO to SMTP server (%s)') % rc_code)
360
        raise errors.EmailError('Failed to EHLO to SMTP server (%s)' % rc_code)
361
    if b'STARTTLS' in ehlo_answer:
362
        rc_code = s.starttls()[0]
363
        if rc_code != 220:
364
            publisher.record_error(_('Failed to STARTTLS to SMTP server (%s)') % rc_code)
365
            raise errors.EmailError('Failed to STARTTLS to SMTP server (%s)' % rc_code)
366
    if emails_cfg.get('smtp_login'):
367
        try:
368
            s.login(emails_cfg.get('smtp_login') or '', emails_cfg.get('smtp_password') or '')
369
        except smtplib.SMTPAuthenticationError as e:
370
            publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e)
371
            raise errors.EmailError('Failed to authenticate to SMTP server')
372
        except smtplib.SMTPException as e:
373
            publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e)
374
            raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
375
    return s
376

  
377

  
378 344
class EmailToSend:
379 345
    def __init__(self, email_msg, smtp_timeout):
380 346
        self.email_msg = email_msg
381 347
        self.smtp_timeout = smtp_timeout
382 348

  
383 349
    def __call__(self, job=None):
350
        publisher = get_publisher()
384 351
        emails_cfg = get_cfg('emails', {})
385 352

  
386
        s = create_smtp_server(emails_cfg, self.smtp_timeout)
387 353
        try:
388
            s.send_message(self.email_msg.message(), self.email_msg.from_email, self.email_msg.recipients())
354
            if emails_cfg.get('smtp_server', None):
355
                kwargs = {
356
                    'host': emails_cfg['smtp_server'],
357
                    'username': emails_cfg.get('smtp_login') or '',
358
                    'password': emails_cfg.get('smtp_password') or '',
359
                    'timeout': self.smtp_timeout,
360
                }
361
                backend = get_connection(backend='django.core.mail.backends.smtp.EmailBackend', **kwargs)
362
                self.email_msg.connection = backend
363

  
364
            self.email_msg.send()
365
        except socket.timeout as e:
366
            publisher.record_error(_('Failed to connect to SMTP server (timeout)'), exception=e)
367
            raise errors.EmailError('Failed to connect to SMTP server (timeout)')
368
        except OSError as e:
369
            publisher.record_error(_('Failed to connect to SMTP server'), exception=e)
370
            raise errors.EmailError('Failed to connect to SMTP server')
371
        except smtplib.SMTPAuthenticationError as e:
372
            publisher.record_error(_('Failed to authenticate to SMTP server'), exception=e)
373
            raise errors.EmailError('Failed to authenticate to SMTP server')
374
        except smtplib.SMTPException as e:
375
            publisher.record_error(_('Failed to authenticate to SMTP server, unknown error.'), exception=e)
376
            raise errors.EmailError('Failed to authenticate to SMTP server, unknown error.')
389 377
        except (smtplib.SMTPRecipientsRefused, smtplib.SMTPNotSupportedError, smtplib.SMTPDataError):
390 378
            pass
391
        s.quit()
392
-