From 7614f05574d2d45eb07b50fa843f698ca14332e1 Mon Sep 17 00:00:00 2001
From: Ashwin Sekar
Date: Wed, 7 Feb 2024 04:55:36 +0000
Subject: [PATCH] blockstore: atomize slot clearing, relax parent slot meta
 check

clear_unconfirmed_slot can leave the blockstore in an irrecoverable
state if it panics in the middle. Write batch this function, so that
any errors can be recovered after restart.

Additionally, relax the constraint that the parent slot meta must
exist, as it could have been cleaned up if outdated.
---
 ledger/src/blockstore.rs                  |  48 ++---
 ledger/src/blockstore/blockstore_purge.rs | 228 ++++++++++++++++------
 2 files changed, 181 insertions(+), 95 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 8b964e5b3ce9a1..98af01291d4fdb 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1111,9 +1111,8 @@ impl Blockstore {
         self.completed_slots_senders.lock().unwrap().clear();
     }
 
-    /// Range-delete all entries which prefix matches the specified `slot`,
-    /// remove `slot` its' parents SlotMeta next_slots list, and
-    /// clear `slot`'s SlotMeta (except for next_slots).
+    /// Clear `slot` from the Blockstore, see `Blockstore::purge_slot_cleanup_chaining`
+    /// for more details.
     ///
     /// This function currently requires `insert_shreds_lock`, as both
     /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()`
@@ -1121,40 +1120,19 @@ impl Blockstore {
     /// family.
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
         let _lock = self.insert_shreds_lock.lock().unwrap();
-        if let Some(mut slot_meta) = self
-            .meta(slot)
-            .expect("Couldn't fetch from SlotMeta column family")
-        {
-            // Clear all slot related information
-            self.run_purge(slot, slot, PurgeType::PrimaryIndex)
-                .expect("Purge database operations failed");
-
-            // Clear this slot as a next slot from parent
-            if let Some(parent_slot) = slot_meta.parent_slot {
-                let mut parent_slot_meta = self
-                    .meta(parent_slot)
-                    .expect("Couldn't fetch from SlotMeta column family")
-                    .expect("Unconfirmed slot should have had parent slot set");
-                // .retain() is a linear scan; however, next_slots should
-                // only contain several elements so this isn't so bad
-                parent_slot_meta
-                    .next_slots
-                    .retain(|&next_slot| next_slot != slot);
-                self.meta_cf
-                    .put(parent_slot, &parent_slot_meta)
-                    .expect("Couldn't insert into SlotMeta column family");
-            }
-            // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots`
-            // field.
-            slot_meta.clear_unconfirmed_slot();
-            self.meta_cf
-                .put(slot, &slot_meta)
-                .expect("Couldn't insert into SlotMeta column family");
-        } else {
-            error!(
+        // Purge the slot and insert an empty `SlotMeta` with only the `next_slots`
+        // field preserved. Shreds inherently contain the slot of their parent, which
+        // updates the parent's `next_slots` when the child is inserted through
+        // `Blockstore::handle_chaining()`. However, we are only purging and repairing
+        // the parent slot here. Since the child will not be reinserted, the chaining
+        // will be lost. In order for bank forks discovery to ingest the child, we
+        // must retain the chain by preserving `next_slots`.
+        match self.purge_slot_cleanup_chaining(slot) {
+            Ok(_) => {}
+            Err(BlockstoreError::SlotUnavailable) => error!(
                 "clear_unconfirmed_slot() called on slot {} with no SlotMeta",
                 slot
-            );
+            ),
+            Err(e) => panic!("Purge database operations failed {}", e),
         }
     }
 
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 92f9453eabb6ed..7a107afc5d0a1c 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -129,6 +129,7 @@ impl Blockstore {
         }
     }
 
+    #[cfg(test)]
     pub(crate) fn run_purge(
         &self,
         from_slot: Slot,
@@ -138,11 +139,61 @@ impl Blockstore {
         self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default())
     }
 
+    /// Purges all columns relating to `slot`.
+    ///
+    /// Additionally, we clean up the parent of `slot` by clearing `slot` from
+    /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot`
+    /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is
+    /// replayable upon repair of `slot`.
+    pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> {
+        let Some(mut slot_meta) = self.meta(slot)? else {
+            return Err(BlockstoreError::SlotUnavailable);
+        };
+        let mut write_batch = self.db.batch()?;
+
+        let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?;
+
+        if let Some(parent_slot) = slot_meta.parent_slot {
+            let parent_slot_meta = self.meta(parent_slot)?;
+            if let Some(mut parent_slot_meta) = parent_slot_meta {
+                // .retain() is a linear scan; however, next_slots should
+                // only contain several elements so this isn't so bad
+                parent_slot_meta
+                    .next_slots
+                    .retain(|&next_slot| next_slot != slot);
+                write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?;
+            } else {
+                error!(
+                    "Parent slot meta {} for child {} is missing or cleaned up.
+                     Falling back to orphan repair to remedy the situation",
+                    parent_slot, slot
+                );
+            }
+        }
+
+        // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field.
+        slot_meta.clear_unconfirmed_slot();
+        write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?;
+
+        if let Err(e) = self.db.write(write_batch) {
+            error!(
+                "Error: {:?} while submitting write batch for slot {:?}",
+                e, slot,
+            );
+            return Err(e);
+        }
+        Ok(columns_purged)
+    }
+
     /// A helper function to `purge_slots` that executes the ledger clean up.
     /// The cleanup applies to \[`from_slot`, `to_slot`\].
     ///
     /// When `from_slot` is 0, any sst-file with a key-range completely older
     /// than `to_slot` will also be deleted.
+    ///
+    /// Note: slots > `to_slot` that chained to a purged slot are not properly
+    /// cleaned up. This function is not intended to be used if such slots need
+    /// to be replayed.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -150,84 +201,130 @@ impl Blockstore {
         purge_type: PurgeType,
         purge_stats: &mut PurgeStats,
     ) -> Result<bool> {
-        let mut write_batch = self
-            .db
-            .batch()
-            .expect("Database Error: Failed to get write batch");
+        let mut write_batch = self.db.batch()?;
+
         let mut delete_range_timer = Measure::start("delete_range");
+        let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?;
+        delete_range_timer.stop();
+
+        let mut write_timer = Measure::start("write_batch");
+        if let Err(e) = self.db.write(write_batch) {
+            error!(
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot,
+            );
+            return Err(e);
+        }
+        write_timer.stop();
+
+        let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
+        // purge_files_in_range deletes any files whose slot range is within
+        // [from_slot, to_slot]. When from_slot is 0, it is safe to run
+        // purge_files_in_range because if purge_files_in_range deletes any
+        // sst file that contains any range-deletion tombstone, the deletion
+        // range of that tombstone will be completely covered by the new
+        // range-delete tombstone (0, to_slot) issued above.
+        //
+        // On the other hand, purge_files_in_range is more effective and
+        // efficient than the compaction filter (which runs key-by-key)
+        // because all the sst files that have key range below to_slot
+        // can be deleted immediately.
+        if columns_purged && from_slot == 0 {
+            self.purge_files_in_range(from_slot, to_slot);
+        }
+        purge_files_in_range_timer.stop();
+
+        purge_stats.delete_range += delete_range_timer.as_us();
+        purge_stats.write_batch += write_timer.as_us();
+        purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();
+
+        // only drop w_active_transaction_status_index after we do db.write(write_batch);
+        // otherwise, readers might be confused with inconsistent state between
+        // self.active_transaction_status_index and RocksDB's TransactionStatusIndex contents
+        Ok(columns_purged)
+    }
+
+    fn purge_range(
+        &self,
+        write_batch: &mut WriteBatch,
+        from_slot: Slot,
+        to_slot: Slot,
+        purge_type: PurgeType,
+    ) -> Result<bool> {
+        let mut w_active_transaction_status_index =
+            self.active_transaction_status_index.write().unwrap();
         let mut columns_purged = self
             .db
-            .delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
+            .delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot)
             .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::BlockHeight>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot)
                 .is_ok()
             & self
                 .db
-                .delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot)
+                .delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot)
                 .is_ok();
-        let mut w_active_transaction_status_index =
-            self.active_transaction_status_index.write().unwrap();
+
         match purge_type {
             PurgeType::Exact => {
-                self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?;
+                self.purge_special_columns_exact(write_batch, from_slot, to_slot)?;
             }
             PurgeType::PrimaryIndex => {
                 self.purge_special_columns_with_primary_index(
-                    &mut write_batch,
+                    write_batch,
                     &mut columns_purged,
                     &mut w_active_transaction_status_index,
                     to_slot,
@@ -241,43 +338,7 @@ impl Blockstore {
                 // in no spiky periodic huge delete_range for them.
             }
         }
-        delete_range_timer.stop();
-
-        let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
-            error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
-        write_timer.stop();
-
-        let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
-        // purge_files_in_range delete any files whose slot range is within
-        // [from_slot, to_slot]. When from_slot is 0, it is safe to run
-        // purge_files_in_range because if purge_files_in_range deletes any
-        // sst file that contains any range-deletion tombstone, the deletion
-        // range of that tombstone will be completely covered by the new
-        // range-delete tombstone (0, to_slot) issued above.
-        //
-        // On the other hand, purge_files_in_range is more effective and
-        // efficient than the compaction filter (which runs key-by-key)
-        // because all the sst files that have key range below to_slot
-        // can be deleted immediately.
-        if columns_purged && from_slot == 0 {
-            self.purge_files_in_range(from_slot, to_slot);
-        }
-        purge_files_in_range_timer.stop();
-
-        purge_stats.delete_range += delete_range_timer.as_us();
-        purge_stats.write_batch += write_timer.as_us();
-        purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();
-        // only drop w_active_transaction_status_index after we do db.write(write_batch);
-        // otherwise, readers might be confused with inconsistent state between
-        // self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
-        drop(w_active_transaction_status_index);
         Ok(columns_purged)
     }
 
@@ -1198,4 +1259,51 @@ pub mod tests {
             .purge_special_columns_exact(&mut write_batch, slot, slot + 1)
             .unwrap();
     }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        assert!(matches!(
+            blockstore.purge_slot_cleanup_chaining(11).unwrap_err(),
+            BlockstoreError::SlotUnavailable
+        ));
+    }
+
+    #[test]
+    fn test_purge_slot_cleanup_chaining() {
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let (shreds, _) = make_many_slot_entries(0, 10, 5);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+        let (slot_11, _) = make_slot_entries(11, 4, 5, true);
+        blockstore.insert_shreds(slot_11, None, false).unwrap();
+        let (slot_12, _) = make_slot_entries(12, 5, 5, true);
+        blockstore.insert_shreds(slot_12, None, false).unwrap();
+
+        blockstore.purge_slot_cleanup_chaining(5).unwrap();
+
+        let slot_meta = blockstore.meta(5).unwrap().unwrap();
+        let expected_slot_meta = SlotMeta {
+            slot: 5,
+            // Only the next_slots should be preserved
+            next_slots: vec![6, 12],
+            ..SlotMeta::default()
+        };
+        assert_eq!(slot_meta, expected_slot_meta);
+
+        let parent_slot_meta = blockstore.meta(4).unwrap().unwrap();
+        assert_eq!(parent_slot_meta.next_slots, vec![11]);
+
+        let child_slot_meta = blockstore.meta(6).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+
+        let child_slot_meta = blockstore.meta(12).unwrap().unwrap();
+        assert_eq!(child_slot_meta.parent_slot.unwrap(), 5);
+    }
 }
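
Reviewer note: the core idea of this patch is that every mutation involved in clearing a slot (the per-column purges, the parent's `next_slots` fix-up, and the reinsertion of the cleared `SlotMeta`) is staged in a single RocksDB `WriteBatch` and committed with one `db.write()`, whereas the old code issued independent `put()` calls that could panic partway through. Below is a minimal, self-contained sketch of that pattern using the plain `rocksdb` crate directly; the flat key scheme, placeholder values, and the `clear_slot_atomically` helper are hypothetical stand-ins for illustration, not the Blockstore's actual column-family API.

```rust
use rocksdb::{WriteBatch, DB};

/// Hypothetical helper: stage purge + chaining fix-up + meta reinsertion,
/// then commit them as one atomic RocksDB write.
fn clear_slot_atomically(db: &DB, slot: u64, parent: u64) -> Result<(), rocksdb::Error> {
    let mut batch = WriteBatch::default();

    // 1) Stage deletion of every key belonging to `slot`
    //    (stand-in for the per-column `delete_range_cf` calls in `purge_range`).
    batch.delete(format!("shred_data/{slot}"));
    batch.delete(format!("dead_slots/{slot}"));

    // 2) Stage the parent fix-up: `slot` dropped from the parent's next_slots.
    batch.put(
        format!("slot_meta/{parent}"),
        b"parent meta with slot removed from next_slots",
    );

    // 3) Stage the orphaned meta for `slot`, keeping only next_slots so the
    //    fork stays discoverable by bank forks after repair.
    batch.put(
        format!("slot_meta/{slot}"),
        b"cleared meta with only next_slots preserved",
    );

    // Single atomic commit: either every staged mutation lands or none do.
    // A panic or crash anywhere above leaves the database untouched.
    db.write(batch)
}

fn main() -> Result<(), rocksdb::Error> {
    let db = DB::open_default("/tmp/atomic-purge-demo")?;
    clear_slot_atomically(&db, 5, 4)
}
```

With this shape there is no window where the purge has landed but the chaining fix has not: a crash before `db.write()` leaves the old state intact, and a crash after it leaves the fully cleared state, which is exactly the recoverability property the commit message claims for `purge_slot_cleanup_chaining`.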