Skip to content

Commit

Permalink
Early msgs queue for Public worker (yewstack#596)
Browse files Browse the repository at this point in the history
* Early msgs queue for Public worker

* Early msgs queue for Public worker

* update (#1)

* Fix typo

* Require fmt in travis script

* Apply cargo format to all modules

* ??

* Merge (yewstack#2)

* Fix typo

* Require fmt in travis script

* Apply cargo format to all modules

* avoid allocating in diff_classes

* avoid allocating for diff_kind

* avoid allocating for diff_value

* simplify diff_attributes and avoid allocations

* return iterator for diff_classes and diff_attributes

* rustfmt on vtags

* clean apply_diff

* more cleaning

* apply suggestions

* Update proc-macro2, syn and quote to 1.0

CLOSES yewstack#590

* Fixed typo

* Add support for optional callbacks to component properties

* Add tests for a component with optional callback

* Fix typo

* Add `Classes` to prelude

* binary ser/de issue fix

* merge (yewstack#3)

* Fix typo

* Require fmt in travis script

* Apply cargo format to all modules

* avoid allocating in diff_classes

* avoid allocating for diff_kind

* avoid allocating for diff_value

* simplify diff_attributes and avoid allocations

* return iterator for diff_classes and diff_attributes

* rustfmt on vtags

* clean apply_diff

* more cleaning

* apply suggestions

* Update proc-macro2, syn and quote to 1.0

CLOSES yewstack#590

* Fixed typo

* Add support for optional callbacks to component properties

* Add tests for a component with optional callback

* Fix typo

* Add `Classes` to prelude

* added bincode type for data ser de

* fixed Into func

* Update (yewstack#5)

* in yewstack org

* Initial implementation using an iterator adaptor to provide a coherent struct to implement Into<VNode> (via From<>) for

* update large table example to demonstrate new .html() method instead of 'for'

* ran cargo fmt

* Add a section for project templates to the README

* Change org to YewStack

* Implement FromIterator instead of wrapping iterator

* remove dead code

* ran fmt

* Add extend method to Classes

* change to union

* renamed union back to extend

* removed unused import of RangeFull

* update

* Fix touch events (yewstack#656)

* Update changelog for v0.9 release (yewstack#657)

* Implement Debug for ChildRenderer<T> (yewstack#655)

* Implement Debug for ChildRenderer<T>

* fix formatter type lifetime

* remove fmt

* cargo fmt

* Emit initial route to router subscribers (yewstack#634)

* Fix typo in RenderService (yewstack#658)

* Add From<&String> for Classes implementation

* @jstarry feedback

- cargo fmt
- rename DEDICATED_WORKERS_* to REMOTE_AGENTS_*
- remove unrelated changes

* TypeId ask key instead &str

* Remove .gitignore changes

* Update agent.rs

* Update agent.rs

* Fix merge conflict
  • Loading branch information
serzhiio authored and llebout committed Jan 20, 2020
1 parent 437b77e commit d47c338
Showing 1 changed file with 73 additions and 24 deletions.
97 changes: 73 additions & 24 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use anymap::{AnyMap, Entry};
use anymap::{self, AnyMap};
use bincode;
use log::warn;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::any::TypeId;
use std::cell::RefCell;
use std::collections::{hash_map, HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -265,11 +267,11 @@ impl Discoverer for Context {
let mut scope_to_init = None;
let bridge = LOCAL_AGENTS_POOL.with(|pool| {
match pool.borrow_mut().entry::<LocalAgent<AGN>>() {
Entry::Occupied(mut entry) => {
anymap::Entry::Occupied(mut entry) => {
// TODO Insert callback!
entry.get_mut().create_bridge(callback)
}
Entry::Vacant(entry) => {
anymap::Entry::Vacant(entry) => {
let scope = AgentScope::<AGN>::new();
let launched = LocalAgent::new(&scope);
let responder = SlabResponder {
Expand Down Expand Up @@ -474,11 +476,8 @@ struct RemoteAgent<AGN: Agent> {
}

impl<AGN: Agent> RemoteAgent<AGN> {
pub fn new(worker: &Value, slab: SharedOutputSlab<AGN>) -> Self {
RemoteAgent {
worker: worker.clone(),
slab,
}
pub fn new(worker: Value, slab: SharedOutputSlab<AGN>) -> Self {
RemoteAgent { worker, slab }
}

fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
Expand All @@ -501,6 +500,8 @@ impl<AGN: Agent> RemoteAgent<AGN> {

thread_local! {
static REMOTE_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
static REMOTE_AGENTS_LOADED: RefCell<HashSet<TypeId>> = RefCell::new(HashSet::new());
static REMOTE_AGENTS_EARLY_MSGS_QUEUE: RefCell<HashMap<TypeId, Vec<Vec<u8>>>> = RefCell::new(HashMap::new());
}

/// Create a single instance in a tab.
Expand All @@ -510,23 +511,37 @@ impl Discoverer for Public {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let bridge = REMOTE_AGENTS_POOL.with(|pool| {
match pool.borrow_mut().entry::<RemoteAgent<AGN>>() {
Entry::Occupied(mut entry) => {
anymap::Entry::Occupied(mut entry) => {
// TODO Insert callback!
entry.get_mut().create_bridge(callback)
}
Entry::Vacant(entry) => {
let slab_base: Shared<Slab<Option<Callback<AGN::Output>>>> =
anymap::Entry::Vacant(entry) => {
let slab: Shared<Slab<Option<Callback<AGN::Output>>>> =
Rc::new(RefCell::new(Slab::new()));
let slab = slab_base.clone();
let handler = move |data: Vec<u8>| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
// TODO Use `AtomicBool` lock to check its loaded
// TODO Send `Connected` message
}
FromWorker::ProcessOutput(id, output) => {
locate_callback_and_respond::<AGN>(&slab, id, output);
let handler = {
let slab = slab.clone();
move |data: Vec<u8>, worker: Value| {
let msg = FromWorker::<AGN::Output>::unpack(&data);
match msg {
FromWorker::WorkerLoaded => {
// TODO Send `Connected` message
let _ = REMOTE_AGENTS_LOADED.with(|local| {
local.borrow_mut().insert(TypeId::of::<AGN>())
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| {
if let Some(msgs) =
local.borrow_mut().get_mut(&TypeId::of::<AGN>())
{
for msg in msgs.drain(..) {
let worker = &worker;
js! {@{worker}.postMessage(@{msg});};
}
}
});
}
FromWorker::ProcessOutput(id, output) => {
locate_callback_and_respond::<AGN>(&slab, id, output);
}
}
}
};
Expand All @@ -535,11 +550,11 @@ impl Discoverer for Public {
var worker = new Worker(@{name_of_resource});
var handler = @{handler};
worker.onmessage = function(event) {
handler(event.data);
handler(event.data, worker);
};
return worker;
};
let launched = RemoteAgent::new(&worker, slab_base);
let launched = RemoteAgent::new(worker, slab);
entry.insert(launched).create_bridge(callback)
}
}
Expand All @@ -557,6 +572,29 @@ pub struct PublicBridge<T: Agent> {
_agent: PhantomData<T>,
}

impl<AGN: Agent> PublicBridge<AGN> {
fn worker_is_loaded(&self) -> bool {
REMOTE_AGENTS_LOADED.with(|local| local.borrow().contains(&TypeId::of::<AGN>()))
}

fn msg_to_queue(&self, msg: Vec<u8>) {
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| {
match local.borrow_mut().entry(TypeId::of::<AGN>()) {
hash_map::Entry::Vacant(record) => {
record.insert({
let mut v = Vec::new();
v.push(msg);
v
});
}
hash_map::Entry::Occupied(ref mut record) => {
record.get_mut().push(msg);
}
}
});
}
}

fn send_to_remote<AGN: Agent>(worker: &Value, msg: ToWorker<AGN::Input>) {
// TODO Important! Implement.
// Use a queue to collect a messages if an instance is not ready
Expand All @@ -572,7 +610,12 @@ fn send_to_remote<AGN: Agent>(worker: &Value, msg: ToWorker<AGN::Input>) {
impl<AGN: Agent> Bridge<AGN> for PublicBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let msg = ToWorker::ProcessInput(self.id, msg);
send_to_remote::<AGN>(&self.worker, msg);
if self.worker_is_loaded() {
send_to_remote::<AGN>(&self.worker, msg);
} else {
let msg = msg.pack();
self.msg_to_queue(msg);
}
}
}

Expand All @@ -592,6 +635,12 @@ impl<AGN: Agent> Drop for PublicBridge<AGN> {
let upd = ToWorker::Destroy;
send_to_remote::<AGN>(&self.worker, upd);
pool.borrow_mut().remove::<RemoteAgent<AGN>>();
REMOTE_AGENTS_LOADED.with(|pool| {
pool.borrow_mut().remove(&TypeId::of::<AGN>());
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|pool| {
pool.borrow_mut().remove(&TypeId::of::<AGN>());
});
}
});
}
Expand Down

0 comments on commit d47c338

Please sign in to comment.