diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index 304752ccf..5243fd106 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -20,9 +20,9 @@ use std::{io::ErrorKind, sync::Arc};
 
 use self::{column::Column, snapshot::ManifestItem};
 use crate::handlers::http::base_path_without_preceding_slash;
-use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
+use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
 use crate::option::CONFIG;
-use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
+use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
 use crate::{
     catalog::manifest::Manifest,
     event::DEFAULT_TIMESTAMP_KEY,
@@ -102,22 +102,6 @@ pub async fn update_snapshot(
     stream_name: &str,
     change: manifest::File,
 ) -> Result<(), ObjectStorageError> {
-    // get current snapshot
-    let event_labels = event_labels(stream_name, "json");
-    let storage_size_labels = storage_size_labels(stream_name);
-    let events_ingested = EVENTS_INGESTED_TODAY
-        .get_metric_with_label_values(&event_labels)
-        .unwrap()
-        .get() as u64;
-    let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
-        .get_metric_with_label_values(&event_labels)
-        .unwrap()
-        .get() as u64;
-    let storage_size = STORAGE_SIZE_TODAY
-        .get_metric_with_label_values(&storage_size_labels)
-        .unwrap()
-        .get() as u64;
-
     let mut meta = storage.get_object_store_format(stream_name).await?;
     let meta_clone = meta.clone();
     let manifests = &mut meta.snapshot.manifest_list;
@@ -132,6 +116,21 @@ pub async fn update_snapshot(
             lower_bound
         }
     };
+    let date = lower_bound.date_naive().format("%Y-%m-%d").to_string();
+    let event_labels = event_labels_date(stream_name, "json", &date);
+    let storage_size_labels = storage_size_labels_date(stream_name, &date);
+    let events_ingested = EVENTS_INGESTED_DATE
+        .get_metric_with_label_values(&event_labels)
+        .unwrap()
+        .get() as u64;
+    let ingestion_size = EVENTS_INGESTED_SIZE_DATE
+        .get_metric_with_label_values(&event_labels)
+        .unwrap()
+        .get() as u64;
+    let storage_size = EVENTS_STORAGE_SIZE_DATE
+        .get_metric_with_label_values(&storage_size_labels)
+        .unwrap()
+        .get() as u64;
     let pos = manifests.iter().position(|item| {
         item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
     });
@@ -149,6 +148,25 @@ pub async fn update_snapshot(
     for m in manifests.iter_mut() {
         let p = manifest_path("").to_string();
         if m.manifest_path.contains(&p) {
+            let date = m
+                .time_lower_bound
+                .date_naive()
+                .format("%Y-%m-%d")
+                .to_string();
+            let event_labels = event_labels_date(stream_name, "json", &date);
+            let storage_size_labels = storage_size_labels_date(stream_name, &date);
+            let events_ingested = EVENTS_INGESTED_DATE
+                .get_metric_with_label_values(&event_labels)
+                .unwrap()
+                .get() as u64;
+            let ingestion_size = EVENTS_INGESTED_SIZE_DATE
+                .get_metric_with_label_values(&event_labels)
+                .unwrap()
+                .get() as u64;
+            let storage_size = EVENTS_STORAGE_SIZE_DATE
+                .get_metric_with_label_values(&storage_size_labels)
+                .unwrap()
+                .get() as u64;
             ch = true;
             m.events_ingested = events_ingested;
             m.ingestion_size = ingestion_size;
diff --git a/server/src/event.rs b/server/src/event.rs
index 6077ccb30..9e8f09a6e 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -82,6 +82,7 @@ impl Event {
             self.origin_format,
             self.origin_size,
             num_rows,
+            self.parsed_timestamp,
         )?;
 
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
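A note on the gauge reads above: prometheus's `get_metric_with_label_values` creates the labelled child on first access, so a date that has never been written simply reads as zero -- the `unwrap()` calls in `update_snapshot` only fail on a label-count mismatch, not on a quiet date. A minimal sketch of that behavior (gauge name and label values are hypothetical):

```rust
use prometheus::{IntGaugeVec, Opts};

fn main() {
    // Same label shape as EVENTS_INGESTED_DATE, minus the namespace.
    let per_date = IntGaugeVec::new(
        Opts::new("demo_events_ingested_date", "demo gauge"),
        &["stream", "format", "date"],
    )
    .unwrap();

    // An unseen label set is created on demand and reads 0.
    let child = per_date
        .get_metric_with_label_values(&["mystream", "json", "1999-01-01"])
        .unwrap();
    assert_eq!(child.get(), 0);
}
```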
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 15e343747..76f847672 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -25,8 +25,10 @@ use crate::handlers::{
     CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
 };
 use crate::metadata::STREAM_INFO;
+use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
 use crate::option::{Mode, CONFIG};
 use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
+use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
 use crate::{
     catalog::{self, remove_manifest_from_snapshot},
@@ -482,6 +484,29 @@ pub async fn put_enable_cache(
         StatusCode::OK,
     ))
 }
+pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
+    let event_labels = event_labels_date(stream_name, "json", date);
+    let storage_size_labels = storage_size_labels_date(stream_name, date);
+    let events_ingested = EVENTS_INGESTED_DATE
+        .get_metric_with_label_values(&event_labels)
+        .unwrap()
+        .get() as u64;
+    let ingestion_size = EVENTS_INGESTED_SIZE_DATE
+        .get_metric_with_label_values(&event_labels)
+        .unwrap()
+        .get() as u64;
+    let storage_size = EVENTS_STORAGE_SIZE_DATE
+        .get_metric_with_label_values(&storage_size_labels)
+        .unwrap()
+        .get() as u64;
+
+    let stats = Stats {
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
+    };
+    Ok(stats)
+}
 
 pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -490,6 +515,25 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
         return Err(StreamError::StreamNotFound(stream_name));
     }
 
+    let query_string = req.query_string();
+    if !query_string.is_empty() {
+        let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
+        let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
+        if date_key != "date" {
+            return Err(StreamError::Custom {
+                msg: "Invalid query parameter".to_string(),
+                status: StatusCode::BAD_REQUEST,
+            });
+        }
+
+        if !date_value.is_empty() {
+            let stats = get_stats_date(&stream_name, date_value).await?;
+            let stats = serde_json::to_value(stats)?;
+
+            return Ok((web::Json(stats), StatusCode::OK));
+        }
+    }
+
     let stats = stats::get_current_stats(&stream_name, "json")
         .ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
 
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 869d92b8c..24e978dda 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -339,7 +339,6 @@ impl IngestServer {
         }
 
         metrics::fetch_stats_from_storage().await;
-        metrics::reset_daily_metric_from_global();
 
         let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
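The `?date=` handling in `get_stats` above indexes into the result of `split('=')`, which assumes a well-formed `key=value` pair. For comparison, a hedged sketch of the same contract written with `split_once` (the helper is hypothetical, not part of the patch); it folds the bad-key and empty-value branches into `None` instead of panicking on a bare `?date`:

```rust
/// "date=2024-04-10" -> Some("2024-04-10"); anything else -> None.
fn parse_date_param(query_string: &str) -> Option<&str> {
    let (key, value) = query_string.split_once('=')?;
    (key == "date" && !value.is_empty()).then_some(value)
}

fn main() {
    assert_eq!(parse_date_param("date=2024-04-10"), Some("2024-04-10"));
    assert_eq!(parse_date_param("date="), None); // handler falls through to full stats
    assert_eq!(parse_date_param("foo=bar"), None); // handler returns 400 here
}
```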
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index e13cd4f14..4b69ccf01 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -184,7 +184,6 @@ impl QueryServer {
 
         // load data from stats back to prometheus metrics
         metrics::fetch_stats_from_storage().await;
-        metrics::reset_daily_metric_from_global();
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
 
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 70a226a72..dd809de6d 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -508,7 +508,6 @@ impl Server {
         DASHBOARDS.load().await?;
 
         metrics::fetch_stats_from_storage().await;
-        metrics::reset_daily_metric_from_global();
         storage::retention::load_retention_from_global();
 
         let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 76d5dd0da..a9a8ac1dd 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,7 +18,7 @@
 
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::Local;
+use chrono::{Local, NaiveDateTime};
 use itertools::Itertools;
 use once_cell::sync::Lazy;
 use std::collections::HashMap;
@@ -27,10 +27,10 @@ use std::sync::{Arc, RwLock};
 
 use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
 use crate::alerts::Alerts;
 use crate::metrics::{
-    EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY,
-    LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
+    EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
+    EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
 };
-use crate::storage::{LogStream, ObjectStorage, StorageDir};
+use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
 use crate::utils::arrow::MergedRecordReader;
 use derive_more::{Deref, DerefMut};
@@ -244,7 +244,8 @@ impl StreamInfo {
         let alerts = storage.get_alerts(&stream.name).await?;
         let schema = storage.get_schema_on_server_start(&stream.name).await?;
         let meta = storage.get_stream_metadata(&stream.name).await?;
-
+        let meta_clone = meta.clone();
+        let stream_name = stream.name.clone();
         let schema = update_schema_from_staging(&stream.name, schema);
         let schema = HashMap::from_iter(
             schema
@@ -268,10 +269,30 @@ impl StreamInfo {
 
         let mut map = self.write().expect(LOCK_EXPECT);
         map.insert(stream.name, metadata);
+        Self::load_daily_metrics(meta_clone, &stream_name);
 
         Ok(())
     }
 
+    fn load_daily_metrics(meta: ObjectStoreFormat, stream_name: &str) {
+        let manifests = meta.snapshot.manifest_list;
+        for manifest in manifests {
+            let manifest_date = manifest.time_lower_bound.date_naive().to_string();
+            let events_ingested = manifest.events_ingested;
+            let ingestion_size = manifest.ingestion_size;
+            let storage_size = manifest.storage_size;
+            EVENTS_INGESTED_DATE
+                .with_label_values(&[stream_name, "json", &manifest_date])
+                .set(events_ingested as i64);
+            EVENTS_INGESTED_SIZE_DATE
+                .with_label_values(&[stream_name, "json", &manifest_date])
+                .set(ingestion_size as i64);
+            EVENTS_STORAGE_SIZE_DATE
+                .with_label_values(&["data", stream_name, "parquet", &manifest_date])
+                .set(storage_size as i64);
+        }
+    }
+
     pub fn list_streams(&self) -> Vec<String> {
         self.read()
             .expect(LOCK_EXPECT)
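The restore path above uses `set()` while the ingest path (`update_stats`, next hunk) uses `add()`: on startup each per-date gauge is overwritten with the absolute totals recorded in the manifests, and live ingestion then accumulates on top. A small sketch of that interplay (names hypothetical):

```rust
use prometheus::{IntGaugeVec, Opts};

fn main() {
    let per_date = IntGaugeVec::new(
        Opts::new("demo_events_ingested_date", "demo gauge"),
        &["stream", "format", "date"],
    )
    .unwrap();
    let labels = ["mystream", "json", "2024-04-10"];

    // Startup (load_daily_metrics): restore the manifest's absolute count.
    per_date.with_label_values(&labels).set(42);

    // Live ingestion (update_stats): accumulate new rows on top.
    per_date.with_label_values(&labels).add(8);

    assert_eq!(per_date.with_label_values(&labels).get(), 50);
}
```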
@@ -286,18 +307,20 @@ impl StreamInfo {
         origin: &'static str,
         size: u64,
         num_rows: u64,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), MetadataError> {
+        let parsed_date = parsed_timestamp.date().to_string();
         EVENTS_INGESTED
             .with_label_values(&[stream_name, origin])
             .add(num_rows as i64);
-        EVENTS_INGESTED_TODAY
-            .with_label_values(&[stream_name, origin])
+        EVENTS_INGESTED_DATE
+            .with_label_values(&[stream_name, origin, parsed_date.as_str()])
             .add(num_rows as i64);
         EVENTS_INGESTED_SIZE
             .with_label_values(&[stream_name, origin])
             .add(size as i64);
-        EVENTS_INGESTED_SIZE_TODAY
-            .with_label_values(&[stream_name, origin])
+        EVENTS_INGESTED_SIZE_DATE
+            .with_label_values(&[stream_name, origin, parsed_date.as_str()])
             .add(size as i64);
         LIFETIME_EVENTS_INGESTED
             .with_label_values(&[stream_name, origin])
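Three different stringifications now feed the same `date` label: `format("%Y-%m-%d")` in catalog.rs, `date_naive().to_string()` in `load_daily_metrics`, and `parsed_timestamp.date().to_string()` in `update_stats` above. They only line up because chrono renders a naive date as ISO 8601 by default; a quick check:

```rust
use chrono::{NaiveDate, TimeZone, Utc};

fn main() {
    // metadata.rs: parsed_timestamp.date().to_string()
    let parsed = NaiveDate::from_ymd_opt(2024, 4, 10)
        .unwrap()
        .and_hms_opt(1, 2, 3)
        .unwrap();
    assert_eq!(parsed.date().to_string(), "2024-04-10");

    // catalog.rs: lower_bound.date_naive().format("%Y-%m-%d")
    let lower_bound = Utc.with_ymd_and_hms(2024, 4, 10, 0, 0, 0).unwrap();
    assert_eq!(
        lower_bound.date_naive().format("%Y-%m-%d").to_string(),
        "2024-04-10"
    );
}
```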
diff --git a/server/src/metrics/mod.rs b/server/src/metrics/mod.rs
index 2407de1ae..dafe34629 100644
--- a/server/src/metrics/mod.rs
+++ b/server/src/metrics/mod.rs
@@ -18,23 +18,13 @@
 pub mod prom_utils;
 pub mod storage;
 
-use std::sync::Mutex;
 
-use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, metrics, option::CONFIG};
+use crate::{handlers::http::metrics_path, metadata::STREAM_INFO, option::CONFIG};
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
-use clokwerk::AsyncScheduler;
-use clokwerk::Job;
-use clokwerk::TimeUnits;
 use once_cell::sync::Lazy;
 use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
-use std::thread;
-use std::time::Duration;
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
-type SchedulerHandle = thread::JoinHandle<()>;
-
-static METRIC_SCHEDULER_HANDLER: Lazy<Mutex<Option<SchedulerHandle>>> =
-    Lazy::new(|| Mutex::new(None));
 
 pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
@@ -44,14 +34,6 @@ pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
-pub static EVENTS_INGESTED_TODAY: Lazy<IntGaugeVec> = Lazy::new(|| {
-    IntGaugeVec::new(
-        Opts::new("events_ingested_today", "Events ingested today").namespace(METRICS_NAMESPACE),
-        &["stream", "format"],
-    )
-    .expect("metric can be created")
-});
-
 pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
         Opts::new("events_ingested_size", "Events ingested size bytes")
@@ -61,18 +43,6 @@ pub static EVENTS_INGESTED_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
-pub static EVENTS_INGESTED_SIZE_TODAY: Lazy<IntGaugeVec> = Lazy::new(|| {
-    IntGaugeVec::new(
-        Opts::new(
-            "events_ingested_size_today",
-            "Events ingested size today in bytes",
-        )
-        .namespace(METRICS_NAMESPACE),
-        &["stream", "format"],
-    )
-    .expect("metric can be created")
-});
-
 pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
         Opts::new("storage_size", "Storage size bytes").namespace(METRICS_NAMESPACE),
@@ -81,14 +51,6 @@ pub static STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
-pub static STORAGE_SIZE_TODAY: Lazy<IntGaugeVec> = Lazy::new(|| {
-    IntGaugeVec::new(
-        Opts::new("storage_size_today", "Storage size today in bytes").namespace(METRICS_NAMESPACE),
-        &["type", "stream", "format"],
-    )
-    .expect("metric can be created")
-});
-
 pub static EVENTS_DELETED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
         Opts::new("events_deleted", "Events deleted").namespace(METRICS_NAMESPACE),
@@ -150,6 +112,42 @@ pub static LIFETIME_EVENTS_STORAGE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
+pub static EVENTS_INGESTED_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new(
+            "events_ingested_date",
+            "Events ingested on a particular date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["stream", "format", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static EVENTS_INGESTED_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new(
+            "events_ingested_size_date",
+            "Events ingested size in bytes on a particular date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["stream", "format", "date"],
+    )
+    .expect("metric can be created")
+});
+
+pub static EVENTS_STORAGE_SIZE_DATE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new(
+            "events_storage_size_date",
+            "Events storage size in bytes on a particular date",
+        )
+        .namespace(METRICS_NAMESPACE),
+        &["type", "stream", "format", "date"],
+    )
+    .expect("metric can be created")
+});
+
 pub static STAGING_FILES: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
         Opts::new("staging_files", "Active Staging files").namespace(METRICS_NAMESPACE),
@@ -186,21 +184,12 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(EVENTS_INGESTED.clone()))
         .expect("metric can be registered");
-    registry
-        .register(Box::new(EVENTS_INGESTED_TODAY.clone()))
-        .expect("metric can be registered");
     registry
         .register(Box::new(EVENTS_INGESTED_SIZE.clone()))
         .expect("metric can be registered");
-    registry
-        .register(Box::new(EVENTS_INGESTED_SIZE_TODAY.clone()))
-        .expect("metric can be registered");
     registry
         .register(Box::new(STORAGE_SIZE.clone()))
         .expect("metric can be registered");
-    registry
-        .register(Box::new(STORAGE_SIZE_TODAY.clone()))
-        .expect("metric can be registered");
     registry
         .register(Box::new(EVENTS_DELETED.clone()))
         .expect("metric can be registered");
@@ -219,6 +208,15 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(LIFETIME_EVENTS_STORAGE_SIZE.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(EVENTS_INGESTED_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(EVENTS_INGESTED_SIZE_DATE.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(EVENTS_STORAGE_SIZE_DATE.clone()))
+        .expect("metric can be registered");
     registry
         .register(Box::new(STAGING_FILES.clone()))
         .expect("metric can be registered");
@@ -271,21 +269,12 @@ pub async fn fetch_stats_from_storage() {
         EVENTS_INGESTED
             .with_label_values(&[&stream_name, "json"])
             .set(stats.current_stats.events as i64);
-        EVENTS_INGESTED_TODAY
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.current_date_stats.events as i64);
         EVENTS_INGESTED_SIZE
             .with_label_values(&[&stream_name, "json"])
             .set(stats.current_stats.ingestion as i64);
-        EVENTS_INGESTED_SIZE_TODAY
-            .with_label_values(&[&stream_name, "json"])
-            .set(stats.current_date_stats.ingestion as i64);
         STORAGE_SIZE
            .with_label_values(&["data", &stream_name, "parquet"])
             .set(stats.current_stats.storage as i64);
-        STORAGE_SIZE_TODAY
-            .with_label_values(&["data", &stream_name, "parquet"])
-            .set(stats.current_date_stats.storage as i64);
         EVENTS_DELETED
             .with_label_values(&[&stream_name, "json"])
             .set(stats.deleted_stats.events as i64);
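With the date baked into the label, "today" stops being a dedicated gauge that must be zeroed at midnight; it is just the per-date child addressed by today's label, and the lookup key rolls over on its own. That is what makes the scheduler removal in the next hunk safe. A sketch (names hypothetical):

```rust
use chrono::Utc;
use prometheus::{IntGaugeVec, Opts};

fn main() {
    let per_date = IntGaugeVec::new(
        Opts::new("demo_events_ingested_date", "demo gauge"),
        &["stream", "format", "date"],
    )
    .unwrap();

    // Whatever "today" is selects the right child; at midnight the key
    // changes and reads start from a fresh zero without any reset task.
    let today = Utc::now().date_naive().to_string();
    let today_events = per_date
        .with_label_values(&["mystream", "json", &today])
        .get();
    assert_eq!(today_events, 0);
}
```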
@@ -307,49 +296,3 @@ pub async fn fetch_stats_from_storage() {
             .set(stats.lifetime_stats.storage as i64);
     }
 }
-
-fn async_runtime() -> tokio::runtime::Runtime {
-    tokio::runtime::Builder::new_current_thread()
-        .thread_name("reset-metrics-task-thread")
-        .enable_all()
-        .build()
-        .unwrap()
-}
-
-pub fn reset_daily_metric_from_global() {
-    init_reset_daily_metric_scheduler();
-}
-
-pub fn init_reset_daily_metric_scheduler() {
-    log::info!("Setting up schedular");
-    let mut scheduler = AsyncScheduler::new();
-    let func = move || async {
-        //get retention every day at 12 am
-        for stream in STREAM_INFO.list_streams() {
-            metrics::EVENTS_INGESTED_TODAY
-                .with_label_values(&[&stream, "json"])
-                .set(0);
-            metrics::EVENTS_INGESTED_SIZE_TODAY
-                .with_label_values(&[&stream, "json"])
-                .set(0);
-            metrics::STORAGE_SIZE_TODAY
-                .with_label_values(&["data", &stream, "parquet"])
-                .set(0);
-        }
-    };
-
-    scheduler.every(1.day()).at("00:00").run(func);
-
-    let scheduler_handler = thread::spawn(|| {
-        let rt = async_runtime();
-        rt.block_on(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(10)).await;
-                scheduler.run_pending().await;
-            }
-        });
-    });
-
-    *METRIC_SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler);
-    log::info!("Scheduler is initialized")
-}
diff --git a/server/src/migration/stream_metadata_migration.rs b/server/src/migration/stream_metadata_migration.rs
index 24da19bf0..609e951bd 100644
--- a/server/src/migration/stream_metadata_migration.rs
+++ b/server/src/migration/stream_metadata_migration.rs
@@ -39,11 +39,6 @@ pub fn v1_v4(mut stream_metadata: Value) -> Value {
             "events": 0,
             "ingestion": 0,
             "storage": 0
-        },
-        "current_date_stats": {
-            "events": 0,
-            "ingestion": 0,
-            "storage": 0
         }
     });
     stream_metadata_map.insert("stats".to_owned(), default_stats);
@@ -83,11 +78,6 @@ pub fn v2_v4(mut stream_metadata: Value) -> Value {
             "events": 0,
             "ingestion": 0,
             "storage": 0
-        },
-        "current_date_stats": {
-            "events": 0,
-            "ingestion": 0,
-            "storage": 0
         }
     });
     stream_metadata_map.insert("stats".to_owned(), default_stats);
@@ -129,11 +119,6 @@ pub fn v3_v4(mut stream_metadata: Value) -> Value {
             "events": 0,
             "ingestion": 0,
             "storage": 0
-        },
-        "current_date_stats": {
-            "events": 0,
-            "ingestion": 0,
-            "storage": 0
         }
     });
     stream_metadata_map.insert("stats".to_owned(), default_stats);
diff --git a/server/src/stats.rs b/server/src/stats.rs
index 05fc91dfd..b7845ecc9 100644
--- a/server/src/stats.rs
+++ b/server/src/stats.rs
@@ -17,9 +17,9 @@
  */
 use crate::metrics::{
     DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED,
-    EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY,
-    LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE,
-    STORAGE_SIZE, STORAGE_SIZE_TODAY,
+    EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
+    EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
+    LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE,
 };
 use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat};
 use std::sync::Arc;
@@ -37,7 +37,6 @@
 pub struct FullStats {
     pub lifetime_stats: Stats,
     pub current_stats: Stats,
     pub deleted_stats: Stats,
-    pub current_date_stats: Stats,
 }
 
 pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<FullStats> {
@@ -80,18 +79,6 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<FullStats> {
         .get_metric_with_label_values(&storage_size_labels)
         .ok()?
         .get() as u64;
-    let events_ingested_today = EVENTS_INGESTED_TODAY
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let ingestion_size_today = EVENTS_INGESTED_SIZE_TODAY
-        .get_metric_with_label_values(&event_labels)
-        .ok()?
-        .get() as u64;
-    let storage_size_today = STORAGE_SIZE_TODAY
-        .get_metric_with_label_values(&storage_size_labels)
-        .ok()?
-        .get() as u64;
     let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED
         .get_metric_with_label_values(&event_labels)
         .ok()?
@@ -104,11 +91,6 @@ pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option<FullStats> {
             ingestion: events_deleted_size,
             storage: deleted_events_storage_size,
         },
-        current_date_stats: Stats {
-            events: events_ingested_today,
-            ingestion: ingestion_size_today,
-            storage: storage_size_today,
-        },
     })
 }
@@ -163,12 +145,9 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> {
     let event_labels = event_labels(stream_name, format);
     let storage_size_labels = storage_size_labels(stream_name);
 
     EVENTS_INGESTED.remove_label_values(&event_labels)?;
-    EVENTS_INGESTED_TODAY.remove_label_values(&event_labels)?;
     EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
-    EVENTS_INGESTED_SIZE_TODAY.remove_label_values(&event_labels)?;
     STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
-    STORAGE_SIZE_TODAY.remove_label_values(&storage_size_labels)?;
     EVENTS_DELETED.remove_label_values(&event_labels)?;
     EVENTS_DELETED_SIZE.remove_label_values(&event_labels)?;
     DELETED_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
@@ -181,6 +174,10 @@ pub fn delete_stats(stream_name: &str, format: &'static str) -> prometheus::Result<()> {
     LIFETIME_EVENTS_INGESTED_SIZE.remove_label_values(&event_labels)?;
     LIFETIME_EVENTS_STORAGE_SIZE.remove_label_values(&storage_size_labels)?;
 
+    EVENTS_INGESTED_DATE.remove_label_values(&event_labels)?;
+    EVENTS_INGESTED_SIZE_DATE.remove_label_values(&event_labels)?;
+    EVENTS_STORAGE_SIZE_DATE.remove_label_values(&storage_size_labels)?;
+
     Ok(())
 }
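Worth noting for the `delete_stats` additions above: `remove_label_values` must be given exactly one value per label dimension, so clearing the three-label per-date children requires the date in the slice -- a two-value slice returns a cardinality error instead of removing anything. A sketch of the library behavior (names hypothetical):

```rust
use prometheus::{IntGaugeVec, Opts};

fn main() {
    let per_date = IntGaugeVec::new(
        Opts::new("demo_events_ingested_date", "demo gauge"),
        &["stream", "format", "date"],
    )
    .unwrap();
    per_date
        .with_label_values(&["mystream", "json", "2024-04-10"])
        .set(1);

    // Two values against a three-label vec: cardinality mismatch.
    assert!(per_date.remove_label_values(&["mystream", "json"]).is_err());

    // The full label set removes the child.
    assert!(per_date
        .remove_label_values(&["mystream", "json", "2024-04-10"])
        .is_ok());
}
```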
@@ -191,3 +188,15 @@ pub fn event_labels<'a>(stream_name: &'a str, format: &'static str) -> [&'a str; 2] {
 pub fn storage_size_labels(stream_name: &str) -> [&str; 3] {
     ["data", stream_name, "parquet"]
 }
+
+pub fn event_labels_date<'a>(
+    stream_name: &'a str,
+    format: &'static str,
+    date: &'a str,
+) -> [&'a str; 3] {
+    [stream_name, format, date]
+}
+
+pub fn storage_size_labels_date<'a>(stream_name: &'a str, date: &'a str) -> [&'a str; 4] {
+    ["data", stream_name, "parquet", date]
+}
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 5af74cd60..568326e94 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -27,7 +27,7 @@ use super::{
 
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
-use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
+use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
 use crate::option::Mode;
 use crate::{
     alerts::Alerts,
@@ -468,20 +468,7 @@ pub trait ObjectStorage: Sync + 'static {
                 commit_schema_to_storage(stream, schema).await?;
             }
         }
-        let mut compressed_size: u64 = 0;
         let parquet_files = dir.parquet_files();
-        parquet_files.iter().for_each(|file| {
-            compressed_size += file.metadata().map_or(0, |meta| meta.len());
-        });
-        STORAGE_SIZE
-            .with_label_values(&["data", stream, "parquet"])
-            .add(compressed_size as i64);
-        STORAGE_SIZE_TODAY
-            .with_label_values(&["data", stream, "parquet"])
-            .add(compressed_size as i64);
-        LIFETIME_EVENTS_STORAGE_SIZE
-            .with_label_values(&["data", stream, "parquet"])
-            .add(compressed_size as i64);
 
         for file in parquet_files {
             let filename = file
@@ -489,6 +476,18 @@ pub trait ObjectStorage: Sync + 'static {
                 .expect("only parquet files are returned by iterator")
                 .to_str()
                 .expect("filename is valid string");
+            let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
+            file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
+            let compressed_size = file.metadata().map_or(0, |meta| meta.len());
+            STORAGE_SIZE
+                .with_label_values(&["data", stream, "parquet"])
+                .add(compressed_size as i64);
+            EVENTS_STORAGE_SIZE_DATE
+                .with_label_values(&["data", stream, "parquet", file_date_part])
+                .add(compressed_size as i64);
+            LIFETIME_EVENTS_STORAGE_SIZE
+                .with_label_values(&["data", stream, "parquet"])
+                .add(compressed_size as i64);
 
             let mut file_suffix = str::replacen(filename, ".", "/", 3);
 
             let custom_partition_clone = custom_partition.clone();
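The split chain added above implies staging parquet filenames lead with a `date=` segment (something like `date=2024-04-10.<rest>.parquet`; the exact shape is inferred from the split logic, not spelled out in the patch). A standalone sketch of the extraction, written with `Option` so a name without that segment yields `None` rather than the out-of-bounds panic the indexed version would hit:

```rust
// Mirrors the two splits above: take everything before the first '.',
// then everything after the '='.
fn date_part(filename: &str) -> Option<&str> {
    filename.split('.').next()?.split('=').nth(1)
}

fn main() {
    assert_eq!(
        date_part("date=2024-04-10.hour=14.region.data.parquet"),
        Some("2024-04-10")
    );
    assert_eq!(date_part("no-date-segment.parquet"), None);
}
```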