From 792bb13144e7d40415c16edf812e783fba90fa12 Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 8 Mar 2022 22:22:30 -0500 Subject: [PATCH] fix(buffers): make LimitedSender/LimitedReceiver wake up correctly Signed-off-by: Toby Lawrence --- .../src/topology/channel/limited_queue.rs | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/lib/vector-buffers/src/topology/channel/limited_queue.rs b/lib/vector-buffers/src/topology/channel/limited_queue.rs index 29e982db8ea82..88f5f2c5b8acf 100644 --- a/lib/vector-buffers/src/topology/channel/limited_queue.rs +++ b/lib/vector-buffers/src/topology/channel/limited_queue.rs @@ -1,5 +1,5 @@ use crossbeam_queue::ArrayQueue; -use futures::{ready, task::AtomicWaker, Sink, Stream}; +use futures::{ready, Sink, Stream}; use std::{ cmp, fmt, pin::Pin, @@ -32,7 +32,7 @@ struct Inner { data: Arc>, limit: usize, limiter: PollSemaphore, - read_waker: Arc, + read_waker: PollNotify, write_waker: PollNotify, } @@ -154,7 +154,7 @@ impl LimitedSender { ); // Don't forget to wake the reader since there's data to consume now. :) - self.inner.read_waker.wake(); + self.inner.read_waker.as_ref().notify_one(); Poll::Ready(Ok(())) } @@ -220,24 +220,27 @@ impl LimitedReceiver { } pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.inner.data.pop() { - Some((permit, item)) => { + loop { + if let Some((permit, item)) = self.inner.data.pop() { // We got an item, woohoo! Now, drop the permit which will properly free up permits // in the semaphore, and then also try to notify a pending writer. drop(permit); self.inner.write_waker.as_ref().notify_one(); - Poll::Ready(Some(item)) + + return Poll::Ready(Some(item)); } - // Figure out if we're actually closed or not, to determine if more items might be - // coming or if it's time to also close up shop. - None => { - if self.inner.limiter.is_closed() { - Poll::Ready(None) - } else { - self.inner.read_waker.register(cx.waker()); - Poll::Pending - } + + // There wasn't an item for us to pop, so see if the channel is actually closed. If so, + // then it's time for us to close up shop as well. + if self.inner.limiter.is_closed() { + return Poll::Ready(None); } + + // We're not closed, so we need to wait for a writer to tell us they made some + // progress. This might end up being a spurious wakeup since `Notify` will + // store up to one wakeup that gets consumed by the next call to `poll_notify`, + // but alas. + ready!(self.inner.read_waker.poll_notify(cx)); } } @@ -271,7 +274,7 @@ impl Drop for LimitedSender { // If we're the last sender to drop, close the semaphore on our way out the door. if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 { self.inner.limiter.close(); - self.inner.read_waker.wake(); + self.inner.read_waker.as_ref().notify_one(); } } } @@ -281,7 +284,7 @@ pub fn limited(limit: usize) -> (LimitedSender, LimitedReceiver) { data: Arc::new(ArrayQueue::new(limit)), limit, limiter: PollSemaphore::new(Arc::new(Semaphore::new(limit))), - read_waker: Arc::new(AtomicWaker::new()), + read_waker: PollNotify::new(Arc::new(Notify::new())), write_waker: PollNotify::new(Arc::new(Notify::new())), };