Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: report changes in supported protocols to ConnectionHandler #3651

Merged
merged 61 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
f2801fb
Report changes in supported protocols back to `ConnectionHandler`
thomaseizinger Mar 20, 2023
9a28cd2
Allow reporting of supported protocols by remote
thomaseizinger Mar 22, 2023
e536dea
Consume supported protocols in identify
thomaseizinger Mar 24, 2023
7699a1e
Report a remote's protocols to other handlers
thomaseizinger Mar 24, 2023
450fc1e
Add test for kademlia client mode
thomaseizinger Mar 24, 2023
a972b9c
Implement kademlia client-mode
thomaseizinger Mar 24, 2023
f9cf33f
Add tests for two servers connecting
thomaseizinger Mar 24, 2023
deb30f3
Report additions and removals of protocols instead
thomaseizinger Apr 13, 2023
3c1bf5f
Report listen protocols on startup to connection
thomaseizinger Apr 13, 2023
9e70729
Introduce `SupportedProtocols` type
thomaseizinger Apr 13, 2023
f1328c5
Deduplicate code and propagate supported protocols only once
thomaseizinger Apr 19, 2023
a4fbcda
Extend docs
thomaseizinger Apr 19, 2023
3e33108
Merge branch 'master' into 2680-explore
thomaseizinger Apr 19, 2023
572ed90
Fix compile errors
thomaseizinger Apr 19, 2023
586394b
Report changes to listen/external address set
thomaseizinger Apr 26, 2023
b329a09
Simplify identify protocol
thomaseizinger Apr 26, 2023
a63af89
Check for changes in inbound protocols on every poll
thomaseizinger Apr 26, 2023
72510dd
Fix clippy lints
thomaseizinger Apr 26, 2023
8ffafdd
Report changes in `SupportedProtocols`
thomaseizinger Apr 26, 2023
ea1a087
Only report changes to handler if there actually was a change
thomaseizinger Apr 26, 2023
27bc507
Do not implicitly dial peers upon push
thomaseizinger Apr 27, 2023
58039d9
Merge branch 'master' into 2680-explore
thomaseizinger Apr 27, 2023
74dd94a
Remove unused import
thomaseizinger Apr 27, 2023
628b519
Fix compile error
thomaseizinger Apr 27, 2023
c7b5011
Remove dbg!
thomaseizinger Apr 27, 2023
a02ca55
Update swarm/src/handler.rs
thomaseizinger Apr 27, 2023
c347c8a
Merge branch '2680-explore' of github.com:libp2p/rust-libp2p into 268…
thomaseizinger May 2, 2023
f3e5e71
Combine match arms where possible
thomaseizinger May 2, 2023
b7fa7ef
Add comment explaining static hashset
thomaseizinger May 2, 2023
2bd9d73
Add docs
thomaseizinger May 2, 2023
e90c40d
Update supported protocols for push messages
thomaseizinger May 2, 2023
84979e4
Use `pop` to avoid panicking branch in `remove`
thomaseizinger May 2, 2023
dbfc7e7
Use `poll_unpin`
thomaseizinger May 2, 2023
f2d2c88
Use `if let` for consistency
thomaseizinger May 2, 2023
b41aeb8
Rewrite to `if let` for consistency
thomaseizinger May 2, 2023
bcd872b
Fill in todo in toggle
thomaseizinger May 2, 2023
021f1d4
Merge branch 'master' into 2680-explore
thomaseizinger May 2, 2023
fb096ad
Merge branch 'master' into 2680-explore
thomaseizinger May 2, 2023
e95c738
Merge branch 'master' into 2680-explore
thomaseizinger May 4, 2023
82642b8
Restore `libp2p-kad`
thomaseizinger May 4, 2023
eb66489
Fix formatting
thomaseizinger May 4, 2023
8c47bd6
Fix unit tests
thomaseizinger May 4, 2023
bf9421e
Change test to assert actual events received
thomaseizinger May 4, 2023
a82343a
Don't report empty set of protocols to handler
thomaseizinger May 4, 2023
19cd9b9
Use constructor
thomaseizinger May 4, 2023
6d3e9ee
Introduce test helper
thomaseizinger May 4, 2023
df93a4e
Make tests less noisy
thomaseizinger May 4, 2023
c50bcfd
Further simplify test and add comments
thomaseizinger May 4, 2023
a799798
Add failing test
thomaseizinger May 4, 2023
0260ad1
Make test pass, i.e. only report actual changes back to the handler
thomaseizinger May 4, 2023
bf99654
Extract helpers to make fields crate-private and add docs
thomaseizinger May 4, 2023
46f4e96
Return `SmallVec` from `from_full_sets` which allows iteration
thomaseizinger May 4, 2023
ae7fc93
Fix clippy warnings
thomaseizinger May 4, 2023
fe9a6e3
Fix rustdoc
thomaseizinger May 4, 2023
d7651b4
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
75681c1
Add changelog entry
thomaseizinger May 8, 2023
bdfb04f
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
88362e5
Sort imports
thomaseizinger May 8, 2023
3c8d326
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
77e4e5b
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
5a772e1
Merge branch 'master' into 2680-explore
thomaseizinger May 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl ConnectionHandler for Handler {
| ConnectionEvent::FullyNegotiatedOutbound(_)
| ConnectionEvent::DialUpgradeError(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::AddressChange(_) => {}
| ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
4 changes: 3 additions & 1 deletion protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ impl ConnectionHandler for Handler {
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
ConnectionEvent::AddressChange(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
}
5 changes: 4 additions & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,10 @@ impl ConnectionHandler for Handler {
}) => {
log::debug!("Protocol negotiation failed: {e}")
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
}
}
Handler::Disabled(_) => {}
Expand Down
166 changes: 51 additions & 115 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Handler, InEvent};
use crate::protocol::{Info, Protocol, UpgradeError};
use crate::protocol::{Info, UpgradeError};
use libp2p_core::{multiaddr, ConnectedPoint, Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
AddressScore, ConnectionDenied, DialError, ExternalAddresses, ListenAddresses,
NetworkBehaviour, NotifyHandler, PollParameters, StreamProtocol, StreamUpgradeError,
THandlerInEvent, ToSwarm,
NetworkBehaviour, NotifyHandler, PollParameters, StreamUpgradeError, THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;
Expand All @@ -50,10 +49,6 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
requests: Vec<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered.
Expand All @@ -63,15 +58,6 @@ pub struct Behaviour {
external_addresses: ExternalAddresses,
}

/// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info
/// to address identification requests, or push requests to peers
/// with current information about the local peer.
#[derive(Debug, PartialEq, Eq)]
struct Request {
peer_id: PeerId,
protocol: Protocol,
}

/// Configuration for the [`identify::Behaviour`](Behaviour).
#[non_exhaustive]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -184,7 +170,6 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
requests: Vec::new(),
events: VecDeque::new(),
discovered_peers,
listen_addresses: Default::default(),
Expand All @@ -203,13 +188,11 @@ impl Behaviour {
continue;
}

let request = Request {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
}
handler: NotifyHandler::Any,
event: InEvent::Push,
});
}
}

Expand Down Expand Up @@ -239,6 +222,14 @@ impl Behaviour {
}
}
}

fn all_addresses(&self) -> HashSet<Multiaddr> {
self.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect()
Comment on lines +227 to +231
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note for sometime in the future, ideally we would prioritize the addresses we send to a remote, e.g. we should prioritize (in terms of order in the list) external addresses over listening addresses. Again, not for this pull request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an easy enough change!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually isn't because we are collecting in a HashSet and thus lose any ordering.

}
}

impl NetworkBehaviour for Behaviour {
Expand All @@ -261,6 +252,7 @@ impl NetworkBehaviour for Behaviour {
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
remote_addr.clone(),
self.all_addresses(),
))
}

Expand All @@ -280,13 +272,14 @@ impl NetworkBehaviour for Behaviour {
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
addr.clone(), // TODO: This is weird? That is the public address we dialed, shouldn't need to tell the other party?
self.all_addresses(),
))
}

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
_: ConnectionId,
event: THandlerOutEvent<Self>,
) {
match event {
Expand Down Expand Up @@ -315,12 +308,6 @@ impl NetworkBehaviour for Behaviour {
self.events
.push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id }));
}
handler::Event::Identify => {
self.requests.push(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
});
}
handler::Event::IdentificationError(error) => {
self.events
.push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error }));
Expand All @@ -331,50 +318,13 @@ impl NetworkBehaviour for Behaviour {
fn poll(
&mut self,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
_: &mut impl PollParameters,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}

// Check for pending requests.
match self.requests.pop() {
Some(Request {
peer_id,
protocol: Protocol::Push,
}) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: InEvent {
listen_addrs: self
.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Push,
},
}),
Some(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
}) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: InEvent {
listen_addrs: self
.listen_addresses
.iter()
.chain(self.external_addresses.iter())
.cloned()
.collect(),
supported_protocols: supported_protocols(params),
protocol: Protocol::Identify(connection_id),
},
}),
None => Poll::Pending,
}
Poll::Pending
}

fn handle_pending_outbound_connection(
Expand All @@ -393,8 +343,35 @@ impl NetworkBehaviour for Behaviour {
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
self.listen_addresses.on_swarm_event(&event);
self.external_addresses.on_swarm_event(&event);
let listen_addr_changed = self.listen_addresses.on_swarm_event(&event);
let external_addr_changed = self.external_addresses.on_swarm_event(&event);

if listen_addr_changed || external_addr_changed {
// notify all connected handlers about our changed addresses
let change_events = self
.connected
.iter()
.flat_map(|(peer, map)| map.keys().map(|id| (*peer, id)))
.map(|(peer_id, connection_id)| ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(*connection_id),
event: InEvent::AddressesChanged(self.all_addresses()),
})
.collect::<Vec<_>>();

self.events.extend(change_events)
}

if listen_addr_changed && self.config.push_listen_addr_updates {
// trigger an identify push for all connected peers
let push_events = self.connected.keys().map(|peer| ToSwarm::NotifyHandler {
peer_id: *peer,
handler: NotifyHandler::Any,
event: InEvent::Push,
});

self.events.extend(push_events);
}

match event {
FromSwarm::ConnectionEstablished(connection_established) => {
Expand All @@ -408,30 +385,11 @@ impl NetworkBehaviour for Behaviour {
}) => {
if remaining_established == 0 {
self.connected.remove(&peer_id);
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
} else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id);
}
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id {
if !self.connected.contains_key(&peer_id) {
self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
if let DialError::Transport(errors) = error {
for (addr, _error) in errors {
Expand All @@ -440,20 +398,9 @@ impl NetworkBehaviour for Behaviour {
}
}
}
FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => {
if self.config.push_listen_addr_updates {
for p in self.connected.keys() {
let request = Request {
peer_id: *p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
}
}
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
FromSwarm::AddressChange(_)
FromSwarm::NewListenAddr(_)
| FromSwarm::ExpiredListenAddr(_)
| FromSwarm::AddressChange(_)
| FromSwarm::ListenFailure(_)
| FromSwarm::NewListener(_)
| FromSwarm::ListenerError(_)
Expand Down Expand Up @@ -496,17 +443,6 @@ pub enum Event {
},
}

fn supported_protocols(params: &impl PollParameters) -> Vec<StreamProtocol> {
// The protocol names can be bytes, but the identify protocol except UTF-8 strings.
// There's not much we can do to solve this conflict except strip non-UTF-8 characters.
params
.supported_protocols()
.filter_map(|p| {
StreamProtocol::try_from_owned(String::from_utf8_lossy(&p).to_string()).ok()
})
.collect()
}

/// If there is a given peer_id in the multiaddr, make sure it is the same as
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
Expand Down
Loading