Skip to content

Commit

Permalink
[Sharded-Execution-GRPC] Add GRPC communication for sharded execution.
Browse files Browse the repository at this point in the history
In this commit we replace the existing socket based communication with GRPC.
Here we get the basic GRPC reliability.

More reliability and better performance to come in subsequent commits
  • Loading branch information
manudhundi committed Sep 27, 2023
1 parent e3c66fe commit 32ca8ee
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 231 deletions.
11 changes: 6 additions & 5 deletions execution/executor-service/src/process_executor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ use crate::remote_executor_service::ExecutorService;
use aptos_logger::info;
use aptos_types::block_executor::partitioner::ShardId;
use std::net::SocketAddr;
use tokio::runtime::Runtime;

/// An implementation of the remote executor service that runs in a standalone process.
pub struct ProcessExecutorService {
_executor_service: ExecutorService,
executor_service: ExecutorService,
}

impl ProcessExecutorService {
Expand All @@ -33,8 +32,10 @@ impl ProcessExecutorService {
remote_shard_addresses,
);
executor_service.start();
Self {
_executor_service: executor_service,
}
Self { executor_service }
}

pub fn shutdown(&mut self) {
self.executor_service.shutdown()
}
}
8 changes: 3 additions & 5 deletions execution/executor-service/src/remote_cordinator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,16 @@ impl CoordinatorClient<RemoteStateViewClient> for RemoteCoordinatorClient {
let state_keys = Self::extract_state_keys(&command);
self.state_view_client.init_for_block(state_keys);
let (sub_blocks, concurrency, gas_limit) = command.into();
return ExecutorShardCommand::ExecuteSubBlocks(
ExecutorShardCommand::ExecuteSubBlocks(
self.state_view_client.clone(),
sub_blocks,
concurrency,
gas_limit,
);
)
},
}
},
Err(_) => {
ExecutorShardCommand::Stop
},
Err(_) => ExecutorShardCommand::Stop,
}
}

Expand Down
1 change: 0 additions & 1 deletion execution/executor-service/src/remote_executor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use aptos_secure_net::network_controller::NetworkController;
use aptos_types::block_executor::partitioner::ShardId;
use aptos_vm::sharded_block_executor::sharded_executor_service::ShardedExecutorService;
use std::{net::SocketAddr, sync::Arc, thread};
use tokio::runtime::Runtime;

/// A service that provides support for remote execution. Essentially, it reads a request from
/// the remote executor client and executes the block locally and returns the result.
Expand Down
19 changes: 6 additions & 13 deletions execution/executor-service/src/remote_state_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,12 @@ impl RemoteStateValueReceiver {
}

fn start(&self) {
loop {
match self.kv_rx.recv() {
Ok(message) => {
let state_view = self.state_view.clone();
let shard_id = self.shard_id;
self.thread_pool.spawn(move || {
Self::handle_message(shard_id, message, state_view);
});
},
Err(_) => {
break;
},
}
while let Ok(message) = self.kv_rx.recv() {
let state_view = self.state_view.clone();
let shard_id = self.shard_id;
self.thread_pool.spawn(move || {
Self::handle_message(shard_id, message, state_view);
});
}
}

Expand Down
19 changes: 6 additions & 13 deletions execution/executor-service/src/remote_state_view_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,12 @@ impl<S: StateView + Sync + Send + 'static> RemoteStateViewService<S> {
}

pub fn start(&self) {
loop {
match self.kv_rx.recv() {
Ok(message) => {
let state_view = self.state_view.clone();
let kv_txs = self.kv_tx.clone();
self.thread_pool.spawn(move || {
Self::handle_message(message, state_view, kv_txs);
});
}
Err(_) => {
break;
}
}
while let Ok(message) = self.kv_rx.recv() {
let state_view = self.state_view.clone();
let kv_txs = self.kv_tx.clone();
self.thread_pool.spawn(move || {
Self::handle_message(message, state_view, kv_txs);
});
}
}

Expand Down
36 changes: 17 additions & 19 deletions execution/executor-service/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use aptos_language_e2e_tests::data_store::FakeDataStore;
use aptos_secure_net::network_controller::NetworkController;
use aptos_vm::sharded_block_executor::ShardedBlockExecutor;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::thread;
use tokio::runtime::Runtime;

pub fn create_thread_remote_executor_shards(
num_shards: usize,
Expand Down Expand Up @@ -58,22 +56,22 @@ pub fn create_thread_remote_executor_shards(

#[test]
fn test_sharded_block_executor_no_conflict() {
for i in 0..1 {
thread::sleep(std::time::Duration::from_millis(10));
println!("Run number: {}", i);
let num_shards = 2;
let (mut controller, executor_client, mut executor_services) =
create_thread_remote_executor_shards(num_shards, Some(2));
controller.start();
//thread::sleep(std::time::Duration::from_millis(1000));
let sharded_block_executor = ShardedBlockExecutor::new(executor_client);
println!("Starting test_sharded_block_executor_no_conflict..................");
test_utils::test_sharded_block_executor_no_conflict(sharded_block_executor);
println!("Done testing test_sharded_block_executor_no_conflict..................");
use std::thread;

controller.shutdown();
executor_services.iter_mut().for_each(|executor_service| {
executor_service.shutdown();
});
}
let num_shards = 8;
let (mut controller, executor_client, mut executor_services) =
create_thread_remote_executor_shards(num_shards, Some(2));
controller.start();
let sharded_block_executor = ShardedBlockExecutor::new(executor_client);

// wait for the servers to be ready before sending messages
// TODO: We need to pass this test without this sleep
thread::sleep(std::time::Duration::from_millis(10));

test_utils::test_sharded_block_executor_no_conflict(sharded_block_executor);

controller.shutdown();
executor_services.iter_mut().for_each(|executor_service| {
executor_service.shutdown();
});
}
11 changes: 1 addition & 10 deletions execution/executor-service/src/thread_executor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@
// SPDX-License-Identifier: Apache-2.0
use crate::remote_executor_service::ExecutorService;
use aptos_types::block_executor::partitioner::ShardId;
use std::{net::SocketAddr, thread, thread::JoinHandle};
use tokio::runtime::Runtime;
use std::net::SocketAddr;

/// This is a simple implementation of RemoteExecutorService that runs the executor service in a
/// separate thread. This should be used for testing only.
pub struct ThreadExecutorService {
//_child: JoinHandle<()>,
_self_address: SocketAddr,
executor_service: ExecutorService,
}
Expand All @@ -30,15 +28,8 @@ impl ThreadExecutorService {
coordinator_address,
remote_shard_addresses,
);



println!("ThreadExecutorService starting at {}", self_address);
executor_service.start();
println!("ThreadExecutorService: controller started at {}", self_address);

Self {
// _child: child,
_self_address: self_address,
executor_service,
}
Expand Down
Loading

0 comments on commit 32ca8ee

Please sign in to comment.