Skip to content

Commit

Permalink
Node
Browse files Browse the repository at this point in the history
node
  • Loading branch information
191220029 authored and genedna committed Nov 11, 2024
1 parent 174e497 commit 36585b9
Show file tree
Hide file tree
Showing 14 changed files with 519 additions and 12 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
with:
node-version: 16

test:
check_fmt:
name: Fmt Check
runs-on: ubuntu-latest
steps:
Expand All @@ -41,6 +41,21 @@ jobs:
command: fmt
args: --check

test:
name: Unit test & Doc test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: test
args: --all

# test:
# name: Test Suite
# runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = ["."]
tokio = { version = "1.28", features = ["rt", "sync", "rt-multi-thread"] }
log = "0.4"
env_logger = "0.10.1"
async-trait = "0.1.83"

[dev-dependencies]
simplelog = "0.12"
Expand Down
8 changes: 2 additions & 6 deletions src/connection/out_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,9 @@ impl OutChannels {
}

/// # Output Channel
/// Wrapper of senders of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
/// Wrapper of senderrs of `tokio::sync::mpsc` and `tokio::sync::broadcast`. **Dagrs** will
/// decide the inner type of channel when building the graph.
/// ## Implements
/// - `blocking_send`: sends the message, blocked if no capacity left in the channel. Returns `Ok()`
/// if message sent; returns `Err(SendErr)` if error occurs.
/// - `send`: sends the message, waiting until there is capacity asynchronously. Returns `Ok()`
/// if message sent; returns `Err(SendErr)` if error occurs.
/// Learn more about [Tokio Channels](https://tokio.rs/tokio/tutorial/channels).
enum OutChannel {
/// Sender of a `tokio::sync::mpsc` channel.
Mpsc(mpsc::Sender<Content>),

Check warning on line 53 in src/connection/out_channel.rs

View workflow job for this annotation

GitHub Actions / Unit test & Doc test

variants `Mpsc` and `Bcst` are never constructed

Check warning on line 53 in src/connection/out_channel.rs

View workflow job for this annotation

GitHub Actions / Check

variants `Mpsc` and `Bcst` are never constructed
Expand Down
1 change: 0 additions & 1 deletion src/graph/mod.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/graph/node.rs

This file was deleted.

17 changes: 15 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
mod connection;
pub mod graph;
pub mod connection;
pub mod node;
pub mod utils;

pub use connection::{
in_channel::{InChannels, RecvErr},
information_packet::Content,
out_channel::{OutChannels, SendErr},
};
pub use node::{
action::{Action, EmptyAction},
default_node::DefaultNode,
node::*,
};
pub use utils::{env::EnvVar, output::Output};
63 changes: 63 additions & 0 deletions src/node/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::sync::Arc;

use async_trait::async_trait;

use crate::{
connection::{in_channel::InChannels, out_channel::OutChannels},
utils::{env::EnvVar, output::Output},
};

/// Node specific behavior
///
/// [`Action`] stores the specific execution logic of a task.
///
/// # Example
/// An implementation of [`Action`]: `HelloAction`, having private
/// fields `statement` and `repeat`.
///
/// ```rust
/// use std::sync::Arc;
/// use dagrs::{Action, EnvVar, Output, InChannels, OutChannels};
/// use async_trait::async_trait;
///
/// struct HelloAction{
/// statement: String,
/// repeat: usize,
/// }
///
/// #[async_trait]
/// impl Action for HelloAction{
/// async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output{
/// for i in 0..self.repeat {
/// println!("{}",self.statement);
/// }
/// Output::empty()
/// }
/// }
///
/// let hello=HelloAction {
/// statement: "hello world!".to_string(),
/// repeat: 10
/// };
///
/// ```
#[async_trait]
pub trait Action: Send + Sync {
async fn run(
&self,
in_channels: &mut InChannels,
out_channels: &OutChannels,
env: Arc<EnvVar>,
) -> Output;
}

/// An empty implementaion of [`Action`].
///
/// Used as a placeholder when creating a `Node` without `Action`.
pub struct EmptyAction;
#[async_trait]
impl Action for EmptyAction {
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
Output::Out(None)
}
}
157 changes: 157 additions & 0 deletions src/node/default_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use std::sync::Arc;

use crate::{
connection::{in_channel::InChannels, out_channel::OutChannels},
utils::{env::EnvVar, output::Output},
};

use super::{
action::{Action, EmptyAction},
node::{Node, NodeId, NodeName, NodeTable},
};

/// # Default node type
///
/// [`DefaultNode`] is a default implementation of the [`Node`] trait. Users can use this node
/// type to build tasks to meet most needs.
///
/// ## Create a `DefaultNode`:
/// - use the method `new`. Required attributes: node's name; [`NodeTable`](for id allocation).
///
/// ```rust
/// use dagrs::{NodeName, NodeTable, DefaultNode};
///
/// let node_name = "Node X";
/// let mut node_table = NodeTable::new();
/// let mut node = DefaultNode::new(
/// NodeName::from(node_name),
/// &mut node_table,
/// );
/// ```
///
/// - use the method `with_action`. Required attributes: node's name; [`NodeTable`](for id allocation);
/// execution logic [`Action`].
///
/// ```rust
/// use dagrs::{NodeName, NodeTable, DefaultNode, EmptyAction};
///
/// let node_name = "Node X";
/// let mut node_table = NodeTable::new();
/// let mut node = DefaultNode::with_action(
/// NodeName::from(node_name),
/// Box::new(EmptyAction),
/// &mut node_table,
/// );
/// ```
pub struct DefaultNode {
id: NodeId,
name: NodeName,
action: Box<dyn Action>,
in_channels: InChannels,
out_channels: OutChannels,
}

impl Node for DefaultNode {
fn id(&self) -> NodeId {
self.id.clone()
}

fn name(&self) -> NodeName {
self.name.clone()
}

fn input_channels(&mut self) -> &mut InChannels {
&mut self.in_channels
}

fn output_channels(&mut self) -> &mut OutChannels {
&mut self.out_channels
}

fn run(&mut self, env: Arc<EnvVar>) -> Output {
tokio::runtime::Runtime::new().unwrap().block_on(async {
self.action
.run(&mut self.in_channels, &self.out_channels, env)
.await
})
}
}

impl DefaultNode {
pub fn new(name: NodeName, node_table: &mut NodeTable) -> Self {
Self {
id: node_table.alloc_id_for(&name),
name,
action: Box::new(EmptyAction),
in_channels: InChannels::default(),
out_channels: OutChannels::default(),
}
}

pub fn with_action(
name: NodeName,
action: Box<dyn Action>,
node_table: &mut NodeTable,
) -> Self {
Self {
id: node_table.alloc_id_for(&name),
name,
action,
in_channels: InChannels::default(),
out_channels: OutChannels::default(),
}
}
}

#[cfg(test)]
mod test_default_node {

use std::sync::Arc;

use crate::{Content, EnvVar, InChannels, Node, NodeName, NodeTable, OutChannels, Output};

use super::{Action, DefaultNode};

use async_trait::async_trait;

/// An implementation of [`Action`] that returns [`Output::Out`] containing a String "Hello world".
#[derive(Default)]
pub struct HelloAction;
#[async_trait]
impl Action for HelloAction {
async fn run(&self, _: &mut InChannels, _: &OutChannels, _: Arc<EnvVar>) -> Output {
Output::Out(Some(Content::new("Hello world".to_string())))
}
}

impl HelloAction {
pub fn new() -> Box<Self> {
Box::new(Self::default())
}
}

/// Test for create a default node.
///
/// Step 1: create a [`DefaultNode`] with [`HelloAction`].
///
/// Step 2: run the node and verify its output.
#[test]
fn create_default_node() {
let node_name = "Test Node";

let mut node_table = NodeTable::new();
let mut node = DefaultNode::with_action(
NodeName::from(node_name),
HelloAction::new(),
&mut node_table,
);

// Check if node table has key-value pair (node.name, node.id)
assert_eq!(node_table.get(node_name).unwrap(), &node.id());

let env = Arc::new(EnvVar::new(node_table));
let out = node.run(env).get_out().unwrap();
let out: &String = out.get().unwrap();
assert_eq!(out, "Hello world");
}
}
29 changes: 29 additions & 0 deletions src/node/id_allocate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::atomic::AtomicUsize;

use super::node::NodeId;

/// IDAllocator for Node.
struct IDAllocator {
id: AtomicUsize,
}

impl IDAllocator {
fn alloc(&self) -> NodeId {
let origin = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if origin > self.id.load(std::sync::atomic::Ordering::Relaxed) {
panic!("Too many tasks.")
} else {
NodeId(origin)
}
}
}

/// The global task uniquely identifies an instance of the allocator.
static ID_ALLOCATOR: IDAllocator = IDAllocator {
id: AtomicUsize::new(1),
};

/// Assign node's id.
pub(crate) fn alloc_id() -> NodeId {
ID_ALLOCATOR.alloc()
}
4 changes: 4 additions & 0 deletions src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod action;
pub mod default_node;
pub mod id_allocate;
pub mod node;
Loading

0 comments on commit 36585b9

Please sign in to comment.