Skip to content

Commit

Permalink
Reorganize agents module (#1202)
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry authored May 10, 2020
1 parent d55765a commit 7a3f6db
Show file tree
Hide file tree
Showing 11 changed files with 1,148 additions and 1,088 deletions.
1,086 changes: 0 additions & 1,086 deletions yew/src/agent.rs

This file was deleted.

190 changes: 190 additions & 0 deletions yew/src/agent/link.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use super::*;
use crate::callback::Callback;
use crate::scheduler::{scheduler, Runnable, Shared};
use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;

/// Defines communication from Worker to Consumers
pub(crate) trait Responder<AGN: Agent> {
/// Implementation for communication channel from Worker to Consumers
fn respond(&self, id: HandlerId, output: AGN::Output);
}

/// Link to agent's scope for creating callbacks.
pub struct AgentLink<AGN: Agent> {
scope: AgentScope<AGN>,
responder: Rc<dyn Responder<AGN>>,
}

impl<AGN: Agent> AgentLink<AGN> {
/// Create link for a scope.
pub(crate) fn connect<T>(scope: &AgentScope<AGN>, responder: T) -> Self
where
T: Responder<AGN> + 'static,
{
AgentLink {
scope: scope.clone(),
responder: Rc::new(responder),
}
}

/// Send response to an agent.
pub fn respond(&self, id: HandlerId, output: AGN::Output) {
self.responder.respond(id, output);
}

/// Create a callback which will send a message to the agent when invoked.
pub fn callback<F, IN>(&self, function: F) -> Callback<IN>
where
F: Fn(IN) -> AGN::Message + 'static,
{
let scope = self.scope.clone();
let closure = move |input| {
let output = function(input);
scope.send(AgentLifecycleEvent::Message(output));
};
closure.into()
}
}

impl<AGN: Agent> fmt::Debug for AgentLink<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentLink<_>")
}
}

impl<AGN: Agent> Clone for AgentLink<AGN> {
fn clone(&self) -> Self {
AgentLink {
scope: self.scope.clone(),
responder: self.responder.clone(),
}
}
}
/// This struct holds a reference to a component and to a global scheduler.
pub(crate) struct AgentScope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
}

impl<AGN: Agent> fmt::Debug for AgentScope<AGN> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("AgentScope<_>")
}
}

impl<AGN: Agent> Clone for AgentScope<AGN> {
fn clone(&self) -> Self {
AgentScope {
shared_agent: self.shared_agent.clone(),
}
}
}

impl<AGN: Agent> AgentScope<AGN> {
/// Create agent scope
pub fn new() -> Self {
let shared_agent = Rc::new(RefCell::new(AgentRunnable::new()));
AgentScope { shared_agent }
}
/// Schedule message for sending to agent
pub fn send(&self, update: AgentLifecycleEvent<AGN>) {
let envelope = AgentEnvelope {
shared_agent: self.shared_agent.clone(),
update,
};
let runnable: Box<dyn Runnable> = Box::new(envelope);
scheduler().push(runnable);
}
}

impl<AGN: Agent> Default for AgentScope<AGN> {
fn default() -> Self {
Self::new()
}
}

struct AgentRunnable<AGN> {
agent: Option<AGN>,
// TODO(#939): Use agent field to control create message this flag
destroyed: bool,
}

impl<AGN> AgentRunnable<AGN> {
fn new() -> Self {
AgentRunnable {
agent: None,
destroyed: false,
}
}
}

/// Local Agent messages
#[derive(Debug)]
pub(crate) enum AgentLifecycleEvent<AGN: Agent> {
/// Request to create link
Create(AgentLink<AGN>),
/// Internal Agent message
Message(AGN::Message),
/// Client connected
Connected(HandlerId),
/// Received mesasge from Client
Input(AGN::Input, HandlerId),
/// Client disconnected
Disconnected(HandlerId),
/// Request to destroy agent
Destroy,
}

struct AgentEnvelope<AGN: Agent> {
shared_agent: Shared<AgentRunnable<AGN>>,
update: AgentLifecycleEvent<AGN>,
}

impl<AGN> Runnable for AgentEnvelope<AGN>
where
AGN: Agent,
{
fn run(self: Box<Self>) {
let mut this = self.shared_agent.borrow_mut();
if this.destroyed {
return;
}
match self.update {
AgentLifecycleEvent::Create(link) => {
this.agent = Some(AGN::create(link));
}
AgentLifecycleEvent::Message(msg) => {
this.agent
.as_mut()
.expect("agent was not created to process messages")
.update(msg);
}
AgentLifecycleEvent::Connected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a connected message")
.connected(id);
}
AgentLifecycleEvent::Input(inp, id) => {
this.agent
.as_mut()
.expect("agent was not created to process inputs")
.handle_input(inp, id);
}
AgentLifecycleEvent::Disconnected(id) => {
this.agent
.as_mut()
.expect("agent was not created to send a disconnected message")
.disconnected(id);
}
AgentLifecycleEvent::Destroy => {
let mut agent = this
.agent
.take()
.expect("trying to destroy not existent agent");
agent.destroy();
}
}
}
}
133 changes: 133 additions & 0 deletions yew/src/agent/local/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use super::*;
use crate::callback::Callback;
use crate::scheduler::Shared;
use anymap::{self, AnyMap};
use slab::Slab;
use std::cell::RefCell;
use std::rc::Rc;

thread_local! {
static LOCAL_AGENTS_POOL: RefCell<AnyMap> = RefCell::new(AnyMap::new());
}

/// Create a single instance in the current thread.
#[allow(missing_debug_implementations)]
pub struct Context;

impl Discoverer for Context {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let mut scope_to_init = None;
let bridge = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
match pool.entry::<LocalAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => entry.get_mut().create_bridge(callback),
anymap::Entry::Vacant(entry) => {
let scope = AgentScope::<AGN>::new();
let launched = LocalAgent::new(&scope);
let responder = SlabResponder {
slab: launched.slab(),
};
scope_to_init = Some((scope, responder));
entry.insert(launched).create_bridge(callback)
}
}
});
if let Some((scope, responder)) = scope_to_init {
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
}
let upd = AgentLifecycleEvent::Connected(bridge.id);
bridge.scope.send(upd);
Box::new(bridge)
}
}

struct SlabResponder<AGN: Agent> {
slab: Shared<Slab<Option<Callback<AGN::Output>>>>,
}

impl<AGN: Agent> Responder<AGN> for SlabResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
locate_callback_and_respond::<AGN>(&self.slab, id, output);
}
}

impl Dispatchable for Context {}

struct ContextBridge<AGN: Agent> {
scope: AgentScope<AGN>,
id: HandlerId,
}

impl<AGN: Agent> Bridge<AGN> for ContextBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, self.id);
self.scope.send(upd);
}
}

impl<AGN: Agent> Drop for ContextBridge<AGN> {
fn drop(&mut self) {
let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = {
if let Some(launched) = pool.get_mut::<LocalAgent<AGN>>() {
launched.remove_bridge(self)
} else {
false
}
};

if terminate_worker {
pool.remove::<LocalAgent<AGN>>();
}

terminate_worker
});

let upd = AgentLifecycleEvent::Disconnected(self.id);
self.scope.send(upd);

if terminate_worker {
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}
}

struct LocalAgent<AGN: Agent> {
scope: AgentScope<AGN>,
slab: SharedOutputSlab<AGN>,
}

impl<AGN: Agent> LocalAgent<AGN> {
pub fn new(scope: &AgentScope<AGN>) -> Self {
let slab = Rc::new(RefCell::new(Slab::new()));
LocalAgent {
scope: scope.clone(),
slab,
}
}

fn slab(&self) -> SharedOutputSlab<AGN> {
self.slab.clone()
}

fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> {
let respondable = callback.is_some();
let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable);
ContextBridge {
scope: self.scope.clone(),
id,
}
}

fn remove_bridge(&mut self, bridge: &ContextBridge<AGN>) -> Last {
let mut slab = self.slab.borrow_mut();
let _ = slab.remove(bridge.id.raw_id());
slab.is_empty()
}
}
54 changes: 54 additions & 0 deletions yew/src/agent/local/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::*;
use crate::callback::Callback;

const SINGLETON_ID: HandlerId = HandlerId(0, true);

/// Create an instance in the current thread.
#[allow(missing_debug_implementations)]
pub struct Job;

impl Discoverer for Job {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let callback = callback.expect("Callback required for Job");
let scope = AgentScope::<AGN>::new();
let responder = CallbackResponder { callback };
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentLifecycleEvent::Create(agent_link);
scope.send(upd);
let upd = AgentLifecycleEvent::Connected(SINGLETON_ID);
scope.send(upd);
let bridge = JobBridge { scope };
Box::new(bridge)
}
}

struct JobBridge<AGN: Agent> {
scope: AgentScope<AGN>,
}

impl<AGN: Agent> Bridge<AGN> for JobBridge<AGN> {
fn send(&mut self, msg: AGN::Input) {
let upd = AgentLifecycleEvent::Input(msg, SINGLETON_ID);
self.scope.send(upd);
}
}

impl<AGN: Agent> Drop for JobBridge<AGN> {
fn drop(&mut self) {
let upd = AgentLifecycleEvent::Disconnected(SINGLETON_ID);
self.scope.send(upd);
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
}

struct CallbackResponder<AGN: Agent> {
callback: Callback<AGN::Output>,
}

impl<AGN: Agent> Responder<AGN> for CallbackResponder<AGN> {
fn respond(&self, id: HandlerId, output: AGN::Output) {
assert_eq!(id.raw_id(), SINGLETON_ID.raw_id());
self.callback.emit(output);
}
}
7 changes: 7 additions & 0 deletions yew/src/agent/local/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod context;
mod job;

use super::*;

pub use context::Context;
pub use job::Job;
Loading

0 comments on commit 7a3f6db

Please sign in to comment.