Projet

Général

Profil

0002-sms-authorize-numbers-using-masks-39650.patch

Voir les différences:

Subject: [PATCH 2/2] sms: authorize numbers using masks (#39650)

 passerelle/sms/models.py | 38 +++++++++++++++++++++
 tests/test_sms.py        | 71 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 108 insertions(+), 1 deletion(-)
passerelle/sms/models.py
1
# -*- coding: utf-8 -*-
1 2
# passerelle - uniform access to multiple data sources and services
2 3
# Copyright (C) 2020  Entr'ouvert
3 4
#
4 5
# This program is free software: you can redistribute it and/or modify it
5 6
# under the terms of the GNU Affero General Public License as published
6 7
# by the Free Software Foundation, either version 3 of the License, or
7 8
# (at your option) any later version.
8 9
#
......
131 132
            elif number.startswith(self.default_trunk_prefix):
132 133
                number = '00' + self.default_country_code + number[len(self.default_trunk_prefix):]
133 134
            else:
134 135
                raise APIError('phone number %r is unsupported (no international prefix, '
135 136
                               'no local trunk prefix)' % number)
136 137
            numbers.append(number)
137 138
        return numbers
138 139

  
140
    def authorize_numbers(self, destinations):
141
        unknown_numbers = set()
142
        premium_numbers = set()
143
        regexes = []
144
        if SMSResource.ALL not in self.authorized:
145
            if SMSResource.FR_METRO in self.authorized:
146
                regexes.append(r'0033[67]\d{8}')    # France
147
            if SMSResource.FR_DOMTOM in self.authorized:
148
                regexes.append(r'00262262\d{6}')    # Réunion, Mayotte, Terres australe/antarctiques
149
                regexes.append(r'508508\d{6}')      # Saint-Pierre-et-Miquelon
150
                regexes.append(r'590590\d{6}')      # Guadeloupe, Saint-Barthélemy, Saint-Martin
151
                regexes.append(r'594594\d{6}')      # Guyane
152
                regexes.append(r'596596\d{6}')      # Martinique
153
                regexes.append(r'00687[67]\d{8}')   # Nouvelle-Calédonie
154
            if SMSResource.BE_ in self.authorized:
155
                regexes.append(r'00324[5-9]\d{7}')  # Belgian
156

  
157
            unknown_numbers = set(destinations)
158
            for value in regexes:
159
                regex = re.compile(value)
160
                unknown_numbers = set(dest for dest in unknown_numbers if not regex.match(dest))
161

  
162
            for number in list(unknown_numbers):
163
                logging.warning('phone number does not match any authorization mask: %s', number)
164

  
165
        if SMSResource.NO_ == self.premium_rate:
166
            regex = re.compile(r'0033[8]\d{8}')
167
            premium_numbers = set(dest for dest in destinations if regex.match(dest))
168
        for number in list(premium_numbers):
169
            logging.warning('primium rate phone number: %s', number)
170

  
171
        unauthorized_numbers = list(unknown_numbers.union(premium_numbers))
172
        if unauthorized_numbers != []:
173
            raise APIError('phone numbers not authorized: %s' % ', '.join(unauthorized_numbers))
174
        return True
175

  
139 176
    @endpoint(perm='can_send_messages', methods=['post'])
140 177
    def send(self, request, *args, **kwargs):
141 178
        try:
142 179
            data = json_loads(request.body)
143 180
            assert isinstance(data, dict), 'JSON payload is not a dict'
144 181
            assert 'message' in data, 'missing "message" in JSON payload'
145 182
            assert 'from' in data, 'missing "from" in JSON payload'
146 183
            assert 'to' in data, 'missing "to" in JSON payload'
......
148 185
            assert isinstance(data['from'], six.text_type), 'from is not a string'
149 186
            assert all(map(lambda x: isinstance(x, six.text_type), data['to'])), \
150 187
                'to is not a list of strings'
151 188
        except (ValueError, AssertionError) as e:
152 189
            raise APIError('Payload error: %s' % e)
153 190
        data['message'] = data['message'][:self.max_message_length]
154 191
        data['to'] = self.clean_numbers(data['to'])
155 192
        logging.info('sending SMS to %r from %r', data['to'], data['from'])
193
        self.authorize_numbers(data['to'])
156 194
        stop = not bool('nostop' in request.GET)
157 195
        self.add_job('send_job',
158 196
                     text=data['message'], sender=data['from'], destinations=data['to'],
159 197
                     stop=stop)
160 198
        return {'err': 0}
161 199

  
162 200
    def send_job(self, *args, **kwargs):
163 201
        self.send_msg(**kwargs)
tests/test_sms.py
9 9
# This program is distributed in the hope that it will be useful,
10 10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 12
# GNU Affero General Public License for more details.
13 13
#
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16
import isodate
17
import logging
17 18
import mock
18 19
import pytest
19 20
from requests import RequestException
20 21

  
21 22
from django.contrib.contenttypes.models import ContentType
22 23
from django.utils.translation import ugettext as _
23 24

  
24 25
from passerelle.apps.choosit.models import ChoositSMSGateway
25 26
from passerelle.apps.ovh.models import OVHSMSGateway
26
from passerelle.base.models import ApiUser, AccessRight, Job
27
from passerelle.base.models import ApiUser, AccessRight, Job, ResourceLog
27 28
from passerelle.sms.models import SMSResource, SMSLog
28 29
from passerelle.utils.jsonresponse import APIError
29 30

  
30 31
from test_manager import login, admin_user
31 32

  
32 33
import utils
33 34

  
34 35
pytestmark = pytest.mark.django_db
......
46 47
    connector.save()
47 48
    assert connector.clean_numbers(['+ 33 12']) == ['003312']
48 49
    assert connector.clean_numbers(['0 0 33 12']) == ['003312']
49 50
    assert connector.clean_numbers(['1 12']) == ['003212']
50 51
    with pytest.raises(APIError, match='phone number %r is unsupported' % '0123'):
51 52
        connector.clean_numbers(['0123'])
52 53

  
53 54

  
55
def test_authorize_numbers():
56
    connector = OVHSMSGateway()
57

  
58
    # premium-rate
59
    assert connector.premium_rate == SMSResource.NO_
60
    number = '0033' + '8' + '12345678'
61
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
62
        assert connector.authorize_numbers([number])
63
    connector.premium_rate = SMSResource.YES
64
    connector.save()
65

  
66
    # All country
67
    assert connector.authorized == [SMSResource.ALL]
68
    number = '0033' + '1' + '12345678'
69
    assert connector.authorize_numbers([number])
70
    connector.authorized = [SMSResource.FR_METRO]
71
    connector.save()
72
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
73
        connector.authorize_numbers([number])
74

  
75
    # France
76
    number = '0033' + '6' + '12345678'
77
    assert connector.authorize_numbers([number])
78
    connector.authorized = [SMSResource.FR_DOMTOM]
79
    connector.save()
80
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
81
        connector.authorize_numbers([number])
82

  
83
    # Dom-Tom
84
    number = '596596' + '123456'
85
    assert connector.authorize_numbers([number])
86
    connector.authorized = [SMSResource.BE_]
87
    connector.save()
88
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
89
        connector.authorize_numbers([number])
90

  
91
    # Belgian
92
    number = '0032' + '45' + '1234567'
93
    assert connector.authorize_numbers([number])
94
    connector.authorized = [SMSResource.FR_METRO]
95
    connector.save()
96
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
97
        connector.authorize_numbers([number])
98

  
99

  
54 100
@pytest.fixture(params=klasses)
55 101
def connector(request, db):
56 102
    klass = request.param
57 103
    kwargs = getattr(klass, 'TEST_DEFAULTS', {}).get('create_kwargs', {})
58 104
    kwargs.update({
59 105
        'title': klass.__name__,
60 106
        'slug': klass.__name__.lower(),
61 107
        'description': klass.__name__,
......
180 226
        'There were errors processing your form.'
181 227
    resp.html.find('div', {'class': 'error'}).text.strip() == 'This field is required.'
182 228
    resp.form['authorized'] = [SMSResource.FR_METRO, SMSResource.FR_DOMTOM]
183 229
    resp = resp.form.submit()
184 230
    resp = resp.follow()
185 231
    assert _('France mainland (+33 [67])') in [
186 232
        x.text for x in resp.html.find_all('p')
187 233
        if x.text.startswith(_('Authorized Countries'))][0]
234

  
235
    path = '/%s/%s/send/' % (connector.get_connector_slug(), connector.slug)
236
    payload = {
237
        'message': 'plop',
238
        'from': '+33699999999',
239
        'to': ['+33688888888'],
240
    }
241
    app.post_json(path, params=payload)
242
    with mock.patch.object(type(connector), 'send_msg') as send_function:
243
        send_function.return_value = {}
244
        connector.jobs()
245
    assert SMSLog.objects.count() == 1
246

  
247
    payload['to'][0] = '+33188888888'
248
    SMSLog.objects.all().delete()
249
    app.post_json(path, params=payload)
250
    with mock.patch.object(type(connector), 'send_msg') as send_function:
251
        send_function.return_value = {}
252
        connector.jobs()
253
    assert not SMSLog.objects.count()
254
    assert ResourceLog.objects.filter(levelno=logging.WARNING).count() == 1
255
    assert ResourceLog.objects.filter(levelno=30)[0].extra['exception'] == \
256
        'phone numbers not authorized: 0033188888888'
188
-