diff --git a/examples/agents/src/bin/worker.rs b/examples/agents/src/bin/worker.rs index c9febfed648..c7b2940504b 100644 --- a/examples/agents/src/bin/worker.rs +++ b/examples/agents/src/bin/worker.rs @@ -1,5 +1,5 @@ use agents::native_worker::Worker; -use yew_agent::PublicAgent; +use yew_agent::PublicWorker; fn main() { wasm_logger::init(wasm_logger::Config::default()); diff --git a/examples/agents/src/lib.rs b/examples/agents/src/lib.rs index 3e2d598e6b2..7be78dfbdab 100644 --- a/examples/agents/src/lib.rs +++ b/examples/agents/src/lib.rs @@ -1,5 +1,6 @@ pub mod native_worker; +use std::rc::Rc; use yew::{html, Component, Context, Html}; use yew_agent::{Bridge, Bridged}; @@ -17,9 +18,9 @@ impl Component for Model { type Properties = (); fn create(ctx: &Context) -> Self { - let link = ctx.link(); - let callback = link.callback(|_| Msg::DataReceived); - let worker = native_worker::Worker::bridge(callback); + let link = ctx.link().clone(); + let callback = move |_| link.send_message(Msg::DataReceived); + let worker = native_worker::Worker::bridge(Rc::new(callback)); Self { worker } } diff --git a/examples/agents/src/native_worker.rs b/examples/agents/src/native_worker.rs index 0ab156ce12b..12a2a061a44 100644 --- a/examples/agents/src/native_worker.rs +++ b/examples/agents/src/native_worker.rs @@ -1,6 +1,6 @@ use gloo_timers::callback::Interval; use serde::{Deserialize, Serialize}; -use yew_agent::{Agent, AgentLink, HandlerId, Public}; +use yew_agent::{HandlerId, Public, WorkerLink}; #[derive(Serialize, Deserialize, Debug)] pub enum Request { @@ -17,17 +17,17 @@ pub enum Msg { } pub struct Worker { - link: AgentLink, + link: WorkerLink, _interval: Interval, } -impl Agent for Worker { +impl yew_agent::Worker for Worker { type Reach = Public; type Message = Msg; type Input = Request; type Output = Response; - fn create(link: AgentLink) -> Self { + fn create(link: WorkerLink) -> Self { let duration = 3; let interval = { diff --git a/examples/web_worker_fib/src/agent.rs b/examples/web_worker_fib/src/agent.rs index ce4e6216e6c..7f35e379aaa 100644 --- a/examples/web_worker_fib/src/agent.rs +++ b/examples/web_worker_fib/src/agent.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; -use yew_agent::{Agent, AgentLink, HandlerId, Public}; +use yew_agent::{HandlerId, Public, WorkerLink}; pub struct Worker { - link: AgentLink, + link: WorkerLink, } #[derive(Serialize, Deserialize)] @@ -15,13 +15,13 @@ pub struct WorkerOutput { pub value: u32, } -impl Agent for Worker { +impl yew_agent::Worker for Worker { type Reach = Public; type Message = (); type Input = WorkerInput; type Output = WorkerOutput; - fn create(link: AgentLink) -> Self { + fn create(link: WorkerLink) -> Self { Self { link } } diff --git a/examples/web_worker_fib/src/app.rs b/examples/web_worker_fib/src/app.rs index 7ab2f9d5b1e..77d31ff5d1d 100644 --- a/examples/web_worker_fib/src/app.rs +++ b/examples/web_worker_fib/src/app.rs @@ -1,4 +1,5 @@ use crate::agent::{Worker, WorkerInput, WorkerOutput}; +use std::rc::Rc; use web_sys::HtmlInputElement; use yew::prelude::*; @@ -22,7 +23,11 @@ impl Component for Model { type Properties = (); fn create(ctx: &Context) -> Self { - let worker = Worker::bridge(ctx.link().callback(Self::Message::WorkerMsg)); + let cb = { + let link = ctx.link().clone(); + move |e| link.send_message(Self::Message::WorkerMsg(e)) + }; + let worker = Worker::bridge(Rc::new(cb)); Self { clicker_value: 0, diff --git a/examples/web_worker_fib/src/lib.rs b/examples/web_worker_fib/src/lib.rs index 3832440df89..54a09d4be92 100644 --- a/examples/web_worker_fib/src/lib.rs +++ b/examples/web_worker_fib/src/lib.rs @@ -5,7 +5,7 @@ pub mod agent; pub mod app; use app::Model; use wasm_bindgen::prelude::*; -use yew_agent::PublicAgent; +use yew_agent::PublicWorker; #[wasm_bindgen(start)] pub fn start() { diff --git a/packages/yew-agent/Cargo.toml b/packages/yew-agent/Cargo.toml index e91650590c1..9079a40b58d 100644 --- a/packages/yew-agent/Cargo.toml +++ b/packages/yew-agent/Cargo.toml @@ -7,28 +7,6 @@ readme = "../../README.md" description = "Agents for Yew" license = "MIT OR Apache-2.0" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] -anymap2 = "0.13" -bincode = "1" -gloo-console = "0.2" -gloo-utils = "0.1" -js-sys = "0.3" -serde = { version = "1", features = ["derive"] } -slab = "0.4" -wasm-bindgen = "0.2" yew = { version = "0.19.3", path = "../yew" } -wasm-bindgen-futures = "0.4" - -[dependencies.web-sys] -version = "0.3" -features = [ - "Blob", - "BlobPropertyBag", - "DedicatedWorkerGlobalScope", - "MessageEvent", - "Url", - "Worker", - "WorkerOptions", -] +gloo-worker = "0.1" diff --git a/packages/yew-agent/src/hooks.rs b/packages/yew-agent/src/hooks.rs index d665788da2e..74c5bd4b65d 100644 --- a/packages/yew-agent/src/hooks.rs +++ b/packages/yew-agent/src/hooks.rs @@ -16,16 +16,16 @@ impl UseBridgeHandle where T: Bridged, { - /// Send a message to an agent. + /// Send a message to an worker. pub fn send(&self, msg: T::Input) { let mut bridge = self.inner.borrow_mut(); bridge.send(msg); } } -/// A hook to bridge to an Agent. +/// A hook to bridge to an [`Worker`]. /// -/// This hooks will only bridge the agent once over the entire component lifecycle. +/// This hooks will only bridge the worker once over the entire component lifecycle. /// /// Takes a callback as the only argument. The callback will be updated on every render to make /// sure captured values (if any) are up to date. @@ -46,16 +46,18 @@ where } let bridge = use_mut_ref(move || { - T::bridge(Callback::from(move |output| { - let on_output = on_output_ref.borrow().clone(); - on_output(output); - })) + T::bridge({ + Rc::new(move |output| { + let on_output = on_output_ref.borrow().clone(); + on_output(output); + }) + }) }); UseBridgeHandle { inner: bridge } } -impl Clone for UseBridgeHandle { +impl Clone for UseBridgeHandle { fn clone(&self) -> Self { Self { inner: self.inner.clone(), diff --git a/packages/yew-agent/src/lib.rs b/packages/yew-agent/src/lib.rs index 1c44060aab5..9a68c12f2cb 100644 --- a/packages/yew-agent/src/lib.rs +++ b/packages/yew-agent/src/lib.rs @@ -1,119 +1,7 @@ //! This module contains Yew's web worker implementation. mod hooks; -mod link; -mod pool; -mod worker; +#[doc(inline)] +pub use gloo_worker::*; pub use hooks::{use_bridge, UseBridgeHandle}; -pub use link::AgentLink; -pub(crate) use link::*; -pub(crate) use pool::*; -pub use pool::{Dispatched, Dispatcher}; -pub use worker::{Private, PrivateAgent, Public, PublicAgent}; - -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::ops::{Deref, DerefMut}; -use yew::callback::Callback; - -/// Declares the behavior of the agent. -pub trait Agent: Sized + 'static { - /// Reach capability of the agent. - type Reach: Discoverer; - /// Type of an input message. - type Message; - /// Incoming message type. - type Input; - /// Outgoing message type. - type Output; - - /// Creates an instance of an agent. - fn create(link: AgentLink) -> Self; - - /// This method called on every update message. - fn update(&mut self, msg: Self::Message); - - /// This method called on when a new bridge created. - fn connected(&mut self, _id: HandlerId) {} - - /// This method called on every incoming message. - fn handle_input(&mut self, msg: Self::Input, id: HandlerId); - - /// This method called on when a new bridge destroyed. - fn disconnected(&mut self, _id: HandlerId) {} - - /// This method called when the agent is destroyed. - fn destroy(&mut self) {} - - /// Represents the name of loading resorce for remote workers which - /// have to live in a separate files. - fn name_of_resource() -> &'static str { - "main.js" - } - - /// Indicates whether the name of the resource is relative. - /// - /// The default implementation returns `false`, which will cause the result - /// returned by [`Self::name_of_resource`] to be interpreted as an absolute - /// URL. If `true` is returned, it will be interpreted as a relative URL. - fn resource_path_is_relative() -> bool { - false - } - - /// Signifies if resource is a module. - /// This has pending browser support. - fn is_module() -> bool { - false - } -} - -/// Id of responses handler. -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] -pub struct HandlerId(usize, bool); - -impl HandlerId { - fn new(id: usize, respondable: bool) -> Self { - HandlerId(id, respondable) - } - fn raw_id(self) -> usize { - self.0 - } - /// Indicates if a handler id corresponds to callback in the Agent runtime. - pub fn is_respondable(self) -> bool { - self.1 - } -} - -/// Determine a visibility of an agent. -#[doc(hidden)] -pub trait Discoverer { - type Agent: Agent; - - /// Spawns an agent and returns `Bridge` implementation. - fn spawn_or_join( - _callback: Option::Output>>, - ) -> Box>; -} - -/// Bridge to a specific kind of worker. -pub trait Bridge { - /// Send a message to an agent. - fn send(&mut self, msg: AGN::Input); -} - -/// This trait allows registering or getting the address of a worker. -pub trait Bridged: Agent + Sized + 'static { - /// Creates a messaging bridge between a worker and the component. - fn bridge(callback: Callback) -> Box>; -} - -impl Bridged for T -where - T: Agent, - ::Reach: Discoverer, -{ - fn bridge(callback: Callback) -> Box> { - Self::Reach::spawn_or_join(Some(callback)) - } -} diff --git a/packages/yew-agent/src/link.rs b/packages/yew-agent/src/link.rs deleted file mode 100644 index 9d7ef52f10c..00000000000 --- a/packages/yew-agent/src/link.rs +++ /dev/null @@ -1,258 +0,0 @@ -use super::*; -use std::cell::RefCell; -use std::fmt; -use std::future::Future; -use std::rc::Rc; -use wasm_bindgen_futures::spawn_local; -use yew::callback::Callback; -use yew::html::ImplicitClone; -use yew::scheduler::{self, Runnable, Shared}; - -/// Defines communication from Worker to Consumers -pub(crate) trait Responder { - /// Implementation for communication channel from Worker to Consumers - fn respond(&self, id: HandlerId, output: AGN::Output); -} - -/// Link to agent's scope for creating callbacks. -pub struct AgentLink { - scope: AgentScope, - responder: Rc>, -} - -impl AgentLink { - /// Create link for a scope. - pub(crate) fn connect(scope: &AgentScope, responder: T) -> Self - where - T: Responder + 'static, - { - AgentLink { - scope: scope.clone(), - responder: Rc::new(responder), - } - } - - /// Send response to an agent. - pub fn respond(&self, id: HandlerId, output: AGN::Output) { - self.responder.respond(id, output); - } - - /// Send a message to the agent - pub fn send_message(&self, msg: T) - where - T: Into, - { - self.scope.send(AgentLifecycleEvent::Message(msg.into())); - } - - /// Send an input to self - pub fn send_input(&self, input: T) - where - T: Into, - { - let handler_id = HandlerId::new(0, false); - self.scope - .send(AgentLifecycleEvent::Input(input.into(), handler_id)); - } - - /// Create a callback which will send a message to the agent when invoked. - pub fn callback(&self, function: F) -> Callback - where - M: Into, - F: Fn(IN) -> M + 'static, - { - let scope = self.scope.clone(); - let closure = move |input| { - let output = function(input).into(); - scope.send(AgentLifecycleEvent::Message(output)); - }; - closure.into() - } - - /// This method creates a [`Callback`] which returns a Future which - /// returns a message to be sent back to the agent - /// - /// # Panics - /// If the future panics, then the promise will not resolve, and - /// will leak. - pub fn callback_future(&self, function: FN) -> Callback - where - M: Into, - FU: Future + 'static, - FN: Fn(IN) -> FU + 'static, - { - let link = self.clone(); - - let closure = move |input: IN| { - let future: FU = function(input); - link.send_future(future); - }; - - closure.into() - } - - /// This method processes a Future that returns a message and sends it back to the agent. - /// - /// # Panics - /// If the future panics, then the promise will not resolve, and will leak. - pub fn send_future(&self, future: F) - where - M: Into, - F: Future + 'static, - { - let link: AgentLink = self.clone(); - let js_future = async move { - let message: AGN::Message = future.await.into(); - let cb = link.callback(|m: AGN::Message| m); - cb.emit(message); - }; - spawn_local(js_future); - } -} - -impl fmt::Debug for AgentLink { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentLink<_>") - } -} - -impl Clone for AgentLink { - fn clone(&self) -> Self { - AgentLink { - scope: self.scope.clone(), - responder: self.responder.clone(), - } - } -} -/// This struct holds a reference to a component and to a global scheduler. -pub(crate) struct AgentScope { - state: Shared>, -} - -impl fmt::Debug for AgentScope { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentScope<_>") - } -} - -impl Clone for AgentScope { - fn clone(&self) -> Self { - AgentScope { - state: self.state.clone(), - } - } -} - -impl AgentScope { - /// Create agent scope - pub fn new() -> Self { - let state = Rc::new(RefCell::new(AgentState::new())); - AgentScope { state } - } - - /// Schedule message for sending to agent - pub fn send(&self, event: AgentLifecycleEvent) { - scheduler::push(Box::new(AgentRunnable { - state: self.state.clone(), - event, - })); - } -} - -impl Default for AgentScope { - fn default() -> Self { - Self::new() - } -} - -impl ImplicitClone for AgentScope {} - -struct AgentState { - agent: Option, - // TODO(#939): Use agent field to control create message this flag - destroyed: bool, -} - -impl AgentState { - fn new() -> Self { - AgentState { - agent: None, - destroyed: false, - } - } -} - -/// Internal Agent lifecycle events -#[derive(Debug)] -pub(crate) enum AgentLifecycleEvent { - /// Request to create link - Create(AgentLink), - /// Internal Agent message - Message(AGN::Message), - /// Client connected - Connected(HandlerId), - /// Received message from Client - Input(AGN::Input, HandlerId), - /// Client disconnected - Disconnected(HandlerId), - /// Request to destroy agent - Destroy, -} - -struct AgentRunnable { - state: Shared>, - event: AgentLifecycleEvent, -} - -impl Runnable for AgentRunnable -where - AGN: Agent, -{ - fn run(self: Box) { - let mut state = self.state.borrow_mut(); - if state.destroyed { - return; - } - match self.event { - AgentLifecycleEvent::Create(link) => { - state.agent = Some(AGN::create(link)); - } - AgentLifecycleEvent::Message(msg) => { - state - .agent - .as_mut() - .expect("agent was not created to process messages") - .update(msg); - } - AgentLifecycleEvent::Connected(id) => { - state - .agent - .as_mut() - .expect("agent was not created to send a connected message") - .connected(id); - } - AgentLifecycleEvent::Input(inp, id) => { - state - .agent - .as_mut() - .expect("agent was not created to process inputs") - .handle_input(inp, id); - } - AgentLifecycleEvent::Disconnected(id) => { - state - .agent - .as_mut() - .expect("agent was not created to send a disconnected message") - .disconnected(id); - } - AgentLifecycleEvent::Destroy => { - let mut agent = state - .agent - .take() - .expect("trying to destroy not existent agent"); - agent.destroy(); - state.destroyed = true; - } - } - } -} diff --git a/packages/yew-agent/src/pool.rs b/packages/yew-agent/src/pool.rs deleted file mode 100644 index 66c5a717000..00000000000 --- a/packages/yew-agent/src/pool.rs +++ /dev/null @@ -1,89 +0,0 @@ -use super::*; -use gloo_console as console; -use slab::Slab; -use yew::scheduler::Shared; - -pub(crate) type Last = bool; - -/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. -pub(crate) type SharedOutputSlab = Shared::Output>>>>; - -/// The slab contains the callback, the id is used to look up the callback, -/// and the output is the message that will be sent via the callback. -pub(crate) fn locate_callback_and_respond( - slab: &SharedOutputSlab, - id: HandlerId, - output: AGN::Output, -) { - let callback = { - let slab = slab.borrow(); - match slab.get(id.raw_id()).cloned() { - Some(callback) => callback, - None => { - console::warn!(format!( - "Id of handler does not exist in the slab: {}.", - id.raw_id() - )); - return; - } - } - }; - match callback { - Some(callback) => callback.emit(output), - None => console::warn!(format!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id())), - } -} - -/// A newtype around a bridge to indicate that it is distinct from a normal bridge -pub struct Dispatcher(pub(crate) Box>); - -impl fmt::Debug for Dispatcher { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Dispatcher<_>") - } -} - -impl Deref for Dispatcher { - type Target = dyn Bridge; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} -impl DerefMut for Dispatcher { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0.deref_mut() - } -} - -/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. -pub trait Dispatched: Agent + Sized + 'static { - /// Creates a dispatcher to the agent that will not send messages back. - /// - /// # Note - /// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None` - /// for the `id` parameter, and `connected` and `disconnected` will not be called. - /// - /// # Important - /// Because the Agents using Context or Public reaches use the number of existing bridges to - /// keep track of if the agent itself should exist, creating dispatchers will not guarantee that - /// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one - /// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner, - /// then it is suggested that you create a bridge that handles no-op responses as high up in the - /// component hierarchy as possible - oftentimes the root component for simplicity's sake. - fn dispatcher() -> Dispatcher; -} - -#[doc(hidden)] -pub trait Dispatchable {} - -impl Dispatched for T -where - T: Agent, - ::Reach: Discoverer, - ::Reach: Dispatchable, -{ - fn dispatcher() -> Dispatcher { - Dispatcher(Self::Reach::spawn_or_join(None)) - } -} diff --git a/packages/yew-agent/src/worker/mod.rs b/packages/yew-agent/src/worker/mod.rs deleted file mode 100644 index 3920277780b..00000000000 --- a/packages/yew-agent/src/worker/mod.rs +++ /dev/null @@ -1,165 +0,0 @@ -mod private; -mod public; -mod queue; - -pub use private::{Private, PrivateAgent}; -pub use public::{Public, PublicAgent}; - -use super::*; -use js_sys::{Array, Reflect, Uint8Array}; -use serde::{Deserialize, Serialize}; -use wasm_bindgen::{closure::Closure, JsCast, JsValue, UnwrapThrowExt}; -use web_sys::{ - Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions, -}; - -pub(crate) struct WorkerResponder {} - -impl Responder for WorkerResponder -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn respond(&self, id: HandlerId, output: AGN::Output) { - let msg = FromWorker::ProcessOutput(id, output); - let data = msg.pack(); - worker_self().post_message_vec(data); - } -} - -/// Message packager, based on serde::Serialize/Deserialize -pub trait Packed { - /// Pack serializable message into Vec - fn pack(&self) -> Vec; - /// Unpack deserializable message of byte slice - fn unpack(data: &[u8]) -> Self; -} - -impl Deserialize<'de>> Packed for T { - fn pack(&self) -> Vec { - bincode::serialize(&self).expect("can't serialize an agent message") - } - - fn unpack(data: &[u8]) -> Self { - bincode::deserialize(data).expect("can't deserialize an agent message") - } -} - -/// Serializable messages to worker -#[derive(Serialize, Deserialize, Debug)] -enum ToWorker { - /// Client is connected - Connected(HandlerId), - /// Incoming message to Worker - ProcessInput(HandlerId, T), - /// Client is disconnected - Disconnected(HandlerId), - /// Worker should be terminated - Destroy, -} - -/// Serializable messages sent by worker to consumer -#[derive(Serialize, Deserialize, Debug)] -enum FromWorker { - /// Worker sends this message when `wasm` bundle has loaded. - WorkerLoaded, - /// Outgoing message to consumer - ProcessOutput(HandlerId, T), -} - -fn send_to_remote(worker: &Worker, msg: ToWorker) -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - let msg = msg.pack(); - worker.post_message_vec(msg); -} -fn worker_new(name_of_resource: &str, resource_is_relative: bool, is_module: bool) -> Worker { - let origin = gloo_utils::document() - .location() - .unwrap_throw() - .origin() - .unwrap_throw(); - let pathname = gloo_utils::window().location().pathname().unwrap_throw(); - - let prefix = if resource_is_relative { - pathname - .rfind(|c| c == '/') - .map(|i| &pathname[..i]) - .unwrap_or_default() - } else { - "" - }; - let script_url = format!("{}{}/{}", origin, prefix, name_of_resource); - let wasm_url = format!( - "{}{}/{}", - origin, - prefix, - name_of_resource.replace(".js", "_bg.wasm") - ); - let array = Array::new(); - array.push( - &format!( - r#"importScripts("{}");wasm_bindgen("{}");"#, - script_url, wasm_url - ) - .into(), - ); - let blob = Blob::new_with_str_sequence_and_options( - &array, - BlobPropertyBag::new().type_("application/javascript"), - ) - .unwrap(); - let url = Url::create_object_url_with_blob(&blob).unwrap(); - - if is_module { - let options = WorkerOptions::new(); - Reflect::set( - options.as_ref(), - &JsValue::from_str("type"), - &JsValue::from_str("module"), - ) - .unwrap(); - Worker::new_with_options(&url, &options).expect("failed to spawn worker") - } else { - Worker::new(&url).expect("failed to spawn worker") - } -} - -fn worker_self() -> DedicatedWorkerGlobalScope { - JsValue::from(js_sys::global()).into() -} - -trait WorkerExt { - fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)); - - fn post_message_vec(&self, data: Vec); -} - -macro_rules! worker_ext_impl { - ($($type:ident),+) => {$( - impl WorkerExt for $type { - fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { - let handler = move |message: MessageEvent| { - let data = Uint8Array::from(message.data()).to_vec(); - handler(data); - }; - let closure = Closure::wrap(Box::new(handler) as Box); - self.set_onmessage(Some(closure.as_ref().unchecked_ref())); - closure.forget(); - } - - fn post_message_vec(&self, data: Vec) { - self.post_message(&Uint8Array::from(data.as_slice())) - .expect("failed to post message"); - } - } - )+}; -} - -worker_ext_impl! { - Worker, DedicatedWorkerGlobalScope -} diff --git a/packages/yew-agent/src/worker/private.rs b/packages/yew-agent/src/worker/private.rs deleted file mode 100644 index 45a6bb327f1..00000000000 --- a/packages/yew-agent/src/worker/private.rs +++ /dev/null @@ -1,210 +0,0 @@ -use super::*; -use queue::Queue; -use std::cell::RefCell; -use std::fmt; -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use web_sys::Worker; -use yew::callback::Callback; - -thread_local! { - static QUEUE: Queue = Queue::new(); -} - -static PRIVATE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); -const SINGLETON_ID: HandlerId = HandlerId(0, true); - -/// Create a new instance for every bridge. -#[allow(missing_debug_implementations)] -pub struct Private { - _agent: PhantomData, -} - -/// A trait to enable private agents being registered in a web worker. -pub trait PrivateAgent { - /// Executes an agent in the current environment. - /// Uses in `main` function of a worker. - fn register(); -} - -impl PrivateAgent for AGN -where - AGN: Agent>, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn register() { - let scope = AgentScope::::new(); - let responder = WorkerResponder {}; - let link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(link); - scope.send(upd); - let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); - match msg { - ToWorker::Connected(_id) => { - let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); - scope.send(upd); - } - ToWorker::ProcessInput(_id, value) => { - let upd = AgentLifecycleEvent::Input(value, SINGLETON_ID); - scope.send(upd); - } - ToWorker::Disconnected(_id) => { - let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); - scope.send(upd); - } - ToWorker::Destroy => { - let upd = AgentLifecycleEvent::Destroy; - scope.send(upd); - // Terminates web worker - worker_self().close(); - } - } - }; - let loaded: FromWorker = FromWorker::WorkerLoaded; - let loaded = loaded.pack(); - let worker = worker_self(); - worker.set_onmessage_closure(handler); - worker.post_message_vec(loaded); - } -} - -impl Discoverer for Private -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - type Agent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> { - let id = PRIVATE_ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let callback = callback.expect("Callback required for Private agents"); - let handler = move |data: Vec, worker: &Worker| { - let msg = FromWorker::::unpack(&data); - match msg { - FromWorker::WorkerLoaded => { - QUEUE.with(|queue| { - queue.insert_loaded_agent(id); - - if let Some(msgs) = queue.remove_msg_queue(&id) { - for msg in msgs { - worker.post_message_vec(msg) - } - } - }); - } - FromWorker::ProcessOutput(id, output) => { - assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); - callback.emit(output); - } - } - }; - - let name_of_resource = AGN::name_of_resource(); - let is_relative = AGN::resource_path_is_relative(); - let handler_cell = Rc::new(RefCell::new(Some(handler))); - - let worker = { - let handler_cell = handler_cell.clone(); - let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); - let worker_clone = worker.clone(); - worker.set_onmessage_closure(move |data: Vec| { - if let Some(handler) = handler_cell.borrow().as_ref() { - handler(data, &worker_clone) - } - }); - worker - }; - let bridge = PrivateBridge { - handler_cell, - worker, - _agent: PhantomData, - id, - }; - bridge.send_message(ToWorker::Connected(SINGLETON_ID)); - Box::new(bridge) - } -} - -/// A connection manager for components interaction with workers. -pub struct PrivateBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), -{ - handler_cell: Rc>>, - worker: Worker, - _agent: PhantomData, - id: usize, -} - -impl PrivateBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), -{ - /// Send a message to the worker, queuing the message if necessary - fn send_message(&self, msg: ToWorker) { - QUEUE.with(|queue| { - if queue.is_worker_loaded(&self.id) { - send_to_remote::(&self.worker, msg); - } else { - queue.add_msg_to_queue(msg.pack(), self.id); - } - }); - } -} - -impl fmt::Debug for PrivateBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("PrivateBridge<_>") - } -} - -impl Bridge for PrivateBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), -{ - fn send(&mut self, msg: AGN::Input) { - let msg = ToWorker::ProcessInput(SINGLETON_ID, msg); - self.send_message(msg); - } -} - -impl Drop for PrivateBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, - HNDL: Fn(Vec, &Worker), -{ - fn drop(&mut self) { - let disconnected = ToWorker::Disconnected(SINGLETON_ID); - send_to_remote::(&self.worker, disconnected); - - let destroy = ToWorker::Destroy; - send_to_remote::(&self.worker, destroy); - - self.handler_cell.borrow_mut().take(); - - QUEUE.with(|queue| { - queue.remove_agent(&self.id); - }); - } -} diff --git a/packages/yew-agent/src/worker/public.rs b/packages/yew-agent/src/worker/public.rs deleted file mode 100644 index 7379ec1986a..00000000000 --- a/packages/yew-agent/src/worker/public.rs +++ /dev/null @@ -1,274 +0,0 @@ -use super::WorkerExt; -use super::*; -use anymap2::{self, AnyMap}; -use queue::Queue; -use slab::Slab; -use std::any::TypeId; -use std::cell::RefCell; -use std::fmt; -use std::marker::PhantomData; -use std::rc::Rc; -use web_sys::Worker; -use yew::callback::Callback; -use yew::scheduler::Shared; - -thread_local! { - static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); - static QUEUE: Queue = Queue::new(); -} - -/// Create a single instance in a tab. -#[allow(missing_debug_implementations)] -pub struct Public { - _agent: PhantomData, -} - -impl Discoverer for Public -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - type Agent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> { - let bridge = REMOTE_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - match pool.entry::>() { - anymap2::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), - anymap2::Entry::Vacant(entry) => { - let slab: Shared>>> = - Rc::new(RefCell::new(Slab::new())); - let handler = { - let slab = slab.clone(); - move |data: Vec, worker: &Worker| { - let msg = FromWorker::::unpack(&data); - match msg { - FromWorker::WorkerLoaded => { - QUEUE.with(|queue| { - queue.insert_loaded_agent(TypeId::of::()); - - if let Some(msgs) = - queue.remove_msg_queue(&TypeId::of::()) - { - for msg in msgs { - worker.post_message_vec(msg) - } - } - }); - } - FromWorker::ProcessOutput(id, output) => { - locate_callback_and_respond::(&slab, id, output); - } - } - } - }; - let name_of_resource = AGN::name_of_resource(); - let is_relative = AGN::resource_path_is_relative(); - let worker = { - let worker = worker_new(name_of_resource, is_relative, AGN::is_module()); - let worker_clone = worker.clone(); - worker.set_onmessage_closure(move |data: Vec| { - handler(data, &worker_clone); - }); - worker - }; - let launched = RemoteAgent::new(worker, slab); - entry.insert(launched).create_bridge(callback) - } - } - }); - Box::new(bridge) - } -} - -impl Dispatchable for Public -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ -} - -/// A connection manager for components interaction with workers. -pub struct PublicBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - worker: Worker, - id: HandlerId, - _agent: PhantomData, -} - -impl fmt::Debug for PublicBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("PublicBridge<_>") - } -} - -impl PublicBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - /// Send a message to the worker, queuing the message if necessary - fn send_message(&self, msg: ToWorker) { - QUEUE.with(|queue| { - if queue.is_worker_loaded(&TypeId::of::()) { - send_to_remote::(&self.worker, msg); - } else { - queue.add_msg_to_queue(msg.pack(), TypeId::of::()); - } - }); - } -} - -impl Bridge for PublicBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn send(&mut self, msg: AGN::Input) { - let msg = ToWorker::ProcessInput(self.id, msg); - self.send_message(msg); - } -} - -impl Drop for PublicBridge -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn drop(&mut self) { - let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - let terminate_worker = { - if let Some(launched) = pool.get_mut::>() { - launched.remove_bridge(self) - } else { - false - } - }; - - if terminate_worker { - pool.remove::>(); - } - - terminate_worker - }); - - let disconnected = ToWorker::Disconnected(self.id); - self.send_message(disconnected); - - if terminate_worker { - let destroy = ToWorker::Destroy; - self.send_message(destroy); - - QUEUE.with(|queue| { - queue.remove_agent(&TypeId::of::()); - }); - } - } -} - -/// A trait to enable public agents being registered in a web worker. -pub trait PublicAgent { - /// Executes an agent in the current environment. - /// Uses in `main` function of a worker. - fn register(); -} - -impl PublicAgent for AGN -where - AGN: Agent>, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - fn register() { - let scope = AgentScope::::new(); - let responder = WorkerResponder {}; - let link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(link); - scope.send(upd); - let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); - match msg { - ToWorker::Connected(id) => { - let upd = AgentLifecycleEvent::Connected(id); - scope.send(upd); - } - ToWorker::ProcessInput(id, value) => { - let upd = AgentLifecycleEvent::Input(value, id); - scope.send(upd); - } - ToWorker::Disconnected(id) => { - let upd = AgentLifecycleEvent::Disconnected(id); - scope.send(upd); - } - ToWorker::Destroy => { - let upd = AgentLifecycleEvent::Destroy; - scope.send(upd); - // Terminates web worker - worker_self().close(); - } - } - }; - let loaded: FromWorker = FromWorker::WorkerLoaded; - let loaded = loaded.pack(); - let worker = worker_self(); - worker.set_onmessage_closure(handler); - worker.post_message_vec(loaded); - } -} - -struct RemoteAgent -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - worker: Worker, - slab: SharedOutputSlab, -} - -impl RemoteAgent -where - AGN: Agent, - ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, -{ - pub fn new(worker: Worker, slab: SharedOutputSlab) -> Self { - RemoteAgent { worker, slab } - } - - fn create_bridge(&mut self, callback: Option>) -> PublicBridge { - let respondable = callback.is_some(); - let mut slab = self.slab.borrow_mut(); - let id: usize = slab.insert(callback); - let id = HandlerId::new(id, respondable); - let bridge = PublicBridge { - worker: self.worker.clone(), - id, - _agent: PhantomData, - }; - bridge.send_message(ToWorker::Connected(bridge.id)); - - bridge - } - - fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { - let mut slab = self.slab.borrow_mut(); - let _ = slab.remove(bridge.id.raw_id()); - slab.is_empty() - } -} diff --git a/packages/yew-agent/src/worker/queue.rs b/packages/yew-agent/src/worker/queue.rs deleted file mode 100644 index cbe2e83bb3c..00000000000 --- a/packages/yew-agent/src/worker/queue.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::cell::RefCell; -use std::collections::{hash_map, HashMap, HashSet}; -use std::hash::Hash; - -/// Thread-local instance used to queue worker messages -pub struct Queue { - loaded_agents: RefCell>, - msg_queue: RefCell>>>, -} - -impl Queue { - pub fn new() -> Queue { - Queue { - loaded_agents: RefCell::new(HashSet::new()), - msg_queue: RefCell::new(HashMap::new()), - } - } - - #[inline] - pub fn remove_msg_queue(&self, id: &T) -> Option>> { - self.msg_queue.borrow_mut().remove(id) - } - - #[inline] - pub fn insert_loaded_agent(&self, id: T) { - self.loaded_agents.borrow_mut().insert(id); - } - - #[inline] - pub fn is_worker_loaded(&self, id: &T) -> bool { - self.loaded_agents.borrow().contains(id) - } - - pub fn add_msg_to_queue(&self, msg: Vec, id: T) { - let mut queue = self.msg_queue.borrow_mut(); - match queue.entry(id) { - hash_map::Entry::Vacant(record) => { - record.insert(vec![msg]); - } - hash_map::Entry::Occupied(ref mut record) => { - record.get_mut().push(msg); - } - } - } - - /// This is called by a worker's `Drop` implementation in order to remove the worker from the list - /// of loaded workers. - pub fn remove_agent(&self, id: &T) { - self.loaded_agents.borrow_mut().remove(id); - self.msg_queue.borrow_mut().remove(id); - } -} diff --git a/packages/yew/src/html/conversion.rs b/packages/yew/src/html/conversion.rs index 36b9246d5ee..fde68c601cf 100644 --- a/packages/yew/src/html/conversion.rs +++ b/packages/yew/src/html/conversion.rs @@ -13,7 +13,7 @@ impl ImplicitClone for Rc {} impl ImplicitClone for NodeRef {} impl ImplicitClone for Scope {} -// TODO there are still a few missing like AgentScope +// TODO there are still a few missing /// A trait similar to `Into` which allows conversion to a value of a `Properties` struct. pub trait IntoPropValue {