diff --git a/conmon-rs/server/src/child_reaper.rs b/conmon-rs/server/src/child_reaper.rs index 3ff22816e6..a2ef7c2135 100644 --- a/conmon-rs/server/src/child_reaper.rs +++ b/conmon-rs/server/src/child_reaper.rs @@ -2,6 +2,7 @@ use crate::{ child::Child, container_io::{ContainerIO, ContainerIOType, SharedContainerIO}, + inactivity::{Activity, Inactivity}, oom_watcher::OOMWatcher, }; use anyhow::{bail, format_err, Context, Result}; @@ -35,10 +36,13 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, warn, Instrument}; -#[derive(Debug, Default, Getters)] +#[derive(Debug, Clone, Getters)] pub struct ChildReaper { #[getset(get)] grandchildren: Arc>>, + + #[getset(get)] + inactivity: Inactivity, } macro_rules! lock { @@ -48,6 +52,13 @@ macro_rules! lock { } impl ChildReaper { + pub fn new(inactivity: Inactivity) -> ChildReaper { + Self { + grandchildren: Arc::default(), + inactivity, + } + } + pub fn get(&self, id: &str) -> Result { let locked_grandchildren = &self.grandchildren().clone(); let lock = lock!(locked_grandchildren); @@ -137,7 +148,7 @@ impl ChildReaper { pub fn watch_grandchild(&self, child: Child) -> Result> { let locked_grandchildren = &self.grandchildren().clone(); let mut map = lock!(locked_grandchildren); - let mut reapable_grandchild = ReapableChild::from_child(&child); + let mut reapable_grandchild = ReapableChild::from_child(&child, self.inactivity.activity()); let (exit_tx, exit_rx) = reapable_grandchild.watch()?; @@ -233,6 +244,8 @@ pub struct ReapableChild { #[getset(get = "pub")] cleanup_cmd: Vec, + + activity: Activity, } #[derive(Clone, CopyGetters, Debug, Getters, Setters)] @@ -248,7 +261,7 @@ pub struct ExitChannelData { } impl ReapableChild { - pub fn from_child(child: &Child) -> Self { + pub fn from_child(child: &Child, activity: Activity) -> Self { Self { exit_paths: child.exit_paths().clone(), oom_exit_paths: child.oom_exit_paths().clone(), @@ -258,6 +271,7 @@ impl ReapableChild { token: child.token().clone(), task: None, cleanup_cmd: child.cleanup_cmd().to_vec(), + activity, } } @@ -288,6 +302,8 @@ impl ReapableChild { let stop_token = self.token().clone(); let cleanup_cmd_raw = self.cleanup_cmd().clone(); + let activity = self.activity.clone(); + let task = task::spawn( async move { debug!("Running task"); @@ -341,7 +357,7 @@ impl ReapableChild { } if !cleanup_cmd_raw.is_empty() { - Self::spawn_cleanup_process(&cleanup_cmd_raw).await; + Self::spawn_cleanup_process(&cleanup_cmd_raw, activity.clone()).await; } debug!("Sending exit struct to channel: {:?}", exit_channel_data); @@ -349,6 +365,8 @@ impl ReapableChild { debug!("Unable to send exit status"); } debug!("Task done"); + + activity.stop(); } .instrument(debug_span!("watch", pid)), ); @@ -363,7 +381,7 @@ impl ReapableChild { Ok((exit_tx, exit_rx)) } - async fn spawn_cleanup_process(raw_cmd: &[String]) { + async fn spawn_cleanup_process(raw_cmd: &[String], activity: Activity) { let mut cleanup_cmd = Command::new(&raw_cmd[0]); cleanup_cmd.args(&raw_cmd[1..]); @@ -380,6 +398,7 @@ impl ReapableChild { e ), } + activity.stop(); }); } diff --git a/conmon-rs/server/src/config.rs b/conmon-rs/server/src/config.rs index aa1b3b0d2d..95303609c7 100644 --- a/conmon-rs/server/src/config.rs +++ b/conmon-rs/server/src/config.rs @@ -12,7 +12,7 @@ macro_rules! prefix { }; } -#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)] +#[derive(CopyGetters, Debug, Deserialize, Getters, Parser, PartialEq, Serialize, Setters)] #[serde(rename_all = "kebab-case")] #[command( after_help("More info at: https://github.com/containers/conmon-rs"), @@ -132,6 +132,15 @@ pub struct Config { )] /// OpenTelemetry GRPC endpoint to be used for tracing. tracing_endpoint: String, + + #[get_copy = "pub"] + #[arg( + env(concat!(prefix!(), "SHUTDOWN_DELAY")), + long("shutdown-delay"), + value_name("DELAY"), + )] + /// Automatically stop conmon-rs after DELAY seconds of inactivity. (0 = disabled) + shutdown_delay: f64, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Subcommand)] diff --git a/conmon-rs/server/src/inactivity.rs b/conmon-rs/server/src/inactivity.rs new file mode 100644 index 0000000000..51c4530a74 --- /dev/null +++ b/conmon-rs/server/src/inactivity.rs @@ -0,0 +1,124 @@ +use std::{ + future, process, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use tokio::sync::{futures::Notified, Notify}; + +/// Track activity and reacto to inactivity. +/// +/// Can be used to exit accept loops after inactivity. +#[derive(Debug, Clone)] +pub struct Inactivity(Option>); + +impl Inactivity { + /// Create a new inactivity tracker. + pub fn new() -> Self { + Self(Some(Arc::default())) + } + + /// Create a disabled inactivity tracker. + /// + /// The wait function will never return. There is always activity. + pub const fn disabled() -> Self { + Self(None) + } + + /// Start tracking an activity. + pub fn activity(&self) -> Activity { + Activity(self.0.as_ref().map(Inner::increment)) + } + + /// Async "block" until there is no activity and then wait for an additional`timeout`. + pub async fn wait(&self, timeout: Duration) { + if let Some(inner) = &self.0 { + loop { + let changed = inner.changed(); + if inner.no_activity() { + let _ = tokio::time::timeout(timeout, changed).await; + if inner.no_activity() { + break; + } + } else { + changed.await + } + } + } else { + future::pending().await + } + } +} + +/// Track an activity. Can be cloned to track additional activities. +/// +/// The Activity stops on drop. +#[derive(Debug)] +pub struct Activity(Option>); + +impl Activity { + /// Explicitly stop an activity. Just a wrapper for `drop(activity)`. + pub fn stop(self) { + // nothing to to, we take self by value + } +} + +impl Clone for Activity { + fn clone(&self) -> Self { + Self(self.0.as_ref().map(Inner::increment)) + } +} + +impl Drop for Activity { + fn drop(&mut self) { + if let Some(inner) = &self.0 { + inner.decrement(); + } + } +} + +#[derive(Debug, Default)] +struct Inner { + // number of current activites + active: AtomicUsize, + // gets notofied whenever the result of `no_activity` changes + notify: Notify, +} + +impl Inner { + /// Abort if more then isize::MAX activities are active. + /// + /// This prevents integer overflow of the `active` counter in case + /// someone is `mem::forget`ing activities. + /// + /// The same logic is applied internaly by the `Arc` implementation. + const MAX_ACTIVE: usize = isize::MAX as usize; + + fn increment(self: &Arc) -> Arc { + match self.active.fetch_add(1, Ordering::Relaxed) { + 0 => self.notify.notify_waiters(), + 1..=Self::MAX_ACTIVE => {} + _ => process::abort(), + } + self.clone() + } + + fn decrement(&self) { + match self.active.fetch_sub(1, Ordering::Relaxed) { + 1 => self.notify.notify_waiters(), + 2..=Self::MAX_ACTIVE => {} + _ => process::abort(), + } + } + + fn no_activity(&self) -> bool { + self.active.load(Ordering::Relaxed) == 0 + } + + fn changed(&self) -> Notified { + self.notify.notified() + } +} diff --git a/conmon-rs/server/src/lib.rs b/conmon-rs/server/src/lib.rs index ea56070b27..cd401cc919 100644 --- a/conmon-rs/server/src/lib.rs +++ b/conmon-rs/server/src/lib.rs @@ -12,6 +12,7 @@ mod config; mod container_io; mod container_log; mod cri_logger; +mod inactivity; mod init; mod journal; mod listener; diff --git a/conmon-rs/server/src/server.rs b/conmon-rs/server/src/server.rs index 3822f3dfd6..5bc883f47d 100644 --- a/conmon-rs/server/src/server.rs +++ b/conmon-rs/server/src/server.rs @@ -4,6 +4,7 @@ use crate::{ child_reaper::ChildReaper, config::{CgroupManager, Commands, Config, LogDriver, Verbosity}, container_io::{ContainerIO, ContainerIOType}, + inactivity::Inactivity, init::{DefaultInit, Init}, journal::Journal, listener::{DefaultListener, Listener}, @@ -15,7 +16,7 @@ use anyhow::{format_err, Context, Result}; use capnp::text_list::Reader; use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem}; use conmon_common::conmon_capnp::conmon; -use futures::{AsyncReadExt, FutureExt}; +use futures::AsyncReadExt; use getset::Getters; use nix::{ errno, @@ -24,7 +25,7 @@ use nix::{ unistd::{fork, ForkResult}, }; use opentelemetry::trace::FutureExt as OpenTelemetryFutureExt; -use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc}; +use std::{fs::File, io::Write, path::Path, process, str::FromStr, time::Duration}; use tokio::{ fs, runtime::{Builder, Handle}, @@ -47,15 +48,26 @@ pub struct Server { /// Child reaper instance. #[getset(get = "pub(crate)")] - reaper: Arc, + reaper: ChildReaper, + + // Shutdown controller. + #[getset(get = "pub(crate)")] + inactivity: Inactivity, } impl Server { /// Create a new `Server` instance. pub fn new() -> Result { + let config = Config::default(); + let inactivity = if config.shutdown_delay() == 0.0 { + Inactivity::disabled() + } else { + Inactivity::new() + }; let server = Self { - config: Default::default(), - reaper: Default::default(), + config, + reaper: ChildReaper::new(inactivity.clone()), + inactivity, }; if let Some(v) = server.config().version() { @@ -214,7 +226,7 @@ impl Server { } async fn start_signal_handler>( - reaper: Arc, + reaper: ChildReaper, socket: T, shutdown_tx: oneshot::Sender<()>, ) -> Result<()> { @@ -254,6 +266,8 @@ impl Server { } async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> { + let inactivity = self.inactivity().clone(); + let timeout = Duration::from_secs_f64(self.config().shutdown_delay()); let listener = Listener::::default().bind_long_path(self.config().socket())?; let client: conmon::Client = capnp_rpc::new_client(self); @@ -264,6 +278,10 @@ impl Server { debug!("Received shutdown message"); return Ok(()) } + () = inactivity.wait(timeout) => { + debug!("Automatic shutdown after inactivity"); + return Ok(()) + } stream = listener.accept() => { stream?.0 }, @@ -276,7 +294,11 @@ impl Server { Default::default(), )); let rpc_system = RpcSystem::new(network, Some(client.clone().client)); - task::spawn_local(Box::pin(rpc_system.map(|_| ()))); + let activity = inactivity.activity(); + task::spawn_local(async move { + let _ = rpc_system.await; + drop(activity); + }); } } diff --git a/pkg/client/client.go b/pkg/client/client.go index 2f837a9978..84cae73573 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -334,6 +334,8 @@ func (c *ConmonClient) toArgs(config *ConmonServerConfig) (entrypoint string, ar } } + args = append(args, "--shutdown-delay", "10") + return entrypoint, args, nil }