From b41247c65bfc02e09db36569a57a2a55696457f7 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 29 Jan 2020 14:05:40 -0800 Subject: [PATCH] Split implementations into their own files --- Cargo.toml | 3 + src/lib.rs | 679 +------------------------------------------------ src/unix.rs | 310 ++++++++++++++++++++++ src/wasm.rs | 91 +++++++ src/windows.rs | 251 ++++++++++++++++++ 5 files changed, 668 insertions(+), 666 deletions(-) create mode 100644 src/unix.rs create mode 100644 src/wasm.rs create mode 100644 src/windows.rs diff --git a/Cargo.toml b/Cargo.toml index 9c289ff..2a1575f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ An implementation of the GNU make jobserver for Rust """ edition = "2018" +[dependencies] +cfg-if = "0.1.10" + [target.'cfg(unix)'.dependencies] libc = "0.2.50" diff --git a/src/lib.rs b/src/lib.rs index 14bdf64..815ecdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,19 @@ use std::io; use std::process::Command; use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +cfg_if::cfg_if! { + if #[cfg(unix)] { + mod unix; + use unix as imp; + } else if #[cfg(windows)] { + mod windows; + use windows as imp; + } else { + mod wasm; + use wasm as imp; + } +} + /// A client of a jobserver /// /// This structure is the main type exposed by this library, and is where @@ -472,672 +485,6 @@ impl HelperState { } } -#[cfg(unix)] -mod imp { - extern crate libc; - - use std::fs::File; - use std::io::{self, Read, Write}; - use std::mem; - use std::os::unix::prelude::*; - use std::process::Command; - use std::ptr; - use std::sync::{Arc, Once}; - use std::thread::{self, Builder, JoinHandle}; - use std::time::Duration; - - use self::libc::c_int; - - #[derive(Debug)] - pub struct Client { - read: File, - write: File, - } - - #[derive(Debug)] - pub struct Acquired { - byte: u8, - } - - impl Client { - pub fn new(limit: usize) -> io::Result { - let client = unsafe { Client::mk()? }; - // I don't think the character written here matters, but I could be - // wrong! - for _ in 0..limit { - (&client.write).write(&[b'|'])?; - } - Ok(client) - } - - unsafe fn mk() -> io::Result { - let mut pipes = [0; 2]; - - // Attempt atomically-create-with-cloexec if we can on Linux, - // detected by using the `syscall` function in `libc` to try to work - // with as many kernels/glibc implementations as possible. - #[cfg(target_os = "linux")] - { - static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true); - if PIPE2_AVAILABLE.load(Ordering::SeqCst) { - match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) { - -1 => { - let err = io::Error::last_os_error(); - if err.raw_os_error() == Some(libc::ENOSYS) { - PIPE2_AVAILABLE.store(false, Ordering::SeqCst); - } else { - return Err(err); - } - } - _ => return Ok(Client::from_fds(pipes[0], pipes[1])), - } - } - } - - cvt(libc::pipe(pipes.as_mut_ptr()))?; - drop(set_cloexec(pipes[0], true)); - drop(set_cloexec(pipes[1], true)); - Ok(Client::from_fds(pipes[0], pipes[1])) - } - - pub unsafe fn open(s: &str) -> Option { - let mut parts = s.splitn(2, ','); - let read = parts.next().unwrap(); - let write = match parts.next() { - Some(s) => s, - None => return None, - }; - - let read = match read.parse() { - Ok(n) => n, - Err(_) => return None, - }; - let write = match write.parse() { - Ok(n) => n, - Err(_) => return None, - }; - - // Ok so we've got two integers that look like file descriptors, but - // for extra sanity checking let's see if they actually look like - // instances of a pipe before we return the client. - // - // If we're called from `make` *without* the leading + on our rule - // then we'll have `MAKEFLAGS` env vars but won't actually have - // access to the file descriptors. - if is_valid_fd(read) && is_valid_fd(write) { - drop(set_cloexec(read, true)); - drop(set_cloexec(write, true)); - Some(Client::from_fds(read, write)) - } else { - None - } - } - - unsafe fn from_fds(read: c_int, write: c_int) -> Client { - Client { - read: File::from_raw_fd(read), - write: File::from_raw_fd(write), - } - } - - pub fn acquire(&self) -> io::Result { - // We don't actually know if the file descriptor here is set in - // blocking or nonblocking mode. AFAIK all released versions of - // `make` use blocking fds for the jobserver, but the unreleased - // version of `make` doesn't. In the unreleased version jobserver - // fds are set to nonblocking and combined with `pselect` - // internally. - // - // Here we try to be compatible with both strategies. We - // unconditionally expect the file descriptor to be in nonblocking - // mode and if it happens to be in blocking mode then most of this - // won't end up actually being necessary! - // - // We use `poll` here to block this thread waiting for read - // readiness, and then afterwards we perform the `read` itself. If - // the `read` returns that it would block then we start over and try - // again. - // - // Also note that we explicitly don't handle EINTR here. That's used - // to shut us down, so we otherwise punt all errors upwards. - unsafe { - let mut fd: libc::pollfd = mem::zeroed(); - fd.fd = self.read.as_raw_fd(); - fd.events = libc::POLLIN; - loop { - fd.revents = 0; - if libc::poll(&mut fd, 1, -1) == -1 { - let e = io::Error::last_os_error(); - match e.kind() { - io::ErrorKind::Interrupted => continue, - _ => return Err(e), - } - } - if fd.revents == 0 { - continue; - } - let mut buf = [0]; - match (&self.read).read(&mut buf) { - Ok(1) => return Ok(Acquired { byte: buf[0] }), - Ok(_) => { - return Err(io::Error::new( - io::ErrorKind::Other, - "early EOF on jobserver pipe", - )) - } - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => continue, - _ => return Err(e), - }, - } - } - } - } - - pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> { - // Note that the fd may be nonblocking but we're going to go ahead - // and assume that the writes here are always nonblocking (we can - // always quickly release a token). If that turns out to not be the - // case we'll get an error anyway! - let byte = data.map(|d| d.byte).unwrap_or(b'+'); - match (&self.write).write(&[byte])? { - 1 => Ok(()), - _ => Err(io::Error::new( - io::ErrorKind::Other, - "failed to write token back to jobserver", - )), - } - } - - pub fn string_arg(&self) -> String { - format!("{},{} -j", self.read.as_raw_fd(), self.write.as_raw_fd()) - } - - pub fn configure(&self, cmd: &mut Command) { - // Here we basically just want to say that in the child process - // we'll configure the read/write file descriptors to *not* be - // cloexec, so they're inherited across the exec and specified as - // integers through `string_arg` above. - let read = self.read.as_raw_fd(); - let write = self.write.as_raw_fd(); - unsafe { - cmd.pre_exec(move || { - set_cloexec(read, false)?; - set_cloexec(write, false)?; - Ok(()) - }); - } - } - } - - #[derive(Debug)] - pub struct Helper { - thread: JoinHandle<()>, - state: Arc, - } - - pub(crate) fn spawn_helper( - client: crate::Client, - state: Arc, - mut f: Box) + Send>, - ) -> io::Result { - static USR1_INIT: Once = Once::new(); - let mut err = None; - USR1_INIT.call_once(|| unsafe { - let mut new: libc::sigaction = mem::zeroed(); - new.sa_sigaction = sigusr1_handler as usize; - new.sa_flags = libc::SA_SIGINFO as _; - if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { - err = Some(io::Error::last_os_error()); - } - }); - - if let Some(e) = err.take() { - return Err(e); - } - - let state2 = state.clone(); - let thread = Builder::new().spawn(move || { - state2.for_each_request(|| f(client.acquire())); - })?; - - Ok(Helper { thread, state }) - } - - impl Helper { - pub fn join(self) { - let dur = Duration::from_millis(10); - let mut state = self.state.lock(); - debug_assert!(state.producer_done); - - // We need to join our helper thread, and it could be blocked in one - // of two locations. First is the wait for a request, but the - // initial drop of `HelperState` will take care of that. Otherwise - // it may be blocked in `client.acquire()`. We actually have no way - // of interrupting that, so resort to `pthread_kill` as a fallback. - // This signal should interrupt any blocking `read` call with - // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. - // - // Note that we don'tdo this forever though since there's a chance - // of bugs, so only do this opportunistically to make a best effort - // at clearing ourselves up. - for _ in 0..100 { - if state.consumer_done { - break; - } - unsafe { - // Ignore the return value here of `pthread_kill`, - // apparently on OSX if you kill a dead thread it will - // return an error, but on other platforms it may not. In - // that sense we don't actually know if this will succeed or - // not! - libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1); - } - state = self - .state - .cvar - .wait_timeout(state, dur) - .unwrap_or_else(|e| e.into_inner()) - .0; - thread::yield_now(); // we really want the other thread to run - } - - // If we managed to actually see the consumer get done, then we can - // definitely wait for the thread. Otherwise it's... of in the ether - // I guess? - if state.consumer_done { - drop(self.thread.join()); - } - } - } - - fn is_valid_fd(fd: c_int) -> bool { - unsafe { - return libc::fcntl(fd, libc::F_GETFD) != -1; - } - } - - fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { - unsafe { - let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; - let new = if set { - previous | libc::FD_CLOEXEC - } else { - previous & !libc::FD_CLOEXEC - }; - if new != previous { - cvt(libc::fcntl(fd, libc::F_SETFD, new))?; - } - Ok(()) - } - } - - fn cvt(t: c_int) -> io::Result { - if t == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(t) - } - } - - extern "C" fn sigusr1_handler( - _signum: c_int, - _info: *mut libc::siginfo_t, - _ptr: *mut libc::c_void, - ) { - // nothing to do - } -} - -#[cfg(windows)] -mod imp { - use std::ffi::CString; - use std::io; - use std::process::Command; - use std::ptr; - use std::sync::Arc; - use std::thread::{Builder, JoinHandle}; - - #[derive(Debug)] - pub struct Client { - sem: Handle, - name: String, - } - - #[derive(Debug)] - pub struct Acquired; - - type BOOL = i32; - type DWORD = u32; - type HANDLE = *mut u8; - type LONG = i32; - - const ERROR_ALREADY_EXISTS: DWORD = 183; - const FALSE: BOOL = 0; - const INFINITE: DWORD = 0xffffffff; - const SEMAPHORE_MODIFY_STATE: DWORD = 0x2; - const SYNCHRONIZE: DWORD = 0x00100000; - const TRUE: BOOL = 1; - const WAIT_OBJECT_0: DWORD = 0; - - extern "system" { - fn CloseHandle(handle: HANDLE) -> BOOL; - fn SetEvent(hEvent: HANDLE) -> BOOL; - fn WaitForMultipleObjects( - ncount: DWORD, - lpHandles: *const HANDLE, - bWaitAll: BOOL, - dwMilliseconds: DWORD, - ) -> DWORD; - fn CreateEventA( - lpEventAttributes: *mut u8, - bManualReset: BOOL, - bInitialState: BOOL, - lpName: *const i8, - ) -> HANDLE; - fn ReleaseSemaphore( - hSemaphore: HANDLE, - lReleaseCount: LONG, - lpPreviousCount: *mut LONG, - ) -> BOOL; - fn CreateSemaphoreA( - lpEventAttributes: *mut u8, - lInitialCount: LONG, - lMaximumCount: LONG, - lpName: *const i8, - ) -> HANDLE; - fn OpenSemaphoreA( - dwDesiredAccess: DWORD, - bInheritHandle: BOOL, - lpName: *const i8, - ) -> HANDLE; - fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; - #[link_name = "SystemFunction036"] - fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8; - } - - // Note that we ideally would use the `getrandom` crate, but unfortunately - // that causes build issues when this crate is used in rust-lang/rust (see - // rust-lang/rust#65014 for more information). As a result we just inline - // the pretty simple Windows-specific implementation of generating - // randomness. - fn getrandom(dest: &mut [u8]) -> io::Result<()> { - // Prevent overflow of u32 - for chunk in dest.chunks_mut(u32::max_value() as usize) { - let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) }; - if ret == 0 { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to generate random bytes", - )); - } - } - Ok(()) - } - - impl Client { - pub fn new(limit: usize) -> io::Result { - // Try a bunch of random semaphore names until we get a unique one, - // but don't try for too long. - // - // Note that `limit == 0` is a valid argument above but Windows - // won't let us create a semaphore with 0 slots available to it. Get - // `limit == 0` working by creating a semaphore instead with one - // slot and then immediately acquire it (without ever releaseing it - // back). - for _ in 0..100 { - let mut bytes = [0; 4]; - getrandom(&mut bytes)?; - let mut name = - format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes)); - unsafe { - let create_limit = if limit == 0 { 1 } else { limit }; - let r = CreateSemaphoreA( - ptr::null_mut(), - create_limit as LONG, - create_limit as LONG, - name.as_ptr() as *const _, - ); - if r.is_null() { - return Err(io::Error::last_os_error()); - } - let handle = Handle(r); - - let err = io::Error::last_os_error(); - if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) { - continue; - } - name.pop(); // chop off the trailing nul - let client = Client { - sem: handle, - name: name, - }; - if create_limit != limit { - client.acquire()?; - } - return Ok(client); - } - } - - Err(io::Error::new( - io::ErrorKind::Other, - "failed to find a unique name for a semaphore", - )) - } - - pub unsafe fn open(s: &str) -> Option { - let name = match CString::new(s) { - Ok(s) => s, - Err(_) => return None, - }; - - let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr()); - if sem.is_null() { - None - } else { - Some(Client { - sem: Handle(sem), - name: s.to_string(), - }) - } - } - - pub fn acquire(&self) -> io::Result { - unsafe { - let r = WaitForSingleObject(self.sem.0, INFINITE); - if r == WAIT_OBJECT_0 { - Ok(Acquired) - } else { - Err(io::Error::last_os_error()) - } - } - } - - pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> { - unsafe { - let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut()); - if r != 0 { - Ok(()) - } else { - Err(io::Error::last_os_error()) - } - } - } - - pub fn string_arg(&self) -> String { - self.name.clone() - } - - pub fn configure(&self, _cmd: &mut Command) { - // nothing to do here, we gave the name of our semaphore to the - // child above - } - } - - #[derive(Debug)] - struct Handle(HANDLE); - // HANDLE is a raw ptr, but we're send/sync - unsafe impl Sync for Handle {} - unsafe impl Send for Handle {} - - impl Drop for Handle { - fn drop(&mut self) { - unsafe { - CloseHandle(self.0); - } - } - } - - #[derive(Debug)] - pub struct Helper { - event: Arc, - thread: JoinHandle<()>, - } - - pub(crate) fn spawn_helper( - client: crate::Client, - state: Arc, - mut f: Box) + Send>, - ) -> io::Result { - let event = unsafe { - let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null()); - if r.is_null() { - return Err(io::Error::last_os_error()); - } else { - Handle(r) - } - }; - let event = Arc::new(event); - let event2 = event.clone(); - let thread = Builder::new().spawn(move || { - let objects = [event2.0, client.inner.sem.0]; - state.for_each_request(|| { - const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1; - match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } { - WAIT_OBJECT_0 => return, - WAIT_OBJECT_1 => f(Ok(crate::Acquired { - client: client.inner.clone(), - data: Acquired, - disabled: false, - })), - _ => f(Err(io::Error::last_os_error())), - } - }); - })?; - Ok(Helper { thread, event }) - } - - impl Helper { - pub fn join(self) { - // Unlike unix this logic is much easier. If our thread was blocked - // in waiting for requests it should already be woken up and - // exiting. Otherwise it's waiting for a token, so we wake it up - // with a different event that it's also waiting on here. After - // these two we should be guaranteed the thread is on its way out, - // so we can safely `join`. - let r = unsafe { SetEvent(self.event.0) }; - if r == 0 { - panic!("failed to set event: {}", io::Error::last_os_error()); - } - drop(self.thread.join()); - } - } -} - -#[cfg(not(any(unix, windows)))] -mod imp { - use std::io; - use std::process::Command; - use std::sync::{Arc, Condvar, Mutex}; - use std::thread::{Builder, JoinHandle}; - - #[derive(Debug)] - pub struct Client { - inner: Arc, - } - - #[derive(Debug)] - struct Inner { - count: Mutex, - cvar: Condvar, - } - - #[derive(Debug)] - pub struct Acquired(()); - - impl Client { - pub fn new(limit: usize) -> io::Result { - Ok(Client { - inner: Arc::new(Inner { - count: Mutex::new(limit), - cvar: Condvar::new(), - }), - }) - } - - pub unsafe fn open(_s: &str) -> Option { - None - } - - pub fn acquire(&self) -> io::Result { - let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner()); - while *lock == 0 { - lock = self - .inner - .cvar - .wait(lock) - .unwrap_or_else(|e| e.into_inner()); - } - *lock -= 1; - Ok(Acquired(())) - } - - pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> { - let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner()); - *lock += 1; - drop(lock); - self.inner.cvar.notify_one(); - Ok(()) - } - - pub fn string_arg(&self) -> String { - panic!( - "On this platform there is no cross process jobserver support, - so Client::configure is not supported." - ); - } - - pub fn configure(&self, _cmd: &mut Command) { - unreachable!(); - } - } - - #[derive(Debug)] - pub struct Helper { - thread: JoinHandle<()>, - } - - pub(crate) fn spawn_helper( - client: crate::Client, - state: Arc, - mut f: Box) + Send>, - ) -> io::Result { - let thread = Builder::new().spawn(move || { - state.for_each_request(|| f(client.acquire())); - })?; - - Ok(Helper { thread: thread }) - } - - impl Helper { - pub fn join(self) { - // TODO: this is not correct if the thread is blocked in - // `client.acquire()`. - drop(self.thread.join()); - } - } -} - #[test] fn no_helper_deadlock() { let x = crate::Client::new(32).unwrap(); diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..4ba4f99 --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,310 @@ +use std::fs::File; +use std::io::{self, Read, Write}; +use std::mem; +use std::os::unix::prelude::*; +use std::process::Command; +use std::ptr; +use std::sync::{Arc, Once}; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; +use libc::c_int; + +#[derive(Debug)] +pub struct Client { + read: File, + write: File, +} + +#[derive(Debug)] +pub struct Acquired { + byte: u8, +} + +impl Client { + pub fn new(limit: usize) -> io::Result { + let client = unsafe { Client::mk()? }; + // I don't think the character written here matters, but I could be + // wrong! + for _ in 0..limit { + (&client.write).write(&[b'|'])?; + } + Ok(client) + } + + unsafe fn mk() -> io::Result { + let mut pipes = [0; 2]; + + // Attempt atomically-create-with-cloexec if we can on Linux, + // detected by using the `syscall` function in `libc` to try to work + // with as many kernels/glibc implementations as possible. + #[cfg(target_os = "linux")] + { + static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true); + if PIPE2_AVAILABLE.load(Ordering::SeqCst) { + match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) { + -1 => { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(libc::ENOSYS) { + PIPE2_AVAILABLE.store(false, Ordering::SeqCst); + } else { + return Err(err); + } + } + _ => return Ok(Client::from_fds(pipes[0], pipes[1])), + } + } + } + + cvt(libc::pipe(pipes.as_mut_ptr()))?; + drop(set_cloexec(pipes[0], true)); + drop(set_cloexec(pipes[1], true)); + Ok(Client::from_fds(pipes[0], pipes[1])) + } + + pub unsafe fn open(s: &str) -> Option { + let mut parts = s.splitn(2, ','); + let read = parts.next().unwrap(); + let write = match parts.next() { + Some(s) => s, + None => return None, + }; + + let read = match read.parse() { + Ok(n) => n, + Err(_) => return None, + }; + let write = match write.parse() { + Ok(n) => n, + Err(_) => return None, + }; + + // Ok so we've got two integers that look like file descriptors, but + // for extra sanity checking let's see if they actually look like + // instances of a pipe before we return the client. + // + // If we're called from `make` *without* the leading + on our rule + // then we'll have `MAKEFLAGS` env vars but won't actually have + // access to the file descriptors. + if is_valid_fd(read) && is_valid_fd(write) { + drop(set_cloexec(read, true)); + drop(set_cloexec(write, true)); + Some(Client::from_fds(read, write)) + } else { + None + } + } + + unsafe fn from_fds(read: c_int, write: c_int) -> Client { + Client { + read: File::from_raw_fd(read), + write: File::from_raw_fd(write), + } + } + + pub fn acquire(&self) -> io::Result { + // We don't actually know if the file descriptor here is set in + // blocking or nonblocking mode. AFAIK all released versions of + // `make` use blocking fds for the jobserver, but the unreleased + // version of `make` doesn't. In the unreleased version jobserver + // fds are set to nonblocking and combined with `pselect` + // internally. + // + // Here we try to be compatible with both strategies. We + // unconditionally expect the file descriptor to be in nonblocking + // mode and if it happens to be in blocking mode then most of this + // won't end up actually being necessary! + // + // We use `poll` here to block this thread waiting for read + // readiness, and then afterwards we perform the `read` itself. If + // the `read` returns that it would block then we start over and try + // again. + // + // Also note that we explicitly don't handle EINTR here. That's used + // to shut us down, so we otherwise punt all errors upwards. + unsafe { + let mut fd: libc::pollfd = mem::zeroed(); + fd.fd = self.read.as_raw_fd(); + fd.events = libc::POLLIN; + loop { + fd.revents = 0; + if libc::poll(&mut fd, 1, -1) == -1 { + let e = io::Error::last_os_error(); + match e.kind() { + io::ErrorKind::Interrupted => continue, + _ => return Err(e), + } + } + if fd.revents == 0 { + continue; + } + let mut buf = [0]; + match (&self.read).read(&mut buf) { + Ok(1) => return Ok(Acquired { byte: buf[0] }), + Ok(_) => { + return Err(io::Error::new( + io::ErrorKind::Other, + "early EOF on jobserver pipe", + )) + } + Err(e) => match e.kind() { + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => continue, + _ => return Err(e), + }, + } + } + } + } + + pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> { + // Note that the fd may be nonblocking but we're going to go ahead + // and assume that the writes here are always nonblocking (we can + // always quickly release a token). If that turns out to not be the + // case we'll get an error anyway! + let byte = data.map(|d| d.byte).unwrap_or(b'+'); + match (&self.write).write(&[byte])? { + 1 => Ok(()), + _ => Err(io::Error::new( + io::ErrorKind::Other, + "failed to write token back to jobserver", + )), + } + } + + pub fn string_arg(&self) -> String { + format!("{},{} -j", self.read.as_raw_fd(), self.write.as_raw_fd()) + } + + pub fn configure(&self, cmd: &mut Command) { + // Here we basically just want to say that in the child process + // we'll configure the read/write file descriptors to *not* be + // cloexec, so they're inherited across the exec and specified as + // integers through `string_arg` above. + let read = self.read.as_raw_fd(); + let write = self.write.as_raw_fd(); + unsafe { + cmd.pre_exec(move || { + set_cloexec(read, false)?; + set_cloexec(write, false)?; + Ok(()) + }); + } + } +} + +#[derive(Debug)] +pub struct Helper { + thread: JoinHandle<()>, + state: Arc, +} + +pub(crate) fn spawn_helper( + client: crate::Client, + state: Arc, + mut f: Box) + Send>, +) -> io::Result { + static USR1_INIT: Once = Once::new(); + let mut err = None; + USR1_INIT.call_once(|| unsafe { + let mut new: libc::sigaction = mem::zeroed(); + new.sa_sigaction = sigusr1_handler as usize; + new.sa_flags = libc::SA_SIGINFO as _; + if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 { + err = Some(io::Error::last_os_error()); + } + }); + + if let Some(e) = err.take() { + return Err(e); + } + + let state2 = state.clone(); + let thread = Builder::new().spawn(move || { + state2.for_each_request(|| f(client.acquire())); + })?; + + Ok(Helper { thread, state }) +} + +impl Helper { + pub fn join(self) { + let dur = Duration::from_millis(10); + let mut state = self.state.lock(); + debug_assert!(state.producer_done); + + // We need to join our helper thread, and it could be blocked in one + // of two locations. First is the wait for a request, but the + // initial drop of `HelperState` will take care of that. Otherwise + // it may be blocked in `client.acquire()`. We actually have no way + // of interrupting that, so resort to `pthread_kill` as a fallback. + // This signal should interrupt any blocking `read` call with + // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. + // + // Note that we don'tdo this forever though since there's a chance + // of bugs, so only do this opportunistically to make a best effort + // at clearing ourselves up. + for _ in 0..100 { + if state.consumer_done { + break; + } + unsafe { + // Ignore the return value here of `pthread_kill`, + // apparently on OSX if you kill a dead thread it will + // return an error, but on other platforms it may not. In + // that sense we don't actually know if this will succeed or + // not! + libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1); + } + state = self + .state + .cvar + .wait_timeout(state, dur) + .unwrap_or_else(|e| e.into_inner()) + .0; + thread::yield_now(); // we really want the other thread to run + } + + // If we managed to actually see the consumer get done, then we can + // definitely wait for the thread. Otherwise it's... of in the ether + // I guess? + if state.consumer_done { + drop(self.thread.join()); + } + } +} + +fn is_valid_fd(fd: c_int) -> bool { + unsafe { + return libc::fcntl(fd, libc::F_GETFD) != -1; + } +} + +fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> { + unsafe { + let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?; + let new = if set { + previous | libc::FD_CLOEXEC + } else { + previous & !libc::FD_CLOEXEC + }; + if new != previous { + cvt(libc::fcntl(fd, libc::F_SETFD, new))?; + } + Ok(()) + } +} + +fn cvt(t: c_int) -> io::Result { + if t == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(t) + } +} + +extern "C" fn sigusr1_handler( + _signum: c_int, + _info: *mut libc::siginfo_t, + _ptr: *mut libc::c_void, +) { + // nothing to do +} diff --git a/src/wasm.rs b/src/wasm.rs new file mode 100644 index 0000000..236c7e5 --- /dev/null +++ b/src/wasm.rs @@ -0,0 +1,91 @@ + +use std::io; +use std::process::Command; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{Builder, JoinHandle}; + +#[derive(Debug)] +pub struct Client { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + count: Mutex, + cvar: Condvar, +} + +#[derive(Debug)] +pub struct Acquired(()); + +impl Client { + pub fn new(limit: usize) -> io::Result { + Ok(Client { + inner: Arc::new(Inner { + count: Mutex::new(limit), + cvar: Condvar::new(), + }), + }) + } + + pub unsafe fn open(_s: &str) -> Option { + None + } + + pub fn acquire(&self) -> io::Result { + let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner()); + while *lock == 0 { + lock = self + .inner + .cvar + .wait(lock) + .unwrap_or_else(|e| e.into_inner()); + } + *lock -= 1; + Ok(Acquired(())) + } + + pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> { + let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner()); + *lock += 1; + drop(lock); + self.inner.cvar.notify_one(); + Ok(()) + } + + pub fn string_arg(&self) -> String { + panic!( + "On this platform there is no cross process jobserver support, + so Client::configure is not supported." + ); + } + + pub fn configure(&self, _cmd: &mut Command) { + unreachable!(); + } +} + +#[derive(Debug)] +pub struct Helper { + thread: JoinHandle<()>, +} + +pub(crate) fn spawn_helper( + client: crate::Client, + state: Arc, + mut f: Box) + Send>, +) -> io::Result { + let thread = Builder::new().spawn(move || { + state.for_each_request(|| f(client.acquire())); + })?; + + Ok(Helper { thread: thread }) +} + +impl Helper { + pub fn join(self) { + // TODO: this is not correct if the thread is blocked in + // `client.acquire()`. + drop(self.thread.join()); + } +} diff --git a/src/windows.rs b/src/windows.rs new file mode 100644 index 0000000..6c1fbd4 --- /dev/null +++ b/src/windows.rs @@ -0,0 +1,251 @@ +use std::ffi::CString; +use std::io; +use std::process::Command; +use std::ptr; +use std::sync::Arc; +use std::thread::{Builder, JoinHandle}; + +#[derive(Debug)] +pub struct Client { + sem: Handle, + name: String, +} + +#[derive(Debug)] +pub struct Acquired; + +type BOOL = i32; +type DWORD = u32; +type HANDLE = *mut u8; +type LONG = i32; + +const ERROR_ALREADY_EXISTS: DWORD = 183; +const FALSE: BOOL = 0; +const INFINITE: DWORD = 0xffffffff; +const SEMAPHORE_MODIFY_STATE: DWORD = 0x2; +const SYNCHRONIZE: DWORD = 0x00100000; +const TRUE: BOOL = 1; +const WAIT_OBJECT_0: DWORD = 0; + +extern "system" { + fn CloseHandle(handle: HANDLE) -> BOOL; + fn SetEvent(hEvent: HANDLE) -> BOOL; + fn WaitForMultipleObjects( + ncount: DWORD, + lpHandles: *const HANDLE, + bWaitAll: BOOL, + dwMilliseconds: DWORD, + ) -> DWORD; + fn CreateEventA( + lpEventAttributes: *mut u8, + bManualReset: BOOL, + bInitialState: BOOL, + lpName: *const i8, + ) -> HANDLE; + fn ReleaseSemaphore( + hSemaphore: HANDLE, + lReleaseCount: LONG, + lpPreviousCount: *mut LONG, + ) -> BOOL; + fn CreateSemaphoreA( + lpEventAttributes: *mut u8, + lInitialCount: LONG, + lMaximumCount: LONG, + lpName: *const i8, + ) -> HANDLE; + fn OpenSemaphoreA( + dwDesiredAccess: DWORD, + bInheritHandle: BOOL, + lpName: *const i8, + ) -> HANDLE; + fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; + #[link_name = "SystemFunction036"] + fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8; +} + +// Note that we ideally would use the `getrandom` crate, but unfortunately +// that causes build issues when this crate is used in rust-lang/rust (see +// rust-lang/rust#65014 for more information). As a result we just inline +// the pretty simple Windows-specific implementation of generating +// randomness. +fn getrandom(dest: &mut [u8]) -> io::Result<()> { + // Prevent overflow of u32 + for chunk in dest.chunks_mut(u32::max_value() as usize) { + let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) }; + if ret == 0 { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to generate random bytes", + )); + } + } + Ok(()) +} + +impl Client { + pub fn new(limit: usize) -> io::Result { + // Try a bunch of random semaphore names until we get a unique one, + // but don't try for too long. + // + // Note that `limit == 0` is a valid argument above but Windows + // won't let us create a semaphore with 0 slots available to it. Get + // `limit == 0` working by creating a semaphore instead with one + // slot and then immediately acquire it (without ever releaseing it + // back). + for _ in 0..100 { + let mut bytes = [0; 4]; + getrandom(&mut bytes)?; + let mut name = + format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes)); + unsafe { + let create_limit = if limit == 0 { 1 } else { limit }; + let r = CreateSemaphoreA( + ptr::null_mut(), + create_limit as LONG, + create_limit as LONG, + name.as_ptr() as *const _, + ); + if r.is_null() { + return Err(io::Error::last_os_error()); + } + let handle = Handle(r); + + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) { + continue; + } + name.pop(); // chop off the trailing nul + let client = Client { + sem: handle, + name: name, + }; + if create_limit != limit { + client.acquire()?; + } + return Ok(client); + } + } + + Err(io::Error::new( + io::ErrorKind::Other, + "failed to find a unique name for a semaphore", + )) + } + + pub unsafe fn open(s: &str) -> Option { + let name = match CString::new(s) { + Ok(s) => s, + Err(_) => return None, + }; + + let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr()); + if sem.is_null() { + None + } else { + Some(Client { + sem: Handle(sem), + name: s.to_string(), + }) + } + } + + pub fn acquire(&self) -> io::Result { + unsafe { + let r = WaitForSingleObject(self.sem.0, INFINITE); + if r == WAIT_OBJECT_0 { + Ok(Acquired) + } else { + Err(io::Error::last_os_error()) + } + } + } + + pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> { + unsafe { + let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut()); + if r != 0 { + Ok(()) + } else { + Err(io::Error::last_os_error()) + } + } + } + + pub fn string_arg(&self) -> String { + self.name.clone() + } + + pub fn configure(&self, _cmd: &mut Command) { + // nothing to do here, we gave the name of our semaphore to the + // child above + } +} + +#[derive(Debug)] +struct Handle(HANDLE); +// HANDLE is a raw ptr, but we're send/sync +unsafe impl Sync for Handle {} +unsafe impl Send for Handle {} + +impl Drop for Handle { + fn drop(&mut self) { + unsafe { + CloseHandle(self.0); + } + } +} + +#[derive(Debug)] +pub struct Helper { + event: Arc, + thread: JoinHandle<()>, +} + +pub(crate) fn spawn_helper( + client: crate::Client, + state: Arc, + mut f: Box) + Send>, +) -> io::Result { + let event = unsafe { + let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null()); + if r.is_null() { + return Err(io::Error::last_os_error()); + } else { + Handle(r) + } + }; + let event = Arc::new(event); + let event2 = event.clone(); + let thread = Builder::new().spawn(move || { + let objects = [event2.0, client.inner.sem.0]; + state.for_each_request(|| { + const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1; + match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } { + WAIT_OBJECT_0 => return, + WAIT_OBJECT_1 => f(Ok(crate::Acquired { + client: client.inner.clone(), + data: Acquired, + disabled: false, + })), + _ => f(Err(io::Error::last_os_error())), + } + }); + })?; + Ok(Helper { thread, event }) +} + +impl Helper { + pub fn join(self) { + // Unlike unix this logic is much easier. If our thread was blocked + // in waiting for requests it should already be woken up and + // exiting. Otherwise it's waiting for a token, so we wake it up + // with a different event that it's also waiting on here. After + // these two we should be guaranteed the thread is on its way out, + // so we can safely `join`. + let r = unsafe { SetEvent(self.event.0) }; + if r == 0 { + panic!("failed to set event: {}", io::Error::last_os_error()); + } + drop(self.thread.join()); + } +}