Skip to content

Commit

Permalink
change(network): output more info for debug rpc timeout (nervosnetwor…
Browse files Browse the repository at this point in the history
…k#157)

* feat(network): output peer connected addr on RPC error

* change(network): also output number of connected peers

* change(network): remove rpc entry after timeout

* doc(network): comment about expect() on Option session public key

* change(network): output more info on rpc entry not found
  • Loading branch information
zeroqn committed Feb 11, 2020
1 parent 66f9b47 commit e5566de
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 56 deletions.
32 changes: 32 additions & 0 deletions core/network/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use std::{
time::{Duration, Instant},
};

use derive_more::Display;
use futures::{pin_mut, task::AtomicWaker};
use futures_timer::Delay;
use serde_derive::{Deserialize, Serialize};
use tentacle::multiaddr::{Multiaddr, Protocol};

#[macro_export]
Expand Down Expand Up @@ -84,3 +86,33 @@ impl Future for HeartBeat {
Poll::Pending
}
}

#[derive(Debug, Display, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[display(fmt = "{}:{}", host, port)]
pub struct ConnectedAddr {
host: String,
port: u16,
}

impl From<&Multiaddr> for ConnectedAddr {
fn from(multiaddr: &Multiaddr) -> Self {
use tentacle::multiaddr::Protocol::*;

let mut host = None;
let mut port = 0u16;

for comp in multiaddr.iter() {
match comp {
IP4(ip_addr) => host = Some(ip_addr.to_string()),
IP6(ip_addr) => host = Some(ip_addr.to_string()),
DNS4(dns_addr) | DNS6(dns_addr) => host = Some(dns_addr.to_string()),
TLS(tls_addr) => host = Some(tls_addr.to_string()),
TCP(p) => port = p,
_ => (),
}
}

let host = host.unwrap_or_else(|| multiaddr.to_string());
ConnectedAddr { host, port }
}
}
13 changes: 9 additions & 4 deletions core/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tentacle::{ProtocolId, SessionId};

use protocol::{types::Address, ProtocolError, ProtocolErrorKind};

use crate::common::ConnectedAddr;

#[derive(Debug, Display)]
pub enum ErrorKind {
#[display(fmt = "{} offline", _0)]
Expand Down Expand Up @@ -40,14 +42,17 @@ pub enum ErrorKind {
#[display(fmt = "kind: session id not found in context")]
NoSessionId,

#[display(fmt = "kind: remote peer id not found in context")]
NoRemotePeerId,

#[display(fmt = "kind: rpc id not found in context")]
NoRpcId,

#[display(fmt = "kind: rpc future dropped")]
RpcDropped,
#[display(fmt = "kind: rpc future dropped {:?}", _0)]
RpcDropped(Option<ConnectedAddr>),

#[display(fmt = "kind: rpc timeout")]
RpcTimeout,
#[display(fmt = "kind: rpc timeout {:?}", _0)]
RpcTimeout(Option<ConnectedAddr>),

#[display(fmt = "kind: not reactor register for {}", _0)]
NoReactor(String),
Expand Down
12 changes: 9 additions & 3 deletions core/network/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ pub mod serde_multi;
use derive_more::Constructor;
use prost::Message;
use protocol::Bytes;
use tentacle::SessionId;
use tentacle::{secio::PeerId, SessionId};

use crate::{
common::ConnectedAddr,
endpoint::Endpoint,
error::{ErrorKind, NetworkError},
};

#[derive(Constructor)]
#[non_exhaustive]
pub struct RawSessionMessage {
pub(crate) sid: SessionId,
pub(crate) pid: PeerId,
pub(crate) msg: Bytes,
}

Expand Down Expand Up @@ -49,9 +52,12 @@ impl NetworkMessage {
}

#[derive(Constructor)]
#[non_exhaustive]
pub struct SessionMessage {
pub(crate) sid: SessionId,
pub(crate) msg: NetworkMessage,
pub(crate) sid: SessionId,
pub(crate) pid: PeerId,
pub(crate) msg: NetworkMessage,
pub(crate) connected_addr: Option<ConnectedAddr>,
}

#[cfg(test)]
Expand Down
10 changes: 8 additions & 2 deletions core/network/src/outbound/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::future::{self, Either};
use futures_timer::Delay;
use log::warn;
use protocol::{
traits::{Context, MessageCodec, Priority, Rpc},
Bytes, ProtocolResult,
Expand Down Expand Up @@ -64,6 +65,7 @@ where
let endpoint = end.parse::<Endpoint>()?;
let sid = cx.session_id()?;
let rid = self.map.next_rpc_id();
let connected_addr = cx.remote_connected_addr();
let done_rx = self.map.insert::<R>(sid, rid);

let data = msg.encode().await?;
Expand All @@ -75,10 +77,14 @@ where
let timeout = Delay::new(self.timeout.rpc);
let ret = match future::select(done_rx, timeout).await {
Either::Left((ret, _timeout)) => {
ret.map_err(|_| NetworkError::from(ErrorKind::RpcDropped))?
ret.map_err(|_| NetworkError::from(ErrorKind::RpcDropped(connected_addr)))?
}
Either::Right((_unresolved, _timeout)) => {
return Err(NetworkError::from(ErrorKind::RpcTimeout).into());
if let Err(err) = self.map.take::<R>(sid, rid) {
warn!("rpc: remove {}, maybe we just got response", err);
}

return Err(NetworkError::from(ErrorKind::RpcTimeout(connected_addr)).into());
}
};

Expand Down
17 changes: 13 additions & 4 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod ident;
mod peer;
mod persist;

use peer::{ConnectedAddr, PeerState};
use peer::PeerState;
use persist::{NoopPersistence, PeerPersistence, Persistence};

pub use disc::DiscoveryAddrManager;
Expand Down Expand Up @@ -44,9 +44,10 @@ use tentacle::{
};

use crate::{
common::HeartBeat,
common::{ConnectedAddr, HeartBeat},
error::NetworkError,
event::{ConnectionEvent, ConnectionType, MultiUsersMessage, PeerManagerEvent, Session},
traits::PeerInfoQuerier,
};

const MAX_RETRY_COUNT: usize = 6;
Expand Down Expand Up @@ -471,6 +472,12 @@ impl PeerManagerHandle {
}
}

impl PeerInfoQuerier for PeerManagerHandle {
fn connected_addr(&self, pid: &PeerId) -> Option<ConnectedAddr> {
self.inner.connected_addr(pid)
}
}

pub struct PeerManager {
// core peer pool
inner: Arc<Inner>,
Expand Down Expand Up @@ -1038,10 +1045,12 @@ impl Future for PeerManager {
self.process_event(event);
}

let connected_peers_addr = self.connected_peers_addr();
debug!(
"network: {:?}: connected peer_addr(s): {:?}",
"network: {:?}: connected peer_addr(s) {}: {:?}",
self.peer_id,
self.connected_peers_addr()
connected_peers_addr.len(),
connected_peers_addr
);

// Check connecting count
Expand Down
32 changes: 2 additions & 30 deletions core/network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,11 @@ use tentacle::{
secio::{PeerId, PublicKey},
};

use crate::common::ConnectedAddr;

pub const BACKOFF_BASE: usize = 5;
pub const VALID_ATTEMPT_INTERVAL: u64 = 4;

#[derive(Debug, Display, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[display(fmt = "{}:{}", host, port)]
pub struct ConnectedAddr {
host: String,
port: u16,
}

impl From<&Multiaddr> for ConnectedAddr {
fn from(multiaddr: &Multiaddr) -> Self {
use tentacle::multiaddr::Protocol::*;

let mut host = None;
let mut port = 0u16;

for comp in multiaddr.iter() {
match comp {
IP4(ip_addr) => host = Some(ip_addr.to_string()),
IP6(ip_addr) => host = Some(ip_addr.to_string()),
DNS4(dns_addr) | DNS6(dns_addr) => host = Some(dns_addr.to_string()),
TLS(tls_addr) => host = Some(tls_addr.to_string()),
TCP(p) => port = p,
_ => (),
}
}

let host = host.unwrap_or_else(|| multiaddr.to_string());
ConnectedAddr { host, port }
}
}

// TODO: display next_retry
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[display(
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/protocols/transmitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ impl ServiceProtocol for Transmitter {
}

fn received(&mut self, ctx: ProtocolContextMutRef, data: tentacle::bytes::Bytes) {
let pubkey = ctx.session.remote_pubkey.as_ref();
// Peers without encryption will not able to connect to us.
let peer_id = pubkey.expect("impossible, no public key").peer_id();
let data = BytesMut::from(data.as_ref()).freeze();
let raw_msg = RawSessionMessage::new(ctx.session.id, data);

let raw_msg = RawSessionMessage::new(ctx.session.id, peer_id, data);
if self.msg_deliver.unbounded_send(raw_msg).is_err() {
error!("network: transmitter: msg receiver dropped");
}
Expand Down
26 changes: 23 additions & 3 deletions core/network/src/reactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,19 @@ where
let handler = Arc::clone(&self.handler);
let rpc_map = Arc::clone(&self.rpc_map);

let SessionMessage { sid, msg: net_msg } = smsg;
let SessionMessage {
sid,
msg: net_msg,
pid,
connected_addr,
..
} = smsg;

let endpoint = net_msg.url.to_owned();
let mut ctx = Context::new().set_session_id(sid);
let mut ctx = Context::new().set_session_id(sid).set_remote_peer_id(pid);
if let Some(ref connected_addr) = connected_addr {
ctx = ctx.set_remote_connected_addr(connected_addr.clone());
}

let react = async move {
let endpoint = net_msg.url.parse::<Endpoint>()?;
Expand All @@ -78,8 +87,19 @@ where
}
EndpointScheme::RpcResponse => {
let rpc_endpoint = RpcEndpoint::try_from(endpoint)?;
let resp_tx = rpc_map.take::<M>(sid, rpc_endpoint.rpc_id().value())?;
let rpc_id = rpc_endpoint.rpc_id().value();

if !rpc_map.contains(sid, rpc_id) {
let full_url = rpc_endpoint.endpoint().full_url();

warn!(
"rpc entry for {} from {:?} not found, maybe timeout",
full_url, connected_addr
);
return Ok(());
}

let resp_tx = rpc_map.take::<M>(sid, rpc_endpoint.rpc_id().value())?;
if resp_tx.send(content).is_err() {
let end = rpc_endpoint.endpoint().full_url();

Expand Down
21 changes: 17 additions & 4 deletions core/network/src/reactor/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::{
endpoint::Endpoint,
error::{ErrorKind, NetworkError},
message::{NetworkMessage, RawSessionMessage, SessionMessage},
traits::Compression,
traits::{Compression, PeerInfoQuerier},
};

pub struct MessageRouter<C> {
pub struct MessageRouter<C, PQ> {
// Endpoint to reactor channel map
reactor_map: Arc<RwLock<HashMap<Endpoint, UnboundedSender<SessionMessage>>>>,

Expand All @@ -32,24 +32,30 @@ pub struct MessageRouter<C> {
// Compression to decompress message
compression: C,

// PeerInfo Querier
peer_info_querier: PQ,

// Fatal system error reporter
sys_tx: UnboundedSender<NetworkError>,
}

impl<C> MessageRouter<C>
impl<C, PQ> MessageRouter<C, PQ>
where
C: Compression + Send + Unpin + Clone + 'static,
PQ: PeerInfoQuerier + Send + Unpin + Clone + 'static,
{
pub fn new(
raw_msg_rx: UnboundedReceiver<RawSessionMessage>,
compression: C,
peer_info_querier: PQ,
sys_tx: UnboundedSender<NetworkError>,
) -> Self {
MessageRouter {
reactor_map: Default::default(),

raw_msg_rx,
compression,
peer_info_querier,

sys_tx,
}
Expand All @@ -66,6 +72,7 @@ where
pub fn route_raw_message(&self, raw_msg: RawSessionMessage) -> impl Future<Output = ()> {
let reactor_map = Arc::clone(&self.reactor_map);
let compression = self.compression.clone();
let peer_info_querier = self.peer_info_querier.clone();
let sys_tx = self.sys_tx.clone();

let route = async move {
Expand All @@ -78,9 +85,14 @@ where
let opt_smsg_tx = reactor_map.get(&endpoint).cloned();
let smsg_tx = opt_smsg_tx.ok_or_else(|| ErrorKind::NoReactor(endpoint.root()))?;

// Peer may disconnect when we try to fetch its connected address.
// This connected addr is mainly for debug purpose, so no error.
let connected_addr = peer_info_querier.connected_addr(&raw_msg.pid);
let smsg = SessionMessage {
sid: raw_msg.sid,
pid: raw_msg.pid,
msg: net_msg,
connected_addr,
};

if smsg_tx.unbounded_send(smsg).is_err() {
Expand All @@ -97,9 +109,10 @@ where
}
}

impl<C> Future for MessageRouter<C>
impl<C, PQ> Future for MessageRouter<C, PQ>
where
C: Compression + Send + Unpin + Clone + 'static,
PQ: PeerInfoQuerier + Send + Unpin + Clone + 'static,
{
type Output = ();

Expand Down
5 changes: 5 additions & 0 deletions core/network/src/rpc_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ impl RpcMap {
done_rx
}

pub fn contains(&self, sid: SessionId, rid: u64) -> bool {
let key = Key::new(sid, rid);
self.map.read().contains_key(&key)
}

pub fn take<T: Send + 'static>(
&self,
sid: SessionId,
Expand Down
Loading

0 comments on commit e5566de

Please sign in to comment.