Skip to content

Commit

Permalink
Update to new futures_api (cx)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Apr 8, 2019
1 parent dfb7210 commit 3fa1ec0
Show file tree
Hide file tree
Showing 168 changed files with 1,012 additions and 1,008 deletions.
40 changes: 20 additions & 20 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
ready,
stream::{Stream, StreamExt},
sink::Sink,
task::{Waker, Poll},
task::{Context, Poll},
},
futures_test::task::noop_waker_ref,
std::pin::Pin,
Expand All @@ -18,7 +18,7 @@ use {
/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

Expand All @@ -27,20 +27,20 @@ fn unbounded_1_tx(b: &mut Bencher) {
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

UnboundedSender::unbounded_send(&tx, i).unwrap();

// Now poll ready
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
})
}

/// 100 producers, single consumer
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

Expand All @@ -49,26 +49,26 @@ fn unbounded_100_tx(b: &mut Bencher) {
// 1000 send/recv operations total, result should be divided by 1000
for _ in 0..10 {
for i in 0..tx.len() {
assert_eq!(Poll::Pending, rx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

UnboundedSender::unbounded_send(&tx[i], i).unwrap();

assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
}
})
}

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::unbounded();

for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
// No need to create a task, because poll is not going to park.
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
}
})
}
Expand All @@ -84,41 +84,41 @@ struct TestSender {
impl Stream for TestSender {
type Item = u32;

fn poll_next(mut self: Pin<&mut Self>, waker: &Waker)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
let this = &mut *self;
let mut tx = Pin::new(&mut this.tx);

ready!(tx.as_mut().poll_ready(waker)).unwrap();
ready!(tx.as_mut().poll_ready(cx)).unwrap();
tx.as_mut().start_send(this.last + 1).unwrap();
this.last += 1;
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(waker));
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
Poll::Ready(Some(this.last))
}
}

/// Single producers, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
let (tx, mut rx) = mpsc::channel(0);

let mut tx = TestSender { tx, last: 0 };

for i in 0..1000 {
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(waker));
assert_eq!(Poll::Pending, tx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
}
})
}

/// 100 producers, single consumer
#[bench]
fn bounded_100_tx(b: &mut Bencher) {
let waker = noop_waker_ref();
let mut cx = Context::from_waker(noop_waker_ref());
b.iter(|| {
// Each sender can send one item after specified capacity
let (tx, mut rx) = mpsc::channel(0);
Expand All @@ -133,11 +133,11 @@ fn bounded_100_tx(b: &mut Bencher) {
for i in 0..10 {
for j in 0..tx.len() {
// Send an item
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), tx[j].poll_next_unpin(&mut cx));
// Then block
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(waker));
assert_eq!(Poll::Pending, tx[j].poll_next_unpin(&mut cx));
// Recv the item
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(waker));
assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
}
}
})
Expand Down
24 changes: 12 additions & 12 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
// by the queue structure.

use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Waker, Poll};
use futures_core::task::{Context, Poll, Waker};
use futures_core::task::__internal::AtomicWaker;
use std::any::Any;
use std::error::Error;
Expand Down Expand Up @@ -555,7 +555,7 @@ impl<T> SenderInner<T> {
/// - `Err(SendError)` if the receiver has been dropped.
fn poll_ready(
&mut self,
waker: &Waker
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
Expand All @@ -564,7 +564,7 @@ impl<T> SenderInner<T> {
}));
}

self.poll_unparked(Some(waker)).map(Ok)
self.poll_unparked(Some(cx)).map(Ok)
}

/// Returns whether this channel is closed without needing a context.
Expand All @@ -582,7 +582,7 @@ impl<T> SenderInner<T> {
self.inner.recv_task.wake();
}

fn poll_unparked(&mut self, waker: Option<&Waker>) -> Poll<()> {
fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked {
Expand All @@ -600,7 +600,7 @@ impl<T> SenderInner<T> {
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = waker.cloned();
task.task = cx.map(|cx| cx.waker().clone());

Poll::Pending
} else {
Expand Down Expand Up @@ -649,12 +649,12 @@ impl<T> Sender<T> {
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(
&mut self,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let inner = self.0.as_mut().ok_or(SendError {
kind: SendErrorKind::Disconnected,
})?;
inner.poll_ready(waker)
inner.poll_ready(cx)
}

/// Returns whether this channel is closed without needing a context.
Expand All @@ -679,7 +679,7 @@ impl<T> UnboundedSender<T> {
/// Check if the channel is ready to receive a message.
pub fn poll_ready(
&self,
_: &Waker,
_: &mut Context<'_>,
) -> Poll<Result<(), SendError>> {
let inner = self.0.as_ref().ok_or(SendError {
kind: SendErrorKind::Disconnected,
Expand Down Expand Up @@ -904,7 +904,7 @@ impl<T> Stream for Receiver<T> {

fn poll_next(
mut self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
// Try to read a message off of the message queue.
match self.next_message() {
Expand All @@ -916,7 +916,7 @@ impl<T> Stream for Receiver<T> {
},
Poll::Pending => {
// There are no messages to read, in this case, park.
self.inner.as_ref().unwrap().recv_task.register(waker);
self.inner.as_ref().unwrap().recv_task.register(cx.waker());
// Check queue again after parking to prevent race condition:
// a message could be added to the queue after previous `next_message`
// before `register` call.
Expand Down Expand Up @@ -971,9 +971,9 @@ impl<T> Stream for UnboundedReceiver<T> {

fn poll_next(
mut self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Option<T>> {
Pin::new(&mut self.0).poll_next(waker)
Pin::new(&mut self.0).poll_next(cx)
}
}

Expand Down
18 changes: 9 additions & 9 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A channel for sending a single message between asynchronous tasks.
use futures_core::future::Future;
use futures_core::task::{Waker, Poll};
use futures_core::task::{Context, Poll, Waker};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<T> Inner<T> {
}
}

fn poll_cancel(&self, waker: &Waker) -> Poll<()> {
fn poll_cancel(&self, cx: &mut Context<'_>) -> Poll<()> {
// Fast path up first, just read the flag and see if our other half is
// gone. This flag is set both in our destructor and the oneshot
// destructor, but our destructor hasn't run yet so if it's set then the
Expand All @@ -176,7 +176,7 @@ impl<T> Inner<T> {
// `Receiver` may have been dropped. The first thing it does is set the
// flag, and if it fails to acquire the lock it assumes that we'll see
// the flag later on. So... we then try to see the flag later on!
let handle = waker.clone();
let handle = cx.waker().clone();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
None => return Poll::Ready(()),
Expand Down Expand Up @@ -249,7 +249,7 @@ impl<T> Inner<T> {
}
}

fn recv(&self, waker: &Waker) -> Poll<Result<T, Canceled>> {
fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
// Check to see if some data has arrived. If it hasn't then we need to
// block our task.
//
Expand All @@ -260,7 +260,7 @@ impl<T> Inner<T> {
let done = if self.complete.load(SeqCst) {
true
} else {
let task = waker.clone();
let task = cx.waker().clone();
match self.rx_task.try_lock() {
Some(mut slot) => { *slot = Some(task); false },
None => true,
Expand Down Expand Up @@ -348,8 +348,8 @@ impl<T> Sender<T> {
/// alive and may be able to receive a message if sent. The current task,
/// however, is scheduled to receive a notification if the corresponding
/// `Receiver` goes away.
pub fn poll_cancel(&mut self, waker: &Waker) -> Poll<()> {
self.inner.poll_cancel(waker)
pub fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()> {
self.inner.poll_cancel(cx)
}

/// Tests to see whether this `Sender`'s corresponding `Receiver`
Expand Down Expand Up @@ -416,9 +416,9 @@ impl<T> Future for Receiver<T> {

fn poll(
self: Pin<&mut Self>,
waker: &Waker,
cx: &mut Context<'_>,
) -> Poll<Result<T, Canceled>> {
self.inner.recv(waker)
self.inner.recv(cx)
}
}

Expand Down
4 changes: 2 additions & 2 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
let f = poll_fn(|lw| {
rx.poll_next_unpin(lw)
let f = poll_fn(|cx| {
rx.poll_next_unpin(cx)
});
assert_eq!(block_on(f), None)
}
Expand Down
Loading

0 comments on commit 3fa1ec0

Please sign in to comment.