Skip to content

Commit

Permalink
Merge pull request #35 from nervosnetwork/quake/long-fork-detection
Browse files Browse the repository at this point in the history
feat: add long fork detection
  • Loading branch information
quake authored Jul 14, 2021
2 parents 3cb3ffa + 5bcd89c commit b5bb875
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
45 changes: 45 additions & 0 deletions src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,23 @@ where
}))
}

pub fn get_block_hash(&self, block_number: BlockNumber) -> Result<Option<Byte32>, Error> {
let mut key_prefix_header = vec![KeyPrefix::Header as u8];
key_prefix_header.extend_from_slice(&block_number.to_be_bytes());
Ok(
match self
.store
.iter(&key_prefix_header, IteratorDirection::Forward)?
.next()
{
Some((key, _v)) if key.starts_with(&key_prefix_header) => {
Some(Byte32::from_slice(&key[9..]).expect("stored block key"))
}
_ => None,
},
)
}

pub fn prune(&self) -> Result<(), Error> {
let (tip_number, _tip_hash) = self.tip()?.expect("stored tip");
if tip_number > self.keep_num {
Expand Down Expand Up @@ -1475,4 +1492,32 @@ mod tests {
.len()
);
}

#[test]
fn get_block_hash() {
let indexer = new_indexer::<RocksdbStore>("get_block_hash");

let block_hashes: Vec<Byte32> = (0..10)
.map(|i| {
let cellbase = TransactionBuilder::default()
.input(CellInput::new_cellbase_input(i))
.build();
let block = BlockBuilder::default()
.transaction(cellbase)
.header(HeaderBuilder::default().number(i.pack()).build())
.build();
indexer.append(&block).unwrap();
block.hash()
})
.collect();

block_hashes.into_iter().enumerate().for_each(|(i, hash)| {
assert_eq!(
hash,
indexer.get_block_hash(i as BlockNumber).unwrap().unwrap()
)
});

assert!(indexer.get_block_hash(10).unwrap().is_none());
}
}
37 changes: 34 additions & 3 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ impl Service {
}

pub async fn poll(&self, rpc_client: gen_client::Client) {
let indexer = Indexer::new(self.store.clone(), 100, 1000);
// assume that long fork will not happen >= 100 blocks.
let keep_num = 100;
let indexer = Indexer::new(self.store.clone(), keep_num, 1000);
// 0.37.0 and above supports hex format
let use_hex_format = loop {
match rpc_client.local_node_info().await {
Expand All @@ -98,8 +100,37 @@ impl Service {
info!("append {}, {}", block.number(), block.hash());
indexer.append(&block).expect("append block should be OK");
} else {
info!("rollback {}, {}", tip_number, tip_hash);
indexer.rollback().expect("rollback block should be OK");
// Long fork detection
let longest_fork_number = tip_number.saturating_sub(keep_num);
match get_block_by_number(
&rpc_client,
longest_fork_number,
use_hex_format,
)
.await
{
Ok(Some(block)) => {
let stored_block_hash = indexer
.get_block_hash(longest_fork_number)
.expect("get block hash should be OK")
.expect("stored block header");
if block.hash() != stored_block_hash {
error!("long fork detected, ckb-indexer stored block {} => {:#x}, ckb node returns block {} => {:#x}, please check if ckb-indexer is connected to the same network ckb node.", longest_fork_number, stored_block_hash, longest_fork_number, block.hash());
thread::sleep(self.poll_interval);
} else {
info!("rollback {}, {}", tip_number, tip_hash);
indexer.rollback().expect("rollback block should be OK");
}
}
Ok(None) => {
error!("long fork detected, ckb-indexer stored block {}, ckb node returns none, please check if ckb-indexer is connected to the same network ckb node.", longest_fork_number);
thread::sleep(self.poll_interval);
}
Err(err) => {
error!("cannot get block from ckb node, error: {}", err);
thread::sleep(self.poll_interval);
}
}
}
}
Ok(None) => {
Expand Down

0 comments on commit b5bb875

Please sign in to comment.