Skip to content

Commit

Permalink
Merge pull request #130 from uktrade/fix/async-signature-and-types
Browse files Browse the repository at this point in the history
fix!: the signature of async_stream_zip
  • Loading branch information
michalc authored May 28, 2024
2 parents 9e443e8 + be933b1 commit 00c06b2
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions stream_zip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def _get(self, offset: int, default_get_compressobj: _CompressObjGetter) -> _Met

# Each member file is a tuple of its name, last modified date, file mode, Method, and its bytes
MemberFile = Tuple[str, datetime, int, Method, Iterable[bytes]]
AsyncMemberFile = Tuple[str, datetime, int, Method, AsyncIterable[bytes]]


def stream_zip(files: Iterable[MemberFile], chunk_size: int=65536,
Expand Down Expand Up @@ -789,7 +790,13 @@ def _no_compression_streamed_data(chunks: Iterable[bytes], uncompressed_size: in
yield from evenly_sized(zipped_chunks)


async def async_stream_zip(member_files: AsyncIterable[MemberFile], *args: Any, **kwargs: Any) -> AsyncIterable[bytes]:
async def async_stream_zip(
files: AsyncIterable[AsyncMemberFile], chunk_size: int=65536,
get_compressobj: _CompressObjGetter=lambda: zlib.compressobj(wbits=-zlib.MAX_WBITS, level=9),
extended_timestamps: bool=True,
password: Optional[str]=None,
get_crypto_random: Callable[[int], bytes]=lambda num_bytes: secrets.token_bytes(num_bytes),
) -> AsyncIterable[bytes]:

async def to_async_iterable(sync_iterable: Iterable[Any]) -> AsyncIterable[Any]:
# asyncio.to_thread is not available until Python 3.9, and StopIteration doesn't get
Expand Down Expand Up @@ -825,10 +832,16 @@ def to_sync_iterable(async_iterable: AsyncIterable[Any]) -> Iterable[Any]:
loop = asyncio.get_event_loop()
sync_member_files = (
member_file[0:4] + (to_sync_iterable(member_file[4],),)
for member_file in to_sync_iterable(member_files)
for member_file in to_sync_iterable(files)
)

async for chunk in to_async_iterable(stream_zip(sync_member_files, *args, **kwargs)):
async for chunk in to_async_iterable(stream_zip(
files=sync_member_files, chunk_size=chunk_size,
get_compressobj=get_compressobj,
extended_timestamps=extended_timestamps,
password=password,
get_crypto_random=get_crypto_random,
)):
yield chunk


Expand Down

0 comments on commit 00c06b2

Please sign in to comment.