From 4f04405cdb08b1b4645b80d0dcf19cc07ba0fcdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Fri, 8 May 2020 14:33:47 +0200 Subject: [PATCH 01/11] conditially require serialize --- yew/src/agent.rs | 219 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 67 deletions(-) diff --git a/yew/src/agent.rs b/yew/src/agent.rs index c3ca7ab7313..a3da7fc465c 100644 --- a/yew/src/agent.rs +++ b/yew/src/agent.rs @@ -69,7 +69,7 @@ impl Deserialize<'de>> Packed for T { } /// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. -type SharedOutputSlab = Shared::Output>>>>; +type SharedOutputSlab = Shared::Reach as Discoverer>::Output>>>>; /// Id of responses handler. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] @@ -91,7 +91,7 @@ impl HandlerId { /// This trait allows registering or getting the address of a worker. pub trait Bridged: Agent + Sized + 'static { /// Creates a messaging bridge between a worker and the component. - fn bridge(callback: Callback) -> Box>; + fn bridge(callback: Callback<::Output>) -> Box>; } /// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. @@ -144,18 +144,20 @@ pub trait Threaded { fn register(); } -impl Threaded for T +impl Threaded for AGN where - T: Agent, + AGN: Agent>, + I: Serialize + for<'de> Deserialize<'de>, + O: Serialize + for<'de> Deserialize<'de> { fn register() { - let scope = AgentScope::::new(); + let scope = AgentScope::::new(); let responder = WorkerResponder {}; let link = AgentLink::connect(&scope, responder); let upd = AgentLifecycleEvent::Create(link); scope.send(upd); let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); + let msg = ToWorker::<::Input>::unpack(&data); match msg { ToWorker::Connected(id) => { let upd = AgentLifecycleEvent::Connected(id); @@ -180,7 +182,7 @@ where } } }; - let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded: FromWorker<::Output> = FromWorker::WorkerLoaded; let loaded = loaded.pack(); cfg_match! { feature = "std_web" => js! { @@ -203,7 +205,7 @@ impl Bridged for T where T: Agent, { - fn bridge(callback: Callback) -> Box> { + fn bridge(callback: Callback<::Output>) -> Box> { Self::Reach::spawn_or_join(Some(callback)) } } @@ -221,8 +223,13 @@ where /// Determine a visibility of an agent. #[doc(hidden)] pub trait Discoverer { + type Input: fmt::Debug; + type Output; + /// Spawns an agent and returns `Bridge` implementation. - fn spawn_or_join(_callback: Option>) -> Box> { + fn spawn_or_join(_callback: Option>) -> Box> + where AGN::Reach: Discoverer + { unimplemented!( "The Reach type that you tried to use with this Agent does not have Discoverer properly implemented for it yet. Please see @@ -234,7 +241,7 @@ https://docs.rs/yew/latest/yew/agent/ for other Reach options." /// Bridge to a specific kind of worker. pub trait Bridge { /// Send a message to an agent. - fn send(&mut self, msg: AGN::Input); + fn send(&mut self, msg: ::Input); } // <<< SAME THREAD >>> @@ -259,7 +266,7 @@ impl LocalAgent { self.slab.clone() } - fn create_bridge(&mut self, callback: Option>) -> ContextBridge { + fn create_bridge(&mut self, callback: Option::Output>>) -> ContextBridge { let respondable = callback.is_some(); let mut slab = self.slab.borrow_mut(); let id: usize = slab.insert(callback); @@ -283,10 +290,18 @@ thread_local! { /// Create a single instance in the current thread. #[allow(missing_debug_implementations)] -pub struct Context; +pub struct Context { + _input: PhantomData, + _output: PhantomData +} + +impl Discoverer for Context { + type Input = I; + type Output = O; -impl Discoverer for Context { - fn spawn_or_join(callback: Option>) -> Box> { + fn spawn_or_join(callback: Option>) -> Box> + where AGN::Reach: Discoverer + { let mut scope_to_init = None; let bridge = LOCAL_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); @@ -314,14 +329,14 @@ impl Discoverer for Context { } } -impl Dispatchable for Context {} +impl Dispatchable for Context {} struct SlabResponder { - slab: Shared>>>, + slab: Shared::Output>>>>, } impl Responder for SlabResponder { - fn respond(&self, id: HandlerId, output: AGN::Output) { + fn respond(&self, id: HandlerId, output: ::Output) { locate_callback_and_respond::(&self.slab, id, output); } } @@ -331,7 +346,7 @@ impl Responder for SlabResponder { fn locate_callback_and_respond( slab: &SharedOutputSlab, id: HandlerId, - output: AGN::Output, + output: ::Output, ) { let callback = { let slab = slab.borrow(); @@ -355,7 +370,7 @@ struct ContextBridge { } impl Bridge for ContextBridge { - fn send(&mut self, msg: AGN::Input) { + fn send(&mut self, msg: ::Input) { let upd = AgentLifecycleEvent::Input(msg, self.id); self.scope.send(upd); } @@ -392,10 +407,18 @@ impl Drop for ContextBridge { /// Create an instance in the current thread. #[allow(missing_debug_implementations)] -pub struct Job; +pub struct Job { + _input: PhantomData, + _output: PhantomData +} -impl Discoverer for Job { - fn spawn_or_join(callback: Option>) -> Box> { +impl Discoverer for Job { + type Input = I; + type Output = O; + + fn spawn_or_join(callback: Option>) -> Box> + where AGN::Reach: Discoverer + { let callback = callback.expect("Callback required for Job"); let scope = AgentScope::::new(); let responder = CallbackResponder { callback }; @@ -412,11 +435,11 @@ impl Discoverer for Job { const SINGLETON_ID: HandlerId = HandlerId(0, true); struct CallbackResponder { - callback: Callback, + callback: Callback<::Output>, } impl Responder for CallbackResponder { - fn respond(&self, id: HandlerId, output: AGN::Output) { + fn respond(&self, id: HandlerId, output: ::Output) { assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); self.callback.emit(output); } @@ -427,7 +450,7 @@ struct JobBridge { } impl Bridge for JobBridge { - fn send(&mut self, msg: AGN::Input) { + fn send(&mut self, msg: ::Input) { let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID); self.scope.send(upd); } @@ -446,18 +469,29 @@ impl Drop for JobBridge { /// Create a new instance for every bridge. #[allow(missing_debug_implementations)] -pub struct Private; +pub struct Private { + _input: PhantomData, + _output: PhantomData +} + +impl Discoverer for Private +where I: Serialize + for<'de> Deserialize<'de>, + O: Serialize + for<'de> Deserialize<'de> +{ + type Input = I; + type Output = O; -impl Discoverer for Private { - fn spawn_or_join(callback: Option>) -> Box> { + fn spawn_or_join(callback: Option>) -> Box> + where AGN::Reach: Discoverer + { let callback = callback.expect("Callback required for Private agents"); let handler = move |data: Vec, #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { - send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); + send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); } FromWorker::ProcessOutput(id, output) => { assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); @@ -493,22 +527,28 @@ impl Discoverer for Private { } /// A connection manager for components interaction with workers. -pub struct PrivateBridge { +pub struct PrivateBridge +where ::Input: Serialize + for<'de> Deserialize<'de> +{ #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: Worker, - _agent: PhantomData, + _agent: PhantomData, } -impl fmt::Debug for PrivateBridge { +impl fmt::Debug for PrivateBridge +where ::Input: Serialize + for<'de> Deserialize<'de> +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PrivateBridge<_>") } } -impl Bridge for PrivateBridge { - fn send(&mut self, msg: AGN::Input) { +impl Bridge for PrivateBridge +where ::Input: Serialize + for<'de> Deserialize<'de> +{ + fn send(&mut self, msg: ::Input) { // TODO(#937): Important! Implement. // Use a queue to collect a messages if an instance is not ready // and send them to an agent when it will reported readiness. @@ -527,13 +567,15 @@ impl Bridge for PrivateBridge { } } -impl Drop for PrivateBridge { +impl Drop for PrivateBridge +where ::Input: Serialize + for<'de> Deserialize<'de> +{ fn drop(&mut self) { let disconnected = ToWorker::Disconnected(SINGLETON_ID); - send_to_remote::(&self.worker, disconnected); + send_to_remote::<::Input>(&self.worker, disconnected); let destroy = ToWorker::Destroy; - send_to_remote::(&self.worker, destroy); + send_to_remote::<::Input>(&self.worker, destroy); } } @@ -545,7 +587,10 @@ struct RemoteAgent { slab: SharedOutputSlab, } -impl RemoteAgent { +impl RemoteAgent +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ pub fn new( #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: Worker, @@ -554,7 +599,7 @@ impl RemoteAgent { RemoteAgent { worker, slab } } - fn create_bridge(&mut self, callback: Option>) -> PublicBridge { + fn create_bridge(&mut self, callback: Option::Output>>) -> PublicBridge { let respondable = callback.is_some(); let mut slab = self.slab.borrow_mut(); let id: usize = slab.insert(callback); @@ -584,23 +629,34 @@ thread_local! { /// Create a single instance in a tab. #[allow(missing_debug_implementations)] -pub struct Public; +pub struct Public { + _input: PhantomData, + _output: PhantomData +} + +impl Discoverer for Public +where I: Serialize + for<'de> Deserialize<'de>, + O: Serialize + for<'de> Deserialize<'de> +{ + type Input = I; + type Output = O; -impl Discoverer for Public { - fn spawn_or_join(callback: Option>) -> Box> { + fn spawn_or_join(callback: Option>) -> Box> + where AGN::Reach: Discoverer + { let bridge = REMOTE_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); match pool.entry::>() { anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), anymap::Entry::Vacant(entry) => { - let slab: Shared>>> = + let slab: Shared>>> = Rc::new(RefCell::new(Slab::new())); let handler = { let slab = slab.clone(); move |data: Vec, #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { REMOTE_AGENTS_LOADED.with(|loaded| { @@ -656,10 +712,16 @@ impl Discoverer for Public { } } -impl Dispatchable for Public {} +impl Dispatchable for Public +where I: Serialize + for<'de> Deserialize<'de>, + O: Serialize + for<'de> Deserialize<'de> +{} /// A connection manager for components interaction with workers. -pub struct PublicBridge { +pub struct PublicBridge +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] @@ -668,13 +730,19 @@ pub struct PublicBridge { _agent: PhantomData, } -impl fmt::Debug for PublicBridge { +impl fmt::Debug for PublicBridge +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PublicBridge<_>") } } -impl PublicBridge { +impl PublicBridge +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ fn worker_is_loaded(&self) -> bool { REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) } @@ -694,20 +762,22 @@ impl PublicBridge { } /// Send a message to the worker, queuing it up if necessary - fn send_message(&self, msg: ToWorker) { + fn send_message(&self, msg: ToWorker<::Input>) { if self.worker_is_loaded() { - send_to_remote::(&self.worker, msg); + send_to_remote::<::Input>(&self.worker, msg); } else { self.msg_to_queue(msg.pack()); } } } -fn send_to_remote( +fn send_to_remote( #[cfg(feature = "std_web")] worker: &Value, #[cfg(feature = "web_sys")] worker: &Worker, - msg: ToWorker, -) { + msg: ToWorker, +) +where Input: Serialize + for<'de> Deserialize<'de> +{ let msg = msg.pack(); cfg_match! { feature = "std_web" => js! { @@ -719,14 +789,20 @@ fn send_to_remote( }; } -impl Bridge for PublicBridge { - fn send(&mut self, msg: AGN::Input) { +impl Bridge for PublicBridge +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ + fn send(&mut self, msg: ::Input) { let msg = ToWorker::ProcessInput(self.id, msg); self.send_message(msg); } } -impl Drop for PublicBridge { +impl Drop for PublicBridge +where ::Input: Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ fn drop(&mut self) { let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); @@ -765,9 +841,15 @@ impl Drop for PublicBridge { /// Create a single instance in a browser. #[allow(missing_debug_implementations)] -pub struct Global; +pub struct Global { + _input: PhantomData, + _output: PhantomData +} -impl Discoverer for Global {} +impl Discoverer for Global { + type Input = I; + type Output = O; +} /// Declares the behavior of the agent. pub trait Agent: Sized + 'static { @@ -776,9 +858,9 @@ pub trait Agent: Sized + 'static { /// Type of an input message. type Message; /// Incoming message type. - type Input: Serialize + for<'de> Deserialize<'de>; + //type Input: Serialize + for<'de> Deserialize<'de>; /// Outgoing message type. - type Output: Serialize + for<'de> Deserialize<'de>; + //type Output: Serialize + for<'de> Deserialize<'de>; /// Creates an instance of an agent. fn create(link: AgentLink) -> Self; @@ -790,7 +872,7 @@ pub trait Agent: Sized + 'static { fn connected(&mut self, _id: HandlerId) {} /// This method called on every incoming message. - fn handle_input(&mut self, msg: Self::Input, id: HandlerId); + fn handle_input(&mut self, msg: ::Input, id: HandlerId); /// This method called on when a new bridge destroyed. fn disconnected(&mut self, _id: HandlerId) {} @@ -856,13 +938,16 @@ impl Default for AgentScope { /// Defines communication from Worker to Consumers pub trait Responder { /// Implementation for communication channel from Worker to Consumers - fn respond(&self, id: HandlerId, output: AGN::Output); + fn respond(&self, id: HandlerId, output: ::Output); } struct WorkerResponder {} -impl Responder for WorkerResponder { - fn respond(&self, id: HandlerId, output: AGN::Output) { +impl Responder for WorkerResponder +where AGN: Agent, + ::Output: Serialize + for<'de> Deserialize<'de> +{ + fn respond(&self, id: HandlerId, output: ::Output) { let msg = FromWorker::ProcessOutput(id, output); let data = msg.pack(); cfg_match! { @@ -894,7 +979,7 @@ impl AgentLink { } /// Send response to an agent. - pub fn respond(&self, id: HandlerId, output: AGN::Output) { + pub fn respond(&self, id: HandlerId, output: ::Output) { self.responder.respond(id, output); } @@ -952,7 +1037,7 @@ pub enum AgentLifecycleEvent { /// Client connected Connected(HandlerId), /// Received mesasge from Client - Input(AGN::Input, HandlerId), + Input(::Input, HandlerId), /// Client disconnected Disconnected(HandlerId), /// Request to destroy agent From 01d639873344e6b7951923d4fa9f529829fbd07f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 13:19:30 +0200 Subject: [PATCH 02/11] we're using a trait alias and a clearer implementation of the constraints --- yew/src/agent.rs | 203 ++++++++++++++++++++--------------------------- yew/src/lib.rs | 2 + 2 files changed, 86 insertions(+), 119 deletions(-) diff --git a/yew/src/agent.rs b/yew/src/agent.rs index a3da7fc465c..11b984e7d73 100644 --- a/yew/src/agent.rs +++ b/yew/src/agent.rs @@ -69,7 +69,14 @@ impl Deserialize<'de>> Packed for T { } /// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. -type SharedOutputSlab = Shared::Reach as Discoverer>::Output>>>>; +type SharedOutputSlab = Shared>>>; + +pub trait AsyncAgent = Agent +where ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>; + +pub trait SyncAgent = Agent +where ::Input: fmt::Debug; /// Id of responses handler. #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] @@ -91,7 +98,7 @@ impl HandlerId { /// This trait allows registering or getting the address of a worker. pub trait Bridged: Agent + Sized + 'static { /// Creates a messaging bridge between a worker and the component. - fn bridge(callback: Callback<::Output>) -> Box>; + fn bridge(callback: Callback) -> Box>; } /// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. @@ -144,11 +151,8 @@ pub trait Threaded { fn register(); } -impl Threaded for AGN +impl Threaded for AGN where - AGN: Agent>, - I: Serialize + for<'de> Deserialize<'de>, - O: Serialize + for<'de> Deserialize<'de> { fn register() { let scope = AgentScope::::new(); @@ -157,7 +161,7 @@ where let upd = AgentLifecycleEvent::Create(link); scope.send(upd); let handler = move |data: Vec| { - let msg = ToWorker::<::Input>::unpack(&data); + let msg = ToWorker::::unpack(&data); match msg { ToWorker::Connected(id) => { let upd = AgentLifecycleEvent::Connected(id); @@ -182,7 +186,7 @@ where } } }; - let loaded: FromWorker<::Output> = FromWorker::WorkerLoaded; + let loaded: FromWorker = FromWorker::WorkerLoaded; let loaded = loaded.pack(); cfg_match! { feature = "std_web" => js! { @@ -204,8 +208,9 @@ where impl Bridged for T where T: Agent, + ::Reach: Dispatchable, { - fn bridge(callback: Callback<::Output>) -> Box> { + fn bridge(callback: Callback) -> Box> { Self::Reach::spawn_or_join(Some(callback)) } } @@ -213,22 +218,20 @@ where impl Dispatched for T where T: Agent, - ::Reach: Dispatchable, + ::Reach: Dispatchable, { fn dispatcher() -> Dispatcher { - Dispatcher(Self::Reach::spawn_or_join::(None)) + Dispatcher(Self::Reach::spawn_or_join(None)) } } /// Determine a visibility of an agent. #[doc(hidden)] pub trait Discoverer { - type Input: fmt::Debug; - type Output; + type LocalAgent: Agent; /// Spawns an agent and returns `Bridge` implementation. - fn spawn_or_join(_callback: Option>) -> Box> - where AGN::Reach: Discoverer + fn spawn_or_join(_callback: Option::Output>>) -> Box> { unimplemented!( "The Reach type that you tried to use with this Agent does not have @@ -241,7 +244,7 @@ https://docs.rs/yew/latest/yew/agent/ for other Reach options." /// Bridge to a specific kind of worker. pub trait Bridge { /// Send a message to an agent. - fn send(&mut self, msg: ::Input); + fn send(&mut self, msg: AGN::Input); } // <<< SAME THREAD >>> @@ -266,7 +269,7 @@ impl LocalAgent { self.slab.clone() } - fn create_bridge(&mut self, callback: Option::Output>>) -> ContextBridge { + fn create_bridge(&mut self, callback: Option>) -> ContextBridge { let respondable = callback.is_some(); let mut slab = self.slab.borrow_mut(); let id: usize = slab.insert(callback); @@ -290,17 +293,14 @@ thread_local! { /// Create a single instance in the current thread. #[allow(missing_debug_implementations)] -pub struct Context { - _input: PhantomData, - _output: PhantomData +pub struct Context { + _agent: PhantomData } -impl Discoverer for Context { - type Input = I; - type Output = O; +impl Discoverer for Context { + type LocalAgent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - where AGN::Reach: Discoverer + fn spawn_or_join(callback: Option>) -> Box> { let mut scope_to_init = None; let bridge = LOCAL_AGENTS_POOL.with(|pool| { @@ -329,14 +329,14 @@ impl Discoverer for Context { } } -impl Dispatchable for Context {} +impl Dispatchable for Context {} struct SlabResponder { - slab: Shared::Output>>>>, + slab: Shared>>>, } impl Responder for SlabResponder { - fn respond(&self, id: HandlerId, output: ::Output) { + fn respond(&self, id: HandlerId, output: AGN::Output) { locate_callback_and_respond::(&self.slab, id, output); } } @@ -346,7 +346,7 @@ impl Responder for SlabResponder { fn locate_callback_and_respond( slab: &SharedOutputSlab, id: HandlerId, - output: ::Output, + output: AGN::Output, ) { let callback = { let slab = slab.borrow(); @@ -370,7 +370,7 @@ struct ContextBridge { } impl Bridge for ContextBridge { - fn send(&mut self, msg: ::Input) { + fn send(&mut self, msg: AGN::Input) { let upd = AgentLifecycleEvent::Input(msg, self.id); self.scope.send(upd); } @@ -407,17 +407,14 @@ impl Drop for ContextBridge { /// Create an instance in the current thread. #[allow(missing_debug_implementations)] -pub struct Job { - _input: PhantomData, - _output: PhantomData +pub struct Job { + _agent: PhantomData } -impl Discoverer for Job { - type Input = I; - type Output = O; +impl Discoverer for Job { + type LocalAgent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - where AGN::Reach: Discoverer + fn spawn_or_join(callback: Option>) -> Box> { let callback = callback.expect("Callback required for Job"); let scope = AgentScope::::new(); @@ -435,11 +432,11 @@ impl Discoverer for Job { const SINGLETON_ID: HandlerId = HandlerId(0, true); struct CallbackResponder { - callback: Callback<::Output>, + callback: Callback, } impl Responder for CallbackResponder { - fn respond(&self, id: HandlerId, output: ::Output) { + fn respond(&self, id: HandlerId, output: AGN::Output) { assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); self.callback.emit(output); } @@ -449,8 +446,8 @@ struct JobBridge { scope: AgentScope, } -impl Bridge for JobBridge { - fn send(&mut self, msg: ::Input) { +impl Bridge for JobBridge { + fn send(&mut self, msg: AGN::Input) { let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID); self.scope.send(upd); } @@ -469,29 +466,24 @@ impl Drop for JobBridge { /// Create a new instance for every bridge. #[allow(missing_debug_implementations)] -pub struct Private { - _input: PhantomData, - _output: PhantomData +pub struct Private { + _agent: PhantomData } -impl Discoverer for Private -where I: Serialize + for<'de> Deserialize<'de>, - O: Serialize + for<'de> Deserialize<'de> +impl Discoverer for Private { - type Input = I; - type Output = O; + type LocalAgent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - where AGN::Reach: Discoverer + fn spawn_or_join(callback: Option>) -> Box> { let callback = callback.expect("Callback required for Private agents"); let handler = move |data: Vec, #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { - send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); + send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); } FromWorker::ProcessOutput(id, output) => { assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); @@ -527,8 +519,7 @@ where I: Serialize + for<'de> Deserialize<'de>, } /// A connection manager for components interaction with workers. -pub struct PrivateBridge -where ::Input: Serialize + for<'de> Deserialize<'de> +pub struct PrivateBridge { #[cfg(feature = "std_web")] worker: Value, @@ -537,18 +528,16 @@ where ::Input: Serialize + for<'de> Deserialize<'de> _agent: PhantomData, } -impl fmt::Debug for PrivateBridge -where ::Input: Serialize + for<'de> Deserialize<'de> +impl fmt::Debug for PrivateBridge { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PrivateBridge<_>") } } -impl Bridge for PrivateBridge -where ::Input: Serialize + for<'de> Deserialize<'de> +impl Bridge for PrivateBridge { - fn send(&mut self, msg: ::Input) { + fn send(&mut self, msg: AGN::Input) { // TODO(#937): Important! Implement. // Use a queue to collect a messages if an instance is not ready // and send them to an agent when it will reported readiness. @@ -567,19 +556,18 @@ where ::Input: Serialize + for<'de> Deserialize<'de> } } -impl Drop for PrivateBridge -where ::Input: Serialize + for<'de> Deserialize<'de> +impl Drop for PrivateBridge { fn drop(&mut self) { let disconnected = ToWorker::Disconnected(SINGLETON_ID); - send_to_remote::<::Input>(&self.worker, disconnected); + send_to_remote::(&self.worker, disconnected); let destroy = ToWorker::Destroy; - send_to_remote::<::Input>(&self.worker, destroy); + send_to_remote::(&self.worker, destroy); } } -struct RemoteAgent { +struct RemoteAgent { #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] @@ -587,9 +575,7 @@ struct RemoteAgent { slab: SharedOutputSlab, } -impl RemoteAgent -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +impl RemoteAgent { pub fn new( #[cfg(feature = "std_web")] worker: Value, @@ -599,7 +585,7 @@ where ::Input: Serialize + for<'de> Deserialize<'de>, RemoteAgent { worker, slab } } - fn create_bridge(&mut self, callback: Option::Output>>) -> PublicBridge { + fn create_bridge(&mut self, callback: Option>) -> PublicBridge { let respondable = callback.is_some(); let mut slab = self.slab.borrow_mut(); let id: usize = slab.insert(callback); @@ -629,34 +615,29 @@ thread_local! { /// Create a single instance in a tab. #[allow(missing_debug_implementations)] -pub struct Public { - _input: PhantomData, - _output: PhantomData +pub struct Public { + _agent: PhantomData } -impl Discoverer for Public -where I: Serialize + for<'de> Deserialize<'de>, - O: Serialize + for<'de> Deserialize<'de> +impl Discoverer for Public { - type Input = I; - type Output = O; + type LocalAgent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - where AGN::Reach: Discoverer + fn spawn_or_join(callback: Option>) -> Box> { let bridge = REMOTE_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); match pool.entry::>() { anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), anymap::Entry::Vacant(entry) => { - let slab: Shared>>> = + let slab: Shared>>> = Rc::new(RefCell::new(Slab::new())); let handler = { let slab = slab.clone(); move |data: Vec, #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); + let msg = FromWorker::::unpack(&data); match msg { FromWorker::WorkerLoaded => { REMOTE_AGENTS_LOADED.with(|loaded| { @@ -712,15 +693,11 @@ where I: Serialize + for<'de> Deserialize<'de>, } } -impl Dispatchable for Public -where I: Serialize + for<'de> Deserialize<'de>, - O: Serialize + for<'de> Deserialize<'de> +impl Dispatchable for Public {} /// A connection manager for components interaction with workers. -pub struct PublicBridge -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, +pub struct PublicBridge { #[cfg(feature = "std_web")] worker: Value, @@ -730,18 +707,14 @@ where ::Input: Serialize + for<'de> Deserialize<'de>, _agent: PhantomData, } -impl fmt::Debug for PublicBridge -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, +impl fmt::Debug for PublicBridge { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PublicBridge<_>") } } -impl PublicBridge -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, +impl PublicBridge { fn worker_is_loaded(&self) -> bool { REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) @@ -762,9 +735,9 @@ where ::Input: Serialize + for<'de> Deserialize<'de>, } /// Send a message to the worker, queuing it up if necessary - fn send_message(&self, msg: ToWorker<::Input>) { + fn send_message(&self, msg: ToWorker) { if self.worker_is_loaded() { - send_to_remote::<::Input>(&self.worker, msg); + send_to_remote::(&self.worker, msg); } else { self.msg_to_queue(msg.pack()); } @@ -789,19 +762,15 @@ where Input: Serialize + for<'de> Deserialize<'de> }; } -impl Bridge for PublicBridge -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, +impl Bridge for PublicBridge { - fn send(&mut self, msg: ::Input) { + fn send(&mut self, msg: AGN::Input) { let msg = ToWorker::ProcessInput(self.id, msg); self.send_message(msg); } } -impl Drop for PublicBridge -where ::Input: Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>, +impl Drop for PublicBridge { fn drop(&mut self) { let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { @@ -841,14 +810,12 @@ where ::Input: Serialize + for<'de> Deserialize<'de>, /// Create a single instance in a browser. #[allow(missing_debug_implementations)] -pub struct Global { - _input: PhantomData, - _output: PhantomData +pub struct Global { + _agent: PhantomData } -impl Discoverer for Global { - type Input = I; - type Output = O; +impl Discoverer for Global { + type LocalAgent = AGN; } /// Declares the behavior of the agent. @@ -858,9 +825,9 @@ pub trait Agent: Sized + 'static { /// Type of an input message. type Message; /// Incoming message type. - //type Input: Serialize + for<'de> Deserialize<'de>; + type Input; /// Outgoing message type. - //type Output: Serialize + for<'de> Deserialize<'de>; + type Output; /// Creates an instance of an agent. fn create(link: AgentLink) -> Self; @@ -872,7 +839,7 @@ pub trait Agent: Sized + 'static { fn connected(&mut self, _id: HandlerId) {} /// This method called on every incoming message. - fn handle_input(&mut self, msg: ::Input, id: HandlerId); + fn handle_input(&mut self, msg: Self::Input, id: HandlerId); /// This method called on when a new bridge destroyed. fn disconnected(&mut self, _id: HandlerId) {} @@ -938,16 +905,14 @@ impl Default for AgentScope { /// Defines communication from Worker to Consumers pub trait Responder { /// Implementation for communication channel from Worker to Consumers - fn respond(&self, id: HandlerId, output: ::Output); + fn respond(&self, id: HandlerId, output: AGN::Output); } struct WorkerResponder {} -impl Responder for WorkerResponder -where AGN: Agent, - ::Output: Serialize + for<'de> Deserialize<'de> +impl Responder for WorkerResponder { - fn respond(&self, id: HandlerId, output: ::Output) { + fn respond(&self, id: HandlerId, output: AGN::Output) { let msg = FromWorker::ProcessOutput(id, output); let data = msg.pack(); cfg_match! { @@ -979,7 +944,7 @@ impl AgentLink { } /// Send response to an agent. - pub fn respond(&self, id: HandlerId, output: ::Output) { + pub fn respond(&self, id: HandlerId, output: AGN::Output) { self.responder.respond(id, output); } @@ -1037,7 +1002,7 @@ pub enum AgentLifecycleEvent { /// Client connected Connected(HandlerId), /// Received mesasge from Client - Input(::Input, HandlerId), + Input(AGN::Input, HandlerId), /// Client disconnected Disconnected(HandlerId), /// Request to destroy agent diff --git a/yew/src/lib.rs b/yew/src/lib.rs index 26d35dff5fa..9eae59e42ae 100644 --- a/yew/src/lib.rs +++ b/yew/src/lib.rs @@ -1,6 +1,8 @@ #![allow(clippy::needless_doctest_main)] #![doc(html_logo_url = "https://static.yew.rs/logo.svg")] +#![feature(trait_alias)] + //! # Yew Framework - API Documentation //! //! Yew is a modern Rust framework for creating multi-threaded front-end web apps with WebAssembly From 7a2019f166eef78e8aede9a8b6b326a2fd01a8df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 14:17:56 +0200 Subject: [PATCH 03/11] after merging with separated agent --- examples/multi_thread/src/context.rs | 2 +- examples/multi_thread/src/job.rs | 2 +- examples/multi_thread/src/native_worker.rs | 2 +- examples/pub_sub/src/event_bus.rs | 2 +- yew-router/src/agent/mod.rs | 2 +- yew/src/agent.rs | 1136 -------------------- yew/src/agent/link.rs | 190 ++++ yew/src/agent/local/context.rs | 140 +++ yew/src/agent/local/job.rs | 61 ++ yew/src/agent/local/mod.rs | 7 + yew/src/agent/mod.rs | 117 ++ yew/src/agent/pool.rs | 86 ++ yew/src/agent/worker/mod.rs | 158 +++ yew/src/agent/worker/private.rs | 116 ++ yew/src/agent/worker/public.rs | 310 ++++++ yew/src/lib.rs | 4 +- 16 files changed, 1192 insertions(+), 1143 deletions(-) delete mode 100644 yew/src/agent.rs create mode 100644 yew/src/agent/link.rs create mode 100644 yew/src/agent/local/context.rs create mode 100644 yew/src/agent/local/job.rs create mode 100644 yew/src/agent/local/mod.rs create mode 100644 yew/src/agent/mod.rs create mode 100644 yew/src/agent/pool.rs create mode 100644 yew/src/agent/worker/mod.rs create mode 100644 yew/src/agent/worker/private.rs create mode 100644 yew/src/agent/worker/public.rs diff --git a/examples/multi_thread/src/context.rs b/examples/multi_thread/src/context.rs index 37d41f36c00..abfa799a2ac 100644 --- a/examples/multi_thread/src/context.rs +++ b/examples/multi_thread/src/context.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Context; + type Reach = Context; type Message = Msg; type Input = Request; type Output = Response; diff --git a/examples/multi_thread/src/job.rs b/examples/multi_thread/src/job.rs index 4128ceef49b..948c874dc6d 100644 --- a/examples/multi_thread/src/job.rs +++ b/examples/multi_thread/src/job.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Job; + type Reach = Job; type Message = Msg; type Input = Request; type Output = Response; diff --git a/examples/multi_thread/src/native_worker.rs b/examples/multi_thread/src/native_worker.rs index 385635c95ef..387ff124650 100644 --- a/examples/multi_thread/src/native_worker.rs +++ b/examples/multi_thread/src/native_worker.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Public; + type Reach = Public; type Message = Msg; type Input = Request; type Output = Response; diff --git a/examples/pub_sub/src/event_bus.rs b/examples/pub_sub/src/event_bus.rs index 977793f86f9..adc61013afe 100644 --- a/examples/pub_sub/src/event_bus.rs +++ b/examples/pub_sub/src/event_bus.rs @@ -13,7 +13,7 @@ pub struct EventBus { } impl Agent for EventBus { - type Reach = Context; + type Reach = Context; type Message = (); type Input = Request; type Output = String; diff --git a/yew-router/src/agent/mod.rs b/yew-router/src/agent/mod.rs index 73699221122..c5a0e814ded 100644 --- a/yew-router/src/agent/mod.rs +++ b/yew-router/src/agent/mod.rs @@ -87,7 +87,7 @@ where type Input = RouteRequest; type Message = Msg; type Output = Route; - type Reach = Context; + type Reach = Context; fn create(link: AgentLink>) -> Self { let callback = link.callback(Msg::BrowserNavigationRouteChanged); diff --git a/yew/src/agent.rs b/yew/src/agent.rs deleted file mode 100644 index 11b984e7d73..00000000000 --- a/yew/src/agent.rs +++ /dev/null @@ -1,1136 +0,0 @@ -//! This module contains types to support multi-threading in Yew. - -use crate::callback::Callback; -use crate::scheduler::{scheduler, Runnable, Shared}; -use anymap::{self, AnyMap}; -use cfg_if::cfg_if; -use cfg_match::cfg_match; -use log::warn; -use serde::{Deserialize, Serialize}; -use slab::Slab; -use std::any::TypeId; -use std::cell::RefCell; -use std::collections::{hash_map, HashMap, HashSet}; -use std::fmt; -use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; -use std::rc::Rc; -cfg_if! { - if #[cfg(feature = "std_web")] { - use stdweb::Value; - #[allow(unused_imports)] - use stdweb::{_js_impl, js}; - } else if #[cfg(feature = "web_sys")] { - use crate::utils; - use js_sys::{Array, Reflect, Uint8Array}; - use wasm_bindgen::{closure::Closure, JsCast, JsValue}; - use web_sys::{Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions}; - } -} - -/// Serializable messages to worker -#[derive(Serialize, Deserialize, Debug)] -enum ToWorker { - /// Client is connected - Connected(HandlerId), - /// Incoming message to Worker - ProcessInput(HandlerId, T), - /// Client is disconnected - Disconnected(HandlerId), - /// Worker should be terminated - Destroy, -} - -/// Serializable messages sent by worker to consumer -#[derive(Serialize, Deserialize, Debug)] -enum FromWorker { - /// Worker sends this message when `wasm` bundle has loaded. - WorkerLoaded, - /// Outgoing message to consumer - ProcessOutput(HandlerId, T), -} - -/// Message packager, based on serde::Serialize/Deserialize -pub trait Packed { - /// Pack serializable message into Vec - fn pack(&self) -> Vec; - /// Unpack deserializable message of byte slice - fn unpack(data: &[u8]) -> Self; -} - -impl Deserialize<'de>> Packed for T { - fn pack(&self) -> Vec { - bincode::serialize(&self).expect("can't serialize an agent message") - } - - fn unpack(data: &[u8]) -> Self { - bincode::deserialize(&data).expect("can't deserialize an agent message") - } -} - -/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. -type SharedOutputSlab = Shared>>>; - -pub trait AsyncAgent = Agent -where ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>; - -pub trait SyncAgent = Agent -where ::Input: fmt::Debug; - -/// Id of responses handler. -#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] -pub struct HandlerId(usize, bool); - -impl HandlerId { - fn new(id: usize, respondable: bool) -> Self { - HandlerId(id, respondable) - } - fn raw_id(self) -> usize { - self.0 - } - /// Indicates if a handler id corresponds to callback in the Agent runtime. - pub fn is_respondable(self) -> bool { - self.1 - } -} - -/// This trait allows registering or getting the address of a worker. -pub trait Bridged: Agent + Sized + 'static { - /// Creates a messaging bridge between a worker and the component. - fn bridge(callback: Callback) -> Box>; -} - -/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. -pub trait Dispatched: Agent + Sized + 'static { - /// Creates a dispatcher to the agent that will not send messages back. - /// - /// # Note - /// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None` - /// for the `id` parameter, and `connected` and `disconnected` will not be called. - /// - /// # Important - /// Because the Agents using Context or Public reaches use the number of existing bridges to - /// keep track of if the agent itself should exist, creating dispatchers will not guarantee that - /// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one - /// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner, - /// then it is suggested that you create a bridge that handles no-op responses as high up in the - /// component hierarchy as possible - oftentimes the root component for simplicity's sake. - fn dispatcher() -> Dispatcher; -} - -/// A newtype around a bridge to indicate that it is distinct from a normal bridge -pub struct Dispatcher(Box>); - -impl fmt::Debug for Dispatcher { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Dispatcher<_>") - } -} - -impl Deref for Dispatcher { - type Target = dyn Bridge; - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} -impl DerefMut for Dispatcher { - fn deref_mut(&mut self) -> &mut Self::Target { - self.0.deref_mut() - } -} - -/// Marker trait to indicate which Discoverers are able to be used with dispatchers. -pub trait Dispatchable: Discoverer {} - -/// Implements rules to register a worker in a separate thread. -pub trait Threaded { - /// Executes an agent in the current environment. - /// Uses in `main` function of a worker. - fn register(); -} - -impl Threaded for AGN -where -{ - fn register() { - let scope = AgentScope::::new(); - let responder = WorkerResponder {}; - let link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(link); - scope.send(upd); - let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); - match msg { - ToWorker::Connected(id) => { - let upd = AgentLifecycleEvent::Connected(id); - scope.send(upd); - } - ToWorker::ProcessInput(id, value) => { - let upd = AgentLifecycleEvent::Input(value, id); - scope.send(upd); - } - ToWorker::Disconnected(id) => { - let upd = AgentLifecycleEvent::Disconnected(id); - scope.send(upd); - } - ToWorker::Destroy => { - let upd = AgentLifecycleEvent::Destroy; - scope.send(upd); - // Terminates web worker - cfg_match! { - feature = "std_web" => js! { self.close(); }, - feature = "web_sys" => worker_self().close(), - }; - } - } - }; - let loaded: FromWorker = FromWorker::WorkerLoaded; - let loaded = loaded.pack(); - cfg_match! { - feature = "std_web" => js! { - var handler = @{handler}; - self.onmessage = function(event) { - handler(event.data); - }; - self.postMessage(@{loaded}); - }, - feature = "web_sys" => ({ - let worker = worker_self(); - worker.set_onmessage_closure(handler); - worker.post_message_vec(loaded); - }), - }; - } -} - -impl Bridged for T -where - T: Agent, - ::Reach: Dispatchable, -{ - fn bridge(callback: Callback) -> Box> { - Self::Reach::spawn_or_join(Some(callback)) - } -} - -impl Dispatched for T -where - T: Agent, - ::Reach: Dispatchable, -{ - fn dispatcher() -> Dispatcher { - Dispatcher(Self::Reach::spawn_or_join(None)) - } -} - -/// Determine a visibility of an agent. -#[doc(hidden)] -pub trait Discoverer { - type LocalAgent: Agent; - - /// Spawns an agent and returns `Bridge` implementation. - fn spawn_or_join(_callback: Option::Output>>) -> Box> - { - unimplemented!( - "The Reach type that you tried to use with this Agent does not have -Discoverer properly implemented for it yet. Please see -https://docs.rs/yew/latest/yew/agent/ for other Reach options." - ); - } -} - -/// Bridge to a specific kind of worker. -pub trait Bridge { - /// Send a message to an agent. - fn send(&mut self, msg: AGN::Input); -} - -// <<< SAME THREAD >>> - -struct LocalAgent { - scope: AgentScope, - slab: SharedOutputSlab, -} - -type Last = bool; - -impl LocalAgent { - pub fn new(scope: &AgentScope) -> Self { - let slab = Rc::new(RefCell::new(Slab::new())); - LocalAgent { - scope: scope.clone(), - slab, - } - } - - fn slab(&self) -> SharedOutputSlab { - self.slab.clone() - } - - fn create_bridge(&mut self, callback: Option>) -> ContextBridge { - let respondable = callback.is_some(); - let mut slab = self.slab.borrow_mut(); - let id: usize = slab.insert(callback); - let id = HandlerId::new(id, respondable); - ContextBridge { - scope: self.scope.clone(), - id, - } - } - - fn remove_bridge(&mut self, bridge: &ContextBridge) -> Last { - let mut slab = self.slab.borrow_mut(); - let _ = slab.remove(bridge.id.raw_id()); - slab.is_empty() - } -} - -thread_local! { - static LOCAL_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); -} - -/// Create a single instance in the current thread. -#[allow(missing_debug_implementations)] -pub struct Context { - _agent: PhantomData -} - -impl Discoverer for Context { - type LocalAgent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> - { - let mut scope_to_init = None; - let bridge = LOCAL_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - match pool.entry::>() { - anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), - anymap::Entry::Vacant(entry) => { - let scope = AgentScope::::new(); - let launched = LocalAgent::new(&scope); - let responder = SlabResponder { - slab: launched.slab(), - }; - scope_to_init = Some((scope, responder)); - entry.insert(launched).create_bridge(callback) - } - } - }); - if let Some((scope, responder)) = scope_to_init { - let agent_link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(agent_link); - scope.send(upd); - } - let upd = AgentLifecycleEvent::Connected(bridge.id); - bridge.scope.send(upd); - Box::new(bridge) - } -} - -impl Dispatchable for Context {} - -struct SlabResponder { - slab: Shared>>>, -} - -impl Responder for SlabResponder { - fn respond(&self, id: HandlerId, output: AGN::Output) { - locate_callback_and_respond::(&self.slab, id, output); - } -} - -/// The slab contains the callback, the id is used to look up the callback, -/// and the output is the message that will be sent via the callback. -fn locate_callback_and_respond( - slab: &SharedOutputSlab, - id: HandlerId, - output: AGN::Output, -) { - let callback = { - let slab = slab.borrow(); - match slab.get(id.raw_id()).cloned() { - Some(callback) => callback, - None => { - warn!("Id of handler does not exist in the slab: {}.", id.raw_id()); - return; - } - } - }; - match callback { - Some(callback) => callback.emit(output), - None => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()), - } -} - -struct ContextBridge { - scope: AgentScope, - id: HandlerId, -} - -impl Bridge for ContextBridge { - fn send(&mut self, msg: AGN::Input) { - let upd = AgentLifecycleEvent::Input(msg, self.id); - self.scope.send(upd); - } -} - -impl Drop for ContextBridge { - fn drop(&mut self) { - let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - let terminate_worker = { - if let Some(launched) = pool.get_mut::>() { - launched.remove_bridge(self) - } else { - false - } - }; - - if terminate_worker { - pool.remove::>(); - } - - terminate_worker - }); - - let upd = AgentLifecycleEvent::Disconnected(self.id); - self.scope.send(upd); - - if terminate_worker { - let upd = AgentLifecycleEvent::Destroy; - self.scope.send(upd); - } - } -} - -/// Create an instance in the current thread. -#[allow(missing_debug_implementations)] -pub struct Job { - _agent: PhantomData -} - -impl Discoverer for Job { - type LocalAgent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> - { - let callback = callback.expect("Callback required for Job"); - let scope = AgentScope::::new(); - let responder = CallbackResponder { callback }; - let agent_link = AgentLink::connect(&scope, responder); - let upd = AgentLifecycleEvent::Create(agent_link); - scope.send(upd); - let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); - scope.send(upd); - let bridge = JobBridge { scope }; - Box::new(bridge) - } -} - -const SINGLETON_ID: HandlerId = HandlerId(0, true); - -struct CallbackResponder { - callback: Callback, -} - -impl Responder for CallbackResponder { - fn respond(&self, id: HandlerId, output: AGN::Output) { - assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); - self.callback.emit(output); - } -} - -struct JobBridge { - scope: AgentScope, -} - -impl Bridge for JobBridge { - fn send(&mut self, msg: AGN::Input) { - let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID); - self.scope.send(upd); - } -} - -impl Drop for JobBridge { - fn drop(&mut self) { - let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); - self.scope.send(upd); - let upd = AgentLifecycleEvent::Destroy; - self.scope.send(upd); - } -} - -// <<< SEPARATE THREAD >>> - -/// Create a new instance for every bridge. -#[allow(missing_debug_implementations)] -pub struct Private { - _agent: PhantomData -} - -impl Discoverer for Private -{ - type LocalAgent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> - { - let callback = callback.expect("Callback required for Private agents"); - let handler = move |data: Vec, - #[cfg(feature = "std_web")] worker: Value, - #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); - match msg { - FromWorker::WorkerLoaded => { - send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); - } - FromWorker::ProcessOutput(id, output) => { - assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); - callback.emit(output); - } - } - }; - - // TODO(#947): Drop handler when bridge is dropped - let name_of_resource = AGN::name_of_resource(); - let worker = cfg_match! { - feature = "std_web" => js! { - var worker = new Worker(@{name_of_resource}); - var handler = @{handler}; - worker.onmessage = function(event) { - handler(event.data, worker); - }; - return worker; - }, - feature = "web_sys" => ({ - let worker = worker_new(name_of_resource, AGN::is_module()); - let worker_clone = worker.clone(); - worker.set_onmessage_closure(move |data: Vec| handler(data, &worker_clone)); - worker - }), - }; - let bridge = PrivateBridge { - worker, - _agent: PhantomData, - }; - Box::new(bridge) - } -} - -/// A connection manager for components interaction with workers. -pub struct PrivateBridge -{ - #[cfg(feature = "std_web")] - worker: Value, - #[cfg(feature = "web_sys")] - worker: Worker, - _agent: PhantomData, -} - -impl fmt::Debug for PrivateBridge -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("PrivateBridge<_>") - } -} - -impl Bridge for PrivateBridge -{ - fn send(&mut self, msg: AGN::Input) { - // TODO(#937): Important! Implement. - // Use a queue to collect a messages if an instance is not ready - // and send them to an agent when it will reported readiness. - let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack(); - cfg_match! { - feature = "std_web" => ({ - let worker = &self.worker; - js! { - var worker = @{worker}; - var bytes = @{msg}; - worker.postMessage(bytes); - }; - }), - feature = "web_sys" => self.worker.post_message_vec(msg), - } - } -} - -impl Drop for PrivateBridge -{ - fn drop(&mut self) { - let disconnected = ToWorker::Disconnected(SINGLETON_ID); - send_to_remote::(&self.worker, disconnected); - - let destroy = ToWorker::Destroy; - send_to_remote::(&self.worker, destroy); - } -} - -struct RemoteAgent { - #[cfg(feature = "std_web")] - worker: Value, - #[cfg(feature = "web_sys")] - worker: Worker, - slab: SharedOutputSlab, -} - -impl RemoteAgent -{ - pub fn new( - #[cfg(feature = "std_web")] worker: Value, - #[cfg(feature = "web_sys")] worker: Worker, - slab: SharedOutputSlab, - ) -> Self { - RemoteAgent { worker, slab } - } - - fn create_bridge(&mut self, callback: Option>) -> PublicBridge { - let respondable = callback.is_some(); - let mut slab = self.slab.borrow_mut(); - let id: usize = slab.insert(callback); - let id = HandlerId::new(id, respondable); - let bridge = PublicBridge { - worker: self.worker.clone(), - id, - _agent: PhantomData, - }; - bridge.send_message(ToWorker::Connected(bridge.id)); - - bridge - } - - fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { - let mut slab = self.slab.borrow_mut(); - let _ = slab.remove(bridge.id.raw_id()); - slab.is_empty() - } -} - -thread_local! { - static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); - static REMOTE_AGENTS_LOADED: RefCell> = RefCell::new(HashSet::new()); - static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell>>> = RefCell::new(HashMap::new()); -} - -/// Create a single instance in a tab. -#[allow(missing_debug_implementations)] -pub struct Public { - _agent: PhantomData -} - -impl Discoverer for Public -{ - type LocalAgent = AGN; - - fn spawn_or_join(callback: Option>) -> Box> - { - let bridge = REMOTE_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - match pool.entry::>() { - anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), - anymap::Entry::Vacant(entry) => { - let slab: Shared>>> = - Rc::new(RefCell::new(Slab::new())); - let handler = { - let slab = slab.clone(); - move |data: Vec, - #[cfg(feature = "std_web")] worker: Value, - #[cfg(feature = "web_sys")] worker: &Worker| { - let msg = FromWorker::::unpack(&data); - match msg { - FromWorker::WorkerLoaded => { - REMOTE_AGENTS_LOADED.with(|loaded| { - let _ = loaded.borrow_mut().insert(TypeId::of::()); - }); - - REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { - let mut queue = queue.borrow_mut(); - if let Some(msgs) = queue.get_mut(&TypeId::of::()) { - for msg in msgs.drain(..) { - cfg_match! { - feature = "std_web" => ({ - let worker = &worker; - js! {@{worker}.postMessage(@{msg});}; - }), - feature = "web_sys" => worker.post_message_vec(msg), - } - } - } - }); - } - FromWorker::ProcessOutput(id, output) => { - locate_callback_and_respond::(&slab, id, output); - } - } - } - }; - let name_of_resource = AGN::name_of_resource(); - let worker = cfg_match! { - feature = "std_web" => js! { - var worker = new Worker(@{name_of_resource}); - var handler = @{handler}; - worker.onmessage = function(event) { - handler(event.data, worker); - }; - return worker; - }, - feature = "web_sys" => ({ - let worker = worker_new(name_of_resource, AGN::is_module()); - let worker_clone = worker.clone(); - worker.set_onmessage_closure(move |data: Vec| { - handler(data, &worker_clone); - }); - worker - }), - }; - let launched = RemoteAgent::new(worker, slab); - entry.insert(launched).create_bridge(callback) - } - } - }); - Box::new(bridge) - } -} - -impl Dispatchable for Public -{} - -/// A connection manager for components interaction with workers. -pub struct PublicBridge -{ - #[cfg(feature = "std_web")] - worker: Value, - #[cfg(feature = "web_sys")] - worker: Worker, - id: HandlerId, - _agent: PhantomData, -} - -impl fmt::Debug for PublicBridge -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("PublicBridge<_>") - } -} - -impl PublicBridge -{ - fn worker_is_loaded(&self) -> bool { - REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) - } - - fn msg_to_queue(&self, msg: Vec) { - REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { - let mut queue = queue.borrow_mut(); - match queue.entry(TypeId::of::()) { - hash_map::Entry::Vacant(record) => { - record.insert(vec![msg]); - } - hash_map::Entry::Occupied(ref mut record) => { - record.get_mut().push(msg); - } - } - }); - } - - /// Send a message to the worker, queuing it up if necessary - fn send_message(&self, msg: ToWorker) { - if self.worker_is_loaded() { - send_to_remote::(&self.worker, msg); - } else { - self.msg_to_queue(msg.pack()); - } - } -} - -fn send_to_remote( - #[cfg(feature = "std_web")] worker: &Value, - #[cfg(feature = "web_sys")] worker: &Worker, - msg: ToWorker, -) -where Input: Serialize + for<'de> Deserialize<'de> -{ - let msg = msg.pack(); - cfg_match! { - feature = "std_web" => js! { - var worker = @{worker}; - var bytes = @{msg}; - worker.postMessage(bytes); - }, - feature = "web_sys" => worker.post_message_vec(msg), - }; -} - -impl Bridge for PublicBridge -{ - fn send(&mut self, msg: AGN::Input) { - let msg = ToWorker::ProcessInput(self.id, msg); - self.send_message(msg); - } -} - -impl Drop for PublicBridge -{ - fn drop(&mut self) { - let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { - let mut pool = pool.borrow_mut(); - let terminate_worker = { - if let Some(launched) = pool.get_mut::>() { - launched.remove_bridge(self) - } else { - false - } - }; - - if terminate_worker { - pool.remove::>(); - } - - terminate_worker - }); - - let disconnected = ToWorker::Disconnected(self.id); - self.send_message(disconnected); - - if terminate_worker { - let destroy = ToWorker::Destroy; - self.send_message(destroy); - - REMOTE_AGENTS_LOADED.with(|loaded| { - loaded.borrow_mut().remove(&TypeId::of::()); - }); - - REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { - queue.borrow_mut().remove(&TypeId::of::()); - }); - } - } -} - -/// Create a single instance in a browser. -#[allow(missing_debug_implementations)] -pub struct Global { - _agent: PhantomData -} - -impl Discoverer for Global { - type LocalAgent = AGN; -} - -/// Declares the behavior of the agent. -pub trait Agent: Sized + 'static { - /// Reach capability of the agent. - type Reach: Discoverer; - /// Type of an input message. - type Message; - /// Incoming message type. - type Input; - /// Outgoing message type. - type Output; - - /// Creates an instance of an agent. - fn create(link: AgentLink) -> Self; - - /// This method called on every update message. - fn update(&mut self, msg: Self::Message); - - /// This method called on when a new bridge created. - fn connected(&mut self, _id: HandlerId) {} - - /// This method called on every incoming message. - fn handle_input(&mut self, msg: Self::Input, id: HandlerId); - - /// This method called on when a new bridge destroyed. - fn disconnected(&mut self, _id: HandlerId) {} - - /// This method called when the agent is destroyed. - fn destroy(&mut self) {} - - /// Represents the name of loading resorce for remote workers which - /// have to live in a separate files. - fn name_of_resource() -> &'static str { - "main.js" - } - - /// Signifies if resource is a module. - /// This has pending browser support. - fn is_module() -> bool { - false - } -} - -/// This struct holds a reference to a component and to a global scheduler. -pub struct AgentScope { - shared_agent: Shared>, -} - -impl fmt::Debug for AgentScope { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentScope<_>") - } -} - -impl Clone for AgentScope { - fn clone(&self) -> Self { - AgentScope { - shared_agent: self.shared_agent.clone(), - } - } -} - -impl AgentScope { - /// Create agent scope - pub fn new() -> Self { - let shared_agent = Rc::new(RefCell::new(AgentRunnable::new())); - AgentScope { shared_agent } - } - /// Schedule message for sending to agent - pub fn send(&self, update: AgentLifecycleEvent) { - let envelope = AgentEnvelope { - shared_agent: self.shared_agent.clone(), - update, - }; - let runnable: Box = Box::new(envelope); - scheduler().push(runnable); - } -} - -impl Default for AgentScope { - fn default() -> Self { - Self::new() - } -} - -/// Defines communication from Worker to Consumers -pub trait Responder { - /// Implementation for communication channel from Worker to Consumers - fn respond(&self, id: HandlerId, output: AGN::Output); -} - -struct WorkerResponder {} - -impl Responder for WorkerResponder -{ - fn respond(&self, id: HandlerId, output: AGN::Output) { - let msg = FromWorker::ProcessOutput(id, output); - let data = msg.pack(); - cfg_match! { - feature = "std_web" => js! { - var data = @{data}; - self.postMessage(data); - }, - feature = "web_sys" => worker_self().post_message_vec(data), - }; - } -} - -/// Link to agent's scope for creating callbacks. -pub struct AgentLink { - scope: AgentScope, - responder: Rc>, -} - -impl AgentLink { - /// Create link for a scope. - pub fn connect(scope: &AgentScope, responder: T) -> Self - where - T: Responder + 'static, - { - AgentLink { - scope: scope.clone(), - responder: Rc::new(responder), - } - } - - /// Send response to an agent. - pub fn respond(&self, id: HandlerId, output: AGN::Output) { - self.responder.respond(id, output); - } - - /// Create a callback which will send a message to the agent when invoked. - pub fn callback(&self, function: F) -> Callback - where - F: Fn(IN) -> AGN::Message + 'static, - { - let scope = self.scope.clone(); - let closure = move |input| { - let output = function(input); - scope.send(AgentLifecycleEvent::Message(output)); - }; - closure.into() - } -} - -impl fmt::Debug for AgentLink { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("AgentLink<_>") - } -} - -impl Clone for AgentLink { - fn clone(&self) -> Self { - AgentLink { - scope: self.scope.clone(), - responder: self.responder.clone(), - } - } -} - -struct AgentRunnable { - agent: Option, - // TODO(#939): Use agent field to control create message this flag - destroyed: bool, -} - -impl AgentRunnable { - fn new() -> Self { - AgentRunnable { - agent: None, - destroyed: false, - } - } -} - -/// Local Agent messages -#[derive(Debug)] -pub enum AgentLifecycleEvent { - /// Request to create link - Create(AgentLink), - /// Internal Agent message - Message(AGN::Message), - /// Client connected - Connected(HandlerId), - /// Received mesasge from Client - Input(AGN::Input, HandlerId), - /// Client disconnected - Disconnected(HandlerId), - /// Request to destroy agent - Destroy, -} - -struct AgentEnvelope { - shared_agent: Shared>, - update: AgentLifecycleEvent, -} - -impl Runnable for AgentEnvelope -where - AGN: Agent, -{ - fn run(self: Box) { - let mut this = self.shared_agent.borrow_mut(); - if this.destroyed { - return; - } - match self.update { - AgentLifecycleEvent::Create(link) => { - this.agent = Some(AGN::create(link)); - } - AgentLifecycleEvent::Message(msg) => { - this.agent - .as_mut() - .expect("agent was not created to process messages") - .update(msg); - } - AgentLifecycleEvent::Connected(id) => { - this.agent - .as_mut() - .expect("agent was not created to send a connected message") - .connected(id); - } - AgentLifecycleEvent::Input(inp, id) => { - this.agent - .as_mut() - .expect("agent was not created to process inputs") - .handle_input(inp, id); - } - AgentLifecycleEvent::Disconnected(id) => { - this.agent - .as_mut() - .expect("agent was not created to send a disconnected message") - .disconnected(id); - } - AgentLifecycleEvent::Destroy => { - let mut agent = this - .agent - .take() - .expect("trying to destroy not existent agent"); - agent.destroy(); - } - } - } -} - -#[cfg(feature = "web_sys")] -fn worker_new(name_of_resource: &str, is_module: bool) -> Worker { - let href = utils::document().location().unwrap().href().unwrap(); - let script_url = format!("{}{}", href, name_of_resource); - let wasm_url = format!("{}{}", href, name_of_resource.replace(".js", "_bg.wasm")); - let array = Array::new(); - array.push( - &format!( - r#"importScripts("{}");wasm_bindgen("{}");"#, - script_url, wasm_url - ) - .into(), - ); - let blob = Blob::new_with_str_sequence_and_options( - &array, - BlobPropertyBag::new().type_("application/javascript"), - ) - .unwrap(); - let url = Url::create_object_url_with_blob(&blob).unwrap(); - - if is_module { - let options = WorkerOptions::new(); - Reflect::set( - options.as_ref(), - &JsValue::from_str("type"), - &JsValue::from_str("module"), - ) - .unwrap(); - Worker::new_with_options(&url, &options).expect("failed to spawn worker") - } else { - Worker::new(&url).expect("failed to spawn worker") - } -} - -#[cfg(feature = "web_sys")] -fn worker_self() -> DedicatedWorkerGlobalScope { - JsValue::from(js_sys::global()).into() -} - -#[cfg(feature = "web_sys")] -trait WorkerExt { - fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)); - - fn post_message_vec(&self, data: Vec); -} - -#[cfg(feature = "web_sys")] -macro_rules! worker_ext_impl { - ($($type:ident),+) => {$( - impl WorkerExt for $type { - fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { - let handler = move |message: MessageEvent| { - let data = Uint8Array::from(message.data()).to_vec(); - handler(data); - }; - let closure = Closure::wrap(Box::new(handler) as Box); - self.set_onmessage(Some(closure.as_ref().unchecked_ref())); - closure.forget(); - } - - fn post_message_vec(&self, data: Vec) { - self.post_message(&Uint8Array::from(data.as_slice())) - .expect("failed to post message"); - } - } - )+}; -} - -#[cfg(feature = "web_sys")] -worker_ext_impl! { - Worker, DedicatedWorkerGlobalScope -} diff --git a/yew/src/agent/link.rs b/yew/src/agent/link.rs new file mode 100644 index 00000000000..81d9573b9a8 --- /dev/null +++ b/yew/src/agent/link.rs @@ -0,0 +1,190 @@ +use super::*; +use crate::callback::Callback; +use crate::scheduler::{scheduler, Runnable, Shared}; +use std::cell::RefCell; +use std::fmt; +use std::rc::Rc; + +/// Defines communication from Worker to Consumers +pub(crate) trait Responder { + /// Implementation for communication channel from Worker to Consumers + fn respond(&self, id: HandlerId, output: AGN::Output); +} + +/// Link to agent's scope for creating callbacks. +pub struct AgentLink { + scope: AgentScope, + responder: Rc>, +} + +impl AgentLink { + /// Create link for a scope. + pub(crate) fn connect(scope: &AgentScope, responder: T) -> Self + where + T: Responder + 'static, + { + AgentLink { + scope: scope.clone(), + responder: Rc::new(responder), + } + } + + /// Send response to an agent. + pub fn respond(&self, id: HandlerId, output: AGN::Output) { + self.responder.respond(id, output); + } + + /// Create a callback which will send a message to the agent when invoked. + pub fn callback(&self, function: F) -> Callback + where + F: Fn(IN) -> AGN::Message + 'static, + { + let scope = self.scope.clone(); + let closure = move |input| { + let output = function(input); + scope.send(AgentLifecycleEvent::Message(output)); + }; + closure.into() + } +} + +impl fmt::Debug for AgentLink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("AgentLink<_>") + } +} + +impl Clone for AgentLink { + fn clone(&self) -> Self { + AgentLink { + scope: self.scope.clone(), + responder: self.responder.clone(), + } + } +} +/// This struct holds a reference to a component and to a global scheduler. +pub(crate) struct AgentScope { + shared_agent: Shared>, +} + +impl fmt::Debug for AgentScope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("AgentScope<_>") + } +} + +impl Clone for AgentScope { + fn clone(&self) -> Self { + AgentScope { + shared_agent: self.shared_agent.clone(), + } + } +} + +impl AgentScope { + /// Create agent scope + pub fn new() -> Self { + let shared_agent = Rc::new(RefCell::new(AgentRunnable::new())); + AgentScope { shared_agent } + } + /// Schedule message for sending to agent + pub fn send(&self, update: AgentLifecycleEvent) { + let envelope = AgentEnvelope { + shared_agent: self.shared_agent.clone(), + update, + }; + let runnable: Box = Box::new(envelope); + scheduler().push(runnable); + } +} + +impl Default for AgentScope { + fn default() -> Self { + Self::new() + } +} + +struct AgentRunnable { + agent: Option, + // TODO(#939): Use agent field to control create message this flag + destroyed: bool, +} + +impl AgentRunnable { + fn new() -> Self { + AgentRunnable { + agent: None, + destroyed: false, + } + } +} + +/// Local Agent messages +#[derive(Debug)] +pub(crate) enum AgentLifecycleEvent { + /// Request to create link + Create(AgentLink), + /// Internal Agent message + Message(AGN::Message), + /// Client connected + Connected(HandlerId), + /// Received mesasge from Client + Input(AGN::Input, HandlerId), + /// Client disconnected + Disconnected(HandlerId), + /// Request to destroy agent + Destroy, +} + +struct AgentEnvelope { + shared_agent: Shared>, + update: AgentLifecycleEvent, +} + +impl Runnable for AgentEnvelope +where + AGN: Agent, +{ + fn run(self: Box) { + let mut this = self.shared_agent.borrow_mut(); + if this.destroyed { + return; + } + match self.update { + AgentLifecycleEvent::Create(link) => { + this.agent = Some(AGN::create(link)); + } + AgentLifecycleEvent::Message(msg) => { + this.agent + .as_mut() + .expect("agent was not created to process messages") + .update(msg); + } + AgentLifecycleEvent::Connected(id) => { + this.agent + .as_mut() + .expect("agent was not created to send a connected message") + .connected(id); + } + AgentLifecycleEvent::Input(inp, id) => { + this.agent + .as_mut() + .expect("agent was not created to process inputs") + .handle_input(inp, id); + } + AgentLifecycleEvent::Disconnected(id) => { + this.agent + .as_mut() + .expect("agent was not created to send a disconnected message") + .disconnected(id); + } + AgentLifecycleEvent::Destroy => { + let mut agent = this + .agent + .take() + .expect("trying to destroy not existent agent"); + agent.destroy(); + } + } + } +} \ No newline at end of file diff --git a/yew/src/agent/local/context.rs b/yew/src/agent/local/context.rs new file mode 100644 index 00000000000..e6e75924659 --- /dev/null +++ b/yew/src/agent/local/context.rs @@ -0,0 +1,140 @@ +use super::*; +use crate::callback::Callback; +use crate::scheduler::Shared; +use crate::agent::SyncAgent; +use anymap::{self, AnyMap}; +use slab::Slab; +use std::cell::RefCell; +use std::rc::Rc; +use std::marker::PhantomData; + +thread_local! { + static LOCAL_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); +} + +/// Create a single instance in the current thread. +#[allow(missing_debug_implementations)] +pub struct Context { + _agent: PhantomData +} + +impl Discoverer for Context { + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> + { + let mut scope_to_init = None; + let bridge = LOCAL_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + match pool.entry::>() { + anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), + anymap::Entry::Vacant(entry) => { + let scope = AgentScope::::new(); + let launched = LocalAgent::new(&scope); + let responder = SlabResponder { + slab: launched.slab(), + }; + scope_to_init = Some((scope, responder)); + entry.insert(launched).create_bridge(callback) + } + } + }); + if let Some((scope, responder)) = scope_to_init { + let agent_link = AgentLink::connect(&scope, responder); + let upd = AgentLifecycleEvent::Create(agent_link); + scope.send(upd); + } + let upd = AgentLifecycleEvent::Connected(bridge.id); + bridge.scope.send(upd); + Box::new(bridge) + } +} + +struct SlabResponder { + slab: Shared>>>, +} + +impl Responder for SlabResponder { + fn respond(&self, id: HandlerId, output: AGN::Output) { + locate_callback_and_respond::(&self.slab, id, output); + } +} + +impl Dispatchable for Context {} + +struct ContextBridge { + scope: AgentScope, + id: HandlerId, +} + +impl Bridge for ContextBridge { + fn send(&mut self, msg: AGN::Input) { + let upd = AgentLifecycleEvent::Input(msg, self.id); + self.scope.send(upd); + } +} + +impl Drop for ContextBridge { + fn drop(&mut self) { + let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + let terminate_worker = { + if let Some(launched) = pool.get_mut::>() { + launched.remove_bridge(self) + } else { + false + } + }; + + if terminate_worker { + pool.remove::>(); + } + + terminate_worker + }); + + let upd = AgentLifecycleEvent::Disconnected(self.id); + self.scope.send(upd); + + if terminate_worker { + let upd = AgentLifecycleEvent::Destroy; + self.scope.send(upd); + } + } +} + +struct LocalAgent { + scope: AgentScope, + slab: SharedOutputSlab, +} + +impl LocalAgent { + pub fn new(scope: &AgentScope) -> Self { + let slab = Rc::new(RefCell::new(Slab::new())); + LocalAgent { + scope: scope.clone(), + slab, + } + } + + fn slab(&self) -> SharedOutputSlab { + self.slab.clone() + } + + fn create_bridge(&mut self, callback: Option>) -> ContextBridge { + let respondable = callback.is_some(); + let mut slab = self.slab.borrow_mut(); + let id: usize = slab.insert(callback); + let id = HandlerId::new(id, respondable); + ContextBridge { + scope: self.scope.clone(), + id, + } + } + + fn remove_bridge(&mut self, bridge: &ContextBridge) -> Last { + let mut slab = self.slab.borrow_mut(); + let _ = slab.remove(bridge.id.raw_id()); + slab.is_empty() + } +} \ No newline at end of file diff --git a/yew/src/agent/local/job.rs b/yew/src/agent/local/job.rs new file mode 100644 index 00000000000..faca62ebb34 --- /dev/null +++ b/yew/src/agent/local/job.rs @@ -0,0 +1,61 @@ +use super::*; +use crate::callback::Callback; +use crate::agent::SyncAgent; +use std::marker::PhantomData; + +const SINGLETON_ID: HandlerId = HandlerId(0, true); + +/// Create an instance in the current thread. +#[allow(missing_debug_implementations)] +pub struct Job { + _agent: PhantomData +} + +impl Discoverer for Job { + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> + { + let callback = callback.expect("Callback required for Job"); + let scope = AgentScope::::new(); + let responder = CallbackResponder { callback }; + let agent_link = AgentLink::connect(&scope, responder); + let upd = AgentLifecycleEvent::Create(agent_link); + scope.send(upd); + let upd = AgentLifecycleEvent::Connected(SINGLETON_ID); + scope.send(upd); + let bridge = JobBridge { scope }; + Box::new(bridge) + } +} + +struct JobBridge { + scope: AgentScope, +} + +impl Bridge for JobBridge { + fn send(&mut self, msg: AGN::Input) { + let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID); + self.scope.send(upd); + } +} + +impl Drop for JobBridge { + fn drop(&mut self) { + let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID); + self.scope.send(upd); + let upd = AgentLifecycleEvent::Destroy; + self.scope.send(upd); + } +} + +struct CallbackResponder { + callback: Callback, +} + +impl Responder for CallbackResponder { + fn respond(&self, id: HandlerId, output: AGN::Output) { + assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); + self.callback.emit(output); + } +} \ No newline at end of file diff --git a/yew/src/agent/local/mod.rs b/yew/src/agent/local/mod.rs new file mode 100644 index 00000000000..b56be911a64 --- /dev/null +++ b/yew/src/agent/local/mod.rs @@ -0,0 +1,7 @@ +mod context; +mod job; + +use super::*; + +pub use context::Context; +pub use job::Job; \ No newline at end of file diff --git a/yew/src/agent/mod.rs b/yew/src/agent/mod.rs new file mode 100644 index 00000000000..2be480d57c3 --- /dev/null +++ b/yew/src/agent/mod.rs @@ -0,0 +1,117 @@ +//! This module contains types to support multi-threading and state management. + +mod link; +mod local; +mod pool; +mod worker; + +pub use link::AgentLink; +pub(crate) use link::*; +pub use local::{Context, Job}; +pub(crate) use pool::*; +pub use pool::{Dispatched, Dispatcher}; +pub use worker::{Private, Public, Threaded}; + +use crate::callback::Callback; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::ops::{Deref, DerefMut}; + +// Demand Input and Output serializable on worker agents, +// but no need for that constraint for local ones +pub trait WorkerAgent = Agent +where ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>; + +pub trait SyncAgent = Agent +where ::Input: fmt::Debug; + +/// Declares the behavior of the agent. +pub trait Agent: Sized + 'static { + /// Reach capability of the agent. + type Reach: Discoverer; + /// Type of an input message. + type Message; + /// Incoming message type. + type Input; + /// Outgoing message type. + type Output; + + /// Creates an instance of an agent. + fn create(link: AgentLink) -> Self; + + /// This method called on every update message. + fn update(&mut self, msg: Self::Message); + + /// This method called on when a new bridge created. + fn connected(&mut self, _id: HandlerId) {} + + /// This method called on every incoming message. + fn handle_input(&mut self, msg: Self::Input, id: HandlerId); + + /// This method called on when a new bridge destroyed. + fn disconnected(&mut self, _id: HandlerId) {} + + /// This method called when the agent is destroyed. + fn destroy(&mut self) {} + + /// Represents the name of loading resorce for remote workers which + /// have to live in a separate files. + fn name_of_resource() -> &'static str { + "main.js" + } + + /// Signifies if resource is a module. + /// This has pending browser support. + fn is_module() -> bool { + false + } +} + +/// Id of responses handler. +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash, Clone, Copy)] +pub struct HandlerId(usize, bool); + +impl HandlerId { + fn new(id: usize, respondable: bool) -> Self { + HandlerId(id, respondable) + } + fn raw_id(self) -> usize { + self.0 + } + /// Indicates if a handler id corresponds to callback in the Agent runtime. + pub fn is_respondable(self) -> bool { + self.1 + } +} + +/// Determine a visibility of an agent. +#[doc(hidden)] +pub trait Discoverer { + type Agent: Agent; + + /// Spawns an agent and returns `Bridge` implementation. + fn spawn_or_join(_callback: Option::Output>>) -> Box>; +} + +/// Bridge to a specific kind of worker. +pub trait Bridge { + /// Send a message to an agent. + fn send(&mut self, msg: AGN::Input); +} + +/// This trait allows registering or getting the address of a worker. +pub trait Bridged: Agent + Sized + 'static { + /// Creates a messaging bridge between a worker and the component. + fn bridge(callback: Callback) -> Box>; +} + +impl Bridged for T +where + T: Agent, + ::Reach: Discoverer, +{ + fn bridge(callback: Callback) -> Box> { + Self::Reach::spawn_or_join(Some(callback)) + } +} diff --git a/yew/src/agent/pool.rs b/yew/src/agent/pool.rs new file mode 100644 index 00000000000..ff3a065d6c5 --- /dev/null +++ b/yew/src/agent/pool.rs @@ -0,0 +1,86 @@ +use super::*; +use crate::scheduler::Shared; +use log::warn; +use slab::Slab; + +pub(crate) type Last = bool; + +/// Type alias to a sharable Slab that owns optional callbacks that emit messages of the type of the specified Agent. +pub(crate) type SharedOutputSlab = Shared::Output>>>>; + +/// The slab contains the callback, the id is used to look up the callback, +/// and the output is the message that will be sent via the callback. +pub(crate) fn locate_callback_and_respond( + slab: &SharedOutputSlab, + id: HandlerId, + output: AGN::Output, +) { + let callback = { + let slab = slab.borrow(); + match slab.get(id.raw_id()).cloned() { + Some(callback) => callback, + None => { + warn!("Id of handler does not exist in the slab: {}.", id.raw_id()); + return; + } + } + }; + match callback { + Some(callback) => callback.emit(output), + None => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()), + } +} + +/// A newtype around a bridge to indicate that it is distinct from a normal bridge +pub struct Dispatcher(pub(crate) Box>); + +impl fmt::Debug for Dispatcher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Dispatcher<_>") + } +} + +impl Deref for Dispatcher { + type Target = dyn Bridge; + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} +impl DerefMut for Dispatcher { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.deref_mut() + } +} + +/// This trait allows the creation of a dispatcher to an existing agent that will not send replies when messages are sent. +pub trait Dispatched: Agent + Sized + 'static { + /// Creates a dispatcher to the agent that will not send messages back. + /// + /// # Note + /// Dispatchers don't have `HandlerId`s and therefore `Agent::handle` will be supplied `None` + /// for the `id` parameter, and `connected` and `disconnected` will not be called. + /// + /// # Important + /// Because the Agents using Context or Public reaches use the number of existing bridges to + /// keep track of if the agent itself should exist, creating dispatchers will not guarantee that + /// an Agent will exist to service requests sent from Dispatchers. You **must** keep at least one + /// bridge around if you wish to use a dispatcher. If you are using agents in a write-only manner, + /// then it is suggested that you create a bridge that handles no-op responses as high up in the + /// component hierarchy as possible - oftentimes the root component for simplicity's sake. + fn dispatcher() -> Dispatcher; +} + +#[doc(hidden)] +pub trait Dispatchable {} + +impl Dispatched for T +where + T: Agent, + ::Reach: Discoverer, + ::Reach: Dispatchable, +{ + fn dispatcher() -> Dispatcher { + Dispatcher(Self::Reach::spawn_or_join(None)) + } +} diff --git a/yew/src/agent/worker/mod.rs b/yew/src/agent/worker/mod.rs new file mode 100644 index 00000000000..2c665b47376 --- /dev/null +++ b/yew/src/agent/worker/mod.rs @@ -0,0 +1,158 @@ +mod private; +mod public; + +pub use private::Private; +pub use public::Public; + +use super::*; +use cfg_if::cfg_if; +use cfg_match::cfg_match; +use serde::{Deserialize, Serialize}; +cfg_if! { + if #[cfg(feature = "std_web")] { + use stdweb::Value; + #[allow(unused_imports)] + use stdweb::{_js_impl, js}; + } else if #[cfg(feature = "web_sys")] { + use crate::utils; + use js_sys::{Array, Reflect, Uint8Array}; + use wasm_bindgen::{closure::Closure, JsCast, JsValue}; + use web_sys::{Blob, BlobPropertyBag, DedicatedWorkerGlobalScope, MessageEvent, Url, Worker, WorkerOptions}; + } +} + +/// Implements rules to register a worker in a separate thread. +pub trait Threaded { + /// Executes an agent in the current environment. + /// Uses in `main` function of a worker. + fn register(); +} + +/// Message packager, based on serde::Serialize/Deserialize +pub trait Packed { + /// Pack serializable message into Vec + fn pack(&self) -> Vec; + /// Unpack deserializable message of byte slice + fn unpack(data: &[u8]) -> Self; +} + +impl Deserialize<'de>> Packed for T { + fn pack(&self) -> Vec { + bincode::serialize(&self).expect("can't serialize an agent message") + } + + fn unpack(data: &[u8]) -> Self { + bincode::deserialize(&data).expect("can't deserialize an agent message") + } +} + +/// Serializable messages to worker +#[derive(Serialize, Deserialize, Debug)] +enum ToWorker { + /// Client is connected + Connected(HandlerId), + /// Incoming message to Worker + ProcessInput(HandlerId, T), + /// Client is disconnected + Disconnected(HandlerId), + /// Worker should be terminated + Destroy, +} + +/// Serializable messages sent by worker to consumer +#[derive(Serialize, Deserialize, Debug)] +enum FromWorker { + /// Worker sends this message when `wasm` bundle has loaded. + WorkerLoaded, + /// Outgoing message to consumer + ProcessOutput(HandlerId, T), +} + +fn send_to_remote( + #[cfg(feature = "std_web")] worker: &Value, + #[cfg(feature = "web_sys")] worker: &Worker, + msg: ToWorker, +) { + let msg = msg.pack(); + cfg_match! { + feature = "std_web" => js! { + var worker = @{worker}; + var bytes = @{msg}; + worker.postMessage(bytes); + }, + feature = "web_sys" => worker.post_message_vec(msg), + }; +} + +#[cfg(feature = "web_sys")] +fn worker_new(name_of_resource: &str, is_module: bool) -> Worker { + let href = utils::document().location().unwrap().href().unwrap(); + let script_url = format!("{}{}", href, name_of_resource); + let wasm_url = format!("{}{}", href, name_of_resource.replace(".js", "_bg.wasm")); + let array = Array::new(); + array.push( + &format!( + r#"importScripts("{}");wasm_bindgen("{}");"#, + script_url, wasm_url + ) + .into(), + ); + let blob = Blob::new_with_str_sequence_and_options( + &array, + BlobPropertyBag::new().type_("application/javascript"), + ) + .unwrap(); + let url = Url::create_object_url_with_blob(&blob).unwrap(); + + if is_module { + let options = WorkerOptions::new(); + Reflect::set( + options.as_ref(), + &JsValue::from_str("type"), + &JsValue::from_str("module"), + ) + .unwrap(); + Worker::new_with_options(&url, &options).expect("failed to spawn worker") + } else { + Worker::new(&url).expect("failed to spawn worker") + } +} + +#[cfg(feature = "web_sys")] +fn worker_self() -> DedicatedWorkerGlobalScope { + JsValue::from(js_sys::global()).into() +} + +#[cfg(feature = "web_sys")] +trait WorkerExt { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)); + + fn post_message_vec(&self, data: Vec); +} + +#[cfg(feature = "web_sys")] +macro_rules! worker_ext_impl { + ($($type:ident),+) => {$( + impl WorkerExt for $type { + fn set_onmessage_closure(&self, handler: impl 'static + Fn(Vec)) { + let handler = move |message: MessageEvent| { + let data = Uint8Array::from(message.data()).to_vec(); + handler(data); + }; + let closure = Closure::wrap(Box::new(handler) as Box); + self.set_onmessage(Some(closure.as_ref().unchecked_ref())); + closure.forget(); + } + + fn post_message_vec(&self, data: Vec) { + self.post_message(&Uint8Array::from(data.as_slice())) + .expect("failed to post message"); + } + } + )+}; +} + +#[cfg(feature = "web_sys")] +worker_ext_impl! { + Worker, DedicatedWorkerGlobalScope +} \ No newline at end of file diff --git a/yew/src/agent/worker/private.rs b/yew/src/agent/worker/private.rs new file mode 100644 index 00000000000..c0332f69db6 --- /dev/null +++ b/yew/src/agent/worker/private.rs @@ -0,0 +1,116 @@ +use super::*; +use crate::callback::Callback; +use cfg_if::cfg_if; +use cfg_match::cfg_match; +use std::fmt; +use std::marker::PhantomData; +cfg_if! { + if #[cfg(feature = "std_web")] { + use stdweb::Value; + #[allow(unused_imports)] + use stdweb::{_js_impl, js}; + } else if #[cfg(feature = "web_sys")] { + use web_sys::{Worker}; + } +} + +const SINGLETON_ID: HandlerId = HandlerId(0, true); + +/// Create a new instance for every bridge. +#[allow(missing_debug_implementations)] +pub struct Private { + _agent: PhantomData +} + +impl Discoverer for Private +{ + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> + { + let callback = callback.expect("Callback required for Private agents"); + let handler = move |data: Vec, + #[cfg(feature = "std_web")] worker: Value, + #[cfg(feature = "web_sys")] worker: &Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + send_to_remote::(&worker, ToWorker::Connected(SINGLETON_ID)); + } + FromWorker::ProcessOutput(id, output) => { + assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); + callback.emit(output); + } + } + }; + + // TODO(#947): Drop handler when bridge is dropped + let name_of_resource = AGN::name_of_resource(); + let worker = cfg_match! { + feature = "std_web" => js! { + var worker = new Worker(@{name_of_resource}); + var handler = @{handler}; + worker.onmessage = function(event) { + handler(event.data, worker); + }; + return worker; + }, + feature = "web_sys" => ({ + let worker = worker_new(name_of_resource, AGN::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| handler(data, &worker_clone)); + worker + }), + }; + let bridge = PrivateBridge { + worker, + _agent: PhantomData, + }; + Box::new(bridge) + } +} + +/// A connection manager for components interaction with workers. +pub struct PrivateBridge { + #[cfg(feature = "std_web")] + worker: Value, + #[cfg(feature = "web_sys")] + worker: Worker, + _agent: PhantomData, +} + +impl fmt::Debug for PrivateBridge { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PrivateBridge<_>") + } +} + +impl Bridge for PrivateBridge { + fn send(&mut self, msg: AGN::Input) { + // TODO(#937): Important! Implement. + // Use a queue to collect a messages if an instance is not ready + // and send them to an agent when it will reported readiness. + let msg = ToWorker::ProcessInput(SINGLETON_ID, msg).pack(); + cfg_match! { + feature = "std_web" => ({ + let worker = &self.worker; + js! { + var worker = @{worker}; + var bytes = @{msg}; + worker.postMessage(bytes); + }; + }), + feature = "web_sys" => self.worker.post_message_vec(msg), + } + } +} + +impl Drop for PrivateBridge { + fn drop(&mut self) { + let disconnected = ToWorker::Disconnected(SINGLETON_ID); + send_to_remote::(&self.worker, disconnected); + + let destroy = ToWorker::Destroy; + send_to_remote::(&self.worker, destroy); + } +} \ No newline at end of file diff --git a/yew/src/agent/worker/public.rs b/yew/src/agent/worker/public.rs new file mode 100644 index 00000000000..c2997a7ff65 --- /dev/null +++ b/yew/src/agent/worker/public.rs @@ -0,0 +1,310 @@ +use super::*; +use crate::callback::Callback; +use crate::scheduler::Shared; +use anymap::{self, AnyMap}; +use cfg_if::cfg_if; +use cfg_match::cfg_match; +use slab::Slab; +use std::any::TypeId; +use std::cell::RefCell; +use std::collections::{hash_map, HashMap, HashSet}; +use std::fmt; +use std::marker::PhantomData; +use std::rc::Rc; +cfg_if! { + if #[cfg(feature = "std_web")] { + use stdweb::Value; + #[allow(unused_imports)] + use stdweb::{_js_impl, js}; + } else if #[cfg(feature = "web_sys")] { + use super::WorkerExt; + use web_sys::{Worker}; + } +} + +thread_local! { + static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); + static REMOTE_AGENTS_LOADED: RefCell> = RefCell::new(HashSet::new()); + static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell>>> = RefCell::new(HashMap::new()); +} + +/// Create a single instance in a tab. +#[allow(missing_debug_implementations)] +pub struct Public { + _agent: PhantomData +} + +impl Discoverer for Public +{ + type Agent = AGN; + + fn spawn_or_join(callback: Option>) -> Box> + { + let bridge = REMOTE_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + match pool.entry::>() { + anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback), + anymap::Entry::Vacant(entry) => { + let slab: Shared>>> = + Rc::new(RefCell::new(Slab::new())); + let handler = { + let slab = slab.clone(); + move |data: Vec, + #[cfg(feature = "std_web")] worker: Value, + #[cfg(feature = "web_sys")] worker: &Worker| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + REMOTE_AGENTS_LOADED.with(|loaded| { + let _ = loaded.borrow_mut().insert(TypeId::of::()); + }); + + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { + let mut queue = queue.borrow_mut(); + if let Some(msgs) = queue.get_mut(&TypeId::of::()) { + for msg in msgs.drain(..) { + cfg_match! { + feature = "std_web" => ({ + let worker = &worker; + js! {@{worker}.postMessage(@{msg});}; + }), + feature = "web_sys" => worker.post_message_vec(msg), + } + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + locate_callback_and_respond::(&slab, id, output); + } + } + } + }; + let name_of_resource = AGN::name_of_resource(); + let worker = cfg_match! { + feature = "std_web" => js! { + var worker = new Worker(@{name_of_resource}); + var handler = @{handler}; + worker.onmessage = function(event) { + handler(event.data, worker); + }; + return worker; + }, + feature = "web_sys" => ({ + let worker = worker_new(name_of_resource, AGN::is_module()); + let worker_clone = worker.clone(); + worker.set_onmessage_closure(move |data: Vec| { + handler(data, &worker_clone); + }); + worker + }), + }; + let launched = RemoteAgent::new(worker, slab); + entry.insert(launched).create_bridge(callback) + } + } + }); + Box::new(bridge) + } +} + +impl Dispatchable for Public {} + +/// A connection manager for components interaction with workers. +pub struct PublicBridge { + #[cfg(feature = "std_web")] + worker: Value, + #[cfg(feature = "web_sys")] + worker: Worker, + id: HandlerId, + _agent: PhantomData, +} + +impl fmt::Debug for PublicBridge { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("PublicBridge<_>") + } +} + +impl PublicBridge { + fn worker_is_loaded(&self) -> bool { + REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) + } + + fn msg_to_queue(&self, msg: Vec) { + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { + let mut queue = queue.borrow_mut(); + match queue.entry(TypeId::of::()) { + hash_map::Entry::Vacant(record) => { + record.insert(vec![msg]); + } + hash_map::Entry::Occupied(ref mut record) => { + record.get_mut().push(msg); + } + } + }); + } + + /// Send a message to the worker, queuing it up if necessary + fn send_message(&self, msg: ToWorker) { + if self.worker_is_loaded() { + send_to_remote::(&self.worker, msg); + } else { + self.msg_to_queue(msg.pack()); + } + } +} + +impl Bridge for PublicBridge { + fn send(&mut self, msg: AGN::Input) { + let msg = ToWorker::ProcessInput(self.id, msg); + self.send_message(msg); + } +} + +impl Drop for PublicBridge { + fn drop(&mut self) { + let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { + let mut pool = pool.borrow_mut(); + let terminate_worker = { + if let Some(launched) = pool.get_mut::>() { + launched.remove_bridge(self) + } else { + false + } + }; + + if terminate_worker { + pool.remove::>(); + } + + terminate_worker + }); + + let disconnected = ToWorker::Disconnected(self.id); + self.send_message(disconnected); + + if terminate_worker { + let destroy = ToWorker::Destroy; + self.send_message(destroy); + + REMOTE_AGENTS_LOADED.with(|loaded| { + loaded.borrow_mut().remove(&TypeId::of::()); + }); + + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| { + queue.borrow_mut().remove(&TypeId::of::()); + }); + } + } +} + +struct WorkerResponder {} + +impl Responder for WorkerResponder { + fn respond(&self, id: HandlerId, output: AGN::Output) { + let msg = FromWorker::ProcessOutput(id, output); + let data = msg.pack(); + cfg_match! { + feature = "std_web" => js! { + var data = @{data}; + self.postMessage(data); + }, + feature = "web_sys" => worker_self().post_message_vec(data), + }; + } +} + +impl Threaded for T +where + T: WorkerAgent>, +{ + fn register() { + let scope = AgentScope::::new(); + let responder = WorkerResponder {}; + let link = AgentLink::connect(&scope, responder); + let upd = AgentLifecycleEvent::Create(link); + scope.send(upd); + let handler = move |data: Vec| { + let msg = ToWorker::::unpack(&data); + match msg { + ToWorker::Connected(id) => { + let upd = AgentLifecycleEvent::Connected(id); + scope.send(upd); + } + ToWorker::ProcessInput(id, value) => { + let upd = AgentLifecycleEvent::Input(value, id); + scope.send(upd); + } + ToWorker::Disconnected(id) => { + let upd = AgentLifecycleEvent::Disconnected(id); + scope.send(upd); + } + ToWorker::Destroy => { + let upd = AgentLifecycleEvent::Destroy; + scope.send(upd); + // Terminates web worker + cfg_match! { + feature = "std_web" => js! { self.close(); }, + feature = "web_sys" => worker_self().close(), + }; + } + } + }; + let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded = loaded.pack(); + cfg_match! { + feature = "std_web" => js! { + var handler = @{handler}; + self.onmessage = function(event) { + handler(event.data); + }; + self.postMessage(@{loaded}); + }, + feature = "web_sys" => ({ + let worker = worker_self(); + worker.set_onmessage_closure(handler); + worker.post_message_vec(loaded); + }), + }; + } +} + +struct RemoteAgent { + #[cfg(feature = "std_web")] + worker: Value, + #[cfg(feature = "web_sys")] + worker: Worker, + slab: SharedOutputSlab, +} + +impl RemoteAgent { + pub fn new( + #[cfg(feature = "std_web")] worker: Value, + #[cfg(feature = "web_sys")] worker: Worker, + slab: SharedOutputSlab, + ) -> Self { + RemoteAgent { worker, slab } + } + + fn create_bridge(&mut self, callback: Option>) -> PublicBridge { + let respondable = callback.is_some(); + let mut slab = self.slab.borrow_mut(); + let id: usize = slab.insert(callback); + let id = HandlerId::new(id, respondable); + let bridge = PublicBridge { + worker: self.worker.clone(), + id, + _agent: PhantomData, + }; + bridge.send_message(ToWorker::Connected(bridge.id)); + + bridge + } + + fn remove_bridge(&mut self, bridge: &PublicBridge) -> Last { + let mut slab = self.slab.borrow_mut(); + let _ = slab.remove(bridge.id.raw_id()); + slab.is_empty() + } +} \ No newline at end of file diff --git a/yew/src/lib.rs b/yew/src/lib.rs index 9eae59e42ae..6d77117d7bd 100644 --- a/yew/src/lib.rs +++ b/yew/src/lib.rs @@ -206,7 +206,7 @@ where /// ``` pub mod prelude { #[cfg(feature = "agent")] - pub use crate::agent::{Bridge, Bridged, Threaded}; + pub use crate::agent::{Bridge, Bridged, Dispatched, Threaded}; pub use crate::app::App; pub use crate::callback::Callback; pub use crate::events::*; @@ -221,7 +221,7 @@ pub mod prelude { #[cfg(feature = "agent")] pub mod worker { pub use crate::agent::{ - Agent, AgentLink, Bridge, Bridged, Context, Global, HandlerId, Job, Private, Public, + Agent, AgentLink, Bridge, Bridged, Context, HandlerId, Job, Private, Public, }; } } From df362edd69da1240f60fca3d8d0fd8c71e9043f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 14:38:44 +0200 Subject: [PATCH 04/11] no trait aliases :( --- yew/src/agent/local/context.rs | 8 ++-- yew/src/agent/local/job.rs | 6 ++- yew/src/agent/mod.rs | 9 ----- yew/src/agent/worker/mod.rs | 8 +++- yew/src/agent/worker/private.rs | 31 ++++++++++++--- yew/src/agent/worker/public.rs | 67 +++++++++++++++++++++++++-------- yew/src/lib.rs | 2 - 7 files changed, 92 insertions(+), 39 deletions(-) diff --git a/yew/src/agent/local/context.rs b/yew/src/agent/local/context.rs index e6e75924659..7e8b310e9b6 100644 --- a/yew/src/agent/local/context.rs +++ b/yew/src/agent/local/context.rs @@ -1,7 +1,6 @@ use super::*; use crate::callback::Callback; use crate::scheduler::Shared; -use crate::agent::SyncAgent; use anymap::{self, AnyMap}; use slab::Slab; use std::cell::RefCell; @@ -18,7 +17,10 @@ pub struct Context { _agent: PhantomData } -impl Discoverer for Context { +impl Discoverer for Context +where AGN: Agent, + ::Input: fmt::Debug +{ type Agent = AGN; fn spawn_or_join(callback: Option>) -> Box> @@ -60,7 +62,7 @@ impl Responder for SlabResponder { } } -impl Dispatchable for Context {} +impl Dispatchable for Context {} struct ContextBridge { scope: AgentScope, diff --git a/yew/src/agent/local/job.rs b/yew/src/agent/local/job.rs index faca62ebb34..ccffc055df5 100644 --- a/yew/src/agent/local/job.rs +++ b/yew/src/agent/local/job.rs @@ -1,6 +1,5 @@ use super::*; use crate::callback::Callback; -use crate::agent::SyncAgent; use std::marker::PhantomData; const SINGLETON_ID: HandlerId = HandlerId(0, true); @@ -11,7 +10,10 @@ pub struct Job { _agent: PhantomData } -impl Discoverer for Job { +impl Discoverer for Job +where AGN: Agent, + ::Input: fmt::Debug +{ type Agent = AGN; fn spawn_or_join(callback: Option>) -> Box> diff --git a/yew/src/agent/mod.rs b/yew/src/agent/mod.rs index 2be480d57c3..ab6eee4aa7a 100644 --- a/yew/src/agent/mod.rs +++ b/yew/src/agent/mod.rs @@ -17,15 +17,6 @@ use serde::{Deserialize, Serialize}; use std::fmt; use std::ops::{Deref, DerefMut}; -// Demand Input and Output serializable on worker agents, -// but no need for that constraint for local ones -pub trait WorkerAgent = Agent -where ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de>; - -pub trait SyncAgent = Agent -where ::Input: fmt::Debug; - /// Declares the behavior of the agent. pub trait Agent: Sized + 'static { /// Reach capability of the agent. diff --git a/yew/src/agent/worker/mod.rs b/yew/src/agent/worker/mod.rs index 2c665b47376..e5ce422c9ec 100644 --- a/yew/src/agent/worker/mod.rs +++ b/yew/src/agent/worker/mod.rs @@ -68,11 +68,15 @@ enum FromWorker { ProcessOutput(HandlerId, T), } -fn send_to_remote( +fn send_to_remote( #[cfg(feature = "std_web")] worker: &Value, #[cfg(feature = "web_sys")] worker: &Worker, msg: ToWorker, -) { +) +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ let msg = msg.pack(); cfg_match! { feature = "std_web" => js! { diff --git a/yew/src/agent/worker/private.rs b/yew/src/agent/worker/private.rs index c0332f69db6..323e8eb4eb5 100644 --- a/yew/src/agent/worker/private.rs +++ b/yew/src/agent/worker/private.rs @@ -22,7 +22,10 @@ pub struct Private { _agent: PhantomData } -impl Discoverer for Private +impl Discoverer for Private +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> { type Agent = AGN; @@ -71,21 +74,33 @@ impl Discoverer for Private } /// A connection manager for components interaction with workers. -pub struct PrivateBridge { +pub struct PrivateBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: Worker, - _agent: PhantomData, + _agent: PhantomData, } -impl fmt::Debug for PrivateBridge { +impl fmt::Debug for PrivateBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PrivateBridge<_>") } } -impl Bridge for PrivateBridge { +impl Bridge for PrivateBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn send(&mut self, msg: AGN::Input) { // TODO(#937): Important! Implement. // Use a queue to collect a messages if an instance is not ready @@ -105,7 +120,11 @@ impl Bridge for PrivateBridge { } } -impl Drop for PrivateBridge { +impl Drop for PrivateBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn drop(&mut self) { let disconnected = ToWorker::Disconnected(SINGLETON_ID); send_to_remote::(&self.worker, disconnected); diff --git a/yew/src/agent/worker/public.rs b/yew/src/agent/worker/public.rs index c2997a7ff65..9f469a0af28 100644 --- a/yew/src/agent/worker/public.rs +++ b/yew/src/agent/worker/public.rs @@ -34,7 +34,10 @@ pub struct Public { _agent: PhantomData } -impl Discoverer for Public +impl Discoverer for Public +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> { type Agent = AGN; @@ -108,10 +111,14 @@ impl Discoverer for Public } } -impl Dispatchable for Public {} +impl Dispatchable for Public {} /// A connection manager for components interaction with workers. -pub struct PublicBridge { +pub struct PublicBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] @@ -120,13 +127,21 @@ pub struct PublicBridge { _agent: PhantomData, } -impl fmt::Debug for PublicBridge { +impl fmt::Debug for PublicBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PublicBridge<_>") } } -impl PublicBridge { +impl PublicBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn worker_is_loaded(&self) -> bool { REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) } @@ -155,14 +170,22 @@ impl PublicBridge { } } -impl Bridge for PublicBridge { +impl Bridge for PublicBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn send(&mut self, msg: AGN::Input) { let msg = ToWorker::ProcessInput(self.id, msg); self.send_message(msg); } } -impl Drop for PublicBridge { +impl Drop for PublicBridge +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn drop(&mut self) { let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); @@ -201,7 +224,11 @@ impl Drop for PublicBridge { struct WorkerResponder {} -impl Responder for WorkerResponder { +impl Responder for WorkerResponder +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ fn respond(&self, id: HandlerId, output: AGN::Output) { let msg = FromWorker::ProcessOutput(id, output); let data = msg.pack(); @@ -215,18 +242,20 @@ impl Responder for WorkerResponder { } } -impl Threaded for T +impl Threaded for AGN where - T: WorkerAgent>, + AGN: Agent>, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> { fn register() { - let scope = AgentScope::::new(); + let scope = AgentScope::::new(); let responder = WorkerResponder {}; let link = AgentLink::connect(&scope, responder); let upd = AgentLifecycleEvent::Create(link); scope.send(upd); let handler = move |data: Vec| { - let msg = ToWorker::::unpack(&data); + let msg = ToWorker::::unpack(&data); match msg { ToWorker::Connected(id) => { let upd = AgentLifecycleEvent::Connected(id); @@ -251,7 +280,7 @@ where } } }; - let loaded: FromWorker = FromWorker::WorkerLoaded; + let loaded: FromWorker = FromWorker::WorkerLoaded; let loaded = loaded.pack(); cfg_match! { feature = "std_web" => js! { @@ -270,7 +299,11 @@ where } } -struct RemoteAgent { +struct RemoteAgent +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] @@ -278,7 +311,11 @@ struct RemoteAgent { slab: SharedOutputSlab, } -impl RemoteAgent { +impl RemoteAgent +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ pub fn new( #[cfg(feature = "std_web")] worker: Value, #[cfg(feature = "web_sys")] worker: Worker, diff --git a/yew/src/lib.rs b/yew/src/lib.rs index 6d77117d7bd..dc9a6b199e2 100644 --- a/yew/src/lib.rs +++ b/yew/src/lib.rs @@ -1,8 +1,6 @@ #![allow(clippy::needless_doctest_main)] #![doc(html_logo_url = "https://static.yew.rs/logo.svg")] -#![feature(trait_alias)] - //! # Yew Framework - API Documentation //! //! Yew is a modern Rust framework for creating multi-threaded front-end web apps with WebAssembly From b7f182915d824f9aab3205d7272c9903fb7125fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 15:11:58 +0200 Subject: [PATCH 05/11] specialized the types for Drop a bit --- yew/src/agent/worker/public.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/yew/src/agent/worker/public.rs b/yew/src/agent/worker/public.rs index 9f469a0af28..31e918548b3 100644 --- a/yew/src/agent/worker/public.rs +++ b/yew/src/agent/worker/public.rs @@ -111,7 +111,11 @@ where AGN: Agent, } } -impl Dispatchable for Public {} +impl Dispatchable for Public +where AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de> +{ } /// A connection manager for components interaction with workers. pub struct PublicBridge From 5d12b1b55188d09795fb10ac427b337074367dae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 17:51:40 +0200 Subject: [PATCH 06/11] added an additional constraint on an agent --- .travis.yml | 2 +- yew/src/agent/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index e4dfcaebce7..1185229bdc8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ before_cache: - ./ci/clear_cache.sh rust: - - 1.40.0 # min supported + - 1.43.0 # min supported - stable - beta diff --git a/yew/src/agent/mod.rs b/yew/src/agent/mod.rs index ab6eee4aa7a..071c898aac3 100644 --- a/yew/src/agent/mod.rs +++ b/yew/src/agent/mod.rs @@ -20,7 +20,7 @@ use std::ops::{Deref, DerefMut}; /// Declares the behavior of the agent. pub trait Agent: Sized + 'static { /// Reach capability of the agent. - type Reach: Discoverer; + type Reach: Discoverer; /// Type of an input message. type Message; /// Incoming message type. From 218567bece7d1fb2eb358c474f781a108e20d3d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 18:04:06 +0200 Subject: [PATCH 07/11] emitted with fmt fixes --- yew/src/agent/local/context.rs | 14 +++--- yew/src/agent/local/job.rs | 12 ++--- yew/src/agent/local/mod.rs | 2 +- yew/src/agent/mod.rs | 4 +- yew/src/agent/worker/mod.rs | 10 ++-- yew/src/agent/worker/private.rs | 42 +++++++++-------- yew/src/agent/worker/public.rs | 82 ++++++++++++++++++--------------- 7 files changed, 91 insertions(+), 75 deletions(-) diff --git a/yew/src/agent/local/context.rs b/yew/src/agent/local/context.rs index 7e8b310e9b6..c5bad2d36c5 100644 --- a/yew/src/agent/local/context.rs +++ b/yew/src/agent/local/context.rs @@ -4,8 +4,8 @@ use crate::scheduler::Shared; use anymap::{self, AnyMap}; use slab::Slab; use std::cell::RefCell; -use std::rc::Rc; use std::marker::PhantomData; +use std::rc::Rc; thread_local! { static LOCAL_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); @@ -14,17 +14,17 @@ thread_local! { /// Create a single instance in the current thread. #[allow(missing_debug_implementations)] pub struct Context { - _agent: PhantomData + _agent: PhantomData, } impl Discoverer for Context -where AGN: Agent, - ::Input: fmt::Debug +where + AGN: Agent, + ::Input: fmt::Debug, { type Agent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - { + fn spawn_or_join(callback: Option>) -> Box> { let mut scope_to_init = None; let bridge = LOCAL_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); @@ -139,4 +139,4 @@ impl LocalAgent { let _ = slab.remove(bridge.id.raw_id()); slab.is_empty() } -} \ No newline at end of file +} diff --git a/yew/src/agent/local/job.rs b/yew/src/agent/local/job.rs index ccffc055df5..ad35ee6c6d0 100644 --- a/yew/src/agent/local/job.rs +++ b/yew/src/agent/local/job.rs @@ -7,17 +7,17 @@ const SINGLETON_ID: HandlerId = HandlerId(0, true); /// Create an instance in the current thread. #[allow(missing_debug_implementations)] pub struct Job { - _agent: PhantomData + _agent: PhantomData, } impl Discoverer for Job -where AGN: Agent, - ::Input: fmt::Debug +where + AGN: Agent, + ::Input: fmt::Debug, { type Agent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - { + fn spawn_or_join(callback: Option>) -> Box> { let callback = callback.expect("Callback required for Job"); let scope = AgentScope::::new(); let responder = CallbackResponder { callback }; @@ -60,4 +60,4 @@ impl Responder for CallbackResponder { assert_eq!(id.raw_id(), SINGLETON_ID.raw_id()); self.callback.emit(output); } -} \ No newline at end of file +} diff --git a/yew/src/agent/local/mod.rs b/yew/src/agent/local/mod.rs index b56be911a64..27aca55e740 100644 --- a/yew/src/agent/local/mod.rs +++ b/yew/src/agent/local/mod.rs @@ -4,4 +4,4 @@ mod job; use super::*; pub use context::Context; -pub use job::Job; \ No newline at end of file +pub use job::Job; diff --git a/yew/src/agent/mod.rs b/yew/src/agent/mod.rs index 071c898aac3..b9ddedb7901 100644 --- a/yew/src/agent/mod.rs +++ b/yew/src/agent/mod.rs @@ -82,7 +82,9 @@ pub trait Discoverer { type Agent: Agent; /// Spawns an agent and returns `Bridge` implementation. - fn spawn_or_join(_callback: Option::Output>>) -> Box>; + fn spawn_or_join( + _callback: Option::Output>>, + ) -> Box>; } /// Bridge to a specific kind of worker. diff --git a/yew/src/agent/worker/mod.rs b/yew/src/agent/worker/mod.rs index e5ce422c9ec..62074352c24 100644 --- a/yew/src/agent/worker/mod.rs +++ b/yew/src/agent/worker/mod.rs @@ -72,10 +72,10 @@ fn send_to_remote( #[cfg(feature = "std_web")] worker: &Value, #[cfg(feature = "web_sys")] worker: &Worker, msg: ToWorker, -) -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +) where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { let msg = msg.pack(); cfg_match! { @@ -159,4 +159,4 @@ macro_rules! worker_ext_impl { #[cfg(feature = "web_sys")] worker_ext_impl! { Worker, DedicatedWorkerGlobalScope -} \ No newline at end of file +} diff --git a/yew/src/agent/worker/private.rs b/yew/src/agent/worker/private.rs index 323e8eb4eb5..9bac871c007 100644 --- a/yew/src/agent/worker/private.rs +++ b/yew/src/agent/worker/private.rs @@ -19,18 +19,18 @@ const SINGLETON_ID: HandlerId = HandlerId(0, true); /// Create a new instance for every bridge. #[allow(missing_debug_implementations)] pub struct Private { - _agent: PhantomData + _agent: PhantomData, } impl Discoverer for Private -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { type Agent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - { + fn spawn_or_join(callback: Option>) -> Box> { let callback = callback.expect("Callback required for Private agents"); let handler = move |data: Vec, #[cfg(feature = "std_web")] worker: Value, @@ -75,9 +75,10 @@ where AGN: Agent, /// A connection manager for components interaction with workers. pub struct PrivateBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] worker: Value, @@ -87,9 +88,10 @@ where AGN: Agent, } impl fmt::Debug for PrivateBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PrivateBridge<_>") @@ -97,9 +99,10 @@ where AGN: Agent, } impl Bridge for PrivateBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn send(&mut self, msg: AGN::Input) { // TODO(#937): Important! Implement. @@ -121,9 +124,10 @@ where AGN: Agent, } impl Drop for PrivateBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn drop(&mut self) { let disconnected = ToWorker::Disconnected(SINGLETON_ID); @@ -132,4 +136,4 @@ where AGN: Agent, let destroy = ToWorker::Destroy; send_to_remote::(&self.worker, destroy); } -} \ No newline at end of file +} diff --git a/yew/src/agent/worker/public.rs b/yew/src/agent/worker/public.rs index 31e918548b3..394df8c7d47 100644 --- a/yew/src/agent/worker/public.rs +++ b/yew/src/agent/worker/public.rs @@ -31,18 +31,18 @@ thread_local! { /// Create a single instance in a tab. #[allow(missing_debug_implementations)] pub struct Public { - _agent: PhantomData + _agent: PhantomData, } impl Discoverer for Public -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { type Agent = AGN; - fn spawn_or_join(callback: Option>) -> Box> - { + fn spawn_or_join(callback: Option>) -> Box> { let bridge = REMOTE_AGENTS_POOL.with(|pool| { let mut pool = pool.borrow_mut(); match pool.entry::>() { @@ -112,16 +112,19 @@ where AGN: Agent, } impl Dispatchable for Public -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> -{ } +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, +{ +} /// A connection manager for components interaction with workers. pub struct PublicBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] worker: Value, @@ -132,9 +135,10 @@ where AGN: Agent, } impl fmt::Debug for PublicBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("PublicBridge<_>") @@ -142,9 +146,10 @@ where AGN: Agent, } impl PublicBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn worker_is_loaded(&self) -> bool { REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::())) @@ -175,9 +180,10 @@ where AGN: Agent, } impl Bridge for PublicBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn send(&mut self, msg: AGN::Input) { let msg = ToWorker::ProcessInput(self.id, msg); @@ -186,9 +192,10 @@ where AGN: Agent, } impl Drop for PublicBridge -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn drop(&mut self) { let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| { @@ -229,9 +236,10 @@ where AGN: Agent, struct WorkerResponder {} impl Responder for WorkerResponder -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { fn respond(&self, id: HandlerId, output: AGN::Output) { let msg = FromWorker::ProcessOutput(id, output); @@ -250,7 +258,7 @@ impl Threaded for AGN where AGN: Agent>, ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> + ::Output: Serialize + for<'de> Deserialize<'de>, { fn register() { let scope = AgentScope::::new(); @@ -304,9 +312,10 @@ where } struct RemoteAgent -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] worker: Value, @@ -316,9 +325,10 @@ where AGN: Agent, } impl RemoteAgent -where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, - ::Output: Serialize + for<'de> Deserialize<'de> +where + AGN: Agent, + ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Output: Serialize + for<'de> Deserialize<'de>, { pub fn new( #[cfg(feature = "std_web")] worker: Value, @@ -348,4 +358,4 @@ where AGN: Agent, let _ = slab.remove(bridge.id.raw_id()); slab.is_empty() } -} \ No newline at end of file +} From c8cab20797c7cfc86cc8b64d49952088e9b2e056 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 18:20:51 +0200 Subject: [PATCH 08/11] dropped the useless debug constraint on the input --- yew/src/agent/worker/mod.rs | 2 +- yew/src/agent/worker/private.rs | 10 +++++----- yew/src/agent/worker/public.rs | 22 +++++++++++----------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/yew/src/agent/worker/mod.rs b/yew/src/agent/worker/mod.rs index 62074352c24..12bee8d8bdc 100644 --- a/yew/src/agent/worker/mod.rs +++ b/yew/src/agent/worker/mod.rs @@ -74,7 +74,7 @@ fn send_to_remote( msg: ToWorker, ) where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { let msg = msg.pack(); diff --git a/yew/src/agent/worker/private.rs b/yew/src/agent/worker/private.rs index 9bac871c007..f50aaea3931 100644 --- a/yew/src/agent/worker/private.rs +++ b/yew/src/agent/worker/private.rs @@ -25,7 +25,7 @@ pub struct Private { impl Discoverer for Private where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { type Agent = AGN; @@ -77,7 +77,7 @@ where pub struct PrivateBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] @@ -90,7 +90,7 @@ where impl fmt::Debug for PrivateBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -101,7 +101,7 @@ where impl Bridge for PrivateBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn send(&mut self, msg: AGN::Input) { @@ -126,7 +126,7 @@ where impl Drop for PrivateBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn drop(&mut self) { diff --git a/yew/src/agent/worker/public.rs b/yew/src/agent/worker/public.rs index 394df8c7d47..8ed7bde1da9 100644 --- a/yew/src/agent/worker/public.rs +++ b/yew/src/agent/worker/public.rs @@ -37,7 +37,7 @@ pub struct Public { impl Discoverer for Public where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { type Agent = AGN; @@ -114,7 +114,7 @@ where impl Dispatchable for Public where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { } @@ -123,7 +123,7 @@ where pub struct PublicBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] @@ -137,7 +137,7 @@ where impl fmt::Debug for PublicBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -148,7 +148,7 @@ where impl PublicBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn worker_is_loaded(&self) -> bool { @@ -182,7 +182,7 @@ where impl Bridge for PublicBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn send(&mut self, msg: AGN::Input) { @@ -194,7 +194,7 @@ where impl Drop for PublicBridge where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn drop(&mut self) { @@ -238,7 +238,7 @@ struct WorkerResponder {} impl Responder for WorkerResponder where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn respond(&self, id: HandlerId, output: AGN::Output) { @@ -257,7 +257,7 @@ where impl Threaded for AGN where AGN: Agent>, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { fn register() { @@ -314,7 +314,7 @@ where struct RemoteAgent where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { #[cfg(feature = "std_web")] @@ -327,7 +327,7 @@ where impl RemoteAgent where AGN: Agent, - ::Input: fmt::Debug + Serialize + for<'de> Deserialize<'de>, + ::Input: Serialize + for<'de> Deserialize<'de>, ::Output: Serialize + for<'de> Deserialize<'de>, { pub fn new( From 0bd543a9c090900006d372dc5ae379072f6eacb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Mon, 11 May 2020 19:47:03 +0200 Subject: [PATCH 09/11] fix stdweb examples --- yew-stdweb/examples/multi_thread/src/context.rs | 2 +- yew-stdweb/examples/multi_thread/src/job.rs | 2 +- yew-stdweb/examples/multi_thread/src/native_worker.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yew-stdweb/examples/multi_thread/src/context.rs b/yew-stdweb/examples/multi_thread/src/context.rs index 03b92c69ac1..5639dc81438 100644 --- a/yew-stdweb/examples/multi_thread/src/context.rs +++ b/yew-stdweb/examples/multi_thread/src/context.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Context; + type Reach = Context; type Message = Msg; type Input = Request; type Output = Response; diff --git a/yew-stdweb/examples/multi_thread/src/job.rs b/yew-stdweb/examples/multi_thread/src/job.rs index 182a7ba0f7e..4e380f8b780 100644 --- a/yew-stdweb/examples/multi_thread/src/job.rs +++ b/yew-stdweb/examples/multi_thread/src/job.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Job; + type Reach = Job; type Message = Msg; type Input = Request; type Output = Response; diff --git a/yew-stdweb/examples/multi_thread/src/native_worker.rs b/yew-stdweb/examples/multi_thread/src/native_worker.rs index 5e253fe70d5..0f9bb71d716 100644 --- a/yew-stdweb/examples/multi_thread/src/native_worker.rs +++ b/yew-stdweb/examples/multi_thread/src/native_worker.rs @@ -25,7 +25,7 @@ pub struct Worker { } impl Agent for Worker { - type Reach = Public; + type Reach = Public; type Message = Msg; type Input = Request; type Output = Response; From 9ce25a08e67a778309c50dabcf16857879e2111f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Tue, 12 May 2020 10:00:13 +0200 Subject: [PATCH 10/11] dropped unneeded debug constraints --- .travis.yml | 2 +- README.md | 2 +- yew/src/agent/local/context.rs | 3 +-- yew/src/agent/local/job.rs | 1 - 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1185229bdc8..4ad5496a999 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ before_cache: - ./ci/clear_cache.sh rust: - - 1.43.0 # min supported + - 1.42.0 # min supported - stable - beta diff --git a/README.md b/README.md index 035af58bb39..7a6743fc9ab 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Crate Info API Docs Discord Chat - Rustc Version 1.40+ + Rustc Version 1.42+ Yew Awesome

diff --git a/yew/src/agent/local/context.rs b/yew/src/agent/local/context.rs index c5bad2d36c5..bca0ffbc570 100644 --- a/yew/src/agent/local/context.rs +++ b/yew/src/agent/local/context.rs @@ -19,8 +19,7 @@ pub struct Context { impl Discoverer for Context where - AGN: Agent, - ::Input: fmt::Debug, + AGN: Agent { type Agent = AGN; diff --git a/yew/src/agent/local/job.rs b/yew/src/agent/local/job.rs index ad35ee6c6d0..fedfb223807 100644 --- a/yew/src/agent/local/job.rs +++ b/yew/src/agent/local/job.rs @@ -13,7 +13,6 @@ pub struct Job { impl Discoverer for Job where AGN: Agent, - ::Input: fmt::Debug, { type Agent = AGN; From 70da2414e87ec1c9f4884a0a79eab906b02cf08a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Kawalec?= Date: Tue, 12 May 2020 10:16:38 +0200 Subject: [PATCH 11/11] linter --- yew/src/agent/local/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yew/src/agent/local/context.rs b/yew/src/agent/local/context.rs index bca0ffbc570..42a31bccc51 100644 --- a/yew/src/agent/local/context.rs +++ b/yew/src/agent/local/context.rs @@ -19,7 +19,7 @@ pub struct Context { impl Discoverer for Context where - AGN: Agent + AGN: Agent, { type Agent = AGN;