Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move Identify I/O from NetworkBehaviour to ConnectionHandler #3208

Merged
merged 18 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 34 additions & 71 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{self, Proto, Push};
use crate::handler::{self, InEvent, Proto, Reply};
use crate::protocol::{Info, ReplySubstream, UpgradeError};
use futures::prelude::*;
use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
Expand All @@ -35,7 +34,6 @@ use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
pin::Pin,
task::Context,
task::Poll,
time::Duration,
Expand All @@ -51,8 +49,8 @@ pub struct Behaviour {
config: Config,
/// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending requests to respond.
requests: VecDeque<Request>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
/// Peers to which an active push with current information about
Expand All @@ -63,18 +61,10 @@ pub struct Behaviour {
}

/// A pending reply to an inbound identification request.
enum Reply {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr,
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
struct Request {
peer: PeerId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handler could learn this via IntoConnectionHandler. It will never change across its lifespan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!

io: ReplySubstream<NegotiatedSubstream>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to not pass this back and forth.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed thanks, updated!

observed: Multiaddr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, a Handler can learn this via IntoConnectionHandler and it won't change1 across its lifetime.

Footnotes

  1. AddressChange is not emitted at the moment but once it will, it is directly given to the handler.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we only get it on connection_established which seems to be called after new_handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to implement IntoConnectionHandler additionally on a struct that you return from new_handler, said struct can then capture the ConnectedPoint and initialise the ConnectionHandler with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks Thomas, updated!

}

/// Configuration for the [`identify::Behaviour`](Behaviour).
Expand Down Expand Up @@ -184,7 +174,7 @@ impl Behaviour {
Self {
config,
connected: HashMap::new(),
pending_replies: VecDeque::new(),
requests: VecDeque::new(),
events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers,
Expand Down Expand Up @@ -271,6 +261,12 @@ impl NetworkBehaviour for Behaviour {
score: AddressScore::Finite(1),
});
}
handler::Event::Identification(peer) => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
peer_id: peer,
}));
}
handler::Event::IdentificationPushed => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
Expand All @@ -287,7 +283,7 @@ impl NetworkBehaviour for Behaviour {
with an established connection and calling `NetworkBehaviour::on_event` \
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
);
self.pending_replies.push_back(Reply::Queued {
self.requests.push_back(Request {
peer: peer_id,
io: sender,
observed: observed.clone(),
Expand All @@ -305,7 +301,7 @@ impl NetworkBehaviour for Behaviour {

fn poll(
&mut self,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
Expand Down Expand Up @@ -333,7 +329,7 @@ impl NetworkBehaviour for Behaviour {
observed_addr,
};

(*peer, Push(info))
(*peer, InEvent::Push(info))
})
});

Expand All @@ -346,55 +342,21 @@ impl NetworkBehaviour for Behaviour {
});
}

// Check for pending replies to send.
if let Some(r) = self.pending_replies.pop_front() {
let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
let io = Box::pin(io.send(info));
reply = Some(Reply::Sending { peer, io });
}
Some(Reply::Sending { peer, mut io }) => {
sending += 1;
match Future::poll(Pin::new(&mut io), cx) {
Poll::Ready(Ok(())) => {
let event = Event::Sent { peer_id: peer };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break;
} else {
reply = self.pending_replies.pop_front();
}
}
Poll::Ready(Err(err)) => {
let event = Event::Error {
peer_id: peer,
error: ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
None => unreachable!(),
}
}
// Check for pending requests to send back to the handler for reply.
if let Some(Request { peer, io, observed }) = self.requests.pop_front() {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id: peer,
handler: NotifyHandler::Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the curlpit! This will be buggy with > 1 connection per peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Thomas, addressed.

event: InEvent::Identify(Reply { peer, info, io }),
});
}

Poll::Pending
Expand Down Expand Up @@ -557,6 +519,7 @@ impl PeerCache {
mod tests {
use super::*;
use futures::pin_mut;
use futures::prelude::*;
use libp2p::mplex::MplexConfig;
use libp2p::noise;
use libp2p::tcp;
Expand Down Expand Up @@ -618,7 +581,7 @@ mod tests {

// nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly.
async_std::task::block_on(async move {
loop {
Expand Down
106 changes: 92 additions & 14 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

pub struct Proto {
Expand Down Expand Up @@ -64,6 +65,25 @@ impl IntoConnectionHandler for Proto {
}
}

/// A pending reply to an inbound identification request.
enum Pending {
/// The reply is queued for sending.
Queued(Reply),
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
}

/// A reply to an inbound identification request.
#[derive(Debug)]
pub struct Reply {
pub peer: PeerId,
pub io: ReplySubstream<NegotiatedSubstream>,
pub info: Info,
}

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
Expand All @@ -82,6 +102,9 @@ pub struct Handler {
>; 4],
>,

/// Pending replies to send.
pending_replies: VecDeque<Pending>,

/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,

Expand All @@ -92,31 +115,39 @@ pub struct Handler {
interval: Duration,
}

/// Event produced by the `IdentifyHandler`.
/// Event produced by the `Handler`.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Event {
/// We obtained identification information from the remote.
Identified(Info),
/// We replied to an identification request from the remote.
Identification(PeerId),
/// We actively pushed our identification information to the remote.
IdentificationPushed,
/// We received a request for identification.
Identify(ReplySubstream<NegotiatedSubstream>),
/// Failed to identify the remote.
/// Failed to identify the remote, or to reply to an identification request.
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
}

/// Identifying information of the local node that is pushed to a remote.
#[derive(Debug)]
pub struct Push(pub Info);
#[allow(clippy::large_enum_variant)]
pub enum InEvent {
/// Identifying information of the local node that is pushed to a remote.
Push(Info),
/// Identifying information requested from this node.
Identify(Reply),
}

impl Handler {
/// Creates a new `IdentifyHandler`.
/// Creates a new `Handler`.
pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self {
Self {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
pending_replies: VecDeque::new(),
trigger_next_identify: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
interval,
Expand Down Expand Up @@ -195,7 +226,7 @@ impl Handler {
}

impl ConnectionHandler for Handler {
type InEvent = Push;
type InEvent = InEvent;
type OutEvent = Event;
type Error = io::Error;
type InboundProtocol = SelectUpgrade<Protocol, PushProtocol<InboundPush>>;
Expand All @@ -207,14 +238,28 @@ impl ConnectionHandler for Handler {
SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ())
}

fn on_behaviour_event(&mut self, Push(push): Self::InEvent) {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(PushProtocol::outbound(push)),
(),
),
});
fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event {
InEvent::Push(push) => {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(PushProtocol::outbound(push)),
(),
),
});
}
InEvent::Identify(reply) => {
if !self.pending_replies.is_empty() {
warn!(
"New inbound identify request from {} while a previous one \
is still pending. Queueing the new one.",
reply.peer,
);
}
self.pending_replies.push_back(Pending::Queued(reply));
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
Expand Down Expand Up @@ -255,6 +300,39 @@ impl ConnectionHandler for Handler {
}
}

// Check for pending replies to send.
if let Some(mut pending) = self.pending_replies.pop_front() {
loop {
match pending {
Pending::Queued(Reply { peer, io, info }) => {
let io = Box::pin(io.send(info));
pending = Pending::Sending { peer, io };
}
Pending::Sending { peer, mut io } => {
match Future::poll(Pin::new(&mut io), cx) {
Poll::Pending => {
self.pending_replies
.push_front(Pending::Sending { peer, io });
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::Identification(peer),
));
}
Poll::Ready(Err(err)) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
)),
))
}
}
}
}
}
}

Poll::Pending
}

Expand Down