Skip to content

Commit

Permalink
f Make async work. Ugly though.
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Sep 30, 2022
1 parent ed9e7d0 commit aec3a52
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 44 deletions.
127 changes: 105 additions & 22 deletions src/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,47 @@ use crate::logger::{
log_error, log_given_level, log_info, log_internal, log_trace, log_warn, FilesystemLogger,
Logger,
};
use crate::{scid_utils, LdkLiteConfig};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
use lightning::chain::{Access, AccessError, Confirm, Filter};

use bdk::blockchain::{Blockchain, EsploraBlockchain, GetBlockHash, GetHeight, GetTx};
use bdk::database::BatchDatabase;
use bdk::esplora_client;
use bdk::wallet::AddressIndex;
use bdk::{SignOptions, SyncOptions};

use bitcoin::{BlockHash, Script, Transaction, Txid};
use bitcoin::{BlockHash, Script, Transaction, TxOut, Txid};

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};

/// The minimum feerate we are allowed to send, as specify by LDK.
const MIN_FEERATE: u32 = 253;

// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
const BDK_CLIENT_STOP_GAP: usize = 20;

// The number of concurrent requests made against the API provider.
const BDK_CLIENT_CONCURRENCY: u8 = 8;

pub struct LdkLiteChainAccess<D>
where
D: BatchDatabase,
{
blockchain: EsploraBlockchain,
client: Arc<esplora_client::AsyncClient>,
wallet: Mutex<bdk::Wallet<D>>,
queued_transactions: Mutex<Vec<Txid>>,
watched_transactions: Mutex<Vec<Txid>>,
queued_outputs: Mutex<Vec<WatchedOutput>>,
watched_outputs: Mutex<Vec<WatchedOutput>>,
last_sync_height: tokio::sync::Mutex<Option<u32>>,
tokio_runtime: RwLock<Option<Arc<tokio::runtime::Runtime>>>,
config: Arc<LdkLiteConfig>,
logger: Arc<FilesystemLogger>,
}

Expand All @@ -41,26 +53,45 @@ where
D: BatchDatabase,
{
pub(crate) fn new(
blockchain: EsploraBlockchain, wallet: bdk::Wallet<D>, logger: Arc<FilesystemLogger>,
wallet: bdk::Wallet<D>, config: Arc<LdkLiteConfig>, logger: Arc<FilesystemLogger>,
) -> Self {
let wallet = Mutex::new(wallet);
let watched_transactions = Mutex::new(Vec::new());
let queued_transactions = Mutex::new(Vec::new());
let watched_outputs = Mutex::new(Vec::new());
let queued_outputs = Mutex::new(Vec::new());
let last_sync_height = tokio::sync::Mutex::new(None);
let tokio_runtime = RwLock::new(None);
// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
let client_builder =
esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url));
let client = Arc::new(client_builder.build_async().unwrap());
Self {
blockchain,
client,
wallet,
queued_transactions,
watched_transactions,
queued_outputs,
watched_outputs,
last_sync_height,
tokio_runtime,
config,
logger,
}
}

pub(crate) fn set_runtime(&self, tokio_runtime: Arc<tokio::runtime::Runtime>) {
*self.tokio_runtime.write().unwrap() = Some(tokio_runtime);
}

pub(crate) fn drop_runtime(&self) {
*self.tokio_runtime.write().unwrap() = None;
}

pub(crate) async fn sync_wallet(&self) -> Result<(), Error> {
let sync_options = SyncOptions { progress: None };

Expand All @@ -74,9 +105,7 @@ where
}

pub(crate) async fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync)>) -> Result<(), Error> {
let client = &*self.blockchain;

let cur_height = client.get_height().await?;
let cur_height = self.client.get_height().await?;

let mut locked_last_sync_height = self.last_sync_height.lock().await;
if cur_height >= locked_last_sync_height.unwrap_or(0) {
Expand All @@ -93,10 +122,8 @@ where
&self, confirmables: &Vec<&(dyn Confirm + Sync)>, cur_height: u32,
locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option<u32>>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// Inform the interface of the new block.
let cur_block_header = client.get_header(cur_height).await?;
let cur_block_header = self.client.get_header(cur_height).await?;
for c in confirmables {
c.best_block_updated(&cur_block_header, cur_height);
}
Expand All @@ -108,8 +135,6 @@ where
async fn sync_transactions_confirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;

// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.

Expand All @@ -131,12 +156,12 @@ where
let mut unconfirmed_registered_txs = Vec::new();

for txid in registered_txs {
if let Some(tx_status) = client.get_tx_status(&txid).await? {
if let Some(tx_status) = self.client.get_tx_status(&txid).await? {
if tx_status.confirmed {
if let Some(tx) = client.get_tx(&txid).await? {
if let Some(tx) = self.client.get_tx(&txid).await? {
if let Some(block_height) = tx_status.block_height {
let block_header = client.get_header(block_height).await?;
if let Some(merkle_proof) = client.get_merkle_proof(&txid).await? {
let block_header = self.client.get_header(block_height).await?;
if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? {
confirmed_txs.push((
tx,
block_height,
Expand All @@ -163,19 +188,20 @@ where
let mut unspent_registered_outputs = Vec::new();

for output in registered_outputs {
if let Some(output_status) = client
if let Some(output_status) = self
.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)
.await?
{
if output_status.spent {
if let Some(spending_tx_status) = output_status.status {
if spending_tx_status.confirmed {
let spending_txid = output_status.txid.unwrap();
if let Some(spending_tx) = client.get_tx(&spending_txid).await? {
if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? {
let block_height = spending_tx_status.block_height.unwrap();
let block_header = client.get_header(block_height).await?;
let block_header = self.client.get_header(block_height).await?;
if let Some(merkle_proof) =
client.get_merkle_proof(&spending_txid).await?
self.client.get_merkle_proof(&spending_txid).await?
{
confirmed_txs.push((
spending_tx,
Expand Down Expand Up @@ -217,13 +243,13 @@ where
async fn sync_transaction_unconfirmed(
&self, confirmables: &Vec<&(dyn Confirm + Sync)>,
) -> Result<(), Error> {
let client = &*self.blockchain;
// Query the interface for relevant txids and check whether they have been
// reorged-out of the chain.
let relevant_txids =
confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::<HashSet<Txid>>();
for txid in relevant_txids {
let tx_unconfirmed = client
let tx_unconfirmed = self
.client
.get_tx_status(&txid)
.await
.ok()
Expand Down Expand Up @@ -300,6 +326,63 @@ where
}
}

impl<D> Access for LdkLiteChainAccess<D>
where
D: BatchDatabase,
{
fn get_utxo(
&self, genesis_hash: &BlockHash, short_channel_id: u64,
) -> Result<TxOut, AccessError> {
if genesis_hash
!= &bitcoin::blockdata::constants::genesis_block(self.config.network)
.header
.block_hash()
{
return Err(AccessError::UnknownChain);
}

let locked_runtime = self.tokio_runtime.read().unwrap();
if locked_runtime.as_ref().is_none() {
return Err(AccessError::UnknownTx);
}

let block_height = scid_utils::block_from_scid(&short_channel_id);
let tx_index = scid_utils::tx_index_from_scid(&short_channel_id);
let vout = scid_utils::vout_from_scid(&short_channel_id);

let block_hash = self
.blockchain
.get_block_hash(block_height.into())
.map_err(|_| AccessError::UnknownTx)?;

let client_tokio = Arc::clone(&self.client);
let txout_opt: Arc<Mutex<Option<TxOut>>> = Arc::new(Mutex::new(None));
let txout_opt_tokio = Arc::clone(&txout_opt);

locked_runtime.as_ref().unwrap().spawn(async move {
let txid_res =
client_tokio.get_txid_at_block_index(&block_hash, tx_index as usize).await;

if let Some(txid) = txid_res.unwrap_or(None) {
let tx_res = client_tokio.get_tx(&txid).await;

if let Some(tx) = tx_res.unwrap_or(None) {
if let Some(tx_out) = tx.output.get(vout as usize) {
*txout_opt_tokio.lock().unwrap() = Some(tx_out.clone());
}
}
}
});

let locked_opt = txout_opt.lock().unwrap();
if let Some(tx_out) = &*locked_opt {
return Ok(tx_out.clone());
} else {
return Err(AccessError::UnknownTx);
}
}
}

impl<D> Filter for LdkLiteChainAccess<D>
where
D: BatchDatabase,
Expand Down
32 changes: 10 additions & 22 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ mod error;
pub mod event;
mod hex_utils;
mod io_utils;
mod scid_utils;
mod logger;
mod peer_store;
mod scid_utils;

use access::LdkLiteChainAccess;
pub use error::LdkLiteError as Error;
Expand Down Expand Up @@ -70,7 +70,6 @@ use lightning_invoice::utils::DefaultRouter;
use lightning_invoice::{payment, Currency, Invoice};

use bdk::bitcoin::secp256k1::Secp256k1;
use bdk::blockchain::esplora::EsploraBlockchain;
use bdk::blockchain::{GetBlockHash, GetHeight};
use bdk::sled;
use bdk::template::Bip84;
Expand All @@ -90,13 +89,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime};

// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
const BDK_CLIENT_STOP_GAP: usize = 20;

// The number of concurrent requests made against the API provider.
const BDK_CLIENT_CONCURRENCY: u8 = 8;

// The timeout after which we abandon retrying failed payments.
const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);

Expand Down Expand Up @@ -223,17 +215,8 @@ impl LdkLiteBuilder {
database,
)?;

// TODO: Check that we can be sure that the Esplora client re-connects in case of failure
// and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime?
let blockchain = EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);

let chain_access = Arc::new(LdkLiteChainAccess::new(
blockchain,
bdk_wallet,
Arc::clone(&config),
Arc::clone(&logger),
));
let chain_access =
Arc::new(LdkLiteChainAccess::new(bdk_wallet, Arc::clone(&config), Arc::clone(&logger)));

// Step 3: Initialize Persist
let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone()));
Expand Down Expand Up @@ -412,7 +395,7 @@ impl LdkLiteBuilder {
/// Wraps all objects that need to be preserved during the run time of [`LdkLite`]. Will be dropped
/// upon [`LdkLite::stop()`].
struct LdkLiteRuntime {
tokio_runtime: tokio::runtime::Runtime,
tokio_runtime: Arc<tokio::runtime::Runtime>,
_background_processor: BackgroundProcessor,
stop_networking: Arc<AtomicBool>,
stop_wallet_sync: Arc<AtomicBool>,
Expand Down Expand Up @@ -473,14 +456,19 @@ impl LdkLite {
runtime.stop_networking.store(true, Ordering::Release);
self.peer_manager.disconnect_all_peers();

// Drop the chain access held runtime.
self.chain_access.drop_runtime();

// Drop the runtime, which stops the background processor and any possibly remaining tokio threads.
*run_lock = None;
Ok(())
}

fn setup_runtime(&self) -> Result<LdkLiteRuntime, Error> {
let tokio_runtime =
tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());

self.chain_access.set_runtime(Arc::clone(&tokio_runtime));

// Setup wallet sync
let chain_access = Arc::clone(&self.chain_access);
Expand Down

0 comments on commit aec3a52

Please sign in to comment.