From f114ea86f0650732658c2feec6679216259057b8 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sun, 1 Sep 2024 09:33:46 -0500 Subject: [PATCH 01/20] learn own public addr from peer infos and keep it in network_state --- chain/network/src/peer/peer_actor.rs | 12 +++++++++++- chain/network/src/peer_manager/network_state/mod.rs | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 9c41692b95f..66fcdbecccc 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1182,8 +1182,18 @@ impl PeerActor { self.stop(ctx, ClosingReason::Ban(ReasonForBan::Abusive)); } - // Add received peers to the peer store let node_id = self.network_state.config.node_id(); + + // Record our own IP address as observed by the peer. + if self.network_state.my_public_addr.read().is_none() { + if let Some(my_peer_info) = direct_peers.iter().find(|peer_info| peer_info.id == node_id) { + if let Some(addr) = my_peer_info.addr { + let mut my_public_addr = self.network_state.my_public_addr.write(); + *my_public_addr = Some(addr); + } + } + } + // Add received indirect peers to the peer store self.network_state.peer_store.add_indirect_peers( &self.clock, peers.into_iter().filter(|peer_info| peer_info.id != node_id), diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 341d8435a43..815a073fb55 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -38,7 +38,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement; use near_primitives::types::AccountId; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -117,6 +117,8 @@ pub(crate) struct NetworkState { pub tier1: connection::Pool, /// Semaphore limiting inflight inbound handshakes. pub inbound_handshake_permits: Arc, + /// The public IP of this node; available after connecting to any one peer. + pub my_public_addr: Arc>>, /// Peer store that provides read/write access to peers. pub peer_store: peer_store::PeerStore, /// Information about state snapshots hosted by network peers. 
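The hunk above records the node's own public address the first time a connected peer reports it among the exchanged peer infos, and keeps it in NetworkState behind an RwLock so later request-building code can read it cheaply. Below is a minimal, self-contained sketch of that write-once pattern; the PeerInfoStub type and the integer ids are hypothetical simplifications, and std::sync::RwLock stands in for the parking_lot::RwLock used in the patch.

```rust
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the peer info entries received from a peer.
struct PeerInfoStub {
    id: u64,
    addr: Option<SocketAddr>,
}

/// Record the first public address a peer reports for this node.
/// Check under a read lock first, then take the write lock only if the
/// value is still unset, mirroring the logic added to PeerActor.
fn record_observed_addr(
    my_public_addr: &Arc<RwLock<Option<SocketAddr>>>,
    my_id: u64,
    direct_peers: &[PeerInfoStub],
) {
    if my_public_addr.read().unwrap().is_some() {
        return;
    }
    if let Some(me) = direct_peers.iter().find(|p| p.id == my_id) {
        if let Some(addr) = me.addr {
            *my_public_addr.write().unwrap() = Some(addr);
        }
    }
}

fn main() {
    let my_public_addr = Arc::new(RwLock::new(None));
    let peers = vec![PeerInfoStub { id: 7, addr: Some("203.0.113.5:24567".parse().unwrap()) }];
    record_observed_addr(&my_public_addr, 7, &peers);
    assert_eq!(
        *my_public_addr.read().unwrap(),
        Some("203.0.113.5:24567".parse().unwrap())
    );
}
```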
@@ -195,6 +197,7 @@ impl NetworkState { tier2: connection::Pool::new(config.node_id()), tier1: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), + my_public_addr: Arc::new(RwLock::new(None)), peer_store, snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())), connection_store: connection_store::ConnectionStore::new(store.clone()).unwrap(), From 61e101fee85171434787abab86b47291cdcc40d9 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sun, 8 Sep 2024 17:31:47 -0400 Subject: [PATCH 02/20] add Tier3Handshake PeerMessage variant --- chain/network/src/network_protocol/borsh_conv.rs | 3 +++ chain/network/src/network_protocol/mod.rs | 1 + chain/network/src/network_protocol/network.proto | 16 +++++++--------- .../network_protocol/proto_conv/peer_message.rs | 4 ++++ chain/network/src/rate_limits/messages_limits.rs | 1 + 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 4ef69a0dc58..cb899f8f896 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -216,6 +216,9 @@ impl From<&mem::PeerMessage> for net::PeerMessage { panic!("Tier1Handshake is not supported in Borsh encoding") } mem::PeerMessage::Tier2Handshake(h) => net::PeerMessage::Handshake((&h).into()), + mem::PeerMessage::Tier3Handshake(_) => { + panic!("Tier3Handshake is not supported in Borsh encoding") + } mem::PeerMessage::HandshakeFailure(pi, hfr) => { net::PeerMessage::HandshakeFailure(pi, (&hfr).into()) } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 6c584219c70..045b9892755 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -411,6 +411,7 @@ pub struct Disconnect { pub enum PeerMessage { Tier1Handshake(Handshake), Tier2Handshake(Handshake), + Tier3Handshake(Handshake), HandshakeFailure(PeerInfo, HandshakeFailureReason), /// When a failed nonce is used by some peer, this message is sent back as evidence. LastEdge(Edge), diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto index f60bd202312..cc95644d4d5 100644 --- a/chain/network/src/network_protocol/network.proto +++ b/chain/network/src/network_protocol/network.proto @@ -458,17 +458,15 @@ message PeerMessage { TraceContext trace_context = 26; oneof message_type { - // Handshakes for TIER1 and TIER2 networks are considered separate, - // so that a node binary which doesn't support TIER1 connection won't - // be even able to PARSE the handshake. This way we avoid accidental - // connections, such that one end thinks it is a TIER2 connection and the - // other thinks it is a TIER1 connection. As currently both TIER1 and TIER2 - // connections are handled by the same PeerActor, both fields use the same - // underlying message type. If we ever decide to separate the handshake - // implementations, we can copy the Handshake message type defition and - // make it evolve differently for TIER1 and TIER2. + // Handshakes for different network tiers explicitly use different PeerMessage variants. + // This way we avoid accidental connections, such that one end thinks it is a TIER2 connection + // and the other thinks it is a TIER1 connection. 
Currently the same PeerActor handles + // all types of connections, hence the contents are identical for all types of connections. + // If we ever decide to separate the handshake implementations, we can copy the Handshake message + // type definition and make it evolve differently for different tiers. Handshake tier1_handshake = 27; Handshake tier2_handshake = 4; + Handshake tier3_handshake = 33; HandshakeFailure handshake_failure = 5; LastEdge last_edge = 6; diff --git a/chain/network/src/network_protocol/proto_conv/peer_message.rs b/chain/network/src/network_protocol/proto_conv/peer_message.rs index 38b4250b15a..b73a66d7966 100644 --- a/chain/network/src/network_protocol/proto_conv/peer_message.rs +++ b/chain/network/src/network_protocol/proto_conv/peer_message.rs @@ -234,6 +234,7 @@ impl From<&PeerMessage> for proto::PeerMessage { message_type: Some(match x { PeerMessage::Tier1Handshake(h) => ProtoMT::Tier1Handshake(h.into()), PeerMessage::Tier2Handshake(h) => ProtoMT::Tier2Handshake(h.into()), + PeerMessage::Tier3Handshake(h) => ProtoMT::Tier3Handshake(h.into()), PeerMessage::HandshakeFailure(pi, hfr) => { ProtoMT::HandshakeFailure((pi, hfr).into()) } @@ -398,6 +399,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage { ProtoMT::Tier2Handshake(h) => { PeerMessage::Tier2Handshake(h.try_into().map_err(Self::Error::Handshake)?) } + ProtoMT::Tier3Handshake(h) => { + PeerMessage::Tier3Handshake(h.try_into().map_err(Self::Error::Handshake)?) + } ProtoMT::HandshakeFailure(hf) => { let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?; PeerMessage::HandshakeFailure(pi, hfr) diff --git a/chain/network/src/rate_limits/messages_limits.rs b/chain/network/src/rate_limits/messages_limits.rs index 08d2d8ea40f..614c85882c1 100644 --- a/chain/network/src/rate_limits/messages_limits.rs +++ b/chain/network/src/rate_limits/messages_limits.rs @@ -239,6 +239,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa PeerMessage::VersionedStateResponse(_) => Some((VersionedStateResponse, 1)), PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) | PeerMessage::HandshakeFailure(_, _) | PeerMessage::LastEdge(_) | PeerMessage::Disconnect(_) From 0530d27ddd3bb86e66ab7a0ffadd6701bcbe6c63 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Mon, 9 Sep 2024 10:57:30 -0400 Subject: [PATCH 03/20] add Tier3 connection pool --- chain/network/src/peer/peer_actor.rs | 21 ++++++++++++++---- .../src/peer_manager/network_state/mod.rs | 22 +++++++++++++++++++ .../src/peer_manager/network_state/routing.rs | 5 +++++ chain/network/src/tcp.rs | 5 +++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 66fcdbecccc..efd0777cbd8 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -284,6 +284,18 @@ impl PeerActor { .start_outbound(peer_id.clone()) .map_err(ClosingReason::OutboundNotAllowed)? } + tcp::Tier::T3 => { + // Loop connections are allowed only on T1; see comment above + if peer_id == &network_state.config.node_id() { + return Err(ClosingReason::OutboundNotAllowed( + connection::PoolError::UnexpectedLoopConnection, + )); + } + network_state + .tier3 + .start_outbound(peer_id.clone()) + .map_err(ClosingReason::OutboundNotAllowed)? 
+ } }, handshake_spec: HandshakeSpec { partial_edge_info: network_state.propose_edge(&clock, peer_id, None), @@ -293,10 +305,10 @@ impl PeerActor { }, }, }; - // Override force_encoding for outbound Tier1 connections, - // since Tier1Handshake is supported only with proto encoding. + // Override force_encoding for outbound Tier1 and Tier3 connections; + // Tier1Handshake and Tier3Handshake are supported only with proto encoding. let force_encoding = match &stream.type_ { - tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 => { + tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 => { Some(Encoding::Proto) } _ => force_encoding, @@ -480,6 +492,7 @@ impl PeerActor { let msg = match spec.tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; self.send_message_or_log(&msg); } @@ -1140,7 +1153,7 @@ impl PeerActor { self.stop(ctx, ClosingReason::DisconnectMessage); } - PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) => { + PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) | PeerMessage::Tier3Handshake(_) => { // Received handshake after already have seen handshake from this peer. tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 815a073fb55..b9571577679 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -115,6 +115,7 @@ pub(crate) struct NetworkState { /// Connected peers (inbound and outbound) with their full peer information. pub tier2: connection::Pool, pub tier1: connection::Pool, + pub tier3: connection::Pool, /// Semaphore limiting inflight inbound handshakes. pub inbound_handshake_permits: Arc, /// The public IP of this node; available after connecting to any one peer. 
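With the Tier3Handshake variant and the tier3 pool in place, two small rules from the hunks above are worth calling out: each connection tier sends its own handshake variant, and outbound TIER1/TIER3 connections force the proto encoding because those handshake variants have no Borsh form. A minimal sketch of both rules, using simplified stand-in types rather than the real tcp::Tier, Encoding, and PeerMessage:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Tier { T1, T2, T3 }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Encoding { Borsh, Proto }

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum HandshakeMsg { Tier1Handshake, Tier2Handshake, Tier3Handshake }

// Each tier sends its own handshake variant, so a binary that does not
// understand a tier cannot even parse that tier's handshake.
fn handshake_for(tier: Tier) -> HandshakeMsg {
    match tier {
        Tier::T1 => HandshakeMsg::Tier1Handshake,
        Tier::T2 => HandshakeMsg::Tier2Handshake,
        Tier::T3 => HandshakeMsg::Tier3Handshake,
    }
}

// Tier1Handshake and Tier3Handshake exist only in the proto encoding, so
// outbound connections on those tiers force Proto; Tier2 keeps whatever
// encoding was configured.
fn forced_encoding(tier: Tier) -> Option<Encoding> {
    match tier {
        Tier::T1 | Tier::T3 => Some(Encoding::Proto),
        Tier::T2 => None,
    }
}

fn main() {
    assert_eq!(handshake_for(Tier::T3), HandshakeMsg::Tier3Handshake);
    assert_eq!(forced_encoding(Tier::T3), Some(Encoding::Proto));
    assert_eq!(forced_encoding(Tier::T2), None);
    assert_ne!(forced_encoding(Tier::T2), Some(Encoding::Borsh));
}
```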
@@ -196,6 +197,7 @@ impl NetworkState { chain_info: Default::default(), tier2: connection::Pool::new(config.node_id()), tier1: connection::Pool::new(config.node_id()), + tier3: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), my_public_addr: Arc::new(RwLock::new(None)), peer_store, @@ -352,6 +354,15 @@ impl NetworkState { // Write to the peer store this.peer_store.peer_connected(&clock, peer_info); } + tcp::Tier::T3 => { + if conn.peer_type == PeerType::Inbound { + // TODO: check that we are expecting this Tier3 connection + } + if !edge.verify() { + return Err(RegisterPeerError::InvalidEdge); + } + this.tier3.insert_ready(conn).map_err(RegisterPeerError::PoolError)?; + } } Ok(()) }).await.unwrap() @@ -561,6 +572,17 @@ impl NetworkState { } } } + tcp::Tier::T3 => { + let peer_id = match &msg.target { + PeerIdOrHash::Hash(_) => { + // There is no route back cache for TIER3 as all connections are direct + debug_assert!(false); + return false; + } + PeerIdOrHash::PeerId(peer_id) => peer_id.clone(), + }; + return self.tier3.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); + } } } diff --git a/chain/network/src/peer_manager/network_state/routing.rs b/chain/network/src/peer_manager/network_state/routing.rs index 0fe045fcdad..ccbf28c7f3f 100644 --- a/chain/network/src/peer_manager/network_state/routing.rs +++ b/chain/network/src/peer_manager/network_state/routing.rs @@ -210,9 +210,14 @@ impl NetworkState { tracing::trace!(target: "network", route_back = ?msg.clone(), "Received peer message that requires response"); let from = &conn.peer_info.id; + match conn.tier { tcp::Tier::T1 => self.tier1_route_back.lock().insert(&clock, msg.hash(), from.clone()), tcp::Tier::T2 => self.tier2_route_back.lock().insert(&clock, msg.hash(), from.clone()), + tcp::Tier::T3 => { + // TIER3 connections are direct by design; no routing is performed + debug_assert!(false) + } } } diff --git a/chain/network/src/tcp.rs b/chain/network/src/tcp.rs index 06ba01a5033..5e5e78a7a42 100644 --- a/chain/network/src/tcp.rs +++ b/chain/network/src/tcp.rs @@ -26,6 +26,11 @@ pub enum Tier { /// consensus messages. Also, Tier1 peer discovery actually happens on Tier2 network, i.e. /// Tier2 network is necessary to bootstrap Tier1 connections. T2, + /// Tier3 connections are created ad hoc to directly transfer large messages, e.g. state parts. + /// Requests for state parts are routed over Tier2. A node receiving such a request initiates a + /// direct Tier3 connections to send the response. By sending large responses over dedicated + /// connections we avoid delaying other messages and we minimize network bandwidth usage. + T3, } #[derive(Clone, Debug)] From d7535b918beeb43968e699a20d6ce3d683942b1f Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Mon, 9 Sep 2024 16:11:26 -0400 Subject: [PATCH 04/20] fix test connection_pool::invalid_edge --- chain/network/src/peer/peer_actor.rs | 3 +++ chain/network/src/peer_manager/tests/connection_pool.rs | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index efd0777cbd8..9991ac34558 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -952,6 +952,9 @@ impl PeerActor { (PeerStatus::Connecting { .. }, PeerMessage::Tier2Handshake(msg)) => { self.process_handshake(ctx, tcp::Tier::T2, msg) } + (PeerStatus::Connecting { .. 
}, PeerMessage::Tier3Handshake(msg)) => { + self.process_handshake(ctx, tcp::Tier::T3, msg) + } (_, msg) => { tracing::warn!(target:"network","unexpected message during handshake: {}",msg) } diff --git a/chain/network/src/peer_manager/tests/connection_pool.rs b/chain/network/src/peer_manager/tests/connection_pool.rs index 79e00807f20..15da55a57aa 100644 --- a/chain/network/src/peer_manager/tests/connection_pool.rs +++ b/chain/network/src/peer_manager/tests/connection_pool.rs @@ -273,7 +273,7 @@ async fn invalid_edge() { ]; for (name, edge) in &testcases { - for tier in [tcp::Tier::T1, tcp::Tier::T2] { + for tier in [tcp::Tier::T1, tcp::Tier::T2, tcp::Tier::T3] { tracing::info!(target:"test","{name} {tier:?}"); let stream = tcp::Stream::connect(&pm.peer_info(), tier, &SocketOptions::default()) .await @@ -303,6 +303,7 @@ async fn invalid_edge() { let handshake = match tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; stream.write(&handshake).await; let reason = events From a7f8b833228831426d9a687f9baa986ded750f07 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Mon, 9 Sep 2024 18:14:06 -0400 Subject: [PATCH 05/20] add new routed message StatePartRequest --- chain/network/src/network_protocol/mod.rs | 2 ++ .../src/network_protocol/state_sync.rs | 23 +++++++++++++++++++ .../src/rate_limits/messages_limits.rs | 1 + 3 files changed, 26 insertions(+) diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 045b9892755..a2a2c28b0e2 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -553,6 +553,7 @@ pub enum RoutedMessageBody { VersionedChunkEndorsement(ChunkEndorsement), EpochSyncRequest, EpochSyncResponse(CompressedEpochSyncProof), + StatePartRequest(StatePartRequest), } impl RoutedMessageBody { @@ -646,6 +647,7 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::EpochSyncResponse(_) => { write!(f, "EpochSyncResponse") } + RoutedMessageBody::StatePartRequest(_) => write!(f, "StatePartRequest"), } } } diff --git a/chain/network/src/network_protocol/state_sync.rs b/chain/network/src/network_protocol/state_sync.rs index d30170b83d5..c500893f3f4 100644 --- a/chain/network/src/network_protocol/state_sync.rs +++ b/chain/network/src/network_protocol/state_sync.rs @@ -107,3 +107,26 @@ pub enum SnapshotHostInfoVerificationError { )] TooManyShards(usize), } + +/// Message used to request a state part. 
+/// +#[derive( + Clone, + Debug, + Eq, + PartialEq, + Hash, + borsh::BorshSerialize, + borsh::BorshDeserialize, + ProtocolSchema, +)] +pub struct StatePartRequest { + /// Requested shard id + pub shard_id: ShardId, + /// Hash of the requested snapshot's state root + pub sync_hash: CryptoHash, + /// Requested part id + pub part_id: u64, + /// Public address of the node making the request + pub addr: std::net::SocketAddr, +} diff --git a/chain/network/src/rate_limits/messages_limits.rs b/chain/network/src/rate_limits/messages_limits.rs index 614c85882c1..54638a829c3 100644 --- a/chain/network/src/rate_limits/messages_limits.rs +++ b/chain/network/src/rate_limits/messages_limits.rs @@ -220,6 +220,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa RoutedMessageBody::VersionedChunkEndorsement(_) => Some((ChunkEndorsement, 1)), RoutedMessageBody::EpochSyncRequest => None, RoutedMessageBody::EpochSyncResponse(_) => None, + RoutedMessageBody::StatePartRequest(_) => None, // TODO RoutedMessageBody::Ping(_) | RoutedMessageBody::Pong(_) | RoutedMessageBody::_UnusedChunkStateWitness From aad734884d62ae9ddf72ef4a711ad2cb78ca72d1 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Tue, 10 Sep 2024 19:40:49 -0400 Subject: [PATCH 06/20] enable snapshot generation by default --- core/store/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/store/src/config.rs b/core/store/src/config.rs index 38e551d58cb..f7c155ab32d 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -112,10 +112,10 @@ pub struct StateSnapshotConfig { pub enum StateSnapshotType { /// Consider this as the default "disabled" option. We need to have snapshotting enabled for resharding /// State snapshots involve filesystem operations and costly IO operations. - #[default] ForReshardingOnly, /// This is the "enabled" option where we create a snapshot at the beginning of every epoch. /// Needed if a node wants to be able to respond to state part requests. 
+ #[default] EveryEpoch, } From 3c7eb0907c6eb481370c1d3967d9d12860ab46ab Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Tue, 10 Sep 2024 19:41:29 -0400 Subject: [PATCH 07/20] send state part request over new routed msg --- .../src/peer_manager/peer_manager_actor.rs | 38 +++++++++++++++---- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 6e04e871203..0dff0c7ad12 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -4,7 +4,7 @@ use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol; use crate::network_protocol::SyncSnapshotHosts; use crate::network_protocol::{ - Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, + Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, StatePartRequest }; use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; @@ -789,13 +789,35 @@ impl PeerManagerActor { } } NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id } => { - if self.state.tier2.send_message( - peer_id, - Arc::new(PeerMessage::StateRequestPart(shard_id, sync_hash, part_id)), - ) { - NetworkResponses::NoResponse - } else { - NetworkResponses::RouteNotFound + // TODO: send over Tier1 when applicable + match *self.state.my_public_addr.read() { + Some(addr) => { + if self.state.send_message_to_peer( + &self.clock, + tcp::Tier::T2, + self.state.sign_message( + &self.clock, + RawRoutedMessage { + target: PeerIdOrHash::PeerId(peer_id), + body: RoutedMessageBody::StatePartRequest(StatePartRequest{ + shard_id, + sync_hash, + part_id, + addr, + }) + } + ), + ) { + NetworkResponses::NoResponse + } else { + NetworkResponses::RouteNotFound + } + } + None => { + // The node needs to know its own public address + // to solicit a response via Tier3 + NetworkResponses::RouteNotFound + } } } NetworkRequests::SnapshotHostInfo { sync_hash, epoch_height, mut shards } => { From 3bc6c0a6232438c97af128eb24a9fdc056c1eee8 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 11 Sep 2024 14:29:56 -0400 Subject: [PATCH 08/20] add tier3_requests queue --- .../src/peer_manager/network_state/mod.rs | 21 ++++++++++++++++++- chain/network/src/types.rs | 16 ++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index b9571577679..53768ec8a82 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -28,7 +28,7 @@ use crate::state_witness::{ use crate::stats::metrics; use crate::store; use crate::tcp; -use crate::types::{ChainInfo, PeerType, ReasonForBan}; +use crate::types::{ChainInfo, PeerType, ReasonForBan, Tier3Request, Tier3RequestBody}; use anyhow::Context; use arc_swap::ArcSwap; use near_async::messaging::{CanSend, SendAsync, Sender}; @@ -39,6 +39,7 @@ use near_primitives::network::PeerId; use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement; use near_primitives::types::AccountId; use parking_lot::{Mutex, RwLock}; +use std::collections::VecDeque; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -146,6 +147,9 @@ pub(crate) struct NetworkState { /// TODO(gprusak): consider removing it altogether. 
pub tier1_route_back: Mutex, + /// Queue of received requests to which a response should be made over TIER3. + pub tier3_requests: Mutex>, + /// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx` /// messages sincce last block. pub txns_since_last_block: AtomicUsize, @@ -208,6 +212,7 @@ impl NetworkState { account_announcements: Arc::new(AnnounceAccountCache::new(store)), tier2_route_back: Mutex::new(RouteBackCache::default()), tier1_route_back: Mutex::new(RouteBackCache::default()), + tier3_requests: Mutex::new(VecDeque::::new()), recent_routed_messages: Mutex::new(lru::LruCache::new( NonZeroUsize::new(RECENT_ROUTED_MESSAGES_CACHE_SIZE).unwrap(), )), @@ -768,6 +773,20 @@ impl NetworkState { self.client.send(EpochSyncResponseMessage { from_peer: peer_id, proof }); None } + RoutedMessageBody::StatePartRequest(request) => { + // TODO: cap the size of this queue, + // perhaps preferentially allowing requests made by validators + self.tier3_requests.lock().push_back(Tier3Request { + peer_id, + addr: request.addr, + body: Tier3RequestBody::StatePartRequest ( + request.shard_id, + request.sync_hash, + request.part_id, + ), + }); + None + } body => { tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); None diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 3ddcbbcc067..e0b7c638f6a 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -498,3 +498,19 @@ pub struct AccountIdOrPeerTrackingShard { /// Only send messages to peers whose latest chain height is no less `min_height` pub min_height: BlockHeight, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// An inbound request to which a response should be sent over Tier3 +pub struct Tier3Request { + /// Target peer to send the response to + pub peer_id: PeerId, + /// Public address provided by the target peer + pub addr: std::net::SocketAddr, + /// Contents of the request + pub body: Tier3RequestBody, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Tier3RequestBody { + StatePartRequest(ShardId, CryptoHash, u64), +} From bfd99f651571f16a4209c2f624d5ced405f0435b Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 11 Sep 2024 17:40:41 -0400 Subject: [PATCH 09/20] implement response via tier3 --- .../src/peer_manager/network_state/mod.rs | 7 +- .../src/peer_manager/peer_manager_actor.rs | 72 ++++++++++++++++++- chain/network/src/types.rs | 4 +- 3 files changed, 76 insertions(+), 7 deletions(-) diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 53768ec8a82..4f238f9b681 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -777,8 +777,11 @@ impl NetworkState { // TODO: cap the size of this queue, // perhaps preferentially allowing requests made by validators self.tier3_requests.lock().push_back(Tier3Request { - peer_id, - addr: request.addr, + peer_info: PeerInfo { + id: peer_id, + addr: Some(request.addr), + account_id: None, + }, body: Tier3RequestBody::StatePartRequest ( request.shard_id, request.sync_hash, diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 0dff0c7ad12..398f40d5849 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -1,4 +1,4 @@ -use crate::client::{ClientSenderForNetwork, SetNetworkInfo}; +use 
crate::client::{ClientSenderForNetwork, SetNetworkInfo, StateRequestPart}; use crate::config; use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol; @@ -18,7 +18,7 @@ use crate::tcp; use crate::types::{ ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType, - SetChainInfo, SnapshotHostInfo, + SetChainInfo, SnapshotHostInfo, Tier3RequestBody }; use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; @@ -77,6 +77,10 @@ const UNRELIABLE_PEER_HORIZON: u64 = 60; /// Due to implementation limits of `Graph` in `near-network`, we support up to 128 client. pub const MAX_TIER2_PEERS: usize = 128; +/// In Tier3 we serve requests over outbound connections and receive responses over inbound. +/// The number of concurrent outbound connections is limited to conserve the resources of the node. +pub const MAX_TIER3_OUTBOUND: usize = 5; + /// When picking a peer to connect to, we'll pick from the 'safer peers' /// (a.k.a. ones that we've been connected to in the past) with these odds. /// Otherwise, we'd pick any peer that we've heard about. @@ -86,6 +90,8 @@ const PREFER_PREVIOUSLY_CONNECTED_PEER: f64 = 0.6; pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1); /// How often to poll the NetworkState for closed connections we'd like to re-establish. pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1); +/// How often we should check for pending Tier3 requests +const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(15); /// Actor that manages peers connections. pub struct PeerManagerActor { @@ -338,6 +344,68 @@ impl PeerManagerActor { } } }); + // Periodically process pending Tier3 requests. 
+ arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + let arbiter = arbiter.clone(); + let mut interval = time::Interval::new(clock.now(), PROCESS_TIER3_REQUESTS_INTERVAL); + async move { + loop { + interval.tick(&clock).await; + + let tier3 = state.tier3.load(); + + let mut potential_outbound_connections = + tier3.ready.values().filter(|peer| peer.peer_type == PeerType::Outbound).count() + + tier3.outbound_handshakes.len(); + + while potential_outbound_connections < MAX_TIER3_OUTBOUND { + if let Some(request) = state.tier3_requests.lock().pop_front() { + // TODO: handle the case in which we already have + // a connection to the requesting node here + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + async move { + let result = async { + let stream = tcp::Stream::connect( + &request.peer_info, + tcp::Tier::T2, + &state.config.socket_options + ).await.context("tcp::Stream::connect()")?; + PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; + anyhow::Ok(()) + }.await; + + if let Err(ref err) = result { + tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info); + } else { + if let Some(response) = match request.body { + Tier3RequestBody::StatePartRequest(shard_id, sync_hash, part_id) => { + state + .client + .send_async(StateRequestPart { shard_id, sync_hash, part_id }) + .await + .ok() + .flatten() + .map(|response| PeerMessage::VersionedStateResponse(*response.0)) + } + } { + state.tier3.send_message(request.peer_info.id, Arc::new(response)); + } + } + } + }); + + potential_outbound_connections += 1; + } else { + break; + } + } + } + } + }); } }); Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index e0b7c638f6a..67c057e610d 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -503,9 +503,7 @@ pub struct AccountIdOrPeerTrackingShard { /// An inbound request to which a response should be sent over Tier3 pub struct Tier3Request { /// Target peer to send the response to - pub peer_id: PeerId, - /// Public address provided by the target peer - pub addr: std::net::SocketAddr, + pub peer_info: PeerInfo, /// Contents of the request pub body: Tier3RequestBody, } From 436cd6acc16a41185fa6aa13fee568b5dd5824d6 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Thu, 12 Sep 2024 12:51:47 -0400 Subject: [PATCH 10/20] fix allowed messages --- chain/network/src/peer_manager/connection/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index 7bbc50f23f7..62b866bc30d 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -36,8 +36,10 @@ impl tcp::Tier { match msg { PeerMessage::Tier1Handshake(_) => self == tcp::Tier::T1, PeerMessage::Tier2Handshake(_) => self == tcp::Tier::T2, + PeerMessage::Tier3Handshake(_) => self == tcp::Tier::T3, PeerMessage::HandshakeFailure(_, _) => true, PeerMessage::LastEdge(_) => true, + PeerMessage::VersionedStateResponse(_) => self == tcp::Tier::T2 || self == tcp::Tier::T3, PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body), _ => self == tcp::Tier::T2, } From 8d1106366440bde8301d65412524a43beb9e3dd9 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Thu, 12 Sep 2024 12:53:37 -0400 Subject: [PATCH 11/20] bugfix tier3 init --- 
chain/network/src/peer_manager/peer_manager_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 398f40d5849..6849828ef93 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -371,7 +371,7 @@ impl PeerManagerActor { let result = async { let stream = tcp::Stream::connect( &request.peer_info, - tcp::Tier::T2, + tcp::Tier::T3, &state.config.socket_options ).await.context("tcp::Stream::connect()")?; PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; From 02a8bb2d36d0645971071446f5de04eaa8fec27b Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Fri, 13 Sep 2024 14:00:02 -0400 Subject: [PATCH 12/20] successfully handle multiple requests from same peer --- .../src/peer_manager/peer_manager_actor.rs | 94 ++++++++----------- 1 file changed, 41 insertions(+), 53 deletions(-) diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 6849828ef93..cfbc7216edb 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -77,10 +77,6 @@ const UNRELIABLE_PEER_HORIZON: u64 = 60; /// Due to implementation limits of `Graph` in `near-network`, we support up to 128 client. pub const MAX_TIER2_PEERS: usize = 128; -/// In Tier3 we serve requests over outbound connections and receive responses over inbound. -/// The number of concurrent outbound connections is limited to conserve the resources of the node. -pub const MAX_TIER3_OUTBOUND: usize = 5; - /// When picking a peer to connect to, we'll pick from the 'safer peers' /// (a.k.a. ones that we've been connected to in the past) with these odds. /// Otherwise, we'd pick any peer that we've heard about. @@ -91,7 +87,7 @@ pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Durati /// How often to poll the NetworkState for closed connections we'd like to re-establish. pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1); /// How often we should check for pending Tier3 requests -const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(15); +const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(2); /// Actor that manages peers connections. 
pub struct PeerManagerActor { @@ -354,54 +350,47 @@ impl PeerManagerActor { loop { interval.tick(&clock).await; - let tier3 = state.tier3.load(); - - let mut potential_outbound_connections = - tier3.ready.values().filter(|peer| peer.peer_type == PeerType::Outbound).count() - + tier3.outbound_handshakes.len(); - - while potential_outbound_connections < MAX_TIER3_OUTBOUND { - if let Some(request) = state.tier3_requests.lock().pop_front() { - // TODO: handle the case in which we already have - // a connection to the requesting node here - arbiter.spawn({ - let clock = clock.clone(); - let state = state.clone(); - async move { - let result = async { - let stream = tcp::Stream::connect( - &request.peer_info, - tcp::Tier::T3, - &state.config.socket_options - ).await.context("tcp::Stream::connect()")?; - PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; - anyhow::Ok(()) - }.await; - - if let Err(ref err) = result { - tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info); - } else { - if let Some(response) = match request.body { - Tier3RequestBody::StatePartRequest(shard_id, sync_hash, part_id) => { - state - .client - .send_async(StateRequestPart { shard_id, sync_hash, part_id }) - .await - .ok() - .flatten() - .map(|response| PeerMessage::VersionedStateResponse(*response.0)) - } - } { - state.tier3.send_message(request.peer_info.id, Arc::new(response)); + // TODO: consider where exactly to throttle these + if let Some(request) = state.tier3_requests.lock().pop_front() { + tracing::info!(target: "db", "seving request from {}", request.peer_info); + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + async move { + if let Some(response) = match request.body { + Tier3RequestBody::StatePartRequest(shard_id, sync_hash, part_id) => { + state + .client + .send_async(StateRequestPart { shard_id, sync_hash, part_id }) + .await + .ok() + .flatten() + .map(|response| PeerMessage::VersionedStateResponse(*response.0)) + } + } { + if !state.tier3.load().ready.contains_key(&request.peer_info.id) { + let result = async { + let stream = tcp::Stream::connect( + &request.peer_info, + tcp::Tier::T3, + &state.config.socket_options + ).await.context("tcp::Stream::connect()")?; + PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; + anyhow::Ok(()) + }.await; + + if let Err(ref err) = result { + tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info); } } - } - }); - potential_outbound_connections += 1; - } else { - break; - } + state.tier3.send_message(request.peer_info.id, Arc::new(response)); + } + else { + tracing::debug!(target: "network", "failed to produce response to request {:?}", request); + } + } + }); } } } @@ -857,7 +846,6 @@ impl PeerManagerActor { } } NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id } => { - // TODO: send over Tier1 when applicable match *self.state.my_public_addr.read() { Some(addr) => { if self.state.send_message_to_peer( @@ -882,8 +870,8 @@ impl PeerManagerActor { } } None => { - // The node needs to know its own public address - // to solicit a response via Tier3 + // The node needs to include its own public address in the request + // so that the reponse can be sent over Tier3 NetworkResponses::RouteNotFound } } From 016650d8d44edf71d48bf5785851c699f453d463 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sat, 14 
Sep 2024 16:47:55 -0400 Subject: [PATCH 13/20] use SnapshotHostsCache for peer selection --- chain/client/src/sync/state.rs | 57 ++++++++++++------- .../src/peer_manager/peer_manager_actor.rs | 36 +++++++----- chain/network/src/snapshot_hosts/mod.rs | 37 ++++++------ chain/network/src/types.rs | 7 ++- 4 files changed, 81 insertions(+), 56 deletions(-) diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 0344ab89d5e..198a59d2901 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -675,24 +675,38 @@ impl StateSync { for ((part_id, download), target) in parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler) { - sent_request_part( - self.clock.clone(), - target.clone(), - part_id, - shard_id, - sync_hash, - last_part_id_requested, - requested_target, - self.timeout, - ); - request_part_from_peers( - part_id, - target, - download, - shard_id, - sync_hash, - &self.network_adapter, - ); + // In flat storage snapshots are indexed according to the hash of the second to + // last block in the epoch. When performing state sync the sync hash is defined + // as the hash of the first block after the epoch. The request sent to the + // network adapater needs to include the sync_prev_prev_hash so that a peer + // hosting the correct snapshot can be selected. + // TODO(saketh): consider whether we can unify things in a cleaner way + if let Ok(header) = chain.get_block_header(&sync_hash) { + if let Ok(prev_header) = chain.get_block_header(&header.prev_hash()) { + let sync_prev_prev_hash = prev_header.prev_hash(); + + sent_request_part( + self.clock.clone(), + target.clone(), + part_id, + shard_id, + sync_hash, + last_part_id_requested, + requested_target, + self.timeout, + ); + + request_part_from_peers( + part_id, + target, + download, + shard_id, + sync_hash, + *sync_prev_prev_hash, + &self.network_adapter, + ); + } + } } } StateSyncInner::External { chain_id, semaphore, external } => { @@ -1304,10 +1318,12 @@ fn request_part_from_peers( download: &mut DownloadStatus, shard_id: ShardId, sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, network_adapter: &PeerManagerAdapter, ) { download.run_me.store(false, Ordering::SeqCst); download.state_requests_count += 1; + // TODO(saketh): clean this up now that targets are picked in peer manager download.last_target = Some(peer_id.clone()); let run_me = download.run_me.clone(); @@ -1315,12 +1331,9 @@ fn request_part_from_peers( "StateSync", network_adapter .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id }, + NetworkRequests::StateRequestPart { shard_id, sync_hash, sync_prev_prev_hash, part_id }, )) .then(move |result| { - // TODO: possible optimization - in the current code, even if one of the targets it not present in the network graph - // (so we keep getting RouteNotFound) - we'll still keep trying to assign parts to it. - // Fortunately only once every 60 seconds (timeout value). 
if let Ok(NetworkResponses::RouteNotFound) = result.map(|f| f.as_network_response()) { // Send a StateRequestPart on the next iteration diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index cfbc7216edb..972b1ff08d4 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -352,7 +352,6 @@ impl PeerManagerActor { // TODO: consider where exactly to throttle these if let Some(request) = state.tier3_requests.lock().pop_front() { - tracing::info!(target: "db", "seving request from {}", request.peer_info); arbiter.spawn({ let clock = clock.clone(); let state = state.clone(); @@ -845,10 +844,19 @@ impl PeerManagerActor { NetworkResponses::RouteNotFound } } - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id } => { - match *self.state.my_public_addr.read() { - Some(addr) => { - if self.state.send_message_to_peer( + NetworkRequests::StateRequestPart { shard_id, sync_hash, sync_prev_prev_hash, part_id } => { + let mut success = false; + + // The node needs to include its own public address in the request + // so that the reponse can be sent over Tier3 + if let Some(addr) = *self.state.my_public_addr.read() { + if let Some(peer_id) = self.state.snapshot_hosts.select_host( + &sync_prev_prev_hash, + shard_id, + part_id + ) { + tracing::info!(target: "db", "sending request for {} {} to {}", shard_id, part_id, peer_id); + success = self.state.send_message_to_peer( &self.clock, tcp::Tier::T2, self.state.sign_message( @@ -863,18 +871,18 @@ impl PeerManagerActor { }) } ), - ) { - NetworkResponses::NoResponse - } else { - NetworkResponses::RouteNotFound - } + ); } - None => { - // The node needs to include its own public address in the request - // so that the reponse can be sent over Tier3 - NetworkResponses::RouteNotFound + else { + tracing::debug!(target: "network", "no hosts available for {shard_id}, {sync_prev_prev_hash}"); } } + + if success { + NetworkResponses::NoResponse + } else { + NetworkResponses::RouteNotFound + } } NetworkRequests::SnapshotHostInfo { sync_hash, epoch_height, mut shards } => { if shards.len() > MAX_SHARDS_PER_SNAPSHOT_HOST_INFO { diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index 59362401090..293ba675ca3 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -10,7 +10,6 @@ use crate::network_protocol::SnapshotHostInfoVerificationError; use lru::LruCache; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; -use near_primitives::state_part::PartId; use near_primitives::types::ShardId; use parking_lot::Mutex; use rayon::iter::ParallelBridge; @@ -42,10 +41,10 @@ pub struct Config { pub part_selection_cache_batch_size: u32, } -pub(crate) fn priority_score(peer_id: &PeerId, part_id: &PartId) -> [u8; 32] { +pub(crate) fn priority_score(peer_id: &PeerId, part_id: u64) -> [u8; 32] { let mut h = Sha256::new(); h.update(peer_id.public_key().key_data()); - h.update(part_id.idx.to_le_bytes()); + h.update(part_id.to_le_bytes()); h.finalize().into() } @@ -145,23 +144,23 @@ struct PeerSelector { } impl PeerSelector { - fn next(&mut self, part_id: &PartId) -> Option { - self.selectors.entry(part_id.idx).or_default().next() + fn next(&mut self, part_id: u64) -> Option { + self.selectors.entry(part_id).or_default().next() } - fn len(&self, part_id: &PartId) -> usize { - match self.selectors.get(&part_id.idx) { + 
fn len(&self, part_id: u64) -> usize { + match self.selectors.get(&part_id) { Some(s) => s.len(), None => 0, } } - fn insert_peers>(&mut self, part_id: &PartId, peers: T) { - self.selectors.entry(part_id.idx).or_default().insert_peers(peers); + fn insert_peers>(&mut self, part_id: u64, peers: T) { + self.selectors.entry(part_id).or_default().insert_peers(peers); } - fn seen_peers(&self, part_id: &PartId) -> HashSet { - match self.selectors.get(&part_id.idx) { + fn seen_peers(&self, part_id: u64) -> HashSet { + match self.selectors.get(&part_id) { Some(s) => { let mut ret = HashSet::new(); for p in s.peers.iter() { @@ -174,15 +173,15 @@ impl PeerSelector { } // have we already returned every peer we know about? - fn tried_everybody(&self, part_id: &PartId) -> bool { - match self.selectors.get(&part_id.idx) { + fn tried_everybody(&self, part_id: u64) -> bool { + match self.selectors.get(&part_id) { Some(s) => s.tried_everybody(), None => true, } } - fn clear(&mut self, part_id: &PartId) { - self.selectors.remove(&part_id.idx); + fn clear(&mut self, part_id: u64) { + self.selectors.remove(&part_id); } } @@ -220,7 +219,7 @@ impl Inner { &mut self, sync_hash: &CryptoHash, shard_id: ShardId, - part_id: &PartId, + part_id: u64, max_entries_added: usize, ) { let selector = self.state_part_selectors.get(&shard_id).unwrap(); @@ -340,7 +339,7 @@ impl SnapshotHostsCache { &self, sync_hash: &CryptoHash, shard_id: ShardId, - part_id: &PartId, + part_id: u64, ) -> Option { let mut inner = self.0.lock(); let num_hosts = inner.hosts.len(); @@ -358,7 +357,7 @@ impl SnapshotHostsCache { // associated with it that we were going to use to respond to future calls to select_host() // TODO: get rid of the dead_code and hook this up to the decentralized state sync #[allow(dead_code)] - pub fn part_received(&self, _sync_hash: &CryptoHash, shard_id: ShardId, part_id: &PartId) { + pub fn part_received(&self, _sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64) { let mut inner = self.0.lock(); let selector = inner.state_part_selectors.entry(shard_id).or_default(); selector.clear(part_id); @@ -366,7 +365,7 @@ impl SnapshotHostsCache { // used for testing purposes only to check that we clear state after part_received() is called #[allow(dead_code)] - pub(crate) fn part_peer_state_len(&self, shard_id: ShardId, part_id: &PartId) -> usize { + pub(crate) fn part_peer_state_len(&self, shard_id: ShardId, part_id: u64) -> usize { let inner = self.0.lock(); match inner.state_part_selectors.get(&shard_id) { Some(s) => s.len(part_id), diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 67c057e610d..8f8a7c953f8 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -245,7 +245,12 @@ pub enum NetworkRequests { /// Request state header for given shard at given state root. StateRequestHeader { shard_id: ShardId, sync_hash: CryptoHash, peer_id: PeerId }, /// Request state part for given shard at given state root. - StateRequestPart { shard_id: ShardId, sync_hash: CryptoHash, part_id: u64, peer_id: PeerId }, + StateRequestPart { + shard_id: ShardId, + sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, + part_id: u64 + }, /// Ban given peer. 
BanPeer { peer_id: PeerId, ban_reason: ReasonForBan }, /// Announce account From c0fd295b9a6263b967fec5317bf127f771232f2a Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sat, 14 Sep 2024 20:28:45 -0400 Subject: [PATCH 14/20] include shard_id in prio and fix tests --- chain/network/src/snapshot_hosts/mod.rs | 5 +++-- chain/network/src/snapshot_hosts/tests.rs | 15 +++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index 293ba675ca3..14c6e46cd98 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -41,9 +41,10 @@ pub struct Config { pub part_selection_cache_batch_size: u32, } -pub(crate) fn priority_score(peer_id: &PeerId, part_id: u64) -> [u8; 32] { +pub(crate) fn priority_score(peer_id: &PeerId, shard_id: ShardId, part_id: u64) -> [u8; 32] { let mut h = Sha256::new(); h.update(peer_id.public_key().key_data()); + h.update(shard_id.to_le_bytes()); h.update(part_id.to_le_bytes()); h.finalize().into() } @@ -233,7 +234,7 @@ impl Inner { { continue; } - let score = priority_score(peer_id, part_id); + let score = priority_score(peer_id, shard_id, part_id); if new_peers.len() < max_entries_added { new_peers.push(ReversePartPriority { peer_id: peer_id.clone(), score }); } else { diff --git a/chain/network/src/snapshot_hosts/tests.rs b/chain/network/src/snapshot_hosts/tests.rs index 8be5c16388a..aba2c6c6c3e 100644 --- a/chain/network/src/snapshot_hosts/tests.rs +++ b/chain/network/src/snapshot_hosts/tests.rs @@ -9,7 +9,6 @@ use near_crypto::SecretKey; use near_o11y::testonly::init_test_logger; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; -use near_primitives::state_part::PartId; use near_primitives::types::EpochHeight; use near_primitives::types::ShardId; use rand::Rng; @@ -298,7 +297,7 @@ async fn run_select_peer_test( actions: &[SelectPeerAction], peers: &[Arc], sync_hash: &CryptoHash, - part_id: &PartId, + part_id: u64, part_selection_cache_batch_size: u32, ) { let config = @@ -319,7 +318,7 @@ async fn run_select_peer_test( assert!(err.is_none()); } SelectPeerAction::CallSelect(wanted) => { - let peer = cache.select_host(sync_hash, 0, &part_id); + let peer = cache.select_host(sync_hash, 0, part_id); let wanted = match wanted { Some(idx) => Some(&peers[*idx].peer_id), None => None, @@ -327,8 +326,8 @@ async fn run_select_peer_test( assert!(peer.as_ref() == wanted, "got: {:?} want: {:?}", &peer, &wanted); } SelectPeerAction::PartReceived => { - cache.part_received(sync_hash, 0, &part_id); - assert_eq!(cache.part_peer_state_len(0, &part_id), 0); + cache.part_received(sync_hash, 0, part_id); + assert_eq!(cache.part_peer_state_len(0, part_id), 0); } } } @@ -339,13 +338,13 @@ async fn test_select_peer() { init_test_logger(); let mut rng = make_rng(2947294234); let sync_hash = CryptoHash(rng.gen()); - let part_id = PartId { idx: 0, total: 100 }; + let part_id = 0; let num_peers = SELECT_PEER_CASES.iter().map(|t| t.num_peers).max().unwrap(); let mut peers = Vec::with_capacity(num_peers); for _ in 0..num_peers { let key = data::make_secret_key(&mut rng); let peer_id = PeerId::new(key.public_key()); - let score = priority_score(&peer_id, &part_id); + let score = priority_score(&peer_id, 0u64, part_id); let info = Arc::new(SnapshotHostInfo::new(peer_id, sync_hash, 123, vec![0, 1, 2, 3], &key)); peers.push((info, score)); } @@ -361,7 +360,7 @@ async fn test_select_peer() { &t.actions, &peers[..t.num_peers], &sync_hash, - 
&part_id, + part_id, t.part_selection_cache_batch_size, ) .await; From b3499e6df4d239abc29e78418fa205f6d94faed2 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sun, 15 Sep 2024 16:32:06 -0400 Subject: [PATCH 15/20] simplify peer selection and fix some bugs --- .../src/peer_manager/peer_manager_actor.rs | 2 +- chain/network/src/snapshot_hosts/mod.rs | 275 ++++++++---------- chain/network/src/snapshot_hosts/tests.rs | 7 +- 3 files changed, 124 insertions(+), 160 deletions(-) diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 972b1ff08d4..26c939c9907 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -850,7 +850,7 @@ impl PeerManagerActor { // The node needs to include its own public address in the request // so that the reponse can be sent over Tier3 if let Some(addr) = *self.state.my_public_addr.read() { - if let Some(peer_id) = self.state.snapshot_hosts.select_host( + if let Some(peer_id) = self.state.snapshot_hosts.select_host_for_part( &sync_prev_prev_hash, shard_id, part_id diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index 14c6e46cd98..88f7268911c 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -41,6 +41,10 @@ pub struct Config { pub part_selection_cache_batch_size: u32, } +/// When multiple hosts offer the same part, this hash is compared +/// to determine the order in which to query them. All nodes +/// use the same hashing scheme, resulting in a rough consensus on +/// which hosts serve requests for which parts. pub(crate) fn priority_score(peer_id: &PeerId, shard_id: ShardId, part_id: u64) -> [u8; 32] { let mut h = Sha256::new(); h.update(peer_id.public_key().key_data()); @@ -50,84 +54,62 @@ pub(crate) fn priority_score(peer_id: &PeerId, shard_id: ShardId, part_id: u64) } #[derive(Clone, Debug, PartialEq, Eq)] -struct PartPriority { +struct StatePartHost { + /// A peer host for some desired state part peer_id: PeerId, + /// Priority score computed over the peer_id, shard_id, and part_id score: [u8; 32], - // TODO: consider storing this on disk, so we can remember who hasn't - // been able to provide us with the parts across restarts - times_returned: usize, + /// The number of times we have already queried this host for this part + num_requests: usize, } -impl PartPriority { - fn inc(&mut self) { - self.times_returned += 1; - } -} - -impl PartialOrd for PartPriority { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PartPriority { +impl Ord for StatePartHost { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.times_returned - .cmp(&other.times_returned) - .reverse() - .then_with(|| self.score.cmp(&other.score).reverse()) + // std::collections:BinaryHeap used in PeerPartSelector is a max-heap. + // We prefer hosts with the least num_requests, after which we break + // ties according to the priority score and the peer_id. + self.num_requests.cmp(&other.num_requests).reverse() + .then_with(|| self.score.cmp(&other.score)) .then_with(|| self.peer_id.cmp(&other.peer_id)) } } -impl From for PartPriority { - fn from(ReversePartPriority { peer_id, score }: ReversePartPriority) -> Self { - Self { peer_id, score, times_returned: 0 } - } -} - -// used in insert_part_hosts() to iterate through the list of unseen hosts -// and keep the top N hosts as we go through. 
We use this struct there instead -// of PartPriority because we need the comparator to be the opposite of what -// it is for that struct -#[derive(Clone, Debug, PartialEq, Eq)] -struct ReversePartPriority { - peer_id: PeerId, - score: [u8; 32], -} - -impl PartialOrd for ReversePartPriority { +impl PartialOrd for StatePartHost { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for ReversePartPriority { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.score.cmp(&other.score).then_with(|| self.peer_id.cmp(&other.peer_id)) +impl StatePartHost { + fn increment_num_requests(&mut self) { + self.num_requests += 1; } } + #[derive(Default)] struct PartPeerSelector { - peers: BinaryHeap, + /// Ordered collection of available hosts for some desired state part + peers: BinaryHeap, } impl PartPeerSelector { fn next(&mut self) -> Option { match self.peers.pop() { - Some(mut priority) => { - priority.inc(); - let peer_id = priority.peer_id.clone(); - self.peers.push(priority); + Some(mut p) => { + p.increment_num_requests(); + let peer_id = p.peer_id.clone(); + self.peers.push(p); Some(peer_id) } None => None, } } - fn insert_peers>(&mut self, peers: T) { - self.peers.extend(peers) + fn insert_peers>(&mut self, peers: T) { + for p in peers { + self.peers.push(p); + } } fn len(&self) -> usize { @@ -135,61 +117,25 @@ impl PartPeerSelector { } fn tried_everybody(&self) -> bool { - self.peers.iter().all(|priority| priority.times_returned > 0) - } -} - -#[derive(Default)] -struct PeerSelector { - selectors: HashMap, -} - -impl PeerSelector { - fn next(&mut self, part_id: u64) -> Option { - self.selectors.entry(part_id).or_default().next() - } - - fn len(&self, part_id: u64) -> usize { - match self.selectors.get(&part_id) { - Some(s) => s.len(), - None => 0, - } - } - - fn insert_peers>(&mut self, part_id: u64, peers: T) { - self.selectors.entry(part_id).or_default().insert_peers(peers); + self.peers.iter().all(|priority| priority.num_requests > 0) } - fn seen_peers(&self, part_id: u64) -> HashSet { - match self.selectors.get(&part_id) { - Some(s) => { - let mut ret = HashSet::new(); - for p in s.peers.iter() { - ret.insert(p.peer_id.clone()); - } - ret - } - None => HashSet::new(), - } - } - - // have we already returned every peer we know about? 
- fn tried_everybody(&self, part_id: u64) -> bool { - match self.selectors.get(&part_id) { - Some(s) => s.tried_everybody(), - None => true, - } - } - - fn clear(&mut self, part_id: u64) { - self.selectors.remove(&part_id); + fn peer_set(&self) -> HashSet { + self.peers.iter().map(|p| p.peer_id.clone()).collect() } } + struct Inner { /// The latest known SnapshotHostInfo for each node in the network hosts: LruCache>, - state_part_selectors: HashMap, + /// The hash for the most recent active state sync + sync_hash: Option, + /// Number of available hosts for the active state sync, by shard + hosts_for_shard: HashMap>, + /// Local data structures used to distribute state part requests among known hosts + peer_selector: HashMap<(ShardId, u64), PartPeerSelector>, + /// Batch size for populating the peer_selector from the hosts part_selection_cache_batch_size: usize, } @@ -208,44 +154,80 @@ impl Inner { if !self.is_new(&d) { return None; } + + if self.sync_hash == Some(d.sync_hash) { + for shard_id in &d.shards { + self.hosts_for_shard + .entry(shard_id.clone()) + .or_insert(HashSet::default()) + .insert(d.peer_id.clone()); + } + } self.hosts.push(d.peer_id.clone(), d.clone()); + Some(d) } - // Try to insert up to max_entries_added more peers into the state part selector for this part ID - // this will look for the best priority `max_entries_added` peers that we haven't yet added to the set - // of peers to ask for this part, and will add them to the heap so that we can return one of those next - // time select_host() is called - fn insert_part_hosts( + /// Given a state part request produced by the local node, + /// selects a host to which the request should be routed. + pub fn select_host_for_part( &mut self, sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64, - max_entries_added: usize, - ) { - let selector = self.state_part_selectors.get(&shard_id).unwrap(); - let seen_peers = selector.seen_peers(part_id); - - let mut new_peers = BinaryHeap::new(); - for (peer_id, info) in self.hosts.iter() { - if seen_peers.contains(peer_id) - || info.sync_hash != *sync_hash - || !info.shards.contains(&shard_id) - { - continue; + ) -> Option { + // Reset internal state if the sync_hash has changed + if self.sync_hash != Some(*sync_hash) { + self.sync_hash = Some(*sync_hash); + self.hosts_for_shard.clear(); + self.peer_selector.clear(); + + for (peer_id, info) in self.hosts.iter() { + if info.sync_hash == *sync_hash { + for shard_id in &info.shards { + self.hosts_for_shard.entry(shard_id.clone()) + .or_insert(HashSet::default()) + .insert(peer_id.clone()); + } + } } - let score = priority_score(peer_id, shard_id, part_id); - if new_peers.len() < max_entries_added { - new_peers.push(ReversePartPriority { peer_id: peer_id.clone(), score }); - } else { - if score < new_peers.peek().unwrap().score { + } + + let selector = &mut self.peer_selector + .entry((shard_id, part_id)) + .or_insert(PartPeerSelector::default()); + + // Insert more hosts into the selector if needed + let available_hosts = self.hosts_for_shard.get(&shard_id)?; + if selector.tried_everybody() && selector.len() < available_hosts.len() { + let mut new_peers = BinaryHeap::new(); + let already_included = selector.peer_set(); + + for peer_id in available_hosts { + if already_included.contains(peer_id) { + continue; + } + + let score = priority_score(peer_id, shard_id, part_id); + tracing::info!(target: "db", "score for {} {} {} is {:?}", peer_id, shard_id, part_id, score); + + // Wrap entries with `Reverse` so that we pop the *least* 
desirable options + new_peers.push(std::cmp::Reverse(StatePartHost { + peer_id: peer_id.clone(), + score, + num_requests: 0, + })); + + if new_peers.len() > self.part_selection_cache_batch_size { new_peers.pop(); - new_peers.push(ReversePartPriority { peer_id: peer_id.clone(), score }); } } + + selector.insert_peers(new_peers.drain().map(|e| e.0)); } - let selector = self.state_part_selectors.get_mut(&shard_id).unwrap(); - selector.insert_peers(part_id, new_peers.into_iter().map(Into::into)); + + let res = selector.next(); + res } } @@ -253,13 +235,11 @@ pub(crate) struct SnapshotHostsCache(Mutex); impl SnapshotHostsCache { pub fn new(config: Config) -> Self { - debug_assert!(config.part_selection_cache_batch_size > 0); - let hosts = - LruCache::new(NonZeroUsize::new(config.snapshot_hosts_cache_size as usize).unwrap()); - let state_part_selectors = HashMap::new(); Self(Mutex::new(Inner { - hosts, - state_part_selectors, + hosts: LruCache::new(NonZeroUsize::new(config.snapshot_hosts_cache_size as usize).unwrap()), + sync_hash: None, + hosts_for_shard: HashMap::new(), + peer_selector: HashMap::new(), part_selection_cache_batch_size: config.part_selection_cache_batch_size as usize, })) } @@ -331,46 +311,29 @@ impl SnapshotHostsCache { self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect() } - // Selecs a peer to send the request for this part ID to. Chooses based on a priority score - // calculated as a hash of the Peer ID plus the part ID, and will return different hosts - // on subsequent calls, eventually iterating over all valid SnapshotHostInfos we know about - // TODO: get rid of the dead_code and hook this up to the decentralized state sync - #[allow(dead_code)] - pub fn select_host( + /// Given a state part request, selects a peer host to which the request should be sent. + pub fn select_host_for_part( &self, sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64, ) -> Option { - let mut inner = self.0.lock(); - let num_hosts = inner.hosts.len(); - let selector = inner.state_part_selectors.entry(shard_id).or_default(); - - if selector.tried_everybody(part_id) && selector.len(part_id) < num_hosts { - let max_entries_added = inner.part_selection_cache_batch_size; - inner.insert_part_hosts(sync_hash, shard_id, part_id, max_entries_added); - } - let selector = inner.state_part_selectors.get_mut(&shard_id).unwrap(); - selector.next(part_id) + tracing::info!(target: "db", "Processing request for host {} {} {}", sync_hash, shard_id, part_id); + self.0.lock().select_host_for_part(sync_hash, shard_id, part_id) } - // Lets us know that we have already successfully retrieved this part, and we can free any data - // associated with it that we were going to use to respond to future calls to select_host() - // TODO: get rid of the dead_code and hook this up to the decentralized state sync - #[allow(dead_code)] - pub fn part_received(&self, _sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64) { + /// Called when a state part is successfully received + /// so that the internal state can be cleaned up eagerly. 
+ pub fn part_received(&self, sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64) { let mut inner = self.0.lock(); - let selector = inner.state_part_selectors.entry(shard_id).or_default(); - selector.clear(part_id); + if inner.sync_hash == Some(*sync_hash) { + inner.peer_selector.remove(&(shard_id, part_id)); + } } - // used for testing purposes only to check that we clear state after part_received() is called - #[allow(dead_code)] - pub(crate) fn part_peer_state_len(&self, shard_id: ShardId, part_id: u64) -> usize { + #[cfg(test)] + pub(crate) fn has_selector(&self, shard_id: ShardId, part_id: u64) -> bool { let inner = self.0.lock(); - match inner.state_part_selectors.get(&shard_id) { - Some(s) => s.len(part_id), - None => 0, - } + inner.peer_selector.contains_key(&(shard_id, part_id)) } } diff --git a/chain/network/src/snapshot_hosts/tests.rs b/chain/network/src/snapshot_hosts/tests.rs index aba2c6c6c3e..20d48adccc4 100644 --- a/chain/network/src/snapshot_hosts/tests.rs +++ b/chain/network/src/snapshot_hosts/tests.rs @@ -318,7 +318,7 @@ async fn run_select_peer_test( assert!(err.is_none()); } SelectPeerAction::CallSelect(wanted) => { - let peer = cache.select_host(sync_hash, 0, part_id); + let peer = cache.select_host_for_part(sync_hash, 0, part_id); let wanted = match wanted { Some(idx) => Some(&peers[*idx].peer_id), None => None, @@ -326,8 +326,9 @@ async fn run_select_peer_test( assert!(peer.as_ref() == wanted, "got: {:?} want: {:?}", &peer, &wanted); } SelectPeerAction::PartReceived => { + assert!(cache.has_selector(0, part_id)); cache.part_received(sync_hash, 0, part_id); - assert_eq!(cache.part_peer_state_len(0, part_id), 0); + assert!(!cache.has_selector(0, part_id)); } } } @@ -348,7 +349,7 @@ async fn test_select_peer() { let info = Arc::new(SnapshotHostInfo::new(peer_id, sync_hash, 123, vec![0, 1, 2, 3], &key)); peers.push((info, score)); } - peers.sort_by(|(_linfo, lscore), (_rinfo, rscore)| lscore.partial_cmp(rscore).unwrap()); + peers.sort_by(|(_linfo, lscore), (_rinfo, rscore)| lscore.partial_cmp(rscore).unwrap().reverse()); let peers = peers.into_iter().map(|(info, _score)| info).collect::>(); tracing::debug!( "run_select_peer_test peers: {:?}", From fa6c0694ed6bb2a59cf249e2e0f78cacbb3f7c8c Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Sun, 15 Sep 2024 23:36:01 -0400 Subject: [PATCH 16/20] implement an idle timeout for tier3 connections --- .../src/peer_manager/network_state/mod.rs | 17 ++++++---- .../src/peer_manager/peer_manager_actor.rs | 32 +++++++++++++++++-- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 4f238f9b681..ae4499d082d 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -388,14 +388,19 @@ impl NetworkState { let clock = clock.clone(); let conn = conn.clone(); self.spawn(async move { - let peer_id = conn.peer_info.id.clone(); - if conn.tier == tcp::Tier::T1 { - // There is no banning or routing table for TIER1. - // Just remove the connection from the network_state. - this.tier1.remove(&conn); + match conn.tier { + tcp::Tier::T1 => this.tier1.remove(&conn), + tcp::Tier::T2 => this.tier2.remove(&conn), + tcp::Tier::T3 => this.tier3.remove(&conn), + } + + // The rest of this function has to do with banning or routing, + // which are applicable only for TIER2. 
+            if conn.tier != tcp::Tier::T2 {
                 return;
             }
-            this.tier2.remove(&conn);
+
+            let peer_id = conn.peer_info.id.clone();
 
             // If the last edge we have with this peer represent a connection addition, create the edge
             // update that represents the connection removal.
diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs
index 26c939c9907..c67476dbaed 100644
--- a/chain/network/src/peer_manager/peer_manager_actor.rs
+++ b/chain/network/src/peer_manager/peer_manager_actor.rs
@@ -86,8 +86,11 @@ const PREFER_PREVIOUSLY_CONNECTED_PEER: f64 = 0.6;
 pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1);
 /// How often to poll the NetworkState for closed connections we'd like to re-establish.
 pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1);
+
 /// How often we should check for pending Tier3 requests
 const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(2);
+/// The length of time that a Tier3 connection is allowed to idle before it is stopped
+const TIER3_IDLE_TIMEOUT: time::Duration = time::Duration::seconds(15);
 
 /// Actor that manages peers connections.
 pub struct PeerManagerActor {
@@ -386,7 +389,7 @@ impl PeerManagerActor {
                     state.tier3.send_message(request.peer_info.id, Arc::new(response));
                 } else {
-                    tracing::debug!(target: "network", "failed to produce response to request {:?}", request);
+                    tracing::debug!(target: "network", "client failed to produce response for {:?}", request);
                 }
             }
         });
@@ -609,6 +612,29 @@ impl PeerManagerActor {
         }
     }
 
+    /// TIER3 connections are established ad-hoc to transmit individual large messages.
+    /// Here we terminate these "single-purpose" connections after an idle timeout.
+    ///
+    /// When a TIER3 connection is established the intended message is already prepared in-memory,
+    /// so there is no concern of the timeout falling in between the handshake and the payload.
+    ///
+    /// A finer detail is that as long as a TIER3 connection remains open it can be reused to
+    /// transmit additional TIER3 payloads intended for the same peer. In such cases the message
+    /// can be lost if the timeout is reached while it is in flight.
+    fn stop_tier3_idle_connections(&self) {
+        let now = self.clock.now();
+        let _ = self.state
+            .tier3
+            .load()
+            .ready
+            .values()
+            .filter(|p| {
+                now - p.last_time_received_message.load()
+                    > TIER3_IDLE_TIMEOUT
+            })
+            .for_each(|p| p.stop(None));
+    }
+
     /// Periodically monitor list of peers and:
     ///  - request new peers from connected peers,
     ///  - bootstrap outbound connections from known peers,
@@ -677,6 +703,9 @@ impl PeerManagerActor {
         // If there are too many active connections try to remove some connections
         self.maybe_stop_active_connection();
 
+        // Close Tier3 connections which have been idle for too long
+        self.stop_tier3_idle_connections();
+
         // Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them.
let unreliable_peers = self.unreliable_peers(); metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64); @@ -855,7 +884,6 @@ impl PeerManagerActor { shard_id, part_id ) { - tracing::info!(target: "db", "sending request for {} {} to {}", shard_id, part_id, peer_id); success = self.state.send_message_to_peer( &self.clock, tcp::Tier::T2, From 9a267cfcd4aa3aa629a9b2fec91a291b0960e8d7 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Tue, 17 Sep 2024 13:13:13 -0400 Subject: [PATCH 17/20] clear peer selector when state part is received --- chain/client/src/sync/state.rs | 17 ++++++++++----- .../src/test_utils/peer_manager_mock.rs | 8 +++++-- .../src/peer_manager/peer_manager_actor.rs | 21 ++++++++++++++++++- chain/network/src/snapshot_hosts/mod.rs | 16 +++++++------- chain/network/src/test_utils.rs | 6 +++++- chain/network/src/types.rs | 7 +++++++ 6 files changed, 57 insertions(+), 18 deletions(-) diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 198a59d2901..6913e4c9bec 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -27,7 +27,7 @@ use crate::sync::external::{ use borsh::BorshDeserialize; use futures::{future, FutureExt}; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; -use near_async::messaging::SendAsync; +use near_async::messaging::{SendAsync, CanSend}; use near_async::time::{Clock, Duration, Utc}; use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest}; use near_chain::near_chain_primitives; @@ -38,9 +38,8 @@ use near_client_primitives::types::{ format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus, }; use near_epoch_manager::EpochManagerAdapter; -use near_network::types::PeerManagerMessageRequest; use near_network::types::{ - HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, + HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, PeerManagerMessageRequest, StateSyncEvent, }; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; @@ -504,10 +503,18 @@ impl StateSync { ) { last_part_id_requested.remove(&(target.clone(), shard_id)); } + + self.network_adapter.send(StateSyncEvent::StatePartReceived ( + shard_id, part_id + )); } } StateSyncInner::External { .. } => { - // Do nothing. + // It is possible that we have previously made peer requests for this part + // before falling back to the External host. 
+ self.network_adapter.send(StateSyncEvent::StatePartReceived ( + shard_id, part_id + )); } } } @@ -1324,7 +1331,7 @@ fn request_part_from_peers( download.run_me.store(false, Ordering::SeqCst); download.state_requests_count += 1; // TODO(saketh): clean this up now that targets are picked in peer manager - download.last_target = Some(peer_id.clone()); + download.last_target = Some(peer_id); let run_me = download.run_me.clone(); near_performance_metrics::actix::spawn( diff --git a/chain/client/src/test_utils/peer_manager_mock.rs b/chain/client/src/test_utils/peer_manager_mock.rs index 0f6c9031890..6d72b48b0f1 100644 --- a/chain/client/src/test_utils/peer_manager_mock.rs +++ b/chain/client/src/test_utils/peer_manager_mock.rs @@ -1,5 +1,4 @@ -use near_network::types::SetChainInfo; -use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse}; +use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent}; pub struct PeerManagerMock { handle: Box< @@ -37,3 +36,8 @@ impl actix::Handler for PeerManagerMock { type Result = (); fn handle(&mut self, _msg: SetChainInfo, _ctx: &mut Self::Context) {} } + +impl actix::Handler for PeerManagerMock { + type Result = (); + fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {} +} diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index c67476dbaed..3629f5f2d2d 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -18,7 +18,7 @@ use crate::tcp; use crate::types::{ ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType, - SetChainInfo, SnapshotHostInfo, Tier3RequestBody + SetChainInfo, SnapshotHostInfo, Tier3RequestBody, StateSyncEvent }; use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; @@ -1255,6 +1255,25 @@ impl actix::Handler> for PeerManagerA } } +impl actix::Handler> for PeerManagerActor { + type Result = (); + #[perf] + fn handle( + &mut self, + msg: WithSpanContext, + _ctx: &mut Self::Context, + ) -> Self::Result { + let (_span, msg) = handler_debug_span!(target: "network", msg); + let _timer = + metrics::PEER_MANAGER_MESSAGES_TIME.with_label_values(&[(&msg).into()]).start_timer(); + match msg { + StateSyncEvent::StatePartReceived(shard_id, part_id) => { + self.state.snapshot_hosts.part_received(shard_id, part_id); + } + } + } +} + impl actix::Handler for PeerManagerActor { type Result = DebugStatus; #[perf] diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index 88f7268911c..b37f0650781 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -129,7 +129,7 @@ impl PartPeerSelector { struct Inner { /// The latest known SnapshotHostInfo for each node in the network hosts: LruCache>, - /// The hash for the most recent active state sync + /// The hash for the most recent active state sync, inferred from part requests sync_hash: Option, /// Number of available hosts for the active state sync, by shard hosts_for_shard: HashMap>, @@ -158,7 +158,7 @@ impl Inner { if self.sync_hash == Some(d.sync_hash) { for shard_id in &d.shards { self.hosts_for_shard - .entry(shard_id.clone()) + .entry(*shard_id) .or_insert(HashSet::default()) .insert(d.peer_id.clone()); } @@ -185,7 +185,7 @@ impl Inner { for 
(peer_id, info) in self.hosts.iter() { if info.sync_hash == *sync_hash { for shard_id in &info.shards { - self.hosts_for_shard.entry(shard_id.clone()) + self.hosts_for_shard.entry(*shard_id) .or_insert(HashSet::default()) .insert(peer_id.clone()); } @@ -322,13 +322,11 @@ impl SnapshotHostsCache { self.0.lock().select_host_for_part(sync_hash, shard_id, part_id) } - /// Called when a state part is successfully received - /// so that the internal state can be cleaned up eagerly. - pub fn part_received(&self, sync_hash: &CryptoHash, shard_id: ShardId, part_id: u64) { + /// Triggered by state sync actor after processing a state part. + pub fn part_received(&self, shard_id: ShardId, part_id: u64) { + tracing::info!(target: "db", "clearing internal state for {} {}", shard_id, part_id); let mut inner = self.0.lock(); - if inner.sync_hash == Some(*sync_hash) { - inner.peer_selector.remove(&(shard_id, part_id)); - } + inner.peer_selector.remove(&(shard_id, part_id)); } #[cfg(test)] diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 4806752517f..7a8504c5228 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -1,7 +1,7 @@ use crate::network_protocol::PeerInfo; use crate::types::{ NetworkInfo, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, - SetChainInfo, + SetChainInfo, StateSyncEvent }; use crate::PeerManagerActor; use actix::{Actor, ActorContext, Context, Handler}; @@ -259,6 +259,10 @@ impl CanSend for MockPeerManagerAdapter { fn send(&self, _msg: SetChainInfo) {} } +impl CanSend for MockPeerManagerAdapter { + fn send(&self, _msg: StateSyncEvent) {} +} + impl MockPeerManagerAdapter { pub fn pop(&self) -> Option { self.requests.write().unwrap().pop_front() diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 8f8a7c953f8..28ccc9feefc 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -293,6 +293,12 @@ pub enum NetworkRequests { EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof }, } +#[derive(Debug, actix::Message, strum::IntoStaticStr)] +#[rtype(result = "()")] +pub enum StateSyncEvent { + StatePartReceived(ShardId, u64), +} + /// Combines peer address info, chain. 
#[derive(Debug, Clone, Eq, PartialEq)] pub struct FullPeerInfo { @@ -404,6 +410,7 @@ pub struct PeerManagerAdapter { pub async_request_sender: AsyncSender, pub request_sender: Sender, pub set_chain_info_sender: Sender, + pub state_sync_event_sender: Sender, } #[cfg(test)] From 4134f2cc071d4334de02ec8e802335c24fbef347 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Tue, 17 Sep 2024 13:18:42 -0400 Subject: [PATCH 18/20] fix test --- chain/network/src/snapshot_hosts/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/network/src/snapshot_hosts/tests.rs b/chain/network/src/snapshot_hosts/tests.rs index 20d48adccc4..3f684ec03df 100644 --- a/chain/network/src/snapshot_hosts/tests.rs +++ b/chain/network/src/snapshot_hosts/tests.rs @@ -327,7 +327,7 @@ async fn run_select_peer_test( } SelectPeerAction::PartReceived => { assert!(cache.has_selector(0, part_id)); - cache.part_received(sync_hash, 0, part_id); + cache.part_received(0, part_id); assert!(!cache.has_selector(0, part_id)); } } From a826bdb465d8a5cfb5eb8ec472ddcf3c7e1439f5 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Tue, 17 Sep 2024 18:18:22 -0400 Subject: [PATCH 19/20] try peers first before external storage --- chain/client/src/sync/state.rs | 420 +++++++++------------------------ 1 file changed, 112 insertions(+), 308 deletions(-) diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 6913e4c9bec..77c7cbc8878 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -51,10 +51,8 @@ use near_primitives::state_sync::{ use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; use near_store::DBCol; use rand::seq::SliceRandom; -use rand::{thread_rng, Rng}; +use rand::thread_rng; use std::collections::HashMap; -use std::num::NonZeroUsize; -use std::ops::Add; use std::sync::atomic::Ordering; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; @@ -68,6 +66,9 @@ pub const MAX_STATE_PART_REQUEST: u64 = 16; /// Number of state parts already requested stored as pending. /// This number should not exceed MAX_STATE_PART_REQUEST times (number of peers in the network). pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000; +/// A node with external storage configured first tries to obtain state parts from peers. +/// For each part, it will make this many attempts before getting it from external storage. +pub const EXTERNAL_STORAGE_FALLBACK_THRESHOLD: u64 = 16; /// Time limit per state dump iteration. /// A node must check external storage for parts to dump again once time is up. pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300; @@ -79,22 +80,6 @@ pub enum StateSyncResult { Completed, } -struct PendingRequestStatus { - clock: Clock, - /// Number of parts that are in progress (we requested them from a given peer but didn't get the answer yet). - missing_parts: usize, - wait_until: Utc, -} - -impl PendingRequestStatus { - fn new(clock: Clock, timeout: Duration) -> Self { - Self { clock: clock.clone(), missing_parts: 1, wait_until: clock.now_utc().add(timeout) } - } - fn expired(&self) -> bool { - self.clock.now_utc() > self.wait_until - } -} - pub enum StateSyncFileDownloadResult { StateHeader { header_length: u64, header: ShardStateSyncResponseHeader }, StatePart { part_length: u64 }, @@ -109,32 +94,21 @@ pub struct StateSyncGetFileResult { result: Result, } -/// How to retrieve the state data. -enum StateSyncInner { - /// Request both the state header and state parts from the peers. 
- Peers { - /// Which parts were requested from which peer and when. - last_part_id_requested: HashMap<(PeerId, ShardId), PendingRequestStatus>, - /// Map from which part we requested to whom. - requested_target: lru::LruCache<(u64, CryptoHash), PeerId>, - }, - /// Requests the state header from peers but gets the state parts from an - /// external storage. - External { - /// Chain ID. - chain_id: String, - /// This semaphore imposes a restriction on the maximum number of simultaneous downloads - semaphore: Arc, - /// Connection to the external storage. - external: ExternalConnection, - }, +struct StateSyncExternal { + /// Chain ID. + chain_id: String, + /// This semaphore imposes a restriction on the maximum number of simultaneous downloads + semaphore: Arc, + /// Connection to the external storage. + external: ExternalConnection, } /// Helper to track state sync. pub struct StateSync { clock: Clock, - /// How to retrieve the state data. - inner: StateSyncInner, + + /// External storage, if configured. + external: Option, /// Is used for communication with the peers. network_adapter: PeerManagerAdapter, @@ -167,13 +141,10 @@ impl StateSync { sync_config: &SyncConfig, catchup: bool, ) -> Self { - let inner = match sync_config { - SyncConfig::Peers => StateSyncInner::Peers { - last_part_id_requested: Default::default(), - requested_target: lru::LruCache::new( - NonZeroUsize::new(MAX_PENDING_PART as usize).unwrap(), - ), - }, + let external = match sync_config { + SyncConfig::Peers => { + None + } SyncConfig::ExternalStorage(ExternalStorageConfig { location, num_concurrent_requests, @@ -205,17 +176,17 @@ impl StateSync { } else { *num_concurrent_requests } as usize; - StateSyncInner::External { + Some(StateSyncExternal { chain_id: chain_id.to_string(), semaphore: Arc::new(tokio::sync::Semaphore::new(num_permits)), external, - } + }) } }; let (tx, rx) = channel::(); StateSync { clock, - inner, + external, network_adapter, timeout, state_parts_apply_results: HashMap::new(), @@ -487,60 +458,12 @@ impl StateSync { &mut self, part_id: u64, shard_id: ShardId, - sync_hash: CryptoHash, + _sync_hash: CryptoHash, ) { - match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, requested_target } => { - let key = (part_id, sync_hash); - // Check that it came from the target that we requested it from. - if let Some(target) = requested_target.get(&key) { - if last_part_id_requested.get_mut(&(target.clone(), shard_id)).map_or( - false, - |request| { - request.missing_parts = request.missing_parts.saturating_sub(1); - request.missing_parts == 0 - }, - ) { - last_part_id_requested.remove(&(target.clone(), shard_id)); - } - - self.network_adapter.send(StateSyncEvent::StatePartReceived ( - shard_id, part_id - )); - } - } - StateSyncInner::External { .. } => { - // It is possible that we have previously made peer requests for this part - // before falling back to the External host. - self.network_adapter.send(StateSyncEvent::StatePartReceived ( - shard_id, part_id - )); - } - } - } - - /// Avoids peers that already have outstanding requests for parts. - fn select_peers( - &mut self, - highest_height_peers: &[HighestHeightPeerInfo], - shard_id: ShardId, - ) -> Result, near_chain::Error> { - let peers: Vec = - highest_height_peers.iter().map(|peer| peer.peer_info.id.clone()).collect(); - let res = match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, .. 
} => { - last_part_id_requested.retain(|_, request| !request.expired()); - peers - .into_iter() - .filter(|peer| { - // If we still have a pending request from this node - don't add another one. - !last_part_id_requested.contains_key(&(peer.clone(), shard_id)) - }) - .collect::>() - } - StateSyncInner::External { .. } => peers, - }; - Ok(res) + // TODO: where is the part validated though? + self.network_adapter.send(StateSyncEvent::StatePartReceived ( + shard_id, part_id + )); } /// Returns new ShardSyncDownload if successful, otherwise returns given shard_sync_download @@ -554,23 +477,21 @@ impl StateSync { runtime_adapter: Arc, state_parts_future_spawner: &dyn FutureSpawner, ) -> Result<(), near_chain::Error> { - let mut possible_targets = vec![]; - match self.inner { - StateSyncInner::Peers { .. } => { - possible_targets = self.select_peers(highest_height_peers, shard_id)?; - if possible_targets.is_empty() { - tracing::debug!(target: "sync", "Can't request a state header: No possible targets"); - // In most cases it means that all the targets are currently busy (that we have a pending request with them). - return Ok(()); - } - } - // We do not need to select a target for external storage. - StateSyncInner::External { .. } => {} - } - // Downloading strategy starts here match shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { + // If no external storage is configured, we have to request headers from our peers + let possible_targets = match self.external { + Some(_) => vec![], + None => { + if highest_height_peers.is_empty() { + tracing::debug!(target: "sync", "Can't request a state header: No possible targets"); + return Ok(()); + } + highest_height_peers.iter().map(|peer| peer.peer_info.id.clone()).collect() + } + }; + self.request_shard_header( chain, shard_id, @@ -584,7 +505,6 @@ impl StateSync { self.request_shard_parts( shard_id, sync_hash, - possible_targets, shard_sync_download, chain, runtime_adapter, @@ -608,49 +528,46 @@ impl StateSync { state_parts_future_spawner: &dyn FutureSpawner, ) { let header_download = new_shard_sync_download.get_header_download_mut().unwrap(); - match &mut self.inner { - StateSyncInner::Peers { .. } => { - let peer_id = possible_targets.choose(&mut thread_rng()).cloned().unwrap(); - tracing::debug!(target: "sync", ?peer_id, shard_id, ?sync_hash, ?possible_targets, "request_shard_header"); - assert!(header_download.run_me.load(Ordering::SeqCst)); - header_download.run_me.store(false, Ordering::SeqCst); - header_download.state_requests_count += 1; - header_download.last_target = Some(peer_id.clone()); - let run_me = header_download.run_me.clone(); - near_performance_metrics::actix::spawn( - std::any::type_name::(), - self.network_adapter - .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestHeader { shard_id, sync_hash, peer_id }, - )) - .then(move |result| { - if let Ok(NetworkResponses::RouteNotFound) = - result.map(|f| f.as_network_response()) - { - // Send a StateRequestHeader on the next iteration - run_me.store(true, Ordering::SeqCst); - } - future::ready(()) - }), - ); - } - StateSyncInner::External { chain_id, external, .. 
} => { - let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); - let epoch_id = sync_block_header.epoch_id(); - let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); - let epoch_height = epoch_info.epoch_height(); - request_header_from_external_storage( - header_download, - shard_id, - sync_hash, - epoch_id, - epoch_height, - &chain_id.clone(), - external.clone(), - state_parts_future_spawner, - self.state_parts_mpsc_tx.clone(), - ); - } + if let Some(StateSyncExternal { chain_id, external, .. }) = &self.external { + let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); + let epoch_id = sync_block_header.epoch_id(); + let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + request_header_from_external_storage( + header_download, + shard_id, + sync_hash, + epoch_id, + epoch_height, + &chain_id.clone(), + external.clone(), + state_parts_future_spawner, + self.state_parts_mpsc_tx.clone(), + ); + } else { + let peer_id = possible_targets.choose(&mut thread_rng()).cloned().unwrap(); + tracing::debug!(target: "sync", ?peer_id, shard_id, ?sync_hash, ?possible_targets, "request_shard_header"); + assert!(header_download.run_me.load(Ordering::SeqCst)); + header_download.run_me.store(false, Ordering::SeqCst); + header_download.state_requests_count += 1; + header_download.last_target = Some(peer_id.clone()); + let run_me = header_download.run_me.clone(); + near_performance_metrics::actix::spawn( + std::any::type_name::(), + self.network_adapter + .send_async(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::StateRequestHeader { shard_id, sync_hash, peer_id }, + )) + .then(move |result| { + if let Ok(NetworkResponses::RouteNotFound) = + result.map(|f| f.as_network_response()) + { + // Send a StateRequestHeader on the next iteration + run_me.store(true, Ordering::SeqCst); + } + future::ready(()) + }), + ); } } @@ -659,7 +576,6 @@ impl StateSync { &mut self, shard_id: ShardId, sync_hash: CryptoHash, - possible_targets: Vec, new_shard_sync_download: &mut ShardSyncDownload, chain: &Chain, runtime_adapter: Arc, @@ -667,66 +583,20 @@ impl StateSync { ) { // Iterate over all parts that needs to be requested (i.e. download.run_me is true). // Parts are ordered such that its index match its part_id. - match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, requested_target } => { - // We'll select all the 'highest' peers + validators as candidates (excluding those that gave us timeout in the past). - // And for each one of them, we'll ask for up to 16 (MAX_STATE_PART_REQUEST) parts. - let possible_targets_sampler = - SamplerLimited::new(possible_targets, MAX_STATE_PART_REQUEST); - - // For every part that needs to be requested it is selected one - // peer (target) randomly to request the part from. - // IMPORTANT: here we use 'zip' with possible_target_sampler - - // which is limited. So at any moment we'll not request more - // than possible_targets.len() * MAX_STATE_PART_REQUEST parts. - for ((part_id, download), target) in - parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler) - { - // In flat storage snapshots are indexed according to the hash of the second to - // last block in the epoch. When performing state sync the sync hash is defined - // as the hash of the first block after the epoch. The request sent to the - // network adapater needs to include the sync_prev_prev_hash so that a peer - // hosting the correct snapshot can be selected. 
- // TODO(saketh): consider whether we can unify things in a cleaner way - if let Ok(header) = chain.get_block_header(&sync_hash) { - if let Ok(prev_header) = chain.get_block_header(&header.prev_hash()) { - let sync_prev_prev_hash = prev_header.prev_hash(); - - sent_request_part( - self.clock.clone(), - target.clone(), - part_id, - shard_id, - sync_hash, - last_part_id_requested, - requested_target, - self.timeout, - ); - - request_part_from_peers( - part_id, - target, - download, - shard_id, - sync_hash, - *sync_prev_prev_hash, - &self.network_adapter, - ); - } - } - } - } - StateSyncInner::External { chain_id, semaphore, external } => { - let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); - let epoch_id = sync_block_header.epoch_id(); - let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); - let epoch_height = epoch_info.epoch_height(); + let mut peer_requests_sent = 0; + for (part_id, download) in parts_to_fetch(new_shard_sync_download) { + if self.external.is_some() && download.state_requests_count >= EXTERNAL_STORAGE_FALLBACK_THRESHOLD { + let StateSyncExternal { chain_id, semaphore, external } = self.external.as_ref().unwrap(); + if semaphore.available_permits() > 0 { + let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); + let epoch_id = sync_block_header.epoch_id(); + let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + + let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap(); + let state_root = shard_state_header.chunk_prev_state_root(); + let state_num_parts = shard_state_header.num_state_parts(); - let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap(); - let state_root = shard_state_header.chunk_prev_state_root(); - let state_num_parts = shard_state_header.num_state_parts(); - - for (part_id, download) in parts_to_fetch(new_shard_sync_download) { request_part_from_external_storage( part_id, download, @@ -743,8 +613,27 @@ impl StateSync { state_parts_future_spawner, self.state_parts_mpsc_tx.clone(), ); - if semaphore.available_permits() == 0 { - break; + } + } else { + if peer_requests_sent >= MAX_STATE_PART_REQUEST { + continue; + } + + // The request sent to the network adapater needs to include the sync_prev_prev_hash + // so that a peer hosting the correct snapshot can be selected. + if let Ok(header) = chain.get_block_header(&sync_hash) { + if let Ok(prev_header) = chain.get_block_header(&header.prev_hash()) { + let sync_prev_prev_hash = prev_header.prev_hash(); + request_part_from_peers( + part_id, + download, + shard_id, + sync_hash, + *sync_prev_prev_hash, + &self.network_adapter, + ); + + peer_requests_sent += 1; } } } @@ -1321,7 +1210,6 @@ fn request_part_from_external_storage( /// Asynchronously requests a state part from a suitable peer. 
fn request_part_from_peers( part_id: u64, - peer_id: PeerId, download: &mut DownloadStatus, shard_id: ShardId, sync_hash: CryptoHash, @@ -1330,8 +1218,6 @@ fn request_part_from_peers( ) { download.run_me.store(false, Ordering::SeqCst); download.state_requests_count += 1; - // TODO(saketh): clean this up now that targets are picked in peer manager - download.last_target = Some(peer_id); let run_me = download.run_me.clone(); near_performance_metrics::actix::spawn( @@ -1351,26 +1237,6 @@ fn request_part_from_peers( ); } -fn sent_request_part( - clock: Clock, - peer_id: PeerId, - part_id: u64, - shard_id: ShardId, - sync_hash: CryptoHash, - last_part_id_requested: &mut HashMap<(PeerId, ShardId), PendingRequestStatus>, - requested_target: &mut lru::LruCache<(u64, CryptoHash), PeerId>, - timeout: Duration, -) { - // FIXME: something is wrong - the index should have a shard_id too. - requested_target.put((part_id, sync_hash), peer_id.clone()); - last_part_id_requested - .entry((peer_id, shard_id)) - .and_modify(|pending_request| { - pending_request.missing_parts += 1; - }) - .or_insert_with(|| PendingRequestStatus::new(clock, timeout)); -} - /// Works around how data requests to external storage are done. /// This function investigates if the response is valid and updates `done` and `error` appropriately. /// If the response is successful, then the downloaded state file was written to the DB. @@ -1403,68 +1269,6 @@ fn process_download_response( } } -/// Create an abstract collection of elements to be shuffled. -/// Each element will appear in the shuffled output exactly `limit` times. -/// Use it as an iterator to access the shuffled collection. -/// -/// ```rust,ignore -/// let sampler = SamplerLimited::new(vec![1, 2, 3], 2); -/// -/// let res = sampler.collect::>(); -/// -/// assert!(res.len() == 6); -/// assert!(res.iter().filter(|v| v == 1).count() == 2); -/// assert!(res.iter().filter(|v| v == 2).count() == 2); -/// assert!(res.iter().filter(|v| v == 3).count() == 2); -/// ``` -/// -/// Out of the 90 possible values of `res` in the code above on of them is: -/// -/// ``` -/// vec![1, 2, 1, 3, 3, 2]; -/// ``` -struct SamplerLimited { - data: Vec, - limit: Vec, -} - -impl SamplerLimited { - fn new(data: Vec, limit: u64) -> Self { - if limit == 0 { - Self { data: vec![], limit: vec![] } - } else { - let len = data.len(); - Self { data, limit: vec![limit; len] } - } - } -} - -impl Iterator for SamplerLimited { - type Item = T; - - fn next(&mut self) -> Option { - if self.limit.is_empty() { - None - } else { - let len = self.limit.len(); - let ix = thread_rng().gen_range(0..len); - self.limit[ix] -= 1; - - if self.limit[ix] == 0 { - if ix + 1 != len { - self.limit[ix] = self.limit[len - 1]; - self.data.swap(ix, len - 1); - } - - self.limit.pop(); - self.data.pop() - } else { - Some(self.data[ix].clone()) - } - } - } -} - #[cfg(test)] mod test { use super::*; From 0ecf34affc9fb48ef6a59d766f8f1fcfb2c839f2 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Wed, 18 Sep 2024 18:04:31 -0400 Subject: [PATCH 20/20] get catching_up test compiling --- chain/client/src/sync/state.rs | 49 +++++++++-------- .../src/test_utils/peer_manager_mock.rs | 4 +- chain/client/src/tests/catching_up.rs | 11 ++-- chain/network/src/peer/peer_actor.rs | 12 +++-- .../src/peer_manager/connection/mod.rs | 4 +- .../src/peer_manager/network_state/mod.rs | 8 +-- .../src/peer_manager/peer_manager_actor.rs | 53 ++++++++++--------- chain/network/src/snapshot_hosts/mod.rs | 1 - chain/network/src/test_loop.rs | 6 ++- 
chain/network/src/test_utils.rs | 2 +- chain/network/src/types.rs | 2 +- 11 files changed, 85 insertions(+), 67 deletions(-) diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 77c7cbc8878..39c284820e5 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -27,7 +27,7 @@ use crate::sync::external::{ use borsh::BorshDeserialize; use futures::{future, FutureExt}; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; -use near_async::messaging::{SendAsync, CanSend}; +use near_async::messaging::{CanSend, SendAsync}; use near_async::time::{Clock, Duration, Utc}; use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest}; use near_chain::near_chain_primitives; @@ -39,7 +39,8 @@ use near_client_primitives::types::{ }; use near_epoch_manager::EpochManagerAdapter; use near_network::types::{ - HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, PeerManagerMessageRequest, StateSyncEvent, + HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, + PeerManagerMessageRequest, StateSyncEvent, }; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; @@ -142,9 +143,7 @@ impl StateSync { catchup: bool, ) -> Self { let external = match sync_config { - SyncConfig::Peers => { - None - } + SyncConfig::Peers => None, SyncConfig::ExternalStorage(ExternalStorageConfig { location, num_concurrent_requests, @@ -461,9 +460,7 @@ impl StateSync { _sync_hash: CryptoHash, ) { // TODO: where is the part validated though? - self.network_adapter.send(StateSyncEvent::StatePartReceived ( - shard_id, part_id - )); + self.network_adapter.send(StateSyncEvent::StatePartReceived(shard_id, part_id)); } /// Returns new ShardSyncDownload if successful, otherwise returns given shard_sync_download @@ -555,18 +552,18 @@ impl StateSync { near_performance_metrics::actix::spawn( std::any::type_name::(), self.network_adapter - .send_async(PeerManagerMessageRequest::NetworkRequests( + .send_async(PeerManagerMessageRequest::NetworkRequests( NetworkRequests::StateRequestHeader { shard_id, sync_hash, peer_id }, - )) - .then(move |result| { - if let Ok(NetworkResponses::RouteNotFound) = - result.map(|f| f.as_network_response()) - { - // Send a StateRequestHeader on the next iteration - run_me.store(true, Ordering::SeqCst); - } - future::ready(()) - }), + )) + .then(move |result| { + if let Ok(NetworkResponses::RouteNotFound) = + result.map(|f| f.as_network_response()) + { + // Send a StateRequestHeader on the next iteration + run_me.store(true, Ordering::SeqCst); + } + future::ready(()) + }), ); } } @@ -585,8 +582,11 @@ impl StateSync { // Parts are ordered such that its index match its part_id. 
let mut peer_requests_sent = 0; for (part_id, download) in parts_to_fetch(new_shard_sync_download) { - if self.external.is_some() && download.state_requests_count >= EXTERNAL_STORAGE_FALLBACK_THRESHOLD { - let StateSyncExternal { chain_id, semaphore, external } = self.external.as_ref().unwrap(); + if self.external.is_some() + && download.state_requests_count >= EXTERNAL_STORAGE_FALLBACK_THRESHOLD + { + let StateSyncExternal { chain_id, semaphore, external } = + self.external.as_ref().unwrap(); if semaphore.available_permits() > 0 { let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); let epoch_id = sync_block_header.epoch_id(); @@ -1224,7 +1224,12 @@ fn request_part_from_peers( "StateSync", network_adapter .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestPart { shard_id, sync_hash, sync_prev_prev_hash, part_id }, + NetworkRequests::StateRequestPart { + shard_id, + sync_hash, + sync_prev_prev_hash, + part_id, + }, )) .then(move |result| { if let Ok(NetworkResponses::RouteNotFound) = result.map(|f| f.as_network_response()) diff --git a/chain/client/src/test_utils/peer_manager_mock.rs b/chain/client/src/test_utils/peer_manager_mock.rs index 6d72b48b0f1..cfa39d71a84 100644 --- a/chain/client/src/test_utils/peer_manager_mock.rs +++ b/chain/client/src/test_utils/peer_manager_mock.rs @@ -1,4 +1,6 @@ -use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent}; +use near_network::types::{ + PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent, +}; pub struct PeerManagerMock { handle: Box< diff --git a/chain/client/src/tests/catching_up.rs b/chain/client/src/tests/catching_up.rs index 19a5ced219a..f49db6989ec 100644 --- a/chain/client/src/tests/catching_up.rs +++ b/chain/client/src/tests/catching_up.rs @@ -101,8 +101,9 @@ enum ReceiptsSyncPhases { pub struct StateRequestStruct { pub shard_id: u64, pub sync_hash: CryptoHash, + pub sync_prev_prev_hash: Option, pub part_id: Option, - pub peer_id: PeerId, + pub peer_id: Option, } /// Sanity checks that the incoming and outgoing receipts are properly sent and received @@ -268,8 +269,9 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: None, part_id: None, - peer_id: peer_id.clone(), + peer_id: Some(peer_id.clone()), }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) @@ -283,16 +285,17 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) if let NetworkRequests::StateRequestPart { shard_id, sync_hash, + sync_prev_prev_hash, part_id, - peer_id, } = msg { if sync_hold { let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: Some(*sync_prev_prev_hash), part_id: Some(*part_id), - peer_id: peer_id.clone(), + peer_id: None, }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 98d39d03724..1e585d70b94 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -308,7 +308,9 @@ impl PeerActor { // Override force_encoding for outbound Tier1 and Tier3 connections; // Tier1Handshake and Tier3Handshake are supported only with proto encoding. let force_encoding = match &stream.type_ { - tcp::StreamType::Outbound { tier, .. 
} if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 => { + tcp::StreamType::Outbound { tier, .. } + if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 => + { Some(Encoding::Proto) } _ => force_encoding, @@ -1156,7 +1158,9 @@ impl PeerActor { self.stop(ctx, ClosingReason::DisconnectMessage); } - PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) | PeerMessage::Tier3Handshake(_) => { + PeerMessage::Tier1Handshake(_) + | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) => { // Received handshake after already have seen handshake from this peer. tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } @@ -1202,7 +1206,9 @@ impl PeerActor { // Record our own IP address as observed by the peer. if self.network_state.my_public_addr.read().is_none() { - if let Some(my_peer_info) = direct_peers.iter().find(|peer_info| peer_info.id == node_id) { + if let Some(my_peer_info) = + direct_peers.iter().find(|peer_info| peer_info.id == node_id) + { if let Some(addr) = my_peer_info.addr { let mut my_public_addr = self.network_state.my_public_addr.write(); *my_public_addr = Some(addr); diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index ce4f3b6e302..2035a673da5 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -39,7 +39,9 @@ impl tcp::Tier { PeerMessage::Tier3Handshake(_) => self == tcp::Tier::T3, PeerMessage::HandshakeFailure(_, _) => true, PeerMessage::LastEdge(_) => true, - PeerMessage::VersionedStateResponse(_) => self == tcp::Tier::T2 || self == tcp::Tier::T3, + PeerMessage::VersionedStateResponse(_) => { + self == tcp::Tier::T2 || self == tcp::Tier::T3 + } PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body), _ => self == tcp::Tier::T2, } diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 0f563a9c278..2a249c7f734 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -782,12 +782,8 @@ impl NetworkState { // TODO: cap the size of this queue, // perhaps preferentially allowing requests made by validators self.tier3_requests.lock().push_back(Tier3Request { - peer_info: PeerInfo { - id: peer_id, - addr: Some(request.addr), - account_id: None, - }, - body: Tier3RequestBody::StatePartRequest ( + peer_info: PeerInfo { id: peer_id, addr: Some(request.addr), account_id: None }, + body: Tier3RequestBody::StatePartRequest( request.shard_id, request.sync_hash, request.part_id, diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 3629f5f2d2d..08030ed6a5c 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -4,7 +4,8 @@ use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol; use crate::network_protocol::SyncSnapshotHosts; use crate::network_protocol::{ - Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, StatePartRequest + Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, + StatePartRequest, }; use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; @@ -18,7 +19,7 @@ use crate::tcp; use crate::types::{ ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests, 
    NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType,
-    SetChainInfo, SnapshotHostInfo, Tier3RequestBody, StateSyncEvent
+    SetChainInfo, SnapshotHostInfo, StateSyncEvent, Tier3RequestBody,
 };
 use ::time::ext::InstantExt as _;
 use actix::fut::future::wrap_future;
@@ -623,15 +624,13 @@ impl PeerManagerActor {
     /// can be lost if the timeout is reached while it is in flight.
     fn stop_tier3_idle_connections(&self) {
         let now = self.clock.now();
-        let _ = self.state
+        let _ = self
+            .state
             .tier3
             .load()
             .ready
             .values()
-            .filter(|p| {
-                now - p.last_time_received_message.load()
-                    > TIER3_IDLE_TIMEOUT
-            })
+            .filter(|p| now - p.last_time_received_message.load() > TIER3_IDLE_TIMEOUT)
             .for_each(|p| p.stop(None));
     }
 
@@ -873,7 +872,12 @@ impl PeerManagerActor {
                     NetworkResponses::RouteNotFound
                 }
             }
-            NetworkRequests::StateRequestPart { shard_id, sync_hash, sync_prev_prev_hash, part_id } => {
+            NetworkRequests::StateRequestPart {
+                shard_id,
+                sync_hash,
+                sync_prev_prev_hash,
+                part_id,
+            } => {
                 let mut success = false;
 
                 // The node needs to include its own public address in the request
@@ -882,26 +886,23 @@ impl PeerManagerActor {
                     if let Some(peer_id) = self.state.snapshot_hosts.select_host_for_part(
                         &sync_prev_prev_hash,
                         shard_id,
-                        part_id
+                        part_id,
                     ) {
-                        success = self.state.send_message_to_peer(
-                            &self.clock,
-                            tcp::Tier::T2,
-                            self.state.sign_message(
+                        success =
+                            self.state.send_message_to_peer(
                                 &self.clock,
-                                RawRoutedMessage {
-                                    target: PeerIdOrHash::PeerId(peer_id),
-                                    body: RoutedMessageBody::StatePartRequest(StatePartRequest{
-                                        shard_id,
-                                        sync_hash,
-                                        part_id,
-                                        addr,
-                                    })
-                                }
-                            ),
-                        );
-                    }
-                    else {
+                                tcp::Tier::T2,
+                                self.state.sign_message(
+                                    &self.clock,
+                                    RawRoutedMessage {
+                                        target: PeerIdOrHash::PeerId(peer_id),
+                                        body: RoutedMessageBody::StatePartRequest(
+                                            StatePartRequest { shard_id, sync_hash, part_id, addr },
+                                        ),
+                                    },
+                                ),
+                            );
+                    } else {
                         tracing::debug!(target: "network", "no hosts available for {shard_id}, {sync_prev_prev_hash}");
                     }
                 }
diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs
index 0799d20ba34..e9509a1a7a0 100644
--- a/chain/network/src/snapshot_hosts/mod.rs
+++ b/chain/network/src/snapshot_hosts/mod.rs
@@ -90,7 +90,6 @@ impl StatePartHost {
     }
 }
 
-
 #[derive(Default)]
 struct PartPeerSelector {
     /// Ordered collection of available hosts for some desired state part
diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs
index 0063281ebb1..8d7ab2dd651 100644
--- a/chain/network/src/test_loop.rs
+++ b/chain/network/src/test_loop.rs
@@ -13,7 +13,7 @@ use crate::state_witness::{
 };
 use crate::types::{
     NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse,
-    SetChainInfo,
+    SetChainInfo, StateSyncEvent,
 };
 use near_async::actix::ActixResult;
 use near_async::futures::{FutureSpawner, FutureSpawnerExt};
@@ -188,6 +188,10 @@ impl Handler<SetChainInfo> for TestLoopPeerManagerActor {
     fn handle(&mut self, _msg: SetChainInfo) {}
 }
 
+impl Handler<StateSyncEvent> for TestLoopPeerManagerActor {
+    fn handle(&mut self, _msg: StateSyncEvent) {}
+}
+
 impl Handler<PeerManagerMessageRequest> for TestLoopPeerManagerActor {
     fn handle(&mut self, msg: PeerManagerMessageRequest) -> PeerManagerMessageResponse {
         let PeerManagerMessageRequest::NetworkRequests(request) = msg else {
diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs
index 4806752517f..2bbb64d433c 100644
--- a/chain/network/src/test_utils.rs
+++ b/chain/network/src/test_utils.rs
@@ -1,7 +1,7 @@
 use crate::network_protocol::PeerInfo;
use crate::types::{ NetworkInfo, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, - SetChainInfo, StateSyncEvent + SetChainInfo, StateSyncEvent, }; use crate::PeerManagerActor; use actix::{Actor, ActorContext, Context, Handler}; diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 28ccc9feefc..0efaeff5f9e 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -249,7 +249,7 @@ pub enum NetworkRequests { shard_id: ShardId, sync_hash: CryptoHash, sync_prev_prev_hash: CryptoHash, - part_id: u64 + part_id: u64, }, /// Ban given peer. BanPeer { peer_id: PeerId, ban_reason: ReasonForBan },
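
For readers following the peer selection logic introduced in PATCH 15, here is a rough, self-contained sketch of the deterministic ordering that the snapshot host selector relies on. It is illustrative only: the string peer keys, the u64 shard id, the little-endian byte encoding fed to the hash, and the helper name next_host are stand-ins for this note, not the nearcore types or APIs from the patches.

// Illustrative sketch only; assumes the sha2 crate and simplified stand-in types.
use sha2::{Digest, Sha256};

// Deterministic score over (peer key, shard_id, part_id). Every node computes
// the same hash, so all nodes roughly agree, without coordination, on which
// hosts should be asked first for a given part.
fn priority_score(peer_key: &[u8], shard_id: u64, part_id: u64) -> [u8; 32] {
    let mut h = Sha256::new();
    h.update(peer_key);
    h.update(shard_id.to_le_bytes());
    h.update(part_id.to_le_bytes());
    h.finalize().into()
}

// Mirrors the selector's heap ordering: hosts asked the fewest times come
// first, ties broken by the highest score, so repeated calls rotate through
// every candidate before any host is asked again.
fn next_host<'a>(hosts: &[(&'a str, usize)], shard_id: u64, part_id: u64) -> Option<&'a str> {
    hosts
        .iter()
        .max_by(|(a, a_reqs), (b, b_reqs)| {
            a_reqs.cmp(b_reqs).reverse().then_with(|| {
                priority_score(a.as_bytes(), shard_id, part_id)
                    .cmp(&priority_score(b.as_bytes(), shard_id, part_id))
            })
        })
        .map(|(peer, _)| *peer)
}

fn main() {
    // Two fresh hosts and one that has already been queried once for this part.
    let hosts: [(&str, usize); 3] = [("peer-a", 0), ("peer-b", 0), ("peer-c", 1)];
    println!("preferred host: {:?}", next_host(&hosts, 0, 3));
}

Because every node evaluates the same function over the same advertised SnapshotHostInfo data, requests for a given part tend to converge on the same few hosts, which is the property the BinaryHeap ordering in PartPeerSelector preserves.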