diff --git a/src/height.rs b/src/height.rs index e2a3e20749..2bae1d0d6a 100644 --- a/src/height.rs +++ b/src/height.rs @@ -46,6 +46,12 @@ impl PartialEq for Height { } } +impl From for Height { + fn from(value: u64) -> Self { + Height(value) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/index.rs b/src/index.rs index b2b208c7c7..50c9b77d1c 100644 --- a/src/index.rs +++ b/src/index.rs @@ -221,11 +221,7 @@ impl Index { let mut tx = database.begin_write()?; - if cfg!(test) { - tx.set_durability(redb::Durability::None); - } else { - tx.set_durability(redb::Durability::Immediate); - }; + tx.set_durability(redb::Durability::Immediate); tx.open_table(HEIGHT_TO_BLOCK_HASH)?; tx.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?; @@ -396,7 +392,9 @@ impl Index { } pub(crate) fn update(&self) -> Result { - Updater::update(self) + let mut updater = Updater::new(self)?; + + updater.update_index() } pub(crate) fn export(&self, filename: &String, include_addresses: bool) -> Result { @@ -473,13 +471,7 @@ impl Index { } fn begin_write(&self) -> Result { - if cfg!(test) { - let mut tx = self.database.begin_write()?; - tx.set_durability(redb::Durability::None); - Ok(tx) - } else { - Ok(self.database.begin_write()?) - } + Ok(self.database.begin_write()?) } fn increment_statistic(wtx: &WriteTransaction, statistic: Statistic, n: u64) -> Result { @@ -3210,4 +3202,41 @@ mod tests { ) } } + + #[test] + fn recover_from_reorg() { + for context in Context::configurations() { + context.mine_blocks(1); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(1, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + + let first = InscriptionId { txid, index: 0 }; + let first_location = SatPoint { + outpoint: OutPoint { txid, vout: 0 }, + offset: 0, + }; + + context.mine_blocks(6); + + let txid = context.rpc_server.broadcast_tx(TransactionTemplate { + inputs: &[(2, 0, 0)], + witness: inscription("text/plain;charset=utf-8", "hello").to_witness(), + ..Default::default() + }); + + let _second = InscriptionId { txid, index: 0 }; + + context.mine_blocks(1); + + context.rpc_server.invalidate_tip(); + + context + .index + .assert_inscription_location(first, first_location, None); + } + } } diff --git a/src/index/updater.rs b/src/index/updater.rs index 7b78fb435e..10a7213281 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -29,18 +29,24 @@ impl From for BlockData { } } -pub(crate) struct Updater { +pub(crate) struct Updater<'index> { range_cache: HashMap>, height: u64, + index: &'index Index, index_sats: bool, + initial_sync: bool, sat_ranges_since_flush: u64, + savepoints: VecDeque, outputs_cached: u64, outputs_inserted_since_flush: u64, outputs_traversed: u64, + outpoint_sender: Sender, + value_receiver: Receiver, + value_cache: HashMap, } -impl Updater { - pub(crate) fn update(index: &Index) -> Result { +impl<'index> Updater<'_> { + pub(crate) fn new(index: &'index Index) -> Result> { let wtx = index.begin_write()?; let height = wtx @@ -51,35 +57,31 @@ impl Updater { .map(|(height, _hash)| height.value() + 1) .unwrap_or(0); - wtx - .open_table(WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP)? - .insert( - &height, - &SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|duration| duration.as_millis()) - .unwrap_or(0), - )?; - - let mut updater = Self { + wtx.commit(); + + let (outpoint_sender, value_receiver) = Self::spawn_fetcher(index)?; + + Ok(Updater { range_cache: HashMap::new(), height, + index, index_sats: index.has_sat_index()?, + initial_sync: true, sat_ranges_since_flush: 0, + savepoints: VecDeque::with_capacity(6), outputs_cached: 0, outputs_inserted_since_flush: 0, outputs_traversed: 0, - }; - - updater.update_index(index, wtx) + outpoint_sender, + value_receiver, + value_cache: HashMap::new(), + }) } - fn update_index<'index>( - &mut self, - index: &'index Index, - mut wtx: WriteTransaction<'index>, - ) -> Result { - let starting_height = index.client.get_block_count()? + 1; + pub(crate) fn update_index(&mut self) -> Result { + let mut wtx = self.index.begin_write()?; + + let starting_height = self.index.client.get_block_count()? + 1; let mut progress_bar = if cfg!(test) || log_enabled!(log::Level::Info) @@ -96,27 +98,46 @@ impl Updater { Some(progress_bar) }; - let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?; - - let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?; + let rx = Self::fetch_blocks_from(self.index, self.height, self.index_sats)?; let mut uncommitted = 0; - let mut value_cache = HashMap::new(); while let Ok(block) = rx.recv() { - self.index_block( - index, - &mut outpoint_sender, - &mut value_receiver, - &mut wtx, - block, - &mut value_cache, - )?; + if self.index.client.get_block_count()? - self.index.block_height()?.unwrap_or(Height(0)).n() + < 50 + { + log::info!("not in initial sync; starting to create savepoints"); + self.initial_sync = false; + + if self.savepoints.len() >= 6 { + dbg!("here"); + wtx.delete_persistent_savepoint(self.savepoints.pop_front().unwrap())?; + } + + let savepoints: Vec = wtx.list_persistent_savepoints()?.into_iter().collect(); + dbg!(&savepoints); + // self.savepoints.push_back(wtx.ephemeral_savepoint()?); + + if let Some(prev_height) = self.height.checked_sub(1) { + let prev_hash = self.index.client.get_block_hash(prev_height)?; + dbg!(&prev_hash); + if prev_hash != block.header.prev_blockhash { + log::info!("reorg detected at or before {prev_height}; rolling back index"); + self.index.reorged.store(true, atomic::Ordering::Relaxed); + + let savepoint = wtx.get_persistent_savepoint(self.savepoints.pop_front().unwrap())?; + wtx.restore_savepoint(&savepoint)?; + self.savepoints.clear(); + } + } + } + + self.index_block(&mut wtx, block)?; if let Some(progress_bar) = &mut progress_bar { progress_bar.inc(1); if progress_bar.position() > progress_bar.length().unwrap() { - if let Ok(count) = index.client.get_block_count() { + if let Ok(count) = self.index.client.get_block_count() { progress_bar.set_length(count + 1); } else { log::warn!("Failed to fetch latest block height"); @@ -127,10 +148,10 @@ impl Updater { uncommitted += 1; if uncommitted == 5000 { - self.commit(wtx, value_cache)?; - value_cache = HashMap::new(); + self.commit(wtx)?; + self.value_cache = HashMap::new(); uncommitted = 0; - wtx = index.begin_write()?; + wtx = self.index.begin_write()?; let height = wtx .open_table(HEIGHT_TO_BLOCK_HASH)? .range(0..)? @@ -160,7 +181,7 @@ impl Updater { } if uncommitted > 0 { - self.commit(wtx, value_cache)?; + self.commit(wtx)?; } if let Some(progress_bar) = &mut progress_bar { @@ -321,24 +342,16 @@ impl Updater { Ok((outpoint_sender, value_receiver)) } - fn index_block( - &mut self, - index: &Index, - outpoint_sender: &mut Sender, - value_receiver: &mut Receiver, - wtx: &mut WriteTransaction, - block: BlockData, - value_cache: &mut HashMap, - ) -> Result<()> { + fn index_block(&mut self, wtx: &mut WriteTransaction, block: BlockData) -> Result<()> { // If value_receiver still has values something went wrong with the last block // Could be an assert, shouldn't recover from this and commit the last block - let Err(TryRecvError::Empty) = value_receiver.try_recv() else { + let Err(TryRecvError::Empty) = self.value_receiver.try_recv() else { return Err(anyhow!("Previous block did not consume all input values")); }; let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?; - let index_inscriptions = self.height >= index.first_inscription_height; + let index_inscriptions = self.height >= self.index.first_inscription_height; if index_inscriptions { // Send all missing input outpoints to be fetched right away @@ -360,7 +373,7 @@ impl Updater { continue; } // We don't need input values we already have in our value_cache from earlier blocks - if value_cache.contains_key(&prev_output) { + if self.value_cache.contains_key(&prev_output) { continue; } // We don't need input values we already have in our outpoint_to_value table from earlier blocks that @@ -369,7 +382,7 @@ impl Updater { continue; } // We don't know the value of this tx input. Send this outpoint to background thread to be fetched - outpoint_sender.blocking_send(prev_output)?; + self.outpoint_sender.blocking_send(prev_output)?; } } } @@ -393,7 +406,7 @@ impl Updater { let prev_hash = height_to_block_hash.get(&prev_height)?.unwrap(); if prev_hash.value() != &block.header.prev_blockhash.as_raw_hash().to_byte_array() { - index.reorged.store(true, atomic::Ordering::Relaxed); + self.index.reorged.store(true, atomic::Ordering::Relaxed); return Err(anyhow!("reorg detected at or before {prev_height}")); } } @@ -421,7 +434,7 @@ impl Updater { let mut inscription_updater = InscriptionUpdater::new( self.height, &mut inscription_id_to_satpoint, - value_receiver, + &mut self.value_receiver, &mut inscription_id_to_inscription_entry, lost_sats, &mut inscription_number_to_inscription_id, @@ -431,7 +444,7 @@ impl Updater { &mut satpoint_to_inscription_id, block.header.time, unbound_inscriptions, - value_cache, + &mut self.value_cache, )?; if self.index_sats { @@ -471,7 +484,6 @@ impl Updater { input_sat_ranges.push_back(SatRange::load(chunk.try_into().unwrap())); } } - self.index_transaction_sats( tx, *txid, @@ -616,7 +628,7 @@ impl Updater { Ok(()) } - fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap) -> Result { + fn commit(&mut self, wtx: WriteTransaction) -> Result { log::info!( "Committing at block height {}, {} outputs traversed, {} in map, {} cached", self.height, @@ -645,8 +657,8 @@ impl Updater { { let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?; - for (outpoint, value) in value_cache { - outpoint_to_value.insert(&outpoint.store(), &value)?; + for (outpoint, value) in &self.value_cache { + outpoint_to_value.insert(&outpoint.store(), value)?; } }