diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index ecb6dc9..3fa1b1a 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -310,7 +310,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { info!(ctx.expect_logger(), "Starting service...",); let mut service = Service::new(config, ctx); - return service.run(predicates).await; + return service.run(predicates, None).await; } }, Command::Config(subcmd) => match subcmd { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 5c524b5..1d1df8a 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -6,7 +6,8 @@ use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; use crate::service::runloops::{start_bitcoin_scan_runloop, start_stacks_scan_runloop}; use crate::storage::{ - confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, open_readwrite_stacks_db_conn, + confirm_entries_in_stacks_blocks, draft_entries_in_stacks_blocks, get_all_unconfirmed_blocks, + open_readonly_stacks_db_conn_with_retry, open_readwrite_stacks_db_conn, }; use chainhook_sdk::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification}; @@ -20,7 +21,7 @@ use chainhook_sdk::types::{Chain, StacksChainEvent}; use chainhook_sdk::utils::Context; use redis::{Commands, Connection}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::{SystemTime, UNIX_EPOCH}; use self::http_api::get_entry_from_predicates_db; @@ -38,6 +39,7 @@ impl Service { pub async fn run( &mut self, predicates_from_startup: Vec, + observer_commands_tx_rx: Option<(Sender, Receiver)>, ) -> Result<(), String> { let mut chainhook_config = ChainhookConfig::new(); @@ -149,7 +151,8 @@ impl Service { } } - let (observer_command_tx, observer_command_rx) = channel(); + let (observer_command_tx, observer_command_rx) = + observer_commands_tx_rx.unwrap_or(channel()); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); // let (ordinal_indexer_command_tx, ordinal_indexer_command_rx) = channel(); @@ -211,6 +214,20 @@ impl Service { }); } + let ctx = self.ctx.clone(); + let stacks_db = + open_readonly_stacks_db_conn_with_retry(&config.expected_cache_path(), 3, &ctx)?; + let unconfirmed_blocks = match get_all_unconfirmed_blocks(&stacks_db, &ctx) { + Ok(blocks) => Some(blocks), + Err(e) => { + info!( + self.ctx.expect_logger(), + "Failed to get stacks blocks from db to seed block pool: {}", e + ); + None + } + }; + let observer_event_tx_moved = observer_event_tx.clone(); let moved_observer_command_tx = observer_command_tx.clone(); let _ = start_event_observer( @@ -219,6 +236,7 @@ impl Service { observer_command_rx, Some(observer_event_tx_moved), None, + unconfirmed_blocks, self.ctx.clone(), ); diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs index 890f2ec..55ae3e8 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_service.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_service.rs @@ -1,24 +1,21 @@ -use crate::config::Config; -use crate::config::EventSourceConfig; -use crate::config::LimitsConfig; -use crate::config::MonitoringConfig; -use crate::config::PathConfig; -use crate::config::PredicatesApi; -use crate::config::PredicatesApiConfig; -use crate::config::StorageConfig; -use crate::config::DEFAULT_REDIS_URI; -use crate::service::http_api::start_predicate_api_server; -use crate::service::PredicateStatus; -use crate::service::Service; -use chainhook_sdk::chainhooks::types::ChainhookFullSpecification; -use chainhook_sdk::indexer::IndexerConfig; -use chainhook_sdk::observer::ObserverCommand; -use chainhook_sdk::types::BitcoinBlockSignaling; -use chainhook_sdk::types::BitcoinNetwork; -use chainhook_sdk::types::Chain; -use chainhook_sdk::types::StacksNetwork; -use chainhook_sdk::types::StacksNodeConfig; -use chainhook_sdk::utils::Context; +use crate::config::{ + Config, EventSourceConfig, LimitsConfig, MonitoringConfig, PathConfig, PredicatesApi, + PredicatesApiConfig, StorageConfig, DEFAULT_REDIS_URI, +}; +use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; +use crate::service::{ + http_api::start_predicate_api_server, update_predicate_spec, update_predicate_status, + PredicateStatus, Service, +}; +use chainhook_sdk::{ + chainhooks::types::{ + ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification, + }, + indexer::IndexerConfig, + observer::ObserverCommand, + types::{BitcoinBlockSignaling, BitcoinNetwork, Chain, StacksNetwork, StacksNodeConfig}, + utils::Context, +}; use redis::Commands; use reqwest::Method; use rocket::serde::json::Value as JsonValue; @@ -26,8 +23,15 @@ use rocket::Shutdown; use std::path::PathBuf; use std::process::Stdio; use std::process::{Child, Command}; +use std::sync::mpsc; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; +use std::sync::mpsc::Sender; + +use super::get_free_port; +use super::mock_bitcoin_rpc::mock_bitcoin_rpc; +use super::mock_stacks_node::create_tmp_working_dir; +use super::mock_stacks_node::write_stacks_blocks_to_tsv; pub async fn get_predicate_status(uuid: &str, port: u16) -> Result { let mut attempts = 0; @@ -328,14 +332,19 @@ pub fn get_chainhook_config( pub async fn start_chainhook_service( config: Config, - chainhook_port: u16, + ping_startup_port: u16, startup_predicates: Option>, ctx: &Context, -) -> Result<(), String> { +) -> Result, String> { let mut service = Service::new(config, ctx.clone()); + let (observer_command_tx, observer_command_rx) = mpsc::channel(); + let moved_observer_command_tx = observer_command_tx.clone(); let _ = hiro_system_kit::thread_named("Chainhook service") .spawn(move || { - let future = service.run(startup_predicates.unwrap_or(vec![])); + let future = service.run( + startup_predicates.unwrap_or(vec![]), + Some((moved_observer_command_tx, observer_command_rx)), + ); let _ = hiro_system_kit::nestable_block_on(future); }) .map_err(|e| { @@ -354,14 +363,216 @@ pub async fn start_chainhook_service( } if let Ok(_client) = reqwest::Client::new() - .get(format!("http://localhost:{}/ping", chainhook_port)) + .get(format!("http://localhost:{}/ping", ping_startup_port)) .send() .await { - break Ok(()); // Server is ready + break Ok(observer_command_tx); // Server is ready } tokio::time::sleep(std::time::Duration::from_secs(1)).await; attempts += 1; } } + +pub struct TestSetupResult { + pub redis_process: Child, + pub working_dir: String, + pub chainhook_service_port: u16, + pub redis_port: u16, + pub stacks_ingestion_port: u16, + pub stacks_rpc_port: u16, + pub bitcoin_rpc_port: u16, + pub prometheus_port: u16, + pub observer_command_tx: Sender, +} + +pub async fn setup_stacks_chainhook_test( + starting_chain_tip: u64, + redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>, + startup_predicates: Option>, +) -> TestSetupResult { + let ( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); + + let mut redis_process = start_redis(redis_port) + .await + .unwrap_or_else(|e| panic!("test failed with error: {e}")); + flush_redis(redis_port); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + if let Some((predicate, status)) = redis_seed { + let client = redis::Client::open(format!("redis://localhost:{redis_port}/")) + .unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + let mut connection = client.get_connection().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + let stacks_spec = predicate + .into_selected_network_specification(&StacksNetwork::Devnet) + .unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let spec = ChainhookSpecification::Stacks(stacks_spec); + update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx); + update_predicate_status(&spec.key(), status, &mut connection, &ctx); + } + + let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let mut config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + Some(prometheus_port), + ); + + consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let observer_command_tx = + start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + TestSetupResult { + redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port, + observer_command_tx, + } +} + +pub async fn setup_bitcoin_chainhook_test(starting_chain_tip: u64) -> TestSetupResult { + let ( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); + + let mut redis_process = start_redis(redis_port) + .await + .unwrap_or_else(|e| panic!("test failed with error: {e}")); + + flush_redis(redis_port); + let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + + let _ = hiro_system_kit::thread_named("Bitcoin rpc service") + .spawn(move || { + let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip); + let _ = hiro_system_kit::nestable_block_on(future); + }) + .expect("unable to spawn thread"); + + let config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + Some(prometheus_port), + ); + + let terminator_tx = start_chainhook_service(config, chainhook_service_port, None, &ctx) + .await + .unwrap_or_else(|e| { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); + panic!("test failed with error: {e}"); + }); + TestSetupResult { + redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port, + observer_command_tx: terminator_tx, + } +} + +pub fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> { + let redis_port = get_free_port()?; + let chainhook_service_port = get_free_port()?; + let stacks_rpc_port = get_free_port()?; + let stacks_ingestion_port = get_free_port()?; + let bitcoin_rpc_port = get_free_port()?; + let prometheus_port = get_free_port()?; + Ok(( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + prometheus_port, + )) +} diff --git a/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs b/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs index ef7c32a..6d03534 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mock_stacks_node.rs @@ -8,7 +8,7 @@ use chainhook_sdk::types::{ STXTransferEventData, SmartContractEventData, StacksTransactionEventPayload, }; -use super::{branch_and_height_to_prefixed_hash, height_to_prefixed_hash}; +use super::{branch_and_height_to_prefixed_hash, make_block_hash}; pub const TEST_WORKING_DIR: &str = "src/service/tests/fixtures/tmp"; @@ -128,12 +128,19 @@ fn create_stacks_new_transaction(index: u64) -> NewTransaction { } } -pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock { - let parent_height = if height == 0 { 0 } else { height - 1 }; - let parent_burn_block_height = if burn_block_height == 0 { - 0 - } else { - burn_block_height - 1 +pub fn create_stacks_new_block( + fork_id: u8, + height: u64, + parent_fork_id: u8, + burn_block_height: u64, +) -> NewBlock { + let parent_height = match height { + 0 => 0, + _ => height - 1, + }; + let parent_burn_block_height = match burn_block_height { + 0 => 0, + _ => burn_block_height - 1, }; let mut events = vec![]; @@ -238,16 +245,16 @@ pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock )); NewBlock { block_height: height, - block_hash: height_to_prefixed_hash(height), - index_block_hash: height_to_prefixed_hash(height), + block_hash: make_block_hash(fork_id, height), + index_block_hash: make_block_hash(fork_id, height), burn_block_height: burn_block_height, - burn_block_hash: height_to_prefixed_hash(burn_block_height), - parent_block_hash: height_to_prefixed_hash(parent_height), - parent_index_block_hash: height_to_prefixed_hash(parent_height), + burn_block_hash: make_block_hash(0, burn_block_height), + parent_block_hash: make_block_hash(parent_fork_id, parent_height), + parent_index_block_hash: make_block_hash(parent_fork_id, parent_height), parent_microblock: "0x0000000000000000000000000000000000000000000000000000000000000000" .into(), parent_microblock_sequence: 0, - parent_burn_block_hash: height_to_prefixed_hash(parent_burn_block_height), + parent_burn_block_hash: make_block_hash(0, parent_burn_block_height), parent_burn_block_height: burn_block_height, parent_burn_block_timestamp: 0, transactions: (0..4).map(|i| create_stacks_new_transaction(i)).collect(), @@ -257,10 +264,12 @@ pub fn create_stacks_new_block(height: u64, burn_block_height: u64) -> NewBlock } fn create_stacks_block_received_record( + fork_id: u8, height: u64, + parent_fork_id: u8, burn_block_height: u64, ) -> Result { - let block = create_stacks_new_block(height, burn_block_height); + let block = create_stacks_new_block(fork_id, height, parent_fork_id, burn_block_height); let serialized_block = serde_json::to_string(&block) .map_err(|e| format!("failed to serialize stacks block: {}", e.to_string()))?; Ok(Record { @@ -281,7 +290,7 @@ pub fn write_stacks_blocks_to_tsv(block_count: u64, dir: &str) -> Result<(), Str .expect("unable to create csv writer"); for i in 1..block_count + 1 { writer - .serialize(create_stacks_block_received_record(i, i + 100)?) + .serialize(create_stacks_block_received_record(0, i, 0, i + 100)?) .map_err(|e| format!("failed to write tsv file: {}", e.to_string()))?; } Ok(()) @@ -289,10 +298,12 @@ pub fn write_stacks_blocks_to_tsv(block_count: u64, dir: &str) -> Result<(), Str pub async fn mine_stacks_block( port: u16, + fork_id: u8, height: u64, + parent_fork_id: u8, burn_block_height: u64, ) -> Result<(), String> { - let block = create_stacks_new_block(height, burn_block_height); + let block = create_stacks_new_block(fork_id, height, parent_fork_id, burn_block_height); let serialized_block = serde_json::to_string(&block).unwrap(); let client = reqwest::Client::new(); let _res = client diff --git a/components/chainhook-cli/src/service/tests/helpers/mod.rs b/components/chainhook-cli/src/service/tests/helpers/mod.rs index 76e186e..0dd9b9c 100644 --- a/components/chainhook-cli/src/service/tests/helpers/mod.rs +++ b/components/chainhook-cli/src/service/tests/helpers/mod.rs @@ -5,11 +5,13 @@ pub mod mock_bitcoin_rpc; pub mod mock_service; pub mod mock_stacks_node; -pub fn height_to_prefixed_hash(height: u64) -> String { - format!("0x{}", height_to_hash_str(height)) -} -fn height_to_hash_str(height: u64) -> String { - format!("{:0>64}", height.to_string()) +pub fn make_block_hash(fork_id: u8, block_height: u64) -> String { + #![cfg_attr(rustfmt, rustfmt_skip)] + let mut hash = vec![ + fork_id, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ]; + hash.append(&mut block_height.to_be_bytes().to_vec()); + hex::encode(&hash[..]) } pub fn branch_and_height_to_prefixed_hash(branch: Option, height: u64) -> String { diff --git a/components/chainhook-cli/src/service/tests/mod.rs b/components/chainhook-cli/src/service/tests/mod.rs index 5a98537..8d44c4f 100644 --- a/components/chainhook-cli/src/service/tests/mod.rs +++ b/components/chainhook-cli/src/service/tests/mod.rs @@ -1,7 +1,5 @@ -use chainhook_sdk::chainhooks::types::{ - ChainhookFullSpecification, ChainhookSpecification, StacksChainhookFullSpecification, -}; -use chainhook_sdk::types::{Chain, StacksNetwork}; +use chainhook_sdk::chainhooks::types::ChainhookFullSpecification; +use chainhook_sdk::types::Chain; use chainhook_sdk::utils::Context; use rocket::serde::json::Value as JsonValue; use rocket::Shutdown; @@ -16,26 +14,23 @@ use test_case::test_case; use chainhook_sdk::observer::ObserverCommand; use self::helpers::build_predicates::{build_bitcoin_payload, build_stacks_payload, DEFAULT_UUID}; -use self::helpers::mock_bitcoin_rpc::mock_bitcoin_rpc; use self::helpers::mock_service::{ call_deregister_predicate, filter_predicate_status_from_all_predicates, flush_redis, - start_chainhook_service, start_redis, -}; -use self::helpers::mock_stacks_node::{ - create_tmp_working_dir, mine_burn_block, mine_stacks_block, write_stacks_blocks_to_tsv, + start_chainhook_service, }; -use crate::scan::stacks::consolidate_local_stacks_chainstate_using_csv; +use self::helpers::mock_stacks_node::{mine_burn_block, mine_stacks_block}; +use crate::config::PredicatesApi; use crate::service::tests::helpers::build_predicates::get_random_uuid; -use crate::service::tests::helpers::get_free_port; use crate::service::tests::helpers::mock_service::{ - build_predicate_api_server, call_get_predicate, call_register_predicate, get_chainhook_config, - get_predicate_status, + build_predicate_api_server, call_get_predicate, call_ping, call_register_predicate, + get_chainhook_config, get_predicate_status, setup_bitcoin_chainhook_test, + setup_stacks_chainhook_test, TestSetupResult, }; use crate::service::tests::helpers::mock_stacks_node::create_burn_fork_at; use crate::service::{PredicateStatus, PredicateStatus::*, ScanningData, StreamingData}; +use crate::storage::{get_all_unconfirmed_blocks, open_readonly_stacks_db_conn}; use super::http_api::document_predicate_api_server; -use super::{update_predicate_spec, update_predicate_status}; pub mod helpers; mod observer_tests; @@ -346,23 +341,6 @@ fn _assert_interrupted_status((status, _, _): (PredicateStatus, Option, Opt } } -fn setup_chainhook_service_ports() -> Result<(u16, u16, u16, u16, u16, u16), String> { - let redis_port = get_free_port()?; - let chainhook_service_port = get_free_port()?; - let stacks_rpc_port = get_free_port()?; - let stacks_ingestion_port = get_free_port()?; - let bitcoin_rpc_port = get_free_port()?; - let prometheus_port = get_free_port()?; - Ok(( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - )) -} - async fn await_new_scanning_status_complete( uuid: &str, chainhook_service_port: u16, @@ -381,110 +359,6 @@ async fn await_new_scanning_status_complete( } } } - -async fn setup_stacks_chainhook_test( - starting_chain_tip: u64, - redis_seed: Option<(StacksChainhookFullSpecification, PredicateStatus)>, - startup_predicates: Option>, -) -> (Child, String, u16, u16, u16, u16, u16) { - let ( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); - - let mut redis_process = start_redis(redis_port) - .await - .unwrap_or_else(|e| panic!("test failed with error: {e}")); - flush_redis(redis_port); - - let logger = hiro_system_kit::log::setup_logger(); - let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); - let ctx = Context { - logger: Some(logger), - tracer: false, - }; - - if let Some((predicate, status)) = redis_seed { - let client = redis::Client::open(format!("redis://localhost:{redis_port}/")) - .unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - let mut connection = client.get_connection().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - let stacks_spec = predicate - .into_selected_network_specification(&StacksNetwork::Devnet) - .unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let spec = ChainhookSpecification::Stacks(stacks_spec); - update_predicate_spec(&spec.key(), &spec, &mut connection, &ctx); - update_predicate_status(&spec.key(), status, &mut connection, &ctx); - } - - let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - write_stacks_blocks_to_tsv(starting_chain_tip, &tsv_dir).unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let mut config = get_chainhook_config( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - &working_dir, - &tsv_dir, - Some(prometheus_port), - ); - - consolidate_local_stacks_chainstate_using_csv(&mut config, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - start_chainhook_service(config, chainhook_service_port, startup_predicates, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - ( - redis_process, - working_dir, - chainhook_service_port, - redis_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) -} - #[test_case(5, 0, Some(1), Some(3), Some(3), Some(3) => using assert_confirmed_expiration_status; "predicate_end_block lower than starting_chain_tip ends with ConfirmedExpiration status")] #[test_case(5, 0, Some(1), None, Some(5), Some(5) => using assert_streaming_status; "no predicate_end_block ends with Streaming status")] #[test_case(3, 0, Some(1), Some(5), Some(3), Some(3) => using assert_streaming_status; "predicate_end_block greater than chain_tip ends with Streaming status")] @@ -502,15 +376,17 @@ async fn test_stacks_predicate_status_is_updated( expected_evaluations: Option, expected_occurrences: Option, ) -> (PredicateStatus, Option, Option) { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, - _, - _, - ) = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload( @@ -522,130 +398,43 @@ async fn test_stacks_predicate_status_is_updated( ); let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); for i in 1..blocks_to_mine + 1 { mine_stacks_block( stacks_ingestion_port, + 0, i + starting_chain_tip, + 0, i + starting_chain_tip + 100, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); } sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); let found_predicate_status = filter_predicate_status_from_all_predicates(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - assert_eq!(found_predicate_status, result); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(found_predicate_status, result); (result, expected_evaluations, expected_occurrences) } -async fn setup_bitcoin_chainhook_test( - starting_chain_tip: u64, -) -> (Child, String, u16, u16, u16, u16, u16) { - let ( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) = setup_chainhook_service_ports().unwrap_or_else(|e| panic!("test failed with error: {e}")); - - let mut redis_process = start_redis(redis_port) - .await - .unwrap_or_else(|e| panic!("test failed with error: {e}")); - - flush_redis(redis_port); - let (working_dir, tsv_dir) = create_tmp_working_dir().unwrap_or_else(|e| { - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - - let logger = hiro_system_kit::log::setup_logger(); - let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); - let ctx = Context { - logger: Some(logger), - tracer: false, - }; - - let _ = hiro_system_kit::thread_named("Bitcoin rpc service") - .spawn(move || { - let future = mock_bitcoin_rpc(bitcoin_rpc_port, starting_chain_tip); - let _ = hiro_system_kit::nestable_block_on(future); - }) - .expect("unable to spawn thread"); - - let config = get_chainhook_config( - redis_port, - chainhook_service_port, - stacks_rpc_port, - stacks_ingestion_port, - bitcoin_rpc_port, - &working_dir, - &tsv_dir, - Some(prometheus_port), - ); - - start_chainhook_service(config, chainhook_service_port, None, &ctx) - .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - ( - redis_process, - working_dir, - chainhook_service_port, - redis_port, - stacks_ingestion_port, - bitcoin_rpc_port, - prometheus_port, - ) -} - #[test_case(5, 1, Some(1), Some(3), Some(3), Some(3) => using assert_unconfirmed_expiration_status; "predicate_end_block lower than starting_chain_tip with predicate_end_block confirmations < CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with UnconfirmedExpiration status")] #[test_case(10, 1, Some(1), Some(3), Some(3), Some(3) => using assert_confirmed_expiration_status; "predicate_end_block lower than starting_chain_tip with predicate_end_block confirmations >= CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with ConfirmedExpiration status")] #[test_case(1, 3, Some(1), Some(3), Some(4), Some(3) => using assert_unconfirmed_expiration_status; "predicate_end_block greater than starting_chain_tip and mining blocks so that predicate_end_block confirmations < CONFIRMED_SEGMENT_MINIMUM_LENGTH ends with UnconfirmedExpiration status")] @@ -661,15 +450,17 @@ async fn test_bitcoin_predicate_status_is_updated( expected_evaluations: Option, expected_occurrences: Option, ) -> (PredicateStatus, Option, Option) { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, + stacks_rpc_port: _, bitcoin_rpc_port, - _, - ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); let predicate = build_bitcoin_payload( @@ -684,21 +475,13 @@ async fn test_bitcoin_predicate_status_is_updated( let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); for i in 1..blocks_to_mine + 1 { mine_burn_block( @@ -708,36 +491,22 @@ async fn test_bitcoin_predicate_status_is_updated( i + starting_chain_tip, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); } sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); let found_predicate_status = filter_predicate_status_from_all_predicates(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - assert_eq!(found_predicate_status, result); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(found_predicate_status, result); (result, expected_evaluations, expected_occurrences) } @@ -758,17 +527,19 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( fork_blocks_to_mine: u64, predicate_start_block: Option, predicate_end_block: Option, -) { +) -> Result<(), String> { let starting_chain_tip = 0; - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, + stacks_rpc_port: _, bitcoin_rpc_port, - _, - ) = setup_bitcoin_chainhook_test(starting_chain_tip).await; + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(starting_chain_tip).await; let uuid = &get_random_uuid(); let predicate = build_bitcoin_payload( @@ -783,12 +554,7 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; let genesis_branch_key = '0'; let first_block_mined_height = starting_chain_tip + 1; @@ -801,23 +567,13 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( block_height, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; } sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_streaming_status((status, None, None)); let branch_key = '1'; @@ -831,12 +587,7 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( fork_point, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; let reorg_point = last_block_mined_height + 1; let first_fork_block_mined_height = first_fork_block_mined_height + 1; @@ -850,22 +601,12 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( block_height, ) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; if block_height == reorg_point { sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_streaming_status((status, None, None)); } } @@ -873,27 +614,29 @@ async fn test_bitcoin_predicate_status_is_updated_with_reorg( sleep(Duration::new(2, 0)); let status = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + cleanup(&working_dir, redis_port, &mut redis_process); assert_confirmed_expiration_status((status, None, None)); - - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + Ok(()) } #[test_case(Chain::Stacks; "for stacks chain")] #[test_case(Chain::Bitcoin; "for bitcoin chain")] #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn test_deregister_predicate(chain: Chain) { - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = match &chain - { +async fn test_deregister_predicate(chain: Chain) -> Result<(), String> { + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = match &chain { Chain::Stacks => setup_stacks_chainhook_test(3, None, None).await, Chain::Bitcoin => setup_bitcoin_chainhook_test(3).await, }; @@ -919,49 +662,27 @@ async fn test_deregister_predicate(chain: Chain) { let _ = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; let result = call_get_predicate(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_eq!(result.get("status"), Some(&json!(200))); let result = call_deregister_predicate(&chain, uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; assert_eq!(result.get("status"), Some(&json!(200))); let mut attempts = 0; loop { let result = call_get_predicate(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; if result.get("status") == Some(&json!(404)) { break; } else if attempts == 3 { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); panic!("predicate was not successfully derigistered"); } else { attempts += 1; @@ -969,9 +690,8 @@ async fn test_deregister_predicate(chain: Chain) { } } - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + Ok(()) } #[test_case(New, 6 => using assert_confirmed_expiration_status; "preloaded predicate with new status should get scanned until completion")] @@ -1013,38 +733,37 @@ async fn test_restarting_with_saved_predicates( let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(starting_chain_tip, Some((predicate, starting_status)), None) - .await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(starting_chain_tip, Some((predicate, starting_status)), None) + .await; await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process)) + .unwrap(); - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); (result, None, None) } #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn it_allows_specifying_startup_predicate() { +async fn it_allows_specifying_startup_predicate() -> Result<(), String> { let uuid = &get_random_uuid(); let predicate = build_stacks_payload( Some("devnet"), @@ -1056,37 +775,35 @@ async fn it_allows_specifying_startup_predicate() { let predicate = serde_json::from_value(predicate).expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(predicate); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; await_new_scanning_status_complete(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; sleep(Duration::new(2, 0)); let result = get_predicate_status(uuid, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); assert_confirmed_expiration_status((result, None, None)); + Ok(()) } #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] -async fn register_predicate_responds_409_if_uuid_in_use() { +async fn register_predicate_responds_409_if_uuid_in_use() -> Result<(), String> { let uuid = &get_random_uuid(); let predicate = build_stacks_payload( Some("devnet"), @@ -1099,22 +816,25 @@ async fn register_predicate_responds_409_if_uuid_in_use() { .expect("failed to set up stacks chanhook spec for test"); let startup_predicate = ChainhookFullSpecification::Stacks(stacks_spec); - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, _) = - setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(3, None, Some(vec![startup_predicate])).await; let result = call_register_predicate(&predicate, chainhook_service_port) .await - .unwrap_or_else(|e| { - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); - panic!("test failed with error: {e}"); - }); - assert_eq!(result.get("status"), Some(&json!(409))); + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; - std::fs::remove_dir_all(&working_dir).unwrap(); - flush_redis(redis_port); - redis_process.kill().unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(result.get("status"), Some(&json!(409))); + Ok(()) } #[test] @@ -1130,3 +850,161 @@ fn it_generates_open_api_spec() { "breaking change detected: open api spec has been updated" ) } + +#[tokio::test] +#[cfg_attr(not(feature = "redis_tests"), ignore)] +async fn it_seeds_block_pool_on_startup() -> Result<(), String> { + let starting_chain_tip = 3; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port, + stacks_rpc_port, + bitcoin_rpc_port, + prometheus_port: _, + observer_command_tx, + } = setup_stacks_chainhook_test(starting_chain_tip, None, None).await; + + let blocks_to_mine = 4; + for i in 1..blocks_to_mine + 1 { + mine_stacks_block( + stacks_ingestion_port, + 0, + i + starting_chain_tip, + 0, + i + starting_chain_tip + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + } + // we need these blocks to propagate through new stacks block events and save to the db, so give it some time + sleep(Duration::new(1, 0)); + + let logger = hiro_system_kit::log::setup_logger(); + let _guard = hiro_system_kit::log::setup_global_logger(logger.clone()); + let ctx = Context { + logger: Some(logger), + tracer: false, + }; + let db_path = { + let mut destination_path = PathBuf::new(); + destination_path.push(&working_dir); + destination_path + }; + let stacks_db = open_readonly_stacks_db_conn(&db_path, &ctx).expect("unable to read stacks_db"); + // validate that all blocks we just mined are saved as unconfirmed blocks in the database + let unconfirmed_blocks = get_all_unconfirmed_blocks(&stacks_db, &ctx) + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + let mut unconfirmed_height = starting_chain_tip + 1; + assert_eq!( + blocks_to_mine, + unconfirmed_blocks.len() as u64, + "Number of blocks left unconfirmed in db is not what expected. Expected: {}, Actual: {}", + blocks_to_mine, + unconfirmed_blocks.len() + ); + for block in unconfirmed_blocks.iter() { + assert_eq!( + unconfirmed_height, block.block_identifier.index, + "Unexpected unconfirmed block height. Expected: {}, Actual: {}", + unconfirmed_height, block.block_identifier.index + ); + unconfirmed_height += 1; + } + // terminate chainhook service + let _ = observer_command_tx.send(ObserverCommand::Terminate); + sleep(Duration::new(1, 0)); + let tsv_dir = format!("./{working_dir}/stacks_blocks.tsv"); + let mut config = get_chainhook_config( + redis_port, + chainhook_service_port, + stacks_rpc_port, + stacks_ingestion_port, + bitcoin_rpc_port, + &working_dir, + &tsv_dir, + None, + ); + // the API is still running, so don't restart it + config.http_api = PredicatesApi::Off; + let _ = start_chainhook_service(config, stacks_ingestion_port, None, &ctx).await; + // validate that all of the unconfirmed blocks we just saved are still available after a restart + let unconfirmed_blocks = get_all_unconfirmed_blocks(&stacks_db, &ctx).unwrap(); + let mut unconfirmed_height = starting_chain_tip + 1; + assert_eq!( + blocks_to_mine, + unconfirmed_blocks.len() as u64, + "Number of blocks left unconfirmed in db is not what expected. Expected: {}, Actual: {}", + blocks_to_mine, + unconfirmed_blocks.len() + ); + for block in unconfirmed_blocks.iter() { + assert_eq!( + unconfirmed_height, block.block_identifier.index, + "Unexpected unconfirmed block height. Expected: {}, Actual: {}", + unconfirmed_height, block.block_identifier.index + ); + unconfirmed_height += 1; + } + // mine a block on that same fork + let next_block_height = blocks_to_mine + starting_chain_tip + 1; + mine_stacks_block( + stacks_ingestion_port, + 0, + next_block_height, + 0, + next_block_height + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + + // mine the same block number we just mined, but on a different fork + mine_stacks_block( + stacks_ingestion_port, + 1, + next_block_height, + 0, + next_block_height + 100, + ) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + + sleep(Duration::new(1, 0)); + // confirm that there was a reorg + let metrics = call_ping(stacks_ingestion_port) + .await + .map_err(|e| cleanup_err(e, &working_dir, redis_port, &mut redis_process))?; + let stacks_last_reorg_data = metrics.get("stacks").unwrap().get("last_reorg").unwrap(); + let applied_blocks = stacks_last_reorg_data + .get("applied_blocks") + .unwrap() + .as_u64() + .unwrap(); + let rolled_back_blocks = stacks_last_reorg_data + .get("rolled_back_blocks") + .unwrap() + .as_u64() + .unwrap(); + cleanup(&working_dir, redis_port, &mut redis_process); + assert_eq!(applied_blocks, 1); + assert_eq!(rolled_back_blocks, 1); + Ok(()) +} + +fn cleanup_err( + error: String, + working_dir: &str, + redis_port: u16, + redis_process: &mut Child, +) -> String { + cleanup(working_dir, redis_port, redis_process); + format!("test failed with error: {error}") +} + +fn cleanup(working_dir: &str, redis_port: u16, redis_process: &mut Child) { + std::fs::remove_dir_all(&working_dir).unwrap(); + flush_redis(redis_port); + redis_process.kill().unwrap(); +} diff --git a/components/chainhook-cli/src/service/tests/observer_tests.rs b/components/chainhook-cli/src/service/tests/observer_tests.rs index f094feb..ae00f3c 100644 --- a/components/chainhook-cli/src/service/tests/observer_tests.rs +++ b/components/chainhook-cli/src/service/tests/observer_tests.rs @@ -14,6 +14,7 @@ use crate::service::tests::{ build_predicates::build_stacks_payload, mock_service::{ call_observer_svc, call_ping, call_prometheus, call_register_predicate, flush_redis, + TestSetupResult, }, }, setup_bitcoin_chainhook_test, setup_stacks_chainhook_test, @@ -26,15 +27,17 @@ use super::helpers::{ #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn ping_endpoint_returns_metrics() { - let ( + let TestSetupResult { mut redis_process, working_dir, chainhook_service_port, redis_port, stacks_ingestion_port, - _, - _, - ) = setup_stacks_chainhook_test(1, None, None).await; + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_stacks_chainhook_test(1, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload(Some("devnet"), None, None, None, Some(uuid)); @@ -68,8 +71,17 @@ async fn ping_endpoint_returns_metrics() { #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn prometheus_endpoint_returns_encoded_metrics() { - let (mut redis_process, working_dir, chainhook_service_port, redis_port, _, _, prometheus_port) = - setup_stacks_chainhook_test(1, None, None).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port, + redis_port, + stacks_ingestion_port: _, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port, + observer_command_tx: _, + } = setup_stacks_chainhook_test(1, None, None).await; let uuid = &get_random_uuid(); let predicate = build_stacks_payload(Some("devnet"), None, None, None, Some(uuid)); @@ -128,8 +140,17 @@ async fn await_observer_started(port: u16) { #[tokio::test] #[cfg_attr(not(feature = "redis_tests"), ignore)] async fn bitcoin_rpc_requests_are_forwarded(endpoint: &str, body: Value) { - let (mut redis_process, working_dir, _, redis_port, stacks_ingestion_port, _, _) = - setup_bitcoin_chainhook_test(1).await; + let TestSetupResult { + mut redis_process, + working_dir, + chainhook_service_port: _, + redis_port, + stacks_ingestion_port, + stacks_rpc_port: _, + bitcoin_rpc_port: _, + prometheus_port: _, + observer_command_tx: _, + } = setup_bitcoin_chainhook_test(1).await; await_observer_started(stacks_ingestion_port).await; @@ -158,6 +179,7 @@ async fn start_and_ping_event_observer(config: EventObserverConfig, ingestion_po observer_commands_rx, None, None, + None, ctx, ) .unwrap(); diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index c403a83..dc6a945 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -2,7 +2,13 @@ use std::path::PathBuf; use chainhook_sdk::types::{BlockIdentifier, StacksBlockData, StacksBlockUpdate}; use chainhook_sdk::utils::Context; -use rocksdb::{Options, DB}; +use rocksdb::{Direction, IteratorMode, Options, DB}; + +const UNCONFIRMED_KEY_PREFIX: &[u8; 2] = b"~:"; +const CONFIRMED_KEY_PREFIX: &[u8; 2] = b"b:"; +const KEY_SUFFIX: &[u8; 2] = b":d"; +const LAST_UNCONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:~"; +const LAST_CONFIRMED_KEY_PREFIX: &[u8; 3] = b"m:t"; fn get_db_default_options() -> Options { let mut opts = Options::default(); @@ -87,26 +93,26 @@ pub fn open_readwrite_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Resu fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { let mut key = [0u8; 12]; - key[..2].copy_from_slice(b"b:"); + key[..2].copy_from_slice(CONFIRMED_KEY_PREFIX); key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); - key[10..].copy_from_slice(b":d"); + key[10..].copy_from_slice(KEY_SUFFIX); key } fn get_unconfirmed_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { let mut key = [0u8; 12]; - key[..2].copy_from_slice(b"~:"); + key[..2].copy_from_slice(UNCONFIRMED_KEY_PREFIX); key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); - key[10..].copy_from_slice(b":d"); + key[10..].copy_from_slice(KEY_SUFFIX); key } fn get_last_confirmed_insert_key() -> [u8; 3] { - *b"m:t" + *LAST_CONFIRMED_KEY_PREFIX } fn get_last_unconfirmed_insert_key() -> [u8; 3] { - *b"m:~" + *LAST_UNCONFIRMED_KEY_PREFIX } pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { @@ -164,6 +170,35 @@ pub fn get_last_unconfirmed_block_height_inserted(stacks_db: &DB, _ctx: &Context }) } +pub fn get_all_unconfirmed_blocks( + stacks_db: &DB, + _ctx: &Context, +) -> Result, String> { + let unconfirmed_key_prefix = UNCONFIRMED_KEY_PREFIX; + let mut blocks = vec![]; + let iter = stacks_db.iterator(IteratorMode::From( + unconfirmed_key_prefix, + Direction::Forward, + )); + for item in iter { + match item { + Ok((k, v)) => { + if k.starts_with(unconfirmed_key_prefix) { + let spec: StacksBlockData = serde_json::from_slice(&v[..]).map_err(|e| { + format!("unable to deserialize Stacks block {}", e.to_string()) + })?; + blocks.push(spec); + } else { + // we're past the set of keys we're looking for, so we've found all unconfirmed + return Ok(blocks); + } + } + Err(e) => return Err(format!("failed to get all unconfirmed blocks: {e}")), + }; + } + Ok(blocks) +} + pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option { stacks_db .get(get_last_confirmed_insert_key()) diff --git a/components/chainhook-sdk/src/indexer/mod.rs b/components/chainhook-sdk/src/indexer/mod.rs index fbf580b..9b0c97e 100644 --- a/components/chainhook-sdk/src/indexer/mod.rs +++ b/components/chainhook-sdk/src/indexer/mod.rs @@ -6,7 +6,7 @@ use crate::utils::{AbstractBlock, Context}; use chainhook_types::{ BitcoinBlockSignaling, BitcoinNetwork, BlockHeader, BlockIdentifier, BlockchainEvent, - StacksChainEvent, StacksNetwork, StacksNodeConfig, + StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, }; use hiro_system_kit::slog; use rocket::serde::json::Value as JsonValue; @@ -92,6 +92,10 @@ impl Indexer { } } + pub fn seed_stacks_block_pool(&mut self, blocks: Vec, ctx: &Context) { + self.stacks_blocks_pool.seed_block_pool(blocks, ctx); + } + pub fn handle_bitcoin_header( &mut self, header: BlockHeader, diff --git a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs index 4e52c7b..1d4ee90 100644 --- a/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs +++ b/components/chainhook-sdk/src/indexer/stacks/blocks_pool.rs @@ -43,6 +43,32 @@ impl StacksBlockPool { } } + pub fn seed_block_pool(&mut self, blocks: Vec, ctx: &Context) { + ctx.try_log(|logger| { + slog::info!(logger, "Seeding block pool with {} blocks", blocks.len()) + }); + for block in blocks { + let existing_entry = self.block_store.get(&block.block_identifier.clone()); + if existing_entry.is_some() { + ctx.try_log(|logger| { + slog::info!( + logger, + "Seeding block pool: Stacks {} has already been processed; skipping", + block.block_identifier + ) + }); + continue; + } + + match self.process_block(block, ctx) { + Ok(_) => {} + Err(e) => { + ctx.try_log(|logger| slog::info!(logger, "Error seeding block pool: {}", e)); + } + } + } + } + pub fn process_block( &mut self, block: StacksBlockData, diff --git a/components/chainhook-sdk/src/indexer/stacks/tests.rs b/components/chainhook-sdk/src/indexer/stacks/tests.rs index 94f7eb8..d4d78f2 100644 --- a/components/chainhook-sdk/src/indexer/stacks/tests.rs +++ b/components/chainhook-sdk/src/indexer/stacks/tests.rs @@ -15,262 +15,271 @@ use test_case::test_case; #[test] fn test_stacks_vector_001() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_001()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_001(), None)); } #[test] fn test_stacks_vector_002() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_002()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_002(), None)); } #[test] fn test_stacks_vector_003() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_003()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_003(), None)); } #[test] fn test_stacks_vector_004() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_004()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_004(), None)); } #[test] fn test_stacks_vector_005() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_005()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_005(), None)); } #[test] fn test_stacks_vector_006() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_006()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_006(), None)); } #[test] fn test_stacks_vector_007() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_007()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_007(), None)); } #[test] fn test_stacks_vector_008() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_008()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_008(), None)); } #[test] fn test_stacks_vector_009() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_009()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_009(), None)); } #[test] fn test_stacks_vector_010() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_010()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_010(), None)); } #[test] fn test_stacks_vector_011() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_011()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_011(), None)); } #[test] fn test_stacks_vector_012() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_012()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_012(), None)); } #[test] fn test_stacks_vector_013() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_013()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_013(), None)); } #[test] fn test_stacks_vector_014() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_014()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_014(), None)); } #[test] fn test_stacks_vector_015() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_015()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_015(), None)); } #[test] fn test_stacks_vector_016() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_016()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_016(), None)); } #[test] fn test_stacks_vector_017() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_017()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_017(), None)); } #[test] fn test_stacks_vector_018() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_018()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_018(), None)); } #[test] fn test_stacks_vector_019() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_019()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_019(), None)); } #[test] fn test_stacks_vector_020() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_020()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_020(), None)); } #[test] fn test_stacks_vector_021() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_021()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_021(), None)); } #[test] fn test_stacks_vector_022() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_022()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_022(), None)); } #[test] fn test_stacks_vector_023() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_023()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_023(), None)); } #[test] fn test_stacks_vector_024() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_024()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_024(), None)); } #[test] fn test_stacks_vector_025() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_025()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_025(), None)); } #[test] fn test_stacks_vector_026() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_026()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_026(), None)); } #[test] fn test_stacks_vector_027() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_027()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_027(), None)); } #[test] fn test_stacks_vector_028() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_028()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_028(), None)); } #[test] fn test_stacks_vector_029() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_029()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_029(), None)); } #[test] fn test_stacks_vector_030() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_030()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_030(), None)); } #[test] fn test_stacks_vector_031() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_031()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_031(), None)); } #[test] fn test_stacks_vector_032() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_032()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_032(), None)); } #[test] fn test_stacks_vector_033() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_033()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_033(), None)); } #[test] fn test_stacks_vector_034() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_034()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_034(), None)); } #[test] fn test_stacks_vector_035() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_035()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_035(), None)); } #[test] fn test_stacks_vector_036() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_036()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_036(), None)); } #[test] fn test_stacks_vector_037() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_037()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_037(), None)); } #[test] fn test_stacks_vector_038() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_038()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_038(), None)); } #[test] fn test_stacks_vector_039() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_039()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_039(), None)); } #[test] fn test_stacks_vector_040() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_040()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_040(), None)); } // #[test] // fn test_stacks_vector_041() { -// process_stacks_blocks_and_check_expectations(helpers::shapes::get_vector_041()); +// process_stacks_blocks_and_check_expectations((helpers::shapes::get_vector_041(), None)); // } #[test] fn test_stacks_vector_042() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_042()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_042(), None)); } #[test] fn test_stacks_vector_043() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_043()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_043(), None)); } #[test] fn test_stacks_vector_044() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_044()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_044(), None)); } #[test] fn test_stacks_vector_045() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_045()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_045(), None)); } #[test] fn test_stacks_vector_046() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_046()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_046(), None)); } #[test] fn test_stacks_vector_047() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_047()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_047(), None)); } #[test] fn test_stacks_vector_048() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_048()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_048(), None)); } #[test] fn test_stacks_vector_049() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_049()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_049(), None)); } #[test] fn test_stacks_vector_050() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_050()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_050(), None)); } #[test] fn test_stacks_vector_051() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_051()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_051(), None)); } #[test] fn test_stacks_vector_052() { - process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_052()); + process_stacks_blocks_and_check_expectations((helpers::stacks_shapes::get_vector_052(), None)); +} + +#[test] +fn test_stacks_vector_053() { + process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_053()); +} +#[test] +fn test_stacks_vector_054() { + process_stacks_blocks_and_check_expectations(helpers::stacks_shapes::get_vector_054()); } #[test_case(StacksTransactionEventPayload::STXTransferEvent(STXTransferEventData { diff --git a/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs b/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs index 3168491..1a7f600 100644 --- a/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs +++ b/components/chainhook-sdk/src/indexer/tests/helpers/stacks_shapes.rs @@ -2,7 +2,7 @@ use crate::utils::Context; use super::{super::StacksChainEventExpectation, BlockEvent}; use super::{microblocks, stacks_blocks}; -use chainhook_types::StacksChainEvent; +use chainhook_types::{StacksBlockData, StacksChainEvent}; use hiro_system_kit::slog; pub fn expect_no_chain_update() -> StacksChainEventExpectation { @@ -3739,3 +3739,91 @@ pub fn get_vector_052() -> Vec<(BlockEvent, StacksChainEventExpectation)> { ), ] } + +/// Vector 053: Generate the following blocks +/// +/// A1(0) - B1(1) - C1(3) +/// \ B2(2) +/// +/// +pub fn get_vector_053() -> ( + Vec<(BlockEvent, StacksChainEventExpectation)>, + Option>, +) { + ( + vec![ + ( + stacks_blocks::B1(None), + expect_chain_updated_with_block(stacks_blocks::B1(None), vec![]), + ), + ( + stacks_blocks::B2(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B1(None)], + vec![stacks_blocks::B2(None)], + vec![], + ), + ), + ( + stacks_blocks::C1(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B2(None)], + vec![stacks_blocks::B1(None), stacks_blocks::C1(None)], + vec![], + ), + ), + ], + Some(vec![get_block_from_block_event(stacks_blocks::A1(None))]), + ) +} +/// Vector 054: Generate the following blocks +/// +/// A1(0) - B1(0) - C1(1) - D1(4) +/// \ B2(2) - C2(3) +/// +/// +pub fn get_vector_054() -> ( + Vec<(BlockEvent, StacksChainEventExpectation)>, + Option>, +) { + ( + vec![ + ( + stacks_blocks::C1(None), + expect_chain_updated_with_block(stacks_blocks::C1(None), vec![]), + ), + (stacks_blocks::B2(None), expect_no_chain_update()), + ( + stacks_blocks::C2(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B1(None), stacks_blocks::C1(None)], + vec![stacks_blocks::B2(None), stacks_blocks::C2(None)], + vec![], + ), + ), + ( + stacks_blocks::D1(None), + expect_chain_updated_with_block_reorg( + vec![stacks_blocks::B2(None), stacks_blocks::C2(None)], + vec![ + stacks_blocks::B1(None), + stacks_blocks::C1(None), + stacks_blocks::D1(None), + ], + vec![], + ), + ), + ], + Some(vec![ + get_block_from_block_event(stacks_blocks::A1(None)), + get_block_from_block_event(stacks_blocks::B1(None)), + ]), + ) +} + +fn get_block_from_block_event(block_event: BlockEvent) -> StacksBlockData { + match block_event { + BlockEvent::Block(block) => block, + _ => unreachable!(), + } +} diff --git a/components/chainhook-sdk/src/indexer/tests/mod.rs b/components/chainhook-sdk/src/indexer/tests/mod.rs index 9781a4c..4705aaf 100644 --- a/components/chainhook-sdk/src/indexer/tests/mod.rs +++ b/components/chainhook-sdk/src/indexer/tests/mod.rs @@ -3,14 +3,22 @@ use crate::utils::{AbstractBlock, Context}; use self::helpers::BlockEvent; use super::{fork_scratch_pad::ForkScratchPad, StacksBlockPool}; -use chainhook_types::{BitcoinBlockData, BlockchainEvent, StacksChainEvent}; +use chainhook_types::{BitcoinBlockData, BlockchainEvent, StacksBlockData, StacksChainEvent}; pub type StacksChainEventExpectation = Box) -> ()>; pub fn process_stacks_blocks_and_check_expectations( - steps: Vec<(BlockEvent, StacksChainEventExpectation)>, + (steps, block_pool_seed): ( + Vec<(BlockEvent, StacksChainEventExpectation)>, + Option>, + ), ) { let mut blocks_processor = StacksBlockPool::new(); + + if let Some(block_pool_seed) = block_pool_seed { + blocks_processor.seed_block_pool(block_pool_seed, &Context::empty()); + } + for (block_event, check_chain_event_expectations) in steps.into_iter() { match block_event { BlockEvent::Block(block) => { diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index b27e3b2..cc1108b 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -27,7 +27,7 @@ use bitcoincore_rpc::{Auth, Client, RpcApi}; use chainhook_types::{ BitcoinBlockData, BitcoinBlockSignaling, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, BitcoinChainUpdatedWithReorgData, BitcoinNetwork, BlockIdentifier, BlockchainEvent, - StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier, + StacksBlockData, StacksChainEvent, StacksNetwork, StacksNodeConfig, TransactionIdentifier, }; use hiro_system_kit; use hiro_system_kit::slog; @@ -426,6 +426,7 @@ pub fn start_event_observer( observer_commands_rx: Receiver, observer_events_tx: Option>, observer_sidecar: Option, + stacks_block_pool_seed: Option>, ctx: Context, ) -> Result<(), Box> { match config.bitcoin_block_signaling { @@ -460,6 +461,7 @@ pub fn start_event_observer( observer_commands_rx, observer_events_tx, observer_sidecar, + stacks_block_pool_seed, context_cloned, ); let _ = hiro_system_kit::nestable_block_on(future); @@ -542,6 +544,7 @@ pub async fn start_stacks_event_observer( observer_commands_rx: Receiver, observer_events_tx: Option>, observer_sidecar: Option, + stacks_block_pool_seed: Option>, ctx: Context, ) -> Result<(), Box> { let indexer_config = IndexerConfig { @@ -553,7 +556,10 @@ pub async fn start_stacks_event_observer( bitcoin_block_signaling: config.bitcoin_block_signaling.clone(), }; - let indexer = Indexer::new(indexer_config.clone()); + let mut indexer = Indexer::new(indexer_config.clone()); + if let Some(stacks_block_pool_seed) = stacks_block_pool_seed { + indexer.seed_stacks_block_pool(stacks_block_pool_seed, &ctx); + } let log_level = if config.display_logs { if cfg!(feature = "cli") {