Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: seed forking handler with unconfirmed blocks to improve startup stability #505

Merged
merged 9 commits into from
Mar 14, 2024
2 changes: 1 addition & 1 deletion components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
info!(ctx.expect_logger(), "Starting service...",);

let mut service = Service::new(config, ctx);
return service.run(predicates).await;
return service.run(predicates, None).await;
}
},
Command::Config(subcmd) => match subcmd {
Expand Down
24 changes: 21 additions & 3 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server};
use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop};
use crate::storage::{
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn,
confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks,
open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn,
};

use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification};
Expand All @@ -20,7 +21,7 @@ use chainhook_sdk::types::{Chain, StacksChainEvent};
use chainhook_sdk::utils::Context;
use redis::{Commands, Connection};

use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::{SystemTime, UNIX_EPOCH};

use self::http_api::get_entry_from_predicates_db;
Expand All @@ -38,6 +39,7 @@ impl Service {
pub async fn run(
&mut self,
predicates_from_startup: Vec<ChainhookFullSpecification>,
observer_commands_tx_rx: Option<(Sender<ObserverCommand>, Receiver<ObserverCommand>)>,
Copy link
Contributor Author

@MicaiahReid MicaiahReid Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being able to create the observer commands tx externally allows stopping the service from a test.

I don't usually love changing non-test code for tests, but I was stuck on this one.

) -> Result<(), String> {
let mut chainhook_config = ChainhookConfig::new();

Expand Down Expand Up @@ -149,7 +151,8 @@ impl Service {
}
}

let (observer_command_tx, observer_command_rx) = channel();
let (observer_command_tx, observer_command_rx) =
observer_commands_tx_rx.unwrap_or(channel());
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
// let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel();

Expand Down Expand Up @@ -211,6 +214,20 @@ impl Service {
});
}

let ctx = self.ctx.clone();
let stacks_db =
open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, &ctx)?;
let unconfirmed_blocks = match get_all_unconfirmed_blocks(&stacks_db, &ctx) {
Ok(blocks) => Some(blocks),
Err(e) => {
info!(
self.ctx.expect_logger(),
"Failed to get stacks blocks from db to seed block pool: {}", e
);
None
}
};

let observer_event_tx_moved = observer_event_tx.clone();
let moved_observer_command_tx = observer_command_tx.clone();
let _ = start_event_observer(
Expand All @@ -219,6 +236,7 @@ impl Service {
observer_command_rx,
Some(observer_event_tx_moved),
None,
unconfirmed_blocks,
self.ctx.clone(),
);

Expand Down
263 changes: 237 additions & 26 deletions components/chainhook-cli/src/service/tests/helpers/mock_service.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
use crate::config::Config;
use crate::config::EventSourceConfig;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is mostly just moving some test setup functions here from a different file, and adding the TestSetupResult struct to clean up the response a bit.

use crate::config::LimitsConfig;
use crate::config::MonitoringConfig;
use crate::config::PathConfig;
use crate::config::PredicatesApi;
use crate::config::PredicatesApiConfig;
use crate::config::StorageConfig;
use crate::config::DEFAULT_REDIS_URI;
use crate::service::http_api::start_predicate_api_server;
use crate::service::PredicateStatus;
use crate::service::Service;
use chainhook_sdk::chainhooks::types::ChainhookFullSpecification;
use chainhook_sdk::indexer::IndexerConfig;
use chainhook_sdk::observer::ObserverCommand;
use chainhook_sdk::types::BitcoinBlockSignaling;
use chainhook_sdk::types::BitcoinNetwork;
use chainhook_sdk::types::Chain;
use chainhook_sdk::types::StacksNetwork;
use chainhook_sdk::types::StacksNodeConfig;
use chainhook_sdk::utils::Context;
use crate::config::{
Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi,
PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI,
};
use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv;
use crate::service::{
http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status,
PredicateStatus, Service,
};
use chainhook_sdk::{
chainhooks::types::{
ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification,
},
indexer::IndexerConfig,
observer::ObserverCommand,
types::{BitcoinBlockSignaling, BitcoinNetwork, Chain, StacksNetwork, StacksNodeConfig},
utils::Context,
};
use redis::Commands;
use reqwest::Method;
use rocket::serde::json::Value as JsonValue;
use rocket::Shutdown;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, Command};
use std::sync::mpsc;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;

use super::get_free_port;
use super::mock_bitcoin_rpc::mock_bitcoin_rpc;
use super::mock_stacks_node::create_tmp_working_dir;
use super::mock_stacks_node::write_stacks_blocks_to_tsv;

pub async fn get_predicate_status(uuid: &str, port: u16) -> Result<PredicateStatus, String> {
let mut attempts = 0;
Expand Down Expand Up @@ -328,14 +332,19 @@ pub fn get_chainhook_config(

pub async fn start_chainhook_service(
config: Config,
chainhook_port: u16,
ping_startup_port: u16,
startup_predicates: Option<Vec<ChainhookFullSpecification>>,
ctx: &Context,
) -> Result<(), String> {
) -> Result<Sender<ObserverCommand>, String> {
let mut service = Service::new(config, ctx.clone());
let (observer_command_tx, observer_command_rx) = mpsc::channel();
let moved_observer_command_tx = observer_command_tx.clone();
let _ = hiro_system_kit::thread_named("Chainhook service")
.spawn(move || {
let future = service.run(startup_predicates.unwrap_or(vec![]));
let future = service.run(
startup_predicates.unwrap_or(vec![]),
Some((moved_observer_command_tx, observer_command_rx)),
);
let _ = hiro_system_kit::nestable_block_on(future);
})
.map_err(|e| {
Expand All @@ -354,14 +363,216 @@ pub async fn start_chainhook_service(
}

if let Ok(_client) = reqwest::Client::new()
.get(format!("http://localhost:{}/ping", chainhook_port))
.get(format!("http://localhost:{}/ping", ping_startup_port))
.send()
.await
{
break Ok(()); // Server is ready
break Ok(observer_command_tx); // Server is ready
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;
attempts += 1;
}
}

pub struct TestSetupResult {
pub redis_process: Child,
pub working_dir: String,
pub chainhook_service_port: u16,
pub redis_port: u16,
pub stacks_ingestion_port: u16,
pub stacks_rpc_port: u16,
pub bitcoin_rpc_port: u16,
pub prometheus_port: u16,
pub observer_command_tx: Sender<ObserverCommand>,
}

pub async fn setup_stacks_chainhook_test(
starting_chain_tip: u64,
redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>,
startup_predicates: Option<Vec<ChainhookFullSpecification>>,
) -> TestSetupResult {
let (
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
prometheus_port,
) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}"));

let mut redis_process = start_redis(redis_port)
.await
.unwrap_or_else(|e| panic!("test failed with error: {e}"));
flush_redis(redis_port);

let logger = hiro_system_kit::log::setup_logger();
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
let ctx = Context {
logger: Some(logger),
tracer: false,
};

if let Some((predicate, status)) = redis_seed {
let client = redis::Client::open(format!("redis://localhost:{redis_port}/"))
.unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
let mut connection = client.get_connection().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
let stacks_spec = predicate
.into_selected_network_specification(&StacksNetwork::Devnet)
.unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let spec = ChainhookSpecification::Stacks(stacks_spec);
update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx);
update_predicate_status(&spec.key(), status, &mut connection, &ctx);
}

let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let mut config = get_chainhook_config(
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
&working_dir,
&tsv_dir,
Some(prometheus_port),
);

consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let observer_command_tx =
start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
TestSetupResult {
redis_process,
working_dir,
chainhook_service_port,
redis_port,
stacks_ingestion_port,
stacks_rpc_port,
bitcoin_rpc_port,
prometheus_port,
observer_command_tx,
}
}

pub async fn setup_bitcoin_chainhook_test(starting_chain_tip: u64) -> TestSetupResult {
let (
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
prometheus_port,
) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}"));

let mut redis_process = start_redis(redis_port)
.await
.unwrap_or_else(|e| panic!("test failed with error: {e}"));

flush_redis(redis_port);
let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| {
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});

let logger = hiro_system_kit::log::setup_logger();
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
let ctx = Context {
logger: Some(logger),
tracer: false,
};

let _ = hiro_system_kit::thread_named("Bitcoin rpc service")
.spawn(move || {
let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip);
let _ = hiro_system_kit::nestable_block_on(future);
})
.expect("unable to spawn thread");

let config = get_chainhook_config(
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
&working_dir,
&tsv_dir,
Some(prometheus_port),
);

let terminator_tx = start_chainhook_service(config, chainhook_service_port, None, &ctx)
.await
.unwrap_or_else(|e| {
std::fs::remove_dir_all(&working_dir).unwrap();
flush_redis(redis_port);
redis_process.kill().unwrap();
panic!("test failed with error: {e}");
});
TestSetupResult {
redis_process,
working_dir,
chainhook_service_port,
redis_port,
stacks_ingestion_port,
stacks_rpc_port,
bitcoin_rpc_port,
prometheus_port,
observer_command_tx: terminator_tx,
}
}

pub fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> {
let redis_port = get_free_port()?;
let chainhook_service_port = get_free_port()?;
let stacks_rpc_port = get_free_port()?;
let stacks_ingestion_port = get_free_port()?;
let bitcoin_rpc_port = get_free_port()?;
let prometheus_port = get_free_port()?;
Ok((
redis_port,
chainhook_service_port,
stacks_rpc_port,
stacks_ingestion_port,
bitcoin_rpc_port,
prometheus_port,
))
}
Loading
Loading