
Do a better job of communicating the problems with using nurseries/cancel scopes inside generators #638

Open
miracle2k opened this issue Aug 29, 2018 · 27 comments


@miracle2k
Contributor

miracle2k commented Aug 29, 2018

I am aware of the issues with cleaning up async iterators, PEP 533, and the need for aclosing. However, if aclosing is not used (which I frequently overlook), things can really blow up in your face:

import trio

async def gen_with_task():
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        while True:
            await trio.sleep(1)
            yield 1

async def main():
    async for f in gen_with_task():
        raise ValueError()

trio.run(main)

gives me:

Exception ignored in: <async_generator object gen_with_task at 0x10c7857b8>
RuntimeError: async generator ignored GeneratorExit
Traceback (most recent call last):
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 1229, in run
    result = run_impl(runner, async_fn, args)
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 1357, in run_impl
    runner.task_exited(task, final_result)
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 852, in task_exited
    self.tasks.remove(task)
KeyError: <Task '__main__.main' at 0x10c72c668>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "triotest.py", line 14, in <module>
    trio.run(main)
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 1235, in run
    ) from exc
trio.TrioInternalError: internal error in trio - please file a bug!
Exception ignored in: <coroutine object _AsyncGeneratorContextManager.__aexit__ at 0x10c78f648>
Traceback (most recent call last):
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_util.py", line 84, in __aexit__
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_util.py", line 14, in __aexit__
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_impl.py", line 308, in aclose
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_impl.py", line 277, in athrow
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_impl.py", line 290, in _do_it
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_impl.py", line 197, in __next__
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/async_generator/_impl.py", line 209, in _invoke
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 311, in open_nursery
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/contextlib.py", line 130, in __exit__
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 198, in open_cancel_scope
  File "/Users/michael/.pyenv/versions/3.7.0/lib/python3.7/site-packages/trio/_core/_run.py", line 157, in _remove_task
KeyError: <Task '__main__.main' at 0x10c72c668>

Note that in addition to the internal trio exceptions, the original cause is not shown at all. This happens when the async generator had a task running in a nursery.

Maybe there is no way to handle this better - I can't tell, this is above my paygrade! But in case there is a way to show a clearer error message, I think it might be worth it.

@njsmith
Member

njsmith commented Aug 30, 2018

Yeah, this is tricky.

So one problem is that we aren't communicating the issues here well (partly because I've struggled to understand them myself!). My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block.

Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated: they assume that between when you enter the block and when you exit the block, you only run code that's inside the block. Where "inside" means: if an exception is raised, then the block will have a chance to catch it.

In particular, consider what happens if you have a background task, and it crashes at a moment when the generator is suspended: trio will try to cancel all the code inside the nursery block, but it will get confused and end up cancelling whatever code is currently running in that task instead. So... I'm afraid your example code above is not really fixable.

We might be able to handle this better in 3.8, maybe with some version of PEP 568; in fact while writing this I had an idea which I've added to #495 :-). But right now there's not a lot we can do except getting better at educating users.

We should definitely have a clear description of this issue and its consequences in the docs.

Also, you're right about the error message here: we should be able to detect this case (at least some of the time), and give a better error message. I think we can diagnose it reliably by detecting cases where we try to leave a cancel scope and it's not at the top of the cancel stack, or when a task exits with a cancel stack that's bigger than the one it started with.
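A rough sketch of what that check could look like (hypothetical names; not trio's actual internals):

# Hypothetical sketch: suppose each task keeps a stack of the cancel scopes
# it has entered. If the scope being exited isn't the innermost one, the
# task must have yielded (or otherwise escaped) while the scope was open.
def exit_cancel_scope(task, scope):
    if not task.cancel_stack or task.cancel_stack[-1] is not scope:
        raise RuntimeError(
            "cancel scope exited in a different context than it was"
            " entered in -- did you yield inside a nursery or cancel"
            " scope block?"
        )
    task.cancel_stack.pop()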

@njsmith changed the title from "Exceptions in an async generator, if aclosing is not used" to "Do a better job of communicating the problems with using nurseries/cancel scopes inside generators" on Aug 30, 2018
@miracle2k
Contributor Author

miracle2k commented Sep 4, 2018

My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block. Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated:

So since I am once again struggling with a generator-related issue (I'm now tending towards "avoid them"), let me ask for clarification. Until now I read this as meaning that as long as I am using aclosing, I will be fine. If I replace my example from the initial post with:

import trio
from async_generator import aclosing

async def gen_with_task():
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        while True:
            await trio.sleep(1)
            yield 1

async def main():
    iterable = gen_with_task()         # THIS IS NEW
    async with aclosing(iterable):  # THIS IS NEW
        async for f in iterable:
            raise ValueError()

trio.run(main)

Then things appear to go well. But you write:

Using aclosing is enough to make regular resource-cleanup with/async with work reliably inside generators, but nurseries and cancel scopes are even more complicated

Which would imply that using aclosing() is not enough?

My current understanding is that unless you're using @contextmanager or @asynccontextmanager, it is always wrong to put a yield inside a nursery block or cancel scope block.

Does that mean I should not be doing what I am doing above, even with aclosing? A generator function should never start a nursery, and even though this particular example seems to work now, there might be other side-effects?

Or is there some way to make it work using @contextmanager/@asynccontextmanager?

@miracle2k
Contributor Author

So the case I find myself struggling with is this:

import trio
from async_generator import aclosing

async def fail_soon():
    raise ValueError()

async def subscribe():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(fail_soon)
        while True:            
            yield 1


async def main():
    events = subscribe()
    async with aclosing(events):
        async for event in events:
            # XXX
            await trio.sleep(1)

trio.run(main)

This causes a trio.MultiError: ValueError(), GeneratorExit().

My problem is the GeneratorExit that happens here. In my production case, I am actually seeing a MultiError containing only GeneratorExit exceptions, but I haven't been able to reproduce that.

As far as I can tell, this is what happens:

  • The generator is suspended, and main is waiting on the XXX mark, awaiting the sleep to return.
  • Meanwhile, with main doing nothing, trio looks for other work. How about starting the fail_soon task?
  • fail_soon raises a ValueError, which causes the nursery that was created within the generator to be marked as cancelled.
  • XXX belongs to the scope of the generator's nursery, I think, so now the await, being a checkpoint, fails and raises Cancelled.
  • Python unwinds and passes that exception to aclosing's __aexit__. I verified that it receives a Cancelled exception. It calls events.aclose().

Somehow this leads to a GeneratorExit. I'm quite curious how that exception comes to be, and why it is not hidden from me in this case.

@smurfix
Contributor

smurfix commented Sep 5, 2018

aclosing() hits the subscribe() generator with a GeneratorExit exception, and will ignore that exception when it gets it back. However, your subscribe doesn't (re-)raise that GeneratorExit: it raises a MultiError with a GeneratorExit inside, and Python doesn't introspect that (obviously).

Fix: ignore that error:

async def subscribe():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(fail_soon)
        try:
            while True:
                yield 1
        except GeneratorExit:
            return

@smurfix
Contributor

smurfix commented Sep 5, 2018

Writing a context manager which does the same thing is left as an exercise to the reader. ;-)

@njsmith
Member

njsmith commented Sep 5, 2018

Does that mean I should not be doing what I am doing above, even with aclosing? A generator function should never start a nursery, and even though this particular example seems to work now, there might be other side-effects?

Correct :-(

(To be totally precise: a generator function can use a nursery – e.g. it's OK to call trio.open_tcp_stream, which uses a nursery internally for the happy eyeballs stuff – but it shouldn't yield inside the nursery block.)
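To illustrate the distinction (a minimal sketch; some_worker stands in for any short-lived background task):

import trio

async def some_worker():  # hypothetical background task
    await trio.sleep(0.1)

async def fine():
    # OK: the nursery opens and closes entirely between two yields; the
    # generator is never suspended while the nursery block is active.
    while True:
        async with trio.open_nursery() as nursery:
            nursery.start_soon(some_worker)
        yield 1  # the nursery has already exited by the time we yield

async def broken():
    # NOT OK: the yield suspends the generator while the nursery block
    # is still open.
    async with trio.open_nursery() as nursery:
        nursery.start_soon(some_worker)
        while True:
            yield 1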

@smurfix is right about what's happening with the GeneratorExit error, but I wouldn't recommend using that workaround... it doesn't address the underlying issue. In particular, consider what happens when a background task inside the nursery crashes, at a moment when you're blocked inside the body of the for loop. Trio will try to deliver a cancellation to the blocked function – even though it's not inside the generator! – because Trio has no way to know that you've yielded out of the nursery; it just knows that you've entered the nursery, and haven't exited it yet. This Cancelled exception is supposed to be caught by the nursery block... but the nursery block is inside the generator, and the Cancelled exception is outside, so the nursery block can't catch it. Instead it propagates all the way out and crashes your program.

There is good news though: you can make this API work basically the same as before; just, instead of using aclosing as a context manager, you'll want to define a custom context manager that handles the nursery. That's totally fine. So, something like:

from async_generator import asynccontextmanager, aclosing

@asynccontextmanager
async def subscribe():
    events = await set_up_subscription()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(background_task)
        async def events_gen_function():
            while True:
                yield get_something_from_background_task_or_whatever()
        async with aclosing(events_gen_function()) as events:
            yield events

# Usage:
async with subscribe() as events:
    async for event in events:
        ...

So the pattern is: use an @asynccontextmanager to set up the nursery and start any background tasks, then from it, yield an async iterable. That async iterable can now trust that the background task is running.

@njsmith
Member

njsmith commented Sep 6, 2018

I guess we could have an @async_generator_with_nursery decorator that does some hacky thing where it makes it so (a) you have to use async with function() as async_iterable: ... instead of calling it directly, and (b) if the code inside the async with block crashes, the exception gets thrown into the suspended generator. (And if it doesn't crash, then GeneratorExit gets thrown in.) So it'd be like a super-weird hybrid of a context-manager generator and a regular iterable generator.

This idea violates all kinds of conventional rules and taste, but maybe it'd be pragmatic and useful anyway? idk

@alexshpilkin

alexshpilkin commented Sep 21, 2018

Here’s one more example where one might want this (adapted from my recent code):

from trio import Queue, open_nursery

async def collect(*awaitables): # essentially the same thing as asyncio.as_completed()
    results = Queue(0)
    async def perform(a):
        await results.put(await a)
    async with open_nursery() as nursery:
        for a in awaitables: nursery.start_soon(perform, a)
        while nursery.child_tasks: yield await results.get()
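Hypothetical usage (fetch and handle are stand-ins for your own code); results arrive in completion order:

async def main():
    async for result in collect(fetch(url_a), fetch(url_b), fetch(url_c)):
        handle(result)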

I also had a similar function, merge(), for a list of async iterators (essentially aiostream.merge). But the point here is not the functions, but the fact that these are actual, useful combinators. Not only is it impossible to write them nicely as above, but, if we keep the interface, it’s impossible to write them in trio at all. Of course, we could make the nursery an argument of collect()/merge(), but this looks like a leaky abstraction to me (especially given that race(), which is like collect() but only returns the first result, does not need a nursery argument).

Side note: I understand the desire for minimal APIs, but trio needs more combinators like these. It has lots of useful stuff, but I want to be able to talk about it in my programs. (Similarly, str does not strictly need str.split(), but it would be infuriating to write it from scratch every time.)

@alexshpilkin

For the purposes of this comment, a “thread” will mean anything with a separate (Python) instruction pointer (whether or not it has a stack, is associated with a trio task or an OS thread, etc.).

@njsmith says:

So one problem is that we aren't communicating the issues here well (partly because I've struggled to understand them myself!)

Semantically speaking, what cancel scopes and (to some extent) nurseries implement is dynamic scoping. For example, I see the cancel scopes section of @njsmith’s essay as essentially advocating passing the cancel token in a dynamically scoped variable (Common Lisp “special variable”, Clojure “var”, etc.) instead of an explicit argument.

What’s relevant about dynamic scope here is that when a thread is created, it should (generally) inherit it from its parent. AFAIU, this is indeed what trio does when a child task is created (at least wrt cancel scopes). However, Python provides one more way (!) to create a thread: create a generator (whether async or not). It seems that trio doesn’t (or is unable to) handle this at all, and so it all comes crashing down.

@alexshpilkin

alexshpilkin commented Sep 21, 2018

If I understand contexts, context vars and PEP 568 correctly (... :-/ ), a correct dynamic variable (suitable for e.g. a cancel token), given PEP 568, is simply something like the following ...

from contextlib import contextmanager
from contextvars import ContextVar

class Dynamic:
    def __init__(self, name):
        self._value = ContextVar(name)
        self._stack = []
    @property
    def value(self):
        return self._value.get()
    @contextmanager
    def __call__(self, value):
        self._stack.append(self._value.set(value))
        try:
            yield
        finally:
            self._value.reset(self._stack.pop())

if __name__ == '__main__':
    v = Dynamic('v')
    with v('spam'):
        print(v.value) # spam
        with v('eggs and'):
            print(v.value) # eggs and
        print(v.value) # spam

... which really should be in the standard library.

@njsmith
Member

njsmith commented Sep 21, 2018

Yeah, what cancel scopes need is something like dynamic scoping (or actually slightly more powerful – scopes can be nested, so we need to be able to fetch all the values on the stack, not just the latest value). That was a major motivation for writing PEP 568.
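A minimal sketch of that stronger variant (an illustration, not trio's implementation):

from contextlib import contextmanager
from contextvars import ContextVar

class DynamicStack:
    def __init__(self, name):
        self._values = ContextVar(name, default=())
    @property
    def values(self):
        return self._values.get()  # all enclosing values, innermost last
    @contextmanager
    def __call__(self, value):
        token = self._values.set(self._values.get() + (value,))
        try:
            yield
        finally:
            self._values.reset(token)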

But, PEP 568 hasn't been accepted or implemented, so we can't actually use it currently. And the issues with nurseries are actually worse: if you can yield out of a nursery block, you can actually recreate the equivalent of a go statement by mistake. Currently the best overview of how all these things relate is this comment: #264 (comment)

At the Python sprint last week, Yury (the main asyncio maintainer) and I talked about this quite a bit, since asyncio has the same problem now that it's adding nurseries. The best we were able to come up with was to add a way for a context manager to forbid yields inside it. Though neither of us is overjoyed about it.

Regarding your collect: yes, this means that you can't implement that operation with that API. Notice that that API is unsafe though, in the "structured concurrency" sense: if you stop iterating half-way through and abandon the generator, then there will be orphaned tasks left dangling. What we can support though is an API like:

async with collect(...) as results:
    async for result in results:
        ...

Here the async with makes it safe for collect to create an internal nursery.
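For example, here's a sketch of the safe shape collect could take, assuming the memory channels from #586 (an illustration, not a tested implementation):

from async_generator import asynccontextmanager
import trio

@asynccontextmanager
async def collect(*awaitables):
    send, receive = trio.open_memory_channel(0)

    async def perform(a, clone):
        # each task owns a clone of the send side; when the last clone
        # closes, the receive side sees end-of-stream
        async with clone:
            await clone.send(await a)

    async with trio.open_nursery() as nursery:
        async with send:
            for a in awaitables:
                nursery.start_soon(perform, a, send.clone())
        try:
            # yielding inside a nursery is OK here because this is an
            # @asynccontextmanager generator, not a plain async generator
            yield receive
        finally:
            # if the caller stops iterating early, don't leave tasks dangling
            nursery.cancel_scope.cancel()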

trio needs more combinators like these

Agreed. I want to get #586 finished first (so the results object in the example above can be a Channel), but then after that I think this is a high priority. Though tentatively my plan is to make it a separate library at least to start, and with some more substantial features, like built-in capacity management ("don't run more than 10 tasks at once", etc.).

@alexshpilkin

By the way, it’s unclear to me how either PEP 568 or forbidding yield inside generators could accommodate creating nurseries inside context managers using contextlib.asynccontextmanager, like that mentioned in #569.

@njsmith
Member

njsmith commented Sep 24, 2018

@alexshpilkin yeah, that's one of the major sources of complexity here. The idea is that we would keep a stack of Contexts (in PEP 568) or "is yield allowed" flags, and by default push/pop them when entering/exiting a generator, and yield would check this stack before yielding. BUT, we'd also have a mechanism to disable this push/pop/check mechanism, by setting some flag on the generator object, and the contextmanager decorator function would set this flag. So the intuition is that contextmanager would have some special-case code to tell the interpreter that its generator isn't really a separate context, and a yield inside its generator isn't really a yield.

(There is a bit of discussion of this in PEP 568, if you search for contextmanager.)

@njsmith
Member

njsmith commented Oct 21, 2018

Agreed. I want to get #586 finished first (so the results object in the example above can be a Channel), but then after that I think this is a high priority. Though tentatively my plan is to make it a separate library at least to start, and with some more substantial features, like built-in capacity management ("don't run more than 10 tasks at once", etc.).

See https://github.com/python-trio/trimeter

That's not really a drop-in replacement for async generators, though, it's a higher-level thing.

Here's a sketch for a possible alternative to async generators, one that tries to keep a lot of the ergonomics that make them attractive in the first place, while allowing background tasks and such: https://gist.github.com/njsmith/4db568255a276d4c7cf8a9a6b4295348

@miracle2k's original example using this:

# Add decorator, and send_channel argument
@producer
async def gen_with_task(*, send_channel):
    async with trio.open_nursery() as n:
        n.start_soon(trio.sleep_forever)
        while True:
            await trio.sleep(1)
            # Replace 'yield X' with 'await send_channel.send(X)'
            await send_channel.send(1)

async def main():
    # Now have to use 'async with' to get an async iterator (actually a full-fledged ReceiveChannel)
    async with gen_with_task() as aiter:
        async for f in aiter:
            raise ValueError()

@oremanj
Member

oremanj commented Oct 22, 2018

I really like the ergonomics-per-implementation-complexity-unit of @producer. I think the version as listed won't quite work -- the async with send_channel: needs to be done inside the nursery task, else the channel won't be closed (and the iterating async for thus won't terminate) until the context is exited, which is too late.

I also played around with supporting real async generators, and I think it's possible to do so robustly:

import functools
import sys

import trio
from async_generator import aclosing, asynccontextmanager

def producer(wrapped):
    @asynccontextmanager
    @functools.wraps(wrapped)
    async def wrapper(*args, **kwargs):
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            async def adapter():
                async with send_channel, aclosing(wrapped(*args, **kwargs)) as agen:
                    while True:
                        try:
                            # Advance underlying async generator to next yield
                            value = await agen.__anext__()
                        except StopAsyncIteration:
                            break
                        while True:
                            try:
                                # Forward the yielded value into the send channel
                                await send_channel.send(value)
                                break
                            except BaseException:
                                # If send_channel.send() raised (e.g. Cancelled),
                                # throw the raised exception back into the generator,
                                # and get the next yielded value to forward.
                                value = await agen.athrow(*sys.exc_info())

            nursery.start_soon(adapter, name=wrapped)
            async with receive_channel:
                yield receive_channel
    return wrapper

This is still technically suspending an async generator while it has nurseries/cancel scopes open, but the task that's iterating the async generator doesn't exit or enter any nurseries or cancel scopes while the async generator is suspended, and the exception forwarding effectively puts send_channel.send() into the same context as the async generator's yield. It seems to work well in practice, and supports things like putting a timeout around a yield.

@producer
async def squares_in_range(low, high):
    try:
        for i in range(low, high):
            with trio.move_on_after(0.5):
                yield i ** 2
    finally:
        print("unwinding")

async def do_it():
    async with squares_in_range(0, 100) as sqiter:
        async for square in sqiter:
            print(square)
            await trio.sleep(0.01 * square)
            if square >= 400:
                raise RuntimeError("kaboom")

@njsmith
Member

njsmith commented Oct 22, 2018

Oo, neat idea.

I have a tentative intuition that we do want something like the .raw invocation, in particular to let you use non-default channel types. That can obviously be added to the async generator version though.

One thing I'm not sure of: suppose send raises an exception (like BrokenResourceError). With the explicit stream-passing version, it's obvious what happens: the code calls a stream method, that method can raise, it can handle it, etc. With the async generator version it's a little less obvious what should happen. In the code above, I think an exception from send causes the yield to raise GeneratorExit, and then after the GeneratorExit finishes unwinding the agen, it gets caught and the original exception keeps going. The alternative would be to take the exception that send raised, and throw it into the agen, so the yield acts exactly like a call to send.

On the one hand, it's kind of surprising to have yield raise BrokenResourceError. (Especially for people who aren't familiar with all the intricacies of generators, which i think is most people.) But it's also nice to have the traceback show the agen frame being cleaned up, and to give the agen a chance to respond to the exception.

Another thing I'm not sure of: the version that passes in a channel can call any method on the channel, including e.g. send_nowait or statistics or whatever. With the async generator version, yield is the equivalent of send, and there's no way to access the stream object itself. Does this matter?

@oremanj
Member

oremanj commented Oct 22, 2018

The alternative would be to take the exception that send raised, and throw it into the agen, so the yield acts exactly like a call to send.

I believe the code above does in fact do that -- is there something I'm missing? The rethrowing is necessary to ensure that if a cancel scope inside the generator causes the send to be cancelled, the Cancelled exception passes through that cancel scope as it unwinds.

For what it's worth, I think the ergonomics of the generator solution might be further improved by suppressing BrokenResourceError explicitly on the call to send() in adapter, since generally when someone exits the generator context it can be assumed that they want the generator to be unwound. Otherwise you will pretty much always get BrokenResourceError raised out of the caller's async with block if the async for block is exited before the async generator is exhausted. This is probably open to debate though.

I agree that the version that passes send_channel explicitly is more flexible and arguably clearer. ("Explicit is better than implicit", etc.) On the other hand, the version that adapts an async generator is an easier fit for people who go "why does my async generator [that uses cancel scopes/nurseries] not work?", and will provide somewhat clearer support for yield from in async generators if CPython ever gets that. It's also only really possible to suppress BrokenResourceError in the async generator version, and I think the inability to suppress it will be bug-prone in the explicit channel version.

It should be feasible to write adapters that go in either direction, so I don't think it matters too much which one we decide should be "primitive".

@oremanj
Member

oremanj commented Oct 22, 2018

We would probably also want to cancel the nursery inside wrapper() when the context gets exited. Closing the receive channel will deal with cleanup when wrapper() is suspended inside send_channel.send(), but won't help if it's suspended somewhere else (i.e. in some await in the async generator). We could optionally decide that this cancellation is always the way we clean up, and not close the receive channel until everything gets torn down. That would allow more parity between the asyncgen and explicit-channel implementations, and remove the BrokenResourceError trap.
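Concretely, the tail of wrapper() could become something like this (a sketch):

            nursery.start_soon(adapter, name=wrapped)
            async with receive_channel:
                try:
                    yield receive_channel
                finally:
                    # tear down the adapter (and the generator it drives) as
                    # soon as the caller leaves the context, even if it's
                    # suspended somewhere other than send_channel.send()
                    nursery.cancel_scope.cancel()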

@njsmith
Member

njsmith commented Oct 23, 2018

I believe the code above does in fact do that -- is there something I'm missing?

Oh whoops, no, I just read too quickly on my phone :-).

The rethrowing is necessary to ensure that if a cancel scope inside the generator causes the send to be cancelled, the Cancelled exception passes through that cancel scope as it unwinds.

Ah yeah. That's a compelling argument against the other option, where we don't re-throw. Though it also makes me a little more dubious about the whole thing... especially since I'm trying to convince Yury that we should extend the interpreter to disallow yield inside cancel scopes! (Reasoning is here.)

For what it's worth, I think the ergonomics of the generator solution might be further improved by suppressing BrokenResourceError explicitly on the call to send() in adapter, since generally when someone exits the generator context it can be assumed that they want the generator to be unwound. Otherwise you will pretty much always get BrokenResourceError raised out of the caller's async with block if the async for block is exited before the async generator is exhausted. This is probably open to debate though.

That's true, when people abandon a for loop they don't generally expect the generator to get annoyed at them about that.

We could optionally decide that this cancellation is always the way we clean up, and not close the receive channel until everything gets torn down. That would allow more parity between the asyncgen and explicit-channel implementations, and remove the BrokenResourceError trap.

Makes sense to me.

yield from

The .raw semantics are isomorphic to yield from, I guess. (.raw is a terrible name, we should come up with a better one. But the thing my gist called .raw.)

@belm0
Member

belm0 commented May 23, 2019

Here I've implemented "suppressing BrokenResourceError explicitly on the call to send() in adapter". It seems to work. Please correct me if I've missed something.

def producer(wrapped):
    @asynccontextmanager
    @functools.wraps(wrapped)
    async def wrapper(*args, **kwargs):
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with trio.open_nursery() as nursery:
            async def adapter():
                async with send_channel, aclosing(wrapped(*args, **kwargs)) as agen:
                    user_exit = False
                    while not user_exit:
                        try:
                            # Advance underlying async generator to next yield
                            value = await agen.__anext__()
                        except StopAsyncIteration:
                            break
                        while True:
                            try:
                                # Forward the yielded value into the send channel
                                try:
                                    await send_channel.send(value)
                                except trio.BrokenResourceError:
                                    user_exit = True
                                break
                            except BaseException:
                                # If send_channel.send() raised (e.g. Cancelled),
                                # throw the raised exception back into the generator,
                                # and get the next yielded value to forward.
                                value = await agen.athrow(*sys.exc_info())
    
            nursery.start_soon(adapter, name=wrapped)
            async with receive_channel:
                yield receive_channel
    return wrapper

@belm0
Member

belm0 commented May 23, 2019

It doesn't appear to handle Cancelled aborting the yield as advertised, however:

@producer
async def squares_in_range(low, high):
    try:
        for i in range(low, high):
            with trio.move_on_after(0.5):
                yield i ** 2
    finally:
        print("unwinding")

async def test_producer_cancelled():
    async with squares_in_range(0, 50) as squares:
        async for _ in squares:
            await trio.sleep(1)
This fails with:

RuntimeError: async generator raised StopAsyncIteration

@belm0
Member

belm0 commented Sep 25, 2020

I've fixed the StopAsyncIteration issue based on a hint here: agronholm/asyncio_extras#2
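Roughly, the fix slots into the adapter's inner loop like this (a sketch; the tested version is in the trio-util PR below):

                            except BaseException:
                                try:
                                    # Throw the send failure into the generator.
                                    # If the generator responds by finishing,
                                    # athrow() raises StopAsyncIteration, which
                                    # must not be allowed to escape an async
                                    # generator frame (Python converts that into
                                    # "RuntimeError: async generator raised
                                    # StopAsyncIteration").
                                    value = await agen.athrow(*sys.exc_info())
                                except StopAsyncIteration:
                                    user_exit = True
                                    break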

At this point there are several references to the async generator adapter approach in this issue; there really should be a working, tested implementation.

I'll make a PR for trio-util and open it for review.

@belm0
Member

belm0 commented Sep 25, 2020

trio-util @trio_async_generator: groove-x/trio-util#9

AllSeeingEyeTolledEweSew added a commit to AllSeeingEyeTolledEweSew/tvaf that referenced this issue Oct 7, 2022
goodboy added a commit to pikers/piker that referenced this issue May 9, 2023
`trio`'s internals don't allow for async generator (and thus by
consequence dynamic reset of async exit stacks containing `@acm`s)
interleaving since doing so corrupts the cancel-scope stack. See details
in:
- python-trio/trio#638
- https://trio-util.readthedocs.io/en/latest/#trio_util.trio_async_generator
- `trio._core._run.MISNESTING_ADVICE`

We originally tried to address this using
`@trio_util.trio_async_generator` in backend streaming code but for
whatever reason stopped working recently (at least for me) and it's more
or less implemented the same way as this patch but with more layers and
an extra dep. I also don't want us to have to address this problem again
if/when that lib isn't able to keep up to date with wtv `trio` is
doing..

So instead this is a complete rewrite of the conc design of our
auto-reconnect ws API to move all reset logic and msg relay into a bg
task which is respawned on reset-requiring events: user spec-ed msg recv
latency, network errors, roaming events.

Deatz:
- drop all usage of `AsyncExitStack` and no longer require client code
  to (hackily) call `NoBsWs._connect()` on msg latency conditions,
  instead this is all done behind the scenes and the user can instead
  pass in a `msg_recv_timeout: float`.
- massively simplify impl of `NoBsWs` and move all reset logic into a
  new `_reconnect_forever()` task.
- offer use of `reset_after: int` a count value that determines how many
  `msg_recv_timeout` events are allowed to occur before reconnecting the
  entire ws from scratch again.
@Zac-HD
Member

Zac-HD commented Nov 30, 2024

Robustly wrapping up an async generator into a background task + channel has turned out to be hilariously difficult: you can see the long list of attempts above... and I recently found a latent deadlock in my best synthesis.

I therefore suggest that we should ship a well-tested wrapper function as part of Trio itself, which will also let us offer very clear advice on how to safely use async generators ("only if wrapped in known decorators", cf this lint rule).


Here's a branch with my implementation. Changes relative to those above:

  • accepts a max_buffer_size argument, to support cases where the user wants more of a buffer. Note that using a background task already allows execution to be interleaved, whereas iterating an async generator per se suspends the calling code until the generator yields, so even zero is effectively a single-element buffer. (I've only seen this break existing code once though, and it was not idiomatic)
  • named @background_with_channel(); I wanted to describe what the decorator did. Open to suggested names though.
  • when we await send_chan.send(value), if BrokenResourceError is raised then return cleanly (e.g. when break was used), and if Cancelled is raised then re-raise it; otherwise throw into the generator (see the sketch after the repro below).
    • The special case for Cancelled is so that we can allow (context managers inside) the generator to suppress exceptions, but not to suppress a Cancelled. Which would be mostly futile in Trio due to level-triggered state etc, but this is much easier to reason about and to port to anyio/asyncio without worrying.
  • cancels the background task immediately on the context manager __aexit__, rather than waiting for the generator to be ready to yield. This fixed a deadlock, which I hope is rare, but it's also a pretty pure performance win to cancel work where we're never going to use the result. Toy repro:
@background_with_channel()
async def agen():
    yield 1
    await trio.sleep_forever()  # simulate deadlock
    yield 2


@trio.run
async def main():
    async with agen() as recv_chan:
        async for x in recv_chan:
            print(x)
            break  # exit, cleanup should be quick
    # comment out `nursery.cancel_scope.cancel()` in the wrapper and this hangs
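The send-side handling from the third bullet above, as a self-contained sketch (_forward is a hypothetical helper; send_chan and agen are as in the earlier adapters):

import sys
import trio

async def _forward(send_chan, agen, value):
    # returns False once the consumer has gone away and iteration should stop
    while True:
        try:
            await send_chan.send(value)
            return True
        except trio.BrokenResourceError:
            return False    # consumer closed the channel (e.g. via break)
        except trio.Cancelled:
            raise           # never let the generator swallow a cancellation
        except BaseException:
            # anything else is thrown into the generator at its yield point
            value = await agen.athrow(*sys.exc_info())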

@A5rocks
Contributor

A5rocks commented Dec 1, 2024

I therefore suggest that we should ship a well-tested wrapper function as part of Trio itself, which will also let us offer very clear advice on how to safely use async generators ("only if wrapped in known decorators", cf this lint rule).

This would make sense to me -- though as a counterpoint, I'm not sure this is that popular a use case. For instance, a quick GH code search for trio_async_generator shows 2 projects using it (obviously I may have missed some + likely more than that in private code). I know I would probably use this nonetheless.

Here's a branch with my implementation.

I can't tell whether this handles aclose raising an error on Python <3.10 when exiting the context manager with an exception. But minor issues like that can be checked/fixed at a PR stage.

@TeamSpen210
Contributor

Even if it's not that popular, the consequences of using async gens wrong are pretty dire; it might be good to promote always using this wrapper, to just eliminate the issue?

@Zac-HD
Member

Zac-HD commented Dec 3, 2024

Yep. I also think it'd be good to get this helper into Trio and likely AnyIO; that would also give me a clear recommendation I can make in PEP-789 and flake8-async.
