Skip to content

Commit

Permalink
Update to new futures_api (wake-by-ref)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Apr 13, 2019
1 parent 53af42c commit 265ca12
Show file tree
Hide file tree
Showing 21 changed files with 103 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ matrix:

# When updating this, the reminder to update the minimum required version in README.md.
- name: cargo test (minimum required version)
rust: nightly-2019-04-08
rust: nightly-2019-04-13

- name: cargo clippy
rust: nightly
Expand Down
2 changes: 1 addition & 1 deletion futures-core/src/task/__internal/atomic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl AtomicWaker {
// Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old task handle.
// So, we call wake on the new waker
waker.wake();
waker.wake_by_ref();
}
state => {
// In this case, a concurrent thread is holding the
Expand Down
4 changes: 2 additions & 2 deletions futures-executor/benches/thread_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
Poll::Ready(())
} else {
self.rem -= 1;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down Expand Up @@ -52,7 +52,7 @@ fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
Poll::Ready(())
} else {
self.rem -= 1;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
6 changes: 5 additions & 1 deletion futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ thread_local! {
}

impl ArcWake for ThreadNotify {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
}
}
Expand Down
6 changes: 5 additions & 1 deletion futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,11 @@ impl fmt::Debug for Task {
}

impl ArcWake for WakeHandle {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
match arc_self.mutex.notify() {
Ok(task) => arc_self.exec.state.send(Message::Run(task)),
Err(()) => {}
Expand Down
3 changes: 1 addition & 2 deletions futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn tasks_are_scheduled_fairly() {
return Poll::Ready(());
}

cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand All @@ -167,4 +167,3 @@ fn tasks_are_scheduled_fairly() {

pool.run();
}

12 changes: 6 additions & 6 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod if_std {
///
/// If no data is available for reading, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// # Implementation
Expand All @@ -122,7 +122,7 @@ mod if_std {
///
/// If no data is available for reading, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `vec`. Objects which support vectored IO should override
Expand Down Expand Up @@ -160,7 +160,7 @@ mod if_std {
///
/// If the object is not ready for writing, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// # Implementation
Expand All @@ -182,7 +182,7 @@ mod if_std {
///
/// If the object is not ready for writing, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// By default, this method delegates to using `poll_write` on the first
Expand Down Expand Up @@ -213,7 +213,7 @@ mod if_std {
///
/// If flushing cannot immediately complete, this method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object can make
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
/// progress towards flushing.
///
/// # Implementation
Expand All @@ -230,7 +230,7 @@ mod if_std {
///
/// If closing cannot immediately complete, this function returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object can make
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
/// progress towards closing.
///
/// # Implementation
Expand Down
6 changes: 3 additions & 3 deletions futures-sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait Sink<Item> {
///
/// This method returns `Poll::Ready` once the underlying sink is ready to
/// receive data. If this method returns `Poll::Pending`, the current task
/// is registered to be notified (via `cx.waker().wake()`) when `poll_ready`
/// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
/// should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
Expand Down Expand Up @@ -95,7 +95,7 @@ pub trait Sink<Item> {
/// via `start_send` have been flushed.
///
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_flush` should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
Expand All @@ -108,7 +108,7 @@ pub trait Sink<Item> {
/// has been successfully closed.
///
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_close` should be called again.
///
/// If this function encounters an error, the sink should be considered to
Expand Down
2 changes: 1 addition & 1 deletion futures-test/src/future/pending_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<Fut: Future> Future for PendingOnce<Fut> {
self.as_mut().future().poll(cx)
} else {
*self.as_mut().polled_before() = true;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
12 changes: 6 additions & 6 deletions futures-test/src/task/panic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use futures_core::task::{Waker, RawWaker, RawWakerVTable};
use core::cell::UnsafeCell;
use core::ptr::null;

unsafe fn clone_panic_waker(_data: *const()) -> RawWaker {
unsafe fn clone_panic_waker(_data: *const ()) -> RawWaker {
raw_panic_waker()
}

unsafe fn noop(_data: *const()) {
}
unsafe fn noop(_data: *const ()) {}

unsafe fn wake_panic(_data: *const()) {
unsafe fn wake_panic(_data: *const ()) {
panic!("should not be woken");
}

const PANIC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
clone_panic_waker,
wake_panic,
wake_panic,
noop,
);

Expand All @@ -37,7 +37,7 @@ fn raw_panic_waker() -> RawWaker {
/// waker.wake(); // Will panic
/// ```
pub fn panic_waker() -> Waker {
unsafe { Waker::new_unchecked(raw_panic_waker()) }
unsafe { Waker::from_raw(raw_panic_waker()) }
}

/// Get a global reference to a
Expand All @@ -52,7 +52,7 @@ pub fn panic_waker() -> Waker {
/// use futures_test::task::panic_waker_ref;
///
/// let waker = panic_waker_ref();
/// waker.wake(); // Will panic
/// waker.wake_by_ref(); // Will panic
/// ```
pub fn panic_waker_ref() -> &'static Waker {
thread_local! {
Expand Down
8 changes: 6 additions & 2 deletions futures-test/src/task/wake_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ struct WakerInner {
}

impl ArcWake for WakerInner {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
let _ = arc_self.count.fetch_add(1, Ordering::SeqCst);
}
}
Expand All @@ -49,7 +53,7 @@ impl ArcWake for WakerInner {
///
/// assert_eq!(count, 0);
///
/// waker.wake();
/// waker.wake_by_ref();
/// waker.wake();
///
/// assert_eq!(count, 2);
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/compat/compat01as03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {

impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
self.0.wake_by_ref();
}
}

Expand Down
10 changes: 7 additions & 3 deletions futures-util/src/compat/compat03as01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,19 @@ impl Current {
}

let ptr = current_to_ptr(self);
let vtable = &RawWakerVTable::new(clone, wake, drop);
let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
unsafe {
WakerRef::new(task03::Waker::new_unchecked(RawWaker::new(ptr, vtable)))
WakerRef::new(task03::Waker::from_raw(RawWaker::new(ptr, vtable)))
}
}
}

impl ArcWake03 for Current {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.notify();
}
}
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ where
}

impl ArcWake for Notifier {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);

let wakers = &mut *arc_self.wakers.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// At this point, it may be worth yielding the thread &
// spinning a few times... but for now, just yield using the
// task system.
cx.waker().wake();
cx.waker().wake_by_ref();
return Poll::Pending;
}
Dequeue::Data(task) => task,
Expand Down
6 changes: 5 additions & 1 deletion futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ unsafe impl<Fut> Send for Task<Fut> {}
unsafe impl<Fut> Sync for Task<Fut> {}

impl<Fut> ArcWake for Task<Fut> {
fn wake(arc_self: &Arc<Self>) {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

fn wake_by_ref(arc_self: &Arc<Self>) {
let inner = match arc_self.ready_to_run_queue.upgrade() {
Some(inner) => inner,
None => return,
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/select_next_some.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<'a, St: Stream + FusedStream + Unpin> Future for SelectNextSome<'a, St> {
Poll::Ready(item)
} else {
debug_assert!(self.stream.is_terminated());
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
Loading

0 comments on commit 265ca12

Please sign in to comment.