From de7245df20e3fa1ed3e9cb440314902ff81ba991 Mon Sep 17 00:00:00 2001 From: SW van Heerden Date: Tue, 27 Sep 2022 08:18:49 +0200 Subject: [PATCH] fix tests --- .gitignore | 3 +- .../core/src/mempool/sync_protocol/mod.rs | 29 +++++------ .../core/src/mempool/sync_protocol/test.rs | 48 +++++++++---------- .../test_utils/mocks/connectivity_manager.rs | 11 +++++ 4 files changed, 50 insertions(+), 41 deletions(-) diff --git a/.gitignore b/.gitignore index 9af37fbe5b..eba14fa03a 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,5 @@ buildtools/Output/ clients/base_node_grpc_client/package-lock.json clients/validator_node_grpc_client/package-lock.json -clients/wallet_grpc_client/package-lock.json \ No newline at end of file +clients/wallet_grpc_client/package-lock.json +pie/ diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index 0f76c7b149..08ad3bfe36 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -185,10 +185,6 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static async fn handle_block_event(&mut self, block_event: &BlockEvent) { use BlockEvent::{BlockSyncComplete, ValidBlockAdded}; - if self.permits.available_permits() < 1 { - // Sync is already in progress, so we should not bother trying to sync. - return; - } match block_event { ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => { if added.len() < self.config.block_sync_trigger { @@ -205,21 +201,24 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static return; }, } - // we need to make sure the service can start a sync - if self.num_synched.load(Ordering::Acquire) >= self.config.initial_sync_num_peers { - self.num_synched.fetch_sub(1, Ordering::Release); - } - let connection = match self + // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till + // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the + // initial_sync_num_peers + self.num_synched.store(0, Ordering::Release); + let connections = match self .connectivity - .select_connections(ConnectivitySelection::random_nodes(1, vec![])) + .select_connections(ConnectivitySelection::random_nodes( + self.config.initial_sync_num_peers, + vec![], + )) .await { - Ok(mut v) => { + Ok(v) => { if v.is_empty() { - error!(target: LOG_TARGET, "Mempool sync could not get a peer to sync to"); + error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to"); return; }; - v.pop().unwrap() + v }, Err(e) => { error!( @@ -229,7 +228,9 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static return; }, }; - self.spawn_initiator_protocol(connection).await; + for connection in connections { + self.spawn_initiator_protocol(connection).await; + } } fn is_synched(&self) -> bool { diff --git a/base_layer/core/src/mempool/sync_protocol/test.rs b/base_layer/core/src/mempool/sync_protocol/test.rs index 82940122f2..281228587a 100644 --- a/base_layer/core/src/mempool/sync_protocol/test.rs +++ b/base_layer/core/src/mempool/sync_protocol/test.rs @@ -25,14 +25,14 @@ use std::{fmt, io, iter::repeat_with, sync::Arc}; use futures::{Sink, SinkExt, Stream, StreamExt}; use tari_common::configuration::Network; use tari_comms::{ - connectivity::{ConnectivityEvent, ConnectivityEventTx}, + connectivity::ConnectivityEvent, framing, memsocket::MemorySocket, message::MessageExt, peer_manager::PeerFeatures, protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx}, test_utils::{ - mocks::{create_connectivity_mock, create_peer_connection_mock_pair}, + mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState}, node_identity::build_node_identity, }, Bytes, @@ -83,32 +83,37 @@ async fn setup( num_txns: usize, ) -> ( ProtocolNotificationTx, - ConnectivityEventTx, + ConnectivityManagerMockState, Mempool, Vec, ) { let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1); - let (connectivity_events_tx, _) = broadcast::channel(10); let (mempool, transactions) = new_mempool_with_transactions(num_txns).await; - let (connectivity, _) = create_connectivity_mock(); + let (connectivity, connectivity_manager_mock) = create_connectivity_mock(); + let connectivity_manager_mock_state = connectivity_manager_mock.spawn(); let (block_event_sender, _) = broadcast::channel(1); let block_receiver = block_event_sender.subscribe(); - let protocol = MempoolSyncProtocol::new( Default::default(), protocol_notif_rx, mempool.clone(), - connectivity,block_receiver + connectivity, + block_receiver, ); task::spawn(protocol.run()); - - (protocol_notif_tx, connectivity_events_tx, mempool, transactions) + connectivity_manager_mock_state.wait_until_event_receivers_ready().await; + ( + protocol_notif_tx, + connectivity_manager_mock_state, + mempool, + transactions, + ) } #[tokio::test] async fn empty_set() { - let (_, connectivity_events_tx, mempool1, _) = setup(0).await; + let (_, connectivity_manager_state, mempool1, _) = setup(0).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -116,9 +121,7 @@ async fn empty_set() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -138,7 +141,7 @@ async fn empty_set() { #[tokio::test] async fn synchronise() { - let (_, connectivity_events_tx, mempool1, transactions1) = setup(5).await; + let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -146,9 +149,7 @@ async fn synchronise() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -172,17 +173,14 @@ async fn synchronise() { #[tokio::test] async fn duplicate_set() { - let (_, connectivity_events_tx, mempool1, transactions1) = setup(2).await; - + let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let (_node1_conn, node1_mock, node2_conn, _) = create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -276,7 +274,7 @@ async fn initiator_messages() { #[tokio::test] async fn responder_messages() { - let (_, connectivity_events_tx, _, transactions1) = setup(1).await; + let (_, connectivity_manager_state, _, transactions1) = setup(1).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -284,9 +282,7 @@ async fn responder_messages() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let mut framed = framing::canonical(substream, MAX_FRAME_SIZE); diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs index ae29b9211c..9f89e6a34e 100644 --- a/comms/core/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs @@ -75,6 +75,17 @@ impl ConnectivityManagerMockState { } } + pub async fn wait_until_event_receivers_ready(&self) { + let mut timeout = 0; + while self.event_tx.receiver_count() == 0 { + time::sleep(Duration::from_millis(10)).await; + timeout += 10; + if timeout > 5000 { + panic!("Event receiver not ready after 5 secs"); + } + } + } + async fn add_call(&self, call_str: String) { self.with_state(|state| state.calls.push(call_str)).await }