From ed4998400299a12137edbde4ca107bcd626ff96a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 20 Oct 2022 15:28:14 +0200 Subject: [PATCH] Make everything async We migrate BDK and other remaining blocking parts to async. --- Cargo.toml | 2 +- src/access.rs | 171 +++++++++++++++++++++++++++++++------------------- src/error.rs | 7 ++- src/event.rs | 27 +++++--- src/lib.rs | 45 +++++++------ 5 files changed, 153 insertions(+), 99 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 82f59261c..c2a826356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" } lightning-rapid-gossip-sync = { version = "0.0.110" } #bdk = "0.20.0" -bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]} +bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]} bitcoin = "0.28.1" rand = "0.8.5" diff --git a/src/access.rs b/src/access.rs index e2bd74906..24a80232d 100644 --- a/src/access.rs +++ b/src/access.rs @@ -1,36 +1,47 @@ use crate::logger::{ log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger, }; -use crate::Error; +use crate::{Config, Error}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::chain::WatchedOutput; use lightning::chain::{Confirm, Filter}; -use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx}; +use bdk::blockchain::EsploraBlockchain; use bdk::database::BatchDatabase; +use bdk::esplora_client; use bdk::wallet::AddressIndex; -use bdk::{SignOptions, SyncOptions}; +use bdk::{FeeRate, SignOptions, SyncOptions}; -use bitcoin::{BlockHash, Script, Transaction, Txid}; +use bitcoin::{Script, Transaction, Txid}; use std::collections::HashSet; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex, RwLock}; /// The minimum feerate we are allowed to send, as specify by LDK. const MIN_FEERATE: u32 = 253; +// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold +// number of blocks after which BDK stops looking for scripts belonging to the wallet. +const BDK_CLIENT_STOP_GAP: usize = 20; + +// The number of concurrent requests made against the API provider. +const BDK_CLIENT_CONCURRENCY: u8 = 8; + pub struct ChainAccess where D: BatchDatabase, { - blockchain: EsploraBlockchain, + blockchain: Arc, + client: Arc, wallet: Mutex>, queued_transactions: Mutex>, watched_transactions: Mutex>, queued_outputs: Mutex>, watched_outputs: Mutex>, last_sync_height: tokio::sync::Mutex>, + tokio_runtime: RwLock>>, + _config: Arc, logger: Arc, } @@ -39,7 +50,7 @@ where D: BatchDatabase, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, logger: Arc, + wallet: bdk::Wallet, config: Arc, logger: Arc, ) -> Self { let wallet = Mutex::new(wallet); let watched_transactions = Mutex::new(Vec::new()); @@ -47,30 +58,49 @@ where let watched_outputs = Mutex::new(Vec::new()); let queued_outputs = Mutex::new(Vec::new()); let last_sync_height = tokio::sync::Mutex::new(None); + let tokio_runtime = RwLock::new(None); + // TODO: Check that we can be sure that the Esplora client re-connects in case of failure + // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime? + let blockchain = Arc::new( + EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP) + .with_concurrency(BDK_CLIENT_CONCURRENCY), + ); + let client_builder = + esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url)); + let client = Arc::new(client_builder.build_async().unwrap()); Self { blockchain, + client, wallet, queued_transactions, watched_transactions, queued_outputs, watched_outputs, last_sync_height, + tokio_runtime, + _config: config, logger, } } + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } + pub(crate) async fn sync_wallet(&self) -> Result<(), Error> { let sync_options = SyncOptions { progress: None }; - self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?; + self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?; Ok(()) } - pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> { - let client = &*self.blockchain; - - let cur_height = client.get_height().await?; + pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>) -> Result<(), Error> { + let cur_height = self.client.get_height().await?; let mut locked_last_sync_height = self.last_sync_height.lock().await; if cur_height >= locked_last_sync_height.unwrap_or(0) { @@ -84,13 +114,11 @@ where } async fn sync_best_block_updated( - &self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32, + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, cur_height: u32, locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option>, ) -> Result<(), Error> { - let client = &*self.blockchain; - // Inform the interface of the new block. - let cur_block_header = client.get_header(cur_height).await?; + let cur_block_header = self.client.get_header(cur_height).await?; for c in confirmables { c.best_block_updated(&cur_block_header, cur_height); } @@ -100,10 +128,8 @@ where } async fn sync_transactions_confirmed( - &self, confirmables: &Vec<&(dyn Confirm + Sync)>, + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, ) -> Result<(), Error> { - let client = &*self.blockchain; - // First, check the confirmation status of registered transactions as well as the // status of dependent transactions of registered outputs. @@ -125,13 +151,13 @@ where let mut unconfirmed_registered_txs = Vec::new(); for txid in registered_txs { - if let Some(tx_status) = client.get_tx_status(&txid).await? { + if let Some(tx_status) = self.client.get_tx_status(&txid).await? { if tx_status.confirmed { - if let Some(tx) = client.get_tx(&txid).await? { + if let Some(tx) = self.client.get_tx(&txid).await? { if let Some(block_height) = tx_status.block_height { // TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17) - let block_header = client.get_header(block_height).await?; - if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? { + let block_header = self.client.get_header(block_height).await?; + if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? { if block_height == merkle_proof.block_height { confirmed_txs.push(( tx, @@ -160,7 +186,8 @@ where let mut unspent_registered_outputs = Vec::new(); for output in registered_outputs { - if let Some(output_status) = client + if let Some(output_status) = self + .client .get_output_status(&output.outpoint.txid, output.outpoint.index as u64) .await? { @@ -168,12 +195,12 @@ where if let Some(spending_tx_status) = output_status.status { if spending_tx_status.confirmed { let spending_txid = output_status.txid.unwrap(); - if let Some(spending_tx) = client.get_tx(&spending_txid).await? { + if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? { let block_height = spending_tx_status.block_height.unwrap(); // TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17) - let block_header = client.get_header(block_height).await?; + let block_header = self.client.get_header(block_height).await?; if let Some(merkle_proof) = - client.get_merkle_proof(&spending_txid).await? + self.client.get_merkle_proof(&spending_txid).await? { confirmed_txs.push(( spending_tx, @@ -213,15 +240,15 @@ where } async fn sync_transaction_unconfirmed( - &self, confirmables: &Vec<&(dyn Confirm + Sync)>, + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, ) -> Result<(), Error> { - let client = &*self.blockchain; // Query the interface for relevant txids and check whether they have been // reorged-out of the chain. let relevant_txids = confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::>(); for txid in relevant_txids { - let tx_unconfirmed = client + let tx_unconfirmed = self + .client .get_tx_status(&txid) .await .ok() @@ -240,12 +267,14 @@ where pub(crate) fn create_funding_transaction( &self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget, ) -> Result { - let num_blocks = num_blocks_from_conf_target(confirmation_target); - let fee_rate = self.blockchain.estimate_fee(num_blocks)?; - let locked_wallet = self.wallet.lock().unwrap(); let mut tx_builder = locked_wallet.build_tx(); + let fallback_fee = fallback_fee_from_conf_target(confirmation_target); + let fee_rate = self + .estimate_fee(confirmation_target) + .unwrap_or(FeeRate::from_sat_per_kwu(fallback_fee as f32)); + tx_builder.add_recipient(output_script.clone(), value_sats).fee_rate(fee_rate).enable_rbf(); let (mut psbt, _) = tx_builder.finish()?; @@ -271,6 +300,33 @@ where let address_info = self.wallet.lock().unwrap().get_address(AddressIndex::New)?; Ok(address_info.address) } + + fn estimate_fee(&self, confirmation_target: ConfirmationTarget) -> Result { + let num_blocks = num_blocks_from_conf_target(confirmation_target); + + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return Err(Error::FeeEstimationFailed); + } + + let tokio_client = Arc::clone(&self.client); + let (sender, receiver) = mpsc::sync_channel(1); + + locked_runtime.as_ref().unwrap().spawn(async move { + let res = tokio_client.get_fee_estimates().await; + let _ = sender.send(res); + }); + + let estimates = receiver + .recv() + .map_err(|_| Error::FeeEstimationFailed)? + .map_err(|_| Error::FeeEstimationFailed)?; + + Ok(bdk::FeeRate::from_sat_per_vb( + esplora_client::convert_fee_rate(num_blocks, estimates) + .map_err(|_| Error::FeeEstimationFailed)?, + )) + } } impl FeeEstimator for ChainAccess @@ -278,10 +334,9 @@ where D: BatchDatabase, { fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - let num_blocks = num_blocks_from_conf_target(confirmation_target); let fallback_fee = fallback_fee_from_conf_target(confirmation_target); - self.blockchain - .estimate_fee(num_blocks) + + self.estimate_fee(confirmation_target) .map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32 } } @@ -291,7 +346,22 @@ where D: BatchDatabase, { fn broadcast_transaction(&self, tx: &Transaction) { - match self.blockchain.broadcast(tx) { + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + log_error!(self.logger, "Failed to broadcast transaction: No runtime."); + return; + } + + let tokio_client = Arc::clone(&self.client); + let tokio_tx = tx.clone(); + let (sender, receiver) = mpsc::sync_channel(1); + + locked_runtime.as_ref().unwrap().spawn(async move { + let res = tokio_client.broadcast(&tokio_tx).await; + let _ = sender.send(res); + }); + + match receiver.recv().unwrap() { Ok(_) => {} Err(err) => { log_error!(self.logger, "Failed to broadcast transaction: {}", err); @@ -315,33 +385,6 @@ where } } -impl GetHeight for ChainAccess -where - D: BatchDatabase, -{ - fn get_height(&self) -> Result { - self.blockchain.get_height() - } -} - -impl GetBlockHash for ChainAccess -where - D: BatchDatabase, -{ - fn get_block_hash(&self, height: u64) -> Result { - self.blockchain.get_block_hash(height) - } -} - -impl GetTx for ChainAccess -where - D: BatchDatabase, -{ - fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { - self.blockchain.get_tx(txid) - } -} - fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize { match confirmation_target { ConfirmationTarget::Background => 12, diff --git a/src/error.rs b/src/error.rs index 6494ee391..8e937ed35 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,6 +10,8 @@ pub enum Error { NotRunning, /// The funding transaction could not be created. FundingTxCreationFailed, + /// Returned when we could not estimate a transaction fee. + FeeEstimationFailed, /// A network connection has been closed. ConnectionFailed, /// Payment of the given invoice has already been intiated. @@ -41,9 +43,8 @@ impl fmt::Display for Error { match *self { Self::AlreadyRunning => write!(f, "LDKLite is already running."), Self::NotRunning => write!(f, "LDKLite is not running."), - Self::FundingTxCreationFailed => { - write!(f, "Funding transaction could not be created.") - } + Self::FundingTxCreationFailed => write!(f, "Funding transaction could not be created."), + Self::FeeEstimationFailed => write!(f, "Fee estimation failed."), Self::ConnectionFailed => write!(f, "Network connection closed."), Self::NonUniquePaymentHash => write!(f, "An invoice must not get payed twice."), Self::InvoiceInvalid => write!(f, "The given invoice is invalid."), diff --git a/src/event.rs b/src/event.rs index 0ba7fc690..8c137efe5 100644 --- a/src/event.rs +++ b/src/event.rs @@ -18,8 +18,7 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use bitcoin::secp256k1::Secp256k1; use rand::{thread_rng, Rng}; use std::collections::{hash_map, VecDeque}; -use std::sync::{Arc, Condvar, Mutex}; -use std::thread; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::Duration; /// The event queue will be persisted under this key. @@ -221,6 +220,7 @@ pub(crate) struct EventHandler { keys_manager: Arc, inbound_payments: Arc, outbound_payments: Arc, + tokio_runtime: RwLock>>, logger: Arc, _config: Arc, } @@ -233,6 +233,7 @@ impl EventHandler { inbound_payments: Arc, outbound_payments: Arc, logger: Arc, _config: Arc, ) -> Self { + let tokio_runtime = RwLock::new(None); Self { event_queue, chain_access, @@ -241,10 +242,19 @@ impl EventHandler { keys_manager, inbound_payments, outbound_payments, + tokio_runtime, logger, _config, } } + + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } } impl LdkEventHandler for EventHandler { @@ -284,7 +294,7 @@ impl LdkEventHandler for EventHandler { Err(err) => { log_error!(self.logger, "Failed to create funding transaction: {}", err); } - } + }; } LdkEvent::PaymentReceived { payment_hash, purpose, amount_msat } => { log_info!( @@ -387,11 +397,14 @@ impl LdkEventHandler for EventHandler { let forwarding_channel_manager = self.channel_manager.clone(); let min = time_forwardable.as_millis() as u64; - // TODO: any way we still can use tokio here? - // TODO: stop this thread on shutdown - thread::spawn(move || { + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return; + } + + locked_runtime.as_ref().unwrap().spawn(async move { let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; - thread::sleep(Duration::from_millis(millis_to_sleep)); + tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; forwarding_channel_manager.process_pending_htlc_forwards(); }); } diff --git a/src/lib.rs b/src/lib.rs index b7a1e194b..58b0490e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,7 +65,6 @@ use lightning_invoice::utils::DefaultRouter; use lightning_invoice::{payment, Currency, Invoice}; use bdk::bitcoin::secp256k1::Secp256k1; -use bdk::blockchain::esplora::EsploraBlockchain; use bdk::sled; use bdk::template::Bip84; @@ -84,13 +83,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime}; -// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold -// number of blocks after which BDK stops looking for scripts belonging to the wallet. -const BDK_CLIENT_STOP_GAP: usize = 20; - -// The number of concurrent requests made against the API provider. -const BDK_CLIENT_CONCURRENCY: u8 = 8; - // The timeout after which we abandon retrying failed payments. const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -221,12 +213,8 @@ impl Builder { ) .expect("Failed to setup on-chain wallet"); - // TODO: Check that we can be sure that the Esplora client re-connects in case of failure - // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime? - let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP) - .with_concurrency(BDK_CLIENT_CONCURRENCY); - - let chain_access = Arc::new(ChainAccess::new(blockchain, bdk_wallet, Arc::clone(&logger))); + let chain_access = + Arc::new(ChainAccess::new(bdk_wallet, Arc::clone(&config), Arc::clone(&logger))); // Step 3: Initialize Persist let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); @@ -354,7 +342,7 @@ impl Builder { Arc::new(EventQueue::new(Arc::clone(&persister))) }; - let event_handler = EventHandler::new( + let event_handler = Arc::new(EventHandler::new( Arc::clone(&chain_access), Arc::clone(&event_queue), Arc::clone(&channel_manager), @@ -364,7 +352,7 @@ impl Builder { Arc::clone(&outbound_payments), Arc::clone(&logger), Arc::clone(&config), - ); + )); //// Step 16: Create Router and InvoicePayer let router = DefaultRouter::new( @@ -378,7 +366,7 @@ impl Builder { router, Arc::clone(&scorer), Arc::clone(&logger), - event_handler, + Arc::clone(&event_handler), payment::Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT), )); @@ -402,6 +390,7 @@ impl Builder { config, chain_access, event_queue, + event_handler, channel_manager, chain_monitor, peer_manager, @@ -421,7 +410,7 @@ impl Builder { /// Wraps all objects that need to be preserved during the run time of [`LdkLite`]. Will be dropped /// upon [`LdkLite::stop()`]. struct Runtime { - tokio_runtime: tokio::runtime::Runtime, + tokio_runtime: Arc, _background_processor: BackgroundProcessor, stop_networking: Arc, stop_wallet_sync: Arc, @@ -435,6 +424,7 @@ pub struct LdkLite { config: Arc, chain_access: Arc>, event_queue: Arc>, + event_handler: Arc, channel_manager: Arc, chain_monitor: Arc, peer_manager: Arc, @@ -443,7 +433,7 @@ pub struct LdkLite { persister: Arc, logger: Arc, scorer: Arc>, - invoice_payer: Arc>, + invoice_payer: Arc>>, inbound_payments: Arc, outbound_payments: Arc, peer_store: Arc>, @@ -482,6 +472,10 @@ impl LdkLite { runtime.stop_networking.store(true, Ordering::Release); self.peer_manager.disconnect_all_peers(); + // Drop the held runtimes. + self.chain_access.drop_runtime(); + self.event_handler.drop_runtime(); + // Drop the runtime, which stops the background processor and any possibly remaining tokio threads. *run_lock = None; Ok(()) @@ -489,7 +483,10 @@ impl LdkLite { fn setup_runtime(&self) -> Result { let tokio_runtime = - tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); + Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + + self.chain_access.set_runtime(Arc::clone(&tokio_runtime)); + self.event_handler.set_runtime(Arc::clone(&tokio_runtime)); // Setup wallet sync let chain_access = Arc::clone(&self.chain_access); @@ -499,7 +496,7 @@ impl LdkLite { let stop_wallet_sync = Arc::new(AtomicBool::new(false)); let stop_sync = Arc::clone(&stop_wallet_sync); - tokio_runtime.spawn(async move { + tokio_runtime.block_on(async move { let mut rounds = 0; loop { if stop_sync.load(Ordering::Acquire) { @@ -521,8 +518,8 @@ impl LdkLite { rounds = (rounds + 1) % 5; let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync), - &*sync_cmon as &(dyn Confirm + Sync), + &*sync_cman as &(dyn Confirm + Send + Sync), + &*sync_cmon as &(dyn Confirm + Send + Sync), ]; let now = Instant::now(); match chain_access.sync(confirmables).await { @@ -669,7 +666,7 @@ impl LdkLite { let con_logger = Arc::clone(&self.logger); let con_pm = Arc::clone(&self.peer_manager); - runtime.tokio_runtime.block_on(async move { + runtime.tokio_runtime.spawn(async move { let res = connect_peer_if_necessary( con_peer_info.pubkey, con_peer_info.address,