From 43a19c8cb656e3ae308a45921e74d881d23c6cb4 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 5 Oct 2016 16:53:17 -0700 Subject: [PATCH] Rewrite the stream::channel implementation The previous implementation suffered a fatal flaw (#191) where to make progress one half was required to block literally waiting for the other. The whole point of all the lock free shenanigans was to avoid that, so it clearly didn't solve its intended problem! This commit completely rewrites `stream::channel` away from this pesky `Slot` abstraction in a way that is geared towards avoiding this race we discovered. Lots more details can be found in the implementation itself. Closes #191 --- src/lib.rs | 1 - src/slot.rs | 691 ------------------------------------------ src/stream/channel.rs | 354 ++++++++++++++++++---- tests/channel.rs | 22 ++ 4 files changed, 310 insertions(+), 758 deletions(-) delete mode 100644 src/slot.rs diff --git a/src/lib.rs b/src/lib.rs index 7f36eb479d1..ffee1a20a7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -208,7 +208,6 @@ pub use then::Then; if_std! { mod lock; - mod slot; pub mod task; mod catch_unwind; diff --git a/src/slot.rs b/src/slot.rs deleted file mode 100644 index a48718e32dd..00000000000 --- a/src/slot.rs +++ /dev/null @@ -1,691 +0,0 @@ -//! A slot in memory for communicating between a producer and a consumer. -//! -//! This module contains an implementation detail of this library for a type -//! which is only intended to be shared between one consumer and one producer of -//! a value. It is unlikely that this module will survive stabilization of this -//! library, so it is not recommended to rely on it. - -#![allow(dead_code)] // imported in a few places - -use std::prelude::v1::*; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use lock::Lock; - -/// A slot in memory intended to represent the communication channel between one -/// producer and one consumer. -/// -/// Each slot contains space for a piece of data of type `T`, and can have -/// callbacks registered to run when the slot is either full or empty. -/// -/// Slots are only intended to be shared between exactly one producer and -/// exactly one consumer. If there are multiple concurrent producers or -/// consumers then this is still memory safe but will have unpredictable results -/// (and maybe panics). Note that this does not require that the "consumer" is -/// the same for the entire lifetime of a slot, simply that there is only one -/// consumer at a time. -/// -/// # Registering callbacks -/// -/// [`on_empty`](#method.on_empty) registers a callback to run when the slot -/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it -/// becomes full. In both cases, the callback will run immediately if possible. -/// -/// At most one callback can be registered at any given time: it is an error to -/// attempt to register a callback with `on_full` if one is currently registered -/// via `on_empty`, or any other combination. -/// -/// # Cancellation -/// -/// Registering a callback returns a `Token` which can be used to -/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet -/// started running can be canceled. Canceling a callback that has already run -/// is not an error, and `cancel` does not signal whether or not the callback -/// was actually canceled to the caller. -pub struct Slot { - // The purpose of this data type is to communicate when a value becomes - // available and coordinate between a producer and consumer about that - // value. Slots end up being at the core of many futures as they handle - // values transferring between producers and consumers, which means that - // they can never block. - // - // As a result, this `Slot` is a lock-free implementation in terms of not - // actually blocking at any point in time. The `Lock` types are - // half-optional and half-not-optional. They aren't actually mutexes as they - // only support a `try_lock` operation, and all the methods below ensure - // that progress can always be made without blocking. - // - // The `state` variable keeps track of the state of this slot, while the - // other fields here are just the payloads of the slot itself. Note that the - // exact bits of `state` are typically wrapped up in a `State` for - // inspection (see below). - state: AtomicUsize, - slot: Lock>, - on_full: Lock>>>, - on_empty: Lock>, Option)>>, -} - -/// Error value returned from erroneous calls to `try_produce`, which contains -/// the value that was passed to `try_produce`. -#[derive(Debug, PartialEq)] -pub struct TryProduceError(T); - -/// Error value returned from erroneous calls to `try_consume`. -#[derive(Debug, PartialEq)] -pub struct TryConsumeError(()); - -/// Error value returned from erroneous calls to `on_full`. -#[derive(Debug, PartialEq)] -pub struct OnFullError(()); - -/// Error value returned from erroneous calls to `on_empty`. -#[derive(Debug, PartialEq)] -pub struct OnEmptyError(()); - -/// A `Token` represents a registered callback, and can be used to cancel the callback. -#[derive(Clone, Copy)] -pub struct Token(usize); - -// Slot state: the lowest 3 bits are flags; the remaining bits are used to -// store the `Token` for the currently registered callback. The special token -// value 0 means no callback is registered. -// -// The flags are: -// - `DATA`: the `Slot` contains a value -// - `ON_FULL`: the `Slot` has an `on_full` callback registered -// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered -struct State(usize); - -const DATA: usize = 1 << 0; -const ON_FULL: usize = 1 << 1; -const ON_EMPTY: usize = 1 << 2; -const STATE_BITS: usize = 3; -const STATE_MASK: usize = (1 << STATE_BITS) - 1; - -fn _is_send() {} -fn _is_sync() {} - -fn _assert() { - _is_send::>(); - _is_sync::>(); -} - -impl Slot { - /// Creates a new `Slot` containing `val`, which may be `None` to create an - /// empty `Slot`. - pub fn new(val: Option) -> Slot { - Slot { - state: AtomicUsize::new(if val.is_some() {DATA} else {0}), - slot: Lock::new(val), - on_full: Lock::new(None), - on_empty: Lock::new(None), - } - } - - /// Attempts to store `t` in the slot. - /// - /// This method can only be called by the one producer working on this - /// `Slot`. Concurrent calls to this method or `on_empty` will result in - /// panics or possibly errors. - /// - /// # Errors - /// - /// Returns `Err` if the slot is already full. The value you attempted to - /// store is included in the error value. - /// - /// # Panics - /// - /// This method will panic if called concurrently with `try_produce` or - /// `on_empty`, or if `on_empty` has been called previously but the callback - /// hasn't fired. - pub fn try_produce(&self, t: T) -> Result<(), TryProduceError> { - // First up, let's take a look at our current state. Of our three flags, - // we check a few: - // - // * DATA - if this is set, then the production fails as a value has - // already been produced and we're not ready to receive it yet. - // * ON_EMPTY - this should never be set as it indicates a contract - // violation as the producer already registered interest in - // a value but the callback wasn't fired. - // * ON_FULL - doesn't matter in this use case, we don't check it as - // either state is valid. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_EMPTY)); - if state.flag(DATA) { - return Err(TryProduceError(t)) - } - - // Ok, so we've determined that our state is either `ON_FULL` or `0`, in - // both cases we're going to store our data into our slot. This should - // always succeed as access to `slot` is gated on the `DATA` flag being - // set on the consumer side (which isn't set) and there should only be - // one producer. - let mut slot = self.slot.try_lock().expect("interference with consumer?"); - assert!(slot.is_none()); - *slot = Some(t); - drop(slot); - - // Next, we update our state with `DATA` to say that something is - // available, and we also unset `ON_FULL` because we'll invoke the - // callback if it's available. - loop { - assert!(!state.flag(ON_EMPTY)); - let new_state = state.set_flag(DATA, true).set_flag(ON_FULL, false); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - - // If our previous state we transitioned from indicates that it has an - // on-full callback, we call that callback here. There's a few unwraps - // here that should never fail because the consumer shouldn't be placing - // another callback here and there shouldn't be any other producers as - // well. - if state.flag(ON_FULL) { - let cb = self.on_full.try_lock().expect("interference2") - .take().expect("ON_FULL but no callback"); - cb.call_box(self); - } - Ok(()) - } - - /// Registers `f` as a callback to run when the slot becomes empty. - /// - /// The callback will run immediately if the slot is already empty. Returns - /// a token that can be used to cancel the callback. This method is to be - /// called by the producer, and it is illegal to call this method - /// concurrently with either `on_empty` or `try_produce`. - /// - /// # Panics - /// - /// Panics if another callback was already registered via `on_empty` or - /// `on_full`, or if this value is called concurrently with other producer - /// methods. - pub fn on_empty(&self, item: Option, f: F) -> Token - where F: FnOnce(&Slot, Option) + Send + 'static - { - // First up, as usual, take a look at our state. Of the three flags we - // check two: - // - // * DATA - if set, we keep going, but if unset we're done as there's no - // data and we're already empty. - // * ON_EMPTY - this should be impossible as it's a contract violation - // to call this twice or concurrently. - // * ON_FULL - it's illegal to have both an empty and a full callback - // simultaneously, so we check this just after we ensure - // there's data available. If there's data there should not - // be a full callback as it should have been called. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_EMPTY)); - if !state.flag(DATA) { - f(self, item); - return Token(0) - } - assert!(!state.flag(ON_FULL)); - - // At this point we've precisely determined that our state is `DATA` and - // all other flags are unset. We're cleared for landing in initializing - // the `on_empty` slot so we store our callback here. - let mut slot = self.on_empty.try_lock().expect("on_empty interference"); - assert!(slot.is_none()); - *slot = Some((Box::new(f), item)); - drop(slot); - - // In this loop, we transition ourselves from the `DATA` state to a - // state which has the on empty flag state. Note that we also increase - // the token of this state as we're registering a new callback. - loop { - assert!(state.flag(DATA)); - assert!(!state.flag(ON_FULL)); - assert!(!state.flag(ON_EMPTY)); - let new_state = state.set_flag(ON_EMPTY, true) - .set_token(state.token() + 1); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - - // If we succeeded in the CAS, then we're done and our token is - // valid. - if old == state.0 { - return Token(new_state.token()) - } - state.0 = old; - - // If we failed the CAS but the data was taken in the meantime we - // abort our attempt to set on-empty and call the callback - // immediately. Note that the on-empty flag was never set, so it - // should still be there and should be available to take. - if !state.flag(DATA) { - let cb = self.on_empty.try_lock().expect("on_empty interference2") - .take().expect("on_empty not empty??"); - let (cb, item) = cb; - cb.call_box(self, item); - return Token(0) - } - } - } - - /// Attempts to consume the value stored in the slot. - /// - /// This method can only be called by the one consumer of this slot, and - /// cannot be called concurrently with `try_consume` or `on_full`. - /// - /// # Errors - /// - /// Returns `Err` if the slot is already empty. - /// - /// # Panics - /// - /// This method will panic if called concurrently with `try_consume` or - /// `on_full`, or otherwise show weird behavior. - pub fn try_consume(&self) -> Result { - // The implementation of this method is basically the same as - // `try_produce` above, it's just the opposite of all the operations. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_FULL)); - if !state.flag(DATA) { - return Err(TryConsumeError(())) - } - let mut slot = self.slot.try_lock().expect("interference with producer?"); - let val = slot.take().expect("DATA but not data"); - drop(slot); - - loop { - assert!(!state.flag(ON_FULL)); - let new_state = state.set_flag(DATA, false).set_flag(ON_EMPTY, false); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - assert!(!state.flag(ON_FULL)); - if state.flag(ON_EMPTY) { - let cb = self.on_empty.try_lock().expect("interference3") - .take().expect("ON_EMPTY but no callback"); - let (cb, item) = cb; - cb.call_box(self, item); - } - Ok(val) - } - - /// Registers `f` as a callback to run when the slot becomes full. - /// - /// The callback will run immediately if the slot is already full. Returns a - /// token that can be used to cancel the callback. - /// - /// This method is to be called by the consumer. - /// - /// # Panics - /// - /// Panics if another callback was already registered via `on_empty` or - /// `on_full` or if called concurrently with `on_full` or `try_consume`. - pub fn on_full(&self, f: F) -> Token - where F: FnOnce(&Slot) + Send + 'static - { - // The implementation of this method is basically the same as - // `on_empty` above, it's just the opposite of all the operations. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_FULL)); - if state.flag(DATA) { - f(self); - return Token(0) - } - assert!(!state.flag(ON_EMPTY)); - - let mut slot = self.on_full.try_lock().expect("on_full interference"); - assert!(slot.is_none()); - *slot = Some(Box::new(f)); - drop(slot); - - loop { - assert!(!state.flag(DATA)); - assert!(!state.flag(ON_EMPTY)); - assert!(!state.flag(ON_FULL)); - let new_state = state.set_flag(ON_FULL, true) - .set_token(state.token() + 1); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - return Token(new_state.token()) - } - state.0 = old; - - if state.flag(DATA) { - let cb = self.on_full.try_lock().expect("on_full interference2") - .take().expect("on_full not full??"); - cb.call_box(self); - return Token(0) - } - } - } - - /// Cancels the callback associated with `token`. - /// - /// Canceling a callback that has already started running, or has already - /// run will do nothing, and is not an error. See - /// [Cancellation](#cancellation). - /// - /// # Panics - /// - /// This method may cause panics if it is called concurrently with - /// `on_empty` or `on_full`, depending on which callback is being canceled. - pub fn cancel(&self, token: Token) { - // Tokens with a value of "0" are sentinels which don't actually do - // anything. - let token = token.0; - if token == 0 { - return - } - - let mut state = State(self.state.load(Ordering::SeqCst)); - loop { - // If we've moved on to a different token, then we're guaranteed - // that our token won't show up again, so we can return immediately - // as our closure has likely already run (or been previously - // canceled). - if state.token() != token { - return - } - - // If our token matches, then let's see if we're cancelling either - // the on-full or on-empty callbacks. It's illegal to have them both - // registered, so we only need to look at one. - // - // If neither are set then the token has probably already run, so we - // just continue along our merry way and don't worry. - let new_state = if state.flag(ON_FULL) { - assert!(!state.flag(ON_EMPTY)); - state.set_flag(ON_FULL, false) - } else if state.flag(ON_EMPTY) { - assert!(!state.flag(ON_FULL)); - state.set_flag(ON_EMPTY, false) - } else { - return - }; - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - - // Figure out which callback we just canceled, and now that the flag is - // unset we should own the callback to clear it. - - if state.flag(ON_FULL) { - let cb = self.on_full.try_lock().expect("on_full interference3") - .take().expect("on_full not full??"); - drop(cb); - } else { - let cb = self.on_empty.try_lock().expect("on_empty interference3") - .take().expect("on_empty not empty??"); - drop(cb); - } - } -} - -impl TryProduceError { - /// Extracts the value that was attempted to be produced. - pub fn into_inner(self) -> T { - self.0 - } -} - -trait FnBox: Send { - fn call_box(self: Box, other: &Slot); -} - -impl FnBox for F - where F: FnOnce(&Slot) + Send, -{ - fn call_box(self: Box, other: &Slot) { - (*self)(other) - } -} - -trait FnBox2: Send { - fn call_box(self: Box, other: &Slot, Option); -} - -impl FnBox2 for F - where F: FnOnce(&Slot, Option) + Send, -{ - fn call_box(self: Box, other: &Slot, item: Option) { - (*self)(other, item) - } -} - -impl State { - fn flag(&self, f: usize) -> bool { - self.0 & f != 0 - } - - fn set_flag(&self, f: usize, val: bool) -> State { - State(if val { - self.0 | f - } else { - self.0 & !f - }) - } - - fn token(&self) -> usize { - self.0 >> STATE_BITS - } - - fn set_token(&self, gen: usize) -> State { - State((gen << STATE_BITS) | (self.0 & STATE_MASK)) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::thread; - - use super::Slot; - - #[test] - fn sequential() { - let slot = Slot::new(Some(1)); - - // We can consume once - assert_eq!(slot.try_consume(), Ok(1)); - assert!(slot.try_consume().is_err()); - - // Consume a production - assert_eq!(slot.try_produce(2), Ok(())); - assert_eq!(slot.try_consume(), Ok(2)); - - // Can't produce twice - assert_eq!(slot.try_produce(3), Ok(())); - assert!(slot.try_produce(3).is_err()); - - // on_full is run immediately if full - let hit = Arc::new(AtomicUsize::new(0)); - let hit2 = hit.clone(); - slot.on_full(move |_s| { - hit2.fetch_add(1, Ordering::SeqCst); - }); - assert_eq!(hit.load(Ordering::SeqCst), 1); - - // on_full can be run twice, and we can consume in the callback - let hit2 = hit.clone(); - slot.on_full(move |s| { - hit2.fetch_add(1, Ordering::SeqCst); - assert_eq!(s.try_consume(), Ok(3)); - }); - assert_eq!(hit.load(Ordering::SeqCst), 2); - - // Production can't run a previous callback - assert_eq!(slot.try_produce(4), Ok(())); - assert_eq!(hit.load(Ordering::SeqCst), 2); - assert_eq!(slot.try_consume(), Ok(4)); - - // Productions run new callbacks - let hit2 = hit.clone(); - slot.on_full(move |s| { - hit2.fetch_add(1, Ordering::SeqCst); - assert_eq!(s.try_consume(), Ok(5)); - }); - assert_eq!(slot.try_produce(5), Ok(())); - assert_eq!(hit.load(Ordering::SeqCst), 3); - - // on empty should fire immediately for an empty slot - let hit2 = hit.clone(); - slot.on_empty(None, move |_, _| { - hit2.fetch_add(1, Ordering::SeqCst); - }); - assert_eq!(hit.load(Ordering::SeqCst), 4); - } - - #[test] - fn channel() { - const N: usize = 10000; - - struct Sender { - slot: Arc>, - hit: Arc, - } - - struct Receiver { - slot: Arc>, - hit: Arc, - } - - impl Sender { - fn send(&self, val: usize) { - if self.slot.try_produce(val).is_ok() { - return - } - let me = thread::current(); - self.hit.store(0, Ordering::SeqCst); - let hit = self.hit.clone(); - self.slot.on_empty(None, move |_slot, _| { - hit.store(1, Ordering::SeqCst); - me.unpark(); - }); - while self.hit.load(Ordering::SeqCst) == 0 { - thread::park(); - } - self.slot.try_produce(val).expect("can't produce after on_empty") - } - } - - impl Receiver { - fn recv(&self) -> usize { - if let Ok(i) = self.slot.try_consume() { - return i - } - - let me = thread::current(); - self.hit.store(0, Ordering::SeqCst); - let hit = self.hit.clone(); - self.slot.on_full(move |_slot| { - hit.store(1, Ordering::SeqCst); - me.unpark(); - }); - while self.hit.load(Ordering::SeqCst) == 0 { - thread::park(); - } - self.slot.try_consume().expect("can't consume after on_full") - } - } - - let slot = Arc::new(Slot::new(None)); - let slot2 = slot.clone(); - - let tx = Sender { slot: slot2, hit: Arc::new(AtomicUsize::new(0)) }; - let rx = Receiver { slot: slot, hit: Arc::new(AtomicUsize::new(0)) }; - - let a = thread::spawn(move || { - for i in 0..N { - assert_eq!(rx.recv(), i); - } - }); - - for i in 0..N { - tx.send(i); - } - - a.join().unwrap(); - } - - #[test] - fn cancel() { - let slot = Slot::new(None); - let hits = Arc::new(AtomicUsize::new(0)); - - let add = || { - let hits = hits.clone(); - move |_: &Slot| { hits.fetch_add(1, Ordering::SeqCst); } - }; - let add_empty = || { - let hits = hits.clone(); - move |_: &Slot, _: Option| { - hits.fetch_add(1, Ordering::SeqCst); - } - }; - - // cancel on_full - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_consume().is_err()); - assert!(slot.try_produce(1).is_ok()); - assert!(slot.try_consume().is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n); - - // cancel on_empty - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.try_produce(1).unwrap(); - let token = slot.on_empty(None, add_empty()); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_produce(1).is_err()); - - // cancel with no effect - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - assert!(slot.try_consume().is_ok()); - let token = slot.on_empty(None, add_empty()); - assert_eq!(hits.load(Ordering::SeqCst), n + 2); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n + 2); - - // cancel old ones don't count - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token1 = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_produce(1).is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - assert!(slot.try_consume().is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - let token2 = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token1); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token2); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - } -} diff --git a/src/stream/channel.rs b/src/stream/channel.rs index 471e370fd53..51c51824139 100644 --- a/src/stream/channel.rs +++ b/src/stream/channel.rs @@ -2,12 +2,13 @@ use std::any::Any; use std::error::Error; use std::fmt; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; use {Future, Poll, Async}; -use slot::{Slot, Token}; +use lock::Lock; use stream::Stream; -use task; +use task::{self, Task}; /// Creates an in-memory channel implementation of the `Stream` trait. /// @@ -22,15 +23,20 @@ use task; /// number of the associated combinators for transforming the result. pub fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner { - slot: Slot::new(None), - receiver_gone: AtomicBool::new(false), + state: AtomicUsize::new(EMPTY), + data: Lock::new(None), + rx_task1: Lock::new(None), + rx_task2: Lock::new(None), + tx_task1: Lock::new(None), + tx_task2: Lock::new(None), }); let sender = Sender { inner: inner.clone(), + flag: true, }; let receiver = Receiver { inner: inner, - on_full_token: None, + flag: true, }; (sender, receiver) } @@ -39,16 +45,16 @@ pub fn channel() -> (Sender, Receiver) { /// /// This is created by the `channel` method in the `stream` module. pub struct Sender { - inner: Arc>, + inner: Arc>>, + // described below on `Inner` + flag: bool, } /// A future returned by the `Sender::send` method which will resolve to the /// sender once it's available to send another message. #[must_use = "futures do nothing unless polled"] pub struct FutureSender { - sender: Option>, - data: Option>, - on_empty_token: Option, + state: Option<(Sender, Result)>, } /// The receiving end of a channel which implements the `Stream` trait. @@ -58,20 +64,94 @@ pub struct FutureSender { /// `channel` method in the `stream` module. #[must_use = "streams do nothing unless polled"] pub struct Receiver { - inner: Arc>, - on_full_token: Option, + inner: Arc>>, + // described below on `Inner` + flag: bool, } -struct Inner { - slot: Slot>>, - receiver_gone: AtomicBool, -} +/// Internal state shared by the `Sender` and `Receiver` types. +/// +/// While this is similar to the oneshot internal state, it's subtly different +/// with an extra `rx_task` and `tx_task` fields for blocking. See comments +/// below for what's what. +struct Inner { + /// Actual state of this channel, essentially what's going on inside `data`. + /// + /// This currently has three valid values (constants below this struct + /// definition): + /// + /// * EMPTY - both the sender and receiver are alive, but there's no data to + /// be had in `data`. A receiver must block and a sender can + /// proceed. + /// * DATA - both the sender and receiver are alive, and there's data to be + /// retrieved inside of `data`. A receiver can proceed by picking + /// out the data but a sender must block to send something else. + /// * GONE - *either* the sender or receiver is gone (or both). No operation + /// should block any more and all data should be handled + /// appropriately. Note that if a receiver sees GONE then there may + /// still be data inside `data`, so it needs to be checked. + /// + /// This isn't really atomically updated in the sense of swap or + /// compare_exchange, but rather with a few atomic stores here and there + /// with a sprinkling of compare_exchange. See the code below for more + /// details. + state: AtomicUsize, + + /// The actual data being transmitted across this channel. + /// + /// This is `Some` if state is `DATA` and `None` if state is `EMPTY`. If + /// the state is `GONE` then the receiver needs to check this and the sender + /// should ignore it. + /// + /// Note that this probably doesn't need a `Lock` around it and can likely + /// just be an `UnsafeCell` + data: Lock>, -enum Message { - Data(T), - Done, + /// Ok, here's where things get tricky. These four fields are for blocked + /// tasks. + /// + /// "Four?!" you might be saying, "surely there can only be at most one task + /// blocked on a channel" you might also be saying. Well, you're correct! + /// Due to various subtleties and the desire to never have any task *block* + /// another (in the sense of a mutex block) these are all required. + /// + /// The general gist of what's going on here is that a `Sender` will + /// alternate storing its blocked task in `tx_task1` and `tx_task2`. + /// Similarly a `Receiver` will alternate storing a blocked task in + /// `rx_task1` and `rx_task2`. + /// + /// The race that this is trying to solve is this: + /// + /// * When the receiver receives a message, it will empty out the data, then + /// lock the tx task to wake it up (if one's available). + /// * The sender, not blocked, then sees that the channel is empty, so it + /// sends some data. + /// * The sender again, not blocked, tries to send some more data, but this + /// time its blocked. + /// + /// Here we've got a concurrent situation where the receiver is holding the + /// locked for the tx task, but the sender *also* wants to store a new tx + /// task to get unblocked. This would involve the sender literally blocking + /// waiting for the receiver to unlock, so instead we shard up the tx task + /// locks into two. This means that in the situation above the two halves + /// will never be racing on the same slot and always have access to block + /// when they need it. + /// + /// Similar logic applies to the receiver (I think...) so there's two rx + /// task slots here as well. + /// + /// TODO: does this really need two rx tasks? I've thought through tx task + /// to justify those two but not the other way around. + rx_task1: Lock>, + rx_task2: Lock>, + tx_task1: Lock>, + tx_task2: Lock>, } +const EMPTY: usize = 0; +const DATA: usize = 1; +const GONE: usize = 2; + /// Error type returned by `FutureSender` when the receiving end of a `channel` is dropped pub struct SendError(Result); @@ -103,34 +183,125 @@ impl Stream for Receiver { type Error = E; fn poll(&mut self) -> Poll, E> { - if let Some(token) = self.on_full_token.take() { - self.inner.slot.cancel(token); - } + // First thing's first, let's check out the state of the channel. A + // local flag is kept which indicates whether we've got data available + // to us. + let mut data = false; + match self.inner.state.load(SeqCst) { + // If the sender is gone, then we need to take a look inside our + // `data` field. Fall through below to figure that out. + GONE => data = true, + + // If we've got data, then we've got data! + DATA => data = true, - match self.inner.slot.try_consume() { - Ok(Message::Data(Ok(e))) => Ok(Async::Ready(Some(e))), - Ok(Message::Data(Err(e))) => Err(e), - Ok(Message::Done) => Ok(Async::Ready(None)), - Err(..) => { + // If the channel thinks it's empty, then we need to try to block. + // Take our task and put it in the appropriate slot. If we can't + // acquire the lock on the slot, then we know it's taken because a + // sender put some data in the channel and it's trying to wake us + // up. In that situation we know we've got data. + EMPTY => { let task = task::park(); - self.on_full_token = Some(self.inner.slot.on_full(move |_| { - task.unpark(); - })); - Ok(Async::NotReady) + match self.rx_task().try_lock() { + Some(mut slot) => *slot = Some(task), + None => data = true, + } + } + + n => panic!("bad state: {}", n), + } + + // If we *didn't* get data above, then we stored our task to be woken up + // at a later time. Recheck to cover the race where right before we + // stored the task a sender put some data on the channel. If we still + // see `EMPTY`, however, then we're guaranteed any future sender will + // wake up our task. + if !data && self.inner.state.load(SeqCst) == EMPTY { + return Ok(Async::NotReady) + } + + // We've gotten this far, so extract the data (which is guaranteed to + // not be contended) and transform it to our return value. + let ret = match self.inner.data.try_lock().unwrap().take() { + Some(Ok(e)) => Ok(Some(e).into()), + Some(Err(e)) => Err(e), + None => Ok(None.into()), + }; + + // Inform the channel that our data slot is now empty. Note that we use + // a compare_exchange here to ensure that if the sender goes away (e.g. + // transitions to GONE) we don't paper over that state. + drop(self.inner.state.compare_exchange(DATA, EMPTY, SeqCst, SeqCst)); + + // Now that we've extracted the data and updated the state of the + // channel, it's time for us to notify any blocked sender that it can + // continue to move along if it needs. Take a peek at the tx_task we're + // waking up and if it's there unpark it. + // + // TODO: Should this try_lock be an unwrap()? I... can't think of a case + // where the sender should be interfering with this. + if let Some(mut slot) = self.tx_task().try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.unpark(); } } + + // And finally, with our successfuly receiving of a message, we flip our + // flag to switch the slots we're thinking of for rx tasks and tx tasks. + self.flag = !self.flag; + return ret + } +} + +impl Receiver { + /// Helper method to look at the right slot to store an rx task into, given + /// how many messages we've sent so far. + fn rx_task(&self) -> &Lock> { + if self.flag { + &self.inner.rx_task1 + } else { + &self.inner.rx_task2 + } + } + + /// Helper method to look at the right slot to store an tx task into, given + /// how many messages we've sent so far. + fn tx_task(&self) -> &Lock> { + if self.flag { + &self.inner.tx_task1 + } else { + &self.inner.tx_task2 + } } } impl Drop for Receiver { fn drop(&mut self) { - self.inner.receiver_gone.store(true, Ordering::SeqCst); - if let Some(token) = self.on_full_token.take() { - self.inner.slot.cancel(token); + // First up, inform our sender bretheren that we're going away by + // transitioning ourselves to the GONE state. + if self.inner.state.swap(GONE, SeqCst) == DATA { + drop(self.inner.data.try_lock().unwrap().take()); + } + + // Next, if we stored a handle to our own task to get woken up then + // we're sure to drop that here. No need to keep that around and we + // don't want to hold onto any stale references. + if let Some(mut slot) = self.rx_task().try_lock() { + if let Some(task) = slot.take() { + drop(slot); + drop(task); + } + } + + // And finally, if any sender was waiting for us to take some data we + // ... well ... took the data! If they're here then wake them up. + if let Some(mut slot) = self.tx_task().try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.unpark(); + } } - self.inner.slot.on_full(|slot| { - drop(slot.try_consume()); - }); } } @@ -140,19 +311,54 @@ impl Sender { /// This method consumes the sender and returns a future which will resolve /// to the sender again when the value sent has been consumed. pub fn send(self, t: Result) -> FutureSender { + // FIXME(#164) this should send immediately FutureSender { - sender: Some(self), - data: Some(t), - on_empty_token: None, + state: Some((self, t)), + } + } + + /// Same as Receiver::rx_task above. + fn rx_task(&self) -> &Lock> { + if self.flag { + &self.inner.rx_task1 + } else { + &self.inner.rx_task2 + } + } + + /// *Almost* the same as Receiver::tx_task above were it not for the lone + /// `!` in front of `self.flag`. + /// + /// Here we actually invert what we're looking at to ensure that the + /// receiver and the sender are always trying to deal with the same task + /// blocking location. + /// + /// For example if the sender has not received anything, it'll wake up + /// blocked tasks in `tx_task1`. If we've sent something, however, our flag + /// will be the opposite and then when we block we want the receiver to wake + /// us up. As a result, we invert our logic to block in the same location. + fn tx_task(&self) -> &Lock> { + if !self.flag { + &self.inner.tx_task1 + } else { + &self.inner.tx_task2 } } } impl Drop for Sender { fn drop(&mut self) { - self.inner.slot.on_empty(None, |slot, _none| { - slot.try_produce(Message::Done).ok().unwrap(); - }); + // Like Receiver::drop we let our other half know we're gone, and then + // we try to wake them up if they're waiting for us. Note that we don't + // frob tx_task here as that's done in FutureSender down below. + self.inner.state.store(GONE, SeqCst); + + if let Some(mut slot) = self.rx_task().try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.unpark(); + } + } } } @@ -161,38 +367,54 @@ impl Future for FutureSender { type Error = SendError; fn poll(&mut self) -> Poll { - let data = self.data.take().expect("cannot poll FutureSender twice"); - let sender = self.sender.take().expect("cannot poll FutureSender twice"); - if let Some(token) = self.on_empty_token.take() { - sender.inner.slot.cancel(token); + // This is very similar to `Receiver::poll` above, it's basically just + // the opposite in a few places. + let (mut sender, data) = self.state.take().expect("cannot poll \ + FutureSender twice"); + let mut empty = false; + match sender.inner.state.load(SeqCst) { + GONE => return Err(SendError(data)), + EMPTY => empty = true, + DATA => { + let task = task::park(); + match sender.tx_task().try_lock() { + Some(mut slot) => *slot = Some(task), + None => empty = true, + } + } + n => panic!("bad state: {}", n), } - if sender.inner.receiver_gone.load(Ordering::SeqCst) { - return Err(SendError(data)) + if !empty && sender.inner.state.load(SeqCst) == DATA { + self.state = Some((sender, data)); + return Ok(Async::NotReady) } - match sender.inner.slot.try_produce(Message::Data(data)) { - Ok(()) => Ok(Async::Ready(sender)), - Err(e) => { - let task = task::park(); - let token = sender.inner.slot.on_empty(None, move |_slot, _item| { - task.unpark(); - }); - self.on_empty_token = Some(token); - self.data = Some(match e.into_inner() { - Message::Data(data) => data, - Message::Done => panic!(), - }); - self.sender = Some(sender); - Ok(Async::NotReady) + *sender.inner.data.try_lock().unwrap() = Some(data); + drop(sender.inner.state.compare_exchange(EMPTY, DATA, SeqCst, SeqCst)); + if let Some(mut slot) = sender.rx_task().try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.unpark(); } } + sender.flag = !sender.flag; + Ok(sender.into()) } } impl Drop for FutureSender { fn drop(&mut self) { - if let Some(token) = self.on_empty_token.take() { - if let Some(sender) = self.sender.take() { - sender.inner.slot.cancel(token); + let sender = match self.state.take() { + Some((sender, _)) => sender, + None => return, + }; + + // If we've registered a task to be interested in when the sender was + // empty again, there's no need to hold onto it any more, so extract it + // and drop it. + let slot = sender.tx_task().try_lock(); + if let Some(mut slot) = slot { + if let Some(task) = slot.take() { + drop((slot, task)); } } } diff --git a/tests/channel.rs b/tests/channel.rs index 553aacb539d..b58377525c0 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -3,6 +3,7 @@ extern crate futures; use futures::{task, done, Future, Async}; use futures::stream::*; use std::sync::Arc; +use std::sync::atomic::*; mod support; use support::*; @@ -80,3 +81,24 @@ fn poll_future_then_drop() { drop(t); } + +#[test] +fn drop_order() { + static DROPS: AtomicUsize = ATOMIC_USIZE_INIT; + let (tx, rx) = channel::<_, u32>(); + + struct A; + + impl Drop for A { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let tx = tx.send(Ok(A)).wait().unwrap(); + assert_eq!(DROPS.load(Ordering::SeqCst), 0); + drop(rx); + assert_eq!(DROPS.load(Ordering::SeqCst), 1); + assert!(tx.send(Ok(A)).wait().is_err()); + assert_eq!(DROPS.load(Ordering::SeqCst), 2); +}