Projet

Général

Profil

0002-sms-authorize-numbers-using-masks-39650.patch

Nicolas Roche, 19 mai 2020 19:52

Télécharger (9,49 ko)

Voir les différences:

Subject: [PATCH 2/2] sms: authorize numbers using masks (#39650)

 passerelle/sms/models.py | 38 ++++++++++++++++++++++
 tests/test_sms.py        | 69 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 106 insertions(+), 1 deletion(-)
passerelle/sms/models.py
1
# -*- coding: utf-8 -*-
1 2
# passerelle - uniform access to multiple data sources and services
2 3
# Copyright (C) 2020  Entr'ouvert
3 4
#
4 5
# This program is free software: you can redistribute it and/or modify it
5 6
# under the terms of the GNU Affero General Public License as published
6 7
# by the Free Software Foundation, either version 3 of the License, or
7 8
# (at your option) any later version.
8 9
#
......
128 129
            elif number.startswith(self.default_trunk_prefix):
129 130
                number = '00' + self.default_country_code + number[len(self.default_trunk_prefix):]
130 131
            else:
131 132
                raise APIError('phone number %r is unsupported (no international prefix, '
132 133
                               'no local trunk prefix)' % number)
133 134
            numbers.append(number)
134 135
        return numbers
135 136

  
137
    def authorize_numbers(self, destinations):
138
        unknown_numbers = set()
139
        premium_numbers = set()
140
        regexes = []
141
        if SMSResource.ALL not in self.authorized:
142
            if SMSResource.FR_METRO in self.authorized:
143
                regexes.append(r'0033[67]\d{8}')    # France
144
            if SMSResource.FR_DOMTOM in self.authorized:
145
                regexes.append(r'00262262\d{6}')    # Réunion, Mayotte, Terres australe/antarctiques
146
                regexes.append(r'508508\d{6}')      # Saint-Pierre-et-Miquelon
147
                regexes.append(r'590590\d{6}')      # Guadeloupe, Saint-Barthélemy, Saint-Martin
148
                regexes.append(r'594594\d{6}')      # Guyane
149
                regexes.append(r'596596\d{6}')      # Martinique
150
                regexes.append(r'00687[67]\d{8}')   # Nouvelle-Calédonie
151
            if SMSResource.BE in self.authorized:
152
                regexes.append(r'00324[5-9]\d{7}')  # Belgian
153

  
154
            unknown_numbers = set(destinations)
155
            for value in regexes:
156
                regex = re.compile(value)
157
                unknown_numbers = set(dest for dest in unknown_numbers if not regex.match(dest))
158

  
159
            for number in list(unknown_numbers):
160
                logging.warning('phone number does not match any authorization mask: %s', number)
161

  
162
        if SMSResource.NO == self.premium_rate:
163
            regex = re.compile(r'0033[8]\d{8}')
164
            premium_numbers = set(dest for dest in destinations if regex.match(dest))
165
        for number in list(premium_numbers):
166
            logging.warning('primium rate phone number: %s', number)
167

  
168
        unauthorized_numbers = list(unknown_numbers.union(premium_numbers))
169
        if unauthorized_numbers != []:
170
            raise APIError('phone numbers not authorized: %s' % ', '.join(unauthorized_numbers))
171
        return True
172

  
136 173
    @endpoint(perm='can_send_messages', methods=['post'])
137 174
    def send(self, request, *args, **kwargs):
138 175
        try:
139 176
            data = json_loads(request.body)
140 177
            assert isinstance(data, dict), 'JSON payload is not a dict'
141 178
            assert 'message' in data, 'missing "message" in JSON payload'
142 179
            assert 'from' in data, 'missing "from" in JSON payload'
143 180
            assert 'to' in data, 'missing "to" in JSON payload'
144 181
            assert isinstance(data['message'], six.text_type), 'message is not a string'
145 182
            assert isinstance(data['from'], six.text_type), 'from is not a string'
146 183
            assert all(map(lambda x: isinstance(x, six.text_type), data['to'])), \
147 184
                'to is not a list of strings'
148 185
        except (ValueError, AssertionError) as e:
149 186
            raise APIError('Payload error: %s' % e)
150 187
        data['message'] = data['message'][:self.max_message_length]
151 188
        data['to'] = self.clean_numbers(data['to'])
189
        self.authorize_numbers(data['to'])
152 190
        stop = not bool('nostop' in request.GET)
153 191
        logging.info('sending message %r to %r with sending number %r',
154 192
                     data['message'], data['to'], data['from'])
155 193
        # unfortunately it lacks a batch API...
156 194
        result = {'data': self.send_msg(data['message'], data['from'], data['to'], stop=stop)}
157 195
        SMSLog.objects.create(appname=self.get_connector_slug(), slug=self.slug)
158 196
        return result
159 197

  
tests/test_sms.py
8 8
#
9 9
# This program is distributed in the hope that it will be useful,
10 10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 12
# GNU Affero General Public License for more details.
13 13
#
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
import logging
16 17
import mock
17 18
import pytest
18 19

  
19 20
from django.contrib.contenttypes.models import ContentType
20 21
from django.utils.translation import ugettext as _
21 22

  
22 23
from passerelle.apps.choosit.models import ChoositSMSGateway
23 24
from passerelle.apps.ovh.models import OVHSMSGateway
24
from passerelle.base.models import ApiUser, AccessRight
25
from passerelle.base.models import ApiUser, AccessRight, ResourceLog
25 26
from passerelle.sms.models import SMSResource, SMSLog
26 27
from passerelle.utils.jsonresponse import APIError
27 28

  
28 29
from test_manager import login, admin_user
29 30

  
30 31
import utils
31 32

  
32 33
pytestmark = pytest.mark.django_db
......
44 45
    connector.save()
45 46
    assert connector.clean_numbers(['+ 33 12']) == ['003312']
46 47
    assert connector.clean_numbers(['0 0 33 12']) == ['003312']
47 48
    assert connector.clean_numbers(['1 12']) == ['003212']
48 49
    with pytest.raises(APIError, match='phone number %r is unsupported' % '0123'):
49 50
        connector.clean_numbers(['0123'])
50 51

  
51 52

  
53
def test_authorize_numbers():
54
    connector = OVHSMSGateway()
55

  
56
    # premium-rate
57
    assert connector.premium_rate == SMSResource.NO
58
    number = '0033' + '8' + '12345678'
59
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
60
        assert connector.authorize_numbers([number])
61
    connector.premium_rate = SMSResource.YES
62
    connector.save()
63

  
64
    # All country
65
    assert connector.authorized == [SMSResource.ALL]
66
    number = '0033' + '1' + '12345678'
67
    assert connector.authorize_numbers([number])
68
    connector.authorized = [SMSResource.FR_METRO]
69
    connector.save()
70
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
71
        connector.authorize_numbers([number])
72

  
73
    # France
74
    number = '0033' + '6' + '12345678'
75
    assert connector.authorize_numbers([number])
76
    connector.authorized = [SMSResource.FR_DOMTOM]
77
    connector.save()
78
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
79
        connector.authorize_numbers([number])
80

  
81
    # Dom-Tom
82
    number = '596596' + '123456'
83
    assert connector.authorize_numbers([number])
84
    connector.authorized = [SMSResource.BE]
85
    connector.save()
86
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
87
        connector.authorize_numbers([number])
88

  
89
    # Belgian
90
    number = '0032' + '45' + '1234567'
91
    assert connector.authorize_numbers([number])
92
    connector.authorized = [SMSResource.FR_METRO]
93
    connector.save()
94
    with pytest.raises(APIError, match='phone numbers not authorized: %s' % number):
95
        connector.authorize_numbers([number])
96

  
97

  
52 98
@pytest.fixture(params=klasses)
53 99
def connector(request, db):
54 100
    klass = request.param
55 101
    kwargs = getattr(klass, 'TEST_DEFAULTS', {}).get('create_kwargs', {})
56 102
    kwargs.update({
57 103
        'title': klass.__name__,
58 104
        'slug': klass.__name__.lower(),
59 105
        'description': klass.__name__,
......
154 200
        'There were errors processing your form.'
155 201
    resp.html.find('div', {'class': 'error'}).text.strip() == 'This field is required.'
156 202
    resp.form['authorized'] = [SMSResource.FR_METRO, SMSResource.FR_DOMTOM]
157 203
    resp = resp.form.submit()
158 204
    resp = resp.follow()
159 205
    assert _('France mainland (+33 [67])') in [
160 206
        x.text for x in resp.html.find_all('p')
161 207
        if x.text.startswith(_('Authorized Countries'))][0]
208

  
209
    path = '/%s/%s/send/' % (connector.get_connector_slug(), connector.slug)
210
    payload = {
211
        'message': 'plop',
212
        'from': '+33699999999',
213
        'to': ['+33688888888'],
214
    }
215
    with mock.patch.object(type(connector), 'send_msg') as send_function:
216
        send_function.return_value = {}
217
        app.post_json(path, params=payload)
218
    assert SMSLog.objects.count() == 1
219

  
220
    payload['to'][0] = '+33188888888'
221
    SMSLog.objects.all().delete()
222
    with mock.patch.object(type(connector), 'send_msg') as send_function:
223
        send_function.return_value = {}
224
        app.post_json(path, params=payload)
225
    assert not SMSLog.objects.count()
226
    assert ResourceLog.objects.filter(levelno=logging.WARNING).count() == 1
227
    assert ResourceLog.objects.filter(levelno=30)[0].extra['exception'] == \
228
        'phone numbers not authorized: 0033188888888'
162
-