Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ipc-channel on Windows #108

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/servo/ipc-channel"
force-inprocess = []
memfd = ["syscall"]
unstable = []
win32-trace = []

[dependencies]
bincode = "1.0.0-alpha2"
Expand All @@ -27,3 +28,7 @@ syscall = { version = "0.2.1", optional = true }

[dev-dependencies]
crossbeam = "0.2"

[target.'cfg(target_os = "windows")'.dependencies]
winapi = "0.2"
kernel32-sys = "0.2"
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![cfg_attr(any(feature = "force-inprocess", target_os = "windows", target_os = "android"),
#![cfg_attr(any(feature = "force-inprocess", target_os = "android"),
feature(mpsc_select))]
#![cfg_attr(all(feature = "unstable", test), feature(specialization))]

Expand All @@ -18,6 +18,7 @@ extern crate bincode;
extern crate libc;
extern crate rand;
extern crate serde;

#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
extern crate uuid;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
Expand All @@ -32,6 +33,11 @@ extern crate fnv;
extern crate syscall;


#[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))]
extern crate winapi;
#[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))]
extern crate kernel32;

pub mod ipc;
pub mod platform;
pub mod router;
Expand Down
11 changes: 9 additions & 2 deletions src/platform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ mod os {
pub use super::macos::*;
}

#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
#[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))]
mod windows;
#[cfg(all(not(feature = "force-inprocess"), target_os = "windows"))]
mod os {
pub use super::windows::*;
}

#[cfg(any(feature = "force-inprocess", target_os = "android"))]
mod inprocess;
#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
#[cfg(any(feature = "force-inprocess", target_os = "android"))]
mod os {
pub use super::inprocess::*;
}
Expand Down
224 changes: 220 additions & 4 deletions src/platform/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,35 @@
use platform::{self, OsIpcChannel, OsIpcReceiverSet};
use platform::{OsIpcSharedMemory};
use std::collections::HashMap;
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also needed for other tests to compile (or you need to compile out the get_channel_name_arg too at least).

use std::env;

#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
use libc;
use platform::{OsIpcSender, OsIpcOneShotServer};
#[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android")))]
use libc::{kill, SIGSTOP, SIGCONT};
#[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android")))]
use test::{fork, Wait};

// Helper to get a channel_name argument passed in; used for the
// cross-process spawn server tests.
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
fn get_channel_name_arg() -> Option<String> {
for arg in env::args() {
let arg_str = "channel_name:";
if arg.starts_with(arg_str) {
return Some(arg[arg_str.len()..].to_owned());
}
}
None
}

#[test]
fn simple() {
let (tx, rx) = platform::channel().unwrap();
Expand Down Expand Up @@ -208,7 +227,8 @@ fn with_n_fds(n: usize, size: usize) {

// These tests only apply to platforms that need fragmentation.
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
target_os = "freebsd",
target_os = "windows")))]
mod fragment_tests {
use platform;
use super::with_n_fds;
Expand Down Expand Up @@ -663,9 +683,52 @@ fn server_connect_first() {
(data, vec![], vec![]));
}

// Note! This test is actually used by the cross_process_spawn() test
// below as a second process. Running it by itself is meaningless, but
// passes.
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[test]
#[ignore]
fn cross_process_server()
{
let data: &[u8] = b"1234567";
let channel_name = get_channel_name_arg();
if channel_name.is_none() {
return;
}

let tx = OsIpcSender::connect(channel_name.unwrap()).unwrap();
tx.send(data, vec![], vec![]).unwrap();
unsafe { libc::exit(0); }
}

#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[test]
fn cross_process_spawn() {
let (server, name) = OsIpcOneShotServer::new().unwrap();
let data: &[u8] = b"1234567";

let mut child_pid = Command::new(env::current_exe().unwrap())
.arg("--ignored")
.arg("cross_process_server")
.arg(format!("channel_name:{}", name))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.expect("failed to execute server process");

let (_, mut received_data, received_channels, received_shared_memory_regions) =
server.accept().unwrap();
child_pid.wait().expect("failed to wait on child");
received_data.truncate(7);
assert_eq!((&received_data[..], received_channels, received_shared_memory_regions),
(data, vec![], vec![]));
}

#[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android")))]
#[test]
fn cross_process() {
fn cross_process_fork() {
let (server, name) = OsIpcOneShotServer::new().unwrap();
let data: &[u8] = b"1234567";

Expand All @@ -682,9 +745,62 @@ fn cross_process() {
(data, vec![], vec![]));
}

// Note! This test is actually used by the cross_process_sender_transfer_spawn() test
// below as a second process. Running it by itself is meaningless, but
// passes.
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[test]
#[ignore]
fn cross_process_sender_transfer_server()
{
let channel_name = get_channel_name_arg();
if channel_name.is_none() {
return;
}

let super_tx = OsIpcSender::connect(channel_name.unwrap()).unwrap();
let (sub_tx, sub_rx) = platform::channel().unwrap();
let data: &[u8] = b"foo";
super_tx.send(data, vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap();
sub_rx.recv().unwrap();
let data: &[u8] = b"bar";
super_tx.send(data, vec![], vec![]).unwrap();
unsafe { libc::exit(0); }
}

#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[test]
fn cross_process_sender_transfer_spawn() {
let (server, name) = OsIpcOneShotServer::new().unwrap();

let mut child_pid = Command::new(env::current_exe().unwrap())
.arg("--ignored")
.arg("cross_process_sender_transfer_server")
.arg(format!("channel_name:{}", name))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.expect("failed to execute server process");

let (super_rx, _, mut received_channels, _) = server.accept().unwrap();
assert_eq!(received_channels.len(), 1);
let sub_tx = received_channels[0].to_sender();
let data: &[u8] = b"baz";
sub_tx.send(data, vec![], vec![]).unwrap();

let data: &[u8] = b"bar";
let (mut received_data, received_channels, received_shared_memory_regions) =
super_rx.recv().unwrap();
child_pid.wait().expect("failed to wait on child");
received_data.truncate(3);
assert_eq!((&received_data[..], received_channels, received_shared_memory_regions),
(data, vec![], vec![]));
}

#[cfg(not(any(feature = "force-inprocess", target_os = "windows", target_os = "android")))]
#[test]
fn cross_process_sender_transfer() {
fn cross_process_sender_transfer_fork() {
let (server, name) = OsIpcOneShotServer::new().unwrap();

let child_pid = unsafe { fork(|| {
Expand All @@ -699,7 +815,7 @@ fn cross_process_sender_transfer() {

let (super_rx, _, mut received_channels, _) = server.accept().unwrap();
assert_eq!(received_channels.len(), 1);
let sub_tx = received_channels.pop().unwrap().to_sender();
let sub_tx = received_channels[0].to_sender();
let data: &[u8] = b"baz";
sub_tx.send(data, vec![], vec![]).unwrap();

Expand Down Expand Up @@ -880,3 +996,103 @@ mod sync_test {
platform::OsIpcSender::test_not_sync();
}
}

// Note! This test is actually used by the
// cross_process_two_step_transfer_spawn() test below. Running it by
// itself is meaningless, but it passes if run this way.
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[test]
#[ignore]
fn cross_process_two_step_transfer_server()
{
let cookie: &[u8] = b"cookie";
let channel_name = get_channel_name_arg();
if channel_name.is_none() {
return;
}

// connect by name to our other process
let super_tx = OsIpcSender::connect(channel_name.unwrap()).unwrap();

// create a channel for real communication between the two processes
let (sub_tx, sub_rx) = platform::channel().unwrap();

// send the other process the tx side, so it can send us the channels
super_tx.send(&[], vec![OsIpcChannel::Sender(sub_tx)], vec![]).unwrap();

// get two_rx from the other process
let (_, mut received_channels, _) = sub_rx.recv().unwrap();
assert_eq!(received_channels.len(), 1);
let two_rx = received_channels[0].to_receiver();

// get one_rx from two_rx's buffer
let (_, mut received_channels, _) = two_rx.recv().unwrap();
assert_eq!(received_channels.len(), 1);
let one_rx = received_channels[0].to_receiver();

// get a cookie from one_rx
let (mut data, _, _) = one_rx.recv().unwrap();
data.truncate(cookie.len());
assert_eq!(&data[..], cookie);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not actually need to convert data to a slice explicitly.

(Admittedly, many of the existing tests also have unnecessary conversions -- especially some of the first tests I wrote myself, before I really had a grasp on such things...)


// finally, send a cookie back
super_tx.send(&data, vec![], vec![]).unwrap();

// terminate
unsafe { libc::exit(0); }
}

// This test panics on Windows, because the other process will panic
// when it detects that it receives handles that are intended for another
// process. It's marked as ignore/known-fail on Windows for this reason.
//
// TODO -- this fails on OSX as well with a MACH_SEND_INVALID_RIGHT!
// Needs investigation. It may be a similar underlying issue, just done by
// the kernel instead of explicitly (ports in a message that's already
// buffered are intended for only one process).
#[cfg(not(any(feature = "force-inprocess", target_os = "android")))]
#[cfg_attr(any(target_os = "windows", target_os = "macos"), ignore)]
#[test]
fn cross_process_two_step_transfer_spawn() {
let cookie: &[u8] = b"cookie";

// create channel 1
let (one_tx, one_rx) = platform::channel().unwrap();
// put data in channel 1's pipe
one_tx.send(cookie, vec![], vec![]).unwrap();

// create channel 2
let (two_tx, two_rx) = platform::channel().unwrap();
// put channel 1's rx end in channel 2's pipe
two_tx.send(&[], vec![OsIpcChannel::Receiver(one_rx)], vec![]).unwrap();

// create a one-shot server, and spawn another process
let (server, name) = OsIpcOneShotServer::new().unwrap();
let mut child_pid = Command::new(env::current_exe().unwrap())
.arg("--ignored")
.arg("cross_process_two_step_transfer_server")
.arg(format!("channel_name:{}", name))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.expect("failed to execute server process");

// The other process will have sent us a transmit channel in received channels
let (super_rx, _, mut received_channels, _) = server.accept().unwrap();
assert_eq!(received_channels.len(), 1);
let sub_tx = received_channels[0].to_sender();

// Send the outer payload channel, so the server can use it to
// retrive the inner payload and the cookie
sub_tx.send(&[], vec![OsIpcChannel::Receiver(two_rx)], vec![]).unwrap();

// Then we wait for the cookie to make its way back to us
let (mut received_data, received_channels, received_shared_memory_regions) =
super_rx.recv().unwrap();
let child_exit_code = child_pid.wait().expect("failed to wait on child");
assert!(child_exit_code.success());
received_data.truncate(cookie.len());
assert_eq!((&received_data[..], received_channels, received_shared_memory_regions),
(cookie, vec![], vec![]));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test looks good. However, it only checks moving a receiver that has data or another receiver in transfer. I'd say it would be good for completeness to also test moving the receiver while it has a sender or a SHM region in transfer?

(And possibly even moving a sender that has stuff in transfer? Shouldn't matter for your current implementation I think -- but it might for some other possible approaches I'd expect... Might be considered out of scope though I guess?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add this as a TODO for later. Senders and shm regions are handled identically by the Windows code (and are protected here via a single dest process ID), so there isn't any new bug that could be introduced via senders instead of receivers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realised that there is another, more interesting case that this test doesn't cover yet (I knew there is something missing... ;-) ): sending data or other channels over an outer channel while the receive end of that outer channel is already in transfer.

However, AIUI this should probably just work even with your current implementation?...

Loading