Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: sink metrics #18730

Merged
merged 10 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::decouple_checkpoint_log_sink::{
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
};
use super::writer::SinkWriter;
use super::{DummySinkCommitCoordinator, SinkWriterParam};
use super::{DummySinkCommitCoordinator, SinkWriterMetrics, SinkWriterParam};
use crate::error::ConnectorResult;
use crate::sink::{
Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
Expand Down Expand Up @@ -553,7 +553,7 @@ impl Sink for ClickHouseSink {

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
SinkWriterMetrics::new(&writer_param),
commit_checkpoint_interval,
))
}
Expand Down
19 changes: 10 additions & 9 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use async_trait::async_trait;

use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
use crate::sink::writer::SinkWriter;
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
use crate::sink::{LogSinker, Result, SinkLogReader, SinkWriterMetrics};

pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE: u64 = 10;
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE: u64 = 1;
pub const COMMIT_CHECKPOINT_INTERVAL: &str = "commit_checkpoint_interval";
Expand All @@ -33,7 +34,7 @@ pub fn default_commit_checkpoint_interval() -> u64 {
/// we delay the checkpoint barrier to make commits less frequent.
pub struct DecoupleCheckpointLogSinkerOf<W> {
writer: W,
sink_metrics: SinkMetrics,
sink_writer_metrics: SinkWriterMetrics,
commit_checkpoint_interval: NonZeroU64,
}

Expand All @@ -42,12 +43,12 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {
/// decouple log reader `KvLogStoreReader`.
pub fn new(
writer: W,
sink_metrics: SinkMetrics,
sink_writer_metrics: SinkWriterMetrics,
commit_checkpoint_interval: NonZeroU64,
) -> Self {
DecoupleCheckpointLogSinkerOf {
writer,
sink_metrics,
sink_writer_metrics,
commit_checkpoint_interval,
}
}
Expand All @@ -57,7 +58,6 @@ impl<W> DecoupleCheckpointLogSinkerOf<W> {
impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSinkerOf<W> {
async fn consume_log_and_sink(self, log_reader: &mut impl SinkLogReader) -> Result<!> {
let mut sink_writer = self.writer;
let sink_metrics = self.sink_metrics;
#[derive(Debug)]
enum LogConsumerState {
/// Mark that the log consumer is not initialized yet
Expand All @@ -74,6 +74,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink

let mut current_checkpoint: u64 = 0;
let commit_checkpoint_interval = self.commit_checkpoint_interval;
let sink_writer_metrics = self.sink_writer_metrics;

loop {
let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
Expand All @@ -87,8 +88,8 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
// force commit on update vnode bitmap
let start_time = Instant::now();
sink_writer.barrier(true).await?;
sink_metrics
.sink_commit_duration_metrics
sink_writer_metrics
.sink_commit_duration
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?;
current_checkpoint = 0;
Expand Down Expand Up @@ -149,8 +150,8 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
if current_checkpoint >= commit_checkpoint_interval.get() {
let start_time = Instant::now();
sink_writer.barrier(true).await?;
sink_metrics
.sink_commit_duration_metrics
sink_writer_metrics
.sink_commit_duration
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
current_checkpoint = 0;
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/deltalake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use super::decouple_checkpoint_log_sink::{
};
use super::writer::SinkWriter;
use super::{
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterMetrics, SinkWriterParam,
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
};

Expand Down Expand Up @@ -289,6 +289,8 @@ impl Sink for DeltaLakeSink {
self.param.downstream_pk.clone(),
)
.await?;

let metrics = SinkWriterMetrics::new(&writer_param);
let writer = CoordinatedSinkWriter::new(
writer_param
.meta_client
Expand All @@ -312,7 +314,7 @@ impl Sink for DeltaLakeSink {

Ok(DecoupleCheckpointLogSinkerOf::new(
writer,
writer_param.sink_metrics,
metrics,
commit_checkpoint_interval,
))
}
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use super::doris_starrocks_connector::{
HeaderBuilder, InserterInner, InserterInnerBuilder, DORIS_DELETE_SIGN, DORIS_SUCCESS_STATUS,
POOL_IDLE_TIMEOUT,
};
use super::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT};
use super::{
Result, SinkError, SinkWriterMetrics, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};
use crate::sink::encoder::{JsonEncoder, RowEncoder};
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{DummySinkCommitCoordinator, Sink, SinkParam, SinkWriter, SinkWriterParam};
Expand Down Expand Up @@ -209,7 +211,7 @@ impl Sink for DorisSink {
self.is_append_only,
)
.await?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(SinkWriterMetrics::new(&writer_param)))
}

async fn validate(&self) -> Result<()> {
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/sink/file_sink/opendal_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::sink::catalog::SinkEncode;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam, SinkWriter,
SinkWriterMetrics,
};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;
Expand Down Expand Up @@ -126,7 +127,7 @@ impl<S: OpendalSinkBackend> Sink for FileSink<S> {
self.format_desc.encode.clone(),
self.engine_type.clone(),
)?
.into_log_sinker(writer_param.sink_metrics))
.into_log_sinker(SinkWriterMetrics::new(&writer_param)))
}
}

Expand Down
Loading
Loading