Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add shutdown method to CommandRunner #16930

Merged
merged 5 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ impl crate::CommandRunner for CommandRunner {
}
}
}

async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

/// A wrapped Semaphore which adds concurrency metadata which supports overcommit.
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ impl crate::CommandRunner for CommandRunner {
}
Ok(result)
}

async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

impl CommandRunner {
Expand Down
66 changes: 34 additions & 32 deletions src/rust/engine/process_execution/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl CommandRunner {
&named_caches,
&immutable_inputs,
image_pull_policy,
executor.clone(),
)?;

Ok(CommandRunner {
Expand Down Expand Up @@ -413,6 +412,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

async fn shutdown(&self) -> Result<(), String> {
self.container_cache.shutdown().await
}
}

#[async_trait]
Expand Down Expand Up @@ -540,7 +543,6 @@ struct ContainerCache {
named_caches_base_dir: String,
immutable_inputs_base_dir: String,
image_pull_cache: ImagePullCache,
executor: Executor,
/// Cache that maps image name to container ID. async_oncecell::OnceCell is used so that
/// multiple tasks trying to access an initializing container do not try to start multiple
/// containers.
Expand All @@ -555,7 +557,6 @@ impl ContainerCache {
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
image_pull_policy: ImagePullPolicy,
executor: Executor,
) -> Result<Self, String> {
let work_dir_base = work_dir_base
.to_path_buf()
Expand Down Expand Up @@ -599,7 +600,6 @@ impl ContainerCache {
immutable_inputs_base_dir,
image_pull_cache: ImagePullCache::new(image_pull_policy),
containers: Mutex::default(),
executor,
})
}

Expand Down Expand Up @@ -724,37 +724,39 @@ impl ContainerCache {

Ok(container_id.to_owned())
}
}

impl Drop for ContainerCache {
fn drop(&mut self) {
let executor = self.executor.clone();
let container_ids = self.containers.lock().keys().cloned().collect::<Vec<_>>();
let docker = self.docker.clone();
executor.enter(move || {
let join_fut = tokio::spawn(async move {
let docker = match docker.get().await {
Ok(d) => d,
Err(err) => {
log::warn!("Failed to get Docker connection during container removal: {err}");
return;
}
};
pub async fn shutdown(&self) -> Result<(), String> {
let docker = match self.docker.get().await {
Ok(d) => d,
Err(err) => {
return Err(format!(
"Failed to get Docker connection during container removal: {err}"
))
}
};

let removal_futures = container_ids.into_iter().map(|(id, _platform)| async move {
let remove_options = RemoveContainerOptions {
force: true,
..RemoveContainerOptions::default()
};
let remove_result = docker.remove_container(&id, Some(remove_options)).await;
if let Err(err) = remove_result {
log::warn!("Failed to remove Docker container `{id}`: {err:?}");
}
});
#[allow(clippy::needless_collect)]
// allow is necessary otherwise will get "temporary value dropped while borrowed" error
let container_ids = self
.containers
.lock()
.values()
.flat_map(|v| v.get())
.cloned()
.collect::<Vec<_>>();

let _ = futures::future::join_all(removal_futures).await;
});
let _ = tokio::task::block_in_place(|| futures::executor::block_on(join_fut));
let removal_futures = container_ids.into_iter().map(|id| async move {
let remove_options = RemoveContainerOptions {
force: true,
..RemoveContainerOptions::default()
};
docker
.remove_container(&id, Some(remove_options))
.await
.map_err(|err| format!("Failed to remove Docker container `{id}`: {err:?}"))
});

futures::future::try_join_all(removal_futures).await?;
Ok(())
}
}
20 changes: 13 additions & 7 deletions src/rust/engine/process_execution/src/docker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,13 +741,19 @@ async fn run_command_via_docker_in_dir(
cleanup,
ImagePullPolicy::IfMissing,
)?;
let original = runner.run(Context::default(), workunit, req.into()).await?;
let stdout_bytes = store
.load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
.await?;
let stderr_bytes = store
.load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
.await?;
let result: Result<_, ProcessError> = async {
let original = runner.run(Context::default(), workunit, req.into()).await?;
let stdout_bytes = store
.load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
.await?;
let stderr_bytes = store
.load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
.await?;
Ok((original, stdout_bytes, stderr_bytes))
}
.await;
let (original, stdout_bytes, stderr_bytes) = result?;
runner.shutdown().await?;
tdyas marked this conversation as resolved.
Show resolved Hide resolved
Ok(LocalTestResult {
original,
stdout_bytes,
Expand Down
24 changes: 24 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
Expand Down Expand Up @@ -880,6 +881,9 @@ pub trait CommandRunner: Send + Sync + Debug {
workunit: &mut RunningWorkunit,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError>;

/// Shutdown this CommandRunner cleanly.
async fn shutdown(&self) -> Result<(), String>;
}

#[async_trait]
Expand All @@ -892,6 +896,26 @@ impl<T: CommandRunner + ?Sized> CommandRunner for Box<T> {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
(**self).run(context, workunit, req).await
}

async fn shutdown(&self) -> Result<(), String> {
(**self).shutdown().await
}
}

#[async_trait]
impl<T: CommandRunner + ?Sized> CommandRunner for Arc<T> {
async fn run(
&self,
context: Context,
workunit: &mut RunningWorkunit,
req: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
(**self).run(context, workunit, req).await
}

async fn shutdown(&self) -> Result<(), String> {
(**self).shutdown().await
}
}

// TODO(#8513) possibly move to the MEPR struct, or to the hashing crate?
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ impl super::CommandRunner for CommandRunner {
)
.await
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,10 @@ impl crate::CommandRunner for CommandRunner {
)
.await
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

fn maybe_add_workunit(
Expand Down
4 changes: 4 additions & 0 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ impl crate::CommandRunner for CommandRunner {

Ok(result)
}

async fn shutdown(&self) -> Result<(), String> {
self.inner.shutdown().await
}
}

/// Check the remote Action Cache for a cached result of running the given `command` and the Action
Expand Down
8 changes: 8 additions & 0 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
self.call_counter.fetch_add(1, Ordering::SeqCst);
self.result.clone()
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

// NB: We bundle these into a struct to ensure they share the same lifetime.
Expand Down Expand Up @@ -576,6 +580,10 @@ async fn make_action_result_basic() {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
unimplemented!()
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

let store_dir = TempDir::new().unwrap();
Expand Down
11 changes: 11 additions & 0 deletions src/rust/engine/process_execution/src/switched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ where
self.false_runner.run(context, workunit, req).await
}
}

async fn shutdown(&self) -> Result<(), String> {
let true_runner_shutdown_fut = self.true_runner.shutdown();
let false_runner_shutdown_fut = self.false_runner.shutdown();
futures::try_join!(true_runner_shutdown_fut, false_runner_shutdown_fut)?;
Ok(())
}
}

#[cfg(test)]
Expand All @@ -78,6 +85,10 @@ mod tests {
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
self.0.clone()
}

async fn shutdown(&self) -> Result<(), String> {
Ok(())
}
}

#[tokio::test]
Expand Down
14 changes: 14 additions & 0 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::types::Types;
use async_oncecell::OnceCell;
use cache::PersistentCache;
use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS};
use futures::FutureExt;
use graph::{self, EntryId, Graph, InvalidationResult, NodeContext};
use hashing::Digest;
use log::info;
Expand Down Expand Up @@ -632,6 +633,19 @@ impl Core {
}
// Then clear the Graph to ensure that drop handlers run (particularly for running processes).
self.graph.clear();

// Allow command runners to cleanly shutdown in an async context to avoid issues with
// waiting for async code to run in a non-async drop context.
let shutdown_futures = self
.command_runners
.iter()
.map(|runner| runner.shutdown().boxed());
let shutdown_results = futures::future::join_all(shutdown_futures).await;
for shutfdown_result in shutdown_results {
if let Err(err) = shutfdown_result {
log::warn!("Command runner failed to shutdown cleanly: {err}");
}
}
}
}

Expand Down