From a78a743221d4531ac3a24af5d6ef3e978aa82b39 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 8 Dec 2017 11:39:25 +0100 Subject: [PATCH] Switch to crossbeam-channel --- Cargo.toml | 3 +- src/lib.rs | 3 +- src/platform/inprocess/mod.rs | 212 +++++++++++++++++----------------- src/platform/macos/mod.rs | 3 +- src/router.rs | 47 ++++---- src/test.rs | 33 +++--- 6 files changed, 154 insertions(+), 147 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a4c4016d0..58a4feeb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ipc-channel" -version = "0.9.1" +version = "0.10.0" description = "A multiprocess drop-in replacement for Rust channels" authors = ["The Servo Project Developers"] license = "MIT/Apache-2.0" @@ -14,6 +14,7 @@ async = ["futures"] [dependencies] bincode = "0.9" +crossbeam-channel = "0.1.1" lazy_static = "1" libc = "0.2.12" rand = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 081a33cc8..6ce7db171 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,14 +7,13 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -#![cfg_attr(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios"), - feature(mpsc_select))] #![cfg_attr(all(feature = "unstable", test), feature(specialization))] #[macro_use] extern crate lazy_static; extern crate bincode; +extern crate crossbeam_channel; extern crate libc; extern crate rand; extern crate serde; diff --git a/src/platform/inprocess/mod.rs b/src/platform/inprocess/mod.rs index 6a8828e45..9d252e5ae 100644 --- a/src/platform/inprocess/mod.rs +++ b/src/platform/inprocess/mod.rs @@ -8,7 +8,7 @@ // except according to those terms. use bincode; -use std::sync::mpsc; +use crossbeam_channel::{self, Receiver, Select, Sender}; use std::sync::{Arc, Mutex}; use std::collections::hash_map::HashMap; use std::cell::{RefCell}; @@ -17,29 +17,28 @@ use std::slice; use std::fmt::{self, Debug, Formatter}; use std::cmp::{PartialEq}; use std::ops::{Deref, RangeFrom}; -use std::mem; use std::usize; use uuid::Uuid; #[derive(Clone)] struct ServerRecord { sender: OsIpcSender, - conn_sender: mpsc::Sender, - conn_receiver: Arc>>, + conn_sender: Sender, + conn_receiver: Receiver, } impl ServerRecord { fn new(sender: OsIpcSender) -> ServerRecord { - let (tx, rx) = mpsc::channel::(); + let (tx, rx) = crossbeam_channel::unbounded::(); ServerRecord { sender: sender, conn_sender: tx, - conn_receiver: Arc::new(Mutex::new(rx)), + conn_receiver: rx, } } fn accept(&self) { - self.conn_receiver.lock().unwrap().recv().unwrap(); + self.conn_receiver.recv().unwrap(); } fn connect(&self) { @@ -51,16 +50,16 @@ lazy_static! { static ref ONE_SHOT_SERVERS: Mutex> = Mutex::new(HashMap::new()); } -struct MpscChannelMessage(Vec, Vec, Vec); +struct ChannelMessage(Vec, Vec, Vec); -pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver),MpscError> { - let (base_sender, base_receiver) = mpsc::channel::(); +pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), ChannelError> { + let (base_sender, base_receiver) = crossbeam_channel::unbounded::(); Ok((OsIpcSender::new(base_sender), OsIpcReceiver::new(base_receiver))) } #[derive(Debug)] pub struct OsIpcReceiver { - receiver: RefCell>>, + receiver: RefCell>>, } impl PartialEq for OsIpcReceiver { @@ -71,7 +70,7 @@ impl PartialEq for OsIpcReceiver { } impl OsIpcReceiver { - fn new(receiver: mpsc::Receiver) -> OsIpcReceiver { + fn new(receiver: Receiver) -> OsIpcReceiver { OsIpcReceiver { receiver: RefCell::new(Some(receiver)), } @@ -82,31 +81,37 @@ impl OsIpcReceiver { OsIpcReceiver::new(receiver.unwrap()) } - pub fn recv(&self) -> Result<(Vec, Vec, Vec),MpscError> { + pub fn recv( + &self + ) -> Result<(Vec, Vec, Vec), ChannelError> { let r = self.receiver.borrow(); match r.as_ref().unwrap().recv() { - Ok(MpscChannelMessage(d,c,s)) => Ok((d, - c.into_iter().map(OsOpaqueIpcChannel::new).collect(), - s)), - Err(_) => Err(MpscError::ChannelClosedError), + Ok(ChannelMessage(d, c, s)) => { + Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s)) + } + Err(_) => Err(ChannelError::ChannelClosedError), } } - pub fn try_recv(&self) -> Result<(Vec, Vec, Vec),MpscError> { + pub fn try_recv( + &self + ) -> Result<(Vec, Vec, Vec), ChannelError> { let r = self.receiver.borrow(); match r.as_ref().unwrap().try_recv() { - Ok(MpscChannelMessage(d,c,s)) => Ok((d, - c.into_iter().map(OsOpaqueIpcChannel::new).collect(), - s)), - Err(mpsc::TryRecvError::Disconnected) => Err(MpscError::ChannelClosedError), - Err(_) => Err(MpscError::UnknownError), + Ok(ChannelMessage(d, c, s)) => { + Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s)) + } + Err(crossbeam_channel::TryRecvError::Disconnected) => { + Err(ChannelError::ChannelClosedError) + } + Err(_) => Err(ChannelError::UnknownError), } } } #[derive(Clone, Debug)] pub struct OsIpcSender { - sender: RefCell>, + sender: RefCell>, } impl PartialEq for OsIpcSender { @@ -117,13 +122,13 @@ impl PartialEq for OsIpcSender { } impl OsIpcSender { - fn new(sender: mpsc::Sender) -> OsIpcSender { + fn new(sender: Sender) -> OsIpcSender { OsIpcSender { sender: RefCell::new(sender), } } - pub fn connect(name: String) -> Result { + pub fn connect(name: String) -> Result { let record = ONE_SHOT_SERVERS.lock().unwrap().get(&name).unwrap().clone(); record.connect(); Ok(record.sender) @@ -133,14 +138,17 @@ impl OsIpcSender { usize::MAX } - pub fn send(&self, - data: &[u8], - ports: Vec, - shared_memory_regions: Vec) - -> Result<(),MpscError> - { - match self.sender.borrow().send(MpscChannelMessage(data.to_vec(), ports, shared_memory_regions)) { - Err(_) => Err(MpscError::BrokenPipeError), + pub fn send( + &self, + data: &[u8], + ports: Vec, + shared_memory_regions: Vec, + ) -> Result<(), ChannelError> { + match self.sender + .borrow() + .send(ChannelMessage(data.to_vec(), ports, shared_memory_regions)) + { + Err(_) => Err(ChannelError::BrokenPipeError), Ok(_) => Ok(()), } } @@ -153,7 +161,7 @@ pub struct OsIpcReceiverSet { } impl OsIpcReceiverSet { - pub fn new() -> Result { + pub fn new() -> Result { Ok(OsIpcReceiverSet { incrementor: 0.., receiver_ids: vec![], @@ -161,67 +169,48 @@ impl OsIpcReceiverSet { }) } - pub fn add(&mut self, receiver: OsIpcReceiver) -> Result { + pub fn add(&mut self, receiver: OsIpcReceiver) -> Result { let last_index = self.incrementor.next().unwrap(); self.receiver_ids.push(last_index); self.receivers.push(receiver.consume()); Ok(last_index) } - pub fn select(&mut self) -> Result,MpscError> { - let mut receivers: Vec>> = Vec::with_capacity(self.receivers.len()); - let mut r_id: Option = None; - let mut r_index: usize = 0; - - { - let select = mpsc::Select::new(); - // we *must* allocate exact capacity for this, because the Handles *can't move* - let mut handles: Vec> = Vec::with_capacity(self.receivers.len()); + pub fn select(&mut self) -> Result, ChannelError> { + if self.receivers.is_empty() { + return Err(ChannelError::UnknownError); + } - for r in &self.receivers { - let inner_r = mem::replace(&mut *r.receiver.borrow_mut(), None); - receivers.push(inner_r); - } - - for r in &receivers { - unsafe { - handles.push(select.handle(r.as_ref().unwrap())); - handles.last_mut().unwrap().add(); + let mut sel = Select::with_timeout(::std::time::Duration::from_secs(1)); + loop { + for (index, rx) in self.receivers + .iter_mut() + .map(|r| r.receiver.get_mut().as_ref().unwrap()) + .enumerate() + { + if let Ok(msg) = sel.recv(rx) { + let r_id = self.receiver_ids[index]; + let ChannelMessage(data, channels, shmems) = msg; + let channels = channels.into_iter().map(OsOpaqueIpcChannel::new).collect(); + return Ok(vec![ + OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems), + ]); } } - - let id = select.wait(); - - for (index,h) in handles.iter().enumerate() { - if h.id() == id { - r_index = index; - r_id = Some(self.receiver_ids[index]); - break; - } + if sel.timed_out() { // TODO: this should be any_disconnected + break; } } - // put the receivers back - for (index,r) in self.receivers.iter().enumerate() { - mem::replace(&mut *r.receiver.borrow_mut(), mem::replace(&mut receivers[index], None)); - } - - match r_id { - None => Err(MpscError::UnknownError), - Some(r_id) => { - let receivers = &mut self.receivers; - match receivers[r_index].recv() { - Ok((data, channels, shmems)) => - Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems)]), - Err(MpscError::ChannelClosedError) => { - receivers.remove(r_index); - self.receiver_ids.remove(r_index); - Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)]) - }, - Err(err) => Err(err), - } - } - } + let (index, _) = self.receivers + .iter_mut() + .map(|r| r.receiver.get_mut().as_ref().unwrap()) + .enumerate() + .find(|&(_, rx)| rx.is_disconnected()) + .unwrap(); + self.receivers.remove(index); + let r_id = self.receiver_ids.remove(index); + Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)]) } } @@ -249,7 +238,7 @@ pub struct OsIpcOneShotServer { } impl OsIpcOneShotServer { - pub fn new() -> Result<(OsIpcOneShotServer, String),MpscError> { + pub fn new() -> Result<(OsIpcOneShotServer, String), ChannelError> { let (sender, receiver) = try!(channel()); let name = Uuid::new_v4().to_string(); @@ -261,12 +250,23 @@ impl OsIpcOneShotServer { },name.clone())) } - pub fn accept(self) -> Result<(OsIpcReceiver, - Vec, - Vec, - Vec),MpscError> - { - let record = ONE_SHOT_SERVERS.lock().unwrap().get(&self.name).unwrap().clone(); + pub fn accept( + self, + ) -> Result< + ( + OsIpcReceiver, + Vec, + Vec, + Vec, + ), + ChannelError, + > { + let record = ONE_SHOT_SERVERS + .lock() + .unwrap() + .get(&self.name) + .unwrap() + .clone(); record.accept(); ONE_SHOT_SERVERS.lock().unwrap().remove(&self.name).unwrap(); let (data, channels, shmems) = try!(self.receiver.recv()); @@ -373,35 +373,37 @@ impl OsIpcSharedMemory { } #[derive(Debug, PartialEq)] -pub enum MpscError { +pub enum ChannelError { ChannelClosedError, BrokenPipeError, UnknownError, } -impl MpscError { +impl ChannelError { #[allow(dead_code)] pub fn channel_is_closed(&self) -> bool { - *self == MpscError::ChannelClosedError + *self == ChannelError::ChannelClosedError } } -impl From for bincode::Error { - fn from(mpsc_error: MpscError) -> Self { - Error::from(mpsc_error).into() +impl From for bincode::Error { + fn from(crossbeam_error: ChannelError) -> Self { + Error::from(crossbeam_error).into() } } -impl From for Error { - fn from(mpsc_error: MpscError) -> Error { - match mpsc_error { - MpscError::ChannelClosedError => { - Error::new(ErrorKind::ConnectionReset, "MPSC channel sender closed") +impl From for Error { + fn from(crossbeam_error: ChannelError) -> Error { + match crossbeam_error { + ChannelError::ChannelClosedError => { + Error::new(ErrorKind::ConnectionReset, "crossbeam-channel sender closed") + } + ChannelError::BrokenPipeError => { + Error::new(ErrorKind::BrokenPipe, "crossbeam-channel receiver closed") } - MpscError::BrokenPipeError => { - Error::new(ErrorKind::BrokenPipe, "MPSC channel receiver closed") + ChannelError::UnknownError => { + Error::new(ErrorKind::Other, "Other crossbeam-channel error") } - MpscError::UnknownError => Error::new(ErrorKind::Other, "Other MPSC channel error"), } } } diff --git a/src/platform/macos/mod.rs b/src/platform/macos/mod.rs index d97601ab5..907097fd8 100644 --- a/src/platform/macos/mod.rs +++ b/src/platform/macos/mod.rs @@ -307,7 +307,8 @@ impl OsIpcReceiver { #[derive(PartialEq, Debug)] pub struct OsIpcSender { port: mach_port_t, - // Make sure this is `!Sync`, to match `mpsc::Sender`; and to discourage sharing references. + // Make sure this is `!Sync`, to match `crossbeam_channel::Sender`; and to discourage sharing + // references. // // (Rather, senders should just be cloned, as they are shared internally anyway -- // another layer of sharing only adds unnecessary overhead...) diff --git a/src/router.rs b/src/router.rs index d41b763f9..7a7479171 100644 --- a/src/router.rs +++ b/src/router.rs @@ -9,9 +9,9 @@ use std::collections::HashMap; use std::sync::Mutex; -use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; +use crossbeam_channel::{self, Receiver, Sender}; use ipc::{self, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcMessage}; use ipc::{OpaqueIpcReceiver}; use serde::{Deserialize, Serialize}; @@ -26,7 +26,7 @@ pub struct RouterProxy { impl RouterProxy { pub fn new() -> RouterProxy { - let (msg_sender, msg_receiver) = mpsc::channel(); + let (msg_sender, msg_receiver) = crossbeam_channel::unbounded(); let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap(); thread::spawn(move || Router::new(msg_receiver, wakeup_receiver).run()); RouterProxy { @@ -44,29 +44,33 @@ impl RouterProxy { } /// A convenience function to route an `IpcReceiver` to an existing `Sender`. - pub fn route_ipc_receiver_to_mpsc_sender(&self, - ipc_receiver: IpcReceiver, - mpsc_sender: Sender) - where T: for<'de> Deserialize<'de> + - Serialize + - Send + - 'static { - self.add_route(ipc_receiver.to_opaque(), Box::new(move |message| { - drop(mpsc_sender.send(message.to::().unwrap())) - })) + pub fn route_ipc_receiver_to_crossbeam_sender( + &self, + ipc_receiver: IpcReceiver, + crossbeam_sender: Sender, + ) where + T: for<'de> Deserialize<'de> + Serialize + Send + 'static, + { + self.add_route( + ipc_receiver.to_opaque(), + Box::new(move |message| { + drop(crossbeam_sender.send(message.to::().unwrap())) + }), + ) } /// A convenience function to route an `IpcReceiver` to a `Receiver`: the most common /// use of a `Router`. - pub fn route_ipc_receiver_to_new_mpsc_receiver(&self, ipc_receiver: IpcReceiver) - -> Receiver - where T: for<'de> Deserialize<'de> + - Serialize + - Send + - 'static { - let (mpsc_sender, mpsc_receiver) = mpsc::channel(); - self.route_ipc_receiver_to_mpsc_sender(ipc_receiver, mpsc_sender); - mpsc_receiver + pub fn route_ipc_receiver_to_new_crossbeam_receiver( + &self, + ipc_receiver: IpcReceiver, + ) -> Receiver + where + T: for<'de> Deserialize<'de> + Serialize + Send + 'static, + { + let (crossbeam_sender, crossbeam_receiver) = crossbeam_channel::unbounded(); + self.route_ipc_receiver_to_crossbeam_sender(ipc_receiver, crossbeam_sender); + crossbeam_receiver } } @@ -129,4 +133,3 @@ enum RouterMsg { } pub type RouterHandler = Box; - diff --git a/src/test.rs b/src/test.rs index 4894d9081..340688c94 100644 --- a/src/test.rs +++ b/src/test.rs @@ -7,6 +7,7 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use crossbeam_channel::{self, Sender}; use ipc::{self, IpcReceiverSet, IpcSender, IpcSharedMemory}; #[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios")))] use ipc::IpcReceiver; @@ -19,7 +20,6 @@ use std::iter; #[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios")))] use std::ptr; use std::sync::Arc; -use std::sync::mpsc::{self, Sender}; use std::thread; #[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios")))] @@ -164,7 +164,7 @@ fn router_simple() { let (tx, rx) = ipc::channel().unwrap(); tx.send(person.clone()).unwrap(); - let (callback_fired_sender, callback_fired_receiver) = mpsc::channel::(); + let (callback_fired_sender, callback_fired_receiver) = crossbeam_channel::unbounded::(); ROUTER.add_route(rx.to_opaque(), Box::new(move |person| { callback_fired_sender.send(person.to().unwrap()).unwrap() })); @@ -173,13 +173,13 @@ fn router_simple() { } #[test] -fn router_routing_to_new_mpsc_receiver() { +fn router_routing_to_new_crossbeam_receiver() { let person = ("Patrick Walton".to_owned(), 29); let (tx, rx) = ipc::channel().unwrap(); tx.send(person.clone()).unwrap(); - let mpsc_receiver = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(rx); - let received_person = mpsc_receiver.recv().unwrap(); + let crossbeam_receiver = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(rx); + let received_person = crossbeam_receiver.recv().unwrap(); assert_eq!(received_person, person); } @@ -191,10 +191,10 @@ fn router_multiplexing() { let (tx1, rx1) = ipc::channel().unwrap(); tx1.send(person.clone()).unwrap(); - let mpsc_rx_0 = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(rx0); - let mpsc_rx_1 = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(rx1); - let received_person_0 = mpsc_rx_0.recv().unwrap(); - let received_person_1 = mpsc_rx_1.recv().unwrap(); + let crossbeam_rx_0 = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(rx0); + let crossbeam_rx_1 = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(rx1); + let received_person_0 = crossbeam_rx_0.recv().unwrap(); + let received_person_1 = crossbeam_rx_1.recv().unwrap(); assert_eq!(received_person_0, person); assert_eq!(received_person_1, person); } @@ -210,10 +210,10 @@ fn router_multithreaded_multiplexing() { let (tx1, rx1) = ipc::channel().unwrap(); thread::spawn(move || tx1.send(person_for_thread).unwrap()); - let mpsc_rx_0 = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(rx0); - let mpsc_rx_1 = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(rx1); - let received_person_0 = mpsc_rx_0.recv().unwrap(); - let received_person_1 = mpsc_rx_1.recv().unwrap(); + let crossbeam_rx_0 = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(rx0); + let crossbeam_rx_1 = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(rx1); + let received_person_0 = crossbeam_rx_0.recv().unwrap(); + let received_person_1 = crossbeam_rx_1.recv().unwrap(); assert_eq!(received_person_0, person); assert_eq!(received_person_1, person); } @@ -231,7 +231,7 @@ fn router_drops_callbacks_on_sender_shutdown() { } let (tx0, rx0) = ipc::channel::<()>().unwrap(); - let (drop_tx, drop_rx) = mpsc::channel(); + let (drop_tx, drop_rx) = crossbeam_channel::unbounded(); let dropper = Dropper { sender: drop_tx, }; @@ -254,7 +254,7 @@ fn router_drops_callbacks_on_cloned_sender_shutdown() { } let (tx0, rx0) = ipc::channel::<()>().unwrap(); - let (drop_tx, drop_rx) = mpsc::channel(); + let (drop_tx, drop_rx) = crossbeam_channel::unbounded(); let dropper = Dropper { sender: drop_tx, }; @@ -276,7 +276,8 @@ fn router_big_data() { tx.send(people_for_subthread).unwrap(); }); - let (callback_fired_sender, callback_fired_receiver) = mpsc::channel::>(); + let (callback_fired_sender, callback_fired_receiver) = + crossbeam_channel::unbounded::>(); ROUTER.add_route(rx.to_opaque(), Box::new(move |people| { callback_fired_sender.send(people.to().unwrap()).unwrap() }));