Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(buffers): make LimitedSender/LimitedReceiver wake up correctly #11741

Merged
merged 1 commit into from
Mar 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions lib/vector-buffers/src/topology/channel/limited_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crossbeam_queue::ArrayQueue;
use futures::{ready, task::AtomicWaker, Sink, Stream};
use futures::{ready, Sink, Stream};
use std::{
cmp, fmt,
pin::Pin,
Expand Down Expand Up @@ -32,7 +32,7 @@ struct Inner<T> {
data: Arc<ArrayQueue<(OwnedSemaphorePermit, T)>>,
limit: usize,
limiter: PollSemaphore,
read_waker: Arc<AtomicWaker>,
read_waker: PollNotify,
write_waker: PollNotify,
}

Expand Down Expand Up @@ -154,7 +154,7 @@ impl<T: Bufferable> LimitedSender<T> {
);

// Don't forget to wake the reader since there's data to consume now. :)
self.inner.read_waker.wake();
self.inner.read_waker.as_ref().notify_one();

Poll::Ready(Ok(()))
}
Expand Down Expand Up @@ -220,24 +220,27 @@ impl<T: Bufferable> LimitedReceiver<T> {
}

pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
match self.inner.data.pop() {
Some((permit, item)) => {
loop {
if let Some((permit, item)) = self.inner.data.pop() {
// We got an item, woohoo! Now, drop the permit which will properly free up permits
// in the semaphore, and then also try to notify a pending writer.
drop(permit);
self.inner.write_waker.as_ref().notify_one();
Poll::Ready(Some(item))

return Poll::Ready(Some(item));
}
// Figure out if we're actually closed or not, to determine if more items might be
// coming or if it's time to also close up shop.
None => {
if self.inner.limiter.is_closed() {
Poll::Ready(None)
} else {
self.inner.read_waker.register(cx.waker());
Poll::Pending
}

// There wasn't an item for us to pop, so see if the channel is actually closed. If so,
// then it's time for us to close up shop as well.
if self.inner.limiter.is_closed() {
return Poll::Ready(None);
}

// We're not closed, so we need to wait for a writer to tell us they made some
// progress. This might end up being a spurious wakeup since `Notify` will
// store up to one wakeup that gets consumed by the next call to `poll_notify`,
// but alas.
ready!(self.inner.read_waker.poll_notify(cx));
}
}

Expand Down Expand Up @@ -271,7 +274,7 @@ impl<T> Drop for LimitedSender<T> {
// If we're the last sender to drop, close the semaphore on our way out the door.
if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
self.inner.limiter.close();
self.inner.read_waker.wake();
self.inner.read_waker.as_ref().notify_one();
}
}
}
Expand All @@ -281,7 +284,7 @@ pub fn limited<T>(limit: usize) -> (LimitedSender<T>, LimitedReceiver<T>) {
data: Arc::new(ArrayQueue::new(limit)),
limit,
limiter: PollSemaphore::new(Arc::new(Semaphore::new(limit))),
read_waker: Arc::new(AtomicWaker::new()),
read_waker: PollNotify::new(Arc::new(Notify::new())),
write_waker: PollNotify::new(Arc::new(Notify::new())),
};

Expand Down