Skip to content

Commit

Permalink
WIP holymoly
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Oct 16, 2024
1 parent c20359a commit bb4bcd8
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,12 @@ fn build_with_store_internal(

liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));

gossip_source.set_gossip_verifier(
Arc::clone(&chain_source),
Arc::clone(&peer_manager),
Arc::clone(&runtime),
);

let connection_manager =
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));

Expand Down
4 changes: 4 additions & 0 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl BitcoindRpcClient {
Self { rpc_client, latest_mempool_timestamp }
}

pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Arc::clone(&self.rpc_client)
}

pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
let tx_json = serde_json::json!(tx_serialized);
Expand Down
8 changes: 8 additions & 0 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use lightning_transaction_sync::EsploraSyncClient;

use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header};
use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader};
use lightning_block_sync::rpc::RpcClient;
use lightning_block_sync::SpvClient;

use bdk_esplora::EsploraAsyncExt;
Expand Down Expand Up @@ -192,6 +193,13 @@ impl ChainSource {
}
}

pub(crate) fn as_utxo_source(&self) -> Option<Arc<RpcClient>> {
match self {
Self::BitcoindRpc { bitcoind_rpc_client, .. } => Some(bitcoind_rpc_client.rpc_client()),
_ => None,
}
}

pub(crate) async fn continuously_sync_wallets(
&self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>,
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
Expand Down
67 changes: 59 additions & 8 deletions src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use crate::chain::ChainSource;
use crate::config::RGS_SYNC_TIMEOUT_SECS;
use crate::logger::{log_trace, FilesystemLogger, Logger};
use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync};
use crate::logger::{log_error, log_trace, FilesystemLogger, Logger};
use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup};
use crate::Error;

use lightning::routing::utxo::UtxoLookup;
use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier};

use std::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;

pub(crate) enum GossipSource {
P2PNetwork {
gossip_sync: Arc<P2PGossipSync>,
logger: Arc<FilesystemLogger>,
},
RapidGossipSync {
gossip_sync: Arc<RapidGossipSync>,
Expand All @@ -32,10 +35,10 @@ impl GossipSource {
pub fn new_p2p(network_graph: Arc<Graph>, logger: Arc<FilesystemLogger>) -> Self {
let gossip_sync = Arc::new(P2PGossipSync::new(
network_graph,
None::<Arc<dyn UtxoLookup + Send + Sync>>,
logger,
None::<Arc<UtxoLookup>>,
Arc::clone(&logger),
));
Self::P2PNetwork { gossip_sync }
Self::P2PNetwork { gossip_sync, logger }
}

pub fn new_rgs(
Expand All @@ -58,9 +61,30 @@ impl GossipSource {
}
}

pub(crate) fn set_gossip_verifier(
&self, chain_source: Arc<ChainSource>, peer_manager: Arc<PeerManager>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
) {
match self {
_ => (),
Self::P2PNetwork { gossip_sync, logger } => {
if let Some(utxo_source) = chain_source.as_utxo_source() {
let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger));
let gossip_verifier = Arc::new(GossipVerifier::new(
utxo_source,
spawner,
Arc::clone(gossip_sync),
peer_manager,
));
gossip_sync.add_utxo_lookup(Some(gossip_verifier));
}
},
}
}

pub async fn update_rgs_snapshot(&self) -> Result<u32, Error> {
match self {
Self::P2PNetwork { gossip_sync: _ } => Ok(0),
Self::P2PNetwork { gossip_sync: _, .. } => Ok(0),
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => {
let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire);
let query_url = format!("{}/{}", server_url, query_timestamp);
Expand Down Expand Up @@ -101,3 +125,30 @@ impl GossipSource {
}
}
}

pub(crate) struct RuntimeSpawner {
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
logger: Arc<FilesystemLogger>,
}

impl RuntimeSpawner {
pub(crate) fn new(
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: Arc<FilesystemLogger>,
) -> Self {
Self { runtime, logger }
}
}

impl FutureSpawner for RuntimeSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
let rt_lock = self.runtime.read().unwrap();
if rt_lock.is_none() {
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
return;
}

let runtime = rt_lock.as_ref().unwrap();
runtime.spawn(future);
}
}

0 comments on commit bb4bcd8

Please sign in to comment.