diff --git a/Cargo.toml b/Cargo.toml index d996a65ec..c2a826356 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ lightning-background-processor = { version = "0.0.110" } lightning-rapid-gossip-sync = { version = "0.0.110" } #bdk = "0.20.0" -bdk = { git = "https://github.com/tnull/bdk", branch="feat/use-external-esplora-client", features = ["use-esplora-ureq", "key-value-db"]} +bdk = { git = "https://github.com/bitcoindevkit/bdk.git", branch = "master", default-features = false, features = ["async-interface","use-esplora-async", "key-value-db"]} bitcoin = "0.28.1" rand = "0.8.5" diff --git a/src/access.rs b/src/access.rs new file mode 100644 index 000000000..4718d4d9f --- /dev/null +++ b/src/access.rs @@ -0,0 +1,404 @@ +use crate::logger::{ + log_error, log_given_level, log_internal, log_trace, FilesystemLogger, Logger, +}; +use crate::{Config, Error}; + +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::WatchedOutput; +use lightning::chain::{Confirm, Filter}; + +use bdk::blockchain::EsploraBlockchain; +use bdk::database::BatchDatabase; +use bdk::esplora_client; +use bdk::wallet::AddressIndex; +use bdk::{FeeRate, SignOptions, SyncOptions}; + +use bitcoin::{Script, Transaction, Txid}; + +use std::collections::HashSet; +use std::sync::{mpsc, Arc, Mutex, RwLock}; + +/// The minimum feerate we are allowed to send, as specify by LDK. +const MIN_FEERATE: u32 = 253; + +// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold +// number of blocks after which BDK stops looking for scripts belonging to the wallet. +const BDK_CLIENT_STOP_GAP: usize = 20; + +// The number of concurrent requests made against the API provider. +const BDK_CLIENT_CONCURRENCY: u8 = 8; + +pub struct ChainAccess +where + D: BatchDatabase, +{ + blockchain: Arc, + client: Arc, + wallet: Mutex>, + queued_transactions: Mutex>, + watched_transactions: Mutex>, + queued_outputs: Mutex>, + watched_outputs: Mutex>, + last_sync_height: tokio::sync::Mutex>, + tokio_runtime: RwLock>>, + _config: Arc, + logger: Arc, +} + +impl ChainAccess +where + D: BatchDatabase, +{ + pub(crate) fn new( + wallet: bdk::Wallet, config: Arc, logger: Arc, + ) -> Self { + let wallet = Mutex::new(wallet); + let watched_transactions = Mutex::new(Vec::new()); + let queued_transactions = Mutex::new(Vec::new()); + let watched_outputs = Mutex::new(Vec::new()); + let queued_outputs = Mutex::new(Vec::new()); + let last_sync_height = tokio::sync::Mutex::new(None); + let tokio_runtime = RwLock::new(None); + // TODO: Check that we can be sure that the Esplora client re-connects in case of failure + // and and exits cleanly on drop. Otherwise we need to handle this/move it to the runtime? 
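+		// We keep two handles to the Esplora API: BDK's `EsploraBlockchain` is used to sync the
+		// on-chain wallet, while the raw async `esplora_client` below serves the `Confirm`-based
+		// Lightning sync, fee estimation, and transaction broadcasting.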
+ let blockchain = Arc::new( + EsploraBlockchain::new(&config.esplora_server_url, BDK_CLIENT_STOP_GAP) + .with_concurrency(BDK_CLIENT_CONCURRENCY), + ); + let client_builder = + esplora_client::Builder::new(&format!("http://{}", &config.esplora_server_url)); + let client = Arc::new(client_builder.build_async().unwrap()); + Self { + blockchain, + client, + wallet, + queued_transactions, + watched_transactions, + queued_outputs, + watched_outputs, + last_sync_height, + tokio_runtime, + _config: config, + logger, + } + } + + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } + + pub(crate) async fn sync_wallet(&self) -> Result<(), Error> { + let sync_options = SyncOptions { progress: None }; + + self.wallet.lock().unwrap().sync(&self.blockchain, sync_options).await?; + + Ok(()) + } + + pub(crate) async fn sync( + &self, confirmables: Vec<&(dyn Confirm + Send + Sync)>, + ) -> Result<(), Error> { + let cur_height = self.client.get_height().await?; + + let mut locked_last_sync_height = self.last_sync_height.lock().await; + if cur_height >= locked_last_sync_height.unwrap_or(0) { + self.sync_best_block_updated(&confirmables, cur_height, &mut locked_last_sync_height) + .await?; + self.sync_transactions_confirmed(&confirmables).await?; + self.sync_transaction_unconfirmed(&confirmables).await?; + } + // TODO: check whether new outputs have been registered by now and process them + Ok(()) + } + + async fn sync_best_block_updated( + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, cur_height: u32, + locked_last_sync_height: &mut tokio::sync::MutexGuard<'_, Option>, + ) -> Result<(), Error> { + // Inform the interface of the new block. + let cur_block_header = self.client.get_header(cur_height).await?; + for c in confirmables { + c.best_block_updated(&cur_block_header, cur_height); + } + + **locked_last_sync_height = Some(cur_height); + Ok(()) + } + + async fn sync_transactions_confirmed( + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, + ) -> Result<(), Error> { + // First, check the confirmation status of registered transactions as well as the + // status of dependent transactions of registered outputs. + + let mut confirmed_txs = Vec::new(); + + // Check in the current queue, as well as in registered transactions leftover from + // previous iterations. + let registered_txs: HashSet = { + let locked_queued_transactions = self.queued_transactions.lock().unwrap(); + let locked_watched_transactions = self.watched_transactions.lock().unwrap(); + locked_watched_transactions + .iter() + .chain(locked_queued_transactions.iter()) + .cloned() + .collect() + }; + + // Remember all registered but unconfirmed transactions for future processing. + let mut unconfirmed_registered_txs = Vec::new(); + + for txid in registered_txs { + if let Some(tx_status) = self.client.get_tx_status(&txid).await? { + if tx_status.confirmed { + if let Some(tx) = self.client.get_tx(&txid).await? { + if let Some(block_height) = tx_status.block_height { + // TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17) + let block_header = self.client.get_header(block_height).await?; + if let Some(merkle_proof) = self.client.get_merkle_proof(&txid).await? 
{ + if block_height == merkle_proof.block_height { + confirmed_txs.push(( + tx, + block_height, + block_header, + merkle_proof.pos, + )); + continue; + } + } + } + } + } + } + unconfirmed_registered_txs.push(txid); + } + + // Check all registered outputs for dependent spending transactions. + let registered_outputs: Vec = { + let locked_queued_outputs = self.queued_outputs.lock().unwrap(); + let locked_watched_outputs = self.watched_outputs.lock().unwrap(); + locked_watched_outputs.iter().chain(locked_queued_outputs.iter()).cloned().collect() + }; + + // Remember all registered outputs that haven't been spent for future processing. + let mut unspent_registered_outputs = Vec::new(); + + for output in registered_outputs { + if let Some(output_status) = self + .client + .get_output_status(&output.outpoint.txid, output.outpoint.index as u64) + .await? + { + if output_status.spent { + if let Some(spending_tx_status) = output_status.status { + if spending_tx_status.confirmed { + let spending_txid = output_status.txid.unwrap(); + if let Some(spending_tx) = self.client.get_tx(&spending_txid).await? { + let block_height = spending_tx_status.block_height.unwrap(); + // TODO: Switch to `get_header_by_hash` once released upstream (https://github.com/bitcoindevkit/rust-esplora-client/pull/17) + let block_header = self.client.get_header(block_height).await?; + if let Some(merkle_proof) = + self.client.get_merkle_proof(&spending_txid).await? + { + confirmed_txs.push(( + spending_tx, + block_height, + block_header, + merkle_proof.pos, + )); + continue; + } + } + } + } + } + } + unspent_registered_outputs.push(output); + } + + // Sort all confirmed transactions first by block height, then by in-block + // position, and finally feed them to the interface in order. + confirmed_txs.sort_unstable_by( + |(_, block_height1, _, pos1), (_, block_height2, _, pos2)| { + block_height1.cmp(&block_height2).then_with(|| pos1.cmp(&pos2)) + }, + ); + for (tx, block_height, block_header, pos) in confirmed_txs { + for c in confirmables { + c.transactions_confirmed(&block_header, &[(pos, &tx)], block_height); + } + } + + *self.queued_transactions.lock().unwrap() = Vec::new(); + *self.watched_transactions.lock().unwrap() = unconfirmed_registered_txs; + *self.queued_outputs.lock().unwrap() = Vec::new(); + *self.watched_outputs.lock().unwrap() = unspent_registered_outputs; + + Ok(()) + } + + async fn sync_transaction_unconfirmed( + &self, confirmables: &Vec<&(dyn Confirm + Send + Sync)>, + ) -> Result<(), Error> { + // Query the interface for relevant txids and check whether they have been + // reorged-out of the chain. 
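+		// Any relevant transaction that no longer has a confirmed status (or whose status cannot
+		// be retrieved) is reported back via `transaction_unconfirmed`.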
+ let relevant_txids = + confirmables.iter().flat_map(|c| c.get_relevant_txids()).collect::>(); + for txid in relevant_txids { + let tx_unconfirmed = self + .client + .get_tx_status(&txid) + .await + .ok() + .unwrap_or(None) + .map_or(true, |status| !status.confirmed); + if tx_unconfirmed { + for c in confirmables { + c.transaction_unconfirmed(&txid); + } + } + } + + Ok(()) + } + + pub(crate) fn create_funding_transaction( + &self, output_script: &Script, value_sats: u64, confirmation_target: ConfirmationTarget, + ) -> Result { + let locked_wallet = self.wallet.lock().unwrap(); + let mut tx_builder = locked_wallet.build_tx(); + + let fallback_fee = fallback_fee_from_conf_target(confirmation_target); + let fee_rate = self + .estimate_fee(confirmation_target) + .unwrap_or(FeeRate::from_sat_per_kwu(fallback_fee as f32)); + + tx_builder.add_recipient(output_script.clone(), value_sats).fee_rate(fee_rate).enable_rbf(); + + let (mut psbt, _) = tx_builder.finish()?; + log_trace!(self.logger, "Created funding PSBT: {:?}", psbt); + + // We double-check that no inputs try to spend non-witness outputs. As we use a SegWit + // wallet descriptor this technically shouldn't ever happen, but better safe than sorry. + for input in &psbt.inputs { + if input.witness_utxo.is_none() { + log_error!(self.logger, "Tried to spend a non-witness funding output. This must not ever happen. Panicking!"); + panic!("Tried to spend a non-witness funding output. This must not ever happen."); + } + } + + if !locked_wallet.sign(&mut psbt, SignOptions::default())? { + return Err(Error::FundingTxCreationFailed); + } + + Ok(psbt.extract_tx()) + } + + pub(crate) fn get_new_address(&self) -> Result { + let address_info = self.wallet.lock().unwrap().get_address(AddressIndex::New)?; + Ok(address_info.address) + } + + fn estimate_fee(&self, confirmation_target: ConfirmationTarget) -> Result { + let num_blocks = num_blocks_from_conf_target(confirmation_target); + + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return Err(Error::FeeEstimationFailed); + } + + let tokio_client = Arc::clone(&self.client); + let (sender, receiver) = mpsc::sync_channel(1); + + locked_runtime.as_ref().unwrap().spawn(async move { + let res = tokio_client.get_fee_estimates().await; + let _ = sender.send(res); + }); + + let estimates = receiver + .recv() + .map_err(|_| Error::FeeEstimationFailed)? 
+ .map_err(|_| Error::FeeEstimationFailed)?; + + Ok(bdk::FeeRate::from_sat_per_vb( + esplora_client::convert_fee_rate(num_blocks, estimates) + .map_err(|_| Error::FeeEstimationFailed)?, + )) + } +} + +impl FeeEstimator for ChainAccess +where + D: BatchDatabase, +{ + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + let fallback_fee = fallback_fee_from_conf_target(confirmation_target); + + self.estimate_fee(confirmation_target) + .map_or(fallback_fee, |fee_rate| (fee_rate.fee_wu(1000) as u32).max(MIN_FEERATE)) as u32 + } +} + +impl BroadcasterInterface for ChainAccess +where + D: BatchDatabase, +{ + fn broadcast_transaction(&self, tx: &Transaction) { + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + log_error!(self.logger, "Failed to broadcast transaction: No runtime."); + return; + } + + let tokio_client = Arc::clone(&self.client); + let tokio_tx = tx.clone(); + let (sender, receiver) = mpsc::sync_channel(1); + + locked_runtime.as_ref().unwrap().spawn(async move { + let res = tokio_client.broadcast(&tokio_tx).await; + let _ = sender.send(res); + }); + + match receiver.recv().unwrap() { + Ok(_) => {} + Err(err) => { + log_error!(self.logger, "Failed to broadcast transaction: {}", err); + panic!("Failed to broadcast transaction: {}", err); + } + } + } +} + +impl Filter for ChainAccess +where + D: BatchDatabase, +{ + fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) { + self.queued_transactions.lock().unwrap().push(*txid); + } + + fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)> { + self.queued_outputs.lock().unwrap().push(output); + return None; + } +} + +fn num_blocks_from_conf_target(confirmation_target: ConfirmationTarget) -> usize { + match confirmation_target { + ConfirmationTarget::Background => 12, + ConfirmationTarget::Normal => 6, + ConfirmationTarget::HighPriority => 3, + } +} + +fn fallback_fee_from_conf_target(confirmation_target: ConfirmationTarget) -> u32 { + match confirmation_target { + ConfirmationTarget::Background => MIN_FEERATE, + ConfirmationTarget::Normal => 2000, + ConfirmationTarget::HighPriority => 5000, + } +} diff --git a/src/error.rs b/src/error.rs index e3b34613a..8e937ed35 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,130 +1,79 @@ use bdk::blockchain::esplora; -use lightning::ln::msgs; -use lightning::util::errors; -use lightning_invoice::payment; use std::fmt; -use std::io; -use std::time; #[derive(Debug)] /// An error that possibly needs to be handled by the user. -pub enum LdkLiteError { +pub enum Error { /// Returned when trying to start LdkLite while it is already running. AlreadyRunning, /// Returned when trying to stop LdkLite while it is not running. NotRunning, /// The funding transaction could not be created. FundingTxCreationFailed, + /// Returned when we could not estimate a transaction fee. + FeeEstimationFailed, /// A network connection has been closed. ConnectionFailed, /// Payment of the given invoice has already been intiated. NonUniquePaymentHash, + /// The given invoice is invalid. + InvoiceInvalid, + /// Invoice creation failed. + InvoiceCreationFailed, + /// No route for the given target could be found. + RoutingFailed, /// A given peer info could not be parsed. 
- PeerInfoParse(&'static str), - /// A wrapped LDK `APIError` - LdkApi(errors::APIError), - /// A wrapped LDK `DecodeError` - LdkDecode(msgs::DecodeError), - /// A wrapped LDK `PaymentError` - LdkPayment(payment::PaymentError), - /// A wrapped LDK `SignOrCreationError` - LdkInvoiceCreation(lightning_invoice::SignOrCreationError), - /// A wrapped BDK error - Bdk(bdk::Error), - /// A wrapped `EsploraError` - Esplora(esplora::EsploraError), - /// A wrapped `Bip32` error - Bip32(bitcoin::util::bip32::Error), - /// A wrapped `std::io::Error` - StdIo(io::Error), - /// A wrapped `SystemTimeError` - StdTime(time::SystemTimeError), + PeerInfoParseFailed, + /// A channel could not be opened. + ChannelCreationFailed, + /// A channel could not be closed. + ChannelClosingFailed, + /// Persistence failed. + PersistenceFailed, + /// A wallet operation failed. + WalletOperationFailed, + /// A siging operation failed. + WalletSigningFailed, + /// A chain access operation failed. + ChainAccessFailed, } -impl fmt::Display for LdkLiteError { +impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - LdkLiteError::AlreadyRunning => write!(f, "LDKLite is already running."), - LdkLiteError::NotRunning => write!(f, "LDKLite is not running."), - LdkLiteError::FundingTxCreationFailed => { - write!(f, "the funding transaction could not be created") - } - LdkLiteError::ConnectionFailed => write!(f, "network connection closed"), - LdkLiteError::NonUniquePaymentHash => write!(f, "an invoice must not get payed twice."), - LdkLiteError::PeerInfoParse(ref e) => { - write!(f, "given peer info could not be parsed: {}", e) - } - LdkLiteError::LdkDecode(ref e) => write!(f, "LDK decode error: {}", e), - LdkLiteError::LdkApi(ref e) => write!(f, "LDK API error: {:?}", e), - LdkLiteError::LdkPayment(ref e) => write!(f, "LDK payment error: {:?}", e), - LdkLiteError::LdkInvoiceCreation(ref e) => { - write!(f, "LDK invoice sign or creation error: {:?}", e) - } - LdkLiteError::Bdk(ref e) => write!(f, "BDK error: {}", e), - LdkLiteError::Esplora(ref e) => write!(f, "Esplora error: {}", e), - LdkLiteError::Bip32(ref e) => write!(f, "Bitcoin error: {}", e), - LdkLiteError::StdIo(ref e) => write!(f, "IO error: {}", e), - LdkLiteError::StdTime(ref e) => write!(f, "time error: {}", e), + Self::AlreadyRunning => write!(f, "LDKLite is already running."), + Self::NotRunning => write!(f, "LDKLite is not running."), + Self::FundingTxCreationFailed => write!(f, "Funding transaction could not be created."), + Self::FeeEstimationFailed => write!(f, "Fee estimation failed."), + Self::ConnectionFailed => write!(f, "Network connection closed."), + Self::NonUniquePaymentHash => write!(f, "An invoice must not get payed twice."), + Self::InvoiceInvalid => write!(f, "The given invoice is invalid."), + Self::InvoiceCreationFailed => write!(f, "Failed to create invoice."), + Self::RoutingFailed => write!(f, "Failed to find route."), + Self::PeerInfoParseFailed => write!(f, "Failed to parse the given peer information."), + Self::ChannelCreationFailed => write!(f, "Failed to create channel."), + Self::ChannelClosingFailed => write!(f, "Failed to close channel."), + Self::PersistenceFailed => write!(f, "Failed to persist data."), + Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), + Self::WalletSigningFailed => write!(f, "Failed to sign given transaction."), + Self::ChainAccessFailed => write!(f, "Failed to conduct chain access operation."), } } } -impl From for LdkLiteError { - fn 
from(e: errors::APIError) -> Self { - Self::LdkApi(e) - } -} - -impl From for LdkLiteError { - fn from(e: msgs::DecodeError) -> Self { - Self::LdkDecode(e) - } -} - -impl From for LdkLiteError { - fn from(e: payment::PaymentError) -> Self { - Self::LdkPayment(e) - } -} - -impl From for LdkLiteError { - fn from(e: lightning_invoice::SignOrCreationError) -> Self { - Self::LdkInvoiceCreation(e) - } -} +impl std::error::Error for Error {} -impl From for LdkLiteError { +impl From for Error { fn from(e: bdk::Error) -> Self { - Self::Bdk(e) - } -} - -impl From for LdkLiteError { - fn from(e: bdk::sled::Error) -> Self { - Self::Bdk(bdk::Error::Sled(e)) - } -} - -impl From for LdkLiteError { - fn from(e: bitcoin::util::bip32::Error) -> Self { - Self::Bip32(e) - } -} - -impl From for LdkLiteError { - fn from(e: io::Error) -> Self { - Self::StdIo(e) - } -} - -impl From for LdkLiteError { - fn from(e: time::SystemTimeError) -> Self { - Self::StdTime(e) + match e { + bdk::Error::Signer(_) => Self::WalletSigningFailed, + _ => Self::WalletOperationFailed, + } } } -impl From for LdkLiteError { - fn from(e: esplora::EsploraError) -> Self { - Self::Esplora(e) +impl From for Error { + fn from(_e: esplora::EsploraError) -> Self { + Self::ChainAccessFailed } } diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 000000000..8c137efe5 --- /dev/null +++ b/src/event.rs @@ -0,0 +1,508 @@ +use crate::{ + hex_utils, ChainAccess, ChannelManager, Config, Error, FilesystemPersister, NetworkGraph, + PaymentInfo, PaymentInfoStorage, PaymentStatus, +}; + +use crate::logger::{log_error, log_given_level, log_info, log_internal, FilesystemLogger, Logger}; + +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::keysinterface::KeysManager; +use lightning::ln::PaymentHash; +use lightning::routing::gossip::NodeId; +use lightning::util::events::Event as LdkEvent; +use lightning::util::events::EventHandler as LdkEventHandler; +use lightning::util::events::PaymentPurpose; +use lightning::util::persist::KVStorePersister; +use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; + +use bitcoin::secp256k1::Secp256k1; +use rand::{thread_rng, Rng}; +use std::collections::{hash_map, VecDeque}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::time::Duration; + +/// The event queue will be persisted under this key. +pub(crate) const EVENTS_PERSISTENCE_KEY: &str = "events"; + +/// An event emitted by [`LdkLite`] that should be handled by the user. +/// +/// [`LdkLite`]: [`crate::LdkLite`] +#[derive(Debug, Clone)] +pub enum Event { + /// A payment we sent was successful. + PaymentSuccessful { + /// The hash of the payment. + payment_hash: PaymentHash, + }, + /// A payment we sent has failed. + PaymentFailed { + /// The hash of the payment. + payment_hash: PaymentHash, + }, + /// A payment has been received. + PaymentReceived { + /// The hash of the payment. + payment_hash: PaymentHash, + /// The value, in thousandths of a satoshi that has been received. + amount_msat: u64, + }, + // TODO: Implement after a corresponding LDK event is added. + //ChannelOpened { + //}, + /// A channel has been closed. + ChannelClosed { + /// The channel_id of the channel which has been closed. + channel_id: [u8; 32], + }, + // TODO: Implement on-chain events when better integrating with BDK wallet sync. 
+ //OnChainPaymentSent { + //}, + //OnChainPaymentReceived { + //} +} + +impl Readable for Event { + fn read( + reader: &mut R, + ) -> Result { + match Readable::read(reader)? { + 0u8 => { + let payment_hash: PaymentHash = Readable::read(reader)?; + Ok(Self::PaymentSuccessful { payment_hash }) + } + 1u8 => { + let payment_hash: PaymentHash = Readable::read(reader)?; + Ok(Self::PaymentFailed { payment_hash }) + } + 2u8 => { + let payment_hash: PaymentHash = Readable::read(reader)?; + let amount_msat: u64 = Readable::read(reader)?; + Ok(Self::PaymentReceived { payment_hash, amount_msat }) + } + //3u8 => { + // TODO ChannelOpened + //} + 4u8 => { + let channel_id: [u8; 32] = Readable::read(reader)?; + Ok(Self::ChannelClosed { channel_id }) + } + //5u8 => { + // TODO OnChainPaymentSent + //} + //6u8 => { + // TODO OnChainPaymentReceived + //} + _ => Err(lightning::ln::msgs::DecodeError::InvalidValue), + } + } +} + +impl Writeable for Event { + fn write(&self, writer: &mut W) -> Result<(), lightning::io::Error> { + match self { + Self::PaymentSuccessful { payment_hash } => { + 0u8.write(writer)?; + payment_hash.write(writer)?; + Ok(()) + } + Self::PaymentFailed { payment_hash } => { + 1u8.write(writer)?; + payment_hash.write(writer)?; + Ok(()) + } + Self::PaymentReceived { payment_hash, amount_msat } => { + 2u8.write(writer)?; + payment_hash.write(writer)?; + amount_msat.write(writer)?; + Ok(()) + } + //Self::ChannelOpened { .. } => { + //TODO + //} + Self::ChannelClosed { channel_id } => { + 4u8.write(writer)?; + channel_id.write(writer)?; + Ok(()) + } //Self::OnChainPaymentSent { .. } => { + //TODO + //} + //Self::OnChainPaymentReceived { .. } => { + //TODO + //} + } + } +} + +pub(crate) struct EventQueue { + queue: Mutex, + notifier: Condvar, + persister: Arc, +} + +impl EventQueue { + pub(crate) fn new(persister: Arc) -> Self { + let queue: Mutex = Mutex::new(EventQueueSerWrapper(VecDeque::new())); + let notifier = Condvar::new(); + Self { queue, notifier, persister } + } + pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.0.push_back(event); + + self.persister + .persist(EVENTS_PERSISTENCE_KEY, &*locked_queue) + .map_err(|_| Error::PersistenceFailed)?; + + self.notifier.notify_one(); + Ok(()) + } + + pub(crate) fn next_event(&self) -> Event { + let locked_queue = self + .notifier + .wait_while(self.queue.lock().unwrap(), |queue| queue.0.is_empty()) + .unwrap(); + locked_queue.0.front().unwrap().clone() + } + + pub(crate) fn event_handled(&self) -> Result<(), Error> { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.0.pop_front(); + self.persister + .persist(EVENTS_PERSISTENCE_KEY, &*locked_queue) + .map_err(|_| Error::PersistenceFailed)?; + self.notifier.notify_one(); + Ok(()) + } +} + +impl ReadableArgs> for EventQueue { + #[inline] + fn read( + reader: &mut R, persister: Arc, + ) -> Result { + let queue: Mutex = Mutex::new(Readable::read(reader)?); + let notifier = Condvar::new(); + Ok(Self { queue, notifier, persister }) + } +} + +struct EventQueueSerWrapper(VecDeque); + +impl Readable for EventQueueSerWrapper { + fn read( + reader: &mut R, + ) -> Result { + let len: u16 = Readable::read(reader)?; + let mut queue = VecDeque::with_capacity(len as usize); + for _ in 0..len { + queue.push_back(Readable::read(reader)?); + } + Ok(EventQueueSerWrapper(queue)) + } +} + +impl Writeable for EventQueueSerWrapper { + fn write(&self, writer: &mut W) -> Result<(), lightning::io::Error> { + 
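+		// Persist the queue as a `u16` length prefix followed by the events in order, mirroring
+		// the `Readable` implementation above.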
(self.0.len() as u16).write(writer)?; + for e in self.0.iter() { + e.write(writer)?; + } + Ok(()) + } +} + +pub(crate) struct EventHandler { + chain_access: Arc>, + event_queue: Arc>, + channel_manager: Arc, + network_graph: Arc, + keys_manager: Arc, + inbound_payments: Arc, + outbound_payments: Arc, + tokio_runtime: RwLock>>, + logger: Arc, + _config: Arc, +} + +impl EventHandler { + pub fn new( + chain_access: Arc>, + event_queue: Arc>, channel_manager: Arc, + network_graph: Arc, keys_manager: Arc, + inbound_payments: Arc, outbound_payments: Arc, + logger: Arc, _config: Arc, + ) -> Self { + let tokio_runtime = RwLock::new(None); + Self { + event_queue, + chain_access, + channel_manager, + network_graph, + keys_manager, + inbound_payments, + outbound_payments, + tokio_runtime, + logger, + _config, + } + } + + pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { + *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); + } + + pub(crate) fn drop_runtime(&self) { + *self.tokio_runtime.write().unwrap() = None; + } +} + +impl LdkEventHandler for EventHandler { + fn handle_event(&self, event: &LdkEvent) { + match event { + LdkEvent::FundingGenerationReady { + temporary_channel_id, + counterparty_node_id, + channel_value_satoshis, + output_script, + .. + } => { + // Construct the raw transaction with one output, that is paid the amount of the + // channel. + let confirmation_target = ConfirmationTarget::Normal; + + // Sign the final funding transaction and broadcast it. + match self.chain_access.create_funding_transaction( + &output_script, + *channel_value_satoshis, + confirmation_target, + ) { + Ok(final_tx) => { + // Give the funding transaction back to LDK for opening the channel. + if self + .channel_manager + .funding_transaction_generated( + &temporary_channel_id, + counterparty_node_id, + final_tx, + ) + .is_err() + { + log_error!(self.logger, "Channel went away before we could fund it. The peer disconnected or refused the channel"); + } + } + Err(err) => { + log_error!(self.logger, "Failed to create funding transaction: {}", err); + } + }; + } + LdkEvent::PaymentReceived { payment_hash, purpose, amount_msat } => { + log_info!( + self.logger, + "Received payment from payment hash {} of {} millisatoshis", + hex_utils::to_string(&payment_hash.0), + amount_msat, + ); + let payment_preimage = match purpose { + PaymentPurpose::InvoicePayment { payment_preimage, .. } => *payment_preimage, + PaymentPurpose::SpontaneousPayment(preimage) => Some(*preimage), + }; + self.channel_manager.claim_funds(payment_preimage.unwrap()); + self.event_queue + .add_event(Event::PaymentReceived { + payment_hash: *payment_hash, + amount_msat: *amount_msat, + }) + .unwrap(); + } + LdkEvent::PaymentClaimed { payment_hash, purpose, amount_msat } => { + log_info!( + self.logger, + "Claimed payment from payment hash {} of {} millisatoshis", + hex_utils::to_string(&payment_hash.0), + amount_msat, + ); + let (payment_preimage, payment_secret) = match purpose { + PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. 
} => { + (*payment_preimage, Some(*payment_secret)) + } + PaymentPurpose::SpontaneousPayment(preimage) => (Some(*preimage), None), + }; + let mut payments = self.inbound_payments.lock().unwrap(); + match payments.entry(*payment_hash) { + hash_map::Entry::Occupied(mut e) => { + let payment = e.get_mut(); + payment.status = PaymentStatus::Succeeded; + payment.preimage = payment_preimage; + payment.secret = payment_secret; + } + hash_map::Entry::Vacant(e) => { + e.insert(PaymentInfo { + preimage: payment_preimage, + secret: payment_secret, + status: PaymentStatus::Succeeded, + amount_msat: Some(*amount_msat), + }); + } + } + } + LdkEvent::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => { + let mut payments = self.outbound_payments.lock().unwrap(); + for (hash, payment) in payments.iter_mut() { + if *hash == *payment_hash { + payment.preimage = Some(*payment_preimage); + payment.status = PaymentStatus::Succeeded; + log_info!( + self.logger, + "Successfully sent payment of {} millisatoshis{} from \ + payment hash {:?} with preimage {:?}", + payment.amount_msat.unwrap(), + if let Some(fee) = fee_paid_msat { + format!(" (fee {} msat)", fee) + } else { + "".to_string() + }, + hex_utils::to_string(&payment_hash.0), + hex_utils::to_string(&payment_preimage.0) + ); + } + } + self.event_queue + .add_event(Event::PaymentSuccessful { payment_hash: *payment_hash }) + .unwrap(); + } + LdkEvent::PaymentFailed { payment_hash, .. } => { + log_info!( + self.logger, + "Failed to send payment to payment hash {:?}: exhausted payment retry attempts", + hex_utils::to_string(&payment_hash.0) + ); + + let mut payments = self.outbound_payments.lock().unwrap(); + if payments.contains_key(&payment_hash) { + let payment = payments.get_mut(&payment_hash).unwrap(); + payment.status = PaymentStatus::Failed; + } + self.event_queue + .add_event(Event::PaymentFailed { payment_hash: *payment_hash }) + .unwrap(); + } + + LdkEvent::PaymentPathSuccessful { .. } => {} + LdkEvent::PaymentPathFailed { .. } => {} + LdkEvent::ProbeSuccessful { .. } => {} + LdkEvent::ProbeFailed { .. } => {} + LdkEvent::HTLCHandlingFailed { .. } => {} + LdkEvent::PendingHTLCsForwardable { time_forwardable } => { + let forwarding_channel_manager = self.channel_manager.clone(); + let min = time_forwardable.as_millis() as u64; + + let locked_runtime = self.tokio_runtime.read().unwrap(); + if locked_runtime.as_ref().is_none() { + return; + } + + locked_runtime.as_ref().unwrap().spawn(async move { + let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; + tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; + forwarding_channel_manager.process_pending_htlc_forwards(); + }); + } + LdkEvent::SpendableOutputs { outputs } => { + let destination_address = self.chain_access.get_new_address().unwrap(); + let output_descriptors = &outputs.iter().map(|a| a).collect::>(); + let tx_feerate = + self.chain_access.get_est_sat_per_1000_weight(ConfirmationTarget::Normal); + let spending_tx = self + .keys_manager + .spend_spendable_outputs( + output_descriptors, + Vec::new(), + destination_address.script_pubkey(), + tx_feerate, + &Secp256k1::new(), + ) + .unwrap(); + self.chain_access.broadcast_transaction(&spending_tx); + } + LdkEvent::OpenChannelRequest { .. 
} => {} + LdkEvent::PaymentForwarded { + prev_channel_id, + next_channel_id, + fee_earned_msat, + claim_from_onchain_tx, + } => { + let read_only_network_graph = self.network_graph.read_only(); + let nodes = read_only_network_graph.nodes(); + let channels = self.channel_manager.list_channels(); + + let node_str = |channel_id: &Option<[u8; 32]>| match channel_id { + None => String::new(), + Some(channel_id) => match channels.iter().find(|c| c.channel_id == *channel_id) + { + None => String::new(), + Some(channel) => { + match nodes.get(&NodeId::from_pubkey(&channel.counterparty.node_id)) { + None => "private node".to_string(), + Some(node) => match &node.announcement_info { + None => "unnamed node".to_string(), + Some(announcement) => { + format!("node {}", announcement.alias) + } + }, + } + } + }, + }; + let channel_str = |channel_id: &Option<[u8; 32]>| { + channel_id + .map(|channel_id| { + format!(" with channel {}", hex_utils::to_string(&channel_id)) + }) + .unwrap_or_default() + }; + let from_prev_str = + format!(" from {}{}", node_str(prev_channel_id), channel_str(prev_channel_id)); + let to_next_str = + format!(" to {}{}", node_str(next_channel_id), channel_str(next_channel_id)); + + let from_onchain_str = if *claim_from_onchain_tx { + "from onchain downstream claim" + } else { + "from HTLC fulfill message" + }; + if let Some(fee_earned) = fee_earned_msat { + log_info!( + self.logger, + "Forwarded payment{}{}, earning {} msat {}", + from_prev_str, + to_next_str, + fee_earned, + from_onchain_str + ); + } else { + log_info!( + self.logger, + "Forwarded payment{}{}, claiming onchain {}", + from_prev_str, + to_next_str, + from_onchain_str + ); + } + } + + LdkEvent::ChannelClosed { channel_id, reason, user_channel_id: _ } => { + log_info!( + self.logger, + "Channel {} closed due to: {:?}", + hex_utils::to_string(channel_id), + reason + ); + self.event_queue + .add_event(Event::ChannelClosed { channel_id: *channel_id }) + .unwrap(); + } + LdkEvent::DiscardFunding { .. 
} => {} + } + } +} diff --git a/src/io_utils.rs b/src/io_utils.rs new file mode 100644 index 000000000..a295e100c --- /dev/null +++ b/src/io_utils.rs @@ -0,0 +1,66 @@ +use crate::{Config, Error, FilesystemLogger, NetworkGraph, Scorer}; + +use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; +use lightning::util::ser::ReadableArgs; + +use rand::{thread_rng, RngCore}; + +use std::fs; +use std::io::{BufReader, Write}; +use std::path::Path; +use std::sync::Arc; + +pub(crate) fn read_or_generate_seed_file(config: Arc) -> [u8; 32] { + let keys_seed_path = format!("{}/keys_seed", config.storage_dir_path); + let keys_seed = if Path::new(&keys_seed_path).exists() { + let seed = fs::read(keys_seed_path.clone()).expect("Failed to read keys seed file"); + assert_eq!(seed.len(), 32); + let mut key = [0; 32]; + key.copy_from_slice(&seed); + key + } else { + let mut key = [0; 32]; + thread_rng().fill_bytes(&mut key); + + let mut f = + fs::File::create(keys_seed_path.clone()).expect("Failed to create keys seed file"); + f.write_all(&key).expect("Failed to write node keys seed to disk"); + f.sync_all().expect("Failed to sync node keys seed to disk"); + key + }; + + keys_seed +} + +pub(crate) fn read_network_graph( + config: Arc, logger: Arc, +) -> Result { + let ldk_data_dir = format!("{}/ldk", &config.storage_dir_path.clone()); + let network_graph_path = format!("{}/network_graph", ldk_data_dir.clone()); + + if let Ok(file) = fs::File::open(network_graph_path) { + if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file), Arc::clone(&logger)) { + return Ok(graph); + } + } + + let genesis_hash = + bitcoin::blockdata::constants::genesis_block(config.network).header.block_hash(); + Ok(NetworkGraph::new(genesis_hash, logger)) +} + +pub(crate) fn read_scorer( + config: Arc, network_graph: Arc, logger: Arc, +) -> Scorer { + let ldk_data_dir = format!("{}/ldk", &config.storage_dir_path.clone()); + let scorer_path = format!("{}/scorer", ldk_data_dir.clone()); + + let params = ProbabilisticScoringParameters::default(); + if let Ok(file) = fs::File::open(scorer_path) { + let args = (params.clone(), Arc::clone(&network_graph), Arc::clone(&logger)); + if let Ok(scorer) = ProbabilisticScorer::read(&mut BufReader::new(file), args) { + return scorer; + } + } + ProbabilisticScorer::new(params, network_graph, logger) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 000000000..58b0490e6 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1007 @@ +// This file is Copyright its original authors, visible in version contror +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +#![crate_name = "ldk_lite"] + +//! A library providing a simplified API for the Lightning Dev Kit. While LDK itself provides a +//! highly configurable and adaptable interface, this API champions simplicity and ease of use over +//! configurability. To this end, it provides an opionated set of design choices and ready-to-go +//! default modules, while still enabling some configurability when dearly needed by the user: +//! - Chain data is accessed through an Esplora client. +//! - Wallet and channel states are persisted to disk. +//! - Gossip is retrieved over the P2P network. 
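+//!
+//! A minimal usage sketch (the storage path and network below are simply the defaults and may be
+//! adjusted as needed):
+//!
+//! ```no_run
+//! use ldk_lite::Builder;
+//!
+//! let mut builder = Builder::new();
+//! builder.set_network("testnet");
+//! builder.set_storage_dir_path("/tmp/ldk_lite/".to_string());
+//!
+//! let mut node = builder.build();
+//! node.start().unwrap();
+//!
+//! // Fund the on-chain wallet via this address, then open channels and send/receive payments.
+//! let funding_address = node.new_funding_address().unwrap();
+//!
+//! node.stop().unwrap();
+//! ```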
+ +#![deny(missing_docs)] +#![deny(broken_intra_doc_links)] +#![deny(private_intra_doc_links)] +#![allow(bare_trait_objects)] +#![allow(ellipsis_inclusive_range_patterns)] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +mod access; +mod error; +mod event; +mod hex_utils; +mod io_utils; +mod logger; +mod peer_store; + +use access::ChainAccess; +pub use error::Error; +pub use event::Event; +use event::{EventHandler, EventQueue}; +use peer_store::{PeerInfo, PeerInfoStorage}; + +use logger::{log_error, log_given_level, log_info, log_internal, FilesystemLogger, Logger}; + +use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient}; +use lightning::chain::{chainmonitor, Access, BestBlock, Confirm, Filter, Watch}; +use lightning::ln::channelmanager; +use lightning::ln::channelmanager::{ + ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, +}; +use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager}; +use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; +use lightning::routing::gossip; +use lightning::routing::gossip::P2PGossipSync; +use lightning::routing::scoring::ProbabilisticScorer; + +use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig}; +use lightning::util::ser::ReadableArgs; + +use lightning_background_processor::BackgroundProcessor; +use lightning_background_processor::GossipSync as BPGossipSync; +use lightning_persister::FilesystemPersister; + +use lightning_net_tokio::SocketDescriptor; + +use lightning_invoice::utils::DefaultRouter; +use lightning_invoice::{payment, Currency, Invoice}; + +use bdk::bitcoin::secp256k1::Secp256k1; +use bdk::sled; +use bdk::template::Bip84; + +use bitcoin::hashes::sha256::Hash as Sha256; +use bitcoin::hashes::Hash; +use bitcoin::secp256k1::PublicKey; +use bitcoin::BlockHash; + +use rand::Rng; + +use std::collections::HashMap; +use std::convert::TryFrom; +use std::fs; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime}; + +// The timeout after which we abandon retrying failed payments. +const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Clone)] +/// Represents the configuration of an [`LdkLite`] instance. +pub struct Config { + /// The path where the underlying LDK and BDK persist their data. + pub storage_dir_path: String, + /// The URL of the utilized Esplora server. + pub esplora_server_url: String, + /// The used Bitcoin network. + pub network: bitcoin::Network, + /// The TCP port the network node will listen on. + pub listening_port: u16, + /// The default CLTV expiry delta to be used for payments. + pub default_cltv_expiry_delta: u32, +} + +/// A builder for an [`LdkLite`] instance, allowing to set some configuration and module choices from +/// the getgo. +#[derive(Debug, Clone)] +pub struct Builder { + config: Config, +} + +impl Builder { + /// Creates a new builder instance with the default configuration. 
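+	///
+	/// The defaults are: storage directory `/tmp/ldk_lite/`, Esplora server
+	/// `https://blockstream.info/api`, the `testnet` network, listening port 9735, and a
+	/// default CLTV expiry delta of 144.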
+ pub fn new() -> Self { + // Set the config defaults + let storage_dir_path = "/tmp/ldk_lite/".to_string(); + let esplora_server_url = "https://blockstream.info/api".to_string(); + let network = bitcoin::Network::Testnet; + let listening_port = 9735; + let default_cltv_expiry_delta = 144; + + let config = Config { + storage_dir_path, + esplora_server_url, + network, + listening_port, + default_cltv_expiry_delta, + }; + + Self { config } + } + + /// Creates a new builder instance from an [`Config`]. + pub fn from_config(config: Config) -> Self { + Self { config } + } + + /// Sets the used storage directory path. + /// + /// Default: `/tmp/ldk_lite/` + pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { + self.config.storage_dir_path = storage_dir_path; + self + } + + /// Sets the Esplora server URL. + /// + /// Default: `https://blockstream.info/api` + pub fn set_esplora_server_url(&mut self, esplora_server_url: String) -> &mut Self { + self.config.esplora_server_url = esplora_server_url; + self + } + + /// Sets the Bitcoin network used. + /// + /// Options: `mainnet`/`bitcoin`, `testnet`, `regtest`, `signet` + /// + /// Default: `testnet` + pub fn set_network(&mut self, network: &str) -> &mut Self { + self.config.network = match network { + "mainnet" => bitcoin::Network::Bitcoin, + "bitcoin" => bitcoin::Network::Bitcoin, + "testnet" => bitcoin::Network::Testnet, + "regtest" => bitcoin::Network::Regtest, + "signet" => bitcoin::Network::Signet, + _ => bitcoin::Network::Testnet, + }; + self + } + + /// Sets the port on which [`LdkLite`] will listen for incoming network connections. + /// + /// Default: `9735` + pub fn set_listening_port(&mut self, listening_port: u16) -> &mut Self { + self.config.listening_port = listening_port; + self + } + + /// Builds an [`LdkLite`] instance according to the options previously configured. 
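+	///
+	/// This sets up the on-chain wallet and chain access, reads any previously persisted
+	/// channel, network graph, scorer, event queue, and peer data from the storage directory,
+	/// and wires up the LDK channel manager, peer manager, gossip sync, and event handling.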
+ pub fn build(&self) -> LdkLite { + let config = Arc::new(self.config.clone()); + + let ldk_data_dir = format!("{}/ldk", &config.storage_dir_path.clone()); + fs::create_dir_all(ldk_data_dir.clone()).expect("Failed to create LDK data directory"); + + let bdk_data_dir = format!("{}/bdk", config.storage_dir_path.clone()); + fs::create_dir_all(bdk_data_dir.clone()).expect("Failed to create BDK data directory"); + + // Step 0: Initialize the Logger + let log_file_path = format!("{}/ldk_lite.log", config.storage_dir_path.clone()); + let logger = Arc::new(FilesystemLogger::new(log_file_path)); + + // Step 1: Initialize the on-chain wallet and chain access + let seed = io_utils::read_or_generate_seed_file(Arc::clone(&config)); + let xprv = bitcoin::util::bip32::ExtendedPrivKey::new_master(config.network, &seed) + .expect("Failed to read wallet master key"); + + let wallet_name = bdk::wallet::wallet_name_from_descriptor( + Bip84(xprv.clone(), bdk::KeychainKind::External), + Some(Bip84(xprv.clone(), bdk::KeychainKind::Internal)), + config.network, + &Secp256k1::new(), + ) + .expect("Failed to derive on-chain wallet name"); + let database = sled::open(bdk_data_dir).expect("Failed to open BDK database"); + let database = + database.open_tree(wallet_name.clone()).expect("Failed to open BDK database"); + + let bdk_wallet = bdk::Wallet::new( + Bip84(xprv.clone(), bdk::KeychainKind::External), + Some(Bip84(xprv.clone(), bdk::KeychainKind::Internal)), + config.network, + database, + ) + .expect("Failed to setup on-chain wallet"); + + let chain_access = + Arc::new(ChainAccess::new(bdk_wallet, Arc::clone(&config), Arc::clone(&logger))); + + // Step 3: Initialize Persist + let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); + + // Step 4: Initialize the ChainMonitor + let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + None, + Arc::clone(&chain_access), + Arc::clone(&logger), + Arc::clone(&chain_access), + Arc::clone(&persister), + )); + + // Step 5: Initialize the KeysManager + let cur = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("System time error: Clock may have gone backwards"); + let keys_manager = Arc::new(KeysManager::new(&seed, cur.as_secs(), cur.subsec_nanos())); + + // Step 6: Read ChannelMonitor state from disk + let mut channel_monitors = persister + .read_channelmonitors(keys_manager.clone()) + .expect("Failed to read channel monitors from disk"); + + // Step 7: Initialize the ChannelManager + let mut user_config = UserConfig::default(); + user_config.channel_handshake_limits.force_announced_channel_preference = false; + let channel_manager = { + if let Ok(mut f) = fs::File::open(format!("{}/manager", ldk_data_dir.clone())) { + let mut channel_monitor_mut_references = Vec::new(); + for (_, channel_monitor) in channel_monitors.iter_mut() { + channel_monitor_mut_references.push(channel_monitor); + } + let read_args = ChannelManagerReadArgs::new( + Arc::clone(&keys_manager), + Arc::clone(&chain_access), + Arc::clone(&chain_monitor), + Arc::clone(&chain_access), + Arc::clone(&logger), + user_config, + channel_monitor_mut_references, + ); + let (_hash, channel_manager) = + <(BlockHash, ChannelManager)>::read(&mut f, read_args) + .expect("Failed to read channel manager from disk"); + channel_manager + } else { + // We're starting a fresh node. 
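+				// Use the genesis block as a dummy best block; it is updated to the actual chain
+				// tip on the first `Confirm`-based sync.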
+ let dummy_block_hash = bitcoin::blockdata::constants::genesis_block(config.network) + .header + .block_hash(); + + let chain_params = ChainParameters { + network: config.network, + best_block: BestBlock::new(dummy_block_hash, 0), + }; + let fresh_channel_manager = channelmanager::ChannelManager::new( + Arc::clone(&chain_access), + Arc::clone(&chain_monitor), + Arc::clone(&chain_access), + Arc::clone(&logger), + Arc::clone(&keys_manager), + user_config, + chain_params, + ); + fresh_channel_manager + } + }; + + let channel_manager = Arc::new(channel_manager); + + // Step 8: Give ChannelMonitors to ChainMonitor + for (_blockhash, channel_monitor) in channel_monitors.drain(..) { + let funding_outpoint = channel_monitor.get_funding_txo().0; + chain_monitor.watch_channel(funding_outpoint, channel_monitor).unwrap(); + } + + // Step 10: Initialize the P2PGossipSync + let network_graph = Arc::new( + io_utils::read_network_graph(Arc::clone(&config), Arc::clone(&logger)) + .expect("Failed to read the network graph"), + ); + let gossip_sync = Arc::new(P2PGossipSync::new( + Arc::clone(&network_graph), + None::>, + Arc::clone(&logger), + )); + + //// Step 11: Initialize the PeerManager + let ephemeral_bytes: [u8; 32] = rand::thread_rng().gen(); + let lightning_msg_handler = MessageHandler { + chan_handler: Arc::clone(&channel_manager), + route_handler: Arc::clone(&gossip_sync), + }; + + let peer_manager: Arc = Arc::new(PeerManager::new( + lightning_msg_handler, + keys_manager.get_node_secret(Recipient::Node).unwrap(), + &ephemeral_bytes, + Arc::clone(&logger), + Arc::new(IgnoringMessageHandler {}), + )); + + // Step 12: Initialize routing ProbabilisticScorer + let scorer = Arc::new(Mutex::new(io_utils::read_scorer( + Arc::clone(&config), + Arc::clone(&network_graph), + Arc::clone(&logger), + ))); + + // Step 13: Init payment info storage + // TODO: persist payment info to disk + let inbound_payments = Arc::new(Mutex::new(HashMap::new())); + let outbound_payments = Arc::new(Mutex::new(HashMap::new())); + + // Step 14: Restore event handler from disk or create a new one. 
+ let event_queue = if let Ok(mut f) = + fs::File::open(format!("{}/{}", ldk_data_dir.clone(), event::EVENTS_PERSISTENCE_KEY)) + { + Arc::new( + EventQueue::read(&mut f, Arc::clone(&persister)) + .expect("Failed to read event queue from disk."), + ) + } else { + Arc::new(EventQueue::new(Arc::clone(&persister))) + }; + + let event_handler = Arc::new(EventHandler::new( + Arc::clone(&chain_access), + Arc::clone(&event_queue), + Arc::clone(&channel_manager), + Arc::clone(&network_graph), + Arc::clone(&keys_manager), + Arc::clone(&inbound_payments), + Arc::clone(&outbound_payments), + Arc::clone(&logger), + Arc::clone(&config), + )); + + //// Step 16: Create Router and InvoicePayer + let router = DefaultRouter::new( + Arc::clone(&network_graph), + Arc::clone(&logger), + keys_manager.get_secure_random_bytes(), + ); + + let invoice_payer = Arc::new(InvoicePayer::new( + Arc::clone(&channel_manager), + router, + Arc::clone(&scorer), + Arc::clone(&logger), + Arc::clone(&event_handler), + payment::Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT), + )); + + let peer_store = if let Ok(mut f) = fs::File::open(format!( + "{}/{}", + ldk_data_dir.clone(), + peer_store::PEER_INFO_PERSISTENCE_KEY + )) { + Arc::new( + PeerInfoStorage::read(&mut f, Arc::clone(&persister)) + .expect("Failed to read peer information from disk."), + ) + } else { + Arc::new(PeerInfoStorage::new(Arc::clone(&persister))) + }; + + let running = RwLock::new(None); + + LdkLite { + running, + config, + chain_access, + event_queue, + event_handler, + channel_manager, + chain_monitor, + peer_manager, + keys_manager, + gossip_sync, + persister, + logger, + scorer, + invoice_payer, + inbound_payments, + outbound_payments, + peer_store, + } + } +} + +/// Wraps all objects that need to be preserved during the run time of [`LdkLite`]. Will be dropped +/// upon [`LdkLite::stop()`]. +struct Runtime { + tokio_runtime: Arc, + _background_processor: BackgroundProcessor, + stop_networking: Arc, + stop_wallet_sync: Arc, +} + +/// The main interface object of the simplified API, wrapping the necessary LDK and BDK functionalities. +/// +/// Needs to be initialized and instantiated through [`Builder::build`]. +pub struct LdkLite { + running: RwLock>, + config: Arc, + chain_access: Arc>, + event_queue: Arc>, + event_handler: Arc, + channel_manager: Arc, + chain_monitor: Arc, + peer_manager: Arc, + keys_manager: Arc, + gossip_sync: Arc, + persister: Arc, + logger: Arc, + scorer: Arc>, + invoice_payer: Arc>>, + inbound_payments: Arc, + outbound_payments: Arc, + peer_store: Arc>, +} + +impl LdkLite { + /// Starts the necessary background tasks, such as handling events coming from user input, + /// LDK/BDK, and the peer-to-peer network. After this returns, the [`LdkLite`] instance can be + /// controlled via the provided API methods in a thread-safe manner. + pub fn start(&mut self) -> Result<(), Error> { + // Acquire a run lock and hold it until we're setup. + let mut run_lock = self.running.write().unwrap(); + if run_lock.is_some() { + // We're already running. + return Err(Error::AlreadyRunning); + } + + let runtime = self.setup_runtime()?; + *run_lock = Some(runtime); + Ok(()) + } + + /// Disconnects all peers, stops all running background tasks, and shuts down [`LdkLite`]. 
+ pub fn stop(&mut self) -> Result<(), Error> { + let mut run_lock = self.running.write().unwrap(); + if run_lock.is_none() { + return Err(Error::NotRunning); + } + + let runtime = run_lock.as_ref().unwrap(); + + // Stop wallet sync + runtime.stop_wallet_sync.store(true, Ordering::Release); + + // Stop networking + runtime.stop_networking.store(true, Ordering::Release); + self.peer_manager.disconnect_all_peers(); + + // Drop the held runtimes. + self.chain_access.drop_runtime(); + self.event_handler.drop_runtime(); + + // Drop the runtime, which stops the background processor and any possibly remaining tokio threads. + *run_lock = None; + Ok(()) + } + + fn setup_runtime(&self) -> Result { + let tokio_runtime = + Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); + + self.chain_access.set_runtime(Arc::clone(&tokio_runtime)); + self.event_handler.set_runtime(Arc::clone(&tokio_runtime)); + + // Setup wallet sync + let chain_access = Arc::clone(&self.chain_access); + let sync_cman = Arc::clone(&self.channel_manager); + let sync_cmon = Arc::clone(&self.chain_monitor); + let sync_logger = Arc::clone(&self.logger); + let stop_wallet_sync = Arc::new(AtomicBool::new(false)); + let stop_sync = Arc::clone(&stop_wallet_sync); + + tokio_runtime.block_on(async move { + let mut rounds = 0; + loop { + if stop_sync.load(Ordering::Acquire) { + return; + } + // As syncing the on-chain wallet is much more time-intesive, we only sync every + // fifth round. + if rounds == 0 { + let now = Instant::now(); + match chain_access.sync_wallet().await { + Ok(()) => log_info!( + sync_logger, + "On-chain wallet sync finished in {}ms.", + now.elapsed().as_millis() + ), + Err(_) => log_error!(sync_logger, "On-chain wallet sync failed"), + } + } + rounds = (rounds + 1) % 5; + + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Send + Sync), + &*sync_cmon as &(dyn Confirm + Send + Sync), + ]; + let now = Instant::now(); + match chain_access.sync(confirmables).await { + Ok(()) => log_info!( + sync_logger, + "Lightning wallet sync finished in {}ms.", + now.elapsed().as_millis() + ), + Err(e) => log_error!(sync_logger, "Lightning wallet sync failed: {}", e), + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + + // Setup networking + let peer_manager_connection_handler = Arc::clone(&self.peer_manager); + let listening_port = self.config.listening_port; + let stop_networking = Arc::new(AtomicBool::new(false)); + let stop_listen = Arc::clone(&stop_networking); + + tokio_runtime.spawn(async move { + let listener = + tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port)).await.expect( + "Failed to bind to listen port - is something else already listening on it?", + ); + loop { + if stop_listen.load(Ordering::Acquire) { + return; + } + let peer_mgr = Arc::clone(&peer_manager_connection_handler); + let tcp_stream = listener.accept().await.unwrap().0; + tokio::spawn(async move { + lightning_net_tokio::setup_inbound( + Arc::clone(&peer_mgr), + tcp_stream.into_std().unwrap(), + ) + .await; + }); + } + }); + + // Regularly reconnect to channel peers. 
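+		// Every second, compare the counterparties of our channels against the currently
+		// connected peers and re-establish any missing connections from the persisted peer store.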
+ let connect_cm = Arc::clone(&self.channel_manager); + let connect_pm = Arc::clone(&self.peer_manager); + let connect_logger = Arc::clone(&self.logger); + let connect_peer_store = Arc::clone(&self.peer_store); + let stop_connect = Arc::clone(&stop_networking); + tokio_runtime.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + if stop_connect.load(Ordering::Acquire) { + return; + } + interval.tick().await; + let pm_peers = connect_pm.get_peer_node_ids(); + for node_id in connect_cm + .list_channels() + .iter() + .map(|chan| chan.counterparty.node_id) + .filter(|id| !pm_peers.contains(id)) + { + for peer_info in connect_peer_store.peers() { + if peer_info.pubkey == node_id { + let _ = do_connect_peer( + peer_info.pubkey, + peer_info.address.clone(), + Arc::clone(&connect_pm), + Arc::clone(&connect_logger), + ) + .await; + } + } + } + } + }); + + // Setup background processing + let _background_processor = BackgroundProcessor::start( + Arc::clone(&self.persister), + Arc::clone(&self.invoice_payer), + Arc::clone(&self.chain_monitor), + Arc::clone(&self.channel_manager), + BPGossipSync::p2p(Arc::clone(&self.gossip_sync)), + Arc::clone(&self.peer_manager), + Arc::clone(&self.logger), + Some(Arc::clone(&self.scorer)), + ); + + // TODO: frequently check back on background_processor if there was an error + + Ok(Runtime { tokio_runtime, _background_processor, stop_networking, stop_wallet_sync }) + } + + /// Blocks until the next event is available. + /// + /// Note: this will always return the same event until handling is confirmed via [`LdkLite::event_handled`]. + pub fn next_event(&self) -> Event { + self.event_queue.next_event() + } + + /// Confirm the last retrieved event handled. + pub fn event_handled(&self) { + self.event_queue.event_handled().unwrap(); + } + + /// Returns our own node id + pub fn my_node_id(&self) -> Result { + if self.running.read().unwrap().is_none() { + return Err(Error::NotRunning); + } + + Ok(self.channel_manager.get_our_node_id()) + } + + /// Retrieve a new on-chain/funding address. + pub fn new_funding_address(&mut self) -> Result { + if self.running.read().unwrap().is_none() { + return Err(Error::NotRunning); + } + + let funding_address = self.chain_access.get_new_address()?; + log_info!(self.logger, "Generated new funding address: {}", funding_address); + Ok(funding_address) + } + + /// Connect to a node and open a new channel. 
Disconnects and re-connects are handled automatically + /// + /// Returns a temporary channel id + pub fn connect_open_channel( + &self, node_pubkey_and_address: &str, channel_amount_sats: u64, announce_channel: bool, + ) -> Result<(), Error> { + let runtime_lock = self.running.read().unwrap(); + if runtime_lock.is_none() { + return Err(Error::NotRunning); + } + + let peer_info = PeerInfo::try_from(node_pubkey_and_address.to_string())?; + + let runtime = runtime_lock.as_ref().unwrap(); + + let con_peer_info = peer_info.clone(); + let con_success = Arc::new(AtomicBool::new(false)); + let con_success_cloned = Arc::clone(&con_success); + let con_logger = Arc::clone(&self.logger); + let con_pm = Arc::clone(&self.peer_manager); + + runtime.tokio_runtime.spawn(async move { + let res = connect_peer_if_necessary( + con_peer_info.pubkey, + con_peer_info.address, + con_pm, + con_logger, + ) + .await; + con_success_cloned.store(res.is_ok(), Ordering::Release); + }); + + if !con_success.load(Ordering::Acquire) { + return Err(Error::ConnectionFailed); + } + + let user_config = UserConfig { + channel_handshake_limits: ChannelHandshakeLimits { + // lnd's max to_self_delay is 2016, so we want to be compatible. + their_to_self_delay: 2016, + ..Default::default() + }, + channel_handshake_config: ChannelHandshakeConfig { + announced_channel: announce_channel, + ..Default::default() + }, + ..Default::default() + }; + + match self.channel_manager.create_channel( + peer_info.pubkey, + channel_amount_sats, + 0, + 0, + Some(user_config), + ) { + Ok(_) => { + self.peer_store.add_peer(peer_info.clone())?; + log_info!( + self.logger, + "Initiated channel creation with peer {}. ", + peer_info.pubkey + ); + Ok(()) + } + Err(e) => { + log_error!(self.logger, "Failed to initiate channel creation: {:?}", e); + Err(Error::ChannelCreationFailed) + } + } + } + + /// Close a previously opened channel. + pub fn close_channel( + &self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, + ) -> Result<(), Error> { + self.peer_store.remove_peer(counterparty_node_id)?; + match self.channel_manager.close_channel(channel_id, counterparty_node_id) { + Ok(_) => Ok(()), + Err(_) => Err(Error::ChannelClosingFailed), + } + } + + /// Send a payement given an invoice. + pub fn send_payment(&self, invoice: Invoice) -> Result { + if self.running.read().unwrap().is_none() { + return Err(Error::NotRunning); + } + + // TODO: ensure we never tried paying the given payment hash before + let status = match self.invoice_payer.pay_invoice(&invoice) { + Ok(_payment_id) => { + let payee_pubkey = invoice.recover_payee_pub_key(); + // TODO: is this unwrap safe? Would a payment to an invoice with None amount ever + // succeed? Should we allow to set the amount in the interface or via a dedicated + // method? 
+ let amt_msat = invoice.amount_milli_satoshis().unwrap(); + log_info!(self.logger, "Initiated sending {} msats to {}", amt_msat, payee_pubkey); + PaymentStatus::Pending + } + Err(payment::PaymentError::Invoice(e)) => { + log_error!(self.logger, "Failed to send payment due to invalid invoice: {}", e); + return Err(Error::InvoiceInvalid); + } + Err(payment::PaymentError::Routing(e)) => { + log_error!(self.logger, "Failed to send payment due to routing failure: {}", e.err); + return Err(Error::RoutingFailed); + } + Err(payment::PaymentError::Sending(e)) => { + log_error!(self.logger, "Failed to send payment: {:?}", e); + PaymentStatus::Failed + } + }; + + let payment_hash = PaymentHash(invoice.payment_hash().clone().into_inner()); + let payment_secret = Some(invoice.payment_secret().clone()); + + let mut outbound_payments_lock = self.outbound_payments.lock().unwrap(); + outbound_payments_lock.insert( + payment_hash, + PaymentInfo { + preimage: None, + secret: payment_secret, + status, + amount_msat: invoice.amount_milli_satoshis(), + }, + ); + + Ok(payment_hash) + } + + /// Send a spontaneous, aka. "keysend", payment + pub fn send_spontaneous_payment( + &self, amount_msat: u64, node_id: &str, + ) -> Result { + if self.running.read().unwrap().is_none() { + return Err(Error::NotRunning); + } + + let pubkey = hex_utils::to_compressed_pubkey(node_id).ok_or(Error::PeerInfoParseFailed)?; + + let payment_preimage = PaymentPreimage(self.keys_manager.get_secure_random_bytes()); + let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); + + let status = match self.invoice_payer.pay_pubkey( + pubkey, + payment_preimage, + amount_msat, + self.config.default_cltv_expiry_delta, + ) { + Ok(_payment_id) => { + log_info!(self.logger, "Initiated sending {} msats to {}.", amount_msat, node_id); + PaymentStatus::Pending + } + Err(payment::PaymentError::Invoice(e)) => { + log_error!(self.logger, "Failed to send payment due to invalid invoice: {}", e); + return Err(Error::InvoiceInvalid); + } + Err(payment::PaymentError::Routing(e)) => { + log_error!(self.logger, "Failed to send payment due to routing failure: {}", e.err); + return Err(Error::RoutingFailed); + } + Err(payment::PaymentError::Sending(e)) => { + log_error!(self.logger, "Failed to send payment: {:?}", e); + PaymentStatus::Failed + } + }; + + let mut outbound_payments_lock = self.outbound_payments.lock().unwrap(); + outbound_payments_lock.insert( + payment_hash, + PaymentInfo { preimage: None, secret: None, status, amount_msat: Some(amount_msat) }, + ); + + Ok(payment_hash) + } + + /// Returns a payable invoice that can be used to request and receive a payment. 
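+ ///
+ /// If `amount_msat` is `None`, the invoice doesn't commit to a specific amount and the
+ /// payer chooses how much to send.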
+ pub fn receive_payment( + &self, amount_msat: Option, description: &str, expiry_secs: u32, + ) -> Result { + let mut inbound_payments_lock = self.inbound_payments.lock().unwrap(); + + let currency = match self.config.network { + bitcoin::Network::Bitcoin => Currency::Bitcoin, + bitcoin::Network::Testnet => Currency::BitcoinTestnet, + bitcoin::Network::Regtest => Currency::Regtest, + bitcoin::Network::Signet => Currency::Signet, + }; + let keys_manager = Arc::clone(&self.keys_manager); + let invoice = match lightning_invoice::utils::create_invoice_from_channelmanager( + &self.channel_manager, + keys_manager, + currency, + amount_msat, + description.to_string(), + expiry_secs, + ) { + Ok(inv) => { + log_info!(self.logger, "Invoice created: {}", inv); + inv + } + Err(e) => { + log_error!(self.logger, "Failed to create invoice: {}", e); + return Err(Error::InvoiceCreationFailed); + } + }; + + let payment_hash = PaymentHash(invoice.payment_hash().clone().into_inner()); + inbound_payments_lock.insert( + payment_hash, + PaymentInfo { + preimage: None, + secret: Some(invoice.payment_secret().clone()), + status: PaymentStatus::Pending, + amount_msat, + }, + ); + Ok(invoice) + } + + /// Query for information about the status of a specific payment. + pub fn payment_info(&self, payment_hash: &[u8; 32]) -> Option { + let payment_hash = PaymentHash(*payment_hash); + + { + let outbound_payments_lock = self.outbound_payments.lock().unwrap(); + if let Some(payment_info) = outbound_payments_lock.get(&payment_hash) { + return Some((*payment_info).clone()); + } + } + + { + let inbound_payments_lock = self.inbound_payments.lock().unwrap(); + if let Some(payment_info) = inbound_payments_lock.get(&payment_hash) { + return Some((*payment_info).clone()); + } + } + + None + } +} + +async fn connect_peer_if_necessary( + pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc, + logger: Arc, +) -> Result<(), Error> { + for node_pubkey in peer_manager.get_peer_node_ids() { + if node_pubkey == pubkey { + return Ok(()); + } + } + + do_connect_peer(pubkey, peer_addr, peer_manager, logger).await +} + +async fn do_connect_peer( + pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc, + logger: Arc, +) -> Result<(), Error> { + log_info!(logger, "connecting to peer: {}@{}", pubkey, peer_addr); + match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), pubkey, peer_addr).await + { + Some(connection_closed_future) => { + let mut connection_closed_future = Box::pin(connection_closed_future); + loop { + match futures::poll!(&mut connection_closed_future) { + std::task::Poll::Ready(_) => { + log_info!(logger, "peer connection closed: {}@{}", pubkey, peer_addr); + return Err(Error::ConnectionFailed); + } + std::task::Poll::Pending => {} + } + // Avoid blocking the tokio context by sleeping a bit + match peer_manager.get_peer_node_ids().iter().find(|id| **id == pubkey) { + Some(_) => return Ok(()), + None => tokio::time::sleep(Duration::from_millis(10)).await, + } + } + } + None => { + log_error!(logger, "failed to connect to peer: {}@{}", pubkey, peer_addr); + Err(Error::ConnectionFailed) + } + } +} + +// +// Structs wrapping the particular information which should easily be +// understandable, parseable, and transformable, i.e., we'll try to avoid +// exposing too many technical detail here. +/// Represents a payment. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PaymentInfo { + /// The pre-image used by the payment. + pub preimage: Option, + /// The secret used by the payment. 
+ pub secret: Option, + /// The status of the payment. + pub status: PaymentStatus, + /// The amount transferred. + pub amount_msat: Option, +} + +/// Represents the current status of a payment. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum PaymentStatus { + /// The payment is still pending. + Pending, + /// The payment suceeded. + Succeeded, + /// The payment failed. + Failed, +} + +type ChainMonitor = chainmonitor::ChainMonitor< + InMemorySigner, + Arc, + Arc>, + Arc>, + Arc, + Arc, +>; + +type PeerManager = SimpleArcPeerManager< + SocketDescriptor, + ChainMonitor, + ChainAccess, + ChainAccess, + dyn Access + Send + Sync, + FilesystemLogger, +>; + +pub(crate) type ChannelManager = SimpleArcChannelManager< + ChainMonitor, + ChainAccess, + ChainAccess, + FilesystemLogger, +>; + +type InvoicePayer = payment::InvoicePayer< + Arc, + Router, + Arc>, + Arc, + F, +>; + +type Router = DefaultRouter, Arc>; +type Scorer = ProbabilisticScorer, Arc>; + +type GossipSync = + P2PGossipSync, Arc, Arc>; + +pub(crate) type NetworkGraph = gossip::NetworkGraph>; + +pub(crate) type PaymentInfoStorage = Mutex>; + +#[cfg(test)] +mod tests {} diff --git a/src/logger.rs b/src/logger.rs index 317f294b4..5f3d5cb96 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -78,6 +78,7 @@ macro_rules! log_error { log_given_level!($logger, lightning::util::logger::Level::Error, $($arg)*) ) } +#[allow(unused_imports)] pub(crate) use log_error; #[allow(unused_macros)] @@ -86,6 +87,7 @@ macro_rules! log_warn { log_given_level!($logger, lightning::util::logger::Level::Warn, $($arg)*) ) } +#[allow(unused_imports)] pub(crate) use log_warn; #[allow(unused_macros)] @@ -94,6 +96,7 @@ macro_rules! log_info { log_given_level!($logger, lightning::util::logger::Level::Info, $($arg)*) ) } +#[allow(unused_imports)] pub(crate) use log_info; #[allow(unused_macros)] @@ -102,6 +105,8 @@ macro_rules! log_debug { log_given_level!($logger, lightning::util::logger::Level::Debug, $($arg)*) ) } +#[allow(unused_imports)] +pub(crate) use log_debug; #[allow(unused_macros)] macro_rules! log_trace { @@ -109,4 +114,5 @@ macro_rules! log_trace { log_given_level!($logger, lightning::util::logger::Level::Trace, $($arg)*) ) } +#[allow(unused_imports)] pub(crate) use log_trace; diff --git a/src/peer_store.rs b/src/peer_store.rs new file mode 100644 index 000000000..d8a856e2f --- /dev/null +++ b/src/peer_store.rs @@ -0,0 +1,172 @@ +use crate::hex_utils; +use crate::Error; + +use lightning::util::persist::KVStorePersister; +use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; + +use bitcoin::secp256k1::PublicKey; + +use std::convert::TryFrom; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::sync::{Arc, RwLock}; + +/// The peer information will be persisted under this key. +pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; + +pub(crate) struct PeerInfoStorage { + peers: RwLock, + persister: Arc, +} + +impl PeerInfoStorage { + pub(crate) fn new(persister: Arc) -> Self { + let peers = RwLock::new(PeerInfoStorageSerWrapper(Vec::new())); + Self { peers, persister } + } + + pub(crate) fn add_peer(&self, peer_info: PeerInfo) -> Result<(), Error> { + let mut locked_peers = self.peers.write().unwrap(); + + // Check if we have the peer. If so, either update it or do nothing. 
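+ // Note: if a known peer's address changed, the update below is only applied in
+ // memory; the list is not re-persisted until a later call appends or removes a peer.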
+ for stored_info in locked_peers.0.iter_mut() { + if stored_info.pubkey == peer_info.pubkey { + if stored_info.address != peer_info.address { + stored_info.address = peer_info.address; + } + return Ok(()); + } + } + + locked_peers.0.push(peer_info); + self.persister + .persist(PEER_INFO_PERSISTENCE_KEY, &*locked_peers) + .map_err(|_| Error::PersistenceFailed)?; + + return Ok(()); + } + + pub(crate) fn remove_peer(&self, peer_pubkey: &PublicKey) -> Result<(), Error> { + let mut locked_peers = self.peers.write().unwrap(); + + locked_peers.0.retain(|info| info.pubkey != *peer_pubkey); + + self.persister + .persist(PEER_INFO_PERSISTENCE_KEY, &*locked_peers) + .map_err(|_| Error::PersistenceFailed)?; + + return Ok(()); + } + + pub(crate) fn peers(&self) -> Vec { + self.peers.read().unwrap().0.clone() + } +} + +impl ReadableArgs> for PeerInfoStorage { + #[inline] + fn read( + reader: &mut R, persister: Arc, + ) -> Result { + let peers: RwLock = RwLock::new(Readable::read(reader)?); + Ok(Self { peers, persister }) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PeerInfoStorageSerWrapper(Vec); + +impl Readable for PeerInfoStorageSerWrapper { + fn read( + reader: &mut R, + ) -> Result { + let len: u16 = Readable::read(reader)?; + let mut peers = Vec::with_capacity(len as usize); + for _ in 0..len { + peers.push(Readable::read(reader)?); + } + Ok(PeerInfoStorageSerWrapper(peers)) + } +} + +impl Writeable for PeerInfoStorageSerWrapper { + fn write(&self, writer: &mut W) -> Result<(), lightning::io::Error> { + (self.0.len() as u16).write(writer)?; + for e in self.0.iter() { + e.write(writer)?; + } + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PeerInfo { + pub pubkey: PublicKey, + pub address: SocketAddr, +} + +impl Readable for PeerInfo { + fn read( + reader: &mut R, + ) -> Result { + let pubkey = Readable::read(reader)?; + + let ip_type: u8 = Readable::read(reader)?; + let ip_addr = if ip_type == 0 { + let octets: [u8; 4] = Readable::read(reader)?; + IpAddr::from(octets) + } else { + let octets: [u8; 16] = Readable::read(reader)?; + IpAddr::from(octets) + }; + + let port: u16 = Readable::read(reader)?; + + let address = SocketAddr::new(ip_addr, port); + + Ok(PeerInfo { pubkey, address }) + } +} + +impl Writeable for PeerInfo { + fn write(&self, writer: &mut W) -> Result<(), lightning::io::Error> { + self.pubkey.write(writer)?; + + let ip_type: u8 = if self.address.ip().is_ipv4() { 0 } else { 1 }; + ip_type.write(writer)?; + + let octets = match self.address.ip() { + IpAddr::V4(ip) => ip.octets().to_vec(), + IpAddr::V6(ip) => ip.octets().to_vec(), + }; + octets.write(writer)?; + + self.address.port().write(writer)?; + + Ok(()) + } +} + +impl TryFrom for PeerInfo { + type Error = Error; + + fn try_from(peer_pubkey_and_ip_addr: String) -> Result { + let mut pubkey_and_addr = peer_pubkey_and_ip_addr.split("@"); + let pubkey = pubkey_and_addr.next(); + let peer_addr_str = pubkey_and_addr.next(); + if pubkey.is_none() || peer_addr_str.is_none() { + return Err(Error::PeerInfoParseFailed); + } + + let peer_addr = peer_addr_str.unwrap().to_socket_addrs().map(|mut r| r.next()); + if peer_addr.is_err() || peer_addr.as_ref().unwrap().is_none() { + return Err(Error::PeerInfoParseFailed); + } + + let pubkey = hex_utils::to_compressed_pubkey(pubkey.unwrap()); + if pubkey.is_none() { + return Err(Error::PeerInfoParseFailed); + } + + Ok(PeerInfo { pubkey: pubkey.unwrap(), address: peer_addr.unwrap().unwrap() }) + } +}
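For illustration only (not part of the patch), a minimal in-crate sketch of how the `pubkey@host:port` parsing above might be exercised; the node id below is just a placeholder compressed public key:

use std::convert::TryFrom;

fn parse_peer_example() -> Result<(), Error> {
	// Placeholder values: any valid 33-byte compressed secp256k1 key in hex, plus host:port.
	let s = "02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619@127.0.0.1:9735";
	let peer_info = PeerInfo::try_from(s.to_string())?;
	assert_eq!(peer_info.address.port(), 9735);
	Ok(())
}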