Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed feed_data after feed_eof assertion errors on asyncio #752

Merged
merged 8 commits into from
Sep 5, 2024
3 changes: 3 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed
to a file in a nonexistent directory
(`#696 <https://github.com/agronholm/anyio/issues/696>`_)
- Fixed ``AssertionError: feed_data after feed_eof`` on asyncio when a subprocess is
closed early, before its output has been read
(`#490 <https://github.com/agronholm/anyio/issues/490>`_)
- Fixed ``TaskInfo.has_pending_cancellation()`` on asyncio not respecting shielded
scopes (`#771 <https://github.com/agronholm/anyio/issues/771>`_; PR by @gschaffner)
- Fixed ``SocketStream.receive()`` returning ``bytearray`` instead of ``bytes`` when
Expand Down
2 changes: 1 addition & 1 deletion src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes:
raise EndOfStream

async def aclose(self) -> None:
self._stream.feed_eof()
self._stream.set_exception(ClosedResourceError())
await AsyncIOBackend.checkpoint()


Expand Down
38 changes: 37 additions & 1 deletion tests/test_subprocesses.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
import pytest
from pytest import FixtureRequest

from anyio import CancelScope, ClosedResourceError, open_process, run_process
from anyio import (
CancelScope,
ClosedResourceError,
create_task_group,
open_process,
run_process,
)
from anyio.streams.buffered import BufferedByteReceiveStream

pytestmark = pytest.mark.anyio
Expand Down Expand Up @@ -289,3 +295,33 @@ async def test_py39_arguments(
pytest.skip(f"the {argname!r} argument is not supported by uvloop yet")

raise


async def test_close_early() -> None:
"""Regression test for #490."""
code = dedent("""\
import sys
for _ in range(100):
sys.stdout.buffer.write(bytes(range(256)))
""")

async with await open_process([sys.executable, "-c", code]):
pass


async def test_close_while_reading() -> None:
code = dedent("""\
import time

time.sleep(3)
""")

async with await open_process(
[sys.executable, "-c", code]
) as process, create_task_group() as tg:
assert process.stdout
tg.start_soon(process.stdout.aclose)
with pytest.raises(ClosedResourceError):
await process.stdout.receive()

process.terminate()
Loading