From c2ee3f9a639a72bfc0ce4553d25f5d8b5584827d Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 5 Dec 2024 19:43:28 +0530
Subject: [PATCH] instrument execution plan construction

---
 src/handlers/http/mod.rs                        |  2 ++
 .../http/modal/utils/logstream_utils.rs         |  2 ++
 src/handlers/http/query.rs                      | 12 +++++++++++-
 src/query/mod.rs                                |  2 ++
 src/query/stream_schema_provider.rs             | 17 ++++++++++++-----
 src/querycache.rs                               |  8 +++++++-
 src/storage/object_storage.rs                   |  4 +++-
 7 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index f77d6335b..270129a27 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -20,6 +20,7 @@ use actix_cors::Cors;
 use arrow_schema::Schema;
 use itertools::Itertools;
 use serde_json::Value;
+use tracing::instrument;
 
 use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY};
 
@@ -75,6 +76,7 @@ pub fn base_path_without_preceding_slash() -> String {
 /// # Returns
 ///
 /// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
+#[instrument]
 pub async fn fetch_schema(stream_name: &str) -> anyhow::Result {
     let path_prefix =
         relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, STREAM_ROOT_DIRECTORY));
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index caa111e6a..8d7884892 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -22,6 +22,7 @@ use actix_web::{http::header::HeaderMap, HttpRequest};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use http::StatusCode;
+use tracing::instrument;
 
 use crate::{
     handlers::{
@@ -438,6 +439,7 @@ pub async fn create_stream(
 /// list all streams from storage
 /// if stream exists in storage, create stream and schema from storage
 /// and add it to the memory map
+#[instrument]
 pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result {
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 5e3d9f0e0..69624312c 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -30,7 +30,7 @@ use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
-use tracing::{error, info, warn};
+use tracing::{error, info, instrument, span, warn, Level};
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
@@ -70,8 +70,10 @@ pub struct Query {
     pub filter_tags: Option>,
 }
 
+#[instrument]
 pub async fn query(req: HttpRequest, query_request: Query) -> Result {
     let session_state = QUERY_SESSION.state();
+    let _guard = span!(Level::DEBUG, "Logical Plan Construction");
     let raw_logical_plan = match session_state
         .create_logical_plan(&query_request.query)
         .await
@@ -85,6 +87,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result
 Result) -> Result<(), QueryError> {
     if CONFIG.parseable.mode == Mode::Query {
         for table in tables {
@@ -192,10 +196,13 @@ pub async fn update_schema_when_distributed(tables: Vec) -> Result<(), Q
 /// Create streams for querier if they do not exist
 /// get list of streams from memory and storage
 /// create streams for memory from storage if they do not exist
+#[instrument]
 pub async fn create_streams_for_querier() {
     let querier_streams = STREAM_INFO.list_streams();
     let store = CONFIG.storage().get_object_store();
Streams"); let storage_streams = store.list_streams().await.unwrap(); + drop(_guard); for stream in storage_streams { let stream_name = stream.name; @@ -206,6 +213,7 @@ pub async fn create_streams_for_querier() { } #[allow(clippy::too_many_arguments)] +#[instrument] pub async fn put_results_in_cache( cache_results: Option<&str>, user_id: Option<&str>, @@ -263,6 +271,7 @@ pub async fn put_results_in_cache( } #[allow(clippy::too_many_arguments)] +#[instrument] pub async fn get_results_from_cache( show_cached: Option<&str>, query_cache_manager: Option<&QueryCacheManager>, @@ -385,6 +394,7 @@ impl FromRequest for Query { } } +#[instrument] pub async fn into_query( query: &Query, session_state: &SessionState, diff --git a/src/query/mod.rs b/src/query/mod.rs index b41a066f8..2a541ee0b 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -38,6 +38,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use sysinfo::System; +use tracing::instrument; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; @@ -126,6 +127,7 @@ impl Query { SessionContext::new_with_state(state) } + #[instrument] pub async fn execute( &self, stream_name: String, diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 2c568aaf5..2f40b86d6 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -55,6 +55,7 @@ use itertools::Itertools; use object_store::{path::Path, ObjectStore}; use relative_path::RelativePathBuf; use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc}; +use tracing::{instrument, span, Level}; use url::Url; use crate::{ @@ -125,6 +126,7 @@ async fn create_parquet_physical_plan( state: &dyn Session, time_partition: Option, ) -> Result, DataFusionError> { + let _guard = span!(Level::DEBUG, "Construct Physical Plan"); // Convert filters to physical expressions if applicable let filters = filters .iter() @@ -182,6 +184,7 @@ async fn create_parquet_physical_plan( Ok(plan) } +#[instrument] async fn collect_from_snapshot( snapshot: &catalog::snapshot::Snapshot, time_filters: &[PartialTimeFilter], @@ -332,6 +335,7 @@ impl TableProvider for StandardTableProvider { filters: &[Expr], limit: Option, ) -> Result, DataFusionError> { + let _guard = span!(Level::DEBUG, "Execution Plan Preparation"); let mut memory_exec = None; let mut cache_exec = None; let mut hot_tier_exec = None; @@ -357,6 +361,7 @@ impl TableProvider for StandardTableProvider { // Memory Execution Plan (for current stream data in memory) if include_now(filters, time_partition.clone()) { + let _guard = span!(Level::DEBUG, "Memory Execution Plan Preparation"); if let Some(records) = event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { @@ -371,6 +376,7 @@ impl TableProvider for StandardTableProvider { // Merged snapshot creation for query mode let merged_snapshot = if CONFIG.parseable.mode == Mode::Query { + let _guard = span!(Level::DEBUG, "Merging snapshots"); let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]); glob_storage .get_objects( @@ -398,8 +404,8 @@ impl TableProvider for StandardTableProvider { let listing_time_fiters = return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); - listing_exec = if let Some(listing_time_filter) = listing_time_fiters { - legacy_listing_table( + if let Some(listing_time_filter) = listing_time_fiters { + listing_exec = legacy_listing_table( self.stream.clone(), glob_storage.clone(), 
                 object_store.clone(),
@@ -412,9 +418,7 @@ impl TableProvider for StandardTableProvider {
                 time_partition.clone(),
             )
             .await?
-        } else {
-            None
-        };
+        }
         }
 
         // Manifest file collection
@@ -545,6 +549,7 @@ async fn get_cache_exectuion_plan(
     state: &dyn Session,
     time_partition: Option,
 ) -> Result>, DataFusionError> {
+    let _guard = span!(Level::DEBUG, "Construct Execution Plan against Cache");
     let (cached, remainder) = cache_manager
         .partition_on_cached(stream, manifest_files.clone(), |file: &File| {
             &file.file_path
@@ -594,6 +599,7 @@ async fn get_hottier_exectuion_plan(
     state: &dyn Session,
     time_partition: Option,
 ) -> Result>, DataFusionError> {
+    let _guard = span!(Level::DEBUG, "Execution Plan for Hot Tier");
     let (hot_tier_files, remainder) = hot_tier_manager
         .get_hot_tier_manifest_files(stream, manifest_files.clone())
         .await
@@ -646,6 +652,7 @@ async fn legacy_listing_table(
     limit: Option,
     time_partition: Option,
 ) -> Result>, DataFusionError> {
+    let _guard = span!(Level::DEBUG, "Execution Plan for Legacy Listings");
     let remote_table = ListingTableBuilder::new(stream)
         .populate_via_listing(glob_storage.clone(), object_store, time_filters)
         .and_then(|builder| async {
diff --git a/src/querycache.rs b/src/querycache.rs
index 3169e8061..97ad1bb0a 100644
--- a/src/querycache.rs
+++ b/src/querycache.rs
@@ -30,7 +30,7 @@ use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use tokio::fs as AsyncFs;
 use tokio::{fs, sync::Mutex};
-use tracing::{error, info, warn};
+use tracing::{error, info, instrument, warn};
 
 use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metadata::STREAM_INFO;
@@ -88,6 +88,7 @@ impl QueryCache {
         self.files.remove(key)
     }
 
+    #[instrument]
     pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> {
         self.files.delete(key);
         AsyncFs::remove_file(path).await?;
@@ -101,6 +102,7 @@ impl QueryCache {
 
     // read the parquet
     // return the recordbatches
+    #[instrument]
     pub async fn get_cached_records(
         &self,
         path: &PathBuf,
@@ -141,6 +143,7 @@ impl QueryCacheMeta {
     }
 }
 
+#[derive(Debug)]
 pub struct QueryCacheManager {
     filesystem: LocalFileSystem,
     cache_path: PathBuf, // refers to the path passed in the env var
@@ -241,6 +244,7 @@ impl QueryCacheManager {
         Ok(())
     }
 
+    #[instrument]
     pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result {
         let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap();
         let res = self
@@ -320,6 +324,7 @@ impl QueryCacheManager {
         Ok(())
     }
 
+    #[instrument]
     pub async fn create_parquet_cache(
         &self,
         table_name: &str,
@@ -364,6 +369,7 @@ impl QueryCacheManager {
         .await
     }
 
+    #[instrument]
     pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> {
         let cache = self.get_cache(stream, user_id).await?;
         let map = cache.files.values().collect_vec();
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 443dd930a..a21cc1eb6 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -48,7 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
 use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
-use tracing::error;
+use tracing::{error, instrument};
 
 use std::collections::BTreeMap;
 use std::{
@@ -407,6 +407,7 @@ pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
     }
 
     // gets the snapshot of the stream
+    #[instrument]
     async fn get_object_store_format(
         &self,
         stream: &str,
@@ -662,6 +663,7 @@ pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
     fn get_bucket_name(&self) -> String;
 }
 
+#[instrument]
 pub async fn commit_schema_to_storage(
     stream_name: &str,
     schema: Schema,
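
For context, a minimal standalone sketch of the tracing pattern this patch relies on, outside the Parseable codebase: `#[instrument]` creates and enters a span for the whole function call, while a manually created `span!` only covers code while its guard is alive (entered via `.entered()` or `.enter()` and closed on drop). The function names `plan_query` and `heavy_step` below are illustrative only, not Parseable APIs; the snippet assumes the `tracing` and `tracing-subscriber` crates.

    use tracing::{debug, instrument, span, Level};

    // `#[instrument]` wraps the whole call in a span and records the
    // arguments, which must implement `Debug`.
    #[instrument]
    fn plan_query(query: &str) -> usize {
        // A manual span covers only the scope of its guard. Note that
        // `span!` by itself does not enter the span; `.entered()` returns
        // a guard that exits the span when dropped.
        let _guard = span!(Level::DEBUG, "Logical Plan Construction").entered();
        let steps = heavy_step(query);
        // Explicitly end the inner span, mirroring the patch's `drop(_guard)`.
        drop(_guard);

        debug!(steps, "planning finished");
        steps
    }

    fn heavy_step(query: &str) -> usize {
        query.split_whitespace().count()
    }

    fn main() {
        // Emit spans and events to stdout so the instrumentation is visible.
        tracing_subscriber::fmt()
            .with_max_level(Level::DEBUG)
            .init();

        plan_query("select * from demo");
    }

In async functions, holding an entered span guard across an `.await` can misattribute time across tasks; attaching the span to the future with `tracing::Instrument::instrument` is the usual alternative for code like the handlers instrumented above.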