Skip to content

Commit

Permalink
fix reorg on startup by seeding stacks block pool with unconfirmed bl…
Browse files Browse the repository at this point in the history
…ocks
  • Loading branch information
MicaiahReid committed Feb 20, 2024
1 parent 6af90ce commit 4a1a635
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 6 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl Service {
observer_command_rx,
Some(observer_event_tx_moved),
None,
// unconfirmed_blocks,
unconfirmed_blocks,
self.ctx.clone(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async fn start_and_ping_event_observer(config: EventObserverConfig, ingestion_po
observer_commands_rx,
None,
None,
// None,
None,
ctx,
)
.unwrap();
Expand Down
6 changes: 5 additions & 1 deletion components/chainhook-sdk/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::utils::{AbstractBlock, Context};

use chainhook_types::{
BitcoinBlockSignaling, BitcoinNetwork, BlockHeader, BlockIdentifier, BlockchainEvent,
StacksChainEvent, StacksNetwork, StacksNodeConfig,
StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig,
};
use hiro_system_kit::slog;
use rocket::serde::json::Value as JsonValue;
Expand Down Expand Up @@ -92,6 +92,10 @@ impl Indexer {
}
}

pub fn seed_stacks_block_pool(&mut self, blocks: Vec<StacksBlockData>, ctx: &Context) {
self.stacks_blocks_pool.seed_block_pool(blocks, ctx);
}

pub fn handle_bitcoin_header(
&mut self,
header: BlockHeader,
Expand Down
26 changes: 26 additions & 0 deletions components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ impl StacksBlockPool {
}
}

pub fn seed_block_pool(&mut self, blocks: Vec<StacksBlockData>, ctx: &Context) {
ctx.try_log(|logger| {
slog::info!(logger, "Seeding block pool with {} blocks", blocks.len())
});
for block in blocks {
let existing_entry = self.block_store.get(&block.block_identifier.clone());
if existing_entry.is_some() {
ctx.try_log(|logger| {
slog::info!(
logger,
"Seeding block pool: Stacks {} has already been processed; skipping",
block.block_identifier
)
});
continue;
}

match self.process_block(block, ctx) {
Ok(_) => {}
Err(e) => {
ctx.try_log(|logger| slog::info!(logger, "Error seeding block pool: {}", e));
}
}
}
}

pub fn process_block(
&mut self,
block: StacksBlockData,
Expand Down
2 changes: 1 addition & 1 deletion components/chainhook-sdk/src/indexer/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn process_stacks_blocks_and_check_expectations(
let mut blocks_processor = StacksBlockPool::new();

if let Some(block_pool_seed) = block_pool_seed {
// blocks_processor.seed_block_pool(block_pool_seed, &ctx);
blocks_processor.seed_block_pool(block_pool_seed, &Context::empty());
}

for (block_event, check_chain_event_expectations) in steps.into_iter() {
Expand Down
10 changes: 8 additions & 2 deletions components/chainhook-sdk/src/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use bitcoincore_rpc::{Auth, Client, RpcApi};
use chainhook_types::{
BitcoinBlockData, BitcoinBlockSignaling, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent,
StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier,
StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier,
};
use hiro_system_kit;
use hiro_system_kit::slog;
Expand Down Expand Up @@ -426,6 +426,7 @@ pub fn start_event_observer(
observer_commands_rx: Receiver<ObserverCommand>,
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
observer_sidecar: Option<ObserverSidecar>,
stacks_block_pool_seed: Option<Vec<StacksBlockData>>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
match config.bitcoin_block_signaling {
Expand Down Expand Up @@ -460,6 +461,7 @@ pub fn start_event_observer(
observer_commands_rx,
observer_events_tx,
observer_sidecar,
stacks_block_pool_seed,
context_cloned,
);
let _ = hiro_system_kit::nestable_block_on(future);
Expand Down Expand Up @@ -542,6 +544,7 @@ pub async fn start_stacks_event_observer(
observer_commands_rx: Receiver<ObserverCommand>,
observer_events_tx: Option<crossbeam_channel::Sender<ObserverEvent>>,
observer_sidecar: Option<ObserverSidecar>,
stacks_block_pool_seed: Option<Vec<StacksBlockData>>,
ctx: Context,
) -> Result<(), Box<dyn Error>> {
let indexer_config = IndexerConfig {
Expand All @@ -553,7 +556,10 @@ pub async fn start_stacks_event_observer(
bitcoin_block_signaling: config.bitcoin_block_signaling.clone(),
};

let indexer = Indexer::new(indexer_config.clone());
let mut indexer = Indexer::new(indexer_config.clone());
if let Some(stacks_block_pool_seed) = stacks_block_pool_seed {
indexer.seed_stacks_block_pool(stacks_block_pool_seed, &ctx);
}

let log_level = if config.display_logs {
if cfg!(feature = "cli") {
Expand Down

0 comments on commit 4a1a635

Please sign in to comment.