Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

change(network): output more info for debug rpc timeout #157

Merged
merged 5 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions core/network/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use std::{
time::{Duration, Instant},
};

use derive_more::Display;
use futures::{pin_mut, task::AtomicWaker};
use futures_timer::Delay;
use serde_derive::{Deserialize, Serialize};
use tentacle::multiaddr::{Multiaddr, Protocol};

#[macro_export]
Expand Down Expand Up @@ -84,3 +86,33 @@ impl Future for HeartBeat {
Poll::Pending
}
}

#[derive(Debug, Display, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[display(fmt = "{}:{}", host, port)]
pub struct ConnectedAddr {
host: String,
port: u16,
}

impl From<&Multiaddr> for ConnectedAddr {
fn from(multiaddr: &Multiaddr) -> Self {
use tentacle::multiaddr::Protocol::*;

let mut host = None;
let mut port = 0u16;

for comp in multiaddr.iter() {
match comp {
IP4(ip_addr) => host = Some(ip_addr.to_string()),
IP6(ip_addr) => host = Some(ip_addr.to_string()),
DNS4(dns_addr) | DNS6(dns_addr) => host = Some(dns_addr.to_string()),
TLS(tls_addr) => host = Some(tls_addr.to_string()),
TCP(p) => port = p,
_ => (),
}
}

let host = host.unwrap_or_else(|| multiaddr.to_string());
ConnectedAddr { host, port }
}
}
13 changes: 9 additions & 4 deletions core/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tentacle::{ProtocolId, SessionId};

use protocol::{types::Address, ProtocolError, ProtocolErrorKind};

use crate::common::ConnectedAddr;

#[derive(Debug, Display)]
pub enum ErrorKind {
#[display(fmt = "{} offline", _0)]
Expand Down Expand Up @@ -40,14 +42,17 @@ pub enum ErrorKind {
#[display(fmt = "kind: session id not found in context")]
NoSessionId,

#[display(fmt = "kind: remote peer id not found in context")]
NoRemotePeerId,

#[display(fmt = "kind: rpc id not found in context")]
NoRpcId,

#[display(fmt = "kind: rpc future dropped")]
RpcDropped,
#[display(fmt = "kind: rpc future dropped {:?}", _0)]
RpcDropped(Option<ConnectedAddr>),

#[display(fmt = "kind: rpc timeout")]
RpcTimeout,
#[display(fmt = "kind: rpc timeout {:?}", _0)]
RpcTimeout(Option<ConnectedAddr>),

#[display(fmt = "kind: not reactor register for {}", _0)]
NoReactor(String),
Expand Down
12 changes: 9 additions & 3 deletions core/network/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@ pub mod serde_multi;
use derive_more::Constructor;
use prost::Message;
use protocol::Bytes;
use tentacle::SessionId;
use tentacle::{secio::PeerId, SessionId};

use crate::{
common::ConnectedAddr,
endpoint::Endpoint,
error::{ErrorKind, NetworkError},
};

#[derive(Constructor)]
#[non_exhaustive]
pub struct RawSessionMessage {
pub(crate) sid: SessionId,
pub(crate) pid: PeerId,
pub(crate) msg: Bytes,
}

Expand Down Expand Up @@ -49,9 +52,12 @@ impl NetworkMessage {
}

#[derive(Constructor)]
#[non_exhaustive]
pub struct SessionMessage {
pub(crate) sid: SessionId,
pub(crate) msg: NetworkMessage,
pub(crate) sid: SessionId,
pub(crate) pid: PeerId,
pub(crate) msg: NetworkMessage,
pub(crate) connected_addr: Option<ConnectedAddr>,
}

#[cfg(test)]
Expand Down
10 changes: 8 additions & 2 deletions core/network/src/outbound/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::future::{self, Either};
use futures_timer::Delay;
use log::warn;
use protocol::{
traits::{Context, MessageCodec, Priority, Rpc},
Bytes, ProtocolResult,
Expand Down Expand Up @@ -64,6 +65,7 @@ where
let endpoint = end.parse::<Endpoint>()?;
let sid = cx.session_id()?;
let rid = self.map.next_rpc_id();
let connected_addr = cx.remote_connected_addr();
let done_rx = self.map.insert::<R>(sid, rid);

let data = msg.encode().await?;
Expand All @@ -75,10 +77,14 @@ where
let timeout = Delay::new(self.timeout.rpc);
let ret = match future::select(done_rx, timeout).await {
Either::Left((ret, _timeout)) => {
ret.map_err(|_| NetworkError::from(ErrorKind::RpcDropped))?
ret.map_err(|_| NetworkError::from(ErrorKind::RpcDropped(connected_addr)))?
}
Either::Right((_unresolved, _timeout)) => {
return Err(NetworkError::from(ErrorKind::RpcTimeout).into());
if let Err(err) = self.map.take::<R>(sid, rid) {
warn!("rpc: remove {}, maybe we just got response", err);
}

return Err(NetworkError::from(ErrorKind::RpcTimeout(connected_addr)).into());
}
};

Expand Down
17 changes: 13 additions & 4 deletions core/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod ident;
mod peer;
mod persist;

use peer::{ConnectedAddr, PeerState};
use peer::PeerState;
use persist::{NoopPersistence, PeerPersistence, Persistence};

pub use disc::DiscoveryAddrManager;
Expand Down Expand Up @@ -44,9 +44,10 @@ use tentacle::{
};

use crate::{
common::HeartBeat,
common::{ConnectedAddr, HeartBeat},
error::NetworkError,
event::{ConnectionEvent, ConnectionType, MultiUsersMessage, PeerManagerEvent, Session},
traits::PeerInfoQuerier,
};

const MAX_RETRY_COUNT: usize = 6;
Expand Down Expand Up @@ -471,6 +472,12 @@ impl PeerManagerHandle {
}
}

impl PeerInfoQuerier for PeerManagerHandle {
fn connected_addr(&self, pid: &PeerId) -> Option<ConnectedAddr> {
self.inner.connected_addr(pid)
}
}

pub struct PeerManager {
// core peer pool
inner: Arc<Inner>,
Expand Down Expand Up @@ -1038,10 +1045,12 @@ impl Future for PeerManager {
self.process_event(event);
}

let connected_peers_addr = self.connected_peers_addr();
debug!(
"network: {:?}: connected peer_addr(s): {:?}",
"network: {:?}: connected peer_addr(s) {}: {:?}",
self.peer_id,
self.connected_peers_addr()
connected_peers_addr.len(),
connected_peers_addr
);

// Check connecting count
Expand Down
32 changes: 2 additions & 30 deletions core/network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,39 +13,11 @@ use tentacle::{
secio::{PeerId, PublicKey},
};

use crate::common::ConnectedAddr;

pub const BACKOFF_BASE: usize = 5;
pub const VALID_ATTEMPT_INTERVAL: u64 = 4;

#[derive(Debug, Display, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[display(fmt = "{}:{}", host, port)]
pub struct ConnectedAddr {
host: String,
port: u16,
}

impl From<&Multiaddr> for ConnectedAddr {
fn from(multiaddr: &Multiaddr) -> Self {
use tentacle::multiaddr::Protocol::*;

let mut host = None;
let mut port = 0u16;

for comp in multiaddr.iter() {
match comp {
IP4(ip_addr) => host = Some(ip_addr.to_string()),
IP6(ip_addr) => host = Some(ip_addr.to_string()),
DNS4(dns_addr) | DNS6(dns_addr) => host = Some(dns_addr.to_string()),
TLS(tls_addr) => host = Some(tls_addr.to_string()),
TCP(p) => port = p,
_ => (),
}
}

let host = host.unwrap_or_else(|| multiaddr.to_string());
ConnectedAddr { host, port }
}
}

// TODO: display next_retry
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[display(
Expand Down
5 changes: 4 additions & 1 deletion core/network/src/protocols/transmitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ impl ServiceProtocol for Transmitter {
}

fn received(&mut self, ctx: ProtocolContextMutRef, data: tentacle::bytes::Bytes) {
let pubkey = ctx.session.remote_pubkey.as_ref();
// Peers without encryption will not able to connect to us.
let peer_id = pubkey.expect("impossible, no public key").peer_id();
let data = BytesMut::from(data.as_ref()).freeze();
let raw_msg = RawSessionMessage::new(ctx.session.id, data);

let raw_msg = RawSessionMessage::new(ctx.session.id, peer_id, data);
if self.msg_deliver.unbounded_send(raw_msg).is_err() {
error!("network: transmitter: msg receiver dropped");
}
Expand Down
20 changes: 17 additions & 3 deletions core/network/src/reactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,19 @@ where
let handler = Arc::clone(&self.handler);
let rpc_map = Arc::clone(&self.rpc_map);

let SessionMessage { sid, msg: net_msg } = smsg;
let SessionMessage {
sid,
msg: net_msg,
pid,
connected_addr,
..
} = smsg;

let endpoint = net_msg.url.to_owned();
let mut ctx = Context::new().set_session_id(sid);
let mut ctx = Context::new().set_session_id(sid).set_remote_peer_id(pid);
if let Some(connected_addr) = connected_addr {
ctx = ctx.set_remote_connected_addr(connected_addr);
}

let react = async move {
let endpoint = net_msg.url.parse::<Endpoint>()?;
Expand All @@ -78,8 +87,13 @@ where
}
EndpointScheme::RpcResponse => {
let rpc_endpoint = RpcEndpoint::try_from(endpoint)?;
let resp_tx = rpc_map.take::<M>(sid, rpc_endpoint.rpc_id().value())?;
let rpc_id = rpc_endpoint.rpc_id().value();
if !rpc_map.contains(sid, rpc_id) {
warn!("network: reactor: rpc entry not found, if there's timeout message, it's ok here");
return Ok(());
}

let resp_tx = rpc_map.take::<M>(sid, rpc_endpoint.rpc_id().value())?;
if resp_tx.send(content).is_err() {
let end = rpc_endpoint.endpoint().full_url();

Expand Down
21 changes: 17 additions & 4 deletions core/network/src/reactor/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::{
endpoint::Endpoint,
error::{ErrorKind, NetworkError},
message::{NetworkMessage, RawSessionMessage, SessionMessage},
traits::Compression,
traits::{Compression, PeerInfoQuerier},
};

pub struct MessageRouter<C> {
pub struct MessageRouter<C, PQ> {
// Endpoint to reactor channel map
reactor_map: Arc<RwLock<HashMap<Endpoint, UnboundedSender<SessionMessage>>>>,

Expand All @@ -32,24 +32,30 @@ pub struct MessageRouter<C> {
// Compression to decompress message
compression: C,

// PeerInfo Querier
peer_info_querier: PQ,

// Fatal system error reporter
sys_tx: UnboundedSender<NetworkError>,
}

impl<C> MessageRouter<C>
impl<C, PQ> MessageRouter<C, PQ>
where
C: Compression + Send + Unpin + Clone + 'static,
PQ: PeerInfoQuerier + Send + Unpin + Clone + 'static,
{
pub fn new(
raw_msg_rx: UnboundedReceiver<RawSessionMessage>,
compression: C,
peer_info_querier: PQ,
sys_tx: UnboundedSender<NetworkError>,
) -> Self {
MessageRouter {
reactor_map: Default::default(),

raw_msg_rx,
compression,
peer_info_querier,

sys_tx,
}
Expand All @@ -66,6 +72,7 @@ where
pub fn route_raw_message(&self, raw_msg: RawSessionMessage) -> impl Future<Output = ()> {
let reactor_map = Arc::clone(&self.reactor_map);
let compression = self.compression.clone();
let peer_info_querier = self.peer_info_querier.clone();
let sys_tx = self.sys_tx.clone();

let route = async move {
Expand All @@ -78,9 +85,14 @@ where
let opt_smsg_tx = reactor_map.get(&endpoint).cloned();
let smsg_tx = opt_smsg_tx.ok_or_else(|| ErrorKind::NoReactor(endpoint.root()))?;

// Peer may disconnect when we try to fetch its connected address.
// This connected addr is mainly for debug purpose, so no error.
let connected_addr = peer_info_querier.connected_addr(&raw_msg.pid);
let smsg = SessionMessage {
sid: raw_msg.sid,
pid: raw_msg.pid,
msg: net_msg,
connected_addr,
};

if smsg_tx.unbounded_send(smsg).is_err() {
Expand All @@ -97,9 +109,10 @@ where
}
}

impl<C> Future for MessageRouter<C>
impl<C, PQ> Future for MessageRouter<C, PQ>
where
C: Compression + Send + Unpin + Clone + 'static,
PQ: PeerInfoQuerier + Send + Unpin + Clone + 'static,
{
type Output = ();

Expand Down
5 changes: 5 additions & 0 deletions core/network/src/rpc_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ impl RpcMap {
done_rx
}

pub fn contains(&self, sid: SessionId, rid: u64) -> bool {
let key = Key::new(sid, rid);
self.map.read().contains_key(&key)
}

pub fn take<T: Send + 'static>(
&self,
sid: SessionId,
Expand Down
Loading