Skip to content

Commit

Permalink
fix: seed forking handler with unconfirmed blocks to improve startup …
Browse files Browse the repository at this point in the history
…stability (#505)

### Description

If Chainhook restarts in the middle of a reorg taking place, it doesn't
have any context for choosing the canonical fork. Then when block
collisions take place, Chainhook fails to process the new blocks,
causing gaps in the blocks Chainhook has available for evaluation.

This PR seeds the stacks block indexer with unconfirmed blocks on
startup, so that Chainhook has the necessary context to handle a reorg.

Most of the PR adds two sets of tests:
 - I've added some new functionality to our very thorough indexer tests:
   - In addition to providing the blocks to be mined and the order to mine
     them, we also now allow providing some "unconfirmed" blocks to seed the
     block pool with.
   - I've added some test cases that reproduce what caused outages on the
     Platform's Chainhook node.
 - I've added a new service test that:
   - Verifies that unconfirmed blocks are stored on restart.
   - Verifies that those blocks are used to seed the block pool, and that a
     reorg is handled correctly.

I committed these tests _before_ adding the fix, so you can confirm the
fix by checking out the commits with the new tests
(96d8249b239e53a877a98ff493ef7ae3571aca37 and
9aad55e2a88bf5b75c3e49d079c5967b7a8cf0e3), seeing that the tests break,
then pulling the last commit to see that the fix works.

Fixes #487
  • Loading branch information
vabanaerytk authored Mar 14, 2024
1 parent 5a7d64e commit a84fc4c
Show file tree
Hide file tree
Showing 14 changed files with 851 additions and 533 deletions.
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
info!(ctx.expect_logger(), "Starting service...",);

let mut service = Service::new(config, ctx);
return service.run(predicates).await;
return service.run(predicates, None).await;
}
},
Command::Config(subcmd) => match subcmd {
Expand Down
24 changes: 21 additions & 3 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::{
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn,
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
};

use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
Expand All @@ -20,7 +21,7 @@ use chainhook_sdk::types::{Chain, StacksChainEvent};
use chainhook_sdk::utils::Context;
use redis::{Commands, Connection};

use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::{SystemTime, UNIX_EPOCH};

use self::http_api::get_entry_from_predicates_db;
Expand All @@ -38,6 +39,7 @@ impl Service {
pub async fn run(
&mut self,
predicates_from_startup: Vec<ChainhookFullSpecification>,
observer_commands_tx_rx: Option<(Sender<ObserverCommand>, Receiver<ObserverCommand>)>,
) -> Result<(), String> {
let mut chainhook_config = ChainhookConfig::new();

Expand Down Expand Up @@ -149,7 +151,8 @@ impl Service {
}
}

let (observer_command_tx, observer_command_rx) = channel();
let (observer_command_tx, observer_command_rx) =
observer_commands_tx_rx.unwrap_or(channel());
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
// let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel();

Expand Down Expand Up @@ -211,6 +214,20 @@ impl Service {
});
}

let ctx = self.ctx.clone();
let stacks_db =
open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, &ctx)?;
let unconfirmed_blocks = match get_all_unconfirmed_blocks(&stacks_db, &ctx) {
Ok(blocks) => Some(blocks),
Err(e) => {
info!(
self.ctx.expect_logger(),
"Failed to get stacks blocks from db to seed block pool: {}", e
);
None
}
};

let observer_event_tx_moved = observer_event_tx.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let _ = start_event_observer(
Expand All @@ -219,6 +236,7 @@ impl Service {
observer_command_rx,
Some(observer_event_tx_moved),
None,
unconfirmed_blocks,
self.ctx.clone(),
);

Expand Down
263 changes: 237 additions & 26 deletions components/chainhook-cli/src/service/tests/helpers/mock_service.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
use crate::config::Config;
use crate::config::EventSourceConfig;
use crate::config::LimitsConfig;
use crate::config::MonitoringConfig;
use crate::config::PathConfig;
use crate::config::PredicatesApi;
use crate::config::PredicatesApiConfig;
use crate::config::StorageConfig;
use crate::config::DEFAULT_REDIS_URI;
use crate::service::http_api::start_predicate_api_server;
use crate::service::PredicateStatus;
use crate::service::Service;
use chainhook_sdk::chainhooks::types::ChainhookFullSpecification;
use chainhook_sdk::indexer::IndexerConfig;
use chainhook_sdk::observer::ObserverCommand;
use chainhook_sdk::types::BitcoinBlockSignaling;
use chainhook_sdk::types::BitcoinNetwork;
use chainhook_sdk::types::Chain;
use chainhook_sdk::types::StacksNetwork;
use chainhook_sdk::types::StacksNodeConfig;
use chainhook_sdk::utils::Context;
use crate::config::{
Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi,
PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI,
};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::{
http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status,
PredicateStatus, Service,
};
use chainhook_sdk::{
chainhooks::types::{
ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification,
},
indexer::IndexerConfig,
observer::ObserverCommand,
types::{BitcoinBlockSignaling, BitcoinNetwork, Chain, StacksNetwork, StacksNodeConfig},
utils::Context,
};
use redis::Commands;
use reqwest::Method;
use rocket::serde::json::Value as JsonValue;
use rocket::Shutdown;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, Command};
use std::sync::mpsc;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;

use super::get_free_port;
use super::mock_bitcoin_rpc::mock_bitcoin_rpc;
use super::mock_stacks_node::create_tmp_working_dir;
use super::mock_stacks_node::write_stacks_blocks_to_tsv;

pub async fn get_predicate_status(uuid: &str, port: u16) -> Result<PredicateStatus, String> {
let mut attempts = 0;
Expand Down Expand Up @@ -328,14 +332,19 @@ pub fn get_chainhook_config(

pub async fn start_chainhook_service(
config: Config,
chainhook_port: u16,
ping_startup_port: u16,
startup_predicates: Option<Vec<ChainhookFullSpecification>>,
ctx: &Context,
) -> Result<(), String> {
) -> Result<Sender<ObserverCommand>, String> {
let mut service = Service::new(config, ctx.clone());
let (observer_command_tx, observer_command_rx) = mpsc::channel();
let moved_observer_command_tx = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Chainhook service")
.spawn(move || {
let future = service.run(startup_predicates.unwrap_or(vec![]));
let future = service.run(
startup_predicates.unwrap_or(vec![]),
Some((moved_observer_command_tx, observer_command_rx)),
);
let _ = hiro_system_kit::nestable_block_on(future);
})
.map_err(|e| {
Expand All @@ -354,14 +363,216 @@ pub async fn start_chainhook_service(
}

if let Ok(_client) = reqwest::Client::new()
.get(format!("http://localhost:{}/ping", chainhook_port))
.get(format!("http://localhost:{}/ping", ping_startup_port))
.send()
.await
{
break Ok(()); // Server is ready
break Ok(observer_command_tx); // Server is ready
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
attempts += 1;
}
}

/// Handles and port numbers produced by `setup_stacks_chainhook_test` and
/// `setup_bitcoin_chainhook_test`.
///
/// The setup functions only clean up on their own failure paths; on success the
/// redis process and working directory are handed back here.
/// NOTE(review): presumably the calling test is expected to kill `redis_process`
/// and remove `working_dir` when it finishes — confirm against the callers.
pub struct TestSetupResult {
    /// Child process returned by `start_redis` for `redis_port`.
    pub redis_process: Child,
    /// First path returned by `create_tmp_working_dir`; passed to `get_chainhook_config`.
    pub working_dir: String,
    /// Port given to `get_chainhook_config` and polled at `/ping` to detect service startup.
    pub chainhook_service_port: u16,
    /// Port the redis process was started on.
    pub redis_port: u16,
    /// Stacks ingestion port given to `get_chainhook_config`.
    pub stacks_ingestion_port: u16,
    /// Stacks RPC port given to `get_chainhook_config`.
    pub stacks_rpc_port: u16,
    /// Bitcoin RPC port given to `get_chainhook_config` (the bitcoin setup also starts
    /// `mock_bitcoin_rpc` on it).
    pub bitcoin_rpc_port: u16,
    /// Port passed to `get_chainhook_config` as the optional prometheus port.
    pub prometheus_port: u16,
    /// Sender half of the observer command channel returned by `start_chainhook_service`;
    /// the matching receiver is handed to `Service::run`.
    pub observer_command_tx: Sender<ObserverCommand>,
}

pub async fn setup_stacks_chainhook_test(
starting_chain_tip: u64,
redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>,
startup_predicates: Option<Vec<ChainhookFullSpecification>>,
) -> TestSetupResult {
let (
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
prometheus_port,
) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}"));

let mut redis_process = start_redis(redis_port)
.await
.unwrap_or_else(|e| panic!("test failed with error: {e}"));
flush_redis(redis_port);

let logger = hiro_system_kit::log::setup_logger();
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
let ctx = Context {
logger: Some(logger),
tracer: false,
};

if let Some((predicate, status)) = redis_seed {
let client = redis::Client::open(format!("redis://localhost:{redis_port}/"))
.unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
let mut connection = client.get_connection().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
let stacks_spec = predicate
.into_selected_network_specification(&StacksNetwork::Devnet)
.unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let spec = ChainhookSpecification::Stacks(stacks_spec);
update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx);
update_predicate_status(&spec.key(), status, &mut connection, &ctx);
}

let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let mut config = get_chainhook_config(
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
&working_dir,
&tsv_dir,
Some(prometheus_port),
);

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let observer_command_tx =
start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
TestSetupResult {
redis_process,
working_dir,
chainhook_service_port,
redis_port,
stacks_ingestion_port,
stacks_rpc_port,
bitcoin_rpc_port,
prometheus_port,
observer_command_tx,
}
}

pub async fn setup_bitcoin_chainhook_test(starting_chain_tip: u64) -> TestSetupResult {
let (
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
prometheus_port,
) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}"));

let mut redis_process = start_redis(redis_port)
.await
.unwrap_or_else(|e| panic!("test failed with error: {e}"));

flush_redis(redis_port);
let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let logger = hiro_system_kit::log::setup_logger();
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
let ctx = Context {
logger: Some(logger),
tracer: false,
};

let _ = hiro_system_kit::thread_named("Bitcoin rpc service")
.spawn(move || {
let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip);
let _ = hiro_system_kit::nestable_block_on(future);
})
.expect("unable to spawn thread");

let config = get_chainhook_config(
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
&working_dir,
&tsv_dir,
Some(prometheus_port),
);

let terminator_tx = start_chainhook_service(config, chainhook_service_port, None, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
TestSetupResult {
redis_process,
working_dir,
chainhook_service_port,
redis_port,
stacks_ingestion_port,
stacks_rpc_port,
bitcoin_rpc_port,
prometheus_port,
observer_command_tx: terminator_tx,
}
}

/// Reserves six ports through `get_free_port`, one per service used by the test harness.
///
/// The tuple is ordered as
/// `(redis, chainhook service, stacks rpc, stacks ingestion, bitcoin rpc, prometheus)`.
///
/// # Errors
///
/// Propagates the first error returned by `get_free_port`.
pub fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> {
    // Tuple fields are evaluated left to right, so ports are requested in the listed order.
    Ok((
        get_free_port()?, // redis
        get_free_port()?, // chainhook service
        get_free_port()?, // stacks rpc
        get_free_port()?, // stacks ingestion
        get_free_port()?, // bitcoin rpc
        get_free_port()?, // prometheus
    ))
}
Loading

0 comments on commit a84fc4c

Please sign in to comment.