Skip to content

Commit

Permalink
identify: Move I/O from Networkehaviour,
Browse files Browse the repository at this point in the history
reply to the Identification requests from the ConnectionHandler.
  • Loading branch information
jxs committed Dec 7, 2022
1 parent 8edad3f commit 0ad547f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
6 changes: 6 additions & 0 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ impl NetworkBehaviour for Behaviour {
score: AddressScore::Finite(1),
});
}
handler::Event::Identification(peer) => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
peer_id: peer,
}));
}
handler::Event::IdentificationPushed => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
Expand Down
64 changes: 61 additions & 3 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::collections::VecDeque;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

pub struct Proto {
Expand Down Expand Up @@ -64,6 +65,17 @@ impl IntoConnectionHandler for Proto {
}
}

/// A pending reply to an inbound identification request.
enum Pending {
/// The reply is queued for sending.
Queued(Reply),
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
}

/// A reply to an inbound identification request.
#[derive(Debug)]
pub struct Reply {
Expand All @@ -90,6 +102,9 @@ pub struct Handler {
>; 4],
>,

/// Pending replies to send.
pending_replies: VecDeque<Pending>,

/// Future that fires when we need to identify the node again.
trigger_next_identify: Delay,

Expand All @@ -106,11 +121,13 @@ pub struct Handler {
pub enum Event {
/// We obtained identification information from the remote.
Identified(Info),
/// We replied to an identification request from the remote.
Identification(PeerId),
/// We actively pushed our identification information to the remote.
IdentificationPushed,
/// We received a request for identification.
Identify(ReplySubstream<NegotiatedSubstream>),
/// Failed to identify the remote.
/// Failed to identify the remote, or to reply to an identification request.
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
}

Expand All @@ -129,6 +146,7 @@ impl Handler {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
pending_replies: VecDeque::new(),
trigger_next_identify: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes,
interval,
Expand Down Expand Up @@ -230,8 +248,15 @@ impl ConnectionHandler for Handler {
),
});
}
InEvent::Identify(_) => {
todo!()
InEvent::Identify(reply) => {
if !self.pending_replies.is_empty() {
warn!(
"New inbound identify request from {} while a previous one \
is still pending. Queueing the new one.",
reply.peer,
);
}
self.pending_replies.push_back(Pending::Queued(reply));
}
}
}
Expand Down Expand Up @@ -274,6 +299,39 @@ impl ConnectionHandler for Handler {
}
}

// Check for pending replies to send.
if let Some(mut pending) = self.pending_replies.pop_front() {
loop {
match pending {
Pending::Queued(Reply { peer, io, info }) => {
let io = Box::pin(io.send(info));
pending = Pending::Sending { peer, io };
}
Pending::Sending { peer, mut io } => {
match Future::poll(Pin::new(&mut io), cx) {
Poll::Pending => {
self.pending_replies
.push_front(Pending::Sending { peer, io });
return Poll::Pending;
}
Poll::Ready(Ok(())) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::Identification(peer),
));
}
Poll::Ready(Err(err)) => {
return Poll::Ready(ConnectionHandlerEvent::Custom(
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
)),
))
}
}
}
}
}
}

Poll::Pending
}

Expand Down

0 comments on commit 0ad547f

Please sign in to comment.