From fdd54355559e32aa5a4c271f3c0f3f363207c195 Mon Sep 17 00:00:00 2001 From: Mathieu Baudet <1105398+ma2bd@users.noreply.github.com> Date: Wed, 4 Sep 2024 23:24:22 -0400 Subject: [PATCH] address comments --- linera-chain/src/chain.rs | 20 ++++--------- linera-core/src/chain_worker/actor.rs | 29 +++++++++---------- linera-core/src/chain_worker/state/mod.rs | 14 +++------ .../chain_worker/state/temporary_changes.rs | 7 +---- linera-execution/src/execution.rs | 24 ++++++++------- linera-execution/src/lib.rs | 2 +- linera-execution/src/test_utils/mod.rs | 22 +++++++------- linera-execution/tests/test_execution.rs | 7 ++--- .../tests/test_system_execution.rs | 2 +- linera-execution/tests/wasm.rs | 6 ++-- 10 files changed, 54 insertions(+), 79 deletions(-) diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index 6296ebefbae..498a941127b 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -22,10 +22,10 @@ use linera_base::{ }, }; use linera_execution::{ - system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRequest, - ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation, - OperationContext, Query, QueryContext, RawExecutionOutcome, RawOutgoingMessage, - ResourceController, ResourceTracker, Response, ServiceRuntimeRequest, TransactionTracker, + system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext, + ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext, + RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response, + ServiceRuntimeEndpoint, TransactionTracker, }; use linera_views::{ context::Context, @@ -375,10 +375,7 @@ where &mut self, local_time: Timestamp, query: Query, - incoming_execution_requests: Option< - &mut futures::channel::mpsc::UnboundedReceiver, - >, - runtime_request_sender: Option<&mut std::sync::mpsc::Sender>, + service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>, ) -> Result { let context = QueryContext { chain_id: self.chain_id(), @@ -387,12 +384,7 @@ where }; let response = self .execution_state - .query_application( - context, - query, - incoming_execution_requests, - runtime_request_sender, - ) + .query_application(context, query, service_runtime_endpoint) .await .map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?; Ok(response) diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index 4aa94eb3f88..f9ddd6085b9 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -22,8 +22,7 @@ use linera_chain::{ ChainStateView, }; use linera_execution::{ - committee::Epoch, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest, - ServiceSyncRuntime, + committee::Epoch, Query, QueryContext, Response, ServiceRuntimeEndpoint, ServiceSyncRuntime, }; use linera_storage::Storage; use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; @@ -151,12 +150,12 @@ where tracked_chains: Option>>>, chain_id: ChainId, ) -> Result { - let (service_runtime_thread, execution_state_receiver, runtime_request_sender) = { + let (service_runtime_thread, service_runtime_endpoint) = { if config.long_lived_services { - let (thread, receiver, sender) = Self::spawn_service_runtime_actor(chain_id); - (Some(thread), Some(receiver), Some(sender)) + let (thread, endpoint) = Self::spawn_service_runtime_actor(chain_id); + (Some(thread), Some(endpoint)) } else { - (None, None, None) + (None, None) } }; @@ -167,8 +166,7 @@ where blob_cache, tracked_chains, chain_id, - execution_state_receiver, - runtime_request_sender, + service_runtime_endpoint, ) .await?; @@ -185,8 +183,7 @@ where chain_id: ChainId, ) -> ( linera_base::task::BlockingFuture<()>, - futures::channel::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, + ServiceRuntimeEndpoint, ) { let context = QueryContext { chain_id, @@ -194,7 +191,7 @@ where local_time: Timestamp::from(0), }; - let (execution_state_sender, execution_state_receiver) = + let (execution_state_sender, incoming_execution_requests) = futures::channel::mpsc::unbounded(); let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); @@ -202,11 +199,11 @@ where ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver) }); - ( - service_runtime_thread, - execution_state_receiver, + let endpoint = ServiceRuntimeEndpoint { + incoming_execution_requests, runtime_request_sender, - ) + }; + (service_runtime_thread, endpoint) } /// Runs the worker until there are no more incoming requests. @@ -317,8 +314,8 @@ where } } - drop(self.worker); if let Some(thread) = self.service_runtime_thread { + drop(self.worker); thread .await .expect("Service runtime thread should not panic"); diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index a1405991625..da70217c51a 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -26,8 +26,7 @@ use linera_chain::{ ChainError, ChainStateView, }; use linera_execution::{ - committee::Epoch, ExecutionRequest, Message, Query, QueryContext, Response, - ServiceRuntimeRequest, SystemMessage, + committee::Epoch, Message, Query, QueryContext, Response, ServiceRuntimeEndpoint, SystemMessage, }; use linera_storage::{Clock as _, Storage}; use linera_views::views::{ClonableView, ViewError}; @@ -55,8 +54,7 @@ where storage: StorageClient, chain: ChainStateView, shared_chain_view: Option>>>, - execution_state_receiver: Option>, - runtime_request_sender: Option>, + service_runtime_endpoint: Option, recent_hashed_certificate_values: Arc>, recent_blobs: Arc>, tracked_chains: Option>>>, @@ -76,10 +74,7 @@ where blob_cache: Arc>, tracked_chains: Option>>>, chain_id: ChainId, - execution_state_receiver: Option< - futures::channel::mpsc::UnboundedReceiver, - >, - runtime_request_sender: Option>, + service_runtime_endpoint: Option, ) -> Result { let chain = storage.load_chain(chain_id).await?; @@ -88,8 +83,7 @@ where storage, chain, shared_chain_view: None, - execution_state_receiver, - runtime_request_sender, + service_runtime_endpoint, recent_hashed_certificate_values: certificate_value_cache, recent_blobs: blob_cache, tracked_chains, diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index b0a2b45080c..b089b2a1868 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -102,12 +102,7 @@ where let response = self .0 .chain - .query_application( - local_time, - query, - self.0.execution_state_receiver.as_mut(), - self.0.runtime_request_sender.as_mut(), - ) + .query_application(local_time, query, self.0.service_runtime_endpoint.as_mut()) .await?; Ok(response) } diff --git a/linera-execution/src/execution.rs b/linera-execution/src/execution.rs index dc138a334bf..1171b41cb7a 100644 --- a/linera-execution/src/execution.rs +++ b/linera-execution/src/execution.rs @@ -45,6 +45,14 @@ pub struct ExecutionStateView { pub users: HashedReentrantCollectionView>, } +/// How to interact with a long-lived service runtime. +pub struct ServiceRuntimeEndpoint { + /// How to receive requests. + pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver, + /// How to query the runtime. + pub runtime_request_sender: std::sync::mpsc::Sender, +} + #[cfg(with_testing)] impl ExecutionStateView> where @@ -443,10 +451,7 @@ where &mut self, context: QueryContext, query: Query, - incoming_execution_requests: Option< - &mut futures::channel::mpsc::UnboundedReceiver, - >, - runtime_request_sender: Option<&mut std::sync::mpsc::Sender>, + endpoint: Option<&mut ServiceRuntimeEndpoint>, ) -> Result { assert_eq!(context.chain_id, self.context().extra().chain_id()); match query { @@ -459,22 +464,21 @@ where bytes, } => { let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config(); - let response = match (incoming_execution_requests, runtime_request_sender) { - (Some(requests), Some(sender)) => { + let response = match endpoint { + Some(endpoint) => { self.query_user_application_with_long_lived_service( application_id, context, bytes, - requests, - sender, + &mut endpoint.incoming_execution_requests, + &mut endpoint.runtime_request_sender, ) .await? } - (None, None) => { + None => { self.query_user_application(application_id, context, bytes) .await? } - _ => unreachable!("invalid parameters"), }; Ok(Response::User(response)) } diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index 6fd9944a84a..0748b8d410b 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -60,7 +60,7 @@ pub use crate::wasm::{ }; pub use crate::{ applications::ApplicationRegistryView, - execution::ExecutionStateView, + execution::{ExecutionStateView, ServiceRuntimeEndpoint}, execution_state_actor::ExecutionRequest, policy::ResourceControlPolicy, resources::{ResourceController, ResourceTracker}, diff --git a/linera-execution/src/test_utils/mod.rs b/linera-execution/src/test_utils/mod.rs index c2e5b3a9fc9..cc7593dcf32 100644 --- a/linera-execution/src/test_utils/mod.rs +++ b/linera-execution/src/test_utils/mod.rs @@ -27,8 +27,8 @@ pub use self::{ }; use crate::{ ApplicationRegistryView, ExecutionRequest, ExecutionRuntimeContext, ExecutionStateView, - QueryContext, ServiceRuntimeRequest, ServiceSyncRuntime, TestExecutionRuntimeContext, - UserApplicationDescription, UserApplicationId, + QueryContext, ServiceRuntimeEndpoint, ServiceRuntimeRequest, ServiceSyncRuntime, + TestExecutionRuntimeContext, UserApplicationDescription, UserApplicationId, }; pub fn create_dummy_user_application_description(index: u64) -> UserApplicationDescription { @@ -109,20 +109,18 @@ impl QueryContext { /// Spawns a thread running the [`ServiceSyncRuntime`] actor. /// /// Returns the endpoints to communicate with the actor. - pub fn spawn_service_runtime_actor( - self, - ) -> ( - futures::channel::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, - ) { - let (execution_state_sender, execution_state_receiver) = + pub fn spawn_service_runtime_actor(self) -> ServiceRuntimeEndpoint { + let (execution_state_sender, incoming_execution_requests) = futures::channel::mpsc::unbounded(); - let (request_sender, request_receiver) = std::sync::mpsc::channel(); + let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); thread::spawn(move || { - ServiceSyncRuntime::new(execution_state_sender, self).run(request_receiver) + ServiceSyncRuntime::new(execution_state_sender, self).run(runtime_request_receiver) }); - (execution_state_receiver, request_sender) + ServiceRuntimeEndpoint { + incoming_execution_requests, + runtime_request_sender, + } } } diff --git a/linera-execution/tests/test_execution.rs b/linera-execution/tests/test_execution.rs index bcecbf71b7e..c6a7cc1cf39 100644 --- a/linera-execution/tests/test_execution.rs +++ b/linera-execution/tests/test_execution.rs @@ -207,8 +207,7 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { next_block_height: BlockHeight(0), local_time: Timestamp::from(0), }; - let (mut execution_request_receiver, mut runtime_request_sender) = - context.spawn_service_runtime_actor(); + let mut service_runtime_endpoint = context.spawn_service_runtime_actor(); assert_eq!( view.query_application( context, @@ -216,8 +215,7 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { application_id: caller_id, bytes: vec![] }, - Some(&mut execution_request_receiver), - Some(&mut runtime_request_sender), + Some(&mut service_runtime_endpoint), ) .await .unwrap(), @@ -232,7 +230,6 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { bytes: vec![] }, None, - None, ) .await .unwrap(), diff --git a/linera-execution/tests/test_system_execution.rs b/linera-execution/tests/test_system_execution.rs index c461e1d8895..7c6db04cb09 100644 --- a/linera-execution/tests/test_system_execution.rs +++ b/linera-execution/tests/test_system_execution.rs @@ -117,7 +117,7 @@ async fn test_simple_system_query() -> anyhow::Result<()> { local_time: Timestamp::from(0), }; let response = view - .query_application(context, Query::System(SystemQuery), None, None) + .query_application(context, Query::System(SystemQuery), None) .await .unwrap(); assert_eq!( diff --git a/linera-execution/tests/wasm.rs b/linera-execution/tests/wasm.rs index c83273c7d09..40fead5510f 100644 --- a/linera-execution/tests/wasm.rs +++ b/linera-execution/tests/wasm.rs @@ -125,8 +125,7 @@ async fn test_fuel_for_counter_wasm_application( next_block_height: BlockHeight(0), local_time: Timestamp::from(0), }; - let (mut execution_request_receiver, mut runtime_request_sender) = - context.spawn_service_runtime_actor(); + let mut service_runtime_endpoint = context.spawn_service_runtime_actor(); let expected_value = async_graphql::Response::new( async_graphql::Value::from_json(json!({"value" : increments.into_iter().sum::()})) .unwrap(), @@ -136,8 +135,7 @@ async fn test_fuel_for_counter_wasm_application( .query_application( context, Query::user(app_id, &request).unwrap(), - Some(&mut execution_request_receiver), - Some(&mut runtime_request_sender), + Some(&mut service_runtime_endpoint), ) .await? else {