Skip to content

Commit

Permalink
fix for stats for each date (#805)
Browse files Browse the repository at this point in the history
update stats in manifest list in snapshot for each date
Load daily stats on server start. Update the GET /stats endpoint
to accept a query param: if /stats?date={date in yyyy-mm-dd}
is given, date-level stats are returned.
  • Loading branch information
nikhilsinhaparseable authored Jun 7, 2024
1 parent ad39f57 commit a279822
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 186 deletions.
54 changes: 36 additions & 18 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use std::{io::ErrorKind, sync::Arc};

use self::{column::Column, snapshot::ManifestItem};
use crate::handlers::http::base_path_without_preceding_slash;
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::CONFIG;
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
use crate::{
catalog::manifest::Manifest,
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -102,22 +102,6 @@ pub async fn update_snapshot(
stream_name: &str,
change: manifest::File,
) -> Result<(), ObjectStorageError> {
// get current snapshot
let event_labels = event_labels(stream_name, "json");
let storage_size_labels = storage_size_labels(stream_name);
let events_ingested = EVENTS_INGESTED_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let storage_size = STORAGE_SIZE_TODAY
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;

let mut meta = storage.get_object_store_format(stream_name).await?;
let meta_clone = meta.clone();
let manifests = &mut meta.snapshot.manifest_list;
Expand All @@ -132,6 +116,21 @@ pub async fn update_snapshot(
lower_bound
}
};
let date = lower_bound.date_naive().format("%Y-%m-%d").to_string();
let event_labels = event_labels_date(stream_name, "json", &date);
let storage_size_labels = storage_size_labels_date(stream_name, &date);
let events_ingested = EVENTS_INGESTED_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let storage_size = EVENTS_STORAGE_SIZE_DATE
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;
let pos = manifests.iter().position(|item| {
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
});
Expand All @@ -149,6 +148,25 @@ pub async fn update_snapshot(
for m in manifests.iter_mut() {
let p = manifest_path("").to_string();
if m.manifest_path.contains(&p) {
let date = m
.time_lower_bound
.date_naive()
.format("%Y-%m-%d")
.to_string();
let event_labels = event_labels_date(stream_name, "json", &date);
let storage_size_labels = storage_size_labels_date(stream_name, &date);
let events_ingested = EVENTS_INGESTED_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
.get_metric_with_label_values(&event_labels)
.unwrap()
.get() as u64;
let storage_size = EVENTS_STORAGE_SIZE_DATE
.get_metric_with_label_values(&storage_size_labels)
.unwrap()
.get() as u64;
ch = true;
m.events_ingested = events_ingested;
m.ingestion_size = ingestion_size;
Expand Down
1 change: 1 addition & 0 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Event {
self.origin_format,
self.origin_size,
num_rows,
self.parsed_timestamp,
)?;

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
Expand Down
44 changes: 44 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::{Mode, CONFIG};
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{
catalog::{self, remove_manifest_from_snapshot},
Expand Down Expand Up @@ -482,6 +484,29 @@ pub async fn put_enable_cache(
StatusCode::OK,
))
}
/// Returns the ingestion/storage statistics recorded for `stream_name` on a
/// single date (`date` in `YYYY-MM-DD` form), read back from the date-labelled
/// prometheus gauges.
pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
    // Label sets keying the per-date metric families.
    let event_labels = event_labels_date(stream_name, "json", date);
    let storage_size_labels = storage_size_labels_date(stream_name, date);

    // NOTE(review): lookups are expected to succeed for well-formed label
    // sets (the helpers above produce the right cardinality), hence unwrap.
    let events = EVENTS_INGESTED_DATE
        .get_metric_with_label_values(&event_labels)
        .unwrap()
        .get() as u64;
    let ingestion = EVENTS_INGESTED_SIZE_DATE
        .get_metric_with_label_values(&event_labels)
        .unwrap()
        .get() as u64;
    let storage = EVENTS_STORAGE_SIZE_DATE
        .get_metric_with_label_values(&storage_size_labels)
        .unwrap()
        .get() as u64;

    Ok(Stats {
        events,
        ingestion,
        storage,
    })
}

pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
Expand All @@ -490,6 +515,25 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
return Err(StreamError::StreamNotFound(stream_name));
}

let query_string = req.query_string();
if !query_string.is_empty() {
let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
if date_key != "date" {
return Err(StreamError::Custom {
msg: "Invalid query parameter".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !date_value.is_empty() {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;

return Ok((web::Json(stats), StatusCode::OK));
}
}

let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

Expand Down
1 change: 0 additions & 1 deletion server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ impl IngestServer {
}

metrics::fetch_stats_from_storage().await;
metrics::reset_daily_metric_from_global();

let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
Expand Down
1 change: 0 additions & 1 deletion server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ impl QueryServer {

// load data from stats back to prometheus metrics
metrics::fetch_stats_from_storage().await;
metrics::reset_daily_metric_from_global();
// track all parquet files already in the data directory
storage::retention::load_retention_from_global();

Expand Down
1 change: 0 additions & 1 deletion server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ impl Server {
DASHBOARDS.load().await?;

metrics::fetch_stats_from_storage().await;
metrics::reset_daily_metric_from_global();
storage::retention::load_retention_from_global();

let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
Expand Down
41 changes: 32 additions & 9 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use chrono::Local;
use chrono::{Local, NaiveDateTime};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
Expand All @@ -27,10 +27,10 @@ use std::sync::{Arc, RwLock};
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
use crate::alerts::Alerts;
use crate::metrics::{
EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY,
LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
};
use crate::storage::{LogStream, ObjectStorage, StorageDir};
use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
use crate::utils::arrow::MergedRecordReader;
use derive_more::{Deref, DerefMut};

Expand Down Expand Up @@ -244,7 +244,8 @@ impl StreamInfo {
let alerts = storage.get_alerts(&stream.name).await?;
let schema = storage.get_schema_on_server_start(&stream.name).await?;
let meta = storage.get_stream_metadata(&stream.name).await?;

let meta_clone = meta.clone();
let stream_name = stream.name.clone();
let schema = update_schema_from_staging(&stream.name, schema);
let schema = HashMap::from_iter(
schema
Expand All @@ -268,10 +269,30 @@ impl StreamInfo {
let mut map = self.write().expect(LOCK_EXPECT);

map.insert(stream.name, metadata);
Self::load_daily_metrics(meta_clone, &stream_name);

Ok(())
}

/// Seeds the per-date prometheus gauges from the stats persisted in the
/// stream's snapshot manifest list, so daily metrics survive a restart.
fn load_daily_metrics(meta: ObjectStoreFormat, stream_name: &str) {
    for item in meta.snapshot.manifest_list {
        // NaiveDate's Display renders as YYYY-MM-DD, matching the label
        // format used when these metrics were originally recorded.
        let date = item.time_lower_bound.date_naive().to_string();
        EVENTS_INGESTED_DATE
            .with_label_values(&[stream_name, "json", &date])
            .set(item.events_ingested as i64);
        EVENTS_INGESTED_SIZE_DATE
            .with_label_values(&[stream_name, "json", &date])
            .set(item.ingestion_size as i64);
        EVENTS_STORAGE_SIZE_DATE
            .with_label_values(&["data", stream_name, "parquet", &date])
            .set(item.storage_size as i64);
    }
}

pub fn list_streams(&self) -> Vec<String> {
self.read()
.expect(LOCK_EXPECT)
Expand All @@ -286,18 +307,20 @@ impl StreamInfo {
origin: &'static str,
size: u64,
num_rows: u64,
parsed_timestamp: NaiveDateTime,
) -> Result<(), MetadataError> {
let parsed_date = parsed_timestamp.date().to_string();
EVENTS_INGESTED
.with_label_values(&[stream_name, origin])
.add(num_rows as i64);
EVENTS_INGESTED_TODAY
.with_label_values(&[stream_name, origin])
EVENTS_INGESTED_DATE
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
.add(num_rows as i64);
EVENTS_INGESTED_SIZE
.with_label_values(&[stream_name, origin])
.add(size as i64);
EVENTS_INGESTED_SIZE_TODAY
.with_label_values(&[stream_name, origin])
EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
.add(size as i64);
LIFETIME_EVENTS_INGESTED
.with_label_values(&[stream_name, origin])
Expand Down
Loading

0 comments on commit a279822

Please sign in to comment.