Skip to content

Commit

Permalink
feat: augment predicate status returned by GET/LIST endpoints (#397)
Browse files Browse the repository at this point in the history
### Description

This PR accomplishes a few things:
- renames/adds/removes predicate statuses according to the flowchart in
docs/images/predicate-status-flowchart/PredicateStatusFlowchart.png
 - adds more context to errors returned in Interrupted status
 - adds status data to `GET /v1/chainhooks` endpoint
- fixes bug in chainhook service where block streaming continued past
`end_block`

Fixes #396, fixes #324 

Also: 
Improves how we handle a restart of `chainhook service` while predicates
are scanning/streaming. Here are the cases we now handle:
1. Predicates that were in `scanning` status when Chainhook was
terminated will resume scanning starting from their
`last_evaluated_block_height`. *Note: because we only save predicate
status every 10 scans, we could end up re-emiting matches on a resetart*
2. Predicates that were in `new` status when Chainhook was terminated
will start scanning at the predicate's `start_block`
3. Predicates that were in `streaming` status will _return_ to a
`scanning` status, starting at `last_evaluated_block_height` to catch up
on the missed blocks. Note, the `number_of_blocks_to_scan` is set to 0
for this temporary catch-up, as it's difficult to compute the number of
remaining blocks in the context of this change
4. If predicates were passed in at startup, we also register those to
begin scanning, which previously didn't happen
5. We now allow passing in a predicate at startup _and_ registering
additional predicates with the predicate registration server. This means
that if you use the same startup predicate repeatedly, it will already
be saved in redis and _not_ be reloaded.

Fixes: #298, fixes #390, fixes #402, fixes #403
#### Breaking change?

The rename of `ScanningData`'s `number_of_blocks_sent` field could
technically be considered breaking, let's discuss.


### Checklist

- [x] All tests pass
- [x] Tests added in this PR (if applicable)
Test coverage before: 23.2%
Test coverage after: 37.72%
  • Loading branch information
vabanaerytk authored Sep 14, 2023
1 parent 9501338 commit 0f32704
Show file tree
Hide file tree
Showing 23 changed files with 3,994 additions and 646 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ jobs:
- name: Cargo test
run: |
rustup update
cargo check
cargo test --all
RUST_BACKTRACE=1 cargo test --all -- --test-threads=1
- name: Semantic Release
uses: cycjimmy/semantic-release-action@v3
Expand Down Expand Up @@ -72,6 +71,6 @@ jobs:
context: .
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
file: ./dockerfiles/components/chainhook.dockerfile
file: ./dockerfiles/components/chainhook-node.dockerfile
# Only push if (there's a new release on main branch, or if building a non-main branch) and (Only run on non-PR events or only PRs that aren't from forks)
push: ${{ (github.ref != 'refs/heads/master' || steps.semantic.outputs.new_release_version != '') && (github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository) }}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ components/chainhook-types-js/dist
*.rdb
*.redb
cache/

components/chainhook-cli/src/service/tests/fixtures/tmp
9 changes: 3 additions & 6 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::generator::generate_config;
use crate::config::{Config, PredicatesApi};
use crate::config::Config;
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::scan::stacks::{
consolidate_local_stacks_chainstate_using_csv, scan_stacks_chainstate_via_csv_using_predicate,
Expand Down Expand Up @@ -291,12 +291,8 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
match opts.command {
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let mut config =
let config =
Config::default(cmd.devnet, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
// We disable the API if a predicate was passed, and the --enable-
if cmd.predicates_paths.len() > 0 && !cmd.start_http_api {
config.http_api = PredicatesApi::Off;
}

let predicates = cmd
.predicates_paths
Expand Down Expand Up @@ -462,6 +458,7 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {

scan_bitcoin_chainstate_via_rpc_using_predicate(
&predicate_spec,
None,
&config,
&ctx,
)
Expand Down
165 changes: 113 additions & 52 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::config::{Config, PredicatesApi};
use crate::service::{
open_readwrite_predicates_db_conn_or_panic, update_predicate_status, PredicateStatus,
ScanningData,
open_readwrite_predicates_db_conn_or_panic, set_predicate_scanning_status,
set_unconfirmed_expiration_status, ScanningData,
};
use chainhook_sdk::bitcoincore_rpc::RpcApi;
use chainhook_sdk::bitcoincore_rpc::{Auth, Client};
Expand All @@ -16,16 +16,17 @@ use chainhook_sdk::indexer::bitcoin::{
};
use chainhook_sdk::observer::{gather_proofs, EventObserverConfig};
use chainhook_sdk::types::{
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData,
BitcoinBlockData, BitcoinChainEvent, BitcoinChainUpdatedWithBlocksData, BlockIdentifier, Chain,
};
use chainhook_sdk::utils::{file_append, send_request, BlockHeights, Context};
use std::collections::HashMap;

pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
predicate_spec: &BitcoinChainhookSpecification,
unfinished_scan_data: Option<ScanningData>,
config: &Config,
ctx: &Context,
) -> Result<(), String> {
) -> Result<bool, String> {
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
Expand All @@ -43,43 +44,70 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
BlockHeights::Blocks(blocks.clone()).get_sorted_entries()
} else {
let start_block = match predicate_spec.start_block {
Some(start_block) => start_block,
Some(start_block) => match &unfinished_scan_data {
Some(scan_data) => scan_data.last_evaluated_block_height,
None => start_block,
},
None => {
return Err(
"Bitcoin chainhook specification must include a field start_block in replay mode"
.into(),
);
}
};
let (end_block, update_end_block) = match predicate_spec.end_block {
Some(end_block) => (end_block, false),
None => match bitcoin_rpc.get_blockchain_info() {
Ok(result) => (result.blocks, true),
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
let (end_block, update_end_block) = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
(result.blocks, true)
} else {
(end_block, false)
}
}
None => (result.blocks, true),
},
Err(e) => {
return Err(format!(
"unable to retrieve Bitcoin chain tip ({})",
e.to_string()
));
}
};
floating_end_block = update_end_block;
BlockHeights::BlockRange(start_block, end_block).get_sorted_entries()
};

let mut predicates_db_conn = match config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, ctx))
}
PredicatesApi::Off => None,
};

info!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
);

let mut last_block_scanned = BlockIdentifier::default();
let mut actions_triggered = 0;
let mut err_count = 0;

let event_observer_config = config.get_event_observer_config();
let bitcoin_config = event_observer_config.get_bitcoin_config();
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
let mut number_of_blocks_scanned = 0;
let mut number_of_blocks_sent = 0u64;

let (mut number_of_blocks_to_scan, mut number_of_blocks_scanned, mut number_of_times_triggered) = {
let number_of_blocks_to_scan = block_heights_to_scan.len() as u64;
match &unfinished_scan_data {
Some(scan_data) => (
scan_data.number_of_blocks_to_scan,
scan_data.number_of_blocks_evaluated,
scan_data.number_of_times_triggered,
),
None => (number_of_blocks_to_scan, 0, 0u64),
}
};

let http_client = build_http_client();

while let Some(current_block_height) = block_heights_to_scan.pop_front() {
Expand Down Expand Up @@ -109,8 +137,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
continue;
}
};
last_block_scanned = block.block_identifier.clone();

match process_block_with_predicates(
let res = match process_block_with_predicates(
block,
&vec![&predicate_spec],
&event_observer_config,
Expand All @@ -120,81 +149,113 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
{
Ok(actions) => {
if actions > 0 {
number_of_blocks_sent += 1;
number_of_times_triggered += 1;
}
actions_triggered += actions
actions_triggered += actions;
Ok(())
}
Err(_) => err_count += 1,
}
Err(e) => {
err_count += 1;
Err(e)
}
};

if err_count >= 3 {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
if res.is_err() {
return Err(format!(
"Scan aborted (consecutive action errors >= 3): {}",
res.unwrap_err()
));
} else {
return Err(format!("Scan aborted (consecutive action errors >= 3)"));
}
}

if let PredicatesApi::On(ref api_config) = config.http_api {
if number_of_blocks_scanned % 50 == 0 {
let status = PredicateStatus::Scanning(ScanningData {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if number_of_blocks_scanned % 10 == 0 || number_of_blocks_scanned == 1 {
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
number_of_times_triggered,
current_block_height,
});
let mut predicates_db_conn =
open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(
&predicate_spec.key(),
status,
&mut predicates_db_conn,
&ctx,
)
predicates_db_conn,
ctx,
);
}
}

if block_heights_to_scan.is_empty() && floating_end_block {
match bitcoin_rpc.get_blockchain_info() {
Ok(result) => {
for entry in (current_block_height + 1)..result.blocks {
block_heights_to_scan.push_back(entry);
let new_tip = match bitcoin_rpc.get_blockchain_info() {
Ok(result) => match predicate_spec.end_block {
Some(end_block) => {
if end_block > result.blocks {
result.blocks
} else {
end_block
}
}
}
None => result.blocks,
},
Err(_e) => {
continue;
}
};

for entry in (current_block_height + 1)..new_tip {
block_heights_to_scan.push_back(entry);
}
number_of_blocks_to_scan += block_heights_to_scan.len() as u64;
}
}
info!(
ctx.expect_logger(),
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
);

if let PredicatesApi::On(ref api_config) = config.http_api {
let status = PredicateStatus::Scanning(ScanningData {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if let Some(predicate_end_block) = predicate_spec.end_block {
if predicate_end_block == last_block_scanned.index {
// todo: we need to find a way to check if this block is confirmed
// and if so, set the status to confirmed expiration
set_unconfirmed_expiration_status(
&Chain::Bitcoin,
number_of_blocks_scanned,
predicate_end_block,
&predicate_spec.key(),
predicates_db_conn,
ctx,
);
return Ok(true);
}
}
set_predicate_scanning_status(
&predicate_spec.key(),
number_of_blocks_to_scan,
number_of_blocks_scanned,
number_of_blocks_sent,
current_block_height: 0,
});
let mut predicates_db_conn = open_readwrite_predicates_db_conn_or_panic(api_config, &ctx);
update_predicate_status(&predicate_spec.key(), status, &mut predicates_db_conn, &ctx)
number_of_times_triggered,
last_block_scanned.index,
predicates_db_conn,
ctx,
);
}

Ok(())
return Ok(false);
}

pub async fn process_block_with_predicates(
block: BitcoinBlockData,
predicates: &Vec<&BitcoinChainhookSpecification>,
event_observer_config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let chain_event =
BitcoinChainEvent::ChainUpdatedWithBlocks(BitcoinChainUpdatedWithBlocksData {
new_blocks: vec![block],
confirmed_blocks: vec![],
});

let (predicates_triggered, _predicates_evaluated) =
let (predicates_triggered, _predicates_evaluated, _predicates_expired) =
evaluate_bitcoin_chainhooks_on_chain_event(&chain_event, predicates, ctx);

execute_predicates_action(predicates_triggered, &event_observer_config, &ctx).await
Expand All @@ -204,7 +265,7 @@ pub async fn execute_predicates_action<'a>(
hits: Vec<BitcoinTriggerChainhook<'a>>,
config: &EventObserverConfig,
ctx: &Context,
) -> Result<u32, ()> {
) -> Result<u32, String> {
let mut actions_triggered = 0;
let mut proofs = HashMap::new();
for trigger in hits.into_iter() {
Expand Down
Loading

0 comments on commit 0f32704

Please sign in to comment.