instrument execution plan construction
de-sh committed Dec 5, 2024
1 parent 384f84f commit c2ee3f9
Showing 7 changed files with 39 additions and 8 deletions.
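
The seven files share one pattern: async functions on the query path (schema fetch, stream creation, logical and physical plan construction, cache handling) gain a #[instrument] attribute, and a few expensive regions inside them are additionally bracketed by span! guards that are dropped once the region ends. As a minimal sketch of that general tracing pattern, assuming only the tracing crate (the names below are illustrative, not code from this commit):

use tracing::{info, instrument, span, Level};

// `#[instrument]` opens a span named after the function on every call and
// records the arguments (which must implement `Debug`) as span fields.
#[instrument]
async fn build_plan(query: &str) {
    // An explicit span scopes a sub-step; `entered()` returns a guard that
    // exits the span when it is dropped.
    let _guard = span!(Level::DEBUG, "logical_plan_construction").entered();
    info!(query, "constructing logical plan");
}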
2 changes: 2 additions & 0 deletions src/handlers/http/mod.rs
@@ -20,6 +20,7 @@ use actix_cors::Cors;
use arrow_schema::Schema;
use itertools::Itertools;
use serde_json::Value;
use tracing::instrument;

use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY};

@@ -75,6 +76,7 @@ pub fn base_path_without_preceding_slash() -> String {
/// # Returns
///
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
#[instrument]
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
let path_prefix =
relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, STREAM_ROOT_DIRECTORY));
2 changes: 2 additions & 0 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -22,6 +22,7 @@ use actix_web::{http::header::HeaderMap, HttpRequest};
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use http::StatusCode;
use tracing::instrument;

use crate::{
handlers::{
@@ -438,6 +439,7 @@ pub async fn create_stream(
/// list all streams from storage
/// if stream exists in storage, create stream and schema from storage
/// and add it to the memory map
#[instrument]
pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<bool, StreamError> {
// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
12 changes: 11 additions & 1 deletion src/handlers/http/query.rs
@@ -30,7 +30,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::{error, info, warn};
use tracing::{error, info, instrument, span, warn, Level};

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
@@ -70,8 +70,10 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

#[instrument]
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
let session_state = QUERY_SESSION.state();
let _guard = span!(Level::DEBUG, "Logical Plan Construction");
let raw_logical_plan = match session_state
.create_logical_plan(&query_request.query)
.await
@@ -85,6 +87,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};
drop(_guard);
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
@@ -174,6 +177,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
Ok(response)
}

#[instrument]
pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
@@ -192,10 +196,13 @@ pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), Q
/// Create streams for querier if they do not exist
/// get list of streams from memory and storage
/// create streams for memory from storage if they do not exist
#[instrument]
pub async fn create_streams_for_querier() {
let querier_streams = STREAM_INFO.list_streams();
let store = CONFIG.storage().get_object_store();
let _guard = span!(Level::DEBUG, "Listing Streams");
let storage_streams = store.list_streams().await.unwrap();
drop(_guard);
for stream in storage_streams {
let stream_name = stream.name;

@@ -206,6 +213,7 @@ pub async fn create_streams_for_querier() {
}

#[allow(clippy::too_many_arguments)]
#[instrument]
pub async fn put_results_in_cache(
cache_results: Option<&str>,
user_id: Option<&str>,
@@ -263,6 +271,7 @@ pub async fn put_results_in_cache(
}

#[allow(clippy::too_many_arguments)]
#[instrument]
pub async fn get_results_from_cache(
show_cached: Option<&str>,
query_cache_manager: Option<&QueryCacheManager>,
@@ -385,6 +394,7 @@ impl FromRequest for Query {
}
}

#[instrument]
pub async fn into_query(
query: &Query,
session_state: &SessionState,
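
One subtlety with spans in async functions such as the handler above: an entered span guard should not be held across an .await, since the task can migrate between threads while the guard is live (and EnteredSpan is !Send). The tracing crate's Instrument extension attaches a span to a future instead; a hedged sketch with illustrative names:

use tracing::{span, Instrument, Level};

async fn plan_query(sql: &str) -> String {
    // The span is entered each time the future is polled and exited when the
    // poll returns, which is safe across await points.
    build_logical_plan(sql)
        .instrument(span!(Level::DEBUG, "logical_plan_construction"))
        .await
}

async fn build_logical_plan(sql: &str) -> String {
    format!("plan for {sql}")
}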
2 changes: 2 additions & 0 deletions src/query/mod.rs
@@ -38,6 +38,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use sysinfo::System;
use tracing::instrument;

use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -126,6 +127,7 @@ impl Query {
SessionContext::new_with_state(state)
}

#[instrument]
pub async fn execute(
&self,
stream_name: String,
17 changes: 12 additions & 5 deletions src/query/stream_schema_provider.rs
@@ -55,6 +55,7 @@ use itertools::Itertools;
use object_store::{path::Path, ObjectStore};
use relative_path::RelativePathBuf;
use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
use tracing::{instrument, span, Level};
use url::Url;

use crate::{
@@ -125,6 +126,7 @@ async fn create_parquet_physical_plan(
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let _guard = span!(Level::DEBUG, "Construct Physical Plan");
// Convert filters to physical expressions if applicable
let filters = filters
.iter()
@@ -182,6 +184,7 @@
Ok(plan)
}

#[instrument]
async fn collect_from_snapshot(
snapshot: &catalog::snapshot::Snapshot,
time_filters: &[PartialTimeFilter],
@@ -332,6 +335,7 @@ impl TableProvider for StandardTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let _guard = span!(Level::DEBUG, "Execution Plan Preparation");
let mut memory_exec = None;
let mut cache_exec = None;
let mut hot_tier_exec = None;
@@ -357,6 +361,7 @@

// Memory Execution Plan (for current stream data in memory)
if include_now(filters, time_partition.clone()) {
let _guard = span!(Level::DEBUG, "Memory Execution Plan Preparation");
if let Some(records) =
event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
{
@@ -371,6 +376,7 @@

// Merged snapshot creation for query mode
let merged_snapshot = if CONFIG.parseable.mode == Mode::Query {
let _guard = span!(Level::DEBUG, "Merging snapshots");
let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
glob_storage
.get_objects(
@@ -398,8 +404,8 @@
let listing_time_fiters =
return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);

listing_exec = if let Some(listing_time_filter) = listing_time_fiters {
legacy_listing_table(
if let Some(listing_time_filter) = listing_time_fiters {
listing_exec = legacy_listing_table(
self.stream.clone(),
glob_storage.clone(),
object_store.clone(),
Expand All @@ -412,9 +418,7 @@ impl TableProvider for StandardTableProvider {
time_partition.clone(),
)
.await?
} else {
None
};
}
}

// Manifest file collection
@@ -545,6 +549,7 @@ async fn get_cache_exectuion_plan(
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let _guard = span!(Level::DEBUG, "Construct Execution Plan against Cache");
let (cached, remainder) = cache_manager
.partition_on_cached(stream, manifest_files.clone(), |file: &File| {
&file.file_path
@@ -594,6 +599,7 @@ async fn get_hottier_exectuion_plan(
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let _guard = span!(Level::DEBUG, "Execution Plan for Hot Tier");
let (hot_tier_files, remainder) = hot_tier_manager
.get_hot_tier_manifest_files(stream, manifest_files.clone())
.await
@@ -646,6 +652,7 @@ async fn legacy_listing_table(
limit: Option<usize>,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let _guard = span!(Level::DEBUG, "Execution Plan for Legacy Listings");
let remote_table = ListingTableBuilder::new(stream)
.populate_via_listing(glob_storage.clone(), object_store, time_filters)
.and_then(|builder| async {
8 changes: 7 additions & 1 deletion src/querycache.rs
@@ -30,7 +30,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs as AsyncFs;
use tokio::{fs, sync::Mutex};
use tracing::{error, info, warn};
use tracing::{error, info, instrument, warn};

use crate::handlers::http::users::USERS_ROOT_DIR;
use crate::metadata::STREAM_INFO;
@@ -88,6 +88,7 @@ impl QueryCache {
self.files.remove(key)
}

#[instrument]
pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> {
self.files.delete(key);
AsyncFs::remove_file(path).await?;
@@ -101,6 +102,7 @@

// read the parquet
// return the recordbatches
#[instrument]
pub async fn get_cached_records(
&self,
path: &PathBuf,
@@ -141,6 +143,7 @@ impl QueryCacheMeta {
}
}

#[derive(Debug)]
pub struct QueryCacheManager {
filesystem: LocalFileSystem,
cache_path: PathBuf, // refers to the path passed in the env var
@@ -241,6 +244,7 @@ impl QueryCacheManager {
Ok(())
}

#[instrument]
pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result<QueryCache, CacheError> {
let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap();
let res = self
@@ -320,6 +324,7 @@ impl QueryCacheManager {
Ok(())
}

#[instrument]
pub async fn create_parquet_cache(
&self,
table_name: &str,
@@ -364,6 +369,7 @@
.await
}

#[instrument]
pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> {
let cache = self.get_cache(stream, user_id).await?;
let map = cache.files.values().collect_vec();
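
A knock-on effect of #[instrument] visible in this file: by default it records every argument, including &self, through its Debug implementation, which is why QueryCacheManager now derives Debug. Arguments that are large or lack a Debug impl can be excluded with skip; a small sketch using hypothetical types:

use tracing::instrument;

#[derive(Debug)]
pub struct CacheManager {
    root: std::path::PathBuf,
}

impl CacheManager {
    // Skip recording `payload` (large and uninteresting in traces) while
    // still recording `self` and `stream` through their `Debug` impls.
    #[instrument(skip(payload))]
    pub async fn store(&self, stream: &str, payload: Vec<u8>) {
        let _ = (stream, payload);
    }
}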
4 changes: 3 additions & 1 deletion src/storage/object_storage.rs
@@ -48,7 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
use itertools::Itertools;
use relative_path::RelativePath;
use relative_path::RelativePathBuf;
use tracing::error;
use tracing::{error, instrument};

use std::collections::BTreeMap;
use std::{
@@ -407,6 +407,7 @@ pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
}

// gets the snapshot of the stream
#[instrument]
async fn get_object_store_format(
&self,
stream: &str,
@@ -662,6 +663,7 @@ pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
fn get_bucket_name(&self) -> String;
}

#[instrument]
pub async fn commit_schema_to_storage(
stream_name: &str,
schema: Schema,
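
None of these spans produce output unless a subscriber is installed at startup. A typical setup, assuming the tracing-subscriber crate (not part of this diff), filtered to DEBUG so the new spans are emitted:

// Hypothetical one-time initialization, e.g. at the top of main():
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();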
