Skip to content

Commit

Permalink
Web worker comms
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Dec 11, 2024
1 parent 0970280 commit fe70001
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 6 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clap = { workspace = true }
console_error_panic_hook = { version = "0.1.7", optional = true }
futures = { workspace = true }
libp2p = { workspace = true }
serde_json = "1.0.125"
sp-io = { version = "30", features = ["disable_allocator", "disable_panic_handler"], default-features = false }
tokio = { version = "^1", default-features = false, features = ["sync", "macros", "io-util", "rt"] }
tokio_with_wasm = { version = "0.7.1", features = ["sync", "macros", "rt"] }
Expand Down
121 changes: 116 additions & 5 deletions web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,53 @@

use std::sync::Arc;

use avail_light_core::data;
use avail_light_core::api::configuration::SharedConfig;
use avail_light_core::api::types::{PublishMessage, Request, Topic};
use avail_light_core::api::v2::transactions::Submitter;
use avail_light_core::light_client::OutputEvent as LcEvent;
use avail_light_core::network::{self, p2p, rpc, Network};
use avail_light_core::shutdown::Controller;
use avail_light_core::types::{Delay, PeerAddress};
use avail_light_core::utils::spawn_in_span;
use avail_light_core::{api, data};
use avail_rust::kate_recovery::couscous;
use clap::ValueEnum;
use libp2p::Multiaddr;
use std::str::FromStr;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{
broadcast,
mpsc::{self, UnboundedSender},
};
use tokio_with_wasm::alias as tokio;
use tracing::{error, info, warn};
use wasm_bindgen::prelude::*;
use web_sys::js_sys;
use web_time::Duration;

#[tokio::main(flavor = "current_thread")]
#[wasm_bindgen(start)]
async fn main_js() {}

static mut SENDER: Option<UnboundedSender<String>> = None;

#[wasm_bindgen]
pub fn post_message(message: String) {
unsafe {
if let Some(sender) = &SENDER {
sender.send(message).expect("TODO");
}
}
}

fn send_message_to_browser(message: &str) {
let worker_scope = js_sys::global();
worker_scope
.dyn_ref::<web_sys::DedicatedWorkerGlobalScope>()
.expect("Should be running in a Web Worker")
.post_message(&JsValue::from_str(message))
.expect("Failed to post message");
}

#[wasm_bindgen]
pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>) {
console_error_panic_hook::set_once();
Expand Down Expand Up @@ -62,6 +89,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)

let genesis_hash = &network.genesis_hash().to_string();
let (rpc_event_sender, rpc_event_receiver) = broadcast::channel(1000);
let mut publish_rpc_event_receiver = rpc_event_sender.subscribe();

let (rpc_client, rpc_subscriptions) = rpc::init(
db.clone(),
Expand All @@ -75,7 +103,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)

let (id_keys, _peer_id) = p2p::identity(&cfg_libp2p, db.clone()).unwrap();

let (p2p_client, p2p_event_loop, _p2p_event_receiver) = p2p::init(
let (p2p_client, _p2p_event_loop, _p2p_event_receiver) = p2p::init(
cfg_libp2p.clone(),
Default::default(),
id_keys,
Expand Down Expand Up @@ -103,7 +131,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
)));

let (lc_sender, mut lc_receiver) = mpsc::unbounded_channel::<LcEvent>();
let (block_tx, _block_rx) =
let (block_tx, mut block_rx) =
broadcast::channel::<avail_light_core::types::BlockVerified>(1 << 7);

let channels = avail_light_core::types::ClientChannels {
Expand Down Expand Up @@ -131,7 +159,55 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
lc_sender,
));

tokio::task::spawn(p2p_event_loop.run());
let topic = Topic::HeaderVerified;
spawn_in_span(shutdown.with_cancel(async move {
loop {
let message = match publish_rpc_event_receiver.recv().await {
Ok(value) => value,
Err(error) => {
error!(?topic, "Cannot receive message: {error}");
return;
},
};
let message: Option<PublishMessage> = match message.try_into() {
Ok(Some(message)) => Some(message),
Ok(None) => continue, // Silently skip
Err(error) => {
error!(?topic, "Cannot create message: {error}");
continue;
},
};

let message = serde_json::to_string(&message).unwrap();
send_message_to_browser(&message)
}
}));

let topic = Topic::ConfidenceAchieved;
spawn_in_span(shutdown.with_cancel(async move {
loop {
let message = match block_rx.recv().await {
Ok(value) => value,
Err(error) => {
error!(?topic, "Cannot receive message: {error}");
return;
},
};
let message: Option<PublishMessage> = match message.try_into() {
Ok(Some(message)) => Some(message),
Ok(None) => continue, // Silently skip
Err(error) => {
error!(?topic, "Cannot create message: {error}");
continue;
},
};

let message = serde_json::to_string(&message).unwrap();
send_message_to_browser(&message)
}
}));

// tokio::task::spawn(_p2p_event_loop.run());

let bootstraps = cfg_libp2p.bootstraps.clone();
let bootstrap_p2p_client = p2p_client.clone();
Expand All @@ -151,6 +227,41 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
}
}));

let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
unsafe {
SENDER = Some(sender);
}

let config = SharedConfig::default();

spawn_in_span(async move {
loop {
if let Some(message) = receiver.recv().await {
info!("Received message: {message}");
let request: Request = match serde_json::from_str(&message) {
Ok(request) => request,
Err(error) => {
error!("Failed to parse request: {error}");
continue;
},
};

let Ok(response) = api::v2::messages::handle_request(
request,
version,
&config,
None::<Arc<Submitter<data::DB>>>,
db.clone(),
)
.await
else {
continue;
};
send_message_to_browser(&serde_json::to_string(&response).unwrap());
}
}
});

if let Err(error) = light_client_handle.await {
error!("Error running light client: {error}")
};
Expand Down
11 changes: 11 additions & 0 deletions web/www/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,19 @@
</head>
<body>
<h1>Avail Light Client Web</h1>
<!-- <script type="module" src="./avail-light.js"></script> -->
<script type="module">
const worker = new Worker('./avail-light.js', { type: 'module' });
worker.onmessage = (event) => {
console.log(event.data);
let { message, topic } = JSON.parse(event.data);
if (topic == "header-verified") {
document.getElementById("text").innerHTML = message.block_number;
};
};
window.worker = worker;
// worker.postMessage({"type":"version","request_id":"{uuid}"});
</script>
<span id="text" />
</body>
</html>

0 comments on commit fe70001

Please sign in to comment.