Skip to content

Commit

Permalink
Expose peer metdata version through incoming messagtes
Browse files Browse the repository at this point in the history
Summary:
This allows incoming messages to expose metadata of remote peer
  • Loading branch information
muhamadazmy committed Oct 11, 2024
1 parent 03b2a97 commit c2a90bc
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 4 deletions.
2 changes: 2 additions & 0 deletions crates/core/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ pub mod test_util {
use crate::network::MessageHandler;
use crate::network::MessageRouterBuilder;
use crate::network::NetworkError;
use crate::network::PeerMetadataVersion;
use crate::network::ProtocolError;
use crate::network::TransportConnect;
use crate::TaskCenter;
Expand Down Expand Up @@ -735,6 +736,7 @@ pub mod test_util {
self.connection.downgrade(),
header.msg_id,
header.in_response_to,
PeerMetadataVersion::from(header),
),
self.connection.protocol_version,
)
Expand Down
4 changes: 3 additions & 1 deletion crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::transport_connector::TransportConnect;
use super::{Handler, MessageRouter};
use crate::metadata::Urgency;
use crate::network::handshake::{negotiate_protocol_version, wait_for_hello};
use crate::network::Incoming;
use crate::network::{Incoming, PeerMetadataVersion};
use crate::Metadata;
use crate::{cancellation_watcher, current_task_id, task_center, TaskId, TaskKind};

Expand Down Expand Up @@ -592,6 +592,7 @@ where
connection.downgrade(),
header.msg_id,
header.in_response_to,
PeerMetadataVersion::from(header),
),
connection.protocol_version,
)
Expand Down Expand Up @@ -641,6 +642,7 @@ where
WeakConnection::new_closed(peer_node_id),
header.msg_id,
header.in_response_to,
PeerMetadataVersion::from(header),
),
protocol_version,
)
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/network/rpc_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ where

#[cfg(test)]
mod test {
use crate::network::WeakConnection;
use crate::network::{PeerMetadataVersion, WeakConnection};

use super::*;
use futures::future::join_all;
Expand Down Expand Up @@ -438,6 +438,7 @@ mod test {
WeakConnection::new_closed(GenerationalNodeId::new(1, 1)),
1,
Some(42),
PeerMetadataVersion::default(),
))
.await;

Expand All @@ -450,6 +451,7 @@ mod test {
WeakConnection::new_closed(GenerationalNodeId::new(1, 1)),
1,
Some(42),
PeerMetadataVersion::default(),
));
assert!(maybe_msg.is_some());

Expand All @@ -464,6 +466,7 @@ mod test {
WeakConnection::new_closed(GenerationalNodeId::new(1, 1)),
1,
Some(1),
PeerMetadataVersion::default(),
))
.await;

Expand Down Expand Up @@ -505,6 +508,7 @@ mod test {
WeakConnection::new_closed(GenerationalNodeId::new(0, 0)),
1,
Some(idx),
PeerMetadataVersion::default(),
));
});

Expand Down
38 changes: 36 additions & 2 deletions crates/core/src/network/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use std::time::{Duration, Instant};

use restate_types::net::codec::{Targeted, WireEncode};
use restate_types::net::RpcRequest;
use restate_types::{GenerationalNodeId, NodeId};
use restate_types::protobuf::node::Header;
use restate_types::{GenerationalNodeId, NodeId, Version};

use crate::with_metadata;

Expand Down Expand Up @@ -69,12 +70,31 @@ struct MsgMeta {
in_response_to: Option<u64>,
}

#[derive(Clone, Debug, Copy, Default)]
pub struct PeerMetadataVersion {
pub logs: Option<Version>,
pub nodes_config: Option<Version>,
pub partition_table: Option<Version>,
pub schema: Option<Version>,
}

impl From<Header> for PeerMetadataVersion {
fn from(value: Header) -> Self {
Self {
logs: value.my_logs_version.map(Version::from),
nodes_config: value.my_nodes_config_version.map(Version::from),
partition_table: value.my_partition_table_version.map(Version::from),
schema: value.my_schema_version.map(Version::from),
}
}
}
/// A wrapper for incoming messages that includes the sender information
#[derive(Debug, Clone)]
pub struct Incoming<M> {
meta: MsgMeta,
connection: WeakConnection,
body: M,
metadata_version: PeerMetadataVersion,
}

impl<M> Incoming<M> {
Expand All @@ -83,6 +103,7 @@ impl<M> Incoming<M> {
connection: WeakConnection,
msg_id: u64,
in_response_to: Option<u64>,
metadata_version: PeerMetadataVersion,
) -> Self {
Self {
connection,
Expand All @@ -91,13 +112,20 @@ impl<M> Incoming<M> {
msg_id,
in_response_to,
},
metadata_version,
}
}

#[cfg(any(test, feature = "test-util"))]
pub fn for_testing(connection: WeakConnection, body: M, in_response_to: Option<u64>) -> Self {
let msg_id = generate_msg_id();
Self::from_parts(body, connection, msg_id, in_response_to)
Self::from_parts(
body,
connection,
msg_id,
in_response_to,
PeerMetadataVersion::default(),
)
}

pub fn peer(&self) -> &GenerationalNodeId {
Expand Down Expand Up @@ -127,6 +155,10 @@ impl<M> Incoming<M> {
self.meta.in_response_to
}

pub fn metadata_version(&self) -> &PeerMetadataVersion {
&self.metadata_version
}

/// Creates a reciprocal for this incoming message without consuming it. This will internall
/// clone the original connection reference.
pub fn create_reciprocal(&self) -> Reciprocal {
Expand All @@ -138,6 +170,7 @@ impl<M> Incoming<M> {
connection: self.connection,
body: f(self.body)?,
meta: self.meta,
metadata_version: self.metadata_version,
})
}

Expand All @@ -146,6 +179,7 @@ impl<M> Incoming<M> {
connection: self.connection,
body: f(self.body),
meta: self.meta,
metadata_version: self.metadata_version,
}
}

Expand Down

0 comments on commit c2a90bc

Please sign in to comment.