Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow customizing the delay before closing a Kademlia connection #1477

Merged
merged 7 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ mod test;

use crate::K_VALUE;
use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
Expand Down Expand Up @@ -77,6 +77,9 @@ pub struct Kademlia<TStore> {
/// The TTL of provider records.
provider_record_ttl: Option<Duration>,

/// How long to keep connections alive when they're idle.
connection_idle_timeout: Duration,

/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,

Expand All @@ -97,6 +100,7 @@ pub struct KademliaConfig {
record_publication_interval: Option<Duration>,
provider_record_ttl: Option<Duration>,
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
}

impl Default for KademliaConfig {
Expand All @@ -110,6 +114,7 @@ impl Default for KademliaConfig {
record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)),
provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)),
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
}
}
}
Expand Down Expand Up @@ -217,6 +222,12 @@ impl KademliaConfig {
self.provider_publication_interval = interval;
self
}

/// Sets the amount of time to keep connections alive when they're idle.
pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self {
self.connection_idle_timeout = duration;
self
}
}

impl<TStore> Kademlia<TStore>
Expand Down Expand Up @@ -264,6 +275,7 @@ where
put_record_job,
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout,
}
}

Expand Down Expand Up @@ -1052,11 +1064,16 @@ where
type OutEvent = KademliaEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
let mut handler = KademliaHandler::dial_and_listen();
let mut protocol_config = KademliaProtocolConfig::default();
if let Some(name) = self.protocol_name_override.as_ref() {
handler = handler.with_protocol_name(name.clone());
protocol_config = protocol_config.with_protocol_name(name.clone());
}
handler

KademliaHandler::new(KademliaHandlerConfig {
protocol_config,
allow_listening: true,
idle_timeout: self.connection_idle_timeout,
})
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
Expand Down Expand Up @@ -1335,7 +1352,7 @@ where

fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
<KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
Expand Down
70 changes: 35 additions & 35 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use libp2p_core::{
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;

/// Protocol handler that handles Kademlia communications with the remote.
Expand All @@ -48,10 +48,7 @@ use wasm_timer::Instant;
/// It also handles requests made by the remote.
pub struct KademliaHandler<TUserData> {
/// Configuration for the Kademlia protocol.
config: KademliaProtocolConfig,

/// If false, we always refuse incoming Kademlia substreams.
allow_listening: bool,
config: KademliaHandlerConfig,

/// Next unique ID of a connection.
next_connec_unique_id: UniqueConnecId,
Expand All @@ -63,6 +60,19 @@ pub struct KademliaHandler<TUserData> {
keep_alive: KeepAlive,
}

/// Configuration of a [`KademliaHandler`].
#[derive(Debug, Clone)]
pub struct KademliaHandlerConfig {
/// Configuration of the wire protocol.
pub protocol_config: KademliaProtocolConfig,

/// If false, we deny incoming requests.
pub allow_listening: bool,

/// Time after which we close an idle connection.
pub idle_timeout: Duration,
}

/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TUserData> {
/// We haven't started opening the outgoing substream yet.
Expand Down Expand Up @@ -369,42 +379,22 @@ pub struct KademliaRequestId {
struct UniqueConnecId(u64);

impl<TUserData> KademliaHandler<TUserData> {
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
/// incoming connections.
pub fn dial_only() -> Self {
KademliaHandler::with_allow_listening(false)
}

/// Create a `KademliaHandler` that only allows sending messages but also receive incoming
/// requests.
///
/// The `Default` trait implementation wraps around this function.
pub fn dial_and_listen() -> Self {
KademliaHandler::with_allow_listening(true)
}
/// Create a [`KademliaHandler`] using the given configuration.
pub fn new(config: KademliaHandlerConfig) -> Self {
let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);

fn with_allow_listening(allow_listening: bool) -> Self {
KademliaHandler {
config: Default::default(),
allow_listening,
config,
next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)),
keep_alive,
}
}

/// Modifies the protocol name used on the wire. Can be used to create incompatibilities
/// between networks on purpose.
pub fn with_protocol_name(mut self, name: impl Into<Cow<'static, [u8]>>) -> Self {
self.config = self.config.with_protocol_name(name);
self
}
}

impl<TUserData> Default for KademliaHandler<TUserData> {
#[inline]
fn default() -> Self {
KademliaHandler::dial_and_listen()
KademliaHandler::new(Default::default())
}
}

Expand All @@ -422,8 +412,8 @@ where

#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
if self.allow_listening {
SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
if self.config.allow_listening {
SubstreamProtocol::new(self.config.protocol_config.clone()).map_upgrade(upgrade::EitherUpgrade::A)
} else {
SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade))
}
Expand All @@ -449,7 +439,7 @@ where
EitherOutput::Second(p) => void::unreachable(p),
};

debug_assert!(self.allow_listening);
debug_assert!(self.config.allow_listening);
let connec_unique_id = self.next_connec_unique_id;
self.next_connec_unique_id.0 += 1;
self.substreams
Expand Down Expand Up @@ -635,7 +625,7 @@ where
let mut substream = self.substreams.swap_remove(n);

loop {
match advance_substream(substream, self.config.clone(), cx) {
match advance_substream(substream, self.config.protocol_config.clone(), cx) {
(Some(new_state), Some(event), _) => {
self.substreams.push(new_state);
return Poll::Ready(event);
Expand Down Expand Up @@ -672,6 +662,16 @@ where
}
}

impl Default for KademliaHandlerConfig {
fn default() -> Self {
KademliaHandlerConfig {
protocol_config: Default::default(),
allow_listening: true,
idle_timeout: Duration::from_secs(10),
}
}
}

/// Advances one substream.
///
/// Returns the new state for that substream, an event to generate, and whether the substream
Expand Down