diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 0344ab89d5e..39c284820e5 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -27,7 +27,7 @@ use crate::sync::external::{ use borsh::BorshDeserialize; use futures::{future, FutureExt}; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; -use near_async::messaging::SendAsync; +use near_async::messaging::{CanSend, SendAsync}; use near_async::time::{Clock, Duration, Utc}; use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest}; use near_chain::near_chain_primitives; @@ -38,9 +38,9 @@ use near_client_primitives::types::{ format_shard_sync_phase, DownloadStatus, ShardSyncDownload, ShardSyncStatus, }; use near_epoch_manager::EpochManagerAdapter; -use near_network::types::PeerManagerMessageRequest; use near_network::types::{ HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter, + PeerManagerMessageRequest, StateSyncEvent, }; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; @@ -52,10 +52,8 @@ use near_primitives::state_sync::{ use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; use near_store::DBCol; use rand::seq::SliceRandom; -use rand::{thread_rng, Rng}; +use rand::thread_rng; use std::collections::HashMap; -use std::num::NonZeroUsize; -use std::ops::Add; use std::sync::atomic::Ordering; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; @@ -69,6 +67,9 @@ pub const MAX_STATE_PART_REQUEST: u64 = 16; /// Number of state parts already requested stored as pending. /// This number should not exceed MAX_STATE_PART_REQUEST times (number of peers in the network). pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000; +/// A node with external storage configured first tries to obtain state parts from peers. +/// For each part, it will make this many attempts before getting it from external storage. 
+pub const EXTERNAL_STORAGE_FALLBACK_THRESHOLD: u64 = 16; /// Time limit per state dump iteration. /// A node must check external storage for parts to dump again once time is up. pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300; @@ -80,22 +81,6 @@ pub enum StateSyncResult { Completed, } -struct PendingRequestStatus { - clock: Clock, - /// Number of parts that are in progress (we requested them from a given peer but didn't get the answer yet). - missing_parts: usize, - wait_until: Utc, -} - -impl PendingRequestStatus { - fn new(clock: Clock, timeout: Duration) -> Self { - Self { clock: clock.clone(), missing_parts: 1, wait_until: clock.now_utc().add(timeout) } - } - fn expired(&self) -> bool { - self.clock.now_utc() > self.wait_until - } -} - pub enum StateSyncFileDownloadResult { StateHeader { header_length: u64, header: ShardStateSyncResponseHeader }, StatePart { part_length: u64 }, @@ -110,32 +95,21 @@ pub struct StateSyncGetFileResult { result: Result, } -/// How to retrieve the state data. -enum StateSyncInner { - /// Request both the state header and state parts from the peers. - Peers { - /// Which parts were requested from which peer and when. - last_part_id_requested: HashMap<(PeerId, ShardId), PendingRequestStatus>, - /// Map from which part we requested to whom. - requested_target: lru::LruCache<(u64, CryptoHash), PeerId>, - }, - /// Requests the state header from peers but gets the state parts from an - /// external storage. - External { - /// Chain ID. - chain_id: String, - /// This semaphore imposes a restriction on the maximum number of simultaneous downloads - semaphore: Arc, - /// Connection to the external storage. - external: ExternalConnection, - }, +struct StateSyncExternal { + /// Chain ID. + chain_id: String, + /// This semaphore imposes a restriction on the maximum number of simultaneous downloads + semaphore: Arc, + /// Connection to the external storage. + external: ExternalConnection, } /// Helper to track state sync. 
pub struct StateSync { clock: Clock, - /// How to retrieve the state data. - inner: StateSyncInner, + + /// External storage, if configured. + external: Option, /// Is used for communication with the peers. network_adapter: PeerManagerAdapter, @@ -168,13 +142,8 @@ impl StateSync { sync_config: &SyncConfig, catchup: bool, ) -> Self { - let inner = match sync_config { - SyncConfig::Peers => StateSyncInner::Peers { - last_part_id_requested: Default::default(), - requested_target: lru::LruCache::new( - NonZeroUsize::new(MAX_PENDING_PART as usize).unwrap(), - ), - }, + let external = match sync_config { + SyncConfig::Peers => None, SyncConfig::ExternalStorage(ExternalStorageConfig { location, num_concurrent_requests, @@ -206,17 +175,17 @@ impl StateSync { } else { *num_concurrent_requests } as usize; - StateSyncInner::External { + Some(StateSyncExternal { chain_id: chain_id.to_string(), semaphore: Arc::new(tokio::sync::Semaphore::new(num_permits)), external, - } + }) } }; let (tx, rx) = channel::(); StateSync { clock, - inner, + external, network_adapter, timeout, state_parts_apply_results: HashMap::new(), @@ -488,52 +457,10 @@ impl StateSync { &mut self, part_id: u64, shard_id: ShardId, - sync_hash: CryptoHash, + _sync_hash: CryptoHash, ) { - match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, requested_target } => { - let key = (part_id, sync_hash); - // Check that it came from the target that we requested it from. - if let Some(target) = requested_target.get(&key) { - if last_part_id_requested.get_mut(&(target.clone(), shard_id)).map_or( - false, - |request| { - request.missing_parts = request.missing_parts.saturating_sub(1); - request.missing_parts == 0 - }, - ) { - last_part_id_requested.remove(&(target.clone(), shard_id)); - } - } - } - StateSyncInner::External { .. } => { - // Do nothing. - } - } - } - - /// Avoids peers that already have outstanding requests for parts. 
- fn select_peers( - &mut self, - highest_height_peers: &[HighestHeightPeerInfo], - shard_id: ShardId, - ) -> Result, near_chain::Error> { - let peers: Vec = - highest_height_peers.iter().map(|peer| peer.peer_info.id.clone()).collect(); - let res = match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, .. } => { - last_part_id_requested.retain(|_, request| !request.expired()); - peers - .into_iter() - .filter(|peer| { - // If we still have a pending request from this node - don't add another one. - !last_part_id_requested.contains_key(&(peer.clone(), shard_id)) - }) - .collect::>() - } - StateSyncInner::External { .. } => peers, - }; - Ok(res) + // TODO: where is the part validated though? + self.network_adapter.send(StateSyncEvent::StatePartReceived(shard_id, part_id)); } /// Returns new ShardSyncDownload if successful, otherwise returns given shard_sync_download @@ -547,23 +474,21 @@ impl StateSync { runtime_adapter: Arc, state_parts_future_spawner: &dyn FutureSpawner, ) -> Result<(), near_chain::Error> { - let mut possible_targets = vec![]; - match self.inner { - StateSyncInner::Peers { .. } => { - possible_targets = self.select_peers(highest_height_peers, shard_id)?; - if possible_targets.is_empty() { - tracing::debug!(target: "sync", "Can't request a state header: No possible targets"); - // In most cases it means that all the targets are currently busy (that we have a pending request with them). - return Ok(()); - } - } - // We do not need to select a target for external storage. - StateSyncInner::External { .. 
} => {} - } - // Downloading strategy starts here match shard_sync_download.status { ShardSyncStatus::StateDownloadHeader => { + // If no external storage is configured, we have to request headers from our peers + let possible_targets = match self.external { + Some(_) => vec![], + None => { + if highest_height_peers.is_empty() { + tracing::debug!(target: "sync", "Can't request a state header: No possible targets"); + return Ok(()); + } + highest_height_peers.iter().map(|peer| peer.peer_info.id.clone()).collect() + } + }; + self.request_shard_header( chain, shard_id, @@ -577,7 +502,6 @@ impl StateSync { self.request_shard_parts( shard_id, sync_hash, - possible_targets, shard_sync_download, chain, runtime_adapter, @@ -601,49 +525,46 @@ impl StateSync { state_parts_future_spawner: &dyn FutureSpawner, ) { let header_download = new_shard_sync_download.get_header_download_mut().unwrap(); - match &mut self.inner { - StateSyncInner::Peers { .. } => { - let peer_id = possible_targets.choose(&mut thread_rng()).cloned().unwrap(); - tracing::debug!(target: "sync", ?peer_id, shard_id, ?sync_hash, ?possible_targets, "request_shard_header"); - assert!(header_download.run_me.load(Ordering::SeqCst)); - header_download.run_me.store(false, Ordering::SeqCst); - header_download.state_requests_count += 1; - header_download.last_target = Some(peer_id.clone()); - let run_me = header_download.run_me.clone(); - near_performance_metrics::actix::spawn( - std::any::type_name::(), - self.network_adapter - .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestHeader { shard_id, sync_hash, peer_id }, - )) - .then(move |result| { - if let Ok(NetworkResponses::RouteNotFound) = - result.map(|f| f.as_network_response()) - { - // Send a StateRequestHeader on the next iteration - run_me.store(true, Ordering::SeqCst); - } - future::ready(()) - }), - ); - } - StateSyncInner::External { chain_id, external, .. 
} => { - let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); - let epoch_id = sync_block_header.epoch_id(); - let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); - let epoch_height = epoch_info.epoch_height(); - request_header_from_external_storage( - header_download, - shard_id, - sync_hash, - epoch_id, - epoch_height, - &chain_id.clone(), - external.clone(), - state_parts_future_spawner, - self.state_parts_mpsc_tx.clone(), - ); - } + if let Some(StateSyncExternal { chain_id, external, .. }) = &self.external { + let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); + let epoch_id = sync_block_header.epoch_id(); + let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + request_header_from_external_storage( + header_download, + shard_id, + sync_hash, + epoch_id, + epoch_height, + &chain_id.clone(), + external.clone(), + state_parts_future_spawner, + self.state_parts_mpsc_tx.clone(), + ); + } else { + let peer_id = possible_targets.choose(&mut thread_rng()).cloned().unwrap(); + tracing::debug!(target: "sync", ?peer_id, shard_id, ?sync_hash, ?possible_targets, "request_shard_header"); + assert!(header_download.run_me.load(Ordering::SeqCst)); + header_download.run_me.store(false, Ordering::SeqCst); + header_download.state_requests_count += 1; + header_download.last_target = Some(peer_id.clone()); + let run_me = header_download.run_me.clone(); + near_performance_metrics::actix::spawn( + std::any::type_name::(), + self.network_adapter + .send_async(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::StateRequestHeader { shard_id, sync_hash, peer_id }, + )) + .then(move |result| { + if let Ok(NetworkResponses::RouteNotFound) = + result.map(|f| f.as_network_response()) + { + // Send a StateRequestHeader on the next iteration + run_me.store(true, Ordering::SeqCst); + } + future::ready(()) + }), + ); } } @@ -652,7 +573,6 @@ impl StateSync { &mut 
self, shard_id: ShardId, sync_hash: CryptoHash, - possible_targets: Vec, new_shard_sync_download: &mut ShardSyncDownload, chain: &Chain, runtime_adapter: Arc, @@ -660,52 +580,23 @@ impl StateSync { ) { // Iterate over all parts that needs to be requested (i.e. download.run_me is true). // Parts are ordered such that its index match its part_id. - match &mut self.inner { - StateSyncInner::Peers { last_part_id_requested, requested_target } => { - // We'll select all the 'highest' peers + validators as candidates (excluding those that gave us timeout in the past). - // And for each one of them, we'll ask for up to 16 (MAX_STATE_PART_REQUEST) parts. - let possible_targets_sampler = - SamplerLimited::new(possible_targets, MAX_STATE_PART_REQUEST); - - // For every part that needs to be requested it is selected one - // peer (target) randomly to request the part from. - // IMPORTANT: here we use 'zip' with possible_target_sampler - - // which is limited. So at any moment we'll not request more - // than possible_targets.len() * MAX_STATE_PART_REQUEST parts. 
- for ((part_id, download), target) in - parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler) - { - sent_request_part( - self.clock.clone(), - target.clone(), - part_id, - shard_id, - sync_hash, - last_part_id_requested, - requested_target, - self.timeout, - ); - request_part_from_peers( - part_id, - target, - download, - shard_id, - sync_hash, - &self.network_adapter, - ); - } - } - StateSyncInner::External { chain_id, semaphore, external } => { - let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); - let epoch_id = sync_block_header.epoch_id(); - let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); - let epoch_height = epoch_info.epoch_height(); - - let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap(); - let state_root = shard_state_header.chunk_prev_state_root(); - let state_num_parts = shard_state_header.num_state_parts(); + let mut peer_requests_sent = 0; + for (part_id, download) in parts_to_fetch(new_shard_sync_download) { + if self.external.is_some() + && download.state_requests_count >= EXTERNAL_STORAGE_FALLBACK_THRESHOLD + { + let StateSyncExternal { chain_id, semaphore, external } = + self.external.as_ref().unwrap(); + if semaphore.available_permits() > 0 { + let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); + let epoch_id = sync_block_header.epoch_id(); + let epoch_info = chain.epoch_manager.get_epoch_info(epoch_id).unwrap(); + let epoch_height = epoch_info.epoch_height(); + + let shard_state_header = chain.get_state_header(shard_id, sync_hash).unwrap(); + let state_root = shard_state_header.chunk_prev_state_root(); + let state_num_parts = shard_state_header.num_state_parts(); - for (part_id, download) in parts_to_fetch(new_shard_sync_download) { request_part_from_external_storage( part_id, download, @@ -722,8 +613,27 @@ impl StateSync { state_parts_future_spawner, self.state_parts_mpsc_tx.clone(), ); - if semaphore.available_permits() == 0 { - break; + } + } 
else { + if peer_requests_sent >= MAX_STATE_PART_REQUEST { + continue; + } + + // The request sent to the network adapter needs to include the sync_prev_prev_hash + // so that a peer hosting the correct snapshot can be selected. + if let Ok(header) = chain.get_block_header(&sync_hash) { + if let Ok(prev_header) = chain.get_block_header(&header.prev_hash()) { + let sync_prev_prev_hash = prev_header.prev_hash(); + request_part_from_peers( + part_id, + download, + shard_id, + sync_hash, + *sync_prev_prev_hash, + &self.network_adapter, + ); + + peer_requests_sent += 1; } } } @@ -1300,27 +1210,28 @@ fn request_part_from_external_storage( /// Asynchronously requests a state part from a suitable peer. fn request_part_from_peers( part_id: u64, - peer_id: PeerId, download: &mut DownloadStatus, shard_id: ShardId, sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, network_adapter: &PeerManagerAdapter, ) { download.run_me.store(false, Ordering::SeqCst); download.state_requests_count += 1; - download.last_target = Some(peer_id.clone()); let run_me = download.run_me.clone(); near_performance_metrics::actix::spawn( "StateSync", network_adapter .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id }, + NetworkRequests::StateRequestPart { + shard_id, + sync_hash, + sync_prev_prev_hash, + part_id, + }, )) .then(move |result| { - // TODO: possible optimization - in the current code, even if one of the targets it not present in the network graph - // (so we keep getting RouteNotFound) - we'll still keep trying to assign parts to it. - // Fortunately only once every 60 seconds (timeout value). 
if let Ok(NetworkResponses::RouteNotFound) = result.map(|f| f.as_network_response()) { // Send a StateRequestPart on the next iteration @@ -1331,26 +1242,6 @@ fn request_part_from_peers( ); } -fn sent_request_part( - clock: Clock, - peer_id: PeerId, - part_id: u64, - shard_id: ShardId, - sync_hash: CryptoHash, - last_part_id_requested: &mut HashMap<(PeerId, ShardId), PendingRequestStatus>, - requested_target: &mut lru::LruCache<(u64, CryptoHash), PeerId>, - timeout: Duration, -) { - // FIXME: something is wrong - the index should have a shard_id too. - requested_target.put((part_id, sync_hash), peer_id.clone()); - last_part_id_requested - .entry((peer_id, shard_id)) - .and_modify(|pending_request| { - pending_request.missing_parts += 1; - }) - .or_insert_with(|| PendingRequestStatus::new(clock, timeout)); -} - /// Works around how data requests to external storage are done. /// This function investigates if the response is valid and updates `done` and `error` appropriately. /// If the response is successful, then the downloaded state file was written to the DB. @@ -1383,68 +1274,6 @@ fn process_download_response( } } -/// Create an abstract collection of elements to be shuffled. -/// Each element will appear in the shuffled output exactly `limit` times. -/// Use it as an iterator to access the shuffled collection. 
-/// -/// ```rust,ignore -/// let sampler = SamplerLimited::new(vec![1, 2, 3], 2); -/// -/// let res = sampler.collect::>(); -/// -/// assert!(res.len() == 6); -/// assert!(res.iter().filter(|v| v == 1).count() == 2); -/// assert!(res.iter().filter(|v| v == 2).count() == 2); -/// assert!(res.iter().filter(|v| v == 3).count() == 2); -/// ``` -/// -/// Out of the 90 possible values of `res` in the code above on of them is: -/// -/// ``` -/// vec![1, 2, 1, 3, 3, 2]; -/// ``` -struct SamplerLimited { - data: Vec, - limit: Vec, -} - -impl SamplerLimited { - fn new(data: Vec, limit: u64) -> Self { - if limit == 0 { - Self { data: vec![], limit: vec![] } - } else { - let len = data.len(); - Self { data, limit: vec![limit; len] } - } - } -} - -impl Iterator for SamplerLimited { - type Item = T; - - fn next(&mut self) -> Option { - if self.limit.is_empty() { - None - } else { - let len = self.limit.len(); - let ix = thread_rng().gen_range(0..len); - self.limit[ix] -= 1; - - if self.limit[ix] == 0 { - if ix + 1 != len { - self.limit[ix] = self.limit[len - 1]; - self.data.swap(ix, len - 1); - } - - self.limit.pop(); - self.data.pop() - } else { - Some(self.data[ix].clone()) - } - } - } -} - #[cfg(test)] mod test { use super::*; diff --git a/chain/client/src/test_utils/peer_manager_mock.rs b/chain/client/src/test_utils/peer_manager_mock.rs index 0f6c9031890..cfa39d71a84 100644 --- a/chain/client/src/test_utils/peer_manager_mock.rs +++ b/chain/client/src/test_utils/peer_manager_mock.rs @@ -1,5 +1,6 @@ -use near_network::types::SetChainInfo; -use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse}; +use near_network::types::{ + PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent, +}; pub struct PeerManagerMock { handle: Box< @@ -37,3 +38,8 @@ impl actix::Handler for PeerManagerMock { type Result = (); fn handle(&mut self, _msg: SetChainInfo, _ctx: &mut Self::Context) {} } + +impl actix::Handler for PeerManagerMock { + type 
Result = (); + fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {} +} diff --git a/chain/client/src/tests/catching_up.rs b/chain/client/src/tests/catching_up.rs index 19a5ced219a..f49db6989ec 100644 --- a/chain/client/src/tests/catching_up.rs +++ b/chain/client/src/tests/catching_up.rs @@ -101,8 +101,9 @@ enum ReceiptsSyncPhases { pub struct StateRequestStruct { pub shard_id: u64, pub sync_hash: CryptoHash, + pub sync_prev_prev_hash: Option, pub part_id: Option, - pub peer_id: PeerId, + pub peer_id: Option, } /// Sanity checks that the incoming and outgoing receipts are properly sent and received @@ -268,8 +269,9 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: None, part_id: None, - peer_id: peer_id.clone(), + peer_id: Some(peer_id.clone()), }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) @@ -283,16 +285,17 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) if let NetworkRequests::StateRequestPart { shard_id, sync_hash, + sync_prev_prev_hash, part_id, - peer_id, } = msg { if sync_hold { let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: Some(*sync_prev_prev_hash), part_id: Some(*part_id), - peer_id: peer_id.clone(), + peer_id: None, }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 4ef69a0dc58..cb899f8f896 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -216,6 +216,9 @@ impl From<&mem::PeerMessage> for net::PeerMessage { panic!("Tier1Handshake is not supported in Borsh encoding") } mem::PeerMessage::Tier2Handshake(h) => net::PeerMessage::Handshake((&h).into()), + 
mem::PeerMessage::Tier3Handshake(_) => { + panic!("Tier3Handshake is not supported in Borsh encoding") + } mem::PeerMessage::HandshakeFailure(pi, hfr) => { net::PeerMessage::HandshakeFailure(pi, (&hfr).into()) } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 6c584219c70..a2a2c28b0e2 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -411,6 +411,7 @@ pub struct Disconnect { pub enum PeerMessage { Tier1Handshake(Handshake), Tier2Handshake(Handshake), + Tier3Handshake(Handshake), HandshakeFailure(PeerInfo, HandshakeFailureReason), /// When a failed nonce is used by some peer, this message is sent back as evidence. LastEdge(Edge), @@ -552,6 +553,7 @@ pub enum RoutedMessageBody { VersionedChunkEndorsement(ChunkEndorsement), EpochSyncRequest, EpochSyncResponse(CompressedEpochSyncProof), + StatePartRequest(StatePartRequest), } impl RoutedMessageBody { @@ -645,6 +647,7 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::EpochSyncResponse(_) => { write!(f, "EpochSyncResponse") } + RoutedMessageBody::StatePartRequest(_) => write!(f, "StatePartRequest"), } } } diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto index f60bd202312..cc95644d4d5 100644 --- a/chain/network/src/network_protocol/network.proto +++ b/chain/network/src/network_protocol/network.proto @@ -458,17 +458,15 @@ message PeerMessage { TraceContext trace_context = 26; oneof message_type { - // Handshakes for TIER1 and TIER2 networks are considered separate, - // so that a node binary which doesn't support TIER1 connection won't - // be even able to PARSE the handshake. This way we avoid accidental - // connections, such that one end thinks it is a TIER2 connection and the - // other thinks it is a TIER1 connection. 
As currently both TIER1 and TIER2 - // connections are handled by the same PeerActor, both fields use the same - // underlying message type. If we ever decide to separate the handshake - // implementations, we can copy the Handshake message type defition and - // make it evolve differently for TIER1 and TIER2. + // Handshakes for different network tiers explicitly use different PeerMessage variants. + // This way we avoid accidental connections, such that one end thinks it is a TIER2 connection + // and the other thinks it is a TIER1 connection. Currently the same PeerActor handles + // all types of connections, hence the contents are identical for all types of connections. + // If we ever decide to separate the handshake implementations, we can copy the Handshake message + // type definition and make it evolve differently for different tiers. Handshake tier1_handshake = 27; Handshake tier2_handshake = 4; + Handshake tier3_handshake = 33; HandshakeFailure handshake_failure = 5; LastEdge last_edge = 6; diff --git a/chain/network/src/network_protocol/proto_conv/peer_message.rs b/chain/network/src/network_protocol/proto_conv/peer_message.rs index 38b4250b15a..b73a66d7966 100644 --- a/chain/network/src/network_protocol/proto_conv/peer_message.rs +++ b/chain/network/src/network_protocol/proto_conv/peer_message.rs @@ -234,6 +234,7 @@ impl From<&PeerMessage> for proto::PeerMessage { message_type: Some(match x { PeerMessage::Tier1Handshake(h) => ProtoMT::Tier1Handshake(h.into()), PeerMessage::Tier2Handshake(h) => ProtoMT::Tier2Handshake(h.into()), + PeerMessage::Tier3Handshake(h) => ProtoMT::Tier3Handshake(h.into()), PeerMessage::HandshakeFailure(pi, hfr) => { ProtoMT::HandshakeFailure((pi, hfr).into()) } @@ -398,6 +399,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage { ProtoMT::Tier2Handshake(h) => { PeerMessage::Tier2Handshake(h.try_into().map_err(Self::Error::Handshake)?) 
} + ProtoMT::Tier3Handshake(h) => { + PeerMessage::Tier3Handshake(h.try_into().map_err(Self::Error::Handshake)?) + } ProtoMT::HandshakeFailure(hf) => { let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?; PeerMessage::HandshakeFailure(pi, hfr) diff --git a/chain/network/src/network_protocol/state_sync.rs b/chain/network/src/network_protocol/state_sync.rs index d30170b83d5..c500893f3f4 100644 --- a/chain/network/src/network_protocol/state_sync.rs +++ b/chain/network/src/network_protocol/state_sync.rs @@ -107,3 +107,26 @@ pub enum SnapshotHostInfoVerificationError { )] TooManyShards(usize), } + +/// Message used to request a state part. +/// +#[derive( + Clone, + Debug, + Eq, + PartialEq, + Hash, + borsh::BorshSerialize, + borsh::BorshDeserialize, + ProtocolSchema, +)] +pub struct StatePartRequest { + /// Requested shard id + pub shard_id: ShardId, + /// Hash of the requested snapshot's state root + pub sync_hash: CryptoHash, + /// Requested part id + pub part_id: u64, + /// Public address of the node making the request + pub addr: std::net::SocketAddr, +} diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 4ff60267190..1e585d70b94 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -284,6 +284,18 @@ impl PeerActor { .start_outbound(peer_id.clone()) .map_err(ClosingReason::OutboundNotAllowed)? } + tcp::Tier::T3 => { + // Loop connections are allowed only on T1; see comment above + if peer_id == &network_state.config.node_id() { + return Err(ClosingReason::OutboundNotAllowed( + connection::PoolError::UnexpectedLoopConnection, + )); + } + network_state + .tier3 + .start_outbound(peer_id.clone()) + .map_err(ClosingReason::OutboundNotAllowed)? 
+ } }, handshake_spec: HandshakeSpec { partial_edge_info: network_state.propose_edge(&clock, peer_id, None), @@ -293,10 +305,12 @@ impl PeerActor { }, }, }; - // Override force_encoding for outbound Tier1 connections, - // since Tier1Handshake is supported only with proto encoding. + // Override force_encoding for outbound Tier1 and Tier3 connections; + // Tier1Handshake and Tier3Handshake are supported only with proto encoding. let force_encoding = match &stream.type_ { - tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 => { + tcp::StreamType::Outbound { tier, .. } + if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 => + { Some(Encoding::Proto) } _ => force_encoding, @@ -480,6 +494,7 @@ impl PeerActor { let msg = match spec.tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; self.send_message_or_log(&msg); } @@ -939,6 +954,9 @@ impl PeerActor { (PeerStatus::Connecting { .. }, PeerMessage::Tier2Handshake(msg)) => { self.process_handshake(ctx, tcp::Tier::T2, msg) } + (PeerStatus::Connecting { .. }, PeerMessage::Tier3Handshake(msg)) => { + self.process_handshake(ctx, tcp::Tier::T3, msg) + } (_, msg) => { tracing::warn!(target:"network","unexpected message during handshake: {}",msg) } @@ -1140,7 +1158,9 @@ impl PeerActor { self.stop(ctx, ClosingReason::DisconnectMessage); } - PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) => { + PeerMessage::Tier1Handshake(_) + | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) => { // Received handshake after already have seen handshake from this peer. 
tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } @@ -1182,8 +1202,20 @@ impl PeerActor { self.stop(ctx, ClosingReason::Ban(ReasonForBan::Abusive)); } - // Add received peers to the peer store let node_id = self.network_state.config.node_id(); + + // Record our own IP address as observed by the peer. + if self.network_state.my_public_addr.read().is_none() { + if let Some(my_peer_info) = + direct_peers.iter().find(|peer_info| peer_info.id == node_id) + { + if let Some(addr) = my_peer_info.addr { + let mut my_public_addr = self.network_state.my_public_addr.write(); + *my_public_addr = Some(addr); + } + } + } + // Add received indirect peers to the peer store self.network_state.peer_store.add_indirect_peers( &self.clock, peers.into_iter().filter(|peer_info| peer_info.id != node_id), diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index ea9f7edccab..2035a673da5 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -36,8 +36,12 @@ impl tcp::Tier { match msg { PeerMessage::Tier1Handshake(_) => self == tcp::Tier::T1, PeerMessage::Tier2Handshake(_) => self == tcp::Tier::T2, + PeerMessage::Tier3Handshake(_) => self == tcp::Tier::T3, PeerMessage::HandshakeFailure(_, _) => true, PeerMessage::LastEdge(_) => true, + PeerMessage::VersionedStateResponse(_) => { + self == tcp::Tier::T2 || self == tcp::Tier::T3 + } PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body), _ => self == tcp::Tier::T2, } diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 06c1bad9ffe..2a249c7f734 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -28,7 +28,7 @@ use crate::state_witness::{ use crate::stats::metrics; use crate::store; use crate::tcp; -use crate::types::{ChainInfo, PeerType, 
ReasonForBan}; +use crate::types::{ChainInfo, PeerType, ReasonForBan, Tier3Request, Tier3RequestBody}; use anyhow::Context; use arc_swap::ArcSwap; use near_async::messaging::{CanSend, SendAsync, Sender}; @@ -38,7 +38,8 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement; use near_primitives::types::AccountId; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; +use std::collections::VecDeque; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -115,8 +116,11 @@ pub(crate) struct NetworkState { /// Connected peers (inbound and outbound) with their full peer information. pub tier2: connection::Pool, pub tier1: connection::Pool, + pub tier3: connection::Pool, /// Semaphore limiting inflight inbound handshakes. pub inbound_handshake_permits: Arc, + /// The public IP of this node; available after connecting to any one peer. + pub my_public_addr: Arc>>, /// Peer store that provides read/write access to peers. pub peer_store: peer_store::PeerStore, /// Information about state snapshots hosted by network peers. @@ -143,6 +147,9 @@ pub(crate) struct NetworkState { /// TODO(gprusak): consider removing it altogether. pub tier1_route_back: Mutex, + /// Queue of received requests to which a response should be made over TIER3. + pub tier3_requests: Mutex>, + /// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx` /// messages sincce last block. 
pub txns_since_last_block: AtomicUsize, @@ -194,7 +201,9 @@ impl NetworkState { chain_info: Default::default(), tier2: connection::Pool::new(config.node_id()), tier1: connection::Pool::new(config.node_id()), + tier3: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), + my_public_addr: Arc::new(RwLock::new(None)), peer_store, snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())), connection_store: connection_store::ConnectionStore::new(store.clone()).unwrap(), @@ -203,6 +212,7 @@ impl NetworkState { account_announcements: Arc::new(AnnounceAccountCache::new(store)), tier2_route_back: Mutex::new(RouteBackCache::default()), tier1_route_back: Mutex::new(RouteBackCache::default()), + tier3_requests: Mutex::new(VecDeque::::new()), recent_routed_messages: Mutex::new(lru::LruCache::new( NonZeroUsize::new(RECENT_ROUTED_MESSAGES_CACHE_SIZE).unwrap(), )), @@ -349,6 +359,15 @@ impl NetworkState { // Write to the peer store this.peer_store.peer_connected(&clock, peer_info); } + tcp::Tier::T3 => { + if conn.peer_type == PeerType::Inbound { + // TODO: check that we are expecting this Tier3 connection + } + if !edge.verify() { + return Err(RegisterPeerError::InvalidEdge); + } + this.tier3.insert_ready(conn).map_err(RegisterPeerError::PoolError)?; + } } Ok(()) }).await.unwrap() @@ -369,14 +388,19 @@ impl NetworkState { let clock = clock.clone(); let conn = conn.clone(); self.spawn(async move { - let peer_id = conn.peer_info.id.clone(); - if conn.tier == tcp::Tier::T1 { - // There is no banning or routing table for TIER1. - // Just remove the connection from the network_state. - this.tier1.remove(&conn); + match conn.tier { + tcp::Tier::T1 => this.tier1.remove(&conn), + tcp::Tier::T2 => this.tier2.remove(&conn), + tcp::Tier::T3 => this.tier3.remove(&conn), + } + + // The rest of this function has to do with banning or routing, + // which are applicable only for TIER2. 
+ if conn.tier != tcp::Tier::T2 { return; } - this.tier2.remove(&conn); + + let peer_id = conn.peer_info.id.clone(); // If the last edge we have with this peer represent a connection addition, create the edge // update that represents the connection removal. @@ -558,6 +582,17 @@ impl NetworkState { } } } + tcp::Tier::T3 => { + let peer_id = match &msg.target { + PeerIdOrHash::Hash(_) => { + // There is no route back cache for TIER3 as all connections are direct + debug_assert!(false); + return false; + } + PeerIdOrHash::PeerId(peer_id) => peer_id.clone(), + }; + return self.tier3.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); + } } } @@ -743,6 +778,19 @@ impl NetworkState { self.client.send(EpochSyncResponseMessage { from_peer: peer_id, proof }); None } + RoutedMessageBody::StatePartRequest(request) => { + // TODO: cap the size of this queue, + // perhaps preferentially allowing requests made by validators + self.tier3_requests.lock().push_back(Tier3Request { + peer_info: PeerInfo { id: peer_id, addr: Some(request.addr), account_id: None }, + body: Tier3RequestBody::StatePartRequest( + request.shard_id, + request.sync_hash, + request.part_id, + ), + }); + None + } body => { tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); None diff --git a/chain/network/src/peer_manager/network_state/routing.rs b/chain/network/src/peer_manager/network_state/routing.rs index 0fe045fcdad..ccbf28c7f3f 100644 --- a/chain/network/src/peer_manager/network_state/routing.rs +++ b/chain/network/src/peer_manager/network_state/routing.rs @@ -210,9 +210,14 @@ impl NetworkState { tracing::trace!(target: "network", route_back = ?msg.clone(), "Received peer message that requires response"); let from = &conn.peer_info.id; + match conn.tier { tcp::Tier::T1 => self.tier1_route_back.lock().insert(&clock, msg.hash(), from.clone()), tcp::Tier::T2 => self.tier2_route_back.lock().insert(&clock, msg.hash(), from.clone()), + tcp::Tier::T3 => { + // TIER3 
connections are direct by design; no routing is performed + debug_assert!(false) + } } } diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 6e04e871203..08030ed6a5c 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -1,10 +1,11 @@ -use crate::client::{ClientSenderForNetwork, SetNetworkInfo}; +use crate::client::{ClientSenderForNetwork, SetNetworkInfo, StateRequestPart}; use crate::config; use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol; use crate::network_protocol::SyncSnapshotHosts; use crate::network_protocol::{ Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, + StatePartRequest, }; use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; @@ -18,7 +19,7 @@ use crate::tcp; use crate::types::{ ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType, - SetChainInfo, SnapshotHostInfo, + SetChainInfo, SnapshotHostInfo, StateSyncEvent, Tier3RequestBody, }; use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; @@ -87,6 +88,11 @@ pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Durati /// How often to poll the NetworkState for closed connections we'd like to re-establish. pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1); +/// How often we should check for pending Tier3 requests +const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(2); +/// The length of time that a Tier3 connection is allowed to idle before it is stopped +const TIER3_IDLE_TIMEOUT: time::Duration = time::Duration::seconds(15); + /// Actor that manages peers connections. 
pub struct PeerManagerActor { pub(crate) clock: time::Clock, @@ -338,6 +344,60 @@ impl PeerManagerActor { } } }); + // Periodically process pending Tier3 requests. + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + let arbiter = arbiter.clone(); + let mut interval = time::Interval::new(clock.now(), PROCESS_TIER3_REQUESTS_INTERVAL); + async move { + loop { + interval.tick(&clock).await; + + // TODO: consider where exactly to throttle these + if let Some(request) = state.tier3_requests.lock().pop_front() { + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + async move { + if let Some(response) = match request.body { + Tier3RequestBody::StatePartRequest(shard_id, sync_hash, part_id) => { + state + .client + .send_async(StateRequestPart { shard_id, sync_hash, part_id }) + .await + .ok() + .flatten() + .map(|response| PeerMessage::VersionedStateResponse(*response.0)) + } + } { + if !state.tier3.load().ready.contains_key(&request.peer_info.id) { + let result = async { + let stream = tcp::Stream::connect( + &request.peer_info, + tcp::Tier::T3, + &state.config.socket_options + ).await.context("tcp::Stream::connect()")?; + PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; + anyhow::Ok(()) + }.await; + + if let Err(ref err) = result { + tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info); + } + } + + state.tier3.send_message(request.peer_info.id, Arc::new(response)); + } + else { + tracing::debug!(target: "network", "client failed to produce response for {:?}", request); + } + } + }); + } + } + } + }); } }); Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self { @@ -553,6 +613,27 @@ impl PeerManagerActor { } } + /// TIER3 connections are established ad-hoc to transmit individual large messages. + /// Here we terminate these "single-purpose" connections after an idle timeout. 
+ /// + /// When a TIER3 connection is established the intended message is already prepared in-memory, + /// so there is no concern of the timeout falling in between the handshake and the payload. + /// + /// A finer detail is that as long as a TIER3 connection remains open it can be reused to + /// transmit additional TIER3 payloads intended for the same peer. In such cases the message + /// can be lost if the timeout is reached while it is in flight. + fn stop_tier3_idle_connections(&self) { + let now = self.clock.now(); + // Iterators are lazy: `for_each` is needed so that `stop` actually runs for each idle peer. + self.state + .tier3 + .load() + .ready + .values() + .filter(|p| now - p.last_time_received_message.load() > TIER3_IDLE_TIMEOUT) + .for_each(|p| p.stop(None)); + } + /// Periodically monitor list of peers and: /// - request new peers from connected peers, /// - bootstrap outbound connections from known peers, @@ -621,6 +702,9 @@ impl PeerManagerActor { // If there are too many active connections try to remove some connections self.maybe_stop_active_connection(); + // Close Tier3 connections which have been idle for too long + self.stop_tier3_idle_connections(); + // Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them. 
let unreliable_peers = self.unreliable_peers(); metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64); @@ -788,11 +872,42 @@ impl PeerManagerActor { NetworkResponses::RouteNotFound } } - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id } => { - if self.state.tier2.send_message( - peer_id, - Arc::new(PeerMessage::StateRequestPart(shard_id, sync_hash, part_id)), - ) { + NetworkRequests::StateRequestPart { + shard_id, + sync_hash, + sync_prev_prev_hash, + part_id, + } => { + let mut success = false; + + // The node needs to include its own public address in the request + // so that the response can be sent over Tier3 + if let Some(addr) = *self.state.my_public_addr.read() { + if let Some(peer_id) = self.state.snapshot_hosts.select_host_for_part( + &sync_prev_prev_hash, + shard_id, + part_id, + ) { + success = + self.state.send_message_to_peer( + &self.clock, + tcp::Tier::T2, + self.state.sign_message( + &self.clock, + RawRoutedMessage { + target: PeerIdOrHash::PeerId(peer_id), + body: RoutedMessageBody::StatePartRequest( + StatePartRequest { shard_id, sync_hash, part_id, addr }, + ), + }, + ), + ); + } else { + tracing::debug!(target: "network", "no hosts available for {shard_id}, {sync_prev_prev_hash}"); + } + } + + if success { NetworkResponses::NoResponse } else { NetworkResponses::RouteNotFound @@ -1141,6 +1256,25 @@ impl actix::Handler> for PeerManagerA } } +impl actix::Handler> for PeerManagerActor { + type Result = (); + #[perf] + fn handle( + &mut self, + msg: WithSpanContext, + _ctx: &mut Self::Context, + ) -> Self::Result { + let (_span, msg) = handler_debug_span!(target: "network", msg); + let _timer = + metrics::PEER_MANAGER_MESSAGES_TIME.with_label_values(&[(&msg).into()]).start_timer(); + match msg { + StateSyncEvent::StatePartReceived(shard_id, part_id) => { + self.state.snapshot_hosts.part_received(shard_id, part_id); + } + } + } +} + impl actix::Handler for PeerManagerActor { type Result = DebugStatus; #[perf] diff --git 
a/chain/network/src/peer_manager/tests/connection_pool.rs b/chain/network/src/peer_manager/tests/connection_pool.rs index 79e00807f20..15da55a57aa 100644 --- a/chain/network/src/peer_manager/tests/connection_pool.rs +++ b/chain/network/src/peer_manager/tests/connection_pool.rs @@ -273,7 +273,7 @@ async fn invalid_edge() { ]; for (name, edge) in &testcases { - for tier in [tcp::Tier::T1, tcp::Tier::T2] { + for tier in [tcp::Tier::T1, tcp::Tier::T2, tcp::Tier::T3] { tracing::info!(target:"test","{name} {tier:?}"); let stream = tcp::Stream::connect(&pm.peer_info(), tier, &SocketOptions::default()) .await @@ -303,6 +303,7 @@ async fn invalid_edge() { let handshake = match tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; stream.write(&handshake).await; let reason = events diff --git a/chain/network/src/rate_limits/messages_limits.rs b/chain/network/src/rate_limits/messages_limits.rs index 08d2d8ea40f..54638a829c3 100644 --- a/chain/network/src/rate_limits/messages_limits.rs +++ b/chain/network/src/rate_limits/messages_limits.rs @@ -220,6 +220,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa RoutedMessageBody::VersionedChunkEndorsement(_) => Some((ChunkEndorsement, 1)), RoutedMessageBody::EpochSyncRequest => None, RoutedMessageBody::EpochSyncResponse(_) => None, + RoutedMessageBody::StatePartRequest(_) => None, // TODO RoutedMessageBody::Ping(_) | RoutedMessageBody::Pong(_) | RoutedMessageBody::_UnusedChunkStateWitness @@ -239,6 +240,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa PeerMessage::VersionedStateResponse(_) => Some((VersionedStateResponse, 1)), PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) | PeerMessage::HandshakeFailure(_, _) | PeerMessage::LastEdge(_) | PeerMessage::Disconnect(_) diff --git 
a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index ca430900ac1..e9509a1a7a0 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -313,7 +313,6 @@ impl SnapshotHostsCache { } /// Given a state part request, selects a peer host to which the request should be sent. - #[allow(dead_code)] pub fn select_host_for_part( &self, sync_hash: &CryptoHash, @@ -324,7 +323,6 @@ impl SnapshotHostsCache { } /// Triggered by state sync actor after processing a state part. - #[allow(dead_code)] pub fn part_received(&self, shard_id: ShardId, part_id: u64) { let mut inner = self.0.lock(); inner.peer_selector.remove(&(shard_id, part_id)); diff --git a/chain/network/src/tcp.rs b/chain/network/src/tcp.rs index 06ba01a5033..5e5e78a7a42 100644 --- a/chain/network/src/tcp.rs +++ b/chain/network/src/tcp.rs @@ -26,6 +26,11 @@ pub enum Tier { /// consensus messages. Also, Tier1 peer discovery actually happens on Tier2 network, i.e. /// Tier2 network is necessary to bootstrap Tier1 connections. T2, + /// Tier3 connections are created ad hoc to directly transfer large messages, e.g. state parts. + /// Requests for state parts are routed over Tier2. A node receiving such a request initiates a + /// direct Tier3 connection to send the response. By sending large responses over dedicated + /// connections we avoid delaying other messages and we minimize network bandwidth usage. 
+ T3, } #[derive(Clone, Debug)] diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs index 0063281ebb1..8d7ab2dd651 100644 --- a/chain/network/src/test_loop.rs +++ b/chain/network/src/test_loop.rs @@ -13,7 +13,7 @@ use crate::state_witness::{ }; use crate::types::{ NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, - SetChainInfo, + SetChainInfo, StateSyncEvent, }; use near_async::actix::ActixResult; use near_async::futures::{FutureSpawner, FutureSpawnerExt}; @@ -188,6 +188,10 @@ impl Handler for TestLoopPeerManagerActor { fn handle(&mut self, _msg: SetChainInfo) {} } +impl Handler for TestLoopPeerManagerActor { + fn handle(&mut self, _msg: StateSyncEvent) {} +} + impl Handler for TestLoopPeerManagerActor { fn handle(&mut self, msg: PeerManagerMessageRequest) -> PeerManagerMessageResponse { let PeerManagerMessageRequest::NetworkRequests(request) = msg else { diff --git a/chain/network/src/test_utils.rs b/chain/network/src/test_utils.rs index 4806752517f..2bbb64d433c 100644 --- a/chain/network/src/test_utils.rs +++ b/chain/network/src/test_utils.rs @@ -1,7 +1,7 @@ use crate::network_protocol::PeerInfo; use crate::types::{ NetworkInfo, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse, - SetChainInfo, + SetChainInfo, StateSyncEvent, }; use crate::PeerManagerActor; use actix::{Actor, ActorContext, Context, Handler}; @@ -259,6 +259,10 @@ impl CanSend for MockPeerManagerAdapter { fn send(&self, _msg: SetChainInfo) {} } +impl CanSend for MockPeerManagerAdapter { + fn send(&self, _msg: StateSyncEvent) {} +} + impl MockPeerManagerAdapter { pub fn pop(&self) -> Option { self.requests.write().unwrap().pop_front() diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 3ddcbbcc067..0efaeff5f9e 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -245,7 +245,12 @@ pub enum NetworkRequests { /// Request state header for given shard at given state root. 
StateRequestHeader { shard_id: ShardId, sync_hash: CryptoHash, peer_id: PeerId }, /// Request state part for given shard at given state root. - StateRequestPart { shard_id: ShardId, sync_hash: CryptoHash, part_id: u64, peer_id: PeerId }, + StateRequestPart { + shard_id: ShardId, + sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, + part_id: u64, + }, /// Ban given peer. BanPeer { peer_id: PeerId, ban_reason: ReasonForBan }, /// Announce account @@ -288,6 +293,12 @@ pub enum NetworkRequests { EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof }, } +#[derive(Debug, actix::Message, strum::IntoStaticStr)] +#[rtype(result = "()")] +pub enum StateSyncEvent { + StatePartReceived(ShardId, u64), +} + /// Combines peer address info, chain. #[derive(Debug, Clone, Eq, PartialEq)] pub struct FullPeerInfo { @@ -399,6 +410,7 @@ pub struct PeerManagerAdapter { pub async_request_sender: AsyncSender, pub request_sender: Sender, pub set_chain_info_sender: Sender, + pub state_sync_event_sender: Sender, } #[cfg(test)] @@ -498,3 +510,17 @@ pub struct AccountIdOrPeerTrackingShard { /// Only send messages to peers whose latest chain height is no less `min_height` pub min_height: BlockHeight, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// An inbound request to which a response should be sent over Tier3 +pub struct Tier3Request { + /// Target peer to send the response to + pub peer_info: PeerInfo, + /// Contents of the request + pub body: Tier3RequestBody, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Tier3RequestBody { + StatePartRequest(ShardId, CryptoHash, u64), +} diff --git a/core/store/src/config.rs b/core/store/src/config.rs index 38e551d58cb..f7c155ab32d 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -112,10 +112,10 @@ pub struct StateSnapshotConfig { pub enum StateSnapshotType { /// Consider this as the default "disabled" option. 
We need to have snapshotting enabled for resharding /// State snapshots involve filesystem operations and costly IO operations. - #[default] ForReshardingOnly, /// This is the "enabled" option where we create a snapshot at the beginning of every epoch. /// Needed if a node wants to be able to respond to state part requests. + #[default] EveryEpoch, }