From 5d78e741bc6930516835ad2d9de9656606b9d88d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 13 Jun 2024 18:22:43 +0800 Subject: [PATCH] feat(wallet)!: change persist API to use `StageExt` and `StageExtAsync` --- crates/wallet/Cargo.toml | 1 + crates/wallet/src/wallet/mod.rs | 91 ++++++++++++------- crates/wallet/tests/wallet.rs | 11 +-- example-crates/wallet_electrum/src/main.rs | 4 +- .../wallet_esplora_async/src/main.rs | 4 +- .../wallet_esplora_blocking/src/main.rs | 4 +- example-crates/wallet_rpc/src/main.rs | 4 +- 7 files changed, 70 insertions(+), 49 deletions(-) diff --git a/crates/wallet/Cargo.toml b/crates/wallet/Cargo.toml index d7f3290ace..d5b52be060 100644 --- a/crates/wallet/Cargo.toml +++ b/crates/wallet/Cargo.toml @@ -30,6 +30,7 @@ js-sys = "0.3" [features] default = ["std"] std = ["bitcoin/std", "miniscript/std", "bdk_chain/std"] +async = ["bdk_chain/async"] compiler = ["miniscript/compiler"] all-keys = ["keys-bip39"] keys-bip39 = ["bip39"] diff --git a/crates/wallet/src/wallet/mod.rs b/crates/wallet/src/wallet/mod.rs index 87efd4784b..821b0d8a5a 100644 --- a/crates/wallet/src/wallet/mod.rs +++ b/crates/wallet/src/wallet/mod.rs @@ -26,6 +26,7 @@ use bdk_chain::{ local_chain::{ self, ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain, }, + persist::{PersistBackend, StageExt}, spk_client::{FullScanRequest, FullScanResult, SyncRequest, SyncResult}, tx_graph::{CanonicalTx, TxGraph}, Append, BlockId, ChainPosition, ConfirmationTime, ConfirmationTimeHeightAnchor, FullTxOut, @@ -40,7 +41,6 @@ use bitcoin::{ use bitcoin::{consensus::encode::serialize, transaction, BlockHash, Psbt}; use bitcoin::{constants::genesis_block, Amount}; use core::fmt; -use core::mem; use core::ops::Deref; use descriptor::error::Error as DescriptorError; use miniscript::psbt::{PsbtExt, PsbtInputExt, PsbtInputSatisfier}; @@ -393,18 +393,6 @@ impl Wallet { }) } - /// Stage a ['ChangeSet'] to be persisted later. - /// - /// [`commit`]: Self::commit - fn stage(&mut self, changeset: ChangeSet) { - self.stage.append(changeset) - } - - /// Take the staged [`ChangeSet`] to be persisted now. - pub fn take_staged(&mut self) -> ChangeSet { - mem::take(&mut self.stage) - } - /// Load [`Wallet`] from the given previously persisted [`ChangeSet`]. /// /// Note that the descriptor secret keys are not persisted to the db; this means that after @@ -687,7 +675,7 @@ impl Wallet { /// # let changeset = ChangeSet::default(); /// # let mut wallet = Wallet::load_from_changeset(changeset).expect("load wallet"); /// let next_address = wallet.reveal_next_address(KeychainKind::External); - /// db.write_changes(&wallet.take_staged())?; + /// wallet.commit_to(&mut db)?; /// /// // Now it's safe to show the user their next address! /// println!("Next address: {}", next_address.address); @@ -730,7 +718,7 @@ impl Wallet { .reveal_to_target(&keychain, index) .expect("keychain must exist"); - self.stage(indexed_tx_graph::ChangeSet::from(index_changeset).into()); + self.stage.append(index_changeset.into()); spk_iter.map(move |(index, spk)| AddressInfo { index, @@ -913,7 +901,7 @@ impl Wallet { /// [`list_output`]: Self::list_output pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) { let additions = self.indexed_graph.insert_txout(outpoint, txout); - self.stage(ChangeSet::from(additions)); + self.stage.append(additions.into()); } /// Calculates the fee of a given transaction. Returns [`Amount::ZERO`] if `tx` is a coinbase transaction. @@ -1082,7 +1070,7 @@ impl Wallet { ) -> Result { let changeset = self.chain.insert_block(block_id)?; let changed = !changeset.is_empty(); - self.stage(changeset.into()); + self.stage.append(changeset.into()); Ok(changed) } @@ -1144,7 +1132,7 @@ impl Wallet { } let changed = !changeset.is_empty(); - self.stage(changeset); + self.stage.append(changeset); Ok(changed) } @@ -1469,9 +1457,7 @@ impl Wallet { .expect("keychain must exist"); let spk = spk.into(); self.indexed_graph.index.mark_used(change_keychain, index); - self.stage(ChangeSet::from(indexed_tx_graph::ChangeSet::from( - index_changeset, - ))); + self.stage.append(index_changeset.into()); spk } }; @@ -2281,25 +2267,60 @@ impl Wallet { /// [`commit`]: Self::commit pub fn apply_update(&mut self, update: impl Into) -> Result<(), CannotConnectError> { let update = update.into(); - let mut changeset = match update.chain { - Some(chain_update) => ChangeSet::from(self.chain.apply_update(chain_update)?), - None => ChangeSet::default(), - }; - + let mut changeset = ChangeSet::default(); + if let Some(chain_update) = update.chain { + changeset.append(self.chain.apply_update(chain_update)?.into()); + } let (_, index_changeset) = self .indexed_graph .index .reveal_to_target_multi(&update.last_active_indices); - changeset.append(ChangeSet::from(indexed_tx_graph::ChangeSet::from( - index_changeset, - ))); - changeset.append(ChangeSet::from( - self.indexed_graph.apply_update(update.graph), - )); - self.stage(changeset); + changeset.append(index_changeset.into()); + changeset.append(self.indexed_graph.apply_update(update.graph).into()); + self.stage.append(changeset); Ok(()) } + /// Commits all currently [`staged`](Wallet::staged) changes to the `persist_backend`. + /// + /// This returns whether anything was persisted. + /// + /// # Error + /// + /// Returns a backend-defined error if this fails. + pub fn commit_to(&mut self, persist_backend: &mut B) -> Result + where + B: PersistBackend, + { + let committed = StageExt::commit_to(&mut self.stage, persist_backend)?; + Ok(committed.is_some()) + } + + /// Commits all currently [`staged`](Wallet::staged) changes to the async `persist_backend`. + /// + /// This returns whether anything was persisted. + /// + /// # Error + /// + /// Returns a backend-defined error if this fails. + #[cfg(feature = "async")] + pub async fn commit_to_async( + &mut self, + persist_backend: &mut B, + ) -> Result + where + B: bdk_chain::persist::PersistBackendAsync + Send + Sync, + { + let committed = + bdk_chain::persist::StageExtAsync::commit_to(&mut self.stage, persist_backend).await?; + Ok(committed.is_some()) + } + + /// Get the staged [`ChangeSet`] that is yet to be committed. + pub fn staged(&self) -> &ChangeSet { + &self.stage + } + /// Get a reference to the inner [`TxGraph`]. pub fn tx_graph(&self) -> &TxGraph { self.indexed_graph.graph() @@ -2369,7 +2390,7 @@ impl Wallet { .apply_block_relevant(block, height) .into(), ); - self.stage(changeset); + self.stage.append(changeset); Ok(()) } @@ -2392,7 +2413,7 @@ impl Wallet { let indexed_graph_changeset = self .indexed_graph .batch_insert_relevant_unconfirmed(unconfirmed_txs); - self.stage(ChangeSet::from(indexed_graph_changeset)); + self.stage.append(indexed_graph_changeset.into()); } } diff --git a/crates/wallet/tests/wallet.rs b/crates/wallet/tests/wallet.rs index beddacd6e1..3ca18b7601 100644 --- a/crates/wallet/tests/wallet.rs +++ b/crates/wallet/tests/wallet.rs @@ -90,11 +90,10 @@ fn load_recovers_wallet() -> anyhow::Result<()> { wallet.reveal_next_address(KeychainKind::External); // persist new wallet changes - let staged_changeset = wallet.take_staged(); - let db = &mut create_new(&file_path).expect("must create db"); - db.write_changes(&staged_changeset) + let mut db = create_new(&file_path).expect("must create db"); + wallet + .commit_to(&mut db) .map_err(|e| anyhow!("write changes error: {}", e))?; - wallet.spk_index().clone() }; @@ -158,9 +157,9 @@ fn new_or_load() -> anyhow::Result<()> { let wallet_keychains: BTreeMap<_, _> = { let wallet = &mut Wallet::new_or_load(desc, change_desc, None, Network::Testnet) .expect("must init wallet"); - let staged_changeset = wallet.take_staged(); let mut db = new_or_load(&file_path).expect("must create db"); - db.write_changes(&staged_changeset) + wallet + .commit_to(&mut db) .map_err(|e| anyhow!("write changes error: {}", e))?; wallet.keychains().map(|(k, v)| (*k, v.clone())).collect() }; diff --git a/example-crates/wallet_electrum/src/main.rs b/example-crates/wallet_electrum/src/main.rs index f634466fa1..c7c563c152 100644 --- a/example-crates/wallet_electrum/src/main.rs +++ b/example-crates/wallet_electrum/src/main.rs @@ -33,7 +33,7 @@ fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -72,7 +72,7 @@ fn main() -> Result<(), anyhow::Error> { println!(); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; let balance = wallet.balance(); println!("Wallet balance after syncing: {} sats", balance.total()); diff --git a/example-crates/wallet_esplora_async/src/main.rs b/example-crates/wallet_esplora_async/src/main.rs index c3e4244cdb..6a666d49ba 100644 --- a/example-crates/wallet_esplora_async/src/main.rs +++ b/example-crates/wallet_esplora_async/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -78,7 +78,7 @@ async fn main() -> Result<(), anyhow::Error> { let _ = update.graph_update.update_last_seen_unconfirmed(now); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!(); let balance = wallet.balance(); diff --git a/example-crates/wallet_esplora_blocking/src/main.rs b/example-crates/wallet_esplora_blocking/src/main.rs index 3b431465a0..21e9fe6884 100644 --- a/example-crates/wallet_esplora_blocking/src/main.rs +++ b/example-crates/wallet_esplora_blocking/src/main.rs @@ -29,7 +29,7 @@ fn main() -> Result<(), anyhow::Error> { )?; let address = wallet.next_unused_address(KeychainKind::External); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!("Generated Address: {}", address); let balance = wallet.balance(); @@ -55,7 +55,7 @@ fn main() -> Result<(), anyhow::Error> { let _ = update.graph_update.update_last_seen_unconfirmed(now); wallet.apply_update(update)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!(); let balance = wallet.balance(); diff --git a/example-crates/wallet_rpc/src/main.rs b/example-crates/wallet_rpc/src/main.rs index f5d12f8a60..e0d617493f 100644 --- a/example-crates/wallet_rpc/src/main.rs +++ b/example-crates/wallet_rpc/src/main.rs @@ -147,7 +147,7 @@ fn main() -> anyhow::Result<()> { let connected_to = block_emission.connected_to(); let start_apply_block = Instant::now(); wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?; - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; let elapsed = start_apply_block.elapsed().as_secs_f32(); println!( "Applied block {} at height {} in {}s", @@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> { Emission::Mempool(mempool_emission) => { let start_apply_mempool = Instant::now(); wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time))); - db.write_changes(&wallet.take_staged())?; + wallet.commit_to(&mut db)?; println!( "Applied unconfirmed transactions in {}s", start_apply_mempool.elapsed().as_secs_f32()