Skip to content

Commit

Permalink
Add AES-GCM as PBES2/PKCS8 mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Legrandin committed Jan 8, 2024
1 parent 5b3ac10 commit 8677775
Show file tree
Hide file tree
Showing 22 changed files with 359 additions and 42 deletions.
126 changes: 94 additions & 32 deletions lib/Crypto/IO/_PBES.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
DerObjectId, DerInteger,
)

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Protocol.KDF import PBKDF1, PBKDF2, scrypt

Expand All @@ -59,7 +60,9 @@
_OID_AES128_CBC = "2.16.840.1.101.3.4.1.2"
_OID_AES192_CBC = "2.16.840.1.101.3.4.1.22"
_OID_AES256_CBC = "2.16.840.1.101.3.4.1.42"

_OID_AES128_GCM = "2.16.840.1.101.3.4.1.6"
_OID_AES192_GCM = "2.16.840.1.101.3.4.1.26"
_OID_AES256_GCM = "2.16.840.1.101.3.4.1.46"

class PbesError(ValueError):
pass
Expand Down Expand Up @@ -152,26 +155,26 @@ def decrypt(data, passphrase):
from Crypto.Hash import MD5
from Crypto.Cipher import DES
hashmod = MD5
ciphermod = DES
module = DES
elif pbe_oid == _OID_PBE_WITH_MD5_AND_RC2_CBC:
# PBE_MD5_RC2_CBC
from Crypto.Hash import MD5
from Crypto.Cipher import ARC2
hashmod = MD5
ciphermod = ARC2
module = ARC2
cipher_params['effective_keylen'] = 64
elif pbe_oid == _OID_PBE_WITH_SHA1_AND_DES_CBC:
# PBE_SHA1_DES_CBC
from Crypto.Hash import SHA1
from Crypto.Cipher import DES
hashmod = SHA1
ciphermod = DES
module = DES
elif pbe_oid == _OID_PBE_WITH_SHA1_AND_RC2_CBC:
# PBE_SHA1_RC2_CBC
from Crypto.Hash import SHA1
from Crypto.Cipher import ARC2
hashmod = SHA1
ciphermod = ARC2
module = ARC2
cipher_params['effective_keylen'] = 64
else:
raise PbesError("Unknown OID for PBES1")
Expand All @@ -183,7 +186,7 @@ def decrypt(data, passphrase):
key_iv = PBKDF1(passphrase, salt, 16, iterations, hashmod)
key, iv = key_iv[:8], key_iv[8:]

cipher = ciphermod.new(key, ciphermod.MODE_CBC, iv, **cipher_params)
cipher = module.new(key, module.MODE_CBC, iv, **cipher_params)
pt = cipher.decrypt(encrypted_data)
return unpad(pt, cipher.block_size)

Expand Down Expand Up @@ -260,35 +263,57 @@ def encrypt(data, passphrase, protection, prot_params=None, randfunc=None):
pbkdf = "scrypt"
enc_algo = res.group(3)

aead = False
if enc_algo == 'DES-EDE3-CBC':
from Crypto.Cipher import DES3
key_size = 24
module = DES3
cipher_mode = DES3.MODE_CBC
enc_oid = _OID_DES_EDE3_CBC
enc_param = {'iv': randfunc(8)}
elif enc_algo == 'AES128-CBC':
from Crypto.Cipher import AES
key_size = 16
module = AES
cipher_mode = AES.MODE_CBC
enc_oid = _OID_AES128_CBC
enc_param = {'iv': randfunc(16)}
elif enc_algo == 'AES192-CBC':
from Crypto.Cipher import AES
key_size = 24
module = AES
cipher_mode = AES.MODE_CBC
enc_oid = _OID_AES192_CBC
enc_param = {'iv': randfunc(16)}
elif enc_algo == 'AES256-CBC':
from Crypto.Cipher import AES
key_size = 32
module = AES
cipher_mode = AES.MODE_CBC
enc_oid = _OID_AES256_CBC
enc_param = {'iv': randfunc(16)}
elif enc_algo == 'AES128-GCM':
key_size = 16
module = AES
cipher_mode = AES.MODE_GCM
enc_oid = _OID_AES128_GCM
enc_param = {'nonce': randfunc(12)}
aead = True
elif enc_algo == 'AES192-GCM':
key_size = 24
module = AES
cipher_mode = AES.MODE_GCM
enc_oid = _OID_AES192_GCM
enc_param = {'nonce': randfunc(12)}
aead = True
elif enc_algo == 'AES256-GCM':
key_size = 32
module = AES
cipher_mode = AES.MODE_GCM
enc_oid = _OID_AES256_GCM
enc_param = {'nonce': randfunc(12)}
aead = True
else:
raise ValueError("Unknown encryption mode '%s'" % enc_algo)

# Get random data
iv = randfunc(module.block_size)
iv_nonce = list(enc_param.values())[0]
salt = randfunc(prot_params.get("salt_size", 8))

# Derive key from password
Expand All @@ -310,18 +335,18 @@ def encrypt(data, passphrase, protection, prot_params=None, randfunc=None):

if pbkdf2_hmac_algo != 'SHA1':
try:
hmac_oid = Hash.HMAC.new(digestmod).oid
hmac_oid = Hash.HMAC.new(b'', digestmod=digestmod).oid
except KeyError:
raise ValueError("No OID for HMAC hash algorithm")
pbkdf2_params.append(DerObjectId(hmac_oid))
pbkdf2_params.append(DerSequence([DerObjectId(hmac_oid)]))

kdf_info = DerSequence([
DerObjectId(_OID_PBKDF2), # PBKDF2
pbkdf2_params
])

elif pbkdf == 'scrypt':
# It must be scrypt

count = prot_params.get("iteration_count", 16384)
scrypt_r = prot_params.get('block_size', 8)
scrypt_p = prot_params.get('parallelization', 1)
Expand All @@ -341,11 +366,15 @@ def encrypt(data, passphrase, protection, prot_params=None, randfunc=None):
raise ValueError("Unknown KDF " + res.group(1))

# Create cipher and use it
cipher = module.new(key, cipher_mode, iv)
encrypted_data = cipher.encrypt(pad(data, cipher.block_size))
cipher = module.new(key, cipher_mode, **enc_param)
if aead:
ct, tag = cipher.encrypt_and_digest(data)
encrypted_data = ct + tag
else:
encrypted_data = cipher.encrypt(pad(data, cipher.block_size))
enc_info = DerSequence([
DerObjectId(enc_oid),
DerOctetString(iv)
DerOctetString(iv_nonce)
])

# Result
Expand Down Expand Up @@ -405,10 +434,12 @@ def decrypt(data, passphrase):

if left > 0:
try:
# Check if it's an INTEGER
kdf_key_length = pbkdf2_params[idx] - 0
left -= 1
idx += 1
except TypeError:
# keyLength is not present
pass

# Default is HMAC-SHA1
Expand All @@ -434,34 +465,55 @@ def decrypt(data, passphrase):
enc_info = DerSequence().decode(pbes2_params[1])
enc_oid = DerObjectId().decode(enc_info[0]).value

aead = False
if enc_oid == _OID_DES_EDE3_CBC:
# DES_EDE3_CBC
from Crypto.Cipher import DES3
ciphermod = DES3
module = DES3
cipher_mode = DES3.MODE_CBC
key_size = 24
cipher_param = 'iv'
elif enc_oid == _OID_AES128_CBC:
# AES128_CBC
from Crypto.Cipher import AES
ciphermod = AES
module = AES
cipher_mode = AES.MODE_CBC
key_size = 16
cipher_param = 'iv'
elif enc_oid == _OID_AES192_CBC:
# AES192_CBC
from Crypto.Cipher import AES
ciphermod = AES
module = AES
cipher_mode = AES.MODE_CBC
key_size = 24
cipher_param = 'iv'
elif enc_oid == _OID_AES256_CBC:
# AES256_CBC
from Crypto.Cipher import AES
ciphermod = AES
module = AES
cipher_mode = AES.MODE_CBC
key_size = 32
cipher_param = 'iv'
elif enc_oid == _OID_AES128_GCM:
module = AES
cipher_mode = AES.MODE_GCM
key_size = 16
cipher_param = 'nonce'
aead = True
elif enc_oid == _OID_AES192_GCM:
module = AES
cipher_mode = AES.MODE_GCM
key_size = 24
cipher_param = 'nonce'
aead = True
elif enc_oid == _OID_AES256_GCM:
module = AES
cipher_mode = AES.MODE_GCM
key_size = 32
cipher_param = 'nonce'
aead = True
else:
raise PbesError("Unsupported PBES2 cipher")
raise PbesError("Unsupported PBES2 cipher " + enc_algo)

if kdf_key_length and kdf_key_length != key_size:
raise PbesError("Mismatch between PBES2 KDF parameters"
" and selected cipher")

IV = DerOctetString().decode(enc_info[1]).payload
iv_nonce = DerOctetString().decode(enc_info[1]).payload

# Create cipher
if kdf_oid == _OID_PBKDF2:
Expand All @@ -477,8 +529,18 @@ def decrypt(data, passphrase):
else:
key = scrypt(passphrase, salt, key_size, iteration_count,
scrypt_r, scrypt_p)
cipher = ciphermod.new(key, ciphermod.MODE_CBC, IV)
cipher = module.new(key, cipher_mode, **{cipher_param:iv_nonce})

# Decrypt data
pt = cipher.decrypt(encrypted_data)
return unpad(pt, cipher.block_size)
if len(encrypted_data) < cipher.block_size:
raise ValueError("Too little data to decrypt")

if aead:
tag_len = cipher.block_size
pt = cipher.decrypt_and_verify(encrypted_data[:-tag_len],
encrypted_data[-tag_len:])
else:
pt_padded = cipher.decrypt(encrypted_data)
pt = unpad(pt_padded, cipher.block_size)

return pt
41 changes: 33 additions & 8 deletions lib/Crypto/SelfTest/IO/test_PBES.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@
"""Self-tests for Crypto.IO._PBES module"""

import unittest
from Crypto.Util.py3compat import *

from Crypto.IO._PBES import PBES2


class TestPBES2(unittest.TestCase):

def setUp(self):
self.ref = b("Test data")
self.passphrase = b("Passphrase")
self.ref = b"Test data"
self.passphrase = b"Passphrase"

def test1(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
Expand All @@ -53,29 +52,53 @@ def test1(self):

def test2(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA1AndAES128-CBC')
'PBKDF2WithHMAC-SHA224AndAES128-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test3(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA1AndAES192-CBC')
'PBKDF2WithHMAC-SHA256AndAES192-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test4(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES128-CBC')
'PBKDF2WithHMAC-SHA384AndAES256-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test5(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES192-CBC')
'PBKDF2WithHMAC-SHA512AndAES128-GCM')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test6(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA512-224AndAES192-GCM')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test7(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'PBKDF2WithHMAC-SHA3-256AndAES256-GCM')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test8(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES128-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test9(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES192-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
self.assertEqual(self.ref, pt)

def test10(self):
ct = PBES2.encrypt(self.ref, self.passphrase,
'scryptAndAES256-CBC')
pt = PBES2.decrypt(ct, self.passphrase)
Expand All @@ -88,6 +111,8 @@ def get_tests(config={}):
listTests += list_test_cases(TestPBES2)
return listTests


if __name__ == '__main__':
suite = lambda: unittest.TestSuite(get_tests())
def suite():
return unittest.TestSuite(get_tests())
unittest.main(defaultTest='suite')
Loading

0 comments on commit 8677775

Please sign in to comment.