Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Squash network inbound queues #13956

Merged
merged 37 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
374aa04
squash inbound path inital work. local cluster works.
brianolson Jun 28, 2024
9865dd7
clippy passes
brianolson Jul 9, 2024
9573f16
network test fixes
brianolson Jul 9, 2024
5c1a6e3
Merge branch 'main' into squash-inbound
brianolson Jul 9, 2024
f5b929f
cleanup. fmt. metrics.
brianolson Jul 9, 2024
3af93ab
delete dead code
brianolson Jul 10, 2024
6f364dd
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 10, 2024
b645b2e
PeerNotification goes away
brianolson Jul 10, 2024
841d909
PeerNotification goes away
brianolson Jul 10, 2024
dac4777
comment deprecate PeerManagerNotification
brianolson Jul 10, 2024
1bc9f43
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 10, 2024
62b6dff
add inbound queue delay metric aptos_network_inbound_queue_time
brianolson Jul 10, 2024
64f3ea8
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 10, 2024
026c14a
fmt
brianolson Jul 10, 2024
34bac4e
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 10, 2024
085901a
fmt
brianolson Jul 11, 2024
f41fc0d
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 11, 2024
aec0b06
rx_at -> receive_timestamp_micros
brianolson Jul 11, 2024
28435c3
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 12, 2024
b3c2b64
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 12, 2024
adce732
less PeerManagerNotification
brianolson Jul 15, 2024
c8a822c
PR cleanup
brianolson Jul 15, 2024
b135f3b
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 15, 2024
f935228
cleanup and PR refactor
brianolson Jul 15, 2024
7e4d5bb
unix_micros() util
brianolson Jul 15, 2024
26ec289
cleanup
brianolson Jul 15, 2024
ddba8aa
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 15, 2024
834ab52
fix mempool tests
brianolson Jul 15, 2024
925ed26
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 16, 2024
09081fd
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 17, 2024
506982b
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 18, 2024
504162b
drop unused max_concurrent_network_reqs
brianolson Jul 19, 2024
f5dfe7d
drop unused PeerManagerNotification
brianolson Jul 19, 2024
9298a59
count UNKNOWN_LABEL inbound messages
brianolson Jul 19, 2024
c04a29c
Merge remote-tracking branch 'origin/main' into squash-inbound-2
brianolson Jul 19, 2024
a744ce8
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 19, 2024
5634764
PR cleanup
brianolson Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 84 additions & 59 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
test_utils::{self, consensus_runtime, placeholder_ledger_info, timed_block_on},
};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkId;
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block::{block_test_utils::certificate_for_genesis, Block},
common::Author,
Expand All @@ -21,14 +21,13 @@ use aptos_consensus_types::{
use aptos_infallible::{Mutex, RwLock};
use aptos_network::{
application::storage::PeersAndMetadata,
peer_manager::{
ConnectionRequestSender, PeerManagerNotification, PeerManagerRequest,
PeerManagerRequestSender,
},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{NewNetworkEvents, RpcError, SerializedRequest},
rpc::InboundRpcRequest,
wire::handshake::v1::ProtocolIdSet,
network::{NewNetworkEvents, ReceivedMessage, RpcError, SerializedRequest},
wire::{
handshake::v1::ProtocolIdSet,
messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
},
ProtocolId,
};
Expand Down Expand Up @@ -65,11 +64,8 @@ pub struct NetworkPlayground {
/// These events will usually be handled by the event loop spawned in
/// `ConsensusNetworkImpl`.
///
node_consensus_txs: Arc<
Mutex<
HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>>,
>,
>,
node_consensus_txs:
Arc<Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>>,
/// Nodes' outbound handlers forward their outbound non-rpc messages to this
/// queue.
outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
Expand Down Expand Up @@ -131,12 +127,7 @@ impl NetworkPlayground {
mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
mut outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
node_consensus_txs: Arc<
Mutex<
HashMap<
TwinId,
aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
>,
>,
Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>,
>,
author_to_twin_ids: Arc<RwLock<AuthorToTwinIds>>,
) {
Expand Down Expand Up @@ -175,16 +166,23 @@ impl NetworkPlayground {
let node_consensus_tx =
node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();

let inbound_req = InboundRpcRequest {
protocol_id: outbound_req.protocol_id,
data: outbound_req.data,
res_tx: outbound_req.res_tx,
};

node_consensus_tx
.push(
(src_twin_id.author, ProtocolId::ConsensusRpcBcs),
PeerManagerNotification::RecvRpc(src_twin_id.author, inbound_req),
ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: outbound_req.protocol_id,
request_id: 123,
priority: 0,
raw_request: outbound_req.data.into(),
}),
sender: PeerNetworkId::new(
NetworkId::Validator,
src_twin_id.author,
),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_req.res_tx)),
},
)
.unwrap();
},
Expand All @@ -201,7 +199,7 @@ impl NetworkPlayground {
pub fn add_node(
&mut self,
twin_id: TwinId,
consensus_tx: aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
consensus_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
conn_mgr_reqs_rx: aptos_channels::Receiver<aptos_network::ConnectivityRequest>,
) {
Expand Down Expand Up @@ -231,7 +229,7 @@ impl NetworkPlayground {
&mut self,
src_twin_id: TwinId,
dst_twin_id: TwinId,
msg_notif: PeerManagerNotification,
rmsg: ReceivedMessage,
) -> (Author, ConsensusMsg) {
let node_consensus_tx = self
.node_consensus_txs
Expand All @@ -241,21 +239,24 @@ impl NetworkPlayground {
.clone();

// copy message data
let msg_copy = match &msg_notif {
PeerManagerNotification::RecvMessage(src, msg) => {
let msg: ConsensusMsg = msg.to_message().unwrap();
(*src, msg)
let source_address = rmsg.sender.peer_id();
let consensus_msg = match &rmsg.message {
NetworkMessage::DirectSendMsg(dmsg) => dmsg
.protocol_id
.from_bytes(dmsg.raw_msg.as_slice())
.unwrap(),
wrong_message => {
panic!(
"[network playground] Unexpected ReceivedMessage: {:?}",
wrong_message
);
},
msg_notif => panic!(
"[network playground] Unexpected PeerManagerNotification: {:?}",
msg_notif
),
};
let _ = node_consensus_tx.push(
(src_twin_id.author, ProtocolId::ConsensusDirectSendBcs),
msg_notif,
rmsg,
);
msg_copy
(source_address, consensus_msg)
}

/// Wait for exactly `num_messages` to be enqueued and delivered. Return a
Expand All @@ -276,7 +277,7 @@ impl NetworkPlayground {
let (src_twin_id, net_req) = self.outbound_msgs_rx.next().await
.expect("[network playground] waiting for messages, but message queue has shutdown unexpectedly");

// Convert PeerManagerRequest to corresponding PeerManagerNotification,
// Convert PeerManagerRequest to corresponding ReceivedMessage,
// and extract destination peer
let (dst, msg) = match &net_req {
PeerManagerRequest::SendDirectSend(dst_inner, msg_inner) => {
Expand All @@ -294,11 +295,17 @@ impl NetworkPlayground {

// Deliver and copy message if it's not dropped
if !self.is_message_dropped(&src_twin_id, dst_twin_id, consensus_msg) {
let msg_notif =
PeerManagerNotification::RecvMessage(src_twin_id.author, msg.clone());
let msg_copy = self
.deliver_message(src_twin_id, *dst_twin_id, msg_notif)
.await;
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
rpc_replier: None,
};
let msg_copy = self.deliver_message(src_twin_id, *dst_twin_id, rmsg).await;

// Only insert msg_copy once for twins (if delivered)
if idx == 0 && msg_inspector(&msg_copy) {
Expand Down Expand Up @@ -395,7 +402,7 @@ impl NetworkPlayground {
pub async fn start(mut self) {
// Take the next queued message
while let Some((src_twin_id, net_req)) = self.outbound_msgs_rx.next().await {
// Convert PeerManagerRequest to corresponding PeerManagerNotification,
// Convert PeerManagerRequest to corresponding ReceivedMessage,
// and extract destination peer
let (dst, msg) = match &net_req {
PeerManagerRequest::SendDirectSend(dst_inner, msg_inner) => {
Expand All @@ -410,14 +417,21 @@ impl NetworkPlayground {
let dst_twin_ids = self.get_twin_ids(dst);

for dst_twin_id in dst_twin_ids.iter() {
let msg_notif =
PeerManagerNotification::RecvMessage(src_twin_id.author, msg.clone());
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
rpc_replier: None,
};
let consensus_msg = msg.to_message().unwrap();

// Deliver and copy message it if it's not dropped
if !self.is_message_dropped(&src_twin_id, dst_twin_id, consensus_msg) {
self.deliver_message(src_twin_id, *dst_twin_id, msg_notif)
.await;
self.deliver_message(src_twin_id, *dst_twin_id, rmsg).await;
}
}
}
Expand Down Expand Up @@ -531,7 +545,6 @@ mod tests {
storage::PeersAndMetadata,
},
protocols::{
direct_send::Message,
network,
network::{NetworkEvents, NewNetworkSender},
},
Expand Down Expand Up @@ -849,10 +862,16 @@ mod tests {

let peer_id = PeerId::random();
let protocol_id = ProtocolId::ConsensusDirectSendBcs;
let bad_msg = PeerManagerNotification::RecvMessage(peer_id, Message {
protocol_id,
mdata: Bytes::from_static(b"\xde\xad\xbe\xef"),
});
let bad_msg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: 0,
raw_msg: Bytes::from_static(b"\xde\xad\xbe\xef").into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

peer_mgr_notifs_tx
.push((peer_id, protocol_id), bad_msg)
Expand All @@ -864,11 +883,17 @@ mod tests {

let protocol_id = ProtocolId::ConsensusRpcJson;
let (res_tx, _res_rx) = oneshot::channel();
let liveness_check_msg = PeerManagerNotification::RecvRpc(peer_id, InboundRpcRequest {
protocol_id,
data: Bytes::from(serde_json::to_vec(&liveness_check_msg).unwrap()),
res_tx,
});
let liveness_check_msg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id,
request_id: 0, // TODO: seq?
priority: 0,
raw_request: Bytes::from(serde_json::to_vec(&liveness_check_msg).unwrap()).into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(res_tx)),
};

peer_mgr_notifs_tx
.push((peer_id, protocol_id), liveness_check_msg)
Expand Down
39 changes: 28 additions & 11 deletions mempool/src/tests/multi_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ use aptos_config::{
};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
peer_manager::{PeerManagerNotification, PeerManagerRequest},
peer_manager::PeerManagerRequest,
protocols::{
network::ReceivedMessage,
wire::messaging::v1::{DirectSendMsg, NetworkMessage},
},
ProtocolId,
};
use aptos_types::{transaction::SignedTransaction, PeerId};
Expand Down Expand Up @@ -362,12 +366,18 @@ impl TestHarness {
let receiver_id =
*self.peer_to_node_id.get(&lookup_peer_network_id).unwrap();
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

receiver.send_network_req(
network_id,
ProtocolId::MempoolDirectSend,
PeerManagerNotification::RecvMessage(sender_peer_id, msg),
);
receiver.send_network_req(network_id, ProtocolId::MempoolDirectSend, rmsg);
receiver.wait_for_event(SharedMempoolNotification::NewTransactions);

// Verify transaction was inserted into Mempool
Expand Down Expand Up @@ -432,11 +442,18 @@ impl TestHarness {
let receiver_id =
*self.peer_to_node_id.get(&lookup_peer_network_id).unwrap();
let receiver = self.mut_node(&receiver_id);
receiver.send_network_req(
network_id,
ProtocolId::MempoolDirectSend,
PeerManagerNotification::RecvMessage(sender_peer_id, msg),
);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

receiver.send_network_req(network_id, ProtocolId::MempoolDirectSend, rmsg);
},
request => panic!(
"did not receive expected broadcast ACK, instead got {:?}",
Expand Down
23 changes: 9 additions & 14 deletions mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use aptos_network::{
interface::{NetworkClient, NetworkServiceEvents},
storage::PeersAndMetadata,
},
peer_manager::{
ConnectionRequestSender, PeerManagerNotification, PeerManagerRequest,
PeerManagerRequestSender,
},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender},
network::{
NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage,
},
wire::handshake::v1::ProtocolId::MempoolDirectSend,
},
transport::ConnectionMetadata,
Expand Down Expand Up @@ -449,12 +448,12 @@ impl Node {
.get_next_network_req(runtime)
}

/// Send network request `PeerManagerNotification` from a remote peer to the local node
/// Send network request `ReceivedMessage` from a remote peer to the local node
pub fn send_network_req(
&mut self,
network_id: NetworkId,
protocol: ProtocolId,
notif: PeerManagerNotification,
notif: ReceivedMessage,
) {
self.get_network_interface(network_id)
.send_network_req(protocol, notif);
Expand All @@ -467,20 +466,16 @@ pub struct NodeNetworkInterface {
/// Peer request receiver for messages
pub(crate) network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
/// Peer notification sender for sending outgoing messages to other peers
pub(crate) network_notifs_tx:
aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
pub(crate) network_notifs_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
}

impl NodeNetworkInterface {
fn get_next_network_req(&mut self, runtime: Arc<Runtime>) -> PeerManagerRequest {
runtime.block_on(self.network_reqs_rx.next()).unwrap()
}

fn send_network_req(&mut self, protocol: ProtocolId, message: PeerManagerNotification) {
let remote_peer_id = match &message {
PeerManagerNotification::RecvRpc(peer_id, _) => *peer_id,
PeerManagerNotification::RecvMessage(peer_id, _) => *peer_id,
};
fn send_network_req(&mut self, protocol: ProtocolId, message: ReceivedMessage) {
let remote_peer_id = message.sender.peer_id();

self.network_notifs_tx
.push((remote_peer_id, protocol), message)
Expand Down
Loading
Loading