Skip to content

Commit

Permalink
request-response: replace inject_* with on_event
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 7, 2022
1 parent d020da5 commit d926cd2
Showing 1 changed file with 116 additions and 79 deletions.
195 changes: 116 additions & 79 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ mod protocol;
use crate::codec::RequestResponseCodec;
use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};

use libp2p_swarm::handler::{
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError,
StreamEvent,
};
pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol};

use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
Expand Down Expand Up @@ -86,7 +90,7 @@ where

impl<TCodec> RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: RequestResponseCodec + Send + Clone + 'static,
{
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
Expand All @@ -108,6 +112,83 @@ where
inbound_request_id,
}
}

fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
protocol: sent,
info: request_id,
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
if sent {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseSent(request_id))
} else {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseOmission(request_id))
}
}

fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { info, error }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events
.push_back(RequestResponseHandlerEvent::OutboundTimeout(info));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the remote peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info),
);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { info, error }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => self
.pending_events
.push_back(RequestResponseHandlerEvent::InboundTimeout(info)),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundUnsupportedProtocols(info),
);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}
}

/// The events emitted by the [`RequestResponseHandler`].
Expand Down Expand Up @@ -236,88 +317,11 @@ where
SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout)
}

fn inject_fully_negotiated_inbound(&mut self, sent: bool, request_id: RequestId) {
if sent {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseSent(request_id))
} else {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseOmission(request_id))
}
}

fn inject_fully_negotiated_outbound(
&mut self,
response: TCodec::Response,
request_id: RequestId,
) {
self.pending_events
.push_back(RequestResponseHandlerEvent::Response {
request_id,
response,
});
}

fn inject_event(&mut self, request: Self::InEvent) {
fn on_behaviour_event(&mut self, request: Self::InEvent) {
self.keep_alive = KeepAlive::Yes;
self.outbound.push_back(request);
}

fn inject_dial_upgrade_error(
&mut self,
info: RequestId,
error: ConnectionHandlerUpgrErr<io::Error>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events
.push_back(RequestResponseHandlerEvent::OutboundTimeout(info));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the remote peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info),
);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}

fn inject_listen_upgrade_error(
&mut self,
info: RequestId,
error: ConnectionHandlerUpgrErr<io::Error>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => self
.pending_events
.push_back(RequestResponseHandlerEvent::InboundTimeout(info)),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundUnsupportedProtocols(info),
);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}

fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
Expand Down Expand Up @@ -387,4 +391,37 @@ where

Poll::Pending
}

fn on_event(
&mut self,
event: StreamEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
StreamEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
StreamEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: response,
info: request_id,
}) => {
self.pending_events
.push_back(RequestResponseHandlerEvent::Response {
request_id,
response,
});
}
StreamEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)
}
StreamEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
}
StreamEvent::AddressChange(_) => {}
}
}
}

0 comments on commit d926cd2

Please sign in to comment.