Skip to content

Commit

Permalink
Updated JsonLogger and its Unit Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wasup-yash committed Oct 10, 2023
1 parent c99a1c1 commit 69479b9
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 32 deletions.
35 changes: 12 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion conmon-rs/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ conmon-common = { path = "../common" }
futures = "0.3.28"
log = { version = "0.4.20", features = ["serde", "std"] }
serde = { version = "1.0.188", features = ["derive"] }
tokio = { version = "1.32.0", features = ["fs", "macros", "net", "process", "rt", "signal", "time"] }
tokio = { version = "1.33.0", features = ["fs", "macros", "net", "process", "rt", "signal", "time"] }
tokio-util = { version = "0.7.9", features = ["compat"] }
8 changes: 4 additions & 4 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ conmon-common = { path = "../common" }
futures = "0.3.28"
getset = "0.1.2"
lazy_static = "1.4.0"
libc = "0.2.148"
libc = "0.2.149"
libsystemd = "0.6.0"
memchr = "2.6.4"
multimap = "0.9.0"
Expand All @@ -31,11 +31,11 @@ prctl = "1.0.0"
regex = "1.9.6"
sendfd = { version = "0.4.3", features = ["tokio"] }
serde = { version = "1.0.188", features = ["derive"] }
shadow-rs = "0.24.0"
shadow-rs = "0.24.1"
signal-hook = "0.3.17"
strum = { version = "0.25.0", features = ["derive"] }
tempfile = "3.8.0"
tokio = { version = "1.32.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
tokio = { version = "1.33.0", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
tokio-eventfd = "0.2.1"
tokio-seqpacket = "0.7.0"
tokio-util = { version = "0.7.9", features = ["compat"] }
Expand All @@ -46,7 +46,7 @@ tz-rs = "0.6.14"
uuid = { version = "1.4.1", features = ["v4", "fast-rng", "macro-diagnostics"] }

[build-dependencies]
shadow-rs = "0.24.0"
shadow-rs = "0.24.1"
dashmap = "5.5.3"

[dev-dependencies]
Expand Down
69 changes: 68 additions & 1 deletion conmon-rs/server/src/container_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{container_io::Pipe, cri_logger::CriLogger};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
use futures::future::join_all;
use futures::{future::join_all, FutureExt};
use std::sync::Arc;
use tokio::{io::AsyncBufRead, sync::RwLock};

Expand Down Expand Up @@ -40,7 +40,19 @@ impl ContainerLog {
},
)?)
}
<<<<<<< HEAD
})
=======
Type::Json => Ok(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),
}
>>>>>>> d9216df (updated cargo fmt)
})
.collect();
Ok(Arc::new(RwLock::new(Self { drivers })))
Expand All @@ -52,7 +64,18 @@ impl ContainerLog {
self.drivers
.iter_mut()
.map(|x| match x {
<<<<<<< HEAD
<<<<<<< HEAD
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.init().boxed(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.init().boxed()
}
>>>>>>> d9216df (updated cargo fmt)
LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
>>>>>>> 217ede1 (updated logger)
})
.collect::<Vec<_>>(),
)
Expand All @@ -61,14 +84,28 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}
<<<<<<< HEAD

=======
>>>>>>> d9216df (updated cargo fmt)
/// Reopen the container logs.
pub async fn reopen(&mut self) -> Result<()> {
join_all(
self.drivers
.iter_mut()
.map(|x| match x {
<<<<<<< HEAD
<<<<<<< HEAD
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => cri_logger.reopen().boxed(),
=======
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.reopen().boxed()
}
>>>>>>> d9216df (updated cargo fmt)
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
>>>>>>> 42ff85b (updated jsonlogger)
})
.collect::<Vec<_>>(),
)
Expand All @@ -81,6 +118,7 @@ impl ContainerLog {
/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
<<<<<<< HEAD
T: AsyncBufRead + Unpin + Copy,
{
join_all(
Expand All @@ -96,6 +134,35 @@ impl ContainerLog {
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
=======
T: AsyncBufRead + Unpin + Clone,
{
let futures = self
.drivers
.iter_mut()
.map(|x| {
async fn box_future<'a, T: AsyncBufRead + Unpin + Clone>(
logger: &mut LogDriver,
pipe: Pipe,
bytes: T,
) -> Result<()> {
match logger {
LogDriver::ContainerRuntimeInterface(cri_logger) => {
cri_logger.write(pipe, bytes).await
}
LogDriver::Json(json_logger) => json_logger.write(pipe, bytes).await,
}
}

box_future(x, pipe, bytes.clone())
})
.collect::<Vec<_>>();

join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
>>>>>>> d9216df (updated cargo fmt)
Ok(())
}
}
Loading

0 comments on commit 69479b9

Please sign in to comment.