Skip to content

Commit

Permalink
Tracing JSON reader
Browse files Browse the repository at this point in the history
  • Loading branch information
9999years committed Aug 28, 2023
1 parent d098ec2 commit 03bbaf5
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 0 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions test-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,10 @@ description = "Test harness for ghcid-ng"
publish = false

[dependencies]
backoff = { version = "0.4.0", default-features = false }
itertools = "0.11.0"
miette = { version = "5.9.0", features = ["fancy"] }
serde = { version = "1.0.186", features = ["derive"] }
serde_json = "1.0.105"
tokio = { version = "1.28.2", features = ["full", "tracing"] }
tracing = "0.1.37"
5 changes: 5 additions & 0 deletions test-harness/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
mod tracing_json;
pub use tracing_json::Event;

mod tracing_reader;

pub mod fs;
108 changes: 108 additions & 0 deletions test-harness/src/tracing_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::collections::HashMap;
use std::fmt::Display;

use miette::Context;
use miette::IntoDiagnostic;
use serde::Deserialize;
use tracing::Level;

#[derive(Deserialize, Debug)]
#[serde(try_from = "JsonEvent")]
pub struct Event {
pub timestamp: String,
pub level: Level,
pub message: String,
pub fields: HashMap<String, serde_json::Value>,
pub target: String,
pub span: Option<Span>,
pub spans: Vec<Span>,
}

impl Event {
/// Get an iterator over this event's spans, from the inside out.
pub fn spans(&self) -> impl Iterator<Item = &Span> {
self.span.iter().chain(self.spans.iter())
}
}

impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} {}", self.level, self.target)?;
let spans = itertools::join(self.spans(), ">");
if !spans.is_empty() {
write!(f, " [{spans}]")?;
}
write!(f, ": {}", self.message)?;
if !self.fields.is_empty() {
write!(f, " {}", display_map(&self.fields))?;
}
Ok(())
}
}

impl TryFrom<JsonEvent> for Event {
type Error = miette::Report;

fn try_from(event: JsonEvent) -> Result<Self, Self::Error> {
Ok(Self {
timestamp: event.timestamp,
level: event
.level
.parse()
.into_diagnostic()
.wrap_err_with(|| format!("Failed to parse tracing level: {}", event.level))?,
message: event.fields.message,
fields: event.fields.rest,
target: event.target,
span: event.span,
spans: event.spans,
})
}
}

#[derive(Deserialize)]
struct JsonEvent {
timestamp: String,
level: String,
fields: Fields,
target: String,
span: Option<Span>,
#[serde(default)]
spans: Vec<Span>,
}

#[derive(Deserialize, Debug)]
pub struct Span {
pub name: String,
#[serde(flatten)]
pub rest: HashMap<String, serde_json::Value>,
}

impl Display for Span {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}{}", self.name, display_map(&self.rest))
}
}

#[derive(Deserialize, Debug)]
struct Fields {
message: String,
#[serde(flatten)]
rest: HashMap<String, serde_json::Value>,
}

fn display_map(hashmap: &HashMap<String, serde_json::Value>) -> String {
if hashmap.is_empty() {
String::new()
} else {
format!(
"{{{}}}",
itertools::join(
hashmap
.iter()
.map(|(name, value)| format!("{name}={value}")),
", ",
)
)
}
}
72 changes: 72 additions & 0 deletions test-harness/src/tracing_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::path::Path;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use miette::Context;
use miette::IntoDiagnostic;
use tokio::fs::File;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Lines;
use tokio::sync::mpsc;
use tracing::instrument;

use super::Event;

/// A task to read JSON tracing log events output by `ghid-ng` and send them over a channel.
pub struct TracingReader {
sender: mpsc::Sender<Event>,
lines: Lines<BufReader<File>>,
}

impl TracingReader {
pub async fn new(sender: mpsc::Sender<Event>, path: impl AsRef<Path>) -> miette::Result<Self> {
let path = path.as_ref();

let file = File::open(path)
.await
.into_diagnostic()
.wrap_err_with(|| format!("Failed to open {path:?}"))?;

let lines = BufReader::new(file).lines();

Ok(Self { sender, lines })
}

#[instrument(skip(self), name = "json-reader", level = "debug")]
pub async fn run(mut self) -> miette::Result<()> {
loop {
match self.run_inner().await {
Ok(()) => {
// Graceful shutdown
tracing::debug!("JSON reader exiting");
break;
}
Err(err) => {
tracing::error!("{err:?}");
}
}
}

Ok(())
}

async fn run_inner(&mut self) -> miette::Result<()> {
let mut backoff = ExponentialBackoff {
max_elapsed_time: None,
max_interval: Duration::from_secs(1),
..Default::default()
};

while let Some(duration) = backoff.next_backoff() {
while let Some(line) = self.lines.next_line().await.into_diagnostic()? {
let event = serde_json::from_str(&line).into_diagnostic()?;
self.sender.send(event).await.into_diagnostic()?;
}
tokio::time::sleep(duration).await;
}

Ok(())
}
}

0 comments on commit 03bbaf5

Please sign in to comment.