diff --git a/CLI.md b/CLI.md index 42956b7067b..ac814167464 100644 --- a/CLI.md +++ b/CLI.md @@ -116,6 +116,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp Default value: `10` * `--wait-for-outgoing-messages` — Whether to wait until a quorum of validators has confirmed that all sent cross-chain messages have been delivered +* `--long-lived-services` — (EXPERIMENTAL) Whether application services can persist in some cases between queries * `--tokio-threads ` — The number of Tokio worker threads to use * `--blanket-message-policy ` — The policy for handling incoming messages diff --git a/examples/llm/README.md b/examples/llm/README.md index 49adecc1fef..22e198cf13f 100644 --- a/examples/llm/README.md +++ b/examples/llm/README.md @@ -9,11 +9,6 @@ The model used by Linera Stories is a 40M parameter TinyLlama by A. Karpathy. Fi CAVEAT: -* We currently use a local HTTP service to provide model data to the wallet - implementation (aka "node service"). In the future, model data will be stored on-chain - ([#1981](https://github.com/linera-io/linera-protocol/issues/1981)) or in an external - decentralized storage. - * Running larger LLMs with acceptable performance will likely require hardware acceleration ([#1931](https://github.com/linera-io/linera-protocol/issues/1931)). * The service currently is restarted when the wallet receives a new block for the chain where the @@ -59,8 +54,11 @@ Then, a node service for the current wallet has to be started: ```bash PORT=8080 -linera service --port $PORT & -``` +linera --long-lived-services service --port $PORT & + ``` + +The experimental option `--long-lived-services` is used for performance, to avoid +reloading the model between queries. Next, navigate to `llm/web-frontend` and install the requisite `npm` dependencies: @@ -75,6 +73,5 @@ Finally, navigate to `localhost:3000` to interact with the Linera ChatBot. ```bash echo "http://localhost:3000/$CHAIN?app=$APP_ID&port=$PORT" ``` - diff --git a/examples/llm/src/lib.rs b/examples/llm/src/lib.rs index b7d28b5488d..ab51398a772 100644 --- a/examples/llm/src/lib.rs +++ b/examples/llm/src/lib.rs @@ -11,11 +11,6 @@ The model used by Linera Stories is a 40M parameter TinyLlama by A. Karpathy. Fi CAVEAT: -* We currently use a local HTTP service to provide model data to the wallet - implementation (aka "node service"). In the future, model data will be stored on-chain - ([#1981](https://github.com/linera-io/linera-protocol/issues/1981)) or in an external - decentralized storage. - * Running larger LLMs with acceptable performance will likely require hardware acceleration ([#1931](https://github.com/linera-io/linera-protocol/issues/1931)). * The service currently is restarted when the wallet receives a new block for the chain where the @@ -61,8 +56,11 @@ Then, a node service for the current wallet has to be started: ```bash PORT=8080 -linera service --port $PORT & -``` +linera --long-lived-services service --port $PORT & + ``` + +The experimental option `--long-lived-services` is used for performance, to avoid +reloading the model between queries. Next, navigate to `llm/web-frontend` and install the requisite `npm` dependencies: @@ -77,7 +75,7 @@ Finally, navigate to `localhost:3000` to interact with the Linera ChatBot. ```bash echo "http://localhost:3000/$CHAIN?app=$APP_ID&port=$PORT" ``` - */ +*/ use async_graphql::{Request, Response}; use linera_sdk::base::{ContractAbi, ServiceAbi}; diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index 55b236184a4..498a941127b 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -22,10 +22,10 @@ use linera_base::{ }, }; use linera_execution::{ - system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRequest, - ExecutionRuntimeContext, ExecutionStateView, Message, MessageContext, Operation, - OperationContext, Query, QueryContext, RawExecutionOutcome, RawOutgoingMessage, - ResourceController, ResourceTracker, Response, ServiceRuntimeRequest, TransactionTracker, + system::OpenChainConfig, ExecutionError, ExecutionOutcome, ExecutionRuntimeContext, + ExecutionStateView, Message, MessageContext, Operation, OperationContext, Query, QueryContext, + RawExecutionOutcome, RawOutgoingMessage, ResourceController, ResourceTracker, Response, + ServiceRuntimeEndpoint, TransactionTracker, }; use linera_views::{ context::Context, @@ -375,10 +375,7 @@ where &mut self, local_time: Timestamp, query: Query, - incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver< - ExecutionRequest, - >, - runtime_request_sender: &mut std::sync::mpsc::Sender, + service_runtime_endpoint: Option<&mut ServiceRuntimeEndpoint>, ) -> Result { let context = QueryContext { chain_id: self.chain_id(), @@ -387,12 +384,7 @@ where }; let response = self .execution_state - .query_application( - context, - query, - incoming_execution_requests, - runtime_request_sender, - ) + .query_application(context, query, service_runtime_endpoint) .await .map_err(|error| ChainError::ExecutionError(error, ChainExecutionContext::Query))?; Ok(response) diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index 9831047efcc..d71e0118047 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -162,6 +162,7 @@ where storage, options.max_pending_message_bundles, delivery, + options.long_lived_services, wallet.chain_ids(), "Client node", ); diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index bdb8d036a6e..771cda6d69f 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -129,6 +129,10 @@ pub struct ClientOptions { #[arg(long)] pub wait_for_outgoing_messages: bool, + /// (EXPERIMENTAL) Whether application services can persist in some cases between queries. + #[arg(long)] + pub long_lived_services: bool, + /// The number of Tokio worker threads to use. #[arg(long, env = "LINERA_CLIENT_TOKIO_THREADS")] pub tokio_threads: Option, diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 742d79e8a71..2d0d6f873a5 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -159,6 +159,7 @@ async fn test_chain_listener() -> anyhow::Result<()> { storage.clone(), 10, delivery, + false, [chain_id0], "Client node", )), diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index b2a5696f9d3..f9ddd6085b9 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -22,8 +22,7 @@ use linera_chain::{ ChainStateView, }; use linera_execution::{ - committee::Epoch, ExecutionRequest, Query, QueryContext, Response, ServiceRuntimeRequest, - ServiceSyncRuntime, + committee::Epoch, Query, QueryContext, Response, ServiceRuntimeEndpoint, ServiceSyncRuntime, }; use linera_storage::Storage; use tokio::sync::{mpsc, oneshot, OwnedRwLockReadGuard}; @@ -134,7 +133,7 @@ where StorageClient: Storage + Clone + Send + Sync + 'static, { worker: ChainWorkerState, - service_runtime_thread: linera_base::task::BlockingFuture<()>, + service_runtime_thread: Option>, } impl ChainWorkerActor @@ -151,8 +150,14 @@ where tracked_chains: Option>>>, chain_id: ChainId, ) -> Result { - let (service_runtime_thread, execution_state_receiver, runtime_request_sender) = - Self::spawn_service_runtime_actor(chain_id); + let (service_runtime_thread, service_runtime_endpoint) = { + if config.long_lived_services { + let (thread, endpoint) = Self::spawn_service_runtime_actor(chain_id); + (Some(thread), Some(endpoint)) + } else { + (None, None) + } + }; let worker = ChainWorkerState::load( config, @@ -161,8 +166,7 @@ where blob_cache, tracked_chains, chain_id, - execution_state_receiver, - runtime_request_sender, + service_runtime_endpoint, ) .await?; @@ -179,8 +183,7 @@ where chain_id: ChainId, ) -> ( linera_base::task::BlockingFuture<()>, - futures::channel::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, + ServiceRuntimeEndpoint, ) { let context = QueryContext { chain_id, @@ -188,7 +191,7 @@ where local_time: Timestamp::from(0), }; - let (execution_state_sender, execution_state_receiver) = + let (execution_state_sender, incoming_execution_requests) = futures::channel::mpsc::unbounded(); let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); @@ -196,11 +199,11 @@ where ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver) }); - ( - service_runtime_thread, - execution_state_receiver, + let endpoint = ServiceRuntimeEndpoint { + incoming_execution_requests, runtime_request_sender, - ) + }; + (service_runtime_thread, endpoint) } /// Runs the worker until there are no more incoming requests. @@ -311,10 +314,12 @@ where } } - drop(self.worker); - self.service_runtime_thread - .await - .expect("Service runtime thread should not panic"); + if let Some(thread) = self.service_runtime_thread { + drop(self.worker); + thread + .await + .expect("Service runtime thread should not panic"); + } trace!("`ChainWorkerActor` finished"); } diff --git a/linera-core/src/chain_worker/config.rs b/linera-core/src/chain_worker/config.rs index fde06f23781..4fe2cab25eb 100644 --- a/linera-core/src/chain_worker/config.rs +++ b/linera-core/src/chain_worker/config.rs @@ -17,6 +17,8 @@ pub struct ChainWorkerConfig { pub allow_inactive_chains: bool, /// Whether new messages from deprecated epochs are allowed. pub allow_messages_from_deprecated_epochs: bool, + /// Whether the user application services should be long-lived. + pub long_lived_services: bool, /// Blocks with a timestamp this far in the future will still be accepted, but the validator /// will wait until that timestamp before voting. pub grace_period: Duration, diff --git a/linera-core/src/chain_worker/state/mod.rs b/linera-core/src/chain_worker/state/mod.rs index b4835162ea5..da70217c51a 100644 --- a/linera-core/src/chain_worker/state/mod.rs +++ b/linera-core/src/chain_worker/state/mod.rs @@ -26,8 +26,7 @@ use linera_chain::{ ChainError, ChainStateView, }; use linera_execution::{ - committee::Epoch, ExecutionRequest, Message, Query, QueryContext, Response, - ServiceRuntimeRequest, SystemMessage, + committee::Epoch, Message, Query, QueryContext, Response, ServiceRuntimeEndpoint, SystemMessage, }; use linera_storage::{Clock as _, Storage}; use linera_views::views::{ClonableView, ViewError}; @@ -55,8 +54,7 @@ where storage: StorageClient, chain: ChainStateView, shared_chain_view: Option>>>, - execution_state_receiver: futures::channel::mpsc::UnboundedReceiver, - runtime_request_sender: std::sync::mpsc::Sender, + service_runtime_endpoint: Option, recent_hashed_certificate_values: Arc>, recent_blobs: Arc>, tracked_chains: Option>>>, @@ -76,8 +74,7 @@ where blob_cache: Arc>, tracked_chains: Option>>>, chain_id: ChainId, - execution_state_receiver: futures::channel::mpsc::UnboundedReceiver, - runtime_request_sender: std::sync::mpsc::Sender, + service_runtime_endpoint: Option, ) -> Result { let chain = storage.load_chain(chain_id).await?; @@ -86,8 +83,7 @@ where storage, chain, shared_chain_view: None, - execution_state_receiver, - runtime_request_sender, + service_runtime_endpoint, recent_hashed_certificate_values: certificate_value_cache, recent_blobs: blob_cache, tracked_chains, diff --git a/linera-core/src/chain_worker/state/temporary_changes.rs b/linera-core/src/chain_worker/state/temporary_changes.rs index 737aa0fdc45..b089b2a1868 100644 --- a/linera-core/src/chain_worker/state/temporary_changes.rs +++ b/linera-core/src/chain_worker/state/temporary_changes.rs @@ -102,12 +102,7 @@ where let response = self .0 .chain - .query_application( - local_time, - query, - &mut self.0.execution_state_receiver, - &mut self.0.runtime_request_sender, - ) + .query_application(local_time, query, self.0.service_runtime_endpoint.as_mut()) .await?; Ok(response) } diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index 3a5c3c197a4..72256376e3f 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -113,6 +113,7 @@ impl Client { storage: S, max_pending_message_bundles: usize, cross_chain_message_delivery: CrossChainMessageDelivery, + long_lived_services: bool, tracked_chains: impl IntoIterator, name: impl Into, ) -> Self { @@ -123,6 +124,7 @@ impl Client { tracked_chains.clone(), NonZeroUsize::new(20).expect("Chain worker limit should not be zero"), ) + .with_long_lived_services(long_lived_services) .with_allow_inactive_chains(true) .with_allow_messages_from_deprecated_epochs(true); let local_node = LocalNodeClient::new(state); diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 390263e7759..1c960b917ea 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -786,6 +786,7 @@ where storage, 10, CrossChainMessageDelivery::NonBlocking, + false, [chain_id], format!("Client node for {:.8}", chain_id), )); diff --git a/linera-core/src/unit_tests/worker_tests.rs b/linera-core/src/unit_tests/worker_tests.rs index b2ba7a5ec9a..14b3cc39ac5 100644 --- a/linera-core/src/unit_tests/worker_tests.rs +++ b/linera-core/src/unit_tests/worker_tests.rs @@ -74,7 +74,11 @@ const TEST_GRACE_PERIOD_MICROS: u64 = 500_000; /// Instantiates the protocol with a single validator. Returns the corresponding committee /// and the (non-sharded, in-memory) "worker" that we can interact with. -fn init_worker(storage: S, is_client: bool) -> (Committee, WorkerState) +fn init_worker( + storage: S, + is_client: bool, + has_long_lived_services: bool, +) -> (Committee, WorkerState) where S: Storage + Clone + Send + Sync + 'static, { @@ -88,6 +92,7 @@ where ) .with_allow_inactive_chains(is_client) .with_allow_messages_from_deprecated_epochs(is_client) + .with_long_lived_services(has_long_lived_services) .with_grace_period(Duration::from_micros(TEST_GRACE_PERIOD_MICROS)); (committee, worker) } @@ -98,7 +103,9 @@ where I: IntoIterator, S: Storage + Clone + Send + Sync + 'static, { - let (committee, worker) = init_worker(storage, /* is_client */ false); + let (committee, worker) = init_worker( + storage, /* is_client */ false, /* has_long_lived_services */ false, + ); for (description, pubk, balance) in balances { worker .storage @@ -1687,7 +1694,9 @@ where { let storage = storage_builder.build().await?; let sender_key_pair = KeyPair::generate(); - let (committee, worker) = init_worker(storage, /* is_client */ false); + let (committee, worker) = init_worker( + storage, /* is_client */ false, /* has_long_lived_services */ false, + ); let certificate = make_simple_transfer_certificate( ChainDescription::Root(1), &sender_key_pair, @@ -1724,7 +1733,9 @@ where { let storage = storage_builder.build().await?; let sender_key_pair = KeyPair::generate(); - let (committee, worker) = init_worker(storage, /* is_client */ true); + let (committee, worker) = init_worker( + storage, /* is_client */ true, /* has_long_lived_services */ false, + ); let certificate = make_simple_transfer_certificate( ChainDescription::Root(1), &sender_key_pair, @@ -2886,7 +2897,7 @@ async fn test_cross_chain_helper() -> anyhow::Result<()> { TestClock::new(), ) .await?; - let (committee, worker) = init_worker(store, true); + let (committee, worker) = init_worker(store, true, false); let committees = BTreeMap::from_iter([(Epoch::from(1), committee.clone())]); let key_pair0 = KeyPair::generate(); @@ -3564,13 +3575,23 @@ where let key_pair = KeyPair::generate(); let balance = Amount::ZERO; - let (_committee, worker) = init_worker_with_chain( + let (committee, worker) = init_worker( storage.clone(), - chain_description, - key_pair.public(), - balance, - ) - .await; + /* is_client */ false, + /* has_long_lived_services */ true, + ); + worker + .storage + .create_chain( + committee, + ChainId::root(0), + chain_description, + key_pair.public(), + balance, + Timestamp::from(0), + ) + .await + .unwrap(); let mut applications; { @@ -3644,13 +3665,23 @@ where let key_pair = KeyPair::generate(); let balance = Amount::ZERO; - let (committee, worker) = init_worker_with_chain( + let (committee, worker) = init_worker( storage.clone(), - chain_description, - key_pair.public(), - balance, - ) - .await; + /* is_client */ false, + /* has_long_lived_services */ true, + ); + worker + .storage + .create_chain( + committee.clone(), + ChainId::root(0), + chain_description, + key_pair.public(), + balance, + Timestamp::from(0), + ) + .await + .unwrap(); let mut applications; { diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index a01f7ae2578..71677aa1023 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -313,6 +313,13 @@ where self } + #[tracing::instrument(level = "trace", skip(self, value))] + pub fn with_long_lived_services(mut self, value: bool) -> Self { + self.chain_worker_config.long_lived_services = value; + self + } + + #[tracing::instrument(level = "trace", skip(self, tracked_chains))] /// Configures the subset of chains that this worker is tracking. pub fn with_tracked_chains( mut self, diff --git a/linera-execution/src/execution.rs b/linera-execution/src/execution.rs index 47de181c7c4..1171b41cb7a 100644 --- a/linera-execution/src/execution.rs +++ b/linera-execution/src/execution.rs @@ -32,8 +32,8 @@ use crate::{ resources::ResourceController, system::SystemExecutionStateView, ContractSyncRuntime, ExecutionError, ExecutionOutcome, ExecutionRuntimeConfig, ExecutionRuntimeContext, Message, MessageContext, MessageKind, Operation, OperationContext, Query, QueryContext, - RawExecutionOutcome, RawOutgoingMessage, Response, SystemMessage, TransactionTracker, - UserApplicationDescription, UserApplicationId, + RawExecutionOutcome, RawOutgoingMessage, Response, ServiceSyncRuntime, SystemMessage, + TransactionTracker, UserApplicationDescription, UserApplicationId, }; /// A view accessing the execution state of a chain. @@ -45,6 +45,14 @@ pub struct ExecutionStateView { pub users: HashedReentrantCollectionView>, } +/// How to interact with a long-lived service runtime. +pub struct ServiceRuntimeEndpoint { + /// How to receive requests. + pub incoming_execution_requests: futures::channel::mpsc::UnboundedReceiver, + /// How to query the runtime. + pub runtime_request_sender: std::sync::mpsc::Sender, +} + #[cfg(with_testing)] impl ExecutionStateView> where @@ -443,10 +451,7 @@ where &mut self, context: QueryContext, query: Query, - incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver< - ExecutionRequest, - >, - runtime_request_sender: &mut std::sync::mpsc::Sender, + endpoint: Option<&mut ServiceRuntimeEndpoint>, ) -> Result { assert_eq!(context.chain_id, self.context().extra().chain_id()); match query { @@ -459,15 +464,22 @@ where bytes, } => { let ExecutionRuntimeConfig {} = self.context().extra().execution_runtime_config(); - let response = self - .query_user_application( - application_id, - context, - bytes, - incoming_execution_requests, - runtime_request_sender, - ) - .await?; + let response = match endpoint { + Some(endpoint) => { + self.query_user_application_with_long_lived_service( + application_id, + context, + bytes, + &mut endpoint.incoming_execution_requests, + &mut endpoint.runtime_request_sender, + ) + .await? + } + None => { + self.query_user_application(application_id, context, bytes) + .await? + } + }; Ok(Response::User(response)) } } @@ -478,6 +490,26 @@ where application_id: UserApplicationId, context: QueryContext, query: Vec, + ) -> Result, ExecutionError> { + let (execution_state_sender, mut execution_state_receiver) = + futures::channel::mpsc::unbounded(); + let execution_outcomes_future = linera_base::task::spawn_blocking(move || { + let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context); + runtime.run_query(application_id, query) + }); + while let Some(request) = execution_state_receiver.next().await { + self.handle_request(request).await?; + } + + let response = execution_outcomes_future.await??; + Ok(response) + } + + async fn query_user_application_with_long_lived_service( + &mut self, + application_id: UserApplicationId, + context: QueryContext, + query: Vec, incoming_execution_requests: &mut futures::channel::mpsc::UnboundedReceiver< ExecutionRequest, >, diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index 6fd9944a84a..0748b8d410b 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -60,7 +60,7 @@ pub use crate::wasm::{ }; pub use crate::{ applications::ApplicationRegistryView, - execution::ExecutionStateView, + execution::{ExecutionStateView, ServiceRuntimeEndpoint}, execution_state_actor::ExecutionRequest, policy::ResourceControlPolicy, resources::{ResourceController, ResourceTracker}, diff --git a/linera-execution/src/test_utils/mod.rs b/linera-execution/src/test_utils/mod.rs index c2e5b3a9fc9..cc7593dcf32 100644 --- a/linera-execution/src/test_utils/mod.rs +++ b/linera-execution/src/test_utils/mod.rs @@ -27,8 +27,8 @@ pub use self::{ }; use crate::{ ApplicationRegistryView, ExecutionRequest, ExecutionRuntimeContext, ExecutionStateView, - QueryContext, ServiceRuntimeRequest, ServiceSyncRuntime, TestExecutionRuntimeContext, - UserApplicationDescription, UserApplicationId, + QueryContext, ServiceRuntimeEndpoint, ServiceRuntimeRequest, ServiceSyncRuntime, + TestExecutionRuntimeContext, UserApplicationDescription, UserApplicationId, }; pub fn create_dummy_user_application_description(index: u64) -> UserApplicationDescription { @@ -109,20 +109,18 @@ impl QueryContext { /// Spawns a thread running the [`ServiceSyncRuntime`] actor. /// /// Returns the endpoints to communicate with the actor. - pub fn spawn_service_runtime_actor( - self, - ) -> ( - futures::channel::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, - ) { - let (execution_state_sender, execution_state_receiver) = + pub fn spawn_service_runtime_actor(self) -> ServiceRuntimeEndpoint { + let (execution_state_sender, incoming_execution_requests) = futures::channel::mpsc::unbounded(); - let (request_sender, request_receiver) = std::sync::mpsc::channel(); + let (runtime_request_sender, runtime_request_receiver) = std::sync::mpsc::channel(); thread::spawn(move || { - ServiceSyncRuntime::new(execution_state_sender, self).run(request_receiver) + ServiceSyncRuntime::new(execution_state_sender, self).run(runtime_request_receiver) }); - (execution_state_receiver, request_sender) + ServiceRuntimeEndpoint { + incoming_execution_requests, + runtime_request_sender, + } } } diff --git a/linera-execution/tests/test_execution.rs b/linera-execution/tests/test_execution.rs index 4e801c1f169..36377bb1092 100644 --- a/linera-execution/tests/test_execution.rs +++ b/linera-execution/tests/test_execution.rs @@ -197,6 +197,14 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { ] ); + { + let state_key = state_key.clone(); + caller_application.expect_call(ExpectedCall::handle_query(|runtime, _context, _query| { + let state = runtime.read_value_bytes(state_key)?.unwrap_or_default(); + Ok(state) + })); + } + caller_application.expect_call(ExpectedCall::handle_query(|runtime, _context, _query| { let state = runtime.read_value_bytes(state_key)?.unwrap_or_default(); Ok(state) @@ -207,8 +215,21 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { next_block_height: BlockHeight(0), local_time: Timestamp::from(0), }; - let (mut execution_request_receiver, mut runtime_request_sender) = - context.spawn_service_runtime_actor(); + let mut service_runtime_endpoint = context.spawn_service_runtime_actor(); + assert_eq!( + view.query_application( + context, + Query::User { + application_id: caller_id, + bytes: vec![] + }, + Some(&mut service_runtime_endpoint), + ) + .await + .unwrap(), + Response::User(dummy_operation.clone()) + ); + assert_eq!( view.query_application( context, @@ -216,8 +237,7 @@ async fn test_simple_user_operation() -> anyhow::Result<()> { application_id: caller_id, bytes: vec![] }, - &mut execution_request_receiver, - &mut runtime_request_sender, + Some(&mut service_runtime_endpoint), ) .await .unwrap(), diff --git a/linera-execution/tests/test_system_execution.rs b/linera-execution/tests/test_system_execution.rs index 079bdeaa572..7c6db04cb09 100644 --- a/linera-execution/tests/test_system_execution.rs +++ b/linera-execution/tests/test_system_execution.rs @@ -117,12 +117,7 @@ async fn test_simple_system_query() -> anyhow::Result<()> { local_time: Timestamp::from(0), }; let response = view - .query_application( - context, - Query::System(SystemQuery), - &mut futures::channel::mpsc::unbounded().1, - &mut std::sync::mpsc::channel().0, - ) + .query_application(context, Query::System(SystemQuery), None) .await .unwrap(); assert_eq!( diff --git a/linera-execution/tests/wasm.rs b/linera-execution/tests/wasm.rs index 189cdbd8894..40fead5510f 100644 --- a/linera-execution/tests/wasm.rs +++ b/linera-execution/tests/wasm.rs @@ -125,8 +125,7 @@ async fn test_fuel_for_counter_wasm_application( next_block_height: BlockHeight(0), local_time: Timestamp::from(0), }; - let (mut execution_request_receiver, mut runtime_request_sender) = - context.spawn_service_runtime_actor(); + let mut service_runtime_endpoint = context.spawn_service_runtime_actor(); let expected_value = async_graphql::Response::new( async_graphql::Value::from_json(json!({"value" : increments.into_iter().sum::()})) .unwrap(), @@ -136,8 +135,7 @@ async fn test_fuel_for_counter_wasm_application( .query_application( context, Query::user(app_id, &request).unwrap(), - &mut execution_request_receiver, - &mut runtime_request_sender, + Some(&mut service_runtime_endpoint), ) .await? else {