From e38588acee3a232cad6a8e40d41d72467a374301 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 13:56:30 +0800 Subject: [PATCH 01/10] refactor sink_commit_duration; introduce SinkWriterMetrics --- src/connector/src/sink/clickhouse.rs | 4 +- .../src/sink/decouple_checkpoint_log_sink.rs | 19 +- src/connector/src/sink/deltalake.rs | 6 +- src/connector/src/sink/doris.rs | 6 +- .../src/sink/file_sink/opendal_sink.rs | 3 +- src/connector/src/sink/iceberg/mod.rs | 7 +- src/connector/src/sink/mod.rs | 201 +++++++++++++++++- src/connector/src/sink/mongodb.rs | 6 +- src/connector/src/sink/remote.rs | 32 +-- src/connector/src/sink/snowflake.rs | 4 +- src/connector/src/sink/sqlserver.rs | 6 +- src/connector/src/sink/starrocks.rs | 8 +- src/connector/src/sink/test_sink.rs | 4 +- src/connector/src/sink/writer.rs | 18 +- .../src/executor/monitor/streaming_stats.rs | 81 +------ src/stream/src/from_proto/sink.rs | 22 +- 16 files changed, 275 insertions(+), 152 deletions(-) diff --git a/src/connector/src/sink/clickhouse.rs b/src/connector/src/sink/clickhouse.rs index 21a7c56dc815..9b20419fb61f 100644 --- a/src/connector/src/sink/clickhouse.rs +++ b/src/connector/src/sink/clickhouse.rs @@ -39,7 +39,7 @@ use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, }; use super::writer::SinkWriter; -use super::{DummySinkCommitCoordinator, SinkWriterParam}; +use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam}; use crate::error::ConnectorResult; use crate::sink::{ Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, @@ -553,7 +553,7 @@ impl Sink for ClickHouseSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + SinkWriterMetrics::new(&writer_param), commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 59e3335eb36d..0b877068c24b 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -16,10 +16,11 @@ use std::num::NonZeroU64; use std::time::Instant; use async_trait::async_trait; +use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedHistogramVec}; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; -use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics}; +use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics, SinkWriterMetrics}; pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10; pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1; pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval"; @@ -33,7 +34,7 @@ pub fn default_commit_checkpoint_interval() -> u64 { /// we delay the checkpoint barrier to make commits less frequent. pub struct DecoupleCheckpointLogSinkerOf { writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, commit_checkpoint_interval: NonZeroU64, } @@ -42,12 +43,12 @@ impl DecoupleCheckpointLogSinkerOf { /// decouple log reader `KvLogStoreReader`. pub fn new( writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, commit_checkpoint_interval: NonZeroU64, ) -> Self { DecoupleCheckpointLogSinkerOf { writer, - sink_metrics, + sink_writer_metrics, commit_checkpoint_interval, } } @@ -57,7 +58,6 @@ impl DecoupleCheckpointLogSinkerOf { impl> LogSinker for DecoupleCheckpointLogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; - let sink_metrics = self.sink_metrics; #[derive(Debug)] enum LogConsumerState { /// Mark that the log consumer is not initialized yet @@ -74,6 +74,7 @@ impl> LogSinker for DecoupleCheckpointLogSink let mut current_checkpoint: u64 = 0; let commit_checkpoint_interval = self.commit_checkpoint_interval; + let sink_writer_metrics = self.sink_writer_metrics; loop { let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?; @@ -87,8 +88,8 @@ impl> LogSinker for DecoupleCheckpointLogSink // force commit on update vnode bitmap let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?; current_checkpoint = 0; @@ -149,8 +150,8 @@ impl> LogSinker for DecoupleCheckpointLogSink if current_checkpoint >= commit_checkpoint_interval.get() { let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch })?; current_checkpoint = 0; diff --git a/src/connector/src/sink/deltalake.rs b/src/connector/src/sink/deltalake.rs index 494adb2dd6fe..45a439273e93 100644 --- a/src/connector/src/sink/deltalake.rs +++ b/src/connector/src/sink/deltalake.rs @@ -46,7 +46,7 @@ use super::decouple_checkpoint_log_sink::{ }; use super::writer::SinkWriter; use super::{ - Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam, + Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, }; @@ -289,6 +289,8 @@ impl Sink for DeltaLakeSink { self.param.downstream_pk.clone(), ) .await?; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -312,7 +314,7 @@ impl Sink for DeltaLakeSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/doris.rs b/src/connector/src/sink/doris.rs index 0571c9a2bd6b..735a3b6c7402 100644 --- a/src/connector/src/sink/doris.rs +++ b/src/connector/src/sink/doris.rs @@ -35,7 +35,9 @@ use super::doris_starrocks_connector::{ HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS, POOL_IDLE_TIMEOUT, }; -use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{ + Result, SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::sink::encoder::{JsonEncoder, RowEncoder}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam}; @@ -209,7 +211,7 @@ impl Sink for DorisSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index d3771d9122d6..be7c1e1740ba 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -31,6 +31,7 @@ use crate::sink::catalog::SinkEncode; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter, + SinkWriterMetrics, }; use crate::source::TryFromBTreeMap; use crate::with_options::WithOptions; @@ -126,7 +127,7 @@ impl Sink for FileSink { self.format_desc.encode.clone(), self.engine_type.clone(), )? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 53fe7fae6fab..012382baf03b 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -55,7 +55,8 @@ use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, }; use super::{ - Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + Sink, SinkError, SinkWriterMetrics, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, + SINK_TYPE_UPSERT, }; use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -367,6 +368,8 @@ impl Sink for IcebergSink { } else { IcebergWriter::new_append_only(table, &writer_param).await? }; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -390,7 +393,7 @@ impl Sink for IcebergSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 4711dd1a3bdb..553f5c749df4 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -47,6 +47,7 @@ pub mod writer; use std::collections::BTreeMap; use std::future::Future; +use std::sync::LazyLock; use ::clickhouse::error::Error as ClickHouseError; use ::deltalake::DeltaTableError; @@ -61,14 +62,23 @@ use decouple_checkpoint_log_sink::{ use deltalake::DELTALAKE_SINK; use iceberg::ICEBERG_SINK; use opendal::Error as OpendalError; +use prometheus::Registry; use risingwave_common::array::ArrayError; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; +use risingwave_common::config::MetricLevel; +use risingwave_common::hash::ActorId; use risingwave_common::metrics::{ - LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedIntGauge, + LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter, + LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, }; +use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::secret::{LocalSecretManager, SecretError}; use risingwave_common::session_config::sink_decouple::SinkDecouple; +use risingwave_common::{ + register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, + register_guarded_int_gauge_vec_with_registry, +}; use risingwave_pb::catalog::PbSinkType; use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema}; use risingwave_rpc_client::error::RpcError; @@ -86,6 +96,7 @@ use crate::sink::catalog::{SinkCatalog, SinkId}; use crate::sink::file_sink::fs::FsSink; use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::SinkWriter; + const BOUNDED_CHANNEL_SIZE: usize = 16; #[macro_export] macro_rules! for_all_sinks { @@ -287,9 +298,30 @@ impl SinkParam { } } +pub static GLOBAL_SINK_METRICS: LazyLock = + LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY)); + #[derive(Clone)] pub struct SinkMetrics { - pub sink_commit_duration_metrics: LabelGuardedHistogram<4>, + pub sink_commit_duration: LabelGuardedHistogramVec<4>, + // pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, + // + // // Log store metrics + // pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, + // pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, + // pub log_store_write_rows: LabelGuardedIntCounterVec<4>, + // pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, + // pub log_store_read_rows: LabelGuardedIntCounterVec<4>, + // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, + // + // // Iceberg metrics + // pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, + // pub iceberg_write_latency: LabelGuardedHistogramVec<3>, + // pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, + // pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, + // pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, + + // pub sink_commit_duration: LabelGuardedHistogram<4>, pub connector_sink_rows_received: LabelGuardedIntCounter<3>, pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, @@ -297,7 +329,6 @@ pub struct SinkMetrics { pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, pub log_store_read_rows: LabelGuardedIntCounter<4>, pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, - pub iceberg_write_qps: LabelGuardedIntCounter<3>, pub iceberg_write_latency: LabelGuardedHistogram<3>, pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, @@ -306,9 +337,116 @@ pub struct SinkMetrics { } impl SinkMetrics { - fn for_test() -> Self { - SinkMetrics { - sink_commit_duration_metrics: LabelGuardedHistogram::test_histogram(), + pub fn new(registry: &Registry) -> Self { + let sink_commit_duration = register_guarded_histogram_vec_with_registry!( + "sink_commit_duration", + "Duration of commit op in sink", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!( + "connector_sink_rows_received", + "Number of rows received by sink", + &["connector_type", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_first_write_epoch", + "The first write epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_latest_write_epoch", + "The latest write epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_write_rows = register_guarded_int_counter_vec_with_registry!( + "log_store_write_rows", + "The write rate of rows", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!( + "log_store_latest_read_epoch", + "The latest read epoch of log store", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_read_rows = register_guarded_int_counter_vec_with_registry!( + "log_store_read_rows", + "The read rate of rows", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let log_store_reader_wait_new_future_duration_ns = + register_guarded_int_counter_vec_with_registry!( + "log_store_reader_wait_new_future_duration_ns", + "Accumulated duration of LogReader to wait for next call to create future", + &["actor_id", "connector", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!( + "iceberg_write_qps", + "The qps of iceberg writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_write_latency = register_guarded_histogram_vec_with_registry!( + "iceberg_write_latency", + "The latency of iceberg writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!( + "iceberg_rolling_unflushed_data_file", + "The unflushed data file count of iceberg rolling writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!( + "iceberg_position_delete_cache_num", + "The delete cache num of iceberg position delete writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!( + "iceberg_partition_num", + "The partition num of iceberg partition writer", + &["actor_id", "sink_id", "sink_name"], + registry + ) + .unwrap(); + + Self { + sink_commit_duration, + + // TODO connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), @@ -324,10 +462,31 @@ impl SinkMetrics { iceberg_partition_num: LabelGuardedIntGauge::test_int_gauge(), } } + + fn for_test() -> Self { + todo!() + // SinkMetrics { + // sink_commit_duration_metrics: LabelGuardedHistogram::test_histogram(), + // connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), + // log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), + // log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), + // log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(), + // log_store_write_rows: LabelGuardedIntCounter::test_int_counter(), + // log_store_read_rows: LabelGuardedIntCounter::test_int_counter(), + // log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter( + // ), + // iceberg_write_qps: LabelGuardedIntCounter::test_int_counter(), + // iceberg_write_latency: LabelGuardedHistogram::test_histogram(), + // iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge::test_int_gauge(), + // iceberg_position_delete_cache_num: LabelGuardedIntGauge::test_int_gauge(), + // iceberg_partition_num: LabelGuardedIntGauge::test_int_gauge(), + // } + } } #[derive(Clone)] pub struct SinkWriterParam { + // TODO(eric): deprecate executor_id pub executor_id: u64, pub vnode_bitmap: Option, pub meta_client: Option, @@ -337,6 +496,31 @@ pub struct SinkWriterParam { // 2. The index of the extra partition value column. // More detail of partition value column, see `PartitionComputeInfo` pub extra_partition_col_idx: Option, + + pub actor_id: ActorId, + pub sink_id: SinkId, + pub sink_name: String, + pub connector: String, +} +pub struct SinkWriterMetrics { + pub sink_commit_duration: LabelGuardedHistogram<4>, +} + +impl SinkWriterMetrics { + pub fn new(writer_param: &SinkWriterParam) -> Self { + let sink_commit_duration = writer_param + .sink_metrics + .sink_commit_duration + .with_guarded_label_values(&[ + &writer_param.actor_id.to_string(), + writer_param.connector.as_str(), + &writer_param.sink_id.to_string(), + writer_param.sink_name.as_str(), + ]); + return SinkWriterMetrics { + sink_commit_duration, + }; + } } #[derive(Clone)] @@ -370,6 +554,11 @@ impl SinkWriterParam { meta_client: Default::default(), sink_metrics: SinkMetrics::for_test(), extra_partition_col_idx: Default::default(), + + actor_id: 1, + sink_id: SinkId::new(1), + sink_name: "test_sink".to_string(), + connector: "test_connector".to_string(), } } } diff --git a/src/connector/src/sink/mongodb.rs b/src/connector/src/sink/mongodb.rs index d09d44b0de9d..d11572856d78 100644 --- a/src/connector/src/sink/mongodb.rs +++ b/src/connector/src/sink/mongodb.rs @@ -38,8 +38,8 @@ use crate::deserialize_bool_from_string; use crate::sink::encoder::RowEncoder; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriterMetrics, + SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; pub const MONGODB_SINK: &str = "mongodb"; @@ -302,7 +302,7 @@ impl Sink for MongodbSink { self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index aa8ca0625d05..523761155237 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -28,6 +28,7 @@ use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId}; +use risingwave_common::metrics::LabelGuardedHistogram; use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM}; @@ -64,7 +65,7 @@ use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, - SinkLogReader, SinkMetrics, SinkParam, SinkWriterParam, + SinkLogReader, SinkMetrics, SinkParam, SinkWriterMetrics, SinkWriterParam, }; macro_rules! def_remote_sink { @@ -255,8 +256,8 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe pub struct RemoteLogSinker { request_sender: BidiStreamSender, response_stream: BidiStreamReceiver, - sink_metrics: SinkMetrics, stream_chunk_converter: StreamChunkConverter, + sink_writer_metrics: SinkWriterMetrics, } impl RemoteLogSinker { @@ -287,11 +288,10 @@ impl RemoteLogSinker { .start_sink_writer_stream(payload_schema, sink_proto) .await?; - let sink_metrics = writer_param.sink_metrics; Ok(RemoteLogSinker { request_sender, response_stream, - sink_metrics, + sink_writer_metrics: SinkWriterMetrics::new(&writer_param), stream_chunk_converter: StreamChunkConverter::new( sink_name, sink_param.schema(), @@ -307,7 +307,7 @@ impl LogSinker for RemoteLogSinker { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut request_tx = self.request_sender; let mut response_err_stream_rx = self.response_stream; - let sink_metrics = self.sink_metrics; + let sink_writer_metrics = self.sink_writer_metrics; let (response_tx, mut response_rx) = unbounded_channel(); @@ -335,7 +335,7 @@ impl LogSinker for RemoteLogSinker { queue: &mut VecDeque<(TruncateOffset, Option)>, persisted_offset: TruncateOffset, log_reader: &mut impl SinkLogReader, - metrics: &SinkMetrics, + sink_writer_metrics: &SinkWriterMetrics, ) -> Result<()> { while let Some((sent_offset, _)) = queue.front() && sent_offset < &persisted_offset @@ -357,8 +357,8 @@ impl LogSinker for RemoteLogSinker { if let (TruncateOffset::Barrier { .. }, Some(start_time)) = (persisted_offset, start_time) { - metrics - .sink_commit_duration_metrics + sink_writer_metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); } @@ -398,7 +398,7 @@ impl LogSinker for RemoteLogSinker { chunk_id: batch_id as _, }, log_reader, - &sink_metrics, + &sink_writer_metrics, )?; } SinkWriterStreamResponse { @@ -417,7 +417,7 @@ impl LogSinker for RemoteLogSinker { &mut sent_offset_queue, TruncateOffset::Barrier { epoch }, log_reader, - &sink_metrics, + &sink_writer_metrics, )?; } response => { @@ -437,10 +437,11 @@ impl LogSinker for RemoteLogSinker { if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; } - let cardinality = chunk.cardinality(); - sink_metrics - .connector_sink_rows_received - .inc_by(cardinality as _); + // TODO + // let cardinality = chunk.cardinality(); + // sink_writer_metrics + // .connector_sink_rows_received + // .inc_by(cardinality as _); let chunk = self.stream_chunk_converter.convert_chunk(chunk)?; request_tx @@ -527,6 +528,7 @@ impl Sink for CoordinatedRemoteSink { } async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { + let metrics = SinkWriterMetrics::new(&writer_param); Ok(CoordinatedSinkWriter::new( writer_param .meta_client @@ -543,7 +545,7 @@ impl Sink for CoordinatedRemoteSink { .await?, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(metrics)) } async fn new_coordinator(&self) -> Result { diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index d87072a2502e..9e65ba6f4ea0 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -38,7 +38,7 @@ use super::encoder::{ TimestamptzHandlingMode, }; use super::writer::LogSinkerOf; -use super::{SinkError, SinkParam}; +use super::{SinkError, SinkParam, SinkWriterMetrics}; use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; @@ -114,7 +114,7 @@ impl Sink for SnowflakeSink { self.pk_indices.clone(), self.is_append_only, )? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } async fn validate(&self) -> Result<()> { diff --git a/src/connector/src/sink/sqlserver.rs b/src/connector/src/sink/sqlserver.rs index d40e2a2647b2..4a76d4d35cbc 100644 --- a/src/connector/src/sink/sqlserver.rs +++ b/src/connector/src/sink/sqlserver.rs @@ -31,7 +31,9 @@ use tokio::net::TcpStream; use tokio_util::compat::TokioAsyncWriteCompatExt; use with_options::WithOptions; -use super::{SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use super::{ + SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam}; @@ -257,7 +259,7 @@ ORDER BY self.is_append_only, ) .await? - .into_log_sinker(writer_param.sink_metrics)) + .into_log_sinker(SinkWriterMetrics::new(&writer_param))) } } diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 5c3e724721d1..456415a62e0d 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -44,8 +44,8 @@ use super::doris_starrocks_connector::{ }; use super::encoder::{JsonEncoder, RowEncoder}; use super::{ - SinkCommitCoordinator, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf; @@ -313,6 +313,8 @@ impl Sink for StarrocksSink { self.is_append_only, writer_param.executor_id, )?; + + let metrics = SinkWriterMetrics::new(&writer_param); let writer = CoordinatedSinkWriter::new( writer_param .meta_client @@ -331,7 +333,7 @@ impl Sink for StarrocksSink { Ok(DecoupleCheckpointLogSinkerOf::new( writer, - writer_param.sink_metrics, + metrics, commit_checkpoint_interval, )) } diff --git a/src/connector/src/sink/test_sink.rs b/src/connector/src/sink/test_sink.rs index 70c92b8b7c4d..082fabadd0e4 100644 --- a/src/connector/src/sink/test_sink.rs +++ b/src/connector/src/sink/test_sink.rs @@ -19,7 +19,7 @@ use parking_lot::Mutex; use crate::sink::boxed::{BoxCoordinator, BoxWriter}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{Sink, SinkError, SinkParam, SinkWriterParam}; +use crate::sink::{Sink, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam}; pub trait BuildBoxWriterTrait = FnMut(SinkParam, SinkWriterParam) -> BoxWriter<()> + Send + 'static; @@ -57,7 +57,7 @@ impl Sink for TestSink { &self, writer_param: SinkWriterParam, ) -> crate::sink::Result { - let metrics = writer_param.sink_metrics.clone(); + let metrics = SinkWriterMetrics::new(&writer_param); Ok(build_box_writer(self.param.clone(), writer_param).into_log_sinker(metrics)) } } diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index 7057fa8c8b51..a038fb99cace 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -29,7 +29,7 @@ use crate::sink::formatter::SinkFormatter; use crate::sink::log_store::{ DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset, }; -use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkMetrics}; +use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkMetrics, SinkWriterMetrics}; #[async_trait] pub trait SinkWriter: Send + 'static { @@ -112,14 +112,14 @@ pub trait FormattedSink { pub struct LogSinkerOf { writer: W, - sink_metrics: SinkMetrics, + sink_writer_metrics: SinkWriterMetrics, } impl LogSinkerOf { - pub fn new(writer: W, sink_metrics: SinkMetrics) -> Self { + pub fn new(writer: W, sink_writer_metrics: SinkWriterMetrics) -> Self { LogSinkerOf { writer, - sink_metrics, + sink_writer_metrics, } } } @@ -128,7 +128,7 @@ impl LogSinkerOf { impl> LogSinker for LogSinkerOf { async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result { let mut sink_writer = self.writer; - let sink_metrics = self.sink_metrics; + let metrics = self.sink_writer_metrics; #[derive(Debug)] enum LogConsumerState { /// Mark that the log consumer is not initialized yet @@ -196,8 +196,8 @@ impl> LogSinker for LogSinkerOf { if is_checkpoint { let start_time = Instant::now(); sink_writer.barrier(true).await?; - sink_metrics - .sink_commit_duration_metrics + metrics + .sink_commit_duration .observe(start_time.elapsed().as_millis() as f64); log_reader.truncate(TruncateOffset::Barrier { epoch })?; } else { @@ -218,10 +218,10 @@ impl T where T: SinkWriter + Sized, { - pub fn into_log_sinker(self, sink_metrics: SinkMetrics) -> LogSinkerOf { + pub fn into_log_sinker(self, sink_writer_metrics: SinkWriterMetrics) -> LogSinkerOf { LogSinkerOf { writer: self, - sink_metrics, + sink_writer_metrics, } } } diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 97a747c281bf..3b201a1abd56 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -160,7 +160,7 @@ pub struct StreamingMetrics { /// The progress made by the earliest in-flight barriers in the local barrier manager. pub barrier_manager_progress: IntCounter, - // Sink related metrics + // Sink related metrics TODO: TO REMOVE sink_commit_duration: LabelGuardedHistogramVec<4>, connector_sink_rows_received: LabelGuardedIntCounterVec<3>, log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, @@ -181,7 +181,7 @@ pub struct StreamingMetrics { pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>, pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>, - // Sink iceberg metrics + // Sink iceberg metrics TODO: to remove iceberg_write_qps: LabelGuardedIntCounterVec<3>, iceberg_write_latency: LabelGuardedHistogramVec<3>, iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, @@ -1230,83 +1230,6 @@ impl StreamingMetrics { global_streaming_metrics(MetricLevel::Disabled) } - pub fn new_sink_metrics( - &self, - actor_id_str: &str, - sink_id_str: &str, - sink_name: &str, - connector: &str, - ) -> SinkMetrics { - let label_list = [actor_id_str, connector, sink_id_str, sink_name]; - let sink_commit_duration_metrics = self - .sink_commit_duration - .with_guarded_label_values(&label_list); - - let connector_sink_rows_received = self - .connector_sink_rows_received - .with_guarded_label_values(&[connector, sink_id_str, sink_name]); - - let log_store_latest_read_epoch = self - .log_store_latest_read_epoch - .with_guarded_label_values(&label_list); - - let log_store_latest_write_epoch = self - .log_store_latest_write_epoch - .with_guarded_label_values(&label_list); - - let log_store_first_write_epoch = self - .log_store_first_write_epoch - .with_guarded_label_values(&label_list); - - let initial_epoch = Epoch::now().0; - log_store_latest_read_epoch.set(initial_epoch as _); - log_store_first_write_epoch.set(initial_epoch as _); - log_store_latest_write_epoch.set(initial_epoch as _); - - let log_store_write_rows = self - .log_store_write_rows - .with_guarded_label_values(&label_list); - let log_store_read_rows = self - .log_store_read_rows - .with_guarded_label_values(&label_list); - let log_store_reader_wait_new_future_duration_ns = self - .log_store_reader_wait_new_future_duration_ns - .with_guarded_label_values(&label_list); - - let label_list = [actor_id_str, sink_id_str, sink_name]; - let iceberg_write_qps = self - .iceberg_write_qps - .with_guarded_label_values(&label_list); - let iceberg_write_latency = self - .iceberg_write_latency - .with_guarded_label_values(&label_list); - let iceberg_rolling_unflushed_data_file = self - .iceberg_rolling_unflushed_data_file - .with_guarded_label_values(&label_list); - let iceberg_position_delete_cache_num = self - .iceberg_position_delete_cache_num - .with_guarded_label_values(&label_list); - let iceberg_partition_num = self - .iceberg_partition_num - .with_guarded_label_values(&label_list); - - SinkMetrics { - sink_commit_duration_metrics, - connector_sink_rows_received, - log_store_first_write_epoch, - log_store_latest_write_epoch, - log_store_write_rows, - log_store_latest_read_epoch, - log_store_read_rows, - log_store_reader_wait_new_future_duration_ns, - iceberg_write_qps, - iceberg_write_latency, - iceberg_rolling_unflushed_data_file, - iceberg_position_delete_cache_num, - iceberg_partition_num, - } - } - pub fn new_actor_metrics(&self, actor_id: ActorId) -> ActorMetrics { let label_list: &[&str; 1] = &[&actor_id.to_string()]; let actor_execution_time = self diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index c53c123a48cb..941416e182d7 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -22,7 +22,8 @@ use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::file_sink::fs::FsSink; use risingwave_connector::sink::{ - SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, + SinkError, SinkMetaClient, SinkMetrics, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, + GLOBAL_SINK_METRICS, SINK_TYPE_OPTION, }; use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; @@ -200,19 +201,9 @@ impl ExecutorBuilder for SinkExecutorBuilder { let format_desc_with_secret = SinkParam::fill_secret_for_format_desc(format_desc) .map_err(|e| StreamExecutorError::from((e, sink_id.sink_id)))?; - let actor_id_str = format!("{}", params.actor_context.id); - let sink_id_str = format!("{}", sink_id.sink_id); - - let sink_metrics = params.executor_stats.new_sink_metrics( - &actor_id_str, - &sink_id_str, - &sink_name, - connector, - ); - let sink_param = SinkParam { sink_id, - sink_name, + sink_name: sink_name.clone(), properties: properties_with_secret, columns: columns .iter() @@ -230,8 +221,13 @@ impl ExecutorBuilder for SinkExecutorBuilder { executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), - sink_metrics, + sink_metrics: GLOBAL_SINK_METRICS.clone(), // TODO: remove me extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize), + + actor_id: params.actor_context.id, + sink_id, + sink_name, + connector: connector.to_string(), }; let log_store_identity = format!( From b8e325009806221a192d43edff64f0d41fb1a6be Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 14:02:56 +0800 Subject: [PATCH 02/10] refactor iceberg metrics --- src/connector/src/sink/iceberg/mod.rs | 148 +++++++++++++++++--------- src/connector/src/sink/mod.rs | 34 +++--- 2 files changed, 115 insertions(+), 67 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 012382baf03b..b2e5d53366ff 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -41,6 +41,9 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; +use risingwave_common::metrics::{ + LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedMetric, +}; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; @@ -414,6 +417,16 @@ impl Sink for IcebergSink { pub struct IcebergWriter { inner_writer: IcebergWriterEnum, schema: SchemaRef, + metrics: IcebergWriterMetrics, +} + +pub struct IcebergWriterMetrics { + // NOTE: These 2 metrics are not used directly by us, but only kept for lifecycle management. + // They are actually used in `PrometheusWriterBuilder`: + // WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()) + // We keep them here to let the guard cleans the labels from metrics registry when dropped + write_qps: LabelGuardedIntCounter<3>, + write_latency: LabelGuardedHistogram<3>, } enum IcebergWriterEnum { @@ -449,38 +462,57 @@ impl IcebergWriter { pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; + let SinkWriterParam { + sink_metrics, + extra_partition_col_idx, + actor_id, + sink_id, + sink_name, + .. + } = writer_param; + let metrics_labels = [ + &actor_id.to_string(), + &sink_id.to_string(), + sink_name.as_str(), + ]; + + // Metrics + let write_qps = sink_metrics + .iceberg_write_qps + .with_guarded_label_values(&metrics_labels); + let write_latency = sink_metrics + .iceberg_write_latency + .with_guarded_label_values(&metrics_labels); + let rolling_unflushed_data_file = sink_metrics + .iceberg_rolling_unflushed_data_file + .with_guarded_label_values(&metrics_labels); let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new( builder_helper .rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?, - writer_param - .sink_metrics - .iceberg_rolling_unflushed_data_file - .clone(), + rolling_unflushed_data_file, )); - if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + + if let Some(extra_partition_col_idx) = extra_partition_col_idx { let partition_data_file_builder = builder_helper.precompute_partition_writer_builder( data_file_builder.clone(), - extra_partition_col_idx, + *extra_partition_col_idx, )?; let dispatch_builder = builder_helper .dispatcher_writer_builder(partition_data_file_builder, data_file_builder)?; let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); - let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let schema = Self::schema_with_extra_partition_col(&table, *extra_partition_col_idx)?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, + metrics: IcebergWriterMetrics { + write_qps, + write_latency, + }, }) } else { let partition_data_file_builder = @@ -490,20 +522,17 @@ impl IcebergWriter { // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); let schema = table.current_arrow_schema()?; let inner_writer = RecordBatchWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, + metrics: IcebergWriterMetrics { + write_qps, + write_latency, + }, }) } } @@ -514,20 +543,45 @@ impl IcebergWriter { writer_param: &SinkWriterParam, ) -> Result { let builder_helper = table.builder_helper()?; + let SinkWriterParam { + executor_id, + vnode_bitmap, + meta_client, + sink_metrics, + extra_partition_col_idx, + actor_id, + sink_id, + sink_name, + connector, + } = writer_param; + let metrics_labels = [ + &actor_id.to_string(), + &sink_id.to_string(), + sink_name.as_str(), + ]; + + // Metrics + let write_qps = sink_metrics + .iceberg_write_qps + .with_guarded_label_values(&metrics_labels); + let write_latency = sink_metrics + .iceberg_write_latency + .with_guarded_label_values(&metrics_labels); + let rolling_unflushed_data_file = sink_metrics + .iceberg_rolling_unflushed_data_file + .with_guarded_label_values(&metrics_labels); + let position_delete_cache_num = sink_metrics + .iceberg_position_delete_cache_num + .with_guarded_label_values(&metrics_labels); + let data_file_builder = DataFileWriterBuilder::new(MonitoredBaseFileWriterBuilder::new( builder_helper .rolling_writer_builder(builder_helper.parquet_writer_builder(0, None)?)?, - writer_param - .sink_metrics - .iceberg_rolling_unflushed_data_file - .clone(), + rolling_unflushed_data_file, )); let position_delete_builder = MonitoredPositionDeleteWriterBuilder::new( builder_helper.position_delete_writer_builder(0, 1024)?, - writer_param - .sink_metrics - .iceberg_position_delete_cache_num - .clone(), + position_delete_cache_num, ); let equality_delete_builder = builder_helper.equality_delete_writer_builder(unique_column_ids.clone(), 0)?; @@ -537,30 +591,27 @@ impl IcebergWriter { equality_delete_builder, unique_column_ids, ); - if let Some(extra_partition_col_idx) = writer_param.extra_partition_col_idx { + if let Some(extra_partition_col_idx) = extra_partition_col_idx { let partition_delta_builder = builder_helper.precompute_partition_writer_builder( delta_builder.clone(), - extra_partition_col_idx, + *extra_partition_col_idx, )?; let dispatch_builder = builder_helper.dispatcher_writer_builder(partition_delta_builder, delta_builder)?; // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); - let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?; + let schema = Self::schema_with_extra_partition_col(&table, *extra_partition_col_idx)?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, + metrics: IcebergWriterMetrics { + write_qps, + write_latency, + }, }) } else { let partition_delta_builder = @@ -570,20 +621,17 @@ impl IcebergWriter { // wrap a layer with collect write metrics let prometheus_builder = PrometheusWriterBuilder::new( dispatch_builder, - WriterMetrics::new( - writer_param.sink_metrics.iceberg_write_qps.deref().clone(), - writer_param - .sink_metrics - .iceberg_write_latency - .deref() - .clone(), - ), + WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()), ); let schema = table.current_arrow_schema()?; let inner_writer = DeltaWriter::new(prometheus_builder.build(&schema).await?); Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, + metrics: IcebergWriterMetrics { + write_qps, + write_latency, + }, }) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 553f5c749df4..2955f2010bcf 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -314,12 +314,12 @@ pub struct SinkMetrics { // pub log_store_read_rows: LabelGuardedIntCounterVec<4>, // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, // - // // Iceberg metrics - // pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, - // pub iceberg_write_latency: LabelGuardedHistogramVec<3>, - // pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, - // pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, - // pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, + // Iceberg metrics + pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, + pub iceberg_write_latency: LabelGuardedHistogramVec<3>, + pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, + pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, + pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, // pub sink_commit_duration: LabelGuardedHistogram<4>, pub connector_sink_rows_received: LabelGuardedIntCounter<3>, @@ -329,11 +329,11 @@ pub struct SinkMetrics { pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, pub log_store_read_rows: LabelGuardedIntCounter<4>, pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, - pub iceberg_write_qps: LabelGuardedIntCounter<3>, - pub iceberg_write_latency: LabelGuardedHistogram<3>, - pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, - pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<3>, - pub iceberg_partition_num: LabelGuardedIntGauge<3>, + // pub iceberg_write_qps: LabelGuardedIntCounter<3>, + // pub iceberg_write_latency: LabelGuardedHistogram<3>, + // pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, + // pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<3>, + // pub iceberg_partition_num: LabelGuardedIntGauge<3>, } impl SinkMetrics { @@ -445,8 +445,13 @@ impl SinkMetrics { Self { sink_commit_duration, + iceberg_write_qps, + iceberg_write_latency, + iceberg_rolling_unflushed_data_file, + iceberg_position_delete_cache_num, + iceberg_partition_num, - // TODO + // TODO: remove these connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), @@ -455,11 +460,6 @@ impl SinkMetrics { log_store_read_rows: LabelGuardedIntCounter::test_int_counter(), log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter( ), - iceberg_write_qps: LabelGuardedIntCounter::test_int_counter(), - iceberg_write_latency: LabelGuardedHistogram::test_histogram(), - iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge::test_int_gauge(), - iceberg_position_delete_cache_num: LabelGuardedIntGauge::test_int_gauge(), - iceberg_partition_num: LabelGuardedIntGauge::test_int_gauge(), } } From 1d4c2377b51b4fd7f806ff872163a3af56e2e86a Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 14:32:13 +0800 Subject: [PATCH 03/10] refactor log store reader --- src/connector/src/sink/log_store.rs | 20 +++++++++++++++----- src/connector/src/sink/mod.rs | 23 ++++++++++------------- src/stream/src/executor/sink.rs | 28 ++++++++++++++++++++++++---- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 3c25b1ec7d75..925e821ba11a 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -25,10 +25,13 @@ use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::metrics::LabelGuardedIntCounter; +use risingwave_common::metrics::{ + LabelGuardedIntCounter, LabelGuardedIntCounterVec, LabelGuardedIntGauge, + LabelGuardedIntGaugeVec, +}; use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH}; -use crate::sink::SinkMetrics; +use crate::sink::{SinkMetrics, GLOBAL_SINK_METRICS}; pub type LogStoreResult = Result; pub type ChunkId = usize; @@ -268,11 +271,17 @@ impl LogReader for BackpressureMonitoredLogReader { pub struct MonitoredLogReader { inner: R, read_epoch: u64, - metrics: SinkMetrics, + metrics: LogReaderMetrics, +} + +pub struct LogReaderMetrics { + pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, + pub log_store_read_rows: LabelGuardedIntCounter<4>, + pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, } impl MonitoredLogReader { - pub fn new(inner: R, metrics: SinkMetrics) -> Self { + pub fn new(inner: R, metrics: LogReaderMetrics) -> Self { Self { inner, read_epoch: INVALID_EPOCH, @@ -327,7 +336,8 @@ where TransformChunkLogReader { f, inner: self } } - pub fn monitored(self, metrics: SinkMetrics) -> impl LogReader { + pub fn monitored(self, metrics: LogReaderMetrics) -> impl LogReader { + // TODO: The `clone()` can be avoided if move backpressure inside `MonitoredLogReader` let wait_new_future_duration = metrics.log_store_reader_wait_new_future_duration_ns.clone(); BackpressureMonitoredLogReader::new( MonitoredLogReader::new(self, metrics), diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 2955f2010bcf..866aa7da135e 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -310,10 +310,9 @@ pub struct SinkMetrics { // pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, // pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, // pub log_store_write_rows: LabelGuardedIntCounterVec<4>, - // pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, - // pub log_store_read_rows: LabelGuardedIntCounterVec<4>, - // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, - // + pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, + pub log_store_read_rows: LabelGuardedIntCounterVec<4>, + pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, // Iceberg metrics pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, pub iceberg_write_latency: LabelGuardedHistogramVec<3>, @@ -326,9 +325,9 @@ pub struct SinkMetrics { pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, pub log_store_write_rows: LabelGuardedIntCounter<4>, - pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, - pub log_store_read_rows: LabelGuardedIntCounter<4>, - pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, + // pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, + // pub log_store_read_rows: LabelGuardedIntCounter<4>, + // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, // pub iceberg_write_qps: LabelGuardedIntCounter<3>, // pub iceberg_write_latency: LabelGuardedHistogram<3>, // pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, @@ -445,6 +444,9 @@ impl SinkMetrics { Self { sink_commit_duration, + log_store_latest_read_epoch, + log_store_read_rows, + log_store_reader_wait_new_future_duration_ns, iceberg_write_qps, iceberg_write_latency, iceberg_rolling_unflushed_data_file, @@ -455,11 +457,7 @@ impl SinkMetrics { connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(), log_store_write_rows: LabelGuardedIntCounter::test_int_counter(), - log_store_read_rows: LabelGuardedIntCounter::test_int_counter(), - log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter( - ), } } @@ -508,8 +506,7 @@ pub struct SinkWriterMetrics { impl SinkWriterMetrics { pub fn new(writer_param: &SinkWriterParam) -> Self { - let sink_commit_duration = writer_param - .sink_metrics + let sink_commit_duration = GLOBAL_SINK_METRICS .sink_commit_duration .with_guarded_label_values(&[ &writer_param.actor_id.to_string(), diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 23162241ef29..06bca69754a1 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -27,10 +27,11 @@ use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ - LogReader, LogReaderExt, LogStoreFactory, LogWriter, LogWriterExt, + BackpressureMonitoredLogReader, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory, + LogWriter, LogWriterExt, MonitoredLogReader, }; use risingwave_connector::sink::{ - build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, + build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, GLOBAL_SINK_METRICS, }; use thiserror_ext::AsReport; @@ -443,14 +444,33 @@ impl SinkExecutor { mut sink_writer_param: SinkWriterParam, actor_context: ActorContextRef, ) -> StreamExecutorResult { - let metrics = sink_writer_param.sink_metrics.clone(); - let visible_columns = columns .iter() .enumerate() .filter_map(|(idx, column)| (!column.is_hidden).then_some(idx)) .collect_vec(); + let labels = [ + &actor_context.id.to_string(), + sink_writer_param.connector.as_str(), + &sink_writer_param.sink_id.to_string(), + sink_writer_param.sink_name.as_str(), + ]; + let log_store_reader_wait_new_future_duration_ns = GLOBAL_SINK_METRICS + .log_store_reader_wait_new_future_duration_ns + .with_guarded_label_values(&labels); + let log_store_read_rows = GLOBAL_SINK_METRICS + .log_store_read_rows + .with_guarded_label_values(&labels); + let log_store_latest_read_epoch = GLOBAL_SINK_METRICS + .log_store_latest_read_epoch + .with_guarded_label_values(&labels); + let metrics = LogReaderMetrics { + log_store_latest_read_epoch, + log_store_read_rows, + log_store_reader_wait_new_future_duration_ns, + }; + let mut log_reader = log_reader .transform_chunk(move |chunk| { if visible_columns.len() != columns.len() { From 72be8fbbf5b61960f0d3b1507920dbdb7ce7a9bc Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 14:50:04 +0800 Subject: [PATCH 04/10] refactor log store writer --- risedev.yml | 7 +++++++ src/connector/src/sink/log_store.rs | 17 +++++++++-------- src/connector/src/sink/mod.rs | 18 +++++++++--------- src/stream/src/executor/sink.rs | 25 +++++++++++++++++++++++-- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/risedev.yml b/risedev.yml index 96443a7c0d6e..bff947b10276 100644 --- a/risedev.yml +++ b/risedev.yml @@ -132,6 +132,13 @@ profile: - use: grafana - use: kafka persist-data: true + - use: schema-registry + + kafka-family: + steps: + - use: kafka + persist-data: true + - use: schema-registry standalone-full-peripherals: steps: diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 925e821ba11a..bddc79b48654 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -25,14 +25,9 @@ use futures::{TryFuture, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; -use risingwave_common::metrics::{ - LabelGuardedIntCounter, LabelGuardedIntCounterVec, LabelGuardedIntGauge, - LabelGuardedIntGaugeVec, -}; +use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge}; use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH}; -use crate::sink::{SinkMetrics, GLOBAL_SINK_METRICS}; - pub type LogStoreResult = Result; pub type ChunkId = usize; @@ -348,7 +343,13 @@ where pub struct MonitoredLogWriter { inner: W, - metrics: SinkMetrics, + metrics: LogWriterMetrics, +} + +pub struct LogWriterMetrics { + pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, + pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, + pub log_store_write_rows: LabelGuardedIntCounter<4>, } impl LogWriter for MonitoredLogWriter { @@ -405,7 +406,7 @@ impl T where T: LogWriter + Sized, { - pub fn monitored(self, metrics: SinkMetrics) -> MonitoredLogWriter { + pub fn monitored(self, metrics: LogWriterMetrics) -> MonitoredLogWriter { MonitoredLogWriter { inner: self, metrics, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 866aa7da135e..6f51b6acfeea 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -307,9 +307,9 @@ pub struct SinkMetrics { // pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, // // // Log store metrics - // pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, - // pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, - // pub log_store_write_rows: LabelGuardedIntCounterVec<4>, + pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, + pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, + pub log_store_write_rows: LabelGuardedIntCounterVec<4>, pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, pub log_store_read_rows: LabelGuardedIntCounterVec<4>, pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, @@ -322,9 +322,9 @@ pub struct SinkMetrics { // pub sink_commit_duration: LabelGuardedHistogram<4>, pub connector_sink_rows_received: LabelGuardedIntCounter<3>, - pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, - pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, - pub log_store_write_rows: LabelGuardedIntCounter<4>, + // pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, + // pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, + // pub log_store_write_rows: LabelGuardedIntCounter<4>, // pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, // pub log_store_read_rows: LabelGuardedIntCounter<4>, // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, @@ -444,6 +444,9 @@ impl SinkMetrics { Self { sink_commit_duration, + log_store_first_write_epoch, + log_store_latest_write_epoch, + log_store_write_rows, log_store_latest_read_epoch, log_store_read_rows, log_store_reader_wait_new_future_duration_ns, @@ -455,9 +458,6 @@ impl SinkMetrics { // TODO: remove these connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), - log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - log_store_write_rows: LabelGuardedIntCounter::test_int_counter(), } } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 06bca69754a1..0f91dd9aaa04 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -28,7 +28,7 @@ use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ BackpressureMonitoredLogReader, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory, - LogWriter, LogWriterExt, MonitoredLogReader, + LogWriter, LogWriterExt, LogWriterMetrics, MonitoredLogReader, }; use risingwave_connector::sink::{ build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, GLOBAL_SINK_METRICS, @@ -219,12 +219,33 @@ impl SinkExecutor { if self.sink.is_sink_into_table() { processed_input.boxed() } else { + let labels = [ + &actor_id.to_string(), + "NA", // TODO: remove the connector label for log writer metrics + &sink_id.to_string(), + self.sink_param.sink_name.as_str(), + ]; + let log_store_first_write_epoch = GLOBAL_SINK_METRICS + .log_store_first_write_epoch + .with_guarded_label_values(&labels); + let log_store_latest_write_epoch = GLOBAL_SINK_METRICS + .log_store_latest_write_epoch + .with_guarded_label_values(&labels); + let log_store_write_rows = GLOBAL_SINK_METRICS + .log_store_write_rows + .with_guarded_label_values(&labels); + let log_writer_metrics = LogWriterMetrics { + log_store_first_write_epoch, + log_store_latest_write_epoch, + log_store_write_rows, + }; + self.log_store_factory .build() .map(move |(log_reader, log_writer)| { let write_log_stream = Self::execute_write_log( processed_input, - log_writer.monitored(self.sink_writer_param.sink_metrics.clone()), + log_writer.monitored(log_writer_metrics), actor_id, ); From f5be47728b043d6665a18c7363b1a63bf87d788d Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 14:53:25 +0800 Subject: [PATCH 05/10] little refactor: remove dead code --- src/connector/src/sink/remote.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 523761155237..901960328532 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -554,8 +554,6 @@ impl Sink for CoordinatedRemoteSink { } pub struct CoordinatedRemoteSinkWriter { - #[expect(dead_code)] - properties: BTreeMap, epoch: Option, batch_id: u64, stream_handle: SinkWriterStreamHandle, @@ -570,7 +568,6 @@ impl CoordinatedRemoteSinkWriter { .await?; Ok(Self { - properties: param.properties, epoch: None, batch_id: 0, stream_handle, From b0cc191e98c18f77afdaf2de6e766c4c9193588e Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 15:01:52 +0800 Subject: [PATCH 06/10] refactor connector_sink_rows_received --- src/connector/src/sink/mod.rs | 27 ++++++++++++++++++--------- src/connector/src/sink/remote.rs | 23 +++++++++-------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 6f51b6acfeea..4c5bdc1001e4 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -304,8 +304,7 @@ pub static GLOBAL_SINK_METRICS: LazyLock = #[derive(Clone)] pub struct SinkMetrics { pub sink_commit_duration: LabelGuardedHistogramVec<4>, - // pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, - // + pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, // // Log store metrics pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, @@ -319,9 +318,8 @@ pub struct SinkMetrics { pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, - // pub sink_commit_duration: LabelGuardedHistogram<4>, - pub connector_sink_rows_received: LabelGuardedIntCounter<3>, + // pub connector_sink_rows_received: LabelGuardedIntCounter<3>, // pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, // pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, // pub log_store_write_rows: LabelGuardedIntCounter<4>, @@ -444,6 +442,7 @@ impl SinkMetrics { Self { sink_commit_duration, + connector_sink_rows_received, log_store_first_write_epoch, log_store_latest_write_epoch, log_store_write_rows, @@ -455,9 +454,6 @@ impl SinkMetrics { iceberg_rolling_unflushed_data_file, iceberg_position_delete_cache_num, iceberg_partition_num, - - // TODO: remove these - connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), } } @@ -500,8 +496,11 @@ pub struct SinkWriterParam { pub sink_name: String, pub connector: String, } + +#[derive(Clone)] pub struct SinkWriterMetrics { pub sink_commit_duration: LabelGuardedHistogram<4>, + pub connector_sink_rows_received: LabelGuardedIntCounter<3>, } impl SinkWriterMetrics { @@ -514,9 +513,19 @@ impl SinkWriterMetrics { &writer_param.sink_id.to_string(), writer_param.sink_name.as_str(), ]); - return SinkWriterMetrics { + let connector_sink_rows_received = GLOBAL_SINK_METRICS + .connector_sink_rows_received + .with_guarded_label_values(&[ + // TODO: should have `actor_id` as label + // &writer_param.actor_id.to_string(), + writer_param.connector.as_str(), + &writer_param.sink_id.to_string(), + writer_param.sink_name.as_str(), + ]); + Self { sink_commit_duration, - }; + connector_sink_rows_received, + } } } diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 901960328532..e6250e30132a 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -437,11 +437,10 @@ impl LogSinker for RemoteLogSinker { if let Some(prev_offset) = &prev_offset { prev_offset.check_next_offset(offset)?; } - // TODO - // let cardinality = chunk.cardinality(); - // sink_writer_metrics - // .connector_sink_rows_received - // .inc_by(cardinality as _); + let cardinality = chunk.cardinality(); + sink_writer_metrics + .connector_sink_rows_received + .inc_by(cardinality as _); let chunk = self.stream_chunk_converter.convert_chunk(chunk)?; request_tx @@ -541,8 +540,7 @@ impl Sink for CoordinatedRemoteSink { "sink needs coordination and should not have singleton input" )) })?, - CoordinatedRemoteSinkWriter::new(self.param.clone(), writer_param.sink_metrics.clone()) - .await?, + CoordinatedRemoteSinkWriter::new(self.param.clone(), metrics.clone()).await?, ) .await? .into_log_sinker(metrics)) @@ -557,11 +555,11 @@ pub struct CoordinatedRemoteSinkWriter { epoch: Option, batch_id: u64, stream_handle: SinkWriterStreamHandle, - sink_metrics: SinkMetrics, + metrics: SinkWriterMetrics, } impl CoordinatedRemoteSinkWriter { - pub async fn new(param: SinkParam, sink_metrics: SinkMetrics) -> Result { + pub async fn new(param: SinkParam, metrics: SinkWriterMetrics) -> Result { let sink_proto = param.to_proto(); let stream_handle = EmbeddedConnectorClient::new()? .start_sink_writer_stream(sink_proto.table_schema.clone(), sink_proto) @@ -571,7 +569,7 @@ impl CoordinatedRemoteSinkWriter { epoch: None, batch_id: 0, stream_handle, - sink_metrics, + metrics, }) } @@ -582,8 +580,6 @@ impl CoordinatedRemoteSinkWriter { ) -> CoordinatedRemoteSinkWriter { use futures::StreamExt; - let properties = BTreeMap::from([("output.path".to_string(), "/tmp/rw".to_string())]); - let stream_handle = SinkWriterStreamHandle::for_test( request_sender, ReceiverStream::new(response_receiver) @@ -592,7 +588,6 @@ impl CoordinatedRemoteSinkWriter { ); CoordinatedRemoteSinkWriter { - properties, epoch: None, batch_id: 0, stream_handle, @@ -607,7 +602,7 @@ impl SinkWriter for CoordinatedRemoteSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { let cardinality = chunk.cardinality(); - self.sink_metrics + self.metrics .connector_sink_rows_received .inc_by(cardinality as _); From 409c6b57f734f5bb7d43d3393422bcf397bcdb87 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 15:08:13 +0800 Subject: [PATCH 07/10] remove the sink metrics from StreamingStats --- src/connector/src/sink/iceberg/mod.rs | 25 ++-- src/connector/src/sink/mod.rs | 2 - .../src/executor/monitor/streaming_stats.rs | 135 ------------------ src/stream/src/from_proto/sink.rs | 5 +- 4 files changed, 12 insertions(+), 155 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index b2e5d53366ff..6cdf30b02fa0 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -58,8 +58,8 @@ use super::decouple_checkpoint_log_sink::{ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf, }; use super::{ - Sink, SinkError, SinkWriterMetrics, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, - SINK_TYPE_UPSERT, + Sink, SinkError, SinkWriterMetrics, SinkWriterParam, GLOBAL_SINK_METRICS, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; @@ -463,7 +463,6 @@ impl IcebergWriter { pub async fn new_append_only(table: Table, writer_param: &SinkWriterParam) -> Result { let builder_helper = table.builder_helper()?; let SinkWriterParam { - sink_metrics, extra_partition_col_idx, actor_id, sink_id, @@ -477,13 +476,13 @@ impl IcebergWriter { ]; // Metrics - let write_qps = sink_metrics + let write_qps = GLOBAL_SINK_METRICS .iceberg_write_qps .with_guarded_label_values(&metrics_labels); - let write_latency = sink_metrics + let write_latency = GLOBAL_SINK_METRICS .iceberg_write_latency .with_guarded_label_values(&metrics_labels); - let rolling_unflushed_data_file = sink_metrics + let rolling_unflushed_data_file = GLOBAL_SINK_METRICS .iceberg_rolling_unflushed_data_file .with_guarded_label_values(&metrics_labels); @@ -544,15 +543,11 @@ impl IcebergWriter { ) -> Result { let builder_helper = table.builder_helper()?; let SinkWriterParam { - executor_id, - vnode_bitmap, - meta_client, - sink_metrics, extra_partition_col_idx, actor_id, sink_id, sink_name, - connector, + .. } = writer_param; let metrics_labels = [ &actor_id.to_string(), @@ -561,16 +556,16 @@ impl IcebergWriter { ]; // Metrics - let write_qps = sink_metrics + let write_qps = GLOBAL_SINK_METRICS .iceberg_write_qps .with_guarded_label_values(&metrics_labels); - let write_latency = sink_metrics + let write_latency = GLOBAL_SINK_METRICS .iceberg_write_latency .with_guarded_label_values(&metrics_labels); - let rolling_unflushed_data_file = sink_metrics + let rolling_unflushed_data_file = GLOBAL_SINK_METRICS .iceberg_rolling_unflushed_data_file .with_guarded_label_values(&metrics_labels); - let position_delete_cache_num = sink_metrics + let position_delete_cache_num = GLOBAL_SINK_METRICS .iceberg_position_delete_cache_num .with_guarded_label_values(&metrics_labels); diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 4c5bdc1001e4..8075500c756d 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -484,7 +484,6 @@ pub struct SinkWriterParam { pub executor_id: u64, pub vnode_bitmap: Option, pub meta_client: Option, - pub sink_metrics: SinkMetrics, // The val has two effect: // 1. Indicates that the sink will accpect the data chunk with extra partition value column. // 2. The index of the extra partition value column. @@ -558,7 +557,6 @@ impl SinkWriterParam { executor_id: Default::default(), vnode_bitmap: Default::default(), meta_client: Default::default(), - sink_metrics: SinkMetrics::for_test(), extra_partition_col_idx: Default::default(), actor_id: 1, diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 3b201a1abd56..3a75dadd1448 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -160,16 +160,6 @@ pub struct StreamingMetrics { /// The progress made by the earliest in-flight barriers in the local barrier manager. pub barrier_manager_progress: IntCounter, - // Sink related metrics TODO: TO REMOVE - sink_commit_duration: LabelGuardedHistogramVec<4>, - connector_sink_rows_received: LabelGuardedIntCounterVec<3>, - log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, - log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, - log_store_write_rows: LabelGuardedIntCounterVec<4>, - log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, - log_store_read_rows: LabelGuardedIntCounterVec<4>, - log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, - pub kv_log_store_storage_write_count: LabelGuardedIntCounterVec<4>, pub kv_log_store_storage_write_size: LabelGuardedIntCounterVec<4>, pub kv_log_store_rewind_count: LabelGuardedIntCounterVec<4>, @@ -181,13 +171,6 @@ pub struct StreamingMetrics { pub kv_log_store_buffer_unconsumed_epoch_count: LabelGuardedIntGaugeVec<4>, pub kv_log_store_buffer_unconsumed_min_epoch: LabelGuardedIntGaugeVec<4>, - // Sink iceberg metrics TODO: to remove - iceberg_write_qps: LabelGuardedIntCounterVec<3>, - iceberg_write_latency: LabelGuardedHistogramVec<3>, - iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, - iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, - iceberg_partition_num: LabelGuardedIntGaugeVec<3>, - // Memory management pub lru_runtime_loop_count: IntCounter, pub lru_latest_sequence: IntGauge, @@ -810,71 +793,6 @@ impl StreamingMetrics { ) .unwrap(); - let sink_commit_duration = register_guarded_histogram_vec_with_registry!( - "sink_commit_duration", - "Duration of commit op in sink", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!( - "connector_sink_rows_received", - "Number of rows received by sink", - &["connector_type", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_first_write_epoch", - "The first write epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_latest_write_epoch", - "The latest write epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_write_rows = register_guarded_int_counter_vec_with_registry!( - "log_store_write_rows", - "The write rate of rows", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!( - "log_store_latest_read_epoch", - "The latest read epoch of log store", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_read_rows = register_guarded_int_counter_vec_with_registry!( - "log_store_read_rows", - "The read rate of rows", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let log_store_reader_wait_new_future_duration_ns = - register_guarded_int_counter_vec_with_registry!( - "log_store_reader_wait_new_future_duration_ns", - "Accumulated duration of LogReader to wait for next call to create future", - &["actor_id", "connector", "sink_id", "sink_name"], - registry - ) - .unwrap(); - let kv_log_store_storage_write_count = register_guarded_int_counter_vec_with_registry!( "kv_log_store_storage_write_count", "Write row count throughput of kv log store", @@ -1073,46 +991,6 @@ impl StreamingMetrics { .unwrap() .relabel_debug_1(level); - let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!( - "iceberg_write_qps", - "The qps of iceberg writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_write_latency = register_guarded_histogram_vec_with_registry!( - "iceberg_write_latency", - "The latency of iceberg writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!( - "iceberg_rolling_unflushed_data_file", - "The unflushed data file count of iceberg rolling writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!( - "iceberg_position_delete_cache_num", - "The delete cache num of iceberg position delete writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - - let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!( - "iceberg_partition_num", - "The partition num of iceberg partition writer", - &["actor_id", "sink_id", "sink_name"], - registry - ) - .unwrap(); - Self { level, executor_row_count, @@ -1185,14 +1063,6 @@ impl StreamingMetrics { barrier_inflight_latency, barrier_sync_latency, barrier_manager_progress, - sink_commit_duration, - connector_sink_rows_received, - log_store_first_write_epoch, - log_store_latest_write_epoch, - log_store_write_rows, - log_store_latest_read_epoch, - log_store_read_rows, - log_store_reader_wait_new_future_duration_ns, kv_log_store_storage_write_count, kv_log_store_storage_write_size, kv_log_store_rewind_count, @@ -1203,11 +1073,6 @@ impl StreamingMetrics { kv_log_store_buffer_unconsumed_row_count, kv_log_store_buffer_unconsumed_epoch_count, kv_log_store_buffer_unconsumed_min_epoch, - iceberg_write_qps, - iceberg_write_latency, - iceberg_rolling_unflushed_data_file, - iceberg_position_delete_cache_num, - iceberg_partition_num, lru_runtime_loop_count, lru_latest_sequence, lru_watermark_sequence, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 941416e182d7..06c64468f1b5 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -22,8 +22,8 @@ use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::file_sink::fs::FsSink; use risingwave_connector::sink::{ - SinkError, SinkMetaClient, SinkMetrics, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, - GLOBAL_SINK_METRICS, SINK_TYPE_OPTION, + SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, GLOBAL_SINK_METRICS, + SINK_TYPE_OPTION, }; use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; @@ -221,7 +221,6 @@ impl ExecutorBuilder for SinkExecutorBuilder { executor_id: params.executor_id, vnode_bitmap: params.vnode_bitmap.clone(), meta_client: params.env.meta_client().map(SinkMetaClient::MetaClient), - sink_metrics: GLOBAL_SINK_METRICS.clone(), // TODO: remove me extra_partition_col_idx: sink_desc.extra_partition_col_idx.map(|v| v as usize), actor_id: params.actor_context.id, From f937ed29993db99eb0894a6bc4abfca3a81aa7c7 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 15:12:26 +0800 Subject: [PATCH 08/10] clean code --- .../src/sink/decouple_checkpoint_log_sink.rs | 4 +- src/connector/src/sink/iceberg/mod.rs | 30 +++++++------- src/connector/src/sink/mod.rs | 40 ++----------------- .../src/executor/monitor/streaming_stats.rs | 1 - src/stream/src/executor/sink.rs | 4 +- src/stream/src/from_proto/sink.rs | 3 +- 6 files changed, 24 insertions(+), 58 deletions(-) diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs index 0b877068c24b..5da0e725bf65 100644 --- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs +++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs @@ -16,11 +16,11 @@ use std::num::NonZeroU64; use std::time::Instant; use async_trait::async_trait; -use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedHistogramVec}; use crate::sink::log_store::{LogStoreReadItem, TruncateOffset}; use crate::sink::writer::SinkWriter; -use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics, SinkWriterMetrics}; +use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics}; + pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10; pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1; pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval"; diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 6cdf30b02fa0..bc83503eb6ae 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -417,7 +417,7 @@ impl Sink for IcebergSink { pub struct IcebergWriter { inner_writer: IcebergWriterEnum, schema: SchemaRef, - metrics: IcebergWriterMetrics, + _metrics: IcebergWriterMetrics, } pub struct IcebergWriterMetrics { @@ -425,8 +425,8 @@ pub struct IcebergWriterMetrics { // They are actually used in `PrometheusWriterBuilder`: // WriterMetrics::new(write_qps.deref().clone(), write_latency.deref().clone()) // We keep them here to let the guard cleans the labels from metrics registry when dropped - write_qps: LabelGuardedIntCounter<3>, - write_latency: LabelGuardedHistogram<3>, + _write_qps: LabelGuardedIntCounter<3>, + _write_latency: LabelGuardedHistogram<3>, } enum IcebergWriterEnum { @@ -508,9 +508,9 @@ impl IcebergWriter { Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, - metrics: IcebergWriterMetrics { - write_qps, - write_latency, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, }, }) } else { @@ -528,9 +528,9 @@ impl IcebergWriter { Ok(Self { inner_writer: IcebergWriterEnum::AppendOnly(inner_writer), schema, - metrics: IcebergWriterMetrics { - write_qps, - write_latency, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, }, }) } @@ -603,9 +603,9 @@ impl IcebergWriter { Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, - metrics: IcebergWriterMetrics { - write_qps, - write_latency, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, }, }) } else { @@ -623,9 +623,9 @@ impl IcebergWriter { Ok(Self { inner_writer: IcebergWriterEnum::Upsert(inner_writer), schema, - metrics: IcebergWriterMetrics { - write_qps, - write_latency, + _metrics: IcebergWriterMetrics { + _write_qps: write_qps, + _write_latency: write_latency, }, }) } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 8075500c756d..589ce0dc75d0 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -66,11 +66,10 @@ use prometheus::Registry; use risingwave_common::array::ArrayError; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; -use risingwave_common::config::MetricLevel; use risingwave_common::hash::ActorId; use risingwave_common::metrics::{ LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter, - LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec, + LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; use risingwave_common::secret::{LocalSecretManager, SecretError}; @@ -305,32 +304,21 @@ pub static GLOBAL_SINK_METRICS: LazyLock = pub struct SinkMetrics { pub sink_commit_duration: LabelGuardedHistogramVec<4>, pub connector_sink_rows_received: LabelGuardedIntCounterVec<3>, - // // Log store metrics + + // Log store metrics pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<4>, pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<4>, pub log_store_write_rows: LabelGuardedIntCounterVec<4>, pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>, pub log_store_read_rows: LabelGuardedIntCounterVec<4>, pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>, + // Iceberg metrics pub iceberg_write_qps: LabelGuardedIntCounterVec<3>, pub iceberg_write_latency: LabelGuardedHistogramVec<3>, pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>, pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>, pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>, - // pub sink_commit_duration: LabelGuardedHistogram<4>, - // pub connector_sink_rows_received: LabelGuardedIntCounter<3>, - // pub log_store_first_write_epoch: LabelGuardedIntGauge<4>, - // pub log_store_latest_write_epoch: LabelGuardedIntGauge<4>, - // pub log_store_write_rows: LabelGuardedIntCounter<4>, - // pub log_store_latest_read_epoch: LabelGuardedIntGauge<4>, - // pub log_store_read_rows: LabelGuardedIntCounter<4>, - // pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter<4>, - // pub iceberg_write_qps: LabelGuardedIntCounter<3>, - // pub iceberg_write_latency: LabelGuardedHistogram<3>, - // pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge<3>, - // pub iceberg_position_delete_cache_num: LabelGuardedIntGauge<3>, - // pub iceberg_partition_num: LabelGuardedIntGauge<3>, } impl SinkMetrics { @@ -456,26 +444,6 @@ impl SinkMetrics { iceberg_partition_num, } } - - fn for_test() -> Self { - todo!() - // SinkMetrics { - // sink_commit_duration_metrics: LabelGuardedHistogram::test_histogram(), - // connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), - // log_store_first_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - // log_store_latest_write_epoch: LabelGuardedIntGauge::test_int_gauge(), - // log_store_latest_read_epoch: LabelGuardedIntGauge::test_int_gauge(), - // log_store_write_rows: LabelGuardedIntCounter::test_int_counter(), - // log_store_read_rows: LabelGuardedIntCounter::test_int_counter(), - // log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounter::test_int_counter( - // ), - // iceberg_write_qps: LabelGuardedIntCounter::test_int_counter(), - // iceberg_write_latency: LabelGuardedHistogram::test_histogram(), - // iceberg_rolling_unflushed_data_file: LabelGuardedIntGauge::test_int_gauge(), - // iceberg_position_delete_cache_num: LabelGuardedIntGauge::test_int_gauge(), - // iceberg_partition_num: LabelGuardedIntGauge::test_int_gauge(), - // } - } } #[derive(Clone)] diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index 3a75dadd1448..b748bf4ec393 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -27,7 +27,6 @@ use risingwave_common::metrics::{ RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec, RelabeledGuardedIntGaugeVec, }; use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY; -use risingwave_common::util::epoch::Epoch; use risingwave_common::{ register_guarded_gauge_vec_with_registry, register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry, diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 0f91dd9aaa04..6877d3e1550c 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -27,8 +27,8 @@ use risingwave_common_estimate_size::EstimateSize; use risingwave_connector::dispatch_sink; use risingwave_connector::sink::catalog::SinkType; use risingwave_connector::sink::log_store::{ - BackpressureMonitoredLogReader, LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory, - LogWriter, LogWriterExt, LogWriterMetrics, MonitoredLogReader, + LogReader, LogReaderExt, LogReaderMetrics, LogStoreFactory, LogWriter, LogWriterExt, + LogWriterMetrics, }; use risingwave_connector::sink::{ build_sink, LogSinker, Sink, SinkImpl, SinkParam, SinkWriterParam, GLOBAL_SINK_METRICS, diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index 06c64468f1b5..862485e5e56f 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -22,8 +22,7 @@ use risingwave_connector::match_sink_name_str; use risingwave_connector::sink::catalog::{SinkFormatDesc, SinkId, SinkType}; use risingwave_connector::sink::file_sink::fs::FsSink; use risingwave_connector::sink::{ - SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, GLOBAL_SINK_METRICS, - SINK_TYPE_OPTION, + SinkError, SinkMetaClient, SinkParam, SinkWriterParam, CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, }; use risingwave_pb::catalog::Table; use risingwave_pb::plan_common::PbColumnCatalog; From 8d73061b917fd56fbcb4c3c720e046956af02a80 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 15:18:11 +0800 Subject: [PATCH 09/10] fix cfg(test) code --- src/connector/src/sink/iceberg/mod.rs | 5 ++--- src/connector/src/sink/mod.rs | 8 ++++++++ src/connector/src/sink/remote.rs | 7 +++---- src/connector/src/sink/writer.rs | 2 +- src/stream/src/executor/monitor/streaming_stats.rs | 1 - 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index bc83503eb6ae..95dc0b531219 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -41,9 +41,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::bail; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::metrics::{ - LabelGuardedHistogram, LabelGuardedIntCounter, LabelGuardedMetric, -}; +use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized; use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; @@ -417,6 +415,7 @@ impl Sink for IcebergSink { pub struct IcebergWriter { inner_writer: IcebergWriterEnum, schema: SchemaRef, + // See comments below _metrics: IcebergWriterMetrics, } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 589ce0dc75d0..7b1021058058 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -494,6 +494,14 @@ impl SinkWriterMetrics { connector_sink_rows_received, } } + + #[cfg(test)] + pub fn for_test() -> Self { + Self { + sink_commit_duration: LabelGuardedHistogram::test_histogram(), + connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(), + } + } } #[derive(Clone)] diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e6250e30132a..4fe0a76552fc 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, VecDeque}; +use std::collections::VecDeque; use std::marker::PhantomData; use std::ops::Deref; use std::pin::pin; @@ -28,7 +28,6 @@ use prost::Message; use risingwave_common::array::StreamChunk; use risingwave_common::bail; use risingwave_common::catalog::{ColumnDesc, ColumnId}; -use risingwave_common::metrics::LabelGuardedHistogram; use risingwave_common::session_config::sink_decouple::SinkDecouple; use risingwave_common::types::DataType; use risingwave_jni_core::jvm_runtime::{execute_with_jni_env, JVM}; @@ -65,7 +64,7 @@ use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset}; use crate::sink::writer::{LogSinkerOf, SinkWriter, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, LogSinker, Result, Sink, SinkCommitCoordinator, SinkError, - SinkLogReader, SinkMetrics, SinkParam, SinkWriterMetrics, SinkWriterParam, + SinkLogReader, SinkParam, SinkWriterMetrics, SinkWriterParam, }; macro_rules! def_remote_sink { @@ -591,7 +590,7 @@ impl CoordinatedRemoteSinkWriter { epoch: None, batch_id: 0, stream_handle, - sink_metrics: SinkMetrics::for_test(), + metrics: SinkWriterMetrics::for_test(), } } } diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index a038fb99cace..144619545529 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -29,7 +29,7 @@ use crate::sink::formatter::SinkFormatter; use crate::sink::log_store::{ DeliveryFutureManager, DeliveryFutureManagerAddFuture, LogStoreReadItem, TruncateOffset, }; -use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkMetrics, SinkWriterMetrics}; +use crate::sink::{LogSinker, Result, SinkError, SinkLogReader, SinkWriterMetrics}; #[async_trait] pub trait SinkWriter: Send + 'static { diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs index b748bf4ec393..05a4dfc0bf1a 100644 --- a/src/stream/src/executor/monitor/streaming_stats.rs +++ b/src/stream/src/executor/monitor/streaming_stats.rs @@ -32,7 +32,6 @@ use risingwave_common::{ register_guarded_int_counter_vec_with_registry, register_guarded_int_gauge_vec_with_registry, }; use risingwave_connector::sink::catalog::SinkId; -use risingwave_connector::sink::SinkMetrics; use crate::common::log_store_impl::kv_log_store::{ REWIND_BACKOFF_FACTOR, REWIND_BASE_DELAY, REWIND_MAX_DELAY, From 3bd68107e936a713e65142848e4a6172619accac Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Thu, 26 Sep 2024 15:18:59 +0800 Subject: [PATCH 10/10] clean --- risedev.yml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/risedev.yml b/risedev.yml index bff947b10276..96443a7c0d6e 100644 --- a/risedev.yml +++ b/risedev.yml @@ -132,13 +132,6 @@ profile: - use: grafana - use: kafka persist-data: true - - use: schema-registry - - kafka-family: - steps: - - use: kafka - persist-data: true - - use: schema-registry standalone-full-peripherals: steps: