From f3b12a6554d6e50e13f5cecbc7fbc7cacfa07eb2 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 29 Jan 2020 14:17:06 -0800 Subject: [PATCH] Break out on EINTR for shutdown on Unix This was a mistake in the previous "retry on EINTR" PR where we want to not retry in the case that we're being shut down with `pthread_kill`. Closes #23 --- src/lib.rs | 14 ++++++++++++-- src/unix.rs | 38 +++++++++++++++++++++++++++++++------- src/wasm.rs | 3 +-- src/windows.rs | 11 +++-------- tests/helper.rs | 37 +++++++++++++++++++++++++++++++++---- 5 files changed, 80 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 815ecdf..6cb08b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -458,7 +458,13 @@ impl HelperState { self.lock.lock().unwrap_or_else(|e| e.into_inner()) } - fn for_each_request(&self, mut f: impl FnMut()) { + /// Executes `f` for each request for a token, where `f` is expected to + /// block and then provide the original closure with a token once it's + /// acquired. + /// + /// This is an infinite loop until the helper thread is dropped, at which + /// point everything should get interrupted. + fn for_each_request(&self, mut f: impl FnMut(&HelperState)) { let mut lock = self.lock(); // We only execute while we could receive requests, but as soon as @@ -477,12 +483,16 @@ impl HelperState { // wait for a long time for a token. lock.requests -= 1; drop(lock); - f(); + f(self); lock = self.lock(); } lock.consumer_done = true; self.cvar.notify_one(); } + + fn producer_done(&self) -> bool { + self.lock().producer_done + } } #[test] diff --git a/src/unix.rs b/src/unix.rs index 4ba4f99..5dcfbf4 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -1,3 +1,4 @@ +use libc::c_int; use std::fs::File; use std::io::{self, Read, Write}; use std::mem; @@ -7,7 +8,6 @@ use std::ptr; use std::sync::{Arc, Once}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use libc::c_int; #[derive(Debug)] pub struct Client { @@ -102,6 +102,17 @@ impl Client { } pub fn acquire(&self) -> io::Result { + // Ignore interrupts and keep trying if that happens + loop { + if let Some(token) = self.acquire_allow_interrupts()? { + return Ok(token); + } + } + } + + /// Block waiting for a token, returning `None` if we're interrupted with + /// EINTR. + fn acquire_allow_interrupts(&self) -> io::Result> { // We don't actually know if the file descriptor here is set in // blocking or nonblocking mode. AFAIK all released versions of // `make` use blocking fds for the jobserver, but the unreleased @@ -130,7 +141,7 @@ impl Client { if libc::poll(&mut fd, 1, -1) == -1 { let e = io::Error::last_os_error(); match e.kind() { - io::ErrorKind::Interrupted => continue, + io::ErrorKind::Interrupted => return Ok(None), _ => return Err(e), } } @@ -139,7 +150,7 @@ impl Client { } let mut buf = [0]; match (&self.read).read(&mut buf) { - Ok(1) => return Ok(Acquired { byte: buf[0] }), + Ok(1) => return Ok(Some(Acquired { byte: buf[0] })), Ok(_) => { return Err(io::Error::new( io::ErrorKind::Other, @@ -147,7 +158,7 @@ impl Client { )) } Err(e) => match e.kind() { - io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => continue, + io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => return Ok(None), _ => return Err(e), }, } @@ -219,7 +230,20 @@ pub(crate) fn spawn_helper( let state2 = state.clone(); let thread = Builder::new().spawn(move || { - state2.for_each_request(|| f(client.acquire())); + state2.for_each_request(|helper| loop { + match client.inner.acquire_allow_interrupts() { + Ok(Some(data)) => { + break f(Ok(crate::Acquired { + client: client.inner.clone(), + data, + disabled: false, + })) + } + Err(e) => break f(Err(e)), + Ok(None) if helper.producer_done() => break, + Ok(None) => {} + } + }); })?; Ok(Helper { thread, state }) @@ -239,7 +263,7 @@ impl Helper { // This signal should interrupt any blocking `read` call with // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit. // - // Note that we don'tdo this forever though since there's a chance + // Note that we don't do this forever though since there's a chance // of bugs, so only do this opportunistically to make a best effort // at clearing ourselves up. for _ in 0..100 { @@ -264,7 +288,7 @@ impl Helper { } // If we managed to actually see the consumer get done, then we can - // definitely wait for the thread. Otherwise it's... of in the ether + // definitely wait for the thread. Otherwise it's... off in the ether // I guess? if state.consumer_done { drop(self.thread.join()); diff --git a/src/wasm.rs b/src/wasm.rs index 236c7e5..b88a9d9 100644 --- a/src/wasm.rs +++ b/src/wasm.rs @@ -1,4 +1,3 @@ - use std::io; use std::process::Command; use std::sync::{Arc, Condvar, Mutex}; @@ -76,7 +75,7 @@ pub(crate) fn spawn_helper( mut f: Box) + Send>, ) -> io::Result { let thread = Builder::new().spawn(move || { - state.for_each_request(|| f(client.acquire())); + state.for_each_request(|_| f(client.acquire())); })?; Ok(Helper { thread: thread }) diff --git a/src/windows.rs b/src/windows.rs index 6c1fbd4..d795c1c 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -53,11 +53,7 @@ extern "system" { lMaximumCount: LONG, lpName: *const i8, ) -> HANDLE; - fn OpenSemaphoreA( - dwDesiredAccess: DWORD, - bInheritHandle: BOOL, - lpName: *const i8, - ) -> HANDLE; + fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE; fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD; #[link_name = "SystemFunction036"] fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8; @@ -95,8 +91,7 @@ impl Client { for _ in 0..100 { let mut bytes = [0; 4]; getrandom(&mut bytes)?; - let mut name = - format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes)); + let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes)); unsafe { let create_limit = if limit == 0 { 1 } else { limit }; let r = CreateSemaphoreA( @@ -218,7 +213,7 @@ pub(crate) fn spawn_helper( let event2 = event.clone(); let thread = Builder::new().spawn(move || { let objects = [event2.0, client.inner.sem.0]; - state.for_each_request(|| { + state.for_each_request(|_| { const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1; match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } { WAIT_OBJECT_0 => return, diff --git a/tests/helper.rs b/tests/helper.rs index 79f204a..093a504 100644 --- a/tests/helper.rs +++ b/tests/helper.rs @@ -1,8 +1,7 @@ -extern crate jobserver; - -use std::sync::mpsc; - use jobserver::Client; +use std::sync::atomic::*; +use std::sync::mpsc; +use std::sync::*; macro_rules! t { ($e:expr) => { @@ -46,3 +45,33 @@ fn acquire() { helper.request_token(); drop(helper); } + +#[test] +fn prompt_shutdown() { + for _ in 0..100 { + let client = jobserver::Client::new(4).unwrap(); + let count = Arc::new(AtomicU32::new(0)); + let count2 = count.clone(); + let tokens = Arc::new(Mutex::new(Vec::new())); + let helper = client + .into_helper_thread(move |token| { + tokens.lock().unwrap().push(token); + count2.fetch_add(1, Ordering::SeqCst); + }) + .unwrap(); + + // Request more tokens than what are available. + for _ in 0..5 { + helper.request_token(); + } + // Wait for at least some of the requests to finish. + while count.load(Ordering::SeqCst) < 3 { + std::thread::yield_now(); + } + // Drop helper + let t = std::time::Instant::now(); + drop(helper); + let d = t.elapsed(); + assert!(d.as_secs_f64() < 0.5); + } +}