Skip to content

Commit

Permalink
[indexer] ignore tables not partitioned by epoch at epoch change (#18410
Browse files Browse the repository at this point in the history
)

If a table is partitioned by not by epochs, we shouldn't do anything to
it when advancing epochs.

Tested locally.

---

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
  • Loading branch information
emmazzz committed Aug 3, 2024
1 parent 1d6d789 commit aa76e71
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,14 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
let table_partitions = self.partition_manager.get_table_partitions()?;
for (table, (_, last_partition)) in table_partitions {
// Only advance epoch partition for epoch partitioned tables.
if !self
.partition_manager
.get_strategy(&table)
.is_epoch_partitioned()
{
continue;
}
let guard = self.metrics.advance_epoch_latency.start_timer();
self.partition_manager.advance_epoch(
table.clone(),
Expand Down
24 changes: 19 additions & 5 deletions crates/sui-indexer/src/store/pg_partition_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ impl<T: R2D2Connection> Clone for PgPartitionManager<T> {
pub enum PgPartitionStrategy {
CheckpointSequenceNumber,
TxSequenceNumber,
ObjectId,
}

impl PgPartitionStrategy {
pub fn is_epoch_partitioned(&self) -> bool {
matches!(
self,
Self::CheckpointSequenceNumber | Self::TxSequenceNumber
)
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -102,6 +112,7 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
let mut partition_strategies = HashMap::new();
partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
let manager = Self {
cp,
partition_strategies,
Expand Down Expand Up @@ -152,18 +163,19 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
.unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
}

pub fn determine_partition_range(
pub fn determine_epoch_partition_range(
&self,
table_name: &str,
data: &EpochPartitionData,
) -> (u64, u64) {
) -> Option<(u64, u64)> {
match self.get_strategy(table_name) {
PgPartitionStrategy::CheckpointSequenceNumber => {
(data.last_epoch_start_cp, data.next_epoch_start_cp)
Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
}
PgPartitionStrategy::TxSequenceNumber => {
(data.last_epoch_start_tx, data.next_epoch_start_tx)
Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
}
PgPartitionStrategy::ObjectId => None,
}
}

Expand All @@ -173,7 +185,9 @@ impl<T: R2D2Connection> PgPartitionManager<T> {
last_partition: u64,
data: &EpochPartitionData,
) -> Result<(), IndexerError> {
let partition_range = self.determine_partition_range(&table, data);
let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
return Ok(());
};
if data.next_epoch == 0 {
tracing::info!("Epoch 0 partition has been created in the initial setup.");
return Ok(());
Expand Down

0 comments on commit aa76e71

Please sign in to comment.