Skip to content

Commit

Permalink
Get rid of ext close callback, and other tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
johtso committed Jun 23, 2021
1 parent 9c4659e commit 3c41933
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 20 deletions.
5 changes: 2 additions & 3 deletions httpx_caching/_async/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,8 @@ async def _io_close_response_stream(
def wrap_response_stream(
self, key: str, response: Response, vary_header_values: dict
) -> Response:
wrapped_stream = ByteStreamWrapper(
response.stream, response.extensions.get("aclose")
)
response_stream: AsyncByteStream = response.stream # type: ignore
wrapped_stream = ByteStreamWrapper(response_stream)
response.stream = wrapped_stream

async def callback(response_body: bytes):
Expand Down
8 changes: 5 additions & 3 deletions httpx_caching/_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import dataclasses
from typing import AsyncIterable, Iterable, Union
from typing import Union

from httpx import Headers
from httpx import ByteStream, Headers

from ._utils import ByteStreamWrapper


@dataclasses.dataclass
Expand All @@ -12,7 +14,7 @@ class Response:

status_code: int
headers: Headers
stream: Union[Iterable[bytes], AsyncIterable[bytes]]
stream: Union[ByteStream, ByteStreamWrapper]
extensions: dict = dataclasses.field(default_factory=dict)

@classmethod
Expand Down
5 changes: 2 additions & 3 deletions httpx_caching/_sync/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ def _io_close_response_stream(self, action: protocol.CloseResponseStream) -> Non
def wrap_response_stream(
self, key: str, response: Response, vary_header_values: dict
) -> Response:
wrapped_stream = ByteStreamWrapper(
response.stream, response.extensions.get("close")
)
response_stream: SyncByteStream = response.stream # type: ignore
wrapped_stream = ByteStreamWrapper(response_stream)
response.stream = wrapped_stream

def callback(response_body: bytes):
Expand Down
16 changes: 5 additions & 11 deletions httpx_caching/_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import threading
from typing import (
AsyncIterable,
AsyncIterator,
Awaitable,
Callable,
Generator,
Iterable,
Iterator,
Optional,
Tuple,
Expand All @@ -23,8 +21,7 @@
class ByteStreamWrapper:
def __init__(
self,
stream: Union[Iterable[bytes], AsyncIterable[bytes]],
stream_close: Optional[Callable],
stream: Union[httpx.SyncByteStream, httpx.AsyncByteStream],
callback: Optional[Callable] = None,
) -> None:
"""
Expand All @@ -33,7 +30,6 @@ def __init__(
"""
self.stream = stream
self.callback = callback or (lambda *args, **kwargs: None)
self.stream_close = stream_close

self.buffer = bytearray()
self.callback_called = False
Expand All @@ -43,7 +39,7 @@ def _on_read_finish(self):
self.callback(bytes(self.buffer))
self.callback_called = True

async def a_on_read_finish(self):
async def _a_on_read_finish(self):
if not self.callback_called:
await self.callback(bytes(self.buffer))
self.callback_called = True
Expand All @@ -58,15 +54,13 @@ async def __aiter__(self) -> AsyncIterator[bytes]:
async for chunk in self.stream: # type: ignore
self.buffer.extend(chunk)
yield chunk
await self.a_on_read_finish()
await self._a_on_read_finish()

def close(self) -> None:
if self.stream_close:
self.stream_close() # type: ignore
self.stream.close() # type: ignore

async def aclose(self) -> None:
if self.stream_close:
await self.stream_close() # type: ignore
await self.stream.aclose() # type: ignore


YieldType = TypeVar("YieldType")
Expand Down

0 comments on commit 3c41933

Please sign in to comment.