Skip to content

Commit

Permalink
Break out on EINTR for shutdown on Unix
Browse files Browse the repository at this point in the history
This was a mistake in the previous "retry on EINTR" PR where we want to
not retry in the case that we're being shut down with `pthread_kill`.

Closes #23
  • Loading branch information
alexcrichton committed Jan 29, 2020
1 parent b41247c commit f3b12a6
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 23 deletions.
14 changes: 12 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,13 @@ impl HelperState {
self.lock.lock().unwrap_or_else(|e| e.into_inner())
}

fn for_each_request(&self, mut f: impl FnMut()) {
/// Executes `f` for each request for a token, where `f` is expected to
/// block and then provide the original closure with a token once it's
/// acquired.
///
/// This is an infinite loop until the helper thread is dropped, at which
/// point everything should get interrupted.
fn for_each_request(&self, mut f: impl FnMut(&HelperState)) {
let mut lock = self.lock();

// We only execute while we could receive requests, but as soon as
Expand All @@ -477,12 +483,16 @@ impl HelperState {
// wait for a long time for a token.
lock.requests -= 1;
drop(lock);
f();
f(self);
lock = self.lock();
}
lock.consumer_done = true;
self.cvar.notify_one();
}

fn producer_done(&self) -> bool {
self.lock().producer_done
}
}

#[test]
Expand Down
38 changes: 31 additions & 7 deletions src/unix.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use libc::c_int;
use std::fs::File;
use std::io::{self, Read, Write};
use std::mem;
Expand All @@ -7,7 +8,6 @@ use std::ptr;
use std::sync::{Arc, Once};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use libc::c_int;

#[derive(Debug)]
pub struct Client {
Expand Down Expand Up @@ -102,6 +102,17 @@ impl Client {
}

pub fn acquire(&self) -> io::Result<Acquired> {
// Ignore interrupts and keep trying if that happens
loop {
if let Some(token) = self.acquire_allow_interrupts()? {
return Ok(token);
}
}
}

/// Block waiting for a token, returning `None` if we're interrupted with
/// EINTR.
fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
// We don't actually know if the file descriptor here is set in
// blocking or nonblocking mode. AFAIK all released versions of
// `make` use blocking fds for the jobserver, but the unreleased
Expand Down Expand Up @@ -130,7 +141,7 @@ impl Client {
if libc::poll(&mut fd, 1, -1) == -1 {
let e = io::Error::last_os_error();
match e.kind() {
io::ErrorKind::Interrupted => continue,
io::ErrorKind::Interrupted => return Ok(None),
_ => return Err(e),
}
}
Expand All @@ -139,15 +150,15 @@ impl Client {
}
let mut buf = [0];
match (&self.read).read(&mut buf) {
Ok(1) => return Ok(Acquired { byte: buf[0] }),
Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
Ok(_) => {
return Err(io::Error::new(
io::ErrorKind::Other,
"early EOF on jobserver pipe",
))
}
Err(e) => match e.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => continue,
io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => return Ok(None),
_ => return Err(e),
},
}
Expand Down Expand Up @@ -219,7 +230,20 @@ pub(crate) fn spawn_helper(

let state2 = state.clone();
let thread = Builder::new().spawn(move || {
state2.for_each_request(|| f(client.acquire()));
state2.for_each_request(|helper| loop {
match client.inner.acquire_allow_interrupts() {
Ok(Some(data)) => {
break f(Ok(crate::Acquired {
client: client.inner.clone(),
data,
disabled: false,
}))
}
Err(e) => break f(Err(e)),
Ok(None) if helper.producer_done() => break,
Ok(None) => {}
}
});
})?;

Ok(Helper { thread, state })
Expand All @@ -239,7 +263,7 @@ impl Helper {
// This signal should interrupt any blocking `read` call with
// `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
//
// Note that we don'tdo this forever though since there's a chance
// Note that we don't do this forever though since there's a chance
// of bugs, so only do this opportunistically to make a best effort
// at clearing ourselves up.
for _ in 0..100 {
Expand All @@ -264,7 +288,7 @@ impl Helper {
}

// If we managed to actually see the consumer get done, then we can
// definitely wait for the thread. Otherwise it's... of in the ether
// definitely wait for the thread. Otherwise it's... off in the ether
// I guess?
if state.consumer_done {
drop(self.thread.join());
Expand Down
3 changes: 1 addition & 2 deletions src/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

use std::io;
use std::process::Command;
use std::sync::{Arc, Condvar, Mutex};
Expand Down Expand Up @@ -76,7 +75,7 @@ pub(crate) fn spawn_helper(
mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
) -> io::Result<Helper> {
let thread = Builder::new().spawn(move || {
state.for_each_request(|| f(client.acquire()));
state.for_each_request(|_| f(client.acquire()));
})?;

Ok(Helper { thread: thread })
Expand Down
11 changes: 3 additions & 8 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ extern "system" {
lMaximumCount: LONG,
lpName: *const i8,
) -> HANDLE;
fn OpenSemaphoreA(
dwDesiredAccess: DWORD,
bInheritHandle: BOOL,
lpName: *const i8,
) -> HANDLE;
fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE;
fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
#[link_name = "SystemFunction036"]
fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8;
Expand Down Expand Up @@ -95,8 +91,7 @@ impl Client {
for _ in 0..100 {
let mut bytes = [0; 4];
getrandom(&mut bytes)?;
let mut name =
format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes));
let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes));
unsafe {
let create_limit = if limit == 0 { 1 } else { limit };
let r = CreateSemaphoreA(
Expand Down Expand Up @@ -218,7 +213,7 @@ pub(crate) fn spawn_helper(
let event2 = event.clone();
let thread = Builder::new().spawn(move || {
let objects = [event2.0, client.inner.sem.0];
state.for_each_request(|| {
state.for_each_request(|_| {
const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1;
match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } {
WAIT_OBJECT_0 => return,
Expand Down
37 changes: 33 additions & 4 deletions tests/helper.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
extern crate jobserver;

use std::sync::mpsc;

use jobserver::Client;
use std::sync::atomic::*;
use std::sync::mpsc;
use std::sync::*;

macro_rules! t {
($e:expr) => {
Expand Down Expand Up @@ -46,3 +45,33 @@ fn acquire() {
helper.request_token();
drop(helper);
}

#[test]
fn prompt_shutdown() {
for _ in 0..100 {
let client = jobserver::Client::new(4).unwrap();
let count = Arc::new(AtomicU32::new(0));
let count2 = count.clone();
let tokens = Arc::new(Mutex::new(Vec::new()));
let helper = client
.into_helper_thread(move |token| {
tokens.lock().unwrap().push(token);
count2.fetch_add(1, Ordering::SeqCst);
})
.unwrap();

// Request more tokens than what are available.
for _ in 0..5 {
helper.request_token();
}
// Wait for at least some of the requests to finish.
while count.load(Ordering::SeqCst) < 3 {
std::thread::yield_now();
}
// Drop helper
let t = std::time::Instant::now();
drop(helper);
let d = t.elapsed();
assert!(d.as_secs_f64() < 0.5);
}
}

0 comments on commit f3b12a6

Please sign in to comment.