Skip to content

Commit

Permalink
WIP shutdown delay
Browse files Browse the repository at this point in the history
  • Loading branch information
mgjm committed Jul 19, 2023
1 parent 1cc6c8a commit 994349a
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 13 deletions.
29 changes: 24 additions & 5 deletions conmon-rs/server/src/child_reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::{
child::Child,
container_io::{ContainerIO, ContainerIOType, SharedContainerIO},
inactivity::{Activity, Inactivity},
oom_watcher::OOMWatcher,
};
use anyhow::{bail, format_err, Context, Result};
Expand Down Expand Up @@ -35,10 +36,13 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, warn, Instrument};

#[derive(Debug, Default, Getters)]
#[derive(Debug, Clone, Getters)]
pub struct ChildReaper {
#[getset(get)]
grandchildren: Arc<Mutex<MultiMap<String, ReapableChild>>>,

#[getset(get)]
inactivity: Inactivity,
}

macro_rules! lock {
Expand All @@ -48,6 +52,13 @@ macro_rules! lock {
}

impl ChildReaper {
/// Create a child reaper with a default (empty) grandchildren map that
/// stores the given `inactivity` tracker.
pub fn new(inactivity: Inactivity) -> ChildReaper {
    let grandchildren = Arc::default();
    Self {
        grandchildren,
        inactivity,
    }
}

pub fn get(&self, id: &str) -> Result<ReapableChild> {
let locked_grandchildren = &self.grandchildren().clone();
let lock = lock!(locked_grandchildren);
Expand Down Expand Up @@ -137,7 +148,7 @@ impl ChildReaper {
pub fn watch_grandchild(&self, child: Child) -> Result<Receiver<ExitChannelData>> {
let locked_grandchildren = &self.grandchildren().clone();
let mut map = lock!(locked_grandchildren);
let mut reapable_grandchild = ReapableChild::from_child(&child);
let mut reapable_grandchild = ReapableChild::from_child(&child, self.inactivity.activity());

let (exit_tx, exit_rx) = reapable_grandchild.watch()?;

Expand Down Expand Up @@ -233,6 +244,8 @@ pub struct ReapableChild {

#[getset(get = "pub")]
cleanup_cmd: Vec<String>,

activity: Activity,
}

#[derive(Clone, CopyGetters, Debug, Getters, Setters)]
Expand All @@ -248,7 +261,7 @@ pub struct ExitChannelData {
}

impl ReapableChild {
pub fn from_child(child: &Child) -> Self {
pub fn from_child(child: &Child, activity: Activity) -> Self {
Self {
exit_paths: child.exit_paths().clone(),
oom_exit_paths: child.oom_exit_paths().clone(),
Expand All @@ -258,6 +271,7 @@ impl ReapableChild {
token: child.token().clone(),
task: None,
cleanup_cmd: child.cleanup_cmd().to_vec(),
activity,
}
}

Expand Down Expand Up @@ -288,6 +302,8 @@ impl ReapableChild {
let stop_token = self.token().clone();
let cleanup_cmd_raw = self.cleanup_cmd().clone();

let activity = self.activity.clone();

let task = task::spawn(
async move {
debug!("Running task");
Expand Down Expand Up @@ -341,14 +357,16 @@ impl ReapableChild {
}

if !cleanup_cmd_raw.is_empty() {
Self::spawn_cleanup_process(&cleanup_cmd_raw).await;
Self::spawn_cleanup_process(&cleanup_cmd_raw, activity.clone()).await;
}

debug!("Sending exit struct to channel: {:?}", exit_channel_data);
if exit_tx_clone.send(exit_channel_data).is_err() {
debug!("Unable to send exit status");
}
debug!("Task done");

activity.stop();
}
.instrument(debug_span!("watch", pid)),
);
Expand All @@ -363,7 +381,7 @@ impl ReapableChild {
Ok((exit_tx, exit_rx))
}

async fn spawn_cleanup_process(raw_cmd: &[String]) {
async fn spawn_cleanup_process(raw_cmd: &[String], activity: Activity) {
let mut cleanup_cmd = Command::new(&raw_cmd[0]);

cleanup_cmd.args(&raw_cmd[1..]);
Expand All @@ -380,6 +398,7 @@ impl ReapableChild {
e
),
}
activity.stop();
});
}

Expand Down
11 changes: 10 additions & 1 deletion conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ macro_rules! prefix {
};
}

#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
#[derive(CopyGetters, Debug, Deserialize, Getters, Parser, PartialEq, Serialize, Setters)]
#[serde(rename_all = "kebab-case")]
#[command(
after_help("More info at: https://github.com/containers/conmon-rs"),
Expand Down Expand Up @@ -132,6 +132,15 @@ pub struct Config {
)]
/// OpenTelemetry GRPC endpoint to be used for tracing.
tracing_endpoint: String,

#[get_copy = "pub"]
#[arg(
env(concat!(prefix!(), "SHUTDOWN_DELAY")),
long("shutdown-delay"),
value_name("DELAY"),
)]
/// Automatically stop conmon-rs after DELAY seconds of inactivity. (0 = disabled)
shutdown_delay: f64,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Subcommand)]
Expand Down
124 changes: 124 additions & 0 deletions conmon-rs/server/src/inactivity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::{
future, process,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use tokio::sync::{futures::Notified, Notify};

/// Track activity and react to inactivity.
///
/// Can be used to exit accept loops after inactivity.
///
/// Wraps `Some` shared counter state when tracking is enabled and `None`
/// when the tracker is disabled. Clones share the same underlying state.
#[derive(Debug, Clone)]
pub struct Inactivity(Option<Arc<Inner>>);

impl Inactivity {
    /// Create a new, enabled inactivity tracker with no running activities.
    pub fn new() -> Self {
        Self(Some(Arc::default()))
    }

    /// Create a disabled inactivity tracker.
    ///
    /// The wait function will never return. There is always activity.
    pub const fn disabled() -> Self {
        Self(None)
    }

    /// Start tracking an activity.
    ///
    /// The returned [`Activity`] counts as running until it is dropped. On a
    /// disabled tracker the returned value tracks nothing.
    pub fn activity(&self) -> Activity {
        Activity(self.0.as_ref().map(Inner::increment))
    }

    /// Async "block" until there is no activity and then wait for an additional `timeout`.
    ///
    /// Returns only after a full, uninterrupted `timeout` has passed without
    /// any activity. Any activity that starts during the idle window restarts
    /// the window, even if that activity has already stopped again by the
    /// time this task is woken up.
    ///
    /// Never returns on a disabled tracker.
    pub async fn wait(&self, timeout: Duration) {
        if let Some(inner) = &self.0 {
            loop {
                // Create the notification future *before* reading the counter
                // so that a state change between the check and the await is
                // not missed.
                let changed = inner.changed();
                if inner.no_activity() {
                    // `Err` means the timer elapsed without any state change.
                    // `Ok` means an activity started in the meantime; in that
                    // case the idle window must be restarted, even if the
                    // activity is already gone again.
                    let elapsed = tokio::time::timeout(timeout, changed).await.is_err();
                    if elapsed && inner.no_activity() {
                        break;
                    }
                } else {
                    // Something is running: sleep until the state changes.
                    changed.await
                }
            }
        } else {
            future::pending().await
        }
    }
}

/// Track an activity. Can be cloned to track additional activities.
///
/// The Activity stops on drop.
///
/// Holds `None` when it was handed out by a disabled tracker, in which case
/// cloning and dropping do not touch any counter.
#[derive(Debug)]
pub struct Activity(Option<Arc<Inner>>);

impl Activity {
    /// Explicitly stop an activity.
    ///
    /// Consumes the activity, so its `Drop` implementation runs and the
    /// activity is no longer counted. Equivalent to `drop(activity)`.
    pub fn stop(self) {
        // Taking `self` by value is all that is needed; dropping it here
        // merely makes the intent explicit.
        drop(self);
    }
}

impl Clone for Activity {
fn clone(&self) -> Self {
Self(self.0.as_ref().map(Inner::increment))
}
}

impl Drop for Activity {
    /// Stop the activity by decrementing the shared counter, if there is one.
    fn drop(&mut self) {
        match self.0.as_deref() {
            Some(inner) => inner.decrement(),
            None => {}
        }
    }
}

/// Counter state shared between an `Inactivity` tracker and its activities.
#[derive(Debug, Default)]
struct Inner {
    // number of current activities
    active: AtomicUsize,
    // gets notified whenever the result of `no_activity` changes
    notify: Notify,
}

impl Inner {
    /// Abort if more than `isize::MAX` activities are active.
    ///
    /// This prevents integer overflow of the `active` counter in case
    /// someone is `mem::forget`ing activities.
    ///
    /// The same logic is applied internally by the `Arc` implementation.
    const MAX_ACTIVE: usize = isize::MAX as usize;

    /// Count one more activity and hand out another handle to the shared state.
    ///
    /// Waiters are notified when the counter leaves zero. The process is
    /// aborted if the previous count was already above `MAX_ACTIVE`.
    fn increment(self: &Arc<Self>) -> Arc<Self> {
        let previous = self.active.fetch_add(1, Ordering::Relaxed);
        if previous == 0 {
            // idle -> active
            self.notify.notify_waiters();
        } else if previous > Self::MAX_ACTIVE {
            process::abort();
        }
        Arc::clone(self)
    }

    /// Count one activity less.
    ///
    /// Waiters are notified when the counter reaches zero. The process is
    /// aborted if the previous count was zero (more decrements than
    /// increments) or above `MAX_ACTIVE`.
    fn decrement(&self) {
        let previous = self.active.fetch_sub(1, Ordering::Relaxed);
        if previous == 1 {
            // active -> idle
            self.notify.notify_waiters();
        } else if previous == 0 || previous > Self::MAX_ACTIVE {
            process::abort();
        }
    }

    /// Whether the activity counter is currently zero.
    fn no_activity(&self) -> bool {
        let running = self.active.load(Ordering::Relaxed);
        running == 0
    }

    /// Future that completes on the next notification sent by `increment` or `decrement`.
    fn changed(&self) -> Notified {
        self.notify.notified()
    }
}
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod config;
mod container_io;
mod container_log;
mod cri_logger;
mod inactivity;
mod init;
mod journal;
mod listener;
Expand Down
36 changes: 29 additions & 7 deletions conmon-rs/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
child_reaper::ChildReaper,
config::{CgroupManager, Commands, Config, LogDriver, Verbosity},
container_io::{ContainerIO, ContainerIOType},
inactivity::Inactivity,
init::{DefaultInit, Init},
journal::Journal,
listener::{DefaultListener, Listener},
Expand All @@ -15,7 +16,7 @@ use anyhow::{format_err, Context, Result};
use capnp::text_list::Reader;
use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem};
use conmon_common::conmon_capnp::conmon;
use futures::{AsyncReadExt, FutureExt};
use futures::AsyncReadExt;
use getset::Getters;
use nix::{
errno,
Expand All @@ -24,7 +25,7 @@ use nix::{
unistd::{fork, ForkResult},
};
use opentelemetry::trace::FutureExt as OpenTelemetryFutureExt;
use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc};
use std::{fs::File, io::Write, path::Path, process, str::FromStr, time::Duration};
use tokio::{
fs,
runtime::{Builder, Handle},
Expand All @@ -47,15 +48,26 @@ pub struct Server {

/// Child reaper instance.
#[getset(get = "pub(crate)")]
reaper: Arc<ChildReaper>,
reaper: ChildReaper,

// Shutdown controller.
#[getset(get = "pub(crate)")]
inactivity: Inactivity,
}

impl Server {
/// Create a new `Server` instance.
pub fn new() -> Result<Self> {
let config = Config::default();
let inactivity = if config.shutdown_delay() == 0.0 {
Inactivity::disabled()
} else {
Inactivity::new()
};
let server = Self {
config: Default::default(),
reaper: Default::default(),
config,
reaper: ChildReaper::new(inactivity.clone()),
inactivity,
};

if let Some(v) = server.config().version() {
Expand Down Expand Up @@ -214,7 +226,7 @@ impl Server {
}

async fn start_signal_handler<T: AsRef<Path>>(
reaper: Arc<ChildReaper>,
reaper: ChildReaper,
socket: T,
shutdown_tx: oneshot::Sender<()>,
) -> Result<()> {
Expand Down Expand Up @@ -254,6 +266,8 @@ impl Server {
}

async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> {
let inactivity = self.inactivity().clone();
let timeout = Duration::from_secs_f64(self.config().shutdown_delay());
let listener =
Listener::<DefaultListener>::default().bind_long_path(self.config().socket())?;
let client: conmon::Client = capnp_rpc::new_client(self);
Expand All @@ -264,6 +278,10 @@ impl Server {
debug!("Received shutdown message");
return Ok(())
}
() = inactivity.wait(timeout) => {
debug!("Automatic shutdown after inactivity");
return Ok(())
}
stream = listener.accept() => {
stream?.0
},
Expand All @@ -276,7 +294,11 @@ impl Server {
Default::default(),
));
let rpc_system = RpcSystem::new(network, Some(client.clone().client));
task::spawn_local(Box::pin(rpc_system.map(|_| ())));
let activity = inactivity.activity();
task::spawn_local(async move {
let _ = rpc_system.await;
drop(activity);
});
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func (c *ConmonClient) toArgs(config *ConmonServerConfig) (entrypoint string, ar
}
}

args = append(args, "--shutdown-delay", "10")

return entrypoint, args, nil
}

Expand Down

0 comments on commit 994349a

Please sign in to comment.