Skip to content

Commit

Permalink
web-sys agent conversion (#818)
Browse files Browse the repository at this point in the history
* Converting `agent`.

* Remove wrong `cfg` in imports.

* Move general changes to different PR.

* Some optimisations.

* Rename `stdweb` feature to `std_web`.

* Fix `ArrayBuffer` to `Uint8Array` conversions.

* Add js module worker.

* Use `cfg-if`` and `cfg-match` to make things clearer.

* Fix `std_web` build.

* Add some polish.
  • Loading branch information
daxpedda authored and jstarry committed Jan 9, 2020
1 parent 2aa747e commit d9235d9
Showing 1 changed file with 175 additions and 44 deletions.
219 changes: 175 additions & 44 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use anymap::{self, AnyMap};
use bincode;
use cfg_if::cfg_if;
use cfg_match::cfg_match;
use log::warn;
use serde::{Deserialize, Serialize};
use slab::Slab;
Expand All @@ -14,9 +16,17 @@ use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
cfg_if! {
if #[cfg(feature = "std_web")] {
use stdweb::Value;
#[allow(unused_imports)]
use stdweb::{_js_impl, js};
} else if #[cfg(feature = "web_sys")] {
use js_sys::{Reflect, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent, Worker, WorkerOptions};
}
}

/// Serializable messages to worker
#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -162,21 +172,29 @@ where
ToWorker::Destroy => {
let upd = AgentLifecycleEvent::Destroy;
scope.send(upd);
js! {
// Terminates web worker
self.close();
// Terminates web worker
cfg_match! {
feature = "std_web" => js! { self.close(); },
feature = "web_sys" => worker_self().close(),
};
}
}
};
let loaded: FromWorker<T::Output> = FromWorker::WorkerLoaded;
let loaded = loaded.pack();
js! {
var handler = @{handler};
self.onmessage = function(event) {
handler(event.data);
};
self.postMessage(@{loaded});
cfg_match! {
feature = "std_web" => js! {
var handler = @{handler};
self.onmessage = function(event) {
handler(event.data);
};
self.postMessage(@{loaded});
},
feature = "web_sys" => ({
let worker = worker_self();
worker.set_onmessage_closure(handler);
worker.post_message_vec(loaded);
}),
};
}
}
Expand Down Expand Up @@ -433,13 +451,20 @@ impl Discoverer for Private {
};
// TODO Need somethig better...
let name_of_resource = AGN::name_of_resource();
let worker = js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
};
return worker;
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
worker.set_onmessage_closure(handler);
worker
}),
};
let bridge = PrivateBridge {
worker,
Expand All @@ -451,7 +476,10 @@ impl Discoverer for Private {

/// A connection manager for components interaction with workers.
pub struct PrivateBridge<T: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
_agent: PhantomData<T>,
}

Expand All @@ -467,12 +495,17 @@ impl<AGN: Agent> Bridge<AGN> for PrivateBridge<AGN> {
// Use a queue to collect a messages if an instance is not ready
// and send them to an agent when it will reported readiness.
let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack();
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
cfg_match! {
feature = "std_web" => ({
let worker = &self.worker;
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
};
}),
feature = "web_sys" => self.worker.post_message_vec(msg),
}
}
}

Expand All @@ -483,12 +516,19 @@ impl<AGN: Agent> Drop for PrivateBridge<AGN> {
}

struct RemoteAgent<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
slab: SharedOutputSlab<AGN>,
}

impl<AGN: Agent> RemoteAgent<AGN> {
pub fn new(worker: Value, slab: SharedOutputSlab<AGN>) -> Self {
pub fn new(
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: Worker,
slab: SharedOutputSlab<AGN>,
) -> Self {
RemoteAgent { worker, slab }
}

Expand Down Expand Up @@ -533,7 +573,9 @@ impl Discoverer for Public {
Rc::new(RefCell::new(Slab::new()));
let handler = {
let slab = slab.clone();
move |data: Vec<u8>, worker: Value| {
move |data: Vec<u8>,
#[cfg(feature = "std_web")] worker: Value,
#[cfg(feature = "web_sys")] worker: &Worker| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
Expand All @@ -546,8 +588,13 @@ impl Discoverer for Public {
local.borrow_mut().get_mut(&TypeId::of::<AGN>())
{
for msg in msgs.drain(..) {
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
cfg_match! {
feature = "std_web" => ({
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
}),
feature = "web_sys" => worker.post_message_vec(msg),
}
}
}
});
Expand All @@ -559,13 +606,23 @@ impl Discoverer for Public {
}
};
let name_of_resource = AGN::name_of_resource();
let worker = js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
let worker = cfg_match! {
feature = "std_web" => js! {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data, worker);
};
return worker;
},
feature = "web_sys" => ({
let worker = worker_new(name_of_resource, AGN::is_module());
let worker_clone = worker.clone();
worker.set_onmessage_closure(move |data: Vec<u8>| {
handler(data, &worker_clone);
});
worker
}),
};
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
Expand All @@ -580,7 +637,10 @@ impl Dispatchable for Public {}

/// A connection manager for components interaction with workers.
pub struct PublicBridge<AGN: Agent> {
#[cfg(feature = "std_web")]
worker: Value,
#[cfg(feature = "web_sys")]
worker: Worker,
id: HandlerId,
_agent: PhantomData<AGN>,
}
Expand Down Expand Up @@ -614,15 +674,22 @@ impl<AGN: Agent> PublicBridge<AGN> {
}
}

fn send_to_remote<AGN: Agent>(worker: &Value, msg: ToWorker<AGN::Input>) {
fn send_to_remote<AGN: Agent>(
#[cfg(feature = "std_web")] worker: &Value,
#[cfg(feature = "web_sys")] worker: &Worker,
msg: ToWorker<AGN::Input>,
) {
// TODO Important! Implement.
// Use a queue to collect a messages if an instance is not ready
// and send them to an agent when it will reported readiness.
let msg = msg.pack();
js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
cfg_match! {
feature = "std_web" => js! {
var worker = @{worker};
var bytes = @{msg};
worker.postMessage(bytes);
},
feature = "web_sys" => worker.post_message_vec(msg),
};
}

Expand Down Expand Up @@ -705,6 +772,12 @@ pub trait Agent: Sized + 'static {
fn name_of_resource() -> &'static str {
"main.js"
}

/// Signifies if resource is a module.
/// This has pending browser support.
fn is_module() -> bool {
false
}
}

/// This struct holds a reference to a component and to a global scheduler.
Expand Down Expand Up @@ -761,9 +834,12 @@ impl<AGN: Agent> Responder<AGN> for WorkerResponder {
fn respond(&self, id: HandlerId, output: AGN::Output) {
let msg = FromWorker::ProcessOutput(id, output);
let data = msg.pack();
js! {
var data = @{data};
self.postMessage(data);
cfg_match! {
feature = "std_web" => js! {
var data = @{data};
self.postMessage(data);
},
feature = "web_sys" => worker_self().post_message_vec(data),
};
}
}
Expand Down Expand Up @@ -904,3 +980,58 @@ where
}
}
}

#[cfg(feature = "web_sys")]
fn worker_new(name_of_resource: &str, is_module: bool) -> Worker {
if is_module {
let options = WorkerOptions::new();
Reflect::set(
options.as_ref(),
&JsValue::from_str("type"),
&JsValue::from_str("module"),
)
.unwrap();
Worker::new_with_options(name_of_resource, &options).expect("failed to spawn worker")
} else {
Worker::new(name_of_resource).expect("failed to spawn worker")
}
}

#[cfg(feature = "web_sys")]
fn worker_self() -> DedicatedWorkerGlobalScope {
JsValue::from(js_sys::global()).into()
}

#[cfg(feature = "web_sys")]
trait WorkerExt {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>));

fn post_message_vec(&self, data: Vec<u8>);
}

#[cfg(feature = "web_sys")]
macro_rules! worker_ext_impl {
($($type:ident),+) => {$(
impl WorkerExt for $type {
fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec<u8>)) {
let handler = move |message: MessageEvent| {
let data = Uint8Array::from(message.data()).to_vec();
handler(data);
};
let closure = Closure::wrap(Box::new(handler) as Box<dyn Fn(MessageEvent)>);
self.set_onmessage(Some(closure.as_ref().unchecked_ref()));
closure.forget();
}

fn post_message_vec(&self, data: Vec<u8>) {
self.post_message(&Uint8Array::from(data.as_slice()))
.expect("failed to post message");
}
}
)+};
}

#[cfg(feature = "web_sys")]
worker_ext_impl! {
Worker, DedicatedWorkerGlobalScope
}

0 comments on commit d9235d9

Please sign in to comment.