From d926cd26450cc78fafbf1d3609a1b5142131cac0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 7 Nov 2022 16:14:25 +0000 Subject: [PATCH] request-response: replace inject_* with on_event --- protocols/request-response/src/handler.rs | 195 +++++++++++++--------- 1 file changed, 116 insertions(+), 79 deletions(-) diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index fd843b7d4735..49be0244f284 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -23,6 +23,10 @@ mod protocol; use crate::codec::RequestResponseCodec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; +use libp2p_swarm::handler::{ + DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, + StreamEvent, +}; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; @@ -86,7 +90,7 @@ where impl RequestResponseHandler where - TCodec: RequestResponseCodec, + TCodec: RequestResponseCodec + Send + Clone + 'static, { pub(super) fn new( inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -108,6 +112,83 @@ where inbound_request_id, } } + + fn on_fully_negotiated_inbound( + &mut self, + FullyNegotiatedInbound { + protocol: sent, + info: request_id, + }: FullyNegotiatedInbound< + ::InboundProtocol, + ::InboundOpenInfo, + >, + ) { + if sent { + self.pending_events + .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) + } else { + self.pending_events + .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) + } + } + + fn on_dial_upgrade_error( + &mut self, + DialUpgradeError { info, error }: DialUpgradeError< + ::OutboundOpenInfo, + ::OutboundProtocol, + >, + ) { + match error { + ConnectionHandlerUpgrErr::Timeout => { + self.pending_events + .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The remote merely doesn't support the protocol(s) we requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the remote peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), + ); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } + fn on_listen_upgrade_error( + &mut self, + ListenUpgradeError { info, error }: ListenUpgradeError< + ::InboundOpenInfo, + ::InboundProtocol, + >, + ) { + match error { + ConnectionHandlerUpgrErr::Timeout => self + .pending_events + .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { + // The local peer merely doesn't support the protocol(s) requested. + // This is no reason to close the connection, which may + // successfully communicate with other protocols already. + // An event is reported to permit user code to react to the fact that + // the local peer does not support the requested protocol(s). + self.pending_events.push_back( + RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), + ); + } + _ => { + // Anything else is considered a fatal error or misbehaviour of + // the remote peer and results in closing the connection. + self.pending_error = Some(error); + } + } + } } /// The events emitted by the [`RequestResponseHandler`]. @@ -236,88 +317,11 @@ where SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout) } - fn inject_fully_negotiated_inbound(&mut self, sent: bool, request_id: RequestId) { - if sent { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) - } else { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) - } - } - - fn inject_fully_negotiated_outbound( - &mut self, - response: TCodec::Response, - request_id: RequestId, - ) { - self.pending_events - .push_back(RequestResponseHandlerEvent::Response { - request_id, - response, - }); - } - - fn inject_event(&mut self, request: Self::InEvent) { + fn on_behaviour_event(&mut self, request: Self::InEvent) { self.keep_alive = KeepAlive::Yes; self.outbound.push_back(request); } - fn inject_dial_upgrade_error( - &mut self, - info: RequestId, - error: ConnectionHandlerUpgrErr, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events - .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The remote merely doesn't support the protocol(s) we requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the remote peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } - - fn inject_listen_upgrade_error( - &mut self, - info: RequestId, - error: ConnectionHandlerUpgrErr, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => self - .pending_events - .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The local peer merely doesn't support the protocol(s) requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the local peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } - fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } @@ -387,4 +391,37 @@ where Poll::Pending } + + fn on_event( + &mut self, + event: StreamEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + StreamEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + self.on_fully_negotiated_inbound(fully_negotiated_inbound) + } + StreamEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: response, + info: request_id, + }) => { + self.pending_events + .push_back(RequestResponseHandlerEvent::Response { + request_id, + response, + }); + } + StreamEvent::DialUpgradeError(dial_upgrade_error) => { + self.on_dial_upgrade_error(dial_upgrade_error) + } + StreamEvent::ListenUpgradeError(listen_upgrade_error) => { + self.on_listen_upgrade_error(listen_upgrade_error) + } + StreamEvent::AddressChange(_) => {} + } + } }