25 |
25 |
from Cryptodome.Hash import HMAC, SHA256
|
26 |
26 |
from Cryptodome.Protocol.KDF import PBKDF2
|
27 |
27 |
from django.conf import settings
|
|
28 |
from django.core import signing
|
|
29 |
from django.core.signing import BadSignature, SignatureExpired # pylint: disable=unused-import
|
28 |
30 |
from django.utils.crypto import constant_time_compare
|
29 |
31 |
from django.utils.encoding import force_bytes
|
30 |
32 |
|
... | ... | |
50 |
52 |
return None
|
51 |
53 |
|
52 |
54 |
|
53 |
|
def aes_base64_encrypt(key, data):
|
|
55 |
def aes_base64_encrypt(key, data, urlsafe=False, sep=b'$'):
|
54 |
56 |
"""Generate an AES key from any key material using PBKDF2, and encrypt data using CFB mode. A
|
55 |
57 |
new IV is generated each time, the IV is also used as salt for PBKDF2.
|
56 |
58 |
"""
|
... | ... | |
58 |
60 |
aes_key = PBKDF2(key, iv)
|
59 |
61 |
aes = AES.new(aes_key, AES.MODE_CFB, iv=iv)
|
60 |
62 |
crypted = aes.encrypt(data)
|
61 |
|
return b'%s$%s' % (base64.b64encode(iv), base64.b64encode(crypted))
|
|
63 |
if urlsafe:
|
|
64 |
return b'%s%s%s' % (base64url_encode(iv), sep, base64url_encode(crypted))
|
|
65 |
else:
|
|
66 |
return b'%s%s%s' % (base64.b64encode(iv), sep, base64.b64encode(crypted))
|
62 |
67 |
|
63 |
68 |
|
64 |
|
def aes_base64_decrypt(key, payload, raise_on_error=True):
|
|
69 |
def aes_base64_decrypt(key, payload, raise_on_error=True, urlsafe=False, sep=b'$'):
|
65 |
70 |
'''Decrypt data encrypted with aes_base64_encrypt'''
|
66 |
71 |
if not isinstance(payload, bytes):
|
67 |
72 |
try:
|
... | ... | |
69 |
74 |
except Exception:
|
70 |
75 |
raise DecryptionError('payload is not an ASCII string')
|
71 |
76 |
try:
|
72 |
|
iv, crypted = payload.split(b'$')
|
|
77 |
iv, crypted = payload.split(sep)
|
73 |
78 |
except (ValueError, TypeError):
|
74 |
79 |
if raise_on_error:
|
75 |
80 |
raise DecryptionError('bad payload')
|
76 |
81 |
return None
|
|
82 |
|
|
83 |
if urlsafe:
|
|
84 |
decode = base64url_decode
|
|
85 |
else:
|
|
86 |
decode = base64.b64decode
|
|
87 |
|
77 |
88 |
try:
|
78 |
|
iv = base64.b64decode(iv)
|
79 |
|
crypted = base64.b64decode(crypted)
|
|
89 |
iv = decode(iv)
|
|
90 |
crypted = decode(crypted)
|
80 |
91 |
except Base64Error:
|
81 |
92 |
if raise_on_error:
|
82 |
93 |
raise DecryptionError('incorrect base64 encoding')
|
... | ... | |
221 |
232 |
for dummy in range(n - 1):
|
222 |
233 |
chain.append(hashlib.sha256(chain[-1] + settings.SECRET_KEY.encode()).digest())
|
223 |
234 |
return [base64url_encode(x).decode('ascii') for x in chain]
|
|
235 |
|
|
236 |
|
|
237 |
def dumps(obj, key=None, **kwargs):
    """Sign obj with django.core.signing, then encrypt the signed string.

    key defaults to settings.SECRET_KEY when it is not given (or is falsy);
    the same key is used both for signing and as the key material handed to
    aes_base64_encrypt. Extra keyword arguments are forwarded unchanged to
    signing.dumps.

    The encrypted token is produced with URL-safe base64 and b':' as the
    separator, and is returned as a str.
    """
    secret = key or settings.SECRET_KEY
    key_material = secret.encode()
    signed_payload = signing.dumps(obj, key=secret, **kwargs).encode()
    token = aes_base64_encrypt(key_material, signed_payload, urlsafe=True, sep=b':')
    return token.decode()
|
|
243 |
|
|
244 |
|
|
245 |
def loads(s, key=None, **kwargs):
    """Decrypt a token with aes_base64_decrypt, then verify it with signing.loads.

    key defaults to settings.SECRET_KEY when it is not given (or is falsy).
    Extra keyword arguments are forwarded unchanged to signing.loads.

    If decryption raises DecryptionError, s itself is passed to
    signing.loads instead (presumably so that tokens which were only signed,
    not encrypted, are still accepted -- confirm against callers).

    Raises BadSignature when the decrypted bytes cannot be decoded to str;
    any exception raised by signing.loads propagates to the caller.
    """
    secret = key or settings.SECRET_KEY
    try:
        plaintext = aes_base64_decrypt(secret.encode(), s.encode(), urlsafe=True, sep=b':')
    except DecryptionError:
        # Not decryptable as an AES token: treat the input as a plain signed value.
        return signing.loads(s, key=secret, **kwargs)

    try:
        signed_value = plaintext.decode()
    except UnicodeDecodeError:
        # Decryption yielded bytes that are not valid text, so no valid
        # signed string can be recovered from them.
        raise BadSignature
    return signing.loads(signed_value, key=secret, **kwargs)
|