Projet

Général

Profil

0001-rest_authentication-improve-signature-errors-reporti.patch

Benjamin Dauvergne, 01 octobre 2021 11:57

Télécharger (9,7 ko)

Voir les différences:

Subject: [PATCH] rest_authentication: improve signature errors reporting
 (#57450)

 hobo/rest_authentication.py                 | 12 ++--
 hobo/signature.py                           | 78 ++++++++++++++++-----
 tests_authentic/test_rest_authentication.py | 30 ++++++--
 3 files changed, 93 insertions(+), 27 deletions(-)
hobo/rest_authentication.py
54 54
    default_code = 'invalid-signature'
55 55

  
56 56
    def __init__(self, code):
57
        self.detail = {'err': code}
57
        self.detail = {'err': 1, 'err_desc': code}
58 58

  
59 59

  
60 60
class PublikAuthentication(authentication.BaseAuthentication):
......
113 113
        if not request.GET.get('orig') or not request.GET.get('signature'):
114 114
            return None
115 115
        key = self.get_orig_key(request.GET['orig'])
116
        if not signature.check_url(full_path, key):
117
            self.logger.warning('invalid signature')
118
            raise PublikAuthenticationFailed('invalid-signature')
116
        try:
117
            assert signature.check_url(
118
                full_path, key, raise_on_error=True
119
            ), 'signature.check_url should never return False with raise_on_error'
120
        except signature.SignatureError as e:
121
            self.logger.warning('publik rest-framework-authentication failed: %s', e)
122
            raise PublikAuthenticationFailed(str(e))
119 123
        user = self.resolve_user(request)
120 124
        self.logger.info('user authenticated with signature %s', user)
121 125
        return (user, None)
hobo/signature.py
3 3
import hashlib
4 4
import hmac
5 5
import random
6
import secrets
6 7

  
7 8
from django.utils import six
8 9
from django.utils.encoding import smart_bytes
......
12 13
'''Simple signature scheme for query strings'''
13 14

  
14 15

  
16
class SignatureError(Exception):
17
    pass
18

  
19

  
15 20
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
16 21
    parsed = urlparse.urlparse(url)
17 22
    new_query = sign_query(parsed.query, key, algo, timestamp, nonce)
......
43 48
    return hash.digest()
44 49

  
45 50

  
46
def check_url(url, key, known_nonce=None, timedelta=30):
51
def check_url(url, key, known_nonce=None, timedelta=30, raise_on_error=False):
47 52
    parsed = urlparse.urlparse(url, 'https')
48
    return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta)
53
    return check_query(
54
        parsed.query, key, known_nonce=known_nonce, timedelta=timedelta, raise_on_error=raise_on_error
55
    )
49 56

  
50 57

  
51
def check_query(query, key, known_nonce=None, timedelta=30):
58
def check_query(query, key, known_nonce=None, timedelta=30, raise_on_error=False):
52 59
    parsed = urlparse.parse_qs(query)
53
    if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed):
60
    parsed = {key: value[0] if len(value) == 1 else value for key, value in parsed.items()}
61
    signature = parsed.get('signature')
62
    if not signature or not isinstance(signature, str):
63
        if raise_on_error:
64
            raise SignatureError('multiple/missing signature')
65
        return False
66
    algo = parsed.get('algo')
67
    if not algo or not isinstance(algo, str):
68
        if raise_on_error:
69
            raise SignatureError('multiple/missing algo')
70
        return False
71
    if algo != 'sha256':
72
        if raise_on_error:
73
            raise SignatureError('invalid algo, must be sha256')
74
        return False
75
    timestamp = parsed.get('timestamp')
76
    if not timestamp or not isinstance(timestamp, str):
77
        if raise_on_error:
78
            raise SignatureError('multiple/missing timestamp')
54 79
        return False
55 80
    if known_nonce is not None:
56
        if ('nonce' not in parsed) or known_nonce(parsed['nonce'][0]):
81
        nonce = parsed.get('nonce')
82
        if not nonce or not isinstance(nonce, str):
83
            if raise_on_error:
84
                raise SignatureError('multiple/missing nonce')
85
            return False
86
        if known_nonce(nonce):
87
            if raise_on_error:
88
                raise SignatureError('nonce replayed')
57 89
            return False
58 90
    unsigned_query, signature_content = query.split('&signature=', 1)
59 91
    if '&' in signature_content:
92
        if raise_on_error:
93
            raise SignatureError('signature is not the last parameter')
60 94
        return False  # signature must be the last parameter
61
    signature = base64.b64decode(parsed['signature'][0])
62
    algo = parsed['algo'][0]
63
    timestamp = parsed['timestamp'][0]
64
    timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
65
    if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
95
    try:
96
        signature = base64.b64decode(signature)
97
    except ValueError:
98
        if raise_on_error:
99
            raise SignatureError('signature is invalid base64')
100
        return False
101
    try:
102
        timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
103
    except ValueError as e:
104
        if raise_on_error:
105
            raise SignatureError('invalid timestamp, %s' % e)
106
        return False
107
    delta = abs(datetime.datetime.utcnow() - timestamp)
108
    if delta > datetime.timedelta(seconds=timedelta):
109
        if raise_on_error:
110
            raise SignatureError('timestamp delta is more than %s seconds: %s' % (timedelta, delta))
66 111
        return False
67
    return check_string(unsigned_query, signature, key, algo=algo)
112
    return check_string(unsigned_query, signature, key, algo=algo, raise_on_error=raise_on_error)
68 113

  
69 114

  
70
def check_string(s, signature, key, algo='sha256'):
115
def check_string(s, signature, key, algo='sha256', raise_on_error=False):
71 116
    # constant time compare
72 117
    signature2 = sign_string(s, key, algo=algo)
73
    if len(signature2) != len(signature):
118
    if not secrets.compare_digest(signature, signature2):
119
        if raise_on_error:
120
            raise SignatureError('HMAC hash is different')
74 121
        return False
75
    res = 0
76
    for a, b in zip(signature, signature2):
77
        res |= a ^ b
78
    return res == 0
122
    return True
tests_authentic/test_rest_authentication.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import re
18

  
17 19
import pytest
18 20
from django.contrib.auth import get_user_model
19 21
from django.test import RequestFactory
......
88 90
        publik_authentication = rest_authentication.PublikAuthentication()
89 91
        with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info:
90 92
            publik_authentication.authenticate(request)
91
        assert exc_info.value.detail['err'] == 'invalid-signature'
93
        assert exc_info.value.detail['err'] == 1
94
        assert exc_info.value.detail['err_desc'] == 'HMAC hash is different'
92 95

  
93 96

  
94 97
def test_response(rf, settings, tenant):
......
114 117

  
115 118
        response = view(request)
116 119
        assert response.status_code == 401
117
        assert response.data['err'] == 'no-known-services-setting'
120
        assert response.data['err'] == 1
121
        assert response.data['err_desc'] == 'no-known-services-setting'
118 122

  
119 123
        secret_key = 'bbb'
120 124
        settings.KNOWN_SERVICES = {
......
127 131

  
128 132
        response = view(request)
129 133
        assert response.status_code == 401
130
        assert response.data['err'] == 'no-secret-found-for-orig'
134
        assert response.data['err'] == 1
135
        assert response.data['err_desc'] == 'no-secret-found-for-orig'
131 136

  
132 137
        settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key
133 138
        response = view(request)
134 139
        assert response.status_code == 401
135
        assert response.data['err'] == 'invalid-signature'
140
        assert response.data['err'] == 1
141
        assert response.data['err_desc'] == 'multiple/missing algo'
136 142

  
137 143
        # User authentication
138 144
        request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key))
139 145

  
140 146
        response = view(request)
141 147
        assert response.status_code == 401
142
        assert response.data == {'err': 'user-not-found'}
148
        assert response.data == {'err': 1, 'err_desc': 'user-not-found'}
149

  
150
        # Service authentication, wrong timestamp
151
        request = rf.get(
152
            re.sub('timestamp=[^&]*', 'timestamp=xxx', signature.sign_url('/?orig=zzz', secret_key))
153
        )
154

  
155
        response = view(request)
156
        assert response.status_code == 401
157
        assert response.data == {
158
            'err': 1,
159
            'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'",
160
        }
143 161

  
144 162
        # Service authentication
145 163
        request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
......
151 169
        del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
152 170
        response = view(request)
153 171
        assert response.status_code == 401
154
        assert response.data == {'err': 'no-user-for-orig'}
172
        assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'}
155
-