Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report changes in supported protocols to ConnectionHandler #3651

Merged
merged 61 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f2801fb
Report changes in supported protocols back to `ConnectionHandler`
thomaseizinger Mar 20, 2023
9a28cd2
Allow reporting of supported protocols by remote
thomaseizinger Mar 22, 2023
e536dea
Consume supported protocols in identify
thomaseizinger Mar 24, 2023
7699a1e
Report a remote's protocols to other handlers
thomaseizinger Mar 24, 2023
450fc1e
Add test for kademlia client mode
thomaseizinger Mar 24, 2023
a972b9c
Implement kademlia client-mode
thomaseizinger Mar 24, 2023
f9cf33f
Add tests for two servers connecting
thomaseizinger Mar 24, 2023
deb30f3
Report additions and removals of protocols instead
thomaseizinger Apr 13, 2023
3c1bf5f
Report listen protocols on startup to connection
thomaseizinger Apr 13, 2023
9e70729
Introduce `SupportedProtocols` type
thomaseizinger Apr 13, 2023
f1328c5
Deduplicate code and propagate supported protocols only once
thomaseizinger Apr 19, 2023
a4fbcda
Extend docs
thomaseizinger Apr 19, 2023
3e33108
Merge branch 'master' into 2680-explore
thomaseizinger Apr 19, 2023
572ed90
Fix compile errors
thomaseizinger Apr 19, 2023
586394b
Report changes to listen/external address set
thomaseizinger Apr 26, 2023
b329a09
Simplify identify protocol
thomaseizinger Apr 26, 2023
a63af89
Check for changes in inbound protocols on every poll
thomaseizinger Apr 26, 2023
72510dd
Fix clippy lints
thomaseizinger Apr 26, 2023
8ffafdd
Report changes in `SupportedProtocols`
thomaseizinger Apr 26, 2023
ea1a087
Only report changes to handler if there actually was a change
thomaseizinger Apr 26, 2023
27bc507
Do not implicitly dial peers upon push
thomaseizinger Apr 27, 2023
58039d9
Merge branch 'master' into 2680-explore
thomaseizinger Apr 27, 2023
74dd94a
Remove unused import
thomaseizinger Apr 27, 2023
628b519
Fix compile error
thomaseizinger Apr 27, 2023
c7b5011
Remove dbg!
thomaseizinger Apr 27, 2023
a02ca55
Update swarm/src/handler.rs
thomaseizinger Apr 27, 2023
c347c8a
Merge branch '2680-explore' of github.com:libp2p/rust-libp2p into 268…
thomaseizinger May 2, 2023
f3e5e71
Combine match arms where possible
thomaseizinger May 2, 2023
b7fa7ef
Add comment explaining static hashset
thomaseizinger May 2, 2023
2bd9d73
Add docs
thomaseizinger May 2, 2023
e90c40d
Update supported protocols for push messages
thomaseizinger May 2, 2023
84979e4
Use `pop` to avoid panicking branch in `remove`
thomaseizinger May 2, 2023
dbfc7e7
Use `poll_unpin`
thomaseizinger May 2, 2023
f2d2c88
Use `if let` for consistency
thomaseizinger May 2, 2023
b41aeb8
Rewrite to `if let` for consistency
thomaseizinger May 2, 2023
bcd872b
Fill in todo in toggle
thomaseizinger May 2, 2023
021f1d4
Merge branch 'master' into 2680-explore
thomaseizinger May 2, 2023
fb096ad
Merge branch 'master' into 2680-explore
thomaseizinger May 2, 2023
e95c738
Merge branch 'master' into 2680-explore
thomaseizinger May 4, 2023
82642b8
Restore `libp2p-kad`
thomaseizinger May 4, 2023
eb66489
Fix formatting
thomaseizinger May 4, 2023
8c47bd6
Fix unit tests
thomaseizinger May 4, 2023
bf9421e
Change test to assert actual events received
thomaseizinger May 4, 2023
a82343a
Don't report empty set of protocols to handler
thomaseizinger May 4, 2023
19cd9b9
Use constructor
thomaseizinger May 4, 2023
6d3e9ee
Introduce test helper
thomaseizinger May 4, 2023
df93a4e
Make tests less noisy
thomaseizinger May 4, 2023
c50bcfd
Further simplify test and add comments
thomaseizinger May 4, 2023
a799798
Add failing test
thomaseizinger May 4, 2023
0260ad1
Make test pass, i.e. only report actual changes back to the handler
thomaseizinger May 4, 2023
bf99654
Extract helpers to make fields crate-private and add docs
thomaseizinger May 4, 2023
46f4e96
Return `SmallVec` from `from_full_sets` which allows iteration
thomaseizinger May 4, 2023
ae7fc93
Fix clippy warnings
thomaseizinger May 4, 2023
fe9a6e3
Fix rustdoc
thomaseizinger May 4, 2023
d7651b4
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
75681c1
Add changelog entry
thomaseizinger May 8, 2023
bdfb04f
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
88362e5
Sort imports
thomaseizinger May 8, 2023
3c8d326
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
77e4e5b
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
5a772e1
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ impl ConnectionHandler for Handler {
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ impl ConnectionHandler for Handler {
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
5 changes: 4 additions & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,10 @@ impl ConnectionHandler for Handler {
warn!("Dial upgrade error {:?}", e);
self.upgrade_errors.push_back(e);
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
13 changes: 1 addition & 12 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl NetworkBehaviour for Behaviour {
fn poll(
&mut self,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
_: &mut impl PollParameters,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
Expand All @@ -344,7 +344,6 @@ impl NetworkBehaviour for Behaviour {
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Push,
},
}),
Expand All @@ -361,7 +360,6 @@ impl NetworkBehaviour for Behaviour {
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Identify(connection_id),
},
}),
Expand Down Expand Up @@ -488,15 +486,6 @@ pub enum Event {
},
}

fn supported_protocols(params: &impl PollParameters) -> Vec<String> {
// The protocol names can be bytes, but the identify protocol except UTF-8 strings.
// There's not much we can do to solve this conflict except strip non-UTF-8 characters.
params
.supported_protocols()
.map(|p| String::from_utf8_lossy(&p).to_string())
.collect()
}

/// If there is a given peer_id in the multiaddr, make sure it is the same as
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
Expand Down
22 changes: 17 additions & 5 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ProtocolsChange,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
Expand Down Expand Up @@ -83,6 +84,8 @@ pub struct Handler {

/// Address observed by or for the remote.
observed_addr: Multiaddr,

local_supported_protocols: Vec<String>,
}

/// An event from `Behaviour` with the information requested by the `Handler`.
Expand All @@ -91,9 +94,6 @@ pub struct InEvent {
/// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>,

/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub supported_protocols: Vec<String>,

/// The protocol w.r.t. the information requested.
pub protocol: Protocol,
}
Expand Down Expand Up @@ -138,6 +138,7 @@ impl Handler {
protocol_version,
agent_version,
observed_addr,
local_supported_protocols: vec![],
}
}

Expand Down Expand Up @@ -186,6 +187,10 @@ impl Handler {
) {
match output {
future::Either::Left(remote_info) => {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols {
protocols: remote_info.protocols.clone(),
});
self.events
.push(ConnectionHandlerEvent::Custom(Event::Identified(
remote_info,
Expand Down Expand Up @@ -238,7 +243,6 @@ impl ConnectionHandler for Handler {
&mut self,
InEvent {
listen_addrs,
supported_protocols,
protocol,
}: Self::InEvent,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
) {
Expand All @@ -247,7 +251,7 @@ impl ConnectionHandler for Handler {
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs,
protocols: supported_protocols,
protocols: self.local_supported_protocols.clone(),
observed_addr: self.observed_addr.clone(),
};

Expand Down Expand Up @@ -307,6 +311,10 @@ impl ConnectionHandler for Handler {
self.inbound_identify_push.take();

if let Ok(info) = res {
self.events
.push(ConnectionHandlerEvent::ReportRemoteProtocols {
protocols: info.protocols.clone(),
});
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Identified(info)));
}
}
Expand Down Expand Up @@ -345,6 +353,10 @@ impl ConnectionHandler for Handler {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::LocalProtocolsChange(ProtocolsChange { protocols }) => {
self.local_supported_protocols = protocols.to_vec();
}
ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
4 changes: 4 additions & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ serde = { version = "1.0", optional = true, features = ["derive"] }
thiserror = "1"

[dev-dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
env_logger = "0.10.0"
futures-timer = "3.0"
libp2p-identify = { path = "../identify" }
libp2p-noise = { path = "../../transports/noise" }
libp2p-swarm = { path = "../../swarm", features = ["macros"] }
libp2p-swarm-test = { path = "../../swarm-test" }
libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = { package = "quickcheck-ext", path = "../../misc/quickcheck-ext" }

Expand Down
30 changes: 28 additions & 2 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct Kademlia<TStore> {

/// The record storage.
store: TStore,
mode: Mode,
}

/// The configurable strategies for the insertion of peers
Expand Down Expand Up @@ -182,6 +183,7 @@ pub struct KademliaConfig {
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
caching: KademliaCaching,
mode: Mode,
}

impl Default for KademliaConfig {
Expand All @@ -199,6 +201,7 @@ impl Default for KademliaConfig {
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
caching: KademliaCaching::Enabled { max_peers: 1 },
mode: Mode::Server,
}
}
}
Expand Down Expand Up @@ -399,6 +402,14 @@ impl KademliaConfig {
self.caching = c;
self
}

/// Sets the mode.
///
/// TODO: More docs.
pub fn set_mode(&mut self, m: Mode) -> &mut Self {
self.mode = m;
self
}
}

impl<TStore> Kademlia<TStore>
Expand Down Expand Up @@ -453,6 +464,7 @@ where
connection_idle_timeout: config.connection_idle_timeout,
external_addresses: Default::default(),
local_peer_id: id,
mode: config.mode,
}
}

Expand Down Expand Up @@ -1976,7 +1988,7 @@ where
Ok(KademliaHandler::new(
KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
allow_listening: self.mode == Mode::Server,
idle_timeout: self.connection_idle_timeout,
},
ConnectedPoint::Listener {
Expand All @@ -1997,7 +2009,7 @@ where
Ok(KademliaHandler::new(
KademliaHandlerConfig {
protocol_config: self.protocol_config.clone(),
allow_listening: true,
allow_listening: self.mode == Mode::Server,
idle_timeout: self.connection_idle_timeout,
},
ConnectedPoint::Dialer {
Expand Down Expand Up @@ -2062,6 +2074,14 @@ where
self.connection_updated(source, address, NodeStatus::Connected);
}

KademliaHandlerEvent::ProtocolNotSupported { endpoint } => {
let address = match endpoint {
ConnectedPoint::Dialer { address, .. } => Some(address),
ConnectedPoint::Listener { .. } => None,
};
self.connection_updated(source, address, NodeStatus::Disconnected);
}

KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);

Expand Down Expand Up @@ -3190,3 +3210,9 @@ pub enum RoutingUpdate {
/// peer ID).
Failed,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Mode {
Client,
Server,
}
44 changes: 33 additions & 11 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use libp2p_core::{upgrade, ConnectedPoint};
use libp2p_identity::PeerId;
use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
ProtocolsChange,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
Expand Down Expand Up @@ -92,10 +93,12 @@ pub struct KademliaHandler<TUserData> {
enum ProtocolStatus {
/// It is as yet unknown whether the remote supports the
/// configured protocol name.
Unconfirmed,
Unknown,
/// The configured protocol name has been confirmed by the remote
/// but has not yet been reported to the `Kademlia` behaviour.
Confirmed,
/// The configured protocol name(s) are not or no longer supported by the remote.
NotSupported,
/// The configured protocol has been confirmed by the remote
/// and the confirmation reported to the `Kademlia` behaviour.
Reported,
Expand Down Expand Up @@ -226,13 +229,11 @@ impl<TUserData> InboundSubstreamState<TUserData> {
#[derive(Debug)]
pub enum KademliaHandlerEvent<TUserData> {
/// The configured protocol name has been confirmed by the peer through
/// a successfully negotiated substream.
///
/// This event is only emitted once by a handler upon the first
/// successfully negotiated inbound or outbound substream and
/// indicates that the connected peer participates in the Kademlia
/// overlay network identified by the configured protocol name.
/// a successfully negotiated substream or by learning the supported protocols of the remote.
ProtocolConfirmed { endpoint: ConnectedPoint },
/// The configured protocol name(s) are not or no longer supported by the peer on the provided
/// connection and it should be removed from the routing table.
ProtocolNotSupported { endpoint: ConnectedPoint },

/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
Expand Down Expand Up @@ -501,7 +502,7 @@ where
num_requested_outbound_streams: 0,
requested_streams: Default::default(),
keep_alive,
protocol_status: ProtocolStatus::Unconfirmed,
protocol_status: ProtocolStatus::Unknown,
}
}

Expand All @@ -520,7 +521,7 @@ where
protocol, msg, user_data,
));
self.num_requested_outbound_streams -= 1;
if let ProtocolStatus::Unconfirmed = self.protocol_status {
if let ProtocolStatus::Unknown = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
// the behaviour to add this peer to the routing table, if possible.
Expand All @@ -542,7 +543,7 @@ where
future::Either::Right(p) => void::unreachable(p),
};

if let ProtocolStatus::Unconfirmed = self.protocol_status {
if let ProtocolStatus::Unknown = self.protocol_status {
// Upon the first successfully negotiated substream, we know that the
// remote is configured with the same protocol name and we want
// the behaviour to add this peer to the routing table, if possible.
Expand Down Expand Up @@ -786,7 +787,28 @@ where
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_) => {}
ConnectionEvent::RemoteProtocolsChange(ProtocolsChange { protocols }) => {
// TODO: We should cache this / it will get simpler with #2831.
let kademlia_protocols = self
.config
.protocol_config
.protocol_names()
.iter()
.filter_map(|b| String::from_utf8(b.to_vec()).ok())
.collect::<Vec<_>>();

let remote_supports_our_kademlia_protocols =
kademlia_protocols.iter().all(|p| protocols.contains(p));
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

if remote_supports_our_kademlia_protocols {
self.protocol_status = ProtocolStatus::Confirmed;
} else {
self.protocol_status = ProtocolStatus::NotSupported;
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use behaviour::{
AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult,
BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk,
GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersResult, GetRecordError,
GetRecordOk, GetRecordResult, InboundRequest, NoKnownPeers, PeerRecord, PutRecordContext,
GetRecordOk, GetRecordResult, InboundRequest, Mode, NoKnownPeers, PeerRecord, PutRecordContext,
PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, QueryMut, QueryRef,
QueryResult, QueryStats,
};
Expand Down
Loading