From 2f5b8e6393286773abe66423b99583b26e2ecb49 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 17 Dec 2021 19:23:26 +0100 Subject: [PATCH 1/5] protocols/relay/src/v2: Rework outbound_hop error types --- protocols/relay/src/v2/client/handler.rs | 84 +++--------------- .../relay/src/v2/protocol/outbound_hop.rs | 86 ++++++++++++++++--- 2 files changed, 85 insertions(+), 85 deletions(-) diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 998e1fb2ea5..aaf1f2c55ef 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -373,45 +373,16 @@ impl ProtocolsHandler for Handler { } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { match error { - outbound_hop::UpgradeError::Decode(_) - | outbound_hop::UpgradeError::Io(_) - | outbound_hop::UpgradeError::ParseTypeField - | outbound_hop::UpgradeError::ParseStatusField - | outbound_hop::UpgradeError::MissingStatusField - | outbound_hop::UpgradeError::MissingReservationField - | outbound_hop::UpgradeError::NoAddressesInReservation - | outbound_hop::UpgradeError::InvalidReservationExpiration - | outbound_hop::UpgradeError::InvalidReservationAddrs - | outbound_hop::UpgradeError::UnexpectedTypeConnect - | outbound_hop::UpgradeError::UnexpectedTypeReserve => { + error @ outbound_hop::UpgradeError::Fatal(_) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::B(error)), )); } - outbound_hop::UpgradeError::UnexpectedStatus(status) => { - match status { - Status::Ok => { - unreachable!( - "Status success was explicitly expected earlier." - ) - } - // With either status below there is either no reason to stay - // connected or it is a protocol violation. - // Thus terminate the connection. - Status::ConnectionFailed - | Status::NoReservation - | Status::PermissionDenied - | Status::UnexpectedMessage - | Status::MalformedMessage => { - self.pending_error = - Some(ProtocolsHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - } - // The connection to the relay might still proof helpful CONNECT - // requests. Thus do not terminate the connection. - Status::ReservationRefused | Status::ResourceLimitExceeded => {} - } + outbound_hop::UpgradeError::ReservationFailed(_) => {} + outbound_hop::UpgradeError::CircuitFailed(_) => { + unreachable!( + "Do not emitt `CircuitFailed` for outgoing reservation." + ) } } } @@ -451,47 +422,16 @@ impl ProtocolsHandler for Handler { } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { match error { - outbound_hop::UpgradeError::Decode(_) - | outbound_hop::UpgradeError::Io(_) - | outbound_hop::UpgradeError::ParseTypeField - | outbound_hop::UpgradeError::ParseStatusField - | outbound_hop::UpgradeError::MissingStatusField - | outbound_hop::UpgradeError::MissingReservationField - | outbound_hop::UpgradeError::NoAddressesInReservation - | outbound_hop::UpgradeError::InvalidReservationExpiration - | outbound_hop::UpgradeError::InvalidReservationAddrs - | outbound_hop::UpgradeError::UnexpectedTypeConnect - | outbound_hop::UpgradeError::UnexpectedTypeReserve => { + error @ outbound_hop::UpgradeError::Fatal(_) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::B(error)), )); } - outbound_hop::UpgradeError::UnexpectedStatus(status) => { - match status { - Status::Ok => { - unreachable!( - "Status success was explicitly expected earlier." - ) - } - // With either status below there is either no reason to stay - // connected or it is a protocol violation. - // Thus terminate the connection. - Status::ReservationRefused - | Status::UnexpectedMessage - | Status::MalformedMessage => { - self.pending_error = - Some(ProtocolsHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - } - // While useless for reaching this particular destination, the - // connection to the relay might still proof helpful for other - // destinations. Thus do not terminate the connection. - Status::ResourceLimitExceeded - | Status::ConnectionFailed - | Status::NoReservation - | Status::PermissionDenied => {} - } + outbound_hop::UpgradeError::CircuitFailed(_) => {} + outbound_hop::UpgradeError::ReservationFailed(_) => { + unreachable!( + "Do not emitt `ReservationFailed` for outgoing circuit." + ) } } } diff --git a/protocols/relay/src/v2/protocol/outbound_hop.rs b/protocols/relay/src/v2/protocol/outbound_hop.rs index f55c45d64c3..4405281e2f1 100644 --- a/protocols/relay/src/v2/protocol/outbound_hop.rs +++ b/protocols/relay/src/v2/protocol/outbound_hop.rs @@ -97,35 +97,42 @@ impl upgrade::OutboundUpgrade for Upgrade { status, } = HopMessage::decode(Cursor::new(msg))?; - let r#type = hop_message::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + let r#type = + hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; match r#type { - hop_message::Type::Connect => return Err(UpgradeError::UnexpectedTypeConnect), - hop_message::Type::Reserve => return Err(UpgradeError::UnexpectedTypeReserve), + hop_message::Type::Connect => Err(FatalUpgradeError::UnexpectedTypeConnect)?, + hop_message::Type::Reserve => Err(FatalUpgradeError::UnexpectedTypeReserve)?, hop_message::Type::Status => {} } - let status = Status::from_i32(status.ok_or(UpgradeError::MissingStatusField)?) - .ok_or(UpgradeError::ParseStatusField)?; - match status { - Status::Ok => {} - s => return Err(UpgradeError::UnexpectedStatus(s)), - } + let status = Status::from_i32(status.ok_or(FatalUpgradeError::MissingStatusField)?) + .ok_or(FatalUpgradeError::ParseStatusField)?; let limit = limit.map(Into::into); let output = match self { Upgrade::Reserve => { - let reservation = reservation.ok_or(UpgradeError::MissingReservationField)?; + match status { + Status::Ok => {} + Status::ReservationRefused => Err(ReservationFailedReason::Refused)?, + Status::ResourceLimitExceeded => { + Err(ReservationFailedReason::ResourceLimitExceeded)? + } + s => Err(FatalUpgradeError::UnexpectedStatus(s))?, + } + + let reservation = + reservation.ok_or(FatalUpgradeError::MissingReservationField)?; let addrs = if reservation.addrs.is_empty() { - return Err(UpgradeError::NoAddressesInReservation); + Err(FatalUpgradeError::NoAddressesInReservation)? } else { reservation .addrs .into_iter() .map(TryFrom::try_from) .collect::, _>>() - .map_err(|_| UpgradeError::InvalidReservationAddrs)? + .map_err(|_| FatalUpgradeError::InvalidReservationAddrs)? }; let renewal_timeout = reservation @@ -142,7 +149,7 @@ impl upgrade::OutboundUpgrade for Upgrade { .and_then(|duration| duration.checked_sub(duration / 4)) .map(Duration::from_secs) .map(Delay::new) - .ok_or(UpgradeError::InvalidReservationExpiration) + .ok_or(FatalUpgradeError::InvalidReservationExpiration) }) .transpose()?; @@ -155,6 +162,17 @@ impl upgrade::OutboundUpgrade for Upgrade { } } Upgrade::Connect { .. } => { + match status { + Status::Ok => {} + Status::ResourceLimitExceeded => { + Err(CircuitFailedReason::ResourceLimitExceeded)? + } + Status::ConnectionFailed => Err(CircuitFailedReason::ConnectionFailed)?, + Status::NoReservation => Err(CircuitFailedReason::NoReservation)?, + Status::PermissionDenied => Err(CircuitFailedReason::PermissionDenied)?, + s => Err(FatalUpgradeError::UnexpectedStatus(s))?, + } + let FramedParts { io, read_buffer, @@ -182,6 +200,48 @@ impl upgrade::OutboundUpgrade for Upgrade { #[derive(Debug, Error)] pub enum UpgradeError { + #[error("Reservation failed")] + ReservationFailed(#[from] ReservationFailedReason), + #[error("Circuit failed")] + CircuitFailed(#[from] CircuitFailedReason), + #[error("Fatal")] + Fatal(#[from] FatalUpgradeError), +} + +impl From for UpgradeError { + fn from(error: std::io::Error) -> Self { + Self::Fatal(error.into()) + } +} + +impl From for UpgradeError { + fn from(error: prost::DecodeError) -> Self { + Self::Fatal(error.into()) + } +} + +#[derive(Debug, Error)] +pub enum CircuitFailedReason { + #[error("Remote reported resource limit exceeded.")] + ResourceLimitExceeded, + #[error("Relay failed to connect to destination.")] + ConnectionFailed, + #[error("Relay has no reservation for destination.")] + NoReservation, + #[error("Remote denied permission.")] + PermissionDenied, +} + +#[derive(Debug, Error)] +pub enum ReservationFailedReason { + #[error("Reservation refused.")] + Refused, + #[error("Remote reported resource limit exceeded.")] + ResourceLimitExceeded, +} + +#[derive(Debug, Error)] +pub enum FatalUpgradeError { #[error("Failed to decode message: {0}.")] Decode( #[from] From 36276108fcfe21c48087bdffbc333991d27fe251 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 21 Dec 2021 16:09:24 +0100 Subject: [PATCH 2/5] protocols/relay/src/v2: Refactor inbound_stop error handling --- protocols/relay/src/v2/client.rs | 43 ++++--- protocols/relay/src/v2/client/handler.rs | 106 ++++++++++++------ .../relay/src/v2/protocol/inbound_stop.rs | 26 ++++- 3 files changed, 125 insertions(+), 50 deletions(-) diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index f8bdfe892ef..18790d0101d 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -23,7 +23,7 @@ mod handler; pub mod transport; -use crate::v2::protocol::{self, inbound_stop}; +use crate::v2::protocol::{self, inbound_stop, outbound_hop}; use bytes::Bytes; use futures::channel::mpsc::Receiver; use futures::channel::oneshot; @@ -36,6 +36,7 @@ use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ProtocolsHandlerUpgrErr, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, IoSlice}; @@ -57,6 +58,7 @@ pub enum Event { relay_peer_id: PeerId, /// Indicates whether the request replaces an existing reservation. renewal: bool, + error: ProtocolsHandlerUpgrErr, }, OutboundCircuitEstablished { relay_peer_id: PeerId, @@ -64,16 +66,19 @@ pub enum Event { }, OutboundCircuitReqFailed { relay_peer_id: PeerId, + error: ProtocolsHandlerUpgrErr, }, /// An inbound circuit has been established. InboundCircuitEstablished { src_peer_id: PeerId, limit: Option, }, - /// An inbound circuit request has been denied. - InboundCircuitReqDenied { - src_peer_id: PeerId, + InboundCircuitReqFailed { + relay_peer_id: PeerId, + error: ProtocolsHandlerUpgrErr, }, + /// An inbound circuit request has been denied. + InboundCircuitReqDenied { src_peer_id: PeerId }, /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, @@ -169,15 +174,15 @@ impl NetworkBehaviour for Client { limit, }, )), - handler::Event::ReservationReqFailed { renewal } => { - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::ReservationReqFailed { - relay_peer_id: event_source, - renewal, - }, - )) - } + handler::Event::ReservationReqFailed { renewal, error } => self + .queued_actions + .push_back(NetworkBehaviourAction::GenerateEvent( + Event::ReservationReqFailed { + relay_peer_id: event_source, + renewal, + error, + }, + )), handler::Event::OutboundCircuitEstablished { limit } => { self.queued_actions .push_back(NetworkBehaviourAction::GenerateEvent( @@ -187,11 +192,12 @@ impl NetworkBehaviour for Client { }, )) } - handler::Event::OutboundCircuitReqFailed {} => { + handler::Event::OutboundCircuitReqFailed { error } => { self.queued_actions .push_back(NetworkBehaviourAction::GenerateEvent( Event::OutboundCircuitReqFailed { relay_peer_id: event_source, + error, }, )) } @@ -200,6 +206,15 @@ impl NetworkBehaviour for Client { .push_back(NetworkBehaviourAction::GenerateEvent( Event::InboundCircuitEstablished { src_peer_id, limit }, )), + handler::Event::InboundCircuitReqFailed { error } => { + self.queued_actions + .push_back(NetworkBehaviourAction::GenerateEvent( + Event::InboundCircuitReqFailed { + relay_peer_id: event_source, + error, + }, + )) + } handler::Event::InboundCircuitReqDenied { src_peer_id } => self .queued_actions .push_back(NetworkBehaviourAction::GenerateEvent( diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index aaf1f2c55ef..66e7d77a067 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -76,21 +76,24 @@ pub enum Event { ReservationReqFailed { /// Indicates whether the request replaces an existing reservation. renewal: bool, + error: ProtocolsHandlerUpgrErr, }, /// An outbound circuit has been established. - OutboundCircuitEstablished { - limit: Option, + OutboundCircuitEstablished { limit: Option }, + OutboundCircuitReqFailed { + error: ProtocolsHandlerUpgrErr, }, - OutboundCircuitReqFailed {}, /// An inbound circuit has been established. InboundCircuitEstablished { src_peer_id: PeerId, limit: Option, }, - /// An inbound circuit request has been denied. - InboundCircuitReqDenied { - src_peer_id: PeerId, + /// An inbound circuit request has failed. + InboundCircuitReqFailed { + error: ProtocolsHandlerUpgrErr, }, + /// An inbound circuit request has been denied. + InboundCircuitReqDenied { src_peer_id: PeerId }, /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, @@ -149,7 +152,7 @@ pub struct Handler { /// A pending fatal error that results in the connection being closed. pending_error: Option< ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >, >, /// Until when to keep the connection alive. @@ -190,7 +193,7 @@ impl ProtocolsHandler for Handler { type InEvent = In; type OutEvent = Event; type Error = ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >; type InboundProtocol = inbound_stop::Upgrade; type OutboundProtocol = outbound_hop::Upgrade; @@ -328,24 +331,37 @@ impl ProtocolsHandler for Handler { _: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>, ) { - match error { - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {} + let non_fatal_error = match error { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, - )) => {} + )) => ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( + upgrade::NegotiationError::Failed, + )), ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::ProtocolError(e), )) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), )); + return; } - ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( + inbound_stop::UpgradeError::Fatal(error), + )) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::A(error)), - )) + )); + return; } - } + }; + + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::InboundCircuitReqFailed { + error: non_fatal_error, + }, + )); } fn inject_dial_upgrade_error( @@ -355,13 +371,14 @@ impl ProtocolsHandler for Handler { ) { match open_info { OutboundOpenInfo::Reserve { mut to_listener } => { - let event = self.reservation.failed(); - - match error { - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {} + let non_fatal_error = match error { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, - )) => {} + )) => ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( + upgrade::NegotiationError::Failed, + )), ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::ProtocolError(e), )) => { @@ -370,15 +387,21 @@ impl ProtocolsHandler for Handler { upgrade::NegotiationError::ProtocolError(e), ), )); + return; } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { match error { - error @ outbound_hop::UpgradeError::Fatal(_) => { + outbound_hop::UpgradeError::Fatal(error) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::B(error)), )); + return; + } + outbound_hop::UpgradeError::ReservationFailed(error) => { + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( + error, + )) } - outbound_hop::UpgradeError::ReservationFailed(_) => {} outbound_hop::UpgradeError::CircuitFailed(_) => { unreachable!( "Do not emitt `CircuitFailed` for outgoing reservation." @@ -386,7 +409,7 @@ impl ProtocolsHandler for Handler { } } } - } + }; if self.pending_error.is_none() { self.send_error_futs.push( @@ -402,15 +425,23 @@ impl ProtocolsHandler for Handler { // Transport is notified through dropping `to_listener`. } - self.queued_events - .push_back(ProtocolsHandlerEvent::Custom(event)); + let renewal = self.reservation.failed(); + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::ReservationReqFailed { + renewal, + error: non_fatal_error, + }, + )); } OutboundOpenInfo::Connect { send_back } => { - match error { - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {} + let non_fatal_error = match error { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, - )) => {} + )) => ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( + upgrade::NegotiationError::Failed, + )), ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::ProtocolError(e), )) => { @@ -419,15 +450,21 @@ impl ProtocolsHandler for Handler { upgrade::NegotiationError::ProtocolError(e), ), )); + return; } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { match error { - error @ outbound_hop::UpgradeError::Fatal(_) => { + outbound_hop::UpgradeError::Fatal(error) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::B(error)), )); + return; + } + outbound_hop::UpgradeError::CircuitFailed(error) => { + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( + error, + )) } - outbound_hop::UpgradeError::CircuitFailed(_) => {} outbound_hop::UpgradeError::ReservationFailed(_) => { unreachable!( "Do not emitt `ReservationFailed` for outgoing circuit." @@ -440,7 +477,9 @@ impl ProtocolsHandler for Handler { let _ = send_back.send(Err(())); self.queued_events.push_back(ProtocolsHandlerEvent::Custom( - Event::OutboundCircuitReqFailed {}, + Event::OutboundCircuitReqFailed { + error: non_fatal_error, + }, )); } } @@ -578,7 +617,10 @@ impl Reservation { Event::ReservationReqAccepted { renewal, limit } } - fn failed(&mut self) -> Event { + /// Marks the current reservation as failed. + /// + /// Returns whether the reservation request was a renewal. + fn failed(&mut self) -> bool { let renewal = matches!( self, Reservation::Accepted { .. } | Reservation::Renewing { .. } @@ -586,7 +628,7 @@ impl Reservation { *self = Reservation::None; - Event::ReservationReqFailed { renewal } + renewal } fn forward_messages_to_transport_listener(&mut self, cx: &mut Context<'_>) { diff --git a/protocols/relay/src/v2/protocol/inbound_stop.rs b/protocols/relay/src/v2/protocol/inbound_stop.rs index 27a1fb9acde..6050edb07a2 100644 --- a/protocols/relay/src/v2/protocol/inbound_stop.rs +++ b/protocols/relay/src/v2/protocol/inbound_stop.rs @@ -66,19 +66,19 @@ impl upgrade::InboundUpgrade for Upgrade { } = StopMessage::decode(Cursor::new(msg))?; let r#type = - stop_message::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; match r#type { stop_message::Type::Connect => { let src_peer_id = - PeerId::from_bytes(&peer.ok_or(UpgradeError::MissingPeer)?.id) - .map_err(|_| UpgradeError::ParsePeerId)?; + PeerId::from_bytes(&peer.ok_or(FatalUpgradeError::MissingPeer)?.id) + .map_err(|_| FatalUpgradeError::ParsePeerId)?; Ok(Circuit { substream, src_peer_id, limit: limit.map(Into::into), }) } - stop_message::Type::Status => Err(UpgradeError::UnexpectedTypeStatus), + stop_message::Type::Status => Err(FatalUpgradeError::UnexpectedTypeStatus)?, } } .boxed() @@ -87,6 +87,24 @@ impl upgrade::InboundUpgrade for Upgrade { #[derive(Debug, Error)] pub enum UpgradeError { + #[error("Fatal")] + Fatal(#[from] FatalUpgradeError), +} + +impl From for UpgradeError { + fn from(error: prost::DecodeError) -> Self { + Self::Fatal(error.into()) + } +} + +impl From for UpgradeError { + fn from(error: std::io::Error) -> Self { + Self::Fatal(error.into()) + } +} + +#[derive(Debug, Error)] +pub enum FatalUpgradeError { #[error("Failed to decode message: {0}.")] Decode( #[from] From 9a4ad189fc1b01136f850d601c50619d4fe2fb08 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 21 Dec 2021 16:43:32 +0100 Subject: [PATCH 3/5] protocols/relay/src/v2: Refactor outbound_stop error handling --- .../relay/src/v2/protocol/outbound_stop.rs | 40 +++++++-- protocols/relay/src/v2/relay.rs | 22 ++++- protocols/relay/src/v2/relay/handler.rs | 82 ++++++++----------- 3 files changed, 90 insertions(+), 54 deletions(-) diff --git a/protocols/relay/src/v2/protocol/outbound_stop.rs b/protocols/relay/src/v2/protocol/outbound_stop.rs index b3de1e98383..f21616cf935 100644 --- a/protocols/relay/src/v2/protocol/outbound_stop.rs +++ b/protocols/relay/src/v2/protocol/outbound_stop.rs @@ -95,17 +95,19 @@ impl upgrade::OutboundUpgrade for Upgrade { } = StopMessage::decode(Cursor::new(msg))?; let r#type = - stop_message::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; match r#type { - stop_message::Type::Connect => return Err(UpgradeError::UnexpectedTypeConnect), + stop_message::Type::Connect => Err(FatalUpgradeError::UnexpectedTypeConnect)?, stop_message::Type::Status => {} } - let status = Status::from_i32(status.ok_or(UpgradeError::MissingStatusField)?) - .ok_or(UpgradeError::ParseStatusField)?; + let status = Status::from_i32(status.ok_or(FatalUpgradeError::MissingStatusField)?) + .ok_or(FatalUpgradeError::ParseStatusField)?; match status { Status::Ok => {} - s => return Err(UpgradeError::UnexpectedStatus(s)), + Status::ResourceLimitExceeded => Err(CircuitFailedReason::ResourceLimitExceeded)?, + Status::PermissionDenied => Err(CircuitFailedReason::PermissionDenied)?, + s => Err(FatalUpgradeError::UnexpectedStatus(s))?, } let FramedParts { @@ -127,6 +129,34 @@ impl upgrade::OutboundUpgrade for Upgrade { #[derive(Debug, Error)] pub enum UpgradeError { + #[error("Circuit failed")] + CircuitFailed(#[from] CircuitFailedReason), + #[error("Fatal")] + Fatal(#[from] FatalUpgradeError), +} + +impl From for UpgradeError { + fn from(error: std::io::Error) -> Self { + Self::Fatal(error.into()) + } +} + +impl From for UpgradeError { + fn from(error: prost::DecodeError) -> Self { + Self::Fatal(error.into()) + } +} + +#[derive(Debug, Error)] +pub enum CircuitFailedReason { + #[error("Remote reported resource limit exceeded.")] + ResourceLimitExceeded, + #[error("Remote reported permission denied.")] + PermissionDenied, +} + +#[derive(Debug, Error)] +pub enum FatalUpgradeError { #[error("Failed to decode message: {0}.")] Decode( #[from] diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index c9ee35dcd6c..e2240ea2a0e 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -29,13 +29,18 @@ use instant::Instant; use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::multiaddr::Protocol; use libp2p_core::PeerId; -use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters}; +use libp2p_swarm::{ + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + ProtocolsHandlerUpgrErr, +}; use std::collections::{HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; use std::ops::Add; use std::task::{Context, Poll}; use std::time::Duration; +use super::protocol::outbound_stop; + /// Configuration for the [`Relay`] [`NetworkBehaviour`]. /// /// # Panics @@ -132,6 +137,12 @@ pub enum Event { src_peer_id: PeerId, dst_peer_id: PeerId, }, + /// An outbound connect for an inbound cirucit request failed. + CircuitReqOutboundConnectFailed { + src_peer_id: PeerId, + dst_peer_id: PeerId, + error: ProtocolsHandlerUpgrErr, + }, /// Accepting an inbound circuit request failed. CircuitReqAcceptFailed { src_peer_id: PeerId, @@ -489,6 +500,7 @@ impl NetworkBehaviour for Relay { src_connection_id, inbound_circuit_req, status, + error, } => { self.queued_actions.push_back( NetworkBehaviourAction::NotifyHandler { @@ -502,6 +514,14 @@ impl NetworkBehaviour for Relay { } .into(), ); + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { + src_peer_id, + dst_peer_id: event_source, + error, + }) + .into(), + ); } handler::Event::CircuitReqAccepted { dst_peer_id, diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 00495e35532..e4f92bafc5a 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -200,6 +200,7 @@ pub enum Event { src_connection_id: ConnectionId, inbound_circuit_req: inbound_hop::CircuitReq, status: Status, + error: ProtocolsHandlerUpgrErr, }, /// An inbound circuit has closed. CircuitClosed { @@ -298,12 +299,14 @@ impl fmt::Debug for Event { src_connection_id, inbound_circuit_req: _, status, + error, } => f .debug_struct("Event::OutboundConnectNegotiationFailed") .field("circuit_id", circuit_id) .field("src_peer_id", src_peer_id) .field("src_connection_id", src_connection_id) .field("status", status) + .field("error", error) .finish(), Event::CircuitClosed { circuit_id, @@ -373,7 +376,7 @@ pub struct Handler { /// A pending fatal error that results in the connection being closed. pending_error: Option< ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >, >, @@ -410,7 +413,7 @@ impl ProtocolsHandler for Handler { type InEvent = In; type OutEvent = Event; type Error = ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >; type InboundProtocol = inbound_hop::Upgrade; type OutboundProtocol = outbound_stop::Upgrade; @@ -592,9 +595,12 @@ impl ProtocolsHandler for Handler { open_info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>, ) { - let status = match error { - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => { - Status::ConnectionFailed + let (non_fatal_error, status) = match error { + ProtocolsHandlerUpgrErr::Timeout => { + (ProtocolsHandlerUpgrErr::Timeout, Status::ConnectionFailed) + } + ProtocolsHandlerUpgrErr::Timer => { + (ProtocolsHandlerUpgrErr::Timer, Status::ConnectionFailed) } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, @@ -604,7 +610,7 @@ impl ProtocolsHandler for Handler { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Select(upgrade::NegotiationError::Failed), )); - Status::ConnectionFailed + return; } ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::ProtocolError(e), @@ -612,51 +618,30 @@ impl ProtocolsHandler for Handler { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), )); - Status::ConnectionFailed + return; } - ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { - match error { - outbound_stop::UpgradeError::Decode(_) - | outbound_stop::UpgradeError::Io(_) - | outbound_stop::UpgradeError::ParseTypeField - | outbound_stop::UpgradeError::MissingStatusField - | outbound_stop::UpgradeError::ParseStatusField - | outbound_stop::UpgradeError::UnexpectedTypeConnect => { - self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - Status::ConnectionFailed - } - outbound_stop::UpgradeError::UnexpectedStatus(status) => { - match status { - Status::Ok => { - unreachable!("Status success is explicitly exempt.") - } - // A destination node returning nonsensical status is a protocol - // violation. Thus terminate the connection. - Status::ReservationRefused - | Status::NoReservation - | Status::ConnectionFailed => { - self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )); - } - // With either status below there is no reason to stay connected. - // Thus terminate the connection. - Status::MalformedMessage | Status::UnexpectedMessage => { - self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( - upgrade::UpgradeError::Apply(EitherError::B(error)), - )) - } - // While useless for reaching this particular destination, the - // connection to the relay might still prove helpful for other - // destinations. Thus do not terminate the connection. - Status::ResourceLimitExceeded | Status::PermissionDenied => {} + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => match error { + outbound_stop::UpgradeError::Fatal(error) => { + self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( + upgrade::UpgradeError::Apply(EitherError::B(error)), + )); + return; + } + outbound_stop::UpgradeError::CircuitFailed(error) => { + let status = match error { + outbound_stop::CircuitFailedReason::ResourceLimitExceeded => { + Status::ResourceLimitExceeded } - status - } + outbound_stop::CircuitFailedReason::PermissionDenied => { + Status::PermissionDenied + } + }; + ( + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)), + status, + ) } - } + }, }; let OutboundOpenInfo { @@ -673,6 +658,7 @@ impl ProtocolsHandler for Handler { src_connection_id, inbound_circuit_req, status, + error: non_fatal_error, }, )); } From 2aa6e94c1f8aec4f15117b46dcd560e55953f246 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 21 Dec 2021 16:54:12 +0100 Subject: [PATCH 4/5] protocols/relay/src/v2: Refactor inbound_hop error handling --- .../relay/src/v2/protocol/inbound_hop.rs | 27 ++++++++++++-- protocols/relay/src/v2/relay.rs | 13 +++++++ protocols/relay/src/v2/relay/handler.rs | 37 +++++++++++++++---- 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/protocols/relay/src/v2/protocol/inbound_hop.rs b/protocols/relay/src/v2/protocol/inbound_hop.rs index 0da45c603ab..b9fc2ec569f 100644 --- a/protocols/relay/src/v2/protocol/inbound_hop.rs +++ b/protocols/relay/src/v2/protocol/inbound_hop.rs @@ -72,7 +72,8 @@ impl upgrade::InboundUpgrade for Upgrade { status: _, } = HopMessage::decode(Cursor::new(msg))?; - let r#type = hop_message::Type::from_i32(r#type).ok_or(UpgradeError::ParseTypeField)?; + let r#type = + hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; match r#type { hop_message::Type::Reserve => Ok(Req::Reserve(ReservationReq { substream, @@ -81,11 +82,11 @@ impl upgrade::InboundUpgrade for Upgrade { max_circuit_bytes: self.max_circuit_bytes, })), hop_message::Type::Connect => { - let dst = PeerId::from_bytes(&peer.ok_or(UpgradeError::MissingPeer)?.id) - .map_err(|_| UpgradeError::ParsePeerId)?; + let dst = PeerId::from_bytes(&peer.ok_or(FatalUpgradeError::MissingPeer)?.id) + .map_err(|_| FatalUpgradeError::ParsePeerId)?; Ok(Req::Connect(CircuitReq { dst, substream })) } - hop_message::Type::Status => Err(UpgradeError::UnexpectedTypeStatus), + hop_message::Type::Status => Err(FatalUpgradeError::UnexpectedTypeStatus)?, } } .boxed() @@ -94,6 +95,24 @@ impl upgrade::InboundUpgrade for Upgrade { #[derive(Debug, Error)] pub enum UpgradeError { + #[error("Fatal")] + Fatal(#[from] FatalUpgradeError), +} + +impl From for UpgradeError { + fn from(error: prost::DecodeError) -> Self { + Self::Fatal(error.into()) + } +} + +impl From for UpgradeError { + fn from(error: std::io::Error) -> Self { + Self::Fatal(error.into()) + } +} + +#[derive(Debug, Error)] +pub enum FatalUpgradeError { #[error("Failed to decode message: {0}.")] Decode( #[from] diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index e2240ea2a0e..b1d29937abf 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -121,6 +121,10 @@ pub enum Event { }, /// An inbound reservation has timed out. ReservationTimedOut { src_peer_id: PeerId }, + CircuitReqReceiveFailed { + src_peer_id: PeerId, + error: ProtocolsHandlerUpgrErr, + }, /// An inbound circuit request has been denied. CircuitReqDenied { src_peer_id: PeerId, @@ -435,6 +439,15 @@ impl NetworkBehaviour for Relay { }; self.queued_actions.push_back(action.into()); } + handler::Event::CircuitReqReceiveFailed { error } => { + self.queued_actions.push_back( + NetworkBehaviourAction::GenerateEvent(Event::CircuitReqReceiveFailed { + src_peer_id: event_source, + error, + }) + .into(), + ); + } handler::Event::CircuitReqDenied { circuit_id, dst_peer_id, diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index e4f92bafc5a..2c3fafa7e68 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -160,6 +160,10 @@ pub enum Event { inbound_circuit_req: inbound_hop::CircuitReq, endpoint: ConnectedPoint, }, + /// Receiving an inbound circuit request failed. + CircuitReqReceiveFailed { + error: ProtocolsHandlerUpgrErr, + }, /// An inbound circuit request has been denied. CircuitReqDenied { circuit_id: Option, @@ -243,6 +247,10 @@ impl fmt::Debug for Event { .debug_struct("Event::CircuitReqReceived") .field("endpoint", endpoint) .finish(), + Event::CircuitReqReceiveFailed { error } => f + .debug_struct("Event::CircuitReqReceiveFailed") + .field("error", error) + .finish(), Event::CircuitReqDenied { circuit_id, dst_peer_id, @@ -376,7 +384,7 @@ pub struct Handler { /// A pending fatal error that results in the connection being closed. pending_error: Option< ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >, >, @@ -413,7 +421,7 @@ impl ProtocolsHandler for Handler { type InEvent = In; type OutEvent = Event; type Error = ProtocolsHandlerUpgrErr< - EitherError, + EitherError, >; type InboundProtocol = inbound_hop::Upgrade; type OutboundProtocol = outbound_stop::Upgrade; @@ -570,24 +578,37 @@ impl ProtocolsHandler for Handler { _: Self::InboundOpenInfo, error: ProtocolsHandlerUpgrErr<::Error>, ) { - match error { - ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {} + let non_fatal_error = match error { + ProtocolsHandlerUpgrErr::Timeout => ProtocolsHandlerUpgrErr::Timeout, + ProtocolsHandlerUpgrErr::Timer => ProtocolsHandlerUpgrErr::Timer, ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::Failed, - )) => {} + )) => ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( + upgrade::NegotiationError::Failed, + )), ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Select( upgrade::NegotiationError::ProtocolError(e), )) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Select(upgrade::NegotiationError::ProtocolError(e)), )); + return; } - ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply(error)) => { + ProtocolsHandlerUpgrErr::Upgrade(upgrade::UpgradeError::Apply( + inbound_hop::UpgradeError::Fatal(error), + )) => { self.pending_error = Some(ProtocolsHandlerUpgrErr::Upgrade( upgrade::UpgradeError::Apply(EitherError::A(error)), - )) + )); + return; } - } + }; + + self.queued_events.push_back(ProtocolsHandlerEvent::Custom( + Event::CircuitReqReceiveFailed { + error: non_fatal_error, + }, + )); } fn inject_dial_upgrade_error( From 6916384a900be45ac8aa5ff81c9c7b513317646e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 21 Dec 2021 17:00:17 +0100 Subject: [PATCH 5/5] misc/metrics/src/relay: Update to error handling refactoring --- misc/metrics/src/relay.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index dece3a7f294..db2b682b9d6 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -54,8 +54,10 @@ enum EventType { ReservationReqDenied, ReservationReqDenyFailed, ReservationTimedOut, + CircuitReqReceiveFailed, CircuitReqDenied, CircuitReqDenyFailed, + CircuitReqOutboundConnectFailed, CircuitReqAccepted, CircuitReqAcceptFailed, CircuitClosed, @@ -79,7 +81,13 @@ impl From<&libp2p_relay::v2::relay::Event> for EventType { libp2p_relay::v2::relay::Event::ReservationTimedOut { .. } => { EventType::ReservationTimedOut } + libp2p_relay::v2::relay::Event::CircuitReqReceiveFailed { .. } => { + EventType::CircuitReqReceiveFailed + } libp2p_relay::v2::relay::Event::CircuitReqDenied { .. } => EventType::CircuitReqDenied, + libp2p_relay::v2::relay::Event::CircuitReqOutboundConnectFailed { .. } => { + EventType::CircuitReqOutboundConnectFailed + } libp2p_relay::v2::relay::Event::CircuitReqDenyFailed { .. } => { EventType::CircuitReqDenyFailed }