Skip to content

Commit

Permalink
outbound peer eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
sistemd committed Feb 7, 2024
1 parent eaba589 commit 628e733
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = "0.10.7"
sha3 = { workspace = true }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread", "sync"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
Expand Down
75 changes: 70 additions & 5 deletions crates/p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ pub fn kademlia_protocol_name(chain_id: ChainId) -> String {
pub struct Behaviour {
cfg: Config,
peers: PeerSet,
swarm: crate::Client,
/// Secret value used in the peer eviction process.
///
/// This value is used to pick the peer to be evicted in a deterministic, but
/// unpredictable way.
eviction_secret: [u8; 32],
inner: Inner,
}

Expand Down Expand Up @@ -332,6 +338,9 @@ impl NetworkBehaviour for Behaviour {
if effective_role.is_dialer() {
// This really is an outbound connection, and not a connection that requires
// hole-punching.

self.evict_outbound_peer()?;

self.peers.upsert(
peer_id,
|peer| {
Expand Down Expand Up @@ -365,6 +374,7 @@ impl Behaviour {
pub fn new(
identity: &identity::Keypair,
chain_id: ChainId,
swarm: crate::Client,
cfg: Config,
) -> (Self, relay::client::Transport) {
const PROVIDER_PUBLICATION_INTERVAL: Duration = Duration::from_secs(600);
Expand Down Expand Up @@ -414,6 +424,8 @@ impl Behaviour {
Self {
peers: PeerSet::new(cfg.eviction_timeout),
cfg,
swarm,
eviction_secret: identity.derive_secret(b"eviction").unwrap(),
inner: Inner {
relay,
autonat: autonat::Behaviour::new(peer_id, Default::default()),
Expand Down Expand Up @@ -468,6 +480,59 @@ impl Behaviour {
Ok(())
}

/// Evict an outbound peer if the maximum number of outbound peers has been reached.
fn evict_outbound_peer(&mut self) -> Result<(), ConnectionDenied> {
let mut candidates: Vec<_> = self.outbound_peers().collect();
if candidates.len() < self.cfg.max_outbound_peers {
return Ok(());
}

// Only peers which are flagged as not useful are considered for eviction.
candidates.retain(|(_, peer)| !peer.useful);

// The peer to be evicted is the one with the highest SHA3(eviction_secret || peer_id)
// value. This is deterministic but unpredictable by any outside observer.
candidates.sort_by_key(|(peer_id, _)| {
use sha3::{Digest, Sha3_256};
let mut hasher = Sha3_256::default();
hasher.update(self.eviction_secret);
hasher.update(peer_id.to_bytes());
hasher.finalize()
});
let (peer_id, _) = candidates.pop().ok_or_else(|| {
tracing::debug!("Outbound peer limit reached, but no peers could be evicted");
ConnectionDenied::new(anyhow!(
"outbound peer limit reached and no peers could be evicted"
))
})?;
drop(candidates);

// Disconnect the evicted peer.
tracing::debug!(%peer_id, "Evicting outbound peer");
self.peers.update(peer_id, |peer| {
peer.connectivity = Connectivity::Disconnecting {
connected_at: peer.connected_at(),
};
peer.evicted = true;
});
tokio::spawn({
let swarm = self.swarm.clone();
async move {
if let Err(e) = swarm.disconnect(peer_id).await {
tracing::debug!(%peer_id, %e, "Failed to disconnect evicted peer");
}
}
});

Ok(())
}

pub fn not_useful(&mut self, peer_id: PeerId) {
self.peers.update(peer_id, |peer| {
peer.useful = false;
});
}

pub fn kademlia_mut(&mut self) -> &mut kad::Behaviour<MemoryStore> {
&mut self.inner.kademlia
}
Expand Down Expand Up @@ -500,26 +565,26 @@ impl Behaviour {
self.peers.iter()
}

pub fn num_outbound_peers(&self) -> usize {
/// Outbound peers connected to us.
pub fn outbound_peers(&self) -> impl Iterator<Item = (PeerId, &Peer)> {
self.peers
.iter()
.filter(|(_, peer)| peer.is_outbound())
.count()
.filter(|(_, peer)| peer.is_connected() && peer.is_outbound())
}

/// Number of inbound non-relayed peers.
fn num_inbound_direct_peers(&self) -> usize {
self.peers
.iter()
.filter(|(_, peer)| peer.is_inbound() && !peer.is_relayed())
.filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && !peer.is_relayed())
.count()
}

/// Number of inbound relayed peers.
fn num_inbound_relayed_peers(&self) -> usize {
self.peers
.iter()
.filter(|(_, peer)| peer.is_inbound() && peer.is_relayed())
.filter(|(_, peer)| peer.is_connected() && peer.is_inbound() && peer.is_relayed())
.count()
}
}
Expand Down
12 changes: 12 additions & 0 deletions crates/p2p/src/client/peer_aware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ impl Client {
receiver.await.expect("Sender not to be dropped")
}

/// Mark a peer as not useful.
///
/// These peers will be candidates for outbound peer eviction.
pub async fn not_useful(&self, peer_id: PeerId) {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::NotUseful { peer_id, sender })
.await
.expect("Command receiver not to be dropped");
receiver.await.expect("Sender not to be dropped")
}

#[cfg(test)]
pub(crate) fn for_test(&self) -> test_utils::Client {
test_utils::Client::new(self.sender.clone())
Expand Down
20 changes: 15 additions & 5 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ pub use behaviour::{kademlia_protocol_name, IDENTIFY_PROTOCOL_NAME};
pub fn new(keypair: Keypair, cfg: Config, chain_id: ChainId) -> (Client, EventReceiver, MainLoop) {
let local_peer_id = keypair.public().to_peer_id();

let (behaviour, relay_transport) = behaviour::Behaviour::new(&keypair, chain_id, cfg.clone());
let (command_sender, command_receiver) = mpsc::channel(1);
let client = Client::new(command_sender, local_peer_id);

let (behaviour, relay_transport) =
behaviour::Behaviour::new(&keypair, chain_id, client.clone(), cfg.clone());

let swarm = Swarm::new(
transport::create(&keypair, relay_transport),
Expand All @@ -61,11 +65,10 @@ pub fn new(keypair: Keypair, cfg: Config, chain_id: ChainId) -> (Client, EventRe
swarm::Config::with_tokio_executor().with_idle_connection_timeout(Duration::MAX),
);

let (command_sender, command_receiver) = mpsc::channel(1);
let (event_sender, event_receiver) = mpsc::channel(1);

(
Client::new(command_sender, local_peer_id),
client,
event_receiver,
MainLoop::new(swarm, command_receiver, event_sender, cfg, chain_id),
)
Expand All @@ -78,10 +81,12 @@ pub struct Config {
pub direct_connection_timeout: Duration,
/// A relayed peer can only connect once in this period.
pub relay_connection_timeout: Duration,
/// Maximum number of direct (non-relayed) peers.
/// Maximum number of direct (non-relayed) inbound peers.
pub max_inbound_direct_peers: usize,
/// Maximum number of relayed peers.
/// Maximum number of relayed inbound peers.
pub max_inbound_relayed_peers: usize,
/// Maximum number of outbound peers.
pub max_outbound_peers: usize,
/// The minimum number of peers to maintain. If the number of outbound peers drops below this
/// number, the node will attempt to connect to more peers.
pub low_watermark: usize,
Expand Down Expand Up @@ -175,6 +180,10 @@ enum Command {
new_block: NewBlock,
sender: EmptyResultSender,
},
NotUseful {
peer_id: PeerId,
sender: oneshot::Sender<()>,
},
/// For testing purposes only
_Test(TestCommand),
}
Expand Down Expand Up @@ -226,6 +235,7 @@ pub enum Event {
#[derive(Debug)]
pub enum TestEvent {
NewListenAddress(Multiaddr),
KademliaBootstrapStarted,
KademliaBootstrapCompleted(Result<PeerId, PeerId>),
StartProvidingCompleted(Result<RecordKey, RecordKey>),
ConnectionEstablished { outbound: bool, remote: PeerId },
Expand Down
17 changes: 15 additions & 2 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,14 @@ impl MainLoop {
_ => self.ongoing_bootstrap = None,
}
}
if self.swarm.behaviour_mut().num_outbound_peers() < self.cfg.low_watermark {
if self.swarm.behaviour_mut().outbound_peers().count() < self.cfg.low_watermark {
if let Ok(query_id) = self.swarm.behaviour_mut().kademlia_mut().bootstrap() {
self.ongoing_bootstrap = Some(query_id);
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapStarted,
)
.await;
}
}
}
Expand Down Expand Up @@ -229,6 +234,7 @@ impl MainLoop {
connection_id: _, // TODO consider tracking connection IDs for peers
..
} => {
tracing::debug!(%peer_id, "Connection closed");
if num_established == 0 {
send_test_event(
&self.event_sender,
Expand Down Expand Up @@ -351,7 +357,10 @@ impl MainLoop {
let num_connections = connection_counters.num_connections();

let result = match result {
Ok(BootstrapOk { peer, .. }) => {
Ok(BootstrapOk {
peer,
..,
}) => {
tracing::debug!(%num_peers, %num_connections, "Periodic bootstrap completed");
Ok(peer)
}
Expand Down Expand Up @@ -857,6 +866,10 @@ impl MainLoop {
let result = self.publish_data(topic, &data);
let _ = sender.send(result);
}
Command::NotUseful { peer_id, sender } => {
self.swarm.behaviour_mut().not_useful(peer_id);
let _ = sender.send(());
}
Command::_Test(command) => self.handle_test_command(command).await,
};
}
Expand Down
7 changes: 6 additions & 1 deletion crates/p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ impl Peer {

pub fn connected_at(&self) -> Option<Instant> {
match self.connectivity {
Connectivity::Connected { connected_at } => Some(connected_at),
Connectivity::Connected { connected_at, .. } => Some(connected_at),
Connectivity::Disconnecting { connected_at, .. } => connected_at,
Connectivity::Disconnected { connected_at, .. } => connected_at,
Connectivity::Dialing => None,
}
Expand All @@ -62,6 +63,10 @@ pub enum Connectivity {
/// When the peer was connected.
connected_at: Instant,
},
Disconnecting {
/// When the peer was connected, if he was connected.
connected_at: Option<Instant>,
},
Disconnected {
/// When the peer was connected, if he was connected.
connected_at: Option<Instant>,
Expand Down
Loading

0 comments on commit 628e733

Please sign in to comment.