Skip to content

Commit

Permalink
Send Connected and Disconnected messages for Job and Context
Browse files Browse the repository at this point in the history
  • Loading branch information
therustmonk committed Jun 9, 2018
1 parent 9784dae commit f84cd87
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ impl Discoverer for Context {
let upd = AgentUpdate::Create(agent_link);
scope.send(upd);
}
let upd = AgentUpdate::Connected(HandlerId(bridge.id));
bridge.scope.send(upd);
Box::new(bridge)
}
}
Expand Down Expand Up @@ -245,7 +247,11 @@ impl<AGN: Agent> Drop for ContextBridge<AGN> {
false
}
};
let upd = AgentUpdate::Connected(HandlerId(self.id));
self.scope.send(upd);
if terminate_worker {
let upd = AgentUpdate::Destroy;
self.scope.send(upd);
pool.borrow_mut().remove::<LaunchedAgent<AGN>>();
}
});
Expand All @@ -262,20 +268,22 @@ impl Discoverer for Job {
let agent_link = AgentLink::connect(&scope, responder);
let upd = AgentUpdate::Create(agent_link);
scope.send(upd);
let upd = AgentUpdate::Connected(JOB_SINGLE_ID);
scope.send(upd);
let bridge = JobBridge { scope };
Box::new(bridge)
}
}

const JOB_SINGLE_ID: usize = 0;
const JOB_SINGLE_ID: HandlerId = HandlerId(0);

struct CallbackResponder<AGN: Agent> {
callback: Callback<AGN::Output>,
}

impl<AGN: Agent> Responder<AGN> for CallbackResponder<AGN> {
fn response(&self, id: usize, output: AGN::Output) {
assert_eq!(id, JOB_SINGLE_ID);
assert_eq!(id, JOB_SINGLE_ID.0);
self.callback.emit(output);
}
}
Expand All @@ -286,13 +294,15 @@ struct JobBridge<AGN: Agent> {

impl<AGN: Agent> Bridge<AGN> for JobBridge<AGN> {
fn send(&self, msg: AGN::Input) {
let upd = AgentUpdate::Input(msg, HandlerId(JOB_SINGLE_ID));
let upd = AgentUpdate::Input(msg, JOB_SINGLE_ID);
self.scope.send(upd);
}
}

impl<AGN: Agent> Drop for JobBridge<AGN> {
fn drop(&mut self) {
let upd = AgentUpdate::Disconnected(JOB_SINGLE_ID);
self.scope.send(upd);
let upd = AgentUpdate::Destroy;
self.scope.send(upd);
}
Expand Down Expand Up @@ -338,6 +348,7 @@ impl Discoverer for Public {
},
FromWorker::TypeDetected => {
info!("Worker handshake finished");
// TODO Send `AgetUpdate::Connected(_)` message
},
FromWorker::ProcessOutput(id, data) => {
let msg = AGN::Output::unpack(&data);
Expand Down Expand Up @@ -386,12 +397,18 @@ pub trait Agent: Sized + 'static {
/// Creates an instance of an agent.
fn create(link: AgentLink<Self>) -> Self;

/// This metthod called on every update message.
/// This method called on every update message.
fn update(&mut self, msg: Self::Message);

/// This metthod called on every incoming message.
/// This method called on when a new bridge created.
fn connected(&mut self, id: HandlerId) { }

/// This method called on every incoming message.
fn handle(&mut self, msg: Self::Input, id: HandlerId);

/// This method called on when a new bridge destroyed.
fn disconnected(&mut self, id: HandlerId) { }

/// Creates an instance of an agent.
fn destroy(&mut self) { }

Expand Down Expand Up @@ -590,7 +607,9 @@ impl<AGN> AgentRunnable<AGN> {
enum AgentUpdate<AGN: Agent> {
Create(AgentLink<AGN>),
Message(AGN::Message),
Connected(HandlerId),
Input(AGN::Input, HandlerId),
Disconnected(HandlerId),
Destroy,
}

Expand Down Expand Up @@ -618,11 +637,21 @@ where
.expect("agent was not created to process messages")
.update(msg);
}
AgentUpdate::Connected(id) => {
this.agent.as_mut()
.expect("agent was not created to send a connected message")
.connected(id);
}
AgentUpdate::Input(inp, id) => {
this.agent.as_mut()
.expect("agent was not created to process inputs")
.handle(inp, id);
}
AgentUpdate::Disconnected(id) => {
this.agent.as_mut()
.expect("agent was not created to send a disconnected message")
.disconnected(id);
}
AgentUpdate::Destroy => {
let mut agent = this.agent.take()
.expect("trying to destroy not existent agent");
Expand Down

0 comments on commit f84cd87

Please sign in to comment.