diff --git a/Cargo.lock b/Cargo.lock index 9ab4d473a0e..3cfae99c917 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,15 @@ dependencies = [ "yew", ] +[[package]] +name = "atomic-polyfill" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3ff7eb3f316534d83a8a2c3d1674ace8a5a71198eba31e2e2b597833f699b28" +dependencies = [ + "critical-section", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -417,6 +426,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +[[package]] +name = "cobs" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" + [[package]] name = "colorchoice" version = "1.0.0" @@ -531,6 +546,12 @@ dependencies = [ "libc", ] +[[package]] +name = "critical-section" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" + [[package]] name = "crypto-common" version = "0.1.6" @@ -1149,9 +1170,9 @@ dependencies = [ [[package]] name = "gloo-worker" -version = "0.1.2" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09110b5555bcafe508cee0fb94308af9aac7a85f980d3c88b270d117c6c6911d" +checksum = "13471584da78061a28306d1359dd0178d8d6fc1c7c80e5e35d27260346e0516a" dependencies = [ "anymap2", "bincode", @@ -1159,28 +1180,42 @@ dependencies = [ "gloo-utils", "js-sys", "serde", - "slab", "wasm-bindgen", + "wasm-bindgen-futures", "web-sys", ] [[package]] name = "gloo-worker" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13471584da78061a28306d1359dd0178d8d6fc1c7c80e5e35d27260346e0516a" +checksum = "cdec38f5350e6f71425895382d3f0e5e45ad78b69c9905f097a171b80c73112c" dependencies = [ - "anymap2", "bincode", - "gloo-console", + "futures 0.3.28", "gloo-utils", + "gloo-worker-macros", "js-sys", + "pinned", "serde", + "thiserror", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", ] +[[package]] +name = "gloo-worker-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "956caa58d4857bc9941749d55e4bd3000032d8212762586fa5705632967140e7" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.27", +] + [[package]] name = "h2" version = "0.3.20" @@ -1200,6 +1235,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1237,6 +1281,20 @@ dependencies = [ "http", ] +[[package]] +name = "heapless" +version = "0.7.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db04bc24a18b9ea980628ecf00e6c0264f3c1426dac36c00cb49b6fbad8b0743" +dependencies = [ + "atomic-polyfill", + "hash32", + "rustc_version", + "serde", + "spin", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.4.1" @@ -2004,6 +2062,17 @@ dependencies = [ "yew", ] +[[package]] +name = "postcard" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9ee729232311d3cd113749948b689627618133b1c5012b77342c1950b25eaeb" +dependencies = [ + "cobs", + "heapless", + "serde", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -2020,6 +2089,22 @@ dependencies = [ "syn 2.0.27", ] +[[package]] +name = "primes" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a61082d8bceecd71a3870e9162002bb75f7ba9c7aa8b76227e887782fef9c8" + +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2452,6 +2537,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "ssr_router" @@ -2473,6 +2561,12 @@ dependencies = [ "yew", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.10.0" @@ -2759,6 +2853,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml_datetime" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" + +[[package]] +name = "toml_edit" +version = "0.19.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" +dependencies = [ + "indexmap 2.0.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -3397,6 +3508,15 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +[[package]] +name = "winnow" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acaaa1190073b2b101e15083c38ee8ec891b5e05cbee516521e94ec008f61e64" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.10.1" @@ -3438,9 +3558,24 @@ dependencies = [ name = "yew-agent" version = "0.2.0" dependencies = [ - "gloo-worker 0.1.2", + "futures 0.3.28", + "gloo-worker 0.3.0", "serde", + "wasm-bindgen", "yew", + "yew-agent-macro", +] + +[[package]] +name = "yew-agent-macro" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.27", + "trybuild", + "yew-agent", ] [[package]] @@ -3494,6 +3629,7 @@ name = "yew-worker-fib" version = "0.1.0" dependencies = [ "js-sys", + "postcard", "serde", "wasm-bindgen", "web-sys", @@ -3501,6 +3637,17 @@ dependencies = [ "yew-agent", ] +[[package]] +name = "yew-worker-prime" +version = "0.1.0" +dependencies = [ + "futures 0.3.28", + "primes", + "serde", + "yew", + "yew-agent", +] + [[package]] name = "zxcvbn" version = "2.2.2" diff --git a/examples/README.md b/examples/README.md index 58da019a898..1b5a01fa937 100644 --- a/examples/README.md +++ b/examples/README.md @@ -57,7 +57,8 @@ As an example, check out the TodoMVC example here: + - Yew • Web Worker Fibonacci + Yew • Web Worker Fibonacci + + + + - - - + + - - diff --git a/examples/web_worker_fib/src/agent.rs b/examples/web_worker_fib/src/agent.rs index e002caeada4..afe6f6803e5 100644 --- a/examples/web_worker_fib/src/agent.rs +++ b/examples/web_worker_fib/src/agent.rs @@ -1,59 +1,39 @@ +use js_sys::Uint8Array; use serde::{Deserialize, Serialize}; -use yew_agent::{HandlerId, Public, WorkerLink}; - -pub struct Worker { - link: WorkerLink, -} - -#[derive(Serialize, Deserialize)] -pub struct WorkerInput { - pub n: u32, -} - -#[derive(Serialize, Deserialize)] -pub struct WorkerOutput { - pub value: u32, -} - -impl yew_agent::Worker for Worker { - type Input = WorkerInput; - type Message = (); - type Output = WorkerOutput; - type Reach = Public; - - fn create(link: WorkerLink) -> Self { - Self { link } +use wasm_bindgen::JsValue; +use yew_agent::prelude::*; +use yew_agent::Codec; + +/// Example to use a custom codec. +pub struct Postcard; + +impl Codec for Postcard { + fn encode(input: I) -> JsValue + where + I: Serialize, + { + let buf = postcard::to_vec::<_, 32>(&input).expect("can't serialize a worker message"); + Uint8Array::from(buf.as_slice()).into() } - fn update(&mut self, _msg: Self::Message) { - // no messaging + fn decode(input: JsValue) -> O + where + O: for<'de> Deserialize<'de>, + { + let data = Uint8Array::from(input).to_vec(); + postcard::from_bytes(&data).expect("can't deserialize a worker message") } +} - fn handle_input(&mut self, msg: Self::Input, id: HandlerId) { - // this runs in a web worker - // and does not block the main - // browser thread! - - let n = msg.n; - - fn fib(n: u32) -> u32 { - if n <= 1 { - 1 - } else { - fib(n - 1) + fib(n - 2) - } +#[oneshot] +pub async fn FibonacciTask(n: u32) -> u32 { + fn fib(n: u32) -> u32 { + if n <= 1 { + 1 + } else { + fib(n - 1) + fib(n - 2) } - - let output = Self::Output { value: fib(n) }; - - self.link.respond(id, output); - } - - fn name_of_resource() -> &'static str { - "worker.js" } - fn resource_path_is_relative() -> bool { - true - } + fib(n) } diff --git a/examples/web_worker_fib/src/bin/worker.rs b/examples/web_worker_fib/src/bin/worker.rs index d785d8e067f..46bdb6f0dab 100644 --- a/examples/web_worker_fib/src/bin/worker.rs +++ b/examples/web_worker_fib/src/bin/worker.rs @@ -1,6 +1,6 @@ -use yew_agent::PublicWorker; -use yew_worker_fib::agent::Worker; +use yew_agent::Registrable; +use yew_worker_fib::agent::{FibonacciTask, Postcard}; fn main() { - Worker::register(); + FibonacciTask::registrar().encoding::().register(); } diff --git a/examples/web_worker_fib/src/lib.rs b/examples/web_worker_fib/src/lib.rs index 14813431551..5d072bf3e0e 100644 --- a/examples/web_worker_fib/src/lib.rs +++ b/examples/web_worker_fib/src/lib.rs @@ -3,82 +3,78 @@ pub mod agent; -use std::rc::Rc; - use web_sys::HtmlInputElement; +use yew::platform::spawn_local; use yew::prelude::*; -use yew_agent::{Bridge, Bridged}; +use yew_agent::oneshot::{use_oneshot_runner, OneshotProvider}; -use crate::agent::{Worker, WorkerInput, WorkerOutput}; +use crate::agent::{FibonacciTask, Postcard}; -pub struct App { - clicker_value: u32, - input_ref: NodeRef, - worker: Box>, - fibonacci_output: String, -} +#[function_component] +fn Main() -> Html { + let input_value = use_state_eq(|| 44); + let output = use_state(|| "Try out some fibonacci calculations!".to_string()); + let fib_task = use_oneshot_runner::(); -pub enum Message { - Click, - RunWorker, - WorkerMsg(WorkerOutput), -} + let clicker_value = use_state_eq(|| 0); -impl Component for App { - type Message = Message; - type Properties = (); + let calculate = { + let input_value = *input_value; + let output = output.clone(); + move |_e: MouseEvent| { + let fib_agent = fib_task.clone(); + let output = output.clone(); - fn create(ctx: &Context) -> Self { - let cb = { - let link = ctx.link().clone(); - move |e| link.send_message(Self::Message::WorkerMsg(e)) - }; - let worker = Worker::bridge(Rc::new(cb)); + spawn_local(async move { + // start the worker + let output_value = fib_agent.run(input_value).await; - Self { - clicker_value: 0, - input_ref: NodeRef::default(), - worker, - fibonacci_output: String::from("Try out some fibonacci calculations!"), + output.set(format!("Fibonacci value: {}", output_value)); + }); } - } + }; - fn update(&mut self, _ctx: &Context, msg: Self::Message) -> bool { - match msg { - Self::Message::Click => { - self.clicker_value += 1; - } - Self::Message::RunWorker => { - if let Some(input) = self.input_ref.cast::() { - // start the worker off! - self.worker.send(WorkerInput { - n: input.value_as_number() as u32, - }); - } - } - Self::Message::WorkerMsg(output) => { - // the worker is done! - self.fibonacci_output = format!("Fibonacci value: {}", output.value); - } + let on_input_change = { + let input_value = input_value.clone(); + move |e: InputEvent| { + input_value.set( + e.target_unchecked_into::() + .value() + .parse() + .expect("failed to parse"), + ); } + }; - true - } + let inc_clicker = { + let clicker_value = clicker_value.clone(); - fn view(&self, ctx: &Context) -> Html { - html! { - <> -

{ "Web worker demo" }

-

{ "Submit a value to calculate, then increase the counter on the main thread!"}

-

{ "Large numbers will take some time!" }

-

{ "Output: " } { &self.fibonacci_output }

-
- - -

-

{ "Main thread value: " } { self.clicker_value }

- - + move |_e: MouseEvent| { + clicker_value.set(*clicker_value + 1); } + }; + + html! { + <> +

{ "Web worker demo" }

+

{ "Submit a value to calculate, then increase the counter on the main thread!"}

+

{ "Large numbers will take some time!" }

+

{ "Output: " } { &*output }

+
+ + +

+

{ "Main thread value: " } { *clicker_value }

+ + + } +} + +#[function_component] +pub fn App() -> Html { + html! { + path="/worker.js"> +
+ > } } diff --git a/examples/web_worker_prime/Cargo.toml b/examples/web_worker_prime/Cargo.toml new file mode 100644 index 00000000000..b66142ba89b --- /dev/null +++ b/examples/web_worker_prime/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "yew-worker-prime" +version = "0.1.0" +edition = "2021" + +[dependencies] +yew-agent = { path = "../../packages/yew-agent" } +yew = { path = "../../packages/yew", features = ["csr"] } +futures = "0.3.25" +primes = "0.3.0" +serde = { version = "1.0.147", features = ["derive"] } diff --git a/examples/web_worker_prime/README.md b/examples/web_worker_prime/README.md new file mode 100644 index 00000000000..82ded23b81a --- /dev/null +++ b/examples/web_worker_prime/README.md @@ -0,0 +1,17 @@ +# Web Worker Prime + +[![Demo](https://img.shields.io/website?label=demo&url=https%3A%2F%2Fexamples.yew.rs%2Fweb_worker_prime)](https://examples.yew.rs/web_worker_prime) + +Calculate primes until stop button is pressed, without blocking the main thread. + +## Concepts + +The example illustrates how to use reactor agents to offload CPU bound tasks to a worker thread in a Yew application. + +## Running + +Run this application with the trunk development server: + +```bash +trunk serve --open +``` diff --git a/examples/web_worker_prime/index.html b/examples/web_worker_prime/index.html new file mode 100644 index 00000000000..eacef73d53d --- /dev/null +++ b/examples/web_worker_prime/index.html @@ -0,0 +1,15 @@ + + + + + + Yew • Web Worker Prime + + + + + + + + + diff --git a/examples/web_worker_prime/src/agent.rs b/examples/web_worker_prime/src/agent.rs new file mode 100644 index 00000000000..595239f1626 --- /dev/null +++ b/examples/web_worker_prime/src/agent.rs @@ -0,0 +1,38 @@ +use std::time::Duration; + +use futures::sink::SinkExt; +use futures::{FutureExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use yew::platform::time::sleep; +use yew_agent::prelude::*; + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum ControlSignal { + Start, + Stop, +} + +#[reactor] +pub async fn PrimeReactor(mut scope: ReactorScope) { + while let Some(m) = scope.next().await { + if m == ControlSignal::Start { + 'inner: for i in 1.. { + // This is not the most efficient way to calculate prime, + // but this example is here to demonstrate how primes can be + // sent to the application in an ascending order. + if primes::is_prime(i) { + scope.send(i).await.unwrap(); + } + + futures::select! { + m = scope.next() => { + if m == Some(ControlSignal::Stop) { + break 'inner; + } + }, + _ = sleep(Duration::from_millis(100)).fuse() => {}, + } + } + } + } +} diff --git a/examples/web_worker_prime/src/bin/app.rs b/examples/web_worker_prime/src/bin/app.rs new file mode 100644 index 00000000000..415969eb135 --- /dev/null +++ b/examples/web_worker_prime/src/bin/app.rs @@ -0,0 +1,3 @@ +fn main() { + yew::Renderer::::new().render(); +} diff --git a/examples/web_worker_prime/src/bin/worker.rs b/examples/web_worker_prime/src/bin/worker.rs new file mode 100644 index 00000000000..51b124e440f --- /dev/null +++ b/examples/web_worker_prime/src/bin/worker.rs @@ -0,0 +1,6 @@ +use yew_agent::Registrable; +use yew_worker_prime::agent::PrimeReactor; + +fn main() { + PrimeReactor::registrar().register(); +} diff --git a/examples/web_worker_prime/src/lib.rs b/examples/web_worker_prime/src/lib.rs new file mode 100644 index 00000000000..a909f90f2e0 --- /dev/null +++ b/examples/web_worker_prime/src/lib.rs @@ -0,0 +1,64 @@ +pub mod agent; +use agent::{ControlSignal, PrimeReactor}; +use yew::prelude::*; +use yew_agent::reactor::{use_reactor_subscription, ReactorProvider}; + +#[function_component] +fn Main() -> Html { + let prime_sub = use_reactor_subscription::(); + let started = use_state_eq(|| false); + let skip_len = use_state_eq(|| 0); + + let result_s = prime_sub + .iter() + // Skip results in previous runs. + .skip(*skip_len) + .fold("".to_string(), |mut output, item| { + if !output.is_empty() { + output.push_str(", "); + } + + output.push_str(&item.to_string()); + + output + }); + + let start_prime_calc = use_callback( + (prime_sub.clone(), started.setter(), skip_len.setter()), + |_input, (prime_sub, started_setter, skip_len)| { + skip_len.set(prime_sub.len()); + prime_sub.send(ControlSignal::Start); + started_setter.set(true); + }, + ); + + let stop_prime_calc = use_callback( + (prime_sub, started.setter()), + |_input, (prime_sub, started_setter)| { + prime_sub.send(ControlSignal::Stop); + started_setter.set(false); + }, + ); + + html! { + <> +

{"Find Prime"}

+

{"This page demonstrates how to calculate prime in a web worker."}

+ if *started { + + } else { + + } +
{result_s}
+ + } +} + +#[function_component] +pub fn App() -> Html { + html! { + path="/worker.js"> +
+ > + } +} diff --git a/packages/yew-agent-macro/Cargo.toml b/packages/yew-agent-macro/Cargo.toml new file mode 100644 index 00000000000..69548444adf --- /dev/null +++ b/packages/yew-agent-macro/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "yew-agent-macro" +version = "0.1.0" +edition = "2021" +rust-version = "1.64.0" +authors = ["Kaede Hoshikawa "] +repository = "https://github.com/yewstack/yew" +homepage = "https://yew.rs" +documentation = "https://docs.rs/yew/" +readme = "../../README.md" +description = "Macro Support for Yew Agents" +license = "MIT OR Apache-2.0" + +[lib] +proc-macro = true + +[dependencies] +proc-macro2 = "1" +quote = "1" +syn = { version = "2", features = ["full", "extra-traits"] } + +[dev-dependencies] +rustversion = "1" +trybuild = "1" +yew-agent = { path = "../yew-agent" } diff --git a/packages/yew-agent-macro/src/agent_fn.rs b/packages/yew-agent-macro/src/agent_fn.rs new file mode 100644 index 00000000000..ef598323bd1 --- /dev/null +++ b/packages/yew-agent-macro/src/agent_fn.rs @@ -0,0 +1,241 @@ +use proc_macro2::{Span, TokenStream}; +use quote::ToTokens; +use syn::parse::{Parse, ParseStream}; +use syn::punctuated::Punctuated; +use syn::token::Comma; +use syn::{Attribute, FnArg, Generics, Ident, Item, ItemFn, Signature, Type, Visibility}; + +pub trait AgentFnType { + type RecvType; + type OutputType; + + fn attr_name() -> &'static str; + fn agent_type_name() -> &'static str; + fn parse_recv_type(sig: &Signature) -> syn::Result; + fn parse_output_type(sig: &Signature) -> syn::Result; + + fn extract_fn_arg_type(arg: &FnArg) -> syn::Result { + let ty = match arg { + FnArg::Typed(arg) => arg.ty.clone(), + + FnArg::Receiver(_) => { + return Err(syn::Error::new_spanned( + arg, + format!("{} agents can't accept a receiver", Self::agent_type_name()), + )); + } + }; + + Ok(*ty) + } + + fn assert_no_left_argument(rest_inputs: I, expected_len: usize) -> syn::Result<()> + where + I: ExactSizeIterator + IntoIterator, + T: ToTokens, + { + // Checking after param parsing may make it a little inefficient + // but that's a requirement for better error messages in case of receivers + // `>0` because first one is already consumed. + if rest_inputs.len() > 0 { + let params: TokenStream = rest_inputs + .into_iter() + .map(|it| it.to_token_stream()) + .collect(); + return Err(syn::Error::new_spanned( + params, + format!( + "{} agent can accept at most {} argument{}", + Self::agent_type_name(), + expected_len, + if expected_len > 1 { "s" } else { "" } + ), + )); + } + + Ok(()) + } +} + +#[derive(Clone)] +pub struct AgentFn +where + F: AgentFnType + 'static, +{ + pub recv_type: F::RecvType, + pub output_type: F::OutputType, + pub generics: Generics, + pub vis: Visibility, + pub attrs: Vec, + pub name: Ident, + pub agent_name: Option, + pub is_async: bool, + + pub func: ItemFn, +} + +impl Parse for AgentFn +where + F: AgentFnType + 'static, +{ + fn parse(input: ParseStream) -> syn::Result { + let parsed: Item = input.parse()?; + + let func = match parsed { + Item::Fn(m) => m, + + item => { + return Err(syn::Error::new_spanned( + item, + format!( + "`{}` attribute can only be applied to functions", + F::attr_name() + ), + )) + } + }; + + let ItemFn { + attrs, vis, sig, .. + } = func.clone(); + + if sig.generics.lifetimes().next().is_some() { + return Err(syn::Error::new_spanned( + sig.generics, + format!( + "{} agents can't have generic lifetime parameters", + F::agent_type_name() + ), + )); + } + + if sig.constness.is_some() { + return Err(syn::Error::new_spanned( + sig.constness, + format!("const functions can't be {} agents", F::agent_type_name()), + )); + } + + if sig.abi.is_some() { + return Err(syn::Error::new_spanned( + sig.abi, + format!("extern functions can't be {} agents", F::agent_type_name()), + )); + } + let recv_type = F::parse_recv_type(&sig)?; + let output_type = F::parse_output_type(&sig)?; + + let is_async = sig.asyncness.is_some(); + + Ok(Self { + recv_type, + output_type, + generics: sig.generics, + is_async, + vis, + attrs, + name: sig.ident, + agent_name: None, + func, + }) + } +} + +impl AgentFn +where + F: AgentFnType + 'static, +{ + /// Filters attributes that should be copied to agent definition. + pub fn filter_attrs_for_agent_struct(&self) -> Vec { + self.attrs + .iter() + .filter_map(|m| { + m.path() + .get_ident() + .and_then(|ident| match ident.to_string().as_str() { + "doc" | "allow" => Some(m.clone()), + _ => None, + }) + }) + .collect() + } + + /// Filters attributes that should be copied to the agent impl block. + pub fn filter_attrs_for_agent_impl(&self) -> Vec { + self.attrs + .iter() + .filter_map(|m| { + m.path() + .get_ident() + .and_then(|ident| match ident.to_string().as_str() { + "allow" => Some(m.clone()), + _ => None, + }) + }) + .collect() + } + + pub fn phantom_generics(&self) -> Punctuated { + self.generics + .type_params() + .map(|ty_param| ty_param.ident.clone()) // create a new Punctuated sequence without any type bounds + .collect::>() + } + + pub fn merge_agent_name(&mut self, name: AgentName) -> syn::Result<()> { + if let Some(ref m) = name.agent_name { + if m == &self.name { + return Err(syn::Error::new_spanned( + m, + format!( + "the {} must not have the same name as the function", + F::agent_type_name() + ), + )); + } + } + + self.agent_name = name.agent_name; + + Ok(()) + } + + pub fn inner_fn_ident(&self) -> Ident { + if self.agent_name.is_some() { + self.name.clone() + } else { + Ident::new("inner", Span::mixed_site()) + } + } + + pub fn agent_name(&self) -> Ident { + self.agent_name.clone().unwrap_or_else(|| self.name.clone()) + } + + pub fn print_inner_fn(&self) -> ItemFn { + let mut func = self.func.clone(); + func.sig.ident = self.inner_fn_ident(); + + func.vis = Visibility::Inherited; + + func + } +} + +pub struct AgentName { + agent_name: Option, +} + +impl Parse for AgentName { + fn parse(input: ParseStream) -> syn::Result { + if input.is_empty() { + return Ok(Self { agent_name: None }); + } + + let agent_name = input.parse()?; + + Ok(Self { + agent_name: Some(agent_name), + }) + } +} diff --git a/packages/yew-agent-macro/src/lib.rs b/packages/yew-agent-macro/src/lib.rs new file mode 100644 index 00000000000..8acd9797357 --- /dev/null +++ b/packages/yew-agent-macro/src/lib.rs @@ -0,0 +1,30 @@ +use proc_macro::TokenStream; +use syn::parse_macro_input; + +mod agent_fn; +mod oneshot; +mod reactor; + +use agent_fn::{AgentFn, AgentName}; +use oneshot::{oneshot_impl, OneshotFn}; +use reactor::{reactor_impl, ReactorFn}; + +#[proc_macro_attribute] +pub fn reactor(attr: TokenStream, item: TokenStream) -> TokenStream { + let item = parse_macro_input!(item as AgentFn); + let attr = parse_macro_input!(attr as AgentName); + + reactor_impl(attr, item) + .unwrap_or_else(|err| err.to_compile_error()) + .into() +} + +#[proc_macro_attribute] +pub fn oneshot(attr: TokenStream, item: TokenStream) -> TokenStream { + let item = parse_macro_input!(item as AgentFn); + let attr = parse_macro_input!(attr as AgentName); + + oneshot_impl(attr, item) + .unwrap_or_else(|err| err.to_compile_error()) + .into() +} diff --git a/packages/yew-agent-macro/src/oneshot.rs b/packages/yew-agent-macro/src/oneshot.rs new file mode 100644 index 00000000000..0d22ef2558c --- /dev/null +++ b/packages/yew-agent-macro/src/oneshot.rs @@ -0,0 +1,130 @@ +use proc_macro2::{Span, TokenStream}; +use quote::quote; +use syn::{parse_quote, Ident, ReturnType, Signature, Type}; + +use crate::agent_fn::{AgentFn, AgentFnType, AgentName}; + +pub struct OneshotFn {} + +impl AgentFnType for OneshotFn { + type OutputType = Type; + type RecvType = Type; + + fn attr_name() -> &'static str { + "oneshot" + } + + fn agent_type_name() -> &'static str { + "oneshot" + } + + fn parse_recv_type(sig: &Signature) -> syn::Result { + let mut inputs = sig.inputs.iter(); + let arg = inputs + .next() + .ok_or_else(|| syn::Error::new_spanned(&sig.ident, "expected 1 argument"))?; + + let ty = Self::extract_fn_arg_type(arg)?; + + Self::assert_no_left_argument(inputs, 1)?; + + Ok(ty) + } + + fn parse_output_type(sig: &Signature) -> syn::Result { + let ty = match &sig.output { + ReturnType::Default => { + parse_quote! { () } + } + ReturnType::Type(_, ty) => *ty.clone(), + }; + + Ok(ty) + } +} + +pub fn oneshot_impl(name: AgentName, mut agent_fn: AgentFn) -> syn::Result { + agent_fn.merge_agent_name(name)?; + + let struct_attrs = agent_fn.filter_attrs_for_agent_struct(); + let oneshot_impl_attrs = agent_fn.filter_attrs_for_agent_impl(); + let phantom_generics = agent_fn.phantom_generics(); + let oneshot_name = agent_fn.agent_name(); + let fn_name = agent_fn.inner_fn_ident(); + let inner_fn = agent_fn.print_inner_fn(); + + let AgentFn { + recv_type: input_type, + generics, + output_type, + vis, + is_async, + .. + } = agent_fn; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + let fn_generics = ty_generics.as_turbofish(); + + let in_ident = Ident::new("_input", Span::mixed_site()); + + let fn_call = if is_async { + quote! { #fn_name #fn_generics (#in_ident).await } + } else { + quote! { #fn_name #fn_generics (#in_ident) } + }; + let crate_name = quote! { yew_agent }; + + let quoted = quote! { + #(#struct_attrs)* + #[allow(unused_parens)] + #vis struct #oneshot_name #generics #where_clause { + inner: ::std::pin::Pin<::std::boxed::Box>>, + _marker: ::std::marker::PhantomData<(#phantom_generics)>, + } + + // we cannot disable any lints here because it will be applied to the function body + // as well. + #(#oneshot_impl_attrs)* + impl #impl_generics ::#crate_name::oneshot::Oneshot for #oneshot_name #ty_generics #where_clause { + type Input = #input_type; + + fn create(#in_ident: Self::Input) -> Self { + #inner_fn + + Self { + inner: ::std::boxed::Box::pin( + async move { + #fn_call + } + ), + _marker: ::std::marker::PhantomData, + } + } + } + + impl #impl_generics ::std::future::Future for #oneshot_name #ty_generics #where_clause { + type Output = #output_type; + + fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll { + ::std::future::Future::poll(::std::pin::Pin::new(&mut self.inner), cx) + } + } + + impl #impl_generics ::#crate_name::Registrable for #oneshot_name #ty_generics #where_clause { + type Registrar = ::#crate_name::oneshot::OneshotRegistrar; + + fn registrar() -> Self::Registrar { + ::#crate_name::oneshot::OneshotRegistrar::::new() + } + } + + impl #impl_generics ::#crate_name::Spawnable for #oneshot_name #ty_generics #where_clause { + type Spawner = ::#crate_name::oneshot::OneshotSpawner; + + fn spawner() -> Self::Spawner { + ::#crate_name::oneshot::OneshotSpawner::::new() + } + } + }; + + Ok(quoted) +} diff --git a/packages/yew-agent-macro/src/reactor.rs b/packages/yew-agent-macro/src/reactor.rs new file mode 100644 index 00000000000..089e6939f1a --- /dev/null +++ b/packages/yew-agent-macro/src/reactor.rs @@ -0,0 +1,135 @@ +use proc_macro2::{Span, TokenStream}; +use quote::quote; +use syn::{Ident, ReturnType, Signature, Type}; + +use crate::agent_fn::{AgentFn, AgentFnType, AgentName}; + +pub struct ReactorFn {} + +impl AgentFnType for ReactorFn { + type OutputType = (); + type RecvType = Type; + + fn attr_name() -> &'static str { + "reactor" + } + + fn agent_type_name() -> &'static str { + "reactor" + } + + fn parse_recv_type(sig: &Signature) -> syn::Result { + let mut inputs = sig.inputs.iter(); + let arg = inputs + .next() + .ok_or_else(|| syn::Error::new_spanned(&sig.ident, "expected 1 argument"))?; + + let ty = Self::extract_fn_arg_type(arg)?; + + Self::assert_no_left_argument(inputs, 1)?; + + Ok(ty) + } + + fn parse_output_type(sig: &Signature) -> syn::Result { + match &sig.output { + ReturnType::Default => {} + ReturnType::Type(_, ty) => { + return Err(syn::Error::new_spanned( + ty, + "reactor agents cannot return any value", + )) + } + } + + Ok(()) + } +} + +pub fn reactor_impl(name: AgentName, mut agent_fn: AgentFn) -> syn::Result { + agent_fn.merge_agent_name(name)?; + + if !agent_fn.is_async { + return Err(syn::Error::new_spanned( + &agent_fn.name, + "reactor agents must be asynchronous", + )); + } + + let struct_attrs = agent_fn.filter_attrs_for_agent_struct(); + let reactor_impl_attrs = agent_fn.filter_attrs_for_agent_impl(); + let phantom_generics = agent_fn.phantom_generics(); + let reactor_name = agent_fn.agent_name(); + let fn_name = agent_fn.inner_fn_ident(); + let inner_fn = agent_fn.print_inner_fn(); + + let AgentFn { + recv_type, + generics, + vis, + .. + } = agent_fn; + + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + let fn_generics = ty_generics.as_turbofish(); + + let scope_ident = Ident::new("_scope", Span::mixed_site()); + + let fn_call = quote! { #fn_name #fn_generics (#scope_ident).await }; + let crate_name = quote! { yew_agent }; + + let quoted = quote! { + #(#struct_attrs)* + #[allow(unused_parens)] + #vis struct #reactor_name #generics #where_clause { + inner: ::std::pin::Pin<::std::boxed::Box>>, + _marker: ::std::marker::PhantomData<(#phantom_generics)>, + } + + // we cannot disable any lints here because it will be applied to the function body + // as well. + #(#reactor_impl_attrs)* + impl #impl_generics ::#crate_name::reactor::Reactor for #reactor_name #ty_generics #where_clause { + type Scope = #recv_type; + + fn create(#scope_ident: Self::Scope) -> Self { + #inner_fn + + Self { + inner: ::std::boxed::Box::pin( + async move { + #fn_call + } + ), + _marker: ::std::marker::PhantomData, + } + } + } + + impl #impl_generics ::std::future::Future for #reactor_name #ty_generics #where_clause { + type Output = (); + + fn poll(mut self: ::std::pin::Pin<&mut Self>, cx: &mut ::std::task::Context<'_>) -> ::std::task::Poll { + ::std::future::Future::poll(::std::pin::Pin::new(&mut self.inner), cx) + } + } + + impl #impl_generics ::#crate_name::Registrable for #reactor_name #ty_generics #where_clause { + type Registrar = ::#crate_name::reactor::ReactorRegistrar; + + fn registrar() -> Self::Registrar { + ::#crate_name::reactor::ReactorRegistrar::::new() + } + } + + impl #impl_generics ::#crate_name::Spawnable for #reactor_name #ty_generics #where_clause { + type Spawner = ::#crate_name::reactor::ReactorSpawner; + + fn spawner() -> Self::Spawner { + ::#crate_name::reactor::ReactorSpawner::::new() + } + } + }; + + Ok(quoted) +} diff --git a/packages/yew-agent/Cargo.toml b/packages/yew-agent/Cargo.toml index 18aef8fa6d3..9bfe65c1d9f 100644 --- a/packages/yew-agent/Cargo.toml +++ b/packages/yew-agent/Cargo.toml @@ -9,10 +9,15 @@ edition = "2021" readme = "../../README.md" description = "Agents for Yew" license = "MIT OR Apache-2.0" +rust-version = "1.64.0" [dependencies] yew = { version = "0.20.0", path = "../yew" } -gloo-worker = "0.1" +gloo-worker = { version = "0.3", features = ["futures"] } +wasm-bindgen = "0.2" +serde = { version = "1", features = ["derive"] } +futures = "0.3" +yew-agent-macro = { version = "0.1", path = "../yew-agent-macro" } [dev-dependencies] serde = "1.0.164" diff --git a/packages/yew-agent/src/hooks.rs b/packages/yew-agent/src/hooks.rs deleted file mode 100644 index 028b385fed1..00000000000 --- a/packages/yew-agent/src/hooks.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::cell::RefCell; -use std::rc::Rc; - -use yew::prelude::*; - -use crate::*; - -/// State handle for [`use_bridge`] hook -pub struct UseBridgeHandle -where - T: Bridged, -{ - inner: Rc>>>, -} - -impl UseBridgeHandle -where - T: Bridged, -{ - /// Send a message to an worker. - pub fn send(&self, msg: T::Input) { - let mut bridge = self.inner.borrow_mut(); - bridge.send(msg); - } -} - -/// A hook to bridge to an [`Worker`]. -/// -/// This hooks will only bridge the worker once over the entire component lifecycle. -/// -/// Takes a callback as the only argument. The callback will be updated on every render to make -/// sure captured values (if any) are up to date. -/// -/// # Examples -/// -/// ``` -/// # mod example { -/// use serde::{Deserialize, Serialize}; -/// use yew::prelude::*; -/// use yew_agent::{use_bridge, UseBridgeHandle}; -/// -/// // This would usually live in the same file as your worker -/// #[derive(Serialize, Deserialize)] -/// pub enum WorkerResponseType { -/// IncrementCounter, -/// } -/// # mod my_worker_mod { -/// # use yew_agent::{HandlerId, Public, WorkerLink}; -/// # use super::WorkerResponseType; -/// # pub struct MyWorker { -/// # pub link: WorkerLink, -/// # } -/// -/// # impl yew_agent::Worker for MyWorker { -/// # type Input = (); -/// # type Output = WorkerResponseType; -/// # type Reach = Public; -/// # type Message = (); -/// # -/// # fn create(link: WorkerLink) -> Self { -/// # MyWorker { link } -/// # } -/// # -/// # fn update(&mut self, _msg: Self::Message) { -/// # // do nothing -/// # } -/// # -/// # fn handle_input(&mut self, _msg: Self::Input, id: HandlerId) { -/// # self.link.respond(id, WorkerResponseType::IncrementCounter); -/// # } -/// # } -/// # } -/// use my_worker_mod::MyWorker; // note that ::Output == WorkerResponseType -/// #[function_component(UseBridge)] -/// fn bridge() -> Html { -/// let counter = use_state(|| 0); -/// -/// // a scoped block to clone the state in -/// { -/// let counter = counter.clone(); -/// // response will be of type MyWorker::Output, i.e. WorkerResponseType -/// let bridge: UseBridgeHandle = use_bridge(move |response| match response { -/// WorkerResponseType::IncrementCounter => { -/// counter.set(*counter + 1); -/// } -/// }); -/// } -/// -/// html! { -///
-/// {*counter} -///
-/// } -/// } -/// # } -/// ``` -#[hook] -pub fn use_bridge(on_output: F) -> UseBridgeHandle -where - T: Bridged, - F: Fn(T::Output) + 'static, -{ - let on_output = Rc::new(on_output); - - let on_output_clone = on_output.clone(); - let on_output_ref = use_mut_ref(move || on_output_clone); - - // Refresh the callback on every render. - { - let mut on_output_ref = on_output_ref.borrow_mut(); - *on_output_ref = on_output; - } - - let bridge = use_mut_ref(move || { - T::bridge({ - Rc::new(move |output| { - let on_output = on_output_ref.borrow().clone(); - on_output(output); - }) - }) - }); - - UseBridgeHandle { inner: bridge } -} - -impl Clone for UseBridgeHandle { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} diff --git a/packages/yew-agent/src/lib.rs b/packages/yew-agent/src/lib.rs index 9a68c12f2cb..160d7d16b81 100644 --- a/packages/yew-agent/src/lib.rs +++ b/packages/yew-agent/src/lib.rs @@ -1,7 +1,112 @@ //! This module contains Yew's web worker implementation. +//! +//! ## Types +//! +//! There're a couple kinds of agents: +//! +//! #### Oneshot +//! +//! A kind of agent that for each input, a single output is returned. +//! +//! #### Reactor +//! +//! A kind of agent that can send many inputs and receive many outputs over a single bridge. +//! +//! #### Worker +//! +//! The low-level implementation of agents that provides an actor model and communicates with +//! multiple bridges. +//! +//! ## Reachability +//! +//! When an agent is spawned, each agent is associated with a reachability. +//! +//! #### Private +//! +//! Each time a bridge is created, a new instance +//! of agent is spawned. This allows parallel computing between agents. +//! +//! #### Public +//! +//! Public agents are shared among all children of a provider. +//! Only 1 instance will be spawned for each public agents provider. +//! +//! ### Provider +//! +//! Each Agent requires a provider to provide communications and maintain bridges. +//! All hooks must be called within a provider. +//! +//! ## Communications with Agents +//! +//! Hooks provides means to communicate with agent instances. +//! +//! #### Bridge +//! +//! See: [`use_worker_bridge`](worker::use_worker_bridge), +//! [`use_reactor_bridge`](reactor::use_reactor_bridge) +//! +//! A bridge takes a callback to receive outputs from agents +//! and provides a handle to send inputs to agents. +//! +//! #### Subscription +//! +//! See: [`use_worker_subscription`](worker::use_worker_subscription), +//! [`use_reactor_subscription`](reactor::use_reactor_subscription) +//! +//! Similar to bridges, a subscription produces a handle to send inputs to agents. However, instead +//! of notifying the receiver with a callback, it collect all outputs into a slice. +//! +//! #### Runner +//! +//! See: [`use_oneshot_runner`](oneshot::use_oneshot_runner) +//! +//! Unlike other agents, oneshot bridges provide a `use_oneshot_runner` hook to execute oneshot +//! agents on demand. -mod hooks; +#![deny( + clippy::all, + missing_docs, + missing_debug_implementations, + bare_trait_objects, + anonymous_parameters, + elided_lifetimes_in_paths +)] + +extern crate self as yew_agent; + +pub mod oneshot; +pub mod reactor; +pub mod worker; #[doc(inline)] -pub use gloo_worker::*; -pub use hooks::{use_bridge, UseBridgeHandle}; +pub use gloo_worker::{Bincode, Codec, Registrable, Spawnable}; + +mod reach; +pub mod scope_ext; + +pub use reach::Reach; + +mod utils; + +#[doc(hidden)] +pub mod __vendored { + pub use futures; +} + +pub mod prelude { + //! Prelude module to be imported when working with `yew-agent`. + //! + //! This module re-exports the frequently used types from the crate. + pub use crate::oneshot::{oneshot, use_oneshot_runner, UseOneshotRunnerHandle}; + pub use crate::reach::Reach; + pub use crate::reactor::{ + reactor, use_reactor_bridge, use_reactor_subscription, ReactorEvent, ReactorScope, + UseReactorBridgeHandle, UseReactorSubscriptionHandle, + }; + pub use crate::scope_ext::{AgentScopeExt, ReactorBridgeHandle, WorkerBridgeHandle}; + pub use crate::worker::{ + use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle, + UseWorkerSubscriptionHandle, WorkerScope, + }; + pub use crate::{Registrable, Spawnable}; +} diff --git a/packages/yew-agent/src/oneshot/hooks.rs b/packages/yew-agent/src/oneshot/hooks.rs new file mode 100644 index 00000000000..7e6437dc56a --- /dev/null +++ b/packages/yew-agent/src/oneshot/hooks.rs @@ -0,0 +1,54 @@ +use yew::prelude::*; + +use super::provider::OneshotProviderState; +use super::Oneshot; + +/// Hook handle for [`use_oneshot_runner`] +#[derive(Debug)] +pub struct UseOneshotRunnerHandle +where + T: Oneshot + 'static, +{ + state: OneshotProviderState, +} + +impl UseOneshotRunnerHandle +where + T: Oneshot + 'static, +{ + /// Runs an oneshot agent. + pub async fn run(&self, input: T::Input) -> T::Output { + self.state.create_bridge().run(input).await + } +} + +impl Clone for UseOneshotRunnerHandle +where + T: Oneshot + 'static, +{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + } + } +} + +impl PartialEq for UseOneshotRunnerHandle +where + T: Oneshot, +{ + fn eq(&self, rhs: &Self) -> bool { + self.state == rhs.state + } +} + +/// A hook to create a runner to an oneshot agent. +#[hook] +pub fn use_oneshot_runner() -> UseOneshotRunnerHandle +where + T: Oneshot + 'static, +{ + let state = use_context::>().expect("failed to find worker context"); + + UseOneshotRunnerHandle { state } +} diff --git a/packages/yew-agent/src/oneshot/mod.rs b/packages/yew-agent/src/oneshot/mod.rs new file mode 100644 index 00000000000..d3c96bb050c --- /dev/null +++ b/packages/yew-agent/src/oneshot/mod.rs @@ -0,0 +1,12 @@ +//! This module provides task agent implementation. + +mod hooks; +mod provider; + +#[doc(inline)] +pub use gloo_worker::oneshot::{Oneshot, OneshotBridge, OneshotRegistrar, OneshotSpawner}; +pub use hooks::{use_oneshot_runner, UseOneshotRunnerHandle}; +pub use provider::OneshotProvider; +pub(crate) use provider::OneshotProviderState; +/// A procedural macro to create oneshot agents. +pub use yew_agent_macro::oneshot; diff --git a/packages/yew-agent/src/oneshot/provider.rs b/packages/yew-agent/src/oneshot/provider.rs new file mode 100644 index 00000000000..b56516669ef --- /dev/null +++ b/packages/yew-agent/src/oneshot/provider.rs @@ -0,0 +1,130 @@ +use core::fmt; +use std::any::type_name; +use std::cell::RefCell; +use std::rc::Rc; + +use serde::{Deserialize, Serialize}; +use yew::prelude::*; + +use super::{Oneshot, OneshotBridge, OneshotSpawner}; +use crate::utils::get_next_id; +use crate::worker::WorkerProviderProps; +use crate::{Bincode, Codec, Reach}; + +pub(crate) struct OneshotProviderState +where + T: Oneshot + 'static, +{ + id: usize, + spawn_bridge_fn: Rc OneshotBridge>, + reach: Reach, + held_bridge: Rc>>>, +} + +impl fmt::Debug for OneshotProviderState +where + T: Oneshot, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(type_name::()) + } +} + +impl OneshotProviderState +where + T: Oneshot, +{ + fn get_held_bridge(&self) -> OneshotBridge { + let mut held_bridge = self.held_bridge.borrow_mut(); + + match held_bridge.as_mut() { + Some(m) => m.fork(), + None => { + let bridge = (self.spawn_bridge_fn)(); + *held_bridge = Some(bridge.fork()); + bridge + } + } + } + + /// Creates a bridge, uses "fork" for public agents. + pub fn create_bridge(&self) -> OneshotBridge { + match self.reach { + Reach::Public => { + let held_bridge = self.get_held_bridge(); + held_bridge.fork() + } + Reach::Private => (self.spawn_bridge_fn)(), + } + } +} + +impl Clone for OneshotProviderState +where + T: Oneshot, +{ + fn clone(&self) -> Self { + Self { + id: self.id, + spawn_bridge_fn: self.spawn_bridge_fn.clone(), + reach: self.reach, + held_bridge: self.held_bridge.clone(), + } + } +} + +impl PartialEq for OneshotProviderState +where + T: Oneshot, +{ + fn eq(&self, rhs: &Self) -> bool { + self.id == rhs.id + } +} + +/// The Oneshot Agent Provider. +/// +/// This component provides its children access to an oneshot agent. +#[function_component] +pub fn OneshotProvider(props: &WorkerProviderProps) -> Html +where + T: Oneshot + 'static, + T::Input: Serialize + for<'de> Deserialize<'de> + 'static, + T::Output: Serialize + for<'de> Deserialize<'de> + 'static, + C: Codec + 'static, +{ + let WorkerProviderProps { + children, + path, + lazy, + reach, + } = props.clone(); + + // Creates a spawning function so Codec is can be erased from contexts. + let spawn_bridge_fn: Rc OneshotBridge> = { + let path = path.clone(); + Rc::new(move || OneshotSpawner::::new().encoding::().spawn(&path)) + }; + + let state = { + use_memo((path, lazy, reach), move |(_path, lazy, reach)| { + let state = OneshotProviderState:: { + id: get_next_id(), + spawn_bridge_fn, + reach: *reach, + held_bridge: Rc::default(), + }; + + if *reach == Reach::Public && !*lazy { + state.get_held_bridge(); + } + state + }) + }; + + html! { + > context={(*state).clone()}> + {children} + >> + } +} diff --git a/packages/yew-agent/src/reach.rs b/packages/yew-agent/src/reach.rs new file mode 100644 index 00000000000..1bab773a503 --- /dev/null +++ b/packages/yew-agent/src/reach.rs @@ -0,0 +1,8 @@ +/// The reachability of an agent. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Copy)] +pub enum Reach { + /// Public Reachability. + Public, + /// Private Reachability. + Private, +} diff --git a/packages/yew-agent/src/reactor/hooks.rs b/packages/yew-agent/src/reactor/hooks.rs new file mode 100644 index 00000000000..975c11abb24 --- /dev/null +++ b/packages/yew-agent/src/reactor/hooks.rs @@ -0,0 +1,280 @@ +use std::any::type_name; +use std::fmt; +use std::ops::Deref; +use std::rc::Rc; + +use futures::sink::SinkExt; +use futures::stream::{SplitSink, StreamExt}; +use wasm_bindgen::UnwrapThrowExt; +use yew::platform::pinned::RwLock; +use yew::platform::spawn_local; +use yew::prelude::*; + +use super::provider::ReactorProviderState; +use super::{Reactor, ReactorBridge, ReactorScoped}; +use crate::utils::{BridgeIdState, OutputsAction, OutputsState}; + +type ReactorTx = + Rc, <::Scope as ReactorScoped>::Input>>>; + +/// A type that represents events from a reactor. +pub enum ReactorEvent +where + R: Reactor, +{ + /// The reactor agent has sent an output. + Output(::Output), + /// The reactor agent has exited. + Finished, +} + +impl fmt::Debug for ReactorEvent +where + R: Reactor, + ::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(), + Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(), + } + } +} + +/// Hook handle for the [`use_reactor_bridge`] hook. +pub struct UseReactorBridgeHandle +where + R: 'static + Reactor, +{ + tx: ReactorTx, + ctr: UseReducerDispatcher, +} + +impl fmt::Debug for UseReactorBridgeHandle +where + R: 'static + Reactor, + ::Input: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("inner", &self.tx) + .finish() + } +} + +impl Clone for UseReactorBridgeHandle +where + R: 'static + Reactor, +{ + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + ctr: self.ctr.clone(), + } + } +} + +impl UseReactorBridgeHandle +where + R: 'static + Reactor, +{ + /// Send an input to a reactor agent. + pub fn send(&self, msg: ::Input) { + let tx = self.tx.clone(); + spawn_local(async move { + let mut tx = tx.write().await; + let _ = tx.send(msg).await; + }); + } + + /// Reset the bridge. + /// + /// Disconnect the old bridge and re-connects the agent with a new bridge. + pub fn reset(&self) { + self.ctr.dispatch(()); + } +} + +impl PartialEq for UseReactorBridgeHandle +where + R: 'static + Reactor, +{ + fn eq(&self, rhs: &Self) -> bool { + self.ctr == rhs.ctr + } +} + +/// A hook to bridge to a [`Reactor`]. +/// +/// This hooks will only bridge the reactor once over the entire component lifecycle. +/// +/// Takes a callback as the argument. +/// +/// The callback will be updated on every render to make sure captured values (if any) are up to +/// date. +#[hook] +pub fn use_reactor_bridge(on_output: F) -> UseReactorBridgeHandle +where + R: 'static + Reactor, + F: Fn(ReactorEvent) + 'static, +{ + let ctr = use_reducer(BridgeIdState::default); + + let worker_state = use_context::>() + .expect_throw("cannot find a provider for current agent."); + + let on_output = Rc::new(on_output); + + let on_output_ref = { + let on_output = on_output.clone(); + use_mut_ref(move || on_output) + }; + + // Refresh the callback on every render. + { + let mut on_output_ref = on_output_ref.borrow_mut(); + *on_output_ref = on_output; + } + + let tx = use_memo((worker_state, ctr.inner), |(state, _ctr)| { + let bridge = state.create_bridge(); + + let (tx, mut rx) = bridge.split(); + + spawn_local(async move { + while let Some(m) = rx.next().await { + let on_output = on_output_ref.borrow().clone(); + on_output(ReactorEvent::::Output(m)); + } + + let on_output = on_output_ref.borrow().clone(); + on_output(ReactorEvent::::Finished); + }); + + RwLock::new(tx) + }); + + UseReactorBridgeHandle { + tx: tx.clone(), + ctr: ctr.dispatcher(), + } +} + +/// Hook handle for the [`use_reactor_subscription`] hook. +pub struct UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + bridge: UseReactorBridgeHandle, + outputs: Vec::Output>>, + finished: bool, + ctr: usize, +} + +impl UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + /// Send an input to a reactor agent. + pub fn send(&self, msg: ::Input) { + self.bridge.send(msg); + } + + /// Returns whether the current bridge has received a finish message. + pub fn finished(&self) -> bool { + self.finished + } + + /// Reset the subscription. + /// + /// This disconnects the old bridge and re-connects the agent with a new bridge. + /// Existing outputs stored in the subscription will also be cleared. + pub fn reset(&self) { + self.bridge.reset(); + } +} + +impl Clone for UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + fn clone(&self) -> Self { + Self { + bridge: self.bridge.clone(), + outputs: self.outputs.clone(), + ctr: self.ctr, + finished: self.finished, + } + } +} + +impl fmt::Debug for UseReactorSubscriptionHandle +where + R: 'static + Reactor, + ::Input: fmt::Debug, + ::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("bridge", &self.bridge) + .field("outputs", &self.outputs) + .finish() + } +} + +impl Deref for UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + type Target = [Rc<::Output>]; + + fn deref(&self) -> &Self::Target { + &self.outputs + } +} + +impl PartialEq for UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + fn eq(&self, rhs: &Self) -> bool { + self.bridge == rhs.bridge && self.ctr == rhs.ctr + } +} + +/// A hook to subscribe to the outputs of a [Reactor] agent. +/// +/// All outputs sent to current bridge will be collected into a slice. +#[hook] +pub fn use_reactor_subscription() -> UseReactorSubscriptionHandle +where + R: 'static + Reactor, +{ + let outputs = use_reducer(OutputsState::<::Output>::default); + + let bridge = { + let outputs = outputs.clone(); + use_reactor_bridge::(move |output| { + outputs.dispatch(match output { + ReactorEvent::Output(m) => OutputsAction::Push(m.into()), + ReactorEvent::Finished => OutputsAction::Close, + }) + }) + }; + + { + let outputs = outputs.clone(); + use_effect_with(bridge.clone(), move |_| { + outputs.dispatch(OutputsAction::Reset); + + || {} + }); + } + + UseReactorSubscriptionHandle { + bridge, + outputs: outputs.inner.clone(), + ctr: outputs.ctr, + finished: outputs.closed, + } +} diff --git a/packages/yew-agent/src/reactor/mod.rs b/packages/yew-agent/src/reactor/mod.rs new file mode 100644 index 00000000000..cd97555abb8 --- /dev/null +++ b/packages/yew-agent/src/reactor/mod.rs @@ -0,0 +1,53 @@ +//! This module contains the reactor agent implementation. +//! +//! Reactor agents are agents that receive multiple inputs and send multiple outputs over a single +//! bridge. A reactor is defined as an async function that takes a [ReactorScope] +//! as the argument. +//! +//! The reactor scope is a stream that produces inputs from the bridge and a +//! sink that implements an additional send method to send outputs to the connected bridge. +//! When the bridge disconnects, the output stream and input sink will be closed. +//! +//! # Example +//! +//! ``` +//! # use serde::{Serialize, Deserialize}; +//! # #[derive(Serialize, Deserialize)] +//! # pub struct ReactorInput {} +//! # #[derive(Serialize, Deserialize)] +//! # pub struct ReactorOutput {} +//! # +//! use futures::sink::SinkExt; +//! use futures::stream::StreamExt; +//! use yew_agent::reactor::{reactor, ReactorScope}; +//! #[reactor(MyReactor)] +//! pub async fn my_reactor(mut scope: ReactorScope) { +//! while let Some(input) = scope.next().await { +//! // handles each input. +//! // ... +//! # let output = ReactorOutput { /* ... */ }; +//! +//! // sends output +//! if scope.send(output).await.is_err() { +//! // sender closed, the bridge is disconnected +//! break; +//! } +//! } +//! } +//! ``` + +mod hooks; +mod provider; + +#[doc(inline)] +pub use gloo_worker::reactor::{ + Reactor, ReactorBridge, ReactorRegistrar, ReactorScope, ReactorScoped, ReactorSpawner, +}; +pub use hooks::{ + use_reactor_bridge, use_reactor_subscription, ReactorEvent, UseReactorBridgeHandle, + UseReactorSubscriptionHandle, +}; +pub use provider::ReactorProvider; +pub(crate) use provider::ReactorProviderState; +/// A procedural macro to create reactor agents. +pub use yew_agent_macro::reactor; diff --git a/packages/yew-agent/src/reactor/provider.rs b/packages/yew-agent/src/reactor/provider.rs new file mode 100644 index 00000000000..03ec9008ac2 --- /dev/null +++ b/packages/yew-agent/src/reactor/provider.rs @@ -0,0 +1,133 @@ +use std::any::type_name; +use std::cell::RefCell; +use std::fmt; +use std::rc::Rc; + +use gloo_worker::reactor::ReactorScoped; +use serde::{Deserialize, Serialize}; +use yew::prelude::*; + +use super::{Reactor, ReactorBridge, ReactorSpawner}; +use crate::utils::get_next_id; +use crate::worker::WorkerProviderProps; +use crate::{Bincode, Codec, Reach}; + +pub(crate) struct ReactorProviderState +where + T: Reactor + 'static, +{ + id: usize, + spawn_bridge_fn: Rc ReactorBridge>, + reach: Reach, + held_bridge: Rc>>>, +} + +impl fmt::Debug for ReactorProviderState +where + T: Reactor, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(type_name::()) + } +} + +impl ReactorProviderState +where + T: Reactor, +{ + fn get_held_bridge(&self) -> ReactorBridge { + let mut held_bridge = self.held_bridge.borrow_mut(); + + match held_bridge.as_mut() { + Some(m) => m.fork(), + None => { + let bridge = (self.spawn_bridge_fn)(); + *held_bridge = Some(bridge.fork()); + bridge + } + } + } + + /// Creates a bridge, uses "fork" for public agents. + pub fn create_bridge(&self) -> ReactorBridge { + match self.reach { + Reach::Public => { + let held_bridge = self.get_held_bridge(); + held_bridge.fork() + } + Reach::Private => (self.spawn_bridge_fn)(), + } + } +} + +impl Clone for ReactorProviderState +where + T: Reactor, +{ + fn clone(&self) -> Self { + Self { + id: self.id, + spawn_bridge_fn: self.spawn_bridge_fn.clone(), + reach: self.reach, + held_bridge: self.held_bridge.clone(), + } + } +} + +impl PartialEq for ReactorProviderState +where + T: Reactor, +{ + fn eq(&self, rhs: &Self) -> bool { + self.id == rhs.id + } +} + +/// The Reactor Agent Provider. +/// +/// This component provides its children access to a reactor agent. +#[function_component] +pub fn ReactorProvider(props: &WorkerProviderProps) -> Html +where + R: 'static + Reactor, + <::Scope as ReactorScoped>::Input: + Serialize + for<'de> Deserialize<'de> + 'static, + <::Scope as ReactorScoped>::Output: + Serialize + for<'de> Deserialize<'de> + 'static, + C: Codec + 'static, +{ + let WorkerProviderProps { + children, + path, + lazy, + reach, + } = props.clone(); + + // Creates a spawning function so Codec is can be erased from contexts. + let spawn_bridge_fn: Rc ReactorBridge> = { + let path = path.clone(); + Rc::new(move || ReactorSpawner::::new().encoding::().spawn(&path)) + }; + + let state = { + use_memo((path, lazy, reach), move |(_path, lazy, reach)| { + let state = ReactorProviderState:: { + id: get_next_id(), + spawn_bridge_fn, + reach: *reach, + held_bridge: Rc::default(), + }; + + if *reach == Reach::Public && !*lazy { + state.get_held_bridge(); + } + state + }) + }; + + html! { + > context={(*state).clone()}> + {children} + >> + } +} diff --git a/packages/yew-agent/src/scope_ext.rs b/packages/yew-agent/src/scope_ext.rs new file mode 100644 index 00000000000..6120062f9c6 --- /dev/null +++ b/packages/yew-agent/src/scope_ext.rs @@ -0,0 +1,145 @@ +//! This module contains extensions to the component scope for agent access. + +use std::any::type_name; +use std::fmt; +use std::rc::Rc; + +use futures::stream::SplitSink; +use futures::{SinkExt, StreamExt}; +use wasm_bindgen::UnwrapThrowExt; +use yew::html::Scope; +use yew::platform::pinned::RwLock; +use yew::platform::spawn_local; +use yew::prelude::*; + +use crate::oneshot::{Oneshot, OneshotProviderState}; +use crate::reactor::{Reactor, ReactorBridge, ReactorEvent, ReactorProviderState, ReactorScoped}; +use crate::worker::{Worker, WorkerBridge, WorkerProviderState}; + +/// A Worker Bridge Handle. +#[derive(Debug)] +pub struct WorkerBridgeHandle +where + W: Worker, +{ + inner: WorkerBridge, +} + +impl WorkerBridgeHandle +where + W: Worker, +{ + /// Sends a message to the worker agent. + pub fn send(&self, input: W::Input) { + self.inner.send(input) + } +} + +type ReactorTx = + Rc, <::Scope as ReactorScoped>::Input>>>; + +/// A Reactor Bridge Handle. +pub struct ReactorBridgeHandle +where + R: Reactor + 'static, +{ + tx: ReactorTx, +} + +impl fmt::Debug for ReactorBridgeHandle +where + R: Reactor + 'static, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()).finish_non_exhaustive() + } +} + +impl ReactorBridgeHandle +where + R: Reactor + 'static, +{ + /// Sends a message to the reactor agent. + pub fn send(&self, input: ::Input) { + let tx = self.tx.clone(); + spawn_local(async move { + let mut tx = tx.write().await; + let _ = tx.send(input).await; + }); + } +} + +/// An extension to [`Scope`](yew::html::Scope) that provides communication mechanism to agents. +/// +/// You can access them on `ctx.link()` +pub trait AgentScopeExt { + /// Bridges to a Worker Agent. + fn bridge_worker(&self, callback: Callback) -> WorkerBridgeHandle + where + W: Worker + 'static; + + /// Bridges to a Reactor Agent. + fn bridge_reactor(&self, callback: Callback>) -> ReactorBridgeHandle + where + R: Reactor + 'static, + ::Output: 'static; + + /// Runs an oneshot in an Oneshot Agent. + fn run_oneshot(&self, input: T::Input, callback: Callback) + where + T: Oneshot + 'static; +} + +impl AgentScopeExt for Scope +where + COMP: Component, +{ + fn bridge_worker(&self, callback: Callback) -> WorkerBridgeHandle + where + W: Worker + 'static, + { + let inner = self + .context::>((|_| {}).into()) + .expect_throw("failed to bridge to agent.") + .0 + .create_bridge(callback); + + WorkerBridgeHandle { inner } + } + + fn bridge_reactor(&self, callback: Callback>) -> ReactorBridgeHandle + where + R: Reactor + 'static, + ::Output: 'static, + { + let (tx, mut rx) = self + .context::>((|_| {}).into()) + .expect_throw("failed to bridge to agent.") + .0 + .create_bridge() + .split(); + + spawn_local(async move { + while let Some(m) = rx.next().await { + callback.emit(ReactorEvent::::Output(m)); + } + + callback.emit(ReactorEvent::::Finished); + }); + + let tx = Rc::new(RwLock::new(tx)); + + ReactorBridgeHandle { tx } + } + + fn run_oneshot(&self, input: T::Input, callback: Callback) + where + T: Oneshot + 'static, + { + let (inner, _) = self + .context::>((|_| {}).into()) + .expect_throw("failed to bridge to agent."); + + spawn_local(async move { callback.emit(inner.create_bridge().run(input).await) }); + } +} diff --git a/packages/yew-agent/src/utils.rs b/packages/yew-agent/src/utils.rs new file mode 100644 index 00000000000..5dfefb020df --- /dev/null +++ b/packages/yew-agent/src/utils.rs @@ -0,0 +1,83 @@ +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use yew::Reducible; + +/// Gets a unique worker id +pub(crate) fn get_next_id() -> usize { + static CTR: AtomicUsize = AtomicUsize::new(0); + + CTR.fetch_add(1, Ordering::SeqCst) +} + +#[derive(Default, PartialEq)] +pub(crate) struct BridgeIdState { + pub inner: usize, +} + +impl Reducible for BridgeIdState { + type Action = (); + + fn reduce(self: Rc, _: Self::Action) -> Rc { + Self { + inner: self.inner + 1, + } + .into() + } +} + +pub(crate) enum OutputsAction { + Push(Rc), + Close, + Reset, +} + +pub(crate) struct OutputsState { + pub ctr: usize, + pub inner: Vec>, + pub closed: bool, +} + +impl Clone for OutputsState { + fn clone(&self) -> Self { + Self { + ctr: self.ctr, + inner: self.inner.clone(), + closed: self.closed, + } + } +} + +impl Reducible for OutputsState { + type Action = OutputsAction; + + fn reduce(mut self: Rc, action: Self::Action) -> Rc { + { + let this = Rc::make_mut(&mut self); + this.ctr += 1; + + match action { + OutputsAction::Push(m) => this.inner.push(m), + OutputsAction::Close => { + this.closed = true; + } + OutputsAction::Reset => { + this.closed = false; + this.inner = Vec::new(); + } + } + } + + self + } +} + +impl Default for OutputsState { + fn default() -> Self { + Self { + ctr: 0, + inner: Vec::new(), + closed: false, + } + } +} diff --git a/packages/yew-agent/src/worker/hooks.rs b/packages/yew-agent/src/worker/hooks.rs new file mode 100644 index 00000000000..19745e64dfd --- /dev/null +++ b/packages/yew-agent/src/worker/hooks.rs @@ -0,0 +1,219 @@ +use std::any::type_name; +use std::fmt; +use std::ops::Deref; +use std::rc::Rc; + +use wasm_bindgen::prelude::*; +use yew::prelude::*; + +use crate::utils::{BridgeIdState, OutputsAction, OutputsState}; +use crate::worker::provider::WorkerProviderState; +use crate::worker::{Worker, WorkerBridge}; + +/// Hook handle for the [`use_worker_bridge`] hook. +pub struct UseWorkerBridgeHandle +where + T: Worker, +{ + inner: WorkerBridge, + ctr: UseReducerDispatcher, +} + +impl UseWorkerBridgeHandle +where + T: Worker, +{ + /// Send an input to a worker agent. + pub fn send(&self, msg: T::Input) { + self.inner.send(msg); + } + + /// Reset the bridge. + /// + /// Disconnect the old bridge and re-connects the agent with a new bridge. + pub fn reset(&self) { + self.ctr.dispatch(()); + } +} + +impl Clone for UseWorkerBridgeHandle +where + T: Worker, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + ctr: self.ctr.clone(), + } + } +} + +impl fmt::Debug for UseWorkerBridgeHandle +where + T: Worker, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("inner", &self.inner) + .finish() + } +} + +impl PartialEq for UseWorkerBridgeHandle +where + T: Worker, +{ + fn eq(&self, rhs: &Self) -> bool { + self.inner == rhs.inner + } +} + +/// A hook to bridge to a [`Worker`]. +/// +/// This hooks will only bridge the worker once over the entire component lifecycle. +/// +/// Takes a callback as the argument. +/// +/// The callback will be updated on every render to make sure captured values (if any) are up to +/// date. +#[hook] +pub fn use_worker_bridge(on_output: F) -> UseWorkerBridgeHandle +where + T: Worker + 'static, + F: Fn(T::Output) + 'static, +{ + let ctr = use_reducer(BridgeIdState::default); + + let worker_state = use_context::>() + .expect_throw("cannot find a provider for current agent."); + + let on_output = Rc::new(on_output); + + let on_output_clone = on_output.clone(); + let on_output_ref = use_mut_ref(move || on_output_clone); + + // Refresh the callback on every render. + { + let mut on_output_ref = on_output_ref.borrow_mut(); + *on_output_ref = on_output; + } + + let bridge = use_memo((worker_state, ctr.inner), |(state, _ctr)| { + state.create_bridge(Callback::from(move |output| { + let on_output = on_output_ref.borrow().clone(); + on_output(output); + })) + }); + + UseWorkerBridgeHandle { + inner: (*bridge).clone(), + ctr: ctr.dispatcher(), + } +} + +/// Hook handle for the [`use_worker_subscription`] hook. +pub struct UseWorkerSubscriptionHandle +where + T: Worker, +{ + bridge: UseWorkerBridgeHandle, + outputs: Vec>, + ctr: usize, +} + +impl UseWorkerSubscriptionHandle +where + T: Worker, +{ + /// Send an input to a worker agent. + pub fn send(&self, msg: T::Input) { + self.bridge.send(msg); + } + + /// Reset the subscription. + /// + /// This disconnects the old bridge and re-connects the agent with a new bridge. + /// Existing outputs stored in the subscription will also be cleared. + pub fn reset(&self) { + self.bridge.reset(); + } +} + +impl Clone for UseWorkerSubscriptionHandle +where + T: Worker, +{ + fn clone(&self) -> Self { + Self { + bridge: self.bridge.clone(), + outputs: self.outputs.clone(), + ctr: self.ctr, + } + } +} + +impl fmt::Debug for UseWorkerSubscriptionHandle +where + T: Worker, + T::Output: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("bridge", &self.bridge) + .field("outputs", &self.outputs) + .finish() + } +} + +impl Deref for UseWorkerSubscriptionHandle +where + T: Worker, +{ + type Target = [Rc]; + + fn deref(&self) -> &[Rc] { + &self.outputs + } +} + +impl PartialEq for UseWorkerSubscriptionHandle +where + T: Worker, +{ + fn eq(&self, rhs: &Self) -> bool { + self.bridge == rhs.bridge && self.ctr == rhs.ctr + } +} + +/// A hook to subscribe to the outputs of a [Worker] agent. +/// +/// All outputs sent to current bridge will be collected into a slice. +#[hook] +pub fn use_worker_subscription() -> UseWorkerSubscriptionHandle +where + T: Worker + 'static, +{ + let outputs = use_reducer(OutputsState::default); + + let bridge = { + let outputs = outputs.clone(); + use_worker_bridge::(move |output| { + outputs.dispatch(OutputsAction::Push(Rc::new(output))) + }) + }; + + { + let outputs_dispatcher = outputs.dispatcher(); + use_effect_with(bridge.clone(), move |_| { + outputs_dispatcher.dispatch(OutputsAction::Reset); + + || {} + }); + } + + UseWorkerSubscriptionHandle { + bridge, + outputs: outputs.inner.clone(), + ctr: outputs.ctr, + } +} diff --git a/packages/yew-agent/src/worker/mod.rs b/packages/yew-agent/src/worker/mod.rs new file mode 100644 index 00000000000..61603c62781 --- /dev/null +++ b/packages/yew-agent/src/worker/mod.rs @@ -0,0 +1,77 @@ +//! This module contains the worker agent implementation. +//! +//! This is a low-level implementation that uses an actor model. +//! +//! # Example +//! +//! ``` +//! # mod example { +//! use serde::{Deserialize, Serialize}; +//! use yew::prelude::*; +//! use yew_agent::worker::{use_worker_bridge, UseWorkerBridgeHandle}; +//! +//! // This would usually live in the same file as your worker +//! #[derive(Serialize, Deserialize)] +//! pub enum WorkerResponseType { +//! IncrementCounter, +//! } +//! # mod my_worker_mod { +//! # use yew_agent::worker::{HandlerId, WorkerScope}; +//! # use super::WorkerResponseType; +//! # pub struct MyWorker {} +//! # +//! # impl yew_agent::worker::Worker for MyWorker { +//! # type Input = (); +//! # type Output = WorkerResponseType; +//! # type Message = (); +//! # +//! # fn create(scope: &WorkerScope) -> Self { +//! # MyWorker {} +//! # } +//! # +//! # fn update(&mut self, scope: &WorkerScope, _msg: Self::Message) { +//! # // do nothing +//! # } +//! # +//! # fn received(&mut self, scope: &WorkerScope, _msg: Self::Input, id: HandlerId) { +//! # scope.respond(id, WorkerResponseType::IncrementCounter); +//! # } +//! # } +//! # } +//! use my_worker_mod::MyWorker; // note that ::Output == WorkerResponseType +//! #[function_component(UseWorkerBridge)] +//! fn bridge() -> Html { +//! let counter = use_state(|| 0); +//! +//! // a scoped block to clone the state in +//! { +//! let counter = counter.clone(); +//! // response will be of type MyWorker::Output, i.e. WorkerResponseType +//! let bridge: UseWorkerBridgeHandle = use_worker_bridge(move |response| match response { +//! WorkerResponseType::IncrementCounter => { +//! counter.set(*counter + 1); +//! } +//! }); +//! } +//! +//! html! { +//!
+//! {*counter} +//!
+//! } +//! } +//! # } +//! ``` + +mod hooks; +mod provider; + +#[doc(inline)] +pub use gloo_worker::{ + HandlerId, Worker, WorkerBridge, WorkerDestroyHandle, WorkerRegistrar, WorkerScope, +}; +pub use hooks::{ + use_worker_bridge, use_worker_subscription, UseWorkerBridgeHandle, UseWorkerSubscriptionHandle, +}; +pub(crate) use provider::WorkerProviderState; +pub use provider::{WorkerProvider, WorkerProviderProps}; diff --git a/packages/yew-agent/src/worker/provider.rs b/packages/yew-agent/src/worker/provider.rs new file mode 100644 index 00000000000..b452729d027 --- /dev/null +++ b/packages/yew-agent/src/worker/provider.rs @@ -0,0 +1,159 @@ +use std::any::type_name; +use std::cell::RefCell; +use std::fmt; +use std::rc::Rc; + +use gloo_worker::Spawnable; +use serde::{Deserialize, Serialize}; +use yew::prelude::*; + +use super::{Worker, WorkerBridge}; +use crate::reach::Reach; +use crate::utils::get_next_id; +use crate::{Bincode, Codec}; + +/// Properties for [WorkerProvider]. +#[derive(Debug, Properties, PartialEq, Clone)] +pub struct WorkerProviderProps { + /// The path to an agent. + pub path: AttrValue, + + /// The reachability of an agent. + /// + /// Default: [`Public`](Reach::Public). + #[prop_or(Reach::Public)] + pub reach: Reach, + + /// Lazily spawn the agent. + /// + /// The agent will be spawned when the first time a hook requests a bridge. + /// + /// Does not affect private agents. + /// + /// Default: `true` + #[prop_or(true)] + pub lazy: bool, + + /// Children of the provider. + #[prop_or_default] + pub children: Html, +} + +pub(crate) struct WorkerProviderState +where + W: Worker, +{ + id: usize, + spawn_bridge_fn: Rc WorkerBridge>, + reach: Reach, + held_bridge: Rc>>>, +} + +impl fmt::Debug for WorkerProviderState +where + W: Worker, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(type_name::()) + } +} + +impl WorkerProviderState +where + W: Worker, + W::Output: 'static, +{ + fn get_held_bridge(&self) -> WorkerBridge { + let mut held_bridge = self.held_bridge.borrow_mut(); + + match held_bridge.as_mut() { + Some(m) => m.clone(), + None => { + let bridge = (self.spawn_bridge_fn)(); + *held_bridge = Some(bridge.clone()); + bridge + } + } + } + + /// Creates a bridge, uses "fork" for public agents. + pub fn create_bridge(&self, cb: Callback) -> WorkerBridge { + match self.reach { + Reach::Public => { + let held_bridge = self.get_held_bridge(); + held_bridge.fork(Some(move |m| cb.emit(m))) + } + Reach::Private => (self.spawn_bridge_fn)(), + } + } +} + +impl Clone for WorkerProviderState +where + W: Worker, +{ + fn clone(&self) -> Self { + Self { + id: self.id, + spawn_bridge_fn: self.spawn_bridge_fn.clone(), + reach: self.reach, + held_bridge: self.held_bridge.clone(), + } + } +} + +impl PartialEq for WorkerProviderState +where + W: Worker, +{ + fn eq(&self, rhs: &Self) -> bool { + self.id == rhs.id + } +} + +/// The Worker Agent Provider. +/// +/// This component provides its children access to a worker agent. +#[function_component] +pub fn WorkerProvider(props: &WorkerProviderProps) -> Html +where + W: Worker + 'static, + W::Input: Serialize + for<'de> Deserialize<'de> + 'static, + W::Output: Serialize + for<'de> Deserialize<'de> + 'static, + C: Codec + 'static, +{ + let WorkerProviderProps { + children, + path, + lazy, + reach, + } = props.clone(); + + // Creates a spawning function so Codec is can be erased from contexts. + let spawn_bridge_fn: Rc WorkerBridge> = { + let path = path.clone(); + Rc::new(move || W::spawner().encoding::().spawn(&path)) + }; + + let state = { + use_memo((path, lazy, reach), move |(_path, lazy, reach)| { + let state = WorkerProviderState:: { + id: get_next_id(), + spawn_bridge_fn, + reach: *reach, + held_bridge: Rc::default(), + }; + + if *reach == Reach::Public && !*lazy { + state.get_held_bridge(); + } + state + }) + }; + + html! { + > context={(*state).clone()}> + {children} + >> + } +}