Projet

Général

Profil

0002-sms-authorize-numbers-using-masks-39650.patch

Nicolas Roche, 29 mai 2020 12:35

Télécharger (9,51 ko)

Voir les différences:

Subject: [PATCH 2/2] sms: authorize numbers using masks (#39650)

 passerelle/sms/models.py | 38 +++++++++++++++++++++
 tests/test_sms.py        | 71 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 108 insertions(+), 1 deletion(-)
passerelle/sms/models.py
1
# -*- coding: utf-8 -*-
1 2
# passerelle - uniform access to multiple data sources and services
2 3
# Copyright (C) 2020  Entr'ouvert
3 4
#
4 5
# This program is free software: you can redistribute it and/or modify it
5 6
# under the terms of the GNU Affero General Public License as published
6 7
# by the Free Software Foundation, either version 3 of the License, or
7 8
# (at your option) any later version.
8 9
#
......
132 133
            elif number.startswith(self.default_trunk_prefix):
133 134
                number = '00' + self.default_country_code + number[len(self.default_trunk_prefix):]
134 135
            else:
135 136
                raise APIError('phone number %r is unsupported (no international prefix, '
136 137
                               'no local trunk prefix)' % number)
137 138
            numbers.append(number)
138 139
        return numbers
139 140

  
141
    def authorize_numbers(self, destinations):
        """Check that every destination number is allowed by this connector.

        Two independent checks are applied to ``destinations`` (the numbers
        are expected in the ``00<country code>...`` form used by the masks
        below):

        * unless ``SMSResource.ALL`` is in ``self.authorized``, each number
          must match, over its whole length, one of the masks enabled by the
          entries of ``self.authorized``;
        * when ``self.premium_rate`` is ``SMSResource.NO_``, French numbers
          starting with ``00338`` are refused.

        Every refused number is logged as a warning.

        :param destinations: iterable of phone number strings.
        :returns: ``True`` when all numbers are authorized.
        :raises APIError: when at least one number is refused; the message
            lists the refused numbers in sorted order.
        """
        unknown_numbers = set()
        premium_numbers = set()

        if SMSResource.ALL not in self.authorized:
            masks = []
            if SMSResource.FR_METRO in self.authorized:
                masks.append(r'0033[67]\d{8}')    # France mainland
            if SMSResource.FR_DOMTOM in self.authorized:
                masks.append(r'00262262\d{6}')    # Réunion, Mayotte, French Southern and Antarctic Lands
                # NOTE(review): the four masks below have no leading '00',
                # unlike every other mask here -- confirm this is the form the
                # normalized numbers really take.
                masks.append(r'508508\d{6}')      # Saint-Pierre-et-Miquelon
                masks.append(r'590590\d{6}')      # Guadeloupe, Saint-Barthélemy, Saint-Martin
                masks.append(r'594594\d{6}')      # French Guiana
                masks.append(r'596596\d{6}')      # Martinique
                masks.append(r'00687[67]\d{8}')   # New Caledonia
            if SMSResource.BE_ in self.authorized:
                masks.append(r'00324[5-9]\d{7}')  # Belgium

            # Masks describe complete numbers: anchor them at the end too so
            # that a number with extra trailing digits is not accepted.
            # (\Z is used rather than re.fullmatch to stay Python 2 compatible.)
            compiled_masks = [re.compile(r'(?:%s)\Z' % mask) for mask in masks]
            unknown_numbers = set(
                dest for dest in destinations
                if not any(regex.match(dest) for regex in compiled_masks))
            for number in sorted(unknown_numbers):
                logging.warning('phone number does not match any authorization mask: %s', number)

        if self.premium_rate == SMSResource.NO_:
            # Deliberately left as a prefix match: this is a deny rule, so
            # matching more than strictly needed is the safe direction.
            premium_regex = re.compile(r'0033[8]\d{8}')
            premium_numbers = set(dest for dest in destinations if premium_regex.match(dest))
        for number in sorted(premium_numbers):
            logging.warning('premium rate phone number: %s', number)

        # Sorted so that the error message does not depend on set ordering.
        unauthorized_numbers = sorted(unknown_numbers | premium_numbers)
        if unauthorized_numbers:
            raise APIError('phone numbers not authorized: %s' % ', '.join(unauthorized_numbers))
        return True
176

  
140 177
    @endpoint(perm='can_send_messages', methods=['post'])
141 178
    def send(self, request, *args, **kwargs):
142 179
        try:
143 180
            data = json_loads(request.body)
144 181
            assert isinstance(data, dict), 'JSON payload is not a dict'
145 182
            assert 'message' in data, 'missing "message" in JSON payload'
146 183
            assert 'from' in data, 'missing "from" in JSON payload'
147 184
            assert 'to' in data, 'missing "to" in JSON payload'
......
149 186
            assert isinstance(data['from'], six.text_type), 'from is not a string'
150 187
            assert all(map(lambda x: isinstance(x, six.text_type), data['to'])), \
151 188
                'to is not a list of strings'
152 189
        except (ValueError, AssertionError) as e:
153 190
            raise APIError('Payload error: %s' % e)
154 191
        data['message'] = data['message'][:self.max_message_length]
155 192
        data['to'] = self.clean_numbers(data['to'])
156 193
        logging.info('sending SMS to %r from %r', data['to'], data['from'])
194
        self.authorize_numbers(data['to'])
157 195
        stop = not bool('nostop' in request.GET)
158 196
        self.add_job('send_job',
159 197
                     text=data['message'], sender=data['from'], destinations=data['to'],
160 198
                     stop=stop)
161 199
        return {'err': 0}
162 200

  
163 201
    def send_job(self, *args, **kwargs):
164 202
        self.send_msg(**kwargs)
tests/test_sms.py
9 9
# This program is distributed in the hope that it will be useful,
10 10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 12
# GNU Affero General Public License for more details.
13 13
#
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16
import isodate
17
import logging
17 18
import mock
18 19
import pytest
19 20
from requests import RequestException
20 21

  
21 22
from django.contrib.contenttypes.models import ContentType
22 23
from django.utils.translation import ugettext as _
23 24

  
24 25
from passerelle.apps.choosit.models import ChoositSMSGateway
25 26
from passerelle.apps.ovh.models import OVHSMSGateway
26
from passerelle.base.models import ApiUser, AccessRight, Job
27
from passerelle.base.models import ApiUser, AccessRight, Job, ResourceLog
27 28
from passerelle.sms.models import SMSResource, SMSLog
28 29
from passerelle.utils.jsonresponse import APIError
29 30

  
30 31
from test_manager import login, admin_user
31 32

  
32 33
import utils
33 34

  
34 35
pytestmark = pytest.mark.django_db
......
46 47
    connector.save()
47 48
    assert connector.clean_numbers(['+ 33 12']) == ['003312']
48 49
    assert connector.clean_numbers(['0 0 33 12']) == ['003312']
49 50
    assert connector.clean_numbers(['1 12']) == ['003212']
50 51
    with pytest.raises(APIError, match='phone number %r is unsupported' % '0123'):
51 52
        connector.clean_numbers(['0123'])
52 53

  
53 54

  
55
def test_authorize_numbers():
    """Exercise authorize_numbers() for the premium-rate rule and each country mask.

    The connector settings are changed step by step; each step first checks a
    number that is accepted under the current settings, then switches the
    settings and checks that the very same number is now refused.
    """
    connector = OVHSMSGateway()

    def refused(number):
        # Context manager expecting the "not authorized" APIError for `number`.
        return pytest.raises(APIError, match='phone numbers not authorized: %s' % number)

    # premium-rate numbers are refused by default
    assert connector.premium_rate == SMSResource.NO_
    number = '0033' + '8' + '12345678'
    with refused(number):
        # no assert here: the call is expected to raise, not to return
        connector.authorize_numbers([number])
    connector.premium_rate = SMSResource.YES
    connector.save()

    # all countries are authorized by default
    assert connector.authorized == [SMSResource.ALL]
    number = '0033' + '1' + '12345678'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_METRO]
    connector.save()
    with refused(number):
        connector.authorize_numbers([number])

    # France mainland
    number = '0033' + '6' + '12345678'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_DOMTOM]
    connector.save()
    with refused(number):
        connector.authorize_numbers([number])

    # French overseas departments and territories
    number = '596596' + '123456'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.BE_]
    connector.save()
    with refused(number):
        connector.authorize_numbers([number])

    # Belgium
    number = '0032' + '45' + '1234567'
    assert connector.authorize_numbers([number])
    connector.authorized = [SMSResource.FR_METRO]
    connector.save()
    with refused(number):
        connector.authorize_numbers([number])
98

  
99

  
54 100
@pytest.fixture(params=klasses)
55 101
def connector(request, db):
56 102
    klass = request.param
57 103
    kwargs = getattr(klass, 'TEST_DEFAULTS', {}).get('create_kwargs', {})
58 104
    kwargs.update({
59 105
        'title': klass.__name__,
60 106
        'slug': klass.__name__.lower(),
61 107
        'description': klass.__name__,
......
180 226
        'There were errors processing your form.'
181 227
    resp.html.find('div', {'class': 'error'}).text.strip() == 'This field is required.'
182 228
    resp.form['authorized'] = [SMSResource.FR_METRO, SMSResource.FR_DOMTOM]
183 229
    resp = resp.form.submit()
184 230
    resp = resp.follow()
185 231
    assert _('France mainland (+33 [67])') in [
186 232
        x.text for x in resp.html.find_all('p')
187 233
        if x.text.startswith(_('Authorized Countries'))][0]
234

  
235
    path = '/%s/%s/send/' % (connector.get_connector_slug(), connector.slug)
236
    payload = {
237
        'message': 'plop',
238
        'from': '+33699999999',
239
        'to': ['+33688888888'],
240
    }
241
    app.post_json(path, params=payload)
242
    with mock.patch.object(type(connector), 'send_msg') as send_function:
243
        send_function.return_value = {}
244
        connector.jobs()
245
    assert SMSLog.objects.count() == 1
246

  
247
    payload['to'][0] = '+33188888888'
248
    SMSLog.objects.all().delete()
249
    app.post_json(path, params=payload)
250
    with mock.patch.object(type(connector), 'send_msg') as send_function:
251
        send_function.return_value = {}
252
        connector.jobs()
253
    assert not SMSLog.objects.count()
254
    assert ResourceLog.objects.filter(levelno=logging.WARNING).count() == 1
255
    assert ResourceLog.objects.filter(levelno=30)[0].extra['exception'] == \
256
        'phone numbers not authorized: 0033188888888'
188
-