From c2a90bc04a77cd7a07f85da600b4444eca647623 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Fri, 11 Oct 2024 12:38:37 +0200 Subject: [PATCH] Expose peer metdata version through incoming messagtes Summary: This allows incoming messages to expose metadata of remote peer --- crates/core/src/network/connection.rs | 2 + crates/core/src/network/connection_manager.rs | 4 +- crates/core/src/network/rpc_router.rs | 6 ++- crates/core/src/network/types.rs | 38 ++++++++++++++++++- 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs index 7e7dd7c14..ce868a5d7 100644 --- a/crates/core/src/network/connection.rs +++ b/crates/core/src/network/connection.rs @@ -339,6 +339,7 @@ pub mod test_util { use crate::network::MessageHandler; use crate::network::MessageRouterBuilder; use crate::network::NetworkError; + use crate::network::PeerMetadataVersion; use crate::network::ProtocolError; use crate::network::TransportConnect; use crate::TaskCenter; @@ -735,6 +736,7 @@ pub mod test_util { self.connection.downgrade(), header.msg_id, header.in_response_to, + PeerMetadataVersion::from(header), ), self.connection.protocol_version, ) diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 889d39de9..1c24e4616 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -39,7 +39,7 @@ use super::transport_connector::TransportConnect; use super::{Handler, MessageRouter}; use crate::metadata::Urgency; use crate::network::handshake::{negotiate_protocol_version, wait_for_hello}; -use crate::network::Incoming; +use crate::network::{Incoming, PeerMetadataVersion}; use crate::Metadata; use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind}; @@ -592,6 +592,7 @@ where connection.downgrade(), header.msg_id, header.in_response_to, + PeerMetadataVersion::from(header), ), connection.protocol_version, ) @@ -641,6 +642,7 @@ where WeakConnection::new_closed(peer_node_id), header.msg_id, header.in_response_to, + PeerMetadataVersion::from(header), ), protocol_version, ) diff --git a/crates/core/src/network/rpc_router.rs b/crates/core/src/network/rpc_router.rs index 8a554818b..b32ed7edf 100644 --- a/crates/core/src/network/rpc_router.rs +++ b/crates/core/src/network/rpc_router.rs @@ -370,7 +370,7 @@ where #[cfg(test)] mod test { - use crate::network::WeakConnection; + use crate::network::{PeerMetadataVersion, WeakConnection}; use super::*; use futures::future::join_all; @@ -438,6 +438,7 @@ mod test { WeakConnection::new_closed(GenerationalNodeId::new(1, 1)), 1, Some(42), + PeerMetadataVersion::default(), )) .await; @@ -450,6 +451,7 @@ mod test { WeakConnection::new_closed(GenerationalNodeId::new(1, 1)), 1, Some(42), + PeerMetadataVersion::default(), )); assert!(maybe_msg.is_some()); @@ -464,6 +466,7 @@ mod test { WeakConnection::new_closed(GenerationalNodeId::new(1, 1)), 1, Some(1), + PeerMetadataVersion::default(), )) .await; @@ -505,6 +508,7 @@ mod test { WeakConnection::new_closed(GenerationalNodeId::new(0, 0)), 1, Some(idx), + PeerMetadataVersion::default(), )); }); diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs index 9da400520..07238d8d5 100644 --- a/crates/core/src/network/types.rs +++ b/crates/core/src/network/types.rs @@ -14,7 +14,8 @@ use std::time::{Duration, Instant}; use restate_types::net::codec::{Targeted, WireEncode}; use restate_types::net::RpcRequest; -use restate_types::{GenerationalNodeId, NodeId}; +use restate_types::protobuf::node::Header; +use restate_types::{GenerationalNodeId, NodeId, Version}; use crate::with_metadata; @@ -69,12 +70,31 @@ struct MsgMeta { in_response_to: Option, } +#[derive(Clone, Debug, Copy, Default)] +pub struct PeerMetadataVersion { + pub logs: Option, + pub nodes_config: Option, + pub partition_table: Option, + pub schema: Option, +} + +impl From
for PeerMetadataVersion { + fn from(value: Header) -> Self { + Self { + logs: value.my_logs_version.map(Version::from), + nodes_config: value.my_nodes_config_version.map(Version::from), + partition_table: value.my_partition_table_version.map(Version::from), + schema: value.my_schema_version.map(Version::from), + } + } +} /// A wrapper for incoming messages that includes the sender information #[derive(Debug, Clone)] pub struct Incoming { meta: MsgMeta, connection: WeakConnection, body: M, + metadata_version: PeerMetadataVersion, } impl Incoming { @@ -83,6 +103,7 @@ impl Incoming { connection: WeakConnection, msg_id: u64, in_response_to: Option, + metadata_version: PeerMetadataVersion, ) -> Self { Self { connection, @@ -91,13 +112,20 @@ impl Incoming { msg_id, in_response_to, }, + metadata_version, } } #[cfg(any(test, feature = "test-util"))] pub fn for_testing(connection: WeakConnection, body: M, in_response_to: Option) -> Self { let msg_id = generate_msg_id(); - Self::from_parts(body, connection, msg_id, in_response_to) + Self::from_parts( + body, + connection, + msg_id, + in_response_to, + PeerMetadataVersion::default(), + ) } pub fn peer(&self) -> &GenerationalNodeId { @@ -127,6 +155,10 @@ impl Incoming { self.meta.in_response_to } + pub fn metadata_version(&self) -> &PeerMetadataVersion { + &self.metadata_version + } + /// Creates a reciprocal for this incoming message without consuming it. This will internall /// clone the original connection reference. pub fn create_reciprocal(&self) -> Reciprocal { @@ -138,6 +170,7 @@ impl Incoming { connection: self.connection, body: f(self.body)?, meta: self.meta, + metadata_version: self.metadata_version, }) } @@ -146,6 +179,7 @@ impl Incoming { connection: self.connection, body: f(self.body), meta: self.meta, + metadata_version: self.metadata_version, } }