diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 72903aea..d99d30a2 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -29,6 +29,9 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed to a file in a nonexistent directory (`#696 `_) +- Fixed ``AssertionError: feed_data after feed_eof`` on asyncio when a subprocess is + closed early, before its output has been read + (`#490 `_) - Fixed ``TaskInfo.has_pending_cancellation()`` on asyncio not respecting shielded scopes (`#771 `_; PR by @gschaffner) - Fixed ``SocketStream.receive()`` returning ``bytearray`` instead of ``bytes`` when diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index b88b9cc1..97edb8ad 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -930,7 +930,7 @@ async def receive(self, max_bytes: int = 65536) -> bytes: raise EndOfStream async def aclose(self) -> None: - self._stream.feed_eof() + self._stream.set_exception(ClosedResourceError()) await AsyncIOBackend.checkpoint() diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py index 84c4b4dc..e8020a7c 100644 --- a/tests/test_subprocesses.py +++ b/tests/test_subprocesses.py @@ -13,7 +13,13 @@ import pytest from pytest import FixtureRequest -from anyio import CancelScope, ClosedResourceError, open_process, run_process +from anyio import ( + CancelScope, + ClosedResourceError, + create_task_group, + open_process, + run_process, +) from anyio.streams.buffered import BufferedByteReceiveStream pytestmark = pytest.mark.anyio @@ -289,3 +295,33 @@ async def test_py39_arguments( pytest.skip(f"the {argname!r} argument is not supported by uvloop yet") raise + + +async def test_close_early() -> None: + """Regression test for #490.""" + code = dedent("""\ + import sys + for _ in range(100): + sys.stdout.buffer.write(bytes(range(256))) + """) + + async with await open_process([sys.executable, "-c", code]): + pass + + +async def test_close_while_reading() -> None: + code = dedent("""\ + import time + + time.sleep(3) + """) + + async with await open_process( + [sys.executable, "-c", code] + ) as process, create_task_group() as tg: + assert process.stdout + tg.start_soon(process.stdout.aclose) + with pytest.raises(ClosedResourceError): + await process.stdout.receive() + + process.terminate()