Skip to content

Commit

Permalink
Export HDR log
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Dec 10, 2021
1 parent bc62c55 commit 3ee31e0
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
[dependencies]
anyhow = "1.0.45"
base64 = "0.13.0"
chrono = "0.4.18"
chrono = { version = "0.4.18", features = ["serde"] }
clap = "=3.0.0-beta.5"
console = "0.15.0"
cpu-time = "1.0.0"
Expand Down
46 changes: 41 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ pub struct RunCommand {
pub output: Option<PathBuf>,

/// Path to a report from another earlier run that should be compared to side-by-side
#[clap(short('b'), long)]
#[clap(short('b'), long, value_name = "PATH")]
pub baseline: Option<PathBuf>,

/// Skips erasing and loading data before running the benchmark.
#[clap(long)]
pub no_load: bool,

/// Path to the workload definition file
#[clap(name = "workload", required = true)]
#[clap(name = "workload", required = true, value_name = "PATH")]
pub workload: PathBuf,

#[clap(short('P'), parse(try_from_str = parse_key_val),
Expand Down Expand Up @@ -183,25 +183,61 @@ impl RunCommand {
.find(|(k, _)| k == key)
.and_then(|v| v.1.parse().ok())
}

/// Returns benchmark name
pub fn name(&self) -> String {
self.workload
.file_stem()
.unwrap()
.to_string_lossy()
.to_string()
}
}

#[derive(Parser, Debug)]
pub struct ShowCommand {
/// Path to the JSON report file
#[clap(value_name = "PATH")]
pub report: PathBuf,

/// Optional path to another JSON report file
#[clap(short('b'), long)]
#[clap(short('b'), long, value_name = "PATH")]
pub baseline: Option<PathBuf>,
}

#[derive(Parser, Debug)]
pub struct HdrCommand {
/// Path to the input JSON report file
#[clap(value_name = "PATH")]
pub report: PathBuf,

/// Output file; if not given, the hdr log gets printed to stdout
#[clap(short('o'), long, value_name = "PATH")]
pub output: Option<PathBuf>,
}

#[derive(Parser, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Command {
/// Runs the benchmark
/// Runs the benchmark.
///
/// Prints nicely formatted statistics to the standard output.
/// Additionally dumps all data into a JSON report file.
Run(RunCommand),
/// Displays the report(s) of previously executed benchmark(s)

/// Displays the report(s) of previously executed benchmark(s).
///
/// Can compare two runs.
Show(ShowCommand),

/// Exports call- and response-time histograms as a compressed HDR interval log.
///
/// To be used with HdrHistogram (https://github.com/HdrHistogram/HdrHistogram).
/// Timestamps are given in seconds since Unix epoch.
/// Response times are recorded in nanoseconds.
///
/// Each histogram is tagged by the benchmark name, parameters and benchmark tags.
Hdr(HdrCommand),
}

#[derive(Parser, Debug)]
Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::session::CassError;
use err_derive::*;
use hdrhistogram::serialization::interval_log::IntervalLogWriterError;
use hdrhistogram::serialization::V2DeflateSerializeError;
use std::path::PathBuf;

#[derive(Debug, Error)]
Expand All @@ -22,6 +24,12 @@ pub enum LatteError {
#[error(display = "{}", _0)]
Diagnostics(#[source] rune::diagnostics::EmitError),

#[error(display = "Failed to create output file {:?}: {}", _0, _1)]
OutputFileCreate(PathBuf, std::io::Error),

#[error(display = "Error writing HDR log: {}", _0)]
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

#[error(display = "Interrupted")]
Interrupted,
}
Expand Down
4 changes: 2 additions & 2 deletions src/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;
use std::io::Cursor;

use hdrhistogram::serialization::{Serializer, V2Serializer};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
Expand All @@ -16,7 +16,7 @@ impl Serialize for SerializableHistogram {
S: serde::Serializer,
{
let mut serialized_histogram = Vec::new();
V2Serializer::new()
V2DeflateSerializer::new()
.serialize(&self.0, &mut serialized_histogram)
.unwrap();
let encoded = base64::encode(serialized_histogram);
Expand Down
67 changes: 55 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::cmp::max;
use std::fs::File;
use std::io::{stdout, Write};
use std::num::NonZeroUsize;
use std::ops::Range;
use std::path::{Path, PathBuf};
Expand All @@ -11,6 +13,8 @@ use futures::channel::mpsc;
use futures::channel::mpsc::{Receiver, Sender};
use futures::future::ready;
use futures::{SinkExt, Stream, StreamExt};
use hdrhistogram::serialization::interval_log::Tag;
use hdrhistogram::serialization::{interval_log, V2DeflateSerializer};
use itertools::Itertools;
use rune::Source;
use status_line::StatusLine;
Expand All @@ -19,7 +23,7 @@ use tokio_stream::wrappers::IntervalStream;

use config::RunCommand;

use crate::config::{AppConfig, Command, ShowCommand};
use crate::config::{AppConfig, Command, HdrCommand, ShowCommand};
use crate::deadline::Deadline;
use crate::error::{LatteError, Result};
use crate::interrupt::InterruptHandler;
Expand All @@ -41,6 +45,8 @@ mod session;
mod stats;
mod workload;

const VERSION: &str = env!("CARGO_PKG_VERSION");

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

Expand Down Expand Up @@ -315,20 +321,11 @@ fn load_report_or_abort(path: &Path) -> Report {
/// Constructs the output report file name from the parameters in the command.
/// Separates parameters with underscores.
fn get_default_output_name(conf: &RunCommand) -> PathBuf {
let name = conf
.workload
.file_stem()
.unwrap()
.to_string_lossy()
.to_string();

let mut components = vec![name];
let mut components = vec![conf.name()];
components.extend(conf.cluster_name.iter().map(|x| x.replace(" ", "_")));
components.extend(conf.cass_version.iter().cloned());
components.extend(conf.tags.iter().cloned());
if let Some(r) = conf.rate {
components.push(format!("r{}", r))
};
components.extend(conf.rate.map(|r| format!("r{}", r)));
components.push(format!("p{}", conf.concurrency));
components.push(format!("t{}", conf.threads));
components.push(format!("c{}", conf.connections));
Expand Down Expand Up @@ -515,10 +512,56 @@ async fn show(conf: ShowCommand) -> Result<()> {
Ok(())
}

/// Reads histograms from the report and dumps them to an hdr log
async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
let report = load_report_or_abort(&conf.report);
let stdout = stdout();
let output_file: File;
let stdout_stream;
let mut out: Box<dyn Write> = match conf.output {
Some(path) => {
output_file = File::create(&path).map_err(|e| LatteError::OutputFileCreate(path, e))?;
Box::new(output_file)
}
None => {
stdout_stream = stdout.lock();
Box::new(stdout_stream)
}
};

let mut serializer = V2DeflateSerializer::new();
let mut log_writer = interval_log::IntervalLogWriterBuilder::new()
.add_comment(format!("[Logged with Latte {}]", VERSION).as_str())
.with_start_time(report.result.start_time.into())
.with_base_time(report.result.start_time.into())
.with_max_value_divisor(1000000.0) // ms
.begin_log_with(&mut out, &mut serializer)
.unwrap();

for sample in &report.result.log {
let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64);
let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64);
log_writer.write_histogram(
&sample.call_time_histogram_ns.0,
interval_start_time,
interval_duration,
Tag::new("call_time"),
)?;
log_writer.write_histogram(
&sample.resp_time_histogram_ns.0,
interval_start_time,
interval_duration,
Tag::new("resp_time"),
)?;
}
Ok(())
}

async fn async_main(command: Command) -> Result<()> {
match command {
Command::Run(config) => run(config).await?,
Command::Show(config) => show(config).await?,
Command::Hdr(config) => export_hdr_log(config).await?,
}
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,12 @@ impl<'a> Display for BenchmarkCmp<'a> {
self.line("└─", "row/req", |s| {
Quantity::new(s.row_count_per_req, 1)
}),
self.line("Samples", "", |s| Quantity::new(s.samples.len(), 0)),
self.line("Samples", "", |s| Quantity::new(s.log.len(), 0)),
self.line("Mean sample size", "op", |s| {
Quantity::new(s.samples.iter().map(|s| s.call_count as f64).mean(), 0)
Quantity::new(s.log.iter().map(|s| s.call_count as f64).mean(), 0)
}),
self.line("└─", "req", |s| {
Quantity::new(s.samples.iter().map(|s| s.request_count as f64).mean(), 0)
Quantity::new(s.log.iter().map(|s| s.request_count as f64).mean(), 0)
}),
self.line("Concurrency", "req", |s| {
Quantity::new(s.concurrency.value, 1)
Expand Down
31 changes: 21 additions & 10 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use chrono::{DateTime, Local};
use std::cmp::min;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::time::Instant;
use std::time::{Instant, SystemTime};

use cpu_time::ProcessTime;
use hdrhistogram::Histogram;
Expand Down Expand Up @@ -418,6 +419,8 @@ pub struct TimeDistribution {
/// Stores the final statistics of the test run.
#[derive(Serialize, Deserialize)]
pub struct BenchmarkStats {
pub start_time: DateTime<Local>,
pub end_time: DateTime<Local>,
pub elapsed_time_s: f64,
pub cpu_time_s: f64,
pub cpu_util: f64,
Expand All @@ -437,7 +440,7 @@ pub struct BenchmarkStats {
pub resp_time_ms: TimeDistribution,
pub concurrency: Mean,
pub concurrency_ratio: f64,
pub samples: Vec<Sample>,
pub log: Vec<Sample>,
}

/// Stores the statistics of one or two test runs.
Expand Down Expand Up @@ -503,8 +506,10 @@ impl BenchmarkCmp<'_> {
/// Can be also used to split the time-series into smaller sub-samples and to
/// compute statistics for each sub-sample separately.
pub struct Recorder {
pub start_time: Instant,
pub end_time: Instant,
pub start_time: SystemTime,
pub end_time: SystemTime,
pub start_instant: Instant,
pub end_instant: Instant,
pub start_cpu_time: ProcessTime,
pub end_cpu_time: ProcessTime,
pub call_count: u64,
Expand All @@ -522,13 +527,16 @@ pub struct Recorder {

impl Recorder {
/// Creates a new recorder.
/// The `rate_limit` and `parallelism_limit` parameters are used only as the
/// The `rate_limit` and `concurrency_limit` parameters are used only as the
/// reference levels for relative throughput and relative parallelism.
pub fn start(rate_limit: Option<f64>, concurrency_limit: NonZeroUsize) -> Recorder {
let start_time = Instant::now();
let start_time = SystemTime::now();
let start_instant = Instant::now();
Recorder {
start_time,
end_time: start_time,
start_instant,
end_instant: start_instant,
start_cpu_time: ProcessTime::now(),
end_cpu_time: ProcessTime::now(),
log: Log::new(),
Expand Down Expand Up @@ -556,7 +564,7 @@ impl Recorder {
.add(&s.function_stats.call_times_ns)
.unwrap();
}
let stats = Sample::new(self.start_time, samples);
let stats = Sample::new(self.start_instant, samples);
self.call_count += stats.call_count;
self.request_count += stats.request_count;
self.row_count += stats.row_count;
Expand All @@ -569,15 +577,16 @@ impl Recorder {

/// Stops the recording, computes the statistics and returns them as the new object.
pub fn finish(mut self) -> BenchmarkStats {
self.end_time = Instant::now();
self.end_time = SystemTime::now();
self.end_instant = Instant::now();
self.end_cpu_time = ProcessTime::now();
self.stats()
}

/// Computes the final statistics based on collected data
/// and turn them into report that can be serialized
fn stats(self) -> BenchmarkStats {
let elapsed_time_s = (self.end_time - self.start_time).as_secs_f64();
let elapsed_time_s = (self.end_instant - self.start_instant).as_secs_f64();
let cpu_time_s = self
.end_cpu_time
.duration_since(self.start_cpu_time)
Expand All @@ -602,6 +611,8 @@ impl Recorder {
.collect();

BenchmarkStats {
start_time: self.start_time.into(),
end_time: self.end_time.into(),
elapsed_time_s,
cpu_time_s,
cpu_util,
Expand Down Expand Up @@ -629,7 +640,7 @@ impl Recorder {
},
concurrency,
concurrency_ratio,
samples: self.log.samples,
log: self.log.samples,
}
}
}
Expand Down

0 comments on commit 3ee31e0

Please sign in to comment.