Skip to content

Commit

Permalink
feat: AES-2 encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
michalc committed Jan 3, 2024
1 parent ec8a17b commit f6cd967
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ classifiers = [
"License :: OSI Approved :: MIT License",
"Topic :: System :: Archiving :: Compression",
]
dependencies = [
"pycryptodome>=3.10.1",
]

[project.optional-dependencies]
dev = [
Expand All @@ -25,6 +28,7 @@ dev = [
"stream-unzip>=0.0.86"
]
ci = [
"pycryptodome==3.10.1",
"coverage==6.2",
"pytest==6.2.5",
"pytest-cov==3.0.0",
Expand Down
47 changes: 45 additions & 2 deletions stream_zip.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from collections import deque
from struct import Struct
import secrets
import zlib

from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA1
from Crypto.Util import Counter
from Crypto.Protocol.KDF import PBKDF2

# Private methods

_NO_COMPRESSION_BUFFERED_32 = object()
Expand Down Expand Up @@ -63,7 +69,7 @@ def method_compressobj(offset, default_get_compressobj):
return method_compressobj


def stream_zip(files, chunk_size=65536, get_compressobj=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=9), extended_timestamps=True):
def stream_zip(files, chunk_size=65536, get_compressobj=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=9), extended_timestamps=True, password=None):

def evenly_sized(chunks):
chunk = b''
Expand Down Expand Up @@ -139,6 +145,41 @@ def _raise_if_beyond(offset, maximum, exception_class):
def _no_encryption(chunks):
return (yield from chunks)

def _aes_encrypted(chunks):
key_length = 32
salt_length = 16
password_verification_length = 2

salt = secrets.token_bytes(salt_length)
yield salt

keys = PBKDF2(password, salt, 2 * key_length + password_verification_length, 1000)
yield keys[-password_verification_length:]

encrypter = AES.new(
keys[:key_length], AES.MODE_CTR,
counter=Counter.new(nbits=128, little_endian=True),
)
hmac = HMAC.new(keys[key_length:key_length*2], digestmod=SHA1)

# We leverage the not-often used "return value" of generators. Here, we want to iterate
# over chunks to encrypt them, but still return the same "return value". So we use a
# bit of a trick to extract the return value but still have access to the chunks as
# we iterate over them
return_value = None
def with_return_value():
nonlocal return_value
return_value = yield from chunks

for chunk in with_return_value():
encrypted_chunk = encrypter.encrypt(chunk)
hmac.update(encrypted_chunk)
yield encrypted_chunk

yield hmac.digest()[:10]

return return_value

def _zip_64_local_header_and_data(name_encoded, mod_at_ms_dos, mod_at_unix_extra, aes_extra, external_attr, uncompressed_size, crc_32, _get_compress_obj, encryption_func, chunks):
file_offset = offset

Expand Down Expand Up @@ -577,7 +618,9 @@ def _no_compression_streamed_data(chunks, uncompressed_size, crc_32, maximum_siz
b'\x01', # Only modification time (as opposed to also other times)
int(modified_at.timestamp()),
) if extended_timestamps else b''
aes_extra, encryption_func = (b'', _no_encryption)
aes_extra, encryption_func = \
(b'', _aes_encrypted) if password is not None else \
(b'', _no_encryption)
external_attr = \
(mode << 16) | \
(0x10 if name_encoded[-1:] == b'/' else 0x0) # MS-DOS directory
Expand Down
50 changes: 50 additions & 0 deletions test_stream_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,3 +1082,53 @@ def test_unzip_modification_time_extended_timestamps_disabled(method, timezone,
subprocess.run(['unzip', f'{d}/test.zip', '-d', d], env={'TZ': timezone})

assert os.path.getmtime('my_file') == expected_modified_at.timestamp()


@pytest.mark.parametrize(
"method",
[
ZIP_32,
ZIP_64,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
],
)
def test_password_unzips_with_stream_unzip(method):
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
password = 'my-pass'

files = (
('file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)),
('file-2', now, mode, ZIP_32, (b'c', b'd')),
)

assert [
(b'file-1', None, b'a' * 10000 + b'b' * 10000),
(b'file-2', None, b'cd'),
] == [
(name, size, b''.join(chunks))
for name, size, chunks in stream_unzip(stream_zip(files, password=password), password=password)
]


@pytest.mark.parametrize(
"method",
[
ZIP_32,
ZIP_64,
NO_COMPRESSION_64,
NO_COMPRESSION_32,
],
)
def test_password_bytes_not_deterministic(method):
now = datetime.strptime('2021-01-01 21:01:12', '%Y-%m-%d %H:%M:%S')
mode = stat.S_IFREG | 0o600
password = 'my-pass'

files = (
('file-1', now, mode, ZIP_64, (b'a' * 10000, b'b' * 10000)),
('file-2', now, mode, ZIP_32, (b'c', b'd')),
)

assert b''.join(stream_zip(files, password=password)) != b''.join(stream_zip(files, password=password))

0 comments on commit f6cd967

Please sign in to comment.