Skip to content

Commit

Permalink
fix mempool tests
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Jul 15, 2024
1 parent ddba8aa commit 834ab52
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 42 deletions.
1 change: 0 additions & 1 deletion mempool/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod core_mempool_test;
#[cfg(test)]
mod integration_tests;
#[cfg(test)]
#[cfg(wrong_network_abstraction)]
mod multi_node_test;
#[cfg(test)]
mod node;
Expand Down
39 changes: 28 additions & 11 deletions mempool/src/tests/multi_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ use aptos_config::{
};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
peer_manager::{PeerManagerNotification, PeerManagerRequest},
peer_manager::PeerManagerRequest,
protocols::{
network::ReceivedMessage,
wire::messaging::v1::{DirectSendMsg, NetworkMessage},
},
ProtocolId,
};
use aptos_types::{transaction::SignedTransaction, PeerId};
Expand Down Expand Up @@ -362,12 +366,18 @@ impl TestHarness {
let receiver_id =
*self.peer_to_node_id.get(&lookup_peer_network_id).unwrap();
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

receiver.send_network_req(
network_id,
ProtocolId::MempoolDirectSend,
PeerManagerNotification::RecvMessage(sender_peer_id, msg),
);
receiver.send_network_req(network_id, ProtocolId::MempoolDirectSend, rmsg);
receiver.wait_for_event(SharedMempoolNotification::NewTransactions);

// Verify transaction was inserted into Mempool
Expand Down Expand Up @@ -432,11 +442,18 @@ impl TestHarness {
let receiver_id =
*self.peer_to_node_id.get(&lookup_peer_network_id).unwrap();
let receiver = self.mut_node(&receiver_id);
receiver.send_network_req(
network_id,
ProtocolId::MempoolDirectSend,
PeerManagerNotification::RecvMessage(sender_peer_id, msg),
);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

receiver.send_network_req(network_id, ProtocolId::MempoolDirectSend, rmsg);
},
request => panic!(
"did not receive expected broadcast ACK, instead got {:?}",
Expand Down
77 changes: 47 additions & 30 deletions mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,54 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
core_mempool::{CoreMempool, TimelineState},
network::MempoolSyncMsg,
shared_mempool::{start_shared_mempool, types::SharedMempoolNotification},
tests::common::TestTransaction,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{
config::{PeerRole, RoleType},
config::{Identity, NodeConfig, PeerRole, RoleType},
network_id::{NetworkId, PeerNetworkId},
};
use aptos_types::PeerId;
use aptos_crypto::{x25519::PrivateKey, Uniform};
use aptos_event_notifications::{ReconfigNotification, ReconfigNotificationListener};
use aptos_infallible::{Mutex, MutexGuard, RwLock};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
application::{
interface::{NetworkClient, NetworkServiceEvents},
storage::PeersAndMetadata,
},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{
NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage,
},
wire::handshake::v1::ProtocolId::MempoolDirectSend,
},
transport::ConnectionMetadata,
ProtocolId,
};
use aptos_storage_interface::mock::MockDbReaderWriter;
use aptos_types::{
on_chain_config::{InMemoryOnChainConfig, OnChainConfigPayload},
PeerId,
};
use aptos_vm_validator::mocks::mock_vm_validator::MockVMValidator;
use enum_dispatch::enum_dispatch;
use std::collections::HashSet;
use futures::{
channel::mpsc::{self, unbounded, UnboundedReceiver},
FutureExt, StreamExt,
};
use rand::rngs::StdRng;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::runtime::Runtime;

#[cfg(wrong_network_abstraction)]
type MempoolNetworkHandle = (
NetworkId,
NetworkSender<MempoolSyncMsg>,
Expand All @@ -25,7 +64,6 @@ pub struct NodeId {
num: u32,
}

#[cfg(wrong_network_abstraction)]
impl NodeId {
pub(crate) fn new(node_type: NodeType, num: u32) -> Self {
NodeId { node_type, num }
Expand All @@ -34,14 +72,10 @@ impl NodeId {

/// Yet another type, to determine the differences between
/// Validators, ValidatorFullNodes, and FullNodes
/// TODO: much code is currently disabled under [cfg(wrong_network_abstraction)], NodeType may or may not come back when that changes?
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum NodeType {
#[allow(unused)]
Validator,
#[allow(unused)]
ValidatorFullNode,
#[allow(unused)]
FullNode,
}

Expand Down Expand Up @@ -103,7 +137,6 @@ pub struct ValidatorNodeInfo {
vfn_peer_id: PeerId,
}

#[cfg(wrong_network_abstraction)]
impl ValidatorNodeInfo {
fn new(peer_id: PeerId, vfn_peer_id: PeerId) -> Self {
ValidatorNodeInfo {
Expand Down Expand Up @@ -141,7 +174,6 @@ pub struct ValidatorFullNodeInfo {
vfn_peer_id: PeerId,
}

#[cfg(wrong_network_abstraction)]
impl ValidatorFullNodeInfo {
fn new(peer_id: PeerId, vfn_peer_id: PeerId) -> Self {
ValidatorFullNodeInfo {
Expand Down Expand Up @@ -179,7 +211,6 @@ pub struct FullNodeInfo {
peer_role: PeerRole,
}

#[cfg(wrong_network_abstraction)]
impl FullNodeInfo {
fn new(peer_id: PeerId, peer_role: PeerRole) -> Self {
FullNodeInfo { peer_id, peer_role }
Expand Down Expand Up @@ -209,7 +240,6 @@ impl NodeInfoTrait for FullNodeInfo {
}

/// Provides a `NodeInfo` and `NodeConfig` for a validator
#[cfg(wrong_network_abstraction)]
pub fn validator_config(rng: &mut StdRng) -> (ValidatorNodeInfo, NodeConfig) {
let config = NodeConfig::generate_random_config_with_template(
&NodeConfig::get_default_validator_config(),
Expand All @@ -228,7 +258,6 @@ pub fn validator_config(rng: &mut StdRng) -> (ValidatorNodeInfo, NodeConfig) {
}

/// Provides a `NodeInfo` and `NodeConfig` for a ValidatorFullNode
#[cfg(wrong_network_abstraction)]
pub fn vfn_config(rng: &mut StdRng, peer_id: PeerId) -> (ValidatorFullNodeInfo, NodeConfig) {
let mut vfn_config = NodeConfig::generate_random_config_with_template(
&NodeConfig::get_default_vfn_config(),
Expand Down Expand Up @@ -262,7 +291,6 @@ pub fn vfn_config(rng: &mut StdRng, peer_id: PeerId) -> (ValidatorFullNodeInfo,
}

/// Provides a `NodeInfo` and `NodeConfig` for a public full node
#[cfg(wrong_network_abstraction)]
pub fn public_full_node_config(
rng: &mut StdRng,
peer_role: PeerRole,
Expand All @@ -282,7 +310,6 @@ pub fn public_full_node_config(
}

/// A struct representing a node, it's network interfaces, mempool, and a mempool event subscriber
#[cfg(wrong_network_abstraction)]
pub struct Node {
/// The identifying Node
node_info: NodeInfo,
Expand All @@ -299,7 +326,6 @@ pub struct Node {
}

/// Reimplement `NodeInfoTrait` for simplicity
#[cfg(wrong_network_abstraction)]
impl NodeInfoTrait for Node {
fn supported_networks(&self) -> Vec<NetworkId> {
self.node_info.supported_networks()
Expand All @@ -318,7 +344,6 @@ impl NodeInfoTrait for Node {
}
}

#[cfg(wrong_network_abstraction)]
impl Node {
/// Sets up a single node by starting up mempool and any network handles
pub fn new(node: NodeInfo, config: NodeConfig) -> Node {
Expand Down Expand Up @@ -423,12 +448,12 @@ impl Node {
.get_next_network_req(runtime)
}

/// Send network request `PeerManagerNotification` from a remote peer to the local node
/// Send network request `ReceivedMessage` from a remote peer to the local node
pub fn send_network_req(
&mut self,
network_id: NetworkId,
protocol: ProtocolId,
notif: PeerManagerNotification,
notif: ReceivedMessage,
) {
self.get_network_interface(network_id)
.send_network_req(protocol, notif);
Expand All @@ -437,25 +462,20 @@ impl Node {

/// A simplistic view of the entire network stack for a given `NetworkId`
/// Allows us to mock out the network without dealing with the details
#[cfg(wrong_network_abstraction)]
pub struct NodeNetworkInterface {
/// Peer request receiver for messages
pub(crate) network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
/// Peer notification sender for sending outgoing messages to other peers
pub(crate) network_notifs_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
}

#[cfg(wrong_network_abstraction)]
impl NodeNetworkInterface {
fn get_next_network_req(&mut self, runtime: Arc<Runtime>) -> PeerManagerRequest {
runtime.block_on(self.network_reqs_rx.next()).unwrap()
}

fn send_network_req(&mut self, protocol: ProtocolId, message: PeerManagerNotification) {
let remote_peer_id = match &message {
PeerManagerNotification::RecvRpc(peer_id, _) => *peer_id,
PeerManagerNotification::RecvMessage(peer_id, _) => *peer_id,
};
fn send_network_req(&mut self, protocol: ProtocolId, message: ReceivedMessage) {
let remote_peer_id = message.sender.peer_id();

self.network_notifs_tx
.push((remote_peer_id, protocol), message)
Expand All @@ -466,7 +486,6 @@ impl NodeNetworkInterface {
// Below here are static functions to help build a new `Node`

/// Sets up the network handles for a `Node`
#[cfg(wrong_network_abstraction)]
fn setup_node_network_interfaces(
node: &NodeInfo,
) -> (
Expand Down Expand Up @@ -510,7 +529,6 @@ fn setup_node_network_interfaces(
}

/// Builds a single network interface with associated queues, and attaches it to the top level network
#[cfg(wrong_network_abstraction)]
fn setup_node_network_interface(
peer_network_id: PeerNetworkId,
) -> (NodeNetworkInterface, MempoolNetworkHandle) {
Expand All @@ -537,7 +555,6 @@ fn setup_node_network_interface(
}

/// Starts up the mempool resources for a single node
#[cfg(wrong_network_abstraction)]
fn start_node_mempool(
config: NodeConfig,
network_client: NetworkClient<MempoolSyncMsg>,
Expand Down

0 comments on commit 834ab52

Please sign in to comment.