Skip to content

Commit

Permalink
Merge pull request #421 from quake/quake/gossip-private-address
Browse files Browse the repository at this point in the history
fix: gossip actor and graph should ignore private address
  • Loading branch information
chenyukang authored Dec 29, 2024
2 parents b9ac0a7 + f93989e commit bb5ff5f
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 6 deletions.
6 changes: 5 additions & 1 deletion src/fiber/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct FiberConfig {
)]
pub(crate) announce_listening_addr: Option<bool>,

/// whether to announce private address, this should be set to false unless you are running a private network or testing [default: false]
/// whether to announce or process private address, this should be set to false unless you are running a private network or testing [default: false]
#[arg(
name = "FIBER_ANNOUNCE_PRIVATE_ADDR",
long = "fiber-announce-private-addr",
Expand Down Expand Up @@ -377,6 +377,10 @@ impl FiberConfig {
self.announce_listening_addr.unwrap_or(false)
}

pub fn announce_private_addr(&self) -> bool {
self.announce_private_addr.unwrap_or(false)
}

pub fn open_channel_auto_accept_min_ckb_funding_amount(&self) -> u64 {
self.open_channel_auto_accept_min_ckb_funding_amount
.unwrap_or(DEFAULT_OPEN_CHANNEL_AUTO_ACCEPT_MIN_CKB_FUNDING_AMOUNT)
Expand Down
44 changes: 42 additions & 2 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tentacle::{
secio::PeerId,
service::{ProtocolHandle, ProtocolMeta, ServiceAsyncControl, SessionType},
traits::ServiceProtocol,
utils::{is_reachable, multiaddr_to_socketaddr},
SessionId,
};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -858,6 +859,7 @@ where
{
async fn new(
maintenance_interval: Duration,
announce_private_addr: bool,
store: S,
gossip_actor: ActorRef<GossipActorMessage>,
chain_actor: ActorRef<CkbChainMessage>,
Expand All @@ -871,6 +873,7 @@ where
ExtendedGossipMessageStoreActor::new(),
(
maintenance_interval,
announce_private_addr,
store.clone(),
gossip_actor,
chain_actor,
Expand Down Expand Up @@ -982,6 +985,7 @@ pub enum GossipMessageProcessingError {
}

pub struct ExtendedGossipMessageStoreState<S> {
announce_private_addr: bool,
store: S,
gossip_actor: ActorRef<GossipActorMessage>,
chain_actor: ActorRef<CkbChainMessage>,
Expand All @@ -993,11 +997,13 @@ pub struct ExtendedGossipMessageStoreState<S> {

impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
fn new(
announce_private_addr: bool,
store: S,
gossip_actor: ActorRef<GossipActorMessage>,
chain_actor: ActorRef<CkbChainMessage>,
) -> Self {
Self {
announce_private_addr,
store,
gossip_actor,
chain_actor,
Expand Down Expand Up @@ -1096,6 +1102,20 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
));
}

if !self.announce_private_addr {
if let BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) = &message {
if !node_announcement.addresses.iter().any(|addr| {
multiaddr_to_socketaddr(addr)
.map(|socket_addr| is_reachable(socket_addr.ip()))
.unwrap_or_default()
}) {
return Err(GossipMessageProcessingError::ProcessingError(
"private address node announcement".to_string(),
));
}
}
}

trace!("New gossip message saved to memory: {:?}", message);
self.messages_to_be_saved.insert(message.clone());
Ok(message)
Expand Down Expand Up @@ -1137,6 +1157,7 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
type State = ExtendedGossipMessageStoreState<S>;
type Arguments = (
Duration,
bool,
S,
ActorRef<GossipActorMessage>,
ActorRef<CkbChainMessage>,
Expand All @@ -1145,12 +1166,19 @@ impl<S: GossipMessageStore + Send + Sync + 'static> Actor for ExtendedGossipMess
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
(gossip_store_maintenance_interval, store, gossip_actor, chain_actor): Self::Arguments,
(
gossip_store_maintenance_interval,
announce_private_addr,
store,
gossip_actor,
chain_actor,
): Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
myself.send_interval(gossip_store_maintenance_interval, || {
ExtendedGossipMessageStoreMessage::Tick
});
Ok(ExtendedGossipMessageStoreState::new(
announce_private_addr,
store,
gossip_actor,
chain_actor,
Expand Down Expand Up @@ -2123,6 +2151,7 @@ impl GossipProtocolHandle {
name: Option<String>,
gossip_network_maintenance_interval: Duration,
gossip_store_maintenance_interval: Duration,
announce_private_addr: bool,
store: S,
chain_actor: ActorRef<CkbChainMessage>,
supervisor: ActorCell,
Expand All @@ -2141,6 +2170,7 @@ impl GossipProtocolHandle {
store_sender,
gossip_network_maintenance_interval,
gossip_store_maintenance_interval,
announce_private_addr,
store,
chain_actor,
),
Expand Down Expand Up @@ -2184,17 +2214,27 @@ where
oneshot::Sender<ExtendedGossipMessageStore<S>>,
Duration,
Duration,
bool,
S,
ActorRef<CkbChainMessage>,
);

async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
(rx, tx, network_maintenance_interval, store_maintenance_interval, store, chain_actor): Self::Arguments,
(
rx,
tx,
network_maintenance_interval,
store_maintenance_interval,
announce_private_addr,
store,
chain_actor,
): Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let store = ExtendedGossipMessageStore::new(
store_maintenance_interval,
announce_private_addr,
store,
myself.clone(),
chain_actor.clone(),
Expand Down
22 changes: 20 additions & 2 deletions src/fiber/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use serde_with::serde_as;
use std::collections::{HashMap, HashSet};
use tentacle::multiaddr::MultiAddr;
use tentacle::secio::PeerId;
use tentacle::utils::{is_reachable, multiaddr_to_socketaddr};
use thiserror::Error;
use tracing::log::error;
use tracing::{debug, info, trace};
Expand Down Expand Up @@ -218,6 +219,8 @@ pub struct NetworkGraph<S> {
// as a NetworkGraphStateStore.
store: S,
history: PaymentHistory<S>,
// Whether to process announcement of private address
announce_private_addr: bool,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -246,14 +249,15 @@ where
+ Sync
+ 'static,
{
pub fn new(store: S, source: Pubkey) -> Self {
pub fn new(store: S, source: Pubkey, announce_private_addr: bool) -> Self {
let mut network_graph = Self {
source,
channels: HashMap::new(),
nodes: HashMap::new(),
latest_cursor: Cursor::default(),
store: store.clone(),
history: PaymentHistory::new(source, None, store),
announce_private_addr,
};
network_graph.load_from_store();
network_graph
Expand Down Expand Up @@ -438,7 +442,21 @@ where
}
}

fn process_node_announcement(&mut self, node_announcement: NodeAnnouncement) -> Option<Cursor> {
fn process_node_announcement(
&mut self,
mut node_announcement: NodeAnnouncement,
) -> Option<Cursor> {
if !self.announce_private_addr {
node_announcement.addresses.retain(|addr| {
multiaddr_to_socketaddr(addr)
.map(|socket_addr| is_reachable(socket_addr.ip()))
.unwrap_or_default()
});

if node_announcement.addresses.is_empty() {
return None;
}
}
let node_info = NodeInfo::from(node_announcement);
match self.nodes.get(&node_info.node_id) {
Some(old_node) if old_node.timestamp > node_info.timestamp => {
Expand Down
1 change: 1 addition & 0 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2877,6 +2877,7 @@ where
Some(format!("gossip actor {:?}", my_peer_id)),
Duration::from_millis(config.gossip_network_maintenance_interval_ms()).into(),
Duration::from_millis(config.gossip_store_maintenance_interval_ms()).into(),
config.announce_private_addr(),
self.store.clone(),
self.chain_actor.clone(),
myself.get_cell(),
Expand Down
1 change: 1 addition & 0 deletions src/fiber/tests/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl GossipTestingContext {
None,
Duration::from_millis(50).into(),
Duration::from_millis(50).into(),
true,
store.clone(),
chain_actor.clone(),
root_actor.get_cell(),
Expand Down
2 changes: 1 addition & 1 deletion src/fiber/tests/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl MockNetworkGraph {
0,
));
}
let graph = NetworkGraph::new(store.clone(), public_key1.into());
let graph = NetworkGraph::new(store.clone(), public_key1.into(), true);

Self {
keys: keypairs.into_iter().map(|x| x.1).collect(),
Expand Down
1 change: 1 addition & 0 deletions src/fiber/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ impl NetworkNode {
let network_graph = Arc::new(TokioRwLock::new(NetworkGraph::new(
store.clone(),
public_key.clone(),
true,
)));

let network_actor = Actor::spawn_linked(
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub async fn main() -> Result<(), ExitMessage> {
let network_graph = Arc::new(RwLock::new(NetworkGraph::new(
store.clone(),
node_public_key.clone().into(),
fiber_config.announce_private_addr(),
)));

let secret_key = ckb_config.read_secret_key().map_err(|err| {
Expand Down

0 comments on commit bb5ff5f

Please sign in to comment.