From cefe9fdf70d42ec5607269df813a6d0436fec4c9 Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Mon, 4 Dec 2023 17:15:25 +1100
Subject: [PATCH] Restore crash safety for database pruning (#4975)

* Add some DB sanity checks

* Restore crash safety for database pruning
---
 beacon_node/beacon_chain/src/builder.rs | 12 +----
 beacon_node/beacon_chain/src/migrate.rs | 39 ++++++++++------
 beacon_node/store/src/hot_cold_store.rs | 62 ++++++++++++-------------
 database_manager/src/lib.rs             | 21 ++-------
 4 files changed, 59 insertions(+), 75 deletions(-)

diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs
index ad6af824a22..dab096ad435 100644
--- a/beacon_node/beacon_chain/src/builder.rs
+++ b/beacon_node/beacon_chain/src/builder.rs
@@ -34,7 +34,7 @@ use std::time::Duration;
 use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
 use task_executor::{ShutdownReason, TaskExecutor};
 use types::{
-    BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, Signature,
+    BeaconBlock, BeaconState, ChainSpec, Epoch, EthSpec, Graffiti, Hash256, Signature,
     SignedBeaconBlock, Slot,
 };
 
@@ -559,16 +559,6 @@ where
                 .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
         );
 
-        // Store pruning checkpoint to prevent attempting to prune before the anchor state.
-        self.pending_io_batch.push(
-            store
-                .pruning_checkpoint_store_op(Checkpoint {
-                    root: weak_subj_block_root,
-                    epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
-                })
-                .map_err(|e| format!("{:?}", e))?,
-        );
-
         let snapshot = BeaconSnapshot {
             beacon_block_root: weak_subj_block_root,
             beacon_block: Arc::new(weak_subj_block),
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 44763f31e26..81b413e42f4 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -512,13 +512,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
     ) -> Result<PruningOutcome, BeaconChainError> {
-        let old_finalized_checkpoint =
-            store
-                .load_pruning_checkpoint()?
-                .unwrap_or_else(|| Checkpoint {
-                    epoch: Epoch::new(0),
-                    root: Hash256::zero(),
-                });
+        let old_finalized_checkpoint = store.get_pruning_checkpoint();
 
         let old_finalized_slot = old_finalized_checkpoint
             .epoch
@@ -572,6 +566,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                 .collect::<Result<_, _>>()?;
 
+        // Quick sanity check. If the canonical block & state roots are incorrect then we could
+        // incorrectly delete canonical states, which would corrupt the database.
+        let expected_canonical_block_roots = new_finalized_slot
+            .saturating_sub(old_finalized_slot)
+            .as_usize()
+            .saturating_add(1);
+        if newly_finalized_chain.len() != expected_canonical_block_roots {
+            return Err(BeaconChainError::DBInconsistent(format!(
+                "canonical chain iterator is corrupt; \
+                 expected {} but got {} block roots",
+                expected_canonical_block_roots,
+                newly_finalized_chain.len()
+            )));
+        }
+
         // We don't know which blocks are shared among abandoned chains, so we buffer and delete
         // everything in one fell swoop.
         let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
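
The sanity check added above is plain slot arithmetic: iterating block roots from the new
finalized slot back to the old finalized slot, inclusive of both endpoints, must yield exactly
new - old + 1 entries. Below is a standalone sketch of that invariant using bare u64 slots in
place of Lighthouse's Slot/Epoch types; the function names are illustrative, not the PR's code.

    /// Number of block roots on the canonical chain between the old and new finalized slots,
    /// counting both endpoints.
    fn expected_canonical_block_roots(old_finalized_slot: u64, new_finalized_slot: u64) -> usize {
        (new_finalized_slot.saturating_sub(old_finalized_slot) as usize).saturating_add(1)
    }

    fn check_newly_finalized_chain_len(
        chain_len: usize,
        old_finalized_slot: u64,
        new_finalized_slot: u64,
    ) -> Result<(), String> {
        let expected = expected_canonical_block_roots(old_finalized_slot, new_finalized_slot);
        if chain_len != expected {
            // Mirrors the `DBInconsistent` error returned in the hunk above.
            return Err(format!(
                "canonical chain iterator is corrupt; expected {expected} but got {chain_len} block roots"
            ));
        }
        Ok(())
    }

    #[test]
    fn finalization_advances_one_epoch() {
        // Finalization moves from slot 64 to slot 96: 33 roots, counting both endpoints.
        assert_eq!(expected_canonical_block_roots(64, 96), 33);
        assert!(check_newly_finalized_chain_len(33, 64, 96).is_ok());
        assert!(check_newly_finalized_chain_len(32, 64, 96).is_err());
    }
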
@@ -735,11 +744,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
@@ ... @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                 "state_root" => ?state_root,
                 "slot" => summary.slot,
+                "reason" => reason,
             );
             state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
         }
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index c4c32fd7719..d796d06deba 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -10,9 +10,9 @@ use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
 use crate::leveldb_store::{BytesKey, LevelDB};
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
-    BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
-    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+    AnchorInfo, BlobInfo, CompactionTimestamp, SchemaVersion, ANCHOR_INFO_KEY, BLOB_INFO_KEY,
+    COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    STATE_UPPER_LIMIT_NO_RETAIN,
 };
 use crate::metrics;
 use crate::state_cache::{PutStateOutcome, StateCache};
@@ -77,6 +77,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     /// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
     block_cache: Mutex<BlockCache<E>>,
     /// Cache of beacon states.
+    ///
+    /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
     state_cache: Mutex<StateCache<E>>,
     /// Immutable validator cache.
     pub immutable_validators: Arc<RwLock<ValidatorPubkeyCache<E, Hot, Cold>>>,
@@ -2385,26 +2387,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.config.compact_on_prune
     }
 
-    /// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
-    pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
-        Ok(self
-            .hot_db
-            .get(&PRUNING_CHECKPOINT_KEY)?
-            .map(|pc: PruningCheckpoint| pc.checkpoint))
-    }
-
-    /// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
-    pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
-        self.hot_db
-            .do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)?])
-    }
-
-    /// Create a staged store for the pruning checkpoint.
-    pub fn pruning_checkpoint_store_op(
-        &self,
-        checkpoint: Checkpoint,
-    ) -> Result<KeyValueStoreOp, Error> {
-        PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
+    /// Get the checkpoint to begin pruning from (the "old finalized checkpoint").
+    pub fn get_pruning_checkpoint(&self) -> Checkpoint {
+        // Since tree-states, we infer the pruning checkpoint from the split, as this is simpler &
+        // safer in the presence of crashes that occur after pruning but before the split is
+        // updated.
+        // FIXME(sproul): ensure PRUNING_CHECKPOINT_KEY is deleted in the DB migration
+        let split = self.get_split_info();
+        Checkpoint {
+            epoch: split.slot.epoch(E::slots_per_epoch()),
+            root: split.block_root,
+        }
     }
 
     /// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
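
The new get_pruning_checkpoint above never reads a stored value: it simply reinterprets the
split point as a checkpoint, so pruning can never begin from a point the split has not yet
reached. A rough standalone sketch of that mapping follows, with simplified stand-ins for the
store's split metadata and for Checkpoint; the 32-slots-per-epoch constant and the field types
are assumptions made for illustration only.

    const SLOTS_PER_EPOCH: u64 = 32; // mainnet preset, assumed for the example

    /// Simplified stand-in for the store's split metadata.
    struct Split {
        slot: u64,
        block_root: [u8; 32],
    }

    /// Simplified stand-in for `types::Checkpoint`.
    struct Checkpoint {
        epoch: u64,
        root: [u8; 32],
    }

    /// Infer the pruning checkpoint from the split instead of persisting it separately, so a
    /// crash between pruning and the split update cannot leave the two out of sync.
    fn pruning_checkpoint_from_split(split: &Split) -> Checkpoint {
        Checkpoint {
            epoch: split.slot / SLOTS_PER_EPOCH,
            root: split.block_root,
        }
    }
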
@@ -2917,8 +2910,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
             store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
         }
 
-        // There are data dependencies between calls to `store_cold_state()` that prevent us from
-        // doing one big call to `store.cold_db.do_atomically()` at end of the loop.
+        // Cold states are diffed with respect to each other, so we need to finish writing previous
+        // states before storing new ones.
         store.cold_db.do_atomically(cold_db_ops)?;
     }
 
@@ -2927,15 +2920,20 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     // procedure.
     //
     // Since it is pretty much impossible to be atomic across more than one database, we trade
-    // losing track of states to delete, for consistency. In other words: We should be safe to die
-    // at any point below but it may happen that some states won't be deleted from the hot database
-    // and will remain there forever. Since dying in these particular few lines should be an
-    // exceedingly rare event, this should be an acceptable tradeoff.
+    // temporarily losing track of blocks to delete, for consistency. In other words: We should be
+    // safe to die at any point below but it may happen that some blocks won't be deleted from the
+    // hot database and will remain there forever. We may also temporarily abandon states, but
+    // they will get picked up by the state pruning that iterates over the whole column.
 
     // Flush to disk all the states that have just been migrated to the cold store.
     store.cold_db.do_atomically(cold_db_block_ops)?;
     store.cold_db.sync()?;
 
+    // Update the split.
+    //
+    // NOTE(sproul): We do this in its own fsync'd transaction mostly for historical reasons, but
+    // I'm scared to change it, because doing an fsync with *more data* while holding the split
+    // write lock might have terrible performance implications (jamming the split for 100-500ms+).
     {
         let mut split_guard = store.split.write();
         let latest_split_slot = split_guard.slot;
@@ -2966,13 +2964,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
         };
         store.hot_db.put_sync(&SPLIT_KEY, &split)?;
 
-        // Split point is now persisted in the hot database on disk.  The in-memory split point
-        // hasn't been modified elsewhere since we keep a write lock on it.  It's safe to update
+        // Split point is now persisted in the hot database on disk. The in-memory split point
+        // hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
         // the in-memory split point now.
         *split_guard = split;
     }
 
-    // Delete the states from the hot database if we got this far.
+    // Delete the blocks and states from the hot database if we got this far.
     store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
 
     // Update the cache's view of the finalized state.
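
Taken together, the migrate_database changes above enforce a strict write ordering across the
two databases: freezer writes are made durable first, then the split point is persisted, and
only then are the now-redundant hot entries deleted. The sketch below is a toy model of that
ordering and its crash-safety argument; the Db trait and the string-based ops are illustrative
stand-ins, not Lighthouse's real store API.

    /// Minimal stand-in for a key-value store with an fsync-style durability barrier.
    trait Db {
        fn write_batch(&mut self, ops: Vec<String>) -> Result<(), String>;
        fn sync(&mut self) -> Result<(), String>;
    }

    /// A crash after step 1 leaves duplicate but reachable data in the freezer; a crash after
    /// step 2 can leave stale entries in the hot DB (states are later swept up by state pruning,
    /// and leftover blocks are merely redundant); data is only deleted once steps 1-2 are durable.
    fn migrate<C: Db, H: Db>(
        cold_db: &mut C,
        hot_db: &mut H,
        cold_ops: Vec<String>,
        split_update: String,
        hot_deletes: Vec<String>,
    ) -> Result<(), String> {
        // 1. Copy finalized blocks and states into the freezer and fsync.
        cold_db.write_batch(cold_ops)?;
        cold_db.sync()?;

        // 2. Persist the new split point in its own fsync'd write, as in the patch.
        hot_db.write_batch(vec![split_update])?;
        hot_db.sync()?;

        // 3. Delete the migrated blocks and states from the hot DB last.
        hot_db.write_batch(hot_deletes)
    }
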
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index cf1069fd2a7..c33b9b0dca9 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -294,32 +294,17 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
 pub fn inspect_db<E: EthSpec>(
     inspect_config: InspectConfig,
     client_config: ClientConfig,
-    runtime_context: &RuntimeContext<E>,
-    log: Logger,
 ) -> Result<(), String> {
-    let spec = runtime_context.eth2_config.spec.clone();
     let hot_path = client_config.get_db_path();
     let cold_path = client_config.get_freezer_db_path();
-    let blobs_path = client_config.get_blobs_db_path();
-
-    let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
-        &hot_path,
-        &cold_path,
-        &blobs_path,
-        |_, _, _| Ok(()),
-        client_config.store,
-        spec,
-        log,
-    )
-    .map_err(|e| format!("{:?}", e))?;
 
     let mut total = 0;
     let mut num_keys = 0;
 
     let sub_db = if inspect_config.freezer {
-        &db.cold_db
+        LevelDB::<E>::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))?
     } else {
-        &db.hot_db
+        LevelDB::<E>::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))?
     };
 
     let skip = inspect_config.skip.unwrap_or(0);
@@ -653,7 +638,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
         }
         ("inspect", Some(cli_args)) => {
             let inspect_config = parse_inspect_config(cli_args)?;
-            inspect_db(inspect_config, client_config, &context, log)
+            inspect_db::<T>(inspect_config, client_config)
         }
         ("prune-payloads", Some(_)) => {
             prune_payloads(client_config, &context, log).map_err(format_err)