Skip to content

Commit

Permalink
Save histograms in JSON report
Browse files Browse the repository at this point in the history
Switch to nanosecond precision in histograms.
  • Loading branch information
pkolaczk committed Dec 10, 2021
1 parent b4e3495 commit bc62c55
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.45"
base64 = "0.13.0"
chrono = "0.4.18"
clap = "=3.0.0-beta.5"
console = "0.15.0"
Expand Down
59 changes: 59 additions & 0 deletions src/histogram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::fmt;
use std::io::Cursor;

use hdrhistogram::serialization::{Serializer, V2Serializer};
use hdrhistogram::Histogram;
use serde::de::{Error, Visitor};
use serde::{Deserialize, Deserializer, Serialize};

/// Newtype wrapper around an HDR [`Histogram`] that implements serde
/// `Serialize`/`Deserialize` by round-tripping through the HDR V2 binary
/// format, base64-encoded so it can be stored as a string in a JSON report.
///
/// The inner histogram is public so callers can record into and read from
/// it directly.
pub struct SerializableHistogram(pub Histogram<u64>);

impl Serialize for SerializableHistogram {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut serialized_histogram = Vec::new();
V2Serializer::new()
.serialize(&self.0, &mut serialized_histogram)
.unwrap();
let encoded = base64::encode(serialized_histogram);
serializer.serialize_str(encoded.as_str())
}
}

/// Serde visitor that decodes a base64 string holding an HDR V2-serialized
/// histogram back into a [`SerializableHistogram`].
struct HistogramVisitor;

impl<'de> Visitor<'de> for HistogramVisitor {
    type Value = SerializableHistogram;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("a compressed HDR histogram encoded as base64 string")
    }

    /// Decodes the base64 payload, then deserializes the HDR V2 binary data,
    /// converting both failure modes into serde deserialization errors.
    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: Error,
    {
        let decoded =
            base64::decode(v).map_err(|e| E::custom(format!("Not a valid base64 value. {}", e)))?;
        let mut cursor = Cursor::new(&decoded);
        let mut deserializer = hdrhistogram::serialization::Deserializer::new();
        // E::custom accepts any Display value, so the hdrhistogram error can
        // be passed point-free (no redundant closure needed).
        let histogram = deserializer.deserialize(&mut cursor).map_err(E::custom)?;
        Ok(SerializableHistogram(histogram))
    }
}

impl<'de> Deserialize<'de> for SerializableHistogram {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_str(HistogramVisitor)
}
}
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::workload::{FnRef, Workload, WorkloadStats, LOAD_FN, RUN_FN};
mod config;
mod deadline;
mod error;
mod histogram;
mod interrupt;
mod progress;
mod report;
Expand Down
10 changes: 5 additions & 5 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct SessionStats {
pub row_count: u64,
pub queue_length: u64,
pub mean_queue_length: f32,
pub resp_times_us: Histogram<u64>,
pub resp_times_ns: Histogram<u64>,
}

impl SessionStats {
Expand All @@ -119,8 +119,8 @@ impl SessionStats {

pub fn complete_request(&mut self, duration: Duration, rs: &Result<QueryResult, QueryError>) {
self.queue_length -= 1;
let duration_us = duration.as_micros().clamp(1, u64::MAX as u128) as u64;
self.resp_times_us.record(duration_us).unwrap();
let duration_ns = duration.as_nanos().clamp(1, u64::MAX as u128) as u64;
self.resp_times_ns.record(duration_ns).unwrap();
self.req_count += 1;
match rs {
Ok(rs) => self.row_count += rs.rows.as_ref().map(|r| r.len()).unwrap_or(0) as u64,
Expand All @@ -138,7 +138,7 @@ impl SessionStats {
self.req_count = 0;
self.mean_queue_length = 0.0;
self.req_errors.clear();
self.resp_times_us.clear();
self.resp_times_ns.clear();

// note that current queue_length is *not* reset to zero because there
// might be pending requests and if we set it to zero, that would underflow
Expand All @@ -154,7 +154,7 @@ impl Default for SessionStats {
row_count: 0,
queue_length: 0,
mean_queue_length: 0.0,
resp_times_us: Histogram::new(3).unwrap(),
resp_times_ns: Histogram::new(3).unwrap(),
}
}
}
Expand Down
57 changes: 34 additions & 23 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::time::Instant;

use crate::workload::WorkloadStats;
use cpu_time::ProcessTime;
use hdrhistogram::Histogram;
use serde::{Deserialize, Serialize};
Expand All @@ -12,6 +11,9 @@ use strum::EnumCount;
use strum::IntoEnumIterator;
use strum_macros::{EnumCount as EnumCountM, EnumIter};

use crate::histogram::SerializableHistogram;
use crate::workload::WorkloadStats;

/// Controls the maximum order of autocovariance taken into
/// account when estimating the long run mean error. Higher values make the estimator
/// capture more autocorrelation from the signal, but also make the results
Expand Down Expand Up @@ -95,10 +97,10 @@ pub fn long_run_err(mean: f64, values: &[f32], weights: &[f32]) -> f64 {
(long_run_variance(mean, values, weights) / values.len() as f64).sqrt()
}

fn percentiles(hist: &Histogram<u64>) -> [f32; Percentile::COUNT] {
fn percentiles_ms(hist: &Histogram<u64>) -> [f32; Percentile::COUNT] {
let mut percentiles = [0.0; Percentile::COUNT];
for (i, p) in Percentile::iter().enumerate() {
percentiles[i] = hist.value_at_percentile(p.value()) as f32 / 1000.0;
percentiles[i] = hist.value_at_percentile(p.value()) as f32 / 1000000.0;
}
percentiles
}
Expand Down Expand Up @@ -155,10 +157,10 @@ pub fn t_test(mean1: &Mean, mean2: &Mean) -> f64 {
fn distribution(hist: &Histogram<u64>) -> Vec<Bucket> {
let mut result = Vec::new();
if !hist.is_empty() {
for x in hist.iter_log(100, 2.15443469) {
for x in hist.iter_log(100000, 2.15443469) {
result.push(Bucket {
percentile: x.percentile(),
duration_ms: x.value_iterated_to() as f64 / 1000.0,
duration_ms: x.value_iterated_to() as f64 / 1000000.0,
count: x.count_since_last_iteration(),
cumulative_count: x.count_at_value(),
});
Expand Down Expand Up @@ -249,21 +251,26 @@ pub struct Sample {
pub mean_resp_time_ms: f32,
pub call_time_percentiles: [f32; Percentile::COUNT],
pub resp_time_percentiles: [f32; Percentile::COUNT],
pub call_time_histogram_ns: SerializableHistogram,
pub resp_time_histogram_ns: SerializableHistogram,
}

impl Sample {
pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample {
assert!(!stats.is_empty());
let mut call_count = 0;
let mut call_times_us = Histogram::new(3).unwrap();
let mut call_times_ns = Histogram::new(3).unwrap();

let mut request_count = 0;
let mut row_count = 0;
let mut errors = HashSet::new();
let mut error_count = 0;
let mut mean_queue_len = 0.0;
let mut duration_s = 0.0;
let mut resp_times_us = Histogram::new(3).unwrap();
let mut resp_times_ns = Histogram::new(3).unwrap();

let mut call_time_histogram_ns = Histogram::new(3).unwrap();
let mut resp_time_histogram_ns = Histogram::new(3).unwrap();

for s in stats {
let ss = &s.session_stats;
Expand All @@ -276,13 +283,15 @@ impl Sample {
error_count += ss.req_error_count;
mean_queue_len += ss.mean_queue_length / stats.len() as f32;
duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32;
resp_times_us.add(&ss.resp_times_us).unwrap();
resp_times_ns.add(&ss.resp_times_ns).unwrap();
resp_time_histogram_ns.add(&ss.resp_times_ns).unwrap();

call_count += fs.call_count;
call_times_us.add(&fs.call_durations_us).unwrap();
call_times_ns.add(&fs.call_times_ns).unwrap();
call_time_histogram_ns.add(&fs.call_times_ns).unwrap();
}
let resp_time_percentiles = percentiles(&resp_times_us);
let call_time_percentiles = percentiles(&call_times_us);
let resp_time_percentiles = percentiles_ms(&resp_times_ns);
let call_time_percentiles = percentiles_ms(&call_times_ns);

Sample {
time_s: (stats[0].start_time - base_start_time).as_secs_f32(),
Expand All @@ -296,10 +305,12 @@ impl Sample {
call_throughput: call_count as f32 / duration_s,
req_throughput: request_count as f32 / duration_s,
row_throughput: row_count as f32 / duration_s,
mean_call_time_ms: call_times_us.mean() as f32 / 1000.0,
mean_call_time_ms: call_times_ns.mean() as f32 / 1000000.0,
call_time_histogram_ns: SerializableHistogram(call_time_histogram_ns),
call_time_percentiles,
mean_resp_time_ms: resp_times_us.mean() as f32 / 1000.0,
mean_resp_time_ms: resp_times_ns.mean() as f32 / 1000000.0,
resp_time_percentiles,
resp_time_histogram_ns: SerializableHistogram(resp_time_histogram_ns),
}
}
}
Expand Down Expand Up @@ -501,8 +512,8 @@ pub struct Recorder {
pub errors: HashSet<String>,
pub error_count: u64,
pub row_count: u64,
pub call_times_us: Histogram<u64>,
pub resp_times_us: Histogram<u64>,
pub call_times_ns: Histogram<u64>,
pub resp_times_ns: Histogram<u64>,
pub queue_len_sum: u64,
log: Log,
rate_limit: Option<f64>,
Expand All @@ -528,8 +539,8 @@ impl Recorder {
row_count: 0,
errors: HashSet::new(),
error_count: 0,
call_times_us: Histogram::new(3).unwrap(),
resp_times_us: Histogram::new(3).unwrap(),
call_times_ns: Histogram::new(3).unwrap(),
resp_times_ns: Histogram::new(3).unwrap(),
queue_len_sum: 0,
}
}
Expand All @@ -538,11 +549,11 @@ impl Recorder {
/// Called on completion of each sample.
pub fn record(&mut self, samples: &[WorkloadStats]) -> &Sample {
for s in samples.iter() {
self.resp_times_us
.add(&s.session_stats.resp_times_us)
self.resp_times_ns
.add(&s.session_stats.resp_times_ns)
.unwrap();
self.call_times_us
.add(&s.function_stats.call_durations_us)
self.call_times_ns
.add(&s.function_stats.call_times_ns)
.unwrap();
}
let stats = Sample::new(self.start_time, samples);
Expand Down Expand Up @@ -609,12 +620,12 @@ impl Recorder {
call_time_ms: TimeDistribution {
mean: self.log.call_time_ms(),
percentiles: call_time_percentiles,
distribution: distribution(&self.call_times_us),
distribution: distribution(&self.call_times_ns),
},
resp_time_ms: TimeDistribution {
mean: self.log.resp_time_ms(),
percentiles: resp_time_percentiles,
distribution: distribution(&self.resp_times_us),
distribution: distribution(&self.resp_times_ns),
},
concurrency,
concurrency_ratio,
Expand Down
8 changes: 4 additions & 4 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ impl Program {
#[derive(Clone, Debug)]
pub struct FnStats {
pub call_count: u64,
pub call_durations_us: Histogram<u64>,
pub call_times_ns: Histogram<u64>,
}

impl FnStats {
pub fn operation_completed(&mut self, duration: Duration) {
self.call_count += 1;
self.call_durations_us
.record(duration.as_micros().clamp(1, u64::MAX as u128) as u64)
self.call_times_ns
.record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64)
.unwrap();
}
}
Expand All @@ -324,7 +324,7 @@ impl Default for FnStats {
fn default() -> Self {
FnStats {
call_count: 0,
call_durations_us: Histogram::new(3).unwrap(),
call_times_ns: Histogram::new(3).unwrap(),
}
}
}
Expand Down

0 comments on commit bc62c55

Please sign in to comment.