1
|
import pytest
|
2
|
from uuid import uuid4
|
3
|
import os
|
4
|
import re
|
5
|
import logging
|
6
|
import mock
|
7
|
import random
|
8
|
import requests
|
9
|
|
10
|
from django.urls import reverse
|
11
|
from django.core import mail, signing
|
12
|
from django.utils import timezone
|
13
|
from django.core.files.storage import DefaultStorage
|
14
|
from django.utils.six.moves.urllib import parse as urllib
|
15
|
from django.utils.text import slugify
|
16
|
from django.test import override_settings
|
17
|
|
18
|
from corbo.models import Category, Announce, Subscription, Broadcast
|
19
|
from corbo.models import channel_choices
|
20
|
|
21
|
pytestmark = pytest.mark.django_db
|
22
|
|
23
|
CATEGORIES = (u'Alerts', u'News')
|
24
|
|
25
|
def get_random_number():
|
26
|
number_generator = list(range(10))
|
27
|
random.shuffle(number_generator)
|
28
|
number = ''.join(map(str, number_generator))
|
29
|
return number
|
30
|
|
31
|
|
32
|
@pytest.fixture
|
33
|
def categories():
|
34
|
categories = []
|
35
|
for category in CATEGORIES:
|
36
|
c, created = Category.objects.get_or_create(name=category, slug=slugify(category))
|
37
|
categories.append(c)
|
38
|
return categories
|
39
|
|
40
|
|
41
|
@pytest.fixture
|
42
|
def announces():
|
43
|
announces = []
|
44
|
for category in Category.objects.all():
|
45
|
a = Announce.objects.create(category=category, title='Announce 1',
|
46
|
publication_time=timezone.now(),
|
47
|
text='<h2>Announce 1</h2>')
|
48
|
Broadcast.objects.create(announce=a)
|
49
|
announces.append(a)
|
50
|
a = Announce.objects.create(category=category, title='Announce 2',
|
51
|
publication_time=timezone.now(),
|
52
|
text='<h2>Announce 2</h2>')
|
53
|
Broadcast.objects.create(announce=a)
|
54
|
announces.append(a)
|
55
|
return announces
|
56
|
|
57
|
|
58
|
def test_emailing_with_no_subscriptions(app, categories, announces, mailoutbox):
|
59
|
for announce in announces:
|
60
|
broadcast = Broadcast.objects.get(announce=announce)
|
61
|
broadcast.send()
|
62
|
assert not broadcast.delivery_count
|
63
|
assert not len(mailoutbox)
|
64
|
|
65
|
|
66
|
def test_send_email(app, categories, announces, mailoutbox):
|
67
|
for category in categories:
|
68
|
uuid = uuid4()
|
69
|
Subscription.objects.create(category=category,
|
70
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
71
|
for i, announce in enumerate(announces):
|
72
|
broadcast = Broadcast.objects.get(announce=announce)
|
73
|
broadcast.send()
|
74
|
assert broadcast.delivery_count
|
75
|
assert len(mailoutbox) == i+1
|
76
|
|
77
|
|
78
|
def test_check_inline_css(app, categories, announces, mailoutbox):
|
79
|
total_sent = 0
|
80
|
for i, announce in enumerate(announces):
|
81
|
announce.text = '<style type="text/css">h2 {color: #F00}</style>' + announce.text
|
82
|
announce.save()
|
83
|
uuid = uuid4()
|
84
|
Subscription.objects.create(category=announce.category,
|
85
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
86
|
broadcast = Broadcast.objects.get(announce=announce)
|
87
|
broadcast.send()
|
88
|
assert broadcast.delivery_count
|
89
|
assert len(mailoutbox) == total_sent + broadcast.delivery_count
|
90
|
total_sent += broadcast.delivery_count
|
91
|
assert 'h2 style="color:#F00"' in mailoutbox[i].html
|
92
|
|
93
|
|
94
|
@mock.patch('emails.utils.requests.get')
|
95
|
def test_check_inline_images(mocked_get, app, categories, announces, mailoutbox):
|
96
|
storage = DefaultStorage()
|
97
|
media_path = os.path.join(os.path.dirname(__file__), 'media')
|
98
|
image_name = 'logo.png'
|
99
|
image_name = storage.save(image_name, open(os.path.join(media_path, image_name), 'rb'))
|
100
|
total_sent = 0
|
101
|
for i, announce in enumerate(announces):
|
102
|
img_src = "/media/%s" % image_name
|
103
|
announce.text = announce.text + '<img src="%s" />' % img_src
|
104
|
announce.save()
|
105
|
uuid = uuid4()
|
106
|
Subscription.objects.create(category=announce.category,
|
107
|
identifier='mailto:%s@example.net' % uuid, uuid=uuid)
|
108
|
broadcast = Broadcast.objects.get(announce=announce)
|
109
|
mocked_get.return_value = mock.Mock(status_code=200,
|
110
|
headers={'content-type': 'image/png'},
|
111
|
content=storage.open(image_name).read())
|
112
|
broadcast.send()
|
113
|
assert broadcast.delivery_count
|
114
|
|
115
|
assert len(mailoutbox) == total_sent + broadcast.delivery_count
|
116
|
attachments = [a['filename'] for a in mailoutbox[0].attachments.as_dict()]
|
117
|
assert image_name in attachments
|
118
|
assert 'cid:%s' % image_name in mail.outbox[0].html_body
|
119
|
assert 'cid:%s' % image_name in mail.outbox[0].text_body
|
120
|
total_sent += broadcast.delivery_count
|
121
|
storage.delete(image_name)
|
122
|
|
123
|
|
124
|
def test_unsubscription_link(app, categories, announces, custom_mailoutbox):
|
125
|
unsubscription_link_sentinel = ''
|
126
|
subscriptions_number = 3
|
127
|
scheme = 'mailto:'
|
128
|
for category in categories:
|
129
|
for i in range(subscriptions_number):
|
130
|
uuid = uuid4()
|
131
|
uri = scheme + '%s@example.com' % uuid
|
132
|
Subscription.objects.create(category=category, identifier=uri, uuid=str(uuid))
|
133
|
|
134
|
for i, announce in enumerate(announces):
|
135
|
if announce.category != category:
|
136
|
continue
|
137
|
broadcast = Broadcast.objects.get(announce=announce)
|
138
|
broadcast.send()
|
139
|
assert broadcast.delivery_count
|
140
|
assert len(mail.outbox) == (i+1)*subscriptions_number
|
141
|
assert mail.outbox[i*subscriptions_number].subject == announce.title
|
142
|
|
143
|
for counter, destination in enumerate(category.subscription_set.all()):
|
144
|
index = i*subscriptions_number+counter
|
145
|
signature = urllib.unquote(re.findall('/unsubscribe/(.*)"', mail.outbox[index].html)[0])
|
146
|
unsubscription_link = reverse('unsubscribe', kwargs={'unsubscription_token': signature})
|
147
|
assert mail.outbox[index]._headers['List-Unsubscribe'] == '<http://localhost%s>' % unsubscription_link
|
148
|
assert unsubscription_link in mail.outbox[index].html
|
149
|
assert unsubscription_link in mail.outbox[index].text
|
150
|
assert unsubscription_link_sentinel != unsubscription_link
|
151
|
assert signing.loads(signature) == {
|
152
|
'category': announce.category.pk, 'identifier': destination.identifier}
|
153
|
unsubscription_link_sentinel = unsubscription_link
|
154
|
|
155
|
# refuse altered signature
|
156
|
resp = app.get(unsubscription_link + 'altered', status=404)
|
157
|
|
158
|
# make sure the uri schema is not in the page
|
159
|
resp = app.get(unsubscription_link)
|
160
|
assert scheme not in resp
|
161
|
|
162
|
def test_send_sms_with_no_gateway_defined(app, categories, announces, caplog):
|
163
|
for category in categories:
|
164
|
uuid = uuid4()
|
165
|
Subscription.objects.create(category=category,
|
166
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
167
|
for i, announce in enumerate(announces):
|
168
|
broadcast = Broadcast.objects.get(announce=announce)
|
169
|
broadcast.send()
|
170
|
assert broadcast.delivery_count == 0
|
171
|
for record in caplog.records:
|
172
|
assert record.name == 'corbo.utils'
|
173
|
assert record.levelno == logging.ERROR
|
174
|
assert record.getMessage() == 'SMS send requested but no SMS gateway defined.'
|
175
|
|
176
|
@mock.patch('corbo.utils.requests.post')
|
177
|
def test_send_sms_with_gateway_api_error(mocked_post, app, categories, announces, caplog):
|
178
|
for category in categories:
|
179
|
for i in range(3):
|
180
|
uuid = uuid4()
|
181
|
Subscription.objects.create(category=category,
|
182
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
183
|
for i, announce in enumerate(announces):
|
184
|
broadcast = Broadcast.objects.get(announce=announce)
|
185
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
186
|
mocked_response = mock.Mock()
|
187
|
mocked_response.json.return_value = {'err': 1, 'data': None,
|
188
|
'err_desc': 'Payload error: missing "message" in JSON payload'}
|
189
|
mocked_post.return_value = mocked_response
|
190
|
broadcast.send()
|
191
|
assert broadcast.delivery_count == 0
|
192
|
records = caplog.records
|
193
|
assert len(records) == 1 + i
|
194
|
for record in records:
|
195
|
assert record.name == 'corbo.utils'
|
196
|
assert record.levelno == logging.WARNING
|
197
|
assert record.getMessage() == 'Error occured while sending sms: Payload error: missing "message" in JSON payload'
|
198
|
|
199
|
@mock.patch('corbo.utils.requests.post')
|
200
|
def test_send_sms_with_gateway_connection_error(mocked_post, app, categories, announces, caplog):
|
201
|
for category in categories:
|
202
|
for i in range(3):
|
203
|
uuid = uuid4()
|
204
|
Subscription.objects.create(category=category,
|
205
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
206
|
for i, announce in enumerate(announces):
|
207
|
broadcast = Broadcast.objects.get(announce=announce)
|
208
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
209
|
mocked_response = mock.Mock()
|
210
|
def mocked_requests_connection_error(*args, **kwargs):
|
211
|
raise requests.ConnectionError('unreachable')
|
212
|
mocked_post.side_effect = mocked_requests_connection_error
|
213
|
mocked_post.return_value = mocked_response
|
214
|
broadcast.send()
|
215
|
assert broadcast.delivery_count == 0
|
216
|
records = caplog.records
|
217
|
assert len(records) == 1 + i
|
218
|
for record in records:
|
219
|
assert record.name == 'corbo.utils'
|
220
|
assert record.levelno == logging.WARNING
|
221
|
assert record.getMessage() == 'Failed to reach SMS gateway: unreachable'
|
222
|
|
223
|
@mock.patch('corbo.utils.requests.post')
|
224
|
def test_send_sms(mocked_post, app, categories, announces):
|
225
|
for category in categories:
|
226
|
for i in range(3):
|
227
|
uuid = uuid4()
|
228
|
Subscription.objects.create(category=category,
|
229
|
identifier='sms:%s' % get_random_number(), uuid=uuid)
|
230
|
for announce in announces:
|
231
|
broadcast = Broadcast.objects.get(announce=announce)
|
232
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
233
|
mocked_response = mock.Mock()
|
234
|
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
|
235
|
|
236
|
for announce in announces:
|
237
|
broadcast = Broadcast.objects.get(announce=announce)
|
238
|
with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
|
239
|
mocked_response = mock.Mock()
|
240
|
mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
|
241
|
mocked_post.return_value = mocked_response
|
242
|
broadcast.send()
|
243
|
assert mocked_post.call_args[0][0] == 'http://sms.gateway'
|
244
|
assert mocked_post.call_args[1]['json']['from'] == 'Corbo'
|
245
|
assert isinstance(mocked_post.call_args[1]['json']['to'], list)
|
246
|
assert broadcast.delivery_count == 3
|