Skip to content

Commit

Permalink
add shutdown method to CommandRunner (#16930)
Browse files Browse the repository at this point in the history
The Docker command runner needs to cleanly remove cached containers when it shuts down. The removal API must be called in an async context, however, so the shutdown cannot be performed from a drop handler, since that is a non-async context.

Add a `shutdown` method to `CommandRunner` and implement it for all relevant command runners. For the Docker command runner, this means that the shutdown logic for `ContainerCache` will be invoked in an async context.

[ci skip-build-wheels]
  • Loading branch information
Tom Dyas authored Sep 21, 2022
1 parent 86295c1 commit 5b68eb0
Show file tree
Hide file tree
Showing 12 changed files with 128 additions and 39 deletions.
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ impl crate::CommandRunner for CommandRunner {
}
}
}

/// Shuts down this runner by delegating to the wrapped `inner` runner's `shutdown` and
/// returning its result unchanged.
async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

/// A wrapped Semaphore which adds concurrency metadata which supports overcommit.
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ impl crate::CommandRunner for CommandRunner {
}
Ok(result)
}

/// Shuts down this runner by delegating to the wrapped `inner` runner's `shutdown` and
/// returning its result unchanged.
async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

impl CommandRunner {
Expand Down
66 changes: 34 additions & 32 deletions src/rust/engine/process_execution/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl CommandRunner {
&named_caches,
&immutable_inputs,
image_pull_policy,
executor.clone(),
)?;

Ok(CommandRunner {
Expand Down Expand Up @@ -413,6 +412,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

/// Shuts down this runner by awaiting `shutdown` on its `container_cache` and returning
/// that result unchanged.
async fn shutdown(&self) -> Result<(), String> {
self.container_cache.shutdown().await
}
}

#[async_trait]
Expand Down Expand Up @@ -540,7 +543,6 @@ struct ContainerCache {
named_caches_base_dir: String,
immutable_inputs_base_dir: String,
image_pull_cache: ImagePullCache,
executor: Executor,
/// Cache that maps image name to container ID. async_oncecell::OnceCell is used so that
/// multiple tasks trying to access an initializing container do not try to start multiple
/// containers.
Expand All @@ -555,7 +557,6 @@ impl ContainerCache {
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
image_pull_policy: ImagePullPolicy,
executor: Executor,
) -> Result<Self, String> {
let work_dir_base = work_dir_base
.to_path_buf()
Expand Down Expand Up @@ -599,7 +600,6 @@ impl ContainerCache {
immutable_inputs_base_dir,
image_pull_cache: ImagePullCache::new(image_pull_policy),
containers: Mutex::default(),
executor,
})
}

Expand Down Expand Up @@ -724,37 +724,39 @@ impl ContainerCache {

Ok(container_id.to_owned())
}
}

impl Drop for ContainerCache {
fn drop(&mut self) {
let executor = self.executor.clone();
let container_ids = self.containers.lock().keys().cloned().collect::<Vec<_>>();
let docker = self.docker.clone();
executor.enter(move || {
let join_fut = tokio::spawn(async move {
let docker = match docker.get().await {
Ok(d) => d,
Err(err) => {
log::warn!("Failed to get Docker connection during container removal: {err}");
return;
}
};
pub async fn shutdown(&self) -> Result<(), String> {
let docker = match self.docker.get().await {
Ok(d) => d,
Err(err) => {
return Err(format!(
"Failed to get Docker connection during container removal: {err}"
))
}
};

let removal_futures = container_ids.into_iter().map(|(id, _platform)| async move {
let remove_options = RemoveContainerOptions {
force: true,
..RemoveContainerOptions::default()
};
let remove_result = docker.remove_container(&id, Some(remove_options)).await;
if let Err(err) = remove_result {
log::warn!("Failed to remove Docker container `{id}`: {err:?}");
}
});
#[allow(clippy::needless_collect)]
// allow is necessary otherwise will get "temporary value dropped while borrowed" error
let container_ids = self
.containers
.lock()
.values()
.flat_map(|v| v.get())
.cloned()
.collect::<Vec<_>>();

let _ = futures::future::join_all(removal_futures).await;
});
let _ = tokio::task::block_in_place(|| futures::executor::block_on(join_fut));
let removal_futures = container_ids.into_iter().map(|id| async move {
let remove_options = RemoveContainerOptions {
force: true,
..RemoveContainerOptions::default()
};
docker
.remove_container(&id, Some(remove_options))
.await
.map_err(|err| format!("Failed to remove Docker container `{id}`: {err:?}"))
});

futures::future::try_join_all(removal_futures).await?;
Ok(())
}
}
20 changes: 13 additions & 7 deletions src/rust/engine/process_execution/src/docker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,13 +741,19 @@ async fn run_command_via_docker_in_dir(
cleanup,
ImagePullPolicy::IfMissing,
)?;
let original = runner.run(Context::default(), workunit, req.into()).await?;
let stdout_bytes = store
.load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
.await?;
let stderr_bytes = store
.load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
.await?;
let result: Result<_, ProcessError> = async {
let original = runner.run(Context::default(), workunit, req.into()).await?;
let stdout_bytes = store
.load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
.await?;
let stderr_bytes = store
.load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
.await?;
Ok((original, stdout_bytes, stderr_bytes))
}
.await;
let (original, stdout_bytes, stderr_bytes) = result?;
runner.shutdown().await?;
Ok(LocalTestResult {
original,
stdout_bytes,
Expand Down
24 changes: 24 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
Expand Down Expand Up @@ -880,6 +881,9 @@ pub trait CommandRunner: Send + Sync + Debug {
workunit: &mut RunningWorkunit,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError>;

/// Shut down this CommandRunner cleanly.
///
/// This is an async method so that implementations can await cleanup work, which a `Drop`
/// handler cannot do because it is not an async context. Returns `Err` with a description
/// if the shutdown did not complete cleanly.
async fn shutdown(&self) -> Result<(), String>;
}

#[async_trait]
Expand All @@ -892,6 +896,26 @@ impl<T: CommandRunner + ?Sized> CommandRunner for Box<T> {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
(**self).run(context, workunit, req).await
}

/// Forwards to the boxed runner's `shutdown`.
async fn shutdown(&self) -> Result<(), String> {
// Dereference explicitly so the call targets `T`'s implementation rather than this one.
(**self).shutdown().await
}
}

/// Lets an `Arc<T>` be used wherever a `CommandRunner` is expected by forwarding both trait
/// methods to the wrapped `T`.
#[async_trait]
impl<T: CommandRunner + ?Sized> CommandRunner for Arc<T> {
/// Forwards to the wrapped runner's `run`.
async fn run(
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
// Dereference explicitly so the call targets `T`'s implementation rather than this one.
(**self).run(context, workunit, req).await
}

/// Forwards to the wrapped runner's `shutdown`.
async fn shutdown(&self) -> Result<(), String> {
// Dereference explicitly so the call targets `T`'s implementation rather than this one.
(**self).shutdown().await
}
}

// TODO(#8513) possibly move to the MEPR struct, or to the hashing crate?
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

/// No-op shutdown: performs no work and always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

/// No-op shutdown: performs no work and always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,10 @@ impl crate::CommandRunner for CommandRunner {
)
.await
}

/// No-op shutdown: performs no work and always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

fn maybe_add_workunit(
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,10 @@ impl crate::CommandRunner for CommandRunner {

Ok(result)
}

/// Shuts down this runner by delegating to the wrapped `inner` runner's `shutdown` and
/// returning its result unchanged.
async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

/// Check the remote Action Cache for a cached result of running the given `command` and the Action
Expand Down
8 changes: 8 additions & 0 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
self.call_counter.fetch_add(1, Ordering::SeqCst);
self.result.clone()
}

/// No-op shutdown for the mock runner: always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

// NB: We bundle these into a struct to ensure they share the same lifetime.
Expand Down Expand Up @@ -576,6 +580,10 @@ async fn make_action_result_basic() {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
unimplemented!()
}

/// No-op shutdown for this test-only runner: always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

let store_dir = TempDir::new().unwrap();
Expand Down
11 changes: 11 additions & 0 deletions src/rust/engine/process_execution/src/switched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ where
self.false_runner.run(context, workunit, req).await
}
}

/// Shuts down both wrapped runners concurrently.
///
/// Both shutdown futures are always driven to completion: a failure in one runner must not
/// cancel the other runner's in-flight cleanup (which is what a short-circuiting
/// `try_join!` would do by dropping the unfinished future).
///
/// Returns the `true_runner` error if it failed, otherwise the `false_runner` result.
async fn shutdown(&self) -> Result<(), String> {
  let (true_runner_result, false_runner_result) =
    futures::join!(self.true_runner.shutdown(), self.false_runner.shutdown());
  true_runner_result.and(false_runner_result)
}
}

#[cfg(test)]
Expand All @@ -78,6 +85,10 @@ mod tests {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
self.0.clone()
}

/// No-op shutdown for this test-only runner: always returns `Ok(())`.
async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[tokio::test]
Expand Down
14 changes: 14 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::types::Types;
use async_oncecell::OnceCell;
use cache::PersistentCache;
use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS};
use futures::FutureExt;
use graph::{self, EntryId, Graph, InvalidationResult, NodeContext};
use hashing::Digest;
use log::info;
Expand Down Expand Up @@ -632,6 +633,19 @@ impl Core {
}
// Then clear the Graph to ensure that drop handlers run (particularly for running processes).
self.graph.clear();

// Allow command runners to cleanly shutdown in an async context to avoid issues with
// waiting for async code to run in a non-async drop context.
let shutdown_futures = self
.command_runners
.iter()
.map(|runner| runner.shutdown().boxed());
let shutdown_results = futures::future::join_all(shutdown_futures).await;
// Shutdown failures are logged rather than propagated, so every runner's outcome is reported.
for shutdown_result in shutdown_results {
  if let Err(err) = shutdown_result {
    log::warn!("Command runner failed to shutdown cleanly: {err}");
  }
}
}
}

Expand Down

0 comments on commit 5b68eb0

Please sign in to comment.