From 9b1567402bb40cb2f698afe42b5555f82aa802f9 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 28 Apr 2022 11:21:04 +0200 Subject: [PATCH] migrated to thiserror --- Cargo.lock | 1 + chain/network/Cargo.toml | 1 + .../src/network_protocol/borsh_conv.rs | 18 +- chain/network/src/network_protocol/mod.rs | 28 +- .../src/network_protocol/proto_conv.rs | 338 +++++++++++++----- chain/network/src/peer/peer_actor.rs | 53 +-- chain/network/src/tests/network_protocol.rs | 9 + chain/network/src/tests/util.rs | 1 + core/primitives-core/src/hash.rs | 4 +- core/primitives-core/src/serialize.rs | 10 +- core/primitives/src/shard_layout.rs | 4 +- core/primitives/src/utils.rs | 2 +- core/primitives/src/views.rs | 4 +- 13 files changed, 332 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc86b5504f5..91f94019566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3010,6 +3010,7 @@ dependencies = [ "sha2", "strum", "tempfile", + "thiserror", "tokio", "tokio-stream", "tokio-util 0.7.1", diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index 42c6fd134ad..133d3f00c74 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -29,6 +29,7 @@ deepsize = { version = "0.2.0", optional = true } futures = "0.3" itertools = "0.10.3" lru = "0.7.2" +thiserror = "1" near-rust-allocator-proxy = { version = "0.4", optional = true } once_cell = "1.5.2" rand = "0.8.5" diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 6e1128b738f..0c4d6d28812 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -1,7 +1,7 @@ /// Contains borsh <-> network_protocol conversions. use crate::network_protocol as mem; use crate::network_protocol::borsh as net; -use anyhow::bail; +use thiserror::Error; impl From<&net::Handshake> for mem::Handshake { fn from(x: &net::Handshake) -> Self { @@ -75,13 +75,19 @@ impl From<&mem::HandshakeFailureReason> for net::HandshakeFailureReason { ////////////////////////////////////////// +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("HandshakeV2 is deprecated")] + DeprecatedHandshakeV2, +} + impl TryFrom<&net::PeerMessage> for mem::PeerMessage { - type Error = anyhow::Error; - fn try_from(x: &net::PeerMessage) -> anyhow::Result { + type Error = ParsePeerMessageError; + fn try_from(x: &net::PeerMessage) -> Result { Ok(match x.clone() { - net::PeerMessage::Handshake(h) => mem::PeerMessage::Handshake((&h).try_into()?), + net::PeerMessage::Handshake(h) => mem::PeerMessage::Handshake((&h).into()), net::PeerMessage::HandshakeFailure(pi, hfr) => { - mem::PeerMessage::HandshakeFailure(pi, (&hfr).try_into()?) + mem::PeerMessage::HandshakeFailure(pi, (&hfr).into()) } net::PeerMessage::LastEdge(e) => mem::PeerMessage::LastEdge(e), net::PeerMessage::SyncRoutingTable(rtu) => mem::PeerMessage::SyncRoutingTable(rtu), @@ -99,7 +105,7 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage { net::PeerMessage::Routed(r) => mem::PeerMessage::Routed(r), net::PeerMessage::Disconnect => mem::PeerMessage::Disconnect, net::PeerMessage::Challenge(c) => mem::PeerMessage::Challenge(c), - net::PeerMessage::_HandshakeV2 => bail!("HandshakeV2 is deprecated"), + net::PeerMessage::_HandshakeV2 => return Err(Self::Error::DeprecatedHandshakeV2), net::PeerMessage::EpochSyncRequest(epoch_id) => { mem::PeerMessage::EpochSyncRequest(epoch_id) } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index b71c00d5422..54e19c945b2 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -21,6 +21,7 @@ use near_primitives::transaction::SignedTransaction; use near_primitives::types::{EpochId, ProtocolVersion}; use near_primitives::version::PEER_MIN_ALLOWED_PROTOCOL_VERSION; use std::fmt; +use thiserror::Error; pub use self::borsh::{ PartialSync, RoutingState, RoutingSyncV2, RoutingTableUpdate, RoutingVersion2, @@ -120,6 +121,18 @@ pub enum Encoding { Proto, } +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("BorshDecode")] + BorshDecode(std::io::Error), + #[error("BorshConv")] + BorshConv(borsh_conv::ParsePeerMessageError), + #[error("ProtoDecode")] + ProtoDecode(prost::DecodeError), + #[error("ProtoConv")] + ProtoConv(proto_conv::ParsePeerMessageError), +} + impl PeerMessage { pub(crate) fn serialize(&self, enc: Encoding) -> Vec { match enc { @@ -128,10 +141,19 @@ impl PeerMessage { } } - pub(crate) fn deserialize(enc: Encoding, data: &[u8]) -> anyhow::Result { + pub(crate) fn deserialize( + enc: Encoding, + data: &[u8], + ) -> Result { Ok(match enc { - Encoding::Borsh => (&borsh::PeerMessage::try_from_slice(data)?).try_into()?, - Encoding::Proto => (&proto::PeerMessage::decode(data)?).try_into()?, + Encoding::Borsh => (&borsh::PeerMessage::try_from_slice(data) + .map_err(ParsePeerMessageError::BorshDecode)?) + .try_into() + .map_err(ParsePeerMessageError::BorshConv)?, + Encoding::Proto => (&proto::PeerMessage::decode(data) + .map_err(ParsePeerMessageError::ProtoDecode)?) + .try_into() + .map_err(ParsePeerMessageError::ProtoConv)?, }) } diff --git a/chain/network/src/network_protocol/proto_conv.rs b/chain/network/src/network_protocol/proto_conv.rs index 1e1a1a60a7a..a0a43a80cd1 100644 --- a/chain/network/src/network_protocol/proto_conv.rs +++ b/chain/network/src/network_protocol/proto_conv.rs @@ -4,7 +4,6 @@ use crate::network_protocol::proto::peer_message::MessageType as ProtoMT; use crate::network_protocol::{ Handshake, HandshakeFailureReason, PeerMessage, RoutingSyncV2, RoutingTableUpdate, }; -use anyhow::{bail, Context}; use borsh::{BorshDeserialize as _, BorshSerialize as _}; use near_network_primitives::types::{ Edge, PartialEdgeInfo, PeerChainInfoV2, PeerInfo, RoutedMessage, @@ -16,19 +15,38 @@ use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::syncing::{EpochSyncFinalizationResponse, EpochSyncResponse}; use near_primitives::transaction::SignedTransaction; use near_primitives::types::EpochId; +use thiserror::Error; + +#[derive(Error, Debug)] +#[error("[{idx}]: {source}")] +pub struct ParseVecError { + idx: usize, + #[source] + source: E, +} -fn try_from_vec<'a, X, Y: TryFrom<&'a X>>(xs: &'a Vec) -> Result, Y::Error> { +fn try_from_vec<'a, X, Y: TryFrom<&'a X>>( + xs: &'a Vec, +) -> Result, ParseVecError> { let mut ys = vec![]; - for x in xs { - ys.push(x.try_into()?); + for (idx, x) in xs.iter().enumerate() { + ys.push(x.try_into().map_err(|source| ParseVecError { idx, source })?); } Ok(ys) } -fn try_from_required<'a, X, Y: TryFrom<&'a X, Error = anyhow::Error>>( +#[derive(Error, Debug)] +pub enum ParseRequiredError { + #[error("missing, while required")] + Missing, + #[error(transparent)] + Other(E), +} + +fn try_from_required<'a, X, Y: TryFrom<&'a X>>( x: &'a Option, -) -> anyhow::Result { - x.as_ref().context("missing")?.try_into() +) -> Result> { + x.as_ref().ok_or(ParseRequiredError::Missing)?.try_into().map_err(ParseRequiredError::Other) } impl From<&CryptoHash> for proto::CryptoHash { @@ -37,10 +55,12 @@ impl From<&CryptoHash> for proto::CryptoHash { } } +pub type ParseCryptoHashError = Box; + impl TryFrom<&proto::CryptoHash> for CryptoHash { - type Error = anyhow::Error; - fn try_from(p: &proto::CryptoHash) -> anyhow::Result { - CryptoHash::try_from(&p.hash[..]).map_err(|err| anyhow::Error::msg(err.to_string())) + type Error = ParseCryptoHashError; + fn try_from(p: &proto::CryptoHash) -> Result { + CryptoHash::try_from(&p.hash[..]) } } @@ -52,10 +72,19 @@ impl From<&GenesisId> for proto::GenesisId { } } +#[derive(Error, Debug)] +pub enum ParseGenesisIdError { + #[error("hash: {0}")] + Hash(ParseRequiredError), +} + impl TryFrom<&proto::GenesisId> for GenesisId { - type Error = anyhow::Error; - fn try_from(p: &proto::GenesisId) -> anyhow::Result { - Ok(Self { chain_id: p.chain_id.clone(), hash: try_from_required(&p.hash).context("hash")? }) + type Error = ParseGenesisIdError; + fn try_from(p: &proto::GenesisId) -> Result { + Ok(Self { + chain_id: p.chain_id.clone(), + hash: try_from_required(&p.hash).map_err(Self::Error::Hash)?, + }) } } @@ -72,11 +101,17 @@ impl From<&PeerChainInfoV2> for proto::PeerChainInfo { } } +#[derive(Error, Debug)] +pub enum ParsePeerChainInfoV2Error { + #[error("genesis_id {0}")] + GenesisId(ParseRequiredError), +} + impl TryFrom<&proto::PeerChainInfo> for PeerChainInfoV2 { - type Error = anyhow::Error; - fn try_from(p: &proto::PeerChainInfo) -> anyhow::Result { + type Error = ParsePeerChainInfoV2Error; + fn try_from(p: &proto::PeerChainInfo) -> Result { Ok(Self { - genesis_id: try_from_required(&p.genesis_id).context("genesis_id")?, + genesis_id: try_from_required(&p.genesis_id).map_err(Self::Error::GenesisId)?, height: p.height, tracked_shards: p.tracked_shards.clone(), archival: p.archival, @@ -92,10 +127,12 @@ impl From<&PeerId> for proto::PublicKey { } } +pub type ParsePeerIdError = borsh::maybestd::io::Error; + impl TryFrom<&proto::PublicKey> for PeerId { - type Error = anyhow::Error; - fn try_from(p: &proto::PublicKey) -> anyhow::Result { - Ok(Self::try_from_slice(&p.borsh)?) + type Error = ParsePeerIdError; + fn try_from(p: &proto::PublicKey) -> Result { + Self::try_from_slice(&p.borsh) } } @@ -107,10 +144,12 @@ impl From<&PartialEdgeInfo> for proto::PartialEdgeInfo { } } +pub type ParsePartialEdgeInfoError = borsh::maybestd::io::Error; + impl TryFrom<&proto::PartialEdgeInfo> for PartialEdgeInfo { - type Error = anyhow::Error; - fn try_from(p: &proto::PartialEdgeInfo) -> anyhow::Result { - Ok(Self::try_from_slice(&p.borsh)?) + type Error = ParsePartialEdgeInfoError; + fn try_from(p: &proto::PartialEdgeInfo) -> Result { + Self::try_from_slice(&p.borsh) } } @@ -122,10 +161,12 @@ impl From<&PeerInfo> for proto::PeerInfo { } } +pub type ParsePeerInfoError = borsh::maybestd::io::Error; + impl TryFrom<&proto::PeerInfo> for PeerInfo { - type Error = anyhow::Error; - fn try_from(x: &proto::PeerInfo) -> anyhow::Result { - Ok(Self::try_from_slice(&x.borsh)?) + type Error = ParsePeerInfoError; + fn try_from(x: &proto::PeerInfo) -> Result { + Self::try_from_slice(&x.borsh) } } @@ -145,16 +186,33 @@ impl From<&Handshake> for proto::Handshake { } } +#[derive(Error, Debug)] +pub enum ParseHandshakeError { + #[error("sender_peer_id {0}")] + SenderPeerId(ParseRequiredError), + #[error("target_peer_id {0}")] + TargetPeerId(ParseRequiredError), + #[error("sender_listen_port {0}")] + SenderListenPort(std::num::TryFromIntError), + #[error("sender_chain_info {0}")] + SenderChainInfo(ParseRequiredError), + #[error("partial_edge_info {0}")] + PartialEdgeInfo(ParseRequiredError), +} + impl TryFrom<&proto::Handshake> for Handshake { - type Error = anyhow::Error; - fn try_from(p: &proto::Handshake) -> anyhow::Result { + type Error = ParseHandshakeError; + fn try_from(p: &proto::Handshake) -> Result { Ok(Self { protocol_version: p.protocol_version, oldest_supported_version: p.oldest_supported_version, - sender_peer_id: try_from_required(&p.sender_peer_id).context("sender_peer_id")?, - target_peer_id: try_from_required(&p.target_peer_id).context("target_peer_id")?, + sender_peer_id: try_from_required(&p.sender_peer_id) + .map_err(Self::Error::SenderPeerId)?, + target_peer_id: try_from_required(&p.target_peer_id) + .map_err(Self::Error::TargetPeerId)?, sender_listen_port: { - let port = u16::try_from(p.sender_listen_port).context("sender_listen_port")?; + let port = + u16::try_from(p.sender_listen_port).map_err(Self::Error::SenderListenPort)?; if port == 0 { None } else { @@ -162,9 +220,9 @@ impl TryFrom<&proto::Handshake> for Handshake { } }, sender_chain_info: try_from_required(&p.sender_chain_info) - .context("sender_chain_info")?, + .map_err(Self::Error::SenderChainInfo)?, partial_edge_info: try_from_required(&p.partial_edge_info) - .context("partial_edge_info")?, + .map_err(Self::Error::PartialEdgeInfo)?, }) } } @@ -199,28 +257,39 @@ impl From<(&PeerInfo, &HandshakeFailureReason)> for proto::HandshakeFailure { } } +#[derive(Error, Debug)] +pub enum ParseHandshakeFailureError { + #[error("peer_info: {0}")] + PeerInfo(ParseRequiredError), + #[error("genesis_id: {0}")] + GenesisId(ParseRequiredError), + #[error("reason: unknown")] + UnknownReason, +} + impl TryFrom<&proto::HandshakeFailure> for (PeerInfo, HandshakeFailureReason) { - type Error = anyhow::Error; - fn try_from(x: &proto::HandshakeFailure) -> anyhow::Result { - let pi = try_from_required(&x.peer_info).context("peer_info")?; - let hfr = - match proto::handshake_failure::Reason::from_i32(x.reason).context("unknown reason")? { - proto::handshake_failure::Reason::ProtocolVersionMismatch => { - HandshakeFailureReason::ProtocolVersionMismatch { - version: x.version, - oldest_supported_version: x.oldest_supported_version, - } - } - proto::handshake_failure::Reason::GenesisMismatch => { - HandshakeFailureReason::GenesisMismatch( - try_from_required(&x.genesis_id).context("genesis_id")?, - ) + type Error = ParseHandshakeFailureError; + fn try_from(x: &proto::HandshakeFailure) -> Result { + let pi = try_from_required(&x.peer_info).map_err(Self::Error::PeerInfo)?; + let hfr = match proto::handshake_failure::Reason::from_i32(x.reason) + .unwrap_or(proto::handshake_failure::Reason::Unknown) + { + proto::handshake_failure::Reason::ProtocolVersionMismatch => { + HandshakeFailureReason::ProtocolVersionMismatch { + version: x.version, + oldest_supported_version: x.oldest_supported_version, } - proto::handshake_failure::Reason::InvalidTarget => { - HandshakeFailureReason::InvalidTarget - } - proto::handshake_failure::Reason::Unknown => bail!("unknown reason"), - }; + } + proto::handshake_failure::Reason::GenesisMismatch => { + HandshakeFailureReason::GenesisMismatch( + try_from_required(&x.genesis_id).map_err(Self::Error::GenesisId)?, + ) + } + proto::handshake_failure::Reason::InvalidTarget => { + HandshakeFailureReason::InvalidTarget + } + proto::handshake_failure::Reason::Unknown => return Err(Self::Error::UnknownReason), + }; Ok((pi, hfr)) } } @@ -233,10 +302,12 @@ impl From<&Edge> for proto::Edge { } } +pub type ParseEdgeError = borsh::maybestd::io::Error; + impl TryFrom<&proto::Edge> for Edge { - type Error = anyhow::Error; - fn try_from(x: &proto::Edge) -> anyhow::Result { - Ok(Self::try_from_slice(&x.borsh)?) + type Error = ParseEdgeError; + fn try_from(x: &proto::Edge) -> Result { + Self::try_from_slice(&x.borsh) } } @@ -248,10 +319,12 @@ impl From<&AnnounceAccount> for proto::AnnounceAccount { } } +pub type ParseAnnounceAccountError = borsh::maybestd::io::Error; + impl TryFrom<&proto::AnnounceAccount> for AnnounceAccount { - type Error = anyhow::Error; - fn try_from(x: &proto::AnnounceAccount) -> anyhow::Result { - Ok(Self::try_from_slice(&x.borsh)?) + type Error = ParseAnnounceAccountError; + fn try_from(x: &proto::AnnounceAccount) -> Result { + Self::try_from_slice(&x.borsh) } } @@ -266,12 +339,20 @@ impl From<&RoutingTableUpdate> for proto::RoutingTableUpdate { } } +#[derive(Error, Debug)] +pub enum ParseRoutingTableUpdateError { + #[error("edges {0}")] + Edges(ParseVecError), + #[error("accounts {0}")] + Accounts(ParseVecError), +} + impl TryFrom<&proto::RoutingTableUpdate> for RoutingTableUpdate { - type Error = anyhow::Error; - fn try_from(x: &proto::RoutingTableUpdate) -> anyhow::Result { + type Error = ParseRoutingTableUpdateError; + fn try_from(x: &proto::RoutingTableUpdate) -> Result { Ok(Self { - edges: try_from_vec(&x.edges).context("edges")?, - accounts: try_from_vec(&x.accounts).context("accounts")?, + edges: try_from_vec(&x.edges).map_err(Self::Error::Edges)?, + accounts: try_from_vec(&x.accounts).map_err(Self::Error::Accounts)?, }) } } @@ -284,10 +365,12 @@ impl From<&BlockHeader> for proto::BlockHeader { } } +pub type ParseBlockHeaderError = borsh::maybestd::io::Error; + impl TryFrom<&proto::BlockHeader> for BlockHeader { - type Error = anyhow::Error; - fn try_from(x: &proto::BlockHeader) -> anyhow::Result { - Ok(Self::try_from_slice(&x.borsh)?) + type Error = ParseBlockHeaderError; + fn try_from(x: &proto::BlockHeader) -> Result { + Self::try_from_slice(&x.borsh) } } @@ -299,10 +382,12 @@ impl From<&Block> for proto::Block { } } +pub type ParseBlockError = borsh::maybestd::io::Error; + impl TryFrom<&proto::Block> for Block { - type Error = anyhow::Error; - fn try_from(x: &proto::Block) -> anyhow::Result { - Ok(Self::try_from_slice(&x.borsh)?) + type Error = ParseBlockError; + fn try_from(x: &proto::Block) -> Result { + Self::try_from_slice(&x.borsh) } } @@ -390,72 +475,129 @@ impl From<&PeerMessage> for proto::PeerMessage { } } +pub type ParseTransactionError = borsh::maybestd::io::Error; +pub type ParseRoutedError = borsh::maybestd::io::Error; +pub type ParseChallengeError = borsh::maybestd::io::Error; +pub type ParseEpochSyncResponseError = borsh::maybestd::io::Error; +pub type ParseEpochSyncFinalizationResponseError = borsh::maybestd::io::Error; +pub type ParseRoutingTableSyncV2Error = borsh::maybestd::io::Error; + +#[derive(Error, Debug)] +pub enum ParsePeerMessageError { + #[error("empty message")] + Empty, + #[error("handshake: {0}")] + Handshake(ParseHandshakeError), + #[error("handshake_failure: {0}")] + HandshakeFailure(ParseHandshakeFailureError), + #[error("last_edge: {0}")] + LastEdge(ParseRequiredError), + #[error("sync_routing_table: {0}")] + SyncRoutingTable(ParseRoutingTableUpdateError), + #[error("update_nonce_requrest: {0}")] + UpdateNonceRequest(ParseRequiredError), + #[error("update_nonce_response: {0}")] + UpdateNonceResponse(ParseRequiredError), + #[error("peers_response: {0}")] + PeersResponse(ParseVecError), + #[error("block_headers_request: {0}")] + BlockHeadersRequest(ParseVecError), + #[error("block_headers_response: {0}")] + BlockHeadersResponse(ParseVecError), + #[error("block_request: {0}")] + BlockRequest(ParseRequiredError), + #[error("block_response: {0}")] + BlockResponse(ParseRequiredError), + #[error("transaction: {0}")] + Transaction(ParseTransactionError), + #[error("routed: {0}")] + Routed(ParseRoutedError), + #[error("challenge: {0}")] + Challenge(ParseChallengeError), + #[error("epoch_sync_request: {0}")] + EpochSyncRequest(ParseRequiredError), + #[error("epoch_sync_response: {0}")] + EpochSyncResponse(ParseEpochSyncResponseError), + #[error("epoch_sync_finalization_request: {0}")] + EpochSyncFinalizationRequest(ParseRequiredError), + #[error("epoch_sync_finalization_response: {0}")] + EpochSyncFinalizationResponse(ParseEpochSyncFinalizationResponseError), + #[error("routing_table_sync_v2")] + RoutingTableSyncV2(ParseRoutingTableSyncV2Error), +} + impl TryFrom<&proto::PeerMessage> for PeerMessage { - type Error = anyhow::Error; - fn try_from(x: &proto::PeerMessage) -> anyhow::Result { - Ok(match x.message_type.as_ref().context("empty or unknown")? { - ProtoMT::Handshake(h) => PeerMessage::Handshake(h.try_into().context("Handshake")?), + type Error = ParsePeerMessageError; + fn try_from(x: &proto::PeerMessage) -> Result { + Ok(match x.message_type.as_ref().ok_or(Self::Error::Empty)? { + ProtoMT::Handshake(h) => { + PeerMessage::Handshake(h.try_into().map_err(Self::Error::Handshake)?) + } ProtoMT::HandshakeFailure(hf) => { - let (pi, hfr) = hf.try_into().context("HandshakeFailure")?; + let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?; PeerMessage::HandshakeFailure(pi, hfr) } ProtoMT::LastEdge(le) => { - PeerMessage::LastEdge(try_from_required(&le.edge).context("LastEdge")?) - } - ProtoMT::SyncRoutingTable(rtu) => { - PeerMessage::SyncRoutingTable(rtu.try_into().context("SyncRoutingTable")?) + PeerMessage::LastEdge(try_from_required(&le.edge).map_err(Self::Error::LastEdge)?) } + ProtoMT::SyncRoutingTable(rtu) => PeerMessage::SyncRoutingTable( + rtu.try_into().map_err(Self::Error::SyncRoutingTable)?, + ), ProtoMT::UpdateNonceRequest(unr) => PeerMessage::RequestUpdateNonce( - try_from_required(&unr.partial_edge_info).context("UpdateNonceRequest")?, + try_from_required(&unr.partial_edge_info) + .map_err(Self::Error::UpdateNonceRequest)?, ), ProtoMT::UpdateNonceResponse(unr) => PeerMessage::ResponseUpdateNonce( - try_from_required(&unr.edge).context("UpdateNonceResponse")?, + try_from_required(&unr.edge).map_err(Self::Error::UpdateNonceResponse)?, ), ProtoMT::PeersRequest(_) => PeerMessage::PeersRequest, - ProtoMT::PeersResponse(pr) => { - PeerMessage::PeersResponse(try_from_vec(&pr.peers).context("PeersResponse")?) - } + ProtoMT::PeersResponse(pr) => PeerMessage::PeersResponse( + try_from_vec(&pr.peers).map_err(Self::Error::PeersResponse)?, + ), ProtoMT::BlockHeadersRequest(bhr) => PeerMessage::BlockHeadersRequest( - try_from_vec(&bhr.block_hashes).context("BlockHeadersRequest")?, + try_from_vec(&bhr.block_hashes).map_err(Self::Error::BlockHeadersRequest)?, ), ProtoMT::BlockHeadersResponse(bhr) => PeerMessage::BlockHeaders( - try_from_vec(&bhr.block_headers).context("BlockHeadersResponse")?, + try_from_vec(&bhr.block_headers).map_err(Self::Error::BlockHeadersResponse)?, ), ProtoMT::BlockRequest(br) => PeerMessage::BlockRequest( - try_from_required(&br.block_hash).context("BlockRequest")?, + try_from_required(&br.block_hash).map_err(Self::Error::BlockRequest)?, + ), + ProtoMT::BlockResponse(br) => PeerMessage::Block( + try_from_required(&br.block).map_err(Self::Error::BlockResponse)?, ), - ProtoMT::BlockResponse(br) => { - PeerMessage::Block(try_from_required(&br.block).context("BlockResponse")?) - } ProtoMT::Transaction(t) => PeerMessage::Transaction( - SignedTransaction::try_from_slice(&t.borsh).context("Transaction")?, + SignedTransaction::try_from_slice(&t.borsh).map_err(Self::Error::Transaction)?, ), ProtoMT::Routed(r) => PeerMessage::Routed(Box::new( - RoutedMessage::try_from_slice(&r.borsh).context("Routed")?, + RoutedMessage::try_from_slice(&r.borsh).map_err(Self::Error::Routed)?, )), ProtoMT::Disconnect(_) => PeerMessage::Disconnect, - ProtoMT::Challenge(c) => { - PeerMessage::Challenge(Challenge::try_from_slice(&c.borsh).context("Challenge")?) - } + ProtoMT::Challenge(c) => PeerMessage::Challenge( + Challenge::try_from_slice(&c.borsh).map_err(Self::Error::Challenge)?, + ), ProtoMT::EpochSyncRequest(esr) => PeerMessage::EpochSyncRequest(EpochId( - try_from_required(&esr.epoch_id).context("EpochSyncRequest")?, + try_from_required(&esr.epoch_id).map_err(Self::Error::EpochSyncRequest)?, )), ProtoMT::EpochSyncResponse(esr) => PeerMessage::EpochSyncResponse(Box::new( - EpochSyncResponse::try_from_slice(&esr.borsh).context("EpochSyncResponse")?, + EpochSyncResponse::try_from_slice(&esr.borsh) + .map_err(Self::Error::EpochSyncResponse)?, )), ProtoMT::EpochSyncFinalizationRequest(esr) => { PeerMessage::EpochSyncFinalizationRequest(EpochId( - try_from_required(&esr.epoch_id).context("EpochSyncFinalizationRequest")?, + try_from_required(&esr.epoch_id) + .map_err(Self::Error::EpochSyncFinalizationRequest)?, )) } ProtoMT::EpochSyncFinalizationResponse(esr) => { PeerMessage::EpochSyncFinalizationResponse(Box::new( EpochSyncFinalizationResponse::try_from_slice(&esr.borsh) - .context("EpochSyncFinalizationResponse")?, + .map_err(Self::Error::EpochSyncFinalizationResponse)?, )) } ProtoMT::RoutingTableSyncV2(rts) => PeerMessage::RoutingTableSyncV2( - RoutingSyncV2::try_from_slice(&rts.borsh).context("RoutingTableSyncV2")?, + RoutingSyncV2::try_from_slice(&rts.borsh) + .map_err(Self::Error::RoutingTableSyncV2)?, ), }) } diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 85c0f8a990e..f12f34f7fe2 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1,3 +1,4 @@ +use crate::network_protocol::{Encoding, ParsePeerMessageError}; use crate::peer::codec::Codec; use crate::peer::tracker::Tracker; use crate::private_actix::{ @@ -42,6 +43,7 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use thiserror::Error; use tracing::{debug, error, info, trace, warn}; type WriteHalf = tokio::io::WriteHalf; @@ -106,7 +108,7 @@ pub(crate) struct PeerActor { protocol_buffers_supported: bool, /// Whether the PeerActor should skip protobuf support detection and use /// a given encoding right away. - force_encoding: Option, + force_encoding: Option, } impl Debug for PeerActor { @@ -115,6 +117,15 @@ impl Debug for PeerActor { } } +/// A custom IOError type because FramedWrite is hiding the actual +/// underlying std::io::Error. +/// TODO: replace FramedWrite with sth more reasonable. +#[derive(Error, Debug)] +pub enum IOError { + #[error("{tid} Failed to send message {message_type} of size {size}")] + Send { tid: i32, message_type: String, size: usize }, +} + impl PeerActor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -132,7 +143,7 @@ impl PeerActor { txns_since_last_block: Arc, peer_counter: Arc, throttle_controller: ThrottleController, - force_encoding: Option, + force_encoding: Option, ) -> Self { PeerActor { my_node_info, @@ -161,17 +172,16 @@ impl PeerActor { } } - fn parse_message(&mut self, msg: &[u8]) -> anyhow::Result { + fn parse_message(&mut self, msg: &[u8]) -> Result { if let Some(e) = self.force_encoding { return PeerMessage::deserialize(e, msg); } if self.peer_status == PeerStatus::Connecting { - if let Ok(msg) = PeerMessage::deserialize(crate::network_protocol::Encoding::Proto, msg) - { + if let Ok(msg) = PeerMessage::deserialize(Encoding::Proto, msg) { self.protocol_buffers_supported = true; return Ok(msg); } - match PeerMessage::deserialize(crate::network_protocol::Encoding::Borsh, msg) { + match PeerMessage::deserialize(Encoding::Borsh, msg) { Ok(msg) => Ok(msg), Err(err) => { self.send_message_or_log(&PeerMessage::HandshakeFailure( @@ -185,29 +195,29 @@ impl PeerActor { } } } else if self.protocol_buffers_supported { - PeerMessage::deserialize(crate::network_protocol::Encoding::Proto, msg) + PeerMessage::deserialize(Encoding::Proto, msg) } else { - PeerMessage::deserialize(crate::network_protocol::Encoding::Borsh, msg) + PeerMessage::deserialize(Encoding::Borsh, msg) } } fn send_message_or_log(&mut self, msg: &PeerMessage) { if let Err(err) = self.send_message(msg) { - debug!(target: "network", "send_message(): {}", err); + warn!(target: "network", "send_message(): {}", err); } } - fn send_message(&mut self, msg: &PeerMessage) -> anyhow::Result<()> { + fn send_message(&mut self, msg: &PeerMessage) -> Result<(), IOError> { if let Some(enc) = self.force_encoding { return self.send_message_with_encoding(msg, enc); } if self.peer_status == PeerStatus::Connecting { - self.send_message_with_encoding(msg, crate::network_protocol::Encoding::Proto)?; - self.send_message_with_encoding(msg, crate::network_protocol::Encoding::Borsh)?; + self.send_message_with_encoding(msg, Encoding::Proto)?; + self.send_message_with_encoding(msg, Encoding::Borsh)?; } else if self.protocol_buffers_supported { - self.send_message_with_encoding(msg, crate::network_protocol::Encoding::Proto)?; + self.send_message_with_encoding(msg, Encoding::Proto)?; } else { - self.send_message_with_encoding(msg, crate::network_protocol::Encoding::Borsh)?; + self.send_message_with_encoding(msg, Encoding::Borsh)?; } Ok(()) } @@ -215,8 +225,8 @@ impl PeerActor { fn send_message_with_encoding( &mut self, msg: &PeerMessage, - mode: crate::network_protocol::Encoding, - ) -> anyhow::Result<()> { + enc: Encoding, + ) -> Result<(), IOError> { // Skip sending block and headers if we received it or header from this peer. // Record block requests in tracker. match msg { @@ -225,7 +235,7 @@ impl PeerActor { _ => (), }; - let bytes = msg.serialize(mode); + let bytes = msg.serialize(enc); self.tracker.increment_sent(bytes.len() as u64); let bytes_len = bytes.len(); if !self.framed.write(bytes) { @@ -233,12 +243,11 @@ impl PeerActor { let tid = near_rust_allocator_proxy::get_tid(); #[cfg(not(feature = "performance_stats"))] let tid = 0; - anyhow::bail!( - "{} Failed to send message {} of size {}", + return Err(IOError::Send { tid, - strum::AsStaticRef::as_static(msg), - bytes_len, - ) + message_type: strum::AsStaticRef::as_static(msg).to_string(), + size: bytes_len, + }); } Ok(()) } diff --git a/chain/network/src/tests/network_protocol.rs b/chain/network/src/tests/network_protocol.rs index 8cac200d3d8..09f894cf5db 100644 --- a/chain/network/src/tests/network_protocol.rs +++ b/chain/network/src/tests/network_protocol.rs @@ -80,6 +80,15 @@ fn serialize_deserialize() -> anyhow::Result<()> { } } + // Test the unambiguous parsing argument described in + // https://docs.google.com/document/d/1gCWmt9O-h_-5JDXIqbKxAaSS3Q9pryB1f9DDY1mMav4/edit#heading=h.x1awbr2acslb + for m in &msgs { + let x = m.serialize(Encoding::Proto); + assert!(x[0] >= 32, "serialize({},PROTO)[0] = {:?}, want >= 32", m, x.get(0)); + let y = m.serialize(Encoding::Borsh); + assert!(y[0] <= 21, "serialize({},BORSH)[0] = {:?}, want <= 21", m, y.get(0)); + } + // Encodings should never be compatible. for (from, to) in [(Encoding::Proto, Encoding::Borsh), (Encoding::Borsh, Encoding::Proto)] { for m in &msgs { diff --git a/chain/network/src/tests/util.rs b/chain/network/src/tests/util.rs index a34ac86282b..613b9508905 100644 --- a/chain/network/src/tests/util.rs +++ b/chain/network/src/tests/util.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use once_cell::sync::Lazy; // TODO: consider wrapping these types to prevent interaction with diff --git a/core/primitives-core/src/hash.rs b/core/primitives-core/src/hash.rs index 28f46cffb5d..9067244f768 100644 --- a/core/primitives-core/src/hash.rs +++ b/core/primitives-core/src/hash.rs @@ -74,7 +74,7 @@ impl<'de> Deserialize<'de> for CryptoHash { } impl std::str::FromStr for CryptoHash { - type Err = Box; + type Err = Box; fn from_str(s: &str) -> Result { let bytes = from_base(s).map_err::(|e| e.to_string().into())?; @@ -83,7 +83,7 @@ impl std::str::FromStr for CryptoHash { } impl TryFrom<&[u8]> for CryptoHash { - type Error = Box; + type Error = Box; fn try_from(bytes: &[u8]) -> Result { Ok(CryptoHash(bytes.try_into()?)) diff --git a/core/primitives-core/src/serialize.rs b/core/primitives-core/src/serialize.rs index 6b7f6501bba..6ed5ed8e338 100644 --- a/core/primitives-core/src/serialize.rs +++ b/core/primitives-core/src/serialize.rs @@ -2,7 +2,7 @@ pub fn to_base>(input: T) -> String { bs58::encode(input).into_string() } -pub fn from_base(s: &str) -> Result, Box> { +pub fn from_base(s: &str) -> Result, Box> { bs58::decode(s).into_vec().map_err(|err| err.into()) } @@ -10,11 +10,11 @@ pub fn to_base64>(input: T) -> String { base64::encode(&input) } -pub fn from_base64(s: &str) -> Result, Box> { +pub fn from_base64(s: &str) -> Result, Box> { base64::decode(s).map_err(|err| err.into()) } -pub fn from_base_buf(s: &str, buffer: &mut Vec) -> Result<(), Box> { +pub fn from_base_buf(s: &str, buffer: &mut Vec) -> Result<(), Box> { match bs58::decode(s).into(buffer) { Ok(_) => Ok(()), Err(err) => Err(err.into()), @@ -34,8 +34,8 @@ where } } -pub trait BaseDecode: for<'a> TryFrom<&'a [u8], Error = Box> { - fn from_base(s: &str) -> Result> { +pub trait BaseDecode: for<'a> TryFrom<&'a [u8], Error = Box> { + fn from_base(s: &str) -> Result> { let bytes = from_base(s)?; Self::try_from(&bytes) } diff --git a/core/primitives/src/shard_layout.rs b/core/primitives/src/shard_layout.rs index ad061dff48c..627cd790011 100644 --- a/core/primitives/src/shard_layout.rs +++ b/core/primitives/src/shard_layout.rs @@ -294,7 +294,7 @@ impl ShardUId { } impl TryFrom<&[u8]> for ShardUId { - type Error = Box; + type Error = Box; /// Deserialize `bytes` to shard uid fn try_from(bytes: &[u8]) -> Result { @@ -319,7 +319,7 @@ pub fn get_block_shard_uid(block_hash: &CryptoHash, shard_uid: &ShardUId) -> Vec #[allow(unused)] pub fn get_block_shard_uid_rev( key: &[u8], -) -> Result<(CryptoHash, ShardUId), Box> { +) -> Result<(CryptoHash, ShardUId), Box> { if key.len() != 40 { return Err( std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid key length").into() diff --git a/core/primitives/src/utils.rs b/core/primitives/src/utils.rs index 94f5c95b075..c074d2d1143 100644 --- a/core/primitives/src/utils.rs +++ b/core/primitives/src/utils.rs @@ -189,7 +189,7 @@ pub fn get_block_shard_id(block_hash: &CryptoHash, shard_id: ShardId) -> Vec pub fn get_block_shard_id_rev( key: &[u8], -) -> Result<(CryptoHash, ShardId), Box> { +) -> Result<(CryptoHash, ShardId), Box> { if key.len() != 40 { return Err( std::io::Error::new(std::io::ErrorKind::InvalidInput, "Invalid key length").into() diff --git a/core/primitives/src/views.rs b/core/primitives/src/views.rs index 47e0b6e8188..430f3465bb3 100644 --- a/core/primitives/src/views.rs +++ b/core/primitives/src/views.rs @@ -917,7 +917,7 @@ impl From for ActionView { } impl TryFrom for Action { - type Error = Box; + type Error = Box; fn try_from(action_view: ActionView) -> Result { Ok(match action_view { @@ -1423,7 +1423,7 @@ impl From for ReceiptView { } impl TryFrom for Receipt { - type Error = Box; + type Error = Box; fn try_from(receipt_view: ReceiptView) -> Result { Ok(Receipt {