diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 21a7c56dc8154..9b20419fb61f0 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -39,7 +39,7 @@ use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, }; use super::writer::SinkWriter; -use super::{DummySinkCommitCoordinator, SinkWriterParam}; +use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam}; use crate::error::ConnectorResult; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -553,7 +553,7 @@ impl Sink for ClickHouseSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + SinkWriterMetrics::new(&writer_param), commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 59e3335eb36db..5da0e725bf654 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -19,7 +19,8 @@ use async_trait::async_trait; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; -use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; +use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics}; + pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10; pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1; pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval"; @@ -33,7 +34,7 @@ pub fn default_commit_checkpoint_interval() -> u64 { /// we delay the checkpoint barrier to make commits less frequent. 
pub struct DecoupleCheckpointLogSinkerOf { writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, commit_checkpoint_interval: NonZeroU64, } @@ -42,12 +43,12 @@ impl DecoupleCheckpointLogSinkerOf { /// decouple log reader `KvLogStoreReader`. pub fn new( writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, commit_checkpoint_interval: NonZeroU64, ) -> Self { DecoupleCheckpointLogSinkerOf { writer, - sink_metrics, + sink_writer_metrics, commit_checkpoint_interval, } } @@ -57,7 +58,6 @@ impl DecoupleCheckpointLogSinkerOf { impl> LogSinker for DecoupleCheckpointLogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; - let sink_metrics = self.sink_metrics; #[derive(Debug)] enum LogConsumerState { /// Mark that the log consumer is not initialized yet @@ -74,6 +74,7 @@ impl> LogSinker for DecoupleCheckpointLogSink let mut current_checkpoint: u64 = 0; let commit_checkpoint_interval = self.commit_checkpoint_interval; + let sink_writer_metrics = self.sink_writer_metrics; loop { let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?; @@ -87,8 +88,8 @@ impl> LogSinker for DecoupleCheckpointLogSink // force commit on update vnode bitmap let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?; current_checkpoint = 0; @@ -149,8 +150,8 @@ impl> LogSinker for DecoupleCheckpointLogSink if current_checkpoint >= commit_checkpoint_interval.get() { let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch })?; 
current_checkpoint = 0; diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 494adb2dd6fed..45a439273e93b 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -46,7 +46,7 @@ use super::decouple_checkpoint_log_sink::{ }; use super::writer::SinkWriter; use super::{ - Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, + Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; @@ -289,6 +289,8 @@ impl Sink for DeltaLakeSink { self.param.downstream_pk.clone(), ) .await?; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -312,7 +314,7 @@ impl Sink for DeltaLakeSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 0571c9a2bd6bc..735a3b6c74025 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -35,7 +35,9 @@ use super::doris_starrocks_connector::{ HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, POOL_IDLE_TIMEOUT, }; -use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{ + Result, SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::sink::encoder::{JsonEncoder, RowEncoder}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam}; @@ -209,7 +211,7 @@ impl Sink for DorisSink { self.is_append_only, ) .await? 
- .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index 2b6c356b1e1e8..7843cdfe0f2c1 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -31,6 +31,7 @@ use crate::sink::catalog::SinkEncode; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter, + SinkWriterMetrics, }; use crate::source::TryFromBTreeMap; use crate::with_options::WithOptions; @@ -130,7 +131,7 @@ impl Sink for FileSink { self.format_desc.encode.clone(), self.engine_type.clone(), )? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 53fe7fae6fab0..95dc0b5312193 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -41,6 +41,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; @@ -55,7 +56,8 @@ use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, }; use super::{ - Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + Sink, SinkError, SinkWriterMetrics, SinkWriterParam, GLOBAL_SINK_METRICS, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, 
SINK_TYPE_UPSERT, }; use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -367,6 +369,8 @@ impl Sink for IcebergSink { } else { IcebergWriter::new_append_only(table, &writer_param).await? }; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -390,7 +394,7 @@ impl Sink for IcebergSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } @@ -411,6 +415,17 @@ impl Sink for IcebergSink { pub struct IcebergWriter { inner_writer: IcebergWriterEnum, schema: SchemaRef, + // See comments below + _metrics: IcebergWriterMetrics, +} + +pub struct IcebergWriterMetrics { + // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management. + // They are actually used in `PrometheusWriterBuilder`: + // WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()) + // We keep them here to let the guard clean the labels from the metrics registry when dropped + _write_qps: LabelGuardedIntCounter<3>, + _write_latency: LabelGuardedHistogram<3>, } enum IcebergWriterEnum { @@ -446,38 +461,56 @@ impl IcebergWriter { pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; + let SinkWriterParam { + extra_partition_col_idx, + actor_id, + sink_id, + sink_name, + ..
+ } = writer_param; + let metrics_labels = [ + &actor_id.to_string(), + &sink_id.to_string(), + sink_name.as_str(), + ]; + + // Metrics + let write_qps = GLOBAL_SINK_METRICS + .iceberg_write_qps + .with_guarded_label_values(&metrics_labels); + let write_latency = GLOBAL_SINK_METRICS + .iceberg_write_latency + .with_guarded_label_values(&metrics_labels); + let rolling_unflushed_data_file = GLOBAL_SINK_METRICS + .iceberg_rolling_unflushed_data_file + .with_guarded_label_values(&metrics_labels); let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new( builder_helper .rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?, - writer_param - .sink_metrics - .iceberg_rolling_unflushed_data_file - .clone(), + rolling_unflushed_data_file, )); - if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + + if let Some(extra_partition_col_idx) = extra_partition_col_idx { let partition_data_file_builder = builder_helper.precompute_partition_writer_builder( data_file_builder.clone(), - extra_partition_col_idx, + *extra_partition_col_idx, )?; let dispatch_builder = builder_helper .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); - let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let schema = Self::schema_with_extra_partition_col(&table, *extra_partition_col_idx)?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, + }, }) } 
else { let partition_data_file_builder = @@ -487,20 +520,17 @@ impl IcebergWriter { // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); let schema = table.current_arrow_schema()?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, + }, }) } } @@ -511,20 +541,41 @@ impl IcebergWriter { writer_param: &SinkWriterParam, ) -> Result { let builder_helper = table.builder_helper()?; + let SinkWriterParam { + extra_partition_col_idx, + actor_id, + sink_id, + sink_name, + .. + } = writer_param; + let metrics_labels = [ + &actor_id.to_string(), + &sink_id.to_string(), + sink_name.as_str(), + ]; + + // Metrics + let write_qps = GLOBAL_SINK_METRICS + .iceberg_write_qps + .with_guarded_label_values(&metrics_labels); + let write_latency = GLOBAL_SINK_METRICS + .iceberg_write_latency + .with_guarded_label_values(&metrics_labels); + let rolling_unflushed_data_file = GLOBAL_SINK_METRICS + .iceberg_rolling_unflushed_data_file + .with_guarded_label_values(&metrics_labels); + let position_delete_cache_num = GLOBAL_SINK_METRICS + .iceberg_position_delete_cache_num + .with_guarded_label_values(&metrics_labels); + let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new( builder_helper .rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?, - writer_param - .sink_metrics - .iceberg_rolling_unflushed_data_file - .clone(), + rolling_unflushed_data_file, )); let position_delete_builder = MonitoredPositionDeleteWriterBuilder::new( 
builder_helper.position_delete_writer_builder(0, 1024)?, - writer_param - .sink_metrics - .iceberg_position_delete_cache_num - .clone(), + position_delete_cache_num, ); let equality_delete_builder = builder_helper.equality_delete_writer_builder(unique_column_ids.clone(), 0)?; @@ -534,30 +585,27 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + if let Some(extra_partition_col_idx) = extra_partition_col_idx { let partition_delta_builder = builder_helper.precompute_partition_writer_builder( delta_builder.clone(), - extra_partition_col_idx, + *extra_partition_col_idx, )?; let dispatch_builder = builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); - let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let schema = Self::schema_with_extra_partition_col(&table, *extra_partition_col_idx)?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, + }, }) } else { let partition_delta_builder = @@ -567,20 +615,17 @@ impl IcebergWriter { // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), 
write_latency.deref().clone()), ); let schema = table.current_arrow_schema()?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, + }, }) } } diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 3c25b1ec7d75a..bddc79b486546 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -25,11 +25,9 @@ use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::metrics::LabelGuardedIntCounter; +use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge}; use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH}; -use crate::sink::SinkMetrics; - pub type LogStoreResult = Result; pub type ChunkId = usize; @@ -268,11 +266,17 @@ impl LogReader for BackpressureMonitoredLogReader { pub struct MonitoredLogReader { inner: R, read_epoch: u64, - metrics: SinkMetrics, + metrics: LogReaderMetrics, +} + +pub struct LogReaderMetrics { + pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, + pub log_store_read_rows: LabelGuardedIntCounter<4>, + pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, } impl MonitoredLogReader { - pub fn new(inner: R, metrics: SinkMetrics) -> Self { + pub fn new(inner: R, metrics: LogReaderMetrics) -> Self { Self { inner, read_epoch: INVALID_EPOCH, @@ -327,7 +331,8 @@ where TransformChunkLogReader { f, inner: self } } - pub fn monitored(self, metrics: SinkMetrics) -> impl LogReader { + pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader { + // TODO: The `clone()` can be avoided if move backpressure inside `MonitoredLogReader` let wait_new_future_duration = 
metrics.log_store_reader_wait_new_future_duration_ns.clone(); BackpressureMonitoredLogReader::new( MonitoredLogReader::new(self, metrics), @@ -338,7 +343,13 @@ where pub struct MonitoredLogWriter { inner: W, - metrics: SinkMetrics, + metrics: LogWriterMetrics, +} + +pub struct LogWriterMetrics { + pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, + pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, + pub log_store_write_rows: LabelGuardedIntCounter<4>, } impl LogWriter for MonitoredLogWriter { @@ -395,7 +406,7 @@ impl T where T: LogWriter + Sized, { - pub fn monitored(self, metrics: SinkMetrics) -> MonitoredLogWriter { + pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter { MonitoredLogWriter { inner: self, metrics, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 4711dd1a3bdb6..7b10210580581 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -47,6 +47,7 @@ pub mod writer; use std::collections::BTreeMap; use std::future::Future; +use std::sync::LazyLock; use ::clickhouse::error::Error as ClickHouseError; use ::deltalake::DeltaTableError; @@ -61,14 +62,22 @@ use decouple_checkpoint_log_sink::{ use deltalake::DELTALAKE_SINK; use iceberg::ICEBERG_SINK; use opendal::Error as OpendalError; +use prometheus::Registry; use risingwave_common::array::ArrayError; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; +use risingwave_common::hash::ActorId; use risingwave_common::metrics::{ - LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, + LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter, + LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec, }; +use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::secret::{LocalSecretManager, SecretError}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::{ + 
register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, + register_guarded_int_gauge_vec_with_registry, +}; use risingwave_pb::catalog::PbSinkType; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; @@ -86,6 +95,7 @@ use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::file_sink::fs::FsSink; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::SinkWriter; + const BOUNDED_CHANNEL_SIZE: usize = 16; #[macro_export] macro_rules! for_all_sinks { @@ -287,56 +297,211 @@ impl SinkParam { } } +pub static GLOBAL_SINK_METRICS: LazyLock = + LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY)); + #[derive(Clone)] pub struct SinkMetrics { - pub sink_commit_duration_metrics: LabelGuardedHistogram<4>, - pub connector_sink_rows_received: LabelGuardedIntCounter<3>, - pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, - pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, - pub log_store_write_rows: LabelGuardedIntCounter<4>, - pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, - pub log_store_read_rows: LabelGuardedIntCounter<4>, - pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, - - pub iceberg_write_qps: LabelGuardedIntCounter<3>, - pub iceberg_write_latency: LabelGuardedHistogram<3>, - pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, - pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<3>, - pub iceberg_partition_num: LabelGuardedIntGauge<3>, + pub sink_commit_duration: LabelGuardedHistogramVec<4>, + pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, + + // Log store metrics + pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, + pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, + pub log_store_write_rows: LabelGuardedIntCounterVec<4>, + pub log_store_latest_read_epoch: 
LabelGuardedIntGaugeVec<4>, + pub log_store_read_rows: LabelGuardedIntCounterVec<4>, + pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, + + // Iceberg metrics + pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, + pub iceberg_write_latency: LabelGuardedHistogramVec<3>, + pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, + pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, + pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, } impl SinkMetrics { - fn for_test() -> Self { - SinkMetrics { - sink_commit_duration_metrics: LabelGuardedHistogram::test_histogram(), - connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), - log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_write_rows: LabelGuardedIntCounter::test_int_counter(), - log_store_read_rows: LabelGuardedIntCounter::test_int_counter(), - log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter( - ), - iceberg_write_qps: LabelGuardedIntCounter::test_int_counter(), - iceberg_write_latency: LabelGuardedHistogram::test_histogram(), - iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge::test_int_gauge(), - iceberg_position_delete_cache_num: LabelGuardedIntGauge::test_int_gauge(), - iceberg_partition_num: LabelGuardedIntGauge::test_int_gauge(), + pub fn new(registry: &Registry) -> Self { + let sink_commit_duration = register_guarded_histogram_vec_with_registry!( + "sink_commit_duration", + "Duration of commit op in sink", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!( + "connector_sink_rows_received", + "Number of rows received by sink", + &["connector_type", "sink_id", "sink_name"], + registry + ) + 
.unwrap(); + + let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_first_write_epoch", + "The first write epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_latest_write_epoch", + "The latest write epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_write_rows = register_guarded_int_counter_vec_with_registry!( + "log_store_write_rows", + "The write rate of rows", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_latest_read_epoch", + "The latest read epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_read_rows = register_guarded_int_counter_vec_with_registry!( + "log_store_read_rows", + "The read rate of rows", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_reader_wait_new_future_duration_ns = + register_guarded_int_counter_vec_with_registry!( + "log_store_reader_wait_new_future_duration_ns", + "Accumulated duration of LogReader to wait for next call to create future", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!( + "iceberg_write_qps", + "The qps of iceberg writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_write_latency = register_guarded_histogram_vec_with_registry!( + "iceberg_write_latency", + "The latency of iceberg writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!( + 
"iceberg_rolling_unflushed_data_file", + "The unflushed data file count of iceberg rolling writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!( + "iceberg_position_delete_cache_num", + "The delete cache num of iceberg position delete writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!( + "iceberg_partition_num", + "The partition num of iceberg partition writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + Self { + sink_commit_duration, + connector_sink_rows_received, + log_store_first_write_epoch, + log_store_latest_write_epoch, + log_store_write_rows, + log_store_latest_read_epoch, + log_store_read_rows, + log_store_reader_wait_new_future_duration_ns, + iceberg_write_qps, + iceberg_write_latency, + iceberg_rolling_unflushed_data_file, + iceberg_position_delete_cache_num, + iceberg_partition_num, } } } #[derive(Clone)] pub struct SinkWriterParam { + // TODO(eric): deprecate executor_id pub executor_id: u64, pub vnode_bitmap: Option, pub meta_client: Option, - pub sink_metrics: SinkMetrics, // The val has two effect: // 1. Indicates that the sink will accpect the data chunk with extra partition value column. // 2. The index of the extra partition value column. 
// More detail of partition value column, see `PartitionComputeInfo` pub extra_partition_col_idx: Option, + + pub actor_id: ActorId, + pub sink_id: SinkId, + pub sink_name: String, + pub connector: String, +} + +#[derive(Clone)] +pub struct SinkWriterMetrics { + pub sink_commit_duration: LabelGuardedHistogram<4>, + pub connector_sink_rows_received: LabelGuardedIntCounter<3>, +} + +impl SinkWriterMetrics { + pub fn new(writer_param: &SinkWriterParam) -> Self { + let sink_commit_duration = GLOBAL_SINK_METRICS + .sink_commit_duration + .with_guarded_label_values(&[ + &writer_param.actor_id.to_string(), + writer_param.connector.as_str(), + &writer_param.sink_id.to_string(), + writer_param.sink_name.as_str(), + ]); + let connector_sink_rows_received = GLOBAL_SINK_METRICS + .connector_sink_rows_received + .with_guarded_label_values(&[ + // TODO: should have `actor_id` as label + // &writer_param.actor_id.to_string(), + writer_param.connector.as_str(), + &writer_param.sink_id.to_string(), + writer_param.sink_name.as_str(), + ]); + Self { + sink_commit_duration, + connector_sink_rows_received, + } + } + + #[cfg(test)] + pub fn for_test() -> Self { + Self { + sink_commit_duration: LabelGuardedHistogram::test_histogram(), + connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), + } + } } #[derive(Clone)] @@ -368,8 +533,12 @@ impl SinkWriterParam { executor_id: Default::default(), vnode_bitmap: Default::default(), meta_client: Default::default(), - sink_metrics: SinkMetrics::for_test(), extra_partition_col_idx: Default::default(), + + actor_id: 1, + sink_id: SinkId::new(1), + sink_name: "test_sink".to_string(), + connector: "test_connector".to_string(), } } } diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index d09d44b0de9dc..d11572856d785 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -38,8 +38,8 @@ use crate::deserialize_bool_from_string; use crate::sink::encoder::RowEncoder; 
use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterMetrics, + SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; pub const MONGODB_SINK: &str = "mongodb"; @@ -302,7 +302,7 @@ impl Sink for MongodbSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 40d9601a2b491..d72991094f689 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, VecDeque}; +use std::collections::VecDeque; use std::marker::PhantomData; use std::ops::Deref; use std::pin::pin; @@ -64,7 +64,7 @@ use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, - SinkLogReader, SinkMetrics, SinkParam, SinkWriterParam, + SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam, }; macro_rules! 
def_remote_sink { @@ -255,8 +255,8 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, - sink_metrics: SinkMetrics, stream_chunk_converter: StreamChunkConverter, + sink_writer_metrics: SinkWriterMetrics, } impl RemoteLogSinker { @@ -288,11 +288,10 @@ impl RemoteLogSinker { .start_sink_writer_stream(payload_schema, sink_proto) .await?; - let sink_metrics = writer_param.sink_metrics; Ok(RemoteLogSinker { request_sender, response_stream, - sink_metrics, + sink_writer_metrics: SinkWriterMetrics::new(&writer_param), stream_chunk_converter: StreamChunkConverter::new( sink_name, sink_param.schema(), @@ -308,7 +307,7 @@ impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; - let sink_metrics = self.sink_metrics; + let sink_writer_metrics = self.sink_writer_metrics; let (response_tx, mut response_rx) = unbounded_channel(); @@ -336,7 +335,7 @@ impl LogSinker for RemoteLogSinker { queue: &mut VecDeque<(TruncateOffset, Option)>, persisted_offset: TruncateOffset, log_reader: &mut impl SinkLogReader, - metrics: &SinkMetrics, + sink_writer_metrics: &SinkWriterMetrics, ) -> Result<()> { while let Some((sent_offset, _)) = queue.front() && sent_offset < &persisted_offset @@ -358,8 +357,8 @@ impl LogSinker for RemoteLogSinker { if let (TruncateOffset::Barrier { .. 
}, Some(start_time)) = (persisted_offset, start_time) { - metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); } @@ -399,7 +398,7 @@ impl LogSinker for RemoteLogSinker { chunk_id: batch_id as _, }, log_reader, - &sink_metrics, + &sink_writer_metrics, )?; } SinkWriterStreamResponse { @@ -418,7 +417,7 @@ impl LogSinker for RemoteLogSinker { &mut sent_offset_queue, TruncateOffset::Barrier { epoch }, log_reader, - &sink_metrics, + &sink_writer_metrics, )?; } response => { @@ -439,7 +438,7 @@ impl LogSinker for RemoteLogSinker { prev_offset.check_next_offset(offset)?; } let cardinality = chunk.cardinality(); - sink_metrics + sink_writer_metrics .connector_sink_rows_received .inc_by(cardinality as _); @@ -528,6 +527,7 @@ impl Sink for CoordinatedRemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let metrics = SinkWriterMetrics::new(&writer_param); Ok(CoordinatedSinkWriter::new( writer_param .meta_client @@ -540,11 +540,10 @@ impl Sink for CoordinatedRemoteSink { "sink needs coordination and should not have singleton input" )) })?, - CoordinatedRemoteSinkWriter::new(self.param.clone(), writer_param.sink_metrics.clone()) - .await?, + CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?, ) .await? 
- .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(metrics)) } async fn new_coordinator(&self) -> Result { @@ -553,27 +552,24 @@ impl Sink for CoordinatedRemoteSink { } pub struct CoordinatedRemoteSinkWriter { - #[expect(dead_code)] - properties: BTreeMap, epoch: Option, batch_id: u64, stream_handle: SinkWriterStreamHandle, - sink_metrics: SinkMetrics, + metrics: SinkWriterMetrics, } impl CoordinatedRemoteSinkWriter { - pub async fn new(param: SinkParam, sink_metrics: SinkMetrics) -> Result { + pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result { let sink_proto = param.to_proto(); let stream_handle = EmbeddedConnectorClient::new()? .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto) .await?; Ok(Self { - properties: param.properties, epoch: None, batch_id: 0, stream_handle, - sink_metrics, + metrics, }) } @@ -584,8 +580,6 @@ impl CoordinatedRemoteSinkWriter { ) -> CoordinatedRemoteSinkWriter { use futures::StreamExt; - let properties = BTreeMap::from([("output.path".to_string(), "/tmp/rw".to_string())]); - let stream_handle = SinkWriterStreamHandle::for_test( request_sender, ReceiverStream::new(response_receiver) @@ -594,11 +588,10 @@ impl CoordinatedRemoteSinkWriter { ); CoordinatedRemoteSinkWriter { - properties, epoch: None, batch_id: 0, stream_handle, - sink_metrics: SinkMetrics::for_test(), + metrics: SinkWriterMetrics::for_test(), } } } @@ -609,7 +602,7 @@ impl SinkWriter for CoordinatedRemoteSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let cardinality = chunk.cardinality(); - self.sink_metrics + self.metrics .connector_sink_rows_received .inc_by(cardinality as _); diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index d87072a2502e9..9e65ba6f4ea02 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -38,7 +38,7 @@ use super::encoder::{ TimestamptzHandlingMode, }; use 
super::writer::LogSinkerOf; -use super::{SinkError, SinkParam}; +use super::{SinkError, SinkParam, SinkWriterMetrics}; use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; @@ -114,7 +114,7 @@ impl Sink for SnowflakeSink { self.pk_indices.clone(), self.is_append_only, )? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/sqlserver.rs b/src/connector/src/sink/sqlserver.rs index d40e2a2647b24..4a76d4d35cbc8 100644 --- a/src/connector/src/sink/sqlserver.rs +++ b/src/connector/src/sink/sqlserver.rs @@ -31,7 +31,9 @@ use tokio::net::TcpStream; use tokio_util::compat::TokioAsyncWriteCompatExt; use with_options::WithOptions; -use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{ + SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam}; @@ -257,7 +259,7 @@ ORDER BY self.is_append_only, ) .await? 
- .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 5c3e724721d18..456415a62e0d6 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -44,8 +44,8 @@ use super::doris_starrocks_connector::{ }; use super::encoder::{JsonEncoder, RowEncoder}; use super::{ - SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; @@ -313,6 +313,8 @@ impl Sink for StarrocksSink { self.is_append_only, writer_param.executor_id, )?; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -331,7 +333,7 @@ impl Sink for StarrocksSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/test_sink.rs b/src/connector/src/sink/test_sink.rs index 70c92b8b7c4d4..082fabadd0e49 100644 --- a/src/connector/src/sink/test_sink.rs +++ b/src/connector/src/sink/test_sink.rs @@ -19,7 +19,7 @@ use parking_lot::Mutex; use crate::sink::boxed::{BoxCoordinator, BoxWriter}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::sink::{Sink, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam}; pub trait BuildBoxWriterTrait = FnMut(SinkParam, SinkWriterParam) -> BoxWriter<()> + Send + 'static; @@ -57,7 +57,7 @@ impl Sink for TestSink { &self, writer_param: SinkWriterParam, ) -> crate::sink::Result { - let metrics = writer_param.sink_metrics.clone(); + let metrics = 
SinkWriterMetrics::new(&writer_param); Ok(build_box_writer(self.param.clone(), writer_param).into_log_sinker(metrics)) } } diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index 7057fa8c8b510..144619545529f 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -29,7 +29,7 @@ use crate::sink::formatter::SinkFormatter; use crate::sink::log_store::{ DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset, }; -use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkMetrics}; +use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics}; #[async_trait] pub trait SinkWriter: Send + 'static { @@ -112,14 +112,14 @@ pub trait FormattedSink { pub struct LogSinkerOf { writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, } impl LogSinkerOf { - pub fn new(writer: W, sink_metrics: SinkMetrics) -> Self { + pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self { LogSinkerOf { writer, - sink_metrics, + sink_writer_metrics, } } } @@ -128,7 +128,7 @@ impl LogSinkerOf { impl> LogSinker for LogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; - let sink_metrics = self.sink_metrics; + let metrics = self.sink_writer_metrics; #[derive(Debug)] enum LogConsumerState { /// Mark that the log consumer is not initialized yet @@ -196,8 +196,8 @@ impl> LogSinker for LogSinkerOf { if is_checkpoint { let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch })?; } else { @@ -218,10 +218,10 @@ impl T where T: SinkWriter + Sized, { - pub fn into_log_sinker(self, sink_metrics: SinkMetrics) -> LogSinkerOf { + pub fn into_log_sinker(self, sink_writer_metrics: 
SinkWriterMetrics) -> LogSinkerOf { LogSinkerOf { writer: self, - sink_metrics, + sink_writer_metrics, } } } diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 97a747c281bfd..05a4dfc0bf1a4 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -27,13 +27,11 @@ use risingwave_common::metrics::{ RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; -use risingwave_common::util::epoch::Epoch; use risingwave_common::{ register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry, }; use risingwave_connector::sink::catalog::SinkId; -use risingwave_connector::sink::SinkMetrics; use crate::common::log_store_impl::kv_log_store::{ REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY, @@ -160,16 +158,6 @@ pub struct StreamingMetrics { /// The progress made by the earliest in-flight barriers in the local barrier manager. 
pub barrier_manager_progress: IntCounter, - // Sink related metrics - sink_commit_duration: LabelGuardedHistogramVec<4>, - connector_sink_rows_received: LabelGuardedIntCounterVec<3>, - log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, - log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, - log_store_write_rows: LabelGuardedIntCounterVec<4>, - log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, - log_store_read_rows: LabelGuardedIntCounterVec<4>, - log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, - pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>, pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>, pub kv_log_store_rewind_count: LabelGuardedIntCounterVec<4>, @@ -181,13 +169,6 @@ pub struct StreamingMetrics { pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>, pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>, - // Sink iceberg metrics - iceberg_write_qps: LabelGuardedIntCounterVec<3>, - iceberg_write_latency: LabelGuardedHistogramVec<3>, - iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, - iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, - iceberg_partition_num: LabelGuardedIntGaugeVec<3>, - // Memory management pub lru_runtime_loop_count: IntCounter, pub lru_latest_sequence: IntGauge, @@ -810,71 +791,6 @@ impl StreamingMetrics { ) .unwrap(); - let sink_commit_duration = register_guarded_histogram_vec_with_registry!( - "sink_commit_duration", - "Duration of commit op in sink", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!( - "connector_sink_rows_received", - "Number of rows received by sink", - &["connector_type", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_first_write_epoch", - "The 
first write epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_latest_write_epoch", - "The latest write epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_write_rows = register_guarded_int_counter_vec_with_registry!( - "log_store_write_rows", - "The write rate of rows", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_latest_read_epoch", - "The latest read epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_read_rows = register_guarded_int_counter_vec_with_registry!( - "log_store_read_rows", - "The read rate of rows", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_reader_wait_new_future_duration_ns = - register_guarded_int_counter_vec_with_registry!( - "log_store_reader_wait_new_future_duration_ns", - "Accumulated duration of LogReader to wait for next call to create future", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!( "kv_log_store_storage_write_count", "Write row count throughput of kv log store", @@ -1073,46 +989,6 @@ impl StreamingMetrics { .unwrap() .relabel_debug_1(level); - let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!( - "iceberg_write_qps", - "The qps of iceberg writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_write_latency = register_guarded_histogram_vec_with_registry!( - "iceberg_write_latency", - "The latency of iceberg writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - 
.unwrap(); - - let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!( - "iceberg_rolling_unflushed_data_file", - "The unflushed data file count of iceberg rolling writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!( - "iceberg_position_delete_cache_num", - "The delete cache num of iceberg position delete writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!( - "iceberg_partition_num", - "The partition num of iceberg partition writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - Self { level, executor_row_count, @@ -1185,14 +1061,6 @@ impl StreamingMetrics { barrier_inflight_latency, barrier_sync_latency, barrier_manager_progress, - sink_commit_duration, - connector_sink_rows_received, - log_store_first_write_epoch, - log_store_latest_write_epoch, - log_store_write_rows, - log_store_latest_read_epoch, - log_store_read_rows, - log_store_reader_wait_new_future_duration_ns, kv_log_store_storage_write_count, kv_log_store_storage_write_size, kv_log_store_rewind_count, @@ -1203,11 +1071,6 @@ impl StreamingMetrics { kv_log_store_buffer_unconsumed_row_count, kv_log_store_buffer_unconsumed_epoch_count, kv_log_store_buffer_unconsumed_min_epoch, - iceberg_write_qps, - iceberg_write_latency, - iceberg_rolling_unflushed_data_file, - iceberg_position_delete_cache_num, - iceberg_partition_num, lru_runtime_loop_count, lru_latest_sequence, lru_watermark_sequence, @@ -1230,83 +1093,6 @@ impl StreamingMetrics { global_streaming_metrics(MetricLevel::Disabled) } - pub fn new_sink_metrics( - &self, - actor_id_str: &str, - sink_id_str: &str, - sink_name: &str, - connector: &str, - ) -> SinkMetrics { - let label_list = [actor_id_str, connector, sink_id_str, sink_name]; - let sink_commit_duration_metrics = self - 
.sink_commit_duration - .with_guarded_label_values(&label_list); - - let connector_sink_rows_received = self - .connector_sink_rows_received - .with_guarded_label_values(&[connector, sink_id_str, sink_name]); - - let log_store_latest_read_epoch = self - .log_store_latest_read_epoch - .with_guarded_label_values(&label_list); - - let log_store_latest_write_epoch = self - .log_store_latest_write_epoch - .with_guarded_label_values(&label_list); - - let log_store_first_write_epoch = self - .log_store_first_write_epoch - .with_guarded_label_values(&label_list); - - let initial_epoch = Epoch::now().0; - log_store_latest_read_epoch.set(initial_epoch as _); - log_store_first_write_epoch.set(initial_epoch as _); - log_store_latest_write_epoch.set(initial_epoch as _); - - let log_store_write_rows = self - .log_store_write_rows - .with_guarded_label_values(&label_list); - let log_store_read_rows = self - .log_store_read_rows - .with_guarded_label_values(&label_list); - let log_store_reader_wait_new_future_duration_ns = self - .log_store_reader_wait_new_future_duration_ns - .with_guarded_label_values(&label_list); - - let label_list = [actor_id_str, sink_id_str, sink_name]; - let iceberg_write_qps = self - .iceberg_write_qps - .with_guarded_label_values(&label_list); - let iceberg_write_latency = self - .iceberg_write_latency - .with_guarded_label_values(&label_list); - let iceberg_rolling_unflushed_data_file = self - .iceberg_rolling_unflushed_data_file - .with_guarded_label_values(&label_list); - let iceberg_position_delete_cache_num = self - .iceberg_position_delete_cache_num - .with_guarded_label_values(&label_list); - let iceberg_partition_num = self - .iceberg_partition_num - .with_guarded_label_values(&label_list); - - SinkMetrics { - sink_commit_duration_metrics, - connector_sink_rows_received, - log_store_first_write_epoch, - log_store_latest_write_epoch, - log_store_write_rows, - log_store_latest_read_epoch, - log_store_read_rows, - 
log_store_reader_wait_new_future_duration_ns, - iceberg_write_qps, - iceberg_write_latency, - iceberg_rolling_unflushed_data_file, - iceberg_position_delete_cache_num, - iceberg_partition_num, - } - } - pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics { let label_list: &[&str; 1] = &[&actor_id.to_string()]; let actor_execution_time = self diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 23162241ef298..6877d3e1550c9 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -27,10 +27,11 @@ use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ - LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt, + LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory, LogWriter, LogWriterExt, + LogWriterMetrics, }; use risingwave_connector::sink::{ - build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, + build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, GLOBAL_SINK_METRICS, }; use thiserror_ext::AsReport; @@ -218,12 +219,33 @@ impl SinkExecutor { if self.sink.is_sink_into_table() { processed_input.boxed() } else { + let labels = [ + &actor_id.to_string(), + "NA", // TODO: remove the connector label for log writer metrics + &sink_id.to_string(), + self.sink_param.sink_name.as_str(), + ]; + let log_store_first_write_epoch = GLOBAL_SINK_METRICS + .log_store_first_write_epoch + .with_guarded_label_values(&labels); + let log_store_latest_write_epoch = GLOBAL_SINK_METRICS + .log_store_latest_write_epoch + .with_guarded_label_values(&labels); + let log_store_write_rows = GLOBAL_SINK_METRICS + .log_store_write_rows + .with_guarded_label_values(&labels); + let log_writer_metrics = LogWriterMetrics { + log_store_first_write_epoch, + log_store_latest_write_epoch, + log_store_write_rows, + }; + self.log_store_factory .build() 
.map(move |(log_reader, log_writer)| { let write_log_stream = Self::execute_write_log( processed_input, - log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + log_writer.monitored(log_writer_metrics), actor_id, ); @@ -443,14 +465,33 @@ impl SinkExecutor { mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, ) -> StreamExecutorResult { - let metrics = sink_writer_param.sink_metrics.clone(); - let visible_columns = columns .iter() .enumerate() .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx)) .collect_vec(); + let labels = [ + &actor_context.id.to_string(), + sink_writer_param.connector.as_str(), + &sink_writer_param.sink_id.to_string(), + sink_writer_param.sink_name.as_str(), + ]; + let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS + .log_store_reader_wait_new_future_duration_ns + .with_guarded_label_values(&labels); + let log_store_read_rows = GLOBAL_SINK_METRICS + .log_store_read_rows + .with_guarded_label_values(&labels); + let log_store_latest_read_epoch = GLOBAL_SINK_METRICS + .log_store_latest_read_epoch + .with_guarded_label_values(&labels); + let metrics = LogReaderMetrics { + log_store_latest_read_epoch, + log_store_read_rows, + log_store_reader_wait_new_future_duration_ns, + }; + let mut log_reader = log_reader .transform_chunk(move |chunk| { if visible_columns.len() != columns.len() { diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index c53c123a48cb5..862485e5e56f3 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -200,19 +200,9 @@ impl ExecutorBuilder for SinkExecutorBuilder { let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?; - let actor_id_str = format!("{}", params.actor_context.id); - let sink_id_str = format!("{}", sink_id.sink_id); - - let sink_metrics = params.executor_stats.new_sink_metrics( - &actor_id_str, - 
&sink_id_str, - &sink_name, - connector, - ); - let sink_param = SinkParam { sink_id, - sink_name, + sink_name: sink_name.clone(), properties: properties_with_secret, columns: columns .iter() @@ -230,8 +220,12 @@ impl ExecutorBuilder for SinkExecutorBuilder { executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), - sink_metrics, extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize), + + actor_id: params.actor_context.id, + sink_id, + sink_name, + connector: connector.to_string(), }; let log_store_identity = format!(