Skip to content

Commit

Permalink
Merge pull request #8 from okx/rollback
Browse files Browse the repository at this point in the history
support rollback by savepoint
  • Loading branch information
wanyvic authored May 9, 2023
2 parents 79c1c46 + 85b40ee commit 9ddc4bb
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,6 @@ path = "tests/lib.rs"

[build-dependencies]
pulldown-cmark = "0.9.2"

[features]
rollback = []
33 changes: 33 additions & 0 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::brc20::ledger::LedgerRead;
use crate::brc20::ScriptKey;

use {
self::{
entry::{
Expand All @@ -25,6 +26,9 @@ mod fetcher;
mod rtx;
mod updater;

use once_cell::sync::OnceCell;
use redb::Savepoint;

const SCHEMA_VERSION: u64 = 3;

macro_rules! define_table {
Expand All @@ -45,6 +49,12 @@ define_table! { SAT_TO_SATPOINT, u64, &SatPointValue }
define_table! { STATISTIC_TO_COUNT, u64, u64 }
define_table! { WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP, u64, u128 }

pub(crate) struct HeightSavepoint(pub u64, pub Savepoint);

pub(crate) static mut GLOBAL_SAVEPOINTS: OnceCell<VecDeque<HeightSavepoint>> = OnceCell::new();

pub(crate) const SAVEPOINT_INTERVAL: u64 = 6;

pub(crate) struct Index {
client: Client,
database: Database,
Expand Down Expand Up @@ -372,6 +382,29 @@ impl Index {
self.reorged.load(atomic::Ordering::Relaxed)
}

pub(crate) fn reset_reorged(&self) {
self.reorged.store(false, atomic::Ordering::Relaxed);
}

pub(crate) unsafe fn backup_at_init(&self) -> Result {
let height = self.height().unwrap().unwrap_or(Height(0)).n();
GLOBAL_SAVEPOINTS.get_or_init(|| VecDeque::new());
let wtx = self.begin_write()?;
let sp = wtx.savepoint()?;
GLOBAL_SAVEPOINTS
.get_mut()
.unwrap()
.push_back(HeightSavepoint(height, sp));
wtx.commit()?;
Ok(())
}

pub(crate) fn restore_savepoint(&self, sp: &Savepoint) -> Result {
let mut wtx = self.begin_write()?;
wtx.restore_savepoint(sp)?;
Ok(())
}

fn begin_read(&self) -> Result<rtx::Rtx> {
Ok(rtx::Rtx(self.database.begin_read()?))
}
Expand Down
36 changes: 36 additions & 0 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use {
tokio::sync::mpsc::{error::TryRecvError, Receiver, Sender},
};

use crate::index::{GLOBAL_SAVEPOINTS, SAVEPOINT_INTERVAL};

mod inscription_updater;

struct BlockData {
Expand Down Expand Up @@ -136,6 +138,40 @@ impl Updater {

uncommitted += 1;

#[cfg(feature = "rollback")]
{
// fast sync mode means no less than 18 blocks behind to the latest height
let is_fast_sync = {
if let Ok(count) = index.client.get_block_count() {
if count <= self.height + 3 * SAVEPOINT_INTERVAL {
false
} else {
true
}
} else {
log::warn!("Failed to fetch latest block height");
true
}
};
// make savepoint every 6 block
// commit must be done before making savepoint
// do not make savepoint in fast sync mode
if !is_fast_sync && self.height % SAVEPOINT_INTERVAL == 0 {
self.commit(wtx, value_cache)?;
value_cache = HashMap::new();
uncommitted = 0;
wtx = index.begin_write()?;
let sp = wtx.savepoint()?;
unsafe {
let savepoints = GLOBAL_SAVEPOINTS.get_mut().unwrap();
savepoints.push_back(HeightSavepoint(self.height, sp));
if savepoints.len() > 2 {
drop(savepoints.pop_front().unwrap().1);
}
}
}
}

if uncommitted == 5000 {
self.commit(wtx, value_cache)?;
value_cache = HashMap::new();
Expand Down
33 changes: 33 additions & 0 deletions src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ use {
};

mod api;

mod error;
mod response;

use self::api::*;
use self::response::ApiResponse;

use crate::index::{GLOBAL_SAVEPOINTS, SAVEPOINT_INTERVAL};

enum BlockQuery {
Height(u64),
Hash(BlockHash),
Expand Down Expand Up @@ -130,12 +133,42 @@ pub(crate) struct Server {

impl Server {
pub(crate) fn run(self, options: Options, index: Arc<Index>, handle: Handle) -> Result {
#[cfg(feature = "rollback")]
unsafe {
index.backup_at_init()?;
}

Runtime::new()?.block_on(async {
let clone = index.clone();
thread::spawn(move || loop {
if let Err(error) = clone.update() {
log::warn!("{error}");

#[cfg(feature = "rollback")]
if clone.is_reorged() {
let height = clone.height().unwrap().unwrap().n();
unsafe {
loop {
let hsp = GLOBAL_SAVEPOINTS
.get()
.unwrap()
.back()
.expect("savepoint not found");
if hsp.0 + SAVEPOINT_INTERVAL <= height
|| GLOBAL_SAVEPOINTS.get().unwrap().len() == 1
{
clone
.restore_savepoint(&hsp.1)
.expect("restore savepoint error");
break;
}
drop(GLOBAL_SAVEPOINTS.get_mut().unwrap().pop_back().unwrap().1)
}
}
clone.reset_reorged()
}
}

thread::sleep(Duration::from_millis(5000));
});

Expand Down

0 comments on commit 9ddc4bb

Please sign in to comment.