diff --git a/Cargo.lock b/Cargo.lock index faa82846152f6..129b595a5de59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5839,9 +5839,11 @@ version = "1.0.0-alpha" dependencies = [ "anyhow", "async-trait", + "await-tree", "clap 4.2.7", "madsim-tokio", "madsim-tonic", + "parking_lot 0.12.1", "prometheus", "risingwave_common", "risingwave_common_service", diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index d2fff8d174095..f7919f792620f 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -10,6 +10,7 @@ message StackTraceRequest {} message StackTraceResponse { map actor_traces = 1; map rpc_traces = 2; + map compaction_task_traces = 3; } message ProfilingRequest { diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index 30b2d9c2b3310..3d40197db6570 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -70,6 +70,7 @@ impl MonitorService for MonitorServiceImpl { Ok(Response::new(StackTraceResponse { actor_traces, rpc_traces, + compaction_task_traces: Default::default(), })) } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index a26030aa817ed..5a8727636a68d 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -224,6 +224,7 @@ pub async fn compute_node_serve( output_memory_limiter, sstable_object_id_manager: storage.sstable_object_id_manager().clone(), task_progress_manager: Default::default(), + await_tree_reg: None, }); let (handle, shutdown_sender) = diff --git a/src/ctl/src/cmd_impl/trace.rs b/src/ctl/src/cmd_impl/trace.rs index c69a8ade6c463..dec02108ce115 100644 --- a/src/ctl/src/cmd_impl/trace.rs +++ b/src/ctl/src/cmd_impl/trace.rs @@ -17,7 +17,7 @@ use std::collections::BTreeMap; use risingwave_common::util::addr::HostAddr; use risingwave_pb::common::WorkerType; use risingwave_pb::monitor_service::StackTraceResponse; -use risingwave_rpc_client::ComputeClientPool; +use risingwave_rpc_client::{CompactorClient, ComputeClientPool}; use crate::CtlContext; @@ -41,6 +41,7 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> { let StackTraceResponse { actor_traces, rpc_traces, + .. } = client.stack_trace().await?; all_actor_traces.extend(actor_traces); @@ -65,5 +66,23 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> { } } + let compactor_nodes = meta_client.list_worker_nodes(WorkerType::Compactor).await?; + let mut all_compaction_task_traces = BTreeMap::new(); + for compactor in compactor_nodes { + let addr: HostAddr = compactor.get_host().unwrap().into(); + let client = CompactorClient::new(addr).await?; + let StackTraceResponse { + compaction_task_traces, + .. + } = client.stack_trace().await?; + all_compaction_task_traces.extend(compaction_task_traces); + } + if !all_compaction_task_traces.is_empty() { + println!("--- Compactor Traces ---"); + for (key, trace) in all_compaction_task_traces { + println!(">> Compaction Task {key}\n{trace}"); + } + } + Ok(()) } diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs new file mode 100644 index 0000000000000..cdd1b08049087 --- /dev/null +++ b/src/rpc_client/src/compactor_client.rs @@ -0,0 +1,48 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use risingwave_common::util::addr::HostAddr; +use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient; +use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse}; +use tonic::transport::{Channel, Endpoint}; + +use crate::error::Result; + +#[derive(Clone)] +pub struct CompactorClient { + pub monitor_client: MonitorServiceClient, +} + +impl CompactorClient { + pub async fn new(host_addr: HostAddr) -> Result { + let channel = Endpoint::from_shared(format!("http://{}", &host_addr))? + .connect_timeout(Duration::from_secs(5)) + .connect() + .await?; + Ok(Self { + monitor_client: MonitorServiceClient::new(channel), + }) + } + + pub async fn stack_trace(&self) -> Result { + Ok(self + .monitor_client + .to_owned() + .stack_trace(StackTraceRequest::default()) + .await? + .into_inner()) + } +} diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index c25efa77f17cc..1d886cb07b7b1 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -50,8 +50,10 @@ mod connector_client; mod hummock_meta_client; mod meta_client; // mod sink_client; +mod compactor_client; mod stream_client; +pub use compactor_client::CompactorClient; pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}; pub use connector_client::ConnectorClient; pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient}; diff --git a/src/storage/compactor/Cargo.toml b/src/storage/compactor/Cargo.toml index 768bdcadcb1d7..edcdaa3c2c397 100644 --- a/src/storage/compactor/Cargo.toml +++ b/src/storage/compactor/Cargo.toml @@ -17,7 +17,9 @@ normal = ["workspace-hack"] [dependencies] anyhow = "1" async-trait = "0.1" +await-tree = { workspace = true } clap = { version = "4", features = ["derive"] } +parking_lot = "0.12" prometheus = { version = "0.13" } risingwave_common = { path = "../../common" } risingwave_common_service = { path = "../../common/common_service" } diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 62f411e1682ab..664c7edc91118 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -18,7 +18,7 @@ mod server; mod telemetry; use clap::Parser; -use risingwave_common::config::OverrideConfig; +use risingwave_common::config::{AsyncStackTraceOption, OverrideConfig}; use crate::server::compactor_serve; @@ -83,6 +83,11 @@ struct OverrideConfigOpts { #[clap(long, env = "RW_MAX_CONCURRENT_TASK_NUMBER")] #[override_opts(path = storage.max_concurrent_compaction_task_number)] pub max_concurrent_task_number: Option, + + /// Enable async stack tracing through `await-tree` for risectl. + #[clap(long, env = "RW_ASYNC_STACK_TRACE", value_enum)] + #[override_opts(path = streaming.async_stack_trace)] + pub async_stack_trace: Option, } use std::future::Future; diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs index cad7e28a1c72f..9b04ae80072e5 100644 --- a/src/storage/compactor/src/rpc.rs +++ b/src/storage/compactor/src/rpc.rs @@ -12,8 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::sync::Arc; + +use parking_lot::RwLock; use risingwave_pb::compactor::compactor_service_server::CompactorService; use risingwave_pb::compactor::{EchoRequest, EchoResponse}; +use risingwave_pb::monitor_service::monitor_service_server::MonitorService; +use risingwave_pb::monitor_service::{ + ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse, +}; use tonic::{Request, Response, Status}; #[derive(Default)] @@ -25,3 +33,44 @@ impl CompactorService for CompactorServiceImpl { Ok(Response::new(EchoResponse {})) } } + +pub struct MonitorServiceImpl { + await_tree_reg: Option>>>, +} + +impl MonitorServiceImpl { + pub fn new(await_tree_reg: Option>>>) -> Self { + Self { await_tree_reg } + } +} + +#[async_trait::async_trait] +impl MonitorService for MonitorServiceImpl { + async fn stack_trace( + &self, + _request: Request, + ) -> Result, Status> { + let compaction_task_traces = match &self.await_tree_reg { + None => HashMap::default(), + Some(await_tree_reg) => await_tree_reg + .read() + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + }; + Ok(Response::new(StackTraceResponse { + actor_traces: Default::default(), + rpc_traces: Default::default(), + compaction_task_traces, + })) + } + + async fn profiling( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "profiling unimplemented in compactor", + )) + } +} diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 8c91ff1d8abb7..ae593c0cb12dc 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -16,7 +16,10 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use risingwave_common::config::{extract_storage_memory_config, load_config}; +use parking_lot::RwLock; +use risingwave_common::config::{ + extract_storage_memory_config, load_config, AsyncStackTraceOption, +}; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::telemetry::manager::TelemetryManager; @@ -29,6 +32,7 @@ use risingwave_common_service::observer_manager::ObserverManager; use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::common::WorkerType; use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer; +use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; use risingwave_rpc_client::MetaClient; use risingwave_storage::filter_key_extractor::{FilterKeyExtractorManager, RemoteTableAccessor}; use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext}; @@ -45,7 +49,7 @@ use tokio::task::JoinHandle; use tracing::info; use super::compactor_observer::observer_manager::CompactorObserverNode; -use crate::rpc::CompactorServiceImpl; +use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl}; use crate::telemetry::CompactorTelemetryCreator; use crate::CompactorOpts; @@ -174,6 +178,15 @@ pub async fn compactor_serve( hummock_meta_client.clone(), storage_opts.sstable_id_remote_fetch_number, )); + let await_tree_config = match &config.streaming.async_stack_trace { + AsyncStackTraceOption::Off => None, + c => await_tree::ConfigBuilder::default() + .verbose(c.is_verbose().unwrap()) + .build() + .ok(), + }; + let await_tree_reg = + await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c)))); let compactor_context = Arc::new(CompactorContext { storage_opts, hummock_meta_client: hummock_meta_client.clone(), @@ -187,6 +200,7 @@ pub async fn compactor_serve( output_memory_limiter, sstable_object_id_manager: sstable_object_id_manager.clone(), task_progress_manager: Default::default(), + await_tree_reg: await_tree_reg.clone(), }); let mut sub_tasks = vec![ MetaClient::start_heartbeat_loop( @@ -216,10 +230,13 @@ pub async fn compactor_serve( tracing::info!("Telemetry didn't start due to config"); } + let compactor_srv = CompactorServiceImpl::default(); + let monitor_srv = MonitorServiceImpl::new(await_tree_reg); let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { tonic::transport::Server::builder() - .add_service(CompactorServiceServer::new(CompactorServiceImpl::default())) + .add_service(CompactorServiceServer::new(compactor_srv)) + .add_service(MonitorServiceServer::new(monitor_srv)) .serve_with_shutdown(listen_addr, async move { tokio::select! { _ = tokio::signal::ctrl_c() => {}, diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 59a4a31a7f27b..1857484bf1031 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -191,6 +191,7 @@ pub(crate) mod tests { options.sstable_id_remote_fetch_number, )), task_progress_manager: Default::default(), + await_tree_reg: None, } } diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index 6c71321c216b8..c23f12addeab4 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use parking_lot::RwLock; use risingwave_rpc_client::HummockMetaClient; use super::task_progress::TaskProgressManagerRef; @@ -51,6 +52,8 @@ pub struct CompactorContext { pub sstable_object_id_manager: SstableObjectIdManagerRef, pub task_progress_manager: TaskProgressManagerRef, + + pub await_tree_reg: Option>>>, } impl CompactorContext { @@ -83,6 +86,7 @@ impl CompactorContext { output_memory_limiter: memory_limiter, sstable_object_id_manager, task_progress_manager: Default::default(), + await_tree_reg: None, } } } diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 070546f0a5bef..cb3559254969c 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU64; use std::sync::{atomic, Arc}; use std::time::Instant; +use await_tree::InstrumentAwait; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::key_range::KeyRange; @@ -127,7 +128,7 @@ impl SstableStreamIterator { /// `self.block_iter` to `None`. async fn next_block(&mut self) -> HummockResult<()> { // Check if we want and if we can load the next block. - if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().await? { + if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().verbose_instrument_await("stream_iter_next_block").await? { let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block)); block_iter.seek_to_first(); @@ -280,6 +281,7 @@ impl ConcatSstableIterator { let sstable = self .sstable_store .sstable(table_info, &mut self.stats) + .verbose_instrument_await("stream_iter_sstable") .await?; let stats_ptr = self.stats.remote_io_time.clone(); let now = Instant::now(); @@ -323,6 +325,7 @@ impl ConcatSstableIterator { let block_stream = self .sstable_store .get_stream(sstable.value(), Some(start_index)) + .verbose_instrument_await("stream_iter_get_stream") .await?; // Determine time needed to open stream. diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index e6c688f789c93..7a8d7ff0825c6 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -28,6 +28,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use await_tree::InstrumentAwait; pub use compaction_executor::CompactionExecutor; pub use compaction_filter::{ CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter, @@ -35,7 +36,7 @@ pub use compaction_filter::{ }; pub use context::CompactorContext; use futures::future::try_join_all; -use futures::{stream, StreamExt}; +use futures::{stream, FutureExt, StreamExt}; pub use iterator::ConcatSstableIterator; use itertools::Itertools; use risingwave_hummock_sdk::compact::{compact_task_to_string, estimate_state_for_compaction}; @@ -314,11 +315,26 @@ impl Compactor { CompactorRunner::new(split_index, compactor_context.clone(), compact_task.clone()); let del_agg = delete_range_agg.clone(); let task_progress = task_progress_guard.progress.clone(); - let handle = tokio::spawn(async move { + let runner = async move { compactor_runner .run(filter, multi_filter_key_extractor, del_agg, task_progress) .await - }); + }; + let traced = match context.await_tree_reg.as_ref() { + None => runner.right_future(), + Some(await_tree_reg) => await_tree_reg + .write() + .register( + format!("{}-{}", compact_task.task_id, split_index), + format!( + "Compaction Task {} Split {} ", + compact_task.task_id, split_index + ), + ) + .instrument(runner) + .left_future(), + }; + let handle = tokio::spawn(traced); compaction_futures.push(handle); } @@ -638,10 +654,12 @@ impl Compactor { if !task_config.key_range.left.is_empty() { let full_key = FullKey::decode(&task_config.key_range.left); - iter.seek(full_key).await?; + iter.seek(full_key) + .verbose_instrument_await("iter_seek") + .await?; del_iter.seek(full_key.user_key); } else { - iter.rewind().await?; + iter.rewind().verbose_instrument_await("rewind").await?; del_iter.rewind(); } @@ -742,7 +760,9 @@ impl Compactor { last_table_stats.total_key_size -= last_key.encoded_len() as i64; last_table_stats.total_value_size -= iter.value().encoded_len() as i64; } - iter.next().await?; + iter.next() + .verbose_instrument_await("iter_next_in_drop") + .await?; continue; } @@ -764,6 +784,7 @@ impl Compactor { iter_key.epoch = earliest_range_delete_which_can_see_iter_key; sst_builder .add_full_key(iter_key, HummockValue::Delete, is_new_user_key) + .verbose_instrument_await("add_full_key_delete") .await?; iter_key.epoch = epoch; is_new_user_key = false; @@ -772,9 +793,10 @@ impl Compactor { // Don't allow two SSTs to share same user key sst_builder .add_full_key(iter_key, value, is_new_user_key) + .verbose_instrument_await("add_full_key") .await?; - iter.next().await?; + iter.next().verbose_instrument_await("iter_next").await?; } if let Some(task_progress) = task_progress.as_ref() && progress_key_num > 0 { @@ -848,6 +870,7 @@ impl Compactor { filter_key_extractor, task_progress.clone(), ) + .verbose_instrument_await("compact") .await? } else { self.compact_key_range_impl::<_, Xor8FilterBuilder>( @@ -858,6 +881,7 @@ impl Compactor { filter_key_extractor, task_progress.clone(), ) + .verbose_instrument_await("compact") .await? } } else { @@ -871,6 +895,7 @@ impl Compactor { filter_key_extractor, task_progress.clone(), ) + .verbose_instrument_await("compact") .await? } else { self.compact_key_range_impl::<_, Xor8FilterBuilder>( @@ -881,6 +906,7 @@ impl Compactor { filter_key_extractor, task_progress.clone(), ) + .verbose_instrument_await("compact") .await? } }; @@ -902,6 +928,7 @@ impl Compactor { let context_cloned = self.context.clone(); upload_join_handles.push(async move { upload_join_handle + .verbose_instrument_await("upload") .await .map_err(HummockError::sstable_upload_error)??; if let Some(tracker) = tracker_cloned { @@ -923,7 +950,9 @@ impl Compactor { } // Check if there are any failed uploads. Report all of those SSTs. - try_join_all(upload_join_handles).await?; + try_join_all(upload_join_handles) + .verbose_instrument_await("join") + .await?; self.context .compactor_metrics .get_table_id_total_time_duration @@ -983,9 +1012,13 @@ impl Compactor { compaction_filter, task_progress, ) + .verbose_instrument_await("compact_and_build_sst") .await?; - let ssts = sst_builder.finish().await?; + let ssts = sst_builder + .finish() + .verbose_instrument_await("builder_finish") + .await?; Ok((ssts, compaction_statistics)) } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 8710e412db5f1..4e268ddc4e8ed 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -205,8 +205,8 @@ impl LocalStreamManager { /// Get await-tree contexts for all actors. pub async fn get_actor_traces(&self) -> HashMap { - let mut core = self.core.lock().await; - match &mut core.await_tree_reg { + let core = self.core.lock().await; + match &core.await_tree_reg { Some(mgr) => mgr.iter().map(|(k, v)| (*k, v)).collect(), None => Default::default(), } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index d249ce455f9a3..661dac99c7c97 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -560,6 +560,7 @@ fn run_compactor_thread( output_memory_limiter: MemoryLimiter::unlimit(), sstable_object_id_manager, task_progress_manager: Default::default(), + await_tree_reg: None, }); risingwave_storage::hummock::compactor::Compactor::start_compactor( compactor_context,