Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed feed_data after feed_eof assertion errors on asyncio #752

Merged
merged 8 commits into from
Sep 5, 2024

Conversation

agronholm
Copy link
Owner

Changes

This removes the call to self._stream.feed_eof(), as it should not be done when the protocol is still sending data (and we can't prevent it from doing so). Instead, we set an internal flag and raise ClosedResourceError if someone tries to read from the stream after it's closed.

Fixes #490.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) added which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

If this is a trivial change, like a typo fix or a code reformatting, then you can ignore
these instructions.

Updating the changelog

If there are no entries after the last release, use **UNRELEASED** as the version.
If, say, your patch fixes issue #123, the entry should look like this:

* Fix big bad boo-boo in task groups (#123 <https://github.com/agronholm/anyio/issues/123>_; PR by @yourgithubaccount)

If there's no issue linked, just link to your pull request instead by updating the
changelog after you've created the PR.

@dolamroth
Copy link

dolamroth commented Jul 5, 2024

Test №1

process.py

import time
import random

while True:
    print(random.random(), flush=True)
    time.sleep(0.01)

test_subprocess.py

import sys
from pathlib import Path

import anyio
from anyio.streams.buffered import BufferedByteReceiveStream


async def main():
    _script_path = str(Path(__file__).parent / "process.py")
    async with await anyio.open_process([sys.executable, _script_path]) as process:
        stream = BufferedByteReceiveStream(process.stdout)
        while True:
            try:
                output = await stream.receive_until(b"\n", 6)
                print(output, flush=True)
            except (anyio.ClosedResourceError, anyio.BrokenResourceError, anyio.EndOfStream):
                break


if __name__ == "__main__":
    anyio.run(main)

WITHOUT fix -- starts producing these after some time

Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'\r\n0.85027...539023763\r\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'\r\n0.85027...539023763\r\n')>
Traceback (most recent call last):
  File "C:\Users\...\AppData\Local\Programs\Python\Python312\Lib\asyncio\events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\...\AppData\Local\Programs\Python\Python312\Lib\asyncio\subprocess.py", line 72, in pipe_data_received
    reader.feed_data(data)
  File "C:\Users\...\AppData\Local\Programs\Python\Python312\Lib\asyncio\streams.py", line 500, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
           ^^^^^^^^^^^^^
AssertionError: feed_data after feed_eof

WITH fix
Outputs numbers without issues. Works on both Windows and Linux.

These results are consistent between different runs of tests

@dolamroth
Copy link

dolamroth commented Jul 5, 2024

Test №2
This has probably nothing to do directly with anyio, but I want to share this nonetheless.

Let's simulate exception in user's code, by raising IndexError in __anext__

test_subprocess.py

import sys
from pathlib import Path
from typing import Optional

from anyio.abc import Process
import random

import anyio
from anyio.streams.buffered import BufferedByteReceiveStream


class Generator:
    def __init__(self):
        self._proc: Optional[Process] = None

    async def __aenter__(self):
        self._proc = await anyio.open_process([
            sys.executable,
            str(Path(__file__).parent / "process.py")
        ])
        self._stream = BufferedByteReceiveStream(self._proc.stdout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._proc.aclose()
        return False

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            try:
                output = await self._stream.receive_until(b"\n", 65535)
            except (anyio.EndOfStream, anyio.IncompleteRead, anyio.ClosedResourceError):
                raise StopAsyncIteration

            # Simulate exception in user's code
            if random.random() < 0.02:
                raise IndexError

            return output


async def main():
    _script_path = str(Path(__file__).parent / "process.py")
    async with Generator() as gen:
        async for item in gen:
            print(item, flush=True)


if __name__ == "__main__":
    anyio.run(main)

WITHOUT fix -- spam of AssertionError: feed_data after feed_eof, as usual

WITH fix -- process just hangs and outputs nothing

This is consistent in Windows and Linux

@dolamroth
Copy link

dolamroth commented Jul 10, 2024

I think, the second test is not linked to the feed_eof issue. Rather, it may be due to a bit contrintuitive behavior of Process.aclose.

It turns out, that (in the case, if error pops in user's code and not in subprocess) you need to explicitly terminate or kill process before calling Process.aclose, because it calls await Process.wait() before Process.terminate().

If we substitute __aexit__ in the 2nd test with the following code, it works and raises IndexError as expected

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._proc.terminate()
        await self._proc.aclose()
        return False

I've written tests for anyio (including with/without BufferedByteReceiveStream), trio and asyncio, and it works same everywhere.

@agronholm agronholm requested a review from Zac-HD August 29, 2024 14:41
Copy link
Collaborator

@Zac-HD Zac-HD left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Shutdown logic is always tricky, at least until someone very carefully implments it as a context manager 😅

@agronholm
Copy link
Owner Author

@graingert correctly pointed out that if the stream is aclosed while another task is reading from it, it will hang, as there is no signal to stop waiting on the read. I'm struggling to write a proper test for this though.

@agronholm
Copy link
Owner Author

I've pushed an alternate solution, and an accompanying test to cover the case pointed out by @graingert.

@graingert
Copy link
Collaborator

Grumbles about asyncio streams

@agronholm agronholm merged commit 41647f4 into master Sep 5, 2024
17 checks passed
@agronholm agronholm deleted the fix-data-after-eof branch September 5, 2024 18:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Closing a process's stdout when there's more data to be read results in internal asyncio error
4 participants