From ef941e961f2354babec7d0099af55935d8d2e37f Mon Sep 17 00:00:00 2001 From: Valentin Deniaud Date: Wed, 8 Jul 2020 16:07:55 +0200 Subject: [PATCH 1/2] utils: add signature tools (#44159) --- chrono/utils/requests_wrapper.py | 154 +++++++++++++++++++++++++++++++ chrono/utils/signature.py | 107 +++++++++++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 chrono/utils/requests_wrapper.py create mode 100644 chrono/utils/signature.py diff --git a/chrono/utils/requests_wrapper.py b/chrono/utils/requests_wrapper.py new file mode 100644 index 0000000..5505e19 --- /dev/null +++ b/chrono/utils/requests_wrapper.py @@ -0,0 +1,154 @@ +# combo - content management system +# Copyright (C) 2015-2018 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import hashlib +import logging + +from requests import Response, Session as RequestsSession +from requests.auth import AuthBase + +from django.conf import settings +from django.core.cache import cache +from django.utils.encoding import smart_bytes +from django.utils.http import urlencode +from django.utils.six.moves.urllib import parse as urlparse +from django.utils.six import BytesIO + +from .signature import sign_url + + +class NothingInCacheException(Exception): + pass + + +class PublikSignature(AuthBase): + def __init__(self, secret): + self.secret = secret + + def __call__(self, request): + request.url = sign_url(request.url, self.secret) + return request + + +class Requests(RequestsSession): + def request(self, method, url, **kwargs): + remote_service = kwargs.pop('remote_service', None) + cache_duration = kwargs.pop('cache_duration', 15) + invalidate_cache = kwargs.pop('invalidate_cache', False) + user = kwargs.pop('user', None) + django_request = kwargs.pop('django_request', None) + without_user = kwargs.pop('without_user', False) + federation_key = kwargs.pop('federation_key', 'auto') # 'auto', 'email', 'nameid' + raise_if_not_cached = kwargs.pop('raise_if_not_cached', False) + log_errors = kwargs.pop('log_errors', True) + + # don't use persistent cookies + self.cookies.clear() + + if remote_service == 'auto': + remote_service = None + scheme, netloc, path, params, query, fragment = urlparse.urlparse(url) + for services in settings.KNOWN_SERVICES.values(): + for service in services.values(): + remote_url = service.get('url') + remote_scheme, remote_netloc, r_path, r_params, r_query, r_fragment = urlparse.urlparse( + remote_url + ) + if remote_scheme == scheme and remote_netloc == netloc: + remote_service = service + break + else: + continue + break + if remote_service: + # only keeps the path (URI) in url parameter, scheme and netloc are + # in remote_service + url = urlparse.urlunparse(('', '', path, params, query, fragment)) + else: + logging.warning('service not found in settings.KNOWN_SERVICES for %s', url) + + if remote_service: + if isinstance(user, dict): + query_params = user.copy() + elif not user or not user.is_authenticated: + if without_user: + query_params = {} + else: + query_params = {'NameID': '', 'email': ''} + else: + query_params = {} + if federation_key == 'nameid': + query_params['NameID'] = user.get_name_id() + elif federation_key == 'email': + query_params['email'] = user.email + else: # 'auto' + user_name_id = user.get_name_id() + if user_name_id: + query_params['NameID'] = user_name_id + else: + query_params['email'] = user.email + + query_params['orig'] = remote_service.get('orig') + + remote_service_base_url = remote_service.get('url') + scheme, netloc, old_path, params, old_query, fragment = urlparse.urlparse(remote_service_base_url) + + query = urlencode(query_params) + if '?' in url: + path, old_query = url.split('?', 1) + query += '&' + old_query + else: + path = url + + url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) + + if method == 'GET' and cache_duration: + # handle cache + cache_key = hashlib.md5(smart_bytes(url)).hexdigest() + cache_content = cache.get(cache_key) + if cache_content and not invalidate_cache: + response = Response() + response.status_code = 200 + response.raw = BytesIO(smart_bytes(cache_content)) + return response + elif raise_if_not_cached: + raise NothingInCacheException() + + if remote_service: # sign + kwargs['auth'] = PublikSignature(remote_service.get('secret')) + + kwargs['timeout'] = kwargs.get('timeout') or settings.REQUESTS_TIMEOUT + + response = super(Requests, self).request(method, url, **kwargs) + if log_errors and (response.status_code // 100 != 2): + extra = {} + if django_request: + extra['request'] = django_request + if log_errors == 'warn': + logging.warning( + 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra + ) + else: + logging.error( + 'failed to %s %s (%s)', method, response.request.url, response.status_code, extra=extra + ) + if method == 'GET' and cache_duration and (response.status_code // 100 == 2): + cache.set(cache_key, response.content, cache_duration) + + return response + + +requests = Requests() diff --git a/chrono/utils/signature.py b/chrono/utils/signature.py new file mode 100644 index 0000000..efd560e --- /dev/null +++ b/chrono/utils/signature.py @@ -0,0 +1,107 @@ +# combo - content management system +# Copyright (C) 2015-2018 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +import datetime +import hmac +import hashlib +import random + +from django.conf import settings +from django.utils.encoding import smart_bytes +from django.utils.http import quote, urlencode +from django.utils import six +from django.utils.six.moves.urllib import parse as urlparse + + +# Simple signature scheme for query strings + + +def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): + parsed = urlparse.urlparse(url) + new_query = sign_query(parsed.query, key, algo, timestamp, nonce) + return urlparse.urlunparse(parsed[:4] + (new_query,) + parsed[5:]) + + +def sign_query(query, key, algo='sha256', timestamp=None, nonce=None): + if timestamp is None: + timestamp = datetime.datetime.utcnow() + timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') + if nonce is None: + nonce = hex(random.getrandbits(128))[2:] + new_query = query + if new_query: + new_query += '&' + new_query += urlencode((('algo', algo), ('timestamp', timestamp), ('nonce', nonce))) + signature = base64.b64encode(sign_string(new_query, key, algo=algo)) + new_query += '&signature=' + quote(signature) + return new_query + + +def sign_string(s, key, algo='sha256', timedelta=30): + digestmod = getattr(hashlib, algo) + hash = hmac.HMAC(smart_bytes(key), digestmod=digestmod, msg=smart_bytes(s)) + return hash.digest() + + +def check_request_signature(django_request, keys=[]): + query_string = django_request.META['QUERY_STRING'] + if not query_string: + return False + orig = django_request.GET.get('orig', '') + known_services = getattr(settings, 'KNOWN_SERVICES', None) + if known_services and orig: + for services in known_services.values(): + for service in services.values(): + if 'verif_orig' in service and service['verif_orig'] == orig: + keys.append(service['secret']) + break + return check_query(query_string, keys) + + +def check_query(query, keys, known_nonce=None, timedelta=30): + parsed = urlparse.parse_qs(query) + if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed and 'nonce' in parsed): + return False + unsigned_query, signature_content = query.split('&signature=', 1) + if '&' in signature_content: + return False # signature must be the last parameter + signature = base64.b64decode(parsed['signature'][0]) + algo = parsed['algo'][0] + timestamp = parsed['timestamp'][0] + timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + nonce = parsed['nonce'] + if known_nonce is not None and known_nonce(nonce): + return False + if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta): + return False + return check_string(unsigned_query, signature, keys, algo=algo) + + +def check_string(s, signature, keys, algo='sha256'): + if not isinstance(keys, list): + keys = [keys] + for key in keys: + signature2 = sign_string(s, key, algo=algo) + if len(signature2) != len(signature): + continue + res = 0 + # constant time compare + for a, b in zip(signature, signature2): + res |= a ^ b + if res == 0: + return True + return False -- 2.20.1