From 03bbaf52c78c222c75bea4233dbe2d85ac12ecc0 Mon Sep 17 00:00:00 2001 From: Rebecca Turner Date: Mon, 28 Aug 2023 12:21:28 -0700 Subject: [PATCH] Tracing JSON reader --- Cargo.lock | 13 ++++ test-harness/Cargo.toml | 5 ++ test-harness/src/lib.rs | 5 ++ test-harness/src/tracing_json.rs | 108 +++++++++++++++++++++++++++++ test-harness/src/tracing_reader.rs | 72 +++++++++++++++++++ 5 files changed, 203 insertions(+) create mode 100644 test-harness/src/tracing_json.rs create mode 100644 test-harness/src/tracing_reader.rs diff --git a/Cargo.lock b/Cargo.lock index fe869bc9..70567d78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1743,6 +1743,19 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "test-harness" +version = "0.1.0" +dependencies = [ + "backoff", + "itertools", + "miette", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "textwrap" version = "0.15.2" diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 391be51a..2b520dc1 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -6,5 +6,10 @@ description = "Test harness for ghcid-ng" publish = false [dependencies] +backoff = { version = "0.4.0", default-features = false } +itertools = "0.11.0" miette = { version = "5.9.0", features = ["fancy"] } +serde = { version = "1.0.186", features = ["derive"] } +serde_json = "1.0.105" tokio = { version = "1.28.2", features = ["full", "tracing"] } +tracing = "0.1.37" diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index d521fbd7..ce7d22b0 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -1 +1,6 @@ +mod tracing_json; +pub use tracing_json::Event; + +mod tracing_reader; + pub mod fs; diff --git a/test-harness/src/tracing_json.rs b/test-harness/src/tracing_json.rs new file mode 100644 index 00000000..5a2923e4 --- /dev/null +++ b/test-harness/src/tracing_json.rs @@ -0,0 +1,108 @@ +use std::collections::HashMap; +use std::fmt::Display; + +use miette::Context; +use miette::IntoDiagnostic; +use serde::Deserialize; +use tracing::Level; + +#[derive(Deserialize, Debug)] +#[serde(try_from = "JsonEvent")] +pub struct Event { + pub timestamp: String, + pub level: Level, + pub message: String, + pub fields: HashMap, + pub target: String, + pub span: Option, + pub spans: Vec, +} + +impl Event { + /// Get an iterator over this event's spans, from the inside out. + pub fn spans(&self) -> impl Iterator { + self.span.iter().chain(self.spans.iter()) + } +} + +impl Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {}", self.level, self.target)?; + let spans = itertools::join(self.spans(), ">"); + if !spans.is_empty() { + write!(f, " [{spans}]")?; + } + write!(f, ": {}", self.message)?; + if !self.fields.is_empty() { + write!(f, " {}", display_map(&self.fields))?; + } + Ok(()) + } +} + +impl TryFrom for Event { + type Error = miette::Report; + + fn try_from(event: JsonEvent) -> Result { + Ok(Self { + timestamp: event.timestamp, + level: event + .level + .parse() + .into_diagnostic() + .wrap_err_with(|| format!("Failed to parse tracing level: {}", event.level))?, + message: event.fields.message, + fields: event.fields.rest, + target: event.target, + span: event.span, + spans: event.spans, + }) + } +} + +#[derive(Deserialize)] +struct JsonEvent { + timestamp: String, + level: String, + fields: Fields, + target: String, + span: Option, + #[serde(default)] + spans: Vec, +} + +#[derive(Deserialize, Debug)] +pub struct Span { + pub name: String, + #[serde(flatten)] + pub rest: HashMap, +} + +impl Display for Span { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}{}", self.name, display_map(&self.rest)) + } +} + +#[derive(Deserialize, Debug)] +struct Fields { + message: String, + #[serde(flatten)] + rest: HashMap, +} + +fn display_map(hashmap: &HashMap) -> String { + if hashmap.is_empty() { + String::new() + } else { + format!( + "{{{}}}", + itertools::join( + hashmap + .iter() + .map(|(name, value)| format!("{name}={value}")), + ", ", + ) + ) + } +} diff --git a/test-harness/src/tracing_reader.rs b/test-harness/src/tracing_reader.rs new file mode 100644 index 00000000..3809cd2c --- /dev/null +++ b/test-harness/src/tracing_reader.rs @@ -0,0 +1,72 @@ +use std::path::Path; +use std::time::Duration; + +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use miette::Context; +use miette::IntoDiagnostic; +use tokio::fs::File; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::io::Lines; +use tokio::sync::mpsc; +use tracing::instrument; + +use super::Event; + +/// A task to read JSON tracing log events output by `ghid-ng` and send them over a channel. +pub struct TracingReader { + sender: mpsc::Sender, + lines: Lines>, +} + +impl TracingReader { + pub async fn new(sender: mpsc::Sender, path: impl AsRef) -> miette::Result { + let path = path.as_ref(); + + let file = File::open(path) + .await + .into_diagnostic() + .wrap_err_with(|| format!("Failed to open {path:?}"))?; + + let lines = BufReader::new(file).lines(); + + Ok(Self { sender, lines }) + } + + #[instrument(skip(self), name = "json-reader", level = "debug")] + pub async fn run(mut self) -> miette::Result<()> { + loop { + match self.run_inner().await { + Ok(()) => { + // Graceful shutdown + tracing::debug!("JSON reader exiting"); + break; + } + Err(err) => { + tracing::error!("{err:?}"); + } + } + } + + Ok(()) + } + + async fn run_inner(&mut self) -> miette::Result<()> { + let mut backoff = ExponentialBackoff { + max_elapsed_time: None, + max_interval: Duration::from_secs(1), + ..Default::default() + }; + + while let Some(duration) = backoff.next_backoff() { + while let Some(line) = self.lines.next_line().await.into_diagnostic()? { + let event = serde_json::from_str(&line).into_diagnostic()?; + self.sender.send(event).await.into_diagnostic()?; + } + tokio::time::sleep(duration).await; + } + + Ok(()) + } +}