Skip to content

Commit

Permalink
fix(buffers): make LimitedSender/LimitedReceiver wake up correctly (#…
Browse files Browse the repository at this point in the history
…11741)

Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
  • Loading branch information
tobz authored Mar 9, 2022
1 parent d06ee2a commit 6448a29
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crossbeam_queue::ArrayQueue;
use futures::{ready, task::AtomicWaker, Sink, Stream};
use futures::{ready, Sink, Stream};
use std::{
cmp, fmt,
pin::Pin,
Expand Down Expand Up @@ -32,7 +32,7 @@ struct Inner<T> {
data: Arc<ArrayQueue<(OwnedSemaphorePermit, T)>>,
limit: usize,
limiter: PollSemaphore,
read_waker: Arc<AtomicWaker>,
read_waker: PollNotify,
write_waker: PollNotify,
}

Expand Down Expand Up @@ -154,7 +154,7 @@ impl<T: Bufferable> LimitedSender<T> {
);

// Don't forget to wake the reader since there's data to consume now. :)
self.inner.read_waker.wake();
self.inner.read_waker.as_ref().notify_one();

Poll::Ready(Ok(()))
}
Expand Down Expand Up @@ -220,24 +220,27 @@ impl<T: Bufferable> LimitedReceiver<T> {
}

pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
match self.inner.data.pop() {
Some((permit, item)) => {
loop {
if let Some((permit, item)) = self.inner.data.pop() {
// We got an item, woohoo! Now, drop the permit which will properly free up permits
// in the semaphore, and then also try to notify a pending writer.
drop(permit);
self.inner.write_waker.as_ref().notify_one();
Poll::Ready(Some(item))

return Poll::Ready(Some(item));
}
// Figure out if we're actually closed or not, to determine if more items might be
// coming or if it's time to also close up shop.
None => {
if self.inner.limiter.is_closed() {
Poll::Ready(None)
} else {
self.inner.read_waker.register(cx.waker());
Poll::Pending
}

// There wasn't an item for us to pop, so see if the channel is actually closed. If so,
// then it's time for us to close up shop as well.
if self.inner.limiter.is_closed() {
return Poll::Ready(None);
}

// We're not closed, so we need to wait for a writer to tell us they made some
// progress. This might end up being a spurious wakeup since `Notify` will
// store up to one wakeup that gets consumed by the next call to `poll_notify`,
// but alas.
ready!(self.inner.read_waker.poll_notify(cx));
}
}

Expand Down Expand Up @@ -271,7 +274,7 @@ impl<T> Drop for LimitedSender<T> {
// If we're the last sender to drop, close the semaphore on our way out the door.
if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
self.inner.limiter.close();
self.inner.read_waker.wake();
self.inner.read_waker.as_ref().notify_one();
}
}
}
Expand All @@ -281,7 +284,7 @@ pub fn limited<T>(limit: usize) -> (LimitedSender<T>, LimitedReceiver<T>) {
data: Arc::new(ArrayQueue::new(limit)),
limit,
limiter: PollSemaphore::new(Arc::new(Semaphore::new(limit))),
read_waker: Arc::new(AtomicWaker::new()),
read_waker: PollNotify::new(Arc::new(Notify::new())),
write_waker: PollNotify::new(Arc::new(Notify::new())),
};

Expand Down

0 comments on commit 6448a29

Please sign in to comment.