Projet

Général

Profil

0001-requests_wrapper-sign-URL-of-prepared-requests-35225.patch

Serghei Mihai, 06 août 2019 10:11

Télécharger (9,69 ko)

Voir les différences:

Subject: [PATCH] requests_wrapper: sign URL of prepared requests (#35225)

 combo/utils/requests_wrapper.py  | 14 +++++++++++--
 tests/test_lingo_remote_regie.py |  2 --
 tests/test_newsletters_cell.py   | 16 +++++++++------
 tests/test_requests.py           | 34 +++++++++++++++++++-------------
 4 files changed, 42 insertions(+), 24 deletions(-)
combo/utils/requests_wrapper.py
18 18
import logging
19 19

  
20 20
from requests import Response, Session as RequestsSession
21
from requests.auth import AuthBase
21 22

  
22 23
from django.conf import settings
23 24
from django.core.cache import cache
......
32 33
    pass
33 34

  
34 35

  
36
class PublikSignature(AuthBase):
37
    def __init__(self, secret):
38
        self.secret = secret
39

  
40
    def __call__(self, request):
41
        request.url = sign_url(request.url, self.secret)
42
        return request
43

  
44

  
35 45
class Requests(RequestsSession):
36 46

  
37 47
    def request(self, method, url, **kwargs):
......
117 127
            elif raise_if_not_cached:
118 128
                raise NothingInCacheException()
119 129

  
120
        if remote_service: # sign
121
            url = sign_url(url, remote_service.get('secret'))
130
        if remote_service:  # sign
131
            kwargs['auth'] = PublikSignature(remote_service.get('secret'))
122 132

  
123 133
        kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
124 134

  
tests/test_lingo_remote_regie.py
119 119
    query = urlparse.parse_qs(querystring, keep_blank_values=True)
120 120
    assert query['NameID'][0] == 'r2d2'
121 121
    assert query['orig'][0] == 'combo'
122
    assert check_query(querystring, 'combo') == True
123 122

  
124 123
    # with no invoice
125 124
    ws_invoices = {'err': 0, 'data': []}
......
164 163
    query = urlparse.parse_qs(querystring, keep_blank_values=True)
165 164
    assert query['NameID'][0] == 'r2d2'
166 165
    assert query['orig'][0] == 'combo'
167
    assert check_query(querystring, 'combo') == True
168 166

  
169 167
    # with no invoice
170 168
    ws_invoices = {'err': 0, 'data': []}
tests/test_newsletters_cell.py
6 6

  
7 7
from django.template.defaultfilters import slugify
8 8
from django.contrib.auth.models import User
9
from django.utils.six.moves.urllib import parse as urlparse
9 10

  
10 11
from combo.data.models import Page
11 12
from combo.apps.newsletters.models import NewslettersCell, SubscriptionsSaveError
......
147 148
    assert cell.get_subscriptions(user) == expected_subscriptions
148 149
    assert mock_get.call_args[1]['user'].email == USER_EMAIL
149 150

  
150
@mock.patch('combo.utils.requests_wrapper.RequestsSession.request')
151
def test_get_subscriptions_signature_check(mock_get, cell, user):
151
@mock.patch('combo.utils.requests_wrapper.RequestsSession.send')
152
def test_get_subscriptions_signature_check(send, cell, user):
152 153
    restrictions = ('mail', 'sms')
153 154
    cell.transports_restrictions = ','.join(restrictions)
154 155
    expected_subscriptions = []
......
160 161
    mock_json = mock.Mock(status_code=200)
161 162
    mock_json.json.return_value = {'err': 0,
162 163
                                   'data': SUBSCRIPTIONS}
163
    mock_get.return_value = mock_json
164
    send.return_value = mock_json
164 165
    cell.get_subscriptions(user)
165
    assert 'orig=combo' in mock_get.call_args[0][1]
166
    assert 'signature=' in mock_get.call_args[0][1]
167
    assert 'nonce=' in mock_get.call_args[0][1]
166
    url = send.call_args[0][0].url
167
    scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
168
    query = urlparse.parse_qs(querystring, keep_blank_values=True)
169
    assert query['orig'][0] == 'combo'
170
    assert 'signature' in query
171
    assert 'nonce' in query
168 172

  
169 173

  
170 174
def mocked_requests_connection_error(*args, **kwargs):
tests/test_requests.py
23 23

  
24 24

  
25 25
def test_nosign():
26
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
26
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.send') as send:
27 27
        requests.get('http://example.org/foo/bar/')
28
        assert request.call_args[0][1] == 'http://example.org/foo/bar/'
28
        assert send.call_args[0][0].url == 'http://example.org/foo/bar/'
29 29

  
30 30
def test_sign():
31 31
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
32
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
32
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.send') as send:
33 33
        requests.get('/foo/bar/', remote_service=remote_service)
34
        url = request.call_args[0][1]
34
        url = send.call_args[0][0].url
35 35
        assert url.startswith('http://example.org/foo/bar/?')
36 36
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
37

  
38
        # make sure signature is the last parameter
39
        params = querystring.split('&')
40
        assert params[-1].startswith('signature=')
41

  
37 42
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
38 43
        assert query['orig'][0] == 'myself'
39 44
        assert query['email'][0]== ''
40 45
        assert query['NameID'][0]== ''
46
        assert 'signature' in query
41 47
        assert check_query(querystring, 'secret') == True
42 48

  
43 49
        requests.get('/foo/bar/', remote_service=remote_service, without_user=True)
44
        url = request.call_args[0][1]
50
        url = send.call_args[0][0].url
45 51
        assert url.startswith('http://example.org/foo/bar/?')
46 52
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
47 53
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
......
52 58

  
53 59

  
54 60
def test_auto_sign():
55
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
61
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.send') as send:
56 62
        requests.get('http://example.org/foo/bar/', remote_service='auto')
57
        url = request.call_args[0][1]
63
        url = send.call_args[0][0].url
58 64
        assert url.startswith('http://example.org/foo/bar/?')
59 65
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
60 66
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
......
62 68
        assert check_query(querystring, 'combo') == True
63 69

  
64 70
        requests.get('http://doesnotexist/foo/bar/', remote_service='auto')
65
        assert request.call_args[0][1] == 'http://doesnotexist/foo/bar/'
71
        assert send.call_args[0][0].url == 'http://doesnotexist/foo/bar/'
66 72

  
67 73

  
68 74
def test_sign_user():
69 75
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
70
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
76
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.send') as send:
71 77

  
72 78
        user = MockUser(samlized=True)
73 79

  
74 80
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
75
        url = request.call_args[0][1]
81
        url = send.call_args[0][0].url
76 82
        assert url.startswith('http://example.org/foo/bar/?')
77 83
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
78 84
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
......
83 89

  
84 90
        requests.get('/foo/bar/', remote_service=remote_service, user=user,
85 91
                     federation_key='email')
86
        url = request.call_args[0][1]
92
        url = send.call_args[0][0].url
87 93
        assert url.startswith('http://example.org/foo/bar/?')
88 94
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
89 95
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
......
96 102
        user = MockUser(samlized=False)
97 103

  
98 104
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
99
        url = request.call_args[0][1]
105
        url = send.call_args[0][0].url
100 106
        assert url.startswith('http://example.org/foo/bar/?')
101 107
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
102 108
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
......
108 114

  
109 115
def test_sign_anonymous_user():
110 116
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
111
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.request') as request:
117
    with mock.patch('combo.utils.requests_wrapper.RequestsSession.send') as send:
112 118

  
113 119
        user = AnonymousUser()
114 120

  
115 121
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
116
        url = request.call_args[0][1]
122
        url = send.call_args[0][0].url
117 123
        assert url.startswith('http://example.org/foo/bar/?')
118 124
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
119 125
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
120
-