diff --git a/Cargo.lock b/Cargo.lock index a29a09237ed..344f9e5b6ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,7 +91,7 @@ dependencies = [ "percent-encoding", "pin-project", "pin-project-lite", - "rand 0.8.4", + "rand 0.8.5", "regex", "serde", "sha-1", @@ -403,7 +403,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rand 0.8.4", + "rand 0.8.5", "serde", "serde_json", "serde_urlencoded", @@ -790,7 +790,7 @@ dependencies = [ "nearcore", "openssl-probe", "parking_lot 0.11.2", - "rand 0.8.4", + "rand 0.8.5", "tokio", ] @@ -1525,7 +1525,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", - "rand 0.8.4", + "rand 0.8.5", "rustc-hex", "static_assertions", ] @@ -1595,6 +1595,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "funty" version = "1.1.0" @@ -2238,9 +2244,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.112" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" +checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" [[package]] name = "libfuzzer-sys" @@ -2991,8 +2997,10 @@ dependencies = [ "borsh", "bytes", "bytesize", + "chrono", "conqueue", "criterion", + "crossbeam-channel", "deepsize", "delay-detector", "futures", @@ -3009,10 +3017,15 @@ dependencies = [ "near-stable-hasher", "near-store", "once_cell", - "rand 0.7.3", + "parking_lot 0.11.2", + "protobuf 3.0.1", + "protobuf-codegen", + "rand 0.6.5", + "rand_pcg", "serde", "strum", "tempfile", + "thiserror", "tokio", "tokio-stream", "tokio-util 0.7.1", @@ -3260,7 +3273,7 @@ version = "0.0.0" dependencies = [ "arbitrary", "once_cell", - "rand 0.8.4", + "rand 0.8.5", "wasm-encoder", "wasm-smith", "wat", @@ -3321,7 +3334,7 @@ dependencies = [ "parity-wasm 0.41.0", "pwasm-utils 0.12.0", "pwasm-utils 0.18.2", - "rand 0.8.4", + "rand 0.8.5", "serde", "threadpool", "tracing", @@ -3610,9 +3623,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.8.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "oorandom" @@ -4056,7 +4069,7 @@ dependencies = [ "fnv", "lazy_static", "parking_lot 0.11.2", - "protobuf", + "protobuf 2.25.1", "regex", "thiserror", ] @@ -4067,6 +4080,57 @@ version = "2.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23129d50f2c9355ced935fce8a08bd706ee2e7ce2b3b33bf61dace0e379ac63a" +[[package]] +name = "protobuf" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430ff91293021cc9369e04edced9594c9c7a03760e62afcf98b0f8ad99a56ef7" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror", +] + +[[package]] +name = "protobuf-codegen" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9248f242f2b2915e050852c56058dffe2ea9d71fba735d8eb33be78973b71cd" +dependencies = [ + "anyhow", + "once_cell", + "protobuf 3.0.1", + "protobuf-parse", + "regex", + "tempfile", + "thiserror", +] + +[[package]] +name = "protobuf-parse" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7aa0f9769c554c90cb1b28592f7efe03ad2540de046dfa21d89155776c27c29" +dependencies = [ + "anyhow", + "indexmap", + "log", + "protobuf 3.0.1", + "protobuf-support", + "tempfile", + "thiserror", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9553cd16fcb53a9b33da0735313393ba2697d090ae9af4a96f160992513ce8cb" +dependencies = [ + "thiserror", +] + [[package]] name = "psm" version = "0.1.16" @@ -4146,6 +4210,7 @@ dependencies = [ "rand_hc 0.1.0", "rand_isaac", "rand_jitter", + "rand_os", "rand_pcg", "rand_xorshift 0.1.1", "winapi", @@ -4166,14 +4231,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha 0.3.1", "rand_core 0.6.3", - "rand_hc 0.3.1", ] [[package]] @@ -4257,15 +4321,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_hc" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" -dependencies = [ - "rand_core 0.6.3", -] - [[package]] name = "rand_isaac" version = "0.1.1" @@ -4286,6 +4341,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "rand_os" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" +dependencies = [ + "cloudabi", + "fuchsia-cprng", + "libc", + "rand_core 0.4.2", + "rdrand", + "winapi", +] + [[package]] name = "rand_pcg" version = "0.1.2" @@ -4339,6 +4408,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redis" version = "0.21.5" @@ -4401,9 +4479,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286" dependencies = [ "aho-corasick", "memchr", @@ -6259,7 +6337,7 @@ dependencies = [ "mach", "memoffset", "more-asserts", - "rand 0.8.4", + "rand 0.8.5", "region 2.2.0", "rustix", "thiserror", @@ -6307,6 +6385,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "4.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae" +dependencies = [ + "either", + "lazy_static", + "libc", +] + [[package]] name = "winapi" version = "0.3.9" @@ -6460,7 +6549,7 @@ dependencies = [ "byteorder", "crunchy", "lazy_static", - "rand 0.8.4", + "rand 0.8.5", "rustc-hex", ] diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index bbec7feec3e..9d8f2366e0f 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -992,7 +992,6 @@ pub fn setup_mock_all_validators( | NetworkRequests::RequestUpdateNonce(_, _) | NetworkRequests::ResponseUpdateNonce(_) | NetworkRequests::ReceiptOutComeRequest(_, _) => {} - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] | NetworkRequests::IbfMessage { .. } => {} }; } diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index 6d74f0a439b..acd731d82cb 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -7,9 +7,17 @@ edition = "2021" rust-version = "1.60.0" publish = false +[build-dependencies] +anyhow = "1.0.55" +protobuf-codegen = "3.0.1" + [dependencies] +protobuf = "3.0.1" actix = "0.13.0" anyhow = "1.0.55" +chrono = "0.4.4" +crossbeam-channel = "0.5" +parking_lot = "0.11.2" borsh = { version = "0.9", features = ["rc"] } bytes = "1" bytesize = "1.1" @@ -18,9 +26,11 @@ deepsize = { version = "0.2.0", optional = true } futures = "0.3" itertools = "0.10.3" lru = "0.7.2" +thiserror = "1" near-rust-allocator-proxy = { version = "0.4", optional = true } once_cell = "1.5.2" -rand = "0.7" +rand = "0.6" +rand_pcg = "0.1" serde = { version = "1", features = ["alloc", "derive", "rc"], optional = true } strum = { version = "0.20", features = ["derive"] } tokio-stream = { version = "0.1.2", features = ["net"] } @@ -36,7 +46,7 @@ near-performance-metrics = { path = "../../utils/near-performance-metrics" } near-performance-metrics-macros = { path = "../../utils/near-performance-metrics-macros" } near-primitives = { path = "../../core/primitives" } near-rate-limiter = { path = "../../utils/near-rate-limiter" } -near-stable-hasher = { path = "../../utils/near-stable-hasher", optional = true } +near-stable-hasher = { path = "../../utils/near-stable-hasher"} near-store = { path = "../../core/store" } [dev-dependencies] @@ -57,7 +67,6 @@ performance_stats = [ ] protocol_feature_routing_exchange_algorithm = [ "near-primitives/protocol_feature_routing_exchange_algorithm", - "near-stable-hasher", ] sandbox = ["near-network-primitives/sandbox"] test_features = [ diff --git a/chain/network/build.rs b/chain/network/build.rs new file mode 100644 index 00000000000..7b66c022c92 --- /dev/null +++ b/chain/network/build.rs @@ -0,0 +1,9 @@ +fn main() -> anyhow::Result<()> { + println!("cargo:rerun-if-changed=src/network_protocol/network.proto"); + protobuf_codegen::Codegen::new() + .pure() + .includes(&["src/"]) + .input("src/network_protocol/network.proto") + .cargo_out_dir("proto") + .run() +} diff --git a/chain/network/src/network_protocol.rs b/chain/network/src/network_protocol/borsh.rs similarity index 71% rename from chain/network/src/network_protocol.rs rename to chain/network/src/network_protocol/borsh.rs index 9f70532ae4e..27b53be11ab 100644 --- a/chain/network/src/network_protocol.rs +++ b/chain/network/src/network_protocol/borsh.rs @@ -5,7 +5,7 @@ /// We need to maintain backwards compatibility, all changes to this file needs to be reviews. use borsh::{BorshDeserialize, BorshSerialize}; use near_network_primitives::types::{ - Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, RoutedMessageBody, + Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, }; use near_primitives::block::{Block, BlockHeader, GenesisId}; use near_primitives::challenge::Challenge; @@ -13,7 +13,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::syncing::{EpochSyncFinalizationResponse, EpochSyncResponse}; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::{EpochId, ProtocolVersion}; +use near_primitives::types::EpochId; use near_primitives::version::{PEER_MIN_ALLOWED_PROTOCOL_VERSION, PROTOCOL_VERSION}; use std::fmt::Formatter; use std::{fmt, io}; @@ -61,27 +61,6 @@ struct HandshakeAutoDes { partial_edge_info: PartialEdgeInfo, } -impl Handshake { - pub(crate) fn new( - version: ProtocolVersion, - peer_id: PeerId, - target_peer_id: PeerId, - listen_port: Option, - chain_info: PeerChainInfoV2, - partial_edge_info: PartialEdgeInfo, - ) -> Self { - Handshake { - protocol_version: version, - oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, - sender_peer_id: peer_id, - target_peer_id, - sender_listen_port: listen_port, - sender_chain_info: chain_info, - partial_edge_info, - } - } -} - // Use custom deserializer for HandshakeV2. Try to read version of the other peer from the header. // If the version is supported then fallback to standard deserializer. impl BorshDeserialize for Handshake { @@ -204,22 +183,18 @@ pub enum PeerMessage { EpochSyncFinalizationRequest(EpochId), EpochSyncFinalizationResponse(Box), - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] RoutingTableSyncV2(RoutingSyncV2), } #[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready const _: () = assert!(std::mem::size_of::() <= 1144, "PeerMessage > 1144 bytes"); -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub enum RoutingSyncV2 { Version2(RoutingVersion2), } -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] const _: () = assert!(std::mem::size_of::() <= 80, "RoutingSyncV2 > 80 bytes"); -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub struct PartialSync { @@ -227,7 +202,6 @@ pub struct PartialSync { pub(crate) ibf: Vec, } -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub enum RoutingState { @@ -238,7 +212,6 @@ pub enum RoutingState { InitializeIbf, } -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] #[derive(BorshSerialize, BorshDeserialize, PartialEq, Eq, Clone, Debug)] pub struct RoutingVersion2 { @@ -247,63 +220,3 @@ pub struct RoutingVersion2 { pub(crate) edges: Vec, pub(crate) routing_state: RoutingState, } - -impl fmt::Display for PeerMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(self.msg_variant(), f) - } -} - -impl PeerMessage { - pub(crate) fn msg_variant(&self) -> &str { - if let PeerMessage::Routed(routed_message) = self { - routed_message.body.as_ref() - } else { - self.as_ref() - } - } - - pub(crate) fn is_client_message(&self) -> bool { - match self { - PeerMessage::Block(_) - | PeerMessage::BlockHeaders(_) - | PeerMessage::Challenge(_) - | PeerMessage::EpochSyncFinalizationResponse(_) - | PeerMessage::EpochSyncResponse(_) - | PeerMessage::Transaction(_) => true, - PeerMessage::Routed(r) => matches!( - r.body, - RoutedMessageBody::BlockApproval(_) - | RoutedMessageBody::ForwardTx(_) - | RoutedMessageBody::PartialEncodedChunk(_) - | RoutedMessageBody::PartialEncodedChunkForward(_) - | RoutedMessageBody::PartialEncodedChunkRequest(_) - | RoutedMessageBody::PartialEncodedChunkResponse(_) - | RoutedMessageBody::StateResponse(_) - | RoutedMessageBody::VersionedPartialEncodedChunk(_) - | RoutedMessageBody::VersionedStateResponse(_) - ), - _ => false, - } - } - - pub(crate) fn is_view_client_message(&self) -> bool { - match self { - PeerMessage::BlockHeadersRequest(_) - | PeerMessage::BlockRequest(_) - | PeerMessage::EpochSyncFinalizationRequest(_) - | PeerMessage::EpochSyncRequest(_) => true, - PeerMessage::Routed(r) => matches!( - r.body, - RoutedMessageBody::QueryRequest { .. } - | RoutedMessageBody::QueryResponse { .. } - | RoutedMessageBody::ReceiptOutcomeRequest(_) - | RoutedMessageBody::StateRequestHeader(_, _) - | RoutedMessageBody::StateRequestPart(_, _, _) - | RoutedMessageBody::TxStatusRequest(_, _) - | RoutedMessageBody::TxStatusResponse(_) - ), - _ => false, - } - } -} diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs new file mode 100644 index 00000000000..0c4d6d28812 --- /dev/null +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -0,0 +1,160 @@ +/// Contains borsh <-> network_protocol conversions. +use crate::network_protocol as mem; +use crate::network_protocol::borsh as net; +use thiserror::Error; + +impl From<&net::Handshake> for mem::Handshake { + fn from(x: &net::Handshake) -> Self { + Self { + protocol_version: x.protocol_version, + oldest_supported_version: x.oldest_supported_version, + sender_peer_id: x.sender_peer_id.clone(), + target_peer_id: x.target_peer_id.clone(), + sender_listen_port: x.sender_listen_port, + sender_chain_info: x.sender_chain_info.clone(), + partial_edge_info: x.partial_edge_info.clone(), + } + } +} + +impl From<&mem::Handshake> for net::Handshake { + fn from(x: &mem::Handshake) -> Self { + Self { + protocol_version: x.protocol_version, + oldest_supported_version: x.oldest_supported_version, + sender_peer_id: x.sender_peer_id.clone(), + target_peer_id: x.target_peer_id.clone(), + sender_listen_port: x.sender_listen_port, + sender_chain_info: x.sender_chain_info.clone(), + partial_edge_info: x.partial_edge_info.clone(), + } + } +} + +////////////////////////////////////////// + +impl From<&net::HandshakeFailureReason> for mem::HandshakeFailureReason { + fn from(x: &net::HandshakeFailureReason) -> Self { + match x { + net::HandshakeFailureReason::ProtocolVersionMismatch { + version, + oldest_supported_version, + } => mem::HandshakeFailureReason::ProtocolVersionMismatch { + version: *version, + oldest_supported_version: *oldest_supported_version, + }, + net::HandshakeFailureReason::GenesisMismatch(genesis_id) => { + mem::HandshakeFailureReason::GenesisMismatch(genesis_id.clone()) + } + net::HandshakeFailureReason::InvalidTarget => { + mem::HandshakeFailureReason::InvalidTarget + } + } + } +} + +impl From<&mem::HandshakeFailureReason> for net::HandshakeFailureReason { + fn from(x: &mem::HandshakeFailureReason) -> Self { + match x { + mem::HandshakeFailureReason::ProtocolVersionMismatch { + version, + oldest_supported_version, + } => net::HandshakeFailureReason::ProtocolVersionMismatch { + version: *version, + oldest_supported_version: *oldest_supported_version, + }, + mem::HandshakeFailureReason::GenesisMismatch(genesis_id) => { + net::HandshakeFailureReason::GenesisMismatch(genesis_id.clone()) + } + mem::HandshakeFailureReason::InvalidTarget => { + net::HandshakeFailureReason::InvalidTarget + } + } + } +} + +////////////////////////////////////////// + +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("HandshakeV2 is deprecated")] + DeprecatedHandshakeV2, +} + +impl TryFrom<&net::PeerMessage> for mem::PeerMessage { + type Error = ParsePeerMessageError; + fn try_from(x: &net::PeerMessage) -> Result { + Ok(match x.clone() { + net::PeerMessage::Handshake(h) => mem::PeerMessage::Handshake((&h).into()), + net::PeerMessage::HandshakeFailure(pi, hfr) => { + mem::PeerMessage::HandshakeFailure(pi, (&hfr).into()) + } + net::PeerMessage::LastEdge(e) => mem::PeerMessage::LastEdge(e), + net::PeerMessage::SyncRoutingTable(rtu) => mem::PeerMessage::SyncRoutingTable(rtu), + net::PeerMessage::RequestUpdateNonce(e) => mem::PeerMessage::RequestUpdateNonce(e), + net::PeerMessage::ResponseUpdateNonce(e) => mem::PeerMessage::ResponseUpdateNonce(e), + net::PeerMessage::PeersRequest => mem::PeerMessage::PeersRequest, + net::PeerMessage::PeersResponse(pis) => mem::PeerMessage::PeersResponse(pis), + net::PeerMessage::BlockHeadersRequest(bhs) => { + mem::PeerMessage::BlockHeadersRequest(bhs) + } + net::PeerMessage::BlockHeaders(bhs) => mem::PeerMessage::BlockHeaders(bhs), + net::PeerMessage::BlockRequest(bh) => mem::PeerMessage::BlockRequest(bh), + net::PeerMessage::Block(b) => mem::PeerMessage::Block(b), + net::PeerMessage::Transaction(t) => mem::PeerMessage::Transaction(t), + net::PeerMessage::Routed(r) => mem::PeerMessage::Routed(r), + net::PeerMessage::Disconnect => mem::PeerMessage::Disconnect, + net::PeerMessage::Challenge(c) => mem::PeerMessage::Challenge(c), + net::PeerMessage::_HandshakeV2 => return Err(Self::Error::DeprecatedHandshakeV2), + net::PeerMessage::EpochSyncRequest(epoch_id) => { + mem::PeerMessage::EpochSyncRequest(epoch_id) + } + net::PeerMessage::EpochSyncResponse(esr) => mem::PeerMessage::EpochSyncResponse(esr), + net::PeerMessage::EpochSyncFinalizationRequest(epoch_id) => { + mem::PeerMessage::EpochSyncFinalizationRequest(epoch_id) + } + net::PeerMessage::EpochSyncFinalizationResponse(esfr) => { + mem::PeerMessage::EpochSyncFinalizationResponse(esfr) + } + net::PeerMessage::RoutingTableSyncV2(rs) => mem::PeerMessage::RoutingTableSyncV2(rs), + }) + } +} + +impl From<&mem::PeerMessage> for net::PeerMessage { + fn from(x: &mem::PeerMessage) -> Self { + match x.clone() { + mem::PeerMessage::Handshake(h) => net::PeerMessage::Handshake((&h).into()), + mem::PeerMessage::HandshakeFailure(pi, hfr) => { + net::PeerMessage::HandshakeFailure(pi, (&hfr).into()) + } + mem::PeerMessage::LastEdge(e) => net::PeerMessage::LastEdge(e), + mem::PeerMessage::SyncRoutingTable(rtu) => net::PeerMessage::SyncRoutingTable(rtu), + mem::PeerMessage::RequestUpdateNonce(e) => net::PeerMessage::RequestUpdateNonce(e), + mem::PeerMessage::ResponseUpdateNonce(e) => net::PeerMessage::ResponseUpdateNonce(e), + mem::PeerMessage::PeersRequest => net::PeerMessage::PeersRequest, + mem::PeerMessage::PeersResponse(pis) => net::PeerMessage::PeersResponse(pis), + mem::PeerMessage::BlockHeadersRequest(bhs) => { + net::PeerMessage::BlockHeadersRequest(bhs) + } + mem::PeerMessage::BlockHeaders(bhs) => net::PeerMessage::BlockHeaders(bhs), + mem::PeerMessage::BlockRequest(bh) => net::PeerMessage::BlockRequest(bh), + mem::PeerMessage::Block(b) => net::PeerMessage::Block(b), + mem::PeerMessage::Transaction(t) => net::PeerMessage::Transaction(t), + mem::PeerMessage::Routed(r) => net::PeerMessage::Routed(r), + mem::PeerMessage::Disconnect => net::PeerMessage::Disconnect, + mem::PeerMessage::Challenge(c) => net::PeerMessage::Challenge(c), + mem::PeerMessage::EpochSyncRequest(epoch_id) => { + net::PeerMessage::EpochSyncRequest(epoch_id) + } + mem::PeerMessage::EpochSyncResponse(esr) => net::PeerMessage::EpochSyncResponse(esr), + mem::PeerMessage::EpochSyncFinalizationRequest(epoch_id) => { + net::PeerMessage::EpochSyncFinalizationRequest(epoch_id) + } + mem::PeerMessage::EpochSyncFinalizationResponse(esfr) => { + net::PeerMessage::EpochSyncFinalizationResponse(esfr) + } + mem::PeerMessage::RoutingTableSyncV2(rs) => net::PeerMessage::RoutingTableSyncV2(rs), + } + } +} diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs new file mode 100644 index 00000000000..800433e884a --- /dev/null +++ b/chain/network/src/network_protocol/mod.rs @@ -0,0 +1,212 @@ +/// Contains types that belong to the `network protocol. +mod borsh; +mod borsh_conv; +mod proto_conv; + +mod _proto { + include!(concat!(env!("OUT_DIR"), "/proto/mod.rs")); +} + +pub use _proto::network as proto; + +use ::borsh::{BorshDeserialize as _, BorshSerialize as _}; +use near_network_primitives::types::{ + Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, RoutedMessageBody, +}; +use near_primitives::block::{Block, BlockHeader, GenesisId}; +use near_primitives::challenge::Challenge; +use near_primitives::hash::CryptoHash; +use near_primitives::network::PeerId; +use near_primitives::syncing::{EpochSyncFinalizationResponse, EpochSyncResponse}; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{EpochId, ProtocolVersion}; +use near_primitives::version::PEER_MIN_ALLOWED_PROTOCOL_VERSION; +use protobuf::Message as _; +use std::fmt; +use thiserror::Error; + +pub use self::borsh::{ + PartialSync, RoutingState, RoutingSyncV2, RoutingTableUpdate, RoutingVersion2, +}; + +/// Structure representing handshake between peers. +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct Handshake { + /// Current protocol version. + pub(crate) protocol_version: u32, + /// Oldest supported protocol version. + pub(crate) oldest_supported_version: u32, + /// Sender's peer id. + pub(crate) sender_peer_id: PeerId, + /// Receiver's peer id. + pub(crate) target_peer_id: PeerId, + /// Sender's listening addr. + pub(crate) sender_listen_port: Option, + /// Peer's chain information. + pub(crate) sender_chain_info: PeerChainInfoV2, + /// Represents new `edge`. Contains only `none` and `Signature` from the sender. + pub(crate) partial_edge_info: PartialEdgeInfo, +} + +impl Handshake { + pub(crate) fn new( + version: ProtocolVersion, + peer_id: PeerId, + target_peer_id: PeerId, + listen_port: Option, + chain_info: PeerChainInfoV2, + partial_edge_info: PartialEdgeInfo, + ) -> Self { + Handshake { + protocol_version: version, + oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, + sender_peer_id: peer_id, + target_peer_id, + sender_listen_port: listen_port, + sender_chain_info: chain_info, + partial_edge_info, + } + } +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub enum HandshakeFailureReason { + ProtocolVersionMismatch { version: u32, oldest_supported_version: u32 }, + GenesisMismatch(GenesisId), + InvalidTarget, +} + +#[derive(PartialEq, Eq, Clone, Debug, strum::AsRefStr, strum::EnumVariantNames)] +#[allow(clippy::large_enum_variant)] +pub enum PeerMessage { + Handshake(Handshake), + HandshakeFailure(PeerInfo, HandshakeFailureReason), + /// When a failed nonce is used by some peer, this message is sent back as evidence. + LastEdge(Edge), + /// Contains accounts and edge information. + SyncRoutingTable(RoutingTableUpdate), + RequestUpdateNonce(PartialEdgeInfo), + ResponseUpdateNonce(Edge), + + PeersRequest, + PeersResponse(Vec), + + BlockHeadersRequest(Vec), + BlockHeaders(Vec), + + BlockRequest(CryptoHash), + Block(Block), + + Transaction(SignedTransaction), + Routed(Box), + + /// Gracefully disconnect from other peer. + Disconnect, + Challenge(Challenge), + EpochSyncRequest(EpochId), + EpochSyncResponse(Box), + EpochSyncFinalizationRequest(EpochId), + EpochSyncFinalizationResponse(Box), + + RoutingTableSyncV2(RoutingSyncV2), +} + +impl fmt::Display for PeerMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(self.msg_variant(), f) + } +} + +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +pub enum Encoding { + Borsh, + Proto, +} + +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("BorshDecode")] + BorshDecode(std::io::Error), + #[error("BorshConv")] + BorshConv(borsh_conv::ParsePeerMessageError), + #[error("ProtoDecode")] + ProtoDecode(protobuf::Error), + #[error("ProtoConv")] + ProtoConv(proto_conv::ParsePeerMessageError), +} + +impl PeerMessage { + pub(crate) fn serialize(&self, enc: Encoding) -> Vec { + match enc { + Encoding::Borsh => borsh::PeerMessage::from(self).try_to_vec().unwrap(), + Encoding::Proto => proto::PeerMessage::from(self).write_to_bytes().unwrap(), + } + } + + pub(crate) fn deserialize( + enc: Encoding, + data: &[u8], + ) -> Result { + Ok(match enc { + Encoding::Borsh => (&borsh::PeerMessage::try_from_slice(data) + .map_err(ParsePeerMessageError::BorshDecode)?) + .try_into() + .map_err(ParsePeerMessageError::BorshConv)?, + Encoding::Proto => (&proto::PeerMessage::parse_from_bytes(data) + .map_err(ParsePeerMessageError::ProtoDecode)?) + .try_into() + .map_err(ParsePeerMessageError::ProtoConv)?, + }) + } + + pub(crate) fn msg_variant(&self) -> &str { + match self { + PeerMessage::Routed(routed_message) => routed_message.body.as_ref(), + _ => self.as_ref(), + } + } + + pub(crate) fn is_client_message(&self) -> bool { + match self { + PeerMessage::Block(_) + | PeerMessage::BlockHeaders(_) + | PeerMessage::Challenge(_) + | PeerMessage::EpochSyncFinalizationResponse(_) + | PeerMessage::EpochSyncResponse(_) + | PeerMessage::Transaction(_) => true, + PeerMessage::Routed(r) => matches!( + r.body, + RoutedMessageBody::BlockApproval(_) + | RoutedMessageBody::ForwardTx(_) + | RoutedMessageBody::PartialEncodedChunk(_) + | RoutedMessageBody::PartialEncodedChunkForward(_) + | RoutedMessageBody::PartialEncodedChunkRequest(_) + | RoutedMessageBody::PartialEncodedChunkResponse(_) + | RoutedMessageBody::StateResponse(_) + | RoutedMessageBody::VersionedPartialEncodedChunk(_) + | RoutedMessageBody::VersionedStateResponse(_) + ), + _ => false, + } + } + + pub(crate) fn is_view_client_message(&self) -> bool { + match self { + PeerMessage::BlockHeadersRequest(_) + | PeerMessage::BlockRequest(_) + | PeerMessage::EpochSyncFinalizationRequest(_) + | PeerMessage::EpochSyncRequest(_) => true, + PeerMessage::Routed(r) => matches!( + r.body, + RoutedMessageBody::QueryRequest { .. } + | RoutedMessageBody::QueryResponse { .. } + | RoutedMessageBody::ReceiptOutcomeRequest(_) + | RoutedMessageBody::StateRequestHeader(_, _) + | RoutedMessageBody::StateRequestPart(_, _, _) + | RoutedMessageBody::TxStatusRequest(_, _) + | RoutedMessageBody::TxStatusResponse(_) + ), + _ => false, + } + } +} diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto new file mode 100644 index 00000000000..6d2187c8e2d --- /dev/null +++ b/chain/network/src/network_protocol/network.proto @@ -0,0 +1,300 @@ +/// After changing this file, regenerate protobuf code. +/// See build.rs for details. +syntax = "proto3"; +package network; + +// Wrapper of borsh-encoded PublicKey. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/crypto/src/signature.rs#L201 +message PublicKey { + bytes borsh = 1; +} + +// Wrapper of borsh-encoded PeerInfo. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/network-primitives/src/network_protocol/mod.rs#L30 +message PeerInfo { + bytes borsh = 1; +} + +// sha256 hash of the borsh-encoded NEAR Block. +message CryptoHash { + // sha256 hash (32 bytes) + bytes hash = 1; +} + +// Wrapper of borsh-encoded Edge. +// https://cs.github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/network-primitives/src/network_protocol/edge.rs#L32 +message Edge { + bytes borsh = 1; +} + +// Wrapper of the borsh-encoded PartialEdgeInfo. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/network-primitives/src/network_protocol/edge.rs#L11 +message PartialEdgeInfo { + bytes borsh = 1; +} + +// Wrapper of the borsh-encoded AnnounceAccount. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/network.rs#L86 +message AnnounceAccount { + bytes borsh = 1; +} + +// Wrapper of the borsh-encoded NEAR chain block. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/block.rs#L77 +message Block { + bytes borsh = 1; +} + +// Wrapper of the borsh-encoded BlockHeader. +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/block_header.rs#L325 +message BlockHeader { + bytes borsh = 1; +} + +// Unique identifier of the NEAR chain. +message GenesisId { + // Name of the chain (for example "mainnet"). + string chain_id = 1; + // Hash of the genesis block(?) of the NEAR chain. + CryptoHash hash = 2; +} + +// Basic information about the chain view maintained by a peer. +message PeerChainInfo { + GenesisId genesis_id = 1; + // Height of the highest NEAR chain block known to a peer. + uint64 height = 2; + // Shards of the NEAR chain tracked by the peer. + repeated uint64 tracked_shards = 3; + // Whether the peer is an archival node. + bool archival = 4; +} + +////////////////////////////////////// + +// Handshake is the first message exchanged after establishing a TCP connection. +// If A opened a connection B, then +// 1. A sends Handshake to B. +// 2a. If B accepts the handshake, it sends Handshake to A and connection is established. +// 2b. If B rejects the handshake, it sends HandshakeFailure to A. +// A may retry the Handshake with a different payload. +message Handshake { + // The protocol_version that the sender wants to use for communication. + // Currently NEAR protocol and NEAR network protocol are versioned together + // (it may change in the future), however peers may communicate with the newer version + // of the NEAR network protol, than the NEAR protocol version approved by the quorum of + // the validators. If B doesn't support protocol_version, it sends back HandshakeFailure + // with reason ProtocolVersionMismatch. + uint32 protocol_version = 1; + // Oldest version of the NEAR network protocol that the peer supports. + uint32 oldest_supported_version = 2; + // PeerId of the sender. + PublicKey sender_peer_id = 3; + // PeerId of the receiver that the sender expects. + // In case of mismatch, receiver sends back HandshakeFailure with + // reason InvalidTarget. + PublicKey target_peer_id = 4; + // TCP port on which sender is listening for inbound connections. + uint32 sender_listen_port = 5; + // Basic info about the NEAR chain that the sender belongs to. + // Sender expects receiver to belong to the same chain. + // In case of mismatch, receiver sends back HandshakeFailure with + // reason GenesisMismatch. + PeerChainInfo sender_chain_info = 6; + // Edge (sender,receiver) signed by sender, which once signed by + // receiver may be broadcasted to the network to prove that the + // connection has been established. + // In case receiver accepts the Handshake, it sends back back a Handshake + // containing his signature in this field. + PartialEdgeInfo partial_edge_info = 7; +} + +// Response to Handshake, in case the Handshake was rejected. +message HandshakeFailure { + enum Reason { + UNKNOWN = 0; + // Peer doesn't support protocol_version indicated in the handshake. + ProtocolVersionMismatch = 1; + // Peer doesn't belong to the chain indicated in the handshake. + GenesisMismatch = 2; + // target_id doesn't match the id of the peer. + InvalidTarget = 3; + } + // Reason for rejecting the Handshake. + Reason reason = 1; + + // Data about the peer. + PeerInfo peer_info = 2; + // GenesisId of the NEAR chain that the peer belongs to. + GenesisId genesis_id = 3; + // Newest NEAR network version supported by the peer. + uint32 version = 4; + // Oldest NEAR network version supported by the peer. + uint32 oldest_supported_version = 5; +} + +// TODO: document it. +message LastEdge { + Edge edge = 1; +} + +// Message sent whenever the sender learns about new connections +// between the peers in the network (I think). +// It provides a view of the whole NEAR network to each peer. +// +// Edges constitute a graph between PeerIds, signed by both of +// the peers. This is one of the first messages sent after Handshake. +// First RoutingTableUpdate contains the whole graph known to peer. +// Afterwards only the graph delta (changed edges) are included. +// +// Accounts provides a mapping AccountId -> PeerId, providing knowledge +// about which NEAR peer controls which NEAR account. +message RoutingTableUpdate { + repeated Edge edges = 1; + // list of known NEAR validator accounts + repeated AnnounceAccount accounts = 2; +} + +// TODO: document it. +message UpdateNonceRequest { + PartialEdgeInfo partial_edge_info = 1; +} + +// TODO: document it. +message UpdateNonceResponse { + Edge edge = 1; +} + +// Request to send a list of known healthy peers +// (i.e. considered honest and available by the receiver). +// Currently this list might include both +// - peers directly connected to the receiver +// - peers that the receiver transitively learned about from other peers. +message PeersRequest {} + +// Response to PeersRequest. +message PeersResponse { + repeated PeerInfo peers = 1; +} + +// Request to send back headers of the NEAR chain blocks. +// Receiver finds in block_hashes the first hash of a block it knows about +// and rends back BlockHeadersResponse with block headers following that block. +// At most 512 block headers are returned: +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/client/src/sync.rs#L38 +// It might happen that the receiver doesn't know some of the hashes in the list +// in the following cases: +// - sender's view of the chain forked from the receiver's view of the chain +// - sender's view of the chain is ahead of receiver's view of the chain. +message BlockHeadersRequest { + repeated CryptoHash block_hashes = 1; +} + +// A collection of headers of the NEAR chain blocks. +message BlockHeadersResponse { + repeated BlockHeader block_headers = 1; +} + +// Request to send back a NEAR chain block with a given hash. +message BlockRequest { + CryptoHash block_hash = 1; +} + +// NEAR chain Block. +// It might be send both as a response to BlockRequest, +// or unsolicitated in case a new Block is being broadcasted. +message BlockResponse { + Block block = 1; +} + +// Wrapper of borsh-encoded SignedTransaction +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/transaction.rs#L218 +message SignedTransaction { + bytes borsh = 1; +} + +// Wrapper of borsh-encoded RoutedMessage +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/network-primitives/src/network_protocol/mod.rs#L295 +message RoutedMessage { + bytes borsh = 1; +} + +// Disconnect is send by a node before closing a TCP connection. +// There is no guarantee that it will be sent in all circumstances. +message Disconnect {} + +// Wrapper of borsh-encoded Challenge +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/challenge.rs#L89 +message Challenge { + bytes borsh = 1; +} + +// TODO: document it +message EpochSyncRequest { + CryptoHash epoch_id = 1; +} + +// Wrapper of borsh-encoded EpochSyncResponse +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/syncing.rs#L225 +message EpochSyncResponse { + bytes borsh = 1; +} + +// TODO: document it +message EpochSyncFinalizationRequest { + CryptoHash epoch_id = 1; +} + +// Wrapper of borsh-encoded EpochSyncFinalizationResponse +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/core/primitives/src/syncing.rs#L202 +message EpochSyncFinalizationResponse { + bytes borsh = 1; +} + +// Wrapper of borsh-encoded RoutingSyncV2 +// https://github.com/near/nearcore/blob/1a4edefd0116f7d1e222bc96569367a02fe64199/chain/network/src/network_protocol.rs#L225 +message RoutingSyncV2 { + bytes borsh = 1; +} + +// PeerMessage is a wrapper of all message types exchanged between NEAR nodes. +// The wire format of a single message M consists of len(M)+4 bytes: +// : 4 bytes : little endian uint32 +// : N bytes : binary encoded protobuf PeerMessage M +message PeerMessage { + // Leaving 1,2,3 unused allows us to ensure that there will be no collision + // between borsh and protobuf encodings: + // https://docs.google.com/document/d/1gCWmt9O-h_-5JDXIqbKxAaSS3Q9pryB1f9DDY1mMav4/edit + reserved 1,2,3; + + oneof message_type { + Handshake handshake = 4; + HandshakeFailure handshake_failure = 5; + LastEdge last_edge = 6; + RoutingTableUpdate sync_routing_table = 7; + + UpdateNonceRequest update_nonce_request = 8; + UpdateNonceResponse update_nonce_response = 9; + + PeersRequest peers_request = 10; + PeersResponse peers_response = 11; + + BlockHeadersRequest block_headers_request = 12; + BlockHeadersResponse block_headers_response = 13; + + BlockRequest block_request = 14; + BlockResponse block_response = 15; + + SignedTransaction transaction = 16; + RoutedMessage routed = 17; + Disconnect disconnect = 18; + Challenge challenge = 19; + + EpochSyncRequest epoch_sync_request = 20; + EpochSyncResponse epoch_sync_response = 21; + EpochSyncFinalizationRequest epoch_sync_finalization_request = 22; + EpochSyncFinalizationResponse epoch_sync_finalization_response = 23; + + RoutingSyncV2 routing_table_sync_v2 = 24; + } +} diff --git a/chain/network/src/network_protocol/proto_conv.rs b/chain/network/src/network_protocol/proto_conv.rs new file mode 100644 index 00000000000..6279b89aceb --- /dev/null +++ b/chain/network/src/network_protocol/proto_conv.rs @@ -0,0 +1,625 @@ +/// Contains protobuf <-> network_protocol conversions. +use crate::network_protocol::proto; +use crate::network_protocol::proto::peer_message::Message_type as ProtoMT; +use crate::network_protocol::{ + Handshake, HandshakeFailureReason, PeerMessage, RoutingSyncV2, RoutingTableUpdate, +}; +use borsh::{BorshDeserialize as _, BorshSerialize as _}; +use near_network_primitives::types::{ + Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, +}; +use near_primitives::block::{Block, BlockHeader, GenesisId}; +use near_primitives::challenge::Challenge; +use near_primitives::hash::CryptoHash; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::syncing::{EpochSyncFinalizationResponse, EpochSyncResponse}; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::EpochId; +use protobuf::MessageField as MF; +use thiserror::Error; + +#[derive(Error, Debug)] +#[error("[{idx}]: {source}")] +pub struct ParseVecError { + idx: usize, + #[source] + source: E, +} + +fn try_from_vec<'a, X, Y: TryFrom<&'a X>>( + xs: &'a Vec, +) -> Result, ParseVecError> { + let mut ys = vec![]; + for (idx, x) in xs.iter().enumerate() { + ys.push(x.try_into().map_err(|source| ParseVecError { idx, source })?); + } + Ok(ys) +} + +#[derive(Error, Debug)] +pub enum ParseRequiredError { + #[error("missing, while required")] + Missing, + #[error(transparent)] + Other(E), +} + +fn try_from_required<'a, X, Y: TryFrom<&'a X>>( + x: &'a MF, +) -> Result> { + x.as_ref().ok_or(ParseRequiredError::Missing)?.try_into().map_err(ParseRequiredError::Other) +} + +impl From<&CryptoHash> for proto::CryptoHash { + fn from(x: &CryptoHash) -> Self { + let mut y = Self::new(); + y.hash = x.0.into(); + y + } +} + +pub type ParseCryptoHashError = Box; + +impl TryFrom<&proto::CryptoHash> for CryptoHash { + type Error = ParseCryptoHashError; + fn try_from(p: &proto::CryptoHash) -> Result { + CryptoHash::try_from(&p.hash[..]) + } +} + +////////////////////////////////////////// + +impl From<&GenesisId> for proto::GenesisId { + fn from(x: &GenesisId) -> Self { + Self { chain_id: x.chain_id.clone(), hash: MF::some((&x.hash).into()), ..Self::default() } + } +} + +#[derive(Error, Debug)] +pub enum ParseGenesisIdError { + #[error("hash: {0}")] + Hash(ParseRequiredError), +} + +impl TryFrom<&proto::GenesisId> for GenesisId { + type Error = ParseGenesisIdError; + fn try_from(p: &proto::GenesisId) -> Result { + Ok(Self { + chain_id: p.chain_id.clone(), + hash: try_from_required(&p.hash).map_err(Self::Error::Hash)?, + }) + } +} + +////////////////////////////////////////// + +impl From<&PeerChainInfoV2> for proto::PeerChainInfo { + fn from(x: &PeerChainInfoV2) -> Self { + Self { + genesis_id: MF::some((&x.genesis_id).into()), + height: x.height, + tracked_shards: x.tracked_shards.clone(), + archival: x.archival, + ..Self::default() + } + } +} + +#[derive(Error, Debug)] +pub enum ParsePeerChainInfoV2Error { + #[error("genesis_id {0}")] + GenesisId(ParseRequiredError), +} + +impl TryFrom<&proto::PeerChainInfo> for PeerChainInfoV2 { + type Error = ParsePeerChainInfoV2Error; + fn try_from(p: &proto::PeerChainInfo) -> Result { + Ok(Self { + genesis_id: try_from_required(&p.genesis_id).map_err(Self::Error::GenesisId)?, + height: p.height, + tracked_shards: p.tracked_shards.clone(), + archival: p.archival, + }) + } +} + +////////////////////////////////////////// + +impl From<&PeerId> for proto::PublicKey { + fn from(x: &PeerId) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParsePeerIdError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::PublicKey> for PeerId { + type Error = ParsePeerIdError; + fn try_from(p: &proto::PublicKey) -> Result { + Self::try_from_slice(&p.borsh) + } +} + +////////////////////////////////////////// + +impl From<&PartialEdgeInfo> for proto::PartialEdgeInfo { + fn from(x: &PartialEdgeInfo) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParsePartialEdgeInfoError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::PartialEdgeInfo> for PartialEdgeInfo { + type Error = ParsePartialEdgeInfoError; + fn try_from(p: &proto::PartialEdgeInfo) -> Result { + Self::try_from_slice(&p.borsh) + } +} + +////////////////////////////////////////// + +impl From<&PeerInfo> for proto::PeerInfo { + fn from(x: &PeerInfo) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParsePeerInfoError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::PeerInfo> for PeerInfo { + type Error = ParsePeerInfoError; + fn try_from(x: &proto::PeerInfo) -> Result { + Self::try_from_slice(&x.borsh) + } +} + +////////////////////////////////////////// + +impl From<&Handshake> for proto::Handshake { + fn from(x: &Handshake) -> Self { + Self { + protocol_version: x.protocol_version, + oldest_supported_version: x.oldest_supported_version, + sender_peer_id: MF::some((&x.sender_peer_id).into()), + target_peer_id: MF::some((&x.target_peer_id).into()), + sender_listen_port: x.sender_listen_port.unwrap_or(0).into(), + sender_chain_info: MF::some((&x.sender_chain_info).into()), + partial_edge_info: MF::some((&x.partial_edge_info).into()), + ..Self::default() + } + } +} + +#[derive(Error, Debug)] +pub enum ParseHandshakeError { + #[error("sender_peer_id {0}")] + SenderPeerId(ParseRequiredError), + #[error("target_peer_id {0}")] + TargetPeerId(ParseRequiredError), + #[error("sender_listen_port {0}")] + SenderListenPort(std::num::TryFromIntError), + #[error("sender_chain_info {0}")] + SenderChainInfo(ParseRequiredError), + #[error("partial_edge_info {0}")] + PartialEdgeInfo(ParseRequiredError), +} + +impl TryFrom<&proto::Handshake> for Handshake { + type Error = ParseHandshakeError; + fn try_from(p: &proto::Handshake) -> Result { + Ok(Self { + protocol_version: p.protocol_version, + oldest_supported_version: p.oldest_supported_version, + sender_peer_id: try_from_required(&p.sender_peer_id) + .map_err(Self::Error::SenderPeerId)?, + target_peer_id: try_from_required(&p.target_peer_id) + .map_err(Self::Error::TargetPeerId)?, + sender_listen_port: { + let port = + u16::try_from(p.sender_listen_port).map_err(Self::Error::SenderListenPort)?; + if port == 0 { + None + } else { + Some(port) + } + }, + sender_chain_info: try_from_required(&p.sender_chain_info) + .map_err(Self::Error::SenderChainInfo)?, + partial_edge_info: try_from_required(&p.partial_edge_info) + .map_err(Self::Error::PartialEdgeInfo)?, + }) + } +} + +////////////////////////////////////////// + +impl From<(&PeerInfo, &HandshakeFailureReason)> for proto::HandshakeFailure { + fn from((pi, hfr): (&PeerInfo, &HandshakeFailureReason)) -> Self { + match hfr { + HandshakeFailureReason::ProtocolVersionMismatch { + version, + oldest_supported_version, + } => Self { + peer_info: MF::some(pi.into()), + reason: proto::handshake_failure::Reason::ProtocolVersionMismatch.into(), + version: *version, + oldest_supported_version: *oldest_supported_version, + ..Default::default() + }, + HandshakeFailureReason::GenesisMismatch(genesis_id) => Self { + peer_info: MF::some(pi.into()), + reason: proto::handshake_failure::Reason::GenesisMismatch.into(), + genesis_id: MF::some(genesis_id.into()), + ..Default::default() + }, + HandshakeFailureReason::InvalidTarget => Self { + peer_info: MF::some(pi.into()), + reason: proto::handshake_failure::Reason::InvalidTarget.into(), + ..Default::default() + }, + } + } +} + +#[derive(Error, Debug)] +pub enum ParseHandshakeFailureError { + #[error("peer_info: {0}")] + PeerInfo(ParseRequiredError), + #[error("genesis_id: {0}")] + GenesisId(ParseRequiredError), + #[error("reason: unknown")] + UnknownReason, +} + +impl TryFrom<&proto::HandshakeFailure> for (PeerInfo, HandshakeFailureReason) { + type Error = ParseHandshakeFailureError; + fn try_from(x: &proto::HandshakeFailure) -> Result { + let pi = try_from_required(&x.peer_info).map_err(Self::Error::PeerInfo)?; + let hfr = match x.reason.enum_value_or_default() { + proto::handshake_failure::Reason::ProtocolVersionMismatch => { + HandshakeFailureReason::ProtocolVersionMismatch { + version: x.version, + oldest_supported_version: x.oldest_supported_version, + } + } + proto::handshake_failure::Reason::GenesisMismatch => { + HandshakeFailureReason::GenesisMismatch( + try_from_required(&x.genesis_id).map_err(Self::Error::GenesisId)?, + ) + } + proto::handshake_failure::Reason::InvalidTarget => { + HandshakeFailureReason::InvalidTarget + } + proto::handshake_failure::Reason::UNKNOWN => return Err(Self::Error::UnknownReason), + }; + Ok((pi, hfr)) + } +} + +////////////////////////////////////////// + +impl From<&Edge> for proto::Edge { + fn from(x: &Edge) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParseEdgeError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::Edge> for Edge { + type Error = ParseEdgeError; + fn try_from(x: &proto::Edge) -> Result { + Self::try_from_slice(&x.borsh) + } +} + +////////////////////////////////////////// + +impl From<&AnnounceAccount> for proto::AnnounceAccount { + fn from(x: &AnnounceAccount) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParseAnnounceAccountError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::AnnounceAccount> for AnnounceAccount { + type Error = ParseAnnounceAccountError; + fn try_from(x: &proto::AnnounceAccount) -> Result { + Self::try_from_slice(&x.borsh) + } +} + +////////////////////////////////////////// + +impl From<&RoutingTableUpdate> for proto::RoutingTableUpdate { + fn from(x: &RoutingTableUpdate) -> Self { + Self { + edges: x.edges.iter().map(Into::into).collect(), + accounts: x.accounts.iter().map(Into::into).collect(), + ..Self::default() + } + } +} + +#[derive(Error, Debug)] +pub enum ParseRoutingTableUpdateError { + #[error("edges {0}")] + Edges(ParseVecError), + #[error("accounts {0}")] + Accounts(ParseVecError), +} + +impl TryFrom<&proto::RoutingTableUpdate> for RoutingTableUpdate { + type Error = ParseRoutingTableUpdateError; + fn try_from(x: &proto::RoutingTableUpdate) -> Result { + Ok(Self { + edges: try_from_vec(&x.edges).map_err(Self::Error::Edges)?, + accounts: try_from_vec(&x.accounts).map_err(Self::Error::Accounts)?, + }) + } +} + +////////////////////////////////////////// + +impl From<&BlockHeader> for proto::BlockHeader { + fn from(x: &BlockHeader) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParseBlockHeaderError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::BlockHeader> for BlockHeader { + type Error = ParseBlockHeaderError; + fn try_from(x: &proto::BlockHeader) -> Result { + Self::try_from_slice(&x.borsh) + } +} + +////////////////////////////////////////// + +impl From<&Block> for proto::Block { + fn from(x: &Block) -> Self { + Self { borsh: x.try_to_vec().unwrap(), ..Self::default() } + } +} + +pub type ParseBlockError = borsh::maybestd::io::Error; + +impl TryFrom<&proto::Block> for Block { + type Error = ParseBlockError; + fn try_from(x: &proto::Block) -> Result { + Self::try_from_slice(&x.borsh) + } +} + +////////////////////////////////////////// + +impl From<&PeerMessage> for proto::PeerMessage { + fn from(x: &PeerMessage) -> Self { + Self { + message_type: Some(match x { + PeerMessage::Handshake(h) => ProtoMT::Handshake(h.into()), + PeerMessage::HandshakeFailure(pi, hfr) => { + ProtoMT::HandshakeFailure((pi, hfr).into()) + } + PeerMessage::LastEdge(e) => ProtoMT::LastEdge(proto::LastEdge { + edge: MF::some(e.into()), + ..Default::default() + }), + PeerMessage::SyncRoutingTable(rtu) => ProtoMT::SyncRoutingTable(rtu.into()), + PeerMessage::RequestUpdateNonce(pei) => { + ProtoMT::UpdateNonceRequest(proto::UpdateNonceRequest { + partial_edge_info: MF::some(pei.into()), + ..Default::default() + }) + } + PeerMessage::ResponseUpdateNonce(e) => { + ProtoMT::UpdateNonceResponse(proto::UpdateNonceResponse { + edge: MF::some(e.into()), + ..Default::default() + }) + } + PeerMessage::PeersRequest => ProtoMT::PeersRequest(proto::PeersRequest::new()), + PeerMessage::PeersResponse(pis) => ProtoMT::PeersResponse(proto::PeersResponse { + peers: pis.iter().map(Into::into).collect(), + ..Default::default() + }), + PeerMessage::BlockHeadersRequest(bhs) => { + ProtoMT::BlockHeadersRequest(proto::BlockHeadersRequest { + block_hashes: bhs.iter().map(Into::into).collect(), + ..Default::default() + }) + } + PeerMessage::BlockHeaders(bhs) => { + ProtoMT::BlockHeadersResponse(proto::BlockHeadersResponse { + block_headers: bhs.iter().map(Into::into).collect(), + ..Default::default() + }) + } + PeerMessage::BlockRequest(bh) => ProtoMT::BlockRequest(proto::BlockRequest { + block_hash: MF::some(bh.into()), + ..Default::default() + }), + PeerMessage::Block(b) => ProtoMT::BlockResponse(proto::BlockResponse { + block: MF::some(b.into()), + ..Default::default() + }), + PeerMessage::Transaction(t) => ProtoMT::Transaction(proto::SignedTransaction { + borsh: t.try_to_vec().unwrap(), + ..Default::default() + }), + PeerMessage::Routed(r) => ProtoMT::Routed(proto::RoutedMessage { + borsh: r.try_to_vec().unwrap(), + ..Default::default() + }), + PeerMessage::Disconnect => ProtoMT::Disconnect(proto::Disconnect::new()), + PeerMessage::Challenge(r) => ProtoMT::Challenge(proto::Challenge { + borsh: r.try_to_vec().unwrap(), + ..Default::default() + }), + PeerMessage::EpochSyncRequest(epoch_id) => { + ProtoMT::EpochSyncRequest(proto::EpochSyncRequest { + epoch_id: MF::some((&epoch_id.0).into()), + ..Default::default() + }) + } + PeerMessage::EpochSyncResponse(esr) => { + ProtoMT::EpochSyncResponse(proto::EpochSyncResponse { + borsh: esr.try_to_vec().unwrap(), + ..Default::default() + }) + } + PeerMessage::EpochSyncFinalizationRequest(epoch_id) => { + ProtoMT::EpochSyncFinalizationRequest(proto::EpochSyncFinalizationRequest { + epoch_id: MF::some((&epoch_id.0).into()), + ..Default::default() + }) + } + PeerMessage::EpochSyncFinalizationResponse(esfr) => { + ProtoMT::EpochSyncFinalizationResponse(proto::EpochSyncFinalizationResponse { + borsh: esfr.try_to_vec().unwrap(), + ..Default::default() + }) + } + PeerMessage::RoutingTableSyncV2(rs) => { + ProtoMT::RoutingTableSyncV2(proto::RoutingSyncV2 { + borsh: rs.try_to_vec().unwrap(), + ..Default::default() + }) + } + }), + ..Default::default() + } + } +} + +pub type ParseTransactionError = borsh::maybestd::io::Error; +pub type ParseRoutedError = borsh::maybestd::io::Error; +pub type ParseChallengeError = borsh::maybestd::io::Error; +pub type ParseEpochSyncResponseError = borsh::maybestd::io::Error; +pub type ParseEpochSyncFinalizationResponseError = borsh::maybestd::io::Error; +pub type ParseRoutingTableSyncV2Error = borsh::maybestd::io::Error; + +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("empty message")] + Empty, + #[error("handshake: {0}")] + Handshake(ParseHandshakeError), + #[error("handshake_failure: {0}")] + HandshakeFailure(ParseHandshakeFailureError), + #[error("last_edge: {0}")] + LastEdge(ParseRequiredError), + #[error("sync_routing_table: {0}")] + SyncRoutingTable(ParseRoutingTableUpdateError), + #[error("update_nonce_requrest: {0}")] + UpdateNonceRequest(ParseRequiredError), + #[error("update_nonce_response: {0}")] + UpdateNonceResponse(ParseRequiredError), + #[error("peers_response: {0}")] + PeersResponse(ParseVecError), + #[error("block_headers_request: {0}")] + BlockHeadersRequest(ParseVecError), + #[error("block_headers_response: {0}")] + BlockHeadersResponse(ParseVecError), + #[error("block_request: {0}")] + BlockRequest(ParseRequiredError), + #[error("block_response: {0}")] + BlockResponse(ParseRequiredError), + #[error("transaction: {0}")] + Transaction(ParseTransactionError), + #[error("routed: {0}")] + Routed(ParseRoutedError), + #[error("challenge: {0}")] + Challenge(ParseChallengeError), + #[error("epoch_sync_request: {0}")] + EpochSyncRequest(ParseRequiredError), + #[error("epoch_sync_response: {0}")] + EpochSyncResponse(ParseEpochSyncResponseError), + #[error("epoch_sync_finalization_request: {0}")] + EpochSyncFinalizationRequest(ParseRequiredError), + #[error("epoch_sync_finalization_response: {0}")] + EpochSyncFinalizationResponse(ParseEpochSyncFinalizationResponseError), + #[error("routing_table_sync_v2")] + RoutingTableSyncV2(ParseRoutingTableSyncV2Error), +} + +impl TryFrom<&proto::PeerMessage> for PeerMessage { + type Error = ParsePeerMessageError; + fn try_from(x: &proto::PeerMessage) -> Result { + Ok(match x.message_type.as_ref().ok_or(Self::Error::Empty)? { + ProtoMT::Handshake(h) => { + PeerMessage::Handshake(h.try_into().map_err(Self::Error::Handshake)?) + } + ProtoMT::HandshakeFailure(hf) => { + let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?; + PeerMessage::HandshakeFailure(pi, hfr) + } + ProtoMT::LastEdge(le) => { + PeerMessage::LastEdge(try_from_required(&le.edge).map_err(Self::Error::LastEdge)?) + } + ProtoMT::SyncRoutingTable(rtu) => PeerMessage::SyncRoutingTable( + rtu.try_into().map_err(Self::Error::SyncRoutingTable)?, + ), + ProtoMT::UpdateNonceRequest(unr) => PeerMessage::RequestUpdateNonce( + try_from_required(&unr.partial_edge_info) + .map_err(Self::Error::UpdateNonceRequest)?, + ), + ProtoMT::UpdateNonceResponse(unr) => PeerMessage::ResponseUpdateNonce( + try_from_required(&unr.edge).map_err(Self::Error::UpdateNonceResponse)?, + ), + ProtoMT::PeersRequest(_) => PeerMessage::PeersRequest, + ProtoMT::PeersResponse(pr) => PeerMessage::PeersResponse( + try_from_vec(&pr.peers).map_err(Self::Error::PeersResponse)?, + ), + ProtoMT::BlockHeadersRequest(bhr) => PeerMessage::BlockHeadersRequest( + try_from_vec(&bhr.block_hashes).map_err(Self::Error::BlockHeadersRequest)?, + ), + ProtoMT::BlockHeadersResponse(bhr) => PeerMessage::BlockHeaders( + try_from_vec(&bhr.block_headers).map_err(Self::Error::BlockHeadersResponse)?, + ), + ProtoMT::BlockRequest(br) => PeerMessage::BlockRequest( + try_from_required(&br.block_hash).map_err(Self::Error::BlockRequest)?, + ), + ProtoMT::BlockResponse(br) => PeerMessage::Block( + try_from_required(&br.block).map_err(Self::Error::BlockResponse)?, + ), + ProtoMT::Transaction(t) => PeerMessage::Transaction( + SignedTransaction::try_from_slice(&t.borsh).map_err(Self::Error::Transaction)?, + ), + ProtoMT::Routed(r) => PeerMessage::Routed(Box::new( + RoutedMessage::try_from_slice(&r.borsh).map_err(Self::Error::Routed)?, + )), + ProtoMT::Disconnect(_) => PeerMessage::Disconnect, + ProtoMT::Challenge(c) => PeerMessage::Challenge( + Challenge::try_from_slice(&c.borsh).map_err(Self::Error::Challenge)?, + ), + ProtoMT::EpochSyncRequest(esr) => PeerMessage::EpochSyncRequest(EpochId( + try_from_required(&esr.epoch_id).map_err(Self::Error::EpochSyncRequest)?, + )), + ProtoMT::EpochSyncResponse(esr) => PeerMessage::EpochSyncResponse(Box::new( + EpochSyncResponse::try_from_slice(&esr.borsh) + .map_err(Self::Error::EpochSyncResponse)?, + )), + ProtoMT::EpochSyncFinalizationRequest(esr) => { + PeerMessage::EpochSyncFinalizationRequest(EpochId( + try_from_required(&esr.epoch_id) + .map_err(Self::Error::EpochSyncFinalizationRequest)?, + )) + } + ProtoMT::EpochSyncFinalizationResponse(esr) => { + PeerMessage::EpochSyncFinalizationResponse(Box::new( + EpochSyncFinalizationResponse::try_from_slice(&esr.borsh) + .map_err(Self::Error::EpochSyncFinalizationResponse)?, + )) + } + ProtoMT::RoutingTableSyncV2(rts) => PeerMessage::RoutingTableSyncV2( + RoutingSyncV2::try_from_slice(&rts.borsh) + .map_err(Self::Error::RoutingTableSyncV2)?, + ), + }) + } +} diff --git a/chain/network/src/peer/codec.rs b/chain/network/src/peer/codec.rs index bb3e5ebdb32..74ba890c44d 100644 --- a/chain/network/src/peer/codec.rs +++ b/chain/network/src/peer/codec.rs @@ -109,7 +109,6 @@ impl Decoder for Codec { mod test { use crate::peer::codec::{Codec, NETWORK_MESSAGE_MAX_SIZE_BYTES}; use crate::types::{Handshake, PeerMessage, RoutingTableUpdate}; - use borsh::{BorshDeserialize, BorshSerialize}; use bytes::{BufMut, BytesMut}; use near_crypto::{KeyType, SecretKey}; use near_network_primitives::types::{ @@ -124,11 +123,15 @@ mod test { use tokio_util::codec::{Decoder, Encoder}; fn test_codec(msg: PeerMessage) { - let mut codec = Codec::default(); - let mut buffer = BytesMut::new(); - codec.encode(msg.try_to_vec().unwrap(), &mut buffer).unwrap(); - let decoded = codec.decode(&mut buffer).unwrap().unwrap().unwrap(); - assert_eq!(PeerMessage::try_from_slice(&decoded).unwrap(), msg); + for enc in + [crate::network_protocol::Encoding::Proto, crate::network_protocol::Encoding::Borsh] + { + let mut codec = Codec::default(); + let mut buffer = BytesMut::new(); + codec.encode(msg.serialize(enc), &mut buffer).unwrap(); + let decoded = codec.decode(&mut buffer).unwrap().unwrap().unwrap(); + assert_eq!(PeerMessage::deserialize(enc, &decoded).unwrap(), msg); + } } #[test] diff --git a/chain/network/src/peer/mod.rs b/chain/network/src/peer/mod.rs index 77d796e84f9..fb64e617d0b 100644 --- a/chain/network/src/peer/mod.rs +++ b/chain/network/src/peer/mod.rs @@ -2,4 +2,3 @@ pub(crate) mod codec; pub(crate) mod peer_actor; mod tracker; mod transfer_stats; -mod utils; diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 9282fa73646..a603404c157 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1,6 +1,6 @@ +use crate::network_protocol::{Encoding, ParsePeerMessageError}; use crate::peer::codec::Codec; use crate::peer::tracker::Tracker; -use crate::peer::utils; use crate::private_actix::{ PeersRequest, RegisterPeer, RegisterPeerResponse, SendMessage, Unregister, }; @@ -10,12 +10,10 @@ use crate::types::{ NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerMessage, PeerRequest, PeerResponse, PeersResponse, }; -use crate::PeerManagerActor; use actix::{ - Actor, ActorContext, ActorFutureExt, Addr, Arbiter, AsyncContext, Context, - ContextFutureSpawner, Handler, Recipient, Running, StreamHandler, WrapFuture, + Actor, ActorContext, ActorFutureExt, Arbiter, AsyncContext, Context, ContextFutureSpawner, + Handler, Recipient, Running, StreamHandler, WrapFuture, }; -use borsh::{BorshDeserialize, BorshSerialize}; use lru::LruCache; use near_crypto::Signature; use near_network_primitives::types::{ @@ -29,7 +27,6 @@ use near_network_primitives::types::{Edge, PartialEdgeInfo}; use near_performance_metrics::framed_write::{FramedWrite, WriteHandler}; use near_performance_metrics_macros::perf; use near_primitives::block::GenesisId; -use near_primitives::borsh::maybestd::io::Error; use near_primitives::logging; use near_primitives::network::PeerId; use near_primitives::sharding::PartialEncodedChunk; @@ -46,6 +43,7 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use thiserror::Error; use tracing::{debug, error, info, trace, warn}; type WriteHalf = tokio::io::WriteHalf; @@ -81,7 +79,11 @@ pub(crate) struct PeerActor { /// Handshake timeout. handshake_timeout: Duration, /// Peer manager recipient to break the dependency loop. - peer_manager_addr: Addr, + /// PeerManager is a recipient of 2 types of messages, therefore + /// to inject a fake PeerManager in tests, we need a separate + /// recipient address for each message type. + peer_manager_addr: Recipient, + peer_manager_wrapper_addr: Recipient>, /// Addr for client to send messages related to the chain. client_addr: Recipient, /// Addr for view client to send messages related to the chain. @@ -105,6 +107,11 @@ pub(crate) struct PeerActor { routed_message_cache: LruCache<(PeerId, PeerIdOrHash, Signature), Instant>, /// A helper data structure for limiting reading throttle_controller: ThrottleController, + /// Whether we detected support for protocol buffers during handshake. + protocol_buffers_supported: bool, + /// Whether the PeerActor should skip protobuf support detection and use + /// a given encoding right away. + force_encoding: Option, } impl Debug for PeerActor { @@ -113,6 +120,15 @@ impl Debug for PeerActor { } } +/// A custom IOError type because FramedWrite is hiding the actual +/// underlying std::io::Error. +/// TODO: replace FramedWrite with sth more reasonable. +#[derive(Error, Debug)] +pub enum IOError { + #[error("{tid} Failed to send message {message_type} of size {size}")] + Send { tid: usize, message_type: String, size: usize }, +} + impl PeerActor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -122,13 +138,15 @@ impl PeerActor { peer_type: PeerType, framed: FramedWrite, WriteHalf, Codec, Codec>, handshake_timeout: Duration, - peer_manager_addr: Addr, + peer_manager_addr: Recipient, + peer_manager_wrapper_addr: Recipient>, client_addr: Recipient, view_client_addr: Recipient, partial_edge_info: Option, txns_since_last_block: Arc, peer_counter: Arc, throttle_controller: ThrottleController, + force_encoding: Option, ) -> Self { PeerActor { my_node_info, @@ -140,6 +158,7 @@ impl PeerActor { framed, handshake_timeout, peer_manager_addr, + peer_manager_wrapper_addr, client_addr, view_client_addr, tracker: Default::default(), @@ -151,32 +170,89 @@ impl PeerActor { peer_counter, routed_message_cache: LruCache::new(ROUTED_MESSAGE_CACHE_SIZE), throttle_controller, + protocol_buffers_supported: false, + force_encoding, } } - fn send_message(&mut self, msg: &PeerMessage) { + fn parse_message(&mut self, msg: &[u8]) -> Result { + if let Some(e) = self.force_encoding { + return PeerMessage::deserialize(e, msg); + } + if self.peer_status == PeerStatus::Connecting { + if let Ok(msg) = PeerMessage::deserialize(Encoding::Proto, msg) { + self.protocol_buffers_supported = true; + return Ok(msg); + } + match PeerMessage::deserialize(Encoding::Borsh, msg) { + Ok(msg) => Ok(msg), + Err(err) => { + self.send_message_or_log(&PeerMessage::HandshakeFailure( + self.my_node_info.clone(), + HandshakeFailureReason::ProtocolVersionMismatch { + version: PROTOCOL_VERSION, + oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, + }, + )); + Err(err) + } + } + } else if self.protocol_buffers_supported { + PeerMessage::deserialize(Encoding::Proto, msg) + } else { + PeerMessage::deserialize(Encoding::Borsh, msg) + } + } + + fn send_message_or_log(&mut self, msg: &PeerMessage) { + if let Err(err) = self.send_message(msg) { + warn!(target: "network", "send_message(): {}", err); + } + } + + fn send_message(&mut self, msg: &PeerMessage) -> Result<(), IOError> { + if let Some(enc) = self.force_encoding { + return self.send_message_with_encoding(msg, enc); + } + if self.peer_status == PeerStatus::Connecting { + self.send_message_with_encoding(msg, Encoding::Proto)?; + self.send_message_with_encoding(msg, Encoding::Borsh)?; + } else if self.protocol_buffers_supported { + self.send_message_with_encoding(msg, Encoding::Proto)?; + } else { + self.send_message_with_encoding(msg, Encoding::Borsh)?; + } + Ok(()) + } + + fn send_message_with_encoding( + &mut self, + msg: &PeerMessage, + enc: Encoding, + ) -> Result<(), IOError> { // Skip sending block and headers if we received it or header from this peer. // Record block requests in tracker. match msg { - PeerMessage::Block(b) if self.tracker.has_received(b.hash()) => return, + PeerMessage::Block(b) if self.tracker.has_received(b.hash()) => return Ok(()), PeerMessage::BlockRequest(h) => self.tracker.push_request(*h), _ => (), }; - match msg.try_to_vec() { - Ok(bytes) => { - self.tracker.increment_sent(bytes.len() as u64); - let bytes_len = bytes.len(); - if !self.framed.write(bytes) { - #[cfg(feature = "performance_stats")] - let tid = near_rust_allocator_proxy::get_tid(); - #[cfg(not(feature = "performance_stats"))] - let tid = 0; - error!("{} Failed to send message {} of size {}", tid, msg.as_ref(), bytes_len,) - } - } - Err(err) => error!(target: "network", "Error converting message to bytes: {}", err), - }; + let bytes = msg.serialize(enc); + self.tracker.increment_sent(bytes.len() as u64); + let bytes_len = bytes.len(); + if !self.framed.write(bytes) { + #[cfg(feature = "performance_stats")] + let tid = near_rust_allocator_proxy::get_tid(); + #[cfg(not(feature = "performance_stats"))] + let tid = 0; + return Err(IOError::Send { + tid, + message_type: msg.as_ref().to_string(), + size: bytes_len, + }); + } + Ok(()) } fn fetch_client_chain_info(&self, ctx: &mut Context) { @@ -229,7 +305,7 @@ impl PeerActor { } }; - act.send_message(&handshake); + act.send_message_or_log(&handshake); actix::fut::ready(()) } Err(err) => { @@ -322,16 +398,18 @@ impl PeerActor { match res { Ok(NetworkViewClientResponses::TxStatus(tx_result)) => { let body = Box::new(RoutedMessageBody::TxStatusResponse(*tx_result)); - act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( - PeerRequest::RouteBack(body, msg_hash.unwrap()), - )); + let _ = + act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( + PeerRequest::RouteBack(body, msg_hash.unwrap()), + )); } Ok(NetworkViewClientResponses::QueryResponse { query_id, response }) => { let body = Box::new(RoutedMessageBody::QueryResponse { query_id, response }); - act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( - PeerRequest::RouteBack(body, msg_hash.unwrap()), - )); + let _ = + act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( + PeerRequest::RouteBack(body, msg_hash.unwrap()), + )); } Ok(NetworkViewClientResponses::StateResponse(state_response)) => { let body = match *state_response { @@ -342,22 +420,25 @@ impl PeerActor { RoutedMessageBody::VersionedStateResponse(state_response) } }; - act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( - PeerRequest::RouteBack(Box::new(body), msg_hash.unwrap()), - )); + let _ = + act.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( + PeerRequest::RouteBack(Box::new(body), msg_hash.unwrap()), + )); } Ok(NetworkViewClientResponses::Block(block)) => { // MOO need protocol version - act.send_message(&PeerMessage::Block(*block)) + act.send_message_or_log(&PeerMessage::Block(*block)); } Ok(NetworkViewClientResponses::BlockHeaders(headers)) => { - act.send_message(&PeerMessage::BlockHeaders(headers)) + act.send_message_or_log(&PeerMessage::BlockHeaders(headers)); } Ok(NetworkViewClientResponses::EpochSyncResponse(response)) => { - act.send_message(&PeerMessage::EpochSyncResponse(response)) + act.send_message_or_log(&PeerMessage::EpochSyncResponse(response)); } Ok(NetworkViewClientResponses::EpochSyncFinalizationResponse(response)) => { - act.send_message(&PeerMessage::EpochSyncFinalizationResponse(response)) + act.send_message_or_log(&PeerMessage::EpochSyncFinalizationResponse( + response, + )); } Err(err) => { error!( @@ -464,7 +545,6 @@ impl PeerActor { NetworkClientMessages::EpochSyncFinalizationResponse(peer_id, response) } PeerMessage::Handshake(_) - | PeerMessage::_HandshakeV2 | PeerMessage::HandshakeFailure(_, _) | PeerMessage::PeersRequest | PeerMessage::PeersResponse(_) @@ -476,12 +556,8 @@ impl PeerActor { | PeerMessage::BlockRequest(_) | PeerMessage::BlockHeadersRequest(_) | PeerMessage::EpochSyncRequest(_) - | PeerMessage::EpochSyncFinalizationRequest(_) => { - error!(target: "network", "Peer receive_client_message received unexpected type: {:?}", msg); - return; - } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - PeerMessage::RoutingTableSyncV2(_) => { + | PeerMessage::EpochSyncFinalizationRequest(_) + | PeerMessage::RoutingTableSyncV2(_) => { error!(target: "network", "Peer receive_client_message received unexpected type: {:?}", msg); return; } @@ -522,7 +598,7 @@ impl PeerActor { > UPDATE_INTERVAL_LAST_TIME_RECEIVED_MESSAGE { self.last_time_received_message_update = Clock::instant(); - self.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( + let _ = self.peer_manager_addr.do_send(PeerManagerMessageRequest::PeerRequest( PeerRequest::ReceivedMessage(peer_id, self.last_time_received_message_update), )); } @@ -538,41 +614,19 @@ impl PeerActor { /// Check whenever we exceeded number of transactions we got since last block. /// If so, drop the transaction. - fn should_we_drop_msg_without_decoding(&self, msg: &[u8]) -> bool { - if utils::is_forward_transaction(msg).unwrap_or(false) { - let r = self.txns_since_last_block.load(Ordering::Acquire); - if r > MAX_TRANSACTIONS_PER_BLOCK_MESSAGE { - return true; - } - } - false - } - - // Checks errors from decoding a message. - // We may send `HandshakeFailure` to the other peer. - fn handle_peer_message_decode_error(&mut self, msg: &[u8], err: Error) { - if let Some(version) = err - .get_ref() - .and_then(|err| err.downcast_ref::()) - .and_then(|inner| { - if let HandshakeFailureReason::ProtocolVersionMismatch { version, .. } = *inner { - Some(version) - } else { - None - } - }) - { - debug!(target: "network", "Received connection from node with unsupported version: {}", version); - self.send_message(&PeerMessage::HandshakeFailure( - self.my_node_info.clone(), - HandshakeFailureReason::ProtocolVersionMismatch { - version: PROTOCOL_VERSION, - oldest_supported_version: PEER_MIN_ALLOWED_PROTOCOL_VERSION, - }, - )); + fn should_we_drop_msg(&self, msg: &PeerMessage) -> bool { + let m = if let PeerMessage::Routed(m) = msg { + m } else { - info!(target: "network", "Received invalid data {:?} from {}: {}", logging::pretty_vec(msg), self.peer_info, err); - } + return false; + }; + let _ = if let RoutedMessageBody::ForwardTx(t) = &m.body { + t + } else { + return false; + }; + let r = self.txns_since_last_block.load(Ordering::Acquire); + r > MAX_TRANSACTIONS_PER_BLOCK_MESSAGE } } @@ -606,22 +660,24 @@ impl Actor for PeerActor { debug!(target: "network", "{:?}: Peer {} disconnected. {:?}", self.my_node_info.id, self.peer_info, self.peer_status); if let Some(peer_info) = self.peer_info.as_ref() { if let PeerStatus::Banned(ban_reason) = self.peer_status { - self.peer_manager_addr.do_send(PeerManagerMessageRequest::Ban(Ban { + let _ = self.peer_manager_addr.do_send(PeerManagerMessageRequest::Ban(Ban { peer_id: peer_info.id.clone(), ban_reason, })); } else { - self.peer_manager_addr.do_send(PeerManagerMessageRequest::Unregister(Unregister { - peer_id: peer_info.id.clone(), - peer_type: self.peer_type, - // If the PeerActor is no longer in the Connecting state this means - // that the connection was consolidated at some point in the past. - // Only if the connection was consolidated try to remove this peer from the - // peer store. This avoids a situation in which both peers are connecting to - // each other, and after resolving the tie, a peer tries to remove the other - // peer from the active connection if it was added in the parallel connection. - remove_from_peer_store: self.peer_status != PeerStatus::Connecting, - })) + let _ = self.peer_manager_addr.do_send(PeerManagerMessageRequest::Unregister( + Unregister { + peer_id: peer_info.id.clone(), + peer_type: self.peer_type, + // If the PeerActor is no longer in the Connecting state this means + // that the connection was consolidated at some point in the past. + // Only if the connection was consolidated try to remove this peer from the + // peer store. This avoids a situation in which both peers are connecting to + // each other, and after resolving the tie, a peer tries to remove the other + // peer from the active connection if it was added in the parallel connection. + remove_from_peer_store: self.peer_status != PeerStatus::Connecting, + }, + )); } } Running::Stop @@ -648,19 +704,18 @@ impl StreamHandler, ReasonForBan>> for PeerActor { // as long as it travels to PeerManager, etc. self.update_stats_on_receiving_message(msg.len()); - - if self.should_we_drop_msg_without_decoding(&msg) { - return; - } - let peer_msg = match PeerMessage::try_from_slice(&msg) { - Ok(peer_msg) => peer_msg, + let peer_msg = match self.parse_message(&msg) { + Ok(msg) => msg, Err(err) => { - // This may send `HandshakeFailure` to the other peer. - self.handle_peer_message_decode_error(&msg, err); + debug!(target: "network", "Received invalid data {:?} from {}: {}", logging::pretty_vec(&msg), self.peer_info, err); return; } }; + if self.should_we_drop_msg(&peer_msg) { + return; + } + // Drop duplicated messages routed within DROP_DUPLICATED_MESSAGES_PERIOD ms if let PeerMessage::Routed(msg) = &peer_msg { let key = (msg.author.clone(), msg.target.clone(), msg.signature.clone()); @@ -721,12 +776,14 @@ impl StreamHandler, ReasonForBan>> for PeerActor { } HandshakeFailureReason::InvalidTarget => { debug!(target: "network", "Peer found was not what expected. Updating peer info with {:?}", peer_info); - self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size( - PeerManagerMessageRequest::PeerRequest(PeerRequest::UpdatePeerInfo( - peer_info, - )), - Some(self.throttle_controller.clone()), - )); + let _ = self.peer_manager_wrapper_addr.do_send( + ActixMessageWrapper::new_without_size( + PeerManagerMessageRequest::PeerRequest( + PeerRequest::UpdatePeerInfo(peer_info), + ), + Some(self.throttle_controller.clone()), + ), + ); } } ctx.stop(); @@ -744,12 +801,10 @@ impl StreamHandler, ReasonForBan>> for PeerActor { if handshake.sender_chain_info.genesis_id != self.genesis_id { debug!(target: "network", "Received connection from node with different genesis."); - ctx.address().do_send(SendMessage { - message: PeerMessage::HandshakeFailure( - self.my_node_info.clone(), - HandshakeFailureReason::GenesisMismatch(self.genesis_id.clone()), - ), - }); + self.send_message_or_log(&PeerMessage::HandshakeFailure( + self.my_node_info.clone(), + HandshakeFailureReason::GenesisMismatch(self.genesis_id.clone()), + )); return; // Connection will be closed by a handshake timeout } @@ -763,7 +818,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { if handshake.target_peer_id != self.my_node_info.id { debug!(target: "network", "Received handshake from {:?} to {:?} but I am {:?}", handshake.sender_peer_id, handshake.target_peer_id, self.my_node_info.id); - self.send_message(&PeerMessage::HandshakeFailure( + self.send_message_or_log(&PeerMessage::HandshakeFailure( self.my_node_info.clone(), HandshakeFailureReason::InvalidTarget, )); @@ -800,7 +855,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { account_id: None, }; self.chain_info = handshake.sender_chain_info.clone(); - self.peer_manager_addr + self.peer_manager_wrapper_addr .send(ActixMessageWrapper::new_without_size(PeerManagerMessageRequest::RegisterPeer(RegisterPeer { actor: ctx.address(), peer_info: peer_info.clone(), @@ -826,7 +881,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { }, Ok(RegisterPeerResponse::InvalidNonce(edge)) => { debug!(target: "network", "{:?}: Received invalid nonce from peer {:?} sending evidence.", act.my_node_id(), act.peer_addr); - act.send_message(&PeerMessage::LastEdge(*edge)); + act.send_message_or_log(&PeerMessage::LastEdge(*edge)); actix::fut::ready(()) } _ => { @@ -853,7 +908,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { return; } - self.peer_manager_addr + self.peer_manager_wrapper_addr .send(ActixMessageWrapper::new_without_size( PeerManagerMessageRequest::PeerRequest(PeerRequest::UpdateEdge(( self.other_peer_id().unwrap().clone(), @@ -882,14 +937,14 @@ impl StreamHandler, ReasonForBan>> for PeerActor { debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } (PeerStatus::Ready, PeerMessage::PeersRequest) => { - self.peer_manager_addr.send(ActixMessageWrapper::new_without_size(PeerManagerMessageRequest::PeersRequest(PeersRequest {}), + self.peer_manager_wrapper_addr.send(ActixMessageWrapper::new_without_size(PeerManagerMessageRequest::PeersRequest(PeersRequest {}), Some(self.throttle_controller.clone()), )).into_actor(self).then(|res, act, _ctx| { if let Ok(peers) = res.map(|f|f.into_inner().as_peers_request_result()) { if !peers.peers.is_empty() { debug!(target: "network", "Peers request from {}: sending {} peers.", act.peer_info, peers.peers.len()); - act.send_message(&PeerMessage::PeersResponse(peers.peers)); + act.send_message_or_log(&PeerMessage::PeersResponse(peers.peers)); } } actix::fut::ready(()) @@ -897,10 +952,11 @@ impl StreamHandler, ReasonForBan>> for PeerActor { } (PeerStatus::Ready, PeerMessage::PeersResponse(peers)) => { debug!(target: "network", "Received peers from {}: {} peers.", self.peer_info, peers.len()); - self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size( - PeerManagerMessageRequest::PeersResponse(PeersResponse { peers }), - Some(self.throttle_controller.clone()), - )); + let _ = + self.peer_manager_wrapper_addr.do_send(ActixMessageWrapper::new_without_size( + PeerManagerMessageRequest::PeersResponse(PeersResponse { peers }), + Some(self.throttle_controller.clone()), + )); } (PeerStatus::Ready, PeerMessage::RequestUpdateNonce(edge_info)) => self .peer_manager_addr @@ -914,7 +970,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { .then(|res, act, ctx| { match res.map(|f| f.as_network_response()) { Ok(NetworkResponses::EdgeUpdate(edge)) => { - act.send_message(&PeerMessage::ResponseUpdateNonce(*edge)); + act.send_message_or_log(&PeerMessage::ResponseUpdateNonce(*edge)); } Ok(NetworkResponses::BanPeer(reason_for_ban)) => { act.ban_peer(ctx, reason_for_ban); @@ -940,22 +996,26 @@ impl StreamHandler, ReasonForBan>> for PeerActor { }) .spawn(ctx), (PeerStatus::Ready, PeerMessage::SyncRoutingTable(routing_table_update)) => { - self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size( - PeerManagerMessageRequest::NetworkRequests(NetworkRequests::SyncRoutingTable { - peer_id: self.other_peer_id().unwrap().clone(), - routing_table_update, - }), - Some(self.throttle_controller.clone()), - )); + let _ = + self.peer_manager_wrapper_addr.do_send(ActixMessageWrapper::new_without_size( + PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::SyncRoutingTable { + peer_id: self.other_peer_id().unwrap().clone(), + routing_table_update, + }, + ), + Some(self.throttle_controller.clone()), + )); } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - (PeerStatus::Ready, PeerMessage::RoutingTableSyncV2(ibf_message)) => { + (PeerStatus::Ready, PeerMessage::RoutingTableSyncV2(ibf_message)) + if cfg!(feature = "protocol_feature_routing_exchange_algorithm") => + { // TODO(#5155) Add wrapper to be something like this for all messages. // self.peer_manager_addr.do_send(ActixMessageWrapper::new( // self.rate_limiter.clone, NetworkRequests::IbfMessage { // ... - self.peer_manager_addr.do_send(ActixMessageWrapper::new_without_size( + self.peer_manager_wrapper_addr.do_send(ActixMessageWrapper::new_without_size( PeerManagerMessageRequest::NetworkRequests(NetworkRequests::IbfMessage { peer_id: self.other_peer_id().unwrap().clone(), ibf_msg: ibf_message, @@ -970,7 +1030,7 @@ impl StreamHandler, ReasonForBan>> for PeerActor { if !routed_message.verify() { self.ban_peer(ctx, ReasonForBan::InvalidSignature); } else { - self.peer_manager_addr + self.peer_manager_wrapper_addr .send(ActixMessageWrapper::new_without_size( PeerManagerMessageRequest::RoutedMessageFrom(RoutedMessageFrom { msg: routed_message.clone(), @@ -1005,7 +1065,7 @@ impl Handler for PeerActor { #[perf] fn handle(&mut self, msg: SendMessage, _: &mut Self::Context) { let _d = delay_detector::DelayDetector::new(|| "send message".into()); - self.send_message(&msg.message); + self.send_message_or_log(&msg.message); } } @@ -1015,7 +1075,7 @@ impl Handler> for PeerActor { #[perf] fn handle(&mut self, msg: Arc, _: &mut Self::Context) { let _d = delay_detector::DelayDetector::new(|| "send message".into()); - self.send_message(&msg.as_ref().message); + self.send_message_or_log(&msg.as_ref().message); } } diff --git a/chain/network/src/peer/utils.rs b/chain/network/src/peer/utils.rs deleted file mode 100644 index 01fb8610d12..00000000000 --- a/chain/network/src/peer/utils.rs +++ /dev/null @@ -1,176 +0,0 @@ -use tracing::error; - -/// Determines size of `PeerId` based on first byte of it's representation. -/// Size of `PeerId` depends on type of `PublicMessage` it stores. -/// `PublicKey::ED25519` -> `1 + 32 bytes` -/// `PublicKey::SECP256K1` -> `1 + 64 bytes` -fn peer_id_type_field_len(enum_var: u8) -> Option { - // 1 byte for enum variant, then some number depending on the - // public key type - match enum_var { - 0 => Some(1 + 32), - 1 => Some(1 + 64), - _ => None, - } -} - -/// Checks `bytes` represents `PeerMessage::Routed(RoutedMessage)`, -/// and `RoutedMessage.body` has type of `RoutedMessageBody::ForwardTx`. -/// -/// This is done to avoid expensive `borsh`-deserializing. -pub(crate) fn is_forward_transaction(bytes: &[u8]) -> Option { - // PeerMessage::Routed variant == 13 - let peer_message_variant = *bytes.get(0)?; - if peer_message_variant != 13 { - return Some(false); - } - - // target: PeerIdOrHash - let author_variant_idx = { - let target_field_len = { - let target_field_variant = *bytes.get(1)?; - if target_field_variant == 0 { - // PeerIdOrHash::PeerId - let peer_id_variant = *bytes.get(2)?; - peer_id_type_field_len(peer_id_variant)? - } else if target_field_variant == 1 { - // PeerIdOrHash::Hash is always 32 bytes - 32 - } else { - error!("Unsupported variant of PeerIdOrHash {}", target_field_variant); - return None; - } - }; - 2 + target_field_len - }; - - // author: PeerId - let signature_variant_idx = { - let author_variant = *bytes.get(author_variant_idx)?; - let author_field_len = peer_id_type_field_len(author_variant)?; - - author_variant_idx + author_field_len - }; - - // ttl: u8 - let ttl_idx = { - let signature_variant = *bytes.get(signature_variant_idx)?; - - // signature: Signature - let signature_field_len = match signature_variant { - 0 => 1 + 64, // Signature::ED25519 - 1 => 1 + 65, // Signature::SECP256K1 - _ => { - return None; - } - }; - signature_variant_idx + signature_field_len - }; - - // ttl: u8 - let message_body_idx = ttl_idx + 1; - - // check if type is `RoutedMessageBody::ForwardTx` - let message_body_variant = *bytes.get(message_body_idx)?; - Some(message_body_variant == 1) -} - -#[cfg(test)] -mod tests { - use crate::peer::utils::is_forward_transaction; - use crate::types::PeerMessage; - use borsh::BorshSerialize; - use near_crypto::{KeyType, SecretKey}; - use near_network_primitives::types::{PeerIdOrHash, RoutedMessage, RoutedMessageBody}; - use near_primitives::hash::CryptoHash; - use near_primitives::network::PeerId; - use near_primitives::transaction::{SignedTransaction, Transaction}; - - #[derive(Debug, Copy, Clone)] - enum ForwardTxTargetType { - Hash, - PublicKey(KeyType), - } - - #[derive(Debug, Copy, Clone)] - struct ForwardTxType { - target: ForwardTxTargetType, - author: KeyType, - tx: KeyType, - } - - fn create_tx_forward(schema: ForwardTxType) -> PeerMessage { - let target = match schema.target { - ForwardTxTargetType::Hash => { - PeerIdOrHash::Hash(CryptoHash::hash_bytes(b"peer_id_hash")) - } - ForwardTxTargetType::PublicKey(key_type) => { - let secret_key = SecretKey::from_seed(key_type, "target_secret_key"); - PeerIdOrHash::PeerId(PeerId::new(secret_key.public_key())) - } - }; - - let (author, signature) = { - let secret_key = SecretKey::from_seed(schema.author, "author_secret_key"); - let public_key = secret_key.public_key(); - let author = PeerId::new(public_key); - let msg_data = CryptoHash::hash_bytes(b"msg_data"); - let signature = secret_key.sign(msg_data.as_ref()); - - (author, signature) - }; - - let tx = { - let secret_key = SecretKey::from_seed(schema.tx, "tx_secret_key"); - let public_key = secret_key.public_key(); - let tx_hash = CryptoHash::hash_bytes(b"this_great_tx_data"); - let signature = secret_key.sign(tx_hash.as_ref()); - - SignedTransaction::new( - signature, - Transaction::new( - "test_x".parse().unwrap(), - public_key, - "test_y".parse().unwrap(), - 7, - tx_hash, - ), - ) - }; - - PeerMessage::Routed( - RoutedMessage { - target, - author, - signature, - ttl: 99, - body: RoutedMessageBody::ForwardTx(tx), - } - .into(), - ) - } - - #[test] - fn test_tx_forward() { - let targets = [ - ForwardTxTargetType::PublicKey(KeyType::ED25519), - ForwardTxTargetType::PublicKey(KeyType::SECP256K1), - ForwardTxTargetType::Hash, - ]; - let authors = [KeyType::ED25519, KeyType::SECP256K1]; - let txs_keys = [KeyType::ED25519, KeyType::SECP256K1]; - - let schemas = targets - .iter() - .flat_map(|target| authors.iter().map(move |author| (*target, *author))) - .flat_map(|(target, author)| { - txs_keys.iter().map(move |tx| ForwardTxType { target, author, tx: *tx }) - }); - - schemas.for_each(|s| { - let msg = create_tx_forward(s); - let bytes = msg.try_to_vec().unwrap(); - assert!(is_forward_transaction(&bytes).unwrap()); - }) - } -} diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index bb194d6ce28..8401ffb85e3 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -22,7 +22,6 @@ use actix::{ Recipient, Running, StreamHandler, WrapFuture, }; use anyhow::bail; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] use futures::FutureExt; use near_network_primitives::types::{ AccountOrPeerIdOrHash, Ban, Edge, InboundTcpConnect, KnownPeerStatus, KnownProducer, @@ -909,13 +908,15 @@ impl PeerManagerActor { peer_type, FramedWrite::new(write, Codec::default(), Codec::default(), ctx), handshake_timeout, - recipient, + recipient.clone().recipient(), + recipient.clone().recipient(), client_addr, view_client_addr, partial_edge_info, txns_since_last_block, peer_counter, rate_limiter, + None, ) }); } @@ -1846,15 +1847,20 @@ impl PeerManagerActor { NetworkResponses::NoResponse } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] - NetworkRequests::IbfMessage { peer_id, ibf_msg } => match ibf_msg { - crate::network_protocol::RoutingSyncV2::Version2(ibf_msg) => { - if let Some(addr) = self.connected_peers.get(&peer_id).map(|p| p.addr.clone()) { - self.process_ibf_msg(&peer_id, ibf_msg, addr, throttle_controller) + NetworkRequests::IbfMessage { peer_id, ibf_msg } => { + if cfg!(feature = "protocol_feature_routing_exchange_algorithm") { + match ibf_msg { + crate::network_protocol::RoutingSyncV2::Version2(ibf_msg) => { + if let Some(addr) = + self.connected_peers.get(&peer_id).map(|p| p.addr.clone()) + { + self.process_ibf_msg(&peer_id, ibf_msg, addr, throttle_controller) + } + } } - NetworkResponses::NoResponse } - }, + NetworkResponses::NoResponse + } NetworkRequests::Challenge(challenge) => { // TODO(illia): smarter routing? Self::broadcast_message( @@ -2326,7 +2332,6 @@ impl PeerManagerActor { } } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] fn process_ibf_msg( &self, peer_id: &PeerId, diff --git a/chain/network/src/peer_manager/peer_store.rs b/chain/network/src/peer_manager/peer_store.rs index 0eb393c7bc1..7727acf5869 100644 --- a/chain/network/src/peer_manager/peer_store.rs +++ b/chain/network/src/peer_manager/peer_store.rs @@ -734,7 +734,7 @@ mod test { let blacklist = Blacklist::from_iter( ["127.0.0.1:2".to_string(), "127.0.0.1:5".to_string()].into_iter(), ); - let mut peer_store = PeerStore::new(store.clone(), &[], blacklist).unwrap(); + let mut peer_store = PeerStore::new(store, &[], blacklist).unwrap(); // Peer 127.0.0.1:2 is there but is banned. assert_peers(&peer_store, &[&ids[1], &ids[2]], &[&ids[2]]); diff --git a/chain/network/src/routing/ibf_peer_set.rs b/chain/network/src/routing/ibf_peer_set.rs index 0653e57c18b..de98f22ec7f 100644 --- a/chain/network/src/routing/ibf_peer_set.rs +++ b/chain/network/src/routing/ibf_peer_set.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use crate::routing::ibf_set::IbfSet; use borsh::{BorshDeserialize, BorshSerialize}; use near_network_primitives::types::{Edge, SimpleEdge}; diff --git a/chain/network/src/routing/mod.rs b/chain/network/src/routing/mod.rs index c43a84e0f98..4eb47a400bc 100644 --- a/chain/network/src/routing/mod.rs +++ b/chain/network/src/routing/mod.rs @@ -1,19 +1,14 @@ pub(crate) mod edge_validator_actor; pub mod graph; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub(crate) mod ibf; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub(crate) mod ibf_peer_set; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub(crate) mod ibf_set; mod route_back_cache; #[cfg(feature = "test_features")] pub use crate::private_actix::GetRoutingTableResult; pub(crate) mod routing_table_actor; pub mod routing_table_view; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub use crate::routing::ibf_peer_set::SlotMapId; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub use crate::routing::ibf_set::IbfSet; pub use routing_table_actor::start_routing_table_actor; diff --git a/chain/network/src/routing/routing_table_actor.rs b/chain/network/src/routing/routing_table_actor.rs index 99e858ab7e4..0a153615d12 100644 --- a/chain/network/src/routing/routing_table_actor.rs +++ b/chain/network/src/routing/routing_table_actor.rs @@ -46,7 +46,6 @@ pub struct RoutingTableActor { /// Data structure with all edges. It's guaranteed that `peer.0` < `peer.1`. pub edges_info: HashMap<(PeerId, PeerId), Edge>, /// Data structure used for exchanging routing tables. - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub peer_ibf_set: crate::routing::ibf_peer_set::IbfPeerSet, /// Current view of the network represented by undirected graph. /// Nodes are Peers and edges are active connections. @@ -89,7 +88,6 @@ impl RoutingTableActor { let edge_validator_pool = SyncArbiter::start(4, || EdgeValidatorActor {}); Self { edges_info: Default::default(), - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] peer_ibf_set: Default::default(), raw_graph: Graph::new(my_peer_id), peer_forwarding: Default::default(), @@ -469,7 +467,6 @@ pub enum RoutingTableMessages { #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] RemovePeer(PeerId), /// Do new routing table exchange algorithm. - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] ProcessIbfMessage { peer_id: PeerId, ibf_msg: crate::types::RoutingVersion2 }, /// Start new routing table sync. #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] @@ -489,7 +486,6 @@ pub enum RoutingTableMessagesResponse { seed: u64, }, Empty, - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] ProcessIbfMessageResponse { ibf_msg: Option, }, @@ -510,7 +506,6 @@ pub enum RoutingTableMessagesResponse { }, } -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] impl RoutingTableActor { pub fn exchange_routing_tables_using_ibf( &self, @@ -609,7 +604,6 @@ impl Handler for RoutingTableActor { self.peer_ibf_set.remove_peer(&peer_id); RoutingTableMessagesResponse::Empty } - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] RoutingTableMessages::ProcessIbfMessage { peer_id, ibf_msg } => { match ibf_msg.routing_state { crate::types::RoutingState::PartialSync(partial_sync) => { diff --git a/chain/network/src/tests/actix.rs b/chain/network/src/tests/actix.rs new file mode 100644 index 00000000000..b4c13873a99 --- /dev/null +++ b/chain/network/src/tests/actix.rs @@ -0,0 +1,51 @@ +use anyhow::anyhow; + +// A system thread which is joined on drop. +// TODO: replace with std::thread::ScopedJoinHandle once it is stable. +pub struct Thread(Option>>); + +impl Thread { + pub fn spawn anyhow::Result<()>>(f: F) -> Self { + Self(Some(std::thread::spawn(f))) + } +} + +impl Drop for Thread { + fn drop(&mut self) { + self.0.take().unwrap().join().unwrap().unwrap(); + } +} + +pub struct ActixSystem { + pub addr: actix::Addr, + system: actix::System, + // dropping _thread has a side effect of joining the system thread. + // Still, linter considers it a dead_code, so "_" is needed to silence it. + _thread: Thread, +} + +impl ActixSystem { + pub async fn spawn anyhow::Result>>( + f: F, + ) -> anyhow::Result { + let (send, recv) = tokio::sync::oneshot::channel(); + let thread = Thread::spawn(move || { + let s = actix::System::new(); + s.block_on(async move { + let system = actix::System::current(); + let addr = f()?; + send.send((system, addr)).map_err(|_| anyhow!("send failed")) + })?; + s.run()?; + Ok(()) + }); + let (system, addr) = recv.await?; + Ok(Self { addr, system, _thread: thread }) + } +} + +impl Drop for ActixSystem { + fn drop(&mut self) { + self.system.stop(); + } +} diff --git a/chain/network/src/tests/cache.rs b/chain/network/src/tests/cache.rs index f9891b205f8..5ed681c7788 100644 --- a/chain/network/src/tests/cache.rs +++ b/chain/network/src/tests/cache.rs @@ -52,7 +52,7 @@ fn dont_load_on_build() { let announce0 = AnnounceAccount { account_id: "near0".parse().unwrap(), - peer_id: peer_id0.clone(), + peer_id: peer_id0, epoch_id: epoch_id0, signature: Signature::default(), }; diff --git a/chain/network/src/tests/data.rs b/chain/network/src/tests/data.rs new file mode 100644 index 00000000000..bf757eae03d --- /dev/null +++ b/chain/network/src/tests/data.rs @@ -0,0 +1,286 @@ +use crate::tests::util::{Clock, Duration, FakeClock}; +use crate::types::{Handshake, RoutingTableUpdate}; +use near_crypto::{InMemorySigner, KeyType}; +use near_network_primitives::types::{ + AccountOrPeerIdOrHash, Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RawRoutedMessage, + RoutedMessage, RoutedMessageBody, +}; +use near_primitives::block::{genesis_chunks, Block, BlockHeader, GenesisId}; +use near_primitives::challenge::{BlockDoubleSign, Challenge, ChallengeBody}; +use near_primitives::hash::CryptoHash; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::num_rational::Rational; +use near_primitives::sharding::{ + ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunkPart, + ReedSolomonWrapper, ShardChunk, +}; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, BlockHeight, EpochId, StateRoot}; +use near_primitives::validator_signer::{InMemoryValidatorSigner, ValidatorSigner}; +use near_primitives::version::PROTOCOL_VERSION; +use rand::distributions::Standard; +use rand::Rng; +use std::collections::HashMap; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +pub fn make_genesis_block(clock: &Clock, chunks: Vec) -> Block { + Block::genesis( + PROTOCOL_VERSION, + chunks.into_iter().map(|c| c.take_header()).collect(), + clock.utc_now(), + 0, // height + 1000, // initial_gas_price + 1000, // initial_total_supply + CryptoHash::default(), // next_bp_hash (next block producers' hash) + ) +} + +pub fn make_block( + clock: &Clock, + signer: &dyn ValidatorSigner, + prev: &Block, + chunks: Vec, +) -> Block { + Block::produce( + PROTOCOL_VERSION, // this_epoch_protocol_version + PROTOCOL_VERSION, // next_epoch_protocol_version + prev.header(), // prev + prev.header().height() + 5, // height + prev.header().block_ordinal() + 1, // block_ordinal + chunks.into_iter().map(|c| c.take_header()).collect(), // chunks + EpochId::default(), // epoch_id + EpochId::default(), // next_epoch_id + None, // epoch_sync_data_hash + vec![], // approvals + Rational::from_integer(0), // gas_price_adjustment_rate + 0, // min_gas_price + 0, // max_gas_price + Some(0), // minted_amount + vec![], // challenges_result + vec![], // challenges + signer, + CryptoHash::default(), // next_bp_hash + CryptoHash::default(), // block_merkle_root + Some(clock.utc_now()), // timestamp_override + ) +} + +pub fn make_account_id(rng: &mut R) -> AccountId { + format!("account{}", rng.gen::()).parse().unwrap() +} + +pub fn make_signer(rng: &mut R) -> InMemorySigner { + let account_id = make_account_id(rng); + InMemorySigner::from_seed(account_id.clone(), KeyType::ED25519, &account_id) +} + +pub fn make_validator_signer(rng: &mut R) -> InMemoryValidatorSigner { + let account_id = make_account_id(rng); + InMemoryValidatorSigner::from_seed(account_id.clone(), KeyType::ED25519, &account_id) +} + +pub fn make_addr(rng: &mut R) -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, rng.gen())) +} + +pub fn make_peer_info(rng: &mut R) -> PeerInfo { + let signer = make_signer(rng); + PeerInfo { + id: PeerId::new(signer.public_key), + addr: Some(make_addr(rng)), + account_id: Some(signer.account_id), + } +} + +pub fn make_announce_account(rng: &mut R) -> AnnounceAccount { + let peer_id = PeerId::new(make_signer(rng).public_key); + let validator_signer = make_validator_signer(rng); + let signature = validator_signer.sign_account_announce( + validator_signer.validator_id(), + &peer_id, + &EpochId::default(), + ); + AnnounceAccount { + account_id: validator_signer.validator_id().clone(), + peer_id: peer_id, + epoch_id: EpochId::default(), + signature, + } +} + +pub fn make_partial_edge(rng: &mut R) -> PartialEdgeInfo { + let a = make_signer(rng); + let b = make_signer(rng); + PartialEdgeInfo::new( + &PeerId::new(a.public_key), + &PeerId::new(b.public_key), + rng.gen(), + &a.secret_key, + ) +} + +pub fn make_edge(rng: &mut R, a: &InMemorySigner, b: &InMemorySigner) -> Edge { + let (a, b) = if a.public_key < b.public_key { (a, b) } else { (b, a) }; + let ap = PeerId::new(a.public_key.clone()); + let bp = PeerId::new(b.public_key.clone()); + let nonce = rng.gen(); + let hash = Edge::build_hash(&ap, &bp, nonce); + Edge::new(ap, bp, nonce, a.secret_key.sign(hash.as_ref()), b.secret_key.sign(hash.as_ref())) +} + +pub fn make_routing_table(rng: &mut R) -> RoutingTableUpdate { + let signers: Vec<_> = (0..7).map(|_| make_signer(rng)).collect(); + RoutingTableUpdate { + accounts: (0..10).map(|_| make_announce_account(rng)).collect(), + edges: { + let mut e = vec![]; + for i in 0..signers.len() { + for j in 0..i { + e.push(make_edge(rng, &signers[i], &signers[j])); + } + } + e + }, + } +} + +pub fn make_signed_transaction(rng: &mut R) -> SignedTransaction { + let sender = make_signer(rng); + let receiver = make_account_id(rng); + SignedTransaction::send_money( + rng.gen(), + sender.account_id.clone(), + receiver, + &sender, + 15, + CryptoHash::default(), + ) +} + +pub fn make_challenge(rng: &mut R) -> Challenge { + Challenge::produce( + ChallengeBody::BlockDoubleSign(BlockDoubleSign { + left_block_header: rng.sample_iter(&Standard).take(65).collect(), + right_block_header: rng.sample_iter(&Standard).take(34).collect(), + }), + &make_validator_signer(rng), + ) +} + +// Based on ShardsManager::prepare_partial_encoded_chunk_response_from_chunk. +// I give no guarantee that it will produce correct data, I'm just approximating +// the real thing, since this functionality is not encapsulated in +// the production code well enough to reuse it in tests. +pub fn make_chunk_parts(chunk: ShardChunk) -> Vec { + let mut rs = ReedSolomonWrapper::new(10, 5); + let (parts, _) = EncodedShardChunk::encode_transaction_receipts( + &mut rs, + chunk.transactions().to_vec(), + &chunk.receipts(), + ) + .unwrap(); + let mut content = EncodedShardChunkBody { parts }; + content.reconstruct(&mut rs).unwrap(); + let (_, merkle_paths) = content.get_merkle_hash_and_paths(); + let mut parts = vec![]; + for ord in 0..rs.total_shard_count() { + parts.push(PartialEncodedChunkPart { + part_ord: ord as u64, + part: content.parts[ord].take().unwrap(), + merkle_proof: merkle_paths[ord].clone(), + }); + } + parts +} + +struct ChunkSet { + chunks: HashMap, +} + +impl ChunkSet { + pub fn new() -> Self { + Self { chunks: HashMap::default() } + } + pub fn make(&mut self) -> Vec { + // TODO: these are always genesis chunks. + // Consider making this more realistic. + let chunks = genesis_chunks( + vec![StateRoot::default()], // state_roots + 4, // num_shards + 1000, // initial_gas_limit + 0, // genesis_height + PROTOCOL_VERSION, + ); + self.chunks.extend(chunks.iter().map(|c| (c.chunk_hash(), c.clone()))); + chunks + } +} + +pub struct Chain { + pub genesis_id: GenesisId, + pub blocks: Vec, + pub chunks: HashMap, +} + +impl Chain { + pub fn make(clock: &mut FakeClock, rng: &mut R, block_count: usize) -> Chain { + let mut chunks = ChunkSet::new(); + let mut blocks = vec![]; + blocks.push(make_genesis_block(&clock.clock(), chunks.make())); + let signer = make_validator_signer(rng); + for _ in 1..block_count { + clock.advance(Duration::seconds(15)); + blocks.push(make_block(&clock.clock(), &signer, blocks.last().unwrap(), chunks.make())); + } + Chain { + genesis_id: GenesisId { + chain_id: format!("testchain{}", rng.gen::()), + hash: Default::default(), + }, + blocks, + chunks: chunks.chunks, + } + } + + pub fn height(&self) -> BlockHeight { + self.blocks.last().unwrap().header().height() + } + + pub fn get_info(&self) -> PeerChainInfoV2 { + PeerChainInfoV2 { + genesis_id: self.genesis_id.clone(), + height: self.height(), + tracked_shards: Default::default(), + archival: false, + } + } + + pub fn get_block_headers(&self) -> Vec { + self.blocks.iter().map(|b| b.header().clone()).collect() + } +} + +pub fn make_handshake(rng: &mut R, chain: &Chain) -> Handshake { + let a = make_signer(rng); + let b = make_signer(rng); + let a_id = PeerId::new(a.public_key); + let b_id = PeerId::new(b.public_key); + Handshake::new( + PROTOCOL_VERSION, + a_id, + b_id, + Some(rng.gen()), + chain.get_info(), + make_partial_edge(rng), + ) +} + +pub fn make_routed_message(rng: &mut R, body: RoutedMessageBody) -> Box { + let signer = make_signer(rng); + let peer_id = PeerId::new(signer.public_key); + RawRoutedMessage { target: AccountOrPeerIdOrHash::PeerId(peer_id.clone()), body }.sign( + peer_id, + &signer.secret_key, + /*ttl=*/ 1, + ) +} diff --git a/chain/network/src/tests/mod.rs b/chain/network/src/tests/mod.rs index e8803efe769..0410984927c 100644 --- a/chain/network/src/tests/mod.rs +++ b/chain/network/src/tests/mod.rs @@ -1,2 +1,9 @@ +mod actix; +mod data; +mod network_protocol; +mod peer_actor; +mod peer_communication; +mod util; + mod cache; mod cache_edges; diff --git a/chain/network/src/tests/network_protocol.rs b/chain/network/src/tests/network_protocol.rs new file mode 100644 index 00000000000..09f894cf5db --- /dev/null +++ b/chain/network/src/tests/network_protocol.rs @@ -0,0 +1,106 @@ +use crate::network_protocol::Encoding; +use crate::tests::data; +use crate::tests::util::{make_rng, FakeClock}; +use crate::types::{HandshakeFailureReason, PeerMessage}; +use anyhow::{bail, Context as _}; +use near_network_primitives::types::{ + PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, RoutedMessageBody, +}; +use near_primitives::syncing::EpochSyncResponse; +use near_primitives::types::EpochId; + +#[test] +fn serialize_deserialize() -> anyhow::Result<()> { + let mut rng = make_rng(89028037453); + let mut clock = FakeClock::default(); + + let chain = data::Chain::make(&mut clock, &mut rng, 12); + let a = data::make_signer(&mut rng); + let b = data::make_signer(&mut rng); + let edge = data::make_edge(&mut rng, &a, &b); + let epoch_id = EpochId(chain.blocks[1].hash().clone()); + + let chunk_hash = chain.blocks[3].chunks()[0].chunk_hash(); + let routed_message1 = data::make_routed_message( + &mut rng, + RoutedMessageBody::PartialEncodedChunkRequest(PartialEncodedChunkRequestMsg { + chunk_hash: chunk_hash.clone(), + part_ords: vec![], + tracking_shards: Default::default(), + }), + ); + let routed_message2 = data::make_routed_message( + &mut rng, + RoutedMessageBody::PartialEncodedChunkResponse(PartialEncodedChunkResponseMsg { + chunk_hash: chunk_hash.clone(), + parts: data::make_chunk_parts(chain.chunks[&chunk_hash].clone()), + receipts: vec![], + }), + ); + let msgs = [ + PeerMessage::Handshake(data::make_handshake(&mut rng, &chain)), + PeerMessage::HandshakeFailure( + data::make_peer_info(&mut rng), + HandshakeFailureReason::InvalidTarget, + ), + PeerMessage::LastEdge(edge.clone()), + PeerMessage::SyncRoutingTable(data::make_routing_table(&mut rng)), + PeerMessage::RequestUpdateNonce(data::make_partial_edge(&mut rng)), + PeerMessage::ResponseUpdateNonce(edge.clone()), + PeerMessage::PeersRequest, + PeerMessage::PeersResponse((0..5).map(|_| data::make_peer_info(&mut rng)).collect()), + PeerMessage::BlockHeadersRequest(chain.blocks.iter().map(|b| b.hash().clone()).collect()), + PeerMessage::BlockHeaders(chain.get_block_headers()), + PeerMessage::BlockRequest(chain.blocks[5].hash().clone()), + PeerMessage::Block(chain.blocks[5].clone()), + PeerMessage::Transaction(data::make_signed_transaction(&mut rng)), + PeerMessage::Routed(routed_message1), + PeerMessage::Routed(routed_message2), + PeerMessage::Disconnect, + PeerMessage::Challenge(data::make_challenge(&mut rng)), + PeerMessage::EpochSyncRequest(epoch_id.clone()), + PeerMessage::EpochSyncResponse(Box::new(EpochSyncResponse::UpToDate)), + PeerMessage::EpochSyncFinalizationRequest(epoch_id.clone()), + // TODO: EpochSyncFinalizationResponse + // TODO: RoutingTableSyncV2, + ]; + + // Check that serialize;deserialize = 1 + for enc in [Encoding::Proto, Encoding::Borsh] { + for m in &msgs { + (|| { + let m2 = PeerMessage::deserialize(enc, &m.serialize(enc)) + .with_context(|| format!("{m}"))?; + if *m != m2 { + bail!("deserialize(serialize({m}) = {m2}"); + } + anyhow::Ok(()) + })() + .with_context(|| format!("encoding={enc:?}"))?; + } + } + + // Test the unambiguous parsing argument described in + // https://docs.google.com/document/d/1gCWmt9O-h_-5JDXIqbKxAaSS3Q9pryB1f9DDY1mMav4/edit#heading=h.x1awbr2acslb + for m in &msgs { + let x = m.serialize(Encoding::Proto); + assert!(x[0] >= 32, "serialize({},PROTO)[0] = {:?}, want >= 32", m, x.get(0)); + let y = m.serialize(Encoding::Borsh); + assert!(y[0] <= 21, "serialize({},BORSH)[0] = {:?}, want <= 21", m, y.get(0)); + } + + // Encodings should never be compatible. + for (from, to) in [(Encoding::Proto, Encoding::Borsh), (Encoding::Borsh, Encoding::Proto)] { + for m in &msgs { + let bytes = &m.serialize(from); + match PeerMessage::deserialize(to, bytes) { + Err(_) => {} + Ok(m2) => { + bail!("from={from:?},to={to:?}: deserialize(serialize({m})) = {m2}, want error") + } + } + } + } + + Ok(()) +} diff --git a/chain/network/src/tests/peer_actor.rs b/chain/network/src/tests/peer_actor.rs new file mode 100644 index 00000000000..9ea282b1a30 --- /dev/null +++ b/chain/network/src/tests/peer_actor.rs @@ -0,0 +1,330 @@ +use crate::peer::codec::Codec; +use crate::peer::peer_actor::PeerActor; +use crate::private_actix::{PeerRequestResult, RegisterPeerResponse, SendMessage}; +use crate::tests::actix::ActixSystem; +use crate::tests::data; +use crate::types::{ + NetworkClientMessages, NetworkClientResponses, NetworkRequests, NetworkResponses, + PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, RoutingTableUpdate, +}; +use actix::{Actor, Context, Handler, StreamHandler as _}; +use near_crypto::InMemorySigner; +use near_network_primitives::types::{ + AccountOrPeerIdOrHash, Edge, NetworkViewClientMessages, NetworkViewClientResponses, + PartialEdgeInfo, PeerInfo, PeerType, RawRoutedMessage, RoutedMessage, RoutedMessageBody, +}; +use near_performance_metrics::framed_write::FramedWrite; +use near_primitives::block::{Block, BlockHeader}; +use near_primitives::challenge::Challenge; +use near_primitives::hash::CryptoHash; +use near_primitives::network::PeerId; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunkPart}; +use near_primitives::syncing::EpochSyncResponse; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::EpochId; +use near_rate_limiter::{ + ActixMessageResponse, ActixMessageWrapper, ThrottleController, ThrottleFramedRead, + ThrottleToken, +}; + +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time; +use tokio::net::{TcpListener, TcpStream}; +use tokio_stream::StreamExt; + +pub struct PeerConfig { + pub signer: InMemorySigner, + pub chain: Arc, + pub peers: Vec, + pub force_encoding: Option, +} + +impl PeerConfig { + pub fn id(&self) -> PeerId { + PeerId::new(self.signer.public_key.clone()) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub enum Response { + HandshakeDone, + BlockRequest(CryptoHash), + Block(Block), + BlockHeadersRequest(Vec), + BlockHeaders(Vec), + Chunk(Vec), + ChunkRequest(ChunkHash), + RoutingTable(RoutingTableUpdate), + RequestUpdateNonce(PartialEdgeInfo), + ResponseUpdateNonce(Edge), + PeersResponse(Vec), + Transaction(SignedTransaction), + Challenge(Challenge), + EpochSyncRequest(EpochId), + EpochSyncResponse(EpochSyncResponse), + EpochSyncFinalizationRequest(EpochId), +} + +struct FakeActor { + cfg: Arc, + responses: tokio::sync::mpsc::UnboundedSender, +} + +impl Actor for FakeActor { + type Context = Context; +} + +impl Handler> for FakeActor { + type Result = ActixMessageResponse; + + fn handle( + &mut self, + msg: ActixMessageWrapper, + ctx: &mut Self::Context, + ) -> Self::Result { + let (msg, throttle_token) = msg.take(); + ActixMessageResponse::new( + self.handle(msg, ctx), + ThrottleToken::new_without_size(throttle_token.throttle_controller().cloned()), + ) + } +} + +impl Handler for FakeActor { + type Result = NetworkViewClientResponses; + fn handle(&mut self, msg: NetworkViewClientMessages, _ctx: &mut Self::Context) -> Self::Result { + println!("{}: view client message {}", self.cfg.id(), Into::<&'static str>::into(&msg)); + match msg { + NetworkViewClientMessages::GetChainInfo => { + let ci = self.cfg.chain.get_info(); + NetworkViewClientResponses::ChainInfo { + genesis_id: ci.genesis_id, + height: ci.height, + tracked_shards: ci.tracked_shards, + archival: ci.archival, + } + } + NetworkViewClientMessages::BlockRequest(block_hash) => { + self.responses.send(Response::BlockRequest(block_hash)).unwrap(); + NetworkViewClientResponses::NoResponse + } + NetworkViewClientMessages::BlockHeadersRequest(req) => { + self.responses.send(Response::BlockHeadersRequest(req)).unwrap(); + NetworkViewClientResponses::NoResponse + } + NetworkViewClientMessages::EpochSyncRequest { epoch_id } => { + self.responses.send(Response::EpochSyncRequest(epoch_id)).unwrap(); + NetworkViewClientResponses::NoResponse + } + NetworkViewClientMessages::EpochSyncFinalizationRequest { epoch_id } => { + self.responses.send(Response::EpochSyncFinalizationRequest(epoch_id)).unwrap(); + NetworkViewClientResponses::NoResponse + } + _ => panic!("unsupported message"), + } + } +} + +impl Handler for FakeActor { + type Result = NetworkClientResponses; + fn handle(&mut self, msg: NetworkClientMessages, _ctx: &mut Self::Context) -> Self::Result { + println!("{}: client message {}", self.cfg.id(), Into::<&'static str>::into(&msg)); + let mut resp = NetworkClientResponses::NoResponse; + match msg { + NetworkClientMessages::Block(b, _, _) => { + self.responses.send(Response::Block(b)).unwrap() + } + NetworkClientMessages::BlockHeaders(bhs, _) => { + self.responses.send(Response::BlockHeaders(bhs)).unwrap() + } + NetworkClientMessages::PartialEncodedChunkResponse(resp, _) => { + self.responses.send(Response::Chunk(resp.parts)).unwrap() + } + NetworkClientMessages::PartialEncodedChunkRequest(req, _) => { + self.responses.send(Response::ChunkRequest(req.chunk_hash)).unwrap() + } + NetworkClientMessages::Transaction { transaction, .. } => { + self.responses.send(Response::Transaction(transaction)).unwrap(); + resp = NetworkClientResponses::ValidTx; + } + NetworkClientMessages::Challenge(c) => { + self.responses.send(Response::Challenge(c)).unwrap() + } + NetworkClientMessages::EpochSyncResponse(_, resp) => { + self.responses.send(Response::EpochSyncResponse(*resp)).unwrap() + } + _ => panic!("unsupported message"), + }; + resp + } +} + +impl Handler for FakeActor { + type Result = PeerManagerMessageResponse; + fn handle(&mut self, msg: PeerManagerMessageRequest, _ctx: &mut Self::Context) -> Self::Result { + println!("{}: PeerManager message {}", self.cfg.id(), strum::AsStaticRef::as_static(&msg)); + match msg { + PeerManagerMessageRequest::RegisterPeer(msg) => { + self.responses.send(Response::HandshakeDone).unwrap(); + PeerManagerMessageResponse::RegisterPeerResponse(RegisterPeerResponse::Accept( + match msg.this_edge_info { + Some(_) => None, + None => Some(PartialEdgeInfo::new( + &self.cfg.id(), + &msg.peer_info.id, + msg.other_edge_info.nonce, + &self.cfg.signer.secret_key, + )), + }, + )) + } + PeerManagerMessageRequest::RoutedMessageFrom(_) => { + // Accept all incoming routed messages + PeerManagerMessageResponse::RoutedMessageFrom(true) + } + PeerManagerMessageRequest::NetworkRequests(req) => { + self.responses + .send(match req { + NetworkRequests::SyncRoutingTable { routing_table_update, .. } => { + Response::RoutingTable(routing_table_update) + } + NetworkRequests::RequestUpdateNonce(_, edge) => { + Response::RequestUpdateNonce(edge) + } + NetworkRequests::ResponseUpdateNonce(edge) => { + Response::ResponseUpdateNonce(edge) + } + _ => panic!("unsupported message"), + }) + .unwrap(); + PeerManagerMessageResponse::NetworkResponses(NetworkResponses::NoResponse) + } + PeerManagerMessageRequest::PeersRequest(_) => { + // PeerActor would panic if we returned a different response. + // This also triggers sending a message to the peer. + PeerManagerMessageResponse::PeerRequestResult(PeerRequestResult { + peers: self.cfg.peers.clone(), + }) + } + PeerManagerMessageRequest::PeersResponse(resp) => { + self.responses.send(Response::PeersResponse(resp.peers)).unwrap(); + PeerManagerMessageResponse::PeersResponseResult(()) + } + _ => panic!("unsupported message"), + } + } +} + +pub struct PeerHandle { + pub cfg: Arc, + peer_id: PeerId, + actix: ActixSystem, + responses: tokio::sync::mpsc::UnboundedReceiver, +} + +impl PeerHandle { + pub async fn recv(&mut self) -> Response { + self.responses.recv().await.unwrap() + } + + pub async fn send(&self, message: PeerMessage) { + self.actix.addr.send(SendMessage { message }).await.unwrap(); + } + + pub fn routed_message(&self, body: RoutedMessageBody) -> Box { + RawRoutedMessage { target: AccountOrPeerIdOrHash::PeerId(self.peer_id.clone()), body }.sign( + self.cfg.id(), + &self.cfg.signer.secret_key, + /*ttl=*/ 1, + ) + } + + async fn start_endpoint( + cfg: PeerConfig, + stream: tokio::net::TcpStream, + peer_id: PeerId, + stream_type: PeerType, + ) -> anyhow::Result { + let cfg = Arc::new(cfg); + let (send, recv) = tokio::sync::mpsc::unbounded_channel(); + + let peer_id_ = peer_id.clone(); + let cfg_ = cfg.clone(); + let actix = ActixSystem::spawn(move || { + let my_addr = stream.local_addr()?; + let peer_addr = stream.peer_addr()?; + let (read, write) = tokio::io::split(stream); + let handshake_timeout = time::Duration::from_secs(5); + let fa = FakeActor { cfg: cfg.clone(), responses: send }.start(); + let rate_limiter = ThrottleController::new(usize::MAX, usize::MAX); + let read = ThrottleFramedRead::new(read, Codec::default(), rate_limiter.clone()) + .take_while(|x| match x { + Ok(_) => true, + Err(_) => false, + }) + .map(Result::unwrap); + Ok(PeerActor::create(move |ctx| { + PeerActor::add_stream(read, ctx); + PeerActor::new( + PeerInfo { id: cfg.id(), addr: Some(my_addr), account_id: None }, + peer_addr.clone(), + Some(PeerInfo { + id: peer_id.clone(), + addr: Some(peer_addr.clone()), + account_id: None, + }), + stream_type, + FramedWrite::new(write, Codec::default(), Codec::default(), ctx), + handshake_timeout, + fa.clone().recipient(), + fa.clone().recipient(), + fa.clone().recipient(), + fa.clone().recipient(), + match stream_type { + PeerType::Inbound => None, + PeerType::Outbound => Some(PartialEdgeInfo::new( + &cfg.id(), + &peer_id, + 1, + &cfg.signer.secret_key, + )), + }, + Arc::new(AtomicUsize::new(0)), + Arc::new(AtomicUsize::new(0)), + rate_limiter, + cfg.force_encoding, + ) + })) + }) + .await?; + Ok(Self { actix, peer_id: peer_id_, cfg: cfg_, responses: recv }) + } + + pub async fn start( + outbound_cfg: PeerConfig, + inbound_cfg: PeerConfig, + ) -> anyhow::Result<(PeerHandle, PeerHandle)> { + // start a TCP connection. + let listener = TcpListener::bind("127.0.0.1:0").await?; + let connect_future = TcpStream::connect(listener.local_addr()?); + let accept_future = listener.accept(); + let (connect_result, accept_result) = tokio::join!(connect_future, accept_future); + let outbound_stream = connect_result?; + let (inbound_stream, _) = accept_result?; + + let outbound_id = outbound_cfg.id(); + let inbound_id = inbound_cfg.id(); + let outbound = PeerHandle::start_endpoint( + outbound_cfg, + outbound_stream, + inbound_id, + PeerType::Outbound, + ) + .await?; + let inbound = + PeerHandle::start_endpoint(inbound_cfg, inbound_stream, outbound_id, PeerType::Inbound) + .await?; + Ok((outbound, inbound)) + } +} diff --git a/chain/network/src/tests/peer_communication.rs b/chain/network/src/tests/peer_communication.rs new file mode 100644 index 00000000000..69e6e1d13c9 --- /dev/null +++ b/chain/network/src/tests/peer_communication.rs @@ -0,0 +1,190 @@ +#![allow(unused_imports)] +#![allow(dead_code)] +use crate::network_protocol::Encoding; +use crate::peer::codec::Codec; +use crate::peer::peer_actor::PeerActor; +use crate::private_actix::{PeerRequestResult, RegisterPeer, RegisterPeerResponse, SendMessage}; +use crate::stats::metrics::NetworkMetrics; +use crate::tests::data; +use crate::tests::peer_actor::{PeerConfig, PeerHandle, Response}; +use crate::tests::util::{make_rng, FakeClock}; +use crate::types::{ + NetworkClientMessages, NetworkClientResponses, NetworkRequests, NetworkResponses, + PeerManagerMessageRequest, PeerManagerMessageResponse, PeerMessage, RoutingTableUpdate, +}; +use actix::{Actor, Addr, Context, Handler, StreamHandler as _}; +use anyhow::{anyhow, bail, Context as _}; +use near_crypto::{InMemorySigner, KeyType, PublicKey, SecretKey}; +use near_network_primitives::types::{ + Edge, NetworkViewClientMessages, NetworkViewClientResponses, PartialEdgeInfo, + PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, PeerInfo, PeerType, + RoutedMessageBody, +}; +use near_performance_metrics::framed_write::FramedWrite; +use near_primitives::block::{genesis_chunks, Block, BlockHeader, GenesisId}; +use near_primitives::hash::CryptoHash; +use near_primitives::network::{AnnounceAccount, PeerId}; +use near_primitives::num_rational::Rational; +use near_primitives::sharding::ShardChunk; +use near_primitives::syncing::EpochSyncResponse; +use near_primitives::types::{AccountId, BlockHeight, EpochId, StateRoot}; +use near_primitives::validator_signer::{InMemoryValidatorSigner, ValidatorSigner}; +use near_primitives::version::PROTOCOL_VERSION; +use near_rate_limiter::{ + ActixMessageResponse, ActixMessageWrapper, ThrottleController, ThrottleFramedRead, + ThrottleToken, +}; +use rand::Rng; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time; +use tokio_stream::StreamExt; + +async fn test_peer_communication( + outbound_encoding: Option, + inbound_encoding: Option, +) -> anyhow::Result<()> { + let mut rng = make_rng(89028037453); + let mut clock = FakeClock::default(); + + let chain = Arc::new(data::Chain::make(&mut clock, &mut rng, 12)); + let (mut outbound, mut inbound) = PeerHandle::start( + PeerConfig { + signer: data::make_signer(&mut rng), + chain: chain.clone(), + peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), + force_encoding: outbound_encoding, + }, + PeerConfig { + signer: data::make_signer(&mut rng), + chain: chain.clone(), + peers: (0..5).map(|_| data::make_peer_info(&mut rng)).collect(), + force_encoding: inbound_encoding, + }, + ) + .await?; + + assert_eq!(Response::HandshakeDone, outbound.recv().await); + assert_eq!(Response::HandshakeDone, inbound.recv().await); + + // RequestUpdateNonce + let want = data::make_partial_edge(&mut rng); + outbound.send(PeerMessage::RequestUpdateNonce(want.clone())).await; + let got = inbound.recv().await; + assert_eq!(Response::RequestUpdateNonce(want), got); + + // ReponseUpdateNonce + let a = data::make_signer(&mut rng); + let b = data::make_signer(&mut rng); + let want = data::make_edge(&mut rng, &a, &b); + outbound.send(PeerMessage::ResponseUpdateNonce(want.clone())).await; + assert_eq!(Response::ResponseUpdateNonce(want), inbound.recv().await); + + // PeersRequest -> PeersResponse + // This test is different from the rest, because we cannot skip sending the response back. + let want = inbound.cfg.peers.clone(); + outbound.send(PeerMessage::PeersRequest).await; + assert_eq!(Response::PeersResponse(want), outbound.recv().await); + + // BlockRequest + let want = chain.blocks[5].hash().clone(); + outbound.send(PeerMessage::BlockRequest(want.clone())).await; + assert_eq!(Response::BlockRequest(want), inbound.recv().await); + + // Block + let want = chain.blocks[5].clone(); + outbound.send(PeerMessage::Block(want.clone())).await; + assert_eq!(Response::Block(want), inbound.recv().await); + + // BlockHeadersRequest + let want: Vec<_> = chain.blocks.iter().map(|b| b.hash().clone()).collect(); + outbound.send(PeerMessage::BlockHeadersRequest(want.clone())).await; + assert_eq!(Response::BlockHeadersRequest(want), inbound.recv().await); + + // BlockHeaders + let want = chain.get_block_headers(); + outbound.send(PeerMessage::BlockHeaders(want.clone())).await; + assert_eq!(Response::BlockHeaders(want), inbound.recv().await); + + // SyncRoutingTable + let want = data::make_routing_table(&mut rng); + outbound.send(PeerMessage::SyncRoutingTable(want.clone())).await; + assert_eq!(Response::RoutingTable(want), inbound.recv().await); + + // PartialEncodedChunkRequest + let want = chain.blocks[5].chunks()[2].chunk_hash(); + let msg = outbound.routed_message(RoutedMessageBody::PartialEncodedChunkRequest( + PartialEncodedChunkRequestMsg { + chunk_hash: want.clone(), + part_ords: vec![], + tracking_shards: Default::default(), + }, + )); + outbound.send(PeerMessage::Routed(msg)).await; + assert_eq!(Response::ChunkRequest(want), inbound.recv().await); + + // PartialEncodedChunkResponse + let want_hash = chain.blocks[3].chunks()[0].chunk_hash(); + let want_parts = data::make_chunk_parts(chain.chunks[&want_hash].clone()); + let msg = outbound.routed_message(RoutedMessageBody::PartialEncodedChunkResponse( + PartialEncodedChunkResponseMsg { + chunk_hash: want_hash, + parts: want_parts.clone(), + receipts: vec![], + }, + )); + outbound.send(PeerMessage::Routed(msg)).await; + assert_eq!(Response::Chunk(want_parts), inbound.recv().await); + + // Transaction + let want = data::make_signed_transaction(&mut rng); + outbound.send(PeerMessage::Transaction(want.clone())).await; + assert_eq!(Response::Transaction(want), inbound.recv().await); + + // Challenge + let want = data::make_challenge(&mut rng); + outbound.send(PeerMessage::Challenge(want.clone())).await; + assert_eq!(Response::Challenge(want), inbound.recv().await); + + // EpochSyncRequest + let want = EpochId(chain.blocks[1].hash().clone()); + outbound.send(PeerMessage::EpochSyncRequest(want.clone())).await; + assert_eq!(Response::EpochSyncRequest(want), inbound.recv().await); + + // EpochSyncResponse + let want = EpochSyncResponse::UpToDate; + outbound.send(PeerMessage::EpochSyncResponse(Box::new(want.clone()))).await; + assert_eq!(Response::EpochSyncResponse(want), inbound.recv().await); + + // EpochSyncFinalizationRequest + let want = EpochId(chain.blocks[1].hash().clone()); + outbound.send(PeerMessage::EpochSyncFinalizationRequest(want.clone())).await; + assert_eq!(Response::EpochSyncFinalizationRequest(want), inbound.recv().await); + + // TODO: + // LastEdge, HandshakeFailure, Disconnect - affect the state of the PeerActor and are + // observable only under specific conditions. + // ExpochSyncFinalizationResponse - needs some work to produce reasonable fake data. + // RoutingTableSyncV2 - not used yet, available under some feature flag. + Ok(()) +} + +#[tokio::test] +// Verifies that peers are able to establish a common encoding protocol. +async fn peer_communication() -> anyhow::Result<()> { + let encodings = [None, Some(Encoding::Proto), Some(Encoding::Borsh)]; + for outbound in &encodings { + for inbound in &encodings { + if let (Some(a), Some(b)) = (outbound, inbound) { + if *a != *b { + continue; + } + } + test_peer_communication(outbound.clone(), inbound.clone()) + .await + .with_context(|| format!("(outbound={outbound:?},inbound={inbound:?})"))?; + } + } + Ok(()) +} diff --git a/chain/network/src/tests/util.rs b/chain/network/src/tests/util.rs new file mode 100644 index 00000000000..cc564feb5ac --- /dev/null +++ b/chain/network/src/tests/util.rs @@ -0,0 +1,94 @@ +#![allow(dead_code)] +use once_cell::sync::Lazy; + +// TODO: consider wrapping these types to prevent interaction with +// real time in tests. +pub type Instant = std::time::Instant; +pub type Utc = chrono::DateTime; +pub type Duration = chrono::Duration; + +// Instant doesn't have a deterministic contructor, +// however since Instant is not convertible to an unix timestamp, +// we can snapshot Instant::now() at the process startup +// and treat it as a constant. All observable effects will be then +// deterministic. +static FAKE_CLOCK_MONO_START: Lazy = Lazy::new(Instant::now); + +// An arbitrary non-trivial deterministic Utc timestamp. +static FAKE_CLOCK_UTC_START: Lazy = + Lazy::new(|| Utc::from(std::time::SystemTime::UNIX_EPOCH) + Duration::seconds(89108233)); + +enum ClockInner<'a> { + Real, + Fake(&'a FakeClock), +} + +/// Clock encapsulates a system clock, allowing to replace it +/// with a fake in tests. +/// Since system clock is a source of external information, +/// it has to be replaced with a fake double, if we want our +/// tests to be deterministic. +/// +/// This is a reimplementation of primitives/src/time.rs +/// with a more systematic approach. +/// TODO: add tests, put it is some reusable package and use +/// throughout the nearcore codebase. +pub struct Clock<'a>(ClockInner<'a>); + +impl<'a> Clock<'a> { + pub fn real() -> Clock<'static> { + Clock(ClockInner::Real) + } + /// current time according to the monotonic clock + pub fn now(&self) -> Instant { + match self.0 { + ClockInner::Real => Instant::now(), + ClockInner::Fake(fake) => fake.now(), + } + } + /// current time according to the system/walltime clock + pub fn utc_now(&self) -> Utc { + match self.0 { + ClockInner::Real => chrono::Utc::now(), + ClockInner::Fake(fake) => fake.utc_now(), + } + } +} + +pub struct FakeClock { + mono: Instant, + utc: Utc, +} + +impl FakeClock { + pub fn new(utc: Utc) -> Self { + Self { utc, mono: *FAKE_CLOCK_MONO_START } + } + pub fn now(&self) -> Instant { + self.mono + } + pub fn utc_now(&self) -> Utc { + self.utc + } + pub fn clock(&self) -> Clock<'_> { + Clock(ClockInner::Fake(self)) + } + pub fn advance(&mut self, d: Duration) { + assert!(d >= Duration::zero()); + self.mono += d.to_std().unwrap(); + self.utc = self.utc + d; + } + pub fn set_utc(&mut self, utc: Utc) { + self.utc = utc; + } +} + +impl Default for FakeClock { + fn default() -> FakeClock { + Self::new(*FAKE_CLOCK_UTC_START) + } +} + +pub fn make_rng(seed: u64) -> rand_pcg::Pcg32 { + rand_pcg::Pcg32::new(seed, 0xa02bdbf7bb3c0a7) +} diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 89feaf2c751..49876cfda3b 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -2,7 +2,6 @@ pub use crate::network_protocol::{ Handshake, HandshakeFailureReason, PeerMessage, RoutingTableUpdate, }; -#[cfg(feature = "protocol_feature_routing_exchange_algorithm")] pub use crate::network_protocol::{PartialSync, RoutingState, RoutingSyncV2, RoutingVersion2}; use crate::private_actix::{ PeerRequestResult, PeersRequest, RegisterPeer, RegisterPeerResponse, Unregister, @@ -83,7 +82,7 @@ pub struct PeersResponse { /// which contains reply for each message to `PeerManager`. /// There is 1 to 1 mapping between an entry in `PeerManagerMessageRequest` and `PeerManagerMessageResponse`. #[cfg_attr(feature = "deepsize_feature", derive(deepsize::DeepSizeOf))] -#[derive(actix::Message, Debug)] +#[derive(actix::Message, Debug, strum::AsStaticStr)] #[rtype(result = "PeerManagerMessageResponse")] pub enum PeerManagerMessageRequest { RoutedMessageFrom(RoutedMessageFrom), @@ -323,7 +322,6 @@ pub enum NetworkRequests { Challenge(Challenge), // IbfMessage - #[cfg(feature = "protocol_feature_routing_exchange_algorithm")] IbfMessage { peer_id: PeerId, ibf_msg: RoutingSyncV2, diff --git a/deny.toml b/deny.toml index cbb45f7cfbf..6138bd6bee7 100644 --- a/deny.toml +++ b/deny.toml @@ -109,4 +109,6 @@ skip = [ # Wasmer requires a newer version and the rest of the ecosystem hasn't caught up yet. { name = "hashbrown", version = "0.11.0" }, + # prometheus depends on an old version of protobuf + { name = "protobuf", version = "=2.25.1" }, ]