From 399aaeb6b0b2c83f28c3a29d6029a882c2f5bd5f Mon Sep 17 00:00:00 2001
From: Ari
Date: Thu, 19 Dec 2024 11:02:16 -0800
Subject: [PATCH] Add top k metrics for cache hit rate and failure rate
 (#32432)

Adds top k metrics for failure rate and cache hit rate, to be used by the
dashboard. Each endpoint returns a map of function identifier to timeseries
containing up to `k + 1` entries: the top `k` functions ranked by the metric,
plus a `_rest` entry whose timeseries aggregates the metric over all functions
not included in the top `k`.
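
For illustration, with `k = 2` a response has roughly the shape below (the
function names are hypothetical, and each timeseries is one
`(bucket_time, Option<value>)` pair per bucket of the requested window):

    [
      ["messages:send", [...]],  // worst function by the metric
      ["users:create",  [...]],  // second worst
      ["_rest",         [...]]   // all remaining functions, aggregated
    ]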

GitOrigin-RevId: eb20733e6d647ecb5821e9e8f47b3532b9c023a1
---
 crates/application/src/function_log.rs  | 289 ++++++++++++++++++++----
 crates/application/src/lib.rs           |  24 ++
 crates/local_backend/src/app_metrics.rs |  39 ++++
 crates/local_backend/src/router.rs      |   7 +
 crates/udf_metrics/src/lib.rs           |  26 ++-
 5 files changed, 334 insertions(+), 51 deletions(-)

diff --git a/crates/application/src/function_log.rs b/crates/application/src/function_log.rs
index a2d54db4..b85cb1de 100644
--- a/crates/application/src/function_log.rs
+++ b/crates/application/src/function_log.rs
@@ -1,8 +1,8 @@
 use std::{
     cell::Cell,
-    cmp::Ordering,
     collections::{
         BTreeMap,
+        HashMap,
         VecDeque,
     },
     str::FromStr,
@@ -73,9 +73,11 @@ use serde_json::{
 };
 use tokio::sync::oneshot;
 use udf_metrics::{
+    CounterBucket,
     MetricName,
     MetricStore,
     MetricStoreConfig,
+    MetricType,
     MetricsWindow,
     Percentile,
     Timeseries,
@@ -1065,11 +1067,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
             UdfRate::CacheHits => udf_cache_hits_metric(&identifier),
             UdfRate::CacheMisses => udf_cache_misses_metric(&identifier),
         };
-        let buckets = metrics
-            .query_counter(&name, window.start..window.end)?
-            .into_iter()
-            .map(|bucket| (bucket.index, bucket.value))
-            .collect();
+        let buckets = metrics.query_counter(&name, window.start..window.end)?;
         window.resample_counters(&metrics, buckets, true)
     }
 
@@ -1086,42 +1084,163 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
             &udf_cache_hits_metric(&identifier),
             window.start..window.end,
         )?;
+        let hits = window.resample_counters(&metrics, hits, false /* is_rate */)?;
         let misses = metrics.query_counter(
             &udf_cache_misses_metric(&identifier),
             window.start..window.end,
         )?;
+        let misses = window.resample_counters(&metrics, misses, false /* is_rate */)?;
-
-        // Merge the two timeseries by index, computing the hit percentage for each
-        // bucket.
-        let mut hits_iter = hits.into_iter().peekable();
-        let mut misses_iter = misses.into_iter().peekable();
-        let mut result = Vec::new();
-
-        loop {
-            let ordering = match (hits_iter.peek(), misses_iter.peek()) {
-                (Some(hit), Some(miss)) => hit.index.cmp(&miss.index),
-                (Some(_), None) => Ordering::Less,
-                (None, Some(_)) => Ordering::Greater,
-                (None, None) => break,
-            };
-            match ordering {
-                Ordering::Less => {
-                    let hit = hits_iter.next().unwrap();
-                    result.push((hit.index, 100.));
-                },
-                Ordering::Equal => {
-                    let hit = hits_iter.next().unwrap();
-                    let miss = misses_iter.next().unwrap();
-                    result.push((hit.index, hit.value / (hit.value + miss.value) * 100.));
-                },
-                Ordering::Greater => {
-                    let miss = misses_iter.next().unwrap();
-                    result.push((miss.index, 0.));
-                },
-            }
-        }
-        window.resample_counters(&metrics, result, false)
+        merge_cache_hit_rate(&hits, &misses)
+    }
+
+    pub fn failure_percentage_top_k(
+        &self,
+        window: MetricsWindow,
+        k: usize,
+    ) -> anyhow::Result<Vec<(String, Timeseries)>> {
+        let metrics = {
+            let inner = self.inner.lock();
+            inner.metrics.clone()
+        };
+
+        // Get the invocations and errors
+        let invocations = Self::get_udf_metric_counter(&window, &metrics, "invocations")?;
+        let errors = Self::get_udf_metric_counter(&window, &metrics, "errors")?;
+
+        Self::top_k_for_rate(&window, errors, invocations, k, merge_rate, false)
+    }
+
+    pub fn cache_hit_percentage_top_k(
+        &self,
+        window: MetricsWindow,
+        k: usize,
+    ) -> anyhow::Result<Vec<(String, Timeseries)>> {
+        let metrics = {
+            let inner = self.inner.lock();
+            inner.metrics.clone()
+        };
+
+        // Get the cache hits and misses
+        let hits = Self::get_udf_metric_counter(&window, &metrics, "cache_hits")?;
+        let misses = Self::get_udf_metric_counter(&window, &metrics, "cache_misses")?;
+
+        Self::top_k_for_rate(&window, hits, misses, k, merge_cache_hit_rate, true)
+    }
+
+    fn get_udf_metric_counter(
+        window: &MetricsWindow,
+        metrics: &MetricStore,
+        metric_name: &str,
+    ) -> anyhow::Result<HashMap<String, Timeseries>> {
+        let metric_names = metrics.metric_names_for_type(MetricType::Counter);
+
+        let filtered_metric_names: Vec<_> = metric_names
+            .iter()
+            .filter(|name| name.starts_with("udf") && name.ends_with(metric_name))
+            .collect();
+
+        let mut results: HashMap<String, Vec<&CounterBucket>> = HashMap::new();
+        for metric_name in filtered_metric_names {
+            let result = metrics.query_counter(metric_name, window.start..window.end)?;
+            let metric_name_parts: Vec<&str> = metric_name.split(':').collect();
+            // UDF names can have colons in them, so we need to exclude only
+            // the metric type prefix and the metric name suffix.
+            let metric_name = metric_name_parts[1..metric_name_parts.len() - 1].join(":");
+            results.insert(metric_name, result);
+        }
+
+        results
+            .into_iter()
+            .map(|(k, v)| {
+                Ok((
+                    k,
+                    window.resample_counters(metrics, v, false /* is_rate */)?,
+                ))
+            })
+            .collect()
+    }
+
+    fn top_k(
+        map: HashMap<String, Timeseries>,
+        k: usize,
+        ascending: bool,
+    ) -> Vec<(String, Timeseries)> {
+        let mut top_k: Vec<_> = map
+            .iter()
+            .map(|(key, timeseries)| {
+                let sum: f64 = timeseries
+                    .iter()
+                    .map(|(_, value)| (*value).unwrap_or_default())
+                    .sum();
+                (key, sum)
+            })
+            .collect();
+
+        top_k.sort_by(|(_, a), (_, b)| {
+            if ascending {
+                a.total_cmp(b)
+            } else {
+                b.total_cmp(a)
+            }
+        });
+        top_k.truncate(k);
+
+        let mut ret = Vec::new();
+        for (name, _) in &top_k {
+            let timeseries = map.get(*name).unwrap();
+            ret.push((name.to_string(), timeseries.clone()));
+        }
+
+        ret
+    }
+
+    // Compute the top k rates for the given UDFs
+    fn top_k_for_rate(
+        window: &MetricsWindow,
+        mut ts1: HashMap<String, Timeseries>,
+        mut ts2: HashMap<String, Timeseries>,
+        k: usize,
+        merge: impl Fn(&Timeseries, &Timeseries) -> anyhow::Result<Timeseries>,
+        ascending: bool,
+    ) -> anyhow::Result<Vec<(String, Timeseries)>> {
+        let mut map: HashMap<String, Timeseries> = HashMap::new();
+
+        for (udf_id, ts1_bucket) in ts1.iter() {
+            let ts2_bucket = ts2.get(udf_id);
+
+            match ts2_bucket {
+                None => {
+                    let result = ts1_bucket.iter().map(|&(ts, _)| (ts, None)).collect();
+
+                    map.insert(udf_id.to_string(), result);
+                },
+                Some(ts2_bucket) => {
+                    let merged: Vec<(SystemTime, Option<f64>)> = merge(ts1_bucket, ts2_bucket)?;
+                    map.insert(udf_id.to_string(), merged);
+                },
+            }
+        }
+
+        let mut ret = Self::top_k(map, k, ascending);
+
+        // Remove the top k entries from both timeseries maps
+        // so we can sum up everything that's left over.
+        for (name, _) in &ret {
+            ts1.remove(name);
+            ts2.remove(name);
+        }
+
+        // Sum up the rest of the rates
+        if !ts2.is_empty() || !ts1.is_empty() {
+            let rest_ts1 = sum_timeseries(window, ts1.values())?;
+            let rest_ts2 = sum_timeseries(window, ts2.values())?;
+
+            let rest = merge(&rest_ts1, &rest_ts2)?;
+            ret.push(("_rest".to_string(), rest));
+        }
+
+        Ok(ret)
     }
 
     pub fn latency_percentiles(
         &self,
@@ -1155,11 +1274,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
             TableRate::RowsRead => table_rows_read_metric(&table_name),
             TableRate::RowsWritten => table_rows_written_metric(&table_name),
         };
-        let buckets = metrics
-            .query_counter(&name, window.start..window.end)?
-            .into_iter()
-            .map(|bucket| (bucket.index, bucket.value))
-            .collect();
+        let buckets = metrics.query_counter(&name, window.start..window.end)?;
         window.resample_counters(&metrics, buckets, true)
     }
 
@@ -1306,6 +1421,73 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
     }
 }
 
+fn sum_timeseries<'a>(
+    window: &MetricsWindow,
+    series: impl Iterator<Item = &'a Timeseries>,
+) -> anyhow::Result<Timeseries> {
+    let mut result = (0..window.num_buckets)
+        .map(|i| Ok((window.bucket_start(i)?, None)))
+        .collect::<anyhow::Result<Timeseries>>()?;
+    for timeseries in series {
+        anyhow::ensure!(timeseries.len() == result.len());
+        for (i, &(ts, value)) in timeseries.iter().enumerate() {
+            anyhow::ensure!(ts == result[i].0);
+            if let Some(value) = value {
+                *result[i].1.get_or_insert(0.) += value;
+            }
+        }
+    }
+    Ok(result)
+}
+
+/// Merges two series to get the rate, where rate = partial / total * 100.
+fn merge_rate(partial: &Timeseries, total: &Timeseries) -> anyhow::Result<Timeseries> {
+    anyhow::ensure!(total.len() == partial.len());
+    total
+        .iter()
+        .zip(partial)
+        .map(|(&(t1, total_value), &(t2, partial_value))| {
+            anyhow::ensure!(t1 == t2);
+            Ok((
+                t1,
+                match (total_value, partial_value) {
+                    (Some(total_value), Some(partial_value)) => {
+                        Some(partial_value / total_value * 100.)
+                    },
+                    (Some(_total_value), None) => Some(0.),
+                    (None, Some(_partial_value)) => Some(100.),
+                    (None, None) => None,
+                },
+            ))
+        })
+        .collect()
+}
+
+/// Merges two series to get the cache hit rate,
+/// where rate = hits / (hits + misses) * 100.
+fn merge_cache_hit_rate(hits: &Timeseries, misses: &Timeseries) -> anyhow::Result<Timeseries> {
+    anyhow::ensure!(hits.len() == misses.len());
+    hits.iter()
+        .zip(misses)
+        .map(|(&(t1, hits_value), &(t2, misses_value))| {
+            anyhow::ensure!(t1 == t2);
+            Ok((
+                t1,
+                match (hits_value, misses_value) {
+                    (Some(hits_value), Some(misses_value)) => {
+                        Some(hits_value / (hits_value + misses_value) * 100.)
+                    },
+                    // There are hits but no misses, so the hit rate is 100%.
+                    (Some(_hits_value), None) => Some(100.),
+                    // There are misses but no hits, so the hit rate is 0%.
+                    (None, Some(_misses_value)) => Some(0.),
+                    (None, None) => None,
+                },
+            ))
+        })
+        .collect()
+}
+
 struct Inner<RT: Runtime> {
     rt: RT,
 
@@ -1400,12 +1582,14 @@ impl<RT: Runtime> Inner<RT> {
             let name = udf_errors_metric(&identifier);
             self.metrics.add_counter(&name, ts, 1.0)?;
         }
-        if execution.cached_result {
-            let name = udf_cache_hits_metric(&identifier);
-            self.metrics.add_counter(&name, ts, 1.0)?;
-        } else {
-            let name = udf_cache_misses_metric(&identifier);
-            self.metrics.add_counter(&name, ts, 1.0)?;
+        if execution.udf_type == UdfType::Query {
+            if execution.cached_result {
+                let name = udf_cache_hits_metric(&identifier);
+                self.metrics.add_counter(&name, ts, 1.0)?;
+            } else {
+                let name = udf_cache_misses_metric(&identifier);
+                self.metrics.add_counter(&name, ts, 1.0)?;
+            }
         }
 
         let name = udf_execution_time_metric(&identifier);
@@ -1554,25 +1738,26 @@ impl From<UdfRate> for JsonValue {
 }
 
 fn udf_invocations_metric(identifier: &UdfIdentifier) -> MetricName {
-    format!("udf:{}:invocations", identifier)
+    format!("udf:{}:invocations", udf_metric_name(identifier))
 }
 
 fn udf_errors_metric(identifier: &UdfIdentifier) -> MetricName {
-    format!("udf:{}:errors", identifier)
+    format!("udf:{}:errors", udf_metric_name(identifier))
 }
 
 fn udf_cache_hits_metric(identifier: &UdfIdentifier) -> MetricName {
-    format!("udf:{}:cache_hits", identifier)
+    format!("udf:{}:cache_hits", udf_metric_name(identifier))
 }
 
 fn udf_cache_misses_metric(identifier: &UdfIdentifier) -> MetricName {
-    format!("udf:{}:cache_misses", identifier)
+    format!("udf:{}:cache_misses", udf_metric_name(identifier))
 }
 
 fn udf_execution_time_metric(identifier: &UdfIdentifier) -> MetricName {
-    format!("udf:{}:execution_time", identifier)
+    format!("udf:{}:execution_time", udf_metric_name(identifier))
 }
 
+// TODO: Thread component path through here.
 fn table_rows_read_metric(table_name: &TableName) -> MetricName {
     format!("table:{}:rows_read", table_name)
 }
 
@@ -1584,3 +1769,13 @@ fn table_rows_written_metric(table_name: &TableName) -> MetricName {
 fn scheduled_job_next_ts_metric() -> &'static str {
     "scheduled_jobs:next_ts"
 }
+
+fn udf_metric_name(identifier: &UdfIdentifier) -> String {
+    let (component, id) = identifier.clone().into_component_and_udf_path();
+    match component {
+        Some(component) => {
+            format!("{}/{}", component, id)
+        },
+        None => id,
+    }
+}
diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs
index 1b6fab6c..56a9d175 100644
--- a/crates/application/src/lib.rs
+++ b/crates/application/src/lib.rs
@@ -2607,6 +2607,30 @@ impl<RT: Runtime> Application<RT> {
         self.function_log.udf_rate(identifier, metric, window)
     }
 
+    pub async fn failure_percentage_top_k(
+        &self,
+        identity: Identity,
+        window: MetricsWindow,
+        k: usize,
+    ) -> anyhow::Result<Vec<(String, Timeseries)>> {
+        if !(identity.is_admin() || identity.is_system()) {
+            anyhow::bail!(unauthorized_error("failure_percentage_top_k"));
+        }
+        self.function_log.failure_percentage_top_k(window, k)
+    }
+
+    pub async fn cache_hit_percentage_top_k(
+        &self,
+        identity: Identity,
+        window: MetricsWindow,
+        k: usize,
+    ) -> anyhow::Result<Vec<(String, Timeseries)>> {
+        if !(identity.is_admin() || identity.is_system()) {
+            anyhow::bail!(unauthorized_error("cache_hit_percentage_top_k"));
+        }
+        self.function_log.cache_hit_percentage_top_k(window, k)
+    }
+
     pub async fn cache_hit_percentage(
         &self,
         identity: Identity,
diff --git a/crates/local_backend/src/app_metrics.rs b/crates/local_backend/src/app_metrics.rs
index 1af709e5..4a78f04b 100644
--- a/crates/local_backend/src/app_metrics.rs
+++ b/crates/local_backend/src/app_metrics.rs
@@ -58,6 +58,45 @@ pub(crate) async fn udf_rate(
         .application
         .udf_rate(identity, udf_identifier, metric.parse()?, window)
         .await?;
+    Ok(Json(timeseries.clone()))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub(crate) struct TopKQueryArgs {
+    window: String,
+    k: Option<usize>,
+}
+
+pub(crate) async fn failure_percentage_top_k(
+    State(st): State<LocalAppState>,
+    ExtractIdentity(identity): ExtractIdentity,
+    Query(TopKQueryArgs { window, k }): Query<TopKQueryArgs>,
+) -> Result<impl IntoResponse, HttpResponseError> {
+    let window_json: serde_json::Value =
+        serde_json::from_str(&window).map_err(anyhow::Error::new)?;
+    let window = window_json.try_into()?;
+
+    let timeseries = st
+        .application
+        .failure_percentage_top_k(identity, window, k.unwrap_or(5))
+        .await?;
+    Ok(Json(timeseries))
+}
+
+pub(crate) async fn cache_hit_percentage_top_k(
+    State(st): State<LocalAppState>,
+    ExtractIdentity(identity): ExtractIdentity,
+    Query(TopKQueryArgs { window, k }): Query<TopKQueryArgs>,
+) -> Result<impl IntoResponse, HttpResponseError> {
+    let window_json: serde_json::Value =
+        serde_json::from_str(&window).map_err(anyhow::Error::new)?;
+    let window = window_json.try_into()?;
+
+    let timeseries = st
+        .application
+        .cache_hit_percentage_top_k(identity, window, k.unwrap_or(5))
+        .await?;
     Ok(Json(timeseries))
 }
 
diff --git a/crates/local_backend/src/router.rs b/crates/local_backend/src/router.rs
index 6d2f6b0e..39613abd 100644
--- a/crates/local_backend/src/router.rs
+++ b/crates/local_backend/src/router.rs
@@ -52,6 +52,8 @@ use tower_http::{
 use crate::{
     app_metrics::{
         cache_hit_percentage,
+        cache_hit_percentage_top_k,
+        failure_percentage_top_k,
         latency_percentiles,
         scheduled_job_lag,
         table_rate,
@@ -310,6 +312,11 @@ where
         .route("/stream_udf_execution", get(stream_udf_execution))
         .route("/stream_function_logs", get(stream_function_logs))
         .route("/udf_rate", get(udf_rate))
+        .route("/failure_percentage_top_k", get(failure_percentage_top_k))
+        .route(
+            "/cache_hit_percentage_top_k",
+            get(cache_hit_percentage_top_k),
+        )
         .route("/cache_hit_percentage", get(cache_hit_percentage))
         .route("/table_rate", get(table_rate))
         .route("/latency_percentiles", get(latency_percentiles))
diff --git a/crates/udf_metrics/src/lib.rs b/crates/udf_metrics/src/lib.rs
index 948485e1..67dbf414 100644
--- a/crates/udf_metrics/src/lib.rs
+++ b/crates/udf_metrics/src/lib.rs
@@ -24,7 +24,9 @@
 //! time, and one on `(metric_key, bucket_index)` for efficiently finding the
 //! buckets for a given metric.
 use std::{
-    cmp,
+    cmp::{
+        self,
+    },
     collections::{
         BTreeMap,
         BTreeSet,
@@ -319,6 +321,22 @@ impl MetricStore {
         Ok(())
     }
 
+    /// Return the names of all of the metrics in the store that have the
+    /// given metric type. Unlike the query methods below, this matches on
+    /// metric type only, not on a time range.
+    pub fn metric_names_for_type(&self, metric_type: MetricType) -> Vec<MetricName> {
+        self.metrics
+            .iter()
+            .filter_map(|(_, metric)| {
+                if metric.metric_type == metric_type {
+                    Some(metric.name.clone())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
     /// Query all of the counter buckets that cover a desired time range. The
     /// time range is inclusive of its start endpoint and exclusive of its
     /// end endpoint.
@@ -633,7 +651,7 @@ impl MetricsWindow {
     pub fn resample_counters(
         &self,
         metrics: &MetricStore,
-        buckets: Vec<(BucketIndex, f32)>,
+        buckets: Vec<&CounterBucket>,
         is_rate: bool,
     ) -> anyhow::Result<Timeseries> {
         // Start by filling out the output buckets with unknown values.
@@ -659,8 +677,8 @@
         // simply find which output bucket the input bucket's start time falls into.
         // This may create some aliasing, especially if the output bucket size is small
        // relative to the input bucket size, but is good enough for now.
-        for (bucket_index, value) in buckets {
-            let bucket_start = metrics.bucket_start(bucket_index);
+        for &CounterBucket { index, value } in buckets {
+            let bucket_start = metrics.bucket_start(index);
             if (self.start..self.end).contains(&bucket_start) {
                 let (_, existing) = &mut result[self.bucket_index(bucket_start)?];
                 *existing.as_mut().context("Missing counter")? += value as f64;