Projet

Général

Profil

0001-utils-add-dumps-loads-for-confidentiality-protected-.patch

Benjamin Dauvergne, 26 janvier 2022 22:01

Télécharger (18,2 ko)

Voir les différences:

Subject: [PATCH 1/2] utils: add dumps/loads for confidentiality protected
 tokens (#61130)

 src/authentic2/crypto.py                      | 224 +--------------
 src/authentic2/utils/crypto.py                | 256 ++++++++++++++++++
 .../{test_crypto.py => test_utils_crypto.py}  |  32 ++-
 3 files changed, 288 insertions(+), 224 deletions(-)
 create mode 100644 src/authentic2/utils/crypto.py
 rename tests/{test_crypto.py => test_utils_crypto.py} (72%)
src/authentic2/crypto.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import hashlib
19
import hmac
20
import struct
21
from binascii import Error as Base64Error
22

  
23
from Cryptodome import Random
24
from Cryptodome.Cipher import AES
25
from Cryptodome.Hash import HMAC, SHA256
26
from Cryptodome.Protocol.KDF import PBKDF2
27
from django.conf import settings
28
from django.utils.crypto import constant_time_compare
29
from django.utils.encoding import force_bytes
30

  
31

  
32
class DecryptionError(Exception):
    """Raised when a payload cannot be decoded, authenticated or unpadded."""
34

  
35

  
36
def base64url_decode(raw):
    """Decode URL-safe base64 data whose trailing ``=`` padding may be missing.

    ``raw`` may be bytes-like or an ASCII ``str``.  The padding removed by
    ``base64url_encode`` is restored before decoding.

    Returns the decoded ``bytes``.  Raises ``binascii.Error`` on incorrectly
    padded input and ``ValueError`` when a ``str`` holds non-ASCII characters.
    """
    if isinstance(raw, str):
        # Normalise to bytes so the b'=' padding below can be appended;
        # an unpadded str used to fail with TypeError here.
        raw = raw.encode('ascii')
    rem = len(raw) % 4
    if rem > 0:
        # Build a new object instead of ``+=`` so a caller's bytearray is
        # never extended in place.
        raw = raw + b'=' * (4 - rem)
    return base64.urlsafe_b64decode(raw)
41

  
42

  
43
def base64url_encode(raw):
    """Return the URL-safe base64 encoding of ``raw`` as bytes, without ``=`` padding."""
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.rstrip(b'=')
45

  
46

  
47
def get_hashclass(name):
    """Return the ``hashlib`` constructor called ``name``, or ``None`` if it is not allowed."""
    allowed = ('md5', 'sha1', 'sha256', 'sha384', 'sha512')
    return getattr(hashlib, name) if name in allowed else None
51

  
52

  
53
def aes_base64_encrypt(key, data):
    """Encrypt ``data`` with AES in CFB mode under a key derived from ``key`` by PBKDF2.

    A random 16-byte IV is drawn on every call and doubles as the PBKDF2
    salt.  The result is ``<base64 IV>$<base64 ciphertext>`` as bytes.
    """
    salt_iv = Random.get_random_bytes(16)
    cipher = AES.new(PBKDF2(key, salt_iv), AES.MODE_CFB, iv=salt_iv)
    ciphertext = cipher.encrypt(data)
    return b'%s$%s' % (base64.b64encode(salt_iv), base64.b64encode(ciphertext))
62

  
63

  
64
def aes_base64_decrypt(key, payload, raise_on_error=True):
    """Decrypt a payload produced by ``aes_base64_encrypt``.

    ``payload`` is ``<base64 IV>$<base64 ciphertext>`` as bytes or as an
    ASCII str.  The AES key is re-derived from ``key`` with PBKDF2, using
    the IV as salt.

    Returns the decrypted bytes.  No integrity check is performed here, so
    the result is only meaningful when ``key`` matches the encryption key.

    A malformed payload raises ``DecryptionError``, or returns ``None`` when
    ``raise_on_error`` is false.
    """

    def fail(message, cause=None):
        # Single exit for malformed payloads so that every validation step
        # honours raise_on_error the same way.
        if raise_on_error:
            raise DecryptionError(message) from cause
        return None

    if not isinstance(payload, bytes):
        try:
            payload = payload.encode('ascii')
        except Exception as exc:  # payload may be any object
            return fail('payload is not an ASCII string', exc)
    try:
        iv, crypted = payload.split(b'$')
    except (ValueError, TypeError) as exc:
        return fail('bad payload', exc)
    try:
        iv = base64.b64decode(iv)
        crypted = base64.b64decode(crypted)
    except Base64Error as exc:
        return fail('incorrect base64 encoding', exc)
    if len(iv) != AES.block_size:
        # AES.new() rejects such an IV with a bare ValueError; report it as a
        # decryption failure like every other malformed input.
        return fail('bad IV length')
    aes_key = PBKDF2(key, iv)
    aes = AES.new(aes_key, AES.MODE_CFB, iv=iv)
    return aes.decrypt(crypted)
87

  
88

  
89
def add_padding(msg, block_size):
    """Prefix ``msg`` with its length and zero-fill it to a multiple of ``block_size``.

    Layout: little-endian signed 16-bit length, the message itself, then
    between 1 and ``block_size`` NUL bytes.
    """
    msg_length = len(msg)
    # The 2 accounts for the length header; a whole block of padding is added
    # when header + message is already aligned.
    pad_length = block_size - (msg_length + 2) % block_size
    layout = '<h%ds%ds' % (msg_length, pad_length)
    padded = struct.pack(layout, msg_length, msg, b'\0' * pad_length)
    assert len(padded) % block_size == 0
    return padded
95

  
96

  
97
def remove_padding(msg, block_size):
    """Recover the message wrapped by ``add_padding``.

    Raises ``DecryptionError`` when the 2-byte length header is missing, the
    total length is not a multiple of ``block_size``, the declared length
    does not fit in ``msg``, or the trailing bytes are not all zero.
    """
    try:
        (msg_length,) = struct.unpack('<h', msg[:2])
    except struct.error:
        raise DecryptionError('wrong padding')
    total = len(msg)
    if total % block_size != 0:
        raise DecryptionError('message length is not a multiple of block size', total, block_size)
    if msg_length > total - 2:
        raise DecryptionError('wrong padding')
    payload_end = 2 + msg_length
    if msg[payload_end:].strip(b'\0'):
        raise DecryptionError('padding is not all zero')
    unpadded = msg[2:payload_end]
    # Catches negative declared lengths, for which the slice above is empty.
    if len(unpadded) != msg_length:
        raise DecryptionError('wrong padding')
    return unpadded
113

  
114

  
115
def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', count=1):
    """Encrypt ``data`` with AES-128-CBC and append a truncated HMAC-SHA256.

    The IV is the SHA-256 digest of ``salt`` and is also the PBKDF2 salt, so
    the same ``key``, ``data``, ``salt`` and ``count`` always give the same
    token.

    The output is the unpadded URL-safe base64 of: a header (magic ``b'a2'``,
    mode byte, ``count``), the ciphertext, and the first 16 bytes of the
    HMAC-SHA256 of the ciphertext keyed with ``key``.  Count and algorithm
    are encoded in the final string for future evolution.

    ``hash_name`` is accepted but not used by this implementation.
    """
    mode = 1  # AES128-SHA256
    key_size = 16
    hmac_size = key_size

    def prf(secret, salt):
        return HMAC.new(secret, salt, SHA256).digest()

    salt_bytes = force_bytes(salt) if isinstance(salt, str) else salt
    iv = SHA256.new(salt_bytes).digest()
    aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
    block_size = len(aes_key)
    cipher = AES.new(aes_key, AES.MODE_CBC, iv[:block_size])
    crypted = cipher.encrypt(add_padding(data, block_size))
    tag = prf(key, crypted)[:hmac_size]
    header = struct.pack('<2sBH', b'a2', mode, count)
    return base64url_encode(header + crypted + tag)
145

  
146

  
147
def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=True, max_count=1):
    """Verify and decrypt a token made by ``aes_base64url_deterministic_encrypt``.

    The token is base64url-decoded and its 5-byte header (magic ``b'a2'``,
    mode, PBKDF2 count) is validated; a count above ``max_count`` is
    rejected.  The trailing 16-byte HMAC-SHA256 tag is checked before any
    decryption takes place.  ``salt`` must be the value used to encrypt.

    Returns the decrypted bytes.  On any validation failure raises
    ``DecryptionError``, or returns ``None`` when ``raise_on_error`` is false.
    """
    key_size = 16
    hmac_size = key_size

    def prf(secret, salt):
        return HMAC.new(secret, salt, SHA256).digest()

    try:
        try:
            raw = base64url_decode(urlencoded)
        except Exception as e:
            raise DecryptionError('base64 decoding failed', e)
        try:
            magic, mode, count = struct.unpack('<2sBH', raw[:5])
        except struct.error as e:
            raise DecryptionError('invalid packing', e)
        if magic != b'a2':
            raise DecryptionError('invalid magic string', magic)
        if mode != 1:
            raise DecryptionError('mode is not AES128-SHA256', mode)
        if count > max_count:
            raise DecryptionError('count is too big', count)

        crypted, tag = raw[5:-hmac_size], raw[-hmac_size:]

        # Compare in constant time: the token is attacker-controlled, and a
        # plain ``!=`` leaks how many leading bytes of the tag are right.
        if not crypted or not tag or not constant_time_compare(prf(key, crypted)[:hmac_size], tag):
            raise DecryptionError('invalid HMAC')

        if isinstance(salt, str):
            salt = force_bytes(salt)
        iv = SHA256.new(salt).digest()
        aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
        aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size])
        return remove_padding(aes.decrypt(crypted), key_size)
    except DecryptionError:
        if not raise_on_error:
            return None
        raise
192

  
193

  
194
def hmac_url(key, url):
    """Return the unpadded base32 HMAC-SHA256 of ``url`` under ``key``, as a str.

    Arguments that expose an ``encode`` method (such as str) are encoded with
    its default encoding first; other values are used as they are.
    """
    key_bytes = key.encode() if hasattr(key, 'encode') else key
    url_bytes = url.encode() if hasattr(url, 'encode') else url
    digest = hmac.HMAC(key=key_bytes, msg=url_bytes, digestmod=hashlib.sha256).digest()
    encoded = base64.b32encode(digest).decode('ascii')
    return encoded.strip('=')
204

  
205

  
206
def check_hmac_url(key, url, signature):
    """Return True when ``signature`` equals ``hmac_url(key, url)``.

    ``signature`` may be a str or bytes.  The comparison goes through
    ``constant_time_compare``.  Bytes that are not valid UTF-8 cannot match a
    base32 signature, so they are rejected with ``False`` instead of letting
    ``UnicodeDecodeError`` escape to the caller.
    """
    if hasattr(signature, 'decode'):
        try:
            signature = signature.decode()
        except UnicodeDecodeError:
            return False
    return constant_time_compare(signature, hmac_url(key, url).encode('ascii'))
210

  
211

  
212
def hash_chain(n, seed=None, encoded_seed=None):
    """Generate a chain of hashes, returned as base64url-encoded strings.

    The first element is the seed: decoded from ``encoded_seed`` when that is
    truthy, otherwise ``seed`` (encoded if it has an ``encode`` method),
    otherwise 16 random bytes.  Each following element is the SHA-256 of the
    previous one concatenated with ``settings.SECRET_KEY``.  The seed is
    always returned, even when ``n`` is below 1.
    """
    if encoded_seed:
        current = base64url_decode(encoded_seed.encode())
    elif seed is None:
        current = Random.get_random_bytes(16)
    elif hasattr(seed, 'encode'):
        current = seed.encode()
    else:
        current = seed
    chain = [current]
    for _ in range(n - 1):
        current = hashlib.sha256(current + settings.SECRET_KEY.encode()).digest()
        chain.append(current)
    return [base64url_encode(link).decode('ascii') for link in chain]
1
from .utils.crypto import *  # pylint: disable=unused-wildcard-import,wildcard-import
src/authentic2/utils/crypto.py
1
# authentic2 - versatile identity manager
2
# Copyright (C) 2010-2019 Entr'ouvert
3
#
4
# This program is free software: you can redistribute it and/or modify it
5
# under the terms of the GNU Affero General Public License as published
6
# by the Free Software Foundation, either version 3 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU Affero General Public License for more details.
13
#
14
# You should have received a copy of the GNU Affero General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

  
17
import base64
18
import hashlib
19
import hmac
20
import struct
21
from binascii import Error as Base64Error
22

  
23
from Cryptodome import Random
24
from Cryptodome.Cipher import AES
25
from Cryptodome.Hash import HMAC, SHA256
26
from Cryptodome.Protocol.KDF import PBKDF2
27
from django.conf import settings
28
from django.core import signing
29
from django.core.signing import BadSignature, SignatureExpired  # pylint: disable=unused-import
30
from django.utils.crypto import constant_time_compare
31
from django.utils.encoding import force_bytes
32

  
33

  
34
class DecryptionError(Exception):
    """Raised when a payload cannot be decoded, authenticated or unpadded."""
36

  
37

  
38
def base64url_decode(raw):
    """Decode URL-safe base64 data whose trailing ``=`` padding may be missing.

    ``raw`` may be bytes-like or an ASCII ``str``.  The padding removed by
    ``base64url_encode`` is restored before decoding.

    Returns the decoded ``bytes``.  Raises ``binascii.Error`` on incorrectly
    padded input and ``ValueError`` when a ``str`` holds non-ASCII characters.
    """
    if isinstance(raw, str):
        # Normalise to bytes so the b'=' padding below can be appended;
        # an unpadded str used to fail with TypeError here.
        raw = raw.encode('ascii')
    rem = len(raw) % 4
    if rem > 0:
        # Build a new object instead of ``+=`` so a caller's bytearray is
        # never extended in place.
        raw = raw + b'=' * (4 - rem)
    return base64.urlsafe_b64decode(raw)
43

  
44

  
45
def base64url_encode(raw):
    """Return the URL-safe base64 encoding of ``raw`` as bytes, without ``=`` padding."""
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.rstrip(b'=')
47

  
48

  
49
def get_hashclass(name):
    """Return the ``hashlib`` constructor called ``name``, or ``None`` if it is not allowed."""
    allowed = ('md5', 'sha1', 'sha256', 'sha384', 'sha512')
    return getattr(hashlib, name) if name in allowed else None
53

  
54

  
55
def aes_base64_encrypt(key, data, urlsafe=False, sep=b'$'):
    """Encrypt ``data`` with AES in CFB mode under a key derived from ``key`` by PBKDF2.

    A random 16-byte IV is drawn on every call and doubles as the PBKDF2
    salt.  The result is ``<base64 IV><sep><base64 ciphertext>`` as bytes.
    With ``urlsafe`` set, both parts use the unpadded URL-safe alphabet
    instead of standard base64.
    """
    salt_iv = Random.get_random_bytes(16)
    cipher = AES.new(PBKDF2(key, salt_iv), AES.MODE_CFB, iv=salt_iv)
    ciphertext = cipher.encrypt(data)
    encode = base64url_encode if urlsafe else base64.b64encode
    return b'%s%s%s' % (encode(salt_iv), sep, encode(ciphertext))
67

  
68

  
69
def aes_base64_decrypt(key, payload, raise_on_error=True, urlsafe=False, sep=b'$'):
    """Decrypt a payload produced by ``aes_base64_encrypt``.

    ``payload`` is ``<base64 IV><sep><base64 ciphertext>`` as bytes or as an
    ASCII str; ``urlsafe`` and ``sep`` must match the values used to encrypt.
    The AES key is re-derived from ``key`` with PBKDF2, using the IV as salt.

    Returns the decrypted bytes.  No integrity check is performed here, so
    the result is only meaningful when ``key`` matches the encryption key.

    A malformed payload raises ``DecryptionError``, or returns ``None`` when
    ``raise_on_error`` is false.
    """

    def fail(message, cause=None):
        # Single exit for malformed payloads so that every validation step
        # honours raise_on_error the same way.
        if raise_on_error:
            raise DecryptionError(message) from cause
        return None

    if not isinstance(payload, bytes):
        try:
            payload = payload.encode('ascii')
        except Exception as exc:  # payload may be any object
            return fail('payload is not an ASCII string', exc)
    try:
        iv, crypted = payload.split(sep)
    except (ValueError, TypeError) as exc:
        return fail('bad payload', exc)

    decode = base64url_decode if urlsafe else base64.b64decode
    try:
        iv = decode(iv)
        crypted = decode(crypted)
    except Base64Error as exc:
        return fail('incorrect base64 encoding', exc)
    if len(iv) != AES.block_size:
        # AES.new() rejects such an IV with a bare ValueError; report it as a
        # decryption failure like every other malformed input.
        return fail('bad IV length')
    aes_key = PBKDF2(key, iv)
    aes = AES.new(aes_key, AES.MODE_CFB, iv=iv)
    return aes.decrypt(crypted)
98

  
99

  
100
def add_padding(msg, block_size):
    """Prefix ``msg`` with its length and zero-fill it to a multiple of ``block_size``.

    Layout: little-endian signed 16-bit length, the message itself, then
    between 1 and ``block_size`` NUL bytes.
    """
    msg_length = len(msg)
    # The 2 accounts for the length header; a whole block of padding is added
    # when header + message is already aligned.
    pad_length = block_size - (msg_length + 2) % block_size
    layout = '<h%ds%ds' % (msg_length, pad_length)
    padded = struct.pack(layout, msg_length, msg, b'\0' * pad_length)
    assert len(padded) % block_size == 0
    return padded
106

  
107

  
108
def remove_padding(msg, block_size):
    """Recover the message wrapped by ``add_padding``.

    Raises ``DecryptionError`` when the 2-byte length header is missing, the
    total length is not a multiple of ``block_size``, the declared length
    does not fit in ``msg``, or the trailing bytes are not all zero.
    """
    try:
        (msg_length,) = struct.unpack('<h', msg[:2])
    except struct.error:
        raise DecryptionError('wrong padding')
    total = len(msg)
    if total % block_size != 0:
        raise DecryptionError('message length is not a multiple of block size', total, block_size)
    if msg_length > total - 2:
        raise DecryptionError('wrong padding')
    payload_end = 2 + msg_length
    if msg[payload_end:].strip(b'\0'):
        raise DecryptionError('padding is not all zero')
    unpadded = msg[2:payload_end]
    # Catches negative declared lengths, for which the slice above is empty.
    if len(unpadded) != msg_length:
        raise DecryptionError('wrong padding')
    return unpadded
124

  
125

  
126
def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', count=1):
    """Encrypt ``data`` with AES-128-CBC and append a truncated HMAC-SHA256.

    The IV is the SHA-256 digest of ``salt`` and is also the PBKDF2 salt, so
    the same ``key``, ``data``, ``salt`` and ``count`` always give the same
    token.

    The output is the unpadded URL-safe base64 of: a header (magic ``b'a2'``,
    mode byte, ``count``), the ciphertext, and the first 16 bytes of the
    HMAC-SHA256 of the ciphertext keyed with ``key``.  Count and algorithm
    are encoded in the final string for future evolution.

    ``hash_name`` is accepted but not used by this implementation.
    """
    mode = 1  # AES128-SHA256
    key_size = 16
    hmac_size = key_size

    def prf(secret, salt):
        return HMAC.new(secret, salt, SHA256).digest()

    salt_bytes = force_bytes(salt) if isinstance(salt, str) else salt
    iv = SHA256.new(salt_bytes).digest()
    aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
    block_size = len(aes_key)
    cipher = AES.new(aes_key, AES.MODE_CBC, iv[:block_size])
    crypted = cipher.encrypt(add_padding(data, block_size))
    tag = prf(key, crypted)[:hmac_size]
    header = struct.pack('<2sBH', b'a2', mode, count)
    return base64url_encode(header + crypted + tag)
156

  
157

  
158
def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=True, max_count=1):
    """Verify and decrypt a token made by ``aes_base64url_deterministic_encrypt``.

    The token is base64url-decoded and its 5-byte header (magic ``b'a2'``,
    mode, PBKDF2 count) is validated; a count above ``max_count`` is
    rejected.  The trailing 16-byte HMAC-SHA256 tag is checked before any
    decryption takes place.  ``salt`` must be the value used to encrypt.

    Returns the decrypted bytes.  On any validation failure raises
    ``DecryptionError``, or returns ``None`` when ``raise_on_error`` is false.
    """
    key_size = 16
    hmac_size = key_size

    def prf(secret, salt):
        return HMAC.new(secret, salt, SHA256).digest()

    try:
        try:
            raw = base64url_decode(urlencoded)
        except Exception as e:
            raise DecryptionError('base64 decoding failed', e)
        try:
            magic, mode, count = struct.unpack('<2sBH', raw[:5])
        except struct.error as e:
            raise DecryptionError('invalid packing', e)
        if magic != b'a2':
            raise DecryptionError('invalid magic string', magic)
        if mode != 1:
            raise DecryptionError('mode is not AES128-SHA256', mode)
        if count > max_count:
            raise DecryptionError('count is too big', count)

        crypted, tag = raw[5:-hmac_size], raw[-hmac_size:]

        # Compare in constant time: the token is attacker-controlled, and a
        # plain ``!=`` leaks how many leading bytes of the tag are right.
        if not crypted or not tag or not constant_time_compare(prf(key, crypted)[:hmac_size], tag):
            raise DecryptionError('invalid HMAC')

        if isinstance(salt, str):
            salt = force_bytes(salt)
        iv = SHA256.new(salt).digest()
        aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf)
        aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size])
        return remove_padding(aes.decrypt(crypted), key_size)
    except DecryptionError:
        if not raise_on_error:
            return None
        raise
203

  
204

  
205
def hmac_url(key, url):
    """Return the unpadded base32 HMAC-SHA256 of ``url`` under ``key``, as a str.

    Arguments that expose an ``encode`` method (such as str) are encoded with
    its default encoding first; other values are used as they are.
    """
    key_bytes = key.encode() if hasattr(key, 'encode') else key
    url_bytes = url.encode() if hasattr(url, 'encode') else url
    digest = hmac.HMAC(key=key_bytes, msg=url_bytes, digestmod=hashlib.sha256).digest()
    encoded = base64.b32encode(digest).decode('ascii')
    return encoded.strip('=')
215

  
216

  
217
def check_hmac_url(key, url, signature):
    """Return True when ``signature`` equals ``hmac_url(key, url)``.

    ``signature`` may be a str or bytes.  The comparison goes through
    ``constant_time_compare``.  Bytes that are not valid UTF-8 cannot match a
    base32 signature, so they are rejected with ``False`` instead of letting
    ``UnicodeDecodeError`` escape to the caller.
    """
    if hasattr(signature, 'decode'):
        try:
            signature = signature.decode()
        except UnicodeDecodeError:
            return False
    return constant_time_compare(signature, hmac_url(key, url).encode('ascii'))
221

  
222

  
223
def hash_chain(n, seed=None, encoded_seed=None):
    """Generate a chain of hashes, returned as base64url-encoded strings.

    The first element is the seed: decoded from ``encoded_seed`` when that is
    truthy, otherwise ``seed`` (encoded if it has an ``encode`` method),
    otherwise 16 random bytes.  Each following element is the SHA-256 of the
    previous one concatenated with ``settings.SECRET_KEY``.  The seed is
    always returned, even when ``n`` is below 1.
    """
    if encoded_seed:
        current = base64url_decode(encoded_seed.encode())
    elif seed is None:
        current = Random.get_random_bytes(16)
    elif hasattr(seed, 'encode'):
        current = seed.encode()
    else:
        current = seed
    chain = [current]
    for _ in range(n - 1):
        current = hashlib.sha256(current + settings.SECRET_KEY.encode()).digest()
        chain.append(current)
    return [base64url_encode(link).decode('ascii') for link in chain]
235

  
236

  
237
def dumps(obj, key=None, **kwargs):
    """Sign ``obj`` with ``django.core.signing.dumps``, then encrypt the signed string.

    ``key`` falls back to ``settings.SECRET_KEY`` when falsy and is used both
    for signing and as the encryption key material.  Extra keyword arguments
    are forwarded to ``signing.dumps``.

    Returns a str of the form ``<IV>:<ciphertext>``, both parts in unpadded
    URL-safe base64.
    """
    secret = key or settings.SECRET_KEY
    secret_bytes = secret.encode()
    signed = signing.dumps(obj, key=secret, **kwargs).encode()
    token = aes_base64_encrypt(secret_bytes, signed, urlsafe=True, sep=b':')
    return token.decode()
243

  
244

  
245
def loads(s, key=None, **kwargs):
    """Decrypt and verify a token made by ``dumps``; also accept plain signed tokens.

    ``key`` falls back to ``settings.SECRET_KEY`` when falsy.  Extra keyword
    arguments (for instance ``max_age``) are forwarded to
    ``django.core.signing.loads``.

    When ``s`` cannot be parsed as an encrypted token it is handed unchanged
    to ``signing.loads``, which keeps tokens issued by ``signing.dumps``
    readable.

    Raises ``BadSignature`` (or its subclass ``SignatureExpired``) for any
    token that does not verify.
    """
    if not key:
        key = settings.SECRET_KEY
    try:
        decrypted = aes_base64_decrypt(key.encode(), s.encode(), urlsafe=True, sep=b':')
    except (DecryptionError, ValueError):
        # ValueError covers an IV of the wrong length being rejected by the
        # cipher: such input is not one of our encrypted tokens, so let
        # signing.loads() accept or reject it instead of crashing the caller.
        return signing.loads(s, key=key, **kwargs)
    try:
        decrypted = decrypted.decode()
    except UnicodeDecodeError as exc:
        # Decrypting with the wrong key yields bytes that are usually not
        # valid UTF-8.
        raise BadSignature('decrypted payload is not valid UTF-8') from exc
    return signing.loads(decrypted, key=key, **kwargs)
tests/test_crypto.py → tests/test_utils_crypto.py
14 14
# You should have received a copy of the GNU Affero General Public License
15 15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 16

  
17
import datetime
17 18
import random
18 19
import uuid
19 20

  
20 21
import pytest
21 22
from django.utils.encoding import force_bytes
22 23

  
23
from authentic2 import crypto
24
from authentic2.utils import crypto
24 25

  
25 26
key = b'1234'
26 27

  
......
72 73
    key = 'é'
73 74
    url = 'https://example.invalid/\u0000'
74 75
    assert crypto.check_hmac_url(key, url, crypto.hmac_url(key, url))
76

  
77

  
78
def test_dumps_loads(settings, freezer):
    """Round-trip crypto.dumps/loads and check the key-mismatch and expiry errors."""
    payload = {'a': 1, 'b': 'foo', 'bar': 'zib@!$#$#$#$#'}

    # Token made with the default settings.SECRET_KEY.
    default_token = crypto.dumps(payload)
    assert default_token.encode('ascii')
    assert crypto.loads(default_token) == payload
    settings.SECRET_KEY = 'bb'
    with pytest.raises(crypto.BadSignature):
        assert crypto.loads(default_token)

    # Token made with an explicit key: the default key must not open it.
    keyed_token = crypto.dumps(payload, key='aa')
    with pytest.raises(crypto.BadSignature):
        assert crypto.loads(keyed_token)
    assert crypto.loads(keyed_token, key='aa') == payload

    # max_age is enforced once the clock has moved past it.
    freezer.move_to(datetime.timedelta(seconds=100))
    with pytest.raises(crypto.SignatureExpired):
        crypto.loads(keyed_token, key='aa', max_age=10)
    assert crypto.loads(keyed_token, key='aa') == payload
97

  
98

  
99
def test_dumps_loads_retrocompatibility():
    """crypto.loads must still read tokens produced by plain django.core.signing."""
    from django.core import signing

    payload = {'a': 1, 'b': 'foo', 'bar': 'zib@!$#$#$#$#'}
    legacy_token = signing.dumps(payload)
    assert crypto.loads(legacy_token) == payload
75
-