Skip to content

Commit

Permalink
fix(core): fixes stale chain metadata being sent to listening state
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Dec 12, 2022
1 parent dc2cd82 commit f3d2c9c
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 374 deletions.
11 changes: 10 additions & 1 deletion base_layer/core/src/base_node/chain_metadata_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,19 @@ impl Display for PeerChainMetadata {

#[derive(Debug)]
pub enum ChainMetadataEvent {
PeerChainMetadataReceived(Vec<PeerChainMetadata>),
PeerChainMetadataReceived(PeerChainMetadata),
NetworkSilence,
}

impl ChainMetadataEvent {
pub fn peer_metadata(&self) -> Option<PeerChainMetadata> {
match self {
Self::PeerChainMetadataReceived(metadata) => Some(metadata.clone()),
_ => None,
}
}
}

#[derive(Clone)]
pub struct ChainMetadataHandle {
event_stream: broadcast::Sender<Arc<ChainMetadataEvent>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_comms::connectivity::ConnectivityRequester;
use tari_p2p::services::liveness::LivenessHandle;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;
Expand All @@ -38,17 +37,16 @@ impl ServiceInitializer for ChainMetadataServiceInitializer {
async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
debug!(target: LOG_TARGET, "Initializing Chain Metadata Service");
// Buffer size set to 1 because only the most recent metadata is applicable
let (publisher, _) = broadcast::channel(1);
let (publisher, _) = broadcast::channel(20);

let handle = ChainMetadataHandle::new(publisher.clone());
context.register_handle(handle);

context.spawn_until_shutdown(|handles| {
let liveness = handles.expect_handle::<LivenessHandle>();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();

ChainMetadataService::new(liveness, base_node, connectivity, publisher).run()
ChainMetadataService::new(liveness, base_node, publisher).run()
});

debug!(target: LOG_TARGET, "Chain Metadata Service initialized");
Expand Down
166 changes: 27 additions & 139 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ use num_format::{Locale, ToFormattedString};
use prost::Message;
use tari_common::log_if_error;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{
connectivity::{ConnectivityEvent, ConnectivityRequester},
message::MessageExt,
};
use tari_comms::message::MessageExt;
use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent};
use tokio::sync::broadcast;

Expand All @@ -49,8 +46,6 @@ const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3;
pub(super) struct ChainMetadataService {
liveness: LivenessHandle,
base_node: LocalNodeCommsInterface,
peer_chain_metadata: Vec<PeerChainMetadata>,
connectivity: ConnectivityRequester,
event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
number_of_rounds_no_pings: u16,
}
Expand All @@ -64,14 +59,11 @@ impl ChainMetadataService {
pub fn new(
liveness: LivenessHandle,
base_node: LocalNodeCommsInterface,
connectivity: ConnectivityRequester,
event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
) -> Self {
Self {
liveness,
base_node,
peer_chain_metadata: Vec::new(),
connectivity,
event_publisher,
number_of_rounds_no_pings: 0,
}
Expand All @@ -81,7 +73,6 @@ impl ChainMetadataService {
pub async fn run(mut self) {
let mut liveness_event_stream = self.liveness.get_event_stream();
let mut block_event_stream = self.base_node.get_block_event_stream();
let mut connectivity_events = self.connectivity.get_event_subscription();

log_if_error!(
target: LOG_TARGET,
Expand All @@ -108,29 +99,10 @@ impl ChainMetadataService {
);
},

Ok(event) = connectivity_events.recv() => {
self.handle_connectivity_event(event);
}
}
}
}

fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
use ConnectivityEvent::{PeerBanned, PeerDisconnected};
match event {
PeerDisconnected(node_id) | PeerBanned(node_id) => {
if let Some(pos) = self.peer_chain_metadata.iter().position(|p| *p.node_id() == node_id) {
debug!(
target: LOG_TARGET,
"Removing disconnected/banned peer `{}` from chain metadata list ", node_id
);
self.peer_chain_metadata.remove(pos);
}
},
_ => {},
}
}

/// Handle BlockEvents
async fn handle_block_event(&mut self, event: &BlockEvent) -> Result<(), ChainMetadataSyncError> {
match event {
Expand Down Expand Up @@ -166,8 +138,7 @@ impl ChainMetadataService {
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
self.send_chain_metadata_to_event_publisher(event).await?;
}
},
// Received a pong, check if our neighbour sent it and it contains ChainMetadata
Expand All @@ -179,8 +150,7 @@ impl ChainMetadataService {
);
self.number_of_rounds_no_pings = 0;
if event.metadata.has(MetadataKey::ChainMetadata) {
self.collect_chain_state_from_ping_pong(event)?;
self.send_chain_metadata_to_event_publisher().await?;
self.send_chain_metadata_to_event_publisher(event).await?;
}
},
// New ping round has begun
Expand All @@ -197,10 +167,6 @@ impl ChainMetadataService {
self.number_of_rounds_no_pings = 0;
}
}
// Ensure that we're waiting for the correct amount of peers to respond
// and have allocated space for their replies

self.resize_chainstate_buffer(*num_peers);
},
}

Expand All @@ -212,31 +178,10 @@ impl ChainMetadataService {
Ok(())
}

async fn send_chain_metadata_to_event_publisher(&mut self) -> Result<(), ChainMetadataSyncError> {
// send only fails if there are no subscribers.
let _size = self
.event_publisher
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
self.peer_chain_metadata.clone(),
)));

Ok(())
}

fn resize_chainstate_buffer(&mut self, n: usize) {
match self.peer_chain_metadata.capacity() {
cap if n > cap => {
let additional = n - self.peer_chain_metadata.len();
self.peer_chain_metadata.reserve_exact(additional);
},
cap if n < cap => {
self.peer_chain_metadata.shrink_to(cap);
},
_ => {},
}
}

fn collect_chain_state_from_ping_pong(&mut self, event: &PingPongEvent) -> Result<(), ChainMetadataSyncError> {
async fn send_chain_metadata_to_event_publisher(
&mut self,
event: &PingPongEvent,
) -> Result<(), ChainMetadataSyncError> {
let chain_metadata_bytes = event
.metadata
.get(MetadataKey::ChainMetadata)
Expand All @@ -252,19 +197,15 @@ impl ChainMetadataService {
chain_metadata.accumulated_difficulty().to_formatted_string(&Locale::en),
);

if let Some(pos) = self
.peer_chain_metadata
.iter()
.position(|peer_chainstate| *peer_chainstate.node_id() == event.node_id)
{
self.peer_chain_metadata.remove(pos);
}
let peer_chain_metadata = PeerChainMetadata::new(event.node_id.clone(), chain_metadata, event.latency);

// send only fails if there are no subscribers.
let _size = self
.event_publisher
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
peer_chain_metadata,
)));

self.peer_chain_metadata.push(PeerChainMetadata::new(
event.node_id.clone(),
chain_metadata,
event.latency,
));
Ok(())
}
}
Expand All @@ -274,13 +215,7 @@ mod test {
use std::convert::TryInto;

use futures::StreamExt;
use tari_comms::{
peer_manager::NodeId,
test_utils::{
mocks::{create_connectivity_mock, ConnectivityManagerMockState},
node_identity::build_many_node_identities,
},
};
use tari_comms::peer_manager::NodeId;
use tari_p2p::services::liveness::{
mock::{create_p2p_liveness_mock, LivenessMockState},
LivenessRequest,
Expand Down Expand Up @@ -323,33 +258,24 @@ mod test {
fn setup() -> (
ChainMetadataService,
LivenessMockState,
ConnectivityManagerMockState,
reply_channel::TryReceiver<NodeCommsRequest, NodeCommsResponse, CommsInterfaceError>,
broadcast::Receiver<Arc<ChainMetadataEvent>>,
) {
let (liveness_handle, mock, _) = create_p2p_liveness_mock(1);
let liveness_mock_state = mock.get_mock_state();
task::spawn(mock.run());

let (base_node, base_node_receiver) = create_base_node_nci();
let (publisher, _) = broadcast::channel(1);

let (connectivity, mock) = create_connectivity_mock();
let connectivity_mock_state = mock.get_shared_state();
task::spawn(mock.run());
let (publisher, event_rx) = broadcast::channel(10);

let service = ChainMetadataService::new(liveness_handle, base_node, connectivity, publisher);
let service = ChainMetadataService::new(liveness_handle, base_node, publisher);

(
service,
liveness_mock_state,
connectivity_mock_state,
base_node_receiver,
)
(service, liveness_mock_state, base_node_receiver, event_rx)
}

#[tokio::test]
async fn update_liveness_chain_metadata() {
let (mut service, liveness_mock_state, _, mut base_node_receiver) = setup();
let (mut service, liveness_mock_state, mut base_node_receiver, _) = setup();

let mut proto_chain_metadata = create_sample_proto_chain_metadata();
proto_chain_metadata.height_of_longest_chain = Some(123);
Expand All @@ -375,7 +301,7 @@ mod test {
}
#[tokio::test]
async fn handle_liveness_event_ok() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut events_rx) = setup();

let mut metadata = Metadata::new();
let proto_chain_metadata = create_sample_proto_chain_metadata();
Expand All @@ -388,57 +314,19 @@ mod test {
latency: None,
};

// To prevent the chain metadata buffer being flushed after receiving a single pong event,
// extend it's capacity to 2
service.peer_chain_metadata.reserve_exact(2);
let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
assert_eq!(service.peer_chain_metadata.len(), 1);
let metadata = service.peer_chain_metadata.remove(0);
let metadata = events_rx.recv().await.unwrap().peer_metadata().unwrap();
assert_eq!(*metadata.node_id(), node_id);
assert_eq!(
metadata.claimed_chain_metadata().height_of_longest_chain(),
proto_chain_metadata.height_of_longest_chain.unwrap()
);
}

#[tokio::test]
async fn handle_liveness_event_banned_peer() {
let (mut service, _, _, _) = setup();

let mut metadata = Metadata::new();
let proto_chain_metadata = create_sample_proto_chain_metadata();
metadata.insert(MetadataKey::ChainMetadata, proto_chain_metadata.to_encoded_bytes());

service.peer_chain_metadata.reserve_exact(3);

let nodes = build_many_node_identities(2, Default::default());
for node in &nodes {
let pong_event = PingPongEvent {
metadata: metadata.clone(),
node_id: node.node_id().clone(),
latency: None,
};

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
}

assert!(service
.peer_chain_metadata
.iter()
.any(|p| p.node_id() == nodes[0].node_id()));
service.handle_connectivity_event(ConnectivityEvent::PeerBanned(nodes[0].node_id().clone()));
// Check that banned peer was removed
assert!(service
.peer_chain_metadata
.iter()
.all(|p| p.node_id() != nodes[0].node_id()));
}

#[tokio::test]
async fn handle_liveness_event_no_metadata() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut event_rx) = setup();

let metadata = Metadata::new();
let node_id = NodeId::new();
Expand All @@ -450,12 +338,12 @@ mod test {

let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
service.handle_liveness_event(&sample_event).await.unwrap();
assert!(service.peer_chain_metadata.is_empty());
assert!(event_rx.try_recv().is_err());
}

#[tokio::test]
async fn handle_liveness_event_bad_metadata() {
let (mut service, _, _, _) = setup();
let (mut service, _, _, mut event_rx) = setup();

let mut metadata = Metadata::new();
metadata.insert(MetadataKey::ChainMetadata, b"no-good".to_vec());
Expand All @@ -469,6 +357,6 @@ mod test {
let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
let err = service.handle_liveness_event(&sample_event).await.unwrap_err();
unpack_enum!(ChainMetadataSyncError::DecodeError(_err) = err);
assert_eq!(service.peer_chain_metadata.len(), 0);
assert!(event_rx.try_recv().is_err());
}
}
Loading

0 comments on commit f3d2c9c

Please sign in to comment.