Skip to content

Commit

Permalink
Make everything async
Browse files Browse the repository at this point in the history
We migrate BDK and other remaining blocking parts to async.
  • Loading branch information
tnull committed Oct 21, 2022
1 parent e82b5f7 commit ed49984
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
lightning-rapid-gossip-sync = { version = "0.0.110" }

#bdk = "0.20.0"
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]}
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]}
bitcoin = "0.28.1"

rand = "0.8.5"
Expand Down
171 changes: 107 additions & 64 deletions src/access.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
use crate::logger::{
log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger,
};
use crate::Error;
use crate::{Config, Error};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};

use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
use bdk::blockchain::EsploraBlockchain;
use bdk::database::BatchDatabase;
use bdk::esplora_client;
use bdk::wallet::AddressIndex;
use bdk::{SignOptions, SyncOptions};
use bdk::{FeeRate, SignOptions, SyncOptions};

use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::{Script, Transaction, Txid};

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::{mpsc, Arc, Mutex, RwLock};

/// The minimum feerate we are allowed to send, as specify by LDK.
const MIN_FEERATE: u32 = 253;

// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
const BDK_CLIENT_STOP_GAP: usize = 20;

// The number of concurrent requests made against the API provider.
const BDK_CLIENT_CONCURRENCY: u8 = 8;

pub struct ChainAccess<D>
where
D: BatchDatabase,
{
blockchain: EsploraBlockchain,
blockchain: Arc<EsploraBlockchain>,
client: Arc<esplora_client::AsyncClient>,
wallet: Mutex<bdk::Wallet<D>>,
queued_transactions: Mutex<Vec<Txid>>,
watched_transactions: Mutex<Vec<Txid>>,
queued_outputs: Mutex<Vec<WatchedOutput>>,
watched_outputs: Mutex<Vec<WatchedOutput>>,
last_sync_height: tokio::sync::Mutex<Option<u32>>,
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
_config: Arc<Config>,
logger: Arc<FilesystemLogger>,
}

Expand All @@ -39,38 +50,57 @@ where
D: BatchDatabase,
{
pub(crate) fn new(
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
wallet: bdk::Wallet<D>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
) -> Self {
let wallet = Mutex::new(wallet);
let watched_transactions = Mutex::new(Vec::new());
let queued_transactions = Mutex::new(Vec::new());
let watched_outputs = Mutex::new(Vec::new());
let queued_outputs = Mutex::new(Vec::new());
let last_sync_height = tokio::sync::Mutex::new(None);
let tokio_runtime = RwLock::new(None);
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
let blockchain = Arc::new(
EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY),
);
let client_builder =
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
let client = Arc::new(client_builder.build_async().unwrap());
Self {
blockchain,
client,
wallet,
queued_transactions,
watched_transactions,
queued_outputs,
watched_outputs,
last_sync_height,
tokio_runtime,
_config: config,
logger,
}
}

pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
}

pub(crate) fn drop_runtime(&self) {
*self.tokio_runtime.write().unwrap() = None;
}

pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
let sync_options = SyncOptions { progress: None };

self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?;
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?;

Ok(())
}

pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
let client = &*self.blockchain;

let cur_height = client.get_height().await?;
pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>) -> Result<(), Error> {
let cur_height = self.client.get_height().await?;

let mut locked_last_sync_height = self.last_sync_height.lock().await;
if cur_height >= locked_last_sync_height.unwrap_or(0) {
Expand All @@ -84,13 +114,11 @@ where
}

async fn sync_best_block_updated(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, cur_height: u32,
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// Inform the interface of the new block.
let cur_block_header = client.get_header(cur_height).await?;
let cur_block_header = self.client.get_header(cur_height).await?;
for c in confirmables {
c.best_block_updated(&cur_block_header, cur_height);
}
Expand All @@ -100,10 +128,8 @@ where
}

async fn sync_transactions_confirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.

Expand All @@ -125,13 +151,13 @@ where
let mut unconfirmed_registered_txs = Vec::new();

for txid in registered_txs {
if let Some(tx_status) = client.get_tx_status(&txid).await? {
if let Some(tx_status) = self.client.get_tx_status(&txid).await? {
if tx_status.confirmed {
if let Some(tx) = client.get_tx(&txid).await? {
if let Some(tx) = self.client.get_tx(&txid).await? {
if let Some(block_height) = tx_status.block_height {
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
let block_header = client.get_header(block_height).await?;
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
let block_header = self.client.get_header(block_height).await?;
if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? {
if block_height == merkle_proof.block_height {
confirmed_txs.push((
tx,
Expand Down Expand Up @@ -160,20 +186,21 @@ where
let mut unspent_registered_outputs = Vec::new();

for output in registered_outputs {
if let Some(output_status) = client
if let Some(output_status) = self
.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
.await?
{
if output_status.spent {
if let Some(spending_tx_status) = output_status.status {
if spending_tx_status.confirmed {
let spending_txid = output_status.txid.unwrap();
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? {
let block_height = spending_tx_status.block_height.unwrap();
// TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17)
let block_header = client.get_header(block_height).await?;
let block_header = self.client.get_header(block_height).await?;
if let Some(merkle_proof) =
client.get_merkle_proof(&spending_txid).await?
self.client.get_merkle_proof(&spending_txid).await?
{
confirmed_txs.push((
spending_tx,
Expand Down Expand Up @@ -213,15 +240,15 @@ where
}

async fn sync_transaction_unconfirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
&self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;
// Query the interface for relevant txids and check whether they have been
// reorged-out of the chain.
let relevant_txids =
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
for txid in relevant_txids {
let tx_unconfirmed = client
let tx_unconfirmed = self
.client
.get_tx_status(&txid)
.await
.ok()
Expand All @@ -240,12 +267,14 @@ where
pub(crate) fn create_funding_transaction(
&self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget,
) -> Result<Transaction, Error> {
let num_blocks = num_blocks_from_conf_target(confirmation_target);
let fee_rate = self.blockchain.estimate_fee(num_blocks)?;

let locked_wallet = self.wallet.lock().unwrap();
let mut tx_builder = locked_wallet.build_tx();

let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
let fee_rate = self
.estimate_fee(confirmation_target)
.unwrap_or(FeeRate::from_sat_per_kwu(fallback_fee as f32));

tx_builder.add_recipient(output_script.clone(), value_sats).fee_rate(fee_rate).enable_rbf();

let (mut psbt, _) = tx_builder.finish()?;
Expand All @@ -271,17 +300,43 @@ where
let address_info = self.wallet.lock().unwrap().get_address(AddressIndex::New)?;
Ok(address_info.address)
}

fn estimate_fee(&self, confirmation_target: ConfirmationTarget) -> Result<bdk::FeeRate, Error> {
let num_blocks = num_blocks_from_conf_target(confirmation_target);

let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return Err(Error::FeeEstimationFailed);
}

let tokio_client = Arc::clone(&self.client);
let (sender, receiver) = mpsc::sync_channel(1);

locked_runtime.as_ref().unwrap().spawn(async move {
let res = tokio_client.get_fee_estimates().await;
let _ = sender.send(res);
});

let estimates = receiver
.recv()
.map_err(|_| Error::FeeEstimationFailed)?
.map_err(|_| Error::FeeEstimationFailed)?;

Ok(bdk::FeeRate::from_sat_per_vb(
esplora_client::convert_fee_rate(num_blocks, estimates)
.map_err(|_| Error::FeeEstimationFailed)?,
))
}
}

impl<D> FeeEstimator for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
let num_blocks = num_blocks_from_conf_target(confirmation_target);
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
self.blockchain
.estimate_fee(num_blocks)

self.estimate_fee(confirmation_target)
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
}
}
Expand All @@ -291,7 +346,22 @@ where
D: BatchDatabase,
{
fn broadcast_transaction(&self, tx: &Transaction) {
match self.blockchain.broadcast(tx) {
let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
log_error!(self.logger, "Failed to broadcast transaction: No runtime.");
return;
}

let tokio_client = Arc::clone(&self.client);
let tokio_tx = tx.clone();
let (sender, receiver) = mpsc::sync_channel(1);

locked_runtime.as_ref().unwrap().spawn(async move {
let res = tokio_client.broadcast(&tokio_tx).await;
let _ = sender.send(res);
});

match receiver.recv().unwrap() {
Ok(_) => {}
Err(err) => {
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
Expand All @@ -315,33 +385,6 @@ where
}
}

impl<D> GetHeight for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_height(&self) -> Result<u32, bdk::Error> {
self.blockchain.get_height()
}
}

impl<D> GetBlockHash for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_block_hash(&self, height: u64) -> Result<BlockHash, bdk::Error> {
self.blockchain.get_block_hash(height)
}
}

impl<D> GetTx for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, bdk::Error> {
self.blockchain.get_tx(txid)
}
}

fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize {
match confirmation_target {
ConfirmationTarget::Background => 12,
Expand Down
7 changes: 4 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Error {
NotRunning,
/// The funding transaction could not be created.
FundingTxCreationFailed,
/// Returned when we could not estimate a transaction fee.
FeeEstimationFailed,
/// A network connection has been closed.
ConnectionFailed,
/// Payment of the given invoice has already been intiated.
Expand Down Expand Up @@ -41,9 +43,8 @@ impl fmt::Display for Error {
match *self {
Self::AlreadyRunning => write!(f, "LDKLite is already running."),
Self::NotRunning => write!(f, "LDKLite is not running."),
Self::FundingTxCreationFailed => {
write!(f, "Funding transaction could not be created.")
}
Self::FundingTxCreationFailed => write!(f, "Funding transaction could not be created."),
Self::FeeEstimationFailed => write!(f, "Fee estimation failed."),
Self::ConnectionFailed => write!(f, "Network connection closed."),
Self::NonUniquePaymentHash => write!(f, "An invoice must not get payed twice."),
Self::InvoiceInvalid => write!(f, "The given invoice is invalid."),
Expand Down
Loading

0 comments on commit ed49984

Please sign in to comment.