From 4472dc175c65452eadb18e2969641957cb27c207 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jun 2022 11:39:31 +0200 Subject: [PATCH] Add fake out requests system in peers.rs (#2369) * Add fake out requests system in peers.rs * CHANGELOG --- bin/wasm-node/CHANGELOG.md | 1 + src/libp2p/peers.rs | 154 ++++++++++++++++++++++++++++--------- 2 files changed, 117 insertions(+), 38 deletions(-) diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 4e957ec196..c572dae8e1 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -12,6 +12,7 @@ - Fix another panic in case of a carefully-crafted LEB128 length. ([#2337](https://github.com/paritytech/smoldot/pull/2337)) - Fix a panic when decoding a block header containing a large number of Aura authorities. ([#2338](https://github.com/paritytech/smoldot/pull/2338)) - Fix multiple panics when decoding network messages in case where these messages were truncated. ([#2340](https://github.com/paritytech/smoldot/pull/2340), [#2355](https://github.com/paritytech/smoldot/pull/2355)) +- Fix panic when the Kademlia random discovery process initiates a request on a connection that has just started shutting down. ([#2369](https://github.com/paritytech/smoldot/pull/2369)) - Fix subscriptions to `chainHead_unstable_follow` being immediately shut down if the gap between the finalized block and the best block is above a certain threshold. This could lead to loops where the JSON-RPC client tries to re-open a subscription, only for it to be immediately shut down again. ## 0.6.17 - 2022-05-31 diff --git a/src/libp2p/peers.rs b/src/libp2p/peers.rs index 2882d8492d..a9c9c52635 100644 --- a/src/libp2p/peers.rs +++ b/src/libp2p/peers.rs @@ -150,6 +150,13 @@ pub struct Peers { /// function. // TODO: explain why it can't grow unbounded pending_desired_out_notifs: VecDeque<(DesiredOutNotificationId, usize, usize)>, + + /// List of so-called "fake outgoing requests" for which an event about their failure should + /// be reported as soon as possible. + fake_out_requests_to_report: VecDeque, + + /// Identifier to assign to the next fake outgoing requests. Increased one by one. + next_fake_out_request_id: u64, } /// See [`Peers::peers_notifications_out`]. @@ -183,6 +190,14 @@ struct Connection { /// - If the handshake is in progress and the connection is inbound, contains `None`. peer_index: Option, + /// List of so-called fake outgoing requests that [`Peers::start_request`] has created + /// regarding this connection. Always empty unless the connection is currently shutting down. + /// The API user thinks that the connection is currently executing this request, but in + /// reality it is not. When the connection finishes its shutdown, these fake requests are + /// pushed to [`Peers::fake_out_requests_to_report`] in order to be reported as having failed. + fake_out_requests: Vec, + + /// Opaque data decided by the API user. user_data: TConn, } @@ -220,6 +235,8 @@ where pending_desired_out_notifs: VecDeque::new(), // TODO: capacity? desired_in_notifications: slab::Slab::new(), // TODO: capacity? desired_out_notifications: slab::Slab::new(), // TODO: capacity? + fake_out_requests_to_report: VecDeque::with_capacity(8), + next_fake_out_request_id: 0, } } @@ -281,6 +298,16 @@ where /// Returns the next event produced by the service. pub fn next_event(&mut self) -> Option> { loop { + if let Some(fake_out_request_to_report) = self.fake_out_requests_to_report.pop_front() { + if self.fake_out_requests_to_report.is_empty() { + self.fake_out_requests_to_report.shrink_to(8) + }; + return Some(Event::Response { + request_id: OutRequestId(OutRequestIdInner::Fake(fake_out_request_to_report)), + response: Err(RequestError::ConnectionShutdown), + }); + } + let event = match self.inner.next_event() { Some(ev) => ev, None => return None, @@ -393,6 +420,7 @@ where user_data: Connection { peer_index: Some(expected_peer_index), + fake_out_requests, user_data, }, } => { @@ -425,6 +453,8 @@ where self.try_clean_up_peer(expected_peer_index); + self.fake_out_requests_to_report.extend(fake_out_requests); + // Only produce a `Disconnected` event if connection wasn't handshaking. if was_established { return Some(Event::Disconnected { @@ -438,12 +468,15 @@ where collection::Event::Shutdown { user_data: Connection { - peer_index: None, .. + peer_index: None, + fake_out_requests, + .. }, .. } => { // Connection was incoming but its handshake wasn't finished yet. // The shutdown isn't reported. + debug_assert!(fake_out_requests.is_empty()); } collection::Event::InboundError { @@ -467,7 +500,7 @@ where response, } => { return Some(Event::Response { - request_id: OutRequestId(substream_id), + request_id: OutRequestId(OutRequestIdInner::Real(substream_id)), response, }); } @@ -723,6 +756,7 @@ where false, Connection { peer_index: None, + fake_out_requests: Vec::new(), user_data, }, ) @@ -750,6 +784,7 @@ where true, Connection { peer_index: Some(peer_index), + fake_out_requests: Vec::new(), user_data, }, ); @@ -856,7 +891,9 @@ where // If substream is closed, try to open it. if matches!(current_state.open, NotificationsOutOpenState::Closed) { - if let Some(connection_id) = self.connection_id_for_peer(peer_id) { + if let ConnectionIdForPeer::Connected(connection_id) = + self.connection_id_for_peer(peer_id) + { let id = DesiredOutNotificationId(self.desired_out_notifications.insert(Some(( peer_index, @@ -1168,6 +1205,7 @@ where /// Panics if `protocol_index` isn't a valid index in [`Config::request_response_protocols`]. /// Panics if there is no open connection with the target. /// + #[track_caller] pub fn start_request( &mut self, target: &PeerId, @@ -1175,13 +1213,30 @@ where request_data: Vec, timeout: TNow, ) -> OutRequestId { - let target_connection_id = self.connection_id_for_peer(target).unwrap(); - OutRequestId(self.inner.start_request( + let target_connection_id = match self.connection_id_for_peer(target) { + ConnectionIdForPeer::Connected(id) => id, + ConnectionIdForPeer::NotConnected => panic!(), // As documented. + ConnectionIdForPeer::ConnectedButShuttingDown(id) => { + // The situation where we are connected to the peer but only through a connection + // that is shutting down is a bit complicated. We can't start a request, as that + // would be invalid, so we generate a so-called "fake request" and when we + // connection later shuts down we report about the request failure. + // Note that we don't immediately report the failure, as that could cause an upper + // layer to immediately try the request again before it has processed the relevant + // `Disconnect` event. + let fake_id = self.next_fake_out_request_id; + self.next_fake_out_request_id += 1; + self.inner[id].fake_out_requests.push(fake_id); + return OutRequestId(OutRequestIdInner::Fake(fake_id)); + } + }; + + OutRequestId(OutRequestIdInner::Real(self.inner.start_request( target_connection_id, protocol_index, request_data, timeout, - )) + ))) } /// Responds to a previously-emitted [`Event::RequestIn`]. @@ -1197,21 +1252,13 @@ where /// Returns `true` if there exists an established connection with the given peer. pub fn has_established_connection(&self, peer_id: &PeerId) -> bool { - let peer_index = match self.peer_indices.get(peer_id) { - Some(idx) => *idx, - None => return false, - }; - - self.connections_by_peer - .range( - (peer_index, ConnectionId::min_value())..=(peer_index, ConnectionId::max_value()), - ) - .any(|(_, connection_id)| { - // Note that connections that are shutting down are still counted, - // as we report the disconnected event only at the end of the - // shutdown. - self.inner.connection_state(*connection_id).established - }) + // Connections that are shutting down are still counted, as we report the disconnected + // event only at the end of the shutdown. + match self.connection_id_for_peer(peer_id) { + ConnectionIdForPeer::Connected(_) + | ConnectionIdForPeer::ConnectedButShuttingDown(_) => true, + ConnectionIdForPeer::NotConnected => false, + } } /// Returns an iterator to the list of [`PeerId`]s that we have an established connection @@ -1248,25 +1295,41 @@ where } /// Picks the connection to use to send requests or notifications to the given peer. - fn connection_id_for_peer(&self, target: &PeerId) -> Option { - let peer_index = *self.peer_indices.get(target)?; + /// + /// This function tries to find a connection that is established and not shutting down. If it + /// can't find any, it instead tries to find a connection that is established but shutting + /// down. While it is technically not possible to send requests or notifications to connections + /// that are shutting down, the API user is still allowed to do so, and thus shouldn't lead to + /// a panic or error. + fn connection_id_for_peer(&self, target: &PeerId) -> ConnectionIdForPeer { + let peer_index = match self.peer_indices.get(target) { + Some(i) => *i, + None => return ConnectionIdForPeer::NotConnected, + }; - if let Some((_, connection_id)) = self - .connections_by_peer - .range( - (peer_index, collection::ConnectionId::min_value()) - ..=(peer_index, collection::ConnectionId::max_value()), - ) - .find(|(_, connection_id)| { - let state = self.inner.connection_state(*connection_id); - // TODO: this is wrong because shutting down connections do not immediately lead to a Disconnect event in the public API, meaning that the user can get a panic despite using the API correctly - state.established && !state.shutting_down - }) - { - return Some(*connection_id); + let mut found_shutting_down = None; + + for (_, connection_id) in self.connections_by_peer.range( + (peer_index, collection::ConnectionId::min_value()) + ..=(peer_index, collection::ConnectionId::max_value()), + ) { + let state = self.inner.connection_state(*connection_id); + if !state.established { + continue; + } + + if !state.shutting_down { + return ConnectionIdForPeer::Connected(*connection_id); + } else { + found_shutting_down = Some(*connection_id); + } } - None + if let Some(found_shutting_down) = found_shutting_down { + ConnectionIdForPeer::ConnectedButShuttingDown(found_shutting_down) + } else { + ConnectionIdForPeer::NotConnected + } } fn peer_index_or_insert(&mut self, peer_id: &PeerId) -> usize { @@ -1335,6 +1398,12 @@ impl ops::IndexMut for Peers { } } +enum ConnectionIdForPeer { + Connected(ConnectionId), + ConnectedButShuttingDown(ConnectionId), + NotConnected, +} + /// See [`Event::DesiredInNotification`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct DesiredInNotificationId(usize); @@ -1349,7 +1418,16 @@ pub struct InRequestId(collection::SubstreamId); /// See [`Peers::start_request`]. #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct OutRequestId(collection::SubstreamId); +pub struct OutRequestId(OutRequestIdInner); + +/// There are two kinds of outgoing requests: "real" ones, and "fake" ones. Fake requests exist for +/// API consistency purposes: when a connection is shutting down and the user starts an outgoing +/// request, we create a fake request then report its failure. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +enum OutRequestIdInner { + Real(collection::SubstreamId), + Fake(u64), +} /// Event happening over the network. See [`Peers::next_event`]. // TODO: in principle we could return `&PeerId` instead of `PeerId` most of the time, but this causes many borrow checker issues in the upper layer and I'm not motivated enough to deal with that