From c085d886d03d8ec43017331093417fb65ffa48c2 Mon Sep 17 00:00:00 2001 From: manudhundi Date: Thu, 28 Sep 2023 19:07:20 -0700 Subject: [PATCH] [Sharded-Execution-GRPC] Addressing comments of previous commit --- .../remote_executor/v1/network_msg.proto | 7 +- .../src/pb/aptos.remote_executor.v1.rs | 85 +++++++++---------- .../src/pb/aptos.remote_executor.v1.serde.rs | 20 +---- .../src/pb/aptos.remote_executor.v1.tonic.rs | 46 +++++----- crates/aptos-protos/src/pb/mod.rs | 2 + execution/executor-service/src/test_utils.rs | 65 +++++++++++++- execution/executor-service/src/tests.rs | 22 +++++ secure/net/src/grpc_network_service/mod.rs | 48 ++++++----- .../src/network_controller/inbound_handler.rs | 9 +- .../network_controller/outbound_handler.rs | 19 +++-- 10 files changed, 198 insertions(+), 125 deletions(-) diff --git a/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto b/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto index b87b4a2d693b7d..5afe67579f1e72 100644 --- a/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto +++ b/crates/aptos-protos/proto/aptos/remote_executor/v1/network_msg.proto @@ -6,15 +6,14 @@ syntax = "proto3"; package aptos.remote_executor.v1; message NetworkMessage { - string sender_addr = 1; - bytes message = 2; - string message_type = 3; + bytes message = 1; + string message_type = 2; } message Empty { } -service RemoteExecution { +service NetworkMessageService { rpc SimpleMsgExchange(NetworkMessage) returns (Empty); } \ No newline at end of file diff --git a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs index 54892901643934..11011e96c5eea6 100644 --- a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.rs @@ -4,11 +4,9 @@ #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct NetworkMessage { - #[prost(string, tag="1")] - pub sender_addr: ::prost::alloc::string::String, - #[prost(bytes="vec", tag="2")] + #[prost(bytes="vec", tag="1")] pub message: ::prost::alloc::vec::Vec, - #[prost(string, tag="3")] + #[prost(string, tag="2")] pub message_type: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] @@ -17,51 +15,46 @@ pub struct Empty { } /// Encoded file descriptor set for the `aptos.remote_executor.v1` package pub const FILE_DESCRIPTOR_SET: &[u8] = &[ - 0x0a, 0xc3, 0x05, 0x0a, 0x2a, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, + 0x0a, 0xf1, 0x04, 0x0a, 0x2a, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x5f, 0x6d, 0x73, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x18, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, - 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x22, 0x6e, 0x0a, 0x0e, 0x4e, 0x65, 0x74, - 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, - 0x65, 0x6e, 0x64, 0x65, 0x72, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0a, 0x73, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x12, 0x18, 0x0a, 0x07, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x32, 0x71, 0x0a, 0x0f, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x45, 0x78, 0x65, 0x63, - 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x5e, 0x0a, 0x11, 0x53, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x4d, - 0x73, 0x67, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x28, 0x2e, 0x61, 0x70, 0x74, - 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, - 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, - 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x2e, - 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4a, 0x86, 0x03, 0x0a, 0x06, 0x12, 0x04, 0x03, 0x00, 0x13, 0x01, - 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, 0x44, 0x20, 0x43, 0x6f, 0x70, - 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, 0x70, 0x74, 0x6f, 0x73, 0x20, - 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, 0x20, 0x53, 0x50, 0x44, 0x58, - 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, - 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x32, 0x2e, 0x30, 0x0a, - 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x21, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x00, - 0x12, 0x04, 0x07, 0x00, 0x0b, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x00, 0x01, 0x12, 0x03, 0x07, - 0x08, 0x16, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x08, 0x04, 0x1b, 0x0a, - 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x05, 0x12, 0x03, 0x08, 0x04, 0x0a, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x08, 0x0b, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x08, 0x19, 0x1a, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, - 0x01, 0x12, 0x03, 0x09, 0x04, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x05, 0x12, - 0x03, 0x09, 0x04, 0x09, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x01, 0x12, 0x03, 0x09, - 0x0a, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x03, 0x12, 0x03, 0x09, 0x14, 0x15, - 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x02, 0x12, 0x03, 0x0a, 0x04, 0x1c, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x00, 0x02, 0x02, 0x05, 0x12, 0x03, 0x0a, 0x04, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x00, 0x02, 0x02, 0x01, 0x12, 0x03, 0x0a, 0x0b, 0x17, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, - 0x02, 0x03, 0x12, 0x03, 0x0a, 0x1a, 0x1b, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, 0x0d, - 0x00, 0x0f, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, 0x03, 0x0d, 0x08, 0x0d, 0x0a, - 0x0a, 0x0a, 0x02, 0x06, 0x00, 0x12, 0x04, 0x11, 0x00, 0x13, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, - 0x00, 0x01, 0x12, 0x03, 0x11, 0x08, 0x17, 0x0a, 0x0b, 0x0a, 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, - 0x03, 0x12, 0x04, 0x3a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x12, - 0x08, 0x19, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x12, 0x1a, 0x28, - 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x12, 0x33, 0x38, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x22, 0x4d, 0x0a, 0x0e, 0x4e, 0x65, 0x74, + 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x54, 0x79, 0x70, 0x65, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x32, 0x77, 0x0a, 0x15, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5e, 0x0a, 0x11, 0x53, 0x69, + 0x6d, 0x70, 0x6c, 0x65, 0x4d, 0x73, 0x67, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, + 0x28, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, + 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x4e, 0x65, 0x74, 0x77, 0x6f, + 0x72, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x1f, 0x2e, 0x61, 0x70, 0x74, 0x6f, + 0x73, 0x2e, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, + 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x4a, 0xcf, 0x02, 0x0a, 0x06, 0x12, + 0x04, 0x03, 0x00, 0x12, 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, + 0x44, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, + 0x70, 0x74, 0x6f, 0x73, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, + 0x20, 0x53, 0x50, 0x44, 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, + 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, + 0x2d, 0x32, 0x2e, 0x30, 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x21, 0x0a, + 0x0a, 0x0a, 0x02, 0x04, 0x00, 0x12, 0x04, 0x07, 0x00, 0x0a, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, + 0x00, 0x01, 0x12, 0x03, 0x07, 0x08, 0x16, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, + 0x03, 0x08, 0x04, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x05, 0x12, 0x03, 0x08, + 0x04, 0x09, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x08, 0x0a, 0x11, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x08, 0x14, 0x15, 0x0a, 0x0b, + 0x0a, 0x04, 0x04, 0x00, 0x02, 0x01, 0x12, 0x03, 0x09, 0x04, 0x1c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, + 0x00, 0x02, 0x01, 0x05, 0x12, 0x03, 0x09, 0x04, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, + 0x01, 0x01, 0x12, 0x03, 0x09, 0x0b, 0x17, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x01, 0x03, + 0x12, 0x03, 0x09, 0x1a, 0x1b, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, 0x0c, 0x00, 0x0e, + 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, 0x03, 0x0c, 0x08, 0x0d, 0x0a, 0x0a, 0x0a, + 0x02, 0x06, 0x00, 0x12, 0x04, 0x10, 0x00, 0x12, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, 0x00, 0x01, + 0x12, 0x03, 0x10, 0x08, 0x1d, 0x0a, 0x0b, 0x0a, 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, 0x03, 0x11, + 0x04, 0x3a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x11, 0x08, 0x19, + 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x11, 0x1a, 0x28, 0x0a, 0x0c, + 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x11, 0x33, 0x38, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, ]; include!("aptos.remote_executor.v1.serde.rs"); include!("aptos.remote_executor.v1.tonic.rs"); diff --git a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs index 9fcb9a93d6e178..3c799d6e8c7ddb 100644 --- a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.serde.rs @@ -80,9 +80,6 @@ impl serde::Serialize for NetworkMessage { { use serde::ser::SerializeStruct; let mut len = 0; - if !self.sender_addr.is_empty() { - len += 1; - } if !self.message.is_empty() { len += 1; } @@ -90,9 +87,6 @@ impl serde::Serialize for NetworkMessage { len += 1; } let mut struct_ser = serializer.serialize_struct("aptos.remote_executor.v1.NetworkMessage", len)?; - if !self.sender_addr.is_empty() { - struct_ser.serialize_field("senderAddr", &self.sender_addr)?; - } if !self.message.is_empty() { struct_ser.serialize_field("message", pbjson::private::base64::encode(&self.message).as_str())?; } @@ -109,8 +103,6 @@ impl<'de> serde::Deserialize<'de> for NetworkMessage { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ - "sender_addr", - "senderAddr", "message", "message_type", "messageType", @@ -118,7 +110,6 @@ impl<'de> serde::Deserialize<'de> for NetworkMessage { #[allow(clippy::enum_variant_names)] enum GeneratedField { - SenderAddr, Message, MessageType, } @@ -142,7 +133,6 @@ impl<'de> serde::Deserialize<'de> for NetworkMessage { E: serde::de::Error, { match value { - "senderAddr" | "sender_addr" => Ok(GeneratedField::SenderAddr), "message" => Ok(GeneratedField::Message), "messageType" | "message_type" => Ok(GeneratedField::MessageType), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -164,22 +154,15 @@ impl<'de> serde::Deserialize<'de> for NetworkMessage { where V: serde::de::MapAccess<'de>, { - let mut sender_addr__ = None; let mut message__ = None; let mut message_type__ = None; while let Some(k) = map.next_key()? { match k { - GeneratedField::SenderAddr => { - if sender_addr__.is_some() { - return Err(serde::de::Error::duplicate_field("senderAddr")); - } - sender_addr__ = Some(map.next_value()?); - } GeneratedField::Message => { if message__.is_some() { return Err(serde::de::Error::duplicate_field("message")); } - message__ = + message__ = Some(map.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } @@ -192,7 +175,6 @@ impl<'de> serde::Deserialize<'de> for NetworkMessage { } } Ok(NetworkMessage { - sender_addr: sender_addr__.unwrap_or_default(), message: message__.unwrap_or_default(), message_type: message_type__.unwrap_or_default(), }) diff --git a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs index 75d931d741fc4b..84cc02c328d07a 100644 --- a/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs +++ b/crates/aptos-protos/src/pb/aptos.remote_executor.v1.tonic.rs @@ -2,16 +2,16 @@ // @generated /// Generated client implementations. -pub mod remote_execution_client { +pub mod network_message_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; use tonic::codegen::http::Uri; /// #[derive(Debug, Clone)] - pub struct RemoteExecutionClient { + pub struct NetworkMessageServiceClient { inner: tonic::client::Grpc, } - impl RemoteExecutionClient { + impl NetworkMessageServiceClient { /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where @@ -22,7 +22,7 @@ pub mod remote_execution_client { Ok(Self::new(conn)) } } - impl RemoteExecutionClient + impl NetworkMessageServiceClient where T: tonic::client::GrpcService, T::Error: Into, @@ -40,7 +40,7 @@ pub mod remote_execution_client { pub fn with_interceptor( inner: T, interceptor: F, - ) -> RemoteExecutionClient> + ) -> NetworkMessageServiceClient> where F: tonic::service::Interceptor, T::ResponseBody: Default, @@ -54,7 +54,7 @@ pub mod remote_execution_client { http::Request, >>::Error: Into + Send + Sync, { - RemoteExecutionClient::new(InterceptedService::new(inner, interceptor)) + NetworkMessageServiceClient::new(InterceptedService::new(inner, interceptor)) } /// Compress requests with the given encoding. /// @@ -103,13 +103,13 @@ pub mod remote_execution_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/aptos.remote_executor.v1.RemoteExecution/SimpleMsgExchange", + "/aptos.remote_executor.v1.NetworkMessageService/SimpleMsgExchange", ); let mut req = request.into_request(); req.extensions_mut() .insert( GrpcMethod::new( - "aptos.remote_executor.v1.RemoteExecution", + "aptos.remote_executor.v1.NetworkMessageService", "SimpleMsgExchange", ), ); @@ -118,12 +118,12 @@ pub mod remote_execution_client { } } /// Generated server implementations. -pub mod remote_execution_server { +pub mod network_message_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - /// Generated trait containing gRPC methods that should be implemented for use with RemoteExecutionServer. + /// Generated trait containing gRPC methods that should be implemented for use with NetworkMessageServiceServer. #[async_trait] - pub trait RemoteExecution: Send + Sync + 'static { + pub trait NetworkMessageService: Send + Sync + 'static { /// async fn simple_msg_exchange( &self, @@ -132,7 +132,7 @@ pub mod remote_execution_server { } /// #[derive(Debug)] - pub struct RemoteExecutionServer { + pub struct NetworkMessageServiceServer { inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, @@ -140,7 +140,7 @@ pub mod remote_execution_server { max_encoding_message_size: Option, } struct _Inner(Arc); - impl RemoteExecutionServer { + impl NetworkMessageServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } @@ -192,9 +192,10 @@ pub mod remote_execution_server { self } } - impl tonic::codegen::Service> for RemoteExecutionServer + impl tonic::codegen::Service> + for NetworkMessageServiceServer where - T: RemoteExecution, + T: NetworkMessageService, B: Body + Send + 'static, B::Error: Into + Send + 'static, { @@ -210,11 +211,11 @@ pub mod remote_execution_server { fn call(&mut self, req: http::Request) -> Self::Future { let inner = self.inner.clone(); match req.uri().path() { - "/aptos.remote_executor.v1.RemoteExecution/SimpleMsgExchange" => { + "/aptos.remote_executor.v1.NetworkMessageService/SimpleMsgExchange" => { #[allow(non_camel_case_types)] - struct SimpleMsgExchangeSvc(pub Arc); + struct SimpleMsgExchangeSvc(pub Arc); impl< - T: RemoteExecution, + T: NetworkMessageService, > tonic::server::UnaryService for SimpleMsgExchangeSvc { type Response = super::Empty; @@ -271,7 +272,7 @@ pub mod remote_execution_server { } } } - impl Clone for RemoteExecutionServer { + impl Clone for NetworkMessageServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -283,7 +284,7 @@ pub mod remote_execution_server { } } } - impl Clone for _Inner { + impl Clone for _Inner { fn clone(&self) -> Self { Self(Arc::clone(&self.0)) } @@ -293,7 +294,8 @@ pub mod remote_execution_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for RemoteExecutionServer { - const NAME: &'static str = "aptos.remote_executor.v1.RemoteExecution"; + impl tonic::server::NamedService + for NetworkMessageServiceServer { + const NAME: &'static str = "aptos.remote_executor.v1.NetworkMessageService"; } } diff --git a/crates/aptos-protos/src/pb/mod.rs b/crates/aptos-protos/src/pb/mod.rs index 12fdefa41abf07..053c32a7b9aebf 100644 --- a/crates/aptos-protos/src/pb/mod.rs +++ b/crates/aptos-protos/src/pb/mod.rs @@ -1,3 +1,5 @@ +// Copyright © Aptos Foundation + // @generated pub mod aptos { pub mod indexer { diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs index 377b9c90fc3ca7..da7863a7da2cd3 100644 --- a/execution/executor-service/src/test_utils.rs +++ b/execution/executor-service/src/test_utils.rs @@ -1,11 +1,13 @@ // Copyright © Aptos Foundation use aptos_block_partitioner::{v2::config::PartitionerV2Config, PartitionerConfig}; +use aptos_crypto::hash::CryptoHash; use aptos_language_e2e_tests::{ account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, executor::FakeExecutor, }; use aptos_types::{ + account_address::AccountAddress, block_executor::partitioner::PartitionedTransactions, state_store::state_key::StateKeyInner, transaction::{analyzed_transaction::AnalyzedTransaction, Transaction, TransactionOutput}, @@ -14,7 +16,14 @@ use aptos_vm::{ sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, AptosVM, VMExecutor, }; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +pub fn generate_account_at(executor: &mut FakeExecutor, address: AccountAddress) -> AccountData { + executor.new_account_data_at(address) +} fn generate_non_conflicting_sender_receiver( executor: &mut FakeExecutor, @@ -95,7 +104,7 @@ pub fn compare_txn_outputs( pub fn test_sharded_block_executor_no_conflict>( sharded_block_executor: ShardedBlockExecutor, ) { - let num_txns = 10; + let num_txns = 400; let num_shards = sharded_block_executor.num_shards(); let mut executor = FakeExecutor::from_head_genesis(); let mut transactions = Vec::new(); @@ -127,3 +136,55 @@ pub fn test_sharded_block_executor_no_conflict> .unwrap(); compare_txn_outputs(unsharded_txn_output, sharded_txn_output); } + +pub fn sharded_block_executor_with_conflict>( + sharded_block_executor: ShardedBlockExecutor, + concurrency: usize, +) { + let num_txns = 800; + let num_shards = sharded_block_executor.num_shards(); + let num_accounts = 80; + let mut executor = FakeExecutor::from_head_genesis(); + let mut transactions = Vec::new(); + let mut accounts = Vec::new(); + let mut txn_hash_to_account = HashMap::new(); + for _ in 0..num_accounts { + let account = generate_account_at(&mut executor, AccountAddress::random()); + accounts.push(Mutex::new(account)); + } + for i in 1..num_txns / num_accounts { + for j in 0..num_accounts { + let sender = &mut accounts[j].lock().unwrap(); + let sender_addr = *sender.address(); + let receiver = &accounts[(j + i) % num_accounts].lock().unwrap(); + let transfer_amount = 1_000; + let txn = generate_p2p_txn(sender, receiver, transfer_amount); + txn_hash_to_account.insert(txn.transaction().hash(), sender_addr); + transactions.push(txn) + } + } + + let partitioner = PartitionerV2Config::default() + .max_partitioning_rounds(2) + .cross_shard_dep_avoid_threshold(0.9) + .partition_last_round(true) + .build(); + let partitioned_txns = partitioner.partition(transactions.clone(), num_shards); + + let execution_ordered_txns = PartitionedTransactions::flatten(partitioned_txns.clone()) + .into_iter() + .map(|t| t.into_txn()) + .collect(); + let sharded_txn_output = sharded_block_executor + .execute_block( + Arc::new(executor.data_store().clone()), + partitioned_txns, + concurrency, + None, + ) + .unwrap(); + + let unsharded_txn_output = + AptosVM::execute_block(execution_ordered_txns, executor.data_store(), None).unwrap(); + compare_txn_outputs(unsharded_txn_output, sharded_txn_output); +} diff --git a/execution/executor-service/src/tests.rs b/execution/executor-service/src/tests.rs index 8130afaf7bf46d..e206fedbc71677 100644 --- a/execution/executor-service/src/tests.rs +++ b/execution/executor-service/src/tests.rs @@ -75,3 +75,25 @@ fn test_sharded_block_executor_no_conflict() { executor_service.shutdown(); }); } + +#[test] +fn test_sharded_block_executor_with_conflict() { + use std::thread; + + let num_shards = 8; + let (mut controller, executor_client, mut executor_services) = + create_thread_remote_executor_shards(num_shards, Some(2)); + controller.start(); + let sharded_block_executor = ShardedBlockExecutor::new(executor_client); + + // wait for the servers to be ready before sending messages + // TODO: We need to pass this test without this sleep + thread::sleep(std::time::Duration::from_millis(10)); + + test_utils::sharded_block_executor_with_conflict(sharded_block_executor, 2); + + controller.shutdown(); + executor_services.iter_mut().for_each(|executor_service| { + executor_service.shutdown(); + }); +} diff --git a/secure/net/src/grpc_network_service/mod.rs b/secure/net/src/grpc_network_service/mod.rs index 4ab645b8d32873..4244ffbb56b14f 100644 --- a/secure/net/src/grpc_network_service/mod.rs +++ b/secure/net/src/grpc_network_service/mod.rs @@ -4,8 +4,8 @@ use crate::network_controller::{Message, MessageType}; use aptos_logger::{error, info}; use aptos_protos::remote_executor::v1::{ - remote_execution_client::RemoteExecutionClient, - remote_execution_server::{RemoteExecution, RemoteExecutionServer}, + network_message_service_client::NetworkMessageServiceClient, + network_message_service_server::{NetworkMessageService, NetworkMessageServiceServer}, Empty, NetworkMessage, FILE_DESCRIPTOR_SET, }; use crossbeam_channel::Sender; @@ -20,11 +20,11 @@ use tonic::{ Request, Response, Status, }; -pub struct RemoteExecutionServerWrapper { +pub struct GRPCNetworkMessageServiceServerWrapper { inbound_handlers: Arc>>>, } -impl RemoteExecutionServerWrapper { +impl GRPCNetworkMessageServiceServerWrapper { pub fn new(inbound_handlers: Arc>>>) -> Self { Self { inbound_handlers } } @@ -36,14 +36,21 @@ impl RemoteExecutionServerWrapper { rt: &Runtime, _service: String, server_addr: SocketAddr, + rpc_timeout_ms: u64, server_shutdown_rx: oneshot::Receiver<()>, ) { rt.spawn(async move { - self.start_async(server_addr, server_shutdown_rx).await; + self.start_async(server_addr, rpc_timeout_ms, server_shutdown_rx) + .await; }); } - async fn start_async(self, server_addr: SocketAddr, server_shutdown_rx: oneshot::Receiver<()>) { + async fn start_async( + self, + server_addr: SocketAddr, + rpc_timeout_ms: u64, + server_shutdown_rx: oneshot::Receiver<()>, + ) { let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) .build() @@ -54,7 +61,8 @@ impl RemoteExecutionServerWrapper { // till the server is shutdown. Hence this should be called as a separate non-blocking task. // Signal handler 'server_shutdown_rx' is needed to shutdown the server Server::builder() - .add_service(RemoteExecutionServer::new(self)) + .timeout(std::time::Duration::from_millis(rpc_timeout_ms)) + .add_service(NetworkMessageServiceServer::new(self)) .add_service(reflection_service) .serve_with_shutdown(server_addr, async { server_shutdown_rx.await.ok(); @@ -66,13 +74,13 @@ impl RemoteExecutionServerWrapper { } #[tonic::async_trait] -impl RemoteExecution for RemoteExecutionServerWrapper { +impl NetworkMessageService for GRPCNetworkMessageServiceServerWrapper { async fn simple_msg_exchange( &self, request: Request, ) -> Result, Status> { + let remote_addr = request.remote_addr(); let network_message = request.into_inner(); - let sender = network_message.sender_addr; let msg = Message::new(network_message.message); let message_type = MessageType::new(network_message.message_type); @@ -82,19 +90,19 @@ impl RemoteExecution for RemoteExecutionServerWrapper { } else { error!( "No handler registered for sender: {:?} and msg type {:?}", - sender, message_type + remote_addr, message_type ); } Ok(Response::new(Empty {})) } } -pub struct RemoteExecutionClientWrapper { +pub struct GRPCNetworkMessageServiceClientWrapper { remote_addr: String, - remote_channel: RemoteExecutionClient, + remote_channel: NetworkMessageServiceClient, } -impl RemoteExecutionClientWrapper { +impl GRPCNetworkMessageServiceClientWrapper { pub fn new(rt: &Runtime, remote_addr: SocketAddr) -> Self { Self { remote_addr: remote_addr.to_string(), @@ -103,12 +111,12 @@ impl RemoteExecutionClientWrapper { } } - async fn get_channel(remote_addr: String) -> RemoteExecutionClient { + async fn get_channel(remote_addr: String) -> NetworkMessageServiceClient { info!("Trying to connect to remote server at {:?}", remote_addr); let conn = tonic::transport::Endpoint::new(remote_addr) .unwrap() .connect_lazy(); - RemoteExecutionClient::new(conn) + NetworkMessageServiceClient::new(conn) } pub async fn send_message( @@ -118,15 +126,14 @@ impl RemoteExecutionClientWrapper { mt: &MessageType, ) { let request = tonic::Request::new(NetworkMessage { - sender_addr: sender_addr.to_string(), message: message.data, message_type: mt.get_type(), }); - // TODO: Retry with exponential backoff on failure + // TODO: Retry with exponential backoff on failures match self.remote_channel.simple_msg_exchange(request).await { Ok(_) => {}, Err(e) => { - error!( + panic!( "Error '{}' sending message to {} on node {:?}", e, self.remote_addr, sender_addr ); @@ -153,7 +160,7 @@ fn basic_test() { .lock() .unwrap() .insert(MessageType::new(message_type.clone()), msg_tx); - let server = RemoteExecutionServerWrapper::new(server_handlers); + let server = GRPCNetworkMessageServiceServerWrapper::new(server_handlers); let rt = Runtime::new().unwrap(); let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel(); @@ -161,10 +168,11 @@ fn basic_test() { &rt, "unit tester".to_string(), server_addr, + 1000, server_shutdown_rx, ); - let mut grpc_client = RemoteExecutionClientWrapper::new(&rt, server_addr); + let mut grpc_client = GRPCNetworkMessageServiceClientWrapper::new(&rt, server_addr); let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), utils::get_available_port()); let test_message_content = "test1".as_bytes().to_vec(); diff --git a/secure/net/src/network_controller/inbound_handler.rs b/secure/net/src/network_controller/inbound_handler.rs index 725abc3cc9fde1..955e54b013f962 100644 --- a/secure/net/src/network_controller/inbound_handler.rs +++ b/secure/net/src/network_controller/inbound_handler.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation use crate::{ - grpc_network_service::RemoteExecutionServerWrapper, + grpc_network_service::GRPCNetworkMessageServiceServerWrapper, network_controller::{Message, MessageType}, }; use aptos_logger::warn; @@ -16,14 +16,16 @@ use tokio::{runtime::Runtime, sync::oneshot}; pub struct InboundHandler { service: String, listen_addr: SocketAddr, + rpc_timeout_ms: u64, inbound_handlers: Arc>>>, } impl InboundHandler { - pub fn new(service: String, listen_addr: SocketAddr, _: u64) -> Self { + pub fn new(service: String, listen_addr: SocketAddr, rpc_timeout_ms: u64) -> Self { Self { service: service.clone(), listen_addr, + rpc_timeout_ms, inbound_handlers: Arc::new(Mutex::new(HashMap::new())), } } @@ -45,10 +47,11 @@ impl InboundHandler { let (server_shutdown_tx, server_shutdown_rx) = oneshot::channel(); // The server is started in a separate task - RemoteExecutionServerWrapper::new(self.inbound_handlers.clone()).start( + GRPCNetworkMessageServiceServerWrapper::new(self.inbound_handlers.clone()).start( rt, self.service.clone(), self.listen_addr, + self.rpc_timeout_ms, server_shutdown_rx, ); Some(server_shutdown_tx) diff --git a/secure/net/src/network_controller/outbound_handler.rs b/secure/net/src/network_controller/outbound_handler.rs index 11089d7994d19e..d1ce53e1842cfe 100644 --- a/secure/net/src/network_controller/outbound_handler.rs +++ b/secure/net/src/network_controller/outbound_handler.rs @@ -1,10 +1,10 @@ // Copyright © Aptos Foundation use crate::{ - grpc_network_service::RemoteExecutionClientWrapper, + grpc_network_service::GRPCNetworkMessageServiceClientWrapper, network_controller::{inbound_handler::InboundHandler, Message, MessageType}, }; -use aptos_logger::{info, trace}; +use aptos_logger::{info, warn}; use crossbeam_channel::{unbounded, Receiver, Select, Sender}; use std::{ collections::{HashMap, HashSet}, @@ -63,11 +63,12 @@ impl OutboundHandler { )); // Create a grpc client for each remote address - let mut grpc_clients: HashMap = HashMap::new(); + let mut grpc_clients: HashMap = + HashMap::new(); self.remote_addresses.iter().for_each(|remote_addr| { grpc_clients.insert( *remote_addr, - RemoteExecutionClientWrapper::new(rt, *remote_addr), + GRPCNetworkMessageServiceClientWrapper::new(rt, *remote_addr), ); }); @@ -100,7 +101,7 @@ impl OutboundHandler { outbound_handlers: Vec<(Receiver, SocketAddr, MessageType)>, socket_addr: &SocketAddr, inbound_handler: Arc>, - grpc_clients: &mut HashMap, + grpc_clients: &mut HashMap, ) { loop { let mut select = Select::new(); @@ -118,13 +119,13 @@ impl OutboundHandler { msg = m; }, Err(e) => { - // not necessarily an error, just means a sender closed a channel - trace!( - "{:?} on outbound handler for {:?}", + warn!( + "{:?} for outbound handler on {:?}. This can happen in shutdown,\ + but should not happen otherwise", e.to_string(), socket_addr ); - continue; + return; }, } }