0001-announces-send-command-10805.patch
corbo/models.py | ||
---|---|---|
1 |
from datetime import datetime |
|
2 |
import logging |
|
3 |
from html2text import HTML2Text |
|
4 |
from emails.django import Message |
|
5 |
from lxml.etree import HTML as HTMLTree |
|
6 | ||
1 | 7 |
from django.utils import timezone |
2 | 8 |
from django.conf import settings |
3 | 9 |
from django.db import models |
10 |
from django.core.files.storage import DefaultStorage |
|
4 | 11 |
from django.utils.translation import ugettext_lazy as _ |
5 | 12 | |
6 | 13 |
from ckeditor.fields import RichTextField |
... | ... | |
10 | 17 |
('homepage', _('Homepage')) |
11 | 18 |
) |
12 | 19 | |
20 |
logger = logging.getLogger(__name__) |
|
21 | ||
13 | 22 |
class Category(models.Model): |
14 | 23 |
name = models.CharField(max_length=64, blank=False, null=False) |
15 | 24 |
ctime = models.DateTimeField(auto_now_add=True) |
... | ... | |
62 | 71 |
id=self.announce.id, time=self.deliver_time) |
63 | 72 |
return u'announce {id} to deliver'.format(id=self.announce.id) |
64 | 73 | |
74 |
def send(self):
    """Email the announce to every subscriber of its category.

    A single message is built from the announce: the HTML body is the
    announce text and the plain-text alternative is produced from it with
    html2text.  Every ``<img>`` whose ``src`` is served from the default
    storage is attached to the message and flagged as inline; other
    images (external URLs, data URIs...) are left untouched in the HTML.

    The message is then sent to each subscription of the announce's
    category that has an identifier.  The number of successful sends is
    stored in ``self.result``, ``self.deliver_time`` is set to the current
    time and the object is saved.
    """
    announce = self.announce
    subscriptions = announce.category.subscription_set.all()
    # Prefer the dedicated mass-mailing sender declared in corbo's settings;
    # fall back on Django's DEFAULT_FROM_EMAIL when it is not defined.
    mail_from = getattr(settings, 'CORBO_DEFAULT_FROM_EMAIL',
                        settings.DEFAULT_FROM_EMAIL)
    m = Message(html=announce.text, subject=announce.title,
                text=HTML2Text().handle(announce.text),
                mail_from=mail_from)

    storage = DefaultStorage()
    url_prefix = storage.base_url or ''
    for img in HTMLTree(announce.text).xpath('//img/@src'):
        if not img.startswith(url_prefix):
            # Not served from our storage: nothing to embed.
            continue
        # Drop the URL prefix to get the storage-relative name.
        # str.lstrip() must not be used here: it strips a *set of
        # characters*, which would also eat the beginning of names such
        # as "image.png" or "media/x.png".
        img_path = img[len(url_prefix):]
        image_file = storage.open(img_path)
        try:
            # Read eagerly so the file handle can be released right away.
            data = image_file.read()
        finally:
            image_file.close()
        m.attach(filename=img, data=data)
        m.attachments[img].is_inline = True

    m.transformer.synchronize_inline_images()
    m.transformer.load_and_transform()
    m.transformer.save()

    total_sent = 0
    for s in subscriptions:
        if not s.identifier:
            # No address to deliver to.
            continue
        sent = m.send(to=s.identifier)
        if sent:
            total_sent += 1
            logger.info('Announce "%s" sent to %s', announce.title, s.identifier)
        else:
            logger.warning('Error occured while sending announce "%s" to %s.',
                           announce.title, s.identifier)

    self.result = total_sent
    self.deliver_time = timezone.now()
    self.save()
|
103 | ||
65 | 104 |
class Meta: |
66 | 105 |
verbose_name = _('sent') |
67 | 106 |
ordering = ('-deliver_time',) |
corbo/settings.py | ||
---|---|---|
104 | 104 |
RSS_LINK = '' |
105 | 105 |
RSS_LINK_TEMPLATE = '/#announce{0}' |
106 | 106 | |
107 |
# default sender address for mass emails |
|
108 |
CORBO_DEFAULT_FROM_EMAIL = 'webmaster@localhost' |
|
109 | ||
107 | 110 |
# django-mellon settings |
108 | 111 |
MELLON_ATTRIBUTE_MAPPING = { |
109 | 112 |
'username': '{attributes[username][0]}', |
requirements.txt | ||
---|---|---|
1 | 1 |
Django>=1.7, <1.8 |
2 | 2 |
django-ckeditor<4.5.3 |
3 | 3 |
djangorestframework |
4 |
html2text |
|
5 |
emails |
|
4 | 6 |
-e git+http://repos.entrouvert.org/gadjo.git/#egg=gadjo |
7 |
setup.py | ||
---|---|---|
96 | 96 |
install_requires=['django>=1.7, <1.8', |
97 | 97 |
'django-ckeditor<4.5.3', |
98 | 98 |
'djangorestframework', |
99 |
'gadjo' |
|
99 |
'html2text', |
|
100 |
'gadjo', |
|
101 |
'emails', |
|
102 |
'lxml', |
|
100 | 103 |
], |
101 | 104 |
zip_safe=False, |
102 | 105 |
cmdclass={ |
tests/test_emailing.py | ||
---|---|---|
1 |
import pytest |
|
2 |
import json |
|
3 |
from uuid import uuid4 |
|
4 |
import os |
|
5 | ||
6 |
from django.core.urlresolvers import reverse |
|
7 |
from django.utils.http import urlencode |
|
8 |
from django.core import mail |
|
9 |
from django.utils import timezone |
|
10 |
from django.core.files.storage import DefaultStorage |
|
11 | ||
12 |
from corbo.models import Category, Announce, Subscription, Broadcast |
|
13 |
from corbo.models import channel_choices |
|
14 | ||
15 |
pytestmark = pytest.mark.django_db |
|
16 | ||
17 |
CATEGORIES = ('Alerts', 'News') |
|
18 | ||
19 | ||
20 |
@pytest.fixture
def categories():
    """Return one ``Category`` per name in ``CATEGORIES``, creating the missing ones."""
    return [Category.objects.get_or_create(name=name)[0] for name in CATEGORIES]
|
27 | ||
28 |
@pytest.fixture
def announces():
    """Create two announces, each with its broadcast, for every existing category.

    The created announces are returned in creation order.
    """
    contents = (
        ('Announce 1', '<h2>Announce 1</h2>'),
        ('Announce 2', '<h2>Announce 2</h2>'),
    )
    created = []
    for category in Category.objects.all():
        for title, text in contents:
            announce = Announce.objects.create(
                category=category, title=title,
                publication_time=timezone.now(), text=text)
            Broadcast.objects.create(announce=announce)
            created.append(announce)
    return created
|
43 | ||
44 |
def test_emailing_with_no_subscriptions(app, categories, announces):
    """Sending a broadcast while nobody is subscribed delivers nothing."""
    for item in announces:
        pending = Broadcast.objects.get(announce=item)
        pending.send()
        assert not pending.result
        assert not mail.outbox
|
50 | ||
51 |
def test_send_email(app, categories, announces):
    """With one subscriber per category, every broadcast is delivered."""
    # Register one subscriber, with a unique address, in each category.
    for category in categories:
        token = uuid4()
        Subscription.objects.create(
            category=category, uuid=token,
            identifier='%s@example.net' % token)

    for item in announces:
        pending = Broadcast.objects.get(announce=item)
        pending.send()
        assert pending.result
        assert mail.outbox
|
61 | ||
62 |
def test_check_inline_css(app, categories, announces):
    """Rules from a ``<style>`` block end up as inline ``style`` attributes."""
    stylesheet = '<style type="text/css">h2 {color: #F00}</style>'
    for item in announces:
        item.text = stylesheet + item.text
        item.save()

        token = uuid4()
        Subscription.objects.create(
            category=item.category, uuid=token,
            identifier='%s@example.net' % token)

        pending = Broadcast.objects.get(announce=item)
        pending.send()
        assert pending.result
        assert mail.outbox
        assert 'h2 style="color:#F00"' in mail.outbox[0].html
        # Start the next iteration with an empty outbox.
        mail.outbox = []
|
75 | ||
76 |
def test_check_inline_images(app, categories, announces):
    """Images served from the default storage are attached to the email.

    A sample image is copied into the default storage and referenced from
    each announce; after sending, the image URL must be among the message
    attachments.  The stored image is removed even when an assertion fails.
    """
    storage = DefaultStorage()
    media_path = os.path.join(os.path.dirname(__file__), 'media')
    image_name = 'logo.png'
    # Binary mode: the fixture is a PNG.  The context manager closes the
    # source file once it has been copied into the storage.
    with open(os.path.join(media_path, image_name), 'rb') as image_file:
        # save() may return a different name if one is already taken.
        saved_name = storage.save(image_name, image_file)
    try:
        for announce in announces:
            announce.text = announce.text + '<img src="/media/%s" />' % saved_name
            announce.save()
            uuid = uuid4()
            Subscription.objects.create(category=announce.category,
                                        identifier='%s@example.net' % uuid,
                                        uuid=uuid)
            broadcast = Broadcast.objects.get(announce=announce)
            broadcast.send()
            assert broadcast.result
            assert mail.outbox
            assert storage.url(saved_name) in mail.outbox[0].attachments.keys()
            # Start the next iteration with an empty outbox.
            mail.outbox = []
    finally:
        storage.delete(saved_name)
|
0 |
- |