Skip to content

Commit

Permalink
feat(trace): enable await tree trace for compactor (#10381)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored and lmatz committed Jun 25, 2023
1 parent 9e94a53 commit ccc305e
Show file tree
Hide file tree
Showing 17 changed files with 206 additions and 17 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ message StackTraceRequest {}
message StackTraceResponse {
map<uint32, string> actor_traces = 1;
map<string, string> rpc_traces = 2;
map<string, string> compaction_task_traces = 3;
}

message ProfilingRequest {
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl MonitorService for MonitorServiceImpl {
Ok(Response::new(StackTraceResponse {
actor_traces,
rpc_traces,
compaction_task_traces: Default::default(),
}))
}

Expand Down
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ pub async fn compute_node_serve(
output_memory_limiter,
sstable_object_id_manager: storage.sstable_object_id_manager().clone(),
task_progress_manager: Default::default(),
await_tree_reg: None,
});

let (handle, shutdown_sender) =
Expand Down
21 changes: 20 additions & 1 deletion src/ctl/src/cmd_impl/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::collections::BTreeMap;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::WorkerType;
use risingwave_pb::monitor_service::StackTraceResponse;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_rpc_client::{CompactorClient, ComputeClientPool};

use crate::CtlContext;

Expand All @@ -41,6 +41,7 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
let StackTraceResponse {
actor_traces,
rpc_traces,
..
} = client.stack_trace().await?;

all_actor_traces.extend(actor_traces);
Expand All @@ -65,5 +66,23 @@ pub async fn trace(context: &CtlContext) -> anyhow::Result<()> {
}
}

let compactor_nodes = meta_client.list_worker_nodes(WorkerType::Compactor).await?;
let mut all_compaction_task_traces = BTreeMap::new();
for compactor in compactor_nodes {
let addr: HostAddr = compactor.get_host().unwrap().into();
let client = CompactorClient::new(addr).await?;
let StackTraceResponse {
compaction_task_traces,
..
} = client.stack_trace().await?;
all_compaction_task_traces.extend(compaction_task_traces);
}
if !all_compaction_task_traces.is_empty() {
println!("--- Compactor Traces ---");
for (key, trace) in all_compaction_task_traces {
println!(">> Compaction Task {key}\n{trace}");
}
}

Ok(())
}
48 changes: 48 additions & 0 deletions src/rpc_client/src/compactor_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use risingwave_common::util::addr::HostAddr;
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use tonic::transport::{Channel, Endpoint};

use crate::error::Result;

#[derive(Clone)]
pub struct CompactorClient {
pub monitor_client: MonitorServiceClient<Channel>,
}

impl CompactorClient {
pub async fn new(host_addr: HostAddr) -> Result<Self> {
let channel = Endpoint::from_shared(format!("http://{}", &host_addr))?
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
Ok(Self {
monitor_client: MonitorServiceClient::new(channel),
})
}

pub async fn stack_trace(&self) -> Result<StackTraceResponse> {
Ok(self
.monitor_client
.to_owned()
.stack_trace(StackTraceRequest::default())
.await?
.into_inner())
}
}
2 changes: 2 additions & 0 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ mod connector_client;
mod hummock_meta_client;
mod meta_client;
// mod sink_client;
mod compactor_client;
mod stream_client;

pub use compactor_client::CompactorClient;
pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
pub use connector_client::ConnectorClient;
pub use hummock_meta_client::{CompactTaskItem, HummockMetaClient};
Expand Down
2 changes: 2 additions & 0 deletions src/storage/compactor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ normal = ["workspace-hack"]
[dependencies]
anyhow = "1"
async-trait = "0.1"
await-tree = { workspace = true }
clap = { version = "4", features = ["derive"] }
parking_lot = "0.12"
prometheus = { version = "0.13" }
risingwave_common = { path = "../../common" }
risingwave_common_service = { path = "../../common/common_service" }
Expand Down
7 changes: 6 additions & 1 deletion src/storage/compactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod server;
mod telemetry;

use clap::Parser;
use risingwave_common::config::OverrideConfig;
use risingwave_common::config::{AsyncStackTraceOption, OverrideConfig};

use crate::server::compactor_serve;

Expand Down Expand Up @@ -83,6 +83,11 @@ struct OverrideConfigOpts {
#[clap(long, env = "RW_MAX_CONCURRENT_TASK_NUMBER")]
#[override_opts(path = storage.max_concurrent_compaction_task_number)]
pub max_concurrent_task_number: Option<u64>,

/// Enable async stack tracing through `await-tree` for risectl.
#[clap(long, env = "RW_ASYNC_STACK_TRACE", value_enum)]
#[override_opts(path = streaming.async_stack_trace)]
pub async_stack_trace: Option<AsyncStackTraceOption>,
}

use std::future::Future;
Expand Down
49 changes: 49 additions & 0 deletions src/storage/compactor/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_pb::compactor::compactor_service_server::CompactorService;
use risingwave_pb::compactor::{EchoRequest, EchoResponse};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
};
use tonic::{Request, Response, Status};

#[derive(Default)]
Expand All @@ -25,3 +33,44 @@ impl CompactorService for CompactorServiceImpl {
Ok(Response::new(EchoResponse {}))
}
}

pub struct MonitorServiceImpl {
await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
}

impl MonitorServiceImpl {
pub fn new(await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>) -> Self {
Self { await_tree_reg }
}
}

#[async_trait::async_trait]
impl MonitorService for MonitorServiceImpl {
async fn stack_trace(
&self,
_request: Request<StackTraceRequest>,
) -> Result<Response<StackTraceResponse>, Status> {
let compaction_task_traces = match &self.await_tree_reg {
None => HashMap::default(),
Some(await_tree_reg) => await_tree_reg
.read()
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
};
Ok(Response::new(StackTraceResponse {
actor_traces: Default::default(),
rpc_traces: Default::default(),
compaction_task_traces,
}))
}

async fn profiling(
&self,
_request: Request<ProfilingRequest>,
) -> Result<Response<ProfilingResponse>, Status> {
Err(Status::unimplemented(
"profiling unimplemented in compactor",
))
}
}
23 changes: 20 additions & 3 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::config::{extract_storage_memory_config, load_config};
use parking_lot::RwLock;
use risingwave_common::config::{
extract_storage_memory_config, load_config, AsyncStackTraceOption,
};
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand All @@ -29,6 +32,7 @@ use risingwave_common_service::observer_manager::ObserverManager;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::filter_key_extractor::{FilterKeyExtractorManager, RemoteTableAccessor};
use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext};
Expand All @@ -45,7 +49,7 @@ use tokio::task::JoinHandle;
use tracing::info;

use super::compactor_observer::observer_manager::CompactorObserverNode;
use crate::rpc::CompactorServiceImpl;
use crate::rpc::{CompactorServiceImpl, MonitorServiceImpl};
use crate::telemetry::CompactorTelemetryCreator;
use crate::CompactorOpts;

Expand Down Expand Up @@ -173,6 +177,15 @@ pub async fn compactor_serve(
hummock_meta_client.clone(),
storage_opts.sstable_id_remote_fetch_number,
));
let await_tree_config = match &config.streaming.async_stack_trace {
AsyncStackTraceOption::Off => None,
c => await_tree::ConfigBuilder::default()
.verbose(c.is_verbose().unwrap())
.build()
.ok(),
};
let await_tree_reg =
await_tree_config.map(|c| Arc::new(RwLock::new(await_tree::Registry::new(c))));
let compactor_context = Arc::new(CompactorContext {
storage_opts,
hummock_meta_client: hummock_meta_client.clone(),
Expand All @@ -186,6 +199,7 @@ pub async fn compactor_serve(
output_memory_limiter,
sstable_object_id_manager: sstable_object_id_manager.clone(),
task_progress_manager: Default::default(),
await_tree_reg: await_tree_reg.clone(),
});
let mut sub_tasks = vec![
MetaClient::start_heartbeat_loop(
Expand Down Expand Up @@ -215,10 +229,13 @@ pub async fn compactor_serve(
tracing::info!("Telemetry didn't start due to config");
}

let compactor_srv = CompactorServiceImpl::default();
let monitor_srv = MonitorServiceImpl::new(await_tree_reg);
let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(CompactorServiceServer::new(CompactorServiceImpl::default()))
.add_service(CompactorServiceServer::new(compactor_srv))
.add_service(MonitorServiceServer::new(monitor_srv))
.serve_with_shutdown(listen_addr, async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ pub(crate) mod tests {
options.sstable_id_remote_fetch_number,
)),
task_progress_manager: Default::default(),
await_tree_reg: None,
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/hummock/compactor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_rpc_client::HummockMetaClient;

use super::task_progress::TaskProgressManagerRef;
Expand Down Expand Up @@ -51,6 +52,8 @@ pub struct CompactorContext {
pub sstable_object_id_manager: SstableObjectIdManagerRef,

pub task_progress_manager: TaskProgressManagerRef,

pub await_tree_reg: Option<Arc<RwLock<await_tree::Registry<String>>>>,
}

impl CompactorContext {
Expand Down Expand Up @@ -83,6 +86,7 @@ impl CompactorContext {
output_memory_limiter: memory_limiter,
sstable_object_id_manager,
task_progress_manager: Default::default(),
await_tree_reg: None,
}
}
}
5 changes: 4 additions & 1 deletion src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::{atomic, Arc};
use std::time::Instant;

use await_tree::InstrumentAwait;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
Expand Down Expand Up @@ -127,7 +128,7 @@ impl SstableStreamIterator {
/// `self.block_iter` to `None`.
async fn next_block(&mut self) -> HummockResult<()> {
// Check if we want and if we can load the next block.
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().await? {
if self.remaining_blocks > 0 && let Some(block) = self.download_next_block().verbose_instrument_await("stream_iter_next_block").await? {
let mut block_iter = BlockIterator::new(BlockHolder::from_owned_block(block));
block_iter.seek_to_first();

Expand Down Expand Up @@ -280,6 +281,7 @@ impl ConcatSstableIterator {
let sstable = self
.sstable_store
.sstable(table_info, &mut self.stats)
.verbose_instrument_await("stream_iter_sstable")
.await?;
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();
Expand Down Expand Up @@ -323,6 +325,7 @@ impl ConcatSstableIterator {
let block_stream = self
.sstable_store
.get_stream(sstable.value(), Some(start_index))
.verbose_instrument_await("stream_iter_get_stream")
.await?;

// Determine time needed to open stream.
Expand Down
Loading

0 comments on commit ccc305e

Please sign in to comment.