Skip to content

Commit

Permalink
Manually manage Delay's timer thread
Browse files Browse the repository at this point in the history
The current implemnetation isn't fork-safe; if we fork after executing a
process, the thread won't be started in the child process, and timers
won't run.
  • Loading branch information
illicitonion committed Dec 17, 2018
1 parent 4d9a0b5 commit 9ed6993
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 51 deletions.
10 changes: 7 additions & 3 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ bytes = "0.4.5"
fnv = "1.0.5"
fs = { path = "fs" }
futures = "^0.1.16"
# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge
futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" }
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
Expand Down
4 changes: 3 additions & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355d
hashing = { path = "../hashing" }
log = "0.4"
protobuf = { version = "2.0.4", features = ["with-bytes"] }
resettable = { path = "../resettable" }
sha2 = "0.8"
tempfile = "3"
futures-timer = "0.1"
# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge
futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" }
time = "0.1.40"
tokio-codec = "0.1"
tokio-process = "0.2.1"
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern crate log;
#[cfg(test)]
extern crate mock;
extern crate protobuf;
extern crate resettable;
extern crate sha2;
#[cfg(test)]
extern crate tempfile;
Expand Down
107 changes: 75 additions & 32 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct CommandRunner {
execution_client: Arc<bazel_protos::remote_execution_grpc::ExecutionClient>,
operations_client: Arc<bazel_protos::operations_grpc::OperationsClient>,
store: Store,
futures_timer_thread: resettable::Resettable<futures_timer::HelperThread>,
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -145,6 +146,7 @@ impl super::CommandRunner for CommandRunner {
let command_runner3 = self.clone();
let execute_request = Arc::new(execute_request);
let execute_request2 = execute_request.clone();
let futures_timer_thread = self.futures_timer_thread.clone();

let mut history = ExecutionHistory::default();

Expand Down Expand Up @@ -173,6 +175,7 @@ impl super::CommandRunner for CommandRunner {
let operations_client = operations_client.clone();
let command_runner2 = command_runner2.clone();
let command_runner3 = command_runner3.clone();
let futures_timer_thread = futures_timer_thread.clone();
let f = command_runner2.extract_execute_response(operation, &mut history);
f.map(future::Loop::Break).or_else(move |value| {
match value {
Expand Down Expand Up @@ -229,26 +232,26 @@ impl super::CommandRunner for CommandRunner {
)).to_boxed()
} else {
// maybe the delay here should be the min of remaining time and the backoff period
Delay::new(Duration::from_millis(backoff_period))
.map_err(move |e| {
format!(
"Future-Delay errored at operation result polling for {}, {}: {}",
operation_name, description, e
)
}).and_then(move |_| {
future::done(
operations_client
.get_operation_opt(
&operation_request,
command_runner3.call_option(),
).or_else(move |err| {
rpcerror_recover_cancelled(operation_request.take_name(), err)
}).map(OperationOrStatus::Operation)
.map_err(rpcerror_to_string),
).map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1))
}).to_boxed()
Delay::new_handle(
Instant::now() + Duration::from_millis(backoff_period),
futures_timer_thread.with(|thread| thread.handle()),
).map_err(move |e| {
format!(
"Future-Delay errored at operation result polling for {}, {}: {}",
operation_name, description, e
)
}).and_then(move |_| {
future::done(
operations_client
.get_operation_opt(&operation_request, command_runner3.call_option())
.or_else(move |err| {
rpcerror_recover_cancelled(operation_request.take_name(), err)
}).map(OperationOrStatus::Operation)
.map_err(rpcerror_to_string),
).map(move |operation| {
future::Loop::Continue((history, operation, iter_num + 1))
}).to_boxed()
}).to_boxed()
}
}
}
Expand Down Expand Up @@ -278,6 +281,7 @@ impl CommandRunner {
const BACKOFF_INCR_WAIT_MILLIS: u64 = 500;
const BACKOFF_MAX_WAIT_MILLIS: u64 = 5000;

#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn new(
address: &str,
cache_key_gen_version: Option<String>,
Expand All @@ -286,6 +290,7 @@ impl CommandRunner {
oauth_bearer_token: Option<String>,
thread_count: usize,
store: Store,
futures_timer_thread: resettable::Resettable<futures_timer::HelperThread>,
) -> CommandRunner {
let env = Arc::new(grpcio::Environment::new(thread_count));
let channel = {
Expand Down Expand Up @@ -318,6 +323,7 @@ impl CommandRunner {
execution_client,
operations_client,
store,
futures_timer_thread,
}
}

Expand Down Expand Up @@ -1351,7 +1357,16 @@ mod tests {
Duration::from_secs(1),
).expect("Failed to make store");

let cmd_runner = CommandRunner::new(&mock_server.address(), None, None, None, None, 1, store);
let cmd_runner = CommandRunner::new(
&mock_server.address(),
None,
None,
None,
None,
1,
store,
timer_thread(),
);
let result = cmd_runner.run(echo_roland_request()).wait().unwrap();
assert_eq!(
result.without_execution_attempts(),
Expand Down Expand Up @@ -1708,10 +1723,18 @@ mod tests {
.wait()
.expect("Saving file bytes to store");

let result = CommandRunner::new(&mock_server.address(), None, None, None, None, 1, store)
.run(cat_roland_request())
.wait()
.unwrap();
let result = CommandRunner::new(
&mock_server.address(),
None,
None,
None,
None,
1,
store,
timer_thread(),
).run(cat_roland_request())
.wait()
.unwrap();
assert_eq!(
result.without_execution_attempts(),
FallibleExecuteProcessResult {
Expand Down Expand Up @@ -1789,9 +1812,17 @@ mod tests {
.wait()
.expect("Saving file bytes to store");

let result = CommandRunner::new(&mock_server.address(), None, None, None, None, 1, store)
.run(cat_roland_request())
.wait();
let result = CommandRunner::new(
&mock_server.address(),
None,
None,
None,
None,
1,
store,
timer_thread(),
).run(cat_roland_request())
.wait();
assert_eq!(
result,
Ok(FallibleExecuteProcessResult {
Expand Down Expand Up @@ -1846,10 +1877,18 @@ mod tests {
Duration::from_secs(1),
).expect("Failed to make store");

let error = CommandRunner::new(&mock_server.address(), None, None, None, None, 1, store)
.run(cat_roland_request())
.wait()
.expect_err("Want error");
let error = CommandRunner::new(
&mock_server.address(),
None,
None,
None,
None,
1,
store,
timer_thread(),
).run(cat_roland_request())
.wait()
.expect_err("Want error");
assert_contains(&error, &format!("{}", missing_digest.0));
}

Expand Down Expand Up @@ -2418,7 +2457,11 @@ mod tests {
Duration::from_secs(1),
).expect("Failed to make store");

CommandRunner::new(&address, None, None, None, None, 1, store)
CommandRunner::new(&address, None, None, None, None, 1, store, timer_thread())
}

fn timer_thread() -> resettable::Resettable<futures_timer::HelperThread> {
resettable::Resettable::new(|| futures_timer::HelperThread::new().unwrap())
}

fn extract_execute_response(
Expand Down
3 changes: 3 additions & 0 deletions src/rust/engine/process_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ publish = false
clap = "2"
env_logger = "0.5.4"
fs = { path = "../fs" }
# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge
futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" }
hashing = { path = "../hashing" }
futures = "^0.1.16"
process_execution = { path = "../process_execution" }
resettable = { path = "../resettable" }
1 change: 1 addition & 0 deletions src/rust/engine/process_executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ fn main() {
oauth_bearer_token,
1,
store,
resettable::Resettable::new(|| futures_timer::HelperThread::new().unwrap()),
))
}
None => Box::new(process_execution::local::CommandRunner::new(
Expand Down
42 changes: 32 additions & 10 deletions src/rust/engine/resettable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,27 @@ use parking_lot::RwLock;
/// to reset any references which hide background threads, so that forked processes don't inherit
/// pointers to threads from the parent process which will not exist in the forked process.
///
#[derive(Clone)]
pub struct Resettable<T> {
val: Arc<RwLock<Option<T>>>,
make: Arc<Fn() -> T>,
}

// Sadly we need to manualy implement Clone because of https://github.com/rust-lang/rust/issues/26925
impl<T> Clone for Resettable<T> {
fn clone(&self) -> Self {
Resettable {
val: self.val.clone(),
make: self.make.clone(),
}
}
}

unsafe impl<T> Send for Resettable<T> {}
unsafe impl<T> Sync for Resettable<T> {}

impl<T> Resettable<T>
where
T: Clone + Send + Sync,
T: Send + Sync,
{
pub fn new<F: Fn() -> T + 'static>(make: F) -> Resettable<T> {
let val = (make)();
Expand All @@ -62,18 +71,12 @@ where
}
}

///
/// TODO: This probably needs to switch to a "with"/context-manager style pattern.
/// Having an externalized `get` and using Clone like this is problematic: if there
/// might be references/Clones of the field outside of the lock on the resource, then we can't
/// be sure that dropping it will actually deallocate the resource.
///
pub fn get(&self) -> T {
pub fn with<O, F: FnOnce(&T) -> O>(&self, f: F) -> O {
let val_opt = self.val.read();
let val = val_opt
.as_ref()
.unwrap_or_else(|| panic!("A Resettable value cannot be used while it is shutdown."));
val.clone()
f(val)
}

///
Expand All @@ -90,3 +93,22 @@ where
t
}
}

impl<T> Resettable<T>
where
T: Clone + Send + Sync,
{
///
/// Callers should probably use `with` rather than `get`.
/// Having an externalized `get` and using Clone like this is problematic: if there
/// might be references/Clones of the field outside of the lock on the resource, then we can't
/// be sure that dropping it will actually deallocate the resource.
///
pub fn get(&self) -> T {
let val_opt = self.val.read();
let val = val_opt
.as_ref()
.unwrap_or_else(|| panic!("A Resettable value cannot be used while it is shutdown."));
val.clone()
}
}
Loading

0 comments on commit 9ed6993

Please sign in to comment.