Skip to content

Commit

Permalink
[indexer-alt] Log TPS and CPS in all pipelines (#20441)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Dec 10, 2024
1 parent 6e847b0 commit 3262677
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use tracing::{debug, error, info, warn};
use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{
CommitterConfig, WatermarkPart, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS,
},
pipeline::{logging::WatermarkLogger, CommitterConfig, WatermarkPart, WARN_PENDING_WATERMARKS},
watermarks::CommitterWatermark,
};

Expand Down Expand Up @@ -79,8 +77,7 @@ pub(super) fn commit_watermark<H: Handler + 'static>(

// The watermark task will periodically output a log message at a higher log level to
// demonstrate that the pipeline is making progress.
let mut next_loud_watermark_update =
watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;
let mut logger = WatermarkLogger::new("concurrent_committer", &watermark);

info!(pipeline = H::NAME, ?watermark, "Starting commit watermark");

Expand Down Expand Up @@ -200,57 +197,32 @@ pub(super) fn commit_watermark<H: Handler + 'static>(
);
}

Ok(updated) => {
Ok(true) => {
let elapsed = guard.stop_and_record();

if updated {
metrics
.watermark_epoch_in_db
.with_label_values(&[H::NAME])
.set(watermark.epoch_hi_inclusive);

metrics
.watermark_checkpoint_in_db
.with_label_values(&[H::NAME])
.set(watermark.checkpoint_hi_inclusive);

metrics
.watermark_transaction_in_db
.with_label_values(&[H::NAME])
.set(watermark.tx_hi);

metrics
.watermark_timestamp_in_db_ms
.with_label_values(&[H::NAME])
.set(watermark.timestamp_ms_hi_inclusive);
}

if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
next_loud_watermark_update = watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;

info!(
pipeline = H::NAME,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark",
);
} else {
debug!(
pipeline = H::NAME,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark",
);
}
logger.log::<H>(&watermark, elapsed);

metrics
.watermark_epoch_in_db
.with_label_values(&[H::NAME])
.set(watermark.epoch_hi_inclusive);

metrics
.watermark_checkpoint_in_db
.with_label_values(&[H::NAME])
.set(watermark.checkpoint_hi_inclusive);

metrics
.watermark_transaction_in_db
.with_label_values(&[H::NAME])
.set(watermark.tx_hi);

metrics
.watermark_timestamp_in_db_ms
.with_label_values(&[H::NAME])
.set(watermark.timestamp_ms_hi_inclusive);
}
Ok(false) => {}
}
}

Expand Down
41 changes: 11 additions & 30 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

use crate::{
db::Db, metrics::IndexerMetrics, pipeline::LOUD_WATERMARK_UPDATE_INTERVAL,
db::Db,
metrics::IndexerMetrics,
pipeline::logging::{LoggerWatermark, WatermarkLogger},
watermarks::PrunerWatermark,
};

Expand Down Expand Up @@ -51,7 +53,7 @@ pub(super) fn pruner<H: Handler + 'static>(

// The pruner task will periodically output a log message at a higher log level to
// demonstrate that it is making progress.
let mut next_loud_watermark_update = 0;
let mut logger = WatermarkLogger::new("pruner", LoggerWatermark::default());

'outer: loop {
// (1) Get the latest pruning bounds from the database.
Expand Down Expand Up @@ -186,37 +188,16 @@ pub(super) fn pruner<H: Handler + 'static>(
)
}

Ok(updated) => {
Ok(true) => {
let elapsed = guard.stop_and_record();
logger.log::<H>(&watermark, elapsed);

if updated {
metrics
.watermark_pruner_hi_in_db
.with_label_values(&[H::NAME])
.set(watermark.pruner_hi);
}

if watermark.pruner_hi > next_loud_watermark_update {
next_loud_watermark_update =
watermark.pruner_hi + LOUD_WATERMARK_UPDATE_INTERVAL;

info!(
pipeline = H::NAME,
pruner_hi = watermark.pruner_hi,
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark"
);
} else {
debug!(
pipeline = H::NAME,
pruner_hi = watermark.pruner_hi,
updated,
elapsed_ms = elapsed * 1000.0,
"Watermark"
);
}
metrics
.watermark_pruner_hi_in_db
.with_label_values(&[H::NAME])
.set(watermark.pruner_hi);
}
Ok(false) => {}
}
}

Expand Down
104 changes: 104 additions & 0 deletions crates/sui-indexer-alt-framework/src/pipeline/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Instant;

use tracing::{debug, info};

use crate::watermarks::PrunerWatermark;

use super::{CommitterWatermark, Processor};

/// Tracing message for the watermark update will be logged at info level at least this many
/// checkpoints.
const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;

#[derive(Default)]
pub(crate) struct LoggerWatermark {
checkpoint: i64,
transaction: Option<i64>,
}

pub(crate) struct WatermarkLogger {
name: &'static str,
timer: Instant,
prev_watermark: LoggerWatermark,
}

impl WatermarkLogger {
pub fn new(name: &'static str, init_watermark: impl Into<LoggerWatermark>) -> Self {
Self {
name,
timer: Instant::now(),
prev_watermark: init_watermark.into(),
}
}

/// Log the watermark update.
/// `watermark_update_latency` is the time spent to update the watermark.
///
/// Given the new watermark, the logger will compare with the previous watermark to compute the
/// average TPS (transactions per second) and CPS (checkpoints per second) since the last update.
///
/// If the watermark update is less than `LOUD_WATERMARK_UPDATE_INTERVAL` checkpoints apart,
/// the log message will be at debug level. Otherwise, it will be at info level.
pub fn log<H: Processor>(
&mut self,
watermark: impl Into<LoggerWatermark>,
watermark_update_latency: f64,
) {
let watermark: LoggerWatermark = watermark.into();
let logger_timer_elapsed = self.timer.elapsed().as_secs_f64();
let realtime_average_tps = match (self.prev_watermark.transaction, watermark.transaction) {
(Some(prev), Some(curr)) => Some((curr - prev) as f64 / logger_timer_elapsed),
_ => None,
};
let realtime_average_cps =
(watermark.checkpoint - self.prev_watermark.checkpoint) as f64 / logger_timer_elapsed;

if watermark.checkpoint < self.prev_watermark.checkpoint + LOUD_WATERMARK_UPDATE_INTERVAL {
debug!(
logger = self.name,
pipeline = H::NAME,
checkpoint = watermark.checkpoint,
transaction = watermark.transaction,
tps = realtime_average_tps,
cps = realtime_average_cps,
elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
"Updated watermark",
);
return;
}

info!(
logger = self.name,
pipeline = H::NAME,
checkpoint = watermark.checkpoint,
transaction = watermark.transaction,
tps = realtime_average_tps,
cps = realtime_average_cps,
elapsed_ms = format!("{:.3}", watermark_update_latency * 1000.0),
"Updated watermark",
);
self.prev_watermark = watermark;
self.timer = Instant::now();
}
}

impl From<&CommitterWatermark<'_>> for LoggerWatermark {
fn from(watermark: &CommitterWatermark) -> Self {
Self {
checkpoint: watermark.checkpoint_hi_inclusive,
transaction: Some(watermark.tx_hi),
}
}
}

impl From<&PrunerWatermark<'_>> for LoggerWatermark {
fn from(watermark: &PrunerWatermark) -> Self {
Self {
checkpoint: watermark.pruner_hi,
transaction: None,
}
}
}
5 changes: 1 addition & 4 deletions crates/sui-indexer-alt-framework/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@ pub use processor::Processor;
use serde::{Deserialize, Serialize};

pub mod concurrent;
mod logging;
mod processor;
pub mod sequential;

/// Tracing message for the watermark update will be logged at info level at least this many
/// checkpoints.
const LOUD_WATERMARK_UPDATE_INTERVAL: i64 = 5 * 10;

/// Extra buffer added to channels between tasks in a pipeline. There does not need to be a huge
/// capacity here because tasks already buffer rows to insert internally.
const PIPELINE_BUFFER: usize = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::{debug, info, warn};
use crate::{
db::Db,
metrics::IndexerMetrics,
pipeline::{Indexed, LOUD_WATERMARK_UPDATE_INTERVAL, WARN_PENDING_WATERMARKS},
pipeline::{logging::WatermarkLogger, Indexed, WARN_PENDING_WATERMARKS},
watermarks::CommitterWatermark,
};

Expand All @@ -32,7 +32,7 @@ use super::{Handler, SequentialConfig};
/// single write), in a single transaction that includes all row updates and an update to the
/// watermark table.
///
/// The committer can be configured to lag behind the ingestion serice by a fixed number of
/// The committer can be configured to lag behind the ingestion service by a fixed number of
/// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag.
///
/// Upon successful write, the task sends its new watermark back to the ingestion service, to
Expand Down Expand Up @@ -80,8 +80,7 @@ pub(super) fn committer<H: Handler + 'static>(

// The committer task will periodically output a log message at a higher log level to
// demonstrate that the pipeline is making progress.
let mut next_loud_watermark_update =
watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;
let mut logger = WatermarkLogger::new("sequential_committer", &watermark);

// Data for checkpoint that haven't been written yet. Note that `pending_rows` includes
// rows in `batch`.
Expand Down Expand Up @@ -259,14 +258,15 @@ pub(super) fn committer<H: Handler + 'static>(

debug!(
pipeline = H::NAME,
elapsed_ms = elapsed * 1000.0,
attempt,
affected,
committed = batch_rows,
pending = pending_rows,
"Wrote batch",
);

logger.log::<H>(&watermark, elapsed);

metrics
.total_committer_batches_succeeded
.with_label_values(&[H::NAME])
Expand Down Expand Up @@ -307,28 +307,6 @@ pub(super) fn committer<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.set(watermark.timestamp_ms_hi_inclusive);

if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
next_loud_watermark_update = watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;

info!(
pipeline = H::NAME,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
"Watermark",
);
} else {
debug!(
pipeline = H::NAME,
epoch = watermark.epoch_hi_inclusive,
checkpoint = watermark.checkpoint_hi_inclusive,
transaction = watermark.tx_hi,
timestamp = %watermark.timestamp(),
"Watermark",
);
}

// Ignore the result -- the ingestion service will close this channel
// once it is done, but there may still be checkpoints buffered that need
// processing.
Expand Down

0 comments on commit 3262677

Please sign in to comment.