From 1219e15bdb920096ed22f89e442c7df0d5f4b215 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Thu, 27 Jan 2022 08:27:38 +0100 Subject: [PATCH 1/3] misc: move authentic2.crypto to authentic2.utils.crypto (#60129) --- src/authentic2/crypto.py | 226 +----------------- src/authentic2/utils/crypto.py | 223 +++++++++++++++++ .../{test_crypto.py => test_utils_crypto.py} | 0 3 files changed, 226 insertions(+), 223 deletions(-) create mode 100644 src/authentic2/utils/crypto.py rename tests/{test_crypto.py => test_utils_crypto.py} (100%) diff --git a/src/authentic2/crypto.py b/src/authentic2/crypto.py index b32ab60a..aa6fcb38 100644 --- a/src/authentic2/crypto.py +++ b/src/authentic2/crypto.py @@ -1,223 +1,3 @@ -# authentic2 - versatile identity manager -# Copyright (C) 2010-2019 Entr'ouvert -# -# This program is free software: you can redistribute it and/or modify it -# under the terms of the GNU Affero General Public License as published -# by the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import base64 -import hashlib -import hmac -import struct -from binascii import Error as Base64Error - -from Cryptodome import Random -from Cryptodome.Cipher import AES -from Cryptodome.Hash import HMAC, SHA256 -from Cryptodome.Protocol.KDF import PBKDF2 -from django.conf import settings -from django.utils.crypto import constant_time_compare -from django.utils.encoding import force_bytes - - -class DecryptionError(Exception): - pass - - -def base64url_decode(raw): - rem = len(raw) % 4 - if rem > 0: - raw += b'=' * (4 - rem) - return base64.urlsafe_b64decode(raw) - - -def base64url_encode(raw): - return base64.urlsafe_b64encode(raw).rstrip(b'=') - - -def get_hashclass(name): - if name in ['md5', 'sha1', 'sha256', 'sha384', 'sha512']: - return getattr(hashlib, name) - return None - - -def aes_base64_encrypt(key, data): - """Generate an AES key from any key material using PBKDF2, and encrypt data using CFB mode. A - new IV is generated each time, the IV is also used as salt for PBKDF2. - """ - iv = Random.get_random_bytes(16) - aes_key = PBKDF2(key, iv) - aes = AES.new(aes_key, AES.MODE_CFB, iv=iv) - crypted = aes.encrypt(data) - return b'%s$%s' % (base64.b64encode(iv), base64.b64encode(crypted)) - - -def aes_base64_decrypt(key, payload, raise_on_error=True): - '''Decrypt data encrypted with aes_base64_encrypt''' - if not isinstance(payload, bytes): - try: - payload = payload.encode('ascii') - except Exception: - raise DecryptionError('payload is not an ASCII string') - try: - iv, crypted = payload.split(b'$') - except (ValueError, TypeError): - if raise_on_error: - raise DecryptionError('bad payload') - return None - try: - iv = base64.b64decode(iv) - crypted = base64.b64decode(crypted) - except Base64Error: - if raise_on_error: - raise DecryptionError('incorrect base64 encoding') - return None - aes_key = PBKDF2(key, iv) - aes = AES.new(aes_key, AES.MODE_CFB, iv=iv) - return aes.decrypt(crypted) - - -def add_padding(msg, block_size): - '''Pad message with zero bytes to match block_size''' - pad_length = block_size - (len(msg) + 2) % block_size - padded = struct.pack(' len(msg) - 2: - raise DecryptionError('wrong padding') - if len(msg[2 + msg_length :].strip(force_bytes('\0'))): - raise DecryptionError('padding is not all zero') - if len(unpadded) != msg_length: - raise DecryptionError('wrong padding') - return unpadded - - -def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', count=1): - """Encrypt using AES-128 and sign using HMAC-SHA256 shortened to 64 bits. - - Count and algorithm are encoded in the final string for future evolution. - - """ - mode = 1 # AES128-SHA256 - hashmod = SHA256 - key_size = 16 - hmac_size = key_size - - if isinstance(salt, str): - salt = force_bytes(salt) - iv = hashmod.new(salt).digest() - - def prf(secret, salt): - return HMAC.new(secret, salt, hashmod).digest() - - aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf) - - key_size = len(aes_key) - - aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size]) - - crypted = aes.encrypt(add_padding(data, key_size)) - - hmac = prf(key, crypted)[:hmac_size] - - raw = struct.pack('<2sBH', b'a2', mode, count) + crypted + hmac - return base64url_encode(raw) - - -def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=True, max_count=1): - mode = 1 # AES128-SHA256 - hashmod = SHA256 - key_size = 16 - hmac_size = key_size - - def prf(secret, salt): - return HMAC.new(secret, salt, hashmod).digest() - - try: - try: - raw = base64url_decode(urlencoded) - except Exception as e: - raise DecryptionError('base64 decoding failed', e) - try: - magic, mode, count = struct.unpack('<2sBH', raw[:5]) - except struct.error as e: - raise DecryptionError('invalid packing', e) - if magic != b'a2': - raise DecryptionError('invalid magic string', magic) - if mode != 1: - raise DecryptionError('mode is not AES128-SHA256', mode) - if count > max_count: - raise DecryptionError('count is too big', count) - - crypted, hmac = raw[5:-hmac_size], raw[-hmac_size:] - - if not crypted or not hmac or prf(key, crypted)[:hmac_size] != hmac: - raise DecryptionError('invalid HMAC') - - if isinstance(salt, str): - salt = force_bytes(salt) - iv = hashmod.new(salt).digest() - - aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf) - - aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size]) - - data = remove_padding(aes.decrypt(crypted), key_size) - - return data - except DecryptionError: - if not raise_on_error: - return None - raise - - -def hmac_url(key, url): - if hasattr(key, 'encode'): - key = key.encode() - if hasattr(url, 'encode'): - url = url.encode() - return ( - base64.b32encode(hmac.HMAC(key=key, msg=url, digestmod=hashlib.sha256).digest()) - .decode('ascii') - .strip('=') - ) - - -def check_hmac_url(key, url, signature): - if hasattr(signature, 'decode'): - signature = signature.decode() - return constant_time_compare(signature, hmac_url(key, url).encode('ascii')) - - -def hash_chain(n, seed=None, encoded_seed=None): - '''Generate a chain of hashes''' - if encoded_seed: - seed = base64url_decode(encoded_seed.encode()) - if hasattr(seed, 'encode'): - seed = seed.encode() - if seed is None: - seed = Random.get_random_bytes(16) - chain = [seed] - for dummy in range(n - 1): - chain.append(hashlib.sha256(chain[-1] + settings.SECRET_KEY.encode()).digest()) - return [base64url_encode(x).decode('ascii') for x in chain] +# authentic2.crypto was moved to authentic2.utils.cryptor, use wildcard import to prevent +# breakage of import in other modules +from .utils.crypto import * # pylint: disable=unused-wildcard-import,wildcard-import diff --git a/src/authentic2/utils/crypto.py b/src/authentic2/utils/crypto.py new file mode 100644 index 00000000..b32ab60a --- /dev/null +++ b/src/authentic2/utils/crypto.py @@ -0,0 +1,223 @@ +# authentic2 - versatile identity manager +# Copyright (C) 2010-2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import base64 +import hashlib +import hmac +import struct +from binascii import Error as Base64Error + +from Cryptodome import Random +from Cryptodome.Cipher import AES +from Cryptodome.Hash import HMAC, SHA256 +from Cryptodome.Protocol.KDF import PBKDF2 +from django.conf import settings +from django.utils.crypto import constant_time_compare +from django.utils.encoding import force_bytes + + +class DecryptionError(Exception): + pass + + +def base64url_decode(raw): + rem = len(raw) % 4 + if rem > 0: + raw += b'=' * (4 - rem) + return base64.urlsafe_b64decode(raw) + + +def base64url_encode(raw): + return base64.urlsafe_b64encode(raw).rstrip(b'=') + + +def get_hashclass(name): + if name in ['md5', 'sha1', 'sha256', 'sha384', 'sha512']: + return getattr(hashlib, name) + return None + + +def aes_base64_encrypt(key, data): + """Generate an AES key from any key material using PBKDF2, and encrypt data using CFB mode. A + new IV is generated each time, the IV is also used as salt for PBKDF2. + """ + iv = Random.get_random_bytes(16) + aes_key = PBKDF2(key, iv) + aes = AES.new(aes_key, AES.MODE_CFB, iv=iv) + crypted = aes.encrypt(data) + return b'%s$%s' % (base64.b64encode(iv), base64.b64encode(crypted)) + + +def aes_base64_decrypt(key, payload, raise_on_error=True): + '''Decrypt data encrypted with aes_base64_encrypt''' + if not isinstance(payload, bytes): + try: + payload = payload.encode('ascii') + except Exception: + raise DecryptionError('payload is not an ASCII string') + try: + iv, crypted = payload.split(b'$') + except (ValueError, TypeError): + if raise_on_error: + raise DecryptionError('bad payload') + return None + try: + iv = base64.b64decode(iv) + crypted = base64.b64decode(crypted) + except Base64Error: + if raise_on_error: + raise DecryptionError('incorrect base64 encoding') + return None + aes_key = PBKDF2(key, iv) + aes = AES.new(aes_key, AES.MODE_CFB, iv=iv) + return aes.decrypt(crypted) + + +def add_padding(msg, block_size): + '''Pad message with zero bytes to match block_size''' + pad_length = block_size - (len(msg) + 2) % block_size + padded = struct.pack(' len(msg) - 2: + raise DecryptionError('wrong padding') + if len(msg[2 + msg_length :].strip(force_bytes('\0'))): + raise DecryptionError('padding is not all zero') + if len(unpadded) != msg_length: + raise DecryptionError('wrong padding') + return unpadded + + +def aes_base64url_deterministic_encrypt(key, data, salt, hash_name='sha256', count=1): + """Encrypt using AES-128 and sign using HMAC-SHA256 shortened to 64 bits. + + Count and algorithm are encoded in the final string for future evolution. + + """ + mode = 1 # AES128-SHA256 + hashmod = SHA256 + key_size = 16 + hmac_size = key_size + + if isinstance(salt, str): + salt = force_bytes(salt) + iv = hashmod.new(salt).digest() + + def prf(secret, salt): + return HMAC.new(secret, salt, hashmod).digest() + + aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf) + + key_size = len(aes_key) + + aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size]) + + crypted = aes.encrypt(add_padding(data, key_size)) + + hmac = prf(key, crypted)[:hmac_size] + + raw = struct.pack('<2sBH', b'a2', mode, count) + crypted + hmac + return base64url_encode(raw) + + +def aes_base64url_deterministic_decrypt(key, urlencoded, salt, raise_on_error=True, max_count=1): + mode = 1 # AES128-SHA256 + hashmod = SHA256 + key_size = 16 + hmac_size = key_size + + def prf(secret, salt): + return HMAC.new(secret, salt, hashmod).digest() + + try: + try: + raw = base64url_decode(urlencoded) + except Exception as e: + raise DecryptionError('base64 decoding failed', e) + try: + magic, mode, count = struct.unpack('<2sBH', raw[:5]) + except struct.error as e: + raise DecryptionError('invalid packing', e) + if magic != b'a2': + raise DecryptionError('invalid magic string', magic) + if mode != 1: + raise DecryptionError('mode is not AES128-SHA256', mode) + if count > max_count: + raise DecryptionError('count is too big', count) + + crypted, hmac = raw[5:-hmac_size], raw[-hmac_size:] + + if not crypted or not hmac or prf(key, crypted)[:hmac_size] != hmac: + raise DecryptionError('invalid HMAC') + + if isinstance(salt, str): + salt = force_bytes(salt) + iv = hashmod.new(salt).digest() + + aes_key = PBKDF2(key, iv, dkLen=key_size, count=count, prf=prf) + + aes = AES.new(aes_key, AES.MODE_CBC, iv[:key_size]) + + data = remove_padding(aes.decrypt(crypted), key_size) + + return data + except DecryptionError: + if not raise_on_error: + return None + raise + + +def hmac_url(key, url): + if hasattr(key, 'encode'): + key = key.encode() + if hasattr(url, 'encode'): + url = url.encode() + return ( + base64.b32encode(hmac.HMAC(key=key, msg=url, digestmod=hashlib.sha256).digest()) + .decode('ascii') + .strip('=') + ) + + +def check_hmac_url(key, url, signature): + if hasattr(signature, 'decode'): + signature = signature.decode() + return constant_time_compare(signature, hmac_url(key, url).encode('ascii')) + + +def hash_chain(n, seed=None, encoded_seed=None): + '''Generate a chain of hashes''' + if encoded_seed: + seed = base64url_decode(encoded_seed.encode()) + if hasattr(seed, 'encode'): + seed = seed.encode() + if seed is None: + seed = Random.get_random_bytes(16) + chain = [seed] + for dummy in range(n - 1): + chain.append(hashlib.sha256(chain[-1] + settings.SECRET_KEY.encode()).digest()) + return [base64url_encode(x).decode('ascii') for x in chain] diff --git a/tests/test_crypto.py b/tests/test_utils_crypto.py similarity index 100% rename from tests/test_crypto.py rename to tests/test_utils_crypto.py -- 2.34.1