From 781bf9c1b76c8366cea2fa5b89f5e42e79341c98 Mon Sep 17 00:00:00 2001 From: James Kay Date: Thu, 10 Oct 2024 02:55:44 +0100 Subject: [PATCH] Cleanup --- Cargo.lock | 3 +- examples/Cargo.lock | 1 - linera-base/src/task.rs | 63 ++++++++++++------- linera-base/src/tracing_web.rs | 3 +- linera-core/src/chain_worker/actor.rs | 12 ++-- linera-execution/Cargo.toml | 2 - linera-execution/src/execution.rs | 16 +++-- linera-execution/src/execution_state_actor.rs | 10 ++- linera-execution/src/lib.rs | 22 ++++--- linera-execution/src/runtime.rs | 57 +++++++++++------ linera-execution/src/wasm/mod.rs | 39 +++++++----- linera-execution/src/wasm/wasmer.rs | 13 ++-- linera-storage/src/lib.rs | 17 +++-- 13 files changed, 159 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c557bcdc990..8f58c534c85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4419,7 +4419,6 @@ dependencies = [ "async-graphql", "async-trait", "bcs", - "bytes", "cfg-if", "cfg_aliases", "clap", @@ -8521,7 +8520,7 @@ dependencies = [ [[package]] name = "wasm_thread" version = "0.3.0" -source = "git+https://github.com/Twey/wasm_thread?branch=post-message#a55041aa3c854eb7aa2caa10b15d8ba377bde9eb" +source = "git+https://github.com/Twey/wasm_thread?branch=post-message#f4bc74d7095f007b6767338531bceb1ba559e000" dependencies = [ "futures", "js-sys", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 5924806efe2..e4d3cfbdf70 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3433,7 +3433,6 @@ dependencies = [ "async-graphql", "async-trait", "bcs", - "bytes", "cfg-if", "cfg_aliases", "clap", diff --git a/linera-base/src/task.rs b/linera-base/src/task.rs index c953dde7209..400746628a4 100644 --- a/linera-base/src/task.rs +++ b/linera-base/src/task.rs @@ -45,7 +45,9 @@ mod implementation { let (sender, receiver) = mpsc::unbounded_channel(); Self { sender, - join_handle: tokio::task::spawn_blocking(|| futures::executor::block_on(work(receiver.into()))), + join_handle: tokio::task::spawn_blocking(|| { + futures::executor::block_on(work(receiver.into())) + }), } } @@ -63,15 +65,15 @@ mod implementation { #[cfg(web)] mod implementation { - use std::convert::TryFrom; use futures::{channel::oneshot, future, stream, StreamExt as _}; + use std::convert::TryFrom; use wasm_bindgen::prelude::*; use web_sys::js_sys; use super::*; /// A type that satisfies the send/receive bounds, but can never be sent or received. - pub enum NoInput { } + pub enum NoInput {} impl TryFrom for NoInput { type Error = JsValue; @@ -82,7 +84,7 @@ mod implementation { impl Into for NoInput { fn into(self) -> JsValue { - match self { } + match self {} } } @@ -101,7 +103,7 @@ mod implementation { } } - impl std::error::Error for SendError { } + impl std::error::Error for SendError {} /// A new task running in a different thread. pub struct Blocking { @@ -138,23 +140,35 @@ mod implementation { work: impl FnOnce(InputReceiver) -> F + Send + 'static, ) -> Self where - Input: Into + TryFrom, - Output: Send + 'static, + Input: Into + TryFrom, + Output: Send + 'static, { let (ready_sender, ready_receiver) = oneshot::channel(); - let join_handle = wasm_thread::Builder::new().spawn(|| async move { - let (input_sender, input_receiver) = mpsc::unbounded_channel::(); - let input_receiver = tokio_stream::wrappers::UnboundedReceiverStream::new(input_receiver); - let onmessage = wasm_bindgen::closure::Closure:: Result<(), JsError>>::new(move |v: JsValue| -> Result<(), JsError> { - input_sender.send(v)?; - Ok(()) - }); - js_sys::global().dyn_into::().unwrap().set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - onmessage.forget(); // doesn't truly forget it, but lets the JS GC take care of it - ready_sender.send(()).unwrap(); - work(input_receiver.filter_map(convert_or_discard::)).await - }).expect("should successfully start Web Worker"); - ready_receiver.await.expect("should successfully initialize the worker thread"); + let join_handle = wasm_thread::Builder::new() + .spawn(|| async move { + let (input_sender, input_receiver) = mpsc::unbounded_channel::(); + let input_receiver = + tokio_stream::wrappers::UnboundedReceiverStream::new(input_receiver); + let onmessage = wasm_bindgen::closure::Closure::< + dyn FnMut(JsValue) -> Result<(), JsError>, + >::new( + move |v: JsValue| -> Result<(), JsError> { + input_sender.send(v)?; + Ok(()) + }, + ); + js_sys::global() + .dyn_into::() + .unwrap() + .set_onmessage(Some(onmessage.as_ref().unchecked_ref())); + onmessage.forget(); // doesn't truly forget it, but lets the JS GC take care of it + ready_sender.send(()).unwrap(); + work(input_receiver.filter_map(convert_or_discard::)).await + }) + .expect("should successfully start Web Worker"); + ready_receiver + .await + .expect("should successfully initialize the worker thread"); Self { join_handle, _phantom: Default::default(), @@ -164,8 +178,13 @@ mod implementation { /// Sends a message to the task using /// [`postMessage`](https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage). pub fn send(&self, message: Input) -> Result<(), SendError> - where Input: Into + TryFrom + Clone { - self.join_handle.thread().post_message(&message.clone().into()).map_err(|_| SendError(message)) + where + Input: Into + TryFrom + Clone, + { + self.join_handle + .thread() + .post_message(&message.clone().into()) + .map_err(|_| SendError(message)) } /// Waits for the task to complete and returns its output. diff --git a/linera-base/src/tracing_web.rs b/linera-base/src/tracing_web.rs index 91582b6cdb1..51e64440eed 100644 --- a/linera-base/src/tracing_web.rs +++ b/linera-base/src/tracing_web.rs @@ -4,7 +4,8 @@ //! This module provides unified handling for tracing subscribers within Linera binaries. use tracing_subscriber::{ - prelude::__tracing_subscriber_SubscriberExt as _, util::SubscriberInitExt as _, filter::LevelFilter, Layer as _, + filter::LevelFilter, prelude::__tracing_subscriber_SubscriberExt as _, + util::SubscriberInitExt as _, Layer as _, }; /// Initializes tracing for the browser, sending messages to the developer console and diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index c456b3763e6..1c91edb5b6d 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -181,10 +181,7 @@ where /// Returns the task handle and the endpoints to interact with the actor. async fn spawn_service_runtime_actor( chain_id: ChainId, - ) -> ( - linera_base::task::Blocking, - ServiceRuntimeEndpoint, - ) { + ) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) { let context = QueryContext { chain_id, next_block_height: BlockHeight(0), @@ -197,7 +194,8 @@ where let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move { ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver) - }).await; + }) + .await; let endpoint = ServiceRuntimeEndpoint { incoming_execution_requests, @@ -316,9 +314,7 @@ where if let Some(thread) = self.service_runtime_thread { drop(self.worker); - thread - .join() - .await + thread.join().await } trace!("`ChainWorkerActor` finished"); diff --git a/linera-execution/Cargo.toml b/linera-execution/Cargo.toml index 782e2073ead..7e1d9aab4a6 100644 --- a/linera-execution/Cargo.toml +++ b/linera-execution/Cargo.toml @@ -17,7 +17,6 @@ fs = ["tokio/fs"] metrics = ["prometheus", "linera-views/metrics"] unstable-oracles = [] wasmer = [ - "bytes", "dep:wasmer", "linera-witty/wasmer", "wasm-encoder", @@ -37,7 +36,6 @@ anyhow.workspace = true async-graphql.workspace = true async-trait.workspace = true bcs.workspace = true -bytes = { workspace = true, optional = true } cfg-if.workspace = true clap.workspace = true custom_debug_derive.workspace = true diff --git a/linera-execution/src/execution.rs b/linera-execution/src/execution.rs index 16233a77c09..bc7a028cad7 100644 --- a/linera-execution/src/execution.rs +++ b/linera-execution/src/execution.rs @@ -215,7 +215,8 @@ where runtime.run_action(application_id, chain_id, action) } - }).await; + }) + .await; contract_runtime_task.send(code)?; @@ -508,11 +509,14 @@ where let (execution_state_sender, mut execution_state_receiver) = futures::channel::mpsc::unbounded(); let (code, description) = self.load_service(application_id).await?; - let execution_task = linera_base::task::Blocking::::spawn(move |_| async move { - let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context); - runtime.preload_service(application_id, code, description)?; - runtime.run_query(application_id, query) - }).await; + let execution_task = linera_base::task::Blocking::::spawn( + move |_| async move { + let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context); + runtime.preload_service(application_id, code, description)?; + runtime.run_query(application_id, query) + }, + ) + .await; while let Some(request) = execution_state_receiver.next().await { self.handle_request(request).await?; } diff --git a/linera-execution/src/execution_state_actor.rs b/linera-execution/src/execution_state_actor.rs index 0da54eb5c54..85a1a5f483a 100644 --- a/linera-execution/src/execution_state_actor.rs +++ b/linera-execution/src/execution_state_actor.rs @@ -66,7 +66,10 @@ where C: Context + Clone + Send + Sync + 'static, C::Extra: ExecutionRuntimeContext, { - pub(crate) async fn load_contract(&mut self, id: UserApplicationId) -> Result<(UserContractCode, UserApplicationDescription), ExecutionError> { + pub(crate) async fn load_contract( + &mut self, + id: UserApplicationId, + ) -> Result<(UserContractCode, UserApplicationDescription), ExecutionError> { #[cfg(with_metrics)] let _latency = LOAD_CONTRACT_LATENCY.measure_latency(); let description = self.system.registry.describe_application(id).await?; @@ -78,7 +81,10 @@ where Ok((code, description)) } - pub(crate) async fn load_service(&mut self, id: UserApplicationId) -> Result<(UserServiceCode, UserApplicationDescription), ExecutionError> { + pub(crate) async fn load_service( + &mut self, + id: UserApplicationId, + ) -> Result<(UserServiceCode, UserApplicationDescription), ExecutionError> { #[cfg(with_metrics)] let _latency = LOAD_SERVICE_LATENCY.measure_latency(); let description = self.system.registry.describe_application(id).await?; diff --git a/linera-execution/src/lib.rs b/linera-execution/src/lib.rs index bfdefaa0e93..f300fe8d142 100644 --- a/linera-execution/src/lib.rs +++ b/linera-execution/src/lib.rs @@ -5,7 +5,6 @@ //! Linera chain. #![cfg_attr(web, feature(trait_upcasting))] - #![deny(clippy::large_futures)] mod applications; @@ -52,11 +51,6 @@ use serde::{Deserialize, Serialize}; use system::OpenChainConfig; use thiserror::Error; -#[cfg(web)] -use { - js_sys::wasm_bindgen, - wasm_bindgen::JsValue, -}; #[cfg(with_testing)] pub use crate::applications::ApplicationRegistry; use crate::runtime::ContractSyncRuntime; @@ -83,6 +77,8 @@ pub use crate::{ }, transaction_tracker::TransactionTracker, }; +#[cfg(web)] +use {js_sys::wasm_bindgen, wasm_bindgen::JsValue}; /// The maximum length of an event key in bytes. const MAX_EVENT_KEY_LEN: usize = 64; @@ -143,25 +139,31 @@ impl From for UserServiceCode { dyn_clone::clone_trait_object!(UserServiceModule); impl UserServiceCode { - fn instantiate(&self, runtime: ServiceSyncRuntimeHandle) -> Result { + fn instantiate( + &self, + runtime: ServiceSyncRuntimeHandle, + ) -> Result { self.0.instantiate(runtime) } } impl UserContractCode { - fn instantiate(&self, runtime: ContractSyncRuntimeHandle) -> Result { + fn instantiate( + &self, + runtime: ContractSyncRuntimeHandle, + ) -> Result { self.0.instantiate(runtime) } } #[cfg(not(web))] -impl Post for T { } +impl Post for T {} #[cfg(web)] const _: () = { // TODO: add a vtable pointer into the JsValue rather than assuming the implementor - impl> Post for T { } + impl> Post for T {} impl Into for UserContractCode { fn into(self) -> JsValue { diff --git a/linera-execution/src/runtime.rs b/linera-execution/src/runtime.rs index 8c0d0aa39e8..fb59625eeb1 100644 --- a/linera-execution/src/runtime.rs +++ b/linera-execution/src/runtime.rs @@ -32,8 +32,8 @@ use crate::{ util::{ReceiverExt, UnboundedSenderExt}, BaseRuntime, ContractRuntime, ExecutionError, FinalizeContext, MessageContext, OperationContext, QueryContext, RawExecutionOutcome, ServiceRuntime, TransactionTracker, - UserApplicationDescription, UserApplicationId, UserContractInstance, UserServiceInstance, - MAX_EVENT_KEY_LEN, MAX_STREAM_NAME_LEN, UserContractCode, UserServiceCode, + UserApplicationDescription, UserApplicationId, UserContractCode, UserContractInstance, + UserServiceCode, UserServiceInstance, MAX_EVENT_KEY_LEN, MAX_STREAM_NAME_LEN, }; #[cfg(test)] @@ -1071,18 +1071,26 @@ impl ContractSyncRuntime { } // TODO: deduplicate `Contract`/`Service` traits and types - pub(crate) fn preload_contract(&self, id: UserApplicationId, code: UserContractCode, description: UserApplicationDescription) -> Result<(), ExecutionError> { - let this = self.0.as_ref().expect("contracts shouldn't be preloaded while the runtime is being dropped"); + pub(crate) fn preload_contract( + &self, + id: UserApplicationId, + code: UserContractCode, + description: UserApplicationDescription, + ) -> Result<(), ExecutionError> { + let this = self + .0 + .as_ref() + .expect("contracts shouldn't be preloaded while the runtime is being dropped"); let runtime_handle = this.clone(); let mut this_guard = this.inner(); - if let std::collections::hash_map::Entry::Vacant(e) = this_guard.loaded_applications.entry(id) { - e.insert( - LoadedApplication::new( - code.instantiate(runtime_handle)?, - description, - ), - ); + if let std::collections::hash_map::Entry::Vacant(e) = + this_guard.loaded_applications.entry(id) + { + e.insert(LoadedApplication::new( + code.instantiate(runtime_handle)?, + description, + )); this_guard.applications_to_finalize.push(id); } @@ -1416,18 +1424,27 @@ impl ServiceSyncRuntime { } } - pub(crate) fn preload_service(&self, id: UserApplicationId, code: UserServiceCode, description: UserApplicationDescription) -> Result<(), ExecutionError> { - let this = self.runtime.0.as_ref().expect("services shouldn't be preloaded while the runtime is being dropped"); + pub(crate) fn preload_service( + &self, + id: UserApplicationId, + code: UserServiceCode, + description: UserApplicationDescription, + ) -> Result<(), ExecutionError> { + let this = self + .runtime + .0 + .as_ref() + .expect("services shouldn't be preloaded while the runtime is being dropped"); let runtime_handle = this.clone(); let mut this_guard = this.inner(); - if let std::collections::hash_map::Entry::Vacant(e) = this_guard.loaded_applications.entry(id) { - e.insert( - LoadedApplication::new( - code.instantiate(runtime_handle)?, - description, - ), - ); + if let std::collections::hash_map::Entry::Vacant(e) = + this_guard.loaded_applications.entry(id) + { + e.insert(LoadedApplication::new( + code.instantiate(runtime_handle)?, + description, + )); this_guard.applications_to_finalize.push(id); } diff --git a/linera-execution/src/wasm/mod.rs b/linera-execution/src/wasm/mod.rs index a559e3f547e..241c25dcd9b 100644 --- a/linera-execution/src/wasm/mod.rs +++ b/linera-execution/src/wasm/mod.rs @@ -21,17 +21,17 @@ mod wasmer; mod wasmtime; use linera_base::data_types::Bytecode; -#[cfg(with_metrics)] -use { - std::sync::LazyLock, - linera_base::prometheus_util::{self, MeasureLatency}, - prometheus::HistogramVec, -}; use thiserror::Error; #[cfg(with_wasmer)] use wasmer::{WasmerContractInstance, WasmerServiceInstance}; #[cfg(with_wasmtime)] use wasmtime::{WasmtimeContractInstance, WasmtimeServiceInstance}; +#[cfg(with_metrics)] +use { + linera_base::prometheus_util::{self, MeasureLatency}, + prometheus::HistogramVec, + std::sync::LazyLock, +}; use self::sanitizer::sanitize; pub use self::{ @@ -215,17 +215,18 @@ impl UserServiceModule for WasmServiceModule { #[cfg(web)] const _: () = { - use { - js_sys::wasm_bindgen::{JsValue, JsCast as _}, - }; + use js_sys::wasm_bindgen::{JsCast as _, JsValue}; impl TryFrom for WasmServiceModule { type Error = JsValue; fn try_from(value: JsValue) -> Result { // XXX: currently `Wasmer` is the only backend enabled on the Web. - #[cfg(with_wasmer)] { - Ok(Self::Wasmer { module: value.dyn_into::()?.into() }) + #[cfg(with_wasmer)] + { + Ok(Self::Wasmer { + module: value.dyn_into::()?.into(), + }) } #[cfg(not(with_wasmer))] @@ -237,7 +238,9 @@ const _: () = { fn from(module: WasmServiceModule) -> JsValue { match module { #[cfg(with_wasmer)] - WasmServiceModule::Wasmer { module } => ::wasmer::Module::clone(&module).clone().into(), + WasmServiceModule::Wasmer { module } => { + ::wasmer::Module::clone(&module).clone().into() + } } } } @@ -247,8 +250,12 @@ const _: () = { fn try_from(value: JsValue) -> Result { // XXX: currently `Wasmer` is the only backend enabled on the Web. - #[cfg(with_wasmer)] { - Ok(Self::Wasmer { module: value.dyn_into::()?.into(), engine: Default::default() }) + #[cfg(with_wasmer)] + { + Ok(Self::Wasmer { + module: value.dyn_into::()?.into(), + engine: Default::default(), + }) } #[cfg(not(with_wasmer))] @@ -260,7 +267,9 @@ const _: () = { fn from(module: WasmContractModule) -> JsValue { match module { #[cfg(with_wasmer)] - WasmContractModule::Wasmer { module, engine: _ } => ::wasmer::Module::clone(&module).clone().into(), + WasmContractModule::Wasmer { module, engine: _ } => { + ::wasmer::Module::clone(&module).clone().into() + } } } } diff --git a/linera-execution/src/wasm/wasmer.rs b/linera-execution/src/wasm/wasmer.rs index 1a15d979235..04026f3013f 100644 --- a/linera-execution/src/wasm/wasmer.rs +++ b/linera-execution/src/wasm/wasmer.rs @@ -109,7 +109,10 @@ where Runtime: ServiceRuntime + WriteBatch + Clone + Unpin + 'static, { /// Prepares a runtime instance to call into the Wasm service. - pub fn prepare(service_module: &wasmer::Module, runtime: Runtime) -> Result { + pub fn prepare( + service_module: &wasmer::Module, + runtime: Runtime, + ) -> Result { let system_api_data = SystemApiData::new(runtime); let mut instance_builder = InstanceBuilder::new(SERVICE_ENGINE.clone(), system_api_data); @@ -266,7 +269,9 @@ impl CachedContractModule { } /// Creates a [`Module`] from a compiled contract using a headless [`Engine`]. - pub fn create_execution_instance(&self) -> Result<(wasmer::Engine, wasmer::Module), anyhow::Error> { + pub fn create_execution_instance( + &self, + ) -> Result<(wasmer::Engine, wasmer::Module), anyhow::Error> { #[cfg(web)] { Ok((wasmer::Engine::default(), self.0.clone())) @@ -277,9 +282,7 @@ impl CachedContractModule { let engine = wasmer::Engine::default(); let store = wasmer::Store::new(engine.clone()); let bytes = self.0.serialize()?; - let module = unsafe { - wasmer::Module::deserialize(&store, bytes) - }?; + let module = unsafe { wasmer::Module::deserialize(&store, bytes) }?; Ok((engine, module)) } } diff --git a/linera-storage/src/lib.rs b/linera-storage/src/lib.rs index 7f1f182acbd..8a7f81b2f19 100644 --- a/linera-storage/src/lib.rs +++ b/linera-storage/src/lib.rs @@ -300,8 +300,12 @@ pub trait Storage: Sized { compressed_bytes: contract_blob.inner_bytes(), }; let contract_bytecode = - linera_base::task::Blocking::::spawn(move |_| async move { compressed_contract_bytecode.decompress() }).await - .join().await?; + linera_base::task::Blocking::::spawn( + move |_| async move { compressed_contract_bytecode.decompress() }, + ) + .await + .join() + .await?; Ok(WasmContractModule::new(contract_bytecode, wasm_runtime) .await? .into()) @@ -338,9 +342,12 @@ pub trait Storage: Sized { let compressed_service_bytecode = CompressedBytecode { compressed_bytes: service_blob.inner_bytes(), }; - let service_bytecode = - linera_base::task::Blocking::::spawn(move |_| async move { compressed_service_bytecode.decompress() }) - .await.join().await?; + let service_bytecode = linera_base::task::Blocking::::spawn( + move |_| async move { compressed_service_bytecode.decompress() }, + ) + .await + .join() + .await?; Ok(WasmServiceModule::new(service_bytecode, wasm_runtime) .await? .into())