From 375ad4513c050254a30f4239706c1dc820d37fea Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 30 Sep 2021 13:19:39 +0200 Subject: [PATCH] rest_authentication: improve signature errors reporting (#57450) --- hobo/rest_authentication.py | 12 ++-- hobo/signature.py | 78 ++++++++++++++++----- tests_authentic/test_rest_authentication.py | 30 ++++++-- 3 files changed, 93 insertions(+), 27 deletions(-) diff --git a/hobo/rest_authentication.py b/hobo/rest_authentication.py index ae16a4d..beb0655 100644 --- a/hobo/rest_authentication.py +++ b/hobo/rest_authentication.py @@ -54,7 +54,7 @@ class PublikAuthenticationFailed(exceptions.APIException): default_code = 'invalid-signature' def __init__(self, code): - self.detail = {'err': code} + self.detail = {'err': 1, 'err_desc': code} class PublikAuthentication(authentication.BaseAuthentication): @@ -113,9 +113,13 @@ class PublikAuthentication(authentication.BaseAuthentication): if not request.GET.get('orig') or not request.GET.get('signature'): return None key = self.get_orig_key(request.GET['orig']) - if not signature.check_url(full_path, key): - self.logger.warning('invalid signature') - raise PublikAuthenticationFailed('invalid-signature') + try: + if not signature.check_url(full_path, key, raise_on_error=True): + self.logger.warning('invalid signature') + raise PublikAuthenticationFailed('invalid-signature') + except signature.SignatureError as e: + self.logger.warning('publik authentication failed: %s', e) + raise PublikAuthenticationFailed(str(e)) user = self.resolve_user(request) self.logger.info('user authenticated with signature %s', user) return (user, None) diff --git a/hobo/signature.py b/hobo/signature.py index 5ada660..a377f7e 100644 --- a/hobo/signature.py +++ b/hobo/signature.py @@ -3,6 +3,7 @@ import datetime import hashlib import hmac import random +import secrets from django.utils import six from django.utils.encoding import smart_bytes @@ -12,6 +13,10 @@ from django.utils.six.moves.urllib import parse as urlparse '''Simple signature scheme for query strings''' +class SignatureError(Exception): + pass + + def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): parsed = urlparse.urlparse(url) new_query = sign_query(parsed.query, key, algo, timestamp, nonce) @@ -43,36 +48,75 @@ def sign_string(s, key, algo='sha256'): return hash.digest() -def check_url(url, key, known_nonce=None, timedelta=30): +def check_url(url, key, known_nonce=None, timedelta=30, raise_on_error=False): parsed = urlparse.urlparse(url, 'https') - return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta) + return check_query( + parsed.query, key, known_nonce=known_nonce, timedelta=timedelta, raise_on_error=raise_on_error + ) -def check_query(query, key, known_nonce=None, timedelta=30): +def check_query(query, key, known_nonce=None, timedelta=30, raise_on_error=False): parsed = urlparse.parse_qs(query) - if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed): + parsed = {key: value[0] if len(value) == 1 else value for key, value in parsed.items()} + signature = parsed.get('signature') + if not signature or not isinstance(signature, str): + if raise_on_error: + raise SignatureError('multiple/missing signature') + return False + algo = parsed.get('algo') + if not algo or not isinstance(algo, str): + if raise_on_error: + raise SignatureError('multiple/missing algo') + return False + if algo != 'sha256': + if raise_on_error: + raise SignatureError('invalid algo, must be sha256') + return False + timestamp = parsed.get('timestamp') + if not timestamp or not isinstance(timestamp, str): + if raise_on_error: + raise SignatureError('multiple/missing timestamp') return False if known_nonce is not None: - if ('nonce' not in parsed) or known_nonce(parsed['nonce'][0]): + nonce = parsed.get('nonce') + if not nonce or not isinstance(nonce, str): + if raise_on_error: + raise SignatureError('multiple/missing nonce') + return False + if known_nonce(nonce): + if raise_on_error: + raise SignatureError('nonce replayed') return False unsigned_query, signature_content = query.split('&signature=', 1) if '&' in signature_content: + if raise_on_error: + raise SignatureError('signature is not the last parameter') return False # signature must be the last parameter - signature = base64.b64decode(parsed['signature'][0]) - algo = parsed['algo'][0] - timestamp = parsed['timestamp'][0] - timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') - if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta): + try: + signature = base64.b64decode(signature) + except ValueError: + if raise_on_error: + raise SignatureError('signature is invalid base64') + return False + try: + timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') + except ValueError as e: + if raise_on_error: + raise SignatureError('invalid timestamp, %s' % e) + return False + delta = abs(datetime.datetime.utcnow() - timestamp) + if delta > datetime.timedelta(seconds=timedelta): + if raise_on_error: + raise SignatureError('timestamp delta is more than %s seconds: %s' % (timedelta, delta)) return False - return check_string(unsigned_query, signature, key, algo=algo) + return check_string(unsigned_query, signature, key, algo=algo, raise_on_error=raise_on_error) -def check_string(s, signature, key, algo='sha256'): +def check_string(s, signature, key, algo='sha256', raise_on_error=False): # constant time compare signature2 = sign_string(s, key, algo=algo) - if len(signature2) != len(signature): + if not secrets.compare_digest(signature, signature2): + if raise_on_error: + raise SignatureError('HMAC hash is different') return False - res = 0 - for a, b in zip(signature, signature2): - res |= a ^ b - return res == 0 + return True diff --git a/tests_authentic/test_rest_authentication.py b/tests_authentic/test_rest_authentication.py index e265027..9919c7d 100644 --- a/tests_authentic/test_rest_authentication.py +++ b/tests_authentic/test_rest_authentication.py @@ -14,6 +14,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import re + import pytest from django.contrib.auth import get_user_model from django.test import RequestFactory @@ -88,7 +90,8 @@ def test_publik_authentication(tenant, settings): publik_authentication = rest_authentication.PublikAuthentication() with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info: publik_authentication.authenticate(request) - assert exc_info.value.detail['err'] == 'invalid-signature' + assert exc_info.value.detail['err'] == 1 + assert exc_info.value.detail['err_desc'] == 'HMAC hash is different' def test_response(rf, settings, tenant): @@ -114,7 +117,8 @@ def test_response(rf, settings, tenant): response = view(request) assert response.status_code == 401 - assert response.data['err'] == 'no-known-services-setting' + assert response.data['err'] == 1 + assert response.data['err_desc'] == 'no-known-services-setting' secret_key = 'bbb' settings.KNOWN_SERVICES = { @@ -127,19 +131,33 @@ def test_response(rf, settings, tenant): response = view(request) assert response.status_code == 401 - assert response.data['err'] == 'no-secret-found-for-orig' + assert response.data['err'] == 1 + assert response.data['err_desc'] == 'no-secret-found-for-orig' settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key response = view(request) assert response.status_code == 401 - assert response.data['err'] == 'invalid-signature' + assert response.data['err'] == 1 + assert response.data['err_desc'] == 'multiple/missing algo' # User authentication request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key)) response = view(request) assert response.status_code == 401 - assert response.data == {'err': 'user-not-found'} + assert response.data == {'err': 1, 'err_desc': 'user-not-found'} + + # Service authentication, wrong timestamp + request = rf.get( + re.sub('timestamp=[^&]*', 'timestamp=xxx', signature.sign_url('/?orig=zzz', secret_key)) + ) + + response = view(request) + assert response.status_code == 401 + assert response.data == { + 'err': 1, + 'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'", + } # Service authentication request = rf.get(signature.sign_url('/?orig=zzz', secret_key)) @@ -151,4 +169,4 @@ def test_response(rf, settings, tenant): del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS response = view(request) assert response.status_code == 401 - assert response.data == {'err': 'no-user-for-orig'} + assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'} -- 2.33.0