diff --git a/Cargo.lock b/Cargo.lock index 62230bdc9b315c..db4e6fbddbe787 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1296,6 +1296,7 @@ dependencies = [ "serde", "serde_json", "thiserror", + "tokio", ] [[package]] @@ -3269,12 +3270,16 @@ dependencies = [ "aptos-config", "aptos-logger", "aptos-metrics-core", + "aptos-protos", "aptos-retrier", "bcs 0.1.4", "crossbeam-channel", "once_cell", "serde", "thiserror", + "tokio", + "tonic 0.10.0", + "tonic-reflection", ] [[package]] diff --git a/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto b/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto new file mode 100644 index 00000000000000..5afe67579f1e72 --- /dev/null +++ b/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto @@ -0,0 +1,19 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package aptos.remote_executor.v1; + +message NetworkMessage { + bytes message = 1; + string message_type = 2; +} + +message Empty { + +} + +service NetworkMessageService { + rpc SimpleMsgExchange(NetworkMessage) returns (Empty); +} \ No newline at end of file diff --git a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs new file mode 100644 index 00000000000000..11011e96c5eea6 --- /dev/null +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs @@ -0,0 +1,61 @@ +// Copyright © Aptos Foundation + +// @generated +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct NetworkMessage { + #[prost(bytes="vec", tag="1")] + pub message: ::prost::alloc::vec::Vec, + #[prost(string, tag="2")] + pub message_type: ::prost::alloc::string::String, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Empty { +} +/// Encoded file descriptor set for the `aptos.remote_executor.v1` package +pub const FILE_DESCRIPTOR_SET: &[u8] = &[ + 
0x0a, 0xf1, 0x04, 0x0a, 0x2a, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, + 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x5f, 0x6d, 0x73, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x18, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, + 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x22, 0x4d, 0x0a, 0x0e, 0x4e, 0x65, 0x74, + 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x32, 0x77, 0x0a, 0x15, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5e, 0x0a, 0x11, 0x53, 0x69, + 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x73, 0x67, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, + 0x28, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, + 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x65, 0x74, 0x77, 0x6f, + 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x61, 0x70, 0x74, 0x6f, + 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, + 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4a, 0xcf, 0x02, 0x0a, 0x06, 0x12, + 0x04, 0x03, 0x00, 0x12, 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, + 0x44, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, + 0x70, 0x74, 0x6f, 0x73, 0x20, 0x46, 
0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, + 0x20, 0x53, 0x50, 0x44, 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, + 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x2d, 0x32, 0x2e, 0x30, 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x21, 0x0a, + 0x0a, 0x0a, 0x02, 0x04, 0x00, 0x12, 0x04, 0x07, 0x00, 0x0a, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, + 0x00, 0x01, 0x12, 0x03, 0x07, 0x08, 0x16, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, + 0x03, 0x08, 0x04, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x05, 0x12, 0x03, 0x08, + 0x04, 0x09, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x08, 0x0a, 0x11, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x08, 0x14, 0x15, 0x0a, 0x0b, + 0x0a, 0x04, 0x04, 0x00, 0x02, 0x01, 0x12, 0x03, 0x09, 0x04, 0x1c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, + 0x00, 0x02, 0x01, 0x05, 0x12, 0x03, 0x09, 0x04, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, + 0x01, 0x01, 0x12, 0x03, 0x09, 0x0b, 0x17, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x03, + 0x12, 0x03, 0x09, 0x1a, 0x1b, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, 0x0c, 0x00, 0x0e, + 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, 0x03, 0x0c, 0x08, 0x0d, 0x0a, 0x0a, 0x0a, + 0x02, 0x06, 0x00, 0x12, 0x04, 0x10, 0x00, 0x12, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, 0x00, 0x01, + 0x12, 0x03, 0x10, 0x08, 0x1d, 0x0a, 0x0b, 0x0a, 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, 0x03, 0x11, + 0x04, 0x3a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x11, 0x08, 0x19, + 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x11, 0x1a, 0x28, 0x0a, 0x0c, + 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x11, 0x33, 0x38, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +]; +include!("aptos.remote_executor.v1.serde.rs"); +include!("aptos.remote_executor.v1.tonic.rs"); +// @@protoc_insertion_point(module) diff --git 
a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs new file mode 100644 index 00000000000000..3c799d6e8c7ddb --- /dev/null +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs @@ -0,0 +1,185 @@ +// Copyright © Aptos Foundation + +// @generated +impl serde::Serialize for Empty { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let len = 0; + let struct_ser = serializer.serialize_struct("aptos.remote_executor.v1.Empty", len)?; + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for Empty { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + Err(serde::de::Error::unknown_field(value, FIELDS)) + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = Empty; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct aptos.remote_executor.v1.Empty") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + while map.next_key::()?.is_some() { + 
let _ = map.next_value::()?; + } + Ok(Empty { + }) + } + } + deserializer.deserialize_struct("aptos.remote_executor.v1.Empty", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for NetworkMessage { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.message.is_empty() { + len += 1; + } + if !self.message_type.is_empty() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("aptos.remote_executor.v1.NetworkMessage", len)?; + if !self.message.is_empty() { + struct_ser.serialize_field("message", pbjson::private::base64::encode(&self.message).as_str())?; + } + if !self.message_type.is_empty() { + struct_ser.serialize_field("messageType", &self.message_type)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for NetworkMessage { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "message", + "message_type", + "messageType", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Message, + MessageType, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "message" => Ok(GeneratedField::Message), + "messageType" | "message_type" => Ok(GeneratedField::MessageType), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + 
deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = NetworkMessage; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct aptos.remote_executor.v1.NetworkMessage") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut message__ = None; + let mut message_type__ = None; + while let Some(k) = map.next_key()? { + match k { + GeneratedField::Message => { + if message__.is_some() { + return Err(serde::de::Error::duplicate_field("message")); + } + message__ = + Some(map.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } + GeneratedField::MessageType => { + if message_type__.is_some() { + return Err(serde::de::Error::duplicate_field("messageType")); + } + message_type__ = Some(map.next_value()?); + } + } + } + Ok(NetworkMessage { + message: message__.unwrap_or_default(), + message_type: message_type__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("aptos.remote_executor.v1.NetworkMessage", FIELDS, GeneratedVisitor) + } +} diff --git a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs new file mode 100644 index 00000000000000..84cc02c328d07a --- /dev/null +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs @@ -0,0 +1,301 @@ +// Copyright © Aptos Foundation + +// @generated +/// Generated client implementations. +pub mod network_message_service_client { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// + #[derive(Debug, Clone)] + pub struct NetworkMessageServiceClient { + inner: tonic::client::Grpc, + } + impl NetworkMessageServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. 
+ pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl NetworkMessageServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> NetworkMessageServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + Send + Sync, + { + NetworkMessageServiceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// + pub async fn simple_msg_exchange( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/aptos.remote_executor.v1.NetworkMessageService/SimpleMsgExchange", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "aptos.remote_executor.v1.NetworkMessageService", + "SimpleMsgExchange", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod network_message_service_server { + #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with NetworkMessageServiceServer. 
+ #[async_trait] + pub trait NetworkMessageService: Send + Sync + 'static { + /// + async fn simple_msg_exchange( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + } + /// + #[derive(Debug)] + pub struct NetworkMessageServiceServer { + inner: _Inner, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + struct _Inner(Arc); + impl NetworkMessageServiceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. 
+ /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> + for NetworkMessageServiceServer + where + T: NetworkMessageService, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); + match req.uri().path() { + "/aptos.remote_executor.v1.NetworkMessageService/SimpleMsgExchange" => { + #[allow(non_camel_case_types)] + struct SimpleMsgExchangeSvc(pub Arc); + impl< + T: NetworkMessageService, + > tonic::server::UnaryService + for SimpleMsgExchangeSvc { + type Response = super::Empty; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).simple_msg_exchange(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = SimpleMsgExchangeSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + 
Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } + } + } + } + impl Clone for NetworkMessageServiceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService + for NetworkMessageServiceServer { + const NAME: &'static str = "aptos.remote_executor.v1.NetworkMessageService"; + } +} diff --git a/crates/aptos-protos/src/pb/mod.rs b/crates/aptos-protos/src/pb/mod.rs index 9e786ac6edc43b..053c32a7b9aebf 100644 --- a/crates/aptos-protos/src/pb/mod.rs +++ b/crates/aptos-protos/src/pb/mod.rs @@ -18,6 +18,13 @@ pub mod aptos { } } } + pub mod remote_executor { + // @@protoc_insertion_point(attribute:aptos.remote_executor.v1) + pub mod v1 { + include!("aptos.remote_executor.v1.rs"); + // @@protoc_insertion_point(aptos.remote_executor.v1) + } + } pub mod transaction { // @@protoc_insertion_point(attribute:aptos.transaction.v1) pub mod v1 { diff --git a/execution/executor-service/Cargo.toml b/execution/executor-service/Cargo.toml index 90872f811f28aa..3b4840a5bc0338 100644 --- a/execution/executor-service/Cargo.toml +++ b/execution/executor-service/Cargo.toml @@ -37,6 +37,7 @@ rayon = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } [dev-dependencies] aptos-language-e2e-tests = { 
workspace = true } diff --git a/execution/executor-service/src/process_executor_service.rs b/execution/executor-service/src/process_executor_service.rs index cfc849e7dba825..f60e70ed1fb420 100644 --- a/execution/executor-service/src/process_executor_service.rs +++ b/execution/executor-service/src/process_executor_service.rs @@ -7,7 +7,7 @@ use std::net::SocketAddr; /// An implementation of the remote executor service that runs in a standalone process. pub struct ProcessExecutorService { - _executor_service: ExecutorService, + executor_service: ExecutorService, } impl ProcessExecutorService { @@ -32,8 +32,10 @@ impl ProcessExecutorService { remote_shard_addresses, ); executor_service.start(); - Self { - _executor_service: executor_service, - } + Self { executor_service } + } + + pub fn shutdown(&mut self) { + self.executor_service.shutdown() } } diff --git a/execution/executor-service/src/remote_cordinator_client.rs b/execution/executor-service/src/remote_cordinator_client.rs index 4dc1311dffcf1a..17e73543217359 100644 --- a/execution/executor-service/src/remote_cordinator_client.rs +++ b/execution/executor-service/src/remote_cordinator_client.rs @@ -76,20 +76,24 @@ impl RemoteCoordinatorClient { impl CoordinatorClient for RemoteCoordinatorClient { fn receive_execute_command(&self) -> ExecutorShardCommand { - let message = self.command_rx.recv().unwrap(); - let request: RemoteExecutionRequest = bcs::from_bytes(&message.data).unwrap(); - match request { - RemoteExecutionRequest::ExecuteBlock(command) => { - let state_keys = Self::extract_state_keys(&command); - self.state_view_client.init_for_block(state_keys); - let (sub_blocks, concurrency, gas_limit) = command.into(); - ExecutorShardCommand::ExecuteSubBlocks( - self.state_view_client.clone(), - sub_blocks, - concurrency, - gas_limit, - ) + match self.command_rx.recv() { + Ok(message) => { + let request: RemoteExecutionRequest = bcs::from_bytes(&message.data).unwrap(); + match request { + 
RemoteExecutionRequest::ExecuteBlock(command) => { + let state_keys = Self::extract_state_keys(&command); + self.state_view_client.init_for_block(state_keys); + let (sub_blocks, concurrency, gas_limit) = command.into(); + ExecutorShardCommand::ExecuteSubBlocks( + self.state_view_client.clone(), + sub_blocks, + concurrency, + gas_limit, + ) + }, + } }, + Err(_) => ExecutorShardCommand::Stop, } } diff --git a/execution/executor-service/src/remote_executor_service.rs b/execution/executor-service/src/remote_executor_service.rs index 0b9dd8f539bc3a..7bd62fbd8aefd0 100644 --- a/execution/executor-service/src/remote_executor_service.rs +++ b/execution/executor-service/src/remote_executor_service.rs @@ -8,11 +8,12 @@ use crate::{ use aptos_secure_net::network_controller::NetworkController; use aptos_types::block_executor::partitioner::ShardId; use aptos_vm::sharded_block_executor::sharded_executor_service::ShardedExecutorService; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, thread}; /// A service that provides support for remote execution. Essentially, it reads a request from /// the remote executor client and executes the block locally and returns the result. 
pub struct ExecutorService { + shard_id: ShardId, controller: NetworkController, executor_service: Arc>, } @@ -47,6 +48,7 @@ impl ExecutorService { )); Self { + shard_id, controller, executor_service, } @@ -54,6 +56,17 @@ impl ExecutorService { pub fn start(&mut self) { self.controller.start(); - self.executor_service.start(); + let thread_name = format!("ExecutorService-{}", self.shard_id); + let builder = thread::Builder::new().name(thread_name); + let executor_service_clone = self.executor_service.clone(); + builder + .spawn(move || { + executor_service_clone.start(); + }) + .expect("Failed to spawn thread"); + } + + pub fn shutdown(&mut self) { + self.controller.shutdown(); } } diff --git a/execution/executor-service/src/remote_state_view.rs b/execution/executor-service/src/remote_state_view.rs index 1658773cff6e43..9642b2cb75a9a2 100644 --- a/execution/executor-service/src/remote_state_view.rs +++ b/execution/executor-service/src/remote_state_view.rs @@ -186,8 +186,7 @@ impl RemoteStateValueReceiver { } fn start(&self) { - loop { - let message = self.kv_rx.recv().unwrap(); + while let Ok(message) = self.kv_rx.recv() { let state_view = self.state_view.clone(); let shard_id = self.shard_id; self.thread_pool.spawn(move || { diff --git a/execution/executor-service/src/remote_state_view_service.rs b/execution/executor-service/src/remote_state_view_service.rs index 8f155ab3064b1d..10a51f4893baaf 100644 --- a/execution/executor-service/src/remote_state_view_service.rs +++ b/execution/executor-service/src/remote_state_view_service.rs @@ -56,8 +56,7 @@ impl RemoteStateViewService { } pub fn start(&self) { - loop { - let message = self.kv_rx.recv().unwrap(); + while let Ok(message) = self.kv_rx.recv() { let state_view = self.state_view.clone(); let kv_txs = self.kv_tx.clone(); self.thread_pool.spawn(move || { diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs index 377b9c90fc3ca7..da7863a7da2cd3 100644 --- 
a/execution/executor-service/src/test_utils.rs +++ b/execution/executor-service/src/test_utils.rs @@ -1,11 +1,13 @@ // Copyright © Aptos Foundation use aptos_block_partitioner::{v2::config::PartitionerV2Config, PartitionerConfig}; +use aptos_crypto::hash::CryptoHash; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, executor::FakeExecutor, }; use aptos_types::{ + account_address::AccountAddress, block_executor::partitioner::PartitionedTransactions, state_store::state_key::StateKeyInner, transaction::{analyzed_transaction::AnalyzedTransaction, Transaction, TransactionOutput}, @@ -14,7 +16,14 @@ use aptos_vm::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, AptosVM, VMExecutor, }; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +pub fn generate_account_at(executor: &mut FakeExecutor, address: AccountAddress) -> AccountData { + executor.new_account_data_at(address) +} fn generate_non_conflicting_sender_receiver( executor: &mut FakeExecutor, @@ -95,7 +104,7 @@ pub fn compare_txn_outputs( pub fn test_sharded_block_executor_no_conflict>( sharded_block_executor: ShardedBlockExecutor, ) { - let num_txns = 10; + let num_txns = 400; let num_shards = sharded_block_executor.num_shards(); let mut executor = FakeExecutor::from_head_genesis(); let mut transactions = Vec::new(); @@ -127,3 +136,55 @@ pub fn test_sharded_block_executor_no_conflict> .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); } + +pub fn sharded_block_executor_with_conflict>( + sharded_block_executor: ShardedBlockExecutor, + concurrency: usize, +) { + let num_txns = 800; + let num_shards = sharded_block_executor.num_shards(); + let num_accounts = 80; + let mut executor = FakeExecutor::from_head_genesis(); + let mut transactions = Vec::new(); + let mut accounts = Vec::new(); + let mut txn_hash_to_account = HashMap::new(); + for _ in 0..num_accounts { 
+ let account = generate_account_at(&mut executor, AccountAddress::random()); + accounts.push(Mutex::new(account)); + } + for i in 1..num_txns / num_accounts { + for j in 0..num_accounts { + let sender = &mut accounts[j].lock().unwrap(); + let sender_addr = *sender.address(); + let receiver = &accounts[(j + i) % num_accounts].lock().unwrap(); + let transfer_amount = 1_000; + let txn = generate_p2p_txn(sender, receiver, transfer_amount); + txn_hash_to_account.insert(txn.transaction().hash(), sender_addr); + transactions.push(txn) + } + } + + let partitioner = PartitionerV2Config::default() + .max_partitioning_rounds(2) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(true) + .build(); + let partitioned_txns = partitioner.partition(transactions.clone(), num_shards); + + let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone()) + .into_iter() + .map(|t| t.into_txn()) + .collect(); + let sharded_txn_output = sharded_block_executor + .execute_block( + Arc::new(executor.data_store().clone()), + partitioned_txns, + concurrency, + None, + ) + .unwrap(); + + let unsharded_txn_output = + AptosVM::execute_block(execution_ordered_txns, executor.data_store(), None).unwrap(); + compare_txn_outputs(unsharded_txn_output, sharded_txn_output); +} diff --git a/execution/executor-service/src/tests.rs b/execution/executor-service/src/tests.rs index 861604c3fafca5..e206fedbc71677 100644 --- a/execution/executor-service/src/tests.rs +++ b/execution/executor-service/src/tests.rs @@ -55,12 +55,45 @@ pub fn create_thread_remote_executor_shards( } #[test] -#[ignore] fn test_sharded_block_executor_no_conflict() { + use std::thread; + let num_shards = 8; - let (mut controller, executor_client, _executor_services) = + let (mut controller, executor_client, mut executor_services) = create_thread_remote_executor_shards(num_shards, Some(2)); controller.start(); let sharded_block_executor = ShardedBlockExecutor::new(executor_client); + + // wait for the 
servers to be ready before sending messages + // TODO: We need to pass this test without this sleep + thread::sleep(std::time::Duration::from_millis(10)); + test_utils::test_sharded_block_executor_no_conflict(sharded_block_executor); + + controller.shutdown(); + executor_services.iter_mut().for_each(|executor_service| { + executor_service.shutdown(); + }); +} + +#[test] +fn test_sharded_block_executor_with_conflict() { + use std::thread; + + let num_shards = 8; + let (mut controller, executor_client, mut executor_services) = + create_thread_remote_executor_shards(num_shards, Some(2)); + controller.start(); + let sharded_block_executor = ShardedBlockExecutor::new(executor_client); + + // wait for the servers to be ready before sending messages + // TODO: We need to pass this test without this sleep + thread::sleep(std::time::Duration::from_millis(10)); + + test_utils::sharded_block_executor_with_conflict(sharded_block_executor, 2); + + controller.shutdown(); + executor_services.iter_mut().for_each(|executor_service| { + executor_service.shutdown(); + }); } diff --git a/execution/executor-service/src/thread_executor_service.rs b/execution/executor-service/src/thread_executor_service.rs index a067e934d8ade5..ccf0e61c519827 100644 --- a/execution/executor-service/src/thread_executor_service.rs +++ b/execution/executor-service/src/thread_executor_service.rs @@ -2,13 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use crate::remote_executor_service::ExecutorService; use aptos_types::block_executor::partitioner::ShardId; -use std::{net::SocketAddr, thread, thread::JoinHandle}; +use std::net::SocketAddr; /// This is a simple implementation of RemoteExecutorService that runs the executor service in a /// separate thread. This should be used for testing only. 
pub struct ThreadExecutorService { - _child: JoinHandle<()>, _self_address: SocketAddr, + executor_service: ExecutorService, } impl ThreadExecutorService { @@ -28,19 +28,14 @@ impl ThreadExecutorService { coordinator_address, remote_shard_addresses, ); - - let thread_name = format!("ThreadExecutorService-{}", shard_id); - let builder = thread::Builder::new().name(thread_name); - - let child = builder - .spawn(move || { - executor_service.start(); - }) - .expect("Failed to spawn thread"); - + executor_service.start(); Self { - _child: child, _self_address: self_address, + executor_service, } } + + pub fn shutdown(&mut self) { + self.executor_service.shutdown() + } } diff --git a/secure/net/Cargo.toml b/secure/net/Cargo.toml index 5bd47aa7eb8680..7a01db336f819c 100644 --- a/secure/net/Cargo.toml +++ b/secure/net/Cargo.toml @@ -15,12 +15,16 @@ rust-version = { workspace = true } [dependencies] aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } +aptos-protos = { workspace = true } aptos-retrier = { workspace = true } bcs = { workspace = true } crossbeam-channel = { workspace = true } once_cell = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } +tonic = { workspace = true } +tonic-reflection = { workspace = true } [dev-dependencies] aptos-config = { workspace = true } diff --git a/secure/net/src/grpc_network_service/mod.rs b/secure/net/src/grpc_network_service/mod.rs new file mode 100644 index 00000000000000..308913831aad2c --- /dev/null +++ b/secure/net/src/grpc_network_service/mod.rs @@ -0,0 +1,205 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::network_controller::{Message, MessageType}; +use aptos_logger::{error, info}; +use aptos_protos::remote_executor::v1::{ + network_message_service_client::NetworkMessageServiceClient, + network_message_service_server::{NetworkMessageService, NetworkMessageServiceServer}, + Empty, NetworkMessage, 
FILE_DESCRIPTOR_SET, +}; +use crossbeam_channel::Sender; +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, +}; +use tokio::{runtime::Runtime, sync::oneshot}; +use tonic::{ + transport::{Channel, Server}, + Request, Response, Status, +}; + +pub struct GRPCNetworkMessageServiceServerWrapper { + inbound_handlers: Arc>>>, +} + +impl GRPCNetworkMessageServiceServerWrapper { + pub fn new(inbound_handlers: Arc>>>) -> Self { + Self { inbound_handlers } + } + + // Note: The object is consumed here. That is once the server is started, we cannot/should not + // use the object anymore + pub fn start( + self, + rt: &Runtime, + _service: String, + server_addr: SocketAddr, + rpc_timeout_ms: u64, + server_shutdown_rx: oneshot::Receiver<()>, + ) { + rt.spawn(async move { + self.start_async(server_addr, rpc_timeout_ms, server_shutdown_rx) + .await; + }); + } + + async fn start_async( + self, + server_addr: SocketAddr, + rpc_timeout_ms: u64, + server_shutdown_rx: oneshot::Receiver<()>, + ) { + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build() + .unwrap(); + + info!("Starting Server async at {:?}", server_addr); + // NOTE: (1) serve_with_shutdown() starts the server, if successful the task does not return + // till the server is shutdown. Hence this should be called as a separate + // non-blocking task. Signal handler 'server_shutdown_rx' is needed to shutdown + // the server + // (2) There is no easy way to know if/when the server has started successfully. 
Hence + // we may need to implement a healthcheck service to check if the server is up + Server::builder() + .timeout(std::time::Duration::from_millis(rpc_timeout_ms)) + .add_service(NetworkMessageServiceServer::new(self)) + .add_service(reflection_service) + .serve_with_shutdown(server_addr, async { + server_shutdown_rx.await.ok(); + }) + .await + .unwrap(); + info!("Server shutdown at {:?}", server_addr); + } +} + +#[tonic::async_trait] +impl NetworkMessageService for GRPCNetworkMessageServiceServerWrapper { + async fn simple_msg_exchange( + &self, + request: Request, + ) -> Result, Status> { + let remote_addr = request.remote_addr(); + let network_message = request.into_inner(); + let msg = Message::new(network_message.message); + let message_type = MessageType::new(network_message.message_type); + + if let Some(handler) = self.inbound_handlers.lock().unwrap().get(&message_type) { + // Send the message to the registered handler + handler.send(msg).unwrap(); + } else { + error!( + "No handler registered for sender: {:?} and msg type {:?}", + remote_addr, message_type + ); + } + Ok(Response::new(Empty {})) + } +} + +pub struct GRPCNetworkMessageServiceClientWrapper { + remote_addr: String, + remote_channel: NetworkMessageServiceClient, +} + +impl GRPCNetworkMessageServiceClientWrapper { + pub fn new(rt: &Runtime, remote_addr: SocketAddr) -> Self { + Self { + remote_addr: remote_addr.to_string(), + remote_channel: rt + .block_on(async { Self::get_channel(format!("http://{}", remote_addr)).await }), + } + } + + async fn get_channel(remote_addr: String) -> NetworkMessageServiceClient { + info!("Trying to connect to remote server at {:?}", remote_addr); + let conn = tonic::transport::Endpoint::new(remote_addr) + .unwrap() + .connect_lazy(); + NetworkMessageServiceClient::new(conn) + } + + pub async fn send_message( + &mut self, + sender_addr: SocketAddr, + message: Message, + mt: &MessageType, + ) { + let request = tonic::Request::new(NetworkMessage { + message: 
message.data, + message_type: mt.get_type(), + }); + // TODO: Retry with exponential backoff on failures + match self.remote_channel.simple_msg_exchange(request).await { + Ok(_) => {}, + Err(e) => { + panic!( + "Error '{}' sending message to {} on node {:?}", + e, self.remote_addr, sender_addr + ); + }, + } + } +} + +#[test] +fn basic_test() { + use aptos_config::utils; + use std::{ + net::{IpAddr, Ipv4Addr}, + thread, + }; + + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), utils::get_available_port()); + let message_type = "test_type".to_string(); + let server_handlers: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + let (msg_tx, msg_rx) = crossbeam_channel::unbounded(); + server_handlers + .lock() + .unwrap() + .insert(MessageType::new(message_type.clone()), msg_tx); + let server = GRPCNetworkMessageServiceServerWrapper::new(server_handlers); + + let rt = Runtime::new().unwrap(); + let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel(); + server.start( + &rt, + "unit tester".to_string(), + server_addr, + 1000, + server_shutdown_rx, + ); + + let mut grpc_client = GRPCNetworkMessageServiceClientWrapper::new(&rt, server_addr); + + let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), utils::get_available_port()); + let test_message_content = "test1".as_bytes().to_vec(); + + // wait for the server to be ready before sending messages + // TODO: We need to implement retry on send_message failures such that we can pass this test + // without this sleep + thread::sleep(std::time::Duration::from_millis(10)); + + for _ in 0..2 { + rt.block_on(async { + grpc_client + .send_message( + client_addr, + Message::new(test_message_content.clone()), + &MessageType::new(message_type.clone()), + ) + .await; + }); + } + + for _ in 0..2 { + let received_msg = msg_rx.recv().unwrap(); + assert_eq!(received_msg.data, test_message_content); + } + server_shutdown_tx.send(()).unwrap(); +} diff --git a/secure/net/src/lib.rs 
b/secure/net/src/lib.rs index ce603a7a2f5180..5a9ee8726a56fc 100644 --- a/secure/net/src/lib.rs +++ b/secure/net/src/lib.rs @@ -16,6 +16,7 @@ //! Internally both the client and server leverage a NetworkStream that communications in blocks //! where a block is a length prefixed array of bytes. +pub mod grpc_network_service; pub mod network_controller; use aptos_logger::{info, trace, warn, Schema}; diff --git a/secure/net/src/network_controller/inbound_handler.rs b/secure/net/src/network_controller/inbound_handler.rs index 2ed2ed952bbf6f..955e54b013f962 100644 --- a/secure/net/src/network_controller/inbound_handler.rs +++ b/secure/net/src/network_controller/inbound_handler.rs @@ -1,34 +1,31 @@ // Copyright © Aptos Foundation use crate::{ - network_controller::{error::Error, Message, MessageType, NetworkMessage}, - NetworkServer, + grpc_network_service::GRPCNetworkMessageServiceServerWrapper, + network_controller::{Message, MessageType}, }; -use aptos_logger::{error, warn}; +use aptos_logger::warn; use crossbeam_channel::Sender; use std::{ collections::HashMap, net::SocketAddr, sync::{Arc, Mutex}, - thread, }; +use tokio::{runtime::Runtime, sync::oneshot}; pub struct InboundHandler { service: String, - server: Arc>, - // Used to route incoming messages to correct channel. 
+ listen_addr: SocketAddr, + rpc_timeout_ms: u64, inbound_handlers: Arc>>>, } impl InboundHandler { - pub fn new(service: String, listen_addr: SocketAddr, timeout_ms: u64) -> Self { + pub fn new(service: String, listen_addr: SocketAddr, rpc_timeout_ms: u64) -> Self { Self { service: service.clone(), - server: Arc::new(Mutex::new(NetworkServer::new( - service, - listen_addr, - timeout_ms, - ))), + listen_addr, + rpc_timeout_ms, inbound_handlers: Arc::new(Mutex::new(HashMap::new())), } } @@ -43,24 +40,21 @@ impl InboundHandler { inbound_handlers.insert(MessageType::new(message_type), sender); } - pub fn start(&mut self) { - let inbound_handlers = self.inbound_handlers.clone(); // Clone the hashmap for the thread - let server_clone = self.server.clone(); // Clone the server to move into the thread - // Spawn a thread to handle incoming messages - let thread_name = format!("{}_network_inbound_handler", self.service); - let builder = thread::Builder::new().name(thread_name); - builder - .spawn(move || { - loop { - // Receive incoming messages from the server - if let Err(e) = - Self::process_one_incoming_message(&server_clone, &inbound_handlers) - { - error!("Error processing incoming messages: {:?}", e); - } - } - }) - .expect("Failed to spawn network_inbound_handler thread"); + pub fn start(&self, rt: &Runtime) -> Option> { + if self.inbound_handlers.lock().unwrap().is_empty() { + return None; + } + + let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel(); + // The server is started in a separate task + GRPCNetworkMessageServiceServerWrapper::new(self.inbound_handlers.clone()).start( + rt, + self.service.clone(), + self.listen_addr, + self.rpc_timeout_ms, + server_shutdown_rx, + ); + Some(server_shutdown_tx) } // Helper function to short-circuit the network message not to be sent over the network for self messages @@ -73,28 +67,4 @@ impl InboundHandler { warn!("No handler registered for message type: {:?}", message_type); } } - - fn 
process_one_incoming_message( - network_server: &Arc>, - inbound_handlers: &Arc>>>, - ) -> Result<(), Error> { - let message = network_server.lock().unwrap().read()?; - let network_msg: NetworkMessage = bcs::from_bytes(&message)?; - // Get the sender's SocketAddr from the received message - let sender = network_msg.sender; - let msg = network_msg.message; - let message_type = network_msg.message_type; - - // Check if there is a registered handler for the sender - if let Some(handler) = inbound_handlers.lock().unwrap().get(&message_type) { - // Send the message to the registered handler - handler.send(msg)?; - } else { - warn!( - "No handler registered for sender: {:?} and msg type {:?}", - sender, message_type - ); - } - Ok(()) - } } diff --git a/secure/net/src/network_controller/mod.rs b/secure/net/src/network_controller/mod.rs index 7a851746732759..daf79d21bab40e 100644 --- a/secure/net/src/network_controller/mod.rs +++ b/secure/net/src/network_controller/mod.rs @@ -3,12 +3,14 @@ use crate::network_controller::{ inbound_handler::InboundHandler, outbound_handler::OutboundHandler, }; +use aptos_logger::{info, warn}; use crossbeam_channel::{unbounded, Receiver, Sender}; use serde::{Deserialize, Serialize}; use std::{ net::SocketAddr, sync::{Arc, Mutex}, }; +use tokio::{runtime::Runtime, sync::oneshot}; mod error; mod inbound_handler; @@ -64,10 +66,26 @@ impl Message { } } +/// NetworkController is the main entry point for sending and receiving messages over the network. +/// 1. If a node acts as both client and server, albeit in different contexts, GRPC needs separate +/// runtimes for client context and server context. Otherwise we get a hang in GRPC. This seems to be +/// an internal bug in GRPC. +/// 2. We want to use tokio runtimes because it is best for async IO and tonic GRPC +/// implementation is async. However, we want the rest of the system (remote executor service) +/// to use rayon thread pools because it is best for CPU bound tasks. +/// 3. 
NetworkController, InboundHandler and OutboundHandler work as a bridge between the sync and +/// async worlds. +/// 4. We need to shutdown all the async tasks spawned by the NetworkController runtimes, otherwise +/// the program will hang, or have resource leaks. #[allow(dead_code)] pub struct NetworkController { inbound_handler: Arc>, outbound_handler: OutboundHandler, + inbound_rpc_runtime: Runtime, + outbound_rpc_runtime: Runtime, + inbound_server_shutdown_tx: Option>, + outbound_task_shutdown_tx: Option>, + listen_addr: SocketAddr, } impl NetworkController { @@ -78,9 +96,16 @@ impl NetworkController { timeout_ms, ))); let outbound_handler = OutboundHandler::new(service, listen_addr, inbound_handler.clone()); + info!("Network controller created for node {}", listen_addr); Self { inbound_handler, outbound_handler, + inbound_rpc_runtime: Runtime::new().unwrap(), + outbound_rpc_runtime: Runtime::new().unwrap(), + // we initialize the shutdown handles when we start the network controller + inbound_server_shutdown_tx: None, + outbound_task_shutdown_tx: None, + listen_addr, } } @@ -109,8 +134,32 @@ impl NetworkController { } pub fn start(&mut self) { - self.inbound_handler.lock().unwrap().start(); - self.outbound_handler.start(); + info!( + "Starting network controller at {}", + self.listen_addr + ); + self.inbound_server_shutdown_tx = self + .inbound_handler + .lock() + .unwrap() + .start(&self.inbound_rpc_runtime); + self.outbound_task_shutdown_tx = self.outbound_handler.start(&self.outbound_rpc_runtime); + } + + // TODO: This is still not a very clean shutdown. We don't wait for the full shutdown after + // sending the signal. May not matter much for now because we shutdown before exiting the + // process. Ideally, we want to fix this. 
+ pub fn shutdown(&mut self) { + info!("Shutting down network controller at {}", self.listen_addr); + if let Some(shutdown_signal) = self.inbound_server_shutdown_tx.take() { + shutdown_signal.send(()).unwrap(); + } + + if let Some(shutdown_signal) = self.outbound_task_shutdown_tx.take() { + shutdown_signal.send(Message::new(vec![])).unwrap_or_else(|_| { + warn!("Failed to send shutdown signal to outbound task; probably already shutdown"); + }) + } } } @@ -118,7 +167,10 @@ impl NetworkController { mod tests { use crate::network_controller::{Message, NetworkController}; use aptos_config::utils; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + thread, + }; #[test] fn test_basic_send_receive() { @@ -144,6 +196,10 @@ mod tests { network_controller1.start(); network_controller2.start(); + // wait for the server to be ready to serve + // TODO: We need to pass this test without this sleep + thread::sleep(std::time::Duration::from_millis(10)); + let test1_message = "test1".as_bytes().to_vec(); test1_sender .send(Message::new(test1_message.clone())) @@ -159,5 +215,8 @@ mod tests { let received_test2_message = test2_receiver.recv().unwrap(); assert_eq!(received_test2_message.data, test2_message); + + network_controller1.shutdown(); + network_controller2.shutdown(); } } diff --git a/secure/net/src/network_controller/outbound_handler.rs b/secure/net/src/network_controller/outbound_handler.rs index dc1e4f44544b48..d1ce53e1842cfe 100644 --- a/secure/net/src/network_controller/outbound_handler.rs +++ b/secure/net/src/network_controller/outbound_handler.rs @@ -1,24 +1,25 @@ // Copyright © Aptos Foundation use crate::{ - network_controller::{inbound_handler::InboundHandler, Message, MessageType, NetworkMessage}, - NetworkClient, + grpc_network_service::GRPCNetworkMessageServiceClientWrapper, + network_controller::{inbound_handler::InboundHandler, Message, MessageType}, }; -use aptos_retrier::{fixed_retry_strategy, retry}; -use 
crossbeam_channel::{Receiver, Select}; +use aptos_logger::{info, warn}; +use crossbeam_channel::{unbounded, Receiver, Select, Sender}; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, + mem, net::SocketAddr, sync::{Arc, Mutex}, - thread, }; +use tokio::runtime::Runtime; pub struct OutboundHandler { - service: String, - network_clients: Arc>>, + _service: String, + remote_addresses: HashSet, address: SocketAddr, // Used to route outgoing messages to correct network client with the correct message type - handlers: Arc, SocketAddr, MessageType)>>>, + handlers: Vec<(Receiver, SocketAddr, MessageType)>, inbound_handler: Arc>, } @@ -29,84 +30,127 @@ impl OutboundHandler { inbound_handler: Arc>, ) -> Self { Self { - service, - network_clients: Arc::new(Mutex::new(HashMap::new())), + _service: service, + remote_addresses: HashSet::new(), address: listen_addr, - handlers: Arc::new(Mutex::new(Vec::new())), + handlers: Vec::new(), inbound_handler, } } pub fn register_handler( - &self, + &mut self, message_type: String, remote_addr: SocketAddr, receiver: Receiver, ) { - // Create a remote client if it doesn't exist - self.network_clients - .lock() - .unwrap() - .entry(remote_addr) - .or_insert_with(|| NetworkClient::new(message_type.clone(), remote_addr, 5000)); - let mut handlers = self.handlers.lock().unwrap(); - handlers.push((receiver, remote_addr, MessageType::new(message_type))); + self.remote_addresses.insert(remote_addr); + self.handlers + .push((receiver, remote_addr, MessageType::new(message_type))); } - pub fn start(&mut self) { - let outbound_handlers = self.handlers.clone(); + pub fn start(&mut self, rt: &Runtime) -> Option> { + if self.handlers.is_empty() { + return None; + } + + // Register a signal handler to stop the outbound task + let (stop_signal_tx, stop_signal_rx) = unbounded(); + self.handlers.push(( + stop_signal_rx, + self.address, + MessageType::new("stop_task".to_string()), + )); + + // Create a grpc client for each remote 
address + let mut grpc_clients: HashMap = + HashMap::new(); + self.remote_addresses.iter().for_each(|remote_addr| { + grpc_clients.insert( + *remote_addr, + GRPCNetworkMessageServiceClientWrapper::new(rt, *remote_addr), + ); + }); + + // Prepare for objects to be moved into the async block (&mut self cannot be moved into the + // async block) let address = self.address; - let network_clients = self.network_clients.clone(); - let thread_name = format!("{}_network_outbound_handler", self.service); - let builder = thread::Builder::new().name(thread_name); let inbound_handler = self.inbound_handler.clone(); - builder - .spawn(move || loop { - Self::process_one_outgoing_message( - outbound_handlers.clone(), - network_clients.clone(), - &address, - inbound_handler.clone(), - ) - }) - .expect("Failed to spawn outbound handler thread"); + // Moving the handlers out of self is fine because once 'start()' is called we do not intend + // to register any more handlers. A reference count like Arc has issues of being + // used across sync and async boundaries, and also not the most efficient because we pay + // the cost of the mutex when there is no contention + let outbound_handlers = mem::take(self.handlers.as_mut()); + + // TODO: Consider using multiple tasks for outbound handlers + rt.spawn(async move { + info!("Starting outbound handler at {}", address.to_string()); + Self::process_one_outgoing_message( + outbound_handlers, + &address, + inbound_handler.clone(), + &mut grpc_clients, + ) + .await; + info!("Stopping outbound handler at {}", address.to_string()); + }); + Some(stop_signal_tx) } - fn process_one_outgoing_message( - outbound_handlers: Arc, SocketAddr, MessageType)>>>, - network_clients: Arc>>, + async fn process_one_outgoing_message( + outbound_handlers: Vec<(Receiver, SocketAddr, MessageType)>, socket_addr: &SocketAddr, inbound_handler: Arc>, + grpc_clients: &mut HashMap, ) { - let mut select = Select::new(); - let handlers = outbound_handlers.lock().unwrap(); + 
loop { + let mut select = Select::new(); + for (receiver, _, _) in outbound_handlers.iter() { + select.recv(receiver); + } - for (receiver, _, _) in handlers.iter() { - select.recv(receiver); - } - let oper = select.select(); - let index = oper.index(); - let msg = oper.recv(&handlers[index].0).unwrap(); - let remote_addr = &handlers[index].1; - let message_type = &handlers[index].2; - if remote_addr == socket_addr { - // If the remote address is the same as the local address, then we are sending a message to ourselves - // so we should just pass it to the inbound handler - inbound_handler - .lock() - .unwrap() - .send_incoming_message_to_handler(message_type, msg); - return; - } - let mut binding = network_clients.lock().unwrap(); - let network_client = binding.get_mut(remote_addr).unwrap(); - let msg = bcs::to_bytes(&NetworkMessage::new( - *socket_addr, - msg, - message_type.clone(), - )) - .unwrap(); + let index; + let msg; + { + let oper = select.select(); + index = oper.index(); + match oper.recv(&outbound_handlers[index].0) { + Ok(m) => { + msg = m; + }, + Err(e) => { + warn!( + "{:?} for outbound handler on {:?}. This can happen in shutdown,\ + but should not happen otherwise", + e.to_string(), + socket_addr + ); + return; + }, + } + } + + let remote_addr = &outbound_handlers[index].1; + let message_type = &outbound_handlers[index].2; - retry(fixed_retry_strategy(5, 20), || network_client.write(&msg)).unwrap(); + if message_type.get_type() == "stop_task" { + return; + } + + if remote_addr == socket_addr { + // If the remote address is the same as the local address, then we are sending a message to ourselves + // so we should just pass it to the inbound handler + inbound_handler + .lock() + .unwrap() + .send_incoming_message_to_handler(message_type, msg); + } else { + grpc_clients + .get_mut(remote_addr) + .unwrap() + .send_message(*socket_addr, msg, message_type) + .await; + } + } } }