0002-sms-authorize-numbers-using-masks-39650.patch
passerelle/sms/models.py | ||
---|---|---|
1 |
# -*- coding: utf-8 -*- |
|
1 | 2 |
# passerelle - uniform access to multiple data sources and services |
2 | 3 |
# Copyright (C) 2020 Entr'ouvert |
3 | 4 |
# |
4 | 5 |
# This program is free software: you can redistribute it and/or modify it |
5 | 6 |
# under the terms of the GNU Affero General Public License as published |
6 | 7 |
# by the Free Software Foundation, either version 3 of the License, or |
7 | 8 |
# (at your option) any later version. |
8 | 9 |
# |
... | ... | |
162 | 163 |
elif number.startswith(self.default_trunk_prefix): |
163 | 164 |
number = '00' + self.default_country_code + number[len(self.default_trunk_prefix):] |
164 | 165 |
else: |
165 | 166 |
raise APIError('phone number %r is unsupported (no international prefix, ' |
166 | 167 |
'no local trunk prefix)' % number) |
167 | 168 |
numbers.append(number) |
168 | 169 |
return numbers |
169 | 170 | |
171 |
def authorize_numbers(self, destinations):
    """Check that every destination may receive an SMS from this connector.

    Two independent checks are applied:

    * unless ``SMSResource.ALL`` is in ``self.authorized``, each number
      must fully match one of the masks enabled by ``self.authorized``;
    * when ``self.premium_rate`` is ``SMSResource.NO_``, French premium-rate
      numbers (``00338...``) are refused.

    :param destinations: iterable of phone numbers; the masks expect the
        ``00<country code>...`` form (send() passes the output of
        clean_numbers()).
    :returns: True when every number is authorized.
    :raises APIError: listing (sorted) the numbers that are not authorized.
    """
    unknown_numbers = set()
    premium_numbers = set()

    if SMSResource.ALL not in self.authorized:
        masks = []
        if SMSResource.FR_METRO in self.authorized:
            masks.append(r'0033[67]\d{8}')  # France
        if SMSResource.FR_DOMTOM in self.authorized:
            # The '00' prefix is optional on the four masks that used to
            # lack it: numbers are normalised to the '00...' form before
            # reaching this method, so without it they never matched; the
            # bare form is still accepted for backward compatibility.
            masks.extend([
                r'00262262\d{6}',       # Réunion, Mayotte, French Southern and Antarctic Lands
                r'(?:00)?508508\d{6}',  # Saint-Pierre-et-Miquelon
                r'(?:00)?590590\d{6}',  # Guadeloupe, Saint-Barthélemy, Saint-Martin
                r'(?:00)?594594\d{6}',  # French Guiana
                r'(?:00)?596596\d{6}',  # Martinique
                # NOTE(review): New Caledonia numbers are presumably only
                # 6 digits long after +687 -- confirm this mask.
                r'00687[67]\d{8}',      # New Caledonia
            ])
        if SMSResource.BE_ in self.authorized:
            masks.append(r'00324[5-9]\d{7}')  # Belgium

        # Anchor at the end so that a number with extra trailing digits
        # does not slip through an authorization mask.
        compiled_masks = [re.compile(r'(?:%s)\Z' % mask) for mask in masks]
        unknown_numbers = set(
            dest for dest in destinations
            if not any(regex.match(dest) for regex in compiled_masks))

    for number in sorted(unknown_numbers):
        logging.warning('phone number does not match any authorization mask: %s', number)

    if SMSResource.NO_ == self.premium_rate:
        # Prefix match on purpose: anything starting like a French
        # premium-rate number is refused.
        premium_regex = re.compile(r'0033[8]\d{8}')
        premium_numbers = set(dest for dest in destinations if premium_regex.match(dest))
        for number in sorted(premium_numbers):
            logging.warning('premium rate phone number: %s', number)

    # Sorted so that the error message is deterministic.
    unauthorized_numbers = sorted(unknown_numbers | premium_numbers)
    if unauthorized_numbers:
        raise APIError('phone numbers not authorized: %s' % ', '.join(unauthorized_numbers))
    return True
|
206 | ||
170 | 207 |
@endpoint(
    perm='can_send_messages',
    methods=['post'],
    description=_('Send a SMS message'),
    parameters={
        'nostop': {
            'description': _('Do not send STOP instruction'),
            'example_value': '1',
        },
    },
    post={'request_body': {'schema': {'application/json': SEND_SCHEMA}}},
)
def send(self, request, post_data, nostop=None):
    """Validate an SMS request and queue it as a ``send_job`` job.

    ``post_data`` is updated in place: the message is truncated to
    ``self.max_message_length`` and the recipients are replaced by their
    cleaned form. The recipients are then checked by authorize_numbers()
    before the job is added.

    :param request: incoming request (not used here).
    :param post_data: mapping with the ``message``, ``from`` and ``to`` keys.
    :param nostop: when not None, the job is queued with ``stop=False``.
    :returns: ``{'err': 0}`` once the job is queued.
    """
    truncated_message = post_data['message'][:self.max_message_length]
    post_data['message'] = truncated_message

    recipients = self.clean_numbers(post_data['to'])
    post_data['to'] = recipients
    self.authorize_numbers(recipients)

    sender = post_data['from']
    logging.info('sending SMS to %r from %r', recipients, sender)

    # ?nostop is not in the query string -> keep the STOP instruction
    job_kwargs = {
        'text': truncated_message,
        'sender': sender,
        'destinations': recipients,
        'stop': nostop is None,
    }
    self.add_job('send_job', **job_kwargs)
    return {'err': 0}
183 | 221 | |
184 | 222 |
def send_job(self, *args, **kwargs): |
tests/test_sms.py | ||
---|---|---|
10 | 10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | 11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | 12 |
# GNU Affero General Public License for more details. |
13 | 13 |
# |
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 |
import isodate |
17 | 17 |
import json |
18 |
import logging |
|
18 | 19 |
import mock |
19 | 20 |
import pytest |
20 | 21 |
from requests import RequestException |
21 | 22 | |
22 | 23 |
from django.conf import settings |
23 | 24 |
from django.contrib.contenttypes.models import ContentType |
24 | 25 |
from django.core.urlresolvers import reverse |
25 | 26 |
from django.utils.translation import ugettext as _ |
26 | 27 | |
27 | 28 |
from passerelle.apps.choosit.models import ChoositSMSGateway |
28 | 29 |
from passerelle.apps.ovh.models import OVHSMSGateway |
29 |
from passerelle.base.models import ApiUser, AccessRight, Job |
|
30 |
from passerelle.base.models import ApiUser, AccessRight, Job, ResourceLog
|
|
30 | 31 |
from passerelle.sms.models import SMSResource, SMSLog |
31 | 32 |
from passerelle.utils.jsonresponse import APIError |
32 | 33 | |
33 | 34 |
from test_manager import login, admin_user |
34 | 35 | |
35 | 36 |
import utils |
36 | 37 | |
37 | 38 |
pytestmark = pytest.mark.django_db |
... | ... | |
49 | 50 |
connector.save() |
50 | 51 |
assert connector.clean_numbers(['+ 33 12']) == ['003312'] |
51 | 52 |
assert connector.clean_numbers(['0 0 33 12']) == ['003312'] |
52 | 53 |
assert connector.clean_numbers(['1 12']) == ['003212'] |
53 | 54 |
with pytest.raises(APIError, match='phone number %r is unsupported' % '0123'): |
54 | 55 |
connector.clean_numbers(['0123']) |
55 | 56 | |
56 | 57 | |
58 |
def test_authorize_numbers():
    """Exercise authorize_numbers() against each authorization setting.

    The connector's ``premium_rate`` and ``authorized`` attributes are
    changed step by step; the order of the steps matters because each check
    relies on the state left by the previous one.
    """
    connector = OVHSMSGateway()

    def assert_rejected(number):
        # No ``assert`` around the call: it is expected to raise, so an
        # assertion on its return value could never be evaluated.
        with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
            connector.authorize_numbers([number])

    # premium-rate
    assert connector.premium_rate == SMSResource.NO_
    number = '0033' + '8' + '12345678'
    assert_rejected(number)
    connector.premium_rate = SMSResource.YES
    connector.save()

    # All country
    assert connector.authorized == [SMSResource.ALL]
    number = '0033' + '1' + '12345678'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_METRO]
    connector.save()
    assert_rejected(number)

    # France
    number = '0033' + '6' + '12345678'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_DOMTOM]
    connector.save()
    assert_rejected(number)

    # Dom-Tom
    number = '596596' + '123456'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.BE_]
    connector.save()
    assert_rejected(number)

    # Belgian
    number = '0032' + '45' + '1234567'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_METRO]
    connector.save()
    assert_rejected(number)
|
101 | ||
102 | ||
57 | 103 |
@pytest.fixture(params=klasses) |
58 | 104 |
def connector(request, db): |
59 | 105 |
klass = request.param |
60 | 106 |
kwargs = getattr(klass, 'TEST_DEFAULTS', {}).get('create_kwargs', {}) |
61 | 107 |
kwargs.update({ |
62 | 108 |
'title': klass.__name__, |
63 | 109 |
'slug': klass.__name__.lower(), |
64 | 110 |
'description': klass.__name__, |
... | ... | |
415 | 461 |
'There were errors processing your form.' |
416 | 462 |
resp.html.find('div', {'class': 'error'}).text.strip() == 'This field is required.' |
417 | 463 |
resp.form['authorized'] = [SMSResource.FR_METRO, SMSResource.FR_DOMTOM] |
418 | 464 |
resp = resp.form.submit() |
419 | 465 |
resp = resp.follow() |
420 | 466 |
assert _('France mainland (+33 [67])') in [ |
421 | 467 |
x.text for x in resp.html.find_all('p') |
422 | 468 |
if x.text.startswith(_('Authorized Countries'))][0] |
469 | ||
470 |
path = '/%s/%s/send/' % (connector.get_connector_slug(), connector.slug) |
|
471 |
payload = { |
|
472 |
'message': 'plop', |
|
473 |
'from': '+33699999999', |
|
474 |
'to': ['+33688888888'], |
|
475 |
} |
|
476 |
app.post_json(path, params=payload) |
|
477 |
with mock.patch.object(type(connector), 'send_msg') as send_function: |
|
478 |
send_function.return_value = {} |
|
479 |
connector.jobs() |
|
480 |
assert SMSLog.objects.count() == 1 |
|
481 | ||
482 |
payload['to'][0] = '+33188888888' |
|
483 |
SMSLog.objects.all().delete() |
|
484 |
app.post_json(path, params=payload) |
|
485 |
with mock.patch.object(type(connector), 'send_msg') as send_function: |
|
486 |
send_function.return_value = {} |
|
487 |
connector.jobs() |
|
488 |
assert not SMSLog.objects.count() |
|
489 |
assert ResourceLog.objects.filter(levelno=logging.WARNING).count() == 1 |
|
490 |
assert ResourceLog.objects.filter(levelno=30)[0].extra['exception'] == \ |
|
491 |
'phone numbers not authorized: 0033188888888' |
|
423 |
- |