diff --git a/examples/minimal-client-server/src/main.rs b/examples/minimal-client-server/src/main.rs index 978344e..5312e15 100644 --- a/examples/minimal-client-server/src/main.rs +++ b/examples/minimal-client-server/src/main.rs @@ -20,6 +20,15 @@ use minimal_server::MinimalServer; pub mod minimal_client; use minimal_client::MinimalClient; +/// An enum identifier or the two nodes +#[derive(PartialEq)] +pub enum NodeIdentifier { + /// The server node + ServerNode, + /// The client node + ClientNode, +} + /// A request that asks for the sum of two `u64`s. #[derive(Clone, Copy)] pub struct AddTwoIntsRequest { diff --git a/examples/minimal-client-server/src/minimal_client.rs b/examples/minimal-client-server/src/minimal_client.rs index a7de65f..9bda849 100644 --- a/examples/minimal-client-server/src/minimal_client.rs +++ b/examples/minimal-client-server/src/minimal_client.rs @@ -3,7 +3,7 @@ //! containing the sum of the two `u64`s. //! -use super::{AddTwoIntsRequest, AddTwoIntsResponse}; +use super::{AddTwoIntsRequest, AddTwoIntsResponse, NodeIdentifier}; use ncomm_clients_and_servers::local::LocalClient; use ncomm_core::{Client, Node}; @@ -41,7 +41,11 @@ impl MinimalClient { } } -impl Node for MinimalClient { +impl Node for MinimalClient { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::ClientNode + } + fn get_update_delay_us(&self) -> u128 { 500_000 } diff --git a/examples/minimal-client-server/src/minimal_server.rs b/examples/minimal-client-server/src/minimal_server.rs index b1fc254..43820cb 100644 --- a/examples/minimal-client-server/src/minimal_server.rs +++ b/examples/minimal-client-server/src/minimal_server.rs @@ -3,7 +3,7 @@ //! the wrapping sum of the two u64s. //! -use super::{AddTwoIntsRequest, AddTwoIntsResponse}; +use super::{AddTwoIntsRequest, AddTwoIntsResponse, NodeIdentifier}; use ncomm_clients_and_servers::local::{LocalClient, LocalServer}; use ncomm_core::{Node, Server}; @@ -48,7 +48,11 @@ impl MinimalServer { } } -impl Node for MinimalServer { +impl Node for MinimalServer { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::ServerNode + } + fn get_update_delay_us(&self) -> u128 { 500_000 } diff --git a/examples/minimal-publisher-subscriber/src/main.rs b/examples/minimal-publisher-subscriber/src/main.rs index 6fee963..b973233 100644 --- a/examples/minimal-publisher-subscriber/src/main.rs +++ b/examples/minimal-publisher-subscriber/src/main.rs @@ -21,6 +21,15 @@ use minimal_publisher::MinimalPublisher; pub mod minimal_subscriber; use minimal_subscriber::MinimalSubscriber; +/// Identifier for the two nodes. +#[derive(PartialEq)] +pub enum NodeIdentifier { + /// The publisher node + PublisherNode, + /// The subscriber node + SubscriberNode, +} + fn main() { let mut publisher_node = MinimalPublisher::new(); let subscriber_node = MinimalSubscriber::new(publisher_node.create_subscriber()); diff --git a/examples/minimal-publisher-subscriber/src/minimal_publisher.rs b/examples/minimal-publisher-subscriber/src/minimal_publisher.rs index 049c266..75d04c8 100644 --- a/examples/minimal-publisher-subscriber/src/minimal_publisher.rs +++ b/examples/minimal-publisher-subscriber/src/minimal_publisher.rs @@ -3,6 +3,8 @@ //! the message "Hello, World! {count}" to the subscriber. //! +use super::NodeIdentifier; + use ncomm_core::{Node, Publisher}; use ncomm_publishers_and_subscribers::local::{LocalPublisher, LocalSubscriber}; @@ -36,7 +38,11 @@ impl MinimalPublisher { } } -impl Node for MinimalPublisher { +impl Node for MinimalPublisher { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::PublisherNode + } + fn get_update_delay_us(&self) -> u128 { 500_000u128 } diff --git a/examples/minimal-publisher-subscriber/src/minimal_subscriber.rs b/examples/minimal-publisher-subscriber/src/minimal_subscriber.rs index 3505240..8be8fd0 100644 --- a/examples/minimal-publisher-subscriber/src/minimal_subscriber.rs +++ b/examples/minimal-publisher-subscriber/src/minimal_subscriber.rs @@ -3,6 +3,8 @@ //! the publisher node. //! +use super::NodeIdentifier; + use ncomm_core::{Node, Subscriber}; use ncomm_publishers_and_subscribers::local::LocalSubscriber; @@ -27,7 +29,11 @@ impl MinimalSubscriber { } } -impl Node for MinimalSubscriber { +impl Node for MinimalSubscriber { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::SubscriberNode + } + fn get_update_delay_us(&self) -> u128 { 500_000 } diff --git a/examples/minimal-update-client-server/src/fibonacci_update_client.rs b/examples/minimal-update-client-server/src/fibonacci_update_client.rs index bd08867..871e6b1 100644 --- a/examples/minimal-update-client-server/src/fibonacci_update_client.rs +++ b/examples/minimal-update-client-server/src/fibonacci_update_client.rs @@ -3,7 +3,7 @@ //! fibonacci series. //! -use super::{FibonacciRequest, FibonacciResponse, FibonacciUpdate}; +use super::{FibonacciRequest, FibonacciResponse, FibonacciUpdate, NodeIdentifier}; use ncomm_core::{Node, UpdateClient}; use ncomm_update_clients_and_servers::local::LocalUpdateClient; @@ -49,7 +49,11 @@ impl FibonacciUpdateClient { } } -impl Node for FibonacciUpdateClient { +impl Node for FibonacciUpdateClient { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::FibonacciClient + } + fn get_update_delay_us(&self) -> u128 { 100_000 } diff --git a/examples/minimal-update-client-server/src/fibonacci_update_server.rs b/examples/minimal-update-client-server/src/fibonacci_update_server.rs index cd26e65..8613e54 100644 --- a/examples/minimal-update-client-server/src/fibonacci_update_server.rs +++ b/examples/minimal-update-client-server/src/fibonacci_update_server.rs @@ -4,7 +4,7 @@ //! into the series //! -use super::{FibonacciRequest, FibonacciResponse, FibonacciUpdate}; +use super::{FibonacciRequest, FibonacciResponse, FibonacciUpdate, NodeIdentifier}; use ncomm_core::{Node, UpdateServer}; use ncomm_update_clients_and_servers::local::{LocalUpdateClient, LocalUpdateServer}; @@ -93,7 +93,11 @@ impl FibonacciUpdateServer { } } -impl Node for FibonacciUpdateServer { +impl Node for FibonacciUpdateServer { + fn get_id(&self) -> NodeIdentifier { + NodeIdentifier::FibonacciServer + } + fn get_update_delay_us(&self) -> u128 { 100_000 } diff --git a/examples/minimal-update-client-server/src/main.rs b/examples/minimal-update-client-server/src/main.rs index d5b118d..9fb42a7 100644 --- a/examples/minimal-update-client-server/src/main.rs +++ b/examples/minimal-update-client-server/src/main.rs @@ -19,6 +19,15 @@ use fibonacci_update_client::FibonacciUpdateClient; pub mod fibonacci_update_server; use fibonacci_update_server::FibonacciUpdateServer; +#[derive(PartialEq)] +/// An identifier for the fibonacci client and server +pub enum NodeIdentifier { + /// The fibonacci client + FibonacciClient, + /// The fibonacci server + FibonacciServer, +} + #[derive(Clone, Copy, Debug, PartialEq, Eq)] /// A request asking for a the nth order of the fibonacci sequence. pub struct FibonacciRequest { diff --git a/ncomm-core/src/executor.rs b/ncomm-core/src/executor.rs index 0d615d9..6d58a5b 100644 --- a/ncomm-core/src/executor.rs +++ b/ncomm-core/src/executor.rs @@ -27,7 +27,10 @@ pub enum ExecutorState { } /// An executor handles the scheduling and execution of nodes -pub trait Executor { +/// +/// All nodes should have some unique ID that makes them identifiable +/// as trait objects +pub trait Executor { /// Starts the nodes contained by the executor fn start(&mut self); @@ -43,13 +46,16 @@ pub trait Executor { fn check_interrupt(&mut self) -> bool; /// Add a node to the executor. - fn add_node(&mut self, node: Box); + fn add_node(&mut self, node: Box>); /// Add a node to the executor with some given context. /// /// Note: The context is mainly to allow for extra configuration when /// adding nodes. - fn add_node_with_context(&mut self, node: Box, _ctx: CTX) { + fn add_node_with_context(&mut self, node: Box>, _ctx: CTX) { self.add_node(node); } + + /// Remove a node from the executor. + fn remove_node(&mut self, id: &ID) -> Option>>; } diff --git a/ncomm-core/src/node.rs b/ncomm-core/src/node.rs index 754cafd..e4f0599 100644 --- a/ncomm-core/src/node.rs +++ b/ncomm-core/src/node.rs @@ -11,7 +11,13 @@ /// A Node represents a singular process that performs some singular /// purpose -pub trait Node: Send { +/// +/// Nodes should also all be given unique IDs so that they can be identified as +/// trait objects. +pub trait Node: Send { + /// Return the node's ID + fn get_id(&self) -> ID; + /// Return the node's update rate (in us) fn get_update_delay_us(&self) -> u128; diff --git a/ncomm-executors/src/lib.rs b/ncomm-executors/src/lib.rs index 6568c3a..0f14ed4 100644 --- a/ncomm-executors/src/lib.rs +++ b/ncomm-executors/src/lib.rs @@ -29,32 +29,39 @@ use std::cmp::{Ord, Ordering}; /// of their next update. /// /// This ensures that nodes are updated at the correct time -pub(crate) struct NodeWrapper { +pub(crate) struct NodeWrapper { /// The timestamp of the nodes next update pub priority: u128, /// The nde this NodeWrapper is wrapping around - pub node: Box, + pub node: Box>, } -impl Ord for NodeWrapper { +impl NodeWrapper { + /// Destroy the node wrapper returning the node it was wrapping. + pub fn destroy(self) -> Box> { + self.node + } +} + +impl Ord for NodeWrapper { fn cmp(&self, other: &Self) -> Ordering { self.priority.cmp(&other.priority).reverse() } } -impl PartialOrd for NodeWrapper { +impl PartialOrd for NodeWrapper { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl PartialEq for NodeWrapper { +impl PartialEq for NodeWrapper { fn eq(&self, other: &Self) -> bool { self.priority == other.priority } } -impl Eq for NodeWrapper {} +impl Eq for NodeWrapper {} /// This method performs binary search insertion into the sorted vector /// `vec` with the node `node`. @@ -62,7 +69,7 @@ impl Eq for NodeWrapper {} /// This is just a convenience method I found myself using a ton so I decided /// to make it its own method. #[inline(always)] -pub(crate) fn insert_into(vec: &mut Vec, node: NodeWrapper) { +pub(crate) fn insert_into(vec: &mut Vec>, node: NodeWrapper) { // If another node is found with the same priority, insert the node after that // node. Otherwise, insert the node into the position it should be in in the // sorted vector diff --git a/ncomm-executors/src/simple_executor.rs b/ncomm-executors/src/simple_executor.rs index 0de29cf..f7f54c7 100644 --- a/ncomm-executors/src/simple_executor.rs +++ b/ncomm-executors/src/simple_executor.rs @@ -32,9 +32,9 @@ use crate::{insert_into, NodeWrapper}; /// Addendum: The Simple Executor will also busy wait between node executions /// so do not expect the SimpleExecutor to yield CPU time to other processes while /// it is running. -pub struct SimpleExecutor { +pub struct SimpleExecutor { // The sorted backing vector for the executor - backing: Vec, + backing: Vec>, // The quanta high-precision clock backing the SimplExecutor clock: Clock, // The current state of the executor @@ -47,7 +47,7 @@ pub struct SimpleExecutor { interrupted: bool, } -impl SimpleExecutor { +impl SimpleExecutor { /// Create a new Simple Executor without any Nodes pub fn new(interrupt: Receiver) -> Self { let clock = Clock::new(); @@ -64,7 +64,7 @@ impl SimpleExecutor { } /// Creates a new Simple Executor with a number of Nodes - pub fn new_with(interrupt: Receiver, mut nodes: Vec>) -> Self { + pub fn new_with(interrupt: Receiver, mut nodes: Vec>>) -> Self { let mut backing = Vec::new(); for node in nodes.drain(..) { backing.push(NodeWrapper { priority: 0, node }); @@ -84,7 +84,7 @@ impl SimpleExecutor { } } -impl Executor for SimpleExecutor { +impl Executor for SimpleExecutor { /// For each node in the simple executor we should reset their priority to 0 /// and start the node. We should also set the start_instant to the current time. /// @@ -190,13 +190,22 @@ impl Executor for SimpleExecutor { /// Add a node to the Simple Executor. /// - /// Node: If nodes are added in start `ExecutorState::Started` or - /// `ExecutorState::Running` the node will have to be updated and - /// inserted into the backing vector by priority. - fn add_node(&mut self, node: Box) { + /// Note: Nodes can only be added to the executor when it is not running. + /// + /// Additionally, only 1 node can exist per id so additional nodes added with + /// the same id will replace the previous node of a given id. + fn add_node(&mut self, node: Box>) { + if let Some(idx) = self + .backing + .iter() + .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id())) + { + self.backing.remove(idx); + } + if self.state == ExecutorState::Stopped { self.backing.push(NodeWrapper { priority: 0, node }); - } else { + } else if self.state == ExecutorState::Started { insert_into( &mut self.backing, NodeWrapper { @@ -210,6 +219,25 @@ impl Executor for SimpleExecutor { ); } } + + /// Remove a node from the Simple Executor. + /// + /// Note: Nodes can only be removed from the executor when it is not running. + fn remove_node(&mut self, id: &ID) -> Option>> { + if self.state != ExecutorState::Running { + let idx = self + .backing + .iter() + .position(|node_wrapper| node_wrapper.node.get_id().eq(id)); + if let Some(idx) = idx { + Some(self.backing.remove(idx).destroy()) + } else { + None + } + } else { + None + } + } } #[cfg(test)] @@ -228,14 +256,16 @@ mod tests { } pub struct SimpleNode { + id: u8, pub update_delay: u128, pub num: u8, state: State, } impl SimpleNode { - pub fn new(update_delay: u128) -> Self { + pub fn new(id: u8, update_delay: u128) -> Self { Self { + id, update_delay, num: 0, state: State::Stopped, @@ -243,7 +273,10 @@ mod tests { } } - impl Node for SimpleNode { + impl Node for SimpleNode { + fn get_id(&self) -> u8 { + self.id + } fn start(&mut self) { self.state = State::Started; } @@ -272,8 +305,8 @@ mod tests { let mut executor = SimpleExecutor::new_with( rx, vec![ - Box::new(SimpleNode::new(100_000)), - Box::new(SimpleNode::new(250_000)), + Box::new(SimpleNode::new(0, 100_000)), + Box::new(SimpleNode::new(1, 250_000)), ], ); let original_start_instant = executor.start_instant; @@ -298,8 +331,8 @@ mod tests { let mut executor = SimpleExecutor::new_with( rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); @@ -329,8 +362,8 @@ mod tests { let mut executor = SimpleExecutor::new_with( rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 110_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); @@ -346,16 +379,57 @@ mod tests { let mut executor = SimpleExecutor::new_with( rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); - executor.add_node(Box::new(SimpleNode::new(1_000))); + executor.add_node(Box::new(SimpleNode::new(2, 1_000))); assert_eq!(executor.backing.len(), 3); } + #[test] + fn test_add_node_same_id() { + let (_, rx) = unbounded(); + + let mut executor = SimpleExecutor::new_with( + rx, + vec![ + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), + ], + ); + + executor.add_node(Box::new(SimpleNode::new(0, 1_000))); + + assert_eq!(executor.backing.len(), 2); + let zero_id = executor + .backing + .iter() + .find(|node_wrapper| node_wrapper.node.get_id().eq(&0)) + .unwrap(); + assert_eq!(zero_id.node.get_update_delay_us(), 1_000); + } + + #[test] + fn test_remove_node() { + let (_, rx) = unbounded(); + + let mut executor = SimpleExecutor::new_with( + rx, + vec![ + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), + ], + ); + + executor.remove_node(&0); + + assert_eq!(executor.backing.len(), 1); + assert_eq!(executor.backing[0].node.get_id(), 1); + } + #[test] fn test_update_loop() { let (tx, rx) = unbounded(); @@ -363,8 +437,8 @@ mod tests { let mut executor = SimpleExecutor::new_with( rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); diff --git a/ncomm-executors/src/threadpool_executor.rs b/ncomm-executors/src/threadpool_executor.rs index adba9e3..da9588c 100644 --- a/ncomm-executors/src/threadpool_executor.rs +++ b/ncomm-executors/src/threadpool_executor.rs @@ -27,9 +27,9 @@ use crate::{insert_into, NodeWrapper}; /// Addendum: The main thread of the ThreadPool is conducting the scheduling so /// the ThreadPool will only have n-1 worker threads where n is the total number /// of threads allocated to the threadpool executor. -pub struct ThreadPoolExecutor { +pub struct ThreadPoolExecutor { // The sorted backing vector for the executor - backing: Vec, + backing: Vec>, // The quanta high-precision clock backing the ThreadPoll scheduler clock: Clock, // The ThreadPool to execute nodes on @@ -44,7 +44,7 @@ pub struct ThreadPoolExecutor { interrupted: bool, } -impl ThreadPoolExecutor { +impl ThreadPoolExecutor { /// Creates a new ThreadPool executor without any Nodes pub fn new(threads: usize, interrupt: Receiver) -> Self { let clock = Clock::new(); @@ -66,7 +66,7 @@ impl ThreadPoolExecutor { pub fn new_with( threads: usize, interrupt: Receiver, - mut nodes: Vec>, + mut nodes: Vec>>, ) -> Self { let mut backing = Vec::new(); for node in nodes.drain(..) { @@ -89,7 +89,7 @@ impl ThreadPoolExecutor { } } -impl Executor for ThreadPoolExecutor { +impl Executor for ThreadPoolExecutor { /// For each node in the ThreadPool executor the node will be updated /// and start_instant will be set to the current instant /// @@ -199,13 +199,22 @@ impl Executor for ThreadPoolExecutor { /// Add a node to the ThreadPool Executor. /// - /// Note: If the executor is currently in `ExecutorState::Started` or - /// `ExecutorState::Running` the node will be added with maximum - /// priority to the backing vector. - fn add_node(&mut self, node: Box) { + /// Note: Nodes can only be added to the executor when it is not running. + /// + /// Additionally, only 1 node can exist per id so additional nodes added with the same + /// id will replace the previous node of a given id + fn add_node(&mut self, node: Box>) { + if let Some(idx) = self + .backing + .iter() + .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id())) + { + self.backing.remove(idx); + } + if self.state == ExecutorState::Stopped { self.backing.push(NodeWrapper { priority: 0, node }); - } else { + } else if self.state == ExecutorState::Started { insert_into( &mut self.backing, NodeWrapper { @@ -219,6 +228,25 @@ impl Executor for ThreadPoolExecutor { ); } } + + /// Remove a node from the Threadpool Executor. + /// + /// Note: Nodes can only be removed from hte executor when it is not running + fn remove_node(&mut self, id: &ID) -> Option>> { + if self.state != ExecutorState::Running { + let idx = self + .backing + .iter() + .position(|node_wrapper| node_wrapper.node.get_id().eq(id)); + if let Some(idx) = idx { + Some(self.backing.remove(idx).destroy()) + } else { + None + } + } else { + None + } + } } #[cfg(test)] @@ -235,14 +263,16 @@ mod tests { } struct SimpleNode { + id: u8, update_delay: u128, num: u8, state: State, } impl SimpleNode { - pub fn new(update_delay: u128) -> Self { + pub fn new(id: u8, update_delay: u128) -> Self { Self { + id, update_delay, num: 0, state: State::Stopped, @@ -250,7 +280,11 @@ mod tests { } } - impl Node for SimpleNode { + impl Node for SimpleNode { + fn get_id(&self) -> u8 { + self.id + } + fn start(&mut self) { self.state = State::Started; } @@ -277,8 +311,8 @@ mod tests { 3, rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); let original_start_instant = executor.start_instant; @@ -305,8 +339,8 @@ mod tests { 3, rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); @@ -335,8 +369,8 @@ mod tests { 3, rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); @@ -353,16 +387,59 @@ mod tests { 3, rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], ); - executor.add_node(Box::new(SimpleNode::new(1_000))); + executor.add_node(Box::new(SimpleNode::new(2, 1_000))); assert_eq!(executor.backing.len(), 3); } + #[test] + fn test_add_node_same_id() { + let (_, rx) = unbounded(); + + let mut executor = ThreadPoolExecutor::new_with( + 3, + rx, + vec![ + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), + ], + ); + + executor.add_node(Box::new(SimpleNode::new(0, 1_000))); + + assert_eq!(executor.backing.len(), 2); + let node_zero = executor + .backing + .iter() + .find(|node_wrapper| node_wrapper.node.get_id().eq(&0)) + .unwrap(); + assert_eq!(node_zero.node.get_update_delay_us(), 1_000); + } + + #[test] + fn test_remove_node() { + let (_, rx) = unbounded(); + + let mut executor = ThreadPoolExecutor::new_with( + 3, + rx, + vec![ + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), + ], + ); + + executor.remove_node(&0); + + assert_eq!(executor.backing.len(), 1); + assert_eq!(executor.backing[0].node.get_id(), 1); + } + #[test] fn test_update_loop() { let (tx, rx) = unbounded(); @@ -371,8 +448,8 @@ mod tests { 2, rx, vec![ - Box::new(SimpleNode::new(10_000)), - Box::new(SimpleNode::new(25_000)), + Box::new(SimpleNode::new(0, 10_000)), + Box::new(SimpleNode::new(1, 25_000)), ], );