diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 8759a39e1..91c2cbc67 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -18,7 +18,9 @@ pub mod utils; -use crate::handlers::http::cluster::utils::{check_liveness, to_url_string}; +use crate::handlers::http::cluster::utils::{ + check_liveness, to_url_string, IngestionStats, QueriedStats, +}; use crate::handlers::http::ingest::PostError; use crate::handlers::http::logstream::error::StreamError; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; @@ -26,11 +28,12 @@ use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use crate::storage::object_storage::ingester_metadata_path; -use crate::storage::ObjectStorageError; -use crate::storage::PARSEABLE_ROOT_DIRECTORY; +use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; +use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; +use chrono::Utc; use http::StatusCode; use itertools::Itertools; use relative_path::RelativePathBuf; @@ -39,6 +42,8 @@ use url::Url; type IngesterMetadataArr = Vec; +use self::utils::StorageStats; + use super::base_path_without_preceding_slash; use super::modal::IngesterMetadata; @@ -108,51 +113,31 @@ pub async fn sync_streams_with_ingesters( pub async fn fetch_stats_from_ingesters( stream_name: &str, ) -> Result, StreamError> { - let mut stats = Vec::new(); - - let ingester_infos = get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Anyhow(err) - })?; - - for ingester in ingester_infos { - let url = format!( - "{}{}/logstream/{}/stats", - ingester.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - match utils::send_stats_request(&url, ingester.clone()).await { - Ok(Some(res)) => { - match serde_json::from_str::(&res.text().await.unwrap()) { - Ok(stat) => stats.push(stat), - Err(err) => { - log::error!( - "Could not parse stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - continue; - } - } - } - Ok(None) => { - log::error!("Ingester at {} is not reachable", &ingester.domain_name); - continue; - } - Err(err) => { - log::error!( - "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - return Err(err); - } + let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); + let obs = CONFIG + .storage() + .get_object_store() + .get_objects(Some(&path)) + .await?; + let mut ingestion_size = 0u64; + let mut storage_size = 0u64; + let mut count = 0u64; + for ob in obs { + if let Ok(stat) = serde_json::from_slice::(&ob) { + count += stat.stats.events; + ingestion_size += stat.stats.ingestion; + storage_size += stat.stats.storage; } } - Ok(stats) + let qs = QueriedStats::new( + "", + Utc::now(), + IngestionStats::new(count, format!("{} Bytes", ingestion_size), "json"), + StorageStats::new(format!("{} Bytes", storage_size), "parquet"), + ); + + Ok(vec![qs]) } async fn send_stream_sync_request( diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index fd1e30ea8..761177d45 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -98,8 +98,8 @@ impl IngestionStats { #[derive(Debug, Default, Serialize, Deserialize)] pub struct StorageStats { - size: String, - format: String, + pub size: String, + pub format: String, } impl StorageStats { @@ -120,7 +120,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { // .unwrap(); // should never be None // get the stream name - let stream_name = stats[0].stream.clone(); + let stream_name = stats[1].stream.clone(); // get the first event at // let min_first_event_at = stats @@ -198,6 +198,8 @@ pub async fn check_liveness(domain_name: &str) -> bool { } /// send a request to the ingester to fetch its stats +/// dead for now +#[allow(dead_code)] pub async fn send_stats_request( url: &str, ingester: IngesterMetadata,