Add top k metrics for cache hit rate and failure rate (#32432)
Adds top k metrics for failure rate and cache hit rate to be used by the dashboard.

The result of these endpoints is a map from function identifier to timeseries, containing `k+1` entries. The `_rest` key is a timeseries of the metric summed over all functions not included in the top `k`.
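For illustration, here is a hypothetical `failure_percentage_top_k` result for `k = 2` (identifiers, timestamps, and values are made up; per the code, each timeseries is a `Vec<(SystemTime, Option<f64>)>` and the result is a `Vec<(String, Timeseries)>`):

    [
        ("messages.js:send", [(t0, Some(42.0)), (t1, Some(35.5))]),
        ("users.js:create",  [(t0, Some(12.5)), (t1, None)]),
        ("_rest",            [(t0, Some(1.3)),  (t1, Some(0.8))]),
    ]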

GitOrigin-RevId: eb20733e6d647ecb5821e9e8f47b3532b9c023a1
atrakh authored and Convex, Inc. committed Dec 19, 2024
1 parent 714378d commit 399aaeb
Showing 5 changed files with 334 additions and 51 deletions.
289 changes: 242 additions & 47 deletions crates/application/src/function_log.rs
@@ -1,8 +1,8 @@
use std::{
cell::Cell,
- cmp::Ordering,
collections::{
BTreeMap,
HashMap,
VecDeque,
},
str::FromStr,
@@ -73,9 +73,11 @@ use serde_json::{
};
use tokio::sync::oneshot;
use udf_metrics::{
CounterBucket,
MetricName,
MetricStore,
MetricStoreConfig,
MetricType,
MetricsWindow,
Percentile,
Timeseries,
@@ -1065,11 +1067,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
UdfRate::CacheHits => udf_cache_hits_metric(&identifier),
UdfRate::CacheMisses => udf_cache_misses_metric(&identifier),
};
- let buckets = metrics
-     .query_counter(&name, window.start..window.end)?
-     .into_iter()
-     .map(|bucket| (bucket.index, bucket.value))
-     .collect();
let buckets = metrics.query_counter(&name, window.start..window.end)?;
window.resample_counters(&metrics, buckets, true)
}

@@ -1086,42 +1084,163 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
&udf_cache_hits_metric(&identifier),
window.start..window.end,
)?;
let hits = window.resample_counters(&metrics, hits, false /* is_rate */)?;
let misses = metrics.query_counter(
&udf_cache_misses_metric(&identifier),
window.start..window.end,
)?;
let misses = window.resample_counters(&metrics, misses, false /* is_rate */)?;

- // Merge the two timeseries by index, computing the hit percentage for each
- // bucket.
- let mut hits_iter = hits.into_iter().peekable();
- let mut misses_iter = misses.into_iter().peekable();
- let mut result = Vec::new();
merge_cache_hit_rate(&hits, &misses)
}

- loop {
-     let ordering = match (hits_iter.peek(), misses_iter.peek()) {
-         (Some(hit), Some(miss)) => hit.index.cmp(&miss.index),
-         (Some(_), None) => Ordering::Less,
-         (None, Some(_)) => Ordering::Greater,
-         (None, None) => break,
-     };
-     match ordering {
-         Ordering::Less => {
-             let hit = hits_iter.next().unwrap();
-             result.push((hit.index, 100.));
-         },
-         Ordering::Equal => {
-             let hit = hits_iter.next().unwrap();
-             let miss = misses_iter.next().unwrap();
-             result.push((hit.index, hit.value / (hit.value + miss.value) * 100.));
pub fn failure_percentage_top_k(
&self,
window: MetricsWindow,
k: usize,
) -> anyhow::Result<Vec<(String, Timeseries)>> {
let metrics = {
let inner = self.inner.lock();
inner.metrics.clone()
};

// Get the invocations and errors
let invocations = Self::get_udf_metric_counter(&window, &metrics, "invocations")?;
let errors = Self::get_udf_metric_counter(&window, &metrics, "errors")?;

Self::top_k_for_rate(&window, errors, invocations, k, merge_rate, false)
}

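/// Returns the `k` UDFs with the lowest cache hit rate over the window,
/// plus a "_rest" series aggregating any remaining UDFs.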
pub fn cache_hit_percentage_top_k(
&self,
window: MetricsWindow,
k: usize,
) -> anyhow::Result<Vec<(String, Timeseries)>> {
let metrics = {
let inner = self.inner.lock();
inner.metrics.clone()
};

// Get the cache hits and misses
let hits = Self::get_udf_metric_counter(&window, &metrics, "cache_hits")?;
let misses = Self::get_udf_metric_counter(&window, &metrics, "cache_misses")?;

Self::top_k_for_rate(&window, hits, misses, k, merge_cache_hit_rate, true)
}

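/// Queries every counter named "udf:{identifier}:{metric_name}" and returns a
/// map from UDF identifier to its resampled timeseries over the window.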
fn get_udf_metric_counter(
window: &MetricsWindow,
metrics: &MetricStore,
metric_name: &str,
) -> anyhow::Result<HashMap<String, Timeseries>> {
let metric_names = metrics.metric_names_for_type(MetricType::Counter);

let filtered_metric_names: Vec<_> = metric_names
.iter()
.filter(|name| name.starts_with("udf") && name.ends_with(metric_name))
.collect();

let mut results: HashMap<String, Vec<&CounterBucket>> = HashMap::new();
for metric_name in filtered_metric_names {
let result = metrics.query_counter(metric_name, window.start..window.end)?;
let metric_name_parts: Vec<&str> = metric_name.split(':').collect();
// UDF names can have colons in them, so we need to only exclude
// the metric type and metric name
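// e.g. "udf:messages.js:send:cache_hits" -> "messages.js:send"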
let metric_name = metric_name_parts[1..metric_name_parts.len() - 1].join(":");
results.insert(metric_name, result);
}

results
.into_iter()
.map(|(k, v)| {
Ok((
k,
window.resample_counters(metrics, v, false /* is_rate */)?,
))
})
.collect()
}

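/// Sums each timeseries (None buckets count as 0) and keeps the `k` entries
/// with the largest totals, or the smallest when `ascending` is true.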
fn top_k(
map: HashMap<String, Timeseries>,
k: usize,
ascending: bool,
) -> Vec<(String, Timeseries)> {
let mut top_k: Vec<_> = map
.iter()
.map(|(key, timeseries)| {
let sum: f64 = timeseries
.iter()
.map(|(_, value)| (*value).unwrap_or_default())
.sum();
(key, sum)
})
.collect();

top_k.sort_by(|(_, a), (_, b)| {
if ascending {
a.total_cmp(b)
} else {
b.total_cmp(a)
}
});
top_k.truncate(k);

let mut ret = Vec::new();
for (name, _) in &top_k {
let timeseries = map.get(*name).unwrap();
ret.push((name.to_string(), timeseries.clone()));
}

ret
}

// Compute the top k rates for the given UDFs, merging each UDF's two counter
// series with `merge` and appending a "_rest" series for everything outside
// the top k.
fn top_k_for_rate(
window: &MetricsWindow,
mut ts1: HashMap<String, Timeseries>,
mut ts2: HashMap<String, Timeseries>,
k: usize,
merge: impl Fn(&Timeseries, &Timeseries) -> anyhow::Result<Timeseries>,
ascending: bool,
) -> anyhow::Result<Vec<(String, Timeseries)>> {
let mut map: HashMap<String, Timeseries> = HashMap::new();

for (udf_id, ts1_bucket) in ts1.iter() {
let ts2_bucket = ts2.get(udf_id);

match ts2_bucket {
None => {
let result = ts1_bucket.iter().map(|&(ts, _)| (ts, None)).collect();

map.insert(udf_id.to_string(), result);
},
-         Ordering::Greater => {
-             let miss = misses_iter.next().unwrap();
-             result.push((miss.index, 0.));
Some(ts2_bucket) => {
let merged: Vec<(SystemTime, Option<f64>)> = merge(ts1_bucket, ts2_bucket)?;
map.insert(udf_id.to_string(), merged);
},
}
}

- window.resample_counters(&metrics, result, false)
let mut ret = Self::top_k(map, k, ascending);

// Remove the top k from both timeseries maps
// so we can sum up everything that's left over.
for (name, _) in &ret {
ts1.remove(name);
ts2.remove(name);
}

// Sum up the rest of the rates
if !ts2.is_empty() || !ts1.is_empty() {
let rest_ts1 = sum_timeseries(window, ts1.values())?;
let rest_ts2 = sum_timeseries(window, ts2.values())?;

let rest = merge(&rest_ts1, &rest_ts2)?;
ret.push(("_rest".to_string(), rest));
}

Ok(ret)
}

pub fn latency_percentiles(
@@ -1155,11 +1274,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
TableRate::RowsRead => table_rows_read_metric(&table_name),
TableRate::RowsWritten => table_rows_written_metric(&table_name),
};
- let buckets = metrics
-     .query_counter(&name, window.start..window.end)?
-     .into_iter()
-     .map(|bucket| (bucket.index, bucket.value))
-     .collect();
let buckets = metrics.query_counter(&name, window.start..window.end)?;
window.resample_counters(&metrics, buckets, true)
}

@@ -1306,6 +1421,73 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
}
}

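/// Pointwise sum of aligned timeseries: each input must have one point per
/// window bucket, and buckets where every input is None remain None.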
fn sum_timeseries<'a>(
window: &MetricsWindow,
series: impl Iterator<Item = &'a Timeseries>,
) -> anyhow::Result<Timeseries> {
let mut result = (0..window.num_buckets)
.map(|i| Ok((window.bucket_start(i)?, None)))
.collect::<anyhow::Result<Timeseries>>()?;
for timeseries in series {
anyhow::ensure!(timeseries.len() == result.len());
for (i, &(ts, value)) in timeseries.iter().enumerate() {
anyhow::ensure!(ts == result[i].0);
if let Some(value) = value {
*result[i].1.get_or_insert(0.) += value;
}
}
}
Ok(result)
}

/// Merges two series to get the rate, where rate = partial / total * 100.
fn merge_rate(partial: &Timeseries, total: &Timeseries) -> anyhow::Result<Timeseries> {
anyhow::ensure!(total.len() == partial.len());
total
.iter()
.zip(partial)
.map(|(&(t1, total_value), &(t2, partial_value))| {
anyhow::ensure!(t1 == t2);
Ok((
t1,
match (total_value, partial_value) {
(Some(total_value), Some(partial_value)) => {
Some(partial_value / total_value * 100.)
},
(Some(_total_value), None) => Some(0.),
(None, Some(_partial_value)) => Some(100.),
(None, None) => None,
},
))
})
.collect()
}

/// Merges two series to get the cache hit rate,
/// where rate = hits / (misses + hits) * 100.
fn merge_cache_hit_rate(hits: &Timeseries, misses: &Timeseries) -> anyhow::Result<Timeseries> {
anyhow::ensure!(hits.len() == misses.len());
hits.iter()
.zip(misses)
.map(|(&(t1, hits_value), &(t2, misses_value))| {
anyhow::ensure!(t1 == t2);
Ok((
t1,
match (hits_value, misses_value) {
(Some(hits_value), Some(misses_value)) => {
Some(hits_value / (hits_value + misses_value) * 100.)
},
// There are hits but not misses, so the hit rate is 100.
(Some(_hits_value), None) => Some(100.),
// There are misses but not hits, so the hit rate is 0.
(None, Some(_misses_value)) => Some(0.),
(None, None) => None,
},
))
})
.collect()
}

struct Inner<RT: Runtime> {
rt: RT,

@@ -1400,12 +1582,14 @@ impl<RT: Runtime> Inner<RT> {
let name = udf_errors_metric(&identifier);
self.metrics.add_counter(&name, ts, 1.0)?;
}
- if execution.cached_result {
-     let name = udf_cache_hits_metric(&identifier);
-     self.metrics.add_counter(&name, ts, 1.0)?;
- } else {
-     let name = udf_cache_misses_metric(&identifier);
-     self.metrics.add_counter(&name, ts, 1.0)?;
if execution.udf_type == UdfType::Query {
if execution.cached_result {
let name = udf_cache_hits_metric(&identifier);
self.metrics.add_counter(&name, ts, 1.0)?;
} else {
let name = udf_cache_misses_metric(&identifier);
self.metrics.add_counter(&name, ts, 1.0)?;
}
}

let name = udf_execution_time_metric(&identifier);
@@ -1554,25 +1738,26 @@ impl From<FunctionSummary> for JsonValue {
}

fn udf_invocations_metric(identifier: &UdfIdentifier) -> MetricName {
- format!("udf:{}:invocations", identifier)
format!("udf:{}:invocations", udf_metric_name(identifier))
}

fn udf_errors_metric(identifier: &UdfIdentifier) -> MetricName {
- format!("udf:{}:errors", identifier)
format!("udf:{}:errors", udf_metric_name(identifier))
}

fn udf_cache_hits_metric(identifier: &UdfIdentifier) -> MetricName {
- format!("udf:{}:cache_hits", identifier)
format!("udf:{}:cache_hits", udf_metric_name(identifier))
}

fn udf_cache_misses_metric(identifier: &UdfIdentifier) -> MetricName {
- format!("udf:{}:cache_misses", identifier)
format!("udf:{}:cache_misses", udf_metric_name(identifier))
}

fn udf_execution_time_metric(identifier: &UdfIdentifier) -> MetricName {
- format!("udf:{}:execution_time", identifier)
format!("udf:{}:execution_time", udf_metric_name(identifier))
}

// TODO: Thread component path through here.
fn table_rows_read_metric(table_name: &TableName) -> MetricName {
format!("table:{}:rows_read", table_name)
}
@@ -1584,3 +1769,13 @@ fn table_rows_written_metric(table_name: &TableName) -> MetricName {
fn scheduled_job_next_ts_metric() -> &'static str {
"scheduled_jobs:next_ts"
}

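/// Formats a UDF identifier as "{component}/{udf_path}" when the UDF lives in
/// a component, or as the bare UDF path otherwise.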
fn udf_metric_name(identifier: &UdfIdentifier) -> String {
let (component, id) = identifier.clone().into_component_and_udf_path();
match component {
Some(component) => {
format!("{}/{}", component, id)
},
None => id,
}
}
