From 1409c7a1b0187ce0a4ad612953474d0580c8be24 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 30 Jul 2018 01:47:03 -0700 Subject: [PATCH 01/23] Add trio.open_channel --- trio/__init__.py | 2 + trio/_channel.py | 207 +++++++++++++++++++++++++++++++++++++ trio/tests/test_channel.py | 76 ++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 trio/_channel.py create mode 100644 trio/tests/test_channel.py diff --git a/trio/__init__.py b/trio/__init__.py index adf8221f2a..ad05e84bf6 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -38,6 +38,8 @@ from ._highlevel_generic import aclose_forcefully, StapledStream +from ._channel import open_channel, EndOfChannel, BrokenChannelError + from ._signals import catch_signals, open_signal_receiver from ._highlevel_socket import SocketStream, SocketListener diff --git a/trio/_channel.py b/trio/_channel.py new file mode 100644 index 0000000000..49542fb55a --- /dev/null +++ b/trio/_channel.py @@ -0,0 +1,207 @@ +from collections import deque, OrderedDict +from math import inf + +import attr +from outcome import Error, Value + +from . import _core +from ._util import aiter_compat + +# TODO: +# - introspection: +# - statistics +# - capacity, usage +# - repr +# - BrokenResourceError? +# - tests +# - docs +# - should there be a put_back method that inserts an item at the front of the +# queue, while ignoring length limits? (the idea being that you call this +# from a task that is also doing get(), and making get() block on put() is +# a ticket to deadlock city) Example use case: depth-first traversal of a +# directory tree. (Well... does this work? If you start out 10-wide then you +# won't converge on a single DFS quickly, or maybe at all... is that still +# good? do you actually want a priority queue that sorts by depth? maybe +# that is what you want. Huh.) 
+ + +class EndOfChannel(Exception): + pass + + +class BrokenChannelError(Exception): + pass + + +def open_channel(capacity): + if capacity != inf and not isinstance(capacity, int): + raise TypeError("capacity must be an integer or math.inf") + if capacity < 0: + raise ValueError("capacity must be >= 0") + buf = ChannelBuf(capacity) + return PutChannel(buf), GetChannel(buf) + + +@attr.s(cmp=False, hash=False) +class ChannelBuf: + capacity = attr.ib() + data = attr.ib(default=attr.Factory(deque)) + # counts + put_channels = attr.ib(default=0) + get_channels = attr.ib(default=0) + # {task: value} + put_tasks = attr.ib(default=attr.Factory(OrderedDict)) + # {task: None} + get_tasks = attr.ib(default=attr.Factory(OrderedDict)) + + +class PutChannel: + def __init__(self, buf): + self._buf = buf + self.closed = False + self._tasks = set() + self._buf.put_channels += 1 + + @_core.disable_ki_protection + def put_nowait(self, value): + if self.closed: + raise _core.ClosedResourceError + if not self._buf.get_channels: + raise BrokenChannelError + if self._buf.get_tasks: + assert not self._buf.data + task, _ = self._buf.get_tasks.popitem(last=False) + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Value(value)) + elif len(self._buf.data) < self._buf.capacity: + self._buf.data.append(value) + else: + raise _core.WouldBlock + + @_core.disable_ki_protection + async def put(self, value): + await _core.checkpoint_if_cancelled() + try: + self.put_nowait(value) + except _core.WouldBlock: + pass + else: + await _core.cancel_shielded_checkpoint() + return + + task = _core.current_task() + self._tasks.add(task) + self._buf.put_tasks[task] = value + task.custom_sleep_data = self + + def abort_fn(_): + self._tasks.remove(task) + del self._buf.put_tasks[task] + return _core.Abort.SUCCEEDED + + await _core.wait_task_rescheduled(abort_fn) + + @_core.disable_ki_protection + def clone(self): + if self.closed: + raise _core.ClosedResourceError + return PutChannel(self._buf) + + @_core.disable_ki_protection + def close(self): + if self.closed: + return + self.closed = True + for task in list(self._tasks): + _core.reschedule(task, Error(ClosedResourceError())) + self._buf.put_channels -= 1 + if self._buf.put_channels == 0: + assert not self._buf.put_tasks + for task in list(self._buf.get_tasks): + _core.reschedule(task, Error(EndOfChannel())) + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +class GetChannel: + def __init__(self, buf): + self._buf = buf + self.closed = False + self._tasks = set() + self._buf.get_channels += 1 + + @_core.disable_ki_protection + def get_nowait(self): + if self.closed: + raise _core.ClosedResourceError + buf = self._buf + if buf.put_tasks: + task, value = buf.put_tasks.popitem(last=False) + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task) + buf.data.append(value) + # Fall through + if buf.data: + return buf.data.popleft() + if not buf.put_channels: + raise EndOfChannel + raise _core.WouldBlock + + @_core.disable_ki_protection + async def get(self): + await _core.checkpoint_if_cancelled() + try: + value = self.get_nowait() + except _core.WouldBlock: + pass + else: + await _core.cancel_shielded_checkpoint() + return value + + task = _core.current_task() + self._tasks.add(task) + self._buf.get_tasks[task] = None + task.custom_sleep_data = self + + def abort_fn(_): + self._tasks.remove(task) + del self._buf.get_tasks[task] + return _core.Abort.SUCCEEDED + + return await _core.wait_task_rescheduled(abort_fn) + + 
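+    # A note on the sleep/wakeup bookkeeping used by put() and get() above:
+    # each blocked task is recorded in two places -- the shared ChannelBuf's
+    # OrderedDict (so FIFO wakeups can pop the next waiter) and the handle's
+    # own _tasks set (so closing a handle can find just its own waiters).
+    # Setting task.custom_sleep_data to the handle is what lets a waker on
+    # the other side remove the task from the correct _tasks set.
+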
@_core.disable_ki_protection + def close(self): + if self.closed: + return + self.closed = True + for task in list(self._tasks): + _core.reschedule(task, Error(ClosedResourceError())) + self._buf.get_channels -= 1 + if self._buf.get_channels == 0: + assert not self._buf.get_tasks + for task in list(self._buf.put_tasks): + _core.reschedule(task, Error(BrokenChannelError())) + # XX: or if we're losing data, maybe we should raise a + # BrokenChannelError here? + self._buf.data.clear() + + @aiter_compat + def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self.get() + except EndOfChannel: + raise StopAsyncIteration + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py new file mode 100644 index 0000000000..4157e8da9b --- /dev/null +++ b/trio/tests/test_channel.py @@ -0,0 +1,76 @@ +import pytest + +from ..testing import wait_all_tasks_blocked, assert_checkpoints +import trio +from trio import open_channel, EndOfChannel, BrokenChannelError + +async def test_channel(): + with pytest.raises(TypeError): + open_channel(1.0) + with pytest.raises(ValueError): + open_channel(-1) + + p, g = open_channel(2) + repr(p) # smoke test + repr(g) # smoke test + + p.put_nowait(1) + with assert_checkpoints(): + await p.put(2) + with pytest.raises(trio.WouldBlock): + p.put_nowait(None) + + with assert_checkpoints(): + assert await g.get() == 1 + assert g.get_nowait() == 2 + with pytest.raises(trio.WouldBlock): + g.get_nowait() + + p.put_nowait("last") + p.close() + with pytest.raises(trio.ClosedResourceError): + await p.put("too late") + + assert g.get_nowait() == "last" + with pytest.raises(EndOfChannel): + await g.get() + + +async def test_553(autojump_clock): + p, g = open_channel(1) + with trio.move_on_after(10) as timeout_scope: + await g.get() + assert timeout_scope.cancelled_caught + await p.put("Test for PR #553") + + +async def test_channel_fan_in(): + async def producer(put_channel, i): + # We close our handle when we're done with it + with put_channel: + for j in range(3 * i, 3 * (i + 1)): + await put_channel.put(j) + + put_channel, get_channel = open_channel(0) + async with trio.open_nursery() as nursery: + # We hand out clones to all the new producers, and then close the + # original. + with put_channel: + for i in range(10): + nursery.start_soon(producer, put_channel.clone(), i) + + got = [] + async for value in get_channel: + got.append(value) + + got.sort() + assert got == list(range(30)) + + +# tests to add: +# - put/get close wakes other puts/gets on close +# - for put, only wakes the ones on the same handle +# - get close -> also wakes puts +# - and future puts raise +# - all the queue tests, including e.g. fairness tests +# - statistics From 87b3c5dea28ab296ae598b02ab02c21e6ecceee7 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 26 Sep 2018 03:03:59 -0700 Subject: [PATCH 02/23] Drop some TODOs that aren't on the critical path --- trio/_channel.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/trio/_channel.py b/trio/_channel.py index 49542fb55a..aab226538e 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -12,17 +12,8 @@ # - statistics # - capacity, usage # - repr -# - BrokenResourceError? # - tests # - docs -# - should there be a put_back method that inserts an item at the front of the -# queue, while ignoring length limits? 
(the idea being that you call this -# from a task that is also doing get(), and making get() block on put() is -# a ticket to deadlock city) Example use case: depth-first traversal of a -# directory tree. (Well... does this work? If you start out 10-wide then you -# won't converge on a single DFS quickly, or maybe at all... is that still -# good? do you actually want a priority queue that sorts by depth? maybe -# that is what you want. Huh.) class EndOfChannel(Exception): From f22bd4b6f523984d1c73a20acd413bd7d0ae806e Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 26 Sep 2018 03:05:54 -0700 Subject: [PATCH 03/23] yapf --- trio/tests/test_channel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index 4157e8da9b..072237f6d4 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -4,6 +4,7 @@ import trio from trio import open_channel, EndOfChannel, BrokenChannelError + async def test_channel(): with pytest.raises(TypeError): open_channel(1.0) From 58b8d4684df1d114b50b1e0e0a759f6ad3467879 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 26 Sep 2018 03:34:41 -0700 Subject: [PATCH 04/23] Simplify now that we don't support cloning get channels --- trio/_channel.py | 121 +++++++++++++++++++++-------------------------- 1 file changed, 55 insertions(+), 66 deletions(-) diff --git a/trio/_channel.py b/trio/_channel.py index aab226538e..6d200495a0 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -29,47 +29,34 @@ def open_channel(capacity): raise TypeError("capacity must be an integer or math.inf") if capacity < 0: raise ValueError("capacity must be >= 0") - buf = ChannelBuf(capacity) - return PutChannel(buf), GetChannel(buf) - - -@attr.s(cmp=False, hash=False) -class ChannelBuf: - capacity = attr.ib() - data = attr.ib(default=attr.Factory(deque)) - # counts - put_channels = attr.ib(default=0) - get_channels = attr.ib(default=0) - # {task: value} - put_tasks = attr.ib(default=attr.Factory(OrderedDict)) - # {task: None} - get_tasks = attr.ib(default=attr.Factory(OrderedDict)) + get_channel = GetChannel(capacity) + put_channel = PutChannel(get_channel) + return put_channel, get_channel class PutChannel: - def __init__(self, buf): - self._buf = buf + def __init__(self, get_channel): + self._gc = get_channel self.closed = False self._tasks = set() - self._buf.put_channels += 1 + self._gc._open_put_channels += 1 - @_core.disable_ki_protection + @_core.enable_ki_protection def put_nowait(self, value): if self.closed: raise _core.ClosedResourceError - if not self._buf.get_channels: + if self._gc.closed: raise BrokenChannelError - if self._buf.get_tasks: - assert not self._buf.data - task, _ = self._buf.get_tasks.popitem(last=False) - task.custom_sleep_data._tasks.remove(task) + if self._gc._get_tasks: + assert not self._gc._data + task, _ = self._gc._get_tasks.popitem(last=False) _core.reschedule(task, Value(value)) - elif len(self._buf.data) < self._buf.capacity: - self._buf.data.append(value) + elif len(self._gc._data) < self._gc._capacity: + self._gc._data.append(value) else: raise _core.WouldBlock - @_core.disable_ki_protection + @_core.enable_ki_protection async def put(self, value): await _core.checkpoint_if_cancelled() try: @@ -82,34 +69,36 @@ async def put(self, value): task = _core.current_task() self._tasks.add(task) - self._buf.put_tasks[task] = value + self._gc._put_tasks[task] = value task.custom_sleep_data = self def abort_fn(_): self._tasks.remove(task) - del 
self._buf.put_tasks[task] + del self._gc._put_tasks[task] return _core.Abort.SUCCEEDED await _core.wait_task_rescheduled(abort_fn) - @_core.disable_ki_protection + @_core.enable_ki_protection def clone(self): if self.closed: raise _core.ClosedResourceError - return PutChannel(self._buf) + return PutChannel(self._gc) - @_core.disable_ki_protection + @_core.enable_ki_protection def close(self): if self.closed: return self.closed = True - for task in list(self._tasks): + for task in self._tasks: _core.reschedule(task, Error(ClosedResourceError())) - self._buf.put_channels -= 1 - if self._buf.put_channels == 0: - assert not self._buf.put_tasks - for task in list(self._buf.get_tasks): + self._tasks.clear() + self._gc._open_put_channels -= 1 + if self._gc._open_put_channels == 0: + assert not self._gc._put_tasks + for task in self._gc._get_tasks: _core.reschedule(task, Error(EndOfChannel())) + self._gc._get_tasks.clear() def __enter__(self): return self @@ -118,31 +107,35 @@ def __exit__(self, *args): self.close() +@attr.s(cmp=False, hash=False, repr=False) class GetChannel: - def __init__(self, buf): - self._buf = buf - self.closed = False - self._tasks = set() - self._buf.get_channels += 1 + _capacity = attr.ib() + _data = attr.ib(default=attr.Factory(deque)) + closed = attr.ib(default=False) + # count of open put channels + _open_put_channels = attr.ib(default=0) + # {task: value} + _put_tasks = attr.ib(default=attr.Factory(OrderedDict)) + # {task: None} + _get_tasks = attr.ib(default=attr.Factory(OrderedDict)) - @_core.disable_ki_protection + @_core.enable_ki_protection def get_nowait(self): if self.closed: raise _core.ClosedResourceError - buf = self._buf - if buf.put_tasks: - task, value = buf.put_tasks.popitem(last=False) + if self._put_tasks: + task, value = self._put_tasks.popitem(last=False) task.custom_sleep_data._tasks.remove(task) _core.reschedule(task) - buf.data.append(value) + self._data.append(value) # Fall through - if buf.data: - return buf.data.popleft() - if not buf.put_channels: + if self._data: + return self._data.popleft() + if not self._open_put_channels: raise EndOfChannel raise _core.WouldBlock - @_core.disable_ki_protection + @_core.enable_ki_protection async def get(self): await _core.checkpoint_if_cancelled() try: @@ -154,32 +147,28 @@ async def get(self): return value task = _core.current_task() - self._tasks.add(task) - self._buf.get_tasks[task] = None - task.custom_sleep_data = self + self._get_tasks[task] = None def abort_fn(_): - self._tasks.remove(task) - del self._buf.get_tasks[task] + del self._get_tasks[task] return _core.Abort.SUCCEEDED return await _core.wait_task_rescheduled(abort_fn) - @_core.disable_ki_protection + @_core.enable_ki_protection def close(self): if self.closed: return self.closed = True - for task in list(self._tasks): + for task in self._get_tasks: _core.reschedule(task, Error(ClosedResourceError())) - self._buf.get_channels -= 1 - if self._buf.get_channels == 0: - assert not self._buf.get_tasks - for task in list(self._buf.put_tasks): - _core.reschedule(task, Error(BrokenChannelError())) - # XX: or if we're losing data, maybe we should raise a - # BrokenChannelError here? - self._buf.data.clear() + self._get_tasks.clear() + for task in self._put_tasks: + _core.reschedule(task, Error(BrokenChannelError())) + self._put_tasks.clear() + # XX: or if we're losing data, maybe we should raise a + # BrokenChannelError here? 
+ self._data.clear() @aiter_compat def __aiter__(self): From bdc5092429eb54c1397c8375977ac7da83aa15db Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 27 Sep 2018 16:37:35 -0700 Subject: [PATCH 05/23] hack hack --- setup.py | 2 +- trio/__init__.py | 2 +- trio/_channel.py | 249 ++++++++++++++++++++++++------------ trio/tests/test_channel.py | 253 ++++++++++++++++++++++++++++++++----- 4 files changed, 392 insertions(+), 114 deletions(-) diff --git a/setup.py b/setup.py index cc6728c093..73ac00ab8e 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,7 @@ license="MIT -or- Apache License 2.0", packages=find_packages(), install_requires=[ - "attrs", + "attrs >= 18.1.0", # for attr.ib(factory=...) "sortedcontainers", "async_generator >= 1.9", "idna", diff --git a/trio/__init__.py b/trio/__init__.py index ad05e84bf6..287c507041 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -38,7 +38,7 @@ from ._highlevel_generic import aclose_forcefully, StapledStream -from ._channel import open_channel, EndOfChannel, BrokenChannelError +from ._channel import open_channel, EndOfChannel from ._signals import catch_signals, open_signal_receiver diff --git a/trio/_channel.py b/trio/_channel.py index 6d200495a0..d464b3f7d4 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -6,61 +6,135 @@ from . import _core from ._util import aiter_compat +from .abc import AsyncResource # TODO: -# - introspection: -# - statistics -# - capacity, usage -# - repr # - tests # - docs +# is there a better name for 'clone'? People seem to be having trouble with +# it. + +# rename SendChannel/ReceiveChannel to SendHandle/ReceiveHandle? +# eh, maybe not -- SendStream/ReceiveStream don't work like that. + +# send or send_object? eh just send is good enough + +# implementing this interface on top of a stream is very natural... just +# pickle/unpickle (+ some framing). Actually, even clone() is not bad at +# all... you just need a shared counter of open handles, and then close the +# underlying stream when all SendChannels are closed. + +# to think about later: +# - buffer_max=0 default? +# - should we make ReceiveChannel.close() raise BrokenChannelError if data gets +# lost? This isn't how ReceiveStream works. And it might not be doable for a +# channel that reaches between processes (e.g. data could be in flight but +# we don't know it yet) class EndOfChannel(Exception): pass -class BrokenChannelError(Exception): - pass +def open_channel(buffer_max): + """Open a channel for communicating between tasks. + + A channel is represented by two objects + Args: + buffer_max (int or math.inf): The maximum number of items that can be + buffered in the channel before :meth:`SendChannel.send` blocks. + Choosing a sensible value here is important to ensure that + backpressure is communicated promptly and avoid unnecessary latency. + If in doubt, use 0, which means that sends always block until another + task calls receive. -def open_channel(capacity): - if capacity != inf and not isinstance(capacity, int): - raise TypeError("capacity must be an integer or math.inf") - if capacity < 0: - raise ValueError("capacity must be >= 0") - get_channel = GetChannel(capacity) - put_channel = PutChannel(get_channel) - return put_channel, get_channel + Returns: + A pair (:class:`SendChannel`, :class:`ReceiveChannel`). Remember: data + flows from left to right. 
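+
+    For example, a minimal rendezvous between two tasks might look like
+    this (a sketch; the surrounding nursery setup is assumed)::
+
+        send_channel, receive_channel = trio.open_channel(0)
+
+        async def producer():
+            await send_channel.send("hi")
+
+        async def consumer():
+            assert await receive_channel.receive() == "hi"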
+ """ + if buffer_max != inf and not isinstance(buffer_max, int): + raise TypeError("buffer_max must be an integer or math.inf") + if buffer_max < 0: + raise ValueError("buffer_max must be >= 0") + receive_channel = ReceiveChannel(buffer_max) + send_channel = SendChannel(receive_channel) + return send_channel, receive_channel -class PutChannel: - def __init__(self, get_channel): - self._gc = get_channel - self.closed = False + +@attr.s(frozen=True) +class ChannelStats: + buffer_used = attr.ib() + buffer_max = attr.ib() + open_send_channels = attr.ib() + tasks_waiting_send = attr.ib() + tasks_waiting_receive = attr.ib() + + +class SendChannel(AsyncResource): + def __init__(self, receive_channel): + self._rc = receive_channel + self._closed = False self._tasks = set() - self._gc._open_put_channels += 1 + self._rc._open_send_channels += 1 + + def __repr__(self): + return ( + "" + .format(id(self), id(self._rc)) + ) + + def statistics(self): + # XX should we also report statistics specific to this object, like + # len(self._tasks)? + return self._rc.statistics() @_core.enable_ki_protection - def put_nowait(self, value): - if self.closed: + def send_nowait(self, value): + """Attempt to send an object into the channel, without blocking. + + Args: + value (object): The object to send. + + Raises: + WouldBlock: if the channel is full. + ClosedResourceError: if this :class:`SendHandle` object has already + been _closed. + BrokenChannelError: if the receiving :class:`ReceiveHandle` object + has already been _closed. + + """ + if self._closed: raise _core.ClosedResourceError - if self._gc.closed: - raise BrokenChannelError - if self._gc._get_tasks: - assert not self._gc._data - task, _ = self._gc._get_tasks.popitem(last=False) + if self._rc._closed: + raise _core.BrokenChannelError + if self._rc._receive_tasks: + assert not self._rc._data + task, _ = self._rc._receive_tasks.popitem(last=False) _core.reschedule(task, Value(value)) - elif len(self._gc._data) < self._gc._capacity: - self._gc._data.append(value) + elif len(self._rc._data) < self._rc._capacity: + self._rc._data.append(value) else: raise _core.WouldBlock @_core.enable_ki_protection - async def put(self, value): + async def send(self, value): + """Attempt to send an object into the channel, blocking if necessary. + + Args: + value (object): The object to send. + + Raises: + ClosedResourceError: if this :class:`SendChannel` object has already + been _closed. + BrokenChannelError: if the receiving :class:`ReceiveHandle` object + has already been _closed. + + """ await _core.checkpoint_if_cancelled() try: - self.put_nowait(value) + self.send_nowait(value) except _core.WouldBlock: pass else: @@ -69,77 +143,94 @@ async def put(self, value): task = _core.current_task() self._tasks.add(task) - self._gc._put_tasks[task] = value + self._rc._send_tasks[task] = value task.custom_sleep_data = self def abort_fn(_): self._tasks.remove(task) - del self._gc._put_tasks[task] + del self._rc._send_tasks[task] return _core.Abort.SUCCEEDED await _core.wait_task_rescheduled(abort_fn) @_core.enable_ki_protection def clone(self): - if self.closed: + """Clone this send channel. + + Raises: + ClosedResourceError: if this :class:`SendChannel` object has already + been _closed. 
+ """ + if self._closed: raise _core.ClosedResourceError - return PutChannel(self._gc) + return SendChannel(self._rc) @_core.enable_ki_protection - def close(self): - if self.closed: + async def aclose(self): + if self._closed: + await _core.checkpoint() return - self.closed = True + self._closed = True for task in self._tasks: - _core.reschedule(task, Error(ClosedResourceError())) + _core.reschedule(task, Error(_core.ClosedResourceError())) + del self._rc._send_tasks[task] self._tasks.clear() - self._gc._open_put_channels -= 1 - if self._gc._open_put_channels == 0: - assert not self._gc._put_tasks - for task in self._gc._get_tasks: + self._rc._open_send_channels -= 1 + if self._rc._open_send_channels == 0: + assert not self._rc._send_tasks + for task in self._rc._receive_tasks: _core.reschedule(task, Error(EndOfChannel())) - self._gc._get_tasks.clear() - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() + self._rc._receive_tasks.clear() + await _core.checkpoint() @attr.s(cmp=False, hash=False, repr=False) -class GetChannel: +class ReceiveChannel(AsyncResource): _capacity = attr.ib() - _data = attr.ib(default=attr.Factory(deque)) - closed = attr.ib(default=False) - # count of open put channels - _open_put_channels = attr.ib(default=0) + _data = attr.ib(factory=deque) + _closed = attr.ib(default=False) + # count of open send channels + _open_send_channels = attr.ib(default=0) # {task: value} - _put_tasks = attr.ib(default=attr.Factory(OrderedDict)) + _send_tasks = attr.ib(factory=OrderedDict) # {task: None} - _get_tasks = attr.ib(default=attr.Factory(OrderedDict)) + _receive_tasks = attr.ib(factory=OrderedDict) + + def statistics(self): + return ChannelStats( + buffer_used=len(self._data), + buffer_max=self._capacity, + open_send_channels=self._open_send_channels, + tasks_waiting_send=len(self._send_tasks), + tasks_waiting_receive=len(self._receive_tasks), + ) + + def __repr__(self): + return "".format( + id(self), self._open_send_channels + ) @_core.enable_ki_protection - def get_nowait(self): - if self.closed: + def receive_nowait(self): + if self._closed: raise _core.ClosedResourceError - if self._put_tasks: - task, value = self._put_tasks.popitem(last=False) + if self._send_tasks: + task, value = self._send_tasks.popitem(last=False) task.custom_sleep_data._tasks.remove(task) _core.reschedule(task) self._data.append(value) # Fall through if self._data: return self._data.popleft() - if not self._open_put_channels: + if not self._open_send_channels: raise EndOfChannel raise _core.WouldBlock @_core.enable_ki_protection - async def get(self): + async def receive(self): await _core.checkpoint_if_cancelled() try: - value = self.get_nowait() + value = self.receive_nowait() except _core.WouldBlock: pass else: @@ -147,28 +238,30 @@ async def get(self): return value task = _core.current_task() - self._get_tasks[task] = None + self._receive_tasks[task] = None def abort_fn(_): - del self._get_tasks[task] + del self._receive_tasks[task] return _core.Abort.SUCCEEDED return await _core.wait_task_rescheduled(abort_fn) @_core.enable_ki_protection - def close(self): - if self.closed: + async def aclose(self): + if self._closed: + await _core.checkpoint() return - self.closed = True - for task in self._get_tasks: - _core.reschedule(task, Error(ClosedResourceError())) - self._get_tasks.clear() - for task in self._put_tasks: - _core.reschedule(task, Error(BrokenChannelError())) - self._put_tasks.clear() + self._closed = True + for task in self._receive_tasks: + 
_core.reschedule(task, Error(_core.ClosedResourceError())) + self._receive_tasks.clear() + for task in self._send_tasks: + _core.reschedule(task, Error(_core.BrokenChannelError())) + self._send_tasks.clear() # XX: or if we're losing data, maybe we should raise a # BrokenChannelError here? self._data.clear() + await _core.checkpoint() @aiter_compat def __aiter__(self): @@ -176,12 +269,6 @@ def __aiter__(self): async def __anext__(self): try: - return await self.get() + return await self.receive() except EndOfChannel: raise StopAsyncIteration - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index 072237f6d4..b37631da72 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -2,7 +2,7 @@ from ..testing import wait_all_tasks_blocked, assert_checkpoints import trio -from trio import open_channel, EndOfChannel, BrokenChannelError +from trio import open_channel, EndOfChannel async def test_channel(): @@ -11,67 +11,258 @@ async def test_channel(): with pytest.raises(ValueError): open_channel(-1) - p, g = open_channel(2) - repr(p) # smoke test - repr(g) # smoke test + s, r = open_channel(2) + repr(s) # smoke test + repr(r) # smoke test - p.put_nowait(1) + s.send_nowait(1) with assert_checkpoints(): - await p.put(2) + await s.send(2) with pytest.raises(trio.WouldBlock): - p.put_nowait(None) + s.send_nowait(None) with assert_checkpoints(): - assert await g.get() == 1 - assert g.get_nowait() == 2 + assert await r.receive() == 1 + assert r.receive_nowait() == 2 with pytest.raises(trio.WouldBlock): - g.get_nowait() + r.receive_nowait() - p.put_nowait("last") - p.close() + s.send_nowait("last") + await s.aclose() with pytest.raises(trio.ClosedResourceError): - await p.put("too late") + await s.send("too late") - assert g.get_nowait() == "last" + assert r.receive_nowait() == "last" with pytest.raises(EndOfChannel): - await g.get() + await r.receive() async def test_553(autojump_clock): - p, g = open_channel(1) + s, r = open_channel(1) with trio.move_on_after(10) as timeout_scope: - await g.get() + await r.receive() assert timeout_scope.cancelled_caught - await p.put("Test for PR #553") + await s.send("Test for PR #553") async def test_channel_fan_in(): - async def producer(put_channel, i): + async def producer(send_channel, i): # We close our handle when we're done with it - with put_channel: + async with send_channel: for j in range(3 * i, 3 * (i + 1)): - await put_channel.put(j) + await send_channel.send(j) - put_channel, get_channel = open_channel(0) + send_channel, receive_channel = open_channel(0) async with trio.open_nursery() as nursery: # We hand out clones to all the new producers, and then close the # original. 
- with put_channel: + async with send_channel: for i in range(10): - nursery.start_soon(producer, put_channel.clone(), i) + nursery.start_soon(producer, send_channel.clone(), i) got = [] - async for value in get_channel: + async for value in receive_channel: got.append(value) got.sort() assert got == list(range(30)) +async def test_close_basics(): + async def send_block(s, expect): + with pytest.raises(expect): + await s.send(None) + + # closing send -> other send gets ClosedResourceError + s, r = open_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.ClosedResourceError) + await wait_all_tasks_blocked() + await s.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + s.send_nowait(None) + with pytest.raises(trio.ClosedResourceError): + await s.send(None) + + # and receive is notified, of course + with pytest.raises(EndOfChannel): + r.receive_nowait() + with pytest.raises(EndOfChannel): + await r.receive() + + # closing receive -> send gets BrokenResourceError + s, r = open_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(send_block, s, trio.BrokenResourceError) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + with pytest.raises(trio.BrokenResourceError): + await s.send(None) + + # closing receive -> other receive gets ClosedResourceError + async def receive_block(r): + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + s, r = open_channel(0) + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_block, r) + await wait_all_tasks_blocked() + await r.aclose() + + # and it's persistent + with pytest.raises(trio.ClosedResourceError): + r.receive_nowait() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + + +async def test_close_multiple_send_handles(): + # With multiple send handles, closing one handle only wakes senders on + # that handle, but others can continue just fine + s1, r = open_channel(0) + s2 = s1.clone() + + async def send_will_close(): + with pytest.raises(trio.ClosedResourceError): + await s1.send("nope") + + async def send_will_succeed(): + await s2.send("ok") + + async with trio.open_nursery() as nursery: + nursery.start_soon(send_will_close) + nursery.start_soon(send_will_succeed) + await wait_all_tasks_blocked() + await s1.aclose() + assert await r.receive() == "ok" + + +async def test_inf_capacity(): + s, r = open_channel(float("inf")) + + # It's accepted, and we can send all day without blocking + async with s: + for i in range(10): + s.send_nowait(i) + + got = [] + async for i in r: + got.append(i) + assert got == list(range(10)) + + +async def test_statistics(): + s, r = open_channel(2) + + assert s.statistics() == r.statistics() + stats = s.statistics() + assert stats.buffer_used == 0 + assert stats.buffer_max == 2 + assert stats.open_send_channels == 1 + assert stats.tasks_waiting_send == 0 + assert stats.tasks_waiting_receive == 0 + + s.send_nowait(None) + assert s.statistics().buffer_used == 1 + + s2 = s.clone() + assert s.statistics().open_send_channels == 2 + await s.aclose() + assert s2.statistics().open_send_channels == 1 + + async with trio.open_nursery() as nursery: + s2.send_nowait(None) # fill up the buffer + assert s.statistics().buffer_used == 2 + nursery.start_soon(s2.send, None) + nursery.start_soon(s2.send, None) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_send == 2 + 
nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_send == 0 + + # empty out the buffer again + try: + while True: + r.receive_nowait() + except trio.WouldBlock: + pass + + async with trio.open_nursery() as nursery: + nursery.start_soon(r.receive) + await wait_all_tasks_blocked() + assert s.statistics().tasks_waiting_receive == 1 + nursery.cancel_scope.cancel() + assert s.statistics().tasks_waiting_receive == 0 + + +async def test_channel_fairness(): + + # We can remove an item we just sent, and send an item back in after, if + # no-one else is waiting. + s, r = open_channel(1) + s.send_nowait(1) + assert r.receive_nowait() == 1 + s.send_nowait(2) + assert r.receive_nowait() == 2 + + # But if someone else is waiting to receive, then they "own" the item we + # send, so we can't receive it (even though we run first): + + result = None + + async def do_receive(r): + nonlocal result + result = await r.receive() + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_receive, r) + await wait_all_tasks_blocked() + s.send_nowait(2) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + assert result == 2 + + # And the analogous situation for send: if we free up a space, we can't + # immediately send something in it if someone is already waiting to do + # that + s, r = open_channel(1) + s.send_nowait(1) + with pytest.raises(trio.WouldBlock): + s.send_nowait(None) + async with trio.open_nursery() as nursery: + nursery.start_soon(s.send, 2) + await wait_all_tasks_blocked() + assert r.receive_nowait() == 1 + with pytest.raises(trio.WouldBlock): + s.send_nowait(3) + assert (await r.receive()) == 2 + + +async def test_unbuffered(): + s, r = open_channel(0) + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + with pytest.raises(trio.WouldBlock): + s.send_nowait(1) + + async def do_send(s, v): + with assert_checkpoints(): + await s.send(v) + + async with trio.open_nursery() as nursery: + nursery.start_soon(do_send, s, 1) + with assert_checkpoints(): + assert await r.receive() == 1 + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + + # tests to add: -# - put/get close wakes other puts/gets on close -# - for put, only wakes the ones on the same handle -# - get close -> also wakes puts -# - and future puts raise -# - all the queue tests, including e.g. fairness tests -# - statistics +# - all the queue tests, including e.r. fairness tests From 5006e3ae1f71c87a12dba0596523ab1734fc8e9a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 30 Sep 2018 22:13:12 -0700 Subject: [PATCH 06/23] Tests pass --- trio/_channel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/trio/_channel.py b/trio/_channel.py index d464b3f7d4..24a513b1cd 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -108,7 +108,7 @@ def send_nowait(self, value): if self._closed: raise _core.ClosedResourceError if self._rc._closed: - raise _core.BrokenChannelError + raise _core.BrokenResourceError if self._rc._receive_tasks: assert not self._rc._data task, _ = self._rc._receive_tasks.popitem(last=False) @@ -256,7 +256,8 @@ async def aclose(self): _core.reschedule(task, Error(_core.ClosedResourceError())) self._receive_tasks.clear() for task in self._send_tasks: - _core.reschedule(task, Error(_core.BrokenChannelError())) + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Error(_core.BrokenResourceError())) self._send_tasks.clear() # XX: or if we're losing data, maybe we should raise a # BrokenChannelError here? 
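The behavior this patch locks in -- a task blocked in ``send()`` is woken
with ``BrokenResourceError`` when the receive side closes, and its
bookkeeping entry is cleaned up -- can be exercised with a small sketch
(illustrative only; ``main`` and the variable names here are not part of
the patch series)::

    import trio
    from trio.testing import wait_all_tasks_blocked

    async def main():
        send_channel, receive_channel = trio.open_channel(0)

        async def blocked_sender():
            try:
                # Unbuffered channel, no receiver ready, so this blocks...
                await send_channel.send("x")
            except trio.BrokenResourceError:
                # ...until the receive side is closed out from under us.
                print("sender woken by receive-side close")

        async with trio.open_nursery() as nursery:
            nursery.start_soon(blocked_sender)
            await wait_all_tasks_blocked()
            await receive_channel.aclose()

    trio.run(main)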
From 6468e288f77b9afc26cc9e65e2fed666fdb2f8ae Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 30 Sep 2018 22:19:15 -0700 Subject: [PATCH 07/23] Add channel-based lock tests --- trio/tests/test_sync.py | 79 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index c30992804f..8cd041d794 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -558,11 +558,76 @@ async def do_put(q, v): q.get_nowait() -# Two ways of implementing a Lock in terms of a Queue. Used to let us put the -# Queue through the generic lock tests. - from .._sync import async_cm +from .._channel import open_channel + + +# Three ways of implementing a Lock in terms of a channel. Used to let us put +# the channel through the generic lock tests. + +@async_cm +class ChannelLock1: + def __init__(self, capacity): + self.s, self.r = open_channel(capacity) + for _ in range(capacity - 1): + self.s.send_nowait(None) + + def acquire_nowait(self): + self.s.send_nowait(None) + + async def acquire(self): + await self.s.send(None) + + def release(self): + self.r.receive_nowait() + + +@async_cm +class ChannelLock2: + def __init__(self): + self.s, self.r = open_channel(10) + self.s.send_nowait(None) + + def acquire_nowait(self): + self.r.receive_nowait() + + async def acquire(self): + await self.r.receive() + + def release(self): + self.s.send_nowait(None) + + +@async_cm +class ChannelLock3: + def __init__(self): + self.s, self.r = open_channel(0) + # self.acquired is true when one task acquires the lock and + # only becomes false when it's released and no tasks are + # waiting to acquire. + self.acquired = False + + def acquire_nowait(self): + assert not self.acquired + self.acquired = True + + async def acquire(self): + if self.acquired: + await self.s.send(None) + else: + self.acquired = True + await _core.checkpoint() + + def release(self): + try: + self.r.receive_nowait() + except _core.WouldBlock: + assert self.acquired + self.acquired = False + +# Three ways of implementing a Lock in terms of a Queue. Used to let us put +# the Queue through the generic lock tests. @async_cm class QueueLock1: @@ -630,6 +695,10 @@ def release(self): lambda: Semaphore(1), Lock, StrictFIFOLock, + lambda: ChannelLock1(10), + lambda: ChannelLock1(1), + ChannelLock2, + ChannelLock3, lambda: QueueLock1(10), lambda: QueueLock1(1), QueueLock2, @@ -640,6 +709,10 @@ def release(self): "Semaphore(1)", "Lock", "StrictFIFOLock", + "ChannelLock1(10)", + "ChannelLock1(1)", + "ChannelLock2", + "ChannelLock3", "QueueLock1(10)", "QueueLock1(1)", "QueueLock2", From c6351083eba6e7837a072223b2a35a14eb159d09 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Sun, 30 Sep 2018 22:22:36 -0700 Subject: [PATCH 08/23] Get channel tests to 100% coverage --- trio/tests/test_channel.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index b37631da72..ca7b8efd59 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -31,10 +31,21 @@ async def test_channel(): await s.aclose() with pytest.raises(trio.ClosedResourceError): await s.send("too late") + with pytest.raises(trio.ClosedResourceError): + s.send_nowait("too late") + with pytest.raises(trio.ClosedResourceError): + s.clone() + await s.aclose() assert r.receive_nowait() == "last" with pytest.raises(EndOfChannel): await r.receive() + await r.aclose() + with pytest.raises(trio.ClosedResourceError): + await r.receive() + with pytest.raises(trio.ClosedResourceError): + await r.receive_nowait() + await r.aclose() async def test_553(autojump_clock): From 18c6402bb46cd2064b1228910ca1e344475ae2cc Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 30 Sep 2018 22:31:35 -0700 Subject: [PATCH 09/23] Add newsfragments --- newsfragments/497.feature.rst | 10 ++++++++++ newsfragments/497.removal.rst | 2 ++ 2 files changed, 12 insertions(+) create mode 100644 newsfragments/497.feature.rst create mode 100644 newsfragments/497.removal.rst diff --git a/newsfragments/497.feature.rst b/newsfragments/497.feature.rst new file mode 100644 index 0000000000..fe2d6d372d --- /dev/null +++ b/newsfragments/497.feature.rst @@ -0,0 +1,10 @@ +New and improved APIs for inter-task communication: +:func:`trio.open_channel` (replacing ``trio.Queue``). This interface +uses separate "send" and "receive" objects, for consistency with other +communication interfaces like :class:`trio.Stream`. Also, the two +objects can now be closed individually, making it much easier to +gracefully shut down a channel. Also, there's a cool ``clone`` +API to manage fan-in scenarios. Also, the API has been written to +allow for future channel-like objects that send objects across process +boundaries. Also, help I can't stop writing also. See +:func:`trio.open_channel` for more details. diff --git a/newsfragments/497.removal.rst b/newsfragments/497.removal.rst new file mode 100644 index 0000000000..1dd3987532 --- /dev/null +++ b/newsfragments/497.removal.rst @@ -0,0 +1,2 @@ +``trio.Queue`` has been deprecated, in favor of +:func:`trio.open_channel`. From 4b982f50bbf1047c855366c87d8281d2e28e4c76 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Mon, 1 Oct 2018 04:06:23 -0700 Subject: [PATCH 10/23] Deprecate Queue and UnboundedQueue, and start on channel docs --- docs/source/reference-core.rst | 109 +++++++++++-------- docs/source/reference-hazmat.rst | 32 +----- newsfragments/497.feature.rst | 11 +- newsfragments/497.removal.rst | 4 +- trio/_channel.py | 2 +- trio/_core/_unbounded_queue.py | 7 ++ trio/_core/tests/test_unbounded_queue.py | 4 + trio/_sync.py | 4 + trio/tests/test_highlevel_serve_listeners.py | 8 +- trio/tests/test_sync.py | 7 +- 10 files changed, 99 insertions(+), 89 deletions(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index ae6bbedc32..668bc97be1 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -830,23 +830,23 @@ finishes first:: if not async_fns: raise ValueError("must pass at least one argument") - q = trio.Queue(1) + send_channel, receive_channel = trio.open_channel(0) async def jockey(async_fn): - await q.put(await async_fn()) + await send_channel.send(await async_fn()) async with trio.open_nursery() as nursery: for async_fn in async_fns: nursery.start_soon(jockey, async_fn) - winner = await q.get() + winner = await receive_channel.receive() nursery.cancel_scope.cancel() return winner This works by starting a set of tasks which each try to run their function, and then report back the value it returns. The main task -uses ``q.get()`` to wait for one to finish; as soon as the first task -crosses the finish line, it cancels the rest, and then returns the -winning value. +uses ``receive_channel.receive`` to wait for one to finish; as soon as +the first task crosses the finish line, it cancels the rest, and then +returns the winning value. Here if one or more of the racing functions raises an unhandled exception then Trio's normal handling kicks in: it cancels the others @@ -1191,11 +1191,11 @@ In trio, we standardize on the following conventions: :mod:`threading`.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync. -* When a non-blocking method cannot succeed (the queue is empty, the - lock is already held, etc.), then it raises - :exc:`trio.WouldBlock`. There's no equivalent to the - :exc:`queue.Empty` versus :exc:`queue.Full` distinction – we just - have the one exception that we use consistently. +* When a non-blocking method cannot succeed (the channel is empty, the + lock is already held, etc.), then it raises :exc:`trio.WouldBlock`. + There's no equivalent to the :exc:`queue.Empty` versus + :exc:`queue.Full` distinction – we just have the one exception that + we use consistently. Fairness @@ -1245,59 +1245,78 @@ Broadcasting an event with :class:`Event` :members: -.. _queue: +.. _channels: -Passing messages with :class:`Queue` -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Passing messages through channels +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +A *channel* allows you to safely and conveniently send objects between +different tasks. + +.. autofunction:: open_channel + + +Closing and cloning ++++++++++++++++++++ + +example demonstrating close, simple async with on each side +result if either side exits -You can use :class:`Queue` objects to safely pass objects between -tasks. Trio :class:`Queue` objects always have a bounded size. Here's -a toy example to demonstrate why this is important. 
Suppose we have a -queue with two producers and one consumer:: +example demonstrating how fan-in makes it hard - async def producer(queue): +example demonstrating how clone solves this + + +Buffering in channels ++++++++++++++++++++++ + +When you create a channel, Trio forces you to choose how many objects +you can send without receiving, before send starts to block. You can +make this unbounded if you want, but this is risky. Here's a toy +example to demonstrate why. Suppose we have a channel with two +producers and one consumer:: + + async def producer(send_channel): while True: - await queue.put(1) + await send_channel.send(1) - async def consumer(queue): + async def consumer(receive_channel): while True: - print(await queue.get()) + print(await receive_channel.receive()) async def main(): - # This example won't work with Trio's actual Queue class, so - # imagine we have some sort of platonic ideal of an unbounded - # queue here: - queue = trio.HypotheticalQueue() + send_channel, receive_channel = trio.open_channel(math.inf) async with trio.open_nursery() as nursery: # Two producers - nursery.start_soon(producer, queue) - nursery.start_soon(producer, queue) + nursery.start_soon(producer, send_channel) + nursery.start_soon(producer, send_channel) # One consumer - nursery.start_soon(consumer, queue) + nursery.start_soon(consumer, receive_channel) trio.run(main) If we naively cycle between these three tasks in round-robin style, -then we put an item, then put an item, then get an item, then put an -item, then put an item, then get an item, ... and since on each cycle -we add two items to the queue but only remove one, then over time the -queue size grows arbitrarily large, our latency is terrible, we run -out of memory, it's just generally bad news all around. - -By placing an upper bound on our queue's size, we avoid this problem. -If the queue gets too big, then it applies *backpressure*: ``put`` +then we send an item, then send an item, then receive an item, then +send an item, then send an item, then receive an item, ... On each +cycle we send two items but only receive one, which means the extra +item has to be buffered inside the channel. And over time, this means +the buffer will keep growing and growing, our latency is terrible +(since each new item has to wait in line behind all the others in the +buffer), we run out of memory, it's just generally bad news all +around. + +By placing an upper bound on our buffer size, we avoid this problem. +If the buffer gets too big, then it applies *backpressure*: ``send`` blocks and forces the producers to slow down and wait until the -consumer calls ``get``. +consumer calls ``receive``. -You can also create a :class:`Queue` with size 0. In that case any -task that calls ``put`` on the queue will wait until another task -calls ``get`` on the same queue, and vice versa. This is similar to -the behavior of `channels as described in the CSP model +You can also create a channel without a buffer, by doing +``open_channel(0)``. In that case any task that calls ``send`` on the +channel will wait until another task calls ``receive`` on the same +channel, and vice versa. This is similar to how channels work in the +`classic Communicating Sequential Processes model `__. -.. 
autoclass:: Queue - :members: - Lower-level synchronization primitives ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index c3156617e1..65545253ec 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -46,7 +46,7 @@ Debugging and instrumentation Trio tries hard to provide useful hooks for debugging and instrumentation. Some are documented above (the nursery introspection -attributes, :meth:`trio.Queue.statistics`, etc.). Here are some more. +attributes, :meth:`trio.Lock.statistics`, etc.). Here are some more. Global statistics @@ -281,36 +281,6 @@ anything real. See `#26 :with: queue -Unbounded queues -================ - -In the section :ref:`queue`, we showed an example with two producers -and one consumer using the same queue, where the queue size would grow -without bound to produce unbounded latency and memory usage. -:class:`trio.Queue` avoids this by placing an upper bound on how big -the queue can get before ``put`` starts blocking. But what if you're -in a situation where ``put`` can't block? - -There is another option: the queue consumer could get greedy. Each -time it runs, it could eagerly consume all of the pending items before -allowing another task to run. (In some other systems, this would -happen automatically because their queue's ``get`` method doesn't -invoke the scheduler unless it has to block. But :ref:`in trio, get is -always a checkpoint `.) This works, but it's a bit -risky: basically instead of applying backpressure to specifically the -producer tasks, we're applying it to *all* the tasks in our system. -The danger here is that if enough items have built up in the queue, -then "stopping the world" to process them all may cause unacceptable -latency spikes in unrelated tasks. Nonetheless, this is still the -right choice in situations where it's impossible to apply backpressure -more precisely. So this is the strategy implemented by -:class:`UnboundedQueue`. The main time you should use this is when -working with low-level APIs like :func:`monitor_kevent`. - -.. autoclass:: UnboundedQueue - :members: - - Global state: system tasks and run-local variables ================================================== diff --git a/newsfragments/497.feature.rst b/newsfragments/497.feature.rst index fe2d6d372d..43778d2260 100644 --- a/newsfragments/497.feature.rst +++ b/newsfragments/497.feature.rst @@ -3,8 +3,9 @@ New and improved APIs for inter-task communication: uses separate "send" and "receive" objects, for consistency with other communication interfaces like :class:`trio.Stream`. Also, the two objects can now be closed individually, making it much easier to -gracefully shut down a channel. Also, there's a cool ``clone`` -API to manage fan-in scenarios. Also, the API has been written to -allow for future channel-like objects that send objects across process -boundaries. Also, help I can't stop writing also. See -:func:`trio.open_channel` for more details. +gracefully shut down a channel. Also, check out the nifty ``clone`` +API to make it easy to manage fan-in scenarios. Also, the API has been +written to allow for future channel-like objects that send objects +across process boundaries. Also, it supports unbounded buffering if +you really need it. Also, help I can't stop writing also. See +:ref:`channels` for more details. 
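+
+For code being migrated, the mapping from the deprecated ``trio.Queue``
+API is roughly as follows (a sketch, with hypothetical names)::
+
+    # Before (deprecated):
+    #   queue = trio.Queue(1)
+    #   await queue.put(item)
+    #   item = await queue.get()
+    # After:
+    send_channel, receive_channel = trio.open_channel(1)
+    await send_channel.send(item)
+    item = await receive_channel.receive()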
diff --git a/newsfragments/497.removal.rst b/newsfragments/497.removal.rst index 1dd3987532..cf6309e515 100644 --- a/newsfragments/497.removal.rst +++ b/newsfragments/497.removal.rst @@ -1,2 +1,2 @@ -``trio.Queue`` has been deprecated, in favor of -:func:`trio.open_channel`. +``trio.Queue`` and ``trio.hazmat.UnboundedQueue`` have been +deprecated, in favor of :func:`trio.open_channel`. diff --git a/trio/_channel.py b/trio/_channel.py index 24a513b1cd..0e77a43ff0 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -9,7 +9,6 @@ from .abc import AsyncResource # TODO: -# - tests # - docs # is there a better name for 'clone'? People seem to be having trouble with @@ -32,6 +31,7 @@ # channel that reaches between processes (e.g. data could be in flight but # we don't know it yet) + class EndOfChannel(Exception): pass diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index 9109b45a9c..f9e7321aa9 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -3,6 +3,7 @@ from .. import _core from .._util import aiter_compat +from .._deprecate import deprecated __all__ = ["UnboundedQueue"] @@ -43,6 +44,12 @@ class UnboundedQueue: """ + @deprecated( + "0.9.0", + issue=497, + thing="trio.hazmat.UnboundedQueue", + instead="trio.open_channel" + ) def __init__(self): self._lot = _core.ParkingLot() self._data = [] diff --git a/trio/_core/tests/test_unbounded_queue.py b/trio/_core/tests/test_unbounded_queue.py index d8d4dd7cf0..801c34ce46 100644 --- a/trio/_core/tests/test_unbounded_queue.py +++ b/trio/_core/tests/test_unbounded_queue.py @@ -5,6 +5,10 @@ from ... import _core from ...testing import assert_checkpoints, wait_all_tasks_blocked +pytestmark = pytest.mark.filterwarnings( + "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning" +) + async def test_UnboundedQueue_basic(): q = _core.UnboundedQueue() diff --git a/trio/_sync.py b/trio/_sync.py index f9a22b49d9..51884f90db 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -6,6 +6,7 @@ from . 
import _core from ._util import aiter_compat +from ._deprecate import deprecated __all__ = [ "Event", @@ -843,6 +844,9 @@ class Queue: """ + @deprecated( + "0.9.0", issue=497, thing="trio.Queue", instead="trio.open_channel" + ) def __init__(self, capacity): if not isinstance(capacity, int): raise TypeError("capacity must be an integer") diff --git a/trio/tests/test_highlevel_serve_listeners.py b/trio/tests/test_highlevel_serve_listeners.py index 7d237419a6..d197c909ed 100644 --- a/trio/tests/test_highlevel_serve_listeners.py +++ b/trio/tests/test_highlevel_serve_listeners.py @@ -13,14 +13,14 @@ @attr.s(hash=False, cmp=False) class MemoryListener(trio.abc.Listener): closed = attr.ib(default=False) - accepted_streams = attr.ib(default=attr.Factory(list)) - queued_streams = attr.ib(default=attr.Factory(lambda: trio.Queue(1))) + accepted_streams = attr.ib(factory=list) + queued_streams = attr.ib(factory=(lambda: trio.open_channel(1))) accept_hook = attr.ib(default=None) async def connect(self): assert not self.closed client, server = memory_stream_pair() - await self.queued_streams.put(server) + await self.queued_streams[0].send(server) return client async def accept(self): @@ -28,7 +28,7 @@ async def accept(self): assert not self.closed if self.accept_hook is not None: await self.accept_hook() - stream = await self.queued_streams.get() + stream = await self.queued_streams[1].receive() self.accepted_streams.append(stream) return stream diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index 8cd041d794..da769019e9 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -9,6 +9,10 @@ from .._timeouts import sleep_forever, move_on_after from .._sync import * +pytestmark = pytest.mark.filterwarnings( + "ignore:.*trio.Queue:trio.TrioDeprecationWarning" +) + async def test_Event(): e = Event() @@ -561,10 +565,10 @@ async def do_put(q, v): from .._sync import async_cm from .._channel import open_channel - # Three ways of implementing a Lock in terms of a channel. Used to let us put # the channel through the generic lock tests. + @async_cm class ChannelLock1: def __init__(self, capacity): @@ -629,6 +633,7 @@ def release(self): # Three ways of implementing a Lock in terms of a Queue. Used to let us put # the Queue through the generic lock tests. + @async_cm class QueueLock1: def __init__(self, capacity): From 0655003c264deb3513700fbee25958e1a30aaf38 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 4 Oct 2018 02:11:01 -0700 Subject: [PATCH 11/23] Wrote lots of docs, and made changes triggered by docs Made SendChannel and ReceiveChannel into ABCs. 
Renamed open_channel to open_memory_channel --- docs/source/conf.py | 7 +- docs/source/design.rst | 7 +- docs/source/history.rst | 6 +- docs/source/reference-core.rst | 257 ++++++++++++++---- .../channels-fan-in-fan-out-broken.py | 28 ++ .../reference-core/channels-shutdown.py | 19 ++ docs/source/reference-core/channels-simple.py | 23 ++ docs/source/reference-io.rst | 18 ++ trio/__init__.py | 4 +- trio/_abc.py | 188 ++++++++++++- trio/_channel.py | 146 ++++------ trio/_core/__init__.py | 2 +- trio/_core/_exceptions.py | 9 + trio/tests/test_channel.py | 42 ++- trio/tests/test_highlevel_serve_listeners.py | 2 +- trio/tests/test_sync.py | 8 +- 16 files changed, 575 insertions(+), 191 deletions(-) create mode 100644 docs/source/reference-core/channels-fan-in-fan-out-broken.py create mode 100644 docs/source/reference-core/channels-shutdown.py create mode 100644 docs/source/reference-core/channels-simple.py diff --git a/docs/source/conf.py b/docs/source/conf.py index e5c1e28d40..9d5890b491 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -35,10 +35,7 @@ ("py:class", "trio.hazmat.RunLocal"), # trio.abc is documented at random places scattered throughout the docs ("py:mod", "trio.abc"), - # contextvars is added in 3.7, but the docs point to 3.6 - # these two entries can be removed after 3.7 is released - ("py:mod", "contextvars"), - ("py:class", "contextvars.Context"), + ("py:class", "math.inf"), ] autodoc_inherit_docstrings = False @@ -70,7 +67,7 @@ def setup(app): intersphinx_mapping = { "python": ('https://docs.python.org/3', None), - "outcome": ('https://outcome.readthedocs.org/en/latest/', None), + "outcome": ('https://outcome.readthedocs.io/en/latest/', None), } autodoc_member_order = "bysource" diff --git a/docs/source/design.rst b/docs/source/design.rst index 699aa16107..07ea830753 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -413,11 +413,6 @@ Specific style guidelines and the ``nowait`` version raises :exc:`trio.WouldBlock` if it would block. -* The word ``monitor`` is used for APIs that involve an - :class:`trio.hazmat.UnboundedQueue` receiving some kind of events. - (Examples: nursery ``.monitor`` attribute, some of the low-level I/O - functions in :mod:`trio.hazmat`.) - * ...we should, but currently don't, have a solid convention to distinguish between functions that take an async callable and those that take a sync callable. See `issue #68 @@ -447,7 +442,7 @@ strategy is to make sure that it's possible for independent packages to add new features on top of trio. Enforcing the ``trio`` vs ``trio._core`` split is a way of `eating our own dogfood `__: basic -functionality like :class:`trio.Queue` and :mod:`trio.socket` is +functionality like :class:`trio.Lock` and :mod:`trio.socket` is actually implemented solely in terms of public APIs. And the hope is that by doing this, we increase the chances that someone who comes up with a better kind of queue or wants to add some new functionality diff --git a/docs/source/history.rst b/docs/source/history.rst index 32a6d18536..2c87e50dbe 100644 --- a/docs/source/history.rst +++ b/docs/source/history.rst @@ -139,7 +139,7 @@ Features the creator's :mod:`contextvars` context, instead using one created at :func:`~trio.run`. (`#289 `__) -- Add support for :class:`trio.Queue` with `capacity=0`. Queue's implementation +- Add support for ``trio.Queue`` with ``capacity=0``. Queue's implementation is also faster now. 
(`#473 <https://github.com/python-trio/trio/issues/473>`__)
- Switch to using standalone `Outcome
@@ -397,7 +397,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0
 See `#68 <https://github.com/python-trio/trio/issues/68>`__ for details.
-* :class:`trio.Queue`\'s ``join`` and ``task_done`` methods are
+* ``trio.Queue``\'s ``join`` and ``task_done`` methods are
  deprecated without replacement (`#321
  <https://github.com/python-trio/trio/issues/321>`__)
@@ -424,7 +424,7 @@ Upcoming breaking changes with warnings (i.e., stuff that in 0.2.0
 * ``trio.Result`` → ``trio.hazmat.Result``
 * ``trio.Value`` → ``trio.hazmat.Value``
 * ``trio.Error`` → ``trio.hazmat.Error``
- * ``trio.UnboundedQueue`` → :class:`trio.hazmat.UnboundedQueue`
+ * ``trio.UnboundedQueue`` → ``trio.hazmat.UnboundedQueue``
 In addition, several introspection attributes are being renamed:
diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst
index 668bc97be1..a9ca0935ed 100644
--- a/docs/source/reference-core.rst
+++ b/docs/source/reference-core.rst
@@ -830,7 +830,7 @@ finishes first::
     if not async_fns:
         raise ValueError("must pass at least one argument")
-    send_channel, receive_channel = trio.open_channel(0)
+    send_channel, receive_channel = trio.open_memory_channel(0)
     async def jockey(async_fn):
         await send_channel.send(await async_fn())
@@ -1247,75 +1247,236 @@ Broadcasting an event with :class:`Event`
 .. _channels:
-Passing messages through channels
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Passing value through channels
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+*Channels* allow you to safely and conveniently send objects between
+different tasks. They're useful for implementing producer/consumer
+patterns, including fan-in and fan-out.
+
+The channel API is defined by the abstract base classes
+:class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`.
+You can use these to implement your own custom channels that do
+things like pass objects between processes or over the network. But in
+many cases, you just want to pass objects between different tasks
+inside a single process.
+
+.. autofunction:: open_memory_channel
+
+.. note:: If you've used the :mod:`threading` or :mod:`asyncio` modules,
+   you may be familiar with :class:`queue.Queue` or
+   :class:`asyncio.Queue`. In Trio, :func:`open_memory_channel` is
+   what you use when you're looking for a queue. The main difference
+   is that Trio splits the classic queue interface up into two
+   objects. The advantage of this is that it makes it possible to put
+   the two ends in different processes, and that we can close the two
+   sides separately.
+
+
+A simple channel example
+++++++++++++++++++++++++
+
+Here's a simple example of how to use channels:
+
+.. literalinclude:: reference-core/channels-simple.py
+
+If you run this, it prints:
+
+.. code-block:: none
+
+   got value "message 0"
+   got value "message 1"
+   got value "message 2"
+
+And then it hangs forever. (Use control-C to quit.)
+
+
+.. _channel-shutdown:
+
+Clean shutdown with channels
+++++++++++++++++++++++++++++
+
+Of course we don't generally like it when programs hang. What
+happened? The problem is that the producer sent 3 messages and then
+exited, but the consumer has no way to tell that the producer is gone:
+for all it knows, another message might be coming along any moment. So
+it hangs forever waiting for the 4th message.
+
+Here's a new version that fixes this: it produces the same output as
+the previous version, and then exits cleanly. The only change is the
+addition of ``async with`` blocks inside the producer and consumer:
-A *channel* allows you to safely and conveniently send objects between
-different tasks.
+.. 
literalinclude:: reference-core/channels-shutdown.py
+   :emphasize-lines: 10,15
-.. autofunction:: open_channel
+The really important thing here is the producer's ``async with`` block.
+When the producer exits, this closes the ``send_channel``, and that
+tells the consumer that no more messages are coming, so it can cleanly
+exit its ``async for`` loop, and the program shuts down because both
+tasks have exited.
+We also put an ``async with`` inside the consumer. This isn't as
+important, but it can help us catch mistakes or other problems. For
+example, suppose that the consumer exited early for some reason –
+maybe because of a bug. Then the producer would be sending messages
+into the void, and might get stuck indefinitely. But, if the consumer
+closes its ``receive_channel``, then the producer will get a
+:exc:`BrokenResourceError` to alert it that it should stop sending
+messages because no-one is listening.
-Closing and cloning
-+++++++++++++++++++
+If you want to see the effect of the consumer exiting early, try
+adding a ``break`` statement to the ``async for`` loop – you should
+see a :exc:`BrokenResourceError` from the producer.
-example demonstrating close, simple async with on each side
-result if either side exits
-example demonstrating how fan-in makes it hard
.. _channels-fan-in-fan-out:
-example demonstrating how clone solves this
+Fan-in and fan-out
+++++++++++++++++++
+You can also have multiple producers, and multiple consumers, all
+sharing the same channel. However, this makes shutdown a little more
+complicated.
+
+For example, consider this naive extension of our previous example,
+now with two producers and two consumers:
+
+.. literalinclude:: reference-core/channels-fan-in-fan-out-broken.py
+
+
+
+.. _channel-fan-in:
+
+Fan-in
+++++++
+
+
+.. _channel-buffering:
 Buffering in channels
 +++++++++++++++++++++
-When you create a channel, Trio forces you to choose how many objects
-you can send without receiving, before send starts to block. You can
-make this unbounded if you want, but this is risky. Here's a toy
-example to demonstrate why. Suppose we have a channel with two
-producers and one consumer::
+When you call :func:`open_memory_channel`, you have to specify how
+many values can be buffered internally in the channel. If the buffer
+is full, then any task that calls :meth:`~trio.abc.SendChannel.send`
+will stop and wait for another task to call
+:meth:`~trio.abc.ReceiveChannel.receive`. This is useful because it
+produces *backpressure*: if the channel producers are running faster
+than the consumers, then it forces the producers to slow down.
+
+You can disable buffering entirely, by doing
+``open_memory_channel(0)``. In that case any task that calls
+:meth:`~trio.abc.SendChannel.send` will wait until another task calls
+:meth:`~trio.abc.ReceiveChannel.receive`, and vice versa. This is similar to
+how channels work in the `classic Communicating Sequential Processes
+model <https://en.wikipedia.org/wiki/Communicating_sequential_processes>`__, and is
+a reasonable default if you aren't sure what size buffer to use.
+(That's why we used it in the examples above.)
+
+At the other extreme, you can make the buffer unbounded by using
+``open_memory_channel(math.inf)``. In this case,
+:meth:`~trio.abc.SendChannel.send` *always* returns immediately.
+Normally, this is a bad idea. To see why, consider a program where the
+producer runs more quickly than the consumer::
+
+    # Simulate a producer that generates values 10x faster than the
+    # consumer can handle them.
+ + import trio + import math async def producer(send_channel): + count = 0 while True: - await send_channel.send(1) + await send_channel.send(count) + print("Sent message:", count) + count += 1 + await trio.sleep(0.1) async def consumer(receive_channel): - while True: - print(await receive_channel.receive()) + async for value in receive_channel: + print("Received message:", value) + await trio.sleep(1) async def main(): - send_channel, receive_channel = trio.open_channel(math.inf) + send_channel, receive_channel = trio.open_memory_channel(math.inf) async with trio.open_nursery() as nursery: - # Two producers - nursery.start_soon(producer, send_channel) nursery.start_soon(producer, send_channel) - # One consumer nursery.start_soon(consumer, receive_channel) trio.run(main) -If we naively cycle between these three tasks in round-robin style, -then we send an item, then send an item, then receive an item, then -send an item, then send an item, then receive an item, ... On each -cycle we send two items but only receive one, which means the extra -item has to be buffered inside the channel. And over time, this means -the buffer will keep growing and growing, our latency is terrible -(since each new item has to wait in line behind all the others in the -buffer), we run out of memory, it's just generally bad news all -around. - -By placing an upper bound on our buffer size, we avoid this problem. -If the buffer gets too big, then it applies *backpressure*: ``send`` -blocks and forces the producers to slow down and wait until the -consumer calls ``receive``. - -You can also create a channel without a buffer, by doing -``open_channel(0)``. In that case any task that calls ``send`` on the -channel will wait until another task calls ``receive`` on the same -channel, and vice versa. This is similar to how channels work in the -`classic Communicating Sequential Processes model -`__. +If you run this program, you'll see output like: + +.. code-block:: none + + Received message: 0 + Sent message: 0 + Sent message: 1 + Sent message: 2 + Sent message: 3 + Sent message: 4 + Sent message: 5 + Sent message: 6 + Sent message: 7 + Sent message: 8 + Sent message: 9 + Received message: 1 + Sent message: 10 + Sent message: 11 + Sent message: 12 + ... + +On average, the producer sends ten messages per second, and the +consumer only receives a message once per second. After each second +the channel's internal buffer has to hold another nine items. After a +minute, the buffer will have ~540 items in it; after an hour, that +grows to ~32,400. Eventually, the program will run out of memory. And +well before we run out of memory, our latency on handling individual +messages will become terrible. For example, at the one minute mark, +the producer is sending message ~600, but the producer is still +processing message ~60. Message 600 will have to sit in the channel +for ~9 minutes before the consumer catches up and processes +it. + +Now try replacing ``open_memory_channel(math.inf)`` with +``open_memory_channel(0)``, and run it again. We get output like: + +.. code-block:: none + + Sent message: 0 + Received message: 0 + Received message: 1 + Sent message: 1 + Received message: 2 + Sent message: 2 + Sent message: 3 + Received message: 3 + +Now the ``send`` calls wait for the ``receive`` calls to finish, which +forces the producer to slow down to match the consumer's speed. 
(It
+might look strange that some values are reported as "Received" before
+they're reported as "Sent"; this happens because the actual
+send/receive happen at the same time, so which line gets printed first
+is random.)
+
+Now, try setting the buffer size to 10 – what do you think will happen?
+
+
+So hopefully that makes clear why you generally
+
+, there are times when you actually do need an unbounded
+buffer. For example, consider a web crawler that uses a channel to
+keep track of all the URLs it still wants to crawl. Each crawler runs
+a loop where it takes a URL from the channel, fetches it, checks the
+HTML for outgoing links, and then adds the new URLs to the channel.
+This creates a *circular flow*, where each consumer is also a
+producer. In this case, if your channel buffer gets full, then the
+crawlers will block when they try to add new URLs to the channel, and
+if all the crawlers got blocked, then they aren't taking any URLs out
+of the channel, so they're stuck forever in a deadlock. Using an
+unbounded channel avoids this, because it means that
+:meth:`~trio.abc.SendChannel.send` never blocks.
 Lower-level synchronization primitives
@@ -1509,8 +1670,8 @@ Getting back into the trio thread from another thread
    :members:
 This will probably be clearer with an example. Here we demonstrate how
-to spawn a child thread, and then use a :class:`trio.Queue` to send
-messages between the thread and a trio task::
+to spawn a child thread, and then use a :ref:`memory channel
+<channels>` to send messages between the thread and a trio task::
    import trio
    import threading
@@ -1563,6 +1724,8 @@ Exceptions and warnings
 .. autoexception:: WouldBlock
+.. autoexception:: EndOfChannel
+
 .. autoexception:: BusyResourceError
 .. autoexception:: ClosedResourceError
diff --git a/docs/source/reference-core/channels-fan-in-fan-out-broken.py b/docs/source/reference-core/channels-fan-in-fan-out-broken.py
new file mode 100644
index 0000000000..44fb59642c
--- /dev/null
+++ b/docs/source/reference-core/channels-fan-in-fan-out-broken.py
@@ -0,0 +1,28 @@
+# This example usually crashes! 
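+# (Why: both producers share a single send_channel, and both consumers
+# share a single receive_channel. Whichever producer finishes first
+# closes the shared send_channel while the other may still be sending,
+# and the first consumer to exit closes the shared receive_channel while
+# the other may still be receiving -- see the discussion below.)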
+ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + # Start two producers + nursery.start_soon(producer, "A", send_channel) + nursery.start_soon(producer, "B", send_channel) + # And two consumers + nursery.start_soon(consumer, "X", receive_channel) + nursery.start_soon(consumer, "Y", receive_channel) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("message {}-{}".format(name, i)) + await trio.sleep(random.random()) + +async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("{} got value {!r}".format(name, value)) + await trio.sleep(random.random()) + +trio.run(main) diff --git a/docs/source/reference-core/channels-shutdown.py b/docs/source/reference-core/channels-shutdown.py new file mode 100644 index 0000000000..dcd35767ae --- /dev/null +++ b/docs/source/reference-core/channels-shutdown.py @@ -0,0 +1,19 @@ +import trio + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +async def producer(send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("message {}".format(i)) + +async def consumer(receive_channel): + async with receive_channel: + async for value in receive_channel: + print("got value {!r}".format(value)) + +trio.run(main) diff --git a/docs/source/reference-core/channels-simple.py b/docs/source/reference-core/channels-simple.py new file mode 100644 index 0000000000..d04ebd722c --- /dev/null +++ b/docs/source/reference-core/channels-simple.py @@ -0,0 +1,23 @@ +import trio + +async def main(): + async with trio.open_nursery() as nursery: + # Open a channel: + send_channel, receive_channel = trio.open_memory_channel(0) + # Start a producer and a consumer, passing one end of the channel to + # each of them: + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +async def producer(send_channel): + # Producer sends 3 messages + for i in range(3): + # The producer sends using 'await send_channel.send(...)' + await send_channel.send("message {}".format(i)) + +async def consumer(receive_channel): + # The consumer uses an 'async for' loop to receive the values: + async for value in receive_channel: + print("got value {!r}".format(value)) + +trio.run(main) diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 178b552e39..1af80212d1 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -118,6 +118,16 @@ Abstract base classes - :meth:`~Listener.accept` - - :class:`~trio.SocketListener`, :class:`~trio.ssl.SSLListener` + * - :class:`SendChannel` + - :class:`AsyncResource` + - :meth:`~SendChannel.send`, :meth:`~SendChannel.send_nowait` + - + - :func:`~trio.open_memory_channel` + * - :class:`ReceiveChannel` + - :class:`AsyncResource` + - :meth:`~ReceiveChannel.receive`, :meth:`~ReceiveChannel.receive_nowait` + - ``__aiter__``, ``__anext__`` + - :func:`~trio.open_memory_channel` .. autoclass:: trio.abc.AsyncResource :members: @@ -150,6 +160,14 @@ Abstract base classes :members: :show-inheritance: +.. autoclass:: trio.abc.SendChannel + :members: + :show-inheritance: + +.. autoclass:: trio.abc.ReceiveChannel + :members: + :show-inheritance: + .. 
currentmodule:: trio diff --git a/trio/__init__.py b/trio/__init__.py index 287c507041..d77360a3a4 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -19,7 +19,7 @@ TrioInternalError, RunFinishedError, WouldBlock, Cancelled, BusyResourceError, ClosedResourceError, MultiError, run, open_nursery, open_cancel_scope, current_effective_deadline, TASK_STATUS_IGNORED, - current_time, BrokenResourceError + current_time, BrokenResourceError, EndOfChannel ) from ._timeouts import ( @@ -38,7 +38,7 @@ from ._highlevel_generic import aclose_forcefully, StapledStream -from ._channel import open_channel, EndOfChannel +from ._channel import open_memory_channel from ._signals import catch_signals, open_signal_receiver diff --git a/trio/_abc.py b/trio/_abc.py index e49062a2f1..f0a3750608 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -1,4 +1,5 @@ from abc import ABCMeta, abstractmethod +from ._util import aiter_compat from . import _core __all__ = [ @@ -12,6 +13,8 @@ "SocketFactory", "HostnameResolver", "Listener", + "SendChannel", + "ReceiveChannel", ] @@ -282,8 +285,12 @@ class SendStream(AsyncResource): bidirectional, then you probably want to also implement :class:`ReceiveStream`, which makes your object a :class:`Stream`. - Every :class:`SendStream` also implements the :class:`AsyncResource` - interface. + :class:`SendStream` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to send Python objects rather than raw bytes, see + :class:`SendChannel`. """ __slots__ = () @@ -378,8 +385,12 @@ class ReceiveStream(AsyncResource): bidirectional, then you probably want to also implement :class:`SendStream`, which makes your object a :class:`Stream`. - Every :class:`ReceiveStream` also implements the :class:`AsyncResource` - interface. + :class:`ReceiveStream` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + + If you want to receive Python objects rather than raw bytes, see + :class:`ReceiveChannel`. """ __slots__ = () @@ -490,6 +501,10 @@ async def send_eof(self): class Listener(AsyncResource): """A standard interface for listening for incoming connections. + :class:`Listener` objects also implement the :class:`AsyncResource` + interface, so they can be closed by calling :meth:`~AsyncResource.aclose` + or using an ``async with`` block. + """ __slots__ = () @@ -499,9 +514,9 @@ async def accept(self): Returns: AsyncResource: An object representing the incoming connection. In - practice this is almost always some variety of :class:`Stream`, - though in principle you could also use this interface with, say, - SOCK_SEQPACKET sockets or similar. + practice this is generally some kind of :class:`Stream`, + but in principle you could also define a :class:`Listener` that + returned, say, channel objects. Raises: trio.BusyResourceError: if two tasks attempt to call @@ -510,11 +525,158 @@ async def accept(self): object, or if another task closes this listener object while :meth:`accept` is running. - Note that there is no ``BrokenListenerError``, because for listeners - there is no general condition of "the network/remote peer broke the - connection" that can be handled in a generic way, like there is for - streams. 
Other errors *can* occur and be raised from :meth:`accept` –
-        for example, if you run out of file descriptors then you might get an
-        :class:`OSError` with its errno set to ``EMFILE``.
+        Listeners don't generally raise :exc:`~trio.BrokenResourceError`,
+        because for listeners there is no general condition of "the
+        network/remote peer broke the connection" that can be handled in a
+        generic way, like there is for streams. Other errors *can* occur and
+        be raised from :meth:`accept` – for example, if you run out of file
+        descriptors then you might get an :class:`OSError` with its errno set
+        to ``EMFILE``.
         """
+
+
+class SendChannel(AsyncResource):
+    """A standard interface for sending Python objects to some receiver.
+
+    :class:`SendChannel` objects also implement the :class:`AsyncResource`
+    interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
+    or using an ``async with`` block.
+
+    If you want to send raw bytes rather than Python objects, see
+    :class:`SendStream`.
+
+    """
+    __slots__ = ()
+
+    @abstractmethod
+    def send_nowait(self, value):
+        """Attempt to send an object through the channel, without blocking.
+
+        Args:
+          value (object): The object to send.
+
+        Raises:
+          trio.WouldBlock: if the operation cannot be completed immediately
+              (for example, because the channel's internal buffer is full).
+          trio.BrokenResourceError: if something has gone wrong, and the
+              channel is broken. For example, you may get this if the receiver
+              has already been closed.
+          trio.ClosedResourceError: if you previously closed this
+              :class:`SendChannel` object.
+
+        """
+
+    @abstractmethod
+    async def send(self, value):
+        """Attempt to send an object through the channel, blocking if necessary.
+
+        Args:
+          value (object): The object to send.
+
+        Raises:
+          trio.BrokenResourceError: if something has gone wrong, and the
+              channel is broken. For example, you may get this if the receiver
+              has already been closed.
+          trio.ClosedResourceError: if you previously closed this
+              :class:`SendChannel` object, or if another task closes it while
+              :meth:`send` is running.
+
+        """
+
+    @abstractmethod
+    def clone(self):
+        """Clone this send channel object.
+
+        This returns a new :class:`SendChannel` object, which acts as a
+        duplicate of the original: sending on the new object does exactly the
+        same thing as sending on the old object.
+
+        However, closing one of the objects does not close the other, and
+        receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have
+        been closed.
+
+        This is useful for fan-in communication patterns, with multiple
+        producers all sending objects to the same destination. If you give
+        each producer its own clone of the :class:`SendChannel`, and make sure
+        to close each :class:`SendChannel` when it's finished, then receivers
+        will automatically get notified when all producers are finished.
+
+        Raises:
+          trio.ClosedResourceError: if you already closed this
+              :class:`SendChannel` object.
+
+        """
+
+
+class ReceiveChannel(AsyncResource):
+    """A standard interface for receiving Python objects from some sender.
+
+    You can iterate over a :class:`ReceiveChannel` using an ``async for``
+    loop::
+
+       async for value in receive_channel:
+           ...
+
+    This is equivalent to calling :meth:`receive` repeatedly. The loop exits
+    without error when :meth:`receive` raises :exc:`~trio.EndOfChannel`.
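+    In other words, the loop above behaves roughly like this sketch
+    (``trio.EndOfChannel`` is the top-level name for the exception used
+    below)::
+
+       while True:
+           try:
+               value = await receive_channel.receive()
+           except trio.EndOfChannel:
+               break
+           ...  # do something with value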
+
+    :class:`ReceiveChannel` objects also implement the :class:`AsyncResource`
+    interface, so they can be closed by calling :meth:`~AsyncResource.aclose`
+    or using an ``async with`` block.
+
+    If you want to receive raw bytes rather than Python objects, see
+    :class:`ReceiveStream`.
+
+    """
+    __slots__ = ()
+
+    @abstractmethod
+    def receive_nowait(self):
+        """Attempt to receive an incoming object, without blocking.
+
+        Returns:
+          object: Whatever object was received.
+
+        Raises:
+          trio.WouldBlock: if the operation cannot be completed immediately
+              (for example, because no object has been sent yet).
+          trio.EndOfChannel: if the sender has been closed cleanly, and no
+              more objects are coming. This is not an error condition.
+          trio.ClosedResourceError: if you previously closed this
+              :class:`ReceiveChannel` object.
+          trio.BrokenResourceError: if something has gone wrong, and the
+              channel is broken.
+
+        """
+
+    @abstractmethod
+    async def receive(self):
+        """Attempt to receive an incoming object, blocking if necessary.
+
+        It's legal for multiple tasks to call :meth:`receive` at the same
+        time. If this happens, then one task receives the first value sent,
+        another task receives the next value sent, and so on.
+
+        Returns:
+          object: Whatever object was received.
+
+        Raises:
+          trio.EndOfChannel: if the sender has been closed cleanly, and no
+              more objects are coming. This is not an error condition.
+          trio.ClosedResourceError: if you previously closed this
+              :class:`ReceiveChannel` object.
+          trio.BrokenResourceError: if something has gone wrong, and the
+              channel is broken.
+
+        """
+
+    @aiter_compat
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        try:
+            return await self.receive()
+        except _core.EndOfChannel:
+            raise StopAsyncIteration
diff --git a/trio/_channel.py b/trio/_channel.py
index 0e77a43ff0..483c8c8fbb 100644
--- a/trio/_channel.py
+++ b/trio/_channel.py
@@ -5,14 +5,7 @@
 from outcome import Error, Value
 from . import _core
-from ._util import aiter_compat
-from .abc import AsyncResource
-
-# TODO:
-# - docs
-
-# is there a better name for 'clone'? People seem to be having trouble with
-# it.
+from .abc import SendChannel, ReceiveChannel
 # rename SendChannel/ReceiveChannel to SendHandle/ReceiveHandle?
 # eh, maybe not -- SendStream/ReceiveStream don't work like that.
@@ -25,57 +18,81 @@
 # underlying stream when all SendChannels are closed.
 # to think about later:
-# - buffer_max=0 default?
+# - max_buffer_size=0 default?
 # - should we make ReceiveChannel.close() raise BrokenChannelError if data gets
 #   lost? This isn't how ReceiveStream works. And it might not be doable for a
 #   channel that reaches between processes (e.g. data could be in flight but
-#   we don't know it yet)
-
-
-class EndOfChannel(Exception):
-    pass
-
-
-def open_channel(buffer_max):
-    """Open a channel for communicating between tasks.
-
-    A channel is represented by two objects
+#   we don't know it yet). (Well, we could make it raise if it hasn't gotten a
+#   clean goodbye message.) OTOH, ReceiveStream has the assumption that you're
+#   going to spend significant effort on engineering some protocol on top of
+#   it, while Channel is supposed to be useful out-of-the-box.
+#   Practically speaking, if a consumer crashes and then its __aexit__
+#   replaces the actual exception with BrokenChannelError, that's kind of
+#   annoying.
+#   But lost messages are bad too... maybe the *sender* aclose() should raise
+#   if it lost a message? I guess that has the same issue...
+# - should we have a ChannelPair object, instead of returning a tuple?
+#   upside: no need to worry about order
+#   could have shorthand send/receive methods
+#   downside: pretty annoying to type out channel_pair.send_channel like...
+#   ever. can't deconstruct on assignment. (Or, well you could by making it
+#   implement __iter__, but then that's yet another quirky way to do it.)
+# - is there a better/more evocative name for "clone"? People seem to be
+#   having trouble with it, but I'm not sure whether it's just because of
+#   missing docs.
+
+
+def open_memory_channel(max_buffer_size):
+    """Open a channel for passing objects between tasks within a process.
+
+    This channel is lightweight and entirely in-memory; it doesn't involve any
+    operating-system resources.
+
+    The channel objects are only closed if you explicitly call
+    :meth:`~trio.abc.AsyncResource.aclose` or use ``async with``. In
+    particular, they are *not* automatically closed when garbage collected.
+    Closing in-memory channel objects is not mandatory, but it's generally a
+    good idea, because it helps avoid situations where tasks get stuck
+    waiting on a channel when there's no-one on the other side.
     Args:
-      buffer_max (int or math.inf): The maximum number of items that can be
-        buffered in the channel before :meth:`SendChannel.send` blocks.
-        Choosing a sensible value here is important to ensure that
-        backpressure is communicated promptly and avoid unnecessary latency.
-        If in doubt, use 0, which means that sends always block until another
-        task calls receive.
+      max_buffer_size (int or math.inf): The maximum number of items that can
+        be buffered in the channel before :meth:`~trio.abc.SendChannel.send`
+        blocks. Choosing a sensible value here is important to ensure that
+        backpressure is communicated promptly and avoid unnecessary latency;
+        see :ref:`channel-buffering` for more details. If in doubt, use 0.
     Returns:
-      A pair (:class:`SendChannel`, :class:`ReceiveChannel`). Remember: data
-      flows from left to right.
+      A pair ``(send_channel, receive_channel)``. If you have
+      trouble remembering which order these go in, remember: data
+      flows from left → right.
     """
-    if buffer_max != inf and not isinstance(buffer_max, int):
-        raise TypeError("buffer_max must be an integer or math.inf")
-    if buffer_max < 0:
-        raise ValueError("buffer_max must be >= 0")
-    receive_channel = ReceiveChannel(buffer_max)
-    send_channel = SendChannel(receive_channel)
+    if max_buffer_size != inf and not isinstance(max_buffer_size, int):
+        raise TypeError("max_buffer_size must be an integer or math.inf")
+    if max_buffer_size < 0:
+        raise ValueError("max_buffer_size must be >= 0")
+    receive_channel = MemoryReceiveChannel(max_buffer_size)
+    send_channel = MemorySendChannel(receive_channel)
     return send_channel, receive_channel
 @attr.s(frozen=True)
 class ChannelStats:
-    buffer_used = attr.ib()
-    buffer_max = attr.ib()
+    current_buffer_used = attr.ib()
+    max_buffer_size = attr.ib()
     open_send_channels = attr.ib()
     tasks_waiting_send = attr.ib()
     tasks_waiting_receive = attr.ib()
-class SendChannel(AsyncResource):
+class MemorySendChannel(SendChannel):
     def __init__(self, receive_channel):
         self._rc = receive_channel
         self._closed = False
+        # This is just the tasks waiting on *this* object. As compared to
+        # self._rc._send_tasks, which includes tasks from this object and all
+        # clones.
self._tasks = set() self._rc._open_send_channels += 1 @@ -92,19 +109,6 @@ def statistics(self): @_core.enable_ki_protection def send_nowait(self, value): - """Attempt to send an object into the channel, without blocking. - - Args: - value (object): The object to send. - - Raises: - WouldBlock: if the channel is full. - ClosedResourceError: if this :class:`SendHandle` object has already - been _closed. - BrokenChannelError: if the receiving :class:`ReceiveHandle` object - has already been _closed. - - """ if self._closed: raise _core.ClosedResourceError if self._rc._closed: @@ -120,18 +124,6 @@ def send_nowait(self, value): @_core.enable_ki_protection async def send(self, value): - """Attempt to send an object into the channel, blocking if necessary. - - Args: - value (object): The object to send. - - Raises: - ClosedResourceError: if this :class:`SendChannel` object has already - been _closed. - BrokenChannelError: if the receiving :class:`ReceiveHandle` object - has already been _closed. - - """ await _core.checkpoint_if_cancelled() try: self.send_nowait(value) @@ -155,15 +147,9 @@ def abort_fn(_): @_core.enable_ki_protection def clone(self): - """Clone this send channel. - - Raises: - ClosedResourceError: if this :class:`SendChannel` object has already - been _closed. - """ if self._closed: raise _core.ClosedResourceError - return SendChannel(self._rc) + return MemorySendChannel(self._rc) @_core.enable_ki_protection async def aclose(self): @@ -179,13 +165,13 @@ async def aclose(self): if self._rc._open_send_channels == 0: assert not self._rc._send_tasks for task in self._rc._receive_tasks: - _core.reschedule(task, Error(EndOfChannel())) + _core.reschedule(task, Error(_core.EndOfChannel())) self._rc._receive_tasks.clear() await _core.checkpoint() @attr.s(cmp=False, hash=False, repr=False) -class ReceiveChannel(AsyncResource): +class MemoryReceiveChannel(ReceiveChannel): _capacity = attr.ib() _data = attr.ib(factory=deque) _closed = attr.ib(default=False) @@ -198,8 +184,8 @@ class ReceiveChannel(AsyncResource): def statistics(self): return ChannelStats( - buffer_used=len(self._data), - buffer_max=self._capacity, + current_buffer_used=len(self._data), + max_buffer_size=self._capacity, open_send_channels=self._open_send_channels, tasks_waiting_send=len(self._send_tasks), tasks_waiting_receive=len(self._receive_tasks), @@ -223,7 +209,7 @@ def receive_nowait(self): if self._data: return self._data.popleft() if not self._open_send_channels: - raise EndOfChannel + raise _core.EndOfChannel raise _core.WouldBlock @_core.enable_ki_protection @@ -259,17 +245,5 @@ async def aclose(self): task.custom_sleep_data._tasks.remove(task) _core.reschedule(task, Error(_core.BrokenResourceError())) self._send_tasks.clear() - # XX: or if we're losing data, maybe we should raise a - # BrokenChannelError here? 
self._data.clear() await _core.checkpoint() - - @aiter_compat - def __aiter__(self): - return self - - async def __anext__(self): - try: - return await self.receive() - except EndOfChannel: - raise StopAsyncIteration diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index 7853ce896c..49899f716d 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -16,7 +16,7 @@ def _public(fn): from ._exceptions import ( TrioInternalError, RunFinishedError, WouldBlock, Cancelled, - BusyResourceError, ClosedResourceError, BrokenResourceError + BusyResourceError, ClosedResourceError, BrokenResourceError, EndOfChannel ) from ._multierror import MultiError diff --git a/trio/_core/_exceptions.py b/trio/_core/_exceptions.py index 09397f2b31..fd20406ae0 100644 --- a/trio/_core/_exceptions.py +++ b/trio/_core/_exceptions.py @@ -123,3 +123,12 @@ class BrokenResourceError(Exception): information about the underlying error. """ + + +class EndOfChannel(Exception): + """Raised when trying to receive from a :class:`trio.abc.ReceiveChannel` + that has no more data to receive. + + This is analogous to an "end-of-file" condition, but for channels. + + """ diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index ca7b8efd59..f1b6ce6ac0 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -2,16 +2,16 @@ from ..testing import wait_all_tasks_blocked, assert_checkpoints import trio -from trio import open_channel, EndOfChannel +from trio import open_memory_channel, EndOfChannel async def test_channel(): with pytest.raises(TypeError): - open_channel(1.0) + open_memory_channel(1.0) with pytest.raises(ValueError): - open_channel(-1) + open_memory_channel(-1) - s, r = open_channel(2) + s, r = open_memory_channel(2) repr(s) # smoke test repr(r) # smoke test @@ -49,7 +49,7 @@ async def test_channel(): async def test_553(autojump_clock): - s, r = open_channel(1) + s, r = open_memory_channel(1) with trio.move_on_after(10) as timeout_scope: await r.receive() assert timeout_scope.cancelled_caught @@ -63,7 +63,7 @@ async def producer(send_channel, i): for j in range(3 * i, 3 * (i + 1)): await send_channel.send(j) - send_channel, receive_channel = open_channel(0) + send_channel, receive_channel = open_memory_channel(0) async with trio.open_nursery() as nursery: # We hand out clones to all the new producers, and then close the # original. 
@@ -85,7 +85,7 @@ async def send_block(s, expect): await s.send(None) # closing send -> other send gets ClosedResourceError - s, r = open_channel(0) + s, r = open_memory_channel(0) async with trio.open_nursery() as nursery: nursery.start_soon(send_block, s, trio.ClosedResourceError) await wait_all_tasks_blocked() @@ -104,7 +104,7 @@ async def send_block(s, expect): await r.receive() # closing receive -> send gets BrokenResourceError - s, r = open_channel(0) + s, r = open_memory_channel(0) async with trio.open_nursery() as nursery: nursery.start_soon(send_block, s, trio.BrokenResourceError) await wait_all_tasks_blocked() @@ -121,7 +121,7 @@ async def receive_block(r): with pytest.raises(trio.ClosedResourceError): await r.receive() - s, r = open_channel(0) + s, r = open_memory_channel(0) async with trio.open_nursery() as nursery: nursery.start_soon(receive_block, r) await wait_all_tasks_blocked() @@ -137,7 +137,7 @@ async def receive_block(r): async def test_close_multiple_send_handles(): # With multiple send handles, closing one handle only wakes senders on # that handle, but others can continue just fine - s1, r = open_channel(0) + s1, r = open_memory_channel(0) s2 = s1.clone() async def send_will_close(): @@ -156,7 +156,7 @@ async def send_will_succeed(): async def test_inf_capacity(): - s, r = open_channel(float("inf")) + s, r = open_memory_channel(float("inf")) # It's accepted, and we can send all day without blocking async with s: @@ -170,18 +170,18 @@ async def test_inf_capacity(): async def test_statistics(): - s, r = open_channel(2) + s, r = open_memory_channel(2) assert s.statistics() == r.statistics() stats = s.statistics() - assert stats.buffer_used == 0 - assert stats.buffer_max == 2 + assert stats.current_buffer_used == 0 + assert stats.max_buffer_size == 2 assert stats.open_send_channels == 1 assert stats.tasks_waiting_send == 0 assert stats.tasks_waiting_receive == 0 s.send_nowait(None) - assert s.statistics().buffer_used == 1 + assert s.statistics().current_buffer_used == 1 s2 = s.clone() assert s.statistics().open_send_channels == 2 @@ -190,7 +190,7 @@ async def test_statistics(): async with trio.open_nursery() as nursery: s2.send_nowait(None) # fill up the buffer - assert s.statistics().buffer_used == 2 + assert s.statistics().current_buffer_used == 2 nursery.start_soon(s2.send, None) nursery.start_soon(s2.send, None) await wait_all_tasks_blocked() @@ -217,7 +217,7 @@ async def test_channel_fairness(): # We can remove an item we just sent, and send an item back in after, if # no-one else is waiting. - s, r = open_channel(1) + s, r = open_memory_channel(1) s.send_nowait(1) assert r.receive_nowait() == 1 s.send_nowait(2) @@ -243,7 +243,7 @@ async def do_receive(r): # And the analogous situation for send: if we free up a space, we can't # immediately send something in it if someone is already waiting to do # that - s, r = open_channel(1) + s, r = open_memory_channel(1) s.send_nowait(1) with pytest.raises(trio.WouldBlock): s.send_nowait(None) @@ -257,7 +257,7 @@ async def do_receive(r): async def test_unbuffered(): - s, r = open_channel(0) + s, r = open_memory_channel(0) with pytest.raises(trio.WouldBlock): r.receive_nowait() with pytest.raises(trio.WouldBlock): @@ -273,7 +273,3 @@ async def do_send(s, v): assert await r.receive() == 1 with pytest.raises(trio.WouldBlock): r.receive_nowait() - - -# tests to add: -# - all the queue tests, including e.r. 
fairness tests diff --git a/trio/tests/test_highlevel_serve_listeners.py b/trio/tests/test_highlevel_serve_listeners.py index d197c909ed..a0fc80589d 100644 --- a/trio/tests/test_highlevel_serve_listeners.py +++ b/trio/tests/test_highlevel_serve_listeners.py @@ -14,7 +14,7 @@ class MemoryListener(trio.abc.Listener): closed = attr.ib(default=False) accepted_streams = attr.ib(factory=list) - queued_streams = attr.ib(factory=(lambda: trio.open_channel(1))) + queued_streams = attr.ib(factory=(lambda: trio.open_memory_channel(1))) accept_hook = attr.ib(default=None) async def connect(self): diff --git a/trio/tests/test_sync.py b/trio/tests/test_sync.py index da769019e9..0f39f62008 100644 --- a/trio/tests/test_sync.py +++ b/trio/tests/test_sync.py @@ -563,7 +563,7 @@ async def do_put(q, v): from .._sync import async_cm -from .._channel import open_channel +from .._channel import open_memory_channel # Three ways of implementing a Lock in terms of a channel. Used to let us put # the channel through the generic lock tests. @@ -572,7 +572,7 @@ async def do_put(q, v): @async_cm class ChannelLock1: def __init__(self, capacity): - self.s, self.r = open_channel(capacity) + self.s, self.r = open_memory_channel(capacity) for _ in range(capacity - 1): self.s.send_nowait(None) @@ -589,7 +589,7 @@ def release(self): @async_cm class ChannelLock2: def __init__(self): - self.s, self.r = open_channel(10) + self.s, self.r = open_memory_channel(10) self.s.send_nowait(None) def acquire_nowait(self): @@ -605,7 +605,7 @@ def release(self): @async_cm class ChannelLock3: def __init__(self): - self.s, self.r = open_channel(0) + self.s, self.r = open_memory_channel(0) # self.acquired is true when one task acquires the lock and # only becomes false when it's released and no tasks are # waiting to acquire. From 6a86490351dca85b99d78682128a0bacdfdf1d4a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 4 Oct 2018 02:32:06 -0700 Subject: [PATCH 12/23] Add ReceiveChannel.clone Discovered while writing the docs that not having it is really confusing and hard to justify. --- trio/_abc.py | 25 +++++++- trio/_channel.py | 156 ++++++++++++++++++++++++++++------------------- 2 files changed, 116 insertions(+), 65 deletions(-) diff --git a/trio/_abc.py b/trio/_abc.py index f0a3750608..eb4833309c 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -600,7 +600,8 @@ def clone(self): producers all sending objects to the same destination. If you give each producer its own clone of the :class:`SendChannel`, and make sure to close each :class:`SendChannel` when it's finished, then receivers - will automatically get notified when all producers are finished. + will automatically get notified when all producers are finished. See + :ref:`channels-fan-in-fan-out`. Raises: trio.ClosedResourceError: if you already closed this @@ -671,6 +672,28 @@ async def receive(self): """ + @abstractmethod + def clone(self): + """Clone this receive channel object. + + This returns a new :class:`ReceiveChannel` object, which acts as a + duplicate of the original: receiving on the new object does exactly + the same thing as receiving on the old object. They share an + underlying buffer. + + However, closing one of the objects does not close the other, and the + underlying channel is not closed until all clones are closed. + + This is useful for fan-out communication patterns, with multiple + consumers all receiving objects from the same underlying channel. See + :ref:`channels-fan-in-fan-out`. 
+
+        Raises:
+          trio.ClosedResourceError: if you already closed this
+              :class:`ReceiveChannel` object.
+
+        """
+
     @aiter_compat
     def __aiter__(self):
         return self
diff --git a/trio/_channel.py b/trio/_channel.py
index 483c8c8fbb..a4512ccd5a 100644
--- a/trio/_channel.py
+++ b/trio/_channel.py
@@ -72,9 +72,8 @@ def open_memory_channel(max_buffer_size):
         raise TypeError("max_buffer_size must be an integer or math.inf")
     if max_buffer_size < 0:
         raise ValueError("max_buffer_size must be >= 0")
-    receive_channel = MemoryReceiveChannel(max_buffer_size)
-    send_channel = MemorySendChannel(receive_channel)
-    return send_channel, receive_channel
+    state = MemoryChannelState(max_buffer_size)
+    return MemorySendChannel(state), MemoryReceiveChannel(state)
 @attr.s(frozen=True)
@@ -86,39 +85,63 @@ class ChannelStats:
     tasks_waiting_receive = attr.ib()
+@attr.s
+class MemoryChannelState:
+    max_buffer_size = attr.ib()
+    data = attr.ib(factory=deque)
+    # Counts of open endpoints using this state
+    open_send_channels = attr.ib(default=0)
+    open_receive_channels = attr.ib(default=0)
+    # {task: value}
+    send_tasks = attr.ib(factory=OrderedDict)
+    # {task: None}
+    receive_tasks = attr.ib(factory=OrderedDict)
+
+    def statistics(self):
+        return ChannelStats(
+            current_buffer_used=len(self.data),
+            max_buffer_size=self.max_buffer_size,
+            open_send_channels=self.open_send_channels,
+            tasks_waiting_send=len(self.send_tasks),
+            tasks_waiting_receive=len(self.receive_tasks),
+        )
+
+
+@attr.s(cmp=False, repr=False)
 class MemorySendChannel(SendChannel):
-    def __init__(self, receive_channel):
-        self._rc = receive_channel
-        self._closed = False
+    _state = attr.ib()
+    _closed = attr.ib(default=False)
+    # This is just the tasks waiting on *this* object. As compared to
+    # self._state.send_tasks, which includes tasks from this object and
+    # all clones.
+    _tasks = attr.ib(factory=set)
+
+    def __attrs_post_init__(self):
+        self._state.open_send_channels += 1
     def __repr__(self):
         return (
-            ""
-            .format(id(self), id(self._rc))
+            "<send channel at {:#x}, using buffer at {:#x}>"
+            .format(id(self), id(self._state))
         )
     def statistics(self):
-        # XX should we also report statistics specific to this object, like
-        # len(self._tasks)?
-        return self._rc.statistics()
+        # XX should we also report statistics specific to this object? 
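+        # (All clones share a single MemoryChannelState, so for now every
+        # endpoint reports the same numbers.)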
+ return self._state.statistics() @_core.enable_ki_protection def send_nowait(self, value): if self._closed: raise _core.ClosedResourceError - if self._rc._closed: + if self._state.open_receive_channels == 0: raise _core.BrokenResourceError - if self._rc._receive_tasks: - assert not self._rc._data - task, _ = self._rc._receive_tasks.popitem(last=False) + if self._state.receive_tasks: + assert not self._state.data + task, _ = self._state.receive_tasks.popitem(last=False) + task.custom_sleep_data._tasks.remove(task) _core.reschedule(task, Value(value)) - elif len(self._rc._data) < self._rc._capacity: - self._rc._data.append(value) + elif len(self._state.data) < self._state.max_buffer_size: + self._state.data.append(value) else: raise _core.WouldBlock @@ -135,12 +158,12 @@ async def send(self, value): task = _core.current_task() self._tasks.add(task) - self._rc._send_tasks[task] = value + self._state.send_tasks[task] = value task.custom_sleep_data = self def abort_fn(_): self._tasks.remove(task) - del self._rc._send_tasks[task] + del self._state.send_tasks[task] return _core.Abort.SUCCEEDED await _core.wait_task_rescheduled(abort_fn) @@ -149,7 +172,7 @@ def abort_fn(_): def clone(self): if self._closed: raise _core.ClosedResourceError - return MemorySendChannel(self._rc) + return MemorySendChannel(self._state) @_core.enable_ki_protection async def aclose(self): @@ -159,56 +182,48 @@ async def aclose(self): self._closed = True for task in self._tasks: _core.reschedule(task, Error(_core.ClosedResourceError())) - del self._rc._send_tasks[task] + del self._state.send_tasks[task] self._tasks.clear() - self._rc._open_send_channels -= 1 - if self._rc._open_send_channels == 0: - assert not self._rc._send_tasks - for task in self._rc._receive_tasks: + self._state.open_send_channels -= 1 + if self._state.open_send_channels == 0: + assert not self._state.send_tasks + for task in self._state.receive_tasks: + task.custom_sleep_data._tasks.remove(task) _core.reschedule(task, Error(_core.EndOfChannel())) - self._rc._receive_tasks.clear() + self._state.receive_tasks.clear() await _core.checkpoint() -@attr.s(cmp=False, hash=False, repr=False) +@attr.s(cmp=False, repr=False) class MemoryReceiveChannel(ReceiveChannel): - _capacity = attr.ib() - _data = attr.ib(factory=deque) + _state = attr.ib() _closed = attr.ib(default=False) - # count of open send channels - _open_send_channels = attr.ib(default=0) - # {task: value} - _send_tasks = attr.ib(factory=OrderedDict) - # {task: None} - _receive_tasks = attr.ib(factory=OrderedDict) + _tasks = attr.ib(factory=set) + + def __attrs_post_init__(self): + self._state.open_receive_channels += 1 def statistics(self): - return ChannelStats( - current_buffer_used=len(self._data), - max_buffer_size=self._capacity, - open_send_channels=self._open_send_channels, - tasks_waiting_send=len(self._send_tasks), - tasks_waiting_receive=len(self._receive_tasks), - ) + return self._state.statistics() def __repr__(self): - return "".format( - id(self), self._open_send_channels + return "".format( + id(self), id(self._state) ) @_core.enable_ki_protection def receive_nowait(self): if self._closed: raise _core.ClosedResourceError - if self._send_tasks: - task, value = self._send_tasks.popitem(last=False) + if self._state.send_tasks: + task, value = self._state.send_tasks.popitem(last=False) task.custom_sleep_data._tasks.remove(task) _core.reschedule(task) - self._data.append(value) + self._state.data.append(value) # Fall through - if self._data: - return self._data.popleft() - if not 
self._open_send_channels: + if self._state.data: + return self._state.data.popleft() + if not self._state.open_send_channels: raise _core.EndOfChannel raise _core.WouldBlock @@ -224,26 +239,39 @@ async def receive(self): return value task = _core.current_task() - self._receive_tasks[task] = None + self._tasks.add(task) + self._state.receive_tasks[task] = None + task.custom_sleep_data = self def abort_fn(_): - del self._receive_tasks[task] + self._tasks.remove(task) + del self._state.receive_tasks[task] return _core.Abort.SUCCEEDED return await _core.wait_task_rescheduled(abort_fn) + @_core.enable_ki_protection + def clone(self): + if self._closed: + raise _core.ClosedResourceError + return MemoryReceiveChannel(self._state) + @_core.enable_ki_protection async def aclose(self): if self._closed: await _core.checkpoint() return self._closed = True - for task in self._receive_tasks: + for task in self._tasks: _core.reschedule(task, Error(_core.ClosedResourceError())) - self._receive_tasks.clear() - for task in self._send_tasks: - task.custom_sleep_data._tasks.remove(task) - _core.reschedule(task, Error(_core.BrokenResourceError())) - self._send_tasks.clear() - self._data.clear() + del self._state.receive_tasks[task] + self._tasks.clear() + self._state.open_receive_channels -= 1 + if self._state.open_receive_channels == 0: + assert not self._state.receive_tasks + for task in self._state.send_tasks: + task.custom_sleep_data._tasks.remove(task) + _core.reschedule(task, Error(_core.BrokenResourceError())) + self._state.send_tasks.clear() + self._state.data.clear() await _core.checkpoint() From 0ee63176a444ebd37e02a19f2fa8590bb008a220 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 4 Oct 2018 02:33:57 -0700 Subject: [PATCH 13/23] Fix test failure on windows --- trio/_core/tests/test_windows.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/trio/_core/tests/test_windows.py b/trio/_core/tests/test_windows.py index c0ec5cce64..18436c7704 100644 --- a/trio/_core/tests/test_windows.py +++ b/trio/_core/tests/test_windows.py @@ -11,6 +11,12 @@ from .._windows_cffi import ffi, kernel32 +# The undocumented API that this is testing should be changed to stop using +# UnboundedQueue (or just removed until we have time to redo it), but until +# then we filter out the warning. +@pytest.mark.filterwarnings( + "ignore:.*UnboundedQueue:trio.TrioDeprecationWarning" +) async def test_completion_key_listen(): async def post(key): iocp = ffi.cast("HANDLE", _core.current_iocp()) From 17242bae6855f6e1bb996ed41ea614a159e7f63e Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Thu, 4 Oct 2018 02:41:47 -0700 Subject: [PATCH 14/23] Add tests for ReceiveChannel.clone --- trio/tests/test_channel.py | 48 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index f1b6ce6ac0..def3ac445f 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -134,6 +134,33 @@ async def receive_block(r): await r.receive() +async def test_receive_channel_clone_and_close(): + s, r = open_memory_channel(10) + + r2 = r.clone() + r3 = r.clone() + + s.send_nowait(None) + await r.aclose() + async with r2: + pass + + with pytest.raises(trio.ClosedResourceError): + r.clone() + + with pytest.raises(trio.ClosedResourceError): + r2.clone() + + # Can still send, r3 is still open + s.send_nowait(None) + + await r3.aclose() + + # But now the receiver is really closed + with pytest.raises(trio.BrokenResourceError): + s.send_nowait(None) + + async def test_close_multiple_send_handles(): # With multiple send handles, closing one handle only wakes senders on # that handle, but others can continue just fine @@ -155,6 +182,27 @@ async def send_will_succeed(): assert await r.receive() == "ok" +async def test_close_multiple_receive_handles(): + # With multiple receive handles, closing one handle only wakes receivers on + # that handle, but others can continue just fine + s, r1 = open_memory_channel(0) + r2 = r1.clone() + + async def receive_will_close(): + with pytest.raises(trio.ClosedResourceError): + await r1.receive() + + async def receive_will_succeed(): + assert await r2.receive() == "ok" + + async with trio.open_nursery() as nursery: + nursery.start_soon(receive_will_close) + nursery.start_soon(receive_will_succeed) + await wait_all_tasks_blocked() + await r1.aclose() + await s.send("ok") + + async def test_inf_capacity(): s, r = open_memory_channel(float("inf")) From c5a8b5c53ea0844443dcec869b57fc9bb8086eef Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 4 Oct 2018 03:47:58 -0700 Subject: [PATCH 15/23] Finish writing channel docs --- docs/source/reference-core.rst | 194 +++++++++++++----- .../reference-core/channels-backpressure.py | 30 +++ .../channels-fan-in-fan-out-broken.py | 6 +- .../channels-fan-in-fan-out-fixed.py | 29 +++ trio/_abc.py | 4 +- trio/_channel.py | 5 +- 6 files changed, 206 insertions(+), 62 deletions(-) create mode 100644 docs/source/reference-core/channels-backpressure.py create mode 100644 docs/source/reference-core/channels-fan-in-fan-out-fixed.py diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index a9ca0935ed..25ba6d305e 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1247,8 +1247,8 @@ Broadcasting an event with :class:`Event` .. _channels: -Passing value through channels -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Using channels to pass values between tasks +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ *Channels* allow you to safely and conveniently send objects between different tasks. They're useful implementing producer/consumer @@ -1343,12 +1343,90 @@ now with two producers and two consumers: .. literalinclude:: reference-core/channels-fan-in-fan-out-broken.py +The two producers, A and B, send 3 messages apiece. These are then +randomly distributed between the two producers, X and Y. So we're +hoping to see some output like: + +.. 
code-block:: none
+
+   consumer Y got value '0 from producer B'
+   consumer X got value '0 from producer A'
+   consumer Y got value '1 from producer A'
+   consumer Y got value '1 from producer B'
+   consumer X got value '2 from producer B'
+   consumer X got value '2 from producer A'
+
+However, on most runs, that's not what happens – the first part of the
+output is OK, and then when we get to the end the program crashes with
+:exc:`ClosedResourceError`. If you run the program a few times, you'll
+see that sometimes the traceback shows ``send`` crashing, and other
+times it shows ``receive`` crashing, and you might even find that on
+some runs it doesn't crash at all.
+
+Here's what's happening: suppose that producer A finishes first. It
+exits, and its ``async with`` block closes the ``send_channel``. But
+wait! Producer B was still using that ``send_channel``... so when it
+calls ``send``, it gets a :exc:`ClosedResourceError`.
+
+Sometimes, though, if we're lucky, the two producers might finish at
+the same time (or close enough), so they both make their last ``send``
+before either of them closes the ``send_channel``.
+
+But, even if that happens, we're not out of the woods yet! After the
+producers exit, the two consumers race to be the first to notice that
+the ``send_channel`` has closed. Suppose that X wins the race. It
+exits its ``async for`` loop, then exits the ``async with`` block...
+and closes the ``receive_channel``, while Y is still using it. Again,
+this causes a crash.
+
+We could avoid this by using some complicated bookkeeping to make sure
+that only the *last* producer and the *last* consumer close their
+channel endpoints... but that would be tiresome and fragile.
+Fortunately, there's a better way! Here's a fixed version of our
+program above:
+
+.. literalinclude:: reference-core/channels-fan-in-fan-out-fixed.py
+   :emphasize-lines: 7, 9, 10, 12, 13
+
+What we're doing here is taking advantage of the
+:meth:`SendChannel.clone <trio.abc.SendChannel.clone>` and
+:meth:`ReceiveChannel.clone <trio.abc.ReceiveChannel.clone>` methods.
+What these do is create copies of our endpoints, which act just like
+the original – except that they can be closed independently. And the
+underlying channel is only closed after *all* the clones have been
+closed. So this completely solves our problem with shutdown, and if
+you run this program, you'll see it print its six lines of output and
+then exit cleanly.
+
+Notice a small trick we use: the code in ``main`` that sets up the
+channels in the first place uses an ``async with`` block to close the
+original objects after passing on clones to all the tasks. We could
+alternatively have passed on the original objects into the tasks,
+like::
+
+   # Also works, but is more finicky:
+   send_channel, receive_channel = trio.open_memory_channel(0)
+   nursery.start_soon(producer, "A", send_channel.clone())
+   nursery.start_soon(producer, "B", send_channel)
+   nursery.start_soon(consumer, "X", receive_channel.clone())
+   nursery.start_soon(consumer, "Y", receive_channel)
+But this is more error-prone, especially in more complex code that
+might use a loop to spawn the producers/consumers.
-.. 
-.. _channel-fan-in:
-
-Fan-in
-++++++
+Just make sure that you don't write::
+
+    # Broken, will cause program to hang:
+    send_channel, receive_channel = trio.open_memory_channel(0)
+    nursery.start_soon(producer, "A", send_channel.clone())
+    nursery.start_soon(producer, "B", send_channel.clone())
+    nursery.start_soon(consumer, "X", receive_channel.clone())
+    nursery.start_soon(consumer, "Y", receive_channel.clone())
+
+Here we pass clones into the tasks, but never close the original
+objects. That means we have 3 send channel objects (the original + two
+clones), but we only close 2 of them, so the consumers will hang
+around forever waiting for that last one to be closed.
 
 .. _channel-buffering:
 
@@ -1379,39 +1457,14 @@ At the other extreme, you can make the buffer unbounded by using
 Normally, this is a bad idea. To see why, consider a program where the
 producer runs more quickly than the consumer::
 
-    # Simulate a producer that generates values 10x faster than the
-    # consumer can handle them.
-
-    import trio
-    import math
-
-    async def producer(send_channel):
-        count = 0
-        while True:
-            await send_channel.send(count)
-            print("Sent message:", count)
-            count += 1
-            await trio.sleep(0.1)
-
-    async def consumer(receive_channel):
-        async for value in receive_channel:
-            print("Received message:", value)
-            await trio.sleep(1)
-
-    async def main():
-        send_channel, receive_channel = trio.open_memory_channel(math.inf)
-        async with trio.open_nursery() as nursery:
-            nursery.start_soon(producer, send_channel)
-            nursery.start_soon(consumer, receive_channel)
-
-    trio.run(main)
+.. literalinclude:: reference-core/channels-backpressure.py
 
 If you run this program, you'll see output like:
 
 .. code-block:: none
 
-   Received message: 0
    Sent message: 0
+   Received message: 0
    Sent message: 1
    Sent message: 2
    Sent message: 3
    Sent message: 4
    Sent message: 5
    Sent message: 6
    Sent message: 7
    Sent message: 8
    Sent message: 9
    Received message: 1
    Sent message: 10
    Sent message: 11
    Sent message: 12
    ...
 
 On average, the producer sends ten messages per second, and the
-consumer only receives a message once per second. After each second
-the channel's internal buffer has to hold another nine items. After a
-minute, the buffer will have ~540 items in it; after an hour, that
-grows to ~32,400. Eventually, the program will run out of memory. And
-well before we run out of memory, our latency on handling individual
-messages will become terrible. For example, at the one minute mark,
-the producer is sending message ~600, but the producer is still
-processing message ~60. Message 600 will have to sit in the channel
-for ~9 minutes before the consumer catches up and processes
-it.
+consumer only receives a message once per second. That means that each
+second, the channel's internal buffer has to grow to hold an extra
+nine items. After a minute, the buffer will have ~540 items in it;
+after an hour, that grows to ~32,400. Eventually, the program will run
+out of memory. And well before we run out of memory, our latency on
+handling individual messages will become abysmal. For example, at the
+one minute mark, the producer is sending message ~600, but the
+consumer is still processing message ~60. Message 600 will have to sit
+in the channel for ~9 minutes before the consumer catches up and
+processes it.
 
 Now try replacing ``open_memory_channel(math.inf)`` with
 ``open_memory_channel(0)``, and run it again. We get output like:
 
 .. code-block:: none
 
@@ -1452,6 +1505,7 @@ Now try replacing ``open_memory_channel(math.inf)`` with
    Sent message: 2
    Sent message: 3
    Received message: 3
+   ...
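One way to see where the handoff happens: with a buffer size of zero there is
nowhere to park a value, so a non-blocking send only succeeds if a receiver is
already waiting. A tiny sketch (an illustration, not part of the patch)::

    import trio
    from trio.testing import wait_all_tasks_blocked

    async def main():
        send_channel, receive_channel = trio.open_memory_channel(0)

        # No buffer and no receiver waiting: a non-blocking send can't work.
        try:
            send_channel.send_nowait("hi")
        except trio.WouldBlock:
            print("send would block")

        async def receiver():
            print(await receive_channel.receive())

        async with trio.open_nursery() as nursery:
            nursery.start_soon(receiver)
            await wait_all_tasks_blocked()
            # Now a receiver is parked in receive(), so the same call
            # succeeds: the value is handed straight across.
            send_channel.send_nowait("hi")

    trio.run(main)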
Now the ``send`` calls wait for the ``receive`` calls to finish, which
forces the producer to slow down to match the consumer's speed. (It
@@ -1460,23 +1514,51 @@ they're reported as "Sent"; this happens because the actual
send/receive happen at the same time, so which line gets printed first
is random.)
 
-Now, try setting the buffer size to 10 – what do you think will happen?
+Now, let's try setting a small but nonzero buffer size, like
+``open_memory_channel(3)``. What do you think will happen?
+I get:
 
-So hopefully that makes clear why you generally
+.. code-block:: none
+
+   Sent message: 0
+   Received message: 0
+   Sent message: 1
+   Sent message: 2
+   Sent message: 3
+   Received message: 1
+   Sent message: 4
+   Received message: 2
+   Sent message: 5
+   ...
 
-, there are times when you actually do need an unbounded
-buffer. For example, consider a web crawler that uses a channel to
-keep track of all the URLs it still wants to crawl. Each crawler runs
-a loop where it takes a URL from the channel, fetches it, checks the
-HTML for outgoing links, and then adds the new URLs to the channel.
-This creates a *circular flow*, where each consumer is also a
-producer. In this case, if your channel buffer gets full, then the
-crawlers will block when they try to add new URLs to the channel, and
-if all the crawlers got blocked, then they aren't taking any URLs out
-of the channel, so they're stuck forever in a deadlock. Using an
-unbounded channel avoids this, because it means that
-:meth:`~trio.abc.SendChannel.send` never blocks.
+So you can see that the producer runs ahead by 3 messages, and then
+stops to wait: when the consumer reads message 1, it sends message 4,
+then when the consumer reads message 2, it sends message 5, and so on.
+Once it reaches the steady state, this version acts just like our
+previous version where we set the buffer size to 0, except that it
+uses a bit more memory and each message sits in the buffer for a bit
+longer before being processed (i.e., the message latency is higher).
+
+Of course real producers and consumers are usually more complicated
+than this, and in some situations, a modest amount of buffering might
+improve throughput. But too much buffering wastes memory and increases
+latency, so if you want to tune your application you should experiment
+to see what value works best for you.
+
+**Why do we even support unbounded buffers then?** Good question!
+Despite everything we saw above, there are times when you actually do
+need an unbounded buffer. For example, consider a web crawler that
+uses a channel to keep track of all the URLs it still wants to crawl.
+Each crawler runs a loop where it takes a URL from the channel,
+fetches it, checks the HTML for outgoing links, and then adds the new
+URLs to the channel. This creates a *circular flow*, where each
+consumer is also a producer. In this case, if your channel buffer gets
+full, then the crawlers will block when they try to add new URLs to
+the channel, and if all the crawlers get blocked, then they aren't
+taking any URLs out of the channel, so they're stuck forever in a
+deadlock. Using an unbounded channel avoids this, because it means
+that :meth:`~trio.abc.SendChannel.send` never blocks.
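The circular-flow pattern is easier to see in code. A schematic sketch (an
illustration, not part of the patch; ``fetch_and_extract_links`` is a
hypothetical placeholder, and the shutdown logic needed to detect that no
URLs remain is deliberately elided)::

    import math
    import trio

    async def fetch_and_extract_links(url):
        # hypothetical stand-in for real HTTP fetching + HTML parsing
        return []

    async def crawler(receive_channel, send_channel):
        # each crawler is a consumer (takes URLs out) *and* a producer
        # (puts newly discovered URLs back in)
        async with receive_channel, send_channel:
            async for url in receive_channel:
                for link in await fetch_and_extract_links(url):
                    # with an unbounded buffer this send never blocks, so
                    # the crawlers can't all wedge on a full channel at once
                    await send_channel.send(link)

    # the endpoints that each crawler task gets cloned from:
    send_channel, receive_channel = trio.open_memory_channel(math.inf)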
Lower-level synchronization primitives diff --git a/docs/source/reference-core/channels-backpressure.py b/docs/source/reference-core/channels-backpressure.py new file mode 100644 index 0000000000..50ac67f20a --- /dev/null +++ b/docs/source/reference-core/channels-backpressure.py @@ -0,0 +1,30 @@ +# Simulate a producer that generates values 10x faster than the +# consumer can handle them. + +import trio +import math + +async def producer(send_channel): + count = 0 + while True: + # Pretend that we have to do some work to create this message, and it + # takes 0.1 seconds: + await trio.sleep(0.1) + await send_channel.send(count) + print("Sent message:", count) + count += 1 + +async def consumer(receive_channel): + async for value in receive_channel: + print("Received message:", value) + # Pretend that we have to do some work to handle this message, and it + # takes 1 second + await trio.sleep(1) + +async def main(): + send_channel, receive_channel = trio.open_memory_channel(math.inf) + async with trio.open_nursery() as nursery: + nursery.start_soon(producer, send_channel) + nursery.start_soon(consumer, receive_channel) + +trio.run(main) diff --git a/docs/source/reference-core/channels-fan-in-fan-out-broken.py b/docs/source/reference-core/channels-fan-in-fan-out-broken.py index 44fb59642c..2a755acba3 100644 --- a/docs/source/reference-core/channels-fan-in-fan-out-broken.py +++ b/docs/source/reference-core/channels-fan-in-fan-out-broken.py @@ -16,13 +16,15 @@ async def main(): async def producer(name, send_channel): async with send_channel: for i in range(3): - await send_channel.send("message {}-{}".format(name, i)) + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably await trio.sleep(random.random()) async def consumer(name, receive_channel): async with receive_channel: async for value in receive_channel: - print("{} got value {!r}".format(name, value)) + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably await trio.sleep(random.random()) trio.run(main) diff --git a/docs/source/reference-core/channels-fan-in-fan-out-fixed.py b/docs/source/reference-core/channels-fan-in-fan-out-fixed.py new file mode 100644 index 0000000000..a3e7044fe7 --- /dev/null +++ b/docs/source/reference-core/channels-fan-in-fan-out-fixed.py @@ -0,0 +1,29 @@ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + async with send_channel, receive_channel: + # Start two producers, giving each its own private clone + nursery.start_soon(producer, "A", send_channel.clone()) + nursery.start_soon(producer, "B", send_channel.clone()) + # And two consumers, giving each its own private clone + nursery.start_soon(consumer, "X", receive_channel.clone()) + nursery.start_soon(consumer, "Y", receive_channel.clone()) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +trio.run(main) diff --git a/trio/_abc.py b/trio/_abc.py index eb4833309c..7da9e16c8e 
100644
--- a/trio/_abc.py
+++ b/trio/_abc.py
@@ -601,7 +601,7 @@ def clone(self):
         each producer its own clone of the :class:`SendChannel`, and make sure
         to close each :class:`SendChannel` when it's finished, then receivers
         will automatically get notified when all producers are finished. See
-        :ref:`channels-fan-in-fan-out`.
+        :ref:`channel-fan-in-fan-out`.
 
         Raises:
           trio.ClosedResourceError: if you already closed this
@@ -686,7 +686,7 @@ def clone(self):
         This is useful for fan-out communication patterns, with multiple
         consumers all receiving objects from the same underlying channel. See
-        :ref:`channels-fan-in-fan-out`.
+        :ref:`channel-fan-in-fan-out`.
 
         Raises:
           trio.ClosedResourceError: if you already closed this
diff --git a/trio/_channel.py b/trio/_channel.py
index a4512ccd5a..d91e1d2abf 100644
--- a/trio/_channel.py
+++ b/trio/_channel.py
@@ -121,8 +121,9 @@ def __attrs_post_init__(self):
 
     def __repr__(self):
         return (
-            "<send channel at {:#x}, using buffer at {:#x}>"
-            .format(id(self), id(self._state))
+            "<send channel at {:#x}, using buffer at {:#x}>".format(
+                id(self), id(self._state)
+            )
         )
 
     def statistics(self):

From 48b3058ab9c90fb8829925c2423271771c1caed9 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Thu, 4 Oct 2018 03:57:08 -0700
Subject: [PATCH 16/23] Add another followup reminder

---
 trio/_channel.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/trio/_channel.py b/trio/_channel.py
index d91e1d2abf..837b430106 100644
--- a/trio/_channel.py
+++ b/trio/_channel.py
@@ -40,6 +40,7 @@
 # - is their a better/more evocative name for "clone"? People seem to be
 #   having trouble with it, but I'm not sure whether it's just because of
 #   missing docs.
+# - trio.testing.check_channel?
 
 
 def open_memory_channel(max_buffer_size):

From 55eac4357539f999022264cc30a7af48507b2589 Mon Sep 17 00:00:00 2001
From: "Nathaniel J. Smith"
Date: Fri, 5 Oct 2018 00:13:02 -0700
Subject: [PATCH 17/23] Address review comments

---
 docs/source/reference-core.rst                | 109 ++++++------------
 .../channels-fan-in-fan-out-broken.py         |  30 -----
 .../channels-fan-in-fan-out-fixed.py          |  29 -----
 newsfragments/497.feature.rst                 |  11 +-
 newsfragments/497.removal.rst                 |   2 +-
 trio/_abc.py                                  |  28 +++--
 trio/_channel.py                              |   3 +
 trio/_core/_unbounded_queue.py                |   2 +-
 trio/_sync.py                                 |   5 +-
 trio/tests/test_channel.py                    |  26 ++++-
 10 files changed, 91 insertions(+), 154 deletions(-)
 delete mode 100644 docs/source/reference-core/channels-fan-in-fan-out-broken.py
 delete mode 100644 docs/source/reference-core/channels-fan-in-fan-out-fixed.py

diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst
index 25ba6d305e..755808a3a6 100644
--- a/docs/source/reference-core.rst
+++ b/docs/source/reference-core.rst
@@ -1251,26 +1251,27 @@ Using channels to pass values between tasks
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 *Channels* allow you to safely and conveniently send objects between
-different tasks. They're useful implementing producer/consumer
-patterns, including fan-in and fan-out.
+different tasks. They're particularly useful for implementing
+producer/consumer patterns.
 
 The channel API is defined by the abstract base classes
 :class:`trio.abc.SendChannel` and :class:`trio.abc.ReceiveChannel`.
 You can use these to implement your own custom channels that do
 things like pass objects between processes or over the network. But in
 many cases, you just want to pass objects between different tasks
-inside a single process.
+inside a single process, and for that you can use
+:func:`trio.open_memory_channel`:
 
 .. autofunction:: open_memory_channel
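For orientation, the minimal usage looks roughly like this (a sketch, not
part of the patch; it assumes the ``open_memory_channel`` API as defined
above)::

    import trio

    async def main():
        send_channel, receive_channel = trio.open_memory_channel(0)

        async def producer():
            # closing the send side tells the receiver no more values come
            async with send_channel:
                await send_channel.send("hello")

        async with trio.open_nursery() as nursery:
            nursery.start_soon(producer)
            async with receive_channel:
                print(await receive_channel.receive())  # prints: hello

    trio.run(main)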
-.. note:: If you've the :mod:`threading` or :mod:`asyncio` modules,
-   you may be familiar with :class:`queue.Queue` or
+.. note:: If you've used the :mod:`threading` or :mod:`asyncio`
+   modules, you may be familiar with :class:`queue.Queue` or
    :class:`asyncio.Queue`. In Trio, :func:`open_memory_channel` is
    what you use when you're looking for a queue. The main difference
    is that Trio splits the classic queue interface up into two
    objects. The advantage of this is that it makes it possible to put
    the two ends in different processes, and that we can close the two
-   sides separately
+   sides separately.
 
 
 A simple channel example
@@ -1312,10 +1313,10 @@ addition of ``async with`` blocks inside the producer and consumer:
 
 The really important thing here is the producer's ``async with``.
 When the producer exits, this closes the ``send_channel``, and that
 tells the consumer that no more messages are coming, so it can cleanly
-exit its ``async for`` loop, and the program shuts down because both
+exit its ``async for`` loop. Then the program shuts down because both
 tasks have exited.
 
-We also put an ``async for`` inside the consumer. This isn't as
+We also added an ``async with`` to the consumer. This isn't as
 important, but it can help us catch mistakes or other problems. For
 example, suppose that the consumer exited early for some reason –
 maybe because of a bug. Then the producer would be sending messages
@@ -1329,10 +1330,10 @@ adding a ``break`` statement to the ``async for`` loop – you should
 see a :exc:`BrokenResourceError` from the producer.
 
 
-.. _channel-fan-in-fan-out:
+.. _channel-mpmc:
 
-Fan-in and fan-out
-++++++++++++++++++
+Managing multiple producers and/or multiple consumers
++++++++++++++++++++++++++++++++++++++++++++++++++++++
 
 You can also have multiple producers, and multiple consumers, all
 sharing the same channel. However, this makes shutdown a little more
@@ -1341,7 +1342,7 @@ complicated.
 For example, consider this naive extension of our
 previous example, now with two producers and two consumers:
 
-.. literalinclude:: reference-core/channels-fan-in-fan-out-broken.py
+.. literalinclude:: reference-core/channels-mpmc-broken.py
 
 The two producers, A and B, send 3 messages apiece. These are then
 randomly distributed between the two consumers, X and Y. So we're
@@ -1365,8 +1366,8 @@ some runs it doesn't crash at all.
 
 Here's what's happening: suppose that producer A finishes first. It
 exits, and its ``async with`` block closes the ``send_channel``. But
-wait! Producer B was still using that ``send_channel``... so when it
-calls ``send``, it gets a :exc:`ClosedResourceError`.
+wait! Producer B was still using that ``send_channel``... so the next
+time B calls ``send``, it gets a :exc:`ClosedResourceError`.
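The failure is easy to reproduce in isolation: using an endpoint after *you*
closed it is what raises :exc:`ClosedResourceError`. A short sketch (an
illustration, not part of the patch)::

    import trio

    async def main():
        send_channel, receive_channel = trio.open_memory_channel(1)
        await send_channel.aclose()
        try:
            await send_channel.send("too late")
        except trio.ClosedResourceError:
            print("sending on an endpoint you already closed fails")

    trio.run(main)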
 Sometimes, though, if we're lucky, the two producers might finish at
@@ -1385,24 +1386,23 @@ channel endpoints... but that would be tiresome and fragile.
 
 Fortunately, there's a better way! Here's a fixed version of our
 program above:
 
-.. literalinclude:: reference-core/channels-fan-in-fan-out-fixed.py
+.. literalinclude:: reference-core/channels-mpmc-fixed.py
    :emphasize-lines: 7, 9, 10, 12, 13
 
-What we're doing here is taking advantage of the
-:meth:`SendChannel.clone <trio.abc.SendChannel.clone>` and
-:meth:`ReceiveChannel.clone <trio.abc.ReceiveChannel.clone>` methods.
-What these do is create copies of our endpoints that act just like
-the original – except that they can be closed independently. And the
-underlying channel is only closed after *all* the clones have been
-closed. So this completely solves our problem with shutdown, and if
-you run this program, you'll see it print its six lines of output and
-then exit cleanly.
-
-Notice a small trick we use: the code in ``main`` that sets up the
-channels in the first place uses an ``async with`` block to close the
-original objects after passing on clones to all the tasks. We could
-alternatively have passed on the original objects into the tasks,
-like::
+This example demonstrates using the :meth:`SendChannel.clone
+<trio.abc.SendChannel.clone>` and :meth:`ReceiveChannel.clone
+<trio.abc.ReceiveChannel.clone>` methods. What these do is create
+copies of our endpoints that act just like the original – except that
+they can be closed independently. And the underlying channel is only
+closed after *all* the clones have been closed. So this completely
+solves our problem with shutdown, and if you run this program, you'll
+see it print its six lines of output and then exit cleanly.
+
+Notice a small trick we use: the code in ``main`` creates clone
+objects to pass into all the child tasks, and then closes the original
+objects using ``async with``. Another option is to pass clones into
+all-but-one of the child tasks, and then pass the original object into
+the last task, like::
 
     # Also works, but is more finicky:
     send_channel, receive_channel = trio.open_memory_channel(0)
     nursery.start_soon(producer, "A", send_channel.clone())
     nursery.start_soon(producer, "B", send_channel)
     nursery.start_soon(consumer, "X", receive_channel.clone())
     nursery.start_soon(consumer, "Y", receive_channel)
 
-But this is more error-prone, especially if in more complex code that
-might use a loop to spawn the producers/consumers.
+But this is more error-prone, especially if you use a loop to spawn
+the producers/consumers.
 
 Just make sure that you don't write::
 
@@ -1480,8 +1480,8 @@ If you run this program, you'll see output like:
    Sent message: 12
    ...
 
-On average, the producer sends ten messages per second, and the
-consumer only receives a message once per second. That means that each
+On average, the producer sends ten messages per second, but the
+consumer only calls ``receive`` once per second. That means that each
 second, the channel's internal buffer has to grow to hold an extra
 nine items. After a minute, the buffer will have ~540 items in it;
@@ -1755,46 +1755,7 @@ This will probably be clearer with an example. Here we demonstrate how
 to spawn a child thread, and then use a :ref:`memory channel <channels>` to send
 messages between the thread and a trio task::
 
-    import trio
-    import threading
-
-    def thread_fn(portal, request_queue, response_queue):
-        while True:
-            # Since we're in a thread, we can't call trio.Queue methods
-            # directly -- so we use our portal to call them.
- request = portal.run(request_queue.get) - # We use 'None' as a request to quit - if request is not None: - response = request + 1 - portal.run(response_queue.put, response) - else: - # acknowledge that we're shutting down, and then do it - portal.run(response_queue.put, None) - return - - async def main(): - portal = trio.BlockingTrioPortal() - request_queue = trio.Queue(1) - response_queue = trio.Queue(1) - thread = threading.Thread( - target=thread_fn, - args=(portal, request_queue, response_queue)) - thread.start() - - # prints "1" - await request_queue.put(0) - print(await response_queue.get()) - - # prints "2" - await request_queue.put(1) - print(await response_queue.get()) - - # prints "None" - await request_queue.put(None) - print(await response_queue.get()) - thread.join() - - trio.run(main) +.. literalinclude:: reference-core/blocking-trio-portal-example.py Exceptions and warnings diff --git a/docs/source/reference-core/channels-fan-in-fan-out-broken.py b/docs/source/reference-core/channels-fan-in-fan-out-broken.py deleted file mode 100644 index 2a755acba3..0000000000 --- a/docs/source/reference-core/channels-fan-in-fan-out-broken.py +++ /dev/null @@ -1,30 +0,0 @@ -# This example usually crashes! - -import trio -import random - -async def main(): - async with trio.open_nursery() as nursery: - send_channel, receive_channel = trio.open_memory_channel(0) - # Start two producers - nursery.start_soon(producer, "A", send_channel) - nursery.start_soon(producer, "B", send_channel) - # And two consumers - nursery.start_soon(consumer, "X", receive_channel) - nursery.start_soon(consumer, "Y", receive_channel) - -async def producer(name, send_channel): - async with send_channel: - for i in range(3): - await send_channel.send("{} from producer {}".format(i, name)) - # Random sleeps help trigger the problem more reliably - await trio.sleep(random.random()) - -async def consumer(name, receive_channel): - async with receive_channel: - async for value in receive_channel: - print("consumer {} got value {!r}".format(name, value)) - # Random sleeps help trigger the problem more reliably - await trio.sleep(random.random()) - -trio.run(main) diff --git a/docs/source/reference-core/channels-fan-in-fan-out-fixed.py b/docs/source/reference-core/channels-fan-in-fan-out-fixed.py deleted file mode 100644 index a3e7044fe7..0000000000 --- a/docs/source/reference-core/channels-fan-in-fan-out-fixed.py +++ /dev/null @@ -1,29 +0,0 @@ -import trio -import random - -async def main(): - async with trio.open_nursery() as nursery: - send_channel, receive_channel = trio.open_memory_channel(0) - async with send_channel, receive_channel: - # Start two producers, giving each its own private clone - nursery.start_soon(producer, "A", send_channel.clone()) - nursery.start_soon(producer, "B", send_channel.clone()) - # And two consumers, giving each its own private clone - nursery.start_soon(consumer, "X", receive_channel.clone()) - nursery.start_soon(consumer, "Y", receive_channel.clone()) - -async def producer(name, send_channel): - async with send_channel: - for i in range(3): - await send_channel.send("{} from producer {}".format(i, name)) - # Random sleeps help trigger the problem more reliably - await trio.sleep(random.random()) - -async def consumer(name, receive_channel): - async with receive_channel: - async for value in receive_channel: - print("consumer {} got value {!r}".format(name, value)) - # Random sleeps help trigger the problem more reliably - await trio.sleep(random.random()) - -trio.run(main) diff --git 
a/newsfragments/497.feature.rst b/newsfragments/497.feature.rst index 43778d2260..820b76471a 100644 --- a/newsfragments/497.feature.rst +++ b/newsfragments/497.feature.rst @@ -1,9 +1,10 @@ New and improved APIs for inter-task communication: -:func:`trio.open_channel` (replacing ``trio.Queue``). This interface -uses separate "send" and "receive" objects, for consistency with other -communication interfaces like :class:`trio.Stream`. Also, the two -objects can now be closed individually, making it much easier to -gracefully shut down a channel. Also, check out the nifty ``clone`` +:class:`trio.abc.SendChannel`, :class:`trio.abc.ReceiveChannel`, and +:func:`trio.open_memory_channel` (which replaces ``trio.Queue``). This +interface uses separate "send" and "receive" objects, for consistency +with other communication interfaces like :class:`trio.Stream`. Also, +the two objects can now be closed individually, making it much easier +to gracefully shut down a channel. Also, check out the nifty ``clone`` API to make it easy to manage fan-in scenarios. Also, the API has been written to allow for future channel-like objects that send objects across process boundaries. Also, it supports unbounded buffering if diff --git a/newsfragments/497.removal.rst b/newsfragments/497.removal.rst index cf6309e515..aa838ad7b6 100644 --- a/newsfragments/497.removal.rst +++ b/newsfragments/497.removal.rst @@ -1,2 +1,2 @@ ``trio.Queue`` and ``trio.hazmat.UnboundedQueue`` have been -deprecated, in favor of :func:`trio.open_channel`. +deprecated, in favor of :func:`trio.open_memory_channel`. diff --git a/trio/_abc.py b/trio/_abc.py index 7da9e16c8e..5c41f79e70 100644 --- a/trio/_abc.py +++ b/trio/_abc.py @@ -550,7 +550,7 @@ class SendChannel(AsyncResource): __slots__ = () @abstractmethod - async def send_nowait(self, value): + def send_nowait(self, value): """Attempt to send an object through the channel, without blocking. Args: @@ -596,12 +596,12 @@ def clone(self): receivers don't get :exc:`~trio.EndOfChannel` until *all* clones have been closed. - This is useful for fan-in communication patterns, with multiple + This is useful for communication patterns that involve multiple producers all sending objects to the same destination. If you give - each producer its own clone of the :class:`SendChannel`, and make sure - to close each :class:`SendChannel` when it's finished, then receivers + each producer its own clone of the :class:`SendChannel`, and then make + sure to close each :class:`SendChannel` when it's finished, receivers will automatically get notified when all producers are finished. See - :ref:`channel-fan-in-fan-out`. + :ref:`channel-mpmc` for examples. Raises: trio.ClosedResourceError: if you already closed this @@ -633,7 +633,7 @@ class ReceiveChannel(AsyncResource): __slots__ = () @abstractmethod - async def receive_nowait(self): + def receive_nowait(self): """Attempt to receive an incoming object, without blocking. Returns: @@ -678,15 +678,21 @@ def clone(self): This returns a new :class:`ReceiveChannel` object, which acts as a duplicate of the original: receiving on the new object does exactly - the same thing as receiving on the old object. They share an - underlying buffer. + the same thing as receiving on the old object. However, closing one of the objects does not close the other, and the underlying channel is not closed until all clones are closed. - This is useful for fan-out communication patterns, with multiple - consumers all receiving objects from the same underlying channel. 
See - :ref:`channel-fan-in-fan-out`. + This is useful for communication patterns involving multiple consumers + all receiving objects from the same underlying channel. See + :ref:`channel-mpmc` for examples. + + .. warning:: The clones all share the same underlying channel. + Whenever a clone :meth:`receive`\s a value, it is removed from the + channel and the other clones do *not* receive that value. If you + want to send multiple copies of the same stream of values to + multiple destinations, like :func:`itertools.tee`, then you need to + find some other solution; this method does *not* do that. Raises: trio.ClosedResourceError: if you already closed this diff --git a/trio/_channel.py b/trio/_channel.py index 837b430106..64d5cff12a 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -40,6 +40,9 @@ # - is their a better/more evocative name for "clone"? People seem to be # having trouble with it, but I'm not sure whether it's just because of # missing docs. +# - and btw, any better names than Channel (in particular vs. Stream?) +# - should the *_nowait methods be in the ABC? (e.g. doesn't really make sense +# for something like websockets...) # - trio.testing.check_channel? diff --git a/trio/_core/_unbounded_queue.py b/trio/_core/_unbounded_queue.py index f9e7321aa9..57ea47d5ed 100644 --- a/trio/_core/_unbounded_queue.py +++ b/trio/_core/_unbounded_queue.py @@ -48,7 +48,7 @@ class UnboundedQueue: "0.9.0", issue=497, thing="trio.hazmat.UnboundedQueue", - instead="trio.open_channel" + instead="trio.open_memory_channel(math.inf)" ) def __init__(self): self._lot = _core.ParkingLot() diff --git a/trio/_sync.py b/trio/_sync.py index 51884f90db..edb85ce1bc 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -845,7 +845,10 @@ class Queue: """ @deprecated( - "0.9.0", issue=497, thing="trio.Queue", instead="trio.open_channel" + "0.9.0", + issue=497, + thing="trio.Queue", + instead="trio.open_memory_channel" ) def __init__(self, capacity): if not isinstance(capacity, int): diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index def3ac445f..7c101b9439 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -56,7 +56,7 @@ async def test_553(autojump_clock): await s.send("Test for PR #553") -async def test_channel_fan_in(): +async def test_channel_multiple_producers(): async def producer(send_channel, i): # We close our handle when we're done with it async with send_channel: @@ -79,6 +79,28 @@ async def producer(send_channel, i): assert got == list(range(30)) +async def test_channel_multiple_consumers(): + successful_receivers = set() + received = [] + + async def consumer(receive_channel, i): + async for value in receive_channel: + successful_receivers.add(i) + received.append(value) + + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(1) + async with send_channel: + for i in range(5): + nursery.start_soon(consumer, receive_channel, i) + await wait_all_tasks_blocked() + for i in range(10): + await send_channel.send(i) + + assert successful_receivers == set(range(5)) + assert len(received) == 10 + assert set(received) == set(range(10)) + async def test_close_basics(): async def send_block(s, expect): with pytest.raises(expect): @@ -97,7 +119,7 @@ async def send_block(s, expect): with pytest.raises(trio.ClosedResourceError): await s.send(None) - # and receive is notified, of course + # and receive gets EndOfChannel with pytest.raises(EndOfChannel): r.receive_nowait() with pytest.raises(EndOfChannel): From 
9600a1c293aa4e8404aecb46ee2e5e8de45854b6 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 5 Oct 2018 00:16:40 -0700 Subject: [PATCH 18/23] yapf --- trio/tests/test_channel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index 7c101b9439..b96af5fd6d 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -101,6 +101,7 @@ async def consumer(receive_channel, i): assert len(received) == 10 assert set(received) == set(range(10)) + async def test_close_basics(): async def send_block(s, expect): with pytest.raises(expect): From 177bc0cf2469cf4ab6f70ee3016b332978f3c95b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 5 Oct 2018 00:16:50 -0700 Subject: [PATCH 19/23] Add forgotten files --- .../blocking-trio-portal-example.py | 44 +++++++++++++++++++ .../reference-core/channels-mpmc-broken.py | 30 +++++++++++++ .../reference-core/channels-mpmc-fixed.py | 29 ++++++++++++ 3 files changed, 103 insertions(+) create mode 100644 docs/source/reference-core/blocking-trio-portal-example.py create mode 100644 docs/source/reference-core/channels-mpmc-broken.py create mode 100644 docs/source/reference-core/channels-mpmc-fixed.py diff --git a/docs/source/reference-core/blocking-trio-portal-example.py b/docs/source/reference-core/blocking-trio-portal-example.py new file mode 100644 index 0000000000..998fec9bd2 --- /dev/null +++ b/docs/source/reference-core/blocking-trio-portal-example.py @@ -0,0 +1,44 @@ +import trio +import threading + +def thread_fn(portal, receive_from_trio, send_to_trio): + while True: + # Since we're in a thread, we can't call methods on Trio + # objects directly -- so we use our portal to call them. + try: + request = portal.run(receive_from_trio.receive) + except trio.EndOfChannel: + portal.run(send_to_trio.aclose) + return + else: + response = request + 1 + portal.run(send_to_trio.send, response) + +async def main(): + portal = trio.BlockingTrioPortal() + send_to_thread, receive_from_trio = trio.open_memory_channel(0) + send_to_trio, receive_from_thread = trio.open_memory_channel(0) + + async with trio.open_nursery() as nursery: + # In a background thread, run: + # thread_fn(portal, receive_from_trio, send_to_trio) + nursery.start_soon( + trio.run_sync_in_worker_thread, + thread_fn, portal, receive_from_trio, send_to_trio + ) + + # prints "1" + await send_to_thread.send(0) + print(await receive_from_thread.receive()) + + # prints "2" + await send_to_thread.send(1) + print(await receive_from_thread.receive()) + + # When we close the channel, it signals the thread to exit. + await send_to_thread.aclose() + + # When we exit the nursery, it waits for the background thread to + # exit. + +trio.run(main) diff --git a/docs/source/reference-core/channels-mpmc-broken.py b/docs/source/reference-core/channels-mpmc-broken.py new file mode 100644 index 0000000000..2a755acba3 --- /dev/null +++ b/docs/source/reference-core/channels-mpmc-broken.py @@ -0,0 +1,30 @@ +# This example usually crashes! 
+ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + # Start two producers + nursery.start_soon(producer, "A", send_channel) + nursery.start_soon(producer, "B", send_channel) + # And two consumers + nursery.start_soon(consumer, "X", receive_channel) + nursery.start_soon(consumer, "Y", receive_channel) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +trio.run(main) diff --git a/docs/source/reference-core/channels-mpmc-fixed.py b/docs/source/reference-core/channels-mpmc-fixed.py new file mode 100644 index 0000000000..a3e7044fe7 --- /dev/null +++ b/docs/source/reference-core/channels-mpmc-fixed.py @@ -0,0 +1,29 @@ +import trio +import random + +async def main(): + async with trio.open_nursery() as nursery: + send_channel, receive_channel = trio.open_memory_channel(0) + async with send_channel, receive_channel: + # Start two producers, giving each its own private clone + nursery.start_soon(producer, "A", send_channel.clone()) + nursery.start_soon(producer, "B", send_channel.clone()) + # And two consumers, giving each its own private clone + nursery.start_soon(consumer, "X", receive_channel.clone()) + nursery.start_soon(consumer, "Y", receive_channel.clone()) + +async def producer(name, send_channel): + async with send_channel: + for i in range(3): + await send_channel.send("{} from producer {}".format(i, name)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +async def consumer(name, receive_channel): + async with receive_channel: + async for value in receive_channel: + print("consumer {} got value {!r}".format(name, value)) + # Random sleeps help trigger the problem more reliably + await trio.sleep(random.random()) + +trio.run(main) From a9b95ef598ab1efc7532eb910a04d406cf093152 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 5 Oct 2018 00:35:46 -0700 Subject: [PATCH 20/23] Fix sphinx link --- newsfragments/497.feature.rst | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/newsfragments/497.feature.rst b/newsfragments/497.feature.rst index 820b76471a..2f3e21b4b1 100644 --- a/newsfragments/497.feature.rst +++ b/newsfragments/497.feature.rst @@ -1,12 +1,12 @@ New and improved APIs for inter-task communication: :class:`trio.abc.SendChannel`, :class:`trio.abc.ReceiveChannel`, and :func:`trio.open_memory_channel` (which replaces ``trio.Queue``). This -interface uses separate "send" and "receive" objects, for consistency -with other communication interfaces like :class:`trio.Stream`. Also, -the two objects can now be closed individually, making it much easier -to gracefully shut down a channel. Also, check out the nifty ``clone`` -API to make it easy to manage fan-in scenarios. Also, the API has been -written to allow for future channel-like objects that send objects -across process boundaries. Also, it supports unbounded buffering if -you really need it. Also, help I can't stop writing also. See -:ref:`channels` for more details. 
+interface uses separate "send" and "receive" methods, for consistency +with other communication interfaces like :class:`~trio.abc.Stream`. +Also, the two objects can now be closed individually, making it much +easier to gracefully shut down a channel. Also, check out the nifty +``clone`` API to make it easy to manage fan-in scenarios. Also, the +API has been written to allow for future channel-like objects that +send objects across process boundaries. Also, it supports unbounded +buffering if you really need it. Also, help I can't stop writing also. +See :ref:`channels` for more details. From c23dcba9491c3dcb2cdcd144b160b0d72f2dc78a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 5 Oct 2018 01:24:54 -0700 Subject: [PATCH 21/23] Remove working notes Now immortalized in gh-719 --- trio/_channel.py | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/trio/_channel.py b/trio/_channel.py index 64d5cff12a..a74bde9f81 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -7,44 +7,6 @@ from . import _core from .abc import SendChannel, ReceiveChannel -# rename SendChannel/ReceiveChannel to SendHandle/ReceiveHandle? -# eh, maybe not -- SendStream/ReceiveStream don't work like that. - -# send or send_object? eh just send is good enough - -# implementing this interface on top of a stream is very natural... just -# pickle/unpickle (+ some framing). Actually, even clone() is not bad at -# all... you just need a shared counter of open handles, and then close the -# underlying stream when all SendChannels are closed. - -# to think about later: -# - max_buffer_size=0 default? -# - should we make ReceiveChannel.close() raise BrokenChannelError if data gets -# lost? This isn't how ReceiveStream works. And it might not be doable for a -# channel that reaches between processes (e.g. data could be in flight but -# we don't know it yet). (Well, we could make it raise if it hasn't gotten a -# clean goodbye message.) OTOH, ReceiveStream has the assumption that you're -# going to spend significant effort on engineering some protocol on top of -# it, while Channel is supposed to be useful out-of-the-box. -# Practically speaking, if a consumer crashes and then its __aexit__ -# replaces the actual exception with BrokenChannelError, that's kind of -# annoying. -# But lost messages are bad too... maybe the *sender* aclose() should raise -# if it lost a message? I guess that has the same issue... -# - should we have a ChannelPair object, instead of returning a tuple? -# upside: no need to worry about order -# could have shorthand send/receive methods -# downside: pretty annoying to type out channel_pair.send_channel like... -# ever. can't deconstruct on assignment. (Or, well you could by making it -# implement __iter__, but then that's yet another quirky way to do it.) -# - is their a better/more evocative name for "clone"? People seem to be -# having trouble with it, but I'm not sure whether it's just because of -# missing docs. -# - and btw, any better names than Channel (in particular vs. Stream?) -# - should the *_nowait methods be in the ABC? (e.g. doesn't really make sense -# for something like websockets...) -# - trio.testing.check_channel? - def open_memory_channel(max_buffer_size): """Open a channel for passing objects between tasks within a process. From ad74f70a474097d1189391d0ea26fb636ed238c6 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Fri, 5 Oct 2018 01:31:36 -0700 Subject: [PATCH 22/23] Fix another stray reference to queues --- docs/source/reference-core.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst index 755808a3a6..5b2dc82741 100644 --- a/docs/source/reference-core.rst +++ b/docs/source/reference-core.rst @@ -1564,7 +1564,7 @@ that :meth:`~trio.abc.SendChannel.send` never blocks. Lower-level synchronization primitives ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Personally, I find that events and queues are usually enough to +Personally, I find that events and channels are usually enough to implement most things I care about, and lead to easier to read code than the lower-level primitives discussed in this section. But if you need them, they're here. (If you find yourself reaching for these From d424aa9ba330861d116d5b37a4083651d07e7783 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 5 Oct 2018 01:32:06 -0700 Subject: [PATCH 23/23] Document channel .statistics(), and add open_receive_channels stat --- trio/_channel.py | 21 +++++++++++++++++++++ trio/tests/test_channel.py | 6 ++++++ 2 files changed, 27 insertions(+) diff --git a/trio/_channel.py b/trio/_channel.py index a74bde9f81..7188e0a054 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -33,6 +33,25 @@ def open_memory_channel(max_buffer_size): trouble remembering which order these go in, remember: data flows from left → right. + In addition to the regular channel interfaces, all memory channel + endpoints provide a ``statistics()`` method, which returns an object with + the following fields: + + * ``current_buffer_used``: The number of items currently stored in the + channel buffer. + * ``max_buffer_size``: The maximum number of items allowed in the buffer, + as passed to :func:`open_memory_channel`. + * ``open_send_channels``: The number of open + :class:`~trio.abc.SendChannel` endpoints pointing to this channel. + Initially 1, but can be increased by + :meth:`~trio.abc.SendChannel.clone`. + * ``open_receive_channels``: Likewise, but for open + :class:`~trio.abc.ReceiveChannel` endpoints. + * ``tasks_waiting_send``: The number of tasks blocked in ``send`` on this + channel (summing over all clones). + * ``tasks_waiting_receive``: The number of tasks blocked in ``receive`` on + this channel (summing over all clones). 
+ """ if max_buffer_size != inf and not isinstance(max_buffer_size, int): raise TypeError("max_buffer_size must be an integer or math.inf") @@ -47,6 +66,7 @@ class ChannelStats: current_buffer_used = attr.ib() max_buffer_size = attr.ib() open_send_channels = attr.ib() + open_receive_channels = attr.ib() tasks_waiting_send = attr.ib() tasks_waiting_receive = attr.ib() @@ -68,6 +88,7 @@ def statistics(self): current_buffer_used=len(self.data), max_buffer_size=self.max_buffer_size, open_send_channels=self.open_send_channels, + open_receive_channels=self.open_receive_channels, tasks_waiting_send=len(self.send_tasks), tasks_waiting_receive=len(self.receive_tasks), ) diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index b96af5fd6d..b43466dd7d 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -248,6 +248,7 @@ async def test_statistics(): assert stats.current_buffer_used == 0 assert stats.max_buffer_size == 2 assert stats.open_send_channels == 1 + assert stats.open_receive_channels == 1 assert stats.tasks_waiting_send == 0 assert stats.tasks_waiting_receive == 0 @@ -259,6 +260,11 @@ async def test_statistics(): await s.aclose() assert s2.statistics().open_send_channels == 1 + r2 = r.clone() + assert s2.statistics().open_receive_channels == 2 + await r2.aclose() + assert s2.statistics().open_receive_channels == 1 + async with trio.open_nursery() as nursery: s2.send_nowait(None) # fill up the buffer assert s.statistics().current_buffer_used == 2