diff --git a/src/fiber/graph.rs b/src/fiber/graph.rs index 525c2f2c..a357cce5 100644 --- a/src/fiber/graph.rs +++ b/src/fiber/graph.rs @@ -11,8 +11,6 @@ use ckb_types::packed::{OutPoint, Script}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::HashMap; -use tentacle::multiaddr::Multiaddr; -use tentacle::secio::PeerId; use thiserror::Error; use tracing::log::error; use tracing::{debug, info, warn}; @@ -138,8 +136,6 @@ pub struct NetworkGraph { // Similar to the best_height, this is the last update time of the network graph. // We assume that we have already synced the graph up to this time - ASSUME_MAX_MESSAGE_TIMESTAMP_GAP. last_update_timestamp: u64, - // when we restarting a node, we will reconnect to these peers - connected_peer_addresses: HashMap, nodes: HashMap, store: S, chain_hash: Hash256, @@ -172,7 +168,6 @@ where last_update_timestamp: 0, channels: HashMap::new(), nodes: HashMap::new(), - connected_peer_addresses: HashMap::new(), store, chain_hash: get_chain_hash(), }; @@ -212,9 +207,6 @@ where } self.nodes.insert(node.node_id, node.clone()); } - for (peer, addr) in self.store.get_connected_peer(None) { - self.connected_peer_addresses.insert(peer, addr); - } } pub fn get_best_height(&self) -> u64 { @@ -421,25 +413,6 @@ where self.chain_hash == chain_hash } - pub fn add_connected_peer(&mut self, peer_id: &PeerId, address: Multiaddr) { - self.connected_peer_addresses - .insert(peer_id.clone(), address.clone()); - self.store.insert_connected_peer(peer_id.clone(), address); - } - - pub fn get_connected_peers(&self) -> Vec<(&PeerId, &Multiaddr)> { - self.connected_peer_addresses.iter().collect() - } - - pub fn get_peers_to_sync_network_graph(&self) -> Vec<(&PeerId, &Multiaddr)> { - self.connected_peer_addresses.iter().take(3).collect() - } - - pub fn remove_connected_peer(&mut self, peer_id: &PeerId) { - self.connected_peer_addresses.remove(peer_id); - self.store.remove_connected_peer(peer_id); - } - pub fn get_node_inbounds( &self, node_id: Pubkey, @@ -479,7 +452,6 @@ where pub fn reset(&mut self) { self.channels.clear(); self.nodes.clear(); - self.connected_peer_addresses.clear(); } pub fn init_payment_session(&self, payment_request: SendPaymentCommand) -> PaymentSession { @@ -808,9 +780,6 @@ pub trait NetworkGraphStateStore { ) -> (Vec, JsonBytes); fn insert_channel(&self, channel: ChannelInfo); fn insert_node(&self, node: NodeInfo); - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr); - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)>; - fn remove_connected_peer(&self, peer_id: &PeerId); fn get_payment_session(&self, payment_hash: Hash256) -> Option; fn insert_payment_session(&self, session: PaymentSession); } diff --git a/src/fiber/network.rs b/src/fiber/network.rs index ded5b300..3467266e 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -1,5 +1,3 @@ -use crate::ckb::config::UdtCfgInfos; -use crate::fiber::serde_utils::EntityHex; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{BlockNumber, Status, TxStatus}; use ckb_types::core::TransactionView; @@ -14,8 +12,10 @@ use ractor::{ use rand::Rng; use secp256k1::{Message, Secp256k1}; use serde::{Deserialize, Serialize}; -use serde_with::serde_as; +use serde_with::{serde_as, DisplayFromStr}; use std::borrow::Cow; +use std::collections::hash_map::Entry; +use std::hash::RandomState; use tentacle::utils::extract_peer_id; use std::collections::{HashMap, HashSet}; @@ -67,12 +67,14 @@ use super::types::{ }; use super::{FiberConfig, ASSUME_NETWORK_ACTOR_ALIVE}; +use crate::ckb::config::UdtCfgInfos; use crate::ckb::contracts::{check_udt_script, get_udt_whitelist, is_udt_type_auto_accept}; use crate::ckb::{CkbChainMessage, FundingRequest, FundingTx, TraceTxRequest, TraceTxResponse}; use crate::fiber::channel::{ AddTlcCommand, AddTlcResponse, TxCollaborationCommand, TxUpdateCommand, }; use crate::fiber::graph::{ChannelInfo, PaymentSession}; +use crate::fiber::serde_utils::EntityHex; use crate::fiber::types::{ secp256k1_instance, FiberChannelMessage, PaymentOnionPacket, PeeledPaymentOnionPacket, TxSignatures, @@ -98,6 +100,13 @@ const ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW: &str = const ASSUME_NETWORK_MYSELF_ALIVE: &str = "network actor myself alive"; +// This is the default approximate number of peers that we need to keep connection to to make the +// network operating normally. +const NUM_PEER_CONNECTIONS: usize = 40; + +// The duration for which we will try to maintain the number of peers in connection. +const MAINTAINING_CONNECTIONS_INTERVAL: Duration = Duration::from_secs(3600); + pub(crate) fn get_chain_hash() -> Hash256 { Default::default() } @@ -152,8 +161,14 @@ pub struct NodeInfoResponse { #[derive(Debug)] pub enum NetworkActorCommand { /// Network commands + // Connect to a peer, and optionally also save the peer to the peer store. ConnectPeer(Multiaddr), DisconnectPeer(PeerId), + // Save the address of a peer to the peer store, the address here must be a valid + // multiaddr with the peer id. + SavePeerAddress(Multiaddr), + // We need to maintain a certain number of peers connections to keep the network running. + MaintainConnections(usize), // For internal use and debugging only. Most of the messages requires some // changes to local state. Even if we can send a message to a peer, some // part of the local state is not changed. @@ -539,7 +554,8 @@ pub struct NetworkActor { impl NetworkActor where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -669,10 +685,7 @@ where } }; for message in messages { - if let Err(e) = self - .process_broadcasted_message(&state.network, message) - .await - { + if let Err(e) = self.process_broadcasted_message(state, message).await { let fail_message = format!("Failed to process broadcasted message: {:?}", &e); error!("{}", &fail_message); @@ -1007,10 +1020,6 @@ where self.on_service_event(e).await; } NetworkActorEvent::PeerConnected(id, pubkey, session) => { - self.network_graph - .write() - .await - .add_connected_peer(&id, session.address.clone()); state.on_peer_connected(&id, pubkey, &session).await; // Notify outside observers. myself @@ -1023,7 +1032,6 @@ where .expect(ASSUME_NETWORK_MYSELF_ALIVE); } NetworkActorEvent::PeerDisconnected(id, session) => { - self.network_graph.write().await.remove_connected_peer(&id); state.on_peer_disconnected(&id); // Notify outside observers. myself @@ -1196,7 +1204,6 @@ where state: &mut NetworkActorState, command: NetworkActorCommand, ) -> crate::Result<()> { - debug!("Handling command: {:?}", command); match command { NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId { peer_id, message }) => { state.send_message_to_peer(&peer_id, message).await?; @@ -1235,6 +1242,53 @@ where } } + NetworkActorCommand::SavePeerAddress(addr) => match extract_peer_id(&addr) { + Some(peer) => { + debug!("Saved peer id {:?} with address {:?}", &peer, &addr); + state.save_peer_address(peer, addr); + } + None => { + error!("Failed to save address to peer store: unable to extract peer id from address {:?}", &addr); + } + }, + + NetworkActorCommand::MaintainConnections(num_peers) => { + debug!("Maintaining connections to {} peers", num_peers); + + let num_connected_peers = state.peer_session_map.len(); + if num_connected_peers >= num_peers { + debug!( + "Already connected to {} peers, skipping connecting to more peers", + num_connected_peers, + ); + return Ok(()); + } + let peers_to_connect = state + .state_to_be_persisted + .sample_n_peers_to_connect(num_peers - num_connected_peers); + debug!( + "Randomly selected peers to connect: {:?}", + &peers_to_connect + ); + for (peer_id, addresses) in peers_to_connect { + if let Some(session) = state.get_peer_session(&peer_id) { + debug!( + "Randomly selected peer {:?} already connected with session id {:?}, skipping connection", + peer_id, session + ); + continue; + } + for addr in addresses { + state + .network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr.clone()), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + } + NetworkActorCommand::OpenChannel(open_channel, reply) => { match state.create_outbound_channel(open_channel).await { Ok((_, channel_id)) => { @@ -1535,10 +1589,7 @@ where ); for message in broadcasted_message_queue { let (_peer_id, message) = message; - if let Err(e) = self - .process_broadcasted_message(&state.network, message) - .await - { + if let Err(e) = self.process_broadcasted_message(state, message).await { error!("Failed to process broadcasted message: {:?}", e); } } @@ -1753,13 +1804,12 @@ where NetworkActorCommand::BroadcastMessage(message.clone()), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); - self.process_broadcasted_message(&state.network, message) - .await + self.process_broadcasted_message(state, message).await } async fn process_broadcasted_message( &self, - network: &ActorRef, + state: &mut NetworkActorState, message: FiberBroadcastMessage, ) -> Result<(), Error> { match message { @@ -1785,18 +1835,17 @@ where &node_announcement ); - // TODO: bookkeeping how many nodes we have connected to. Stop connecting once we surpass a threshold. - for addr in &node_announcement.addresses { - network.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - ))?; - } - // Add the node to the network graph. self.network_graph .write() .await - .process_node_announcement(node_announcement); + .process_node_announcement(node_announcement.clone()); + + let peer_id = node_announcement.peer_id(); + state.save_announced_peer_addresses( + peer_id, + node_announcement.addresses.clone(), + ); Ok(()) } _ => { @@ -2050,6 +2099,11 @@ struct NetworkSyncState { // The timestamp we started syncing. starting_time: u64, // All the pinned peers that we are going to sync with. + // TODO: the intention of passing a few peer addresses to the sync status was to let the user + // select a few peers to sync network graph (these peers may have faster connection to the node). + // After some refactoring, the code below is a little bit clouded. We are currently only connecting + // to random peers. If this functionality is desired, we should make a config option for it. + // Otherwise, remove this completely. pinned_syncing_peers: Vec<(PeerId, Multiaddr)>, active_syncers: HashMap>, // Number of peers with whom we succeeded to sync. @@ -2150,13 +2204,13 @@ impl NetworkSyncStatus { starting_height: u64, ending_height: u64, starting_time: u64, - syncing_peers: Vec<(PeerId, Multiaddr)>, + pinned_syncing_peers: Vec<(PeerId, Multiaddr)>, ) -> Self { let state = NetworkSyncState { starting_height, ending_height, starting_time, - pinned_syncing_peers: syncing_peers, + pinned_syncing_peers, active_syncers: Default::default(), succeeded: 0, failed: 0, @@ -2193,6 +2247,7 @@ enum RequestState { pub struct NetworkActorState { store: S, + state_to_be_persisted: PersistentNetworkActorState, // The name of the node to be announced to the network, may be empty. node_name: Option, peer_id: PeerId, @@ -2264,6 +2319,104 @@ pub struct NetworkActorState { broadcasted_message_queue: Vec<(PeerId, FiberBroadcastMessage)>, } +#[serde_as] +#[derive(Default, Clone, Serialize, Deserialize)] +pub struct PersistentNetworkActorState { + // These addresses are announced by the peer itself to the network. + // When a new NodeAnnouncement message is received, we will overwrite the old addresses. + #[serde_as(as = "Vec<(DisplayFromStr, _)>")] + announced_peer_addresses: HashMap>, + // These addresses are saved by the user (e.g. the user sends a ConnectPeer rpc to the node), + // we will then save these addresses to the peer store. + #[serde_as(as = "Vec<(DisplayFromStr, _)>")] + saved_peer_addresses: HashMap>, +} + +impl PersistentNetworkActorState { + pub fn new() -> Self { + Default::default() + } + + fn get_peer_addresses(&self, peer_id: &PeerId) -> HashSet { + let empty = vec![]; + self.announced_peer_addresses + .get(peer_id) + .unwrap_or(&empty) + .iter() + .chain( + self.saved_peer_addresses + .get(peer_id) + .unwrap_or(&empty) + .iter(), + ) + .map(|addr| addr.clone()) + .collect::>() + } + + /// Save a single peer address to the peer store. If this address for the peer does not exist, + /// then return false, otherwise return true. + fn save_peer_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> bool { + match self.saved_peer_addresses.entry(peer_id) { + Entry::Occupied(mut entry) => { + if entry.get().contains(&addr) { + false + } else { + entry.get_mut().push(addr); + true + } + } + Entry::Vacant(entry) => { + entry.insert(vec![addr]); + true + } + } + } + + /// Save announced peer addresses to the peer store. If the peer addresses are updated, + /// return true, otherwise return false. This method will NOT keep the old announced addresses. + fn save_announced_peer_addresses(&mut self, peer_id: PeerId, addr: Vec) -> bool { + match self.announced_peer_addresses.entry(peer_id) { + Entry::Occupied(mut entry) => { + if entry.get() == &addr { + false + } else { + entry.insert(addr); + true + } + } + Entry::Vacant(entry) => { + entry.insert(addr); + true + } + } + } + + pub(crate) fn sample_n_peers_to_connect(&self, n: usize) -> HashMap> { + let nodes = self + .saved_peer_addresses + .keys() + .into_iter() + .chain(self.announced_peer_addresses.keys().into_iter()) + .collect::>(); + + nodes + .into_iter() + .take(n) + .map(|peer_id| { + ( + peer_id.clone(), + self.get_peer_addresses(peer_id).into_iter().collect(), + ) + }) + .collect() + } +} + +pub trait NetworkActorStateStore { + fn get_network_actor_state(&self, id: &PeerId) -> Option; + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState); +} + static CHANNEL_ACTOR_NAME_PREFIX: AtomicU64 = AtomicU64::new(0u64); // ractor requires that the actor name is unique, so we add a prefix to the actor name. @@ -2278,7 +2431,8 @@ fn generate_channel_actor_name(local_peer_id: &PeerId, remote_peer_id: &PeerId) impl NetworkActorState where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -2943,6 +3097,40 @@ where self.maybe_tell_syncer_peer_disconnected(id); } + pub(crate) fn get_peer_addresses(&self, peer_id: &PeerId) -> HashSet { + self.state_to_be_persisted.get_peer_addresses(peer_id) + } + + pub(crate) fn save_peer_address(&mut self, peer_id: PeerId, address: Multiaddr) -> bool { + if self + .state_to_be_persisted + .save_peer_address(peer_id, address) + { + self.persist_state(); + true + } else { + false + } + } + + pub(crate) fn save_announced_peer_addresses( + &mut self, + peer_id: PeerId, + addresses: Vec, + ) { + if self + .state_to_be_persisted + .save_announced_peer_addresses(peer_id, addresses) + { + self.persist_state(); + } + } + + fn persist_state(&self) { + self.store + .insert_network_actor_state(&self.peer_id, self.state_to_be_persisted.clone()); + } + async fn maybe_sync_network_graph(&mut self, peer_id: &PeerId) { if let NetworkSyncStatus::Running(state) = &mut self.sync_status { if let Some(_) = state @@ -3341,7 +3529,8 @@ pub struct NetworkActorStartArguments { #[rasync_trait] impl Actor for NetworkActor where - S: ChannelActorStateStore + S: NetworkActorStateStore + + ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone @@ -3431,17 +3620,19 @@ where let mut graph = self.network_graph.write().await; - let peers_to_sync_network_graph = graph - .get_peers_to_sync_network_graph() - .into_iter() - .map(|(a, b)| (a.clone(), b.clone())) - .collect(); + let mut state_to_be_persisted = self + .store + .get_network_actor_state(&my_peer_id) + .unwrap_or_default(); + + for bootnode in &config.bootnode_addrs { + let addr = Multiaddr::from_str(bootnode.as_str()).expect("valid bootnode"); + let peer_id = extract_peer_id(&addr).expect("valid peer id"); + state_to_be_persisted.save_peer_address(peer_id, addr); + } + let height = graph.get_best_height(); let last_update = graph.get_last_update_timestamp(); - debug!( - "Trying to sync network graph with peers {:?} with height {} and last update {:?}", - &peers_to_sync_network_graph, &height, &last_update - ); let chain_actor = self.chain_actor.clone(); let current_block_number = call!(chain_actor, CkbChainMessage::GetCurrentBlockNumber, ()) @@ -3452,10 +3643,12 @@ where height, current_block_number, last_update, - peers_to_sync_network_graph, + vec![], ); + let mut state = NetworkActorState { store: self.store.clone(), + state_to_be_persisted, node_name: config.announced_node_name, peer_id: my_peer_id, announced_addrs, @@ -3494,18 +3687,6 @@ where let node_announcement = state.get_or_create_new_node_announcement_message(); graph.process_node_announcement(node_announcement); - // load the connected peers from the network graph - let peers = graph.get_connected_peers(); - // TODO: we need to bootstrap the network if no peers are connected. - if peers.is_empty() { - warn!("No connected peers found in the network graph"); - } - for (_peer_id, addr) in peers { - myself.send_message(NetworkActorMessage::new_command( - NetworkActorCommand::ConnectPeer(addr.clone()), - ))?; - } - let announce_node_interval_seconds = config.announce_node_interval_seconds(); if announce_node_interval_seconds > 0 { myself.send_interval(Duration::from_secs(announce_node_interval_seconds), || { @@ -3515,9 +3696,47 @@ where }); } + // Save bootnodes to the network actor state. + state.persist_state(); + Ok(state) } + async fn post_start( + &self, + myself: ActorRef, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + debug!("Trying to connect to peers with mutual channels"); + for (peer_id, channel_id, channel_state) in self.store.get_channel_states(None) { + let addresses = state.get_peer_addresses(&peer_id); + + debug!( + "Reconnecting channel {:x} peers {:?} in state {:?}", + &channel_id, &peer_id, &channel_state + ); + for addr in addresses { + myself + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::ConnectPeer(addr), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + + myself + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::MaintainConnections(NUM_PEER_CONNECTIONS), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + myself.send_interval(MAINTAINING_CONNECTIONS_INTERVAL, || { + NetworkActorMessage::new_command(NetworkActorCommand::MaintainConnections( + NUM_PEER_CONNECTIONS, + )) + }); + Ok(()) + } + async fn handle( &self, myself: ActorRef, @@ -3547,6 +3766,8 @@ where if let Err(err) = state.control.close().await { error!("Failed to close tentacle service: {}", err); } + debug!("Saving network actor state for {:?}", state.peer_id); + state.persist_state(); debug!("Network service for {:?} shutdown", state.peer_id); // The event receiver may have been closed already. // We ignore the error here. @@ -3695,7 +3916,14 @@ pub(crate) fn emit_service_event( } pub async fn start_network< - S: ChannelActorStateStore + NetworkGraphStateStore + InvoiceStore + Clone + Send + Sync + 'static, + S: NetworkActorStateStore + + ChannelActorStateStore + + NetworkGraphStateStore + + InvoiceStore + + Clone + + Send + + Sync + + 'static, >( config: FiberConfig, chain_actor: ActorRef, diff --git a/src/fiber/tests/channel.rs b/src/fiber/tests/channel.rs index 68f7be5f..ee495709 100644 --- a/src/fiber/tests/channel.rs +++ b/src/fiber/tests/channel.rs @@ -1233,3 +1233,20 @@ async fn test_commitment_tx_capacity() { output_capacity as u128 ); } + +#[tokio::test] +async fn test_connect_to_peers_with_mutual_channel_on_restart() { + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 6200000000; + + let (mut node_a, node_b, _new_channel_id) = + create_nodes_with_established_channel(node_a_funding_amount, node_b_funding_amount, true) + .await; + + node_a.restart().await; + + node_a.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == &node_b.peer_id), + ) + .await; +} diff --git a/src/fiber/tests/graph.rs b/src/fiber/tests/graph.rs index ce9fd6d4..9d784dac 100644 --- a/src/fiber/tests/graph.rs +++ b/src/fiber/tests/graph.rs @@ -11,17 +11,8 @@ use ckb_types::{ prelude::Entity, }; use secp256k1::{PublicKey, SecretKey, XOnlyPublicKey}; -use tentacle::{multiaddr::Multiaddr, secio::PeerId}; -use super::test_utils::{generate_keypair, generate_pubkey}; - -fn generate_keys(num: usize) -> Vec { - let mut keys = vec![]; - for _ in 0..num { - keys.push(generate_pubkey()); - } - keys -} +use super::test_utils::generate_keypair; fn generate_key_pairs(num: usize) -> Vec<(SecretKey, PublicKey)> { let mut keys = vec![]; @@ -193,41 +184,6 @@ impl MockNetworkGraph { } } -#[test] -fn test_graph_connected_peers() { - let temp_path = tempfile::tempdir().unwrap(); - let store = Store::new(temp_path.path()); - let keys = generate_keys(1); - let public_key1 = keys[0]; - let mut network_graph = NetworkGraph::new(store, public_key1.into()); - - let peer_id = PeerId::random(); - let address: Multiaddr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap(); - network_graph.add_connected_peer(&peer_id, address.clone()); - - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 1); - assert_eq!(connected_peers[0], (&peer_id, &address)); - - network_graph.reset(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); - - // load from db - network_graph.load_from_store(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 1); - assert_eq!(connected_peers[0], (&peer_id, &address)); - - network_graph.remove_connected_peer(&peer_id); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); - - network_graph.load_from_store(); - let connected_peers = network_graph.get_connected_peers(); - assert_eq!(connected_peers.len(), 0); -} - #[test] fn test_graph_channel_info() { let mut mock_network = MockNetworkGraph::new(1); diff --git a/src/fiber/tests/network.rs b/src/fiber/tests/network.rs index 236a0c10..6b7cfe46 100644 --- a/src/fiber/tests/network.rs +++ b/src/fiber/tests/network.rs @@ -2,7 +2,8 @@ use super::test_utils::{init_tracing, NetworkNode}; use crate::{ fiber::{ graph::{ChannelInfo, NetworkGraphStateStore}, - network::get_chain_hash, + network::{get_chain_hash, NetworkActorStateStore}, + tests::test_utils::NetworkNodeConfigBuilder, types::{ ChannelAnnouncement, ChannelUpdate, FiberBroadcastMessage, FiberMessage, NodeAnnouncement, Privkey, Pubkey, @@ -21,8 +22,11 @@ use ckb_types::{ packed::OutPoint, prelude::{Builder, Entity, Pack}, }; -use std::str::FromStr; -use tentacle::{multiaddr::MultiAddr, secio::PeerId}; +use std::{borrow::Cow, str::FromStr}; +use tentacle::{ + multiaddr::{MultiAddr, Protocol}, + secio::PeerId, +}; fn get_test_priv_key() -> Privkey { Privkey::from_slice(&[42u8; 32]) @@ -37,6 +41,21 @@ fn get_test_peer_id() -> PeerId { PeerId::from_public_key(&pub_key) } +fn get_fake_peer_id_and_address() -> (PeerId, MultiAddr) { + let peer_id = PeerId::random(); + let mut address = MultiAddr::from_str(&format!( + "/ip4/{}.{}.{}.{}/tcp/{}", + rand::random::(), + rand::random::(), + rand::random::(), + rand::random::(), + rand::random::() + )) + .expect("valid multiaddr"); + address.push(Protocol::P2P(Cow::Owned(peer_id.clone().into_bytes()))); + (peer_id, address) +} + fn create_fake_channel_announcement_mesage( priv_key: Privkey, capacity: u64, @@ -520,3 +539,106 @@ async fn test_sync_node_announcement_after_restart() { let node = node2.store.get_nodes(Some(test_pub_key)); assert!(!node.is_empty()); } + +#[tokio::test] +async fn test_persisting_network_state() { + let mut node = NetworkNode::new().await; + let state = node.store.clone(); + let peer_id = node.peer_id.clone(); + node.stop().await; + assert!(state.get_network_actor_state(&peer_id).is_some()) +} + +#[tokio::test] +async fn test_persisting_bootnode() { + let (boot_peer_id, address) = get_fake_peer_id_and_address(); + let address_string = format!("{}", &address); + + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| config.bootnode_addrs = vec![address_string]) + .build(), + ) + .await; + let state = node.store.clone(); + let peer_id = node.peer_id.clone(); + node.stop().await; + + let state = state.get_network_actor_state(&peer_id).unwrap(); + let peers = state.sample_n_peers_to_connect(1); + assert_eq!(peers.get(&boot_peer_id), Some(&vec![address])); +} + +#[tokio::test] +async fn test_persisting_announced_nodes() { + let mut node = new_synced_node("test").await; + + let announcement = create_fake_node_announcement_mesage_version1(); + let node_pk = announcement.node_id; + let peer_id = node_pk.tentacle_peer_id(); + + node.network_actor + .send_message(NetworkActorMessage::Event(NetworkActorEvent::PeerMessage( + peer_id.clone(), + FiberMessage::BroadcastMessage(FiberBroadcastMessage::NodeAnnouncement( + create_fake_node_announcement_mesage_version1(), + )), + ))) + .expect("send message to network actor"); + + // Wait for the above message to be processed. + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + node.stop().await; + let state = node.store.clone(); + let state = state.get_network_actor_state(&node.peer_id).unwrap(); + let peers = state.sample_n_peers_to_connect(1); + assert!(peers.get(&peer_id).is_some()); +} + +#[tokio::test] +async fn test_connecting_to_bootnode() { + let boot_node = NetworkNode::new().await; + let boot_node_address = format!("{}", boot_node.get_node_address()); + let boot_node_id = &boot_node.peer_id; + + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| config.bootnode_addrs = vec![boot_node_address]) + .build(), + ) + .await; + + node.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == boot_node_id), + ) + .await; +} + +#[tokio::test] +async fn test_saving_and_connecting_to_node() { + init_tracing(); + + let node1 = NetworkNode::new().await; + let node1_address = node1.get_node_address().clone(); + let node1_id = &node1.peer_id; + + let mut node2 = NetworkNode::new().await; + + node2 + .network_actor + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SavePeerAddress(node1_address), + )) + .expect("send message to network actor"); + + // Wait for the above message to be processed. + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + node2.restart().await; + + node2.expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == node1_id), + ) + .await; +} diff --git a/src/fiber/tests/test_utils.rs b/src/fiber/tests/test_utils.rs index f123d1a4..63383fed 100644 --- a/src/fiber/tests/test_utils.rs +++ b/src/fiber/tests/test_utils.rs @@ -1,6 +1,3 @@ -use crate::fiber::graph::{ChannelInfo, NetworkGraph, NodeInfo}; -use crate::fiber::types::Pubkey; -use crate::invoice::{CkbInvoice, InvoiceError, InvoiceStore}; use ckb_jsonrpc_types::JsonBytes; use ckb_types::packed::OutPoint; use ckb_types::{core::TransactionView, packed::Byte32}; @@ -17,7 +14,6 @@ use std::{ time::Duration, }; use tempfile::TempDir as OldTempDir; -use tentacle::multiaddr::Multiaddr; use tentacle::{multiaddr::MultiAddr, secio::PeerId}; use tokio::sync::RwLock as TokioRwLock; use tokio::{ @@ -30,19 +26,20 @@ use crate::{ actors::{RootActor, RootActorMessage}, ckb::tests::test_utils::{submit_tx, trace_tx, trace_tx_hash, MockChainActor}, ckb::CkbChainMessage, + fiber::channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, + fiber::graph::NetworkGraphStateStore, + fiber::graph::PaymentSession, + fiber::graph::{ChannelInfo, NetworkGraph, NodeInfo}, fiber::network::NetworkActorStartArguments, + fiber::network::{NetworkActor, NetworkActorCommand, NetworkActorMessage}, + fiber::network::{NetworkActorStateStore, PersistentNetworkActorState}, + fiber::types::Hash256, + fiber::types::Pubkey, + invoice::{CkbInvoice, InvoiceError, InvoiceStore}, tasks::{new_tokio_cancellation_token, new_tokio_task_tracker}, FiberConfig, NetworkServiceEvent, }; -use crate::fiber::graph::NetworkGraphStateStore; -use crate::fiber::graph::PaymentSession; -use crate::fiber::{ - channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, - types::Hash256, - NetworkActor, NetworkActorCommand, NetworkActorMessage, -}; - static RETAIN_VAR: &str = "TEST_TEMP_RETAIN"; #[derive(Debug)] @@ -465,15 +462,28 @@ impl NetworkNode { #[derive(Clone, Default)] pub struct MemoryStore { + network_actor_sate_map: Arc>>, channel_actor_state_map: Arc>>, channels_map: Arc>>, - pub nodes_map: Arc>>, - connected_peer_addresses: Arc>>, + nodes_map: Arc>>, payment_sessions: Arc>>, invoice_store: Arc>>, invoice_hash_to_preimage: Arc>>, } +impl NetworkActorStateStore for MemoryStore { + fn get_network_actor_state(&self, id: &PeerId) -> Option { + self.network_actor_sate_map.read().unwrap().get(id).cloned() + } + + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState) { + self.network_actor_sate_map + .write() + .unwrap() + .insert(id.clone(), state); + } +} + impl NetworkGraphStateStore for MemoryStore { fn get_channels(&self, outpoint: Option) -> Vec { if let Some(outpoint) = outpoint { @@ -538,38 +548,6 @@ impl NetworkGraphStateStore for MemoryStore { .insert(node.node_id.clone(), node); } - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr) { - self.connected_peer_addresses - .write() - .unwrap() - .insert(peer_id, multiaddr); - } - - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)> { - if let Some(peer_id) = peer_id { - let mut res = vec![]; - - if let Some(addr) = self.connected_peer_addresses.read().unwrap().get(&peer_id) { - res.push((peer_id, addr.clone())); - } - res - } else { - self.connected_peer_addresses - .read() - .unwrap() - .iter() - .map(|(peer_id, addr)| (peer_id.clone(), addr.clone())) - .collect() - } - } - - fn remove_connected_peer(&self, peer_id: &PeerId) { - self.connected_peer_addresses - .write() - .unwrap() - .remove(peer_id); - } - fn get_payment_session(&self, id: Hash256) -> Option { self.payment_sessions.read().unwrap().get(&id).cloned() } diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 4009e37d..8b49f39a 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -1494,6 +1494,10 @@ impl NodeAnnouncement { }; deterministically_hash(&unsigned_announcement) } + + pub fn peer_id(&self) -> PeerId { + PeerId::from_public_key(&self.node_id.into()) + } } impl From for molecule_fiber::UdtCellDep { diff --git a/src/main.rs b/src/main.rs index 94d4c2e9..1d17c837 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,24 @@ -use ckb_hash::blake2b_256; -use core::default::Default; use fnn::actors::RootActor; use fnn::cch::CchMessage; -use fnn::ckb::contracts::{get_script_by_contract, init_contracts_context, Contract}; -use fnn::ckb::CkbChainActor; -use fnn::fiber::graph::NetworkGraph; -use fnn::fiber::{channel::ChannelSubscribers, NetworkActorCommand, NetworkActorMessage}; +use fnn::ckb::{ + contracts::{get_script_by_contract, init_contracts_context, Contract}, + CkbChainActor, +}; +use fnn::fiber::{channel::ChannelSubscribers, graph::NetworkGraph}; use fnn::store::Store; use fnn::tasks::{ cancel_tasks_and_wait_for_completion, new_tokio_cancellation_token, new_tokio_task_tracker, }; use fnn::watchtower::{WatchtowerActor, WatchtowerMessage}; use fnn::{start_cch, start_network, start_rpc, Config}; -use ractor::Actor; -use secp256k1::Secp256k1; -use std::str::FromStr; + +use core::default::Default; use std::sync::Arc; use std::time::Duration; -use tentacle::multiaddr::Multiaddr; + +use ckb_hash::blake2b_256; +use ractor::Actor; +use secp256k1::Secp256k1; use tokio::sync::{mpsc, RwLock}; use tokio::{select, signal}; use tracing::{debug, error, info, info_span, trace}; @@ -85,8 +86,6 @@ pub async fn main() { const CHANNEL_SIZE: usize = 4000; let (event_sender, mut event_receiver) = mpsc::channel(CHANNEL_SIZE); - let bootnodes = fiber_config.bootnode_addrs.clone(); - let network_graph = Arc::new(RwLock::new(NetworkGraph::new( store.clone(), node_public_key.clone().into(), @@ -112,14 +111,6 @@ pub async fn main() { ) .await; - for bootnode in bootnodes { - let addr = Multiaddr::from_str(&bootnode).expect("valid bootnode"); - let command = NetworkActorCommand::ConnectPeer(addr); - network_actor - .send_message(NetworkActorMessage::new_command(command)) - .expect("ckb actor alive") - } - let watchtower_actor = Actor::spawn_linked( Some("watchtower".to_string()), WatchtowerActor::new(store.clone()), diff --git a/src/rpc/README.md b/src/rpc/README.md index b939ecef..2d154b8b 100644 --- a/src/rpc/README.md +++ b/src/rpc/README.md @@ -241,6 +241,7 @@ Attempts to connect to a peer. ###### Params * `address` - The address of the peer to connect to +* `save` - Whether to save the peer address, an optional parameter (default value true) ###### Returns diff --git a/src/rpc/peer.rs b/src/rpc/peer.rs index f1ffa4be..7022ccd8 100644 --- a/src/rpc/peer.rs +++ b/src/rpc/peer.rs @@ -9,9 +9,10 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use tentacle::{multiaddr::MultiAddr, secio::PeerId}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct ConnectPeerParams { address: MultiAddr, + save: Option, } #[serde_as] @@ -45,6 +46,15 @@ impl PeerRpcServer for PeerRpcServerImpl { async fn connect_peer(&self, params: ConnectPeerParams) -> Result<(), ErrorObjectOwned> { let message = NetworkActorMessage::Command(NetworkActorCommand::ConnectPeer(params.address.clone())); + if params.save.unwrap_or(true) { + crate::handle_actor_cast!( + self.actor, + NetworkActorMessage::Command(NetworkActorCommand::SavePeerAddress( + params.address.clone() + )), + params.clone() + )?; + } crate::handle_actor_cast!(self.actor, message, params) } diff --git a/src/store.rs b/src/store.rs index 4ef45fc6..c465f57d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -2,6 +2,7 @@ use crate::{ fiber::{ channel::{ChannelActorState, ChannelActorStateStore, ChannelState}, graph::{ChannelInfo, NetworkGraphStateStore, NodeInfo, PaymentSession}, + network::{NetworkActorStateStore, PersistentNetworkActorState}, types::{Hash256, Pubkey}, }, invoice::{CkbInvoice, InvoiceError, InvoiceStore}, @@ -13,7 +14,7 @@ use ckb_types::prelude::Entity; use rocksdb::{prelude::*, DBIterator, Direction, IteratorMode, WriteBatch, DB}; use serde_json; use std::{path::Path, sync::Arc}; -use tentacle::{multiaddr::Multiaddr, secio::PeerId}; +use tentacle::secio::PeerId; #[derive(Clone)] pub struct Store { @@ -157,18 +158,19 @@ impl Batch { serde_json::to_vec(&node).expect("serialize NodeInfo should be OK"), ); } - KeyValue::PeerIdMultiAddr(peer_id, multiaddr) => { - let key = [&[PEER_ID_MULTIADDR_PREFIX], peer_id.as_bytes()].concat(); + KeyValue::WatchtowerChannel(channel_id, channel_data) => { + let key = [&[WATCHTOWER_CHANNEL_PREFIX], channel_id.as_ref()].concat(); self.put( key, - serde_json::to_vec(&multiaddr).expect("serialize Multiaddr should be OK"), + serde_json::to_vec(&channel_data).expect("serialize ChannelData should be OK"), ); } - KeyValue::WatchtowerChannel(channel_id, channel_data) => { - let key = [&[WATCHTOWER_CHANNEL_PREFIX], channel_id.as_ref()].concat(); + KeyValue::NetworkActorState(peer_id, persistent_network_actor_state) => { + let key = [&[PEER_ID_NETWORK_ACTOR_STATE_PREFIX], peer_id.as_bytes()].concat(); self.put( key, - serde_json::to_vec(&channel_data).expect("serialize ChannelData should be OK"), + serde_json::to_vec(&persistent_network_actor_state) + .expect("serialize PersistentNetworkActorState should be OK"), ); } } @@ -188,24 +190,26 @@ impl Batch { } /// -/// +--------------+--------------------+--------------------------+ -/// | KeyPrefix:: | Key:: | Value:: | -/// +--------------+--------------------+--------------------------+ -/// | 0 | Hash256 | ChannelActorState | -/// | 32 | Hash256 | CkbInvoice | -/// | 64 | PeerId | Hash256 | ChannelState | -/// | 96 | ChannelId | ChannelInfo | -/// | 97 | Block | Index | ChannelId | -/// | 98 | Timestamp | ChannelId | -/// | 128 | NodeId | NodeInfo | -/// | 129 | Timestamp | NodeId | -/// | 160 | PeerId | MultiAddr | -/// | 192 | Hash256 | PaymentSession | -/// | 224 | Hash256 | ChannelData | -/// +--------------+--------------------+--------------------------+ +/// +--------------+--------------------+-----------------------------+ +/// | KeyPrefix:: | Key:: | Value:: | +/// +--------------+--------------------+-----------------------------+ +/// | 0 | Hash256 | ChannelActorState | +/// | 16 | PeerId | PersistentNetworkActorState | +/// | 32 | Hash256 | CkbInvoice | +/// | 64 | PeerId | Hash256 | ChannelState | +/// | 96 | ChannelId | ChannelInfo | +/// | 97 | Block | Index | ChannelId | +/// | 98 | Timestamp | ChannelId | +/// | 128 | NodeId | NodeInfo | +/// | 129 | Timestamp | NodeId | +/// | 160 | PeerId | MultiAddr | +/// | 192 | Hash256 | PaymentSession | +/// | 224 | Hash256 | ChannelData | +/// +--------------+--------------------+-----------------------------+ /// const CHANNEL_ACTOR_STATE_PREFIX: u8 = 0; +const PEER_ID_NETWORK_ACTOR_STATE_PREFIX: u8 = 16; const CKB_INVOICE_PREFIX: u8 = 32; const CKB_INVOICE_PREIMAGE_PREFIX: u8 = 33; const PEER_ID_CHANNEL_ID_PREFIX: u8 = 64; @@ -214,7 +218,6 @@ const CHANNEL_ANNOUNCEMENT_INDEX_PREFIX: u8 = 97; const CHANNEL_UPDATE_INDEX_PREFIX: u8 = 98; pub(crate) const NODE_INFO_PREFIX: u8 = 128; const NODE_ANNOUNCEMENT_INDEX_PREFIX: u8 = 129; -const PEER_ID_MULTIADDR_PREFIX: u8 = 160; const PAYMENT_SESSION_PREFIX: u8 = 192; const WATCHTOWER_CHANNEL_PREFIX: u8 = 224; @@ -223,11 +226,33 @@ enum KeyValue { CkbInvoice(Hash256, CkbInvoice), CkbInvoicePreimage(Hash256, Hash256), PeerIdChannelId((PeerId, Hash256), ChannelState), - PeerIdMultiAddr(PeerId, Multiaddr), NodeInfo(Pubkey, NodeInfo), ChannelInfo(OutPoint, ChannelInfo), WatchtowerChannel(Hash256, ChannelData), PaymentSession(Hash256, PaymentSession), + NetworkActorState(PeerId, PersistentNetworkActorState), +} + +impl NetworkActorStateStore for Store { + fn get_network_actor_state(&self, id: &PeerId) -> Option { + let mut key = Vec::with_capacity(33); + key.push(PEER_ID_NETWORK_ACTOR_STATE_PREFIX); + key.extend_from_slice(id.as_bytes()); + let iter = self + .db + .prefix_iterator(key.as_ref()) + .find(|(col_key, _)| col_key.starts_with(&key)); + iter.map(|(_key, value)| { + serde_json::from_slice(value.as_ref()) + .expect("deserialize PersistentNetworkActorState should be OK") + }) + } + + fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState) { + let mut batch = self.batch(); + batch.put_kv(KeyValue::NetworkActorState(id.clone(), state)); + batch.commit(); + } } impl ChannelActorStateStore for Store { @@ -446,30 +471,6 @@ impl NetworkGraphStateStore for Store { (nodes, JsonBytes::from_bytes(last_key.into())) } - fn get_connected_peer(&self, peer_id: Option) -> Vec<(PeerId, Multiaddr)> { - let key = match peer_id { - Some(peer_id) => { - let mut key = Vec::with_capacity(33); - key.push(PEER_ID_MULTIADDR_PREFIX); - key.extend_from_slice(peer_id.as_bytes()); - key - } - None => vec![PEER_ID_MULTIADDR_PREFIX], - }; - let iter = self - .db - .prefix_iterator(key.as_ref()) - .take_while(|(col_key, _)| col_key.starts_with(&key)); - iter.map(|(key, value)| { - let peer_id = - PeerId::from_bytes(key[1..].into()).expect("deserialize peer id should be OK"); - let addr = - serde_json::from_slice(value.as_ref()).expect("deserialize Multiaddr should be OK"); - (peer_id, addr) - }) - .collect() - } - fn insert_channel(&self, channel: ChannelInfo) { let mut batch = self.batch(); batch.put_kv(KeyValue::ChannelInfo(channel.out_point(), channel.clone())); @@ -482,23 +483,6 @@ impl NetworkGraphStateStore for Store { batch.commit(); } - fn insert_connected_peer(&self, peer_id: PeerId, multiaddr: Multiaddr) { - let mut batch = self.batch(); - batch.put_kv(KeyValue::PeerIdMultiAddr(peer_id, multiaddr)); - batch.commit(); - } - - fn remove_connected_peer(&self, peer_id: &PeerId) { - let prefix = [&[PEER_ID_MULTIADDR_PREFIX], peer_id.as_bytes()].concat(); - let iter = self - .db - .prefix_iterator(prefix.as_ref()) - .take_while(|(key, _)| key.starts_with(&prefix)); - for (key, _) in iter { - self.db.delete(key).expect("delete should be OK"); - } - } - fn get_payment_session(&self, payment_hash: Hash256) -> Option { let prefix = [&[PAYMENT_SESSION_PREFIX], payment_hash.as_ref()].concat(); self.get(prefix).map(|v| {