Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorg resistance #2317

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/height.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ impl PartialEq<u64> for Height {
}
}

impl From<u64> for Height {
fn from(value: u64) -> Self {
Height(value)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
55 changes: 42 additions & 13 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,7 @@ impl Index {

let mut tx = database.begin_write()?;

if cfg!(test) {
tx.set_durability(redb::Durability::None);
} else {
tx.set_durability(redb::Durability::Immediate);
};
tx.set_durability(redb::Durability::Immediate);

tx.open_table(HEIGHT_TO_BLOCK_HASH)?;
tx.open_table(INSCRIPTION_ID_TO_INSCRIPTION_ENTRY)?;
Expand Down Expand Up @@ -396,7 +392,9 @@ impl Index {
}

pub(crate) fn update(&self) -> Result {
Updater::update(self)
let mut updater = Updater::new(self)?;

updater.update_index()
}

pub(crate) fn export(&self, filename: &String, include_addresses: bool) -> Result {
Expand Down Expand Up @@ -473,13 +471,7 @@ impl Index {
}

fn begin_write(&self) -> Result<WriteTransaction> {
if cfg!(test) {
let mut tx = self.database.begin_write()?;
tx.set_durability(redb::Durability::None);
Ok(tx)
} else {
Ok(self.database.begin_write()?)
}
Ok(self.database.begin_write()?)
}

fn increment_statistic(wtx: &WriteTransaction, statistic: Statistic, n: u64) -> Result {
Expand Down Expand Up @@ -3210,4 +3202,41 @@ mod tests {
)
}
}

#[test]
fn recover_from_reorg() {
  // Exercise reorg recovery under every indexer configuration.
  for context in Context::configurations() {
    context.mine_blocks(1);

    // Inscribe on the coinbase output of block 1; this is the inscription
    // that must survive the reorg.
    let txid = context.rpc_server.broadcast_tx(TransactionTemplate {
      inputs: &[(1, 0, 0)],
      witness: inscription("text/plain;charset=utf-8", "hello").to_witness(),
      ..Default::default()
    });

    let first = InscriptionId { txid, index: 0 };
    let first_location = SatPoint {
      outpoint: OutPoint { txid, vout: 0 },
      offset: 0,
    };

    // Bury the first inscription under six more blocks — presumably so the
    // index has taken a savepoint past it before the reorg (TODO confirm
    // against the savepoint-creation logic in the updater).
    context.mine_blocks(6);

    // A second inscription in the soon-to-be-invalidated tip region; bound
    // but deliberately unused — only its side effect on the index matters.
    let txid = context.rpc_server.broadcast_tx(TransactionTemplate {
      inputs: &[(2, 0, 0)],
      witness: inscription("text/plain;charset=utf-8", "hello").to_witness(),
      ..Default::default()
    });

    let _second = InscriptionId { txid, index: 0 };

    context.mine_blocks(1);

    // Simulate a one-block reorg by invalidating the current chain tip.
    context.rpc_server.invalidate_tip();

    // After rollback and re-sync, the first inscription must still be
    // indexed at its original satpoint.
    context
      .index
      .assert_inscription_location(first, first_location, None);
  }
}
}
132 changes: 72 additions & 60 deletions src/index/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,24 @@ impl From<Block> for BlockData {
}
}

pub(crate) struct Updater {
pub(crate) struct Updater<'index> {
range_cache: HashMap<OutPointValue, Vec<u8>>,
height: u64,
index: &'index Index,
index_sats: bool,
initial_sync: bool,
sat_ranges_since_flush: u64,
savepoints: VecDeque<u64>,
outputs_cached: u64,
outputs_inserted_since_flush: u64,
outputs_traversed: u64,
outpoint_sender: Sender<OutPoint>,
value_receiver: Receiver<u64>,
value_cache: HashMap<OutPoint, u64>,
}

impl Updater {
pub(crate) fn update(index: &Index) -> Result {
impl<'index> Updater<'_> {
pub(crate) fn new(index: &'index Index) -> Result<Updater<'index>> {
let wtx = index.begin_write()?;

let height = wtx
Expand All @@ -51,35 +57,31 @@ impl Updater {
.map(|(height, _hash)| height.value() + 1)
.unwrap_or(0);

wtx
.open_table(WRITE_TRANSACTION_STARTING_BLOCK_COUNT_TO_TIMESTAMP)?
.insert(
&height,
&SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_millis())
.unwrap_or(0),
)?;

let mut updater = Self {
wtx.commit();

let (outpoint_sender, value_receiver) = Self::spawn_fetcher(index)?;

Ok(Updater {
range_cache: HashMap::new(),
height,
index,
index_sats: index.has_sat_index()?,
initial_sync: true,
sat_ranges_since_flush: 0,
savepoints: VecDeque::with_capacity(6),
outputs_cached: 0,
outputs_inserted_since_flush: 0,
outputs_traversed: 0,
};

updater.update_index(index, wtx)
outpoint_sender,
value_receiver,
value_cache: HashMap::new(),
})
}

fn update_index<'index>(
&mut self,
index: &'index Index,
mut wtx: WriteTransaction<'index>,
) -> Result {
let starting_height = index.client.get_block_count()? + 1;
pub(crate) fn update_index(&mut self) -> Result {
let mut wtx = self.index.begin_write()?;

let starting_height = self.index.client.get_block_count()? + 1;

let mut progress_bar = if cfg!(test)
|| log_enabled!(log::Level::Info)
Expand All @@ -96,27 +98,46 @@ impl Updater {
Some(progress_bar)
};

let rx = Self::fetch_blocks_from(index, self.height, self.index_sats)?;

let (mut outpoint_sender, mut value_receiver) = Self::spawn_fetcher(index)?;
let rx = Self::fetch_blocks_from(self.index, self.height, self.index_sats)?;

let mut uncommitted = 0;
let mut value_cache = HashMap::new();
while let Ok(block) = rx.recv() {
self.index_block(
index,
&mut outpoint_sender,
&mut value_receiver,
&mut wtx,
block,
&mut value_cache,
)?;
if self.index.client.get_block_count()? - self.index.block_height()?.unwrap_or(Height(0)).n()
< 50
Copy link
Contributor

@victorkirov victorkirov Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be the maximum number of savepoints (i.e. 6). Anything higher is unnecessary unless you want to have extra reorg resistance during the initial indexing 😋

{
log::info!("not in initial sync; starting to create savepoints");
self.initial_sync = false;

if self.savepoints.len() >= 6 {
dbg!("here");
wtx.delete_persistent_savepoint(self.savepoints.pop_front().unwrap())?;
}

let savepoints: Vec<u64> = wtx.list_persistent_savepoints()?.into_iter().collect();
dbg!(&savepoints);
// self.savepoints.push_back(wtx.ephemeral_savepoint()?);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to create a persistent savepoint here. Also, you'll need to commit wtx and reinitialise it directly after.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have to commit the txn and reinitialise it, it might make more sense for this logic to live directly after a self.commit(wtx) call or maybe even inside the commit function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, I think wtx has to be clean (no unflushed writes) before creating the savepoint, and you have to commit straight after it as it would make the wtx dirty.


if let Some(prev_height) = self.height.checked_sub(1) {
let prev_hash = self.index.client.get_block_hash(prev_height)?;
dbg!(&prev_hash);
if prev_hash != block.header.prev_blockhash {
log::info!("reorg detected at or before {prev_height}; rolling back index");
self.index.reorged.store(true, atomic::Ordering::Relaxed);

let savepoint = wtx.get_persistent_savepoint(self.savepoints.pop_front().unwrap())?;
wtx.restore_savepoint(&savepoint)?;
self.savepoints.clear();
}
}
Comment on lines +120 to +131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should probably live before the create savepoint logic, otherwise we create a savepoint and immediately rollback to an older one.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — I moved it into the Index now.

}

self.index_block(&mut wtx, block)?;

if let Some(progress_bar) = &mut progress_bar {
progress_bar.inc(1);

if progress_bar.position() > progress_bar.length().unwrap() {
if let Ok(count) = index.client.get_block_count() {
if let Ok(count) = self.index.client.get_block_count() {
progress_bar.set_length(count + 1);
} else {
log::warn!("Failed to fetch latest block height");
Expand All @@ -127,10 +148,10 @@ impl Updater {
uncommitted += 1;

if uncommitted == 5000 {
self.commit(wtx, value_cache)?;
value_cache = HashMap::new();
self.commit(wtx)?;
self.value_cache = HashMap::new();
uncommitted = 0;
wtx = index.begin_write()?;
wtx = self.index.begin_write()?;
let height = wtx
.open_table(HEIGHT_TO_BLOCK_HASH)?
.range(0..)?
Expand Down Expand Up @@ -160,7 +181,7 @@ impl Updater {
}

if uncommitted > 0 {
self.commit(wtx, value_cache)?;
self.commit(wtx)?;
}

if let Some(progress_bar) = &mut progress_bar {
Expand Down Expand Up @@ -321,24 +342,16 @@ impl Updater {
Ok((outpoint_sender, value_receiver))
}

fn index_block(
&mut self,
index: &Index,
outpoint_sender: &mut Sender<OutPoint>,
value_receiver: &mut Receiver<u64>,
wtx: &mut WriteTransaction,
block: BlockData,
value_cache: &mut HashMap<OutPoint, u64>,
) -> Result<()> {
fn index_block(&mut self, wtx: &mut WriteTransaction, block: BlockData) -> Result<()> {
// If value_receiver still has values something went wrong with the last block
// Could be an assert, shouldn't recover from this and commit the last block
let Err(TryRecvError::Empty) = value_receiver.try_recv() else {
let Err(TryRecvError::Empty) = self.value_receiver.try_recv() else {
return Err(anyhow!("Previous block did not consume all input values"));
};

let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

let index_inscriptions = self.height >= index.first_inscription_height;
let index_inscriptions = self.height >= self.index.first_inscription_height;

if index_inscriptions {
// Send all missing input outpoints to be fetched right away
Expand All @@ -360,7 +373,7 @@ impl Updater {
continue;
}
// We don't need input values we already have in our value_cache from earlier blocks
if value_cache.contains_key(&prev_output) {
if self.value_cache.contains_key(&prev_output) {
continue;
}
// We don't need input values we already have in our outpoint_to_value table from earlier blocks that
Expand All @@ -369,7 +382,7 @@ impl Updater {
continue;
}
// We don't know the value of this tx input. Send this outpoint to background thread to be fetched
outpoint_sender.blocking_send(prev_output)?;
self.outpoint_sender.blocking_send(prev_output)?;
}
}
}
Expand All @@ -393,7 +406,7 @@ impl Updater {
let prev_hash = height_to_block_hash.get(&prev_height)?.unwrap();

if prev_hash.value() != &block.header.prev_blockhash.as_raw_hash().to_byte_array() {
index.reorged.store(true, atomic::Ordering::Relaxed);
self.index.reorged.store(true, atomic::Ordering::Relaxed);
return Err(anyhow!("reorg detected at or before {prev_height}"));
}
}
Expand Down Expand Up @@ -421,7 +434,7 @@ impl Updater {
let mut inscription_updater = InscriptionUpdater::new(
self.height,
&mut inscription_id_to_satpoint,
value_receiver,
&mut self.value_receiver,
&mut inscription_id_to_inscription_entry,
lost_sats,
&mut inscription_number_to_inscription_id,
Expand All @@ -431,7 +444,7 @@ impl Updater {
&mut satpoint_to_inscription_id,
block.header.time,
unbound_inscriptions,
value_cache,
&mut self.value_cache,
)?;

if self.index_sats {
Expand Down Expand Up @@ -471,7 +484,6 @@ impl Updater {
input_sat_ranges.push_back(SatRange::load(chunk.try_into().unwrap()));
}
}

self.index_transaction_sats(
tx,
*txid,
Expand Down Expand Up @@ -616,7 +628,7 @@ impl Updater {
Ok(())
}

fn commit(&mut self, wtx: WriteTransaction, value_cache: HashMap<OutPoint, u64>) -> Result {
fn commit(&mut self, wtx: WriteTransaction) -> Result {
log::info!(
"Committing at block height {}, {} outputs traversed, {} in map, {} cached",
self.height,
Expand Down Expand Up @@ -645,8 +657,8 @@ impl Updater {
{
let mut outpoint_to_value = wtx.open_table(OUTPOINT_TO_VALUE)?;

for (outpoint, value) in value_cache {
outpoint_to_value.insert(&outpoint.store(), &value)?;
for (outpoint, value) in &self.value_cache {
outpoint_to_value.insert(&outpoint.store(), value)?;
}
}

Expand Down