diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs index 77bd8f4ae10..22072edd917 100644 --- a/src/rust/engine/process_execution/src/bounded.rs +++ b/src/rust/engine/process_execution/src/bounded.rs @@ -151,6 +151,10 @@ impl crate::CommandRunner for CommandRunner { } } } + + async fn shutdown(&self) -> Result<(), String> { + self.inner.shutdown().await + } } /// A wrapped Semaphore which adds concurrency metadata which supports overcommit. diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 66ded303c0b..5b9ce805653 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -158,6 +158,10 @@ impl crate::CommandRunner for CommandRunner { } Ok(result) } + + async fn shutdown(&self) -> Result<(), String> { + self.inner.shutdown().await + } } impl CommandRunner { diff --git a/src/rust/engine/process_execution/src/docker.rs b/src/rust/engine/process_execution/src/docker.rs index ab5eff467ea..a2d09370d60 100644 --- a/src/rust/engine/process_execution/src/docker.rs +++ b/src/rust/engine/process_execution/src/docker.rs @@ -285,7 +285,6 @@ impl CommandRunner { &named_caches, &immutable_inputs, image_pull_policy, - executor.clone(), )?; Ok(CommandRunner { @@ -413,6 +412,10 @@ impl super::CommandRunner for CommandRunner { ) .await } + + async fn shutdown(&self) -> Result<(), String> { + self.container_cache.shutdown().await + } } #[async_trait] @@ -540,7 +543,6 @@ struct ContainerCache { named_caches_base_dir: String, immutable_inputs_base_dir: String, image_pull_cache: ImagePullCache, - executor: Executor, /// Cache that maps image name to container ID. async_oncecell::OnceCell is used so that /// multiple tasks trying to access an initializing container do not try to start multiple /// containers. 
@@ -555,7 +557,6 @@ impl ContainerCache { named_caches: &NamedCaches, immutable_inputs: &ImmutableInputs, image_pull_policy: ImagePullPolicy, - executor: Executor, ) -> Result { let work_dir_base = work_dir_base .to_path_buf() @@ -599,7 +600,6 @@ impl ContainerCache { immutable_inputs_base_dir, image_pull_cache: ImagePullCache::new(image_pull_policy), containers: Mutex::default(), - executor, }) } @@ -724,37 +724,39 @@ impl ContainerCache { Ok(container_id.to_owned()) } -} -impl Drop for ContainerCache { - fn drop(&mut self) { - let executor = self.executor.clone(); - let container_ids = self.containers.lock().keys().cloned().collect::>(); - let docker = self.docker.clone(); - executor.enter(move || { - let join_fut = tokio::spawn(async move { - let docker = match docker.get().await { - Ok(d) => d, - Err(err) => { - log::warn!("Failed to get Docker connection during container removal: {err}"); - return; - } - }; + pub async fn shutdown(&self) -> Result<(), String> { + let docker = match self.docker.get().await { + Ok(d) => d, + Err(err) => { + return Err(format!( + "Failed to get Docker connection during container removal: {err}" + )) + } + }; - let removal_futures = container_ids.into_iter().map(|(id, _platform)| async move { - let remove_options = RemoveContainerOptions { - force: true, - ..RemoveContainerOptions::default() - }; - let remove_result = docker.remove_container(&id, Some(remove_options)).await; - if let Err(err) = remove_result { - log::warn!("Failed to remove Docker container `{id}`: {err:?}"); - } - }); + #[allow(clippy::needless_collect)] + // allow is necessary otherwise will get "temporary value dropped while borrowed" error + let container_ids = self + .containers + .lock() + .values() + .flat_map(|v| v.get()) + .cloned() + .collect::>(); - let _ = futures::future::join_all(removal_futures).await; - }); - let _ = tokio::task::block_in_place(|| futures::executor::block_on(join_fut)); + let removal_futures = container_ids.into_iter().map(|id| 
async move { + let remove_options = RemoveContainerOptions { + force: true, + ..RemoveContainerOptions::default() + }; + docker + .remove_container(&id, Some(remove_options)) + .await + .map_err(|err| format!("Failed to remove Docker container `{id}`: {err:?}")) }); + + futures::future::try_join_all(removal_futures).await?; + Ok(()) } } diff --git a/src/rust/engine/process_execution/src/docker_tests.rs b/src/rust/engine/process_execution/src/docker_tests.rs index dce5bda9e1f..704b1397548 100644 --- a/src/rust/engine/process_execution/src/docker_tests.rs +++ b/src/rust/engine/process_execution/src/docker_tests.rs @@ -741,13 +741,19 @@ async fn run_command_via_docker_in_dir( cleanup, ImagePullPolicy::IfMissing, )?; - let original = runner.run(Context::default(), workunit, req.into()).await?; - let stdout_bytes = store - .load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec()) - .await?; - let stderr_bytes = store - .load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec()) - .await?; + let result: Result<_, ProcessError> = async { + let original = runner.run(Context::default(), workunit, req.into()).await?; + let stdout_bytes = store + .load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec()) + .await?; + let stderr_bytes = store + .load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec()) + .await?; + Ok((original, stdout_bytes, stderr_bytes)) + } + .await; + let (original, stdout_bytes, stderr_bytes) = result?; + runner.shutdown().await?; Ok(LocalTestResult { original, stdout_bytes, diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 58d3f761128..4aea503b6b0 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -31,6 +31,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; use std::fmt::{self, Debug, Display}; use std::path::PathBuf; +use std::sync::Arc; use 
async_trait::async_trait; use concrete_time::{Duration, TimeSpan}; @@ -880,6 +881,9 @@ pub trait CommandRunner: Send + Sync + Debug { workunit: &mut RunningWorkunit, req: Process, ) -> Result; + + /// Shutdown this CommandRunner cleanly. + async fn shutdown(&self) -> Result<(), String>; } #[async_trait] @@ -892,6 +896,26 @@ impl CommandRunner for Box { ) -> Result { (**self).run(context, workunit, req).await } + + async fn shutdown(&self) -> Result<(), String> { + (**self).shutdown().await + } +} + +#[async_trait] +impl CommandRunner for Arc { + async fn run( + &self, + context: Context, + workunit: &mut RunningWorkunit, + req: Process, + ) -> Result { + (**self).run(context, workunit, req).await + } + + async fn shutdown(&self) -> Result<(), String> { + (**self).shutdown().await + } } // TODO(#8513) possibly move to the MEPR struct, or to the hashing crate? diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 5ae14744c86..be6e92d5186 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -325,6 +325,10 @@ impl super::CommandRunner for CommandRunner { ) .await } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } #[async_trait] diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index 9d50864b495..02252f85412 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -214,6 +214,10 @@ impl super::CommandRunner for CommandRunner { ) .await } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } #[async_trait] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index cf171d791b3..49fb937be0c 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -822,6 +822,10 
@@ impl crate::CommandRunner for CommandRunner { ) .await } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } fn maybe_add_workunit( diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 488fcf90321..6629e7021a1 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -493,6 +493,10 @@ impl crate::CommandRunner for CommandRunner { Ok(result) } + + async fn shutdown(&self) -> Result<(), String> { + self.inner.shutdown().await + } } /// Check the remote Action Cache for a cached result of running the given `command` and the Action diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 0ae3c2b366f..c6a4ec78006 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -68,6 +68,10 @@ impl CommandRunnerTrait for MockLocalCommandRunner { self.call_counter.fetch_add(1, Ordering::SeqCst); self.result.clone() } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } // NB: We bundle these into a struct to ensure they share the same lifetime. 
@@ -576,6 +580,10 @@ async fn make_action_result_basic() { ) -> Result { unimplemented!() } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } let store_dir = TempDir::new().unwrap(); diff --git a/src/rust/engine/process_execution/src/switched.rs b/src/rust/engine/process_execution/src/switched.rs index 1d8d5ba8218..297f10ff99c 100644 --- a/src/rust/engine/process_execution/src/switched.rs +++ b/src/rust/engine/process_execution/src/switched.rs @@ -54,6 +54,13 @@ where self.false_runner.run(context, workunit, req).await } } + + async fn shutdown(&self) -> Result<(), String> { + let true_runner_shutdown_fut = self.true_runner.shutdown(); + let false_runner_shutdown_fut = self.false_runner.shutdown(); + futures::try_join!(true_runner_shutdown_fut, false_runner_shutdown_fut)?; + Ok(()) + } } #[cfg(test)] @@ -78,6 +85,10 @@ mod tests { ) -> Result { self.0.clone() } + + async fn shutdown(&self) -> Result<(), String> { + Ok(()) + } } #[tokio::test] diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index cd2331dd04a..fb607f1d69d 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -21,6 +21,7 @@ use crate::types::Types; use async_oncecell::OnceCell; use cache::PersistentCache; use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS}; +use futures::FutureExt; use graph::{self, EntryId, Graph, InvalidationResult, NodeContext}; use hashing::Digest; use log::info; @@ -632,6 +633,19 @@ impl Core { } // Then clear the Graph to ensure that drop handlers run (particularly for running processes). self.graph.clear(); + + // Allow command runners to cleanly shutdown in an async context to avoid issues with + // waiting for async code to run in a non-async drop context. 
+ let shutdown_futures = self + .command_runners + .iter() + .map(|runner| runner.shutdown().boxed()); + let shutdown_results = futures::future::join_all(shutdown_futures).await; + for shutdown_result in shutdown_results { + if let Err(err) = shutdown_result { + log::warn!("Command runner failed to shutdown cleanly: {err}"); + } + } } }