diff --git a/src/agent.rs b/src/agent.rs index 6abac09ef22..67184f820ba 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -2,12 +2,14 @@ use crate::callback::Callback; use crate::scheduler::{scheduler, Runnable, Shared}; -use anymap::{AnyMap, Entry}; +use anymap::{self, AnyMap}; use bincode; use log::warn; use serde::{Deserialize, Serialize}; use slab::Slab; +use std::any::TypeId; use std::cell::RefCell; +use std::collections::{hash_map, HashMap, HashSet}; use std::fmt; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; @@ -265,11 +267,11 @@ impl Discoverer for Context { let mut scope_to_init = None; let bridge = LOCAL_AGENTS_POOL.with(|pool| { match pool.borrow_mut().entry::>() { - Entry::Occupied(mut entry) => { + anymap::Entry::Occupied(mut entry) => { // TODO Insert callback! entry.get_mut().create_bridge(callback) } - Entry::Vacant(entry) => { + anymap::Entry::Vacant(entry) => { let scope = AgentScope::::new(); let launched = LocalAgent::new(&scope); let responder = SlabResponder { @@ -474,11 +476,8 @@ struct RemoteAgent { } impl RemoteAgent { - pub fn new(worker: &Value, slab: SharedOutputSlab) -> Self { - RemoteAgent { - worker: worker.clone(), - slab, - } + pub fn new(worker: Value, slab: SharedOutputSlab) -> Self { + RemoteAgent { worker, slab } } fn create_bridge(&mut self, callback: Option>) -> PublicBridge { @@ -501,6 +500,8 @@ impl RemoteAgent { thread_local! { static REMOTE_AGENTS_POOL: RefCell = RefCell::new(AnyMap::new()); + static REMOTE_AGENTS_LOADED: RefCell> = RefCell::new(HashSet::new()); + static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell>>> = RefCell::new(HashMap::new()); } /// Create a single instance in a tab. @@ -510,23 +511,37 @@ impl Discoverer for Public { fn spawn_or_join(callback: Option>) -> Box> { let bridge = REMOTE_AGENTS_POOL.with(|pool| { match pool.borrow_mut().entry::>() { - Entry::Occupied(mut entry) => { + anymap::Entry::Occupied(mut entry) => { // TODO Insert callback! entry.get_mut().create_bridge(callback) } - Entry::Vacant(entry) => { - let slab_base: Shared>>> = + anymap::Entry::Vacant(entry) => { + let slab: Shared>>> = Rc::new(RefCell::new(Slab::new())); - let slab = slab_base.clone(); - let handler = move |data: Vec| { - let msg = FromWorker::::unpack(&data); - match msg { - FromWorker::WorkerLoaded => { - // TODO Use `AtomicBool` lock to check its loaded - // TODO Send `Connected` message - } - FromWorker::ProcessOutput(id, output) => { - locate_callback_and_respond::(&slab, id, output); + let handler = { + let slab = slab.clone(); + move |data: Vec, worker: Value| { + let msg = FromWorker::::unpack(&data); + match msg { + FromWorker::WorkerLoaded => { + // TODO Send `Connected` message + let _ = REMOTE_AGENTS_LOADED.with(|local| { + local.borrow_mut().insert(TypeId::of::()) + }); + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| { + if let Some(msgs) = + local.borrow_mut().get_mut(&TypeId::of::()) + { + for msg in msgs.drain(..) { + let worker = &worker; + js! {@{worker}.postMessage(@{msg});}; + } + } + }); + } + FromWorker::ProcessOutput(id, output) => { + locate_callback_and_respond::(&slab, id, output); + } } } }; @@ -535,11 +550,11 @@ impl Discoverer for Public { var worker = new Worker(@{name_of_resource}); var handler = @{handler}; worker.onmessage = function(event) { - handler(event.data); + handler(event.data, worker); }; return worker; }; - let launched = RemoteAgent::new(&worker, slab_base); + let launched = RemoteAgent::new(worker, slab); entry.insert(launched).create_bridge(callback) } } @@ -557,6 +572,29 @@ pub struct PublicBridge { _agent: PhantomData, } +impl PublicBridge { + fn worker_is_loaded(&self) -> bool { + REMOTE_AGENTS_LOADED.with(|local| local.borrow().contains(&TypeId::of::())) + } + + fn msg_to_queue(&self, msg: Vec) { + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| { + match local.borrow_mut().entry(TypeId::of::()) { + hash_map::Entry::Vacant(record) => { + record.insert({ + let mut v = Vec::new(); + v.push(msg); + v + }); + } + hash_map::Entry::Occupied(ref mut record) => { + record.get_mut().push(msg); + } + } + }); + } +} + fn send_to_remote(worker: &Value, msg: ToWorker) { // TODO Important! Implement. // Use a queue to collect a messages if an instance is not ready @@ -572,7 +610,12 @@ fn send_to_remote(worker: &Value, msg: ToWorker) { impl Bridge for PublicBridge { fn send(&mut self, msg: AGN::Input) { let msg = ToWorker::ProcessInput(self.id, msg); - send_to_remote::(&self.worker, msg); + if self.worker_is_loaded() { + send_to_remote::(&self.worker, msg); + } else { + let msg = msg.pack(); + self.msg_to_queue(msg); + } } } @@ -592,6 +635,12 @@ impl Drop for PublicBridge { let upd = ToWorker::Destroy; send_to_remote::(&self.worker, upd); pool.borrow_mut().remove::>(); + REMOTE_AGENTS_LOADED.with(|pool| { + pool.borrow_mut().remove(&TypeId::of::()); + }); + REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|pool| { + pool.borrow_mut().remove(&TypeId::of::()); + }); } }); }