diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index f85367d32e5bf..d4b20a86d7a2e 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -131,6 +131,9 @@ impl CommonSplitReader for CdcSplitReader { snapshot_done: self.snapshot_done, }; + let source_id = get_event_stream_request.source_id.to_string(); + let source_type = get_event_stream_request.source_type.to_string(); + std::thread::spawn(move || { let mut env = JVM .as_ref() @@ -163,6 +166,11 @@ impl CommonSplitReader for CdcSplitReader { while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await { tracing::debug!("receive events {:?}", events.len()); + self.source_ctx + .metrics + .connector_source_rows_received + .with_label_values(&[&source_type, &source_id]) + .inc_by(events.len() as u64); let msgs = events.into_iter().map(SourceMessage::from).collect_vec(); yield msgs; } diff --git a/src/connector/src/source/monitor/metrics.rs b/src/connector/src/source/monitor/metrics.rs index c6ea9998e55e4..fa3e836993c4f 100644 --- a/src/connector/src/source/monitor/metrics.rs +++ b/src/connector/src/source/monitor/metrics.rs @@ -62,6 +62,8 @@ pub struct SourceMetrics { /// Report latest message id pub latest_message_id: GenericGaugeVec, pub rdkafka_native_metric: Arc, + + pub connector_source_rows_received: GenericCounterVec, } pub static GLOBAL_SOURCE_METRICS: LazyLock = @@ -103,6 +105,15 @@ impl SourceMetrics { registry, ) .unwrap(); + + let connector_source_rows_received = register_int_counter_vec_with_registry!( + "connector_source_rows_received", + "Number of rows received by source", + &["source_type", "source_id"], + registry + ) + .unwrap(); + let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone())); SourceMetrics { partition_input_count, @@ -110,6 +121,7 @@ impl SourceMetrics { user_source_error_count, latest_message_id, rdkafka_native_metric, + connector_source_rows_received, } } }