Skip to content

Commit

Permalink
Make everything async
Browse files Browse the repository at this point in the history
We migrate BDK and other remaining blocking parts to async.
  • Loading branch information
tnull committed Oct 20, 2022
1 parent e82b5f7 commit bac34b5
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" }
lightning-rapid-gossip-sync = { version = "0.0.110" }

#bdk = "0.20.0"
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch="master", features = ["use-esplora-async", "key-value-db"]}
bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]}
bitcoin = "0.28.1"

rand = "0.8.5"
Expand Down
107 changes: 63 additions & 44 deletions src/access.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,47 @@
use crate::logger::{
log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger,
};
use crate::Error;
use crate::{Config, Error};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};

use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
use bdk::blockchain::{Blockchain, EsploraBlockchain};
use bdk::database::BatchDatabase;
use bdk::esplora_client;
use bdk::wallet::AddressIndex;
use bdk::{SignOptions, SyncOptions};

use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::{Script, Transaction, Txid};

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

/// The minimum feerate we are allowed to send, as specify by LDK.
const MIN_FEERATE: u32 = 253;

// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
const BDK_CLIENT_STOP_GAP: usize = 20;

// The number of concurrent requests made against the API provider.
const BDK_CLIENT_CONCURRENCY: u8 = 8;

pub struct ChainAccess<D>
where
D: BatchDatabase,
{
blockchain: EsploraBlockchain,
_client: Arc<esplora_client::AsyncClient>,
wallet: Mutex<bdk::Wallet<D>>,
queued_transactions: Mutex<Vec<Txid>>,
watched_transactions: Mutex<Vec<Txid>>,
queued_outputs: Mutex<Vec<WatchedOutput>>,
watched_outputs: Mutex<Vec<WatchedOutput>>,
last_sync_height: tokio::sync::Mutex<Option<u32>>,
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
_config: Arc<Config>,
logger: Arc<FilesystemLogger>,
}

Expand All @@ -39,30 +50,49 @@ where
D: BatchDatabase,
{
pub(crate) fn new(
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
wallet: bdk::Wallet<D>, config: Arc<Config>, logger: Arc<FilesystemLogger>,
) -> Self {
let wallet = Mutex::new(wallet);
let watched_transactions = Mutex::new(Vec::new());
let queued_transactions = Mutex::new(Vec::new());
let watched_outputs = Mutex::new(Vec::new());
let queued_outputs = Mutex::new(Vec::new());
let last_sync_height = tokio::sync::Mutex::new(None);
let tokio_runtime = RwLock::new(None);
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
let client_builder =
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
let client = Arc::new(client_builder.build_async().unwrap());
Self {
blockchain,
_client: client,
wallet,
queued_transactions,
watched_transactions,
queued_outputs,
watched_outputs,
last_sync_height,
tokio_runtime,
_config: config,
logger,
}
}

pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
}

pub(crate) fn drop_runtime(&self) {
*self.tokio_runtime.write().unwrap() = None;
}

pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
let sync_options = SyncOptions { progress: None };

self.wallet.lock().unwrap().sync(&self.blockchain, sync_options)?;
self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?;

Ok(())
}
Expand Down Expand Up @@ -237,11 +267,11 @@ where
Ok(())
}

pub(crate) fn create_funding_transaction(
pub(crate) async fn create_funding_transaction(
&self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget,
) -> Result<Transaction, Error> {
let num_blocks = num_blocks_from_conf_target(confirmation_target);
let fee_rate = self.blockchain.estimate_fee(num_blocks)?;
let fee_rate = self.blockchain.estimate_fee(num_blocks).await?;

let locked_wallet = self.wallet.lock().unwrap();
let mut tx_builder = locked_wallet.build_tx();
Expand Down Expand Up @@ -280,9 +310,18 @@ where
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
let num_blocks = num_blocks_from_conf_target(confirmation_target);
let fallback_fee = fallback_fee_from_conf_target(confirmation_target);
self.blockchain
.estimate_fee(num_blocks)
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32

let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return fallback_fee;
}

locked_runtime.as_ref().unwrap().block_on(async {
self.blockchain
.estimate_fee(num_blocks)
.await
.map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32
})
}
}

Expand All @@ -291,13 +330,20 @@ where
D: BatchDatabase,
{
fn broadcast_transaction(&self, tx: &Transaction) {
match self.blockchain.broadcast(tx) {
Ok(_) => {}
Err(err) => {
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
panic!("Failed to broadcast transaction: {}", err);
}
let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return;
}

locked_runtime.as_ref().unwrap().block_on(async {
match self.blockchain.broadcast(tx).await {
Ok(_) => {}
Err(err) => {
log_error!(self.logger, "Failed to broadcast transaction: {}", err);
panic!("Failed to broadcast transaction: {}", err);
}
}
})
}
}

Expand All @@ -315,33 +361,6 @@ where
}
}

impl<D> GetHeight for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_height(&self) -> Result<u32, bdk::Error> {
self.blockchain.get_height()
}
}

impl<D> GetBlockHash for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_block_hash(&self, height: u64) -> Result<BlockHash, bdk::Error> {
self.blockchain.get_block_hash(height)
}
}

impl<D> GetTx for ChainAccess<D>
where
D: BatchDatabase,
{
fn get_tx(&self, txid: &Txid) -> Result<Option<Transaction>, bdk::Error> {
self.blockchain.get_tx(txid)
}
}

fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize {
match confirmation_target {
ConfirmationTarget::Background => 12,
Expand Down
74 changes: 47 additions & 27 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use bitcoin::secp256k1::Secp256k1;
use rand::{thread_rng, Rng};
use std::collections::{hash_map, VecDeque};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::Duration;

/// The event queue will be persisted under this key.
Expand Down Expand Up @@ -221,6 +220,7 @@ pub(crate) struct EventHandler {
keys_manager: Arc<KeysManager>,
inbound_payments: Arc<PaymentInfoStorage>,
outbound_payments: Arc<PaymentInfoStorage>,
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
logger: Arc<FilesystemLogger>,
_config: Arc<Config>,
}
Expand All @@ -233,6 +233,7 @@ impl EventHandler {
inbound_payments: Arc<PaymentInfoStorage>, outbound_payments: Arc<PaymentInfoStorage>,
logger: Arc<FilesystemLogger>, _config: Arc<Config>,
) -> Self {
let tokio_runtime = RwLock::new(None);
Self {
event_queue,
chain_access,
Expand All @@ -241,10 +242,19 @@ impl EventHandler {
keys_manager,
inbound_payments,
outbound_payments,
tokio_runtime,
logger,
_config,
}
}

pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
}

pub(crate) fn drop_runtime(&self) {
*self.tokio_runtime.write().unwrap() = None;
}
}

impl LdkEventHandler for EventHandler {
Expand All @@ -262,29 +272,36 @@ impl LdkEventHandler for EventHandler {
let confirmation_target = ConfirmationTarget::Normal;

// Sign the final funding transaction and broadcast it.
match self.chain_access.create_funding_transaction(
&output_script,
*channel_value_satoshis,
confirmation_target,
) {
Ok(final_tx) => {
// Give the funding transaction back to LDK for opening the channel.
if self
.channel_manager
.funding_transaction_generated(
&temporary_channel_id,
counterparty_node_id,
final_tx,
)
.is_err()
{
log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel");
let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return;
}

locked_runtime.as_ref().unwrap().block_on(async {
match self.chain_access.create_funding_transaction(
&output_script,
*channel_value_satoshis,
confirmation_target,
).await {
Ok(final_tx) => {
// Give the funding transaction back to LDK for opening the channel.
if self
.channel_manager
.funding_transaction_generated(
&temporary_channel_id,
counterparty_node_id,
final_tx,
)
.is_err()
{
log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel");
}
}
Err(err) => {
log_error!(self.logger, "Failed to create funding transaction: {}", err);
}
}
Err(err) => {
log_error!(self.logger, "Failed to create funding transaction: {}", err);
}
}
});
}
LdkEvent::PaymentReceived { payment_hash, purpose, amount_msat } => {
log_info!(
Expand Down Expand Up @@ -387,11 +404,14 @@ impl LdkEventHandler for EventHandler {
let forwarding_channel_manager = self.channel_manager.clone();
let min = time_forwardable.as_millis() as u64;

// TODO: any way we still can use tokio here?
// TODO: stop this thread on shutdown
thread::spawn(move || {
let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return;
}

locked_runtime.as_ref().unwrap().spawn(async move {
let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64;
thread::sleep(Duration::from_millis(millis_to_sleep));
tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
forwarding_channel_manager.process_pending_htlc_forwards();
});
}
Expand Down
Loading

0 comments on commit bac34b5

Please sign in to comment.