From b49e7f4c76e92da99694f68a8e893c293e17709d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 24 Apr 2024 16:27:56 +0800 Subject: [PATCH 1/2] fix: post process result on query full column name of prom labels API Signed-off-by: Ruihang Xia --- src/servers/src/http/prometheus.rs | 9 ++++++++- tests-integration/tests/http.rs | 9 +++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index b02b9b8bd70d..e25f05510e86 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -312,6 +312,8 @@ pub async fn labels_query( if queries.is_empty() { queries = form_params.matches.0; } + + // Fetch all columns if no query matcher is provided if queries.is_empty() { match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await { Ok(labels) => { @@ -323,6 +325,7 @@ pub async fn labels_query( } } + // Otherwise, run queries and extract column name from result set. let start = params .start .or(form_params.start) @@ -331,7 +334,6 @@ pub async fn labels_query( .end .or(form_params.end) .unwrap_or_else(current_time_rfc3339); - let lookback = params .lookback .or(form_params.lookback) @@ -380,6 +382,7 @@ pub async fn labels_query( resp } +/// For `/labels` API without matcher async fn get_all_column_names( catalog: &str, schema: &str, @@ -398,6 +401,10 @@ async fn get_all_column_names( } } + let _ = labels.insert(METRIC_NAME.to_string()); + let _ = labels.remove(GREPTIME_TIMESTAMP); + let _ = labels.remove(GREPTIME_VALUE); + let mut labels_vec = labels.into_iter().collect::>(); labels_vec.sort_unstable(); Ok(labels_vec) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7a3a1a43c55c..83f7d4dcc487 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -472,6 +472,15 @@ pub async fn test_prom_http_api(store_type: StorageType) { // labels without match[] param let res = client.get("/v1/prometheus/api/v1/labels").send().await; assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!([ + "__name__", "cpu", "host", "memory", "number", "ts" + ])) + .unwrap() + ); // labels query with multiple match[] params let res = client From e5461cf20c21d3fe85135e224ced7ceb9c9788cc Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 24 Apr 2024 17:24:10 +0800 Subject: [PATCH 2/2] only preserve tag column Signed-off-by: Ruihang Xia --- src/servers/src/http/prometheus.rs | 54 +++++++++++++++--------------- src/table/src/table.rs | 11 +++++- tests-integration/tests/http.rs | 11 ++---- 3 files changed, 40 insertions(+), 36 deletions(-) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index e25f05510e86..4f453a1ea43b 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -21,7 +21,6 @@ use catalog::CatalogManagerRef; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use common_telemetry::tracing; @@ -313,16 +312,22 @@ pub async fn labels_query( queries = form_params.matches.0; } + // Fetch all tag columns. It will be used as white-list for tag names. + let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await + { + Ok(labels) => labels, + Err(e) => { + return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg()) + } + }; + // insert the special metric name label + let _ = labels.insert(METRIC_NAME.to_string()); + // Fetch all columns if no query matcher is provided if queries.is_empty() { - match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await { - Ok(labels) => { - return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels)) - } - Err(e) => { - return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg()) - } - } + let mut labels_vec = labels.into_iter().collect::>(); + labels_vec.sort_unstable(); + return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec)); } // Otherwise, run queries and extract column name from result set. @@ -339,8 +344,8 @@ pub async fn labels_query( .or(form_params.lookback) .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()); - let mut labels = HashSet::new(); - let _ = labels.insert(METRIC_NAME.to_string()); + let mut fetched_labels = HashSet::new(); + let _ = fetched_labels.insert(METRIC_NAME.to_string()); let mut merge_map = HashMap::new(); for query in queries { @@ -354,7 +359,8 @@ pub async fn labels_query( let result = handler.do_query(&prom_query, query_ctx.clone()).await; if let Err(err) = - retrieve_labels_name_from_query_result(result, &mut labels, &mut merge_map).await + retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map) + .await { // Prometheus won't report error if querying nonexist label and metric if err.status_code() != StatusCode::TableNotFound @@ -368,10 +374,11 @@ pub async fn labels_query( } } - let _ = labels.remove(GREPTIME_TIMESTAMP); - let _ = labels.remove(GREPTIME_VALUE); + // intersect `fetched_labels` with `labels` to filter out non-tag columns + fetched_labels.retain(|l| labels.contains(l)); + let _ = labels.insert(METRIC_NAME.to_string()); - let mut sorted_labels: Vec = labels.into_iter().collect(); + let mut sorted_labels: Vec = fetched_labels.into_iter().collect(); sorted_labels.sort(); let merge_map = merge_map .into_iter() @@ -382,12 +389,12 @@ pub async fn labels_query( resp } -/// For `/labels` API without matcher +/// Get all tag column name of the given schema async fn get_all_column_names( catalog: &str, schema: &str, manager: &CatalogManagerRef, -) -> std::result::Result, catalog::error::Error> { +) -> std::result::Result, catalog::error::Error> { let table_names = manager.table_names(catalog, schema).await?; let mut labels = HashSet::new(); @@ -395,19 +402,12 @@ async fn get_all_column_names( let Some(table) = manager.table(catalog, schema, &table_name).await? else { continue; }; - let schema = table.schema(); - for column in schema.column_schemas() { - labels.insert(column.name.to_string()); + for column in table.primary_key_columns() { + labels.insert(column.name); } } - let _ = labels.insert(METRIC_NAME.to_string()); - let _ = labels.remove(GREPTIME_TIMESTAMP); - let _ = labels.remove(GREPTIME_VALUE); - - let mut labels_vec = labels.into_iter().collect::>(); - labels_vec.sort_unstable(); - Ok(labels_vec) + Ok(labels) } async fn retrieve_series_from_query_result( diff --git a/src/table/src/table.rs b/src/table/src/table.rs index a0a45a07395f..44406c24b239 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; -use datatypes::schema::SchemaRef; +use datatypes::schema::{ColumnSchema, SchemaRef}; use snafu::ResultExt; use store_api::data_source::DataSourceRef; use store_api::storage::ScanRequest; @@ -81,4 +81,13 @@ impl Table { pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { Ok(vec![self.filter_pushdown; filters.len()]) } + + /// Get primary key columns in the definition order. + pub fn primary_key_columns(&self) -> impl Iterator + '_ { + self.table_info + .meta + .primary_key_indices + .iter() + .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone()) + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 83f7d4dcc487..d269859b869a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -463,10 +463,7 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!([ - "__name__", "cpu", "host", "memory", "ts" - ])) - .unwrap() + serde_json::from_value::(json!(["__name__", "host",])).unwrap() ); // labels without match[] param @@ -476,10 +473,8 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!([ - "__name__", "cpu", "host", "memory", "number", "ts" - ])) - .unwrap() + serde_json::from_value::(json!(["__name__", "host", "number",])) + .unwrap() ); // labels query with multiple match[] params