Projet

Général

Profil

0001-utils-add-signature-tools-44159.patch

Valentin Deniaud, 03 août 2020 12:56

Télécharger (19 ko)

Voir les différences:

Subject: [PATCH 1/2] utils: add signature tools (#44159)

 chrono/settings.py               |   4 +
 chrono/utils/requests_wrapper.py | 154 +++++++++++++++++++++++++++++++
 chrono/utils/signature.py        | 107 +++++++++++++++++++++
 tests/settings.py                |  12 +++
 tests/test_requests.py           | 149 ++++++++++++++++++++++++++++++
 5 files changed, 426 insertions(+)
 create mode 100644 chrono/utils/requests_wrapper.py
 create mode 100644 chrono/utils/signature.py
 create mode 100644 tests/test_requests.py
chrono/settings.py
161 161
# (see http://docs.python-requests.org/en/master/user/advanced/#proxies)
162 162
REQUESTS_PROXIES = None
163 163

  
164
# timeout used in python-requests call, in seconds
165
# we use 28s by default: timeout just before web server, which is usually 30s
166
REQUESTS_TIMEOUT = 28
167

  
164 168
local_settings_file = os.environ.get(
165 169
    'CHRONO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py')
166 170
)
chrono/utils/requests_wrapper.py
1
# combo - content management system
2
# Copyright (C) 2015-2018  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import hashlib
18
import logging
19

  
20
from requests import Response, Session as RequestsSession
21
from requests.auth import AuthBase
22

  
23
from django.conf import settings
24
from django.core.cache import cache
25
from django.utils.encoding import smart_bytes
26
from django.utils.http import urlencode
27
from django.utils.six.moves.urllib import parse as urlparse
28
from django.utils.six import BytesIO
29

  
30
from .signature import sign_url
31

  
32

  
33
class NothingInCacheException(Exception):
34
    pass
35

  
36

  
37
class PublikSignature(AuthBase):
38
    def __init__(self, secret):
39
        self.secret = secret
40

  
41
    def __call__(self, request):
42
        request.url = sign_url(request.url, self.secret)
43
        return request
44

  
45

  
46
class Requests(RequestsSession):
47
    def request(self, method, url, **kwargs):
48
        remote_service = kwargs.pop('remote_service', None)
49
        cache_duration = kwargs.pop('cache_duration', 15)
50
        invalidate_cache = kwargs.pop('invalidate_cache', False)
51
        user = kwargs.pop('user', None)
52
        django_request = kwargs.pop('django_request', None)
53
        without_user = kwargs.pop('without_user', False)
54
        federation_key = kwargs.pop('federation_key', 'auto')  # 'auto', 'email', 'nameid'
55
        raise_if_not_cached = kwargs.pop('raise_if_not_cached', False)
56
        log_errors = kwargs.pop('log_errors', True)
57

  
58
        # don't use persistent cookies
59
        self.cookies.clear()
60

  
61
        if remote_service == 'auto':
62
            remote_service = None
63
            scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
64
            for services in settings.KNOWN_SERVICES.values():
65
                for service in services.values():
66
                    remote_url = service.get('url')
67
                    remote_scheme, remote_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse(
68
                        remote_url
69
                    )
70
                    if remote_scheme == scheme and remote_netloc == netloc:
71
                        remote_service = service
72
                        break
73
                else:
74
                    continue
75
                break
76
            if remote_service:
77
                # only keeps the path (URI) in url parameter, scheme and netloc are
78
                # in remote_service
79
                url = urlparse.urlunparse(('', '', path, params, query, fragment))
80
            else:
81
                logging.warning('service not found in settings.KNOWN_SERVICES for %s', url)
82

  
83
        if remote_service:
84
            if isinstance(user, dict):
85
                query_params = user.copy()
86
            elif not user or not user.is_authenticated:
87
                if without_user:
88
                    query_params = {}
89
                else:
90
                    query_params = {'NameID': '', 'email': ''}
91
            else:
92
                query_params = {}
93
                if federation_key == 'nameid':
94
                    query_params['NameID'] = user.get_name_id()
95
                elif federation_key == 'email':
96
                    query_params['email'] = user.email
97
                else:  # 'auto'
98
                    user_name_id = user.get_name_id()
99
                    if user_name_id:
100
                        query_params['NameID'] = user_name_id
101
                    else:
102
                        query_params['email'] = user.email
103

  
104
            query_params['orig'] = remote_service.get('orig')
105

  
106
            remote_service_base_url = remote_service.get('url')
107
            scheme, netloc, old_path, params, old_query, fragment = urlparse.urlparse(remote_service_base_url)
108

  
109
            query = urlencode(query_params)
110
            if '?' in url:
111
                path, old_query = url.split('?', 1)
112
                query += '&' + old_query
113
            else:
114
                path = url
115

  
116
            url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
117

  
118
        if method == 'GET' and cache_duration:
119
            # handle cache
120
            cache_key = hashlib.md5(smart_bytes(url)).hexdigest()
121
            cache_content = cache.get(cache_key)
122
            if cache_content and not invalidate_cache:
123
                response = Response()
124
                response.status_code = 200
125
                response.raw = BytesIO(smart_bytes(cache_content))
126
                return response
127
            elif raise_if_not_cached:
128
                raise NothingInCacheException()
129

  
130
        if remote_service:  # sign
131
            kwargs['auth'] = PublikSignature(remote_service.get('secret'))
132

  
133
        kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT
134

  
135
        response = super(Requests, self).request(method, url, **kwargs)
136
        if log_errors and (response.status_code // 100 != 2):
137
            extra = {}
138
            if django_request:
139
                extra['request'] = django_request
140
            if log_errors == 'warn':
141
                logging.warning(
142
                    'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
143
                )
144
            else:
145
                logging.error(
146
                    'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra
147
                )
148
        if method == 'GET' and cache_duration and (response.status_code // 100 == 2):
149
            cache.set(cache_key, response.content, cache_duration)
150

  
151
        return response
152

  
153

  
154
requests = Requests()
chrono/utils/signature.py
1
# combo - content management system
2
# Copyright (C) 2015-2018  Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import datetime
19
import hmac
20
import hashlib
21
import random
22

  
23
from django.conf import settings
24
from django.utils.encoding import smart_bytes
25
from django.utils.http import quote, urlencode
26
from django.utils import six
27
from django.utils.six.moves.urllib import parse as urlparse
28

  
29

  
30
# Simple signature scheme for query strings
31

  
32

  
33
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
34
    parsed = urlparse.urlparse(url)
35
    new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
36
    return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:])
37

  
38

  
39
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
40
    if timestamp is None:
41
        timestamp = datetime.datetime.utcnow()
42
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
43
    if nonce is None:
44
        nonce = hex(random.getrandbits(128))[2:]
45
    new_query = query
46
    if new_query:
47
        new_query += '&'
48
    new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce)))
49
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
50
    new_query += '&signature=' + quote(signature)
51
    return new_query
52

  
53

  
54
def sign_string(s, key, algo='sha256', timedelta=30):
55
    digestmod = getattr(hashlib, algo)
56
    hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s))
57
    return hash.digest()
58

  
59

  
60
def check_request_signature(django_request, keys=[]):
61
    query_string = django_request.META['QUERY_STRING']
62
    if not query_string:
63
        return False
64
    orig = django_request.GET.get('orig', '')
65
    known_services = getattr(settings, 'KNOWN_SERVICES', None)
66
    if known_services and orig:
67
        for services in known_services.values():
68
            for service in services.values():
69
                if 'verif_orig' in service and service['verif_orig'] == orig:
70
                    keys.append(service['secret'])
71
                    break
72
    return check_query(query_string, keys)
73

  
74

  
75
def check_query(query, keys, known_nonce=None, timedelta=30):
76
    parsed = urlparse.parse_qs(query)
77
    if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed):
78
        return False
79
    unsigned_query, signature_content = query.split('&signature=', 1)
80
    if '&' in signature_content:
81
        return False  # signature must be the last parameter
82
    signature = base64.b64decode(parsed['signature'][0])
83
    algo = parsed['algo'][0]
84
    timestamp = parsed['timestamp'][0]
85
    timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
86
    nonce = parsed['nonce']
87
    if known_nonce is not None and known_nonce(nonce):
88
        return False
89
    if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
90
        return False
91
    return check_string(unsigned_query, signature, keys, algo=algo)
92

  
93

  
94
def check_string(s, signature, keys, algo='sha256'):
95
    if not isinstance(keys, list):
96
        keys = [keys]
97
    for key in keys:
98
        signature2 = sign_string(s, key, algo=algo)
99
        if len(signature2) != len(signature):
100
            continue
101
        res = 0
102
        # constant time compare
103
        for a, b in zip(signature, signature2):
104
            res |= a ^ b
105
        if res == 0:
106
            return True
107
    return False
tests/settings.py
13 13
        'TEST': {'NAME': 'chrono-test-%s' % os.environ.get("BRANCH_NAME", "").replace('/', '-')[:63],},
14 14
    }
15 15
}
16

  
17
KNOWN_SERVICES = {
18
    'wcs': {
19
        'default': {
20
            'title': 'test',
21
            'url': 'http://example.org',
22
            'secret': 'chrono',
23
            'orig': 'chrono',
24
            'backoffice-menu-url': 'http://example.org/manage/',
25
        }
26
    },
27
}
tests/test_requests.py
1
# -*- coding: utf-8 -*-
2

  
3
import mock
4
import pytest
5

  
6
from django.contrib.auth.models import AnonymousUser
7
from django.utils.six.moves.urllib import parse as urlparse
8

  
9
from chrono.utils.requests_wrapper import requests, NothingInCacheException
10
from chrono.utils.signature import check_query
11

  
12

  
13
class MockUser(object):
14
    email = 'foo@example.net'
15
    is_authenticated = True
16

  
17
    def get_name_id(self):
18
        if self.samlized:
19
            return 'r2d2'
20
        return None
21

  
22
    def __init__(self, samlized=True):
23
        self.samlized = samlized
24

  
25

  
26
def test_nosign():
27
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
28
        requests.get('http://example.org/foo/bar/')
29
        assert send.call_args[0][0].url == 'http://example.org/foo/bar/'
30

  
31

  
32
def test_sign():
33
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
34
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
35
        requests.get('/foo/bar/', remote_service=remote_service)
36
        url = send.call_args[0][0].url
37
        assert url.startswith('http://example.org/foo/bar/?')
38
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
39
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
40
        assert query['orig'][0] == 'myself'
41
        assert query['email'][0] == ''
42
        assert query['NameID'][0] == ''
43
        assert check_query(querystring, 'secret') == True
44

  
45
        requests.get('/foo/bar/', remote_service=remote_service, without_user=True)
46
        url = send.call_args[0][0].url
47
        assert url.startswith('http://example.org/foo/bar/?')
48
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
49
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
50
        assert query['orig'][0] == 'myself'
51
        assert 'email' not in query
52
        assert 'NameID' not in query
53
        assert check_query(querystring, 'secret') == True
54

  
55

  
56
def test_auto_sign():
57
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
58
        requests.get('http://example.org/foo/bar/', remote_service='auto')
59
        url = send.call_args[0][0].url
60
        assert url.startswith('http://example.org/foo/bar/?')
61
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
62
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
63
        assert query['orig'][0] == 'chrono'
64
        assert check_query(querystring, 'chrono') == True
65

  
66
        requests.get('http://doesnotexist/foo/bar/', remote_service='auto')
67
        assert send.call_args[0][0].url == 'http://doesnotexist/foo/bar/'
68

  
69

  
70
def test_sign_user():
71
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
72
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
73

  
74
        user = MockUser(samlized=True)
75

  
76
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
77
        url = send.call_args[0][0].url
78
        assert url.startswith('http://example.org/foo/bar/?')
79
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
80
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
81
        assert query['NameID'][0] == 'r2d2'
82
        assert 'email' not in query
83
        assert query['orig'][0] == 'myself'
84
        assert check_query(querystring, 'secret') == True
85

  
86
        requests.get('/foo/bar/', remote_service=remote_service, user=user, federation_key='email')
87
        url = send.call_args[0][0].url
88
        assert url.startswith('http://example.org/foo/bar/?')
89
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
90
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
91
        assert query['email'][0] == 'foo@example.net'
92
        assert 'NameID' not in query
93
        assert query['orig'][0] == 'myself'
94
        assert check_query(querystring, 'secret') == True
95

  
96
        user = MockUser(samlized=False)
97

  
98
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
99
        url = send.call_args[0][0].url
100
        assert url.startswith('http://example.org/foo/bar/?')
101
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
102
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
103
        assert 'NameID' not in query
104
        assert query['email'][0] == 'foo@example.net'
105
        assert query['orig'][0] == 'myself'
106
        assert check_query(querystring, 'secret') == True
107

  
108

  
109
def test_sign_anonymous_user():
110
    remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'}
111
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send:
112

  
113
        user = AnonymousUser()
114

  
115
        requests.get('/foo/bar/', remote_service=remote_service, user=user)
116
        url = send.call_args[0][0].url
117
        assert url.startswith('http://example.org/foo/bar/?')
118
        scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url)
119
        query = urlparse.parse_qs(querystring, keep_blank_values=True)
120
        assert query['NameID'][0] == ''
121
        assert query['email'][0] == ''
122
        assert query['orig'][0] == 'myself'
123
        assert check_query(querystring, 'secret') == True
124

  
125

  
126
def test_requests_cache():
127
    with mock.patch('chrono.utils.requests_wrapper.RequestsSession.request') as requests_get:
128
        requests_get.return_value = mock.Mock(content=b'hello world', status_code=200)
129
        # default cache, nothing in there
130
        assert requests.get('http://cache.example.org/').content == b'hello world'
131
        assert requests_get.call_count == 1
132
        # now there's something in cache
133
        assert requests.get('http://cache.example.org/').content == b'hello world'
134
        assert requests_get.call_count == 1
135
        # value changed
136
        requests_get.return_value = mock.Mock(content=b'hello second world', status_code=200)
137
        assert requests.get('http://cache.example.org/').content == b'hello world'
138
        assert requests_get.call_count == 1
139
        # force cache invalidation
140
        assert (
141
            requests.get('http://cache.example.org/', invalidate_cache=True).content == b'hello second world'
142
        )
143
        assert requests_get.call_count == 2
144
        # check raise_if_not_cached
145
        with pytest.raises(NothingInCacheException):
146
            requests.get('http://cache.example.org/other', raise_if_not_cached=True)
147

  
148
        # check with unicode url
149
        assert requests.get(u'http://cache.example.org/éléphant').content == b'hello second world'
0
-