diff --git a/Cargo.lock b/Cargo.lock index b3df38ebbf..9ab1c671a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,25 +508,14 @@ checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" [[package]] name = "errno" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] -[[package]] -name = "errno-dragonfly" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "fastrand" version = "1.9.0" @@ -1041,9 +1030,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libgit2-sys" @@ -1310,9 +1299,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1590,9 +1579,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1811,9 +1800,9 @@ dependencies = [ [[package]] name = "shadow-rs" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88940f406d415c7f11f2fb3d28fae8dfe500ac1be48f07cd0bb4818e4fb2ad2" +checksum = "f9198caff1c94f1a5df6664bddbc379896b51b98a55b0b3fedcb23078fe00c77" dependencies = [ "const_format", "git2", @@ -2053,9 +2042,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", diff --git a/conmon-rs/client/Cargo.toml b/conmon-rs/client/Cargo.toml index fc5fb3e51d..d43fd8c16a 100644 --- a/conmon-rs/client/Cargo.toml +++ b/conmon-rs/client/Cargo.toml @@ -10,5 +10,5 @@ conmon-common = { path = "../common" } futures = "0.3.28" log = { version = "0.4.20", features = ["serde", "std"] } serde = { version = "1.0.188", features = ["derive"] } -tokio = { version = "1.32.0", features = ["fs", "macros", "net", "process", "rt", "signal", "time"] } +tokio = { version = "1.33.0", features = ["fs", "macros", "net", "process", "rt", "signal", "time"] } tokio-util = { version = "0.7.9", features = ["compat"] } diff --git a/conmon-rs/server/Cargo.toml b/conmon-rs/server/Cargo.toml index f45599f283..0ba77567c3 100644 --- a/conmon-rs/server/Cargo.toml +++ b/conmon-rs/server/Cargo.toml @@ -17,7 +17,7 @@ conmon-common = { path = "../common" } futures = "0.3.28" getset = "0.1.2" lazy_static = "1.4.0" -libc = "0.2.148" +libc = "0.2.149" libsystemd = "0.6.0" memchr = "2.6.4" multimap = "0.9.0" @@ -31,11 +31,11 @@ prctl = "1.0.0" regex = "1.9.6" sendfd = { version = "0.4.3", features = ["tokio"] } serde = { version = "1.0.188", features = ["derive"] } -shadow-rs = "0.24.0" +shadow-rs = "0.24.1" signal-hook = "0.3.17" strum = { version = "0.25.0", features = ["derive"] } tempfile = "3.8.0" -tokio = { version = "1.32.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] } +tokio = { version = "1.33.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] } tokio-eventfd = "0.2.1" tokio-seqpacket = "0.7.0" tokio-util = { version = "0.7.9", features = ["compat"] } @@ -46,7 +46,7 @@ tz-rs = "0.6.14" uuid = { version = "1.4.1", features = ["v4", "fast-rng", "macro-diagnostics"] } [build-dependencies] -shadow-rs = "0.24.0" +shadow-rs = "0.24.1" dashmap = "5.5.3" [dev-dependencies] diff --git a/conmon-rs/server/src/container_log.rs b/conmon-rs/server/src/container_log.rs index fff7f7ce20..272bc3d2ba 100644 --- a/conmon-rs/server/src/container_log.rs +++ b/conmon-rs/server/src/container_log.rs @@ -2,7 +2,7 @@ use crate::{container_io::Pipe, cri_logger::CriLogger}; use anyhow::Result; use capnp::struct_list::Reader; use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type}; -use futures::future::join_all; +use futures::{future::join_all, FutureExt}; use std::sync::Arc; use tokio::{io::AsyncBufRead, sync::RwLock}; @@ -40,7 +40,19 @@ impl ContainerLog { }, )?) } +<<<<<<< HEAD }) +======= + Type::Json => Ok(LogDriver::Json(JsonLogger::new( + x.get_path()?, + if x.get_max_size() > 0 { + Some(x.get_max_size() as usize) + } else { + None + }, + )?)), + } +>>>>>>> d9216df (updated cargo fmt) }) .collect(); Ok(Arc::new(RwLock::new(Self { drivers }))) @@ -52,7 +64,18 @@ impl ContainerLog { self.drivers .iter_mut() .map(|x| match x { +<<<<<<< HEAD +<<<<<<< HEAD LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init(), +======= + LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init().boxed(), +======= + LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => { + cri_logger.init().boxed() + } +>>>>>>> d9216df (updated cargo fmt) + LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(), +>>>>>>> 217ede1 (updated logger) }) .collect::>(), ) @@ -61,14 +84,28 @@ impl ContainerLog { .collect::>>()?; Ok(()) } +<<<<<<< HEAD +======= +>>>>>>> d9216df (updated cargo fmt) /// Reopen the container logs. pub async fn reopen(&mut self) -> Result<()> { join_all( self.drivers .iter_mut() .map(|x| match x { +<<<<<<< HEAD +<<<<<<< HEAD LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen(), +======= + LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen().boxed(), +======= + LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => { + cri_logger.reopen().boxed() + } +>>>>>>> d9216df (updated cargo fmt) + LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(), +>>>>>>> 42ff85b (updated jsonlogger) }) .collect::>(), ) @@ -81,6 +118,7 @@ impl ContainerLog { /// Write the contents of the provided reader into all loggers. pub async fn write(&mut self, pipe: Pipe, bytes: T) -> Result<()> where +<<<<<<< HEAD T: AsyncBufRead + Unpin + Copy, { join_all( @@ -96,6 +134,35 @@ impl ContainerLog { .await .into_iter() .collect::>>()?; +======= + T: AsyncBufRead + Unpin + Clone, + { + let futures = self + .drivers + .iter_mut() + .map(|x| { + async fn box_future<'a, T: AsyncBufRead + Unpin + Clone>( + logger: &mut LogDriver, + pipe: Pipe, + bytes: T, + ) -> Result<()> { + match logger { + LogDriver::ContainerRuntimeInterface(cri_logger) => { + cri_logger.write(pipe, bytes).await + } + LogDriver::Json(json_logger) => json_logger.write(pipe, bytes).await, + } + } + + box_future(x, pipe, bytes.clone()) + }) + .collect::>(); + + join_all(futures) + .await + .into_iter() + .collect::>>()?; +>>>>>>> d9216df (updated cargo fmt) Ok(()) } } diff --git a/conmon-rs/server/src/json_logger.rs b/conmon-rs/server/src/json_logger.rs new file mode 100644 index 0000000000..211f1f83cd --- /dev/null +++ b/conmon-rs/server/src/json_logger.rs @@ -0,0 +1,182 @@ +use crate::container_io::Pipe; +use anyhow::{Context, Result}; +use getset::{CopyGetters, Getters, Setters}; +use serde_json::json; +use std::{ + marker::Unpin, + path::{Path, PathBuf}, +}; +use tokio::{ + fs::{File, OpenOptions}, + io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, +}; +use tracing::debug; + +#[derive(Debug, CopyGetters, Getters, Setters)] +pub struct JsonLogger { + #[getset(get)] + path: PathBuf, + + #[getset(set)] + file: Option>, + + #[getset(get_copy)] + max_log_size: Option, + + #[getset(get_copy, set)] + bytes_written: usize, +} + +impl JsonLogger { + const ERR_UNINITIALIZED: &'static str = "logger not initialized"; + + pub fn new>(path: T, max_log_size: Option) -> Result { + Ok(Self { + path: path.as_ref().into(), + file: None, + max_log_size, + bytes_written: 0, + }) + } + + pub async fn init(&mut self) -> Result<()> { + debug!("Initializing JSON logger in path {}", self.path().display()); + self.set_file(Self::open(self.path()).await?.into()); + Ok(()) + } + + pub async fn write(&mut self, pipe: Pipe, bytes: T) -> Result<()> + where + T: AsyncBufRead + Unpin, + { + let mut reader = BufReader::new(bytes); + let mut line_buf = Vec::new(); + + while reader.read_until(b'\n', &mut line_buf).await? > 0 { + let log_entry = json!({ + "timestamp": format!("{:?}", std::time::SystemTime::now()), + "pipe": match pipe { + Pipe::StdOut => "stdout", + Pipe::StdErr => "stderr", + }, + "message": String::from_utf8_lossy(&line_buf).trim().to_string() + }); + + let log_str = log_entry.to_string(); + let bytes = log_str.as_bytes(); + self.bytes_written += bytes.len(); + + if let Some(max_size) = self.max_log_size { + if self.bytes_written > max_size { + self.reopen().await?; + self.bytes_written = 0; + } + } + + let file = self.file.as_mut().context(Self::ERR_UNINITIALIZED)?; + file.write_all(bytes).await?; + file.write_all(b"\n").await?; + self.flush().await?; + line_buf.clear(); + } + + Ok(()) + } + + pub async fn reopen(&mut self) -> Result<()> { + debug!("Reopen JSON log {}", self.path().display()); + self.file + .as_mut() + .context(Self::ERR_UNINITIALIZED)? + .get_ref() + .sync_all() + .await?; + self.init().await + } + + pub async fn flush(&mut self) -> Result<()> { + self.file + .as_mut() + .context(Self::ERR_UNINITIALIZED)? + .flush() + .await + .context("flush file writer") + } + + async fn open>(path: T) -> Result> { + Ok(BufWriter::new( + OpenOptions::new() + .create(true) + .read(true) + .truncate(true) + .write(true) + .open(&path) + .await + .context(format!("open log file path '{}'", path.as_ref().display()))?, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + use tokio::io::AsyncReadExt; + + #[tokio::test] + async fn test_json_logger_new() { + let logger = JsonLogger::new("/tmp/test.log", Some(1000)).unwrap(); + assert_eq!(logger.path().to_str().unwrap(), "/tmp/test.log"); + assert_eq!(logger.max_log_size().unwrap(), 1000); + } + + #[tokio::test] + async fn test_json_logger_init() { + let mut logger = JsonLogger::new("/tmp/test_init.log", Some(1000)).unwrap(); + logger.init().await.unwrap(); + assert!(logger.file.is_some()); + } + + #[tokio::test] + async fn test_json_logger_write() { + let mut logger = JsonLogger::new("/tmp/test_write.log", Some(1000)).unwrap(); + logger.init().await.unwrap(); + + let cursor = Cursor::new(b"Test log message\n".to_vec()); + logger.write(Pipe::StdOut, cursor).await.unwrap(); + + // Read back from the file + let mut file = File::open("/tmp/test_write.log").await.unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).await.unwrap(); + + // Check if the file contains the logged message + assert!(contents.contains("Test log message")); + } + + #[tokio::test] + async fn test_json_logger_reopen() { + let mut logger = JsonLogger::new("/tmp/test_reopen.log", Some(1000)).unwrap(); + logger.init().await.unwrap(); + + // Write to the file + let cursor = Cursor::new(b"Test log message before reopen\n".to_vec()); + logger.write(Pipe::StdOut, cursor).await.unwrap(); + + // Reopen the file + logger.reopen().await.unwrap(); + + // Write to the file again + let cursor = Cursor::new(b"Test log message after reopen\n".to_vec()); + logger.write(Pipe::StdOut, cursor).await.unwrap(); + + // Read back from the file + let mut file = File::open("/tmp/test_reopen.log").await.unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).await.unwrap(); + + // Check if the file contains the logged message + assert!(contents.contains("Test log message after reopen")); + assert!(!contents.contains("Test log message before reopen")); + } +} diff --git a/conmon-rs/server/src/lib.rs b/conmon-rs/server/src/lib.rs index 72f3237cd2..c47ef04cc2 100644 --- a/conmon-rs/server/src/lib.rs +++ b/conmon-rs/server/src/lib.rs @@ -18,6 +18,7 @@ mod cri_logger; mod fd_socket; mod init; mod journal; +mod json_logger; mod listener; mod oom_watcher; mod pause; diff --git a/go.mod b/go.mod index 8de14ef66f..2c85a7b451 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/containers/common v0.56.1-0.20230920110729-eb4ad859f309 github.com/containers/storage v1.50.2 github.com/google/uuid v1.3.1 - github.com/onsi/ginkgo/v2 v2.12.1 + github.com/onsi/ginkgo/v2 v2.13.0 github.com/onsi/gomega v1.28.0 github.com/opencontainers/runc v1.1.9 github.com/opencontainers/runtime-tools v0.9.1-0.20230317050512-e931285f4b69 diff --git a/go.sum b/go.sum index 75d41f1680..2d1cc10194 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= -github.com/onsi/ginkgo/v2 v2.12.1 h1:uHNEO1RP2SpuZApSkel9nEh1/Mu+hmQe7Q+Pepg5OYA= -github.com/onsi/ginkgo/v2 v2.12.1/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= +github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= +github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= github.com/opencontainers/runc v1.1.9 h1:XR0VIHTGce5eWPkaPesqTBrhW2yAcaraWfsEalNwQLM= diff --git a/pkg/client/client.go b/pkg/client/client.go index e6aa985ef5..b154a9137b 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -685,6 +685,7 @@ const ( // LogDriverTypeContainerRuntimeInterface is the Kubernetes CRI logger // type. LogDriverTypeContainerRuntimeInterface LogDriverType = iota + LogDriverTypeJSONLogger LogDriverType = iota ) // CreateContainerResponse is the response of the CreateContainer method.