Skip to content

Commit

Permalink
Update to new futures_api (wake-by-ref)
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Apr 15, 2019
1 parent 53af42c commit ad8f3c4
Show file tree
Hide file tree
Showing 21 changed files with 75 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ matrix:

# When updating this, the reminder to update the minimum required version in README.md.
- name: cargo test (minimum required version)
rust: nightly-2019-04-08
rust: nightly-2019-04-13

- name: cargo clippy
rust: nightly
Expand Down
2 changes: 1 addition & 1 deletion futures-core/src/task/__internal/atomic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl AtomicWaker {
// Currently in the process of waking the task, i.e.,
// `wake` is currently being called on the old task handle.
// So, we call wake on the new waker
waker.wake();
waker.wake_by_ref();
}
state => {
// In this case, a concurrent thread is holding the
Expand Down
4 changes: 2 additions & 2 deletions futures-executor/benches/thread_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn thread_yield_single_thread_one_wait(b: &mut Bencher) {
Poll::Ready(())
} else {
self.rem -= 1;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down Expand Up @@ -52,7 +52,7 @@ fn thread_yield_single_thread_many_wait(b: &mut Bencher) {
Poll::Ready(())
} else {
self.rem -= 1;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ thread_local! {
}

impl ArcWake for ThreadNotify {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl fmt::Debug for Task {
}

impl ArcWake for WakeHandle {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
match arc_self.mutex.notify() {
Ok(task) => arc_self.exec.state.send(Message::Run(task)),
Err(()) => {}
Expand Down
3 changes: 1 addition & 2 deletions futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn tasks_are_scheduled_fairly() {
return Poll::Ready(());
}

cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand All @@ -167,4 +167,3 @@ fn tasks_are_scheduled_fairly() {

pool.run();
}

12 changes: 6 additions & 6 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod if_std {
///
/// If no data is available for reading, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// # Implementation
Expand All @@ -122,7 +122,7 @@ mod if_std {
///
/// If no data is available for reading, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `vec`. Objects which support vectored IO should override
Expand Down Expand Up @@ -160,7 +160,7 @@ mod if_std {
///
/// If the object is not ready for writing, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// # Implementation
Expand All @@ -182,7 +182,7 @@ mod if_std {
///
/// If the object is not ready for writing, the method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object becomes
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// By default, this method delegates to using `poll_write` on the first
Expand Down Expand Up @@ -213,7 +213,7 @@ mod if_std {
///
/// If flushing cannot immediately complete, this method returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object can make
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
/// progress towards flushing.
///
/// # Implementation
Expand All @@ -230,7 +230,7 @@ mod if_std {
///
/// If closing cannot immediately complete, this function returns
/// `Ok(Poll::Pending)` and arranges for the current task (via
/// `cx.waker().wake()`) to receive a notification when the object can make
/// `cx.waker().wake_by_ref()`) to receive a notification when the object can make
/// progress towards closing.
///
/// # Implementation
Expand Down
6 changes: 3 additions & 3 deletions futures-sink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait Sink<Item> {
///
/// This method returns `Poll::Ready` once the underlying sink is ready to
/// receive data. If this method returns `Poll::Pending`, the current task
/// is registered to be notified (via `cx.waker().wake()`) when `poll_ready`
/// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
/// should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
Expand Down Expand Up @@ -95,7 +95,7 @@ pub trait Sink<Item> {
/// via `start_send` have been flushed.
///
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_flush` should be called again.
///
/// In most cases, if the sink encounters an error, the sink will
Expand All @@ -108,7 +108,7 @@ pub trait Sink<Item> {
/// has been successfully closed.
///
/// Returns `Ok(Poll::Pending)` if there is more work left to do, in which
/// case the current task is scheduled (via `cx.waker().wake()`) to wake up when
/// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
/// `poll_close` should be called again.
///
/// If this function encounters an error, the sink should be considered to
Expand Down
2 changes: 1 addition & 1 deletion futures-test/src/future/pending_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl<Fut: Future> Future for PendingOnce<Fut> {
self.as_mut().future().poll(cx)
} else {
*self.as_mut().polled_before() = true;
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
12 changes: 6 additions & 6 deletions futures-test/src/task/panic_waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use futures_core::task::{Waker, RawWaker, RawWakerVTable};
use core::cell::UnsafeCell;
use core::ptr::null;

unsafe fn clone_panic_waker(_data: *const()) -> RawWaker {
unsafe fn clone_panic_waker(_data: *const ()) -> RawWaker {
raw_panic_waker()
}

unsafe fn noop(_data: *const()) {
}
unsafe fn noop(_data: *const ()) {}

unsafe fn wake_panic(_data: *const()) {
unsafe fn wake_panic(_data: *const ()) {
panic!("should not be woken");
}

const PANIC_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
clone_panic_waker,
wake_panic,
wake_panic,
noop,
);

Expand All @@ -37,7 +37,7 @@ fn raw_panic_waker() -> RawWaker {
/// waker.wake(); // Will panic
/// ```
pub fn panic_waker() -> Waker {
unsafe { Waker::new_unchecked(raw_panic_waker()) }
unsafe { Waker::from_raw(raw_panic_waker()) }
}

/// Get a global reference to a
Expand All @@ -52,7 +52,7 @@ pub fn panic_waker() -> Waker {
/// use futures_test::task::panic_waker_ref;
///
/// let waker = panic_waker_ref();
/// waker.wake(); // Will panic
/// waker.wake_by_ref(); // Will panic
/// ```
pub fn panic_waker_ref() -> &'static Waker {
thread_local! {
Expand Down
4 changes: 2 additions & 2 deletions futures-test/src/task/wake_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct WakerInner {
}

impl ArcWake for WakerInner {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
let _ = arc_self.count.fetch_add(1, Ordering::SeqCst);
}
}
Expand All @@ -49,7 +49,7 @@ impl ArcWake for WakerInner {
///
/// assert_eq!(count, 0);
///
/// waker.wake();
/// waker.wake_by_ref();
/// waker.wake();
///
/// assert_eq!(count, 2);
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/compat/compat01as03.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl<'a> From<WakerToHandle<'a>> for NotifyHandle01 {

impl Notify01 for NotifyWaker {
fn notify(&self, _: usize) {
self.0.wake();
self.0.wake_by_ref();
}
}

Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/compat/compat03as01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,15 @@ impl Current {
}

let ptr = current_to_ptr(self);
let vtable = &RawWakerVTable::new(clone, wake, drop);
let vtable = &RawWakerVTable::new(clone, wake, wake, drop);
unsafe {
WakerRef::new(task03::Waker::new_unchecked(RawWaker::new(ptr, vtable)))
WakerRef::new(task03::Waker::from_raw(RawWaker::new(ptr, vtable)))
}
}
}

impl ArcWake03 for Current {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.notify();
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
}

impl ArcWake for Notifier {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);

let wakers = &mut *arc_self.wakers.lock().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// At this point, it may be worth yielding the thread &
// spinning a few times... but for now, just yield using the
// task system.
cx.waker().wake();
cx.waker().wake_by_ref();
return Poll::Pending;
}
Dequeue::Data(task) => task,
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ unsafe impl<Fut> Send for Task<Fut> {}
unsafe impl<Fut> Sync for Task<Fut> {}

impl<Fut> ArcWake for Task<Fut> {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
let inner = match arc_self.ready_to_run_queue.upgrade() {
Some(inner) => inner,
None => return,
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/select_next_some.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<'a, St: Stream + FusedStream + Unpin> Future for SelectNextSome<'a, St> {
Poll::Ready(item)
} else {
debug_assert!(self.stream.is_terminated());
cx.waker().wake();
cx.waker().wake_by_ref();
Poll::Pending
}
}
Expand Down
43 changes: 32 additions & 11 deletions futures-util/src/task/arc_wake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,41 @@ pub trait ArcWake {
///
/// Executors generally maintain a queue of "ready" tasks; `wake` should place
/// the associated task onto this queue.
fn wake(arc_self: &Arc<Self>);
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}

/// Indicates that the associated task is ready to make progress and should
/// be `poll`ed.
///
/// This function can be called from an arbitrary thread, including threads which
/// did not create the `ArcWake` based `Waker`.
///
/// Executors generally maintain a queue of "ready" tasks; `wake_by_ref` should place
/// the associated task onto this queue.
///
/// This function is similar to `wake`, but must not consume the provided data
/// pointer.
fn wake_by_ref(arc_self: &Arc<Self>);

/// Creates a `Waker` from an Arc<T>, if T implements `ArcWake`.
///
/// If `wake()` is called on the returned `Waker`,
/// the `wake()` function that is defined inside this trait will get called.
fn into_waker(self: Arc<Self>) -> Waker where Self: Sized
{
let ptr = Arc::into_raw(self) as *const();
let ptr = Arc::into_raw(self) as *const ();

unsafe {
Waker::new_unchecked(RawWaker::new(ptr, waker_vtable!(Self)))
Waker::from_raw(RawWaker::new(ptr, waker_vtable!(Self)))
}
}
}

// FIXME: panics on Arc::clone / refcount changes could wreak havoc on the
// code here. We should guard against this by aborting.

unsafe fn increase_refcount<T: ArcWake>(data: *const()) {
unsafe fn increase_refcount<T: ArcWake>(data: *const ()) {
// Retain Arc by creating a copy
let arc: Arc<T> = Arc::from_raw(data as *const T);
let arc_clone = arc.clone();
Expand All @@ -46,19 +61,25 @@ unsafe fn increase_refcount<T: ArcWake>(data: *const()) {
}

// used by `waker_ref`
pub(super) unsafe fn clone_arc_raw<T: ArcWake>(data: *const()) -> RawWaker {
pub(super) unsafe fn clone_arc_raw<T: ArcWake>(data: *const ()) -> RawWaker {
increase_refcount::<T>(data);
RawWaker::new(data, waker_vtable!(T))
}

unsafe fn drop_arc_raw<T: ArcWake>(data: *const()) {
unsafe fn drop_arc_raw<T: ArcWake>(data: *const ()) {
drop(Arc::<T>::from_raw(data as *const T))
}

// used by `waker_ref`
pub(super) unsafe fn wake_arc_raw<T: ArcWake>(data: *const()) {
pub(super) unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
let arc: Arc<T> = Arc::from_raw(data as *const T);
ArcWake::wake(arc);
}

// used by `waker_ref`
pub(super) unsafe fn wake_by_ref_arc_raw<T: ArcWake>(data: *const ()) {
let arc: Arc<T> = Arc::from_raw(data as *const T);
ArcWake::wake(&arc);
ArcWake::wake_by_ref(&arc);
mem::forget(arc);
}

Expand All @@ -84,7 +105,7 @@ mod tests {
}

impl ArcWake for CountingWaker {
fn wake(arc_self: &Arc<Self>) {
fn wake_by_ref(arc_self: &Arc<Self>) {
let mut lock = arc_self.nr_wake.lock().unwrap();
*lock += 1;
}
Expand All @@ -96,13 +117,13 @@ mod tests {

let w1: Waker = ArcWake::into_waker(some_w.clone());
assert_eq!(2, Arc::strong_count(&some_w));
w1.wake();
w1.wake_by_ref();
assert_eq!(1, some_w.wakes());

let w2 = w1.clone();
assert_eq!(3, Arc::strong_count(&some_w));

w2.wake();
w2.wake_by_ref();
assert_eq!(2, some_w.wakes());

drop(w2);
Expand Down
1 change: 1 addition & 0 deletions futures-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ cfg_target_has_atomic! {
&RawWakerVTable::new(
clone_arc_raw::<$ty>,
wake_arc_raw::<$ty>,
wake_by_ref_arc_raw::<$ty>,
drop_arc_raw::<$ty>,
)
};
Expand Down
Loading

0 comments on commit ad8f3c4

Please sign in to comment.