Skip to content

Commit

Permalink
refactor(connector): migrate cdc source metric from connector to comp…
Browse files Browse the repository at this point in the history
…ute (#12283)
  • Loading branch information
chenzl25 authored Sep 14, 2023
1 parent a934185 commit 827ed5e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
snapshot_done: self.snapshot_done,
};

let source_id = get_event_stream_request.source_id.to_string();
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
let mut env = JVM
.as_ref()
Expand Down Expand Up @@ -163,6 +166,11 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

while let Some(GetEventStreamResponse { events, .. }) = rx.recv().await {
tracing::debug!("receive events {:?}", events.len());
self.source_ctx
.metrics
.connector_source_rows_received
.with_label_values(&[&source_type, &source_id])
.inc_by(events.len() as u64);
let msgs = events.into_iter().map(SourceMessage::from).collect_vec();
yield msgs;
}
Expand Down
12 changes: 12 additions & 0 deletions src/connector/src/source/monitor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub struct SourceMetrics {
/// Report latest message id
pub latest_message_id: GenericGaugeVec<AtomicI64>,
pub rdkafka_native_metric: Arc<RdKafkaStats>,

pub connector_source_rows_received: GenericCounterVec<AtomicU64>,
}

pub static GLOBAL_SOURCE_METRICS: LazyLock<SourceMetrics> =
Expand Down Expand Up @@ -103,13 +105,23 @@ impl SourceMetrics {
registry,
)
.unwrap();

let connector_source_rows_received = register_int_counter_vec_with_registry!(
"connector_source_rows_received",
"Number of rows received by source",
&["source_type", "source_id"],
registry
)
.unwrap();

let rdkafka_native_metric = Arc::new(RdKafkaStats::new(registry.clone()));
SourceMetrics {
partition_input_count,
partition_input_bytes,
user_source_error_count,
latest_message_id,
rdkafka_native_metric,
connector_source_rows_received,
}
}
}
Expand Down

0 comments on commit 827ed5e

Please sign in to comment.