Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Twey committed Oct 10, 2024
1 parent 0ab38f8 commit 781bf9c
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 99 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 41 additions & 22 deletions linera-base/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ mod implementation {
let (sender, receiver) = mpsc::unbounded_channel();
Self {
sender,
join_handle: tokio::task::spawn_blocking(|| futures::executor::block_on(work(receiver.into()))),
join_handle: tokio::task::spawn_blocking(|| {
futures::executor::block_on(work(receiver.into()))
}),
}
}

Expand All @@ -63,15 +65,15 @@ mod implementation {

Check warning on line 65 in linera-base/src/task.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-base/src/task.rs
#[cfg(web)]
mod implementation {
use std::convert::TryFrom;
use futures::{channel::oneshot, future, stream, StreamExt as _};
use std::convert::TryFrom;
use wasm_bindgen::prelude::*;
use web_sys::js_sys;

use super::*;

/// A type that satisfies the send/receive bounds, but can never be sent or received.
pub enum NoInput { }
pub enum NoInput {}

impl TryFrom<JsValue> for NoInput {
type Error = JsValue;
Expand All @@ -82,7 +84,7 @@ mod implementation {

impl Into<JsValue> for NoInput {
fn into(self) -> JsValue {
match self { }
match self {}
}
}

Expand All @@ -101,7 +103,7 @@ mod implementation {
}
}

impl<T> std::error::Error for SendError<T> { }
impl<T> std::error::Error for SendError<T> {}

/// A new task running in a different thread.
pub struct Blocking<Input = NoInput, Output = ()> {
Expand Down Expand Up @@ -138,23 +140,35 @@ mod implementation {
work: impl FnOnce(InputReceiver<Input>) -> F + Send + 'static,
) -> Self
where
Input: Into<JsValue> + TryFrom<JsValue>,
Output: Send + 'static,
Input: Into<JsValue> + TryFrom<JsValue>,
Output: Send + 'static,
{
let (ready_sender, ready_receiver) = oneshot::channel();
let join_handle = wasm_thread::Builder::new().spawn(|| async move {
let (input_sender, input_receiver) = mpsc::unbounded_channel::<JsValue>();
let input_receiver = tokio_stream::wrappers::UnboundedReceiverStream::new(input_receiver);
let onmessage = wasm_bindgen::closure::Closure::<dyn FnMut(JsValue) -> Result<(), JsError>>::new(move |v: JsValue| -> Result<(), JsError> {
input_sender.send(v)?;
Ok(())
});
js_sys::global().dyn_into::<web_sys::DedicatedWorkerGlobalScope>().unwrap().set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget(); // doesn't truly forget it, but lets the JS GC take care of it
ready_sender.send(()).unwrap();
work(input_receiver.filter_map(convert_or_discard::<JsValue, Input>)).await
}).expect("should successfully start Web Worker");
ready_receiver.await.expect("should successfully initialize the worker thread");
let join_handle = wasm_thread::Builder::new()
.spawn(|| async move {
let (input_sender, input_receiver) = mpsc::unbounded_channel::<JsValue>();
let input_receiver =
tokio_stream::wrappers::UnboundedReceiverStream::new(input_receiver);
let onmessage = wasm_bindgen::closure::Closure::<
dyn FnMut(JsValue) -> Result<(), JsError>,
>::new(
move |v: JsValue| -> Result<(), JsError> {
input_sender.send(v)?;
Ok(())
},
);
js_sys::global()
.dyn_into::<web_sys::DedicatedWorkerGlobalScope>()
.unwrap()
.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
onmessage.forget(); // doesn't truly forget it, but lets the JS GC take care of it
ready_sender.send(()).unwrap();
work(input_receiver.filter_map(convert_or_discard::<JsValue, Input>)).await
})
.expect("should successfully start Web Worker");
ready_receiver
.await
.expect("should successfully initialize the worker thread");
Self {
join_handle,
_phantom: Default::default(),
Expand All @@ -164,8 +178,13 @@ mod implementation {
/// Sends a message to the task using
/// [`postMessage`](https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage).
pub fn send(&self, message: Input) -> Result<(), SendError<Input>>
where Input: Into<JsValue> + TryFrom<JsValue> + Clone {
self.join_handle.thread().post_message(&message.clone().into()).map_err(|_| SendError(message))
where
Input: Into<JsValue> + TryFrom<JsValue> + Clone,
{
self.join_handle
.thread()
.post_message(&message.clone().into())
.map_err(|_| SendError(message))
}

/// Waits for the task to complete and returns its output.
Expand Down
3 changes: 2 additions & 1 deletion linera-base/src/tracing_web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
//! This module provides unified handling for tracing subscribers within Linera binaries.

use tracing_subscriber::{
prelude::__tracing_subscriber_SubscriberExt as _, util::SubscriberInitExt as _, filter::LevelFilter, Layer as _,
filter::LevelFilter, prelude::__tracing_subscriber_SubscriberExt as _,
util::SubscriberInitExt as _, Layer as _,
};

/// Initializes tracing for the browser, sending messages to the developer console and
Expand Down
12 changes: 4 additions & 8 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,7 @@ where
/// Returns the task handle and the endpoints to interact with the actor.
async fn spawn_service_runtime_actor(
chain_id: ChainId,
) -> (
linera_base::task::Blocking,
ServiceRuntimeEndpoint,
) {
) -> (linera_base::task::Blocking, ServiceRuntimeEndpoint) {
let context = QueryContext {
chain_id,
next_block_height: BlockHeight(0),
Expand All @@ -197,7 +194,8 @@ where

let service_runtime_thread = linera_base::task::Blocking::spawn(move |_| async move {
ServiceSyncRuntime::new(execution_state_sender, context).run(runtime_request_receiver)
}).await;
})
.await;

let endpoint = ServiceRuntimeEndpoint {
incoming_execution_requests,
Expand Down Expand Up @@ -316,9 +314,7 @@ where

if let Some(thread) = self.service_runtime_thread {
drop(self.worker);
thread
.join()
.await
thread.join().await
}

trace!("`ChainWorkerActor` finished");
Expand Down
2 changes: 0 additions & 2 deletions linera-execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ fs = ["tokio/fs"]
metrics = ["prometheus", "linera-views/metrics"]
unstable-oracles = []
wasmer = [
"bytes",
"dep:wasmer",
"linera-witty/wasmer",
"wasm-encoder",
Expand All @@ -37,7 +36,6 @@ anyhow.workspace = true
async-graphql.workspace = true
async-trait.workspace = true
bcs.workspace = true
bytes = { workspace = true, optional = true }
cfg-if.workspace = true
clap.workspace = true
custom_debug_derive.workspace = true
Expand Down
16 changes: 10 additions & 6 deletions linera-execution/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ where

runtime.run_action(application_id, chain_id, action)
}
}).await;
})
.await;

contract_runtime_task.send(code)?;

Expand Down Expand Up @@ -508,11 +509,14 @@ where
let (execution_state_sender, mut execution_state_receiver) =
futures::channel::mpsc::unbounded();
let (code, description) = self.load_service(application_id).await?;
let execution_task = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(move |_| async move {
let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
runtime.preload_service(application_id, code, description)?;
runtime.run_query(application_id, query)
}).await;
let execution_task = linera_base::task::Blocking::<linera_base::task::NoInput, _>::spawn(
move |_| async move {
let mut runtime = ServiceSyncRuntime::new(execution_state_sender, context);
runtime.preload_service(application_id, code, description)?;
runtime.run_query(application_id, query)
},
)
.await;
while let Some(request) = execution_state_receiver.next().await {
self.handle_request(request).await?;
}
Expand Down
10 changes: 8 additions & 2 deletions linera-execution/src/execution_state_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ where
C: Context + Clone + Send + Sync + 'static,
C::Extra: ExecutionRuntimeContext,
{
pub(crate) async fn load_contract(&mut self, id: UserApplicationId) -> Result<(UserContractCode, UserApplicationDescription), ExecutionError> {
pub(crate) async fn load_contract(
&mut self,
id: UserApplicationId,
) -> Result<(UserContractCode, UserApplicationDescription), ExecutionError> {
#[cfg(with_metrics)]
let _latency = LOAD_CONTRACT_LATENCY.measure_latency();
let description = self.system.registry.describe_application(id).await?;
Expand All @@ -78,7 +81,10 @@ where
Ok((code, description))
}

pub(crate) async fn load_service(&mut self, id: UserApplicationId) -> Result<(UserServiceCode, UserApplicationDescription), ExecutionError> {
pub(crate) async fn load_service(
&mut self,
id: UserApplicationId,
) -> Result<(UserServiceCode, UserApplicationDescription), ExecutionError> {
#[cfg(with_metrics)]
let _latency = LOAD_SERVICE_LATENCY.measure_latency();
let description = self.system.registry.describe_application(id).await?;
Expand Down
22 changes: 12 additions & 10 deletions linera-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! Linera chain.

#![cfg_attr(web, feature(trait_upcasting))]

#![deny(clippy::large_futures)]

mod applications;
Expand Down Expand Up @@ -52,11 +51,6 @@ use serde::{Deserialize, Serialize};
use system::OpenChainConfig;
use thiserror::Error;

#[cfg(web)]
use {
js_sys::wasm_bindgen,
wasm_bindgen::JsValue,
};
#[cfg(with_testing)]
pub use crate::applications::ApplicationRegistry;
use crate::runtime::ContractSyncRuntime;
Expand All @@ -83,6 +77,8 @@ pub use crate::{
},

Check warning on line 77 in linera-execution/src/lib.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/linera-protocol/linera-protocol/linera-execution/src/lib.rs
transaction_tracker::TransactionTracker,
};
#[cfg(web)]
use {js_sys::wasm_bindgen, wasm_bindgen::JsValue};

/// The maximum length of an event key in bytes.
const MAX_EVENT_KEY_LEN: usize = 64;
Expand Down Expand Up @@ -143,25 +139,31 @@ impl<T: UserServiceModule + Send + Sync + 'static> From<T> for UserServiceCode {
dyn_clone::clone_trait_object!(UserServiceModule);

impl UserServiceCode {
fn instantiate(&self, runtime: ServiceSyncRuntimeHandle) -> Result<UserServiceInstance, ExecutionError> {
fn instantiate(
&self,
runtime: ServiceSyncRuntimeHandle,
) -> Result<UserServiceInstance, ExecutionError> {
self.0.instantiate(runtime)
}
}

impl UserContractCode {
fn instantiate(&self, runtime: ContractSyncRuntimeHandle) -> Result<UserContractInstance, ExecutionError> {
fn instantiate(
&self,
runtime: ContractSyncRuntimeHandle,
) -> Result<UserContractInstance, ExecutionError> {
self.0.instantiate(runtime)
}
}

#[cfg(not(web))]
impl<T: Send + Sync> Post for T { }
impl<T: Send + Sync> Post for T {}

#[cfg(web)]
const _: () = {
// TODO: add a vtable pointer into the JsValue rather than assuming the implementor

impl<T: dyn_convert::DynInto<JsValue>> Post for T { }
impl<T: dyn_convert::DynInto<JsValue>> Post for T {}

impl Into<JsValue> for UserContractCode {
fn into(self) -> JsValue {
Expand Down
Loading

0 comments on commit 781bf9c

Please sign in to comment.