Skip to content

Commit

Permalink
Link transport plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Veritius committed Apr 11, 2024
1 parent 0974888 commit 2b060ad
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
1 change: 1 addition & 0 deletions stardust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod messages;
pub mod plugin;
pub mod prelude;
pub mod scheduling;
pub mod testing;

#[cfg(feature="hashing")]
pub mod hashing;
3 changes: 3 additions & 0 deletions stardust/src/testing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Utilities for testing
pub mod transport;
115 changes: 115 additions & 0 deletions stardust/src/testing/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! A simple transport layer using inter-thread communications, intended for use in tests and examples.
//!
//! Usage is simple, just add [`LinkTransportPlugin`] to all involved apps.
//! Then, use [`pair`] to create two [`Link`] components that communicate with eachother.
//! These 'links' don't do any kind of handshake. Once added to an entity, they communicate immediately.
use std::sync::{mpsc::{channel, Receiver, Sender, TryRecvError}, Mutex};
use bevy::prelude::*;
use bytes::Bytes;
use crate::prelude::*;

/// Adds a simple transport plugin for apps part of the same process.
/// See the [top level documentation](self) for more information.
pub struct LinkTransportPlugin;

impl Plugin for LinkTransportPlugin {
fn build(&self, app: &mut App) {
app.add_systems(PreUpdate, (recv_link_data, remove_disconnected)
.chain().in_set(NetworkRead::Receive));

app.add_systems(PostUpdate, (send_link_data, remove_disconnected)
.chain().in_set(NetworkWrite::Send));
}
}

/// A connection to another `Link`, made with [`pair`].
///
/// A `Link` will only communicate with its counterpart.
#[derive(Component)]
pub struct Link(SideInner);

/// Creates two connected [`Link`] objects.
pub fn pair() -> [Link; 2] {
let (left_tx, left_rx) = channel();
let (right_tx, right_rx) = channel();

let left = Link(SideInner {
sender: left_tx,
receiver: Mutex::new(right_rx),
disconnected: false,
});

let right = Link(SideInner {
sender: right_tx,
receiver: Mutex::new(left_rx),
disconnected: false,
});

return [left, right];
}

struct SideInner {
sender: Sender<Message>,
// Makes the struct Sync, so it can be in a Component.
// Use Exclusive when it's stabilised.
receiver: Mutex<Receiver<Message>>,
disconnected: bool,
}

struct Message {
channel: ChannelId,
payload: Bytes,
}

fn recv_link_data(
mut query: Query<(&mut Link, &mut NetworkMessages<Incoming>), With<NetworkPeer>>,
) {
query.par_iter_mut().for_each(|(mut link, mut queue)| {
let receiver = link.0.receiver.get_mut().unwrap();
loop {
match receiver.try_recv() {
Ok(message) => {
queue.push(message.channel, message.payload);
},
Err(TryRecvError::Empty) => { break },
Err(TryRecvError::Disconnected) => {
link.0.disconnected = true;
break;
},
}
}
});
}

fn send_link_data(
mut query: Query<(&mut Link, &NetworkMessages<Outgoing>), With<NetworkPeer>>,
) {
query.par_iter_mut().for_each(|(mut link, queue)| {
let sender = &link.0.sender;
'outer: for (channel, queue) in queue.all_queues() {
for payload in queue.iter().cloned() {
match sender.send(Message { channel, payload }) {
Ok(_) => {},
Err(_) => {
link.0.disconnected = true;
break 'outer;
},
}
}
}
});
}

fn remove_disconnected(
mut commands: Commands,
mut query: Query<(Entity, &Link, &mut NetworkPeerLifestage)>,
) {
for (entity, link, mut stage) in query.iter_mut() {
if link.0.disconnected {
debug!("Link on entity {entity:?} disconnected");
commands.entity(entity).remove::<Link>();
*stage = NetworkPeerLifestage::Closed;
}
}
}

0 comments on commit 2b060ad

Please sign in to comment.