From dbbc8cecfc4f6311ed3968b0a25b9d13e7903b81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 13 Jun 2024 22:36:12 +0800 Subject: [PATCH] feat(example): use changeset staging with rpc polling example --- .../example_bitcoind_rpc_polling/src/main.rs | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/example-crates/example_bitcoind_rpc_polling/src/main.rs b/example-crates/example_bitcoind_rpc_polling/src/main.rs index 1d4674a366..98945d7a27 100644 --- a/example-crates/example_bitcoind_rpc_polling/src/main.rs +++ b/example-crates/example_bitcoind_rpc_polling/src/main.rs @@ -11,12 +11,12 @@ use bdk_bitcoind_rpc::{ bitcoincore_rpc::{Auth, Client, RpcApi}, Emitter, }; -use bdk_chain::persist::PersistBackend; +use bdk_chain::persist::{PersistBackend, StageExt}; use bdk_chain::{ bitcoin::{constants::genesis_block, Block, Transaction}, indexed_tx_graph, keychain, local_chain::{self, LocalChain}, - ConfirmationTimeHeightAnchor, IndexedTxGraph, + Append, ConfirmationTimeHeightAnchor, IndexedTxGraph, }; use example_cli::{ anyhow, @@ -176,6 +176,7 @@ fn main() -> anyhow::Result<()> { let chain_tip = chain.lock().unwrap().tip(); let rpc_client = rpc_args.new_client()?; let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height); + let mut db_stage = ChangeSet::default(); let mut last_db_commit = Instant::now(); let mut last_print = Instant::now(); @@ -185,17 +186,18 @@ fn main() -> anyhow::Result<()> { let mut chain = chain.lock().unwrap(); let mut graph = graph.lock().unwrap(); - let mut db = db.lock().unwrap(); let chain_changeset = chain .apply_update(emission.checkpoint) .expect("must always apply as we receive blocks in order from emitter"); let graph_changeset = graph.apply_block_relevant(&emission.block, height); - db.write_changes(&(chain_changeset, graph_changeset))?; + db_stage.append((chain_changeset, graph_changeset)); // commit staged db changes in intervals if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); + db_stage.commit_to(db)?; println!( "[{:>10}s] committed to db (took {}s)", start.elapsed().as_secs_f32(), @@ -230,8 +232,11 @@ fn main() -> anyhow::Result<()> { mempool_txs.iter().map(|(tx, time)| (tx, *time)), ); { - let mut db = db.lock().unwrap(); - db.write_changes(&(local_chain::ChangeSet::default(), graph_changeset))?; + let db = &mut *db.lock().unwrap(); + db_stage.append_and_commit_to( + (local_chain::ChangeSet::default(), graph_changeset), + db, + )?; } } RpcCommands::Live { rpc_args } => { @@ -287,9 +292,9 @@ fn main() -> anyhow::Result<()> { let mut tip_height = 0_u32; let mut last_db_commit = Instant::now(); let mut last_print = Option::::None; + let mut db_stage = ChangeSet::default(); for emission in rx { - let mut db = db.lock().unwrap(); let mut graph = graph.lock().unwrap(); let mut chain = chain.lock().unwrap(); @@ -314,11 +319,12 @@ fn main() -> anyhow::Result<()> { continue; } }; - - db.write_changes(&changeset)?; + db_stage.append(changeset); if last_db_commit.elapsed() >= DB_COMMIT_DELAY { + let db = &mut *db.lock().unwrap(); last_db_commit = Instant::now(); + db_stage.commit_to(db)?; println!( "[{:>10}s] committed to db (took {}s)", start.elapsed().as_secs_f32(),