Skip to content

Commit

Permalink
Switch to crossbeam-channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Stjepan Glavina authored and SimonSapin committed Dec 18, 2017
1 parent 4fe9d58 commit a78a743
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 147 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ipc-channel"
version = "0.9.1"
version = "0.10.0"
description = "A multiprocess drop-in replacement for Rust channels"
authors = ["The Servo Project Developers"]
license = "MIT/Apache-2.0"
Expand All @@ -14,6 +14,7 @@ async = ["futures"]

[dependencies]
bincode = "0.9"
crossbeam-channel = "0.1.1"
lazy_static = "1"
libc = "0.2.12"
rand = "0.3"
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![cfg_attr(any(feature = "force-inprocess", target_os = "windows", target_os = "android", target_os = "ios"),
feature(mpsc_select))]
#![cfg_attr(all(feature = "unstable", test), feature(specialization))]

#[macro_use]
extern crate lazy_static;

extern crate bincode;
extern crate crossbeam_channel;
extern crate libc;
extern crate rand;
extern crate serde;
Expand Down
212 changes: 107 additions & 105 deletions src/platform/inprocess/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// except according to those terms.

use bincode;
use std::sync::mpsc;
use crossbeam_channel::{self, Receiver, Select, Sender};
use std::sync::{Arc, Mutex};
use std::collections::hash_map::HashMap;
use std::cell::{RefCell};
Expand All @@ -17,29 +17,28 @@ use std::slice;
use std::fmt::{self, Debug, Formatter};
use std::cmp::{PartialEq};
use std::ops::{Deref, RangeFrom};
use std::mem;
use std::usize;
use uuid::Uuid;

#[derive(Clone)]
struct ServerRecord {
sender: OsIpcSender,
conn_sender: mpsc::Sender<bool>,
conn_receiver: Arc<Mutex<mpsc::Receiver<bool>>>,
conn_sender: Sender<bool>,
conn_receiver: Receiver<bool>,
}

impl ServerRecord {
fn new(sender: OsIpcSender) -> ServerRecord {
let (tx, rx) = mpsc::channel::<bool>();
let (tx, rx) = crossbeam_channel::unbounded::<bool>();
ServerRecord {
sender: sender,
conn_sender: tx,
conn_receiver: Arc::new(Mutex::new(rx)),
conn_receiver: rx,
}
}

fn accept(&self) {
self.conn_receiver.lock().unwrap().recv().unwrap();
self.conn_receiver.recv().unwrap();
}

fn connect(&self) {
Expand All @@ -51,16 +50,16 @@ lazy_static! {
static ref ONE_SHOT_SERVERS: Mutex<HashMap<String,ServerRecord>> = Mutex::new(HashMap::new());
}

struct MpscChannelMessage(Vec<u8>, Vec<OsIpcChannel>, Vec<OsIpcSharedMemory>);
struct ChannelMessage(Vec<u8>, Vec<OsIpcChannel>, Vec<OsIpcSharedMemory>);

pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver),MpscError> {
let (base_sender, base_receiver) = mpsc::channel::<MpscChannelMessage>();
pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), ChannelError> {
let (base_sender, base_receiver) = crossbeam_channel::unbounded::<ChannelMessage>();
Ok((OsIpcSender::new(base_sender), OsIpcReceiver::new(base_receiver)))
}

#[derive(Debug)]
pub struct OsIpcReceiver {
receiver: RefCell<Option<mpsc::Receiver<MpscChannelMessage>>>,
receiver: RefCell<Option<Receiver<ChannelMessage>>>,
}

impl PartialEq for OsIpcReceiver {
Expand All @@ -71,7 +70,7 @@ impl PartialEq for OsIpcReceiver {
}

impl OsIpcReceiver {
fn new(receiver: mpsc::Receiver<MpscChannelMessage>) -> OsIpcReceiver {
fn new(receiver: Receiver<ChannelMessage>) -> OsIpcReceiver {
OsIpcReceiver {
receiver: RefCell::new(Some(receiver)),
}
Expand All @@ -82,31 +81,37 @@ impl OsIpcReceiver {
OsIpcReceiver::new(receiver.unwrap())
}

pub fn recv(&self) -> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),MpscError> {
pub fn recv(
&self
) -> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>), ChannelError> {
let r = self.receiver.borrow();
match r.as_ref().unwrap().recv() {
Ok(MpscChannelMessage(d,c,s)) => Ok((d,
c.into_iter().map(OsOpaqueIpcChannel::new).collect(),
s)),
Err(_) => Err(MpscError::ChannelClosedError),
Ok(ChannelMessage(d, c, s)) => {
Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s))
}
Err(_) => Err(ChannelError::ChannelClosedError),
}
}

pub fn try_recv(&self) -> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),MpscError> {
pub fn try_recv(
&self
) -> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>), ChannelError> {
let r = self.receiver.borrow();
match r.as_ref().unwrap().try_recv() {
Ok(MpscChannelMessage(d,c,s)) => Ok((d,
c.into_iter().map(OsOpaqueIpcChannel::new).collect(),
s)),
Err(mpsc::TryRecvError::Disconnected) => Err(MpscError::ChannelClosedError),
Err(_) => Err(MpscError::UnknownError),
Ok(ChannelMessage(d, c, s)) => {
Ok((d, c.into_iter().map(OsOpaqueIpcChannel::new).collect(), s))
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
Err(ChannelError::ChannelClosedError)
}
Err(_) => Err(ChannelError::UnknownError),
}
}
}

#[derive(Clone, Debug)]
pub struct OsIpcSender {
sender: RefCell<mpsc::Sender<MpscChannelMessage>>,
sender: RefCell<Sender<ChannelMessage>>,
}

impl PartialEq for OsIpcSender {
Expand All @@ -117,13 +122,13 @@ impl PartialEq for OsIpcSender {
}

impl OsIpcSender {
fn new(sender: mpsc::Sender<MpscChannelMessage>) -> OsIpcSender {
fn new(sender: Sender<ChannelMessage>) -> OsIpcSender {
OsIpcSender {
sender: RefCell::new(sender),
}
}

pub fn connect(name: String) -> Result<OsIpcSender,MpscError> {
pub fn connect(name: String) -> Result<OsIpcSender, ChannelError> {
let record = ONE_SHOT_SERVERS.lock().unwrap().get(&name).unwrap().clone();
record.connect();
Ok(record.sender)
Expand All @@ -133,14 +138,17 @@ impl OsIpcSender {
usize::MAX
}

pub fn send(&self,
data: &[u8],
ports: Vec<OsIpcChannel>,
shared_memory_regions: Vec<OsIpcSharedMemory>)
-> Result<(),MpscError>
{
match self.sender.borrow().send(MpscChannelMessage(data.to_vec(), ports, shared_memory_regions)) {
Err(_) => Err(MpscError::BrokenPipeError),
pub fn send(
&self,
data: &[u8],
ports: Vec<OsIpcChannel>,
shared_memory_regions: Vec<OsIpcSharedMemory>,
) -> Result<(), ChannelError> {
match self.sender
.borrow()
.send(ChannelMessage(data.to_vec(), ports, shared_memory_regions))
{
Err(_) => Err(ChannelError::BrokenPipeError),
Ok(_) => Ok(()),
}
}
Expand All @@ -153,75 +161,56 @@ pub struct OsIpcReceiverSet {
}

impl OsIpcReceiverSet {
pub fn new() -> Result<OsIpcReceiverSet,MpscError> {
pub fn new() -> Result<OsIpcReceiverSet, ChannelError> {
Ok(OsIpcReceiverSet {
incrementor: 0..,
receiver_ids: vec![],
receivers: vec![],
})
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,MpscError> {
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, ChannelError> {
let last_index = self.incrementor.next().unwrap();
self.receiver_ids.push(last_index);
self.receivers.push(receiver.consume());
Ok(last_index)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,MpscError> {
let mut receivers: Vec<Option<mpsc::Receiver<MpscChannelMessage>>> = Vec::with_capacity(self.receivers.len());
let mut r_id: Option<u64> = None;
let mut r_index: usize = 0;

{
let select = mpsc::Select::new();
// we *must* allocate exact capacity for this, because the Handles *can't move*
let mut handles: Vec<mpsc::Handle<MpscChannelMessage>> = Vec::with_capacity(self.receivers.len());
pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, ChannelError> {
if self.receivers.is_empty() {
return Err(ChannelError::UnknownError);
}

for r in &self.receivers {
let inner_r = mem::replace(&mut *r.receiver.borrow_mut(), None);
receivers.push(inner_r);
}

for r in &receivers {
unsafe {
handles.push(select.handle(r.as_ref().unwrap()));
handles.last_mut().unwrap().add();
let mut sel = Select::with_timeout(::std::time::Duration::from_secs(1));
loop {
for (index, rx) in self.receivers
.iter_mut()
.map(|r| r.receiver.get_mut().as_ref().unwrap())
.enumerate()
{
if let Ok(msg) = sel.recv(rx) {
let r_id = self.receiver_ids[index];
let ChannelMessage(data, channels, shmems) = msg;
let channels = channels.into_iter().map(OsOpaqueIpcChannel::new).collect();
return Ok(vec![
OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems),
]);
}
}

let id = select.wait();

for (index,h) in handles.iter().enumerate() {
if h.id() == id {
r_index = index;
r_id = Some(self.receiver_ids[index]);
break;
}
if sel.timed_out() { // TODO: this should be any_disconnected
break;
}
}

// put the receivers back
for (index,r) in self.receivers.iter().enumerate() {
mem::replace(&mut *r.receiver.borrow_mut(), mem::replace(&mut receivers[index], None));
}

match r_id {
None => Err(MpscError::UnknownError),
Some(r_id) => {
let receivers = &mut self.receivers;
match receivers[r_index].recv() {
Ok((data, channels, shmems)) =>
Ok(vec![OsIpcSelectionResult::DataReceived(r_id, data, channels, shmems)]),
Err(MpscError::ChannelClosedError) => {
receivers.remove(r_index);
self.receiver_ids.remove(r_index);
Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)])
},
Err(err) => Err(err),
}
}
}
let (index, _) = self.receivers
.iter_mut()
.map(|r| r.receiver.get_mut().as_ref().unwrap())
.enumerate()
.find(|&(_, rx)| rx.is_disconnected())
.unwrap();
self.receivers.remove(index);
let r_id = self.receiver_ids.remove(index);
Ok(vec![OsIpcSelectionResult::ChannelClosed(r_id)])
}
}

Expand Down Expand Up @@ -249,7 +238,7 @@ pub struct OsIpcOneShotServer {
}

impl OsIpcOneShotServer {
pub fn new() -> Result<(OsIpcOneShotServer, String),MpscError> {
pub fn new() -> Result<(OsIpcOneShotServer, String), ChannelError> {
let (sender, receiver) = try!(channel());

let name = Uuid::new_v4().to_string();
Expand All @@ -261,12 +250,23 @@ impl OsIpcOneShotServer {
},name.clone()))
}

pub fn accept(self) -> Result<(OsIpcReceiver,
Vec<u8>,
Vec<OsOpaqueIpcChannel>,
Vec<OsIpcSharedMemory>),MpscError>
{
let record = ONE_SHOT_SERVERS.lock().unwrap().get(&self.name).unwrap().clone();
pub fn accept(
self,
) -> Result<
(
OsIpcReceiver,
Vec<u8>,
Vec<OsOpaqueIpcChannel>,
Vec<OsIpcSharedMemory>,
),
ChannelError,
> {
let record = ONE_SHOT_SERVERS
.lock()
.unwrap()
.get(&self.name)
.unwrap()
.clone();
record.accept();
ONE_SHOT_SERVERS.lock().unwrap().remove(&self.name).unwrap();
let (data, channels, shmems) = try!(self.receiver.recv());
Expand Down Expand Up @@ -373,35 +373,37 @@ impl OsIpcSharedMemory {
}

#[derive(Debug, PartialEq)]
pub enum MpscError {
pub enum ChannelError {
ChannelClosedError,
BrokenPipeError,
UnknownError,
}

impl MpscError {
impl ChannelError {
#[allow(dead_code)]
pub fn channel_is_closed(&self) -> bool {
*self == MpscError::ChannelClosedError
*self == ChannelError::ChannelClosedError
}
}

impl From<MpscError> for bincode::Error {
fn from(mpsc_error: MpscError) -> Self {
Error::from(mpsc_error).into()
impl From<ChannelError> for bincode::Error {
fn from(crossbeam_error: ChannelError) -> Self {
Error::from(crossbeam_error).into()
}
}

impl From<MpscError> for Error {
fn from(mpsc_error: MpscError) -> Error {
match mpsc_error {
MpscError::ChannelClosedError => {
Error::new(ErrorKind::ConnectionReset, "MPSC channel sender closed")
impl From<ChannelError> for Error {
fn from(crossbeam_error: ChannelError) -> Error {
match crossbeam_error {
ChannelError::ChannelClosedError => {
Error::new(ErrorKind::ConnectionReset, "crossbeam-channel sender closed")
}
ChannelError::BrokenPipeError => {
Error::new(ErrorKind::BrokenPipe, "crossbeam-channel receiver closed")
}
MpscError::BrokenPipeError => {
Error::new(ErrorKind::BrokenPipe, "MPSC channel receiver closed")
ChannelError::UnknownError => {
Error::new(ErrorKind::Other, "Other crossbeam-channel error")
}
MpscError::UnknownError => Error::new(ErrorKind::Other, "Other MPSC channel error"),
}
}
}
Expand Down
Loading

0 comments on commit a78a743

Please sign in to comment.