From aa76e714c89399a5e5a5097278b5f5c4687aa9d8 Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Tue, 25 Jun 2024 16:13:11 -0700 Subject: [PATCH] [indexer] ignore tables not partitioned by epoch at epoch change (#18410) If a table is partitioned by not by epochs, we shouldn't do anything to it when advancing epochs. Tested locally. --- Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- .../sui-indexer/src/store/pg_indexer_store.rs | 8 +++++++ .../src/store/pg_partition_manager.rs | 24 +++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index d1a942c74c4e9..e94e8de2861f6 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -1358,6 +1358,14 @@ impl PgIndexerStore { EpochPartitionData::compose_data(epoch_to_commit, last_epoch); let table_partitions = self.partition_manager.get_table_partitions()?; for (table, (_, last_partition)) in table_partitions { + // Only advance epoch partition for epoch partitioned tables. + if !self + .partition_manager + .get_strategy(&table) + .is_epoch_partitioned() + { + continue; + } let guard = self.metrics.advance_epoch_latency.start_timer(); self.partition_manager.advance_epoch( table.clone(), diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs index e785d0250f923..f27078ca47048 100644 --- a/crates/sui-indexer/src/store/pg_partition_manager.rs +++ b/crates/sui-indexer/src/store/pg_partition_manager.rs @@ -60,6 +60,16 @@ impl Clone for PgPartitionManager { pub enum PgPartitionStrategy { CheckpointSequenceNumber, TxSequenceNumber, + ObjectId, +} + +impl PgPartitionStrategy { + pub fn is_epoch_partitioned(&self) -> bool { + matches!( + self, + Self::CheckpointSequenceNumber | Self::TxSequenceNumber + ) + } } #[derive(Clone, Debug)] @@ -102,6 +112,7 @@ impl PgPartitionManager { let mut partition_strategies = HashMap::new(); partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber); partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber); + partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId); let manager = Self { cp, partition_strategies, @@ -152,18 +163,19 @@ impl PgPartitionManager { .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber) } - pub fn determine_partition_range( + pub fn determine_epoch_partition_range( &self, table_name: &str, data: &EpochPartitionData, - ) -> (u64, u64) { + ) -> Option<(u64, u64)> { match self.get_strategy(table_name) { PgPartitionStrategy::CheckpointSequenceNumber => { - (data.last_epoch_start_cp, data.next_epoch_start_cp) + Some((data.last_epoch_start_cp, data.next_epoch_start_cp)) } PgPartitionStrategy::TxSequenceNumber => { - (data.last_epoch_start_tx, data.next_epoch_start_tx) + Some((data.last_epoch_start_tx, data.next_epoch_start_tx)) } + PgPartitionStrategy::ObjectId => None, } } @@ -173,7 +185,9 @@ impl PgPartitionManager { last_partition: u64, data: &EpochPartitionData, ) -> Result<(), IndexerError> { - let partition_range = self.determine_partition_range(&table, data); + let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else { + return Ok(()); + }; if data.next_epoch == 0 { tracing::info!("Epoch 0 partition has been created in the initial setup."); return Ok(());