diff --git a/Cargo.toml b/Cargo.toml index 5c156edefb..2591c84f0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,3 +83,6 @@ path = "tests/lib.rs" [build-dependencies] pulldown-cmark = "0.9.2" + +[features] +rollback = [] diff --git a/src/index.rs b/src/index.rs index 7ccdd174c3..29baefb060 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,5 +1,6 @@ use crate::brc20::ledger::LedgerRead; use crate::brc20::ScriptKey; + use { self::{ entry::{ @@ -25,6 +26,9 @@ mod fetcher; mod rtx; mod updater; +use once_cell::sync::OnceCell; +use redb::Savepoint; + const SCHEMA_VERSION: u64 = 3; macro_rules! define_table { @@ -45,6 +49,12 @@ define_table! { SAT_TO_SATPOINT, u64, &SatPointValue } define_table! { STATISTIC_TO_COUNT, u64, u64 } define_table! { WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP, u64, u128 } +pub(crate) struct HeightSavepoint(pub u64, pub Savepoint); + +pub(crate) static mut GLOBAL_SAVEPOINTS: OnceCell> = OnceCell::new(); + +pub(crate) const SAVEPOINT_INTERVAL: u64 = 6; + pub(crate) struct Index { client: Client, database: Database, @@ -372,6 +382,29 @@ impl Index { self.reorged.load(atomic::Ordering::Relaxed) } + pub(crate) fn reset_reorged(&self) { + self.reorged.store(false, atomic::Ordering::Relaxed); + } + + pub(crate) unsafe fn backup_at_init(&self) -> Result { + let height = self.height().unwrap().unwrap_or(Height(0)).n(); + GLOBAL_SAVEPOINTS.get_or_init(|| VecDeque::new()); + let wtx = self.begin_write()?; + let sp = wtx.savepoint()?; + GLOBAL_SAVEPOINTS + .get_mut() + .unwrap() + .push_back(HeightSavepoint(height, sp)); + wtx.commit()?; + Ok(()) + } + + pub(crate) fn restore_savepoint(&self, sp: &Savepoint) -> Result { + let mut wtx = self.begin_write()?; + wtx.restore_savepoint(sp)?; + Ok(()) + } + fn begin_read(&self) -> Result { Ok(rtx::Rtx(self.database.begin_read()?)) } diff --git a/src/index/updater.rs b/src/index/updater.rs index d9be725176..e352e71d4c 100644 --- a/src/index/updater.rs +++ b/src/index/updater.rs @@ -9,6 +9,8 @@ use { tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender}, }; +use crate::index::{GLOBAL_SAVEPOINTS, SAVEPOINT_INTERVAL}; + mod inscription_updater; struct BlockData { @@ -136,6 +138,40 @@ impl Updater { uncommitted += 1; + #[cfg(feature = "rollback")] + { + // fast sync mode means no less than 18 blocks behind to the latest height + let is_fast_sync = { + if let Ok(count) = index.client.get_block_count() { + if count <= self.height + 3 * SAVEPOINT_INTERVAL { + false + } else { + true + } + } else { + log::warn!("Failed to fetch latest block height"); + true + } + }; + // make savepoint every 6 block + // commit must be done before making savepoint + // do not make savepoint in fast sync mode + if !is_fast_sync && self.height % SAVEPOINT_INTERVAL == 0 { + self.commit(wtx, value_cache)?; + value_cache = HashMap::new(); + uncommitted = 0; + wtx = index.begin_write()?; + let sp = wtx.savepoint()?; + unsafe { + let savepoints = GLOBAL_SAVEPOINTS.get_mut().unwrap(); + savepoints.push_back(HeightSavepoint(self.height, sp)); + if savepoints.len() > 2 { + drop(savepoints.pop_front().unwrap().1); + } + } + } + } + if uncommitted == 5000 { self.commit(wtx, value_cache)?; value_cache = HashMap::new(); diff --git a/src/subcommand/server.rs b/src/subcommand/server.rs index 59baf3292c..1dcbd1f8ee 100644 --- a/src/subcommand/server.rs +++ b/src/subcommand/server.rs @@ -37,12 +37,15 @@ use { }; mod api; + mod error; mod response; use self::api::*; use self::response::ApiResponse; +use crate::index::{GLOBAL_SAVEPOINTS, SAVEPOINT_INTERVAL}; + enum BlockQuery { Height(u64), Hash(BlockHash), @@ -130,12 +133,42 @@ pub(crate) struct Server { impl Server { pub(crate) fn run(self, options: Options, index: Arc, handle: Handle) -> Result { + #[cfg(feature = "rollback")] + unsafe { + index.backup_at_init()?; + } + Runtime::new()?.block_on(async { let clone = index.clone(); thread::spawn(move || loop { if let Err(error) = clone.update() { log::warn!("{error}"); + + #[cfg(feature = "rollback")] + if clone.is_reorged() { + let height = clone.height().unwrap().unwrap().n(); + unsafe { + loop { + let hsp = GLOBAL_SAVEPOINTS + .get() + .unwrap() + .back() + .expect("savepoint not found"); + if hsp.0 + SAVEPOINT_INTERVAL <= height + || GLOBAL_SAVEPOINTS.get().unwrap().len() == 1 + { + clone + .restore_savepoint(&hsp.1) + .expect("restore savepoint error"); + break; + } + drop(GLOBAL_SAVEPOINTS.get_mut().unwrap().pop_back().unwrap().1) + } + } + clone.reset_reorged() + } } + thread::sleep(Duration::from_millis(5000)); });