Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed feed_data after feed_eof assertion errors on asyncio #752

Merged
merged 8 commits into from
Sep 5, 2024
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed
to a file in a nonexistent directory
(`#696 <https://github.com/agronholm/anyio/issues/696>`_)
- Fixed ``AssertionError: feed_data after feed_eof`` on asyncio when a subprocess is
closed early, before its output has been read
(`#490 <https://github.com/agronholm/anyio/issues/490>`_)
- Fixed ``TaskInfo.has_pending_cancellation()`` on asyncio not respecting shielded
scopes (`#771 <https://github.com/agronholm/anyio/issues/771>`_; PR by @gschaffner)
- Fixed quitting the debugger in a pytest test session while in an active task group
Expand Down
8 changes: 6 additions & 2 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from concurrent.futures import Future
from contextlib import suppress
from contextvars import Context, copy_context
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import partial, wraps
from inspect import (
CORO_RUNNING,
Expand Down Expand Up @@ -907,16 +907,20 @@ def _spawn_task_from_thread(
@dataclass(eq=False)
class StreamReaderWrapper(abc.ByteReceiveStream):
_stream: asyncio.StreamReader
_closed: bool = field(init=False, default=False)

async def receive(self, max_bytes: int = 65536) -> bytes:
if self._closed:
raise ClosedResourceError

data = await self._stream.read(max_bytes)
if data:
return data
else:
raise EndOfStream

async def aclose(self) -> None:
self._stream.feed_eof()
self._closed = True
await AsyncIOBackend.checkpoint()


Expand Down
12 changes: 12 additions & 0 deletions tests/test_subprocesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,15 @@ async def test_process_aexit_cancellation_closes_standard_streams(

with pytest.raises(ClosedResourceError):
await process.stderr.receive(1)


async def test_close_early() -> None:
"""Regression test for #490."""
code = dedent("""\
import sys
for _ in range(100):
sys.stdout.buffer.write(bytes(range(256)))
""")

async with await open_process([sys.executable, "-c", code]):
pass
Loading