diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 26c0facc4..4afce9b64 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -575,12 +575,15 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { log::info!("Cluster metrics fetched successfully from all ingestors"); if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { let stream_name = INTERNAL_STREAM_NAME; - if let Ok(()) = ingest_internal_stream( - stream_name.to_string(), - bytes::Bytes::from(metrics_bytes), - ) - .await - { + + if matches!( + ingest_internal_stream( + stream_name.to_string(), + bytes::Bytes::from(metrics_bytes), + ) + .await, + Ok(()) + ) { log::info!( "Cluster metrics successfully ingested into internal stream" ); diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index fd05e2d1b..0abfc7ff0 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -186,7 +186,7 @@ impl QueryServer { analytics::init_analytics_scheduler()?; } - if let Ok(()) = init_cluster_metrics_schedular() { + if matches!(init_cluster_metrics_schedular(), Ok(())) { log::info!("Cluster metrics scheduler started successfully"); } let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index 9c4cd7ed0..a80ec5d85 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -105,51 +105,65 @@ impl Metrics { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_events_ingested += val; } - } else if &sample.metric == "parseable_events_ingested_size" { + } + if &sample.metric == "parseable_events_ingested_size" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_events_ingested_size += val; } - } else if &sample.metric == "parseable_lifetime_events_ingested" { + } + if &sample.metric == "parseable_lifetime_events_ingested" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_lifetime_events_ingested += val; } - } else if &sample.metric == "parseable_lifetime_events_ingested_size" { + } + if &sample.metric == "parseable_lifetime_events_ingested_size" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_lifetime_events_ingested_size += val; } - } else if &sample.metric == "parseable_deleted_events_ingested" { + } + if &sample.metric == "parseable_deleted_events_ingested" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_deleted_events_ingested += val; } - } else if &sample.metric == "parseable_deleted_events_ingested_size" { + } + if &sample.metric == "parseable_deleted_events_ingested_size" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_deleted_events_ingested_size += val; } - } else if sample.metric == "parseable_staging_files" { + } + if sample.metric == "parseable_staging_files" { if let PromValue::Gauge(val) = sample.value { prom_dress.parseable_staging_files += val; } - } else if sample.metric == "process_resident_memory_bytes" { + } + if sample.metric == "process_resident_memory_bytes" { if let PromValue::Gauge(val) = sample.value { prom_dress.process_resident_memory_bytes += val; } - } else if sample.metric == "parseable_storage_size" { - if sample.labels.get("type").expect("type is present") == "data" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_storage_size.data += val; - } - } else if sample.labels.get("type").expect("type is present") == "staging" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_storage_size.staging += val; - } + } + if sample.metric == "parseable_storage_size" + && sample.labels.get("type").expect("type is present") == "data" + { + if let PromValue::Gauge(val) = sample.value { + prom_dress.parseable_storage_size.data += val; + } + } + if sample.metric == "parseable_storage_size" + && sample.labels.get("type").expect("type is present") == "staging" + { + if let PromValue::Gauge(val) = sample.value { + prom_dress.parseable_storage_size.staging += val; } - } else if sample.metric == "parseable_lifetime_events_storage_size" { - if sample.labels.get("type").expect("type is present") == "data" { - if let PromValue::Gauge(val) = sample.value { - prom_dress.parseable_lifetime_storage_size.data += val; - } + } + + if sample.metric == "parseable_lifetime_events_storage_size" + && sample.labels.get("type").expect("type is present") == "data" + { + if let PromValue::Gauge(val) = sample.value { + prom_dress.parseable_lifetime_storage_size.data += val; } - } else if sample.metric == "parseable_deleted_events_storage_size" + } + if sample.metric == "parseable_deleted_events_storage_size" && sample.labels.get("type").expect("type is present") == "data" { if let PromValue::Gauge(val) = sample.value { @@ -157,34 +171,23 @@ impl Metrics { } } } - let about_api_json = Self::from_about_api_response(ingestor_metadata.clone()) + let (commit_id, staging, cache) = Self::from_about_api_response(ingestor_metadata.clone()) .await .map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); PostError::Invalid(err.into()) })?; - let commit_id = about_api_json - .get("commit") - .and_then(|x| x.as_str()) - .unwrap_or_default(); - let staging = about_api_json - .get("staging") - .and_then(|x| x.as_str()) - .unwrap_or_default(); - let cache = about_api_json - .get("cache") - .and_then(|x| x.as_str()) - .unwrap_or_default(); - prom_dress.commit = commit_id.to_string(); - prom_dress.staging = staging.to_string(); - prom_dress.cache = cache.to_string(); + + prom_dress.commit = commit_id; + prom_dress.staging = staging; + prom_dress.cache = cache; Ok(prom_dress) } pub async fn from_about_api_response( ingestor_metadata: IngestorMetadata, - ) -> Result { + ) -> Result<(String, String, String), PostError> { let uri = Url::parse(&format!( "{}{}/about", &ingestor_metadata.domain_name, @@ -203,7 +206,23 @@ impl Metrics { let about_api_json = res.text().await.map_err(PostError::NetworkError)?; let about_api_json: serde_json::Value = serde_json::from_str(&about_api_json).map_err(PostError::SerdeError)?; - Ok(about_api_json) + let commit_id = about_api_json + .get("commit") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + let staging = about_api_json + .get("staging") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + let cache = about_api_json + .get("cache") + .and_then(|x| x.as_str()) + .unwrap_or_default(); + Ok(( + commit_id.to_string(), + staging.to_string(), + cache.to_string(), + )) } else { log::warn!( "Failed to fetch about API response from ingestor: {}\n",