From a3a66deef033ef2284738a749d55390806ef017f Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Wed, 8 Jul 2020 16:07:55 +0200 Subject: [PATCH 1/2] utils: add signature tools (#44159) --- chrono/settings.py | 4 + chrono/utils/requests_wrapper.py | 154 +++++++++++++++++++++++++++++++ chrono/utils/signature.py | 107 +++++++++++++++++++++ tests/settings.py | 12 +++ tests/test_requests.py | 149 ++++++++++++++++++++++++++++++ 5 files changed, 426 insertions(+) create mode 100644 chrono/utils/requests_wrapper.py create mode 100644 chrono/utils/signature.py create mode 100644 tests/test_requests.py diff --git a/chrono/settings.py b/chrono/settings.py index 37cadf3..a0bc414 100644 --- a/chrono/settings.py +++ b/chrono/settings.py @@ -161,6 +161,10 @@ MELLON_IDENTITY_PROVIDERS = [] # (see http://docs.python-requests.org/en/master/user/advanced/#proxies) REQUESTS_PROXIES = None +# timeout used in python-requests call, in seconds +# we use 28s by default: timeout just before web server, which is usually 30s +REQUESTS_TIMEOUT = 28 + local_settings_file = os.environ.get( 'CHRONO_SETTINGS_FILE', os.path.join(os.path.dirname(__file__), 'local_settings.py') ) diff --git a/chrono/utils/requests_wrapper.py b/chrono/utils/requests_wrapper.py new file mode 100644 index 0000000..5505e19 --- /dev/null +++ b/chrono/utils/requests_wrapper.py @@ -0,0 +1,154 @@ +# combo - content management system +# Copyright (C) 2015-2018 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import hashlib +import logging + +from requests import Response, Session as RequestsSession +from requests.auth import AuthBase + +from django.conf import settings +from django.core.cache import cache +from django.utils.encoding import smart_bytes +from django.utils.http import urlencode +from django.utils.six.moves.urllib import parse as urlparse +from django.utils.six import BytesIO + +from .signature import sign_url + + +class NothingInCacheException(Exception): + pass + + +class PublikSignature(AuthBase): + def __init__(self, secret): + self.secret = secret + + def __call__(self, request): + request.url = sign_url(request.url, self.secret) + return request + + +class Requests(RequestsSession): + def request(self, method, url, **kwargs): + remote_service = kwargs.pop('remote_service', None) + cache_duration = kwargs.pop('cache_duration', 15) + invalidate_cache = kwargs.pop('invalidate_cache', False) + user = kwargs.pop('user', None) + django_request = kwargs.pop('django_request', None) + without_user = kwargs.pop('without_user', False) + federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid' + raise_if_not_cached = kwargs.pop('raise_if_not_cached', False) + log_errors = kwargs.pop('log_errors', True) + + # don't use persistent cookies + self.cookies.clear() + + if remote_service == 'auto': + remote_service = None + scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) + for services in settings.KNOWN_SERVICES.values(): + for service in services.values(): + remote_url = service.get('url') + remote_scheme, remote_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse( + remote_url + ) + if remote_scheme == scheme and remote_netloc == netloc: + remote_service = service + break + else: + continue + break + if remote_service: + # only keeps the path (URI) in url parameter, scheme and netloc are + # in remote_service + url = urlparse.urlunparse(('', '', path, params, query, fragment)) + else: + logging.warning('service not found in settings.KNOWN_SERVICES for %s', url) + + if remote_service: + if isinstance(user, dict): + query_params = user.copy() + elif not user or not user.is_authenticated: + if without_user: + query_params = {} + else: + query_params = {'NameID': '', 'email': ''} + else: + query_params = {} + if federation_key == 'nameid': + query_params['NameID'] = user.get_name_id() + elif federation_key == 'email': + query_params['email'] = user.email + else: # 'auto' + user_name_id = user.get_name_id() + if user_name_id: + query_params['NameID'] = user_name_id + else: + query_params['email'] = user.email + + query_params['orig'] = remote_service.get('orig') + + remote_service_base_url = remote_service.get('url') + scheme, netloc, old_path, params, old_query, fragment = urlparse.urlparse(remote_service_base_url) + + query = urlencode(query_params) + if '?' in url: + path, old_query = url.split('?', 1) + query += '&' + old_query + else: + path = url + + url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) + + if method == 'GET' and cache_duration: + # handle cache + cache_key = hashlib.md5(smart_bytes(url)).hexdigest() + cache_content = cache.get(cache_key) + if cache_content and not invalidate_cache: + response = Response() + response.status_code = 200 + response.raw = BytesIO(smart_bytes(cache_content)) + return response + elif raise_if_not_cached: + raise NothingInCacheException() + + if remote_service: # sign + kwargs['auth'] = PublikSignature(remote_service.get('secret')) + + kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT + + response = super(Requests, self).request(method, url, **kwargs) + if log_errors and (response.status_code // 100 != 2): + extra = {} + if django_request: + extra['request'] = django_request + if log_errors == 'warn': + logging.warning( + 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra + ) + else: + logging.error( + 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra + ) + if method == 'GET' and cache_duration and (response.status_code // 100 == 2): + cache.set(cache_key, response.content, cache_duration) + + return response + + +requests = Requests() diff --git a/chrono/utils/signature.py b/chrono/utils/signature.py new file mode 100644 index 0000000..efd560e --- /dev/null +++ b/chrono/utils/signature.py @@ -0,0 +1,107 @@ +# combo - content management system +# Copyright (C) 2015-2018 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +import datetime +import hmac +import hashlib +import random + +from django.conf import settings +from django.utils.encoding import smart_bytes +from django.utils.http import quote, urlencode +from django.utils import six +from django.utils.six.moves.urllib import parse as urlparse + + +# Simple signature scheme for query strings + + +def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): + parsed = urlparse.urlparse(url) + new_query = sign_query(parsed.query, key, algo, timestamp, nonce) + return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) + + +def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): + if timestamp is None: + timestamp = datetime.datetime.utcnow() + timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + if nonce is None: + nonce = hex(random.getrandbits(128))[2:] + new_query = query + if new_query: + new_query += '&' + new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce))) + signature = base64.b64encode(sign_string(new_query, key, algo=algo)) + new_query += '&signature=' + quote(signature) + return new_query + + +def sign_string(s, key, algo='sha256', timedelta=30): + digestmod = getattr(hashlib, algo) + hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s)) + return hash.digest() + + +def check_request_signature(django_request, keys=[]): + query_string = django_request.META['QUERY_STRING'] + if not query_string: + return False + orig = django_request.GET.get('orig', '') + known_services = getattr(settings, 'KNOWN_SERVICES', None) + if known_services and orig: + for services in known_services.values(): + for service in services.values(): + if 'verif_orig' in service and service['verif_orig'] == orig: + keys.append(service['secret']) + break + return check_query(query_string, keys) + + +def check_query(query, keys, known_nonce=None, timedelta=30): + parsed = urlparse.parse_qs(query) + if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed): + return False + unsigned_query, signature_content = query.split('&signature=', 1) + if '&' in signature_content: + return False # signature must be the last parameter + signature = base64.b64decode(parsed['signature'][0]) + algo = parsed['algo'][0] + timestamp = parsed['timestamp'][0] + timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + nonce = parsed['nonce'] + if known_nonce is not None and known_nonce(nonce): + return False + if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta): + return False + return check_string(unsigned_query, signature, keys, algo=algo) + + +def check_string(s, signature, keys, algo='sha256'): + if not isinstance(keys, list): + keys = [keys] + for key in keys: + signature2 = sign_string(s, key, algo=algo) + if len(signature2) != len(signature): + continue + res = 0 + # constant time compare + for a, b in zip(signature, signature2): + res |= a ^ b + if res == 0: + return True + return False diff --git a/tests/settings.py b/tests/settings.py index dcc1e8e..d1ace55 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -13,3 +13,15 @@ DATABASES = { 'TEST': {'NAME': 'chrono-test-%s' % os.environ.get("BRANCH_NAME", "").replace('/', '-')[:63],}, } } + +KNOWN_SERVICES = { + 'wcs': { + 'default': { + 'title': 'test', + 'url': 'http://example.org', + 'secret': 'chrono', + 'orig': 'chrono', + 'backoffice-menu-url': 'http://example.org/manage/', + } + }, +} diff --git a/tests/test_requests.py b/tests/test_requests.py new file mode 100644 index 0000000..d92f72f --- /dev/null +++ b/tests/test_requests.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +import mock +import pytest + +from django.contrib.auth.models import AnonymousUser +from django.utils.six.moves.urllib import parse as urlparse + +from chrono.utils.requests_wrapper import requests, NothingInCacheException +from chrono.utils.signature import check_query + + +class MockUser(object): + email = 'foo@example.net' + is_authenticated = True + + def get_name_id(self): + if self.samlized: + return 'r2d2' + return None + + def __init__(self, samlized=True): + self.samlized = samlized + + +def test_nosign(): + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send: + requests.get('http://example.org/foo/bar/') + assert send.call_args[0][0].url == 'http://example.org/foo/bar/' + + +def test_sign(): + remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'} + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send: + requests.get('/foo/bar/', remote_service=remote_service) + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['orig'][0] == 'myself' + assert query['email'][0] == '' + assert query['NameID'][0] == '' + assert check_query(querystring, 'secret') == True + + requests.get('/foo/bar/', remote_service=remote_service, without_user=True) + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['orig'][0] == 'myself' + assert 'email' not in query + assert 'NameID' not in query + assert check_query(querystring, 'secret') == True + + +def test_auto_sign(): + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send: + requests.get('http://example.org/foo/bar/', remote_service='auto') + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['orig'][0] == 'chrono' + assert check_query(querystring, 'chrono') == True + + requests.get('http://doesnotexist/foo/bar/', remote_service='auto') + assert send.call_args[0][0].url == 'http://doesnotexist/foo/bar/' + + +def test_sign_user(): + remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'} + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send: + + user = MockUser(samlized=True) + + requests.get('/foo/bar/', remote_service=remote_service, user=user) + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['NameID'][0] == 'r2d2' + assert 'email' not in query + assert query['orig'][0] == 'myself' + assert check_query(querystring, 'secret') == True + + requests.get('/foo/bar/', remote_service=remote_service, user=user, federation_key='email') + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['email'][0] == 'foo@example.net' + assert 'NameID' not in query + assert query['orig'][0] == 'myself' + assert check_query(querystring, 'secret') == True + + user = MockUser(samlized=False) + + requests.get('/foo/bar/', remote_service=remote_service, user=user) + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert 'NameID' not in query + assert query['email'][0] == 'foo@example.net' + assert query['orig'][0] == 'myself' + assert check_query(querystring, 'secret') == True + + +def test_sign_anonymous_user(): + remote_service = {'url': 'http://example.org', 'secret': 'secret', 'orig': 'myself'} + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.send') as send: + + user = AnonymousUser() + + requests.get('/foo/bar/', remote_service=remote_service, user=user) + url = send.call_args[0][0].url + assert url.startswith('http://example.org/foo/bar/?') + scheme, netloc, path, params, querystring, fragment = urlparse.urlparse(url) + query = urlparse.parse_qs(querystring, keep_blank_values=True) + assert query['NameID'][0] == '' + assert query['email'][0] == '' + assert query['orig'][0] == 'myself' + assert check_query(querystring, 'secret') == True + + +def test_requests_cache(): + with mock.patch('chrono.utils.requests_wrapper.RequestsSession.request') as requests_get: + requests_get.return_value = mock.Mock(content=b'hello world', status_code=200) + # default cache, nothing in there + assert requests.get('http://cache.example.org/').content == b'hello world' + assert requests_get.call_count == 1 + # now there's something in cache + assert requests.get('http://cache.example.org/').content == b'hello world' + assert requests_get.call_count == 1 + # value changed + requests_get.return_value = mock.Mock(content=b'hello second world', status_code=200) + assert requests.get('http://cache.example.org/').content == b'hello world' + assert requests_get.call_count == 1 + # force cache invalidation + assert ( + requests.get('http://cache.example.org/', invalidate_cache=True).content == b'hello second world' + ) + assert requests_get.call_count == 2 + # check raise_if_not_cached + with pytest.raises(NothingInCacheException): + requests.get('http://cache.example.org/other', raise_if_not_cached=True) + + # check with unicode url + assert requests.get(u'http://cache.example.org/éléphant').content == b'hello second world' -- 2.20.1