Project

General

Profile

Download (10.8 KB) Statistics
| Branch: | Tag: | Revision:

root / tests / test_broadcasting.py @ e01c978b

1
import pytest
2
from uuid import uuid4
3
import os
4
import re
5
import logging
6
import mock
7
import random
8
import requests
9

    
10
from django.urls import reverse
11
from django.core import mail, signing
12
from django.utils import timezone
13
from django.core.files.storage import DefaultStorage
14
from django.utils.six.moves.urllib import parse as urllib
15
from django.utils.text import slugify
16
from django.test import override_settings
17

    
18
from corbo.models import Category, Announce, Subscription, Broadcast
19
from corbo.models import channel_choices
20

    
21
pytestmark = pytest.mark.django_db
22

    
23
CATEGORIES = (u'Alerts', u'News')
24

    
25
def get_random_number():
26
    number_generator = list(range(10))
27
    random.shuffle(number_generator)
28
    number = ''.join(map(str, number_generator))
29
    return number
30

    
31

    
32
@pytest.fixture
33
def categories():
34
    categories = []
35
    for category in CATEGORIES:
36
        c, created = Category.objects.get_or_create(name=category, slug=slugify(category))
37
        categories.append(c)
38
    return categories
39

    
40

    
41
@pytest.fixture
42
def announces():
43
    announces = []
44
    for category in Category.objects.all():
45
        a = Announce.objects.create(category=category, title='Announce 1',
46
                                    publication_time=timezone.now(),
47
                                    text='<h2>Announce 1</h2>')
48
        Broadcast.objects.create(announce=a)
49
        announces.append(a)
50
        a = Announce.objects.create(category=category, title='Announce 2',
51
                                    publication_time=timezone.now(),
52
                                    text='<h2>Announce 2</h2>')
53
        Broadcast.objects.create(announce=a)
54
        announces.append(a)
55
    return announces
56

    
57

    
58
def test_emailing_with_no_subscriptions(app, categories, announces, mailoutbox):
59
    for announce in announces:
60
        broadcast = Broadcast.objects.get(announce=announce)
61
        broadcast.send()
62
        assert not broadcast.delivery_count
63
    assert not len(mailoutbox)
64

    
65

    
66
def test_send_email(app, categories, announces, mailoutbox):
67
    for category in categories:
68
        uuid = uuid4()
69
        Subscription.objects.create(category=category,
70
                                    identifier='mailto:%s@example.net' % uuid, uuid=uuid)
71
    for i, announce in enumerate(announces):
72
        broadcast = Broadcast.objects.get(announce=announce)
73
        broadcast.send()
74
        assert broadcast.delivery_count
75
        assert len(mailoutbox) == i+1
76

    
77

    
78
def test_check_inline_css(app, categories, announces, mailoutbox):
79
    total_sent = 0
80
    for i, announce in enumerate(announces):
81
        announce.text = '<style type="text/css">h2 {color: #F00}</style>' + announce.text
82
        announce.save()
83
        uuid = uuid4()
84
        Subscription.objects.create(category=announce.category,
85
                                    identifier='mailto:%s@example.net' % uuid, uuid=uuid)
86
        broadcast = Broadcast.objects.get(announce=announce)
87
        broadcast.send()
88
        assert broadcast.delivery_count
89
        assert len(mailoutbox) == total_sent + broadcast.delivery_count
90
        total_sent += broadcast.delivery_count
91
        assert 'h2 style="color:#F00"' in mailoutbox[i].html
92

    
93

    
94
@mock.patch('emails.utils.requests.get')
95
def test_check_inline_images(mocked_get, app, categories, announces, mailoutbox):
96
    storage = DefaultStorage()
97
    media_path = os.path.join(os.path.dirname(__file__), 'media')
98
    image_name = 'logo.png'
99
    image_name = storage.save(image_name, open(os.path.join(media_path, image_name), 'rb'))
100
    total_sent = 0
101
    for i, announce in enumerate(announces):
102
        img_src = "/media/%s" % image_name
103
        announce.text = announce.text + '<img src="%s" />' % img_src
104
        announce.save()
105
        uuid = uuid4()
106
        Subscription.objects.create(category=announce.category,
107
                                    identifier='mailto:%s@example.net' % uuid, uuid=uuid)
108
        broadcast = Broadcast.objects.get(announce=announce)
109
        mocked_get.return_value = mock.Mock(status_code=200,
110
                                            headers={'content-type': 'image/png'},
111
                                            content=storage.open(image_name).read())
112
        broadcast.send()
113
        assert broadcast.delivery_count
114

    
115
        assert len(mailoutbox) == total_sent + broadcast.delivery_count
116
        attachments = [a['filename'] for a in mailoutbox[0].attachments.as_dict()]
117
        assert image_name in attachments
118
        assert 'cid:%s' % image_name in mail.outbox[0].html_body
119
        assert 'cid:%s' % image_name in mail.outbox[0].text_body
120
        total_sent += broadcast.delivery_count
121
    storage.delete(image_name)
122

    
123

    
124
def test_unsubscription_link(app, categories, announces, custom_mailoutbox):
125
    unsubscription_link_sentinel = ''
126
    subscriptions_number = 3
127
    scheme = 'mailto:'
128
    for category in categories:
129
        for i in range(subscriptions_number):
130
            uuid = uuid4()
131
            uri = scheme + '%s@example.com' % uuid
132
            Subscription.objects.create(category=category, identifier=uri, uuid=str(uuid))
133

    
134
        for i, announce in enumerate(announces):
135
            if announce.category != category:
136
                continue
137
            broadcast = Broadcast.objects.get(announce=announce)
138
            broadcast.send()
139
            assert broadcast.delivery_count
140
            assert len(mail.outbox) == (i+1)*subscriptions_number
141
            assert mail.outbox[i*subscriptions_number].subject == announce.title
142

    
143
            for counter, destination in enumerate(category.subscription_set.all()):
144
                index = i*subscriptions_number+counter
145
                signature = urllib.unquote(re.findall('/unsubscribe/(.*)"', mail.outbox[index].html)[0])
146
                unsubscription_link = reverse('unsubscribe', kwargs={'unsubscription_token': signature})
147
                assert mail.outbox[index]._headers['List-Unsubscribe'] == '<http://localhost%s>' % unsubscription_link
148
                assert unsubscription_link in mail.outbox[index].html
149
                assert unsubscription_link in mail.outbox[index].text
150
                assert unsubscription_link_sentinel != unsubscription_link
151
                assert signing.loads(signature) == {
152
                    'category': announce.category.pk, 'identifier': destination.identifier}
153
                unsubscription_link_sentinel = unsubscription_link
154

    
155
                # refuse altered signature
156
                resp = app.get(unsubscription_link + 'altered', status=404)
157

    
158
                # make sure the uri schema is not in the page
159
                resp = app.get(unsubscription_link)
160
                assert scheme not in resp
161

    
162
def test_send_sms_with_no_gateway_defined(app, categories, announces, caplog):
163
    for category in categories:
164
        uuid = uuid4()
165
        Subscription.objects.create(category=category,
166
                                    identifier='sms:%s' % get_random_number(), uuid=uuid)
167
    for i, announce in enumerate(announces):
168
        broadcast = Broadcast.objects.get(announce=announce)
169
        broadcast.send()
170
        assert broadcast.delivery_count == 0
171
        for record in caplog.records:
172
            assert record.name == 'corbo.utils'
173
            assert record.levelno == logging.ERROR
174
            assert record.getMessage() == 'SMS send requested but no SMS gateway defined.'
175

    
176
@mock.patch('corbo.utils.requests.post')
177
def test_send_sms_with_gateway_api_error(mocked_post, app, categories, announces, caplog):
178
    for category in categories:
179
        for i in range(3):
180
            uuid = uuid4()
181
            Subscription.objects.create(category=category,
182
                            identifier='sms:%s' % get_random_number(), uuid=uuid)
183
    for i, announce in enumerate(announces):
184
        broadcast = Broadcast.objects.get(announce=announce)
185
        with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
186
            mocked_response = mock.Mock()
187
            mocked_response.json.return_value = {'err': 1, 'data': None,
188
                            'err_desc': 'Payload error: missing "message" in JSON payload'}
189
            mocked_post.return_value = mocked_response
190
            broadcast.send()
191
            assert broadcast.delivery_count == 0
192
            records = caplog.records
193
            assert len(records) == 1 + i
194
            for record in records:
195
                assert record.name == 'corbo.utils'
196
                assert record.levelno == logging.WARNING
197
                assert record.getMessage() == 'Error occured while sending sms: Payload error: missing "message" in JSON payload'
198

    
199
@mock.patch('corbo.utils.requests.post')
200
def test_send_sms_with_gateway_connection_error(mocked_post, app, categories, announces, caplog):
201
    for category in categories:
202
        for i in range(3):
203
            uuid = uuid4()
204
            Subscription.objects.create(category=category,
205
                            identifier='sms:%s' % get_random_number(), uuid=uuid)
206
    for i, announce in enumerate(announces):
207
        broadcast = Broadcast.objects.get(announce=announce)
208
        with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
209
            mocked_response = mock.Mock()
210
            def mocked_requests_connection_error(*args, **kwargs):
211
                raise requests.ConnectionError('unreachable')
212
            mocked_post.side_effect = mocked_requests_connection_error
213
            mocked_post.return_value = mocked_response
214
            broadcast.send()
215
            assert broadcast.delivery_count == 0
216
            records = caplog.records
217
            assert len(records) == 1 + i
218
            for record in records:
219
                assert record.name == 'corbo.utils'
220
                assert record.levelno == logging.WARNING
221
                assert record.getMessage() == 'Failed to reach SMS gateway: unreachable'
222

    
223
@mock.patch('corbo.utils.requests.post')
224
def test_send_sms(mocked_post, app, categories, announces):
225
    for category in categories:
226
        for i in range(3):
227
            uuid = uuid4()
228
            Subscription.objects.create(category=category,
229
                            identifier='sms:%s' % get_random_number(), uuid=uuid)
230
    for announce in announces:
231
        broadcast = Broadcast.objects.get(announce=announce)
232
        with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
233
            mocked_response = mock.Mock()
234
            mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
235

    
236
    for announce in announces:
237
        broadcast = Broadcast.objects.get(announce=announce)
238
        with override_settings(SMS_GATEWAY_URL='http://sms.gateway'):
239
            mocked_response = mock.Mock()
240
            mocked_response.json.return_value = {'err': 0, 'err_desc': None, 'data': 'gateway response'}
241
            mocked_post.return_value = mocked_response
242
            broadcast.send()
243
            assert mocked_post.call_args[0][0] == 'http://sms.gateway'
244
            assert mocked_post.call_args[1]['json']['from'] == 'Corbo'
245
            assert isinstance(mocked_post.call_args[1]['json']['to'], list)
246
            assert broadcast.delivery_count == 3
(5-5/10)