Skip to content

Commit

Permalink
Merge pull request #56 from arik-so/2023/08/connect-to-peers-asynchro…
Browse files Browse the repository at this point in the history
…nously

Track gossip data prior to peer connection loop.
  • Loading branch information
TheBlueMatt authored Sep 6, 2023
2 parents ed395b1 + 42f64e6 commit cf3c4a9
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 91 deletions.
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub(crate) const MAX_SNAPSHOT_SCOPE: u32 = 3600 * 24 * 21; // three weeks
/// That reminder may be either in the form of a channel announcement, or in the form of empty
/// updates in both directions.
pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
/// The number of successful peer connections to await prior to continuing to gossip storage.
/// The application will still work if the number of specified peers is lower, as long as there is
/// at least one successful peer connection, but it may result in long startup times.
pub(crate) const CONNECTED_PEER_ASSERTION_LIMIT: usize = 5;
pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;

pub(crate) fn snapshot_generation_interval() -> u32 {
Expand Down
202 changes: 111 additions & 91 deletions src/tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ use lightning;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
use lightning::{log_error, log_info, log_warn};
use lightning::{log_info, log_warn};
use lightning::routing::gossip::NetworkGraph;
use lightning::sign::KeysManager;
use lightning::util::logger::Logger;
use tokio::sync::mpsc;
use tokio::task::JoinSet;

use crate::config;
use crate::downloader::GossipRouter;
use crate::types::{GossipMessage, GossipPeerManager};

pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(persistence_sender: mpsc::Sender<GossipMessage>,
completion_sender: mpsc::Sender<()>,
network_graph: Arc<NetworkGraph<L>>,
logger: L
completion_sender: mpsc::Sender<()>,
network_graph: Arc<NetworkGraph<L>>,
logger: L,
) where L::Target: Logger {
let mut key = [42; 32];
let mut random_data = [43; 32];
Expand Down Expand Up @@ -66,12 +67,29 @@ pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(pe

log_info!(logger, "Connecting to Lightning peers...");
let peers = config::ln_peers();
let mut handles = JoinSet::new();
let mut connected_peer_count = 0;

if peers.len() <= config::CONNECTED_PEER_ASSERTION_LIMIT {
log_warn!(logger, "Peer assertion threshold is {}, but only {} peers specified.", config::CONNECTED_PEER_ASSERTION_LIMIT, peers.len());
}

for current_peer in peers {
let initial_connection_succeeded = connect_peer(current_peer, peer_handler.clone(), logger.clone()).await;
if initial_connection_succeeded {
connected_peer_count += 1;
let peer_handler_clone = peer_handler.clone();
let logger_clone = logger.clone();
handles.spawn(async move {
connect_peer(current_peer, peer_handler_clone, logger_clone).await
});
}

while let Some(connection_result) = handles.join_next().await {
if let Ok(connection) = connection_result {
if connection {
connected_peer_count += 1;
if connected_peer_count >= config::CONNECTED_PEER_ASSERTION_LIMIT {
break;
}
}
}
}

Expand All @@ -81,99 +99,101 @@ pub(crate) async fn download_gossip<L: Deref + Clone + Send + Sync + 'static>(pe

log_info!(logger, "Connected to {} Lightning peers!", connected_peer_count);

tokio::spawn(async move {
let mut previous_announcement_count = 0u64;
let mut previous_update_count = 0u64;
let mut is_caught_up_with_gossip = false;

let mut i = 0u32;
let mut latest_new_gossip_time = Instant::now();
let mut needs_to_notify_persister = false;

loop {
i += 1; // count the background activity
let sleep = tokio::time::sleep(Duration::from_secs(5));
sleep.await;

{
let counter = router.counter.read().unwrap();
let total_message_count = counter.channel_announcements + counter.channel_updates;
let new_message_count = total_message_count - previous_announcement_count - previous_update_count;

let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
// TODO: make new message threshold (20) adjust based on connected peer count
is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
if new_message_count > 0 {
latest_new_gossip_time = Instant::now();
}

// if we either aren't caught up, or just stopped/started being caught up
if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
log_info!(
logger,
"gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
i,
total_message_count,
new_message_count,
counter.channel_announcements,
counter.channel_announcements_with_mismatched_scripts,
counter.channel_updates,
counter.channel_updates_without_htlc_max_msats
);
} else {
log_info!(logger, "Monitoring for gossip…")
}

if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
log_info!(logger, "caught up with gossip!");
needs_to_notify_persister = true;
} else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
log_info!(logger, "Received new messages since catching up with gossip!");
}
let mut previous_announcement_count = 0u64;
let mut previous_update_count = 0u64;
let mut is_caught_up_with_gossip = false;

let mut i = 0u32;
let mut latest_new_gossip_time = Instant::now();
let mut needs_to_notify_persister = false;

loop {
i += 1; // count the background activity
let sleep = tokio::time::sleep(Duration::from_secs(5));
sleep.await;

{
let counter = router.counter.read().unwrap();
let total_message_count = counter.channel_announcements + counter.channel_updates;
let new_message_count = total_message_count - previous_announcement_count - previous_update_count;

let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
// TODO: make new message threshold (20) adjust based on connected peer count
is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
if new_message_count > 0 {
latest_new_gossip_time = Instant::now();
}

let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
if continuous_caught_up_duration.as_secs() > 600 {
log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
}
// if we either aren't caught up, or just stopped/started being caught up
if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
log_info!(
logger,
"gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
i,
total_message_count,
new_message_count,
counter.channel_announcements,
counter.channel_announcements_with_mismatched_scripts,
counter.channel_updates,
counter.channel_updates_without_htlc_max_msats
);
} else {
log_info!(logger, "Monitoring for gossip…")
}

previous_announcement_count = counter.channel_announcements;
previous_update_count = counter.channel_updates;
if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
log_info!(logger, "caught up with gossip!");
needs_to_notify_persister = true;
} else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
log_info!(logger, "Received new messages since catching up with gossip!");
}

if needs_to_notify_persister {
needs_to_notify_persister = false;
completion_sender.send(()).await.unwrap();
let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
if continuous_caught_up_duration.as_secs() > 600 {
log_warn!(logger, "No new gossip messages in 10 minutes! Something's amiss!");
}

previous_announcement_count = counter.channel_announcements;
previous_update_count = counter.channel_updates;
}
});

if needs_to_notify_persister {
needs_to_notify_persister = false;
completion_sender.send(()).await.unwrap();
}
}
}

async fn connect_peer<L: Deref + Clone + Send + Sync + 'static>(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager<L>, logger: L) -> bool where L::Target: Logger {
log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
let connection = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
current_peer.0,
current_peer.1,
).await;
if let Some(disconnection_future) = connection {
log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
tokio::spawn(async move {
disconnection_future.await;
loop {
log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
current_peer.0,
current_peer.1,
).await {
disconnection_future.await;
// we seek to find out if the first connection attempt was successful
let (sender, mut receiver) = mpsc::channel::<bool>(1);
tokio::spawn(async move {
log_info!(logger, "Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
let mut is_first_iteration = true;
loop {
if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
current_peer.0,
current_peer.1,
).await {
log_info!(logger, "Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
if is_first_iteration {
sender.send(true).await.unwrap();
}
disconnection_future.await;
log_warn!(logger, "Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
} else {
log_warn!(logger, "Failed to connect to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
if is_first_iteration {
sender.send(false).await.unwrap();
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
true
} else {
log_error!(logger, "Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
false
}
is_first_iteration = false;
tokio::time::sleep(Duration::from_secs(10)).await;
log_warn!(logger, "Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
}
});

let success = receiver.recv().await.unwrap();
success
}

0 comments on commit cf3c4a9

Please sign in to comment.