From 32626772c5922470614ae1860177d4b8538287a9 Mon Sep 17 00:00:00 2001 From: Xun Li Date: Tue, 10 Dec 2024 12:00:13 -0800 Subject: [PATCH] [indexer-alt] Log TPS and CPS in all pipelines (#20441) ## Description Describe the changes or additions included in this PR. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../pipeline/concurrent/commit_watermark.rs | 78 +++++-------- .../src/pipeline/concurrent/pruner.rs | 41 ++----- .../src/pipeline/logging.rs | 104 ++++++++++++++++++ .../src/pipeline/mod.rs | 5 +- .../src/pipeline/sequential/committer.rs | 32 +----- 5 files changed, 146 insertions(+), 114 deletions(-) create mode 100644 crates/sui-indexer-alt-framework/src/pipeline/logging.rs diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs index d92ae3d6ba36c..f35140fb6205a 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs @@ -18,9 +18,7 @@ use tracing::{debug, error, info, warn}; use crate::{ db::Db, metrics::IndexerMetrics, - pipeline::{ - CommitterConfig, WatermarkPart, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS, - }, + pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, }; @@ -79,8 +77,7 @@ pub(super) fn commit_watermark( // The watermark task will periodically output a log message at a higher log level to // demonstrate that the pipeline is making progress. - let mut next_loud_watermark_update = - watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL; + let mut logger = WatermarkLogger::new("concurrent_committer", &watermark); info!(pipeline = H::NAME, ?watermark, "Starting commit watermark"); @@ -200,57 +197,32 @@ pub(super) fn commit_watermark( ); } - Ok(updated) => { + Ok(true) => { let elapsed = guard.stop_and_record(); - if updated { - metrics - .watermark_epoch_in_db - .with_label_values(&[H::NAME]) - .set(watermark.epoch_hi_inclusive); - - metrics - .watermark_checkpoint_in_db - .with_label_values(&[H::NAME]) - .set(watermark.checkpoint_hi_inclusive); - - metrics - .watermark_transaction_in_db - .with_label_values(&[H::NAME]) - .set(watermark.tx_hi); - - metrics - .watermark_timestamp_in_db_ms - .with_label_values(&[H::NAME]) - .set(watermark.timestamp_ms_hi_inclusive); - } - - if watermark.checkpoint_hi_inclusive > next_loud_watermark_update { - next_loud_watermark_update = watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL; - - info!( - pipeline = H::NAME, - epoch = watermark.epoch_hi_inclusive, - checkpoint = watermark.checkpoint_hi_inclusive, - transaction = watermark.tx_hi, - timestamp = %watermark.timestamp(), - updated, - elapsed_ms = elapsed * 1000.0, - "Watermark", - ); - } else { - debug!( - pipeline = H::NAME, - epoch = watermark.epoch_hi_inclusive, - checkpoint = watermark.checkpoint_hi_inclusive, - transaction = watermark.tx_hi, - timestamp = %watermark.timestamp(), - updated, - elapsed_ms = elapsed * 1000.0, - "Watermark", - ); - } + logger.log::(&watermark, elapsed); + + metrics + .watermark_epoch_in_db + .with_label_values(&[H::NAME]) + .set(watermark.epoch_hi_inclusive); + + metrics + .watermark_checkpoint_in_db + .with_label_values(&[H::NAME]) + .set(watermark.checkpoint_hi_inclusive); + + metrics + .watermark_transaction_in_db + .with_label_values(&[H::NAME]) + .set(watermark.tx_hi); + + metrics + .watermark_timestamp_in_db_ms + .with_label_values(&[H::NAME]) + .set(watermark.timestamp_ms_hi_inclusive); } + Ok(false) => {} } } diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs index aa2648654a2b7..365bbc2f03646 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs @@ -11,7 +11,9 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::{ - db::Db, metrics::IndexerMetrics, pipeline::LOUD_WATERMARK_UPDATE_INTERVAL, + db::Db, + metrics::IndexerMetrics, + pipeline::logging::{LoggerWatermark, WatermarkLogger}, watermarks::PrunerWatermark, }; @@ -51,7 +53,7 @@ pub(super) fn pruner( // The pruner task will periodically output a log message at a higher log level to // demonstrate that it is making progress. - let mut next_loud_watermark_update = 0; + let mut logger = WatermarkLogger::new("pruner", LoggerWatermark::default()); 'outer: loop { // (1) Get the latest pruning bounds from the database. @@ -186,37 +188,16 @@ pub(super) fn pruner( ) } - Ok(updated) => { + Ok(true) => { let elapsed = guard.stop_and_record(); + logger.log::(&watermark, elapsed); - if updated { - metrics - .watermark_pruner_hi_in_db - .with_label_values(&[H::NAME]) - .set(watermark.pruner_hi); - } - - if watermark.pruner_hi > next_loud_watermark_update { - next_loud_watermark_update = - watermark.pruner_hi + LOUD_WATERMARK_UPDATE_INTERVAL; - - info!( - pipeline = H::NAME, - pruner_hi = watermark.pruner_hi, - updated, - elapsed_ms = elapsed * 1000.0, - "Watermark" - ); - } else { - debug!( - pipeline = H::NAME, - pruner_hi = watermark.pruner_hi, - updated, - elapsed_ms = elapsed * 1000.0, - "Watermark" - ); - } + metrics + .watermark_pruner_hi_in_db + .with_label_values(&[H::NAME]) + .set(watermark.pruner_hi); } + Ok(false) => {} } } diff --git a/crates/sui-indexer-alt-framework/src/pipeline/logging.rs b/crates/sui-indexer-alt-framework/src/pipeline/logging.rs new file mode 100644 index 0000000000000..9610c5d088a89 --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/pipeline/logging.rs @@ -0,0 +1,104 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Instant; + +use tracing::{debug, info}; + +use crate::watermarks::PrunerWatermark; + +use super::{CommitterWatermark, Processor}; + +/// Tracing message for the watermark update will be logged at info level at least this many +/// checkpoints. +const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10; + +#[derive(Default)] +pub(crate) struct LoggerWatermark { + checkpoint: i64, + transaction: Option, +} + +pub(crate) struct WatermarkLogger { + name: &'static str, + timer: Instant, + prev_watermark: LoggerWatermark, +} + +impl WatermarkLogger { + pub fn new(name: &'static str, init_watermark: impl Into) -> Self { + Self { + name, + timer: Instant::now(), + prev_watermark: init_watermark.into(), + } + } + + /// Log the watermark update. + /// `watermark_update_latency` is the time spent to update the watermark. + /// + /// Given the new watermark, the logger will compare with the previous watermark to compute the + /// average TPS (transactions per second) and CPS (checkpoints per second) since the last update. + /// + /// If the watermark update is less than `LOUD_WATERMARK_UPDATE_INTERVAL` checkpoints apart, + /// the log message will be at debug level. Otherwise, it will be at info level. + pub fn log( + &mut self, + watermark: impl Into, + watermark_update_latency: f64, + ) { + let watermark: LoggerWatermark = watermark.into(); + let logger_timer_elapsed = self.timer.elapsed().as_secs_f64(); + let realtime_average_tps = match (self.prev_watermark.transaction, watermark.transaction) { + (Some(prev), Some(curr)) => Some((curr - prev) as f64 / logger_timer_elapsed), + _ => None, + }; + let realtime_average_cps = + (watermark.checkpoint - self.prev_watermark.checkpoint) as f64 / logger_timer_elapsed; + + if watermark.checkpoint < self.prev_watermark.checkpoint + LOUD_WATERMARK_UPDATE_INTERVAL { + debug!( + logger = self.name, + pipeline = H::NAME, + checkpoint = watermark.checkpoint, + transaction = watermark.transaction, + tps = realtime_average_tps, + cps = realtime_average_cps, + elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0), + "Updated watermark", + ); + return; + } + + info!( + logger = self.name, + pipeline = H::NAME, + checkpoint = watermark.checkpoint, + transaction = watermark.transaction, + tps = realtime_average_tps, + cps = realtime_average_cps, + elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0), + "Updated watermark", + ); + self.prev_watermark = watermark; + self.timer = Instant::now(); + } +} + +impl From<&CommitterWatermark<'_>> for LoggerWatermark { + fn from(watermark: &CommitterWatermark) -> Self { + Self { + checkpoint: watermark.checkpoint_hi_inclusive, + transaction: Some(watermark.tx_hi), + } + } +} + +impl From<&PrunerWatermark<'_>> for LoggerWatermark { + fn from(watermark: &PrunerWatermark) -> Self { + Self { + checkpoint: watermark.pruner_hi, + transaction: None, + } + } +} diff --git a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs index 026a803f4fb04..14abb8095b966 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/mod.rs @@ -9,13 +9,10 @@ pub use processor::Processor; use serde::{Deserialize, Serialize}; pub mod concurrent; +mod logging; mod processor; pub mod sequential; -/// Tracing message for the watermark update will be logged at info level at least this many -/// checkpoints. -const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10; - /// Extra buffer added to channels between tasks in a pipeline. There does not need to be a huge /// capacity here because tasks already buffer rows to insert internally. const PIPELINE_BUFFER: usize = 5; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs index 73e3a437c0cb9..4f72c9cdc4244 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/sequential/committer.rs @@ -15,7 +15,7 @@ use tracing::{debug, info, warn}; use crate::{ db::Db, metrics::IndexerMetrics, - pipeline::{Indexed, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS}, + pipeline::{logging::WatermarkLogger, Indexed, WARN_PENDING_WATERMARKS}, watermarks::CommitterWatermark, }; @@ -32,7 +32,7 @@ use super::{Handler, SequentialConfig}; /// single write), in a single transaction that includes all row updates and an update to the /// watermark table. /// -/// The committer can be configured to lag behind the ingestion serice by a fixed number of +/// The committer can be configured to lag behind the ingestion service by a fixed number of /// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag. /// /// Upon successful write, the task sends its new watermark back to the ingestion service, to @@ -80,8 +80,7 @@ pub(super) fn committer( // The committer task will periodically output a log message at a higher log level to // demonstrate that the pipeline is making progress. - let mut next_loud_watermark_update = - watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL; + let mut logger = WatermarkLogger::new("sequential_committer", &watermark); // Data for checkpoint that haven't been written yet. Note that `pending_rows` includes // rows in `batch`. @@ -259,7 +258,6 @@ pub(super) fn committer( debug!( pipeline = H::NAME, - elapsed_ms = elapsed * 1000.0, attempt, affected, committed = batch_rows, @@ -267,6 +265,8 @@ pub(super) fn committer( "Wrote batch", ); + logger.log::(&watermark, elapsed); + metrics .total_committer_batches_succeeded .with_label_values(&[H::NAME]) @@ -307,28 +307,6 @@ pub(super) fn committer( .with_label_values(&[H::NAME]) .set(watermark.timestamp_ms_hi_inclusive); - if watermark.checkpoint_hi_inclusive > next_loud_watermark_update { - next_loud_watermark_update = watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL; - - info!( - pipeline = H::NAME, - epoch = watermark.epoch_hi_inclusive, - checkpoint = watermark.checkpoint_hi_inclusive, - transaction = watermark.tx_hi, - timestamp = %watermark.timestamp(), - "Watermark", - ); - } else { - debug!( - pipeline = H::NAME, - epoch = watermark.epoch_hi_inclusive, - checkpoint = watermark.checkpoint_hi_inclusive, - transaction = watermark.tx_hi, - timestamp = %watermark.timestamp(), - "Watermark", - ); - } - // Ignore the result -- the ingestion service will close this channel // once it is done, but there may still be checkpoints buffered that need // processing.