0001-rest_authentication-improve-signature-errors-reporti.patch
hobo/rest_authentication.py | ||
---|---|---|
54 | 54 |
default_code = 'invalid-signature' |
55 | 55 | |
56 | 56 |
def __init__(self, code): |
57 |
self.detail = {'err': code} |
|
57 |
self.detail = {'err': 1, 'err_desc': code}
|
|
58 | 58 | |
59 | 59 | |
60 | 60 |
class PublikAuthentication(authentication.BaseAuthentication): |
... | ... | |
113 | 113 |
if not request.GET.get('orig') or not request.GET.get('signature'): |
114 | 114 |
return None |
115 | 115 |
key = self.get_orig_key(request.GET['orig']) |
116 |
if not signature.check_url(full_path, key): |
|
117 |
self.logger.warning('invalid signature') |
|
118 |
raise PublikAuthenticationFailed('invalid-signature') |
|
116 |
try: |
|
117 |
assert signature.check_url( |
|
118 |
full_path, key, raise_on_error=True |
|
119 |
), 'signature.check_url should never return False with raise_on_error' |
|
120 |
except signature.SignatureError as e: |
|
121 |
self.logger.warning('publik rest-framework-authentication failed: %s', e) |
|
122 |
raise PublikAuthenticationFailed(str(e)) |
|
119 | 123 |
user = self.resolve_user(request) |
120 | 124 |
self.logger.info('user authenticated with signature %s', user) |
121 | 125 |
return (user, None) |
hobo/signature.py | ||
---|---|---|
3 | 3 |
import hashlib |
4 | 4 |
import hmac |
5 | 5 |
import random |
6 |
import secrets |
|
6 | 7 | |
7 | 8 |
from django.utils import six |
8 | 9 |
from django.utils.encoding import smart_bytes |
... | ... | |
12 | 13 |
'''Simple signature scheme for query strings''' |
13 | 14 | |
14 | 15 | |
16 |
class SignatureError(Exception): |
|
17 |
pass |
|
18 | ||
19 | ||
15 | 20 |
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None): |
16 | 21 |
parsed = urlparse.urlparse(url) |
17 | 22 |
new_query = sign_query(parsed.query, key, algo, timestamp, nonce) |
... | ... | |
43 | 48 |
return hash.digest() |
44 | 49 | |
45 | 50 | |
46 |
def check_url(url, key, known_nonce=None, timedelta=30): |
|
51 |
def check_url(url, key, known_nonce=None, timedelta=30, raise_on_error=False):
|
|
47 | 52 |
parsed = urlparse.urlparse(url, 'https') |
48 |
return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta) |
|
53 |
return check_query( |
|
54 |
parsed.query, key, known_nonce=known_nonce, timedelta=timedelta, raise_on_error=raise_on_error |
|
55 |
) |
|
49 | 56 | |
50 | 57 | |
51 |
def check_query(query, key, known_nonce=None, timedelta=30): |
|
58 |
def check_query(query, key, known_nonce=None, timedelta=30, raise_on_error=False):
|
|
52 | 59 |
parsed = urlparse.parse_qs(query) |
53 |
if not ('signature' in parsed and 'algo' in parsed and 'timestamp' in parsed): |
|
60 |
parsed = {key: value[0] if len(value) == 1 else value for key, value in parsed.items()} |
|
61 |
signature = parsed.get('signature') |
|
62 |
if not signature or not isinstance(signature, str): |
|
63 |
if raise_on_error: |
|
64 |
raise SignatureError('multiple/missing signature') |
|
65 |
return False |
|
66 |
algo = parsed.get('algo') |
|
67 |
if not algo or not isinstance(algo, str): |
|
68 |
if raise_on_error: |
|
69 |
raise SignatureError('multiple/missing algo') |
|
70 |
return False |
|
71 |
if algo != 'sha256': |
|
72 |
if raise_on_error: |
|
73 |
raise SignatureError('invalid algo, must be sha256') |
|
74 |
return False |
|
75 |
timestamp = parsed.get('timestamp') |
|
76 |
if not timestamp or not isinstance(timestamp, str): |
|
77 |
if raise_on_error: |
|
78 |
raise SignatureError('multiple/missing timestamp') |
|
54 | 79 |
return False |
55 | 80 |
if known_nonce is not None: |
56 |
if ('nonce' not in parsed) or known_nonce(parsed['nonce'][0]): |
|
81 |
nonce = parsed.get('nonce') |
|
82 |
if not nonce or not isinstance(nonce, str): |
|
83 |
if raise_on_error: |
|
84 |
raise SignatureError('multiple/missing nonce') |
|
85 |
return False |
|
86 |
if known_nonce(nonce): |
|
87 |
if raise_on_error: |
|
88 |
raise SignatureError('nonce replayed') |
|
57 | 89 |
return False |
58 | 90 |
unsigned_query, signature_content = query.split('&signature=', 1) |
59 | 91 |
if '&' in signature_content: |
92 |
if raise_on_error: |
|
93 |
raise SignatureError('signature is not the last parameter') |
|
60 | 94 |
return False # signature must be the last parameter |
61 |
signature = base64.b64decode(parsed['signature'][0]) |
|
62 |
algo = parsed['algo'][0] |
|
63 |
timestamp = parsed['timestamp'][0] |
|
64 |
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') |
|
65 |
if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta): |
|
95 |
try: |
|
96 |
signature = base64.b64decode(signature) |
|
97 |
except ValueError: |
|
98 |
if raise_on_error: |
|
99 |
raise SignatureError('signature is invalid base64') |
|
100 |
return False |
|
101 |
try: |
|
102 |
timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ') |
|
103 |
except ValueError as e: |
|
104 |
if raise_on_error: |
|
105 |
raise SignatureError('invalid timestamp, %s' % e) |
|
106 |
return False |
|
107 |
delta = abs(datetime.datetime.utcnow() - timestamp) |
|
108 |
if delta > datetime.timedelta(seconds=timedelta): |
|
109 |
if raise_on_error: |
|
110 |
raise SignatureError('timestamp delta is more than %s seconds: %s' % (timedelta, delta)) |
|
66 | 111 |
return False |
67 |
return check_string(unsigned_query, signature, key, algo=algo) |
|
112 |
return check_string(unsigned_query, signature, key, algo=algo, raise_on_error=raise_on_error)
|
|
68 | 113 | |
69 | 114 | |
70 |
def check_string(s, signature, key, algo='sha256'): |
|
115 |
def check_string(s, signature, key, algo='sha256', raise_on_error=False):
|
|
71 | 116 |
# constant time compare |
72 | 117 |
signature2 = sign_string(s, key, algo=algo) |
73 |
if len(signature2) != len(signature): |
|
118 |
if not secrets.compare_digest(signature, signature2): |
|
119 |
if raise_on_error: |
|
120 |
raise SignatureError('HMAC hash is different') |
|
74 | 121 |
return False |
75 |
res = 0 |
|
76 |
for a, b in zip(signature, signature2): |
|
77 |
res |= a ^ b |
|
78 |
return res == 0 |
|
122 |
return True |
tests_authentic/test_rest_authentication.py | ||
---|---|---|
14 | 14 |
# You should have received a copy of the GNU Affero General Public License |
15 | 15 |
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
16 | 16 | |
17 |
import re |
|
18 | ||
17 | 19 |
import pytest |
18 | 20 |
from django.contrib.auth import get_user_model |
19 | 21 |
from django.test import RequestFactory |
... | ... | |
88 | 90 |
publik_authentication = rest_authentication.PublikAuthentication() |
89 | 91 |
with pytest.raises(rest_authentication.PublikAuthenticationFailed) as exc_info: |
90 | 92 |
publik_authentication.authenticate(request) |
91 |
assert exc_info.value.detail['err'] == 'invalid-signature' |
|
93 |
assert exc_info.value.detail['err'] == 1 |
|
94 |
assert exc_info.value.detail['err_desc'] == 'HMAC hash is different' |
|
92 | 95 | |
93 | 96 | |
94 | 97 |
def test_response(rf, settings, tenant): |
... | ... | |
114 | 117 | |
115 | 118 |
response = view(request) |
116 | 119 |
assert response.status_code == 401 |
117 |
assert response.data['err'] == 'no-known-services-setting' |
|
120 |
assert response.data['err'] == 1 |
|
121 |
assert response.data['err_desc'] == 'no-known-services-setting' |
|
118 | 122 | |
119 | 123 |
secret_key = 'bbb' |
120 | 124 |
settings.KNOWN_SERVICES = { |
... | ... | |
127 | 131 | |
128 | 132 |
response = view(request) |
129 | 133 |
assert response.status_code == 401 |
130 |
assert response.data['err'] == 'no-secret-found-for-orig' |
|
134 |
assert response.data['err'] == 1 |
|
135 |
assert response.data['err_desc'] == 'no-secret-found-for-orig' |
|
131 | 136 | |
132 | 137 |
settings.KNOWN_SERVICES['whatever']['whatever']['secret'] = secret_key |
133 | 138 |
response = view(request) |
134 | 139 |
assert response.status_code == 401 |
135 |
assert response.data['err'] == 'invalid-signature' |
|
140 |
assert response.data['err'] == 1 |
|
141 |
assert response.data['err_desc'] == 'multiple/missing algo' |
|
136 | 142 | |
137 | 143 |
# User authentication |
138 | 144 |
request = rf.get(signature.sign_url('/?orig=zzz&NameID=1234', secret_key)) |
139 | 145 | |
140 | 146 |
response = view(request) |
141 | 147 |
assert response.status_code == 401 |
142 |
assert response.data == {'err': 'user-not-found'} |
|
148 |
assert response.data == {'err': 1, 'err_desc': 'user-not-found'} |
|
149 | ||
150 |
# Service authentication, wrong timestamp |
|
151 |
request = rf.get( |
|
152 |
re.sub('timestamp=[^&]*', 'timestamp=xxx', signature.sign_url('/?orig=zzz', secret_key)) |
|
153 |
) |
|
154 | ||
155 |
response = view(request) |
|
156 |
assert response.status_code == 401 |
|
157 |
assert response.data == { |
|
158 |
'err': 1, |
|
159 |
'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'", |
|
160 |
} |
|
143 | 161 | |
144 | 162 |
# Service authentication |
145 | 163 |
request = rf.get(signature.sign_url('/?orig=zzz', secret_key)) |
... | ... | |
151 | 169 |
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS |
152 | 170 |
response = view(request) |
153 | 171 |
assert response.status_code == 401 |
154 |
assert response.data == {'err': 'no-user-for-orig'} |
|
172 |
assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'} |
|
155 |
- |