From 0cb0dba586262b3a8b5d4cea20473c8103408049 Mon Sep 17 00:00:00 2001 From: veritius Date: Wed, 17 Apr 2024 13:54:35 +1000 Subject: [PATCH] Correctly revert this time --- README.md | 11 ++- stardust/Cargo.toml | 6 +- stardust/examples/simple.rs | 88 ------------------ stardust/src/channels/config.rs | 11 +-- stardust/src/channels/extension.rs | 4 +- stardust/src/channels/id.rs | 3 +- stardust/src/channels/mod.rs | 18 +++- stardust/src/channels/registry.rs | 76 +++++++++++++-- stardust/src/connections/debug.rs | 3 +- stardust/src/connections/groups.rs | 9 +- stardust/src/connections/peer.rs | 14 +-- stardust/src/connections/security.rs | 7 +- stardust/src/diagnostics/connections.rs | 27 ------ stardust/src/diagnostics/messages.rs | 45 --------- stardust/src/diagnostics/mod.rs | 7 -- stardust/src/hashing/mod.rs | 14 ++- stardust/src/hashing/resource.rs | 2 +- stardust/src/hashing/stablehash.rs | 25 ++--- stardust/src/lib.rs | 3 - stardust/src/messages/direction.rs | 29 ++++-- stardust/src/messages/queue.rs | 49 +--------- stardust/src/plugin.rs | 35 +------ stardust/src/testing/mod.rs | 3 - stardust/src/testing/transport.rs | 117 ------------------------ 24 files changed, 164 insertions(+), 442 deletions(-) delete mode 100644 stardust/examples/simple.rs delete mode 100644 stardust/src/diagnostics/connections.rs delete mode 100644 stardust/src/diagnostics/messages.rs delete mode 100644 stardust/src/diagnostics/mod.rs delete mode 100644 stardust/src/testing/mod.rs delete mode 100644 stardust/src/testing/transport.rs diff --git a/README.md b/README.md index 5d768507c..5253331d1 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ The following features are planned to be created as additional crates, as part o ## Usage | Bevy | Stardust | | ---- | -------- | -| 0.13 | 0.6 | +| 0.13 | 0.5 | | 0.12 | 0.2 | | 0.11 | 0.1 | @@ -51,7 +51,8 @@ The following features are planned to be created as additional crates, as part o // For the purpose of this example, we'll assume they magically appeared somehow. use std::any::TypeId; -use bevy::{prelude::*, app::{ScheduleRunnerPlugin, MainSchedulePlugin}}; +use bevy_ecs::prelude::*; +use bevy_app::{prelude::*, ScheduleRunnerPlugin, MainSchedulePlugin}; use bevy_stardust::prelude::*; // Channels are accessed with types in the type system. @@ -106,7 +107,7 @@ const MESSAGE: Bytes = Bytes::from_static("Hello, world!".as_bytes()); // Queueing messages just requires component access. // This means you can use query filters to achieve better parallelism. fn send_words_system( - registry: Res, + registry: ChannelRegistry, mut query: Query<(Entity, &mut NetworkMessages), With> ) { // The ChannelId must be retrieved from the registry. @@ -127,12 +128,12 @@ fn send_words_system( // The reading queue is a different component from the sending queue. // This means you can read and send bytes in parallel, or in different systems. fn read_words_system( - registry: Res, + registry: ChannelRegistry, query: Query<(Entity, &NetworkMessages), With> ) { let channel = registry.channel_id(TypeId::of::()).unwrap(); for (entity, incoming) in query.iter() { - let messages = incoming.get(channel); + let messages = incoming.channel_queue(channel); for message in messages.iter() { // Stardust only outputs bytes, so you need to convert to the desired type. // Also, in real products, don't unwrap, write checks. Never trust user data. diff --git a/stardust/Cargo.toml b/stardust/Cargo.toml index 04c8b467c..0acc4c30d 100644 --- a/stardust/Cargo.toml +++ b/stardust/Cargo.toml @@ -1,6 +1,6 @@ [package] name="bevy_stardust" -version="0.6.0" +version="0.5.1" edition="2021" authors=["Veritius "] license="MIT OR Apache-2.0" @@ -14,9 +14,7 @@ bytes = "1.5.0" smallvec = "1.11.1" gxhash = { version = "=3.0.0", optional = true } # 3.1 is broken, remove this bound when it's fixed -[dev-dependencies] -fastrand = "2" - [features] +default = ["reflect"] reflect = [] hashing = ["reflect", "dep:gxhash"] \ No newline at end of file diff --git a/stardust/examples/simple.rs b/stardust/examples/simple.rs deleted file mode 100644 index 643b9ecae..000000000 --- a/stardust/examples/simple.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::any::TypeId; - -use bevy::app::{AppLabel, SubApp, ScheduleRunnerPlugin}; -use bevy::log::LogPlugin; -use bevy::prelude::*; -use bevy_stardust::prelude::*; -use bevy_stardust::testing::transport::*; - -struct MyChannelA; -struct MyChannelB; -struct MyChannelC; - -#[derive(Resource)] -struct AppName(&'static str); - -fn main() { - let mut left = App::new(); - left.insert_resource(AppName("Left")); - - let mut right = App::new(); - right.insert_resource(AppName("Right")); - - let (link_left, link_right) = pair(); - left.world.spawn((NetworkPeer::new(), NetworkMessages::::new(), NetworkMessages::::new(), link_left)); - right.world.spawn((NetworkPeer::new(), NetworkMessages::::new(), NetworkMessages::::new(), link_right)); - - for app in [&mut left, &mut right] { - app.add_plugins((StardustPlugin, LinkTransportPlugin)); - - let config = ChannelConfiguration { - reliable: ReliabilityGuarantee::Reliable, - ordered: OrderingGuarantee::Ordered, - fragmented: false, - priority: 0, - }; - - app.add_channel::(config.clone()); - app.add_channel::(config.clone()); - app.add_channel::(config.clone()); - - app.add_systems(Update, ( - read_system, - write_system::, - write_system::, - write_system::, - )); - - // Manually invoke finish as this is a subapp. - app.finish(); - } - - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, AppLabel)] - enum SubappLabel { Left, Right } - - let mut core = App::new(); - core.add_plugins((ScheduleRunnerPlugin::default(), LogPlugin::default())); - core.insert_sub_app(SubappLabel::Left, SubApp::new(left, |_,_| {})); - core.insert_sub_app(SubappLabel::Right, SubApp::new(right, |_,_| {})); - - core.run(); -} - -fn read_system( - name: Res, - query: Query<&NetworkMessages, With>, -) { - for incoming in query.iter() { - for (channel, queues) in incoming.iter() { - for payload in queues { - info!("{}: Received a message from a peer on channel {channel:?}: {payload:?}", name.0); - } - } - } -} - -fn write_system( - name: Res, - registry: Res, - mut query: Query<&mut NetworkMessages, With>, -) { - for mut outgoing in query.iter_mut() { - let rand = fastrand::u128(..); - let bytes = Bytes::copy_from_slice(&rand.to_be_bytes()[..]); - - info!("{}: Sent a message to a peer: {bytes:?}", name.0); - outgoing.push(registry.channel_id(TypeId::of::()).unwrap(), bytes); - } -} \ No newline at end of file diff --git a/stardust/src/channels/config.rs b/stardust/src/channels/config.rs index 515ef4100..27b84d5d4 100644 --- a/stardust/src/channels/config.rs +++ b/stardust/src/channels/config.rs @@ -2,14 +2,11 @@ //! //! All settings are not definitive, but hints to transport layers as how to treat channels. -use bevy::reflect::Reflect; - #[cfg(feature="hashing")] use {std::hash::Hasher, crate::hashing::StableHash}; /// Configuration for a channel. -#[derive(Debug, Clone, Hash, Reflect)] -#[reflect(Debug, Hash)] +#[derive(Debug, Clone)] pub struct ChannelConfiguration { /// Whether messages should be resent if they're missed. pub reliable: ReliabilityGuarantee, @@ -37,8 +34,7 @@ impl StableHash for &ChannelConfiguration { } /// The reliability guarantee of a channel. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Reflect)] -#[reflect(Debug, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ReliabilityGuarantee { /// Messages are not guaranteed to arrive. Unreliable, @@ -58,8 +54,7 @@ impl StableHash for ReliabilityGuarantee { } /// The ordering guarantee of a channel. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Reflect)] -#[reflect(Debug, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum OrderingGuarantee { /// Messages will be available in the order they are received. /// This is not necessarily the order they were sent. If that matters, use a different variant. diff --git a/stardust/src/channels/extension.rs b/stardust/src/channels/extension.rs index b89a37830..3e29b25ff 100644 --- a/stardust/src/channels/extension.rs +++ b/stardust/src/channels/extension.rs @@ -2,7 +2,7 @@ use bevy::app::App; use crate::channels::config::ChannelConfiguration; -use super::{id::Channel, ChannelRegistryMut}; +use super::{id::Channel, SetupChannelRegistry}; mod sealed { pub trait Sealed {} @@ -33,7 +33,7 @@ impl ChannelSetupAppExt for App { } // Add to registry - let mut registry = self.world.resource_mut::(); + let mut registry = self.world.resource_mut::(); registry.0.register_channel::(config); } } \ No newline at end of file diff --git a/stardust/src/channels/id.rs b/stardust/src/channels/id.rs index 19d699ea1..8b70cf141 100644 --- a/stardust/src/channels/id.rs +++ b/stardust/src/channels/id.rs @@ -73,7 +73,8 @@ impl Default for ChannelMarker { /// /// Channel identifiers are generated by the `ChannelRegistry` and are unique to the `World` they originated from. /// Attempting to use a `ChannelId` in another `World` will probably panic, or give you unintended results. -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Reflect)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] #[repr(transparent)] pub struct ChannelId(u32); diff --git a/stardust/src/channels/mod.rs b/stardust/src/channels/mod.rs index 0b538218f..4066bdf14 100644 --- a/stardust/src/channels/mod.rs +++ b/stardust/src/channels/mod.rs @@ -27,4 +27,20 @@ mod extension; pub use config::*; pub use id::*; pub use registry::*; -pub use extension::ChannelSetupAppExt; \ No newline at end of file +pub use extension::ChannelSetupAppExt; + +use std::sync::Arc; +use bevy::prelude::*; + +pub(super) fn channel_build(app: &mut App) { + // Create setup channel registry + app.insert_resource(registry::SetupChannelRegistry(Box::new(ChannelRegistryInner::new()))); + +} + +pub(super) fn channel_finish(app: &mut App) { + // Remove SetupChannelRegistry and put the inner into an Arc inside ChannelRegistry + // This dramatically improves + let registry = app.world.remove_resource::().unwrap(); + app.insert_resource(FinishedChannelRegistry(Arc::from(registry.0))); +} \ No newline at end of file diff --git a/stardust/src/channels/registry.rs b/stardust/src/channels/registry.rs index 36d9cb655..c983a2791 100644 --- a/stardust/src/channels/registry.rs +++ b/stardust/src/channels/registry.rs @@ -1,14 +1,15 @@ //! The channel registry. use std::{any::TypeId, collections::BTreeMap, ops::{Deref, DerefMut}, sync::Arc}; -use bevy::prelude::*; +use bevy::{ecs::{component::ComponentId, system::SystemParam}, prelude::*}; use crate::prelude::ChannelConfiguration; use super::{id::{Channel, ChannelId}, ToChannelId}; +/// Mutable access to the channel registry, only available during app setup. #[derive(Resource)] -pub(crate) struct ChannelRegistryMut(pub(crate) Box); +pub(crate) struct SetupChannelRegistry(pub(crate) Box); -impl Deref for ChannelRegistryMut { +impl Deref for SetupChannelRegistry { type Target = ChannelRegistryInner; #[inline] @@ -17,20 +18,22 @@ impl Deref for ChannelRegistryMut { } } -impl DerefMut for ChannelRegistryMut { +impl DerefMut for SetupChannelRegistry { #[inline] fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -/// Read-only access to the channel registry, only available after app setup. +/// Immutable access to the channel registry, only available after app setup. /// -/// This can be freely and cheaply cloned, and will point to the same inner channel registry. +/// In almost all cases, you should just use the [`ChannelRegistry`] systemparam. +/// However, this type can be cloned and will point to the same inner value. +/// This makes it useful for asynchronous programming, like in futures. #[derive(Resource, Clone)] -pub struct ChannelRegistry(pub(crate) Arc); +pub struct FinishedChannelRegistry(pub(crate) Arc); -impl Deref for ChannelRegistry { +impl Deref for FinishedChannelRegistry { type Target = ChannelRegistryInner; #[inline] @@ -39,6 +42,63 @@ impl Deref for ChannelRegistry { } } +/// Access to the configuration of registered channels, at any point. +/// +/// If you're writing async code, you might want to look at [`FinishedChannelRegistry`]. +pub struct ChannelRegistry<'a>(&'a ChannelRegistryInner); + +unsafe impl<'a> SystemParam for ChannelRegistry<'a> { + type State = (ComponentId, ComponentId); + type Item<'w, 's> = ChannelRegistry<'w>; + + fn init_state(world: &mut World, system_meta: &mut bevy::ecs::system::SystemMeta) -> Self::State { + // SAFETY: Since we can't register accesses, we do it through Res which can + ( + as SystemParam>::init_state(world, system_meta), + as SystemParam>::init_state(world, system_meta), + ) + } + + unsafe fn get_param<'w, 's>( + state: &'s mut Self::State, + _system_meta: &bevy::ecs::system::SystemMeta, + world: bevy::ecs::world::unsafe_world_cell::UnsafeWorldCell<'w>, + _change_tick: bevy::ecs::component::Tick, + ) -> Self::Item<'w, 's> { + if let Some(ptr) = world.get_resource_by_id(state.0) { + return ChannelRegistry(ptr.deref::().0.as_ref()); + } + + if let Some(ptr) = world.get_resource_by_id(state.1) { + return ChannelRegistry(ptr.deref::().0.as_ref()); + } + + panic!("Neither SetupChannelRegistry or FinishedChannelRegistry were present when attempting to create ChannelRegistry") + } +} + +impl ChannelRegistry<'_> { + /// Gets the id fom the `ToChannelId` implementation. + #[inline] + pub fn channel_id(&self, from: impl ToChannelId) -> Option { + self.0.channel_id(from) + } + + /// Gets the channel configuration for `id`. + #[inline] + pub fn channel_config(&self, id: impl ToChannelId) -> Option<&ChannelData> { + self.0.channel_config(id) + } +} + +impl Deref for ChannelRegistry<'_> { + type Target = ChannelRegistryInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Stores channel configuration data. Accessible through the [`ChannelRegistry`] system parameter. pub struct ChannelRegistryInner { pub(super) channel_type_ids: BTreeMap, diff --git a/stardust/src/connections/debug.rs b/stardust/src/connections/debug.rs index f5e45f8c9..45b61c474 100644 --- a/stardust/src/connections/debug.rs +++ b/stardust/src/connections/debug.rs @@ -2,8 +2,7 @@ use bevy::prelude::*; /// Used to intentionally reduce the performance of peers for testing purposes. /// If applied to a `NetworkPeer` entity, reduces performance for that peer specifically. -#[derive(Debug, Clone, Component, Reflect)] -#[reflect(Debug, Component)] +#[derive(Debug, Component)] pub struct NetworkPerformanceReduction { /// Chance to drop a packet when sending, if the transport is packet-based. /// This chance is from `0.0` (never) to `1.0` (always), with `0.5` dropping 50% of the time. diff --git a/stardust/src/connections/groups.rs b/stardust/src/connections/groups.rs index 0eef97ddc..8ed87326d 100644 --- a/stardust/src/connections/groups.rs +++ b/stardust/src/connections/groups.rs @@ -6,10 +6,15 @@ use smallvec::SmallVec; /// A collection of network peers, used for organisational purposes. /// /// This can be used for anything, such as teams of players, rooms for replication, or administrative permissions. -#[derive(Debug, Default, Component, Reflect)] -#[reflect(Debug, Component)] +#[derive(Debug, Component)] pub struct NetworkGroup(pub(crate) SmallVec<[Entity; 8]>); +impl Default for NetworkGroup { + fn default() -> Self { + Self(SmallVec::default()) + } +} + impl NetworkGroup { /// Adds the peer to the network group. /// Does nothing if the peer is already included. diff --git a/stardust/src/connections/peer.rs b/stardust/src/connections/peer.rs index 652d8cdc7..4da6cc0d1 100644 --- a/stardust/src/connections/peer.rs +++ b/stardust/src/connections/peer.rs @@ -16,9 +16,9 @@ use bevy::prelude::*; /// - [`NetworkMessages`](crate::messages::NetworkMessages), relating to messages /// - [`NetworkPeerUid`], relating to persistent data /// - [`NetworkPeerLifestage`], relating to connection state -/// - [`NetworkSecurity`](super::security::NetworkSecurity), relating to encryption -#[derive(Debug, Component, Reflect)] -#[reflect(Debug, Component)] +/// - [`SecurityLevel`](super::security::SecurityLevel), relating to encryption +#[derive(Debug, Component)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] pub struct NetworkPeer { /// The point in time this peer was added to the `World`. pub joined: Instant, @@ -62,8 +62,8 @@ impl NetworkPeer { /// /// This exists to model the average lifecycle of a connection, from an initial handshake to being disconnected. /// An `Ord` implementation is provided, with variants being 'greater' if they're later in the model lifecycle. -#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Reflect)] -#[reflect(Debug, Component, PartialEq)] +#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] #[non_exhaustive] pub enum NetworkPeerLifestage { /// Midway through a [handshake]. @@ -89,8 +89,8 @@ pub enum NetworkPeerLifestage { /// /// If you're working with another ID namespace, like UUIDs and Steam IDs, you should /// map the ids from that space into a unique value here through some kind of associative array. -#[derive(Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Reflect)] -#[reflect(Debug, Component, PartialEq, Hash)] +#[derive(Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] pub struct NetworkPeerUid(pub u64); impl std::fmt::Debug for NetworkPeerUid { diff --git a/stardust/src/connections/security.rs b/stardust/src/connections/security.rs index ead9dafc8..ee0cb11e3 100644 --- a/stardust/src/connections/security.rs +++ b/stardust/src/connections/security.rs @@ -8,10 +8,9 @@ use bevy::prelude::*; /// /// This value is set by the transport layer managing this peer. /// It's up to it to provide an appropriate value here. -#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Reflect)] -#[reflect(Debug, Component, PartialEq)] -#[non_exhaustive] -pub enum NetworkSecurity { +#[derive(Debug, Component, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] +pub enum SecurityLevel { /// Communication is encrypted but not authenticated, or is fully plain text. /// /// **For end users:** diff --git a/stardust/src/diagnostics/connections.rs b/stardust/src/diagnostics/connections.rs deleted file mode 100644 index 445cec1d0..000000000 --- a/stardust/src/diagnostics/connections.rs +++ /dev/null @@ -1,27 +0,0 @@ -use bevy::{prelude::*, diagnostic::*}; -use crate::prelude::*; - -/// Adds diagnostics about connections. -pub struct NetworkPeerDiagnosticPlugin; - -impl Plugin for NetworkPeerDiagnosticPlugin { - fn build(&self, app: &mut App) { - app.register_diagnostic(Diagnostic::new(Self::COUNT) - .with_smoothing_factor(0.0) - .with_max_history_length(1)); - - app.add_systems(Update, diagnostic_system); - } -} - -impl NetworkPeerDiagnosticPlugin { - /// Diagnostic path for the amount of entities with [`NetworkPeer`]. - pub const COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/peers/total"); -} - -fn diagnostic_system( - mut diagnostics: Diagnostics, - query: Query<(), With>, -) { - diagnostics.add_measurement(&NetworkPeerDiagnosticPlugin::COUNT, || query.iter().count() as f64); -} \ No newline at end of file diff --git a/stardust/src/diagnostics/messages.rs b/stardust/src/diagnostics/messages.rs deleted file mode 100644 index a36bd65b4..000000000 --- a/stardust/src/diagnostics/messages.rs +++ /dev/null @@ -1,45 +0,0 @@ -use bevy::{prelude::*, diagnostic::*}; -use crate::prelude::*; - -/// Adds diagnostics about how many messages are being sent. -pub struct MessageCountDiagnosticsPlugin; - -impl Plugin for MessageCountDiagnosticsPlugin { - fn build(&self, app: &mut App) { - app.register_diagnostic(Diagnostic::new(Self::INCOMING_COUNT)); - app.register_diagnostic(Diagnostic::new(Self::OUTGOING_COUNT)); - - app.add_systems(Update, diagnostic_system); - } -} - -impl MessageCountDiagnosticsPlugin { - /// The number of incoming messages in queues for all peers. - pub const INCOMING_COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/messages/outgoing"); - - /// The number of outgoing messages in queues for all peers. - pub const OUTGOING_COUNT: DiagnosticPath = DiagnosticPath::const_new("net/core/messages/outgoing"); -} - -type QueryFilter = (Or<(With>, With>)>, With); - -fn diagnostic_system( - mut diagnostics: Diagnostics, - query: Query<(Option<&NetworkMessages>, Option<&NetworkMessages>), QueryFilter>, -) { - let mut incoming_count: usize = 0; - let mut outgoing_count: usize = 0; - - for (incoming, outgoing) in query.iter() { - if let Some(incoming) = incoming { - incoming_count += incoming.count(); - } - - if let Some(outgoing) = outgoing { - outgoing_count += outgoing.count(); - } - } - - diagnostics.add_measurement(&MessageCountDiagnosticsPlugin::INCOMING_COUNT, || incoming_count as f64); - diagnostics.add_measurement(&MessageCountDiagnosticsPlugin::OUTGOING_COUNT, || outgoing_count as f64); -} \ No newline at end of file diff --git a/stardust/src/diagnostics/mod.rs b/stardust/src/diagnostics/mod.rs deleted file mode 100644 index cba88c4c5..000000000 --- a/stardust/src/diagnostics/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -//! Diagnostics about various parts of Stardust. - -mod connections; -mod messages; - -pub use connections::*; -pub use messages::*; \ No newline at end of file diff --git a/stardust/src/hashing/mod.rs b/stardust/src/hashing/mod.rs index 1ef2fdc92..a49cf8628 100644 --- a/stardust/src/hashing/mod.rs +++ b/stardust/src/hashing/mod.rs @@ -1,7 +1,5 @@ //! Hashing of Stardust's configuration and related plugins. -pub use gxhash; - mod stablehash; mod resource; @@ -9,9 +7,14 @@ use bevy::prelude::*; pub(crate) use resource::{PendingHashValues, finalise_hasher_system}; -pub use stablehash::{StableHash, STABLE_HASHER_SEED}; +pub use stablehash::StableHash; pub use resource::ProtocolConfigHash; +mod sealed { + pub trait Sealed {} + impl Sealed for bevy::app::App {} +} + /// Extends Bevy's `App` to add methods for generating the [ProtocolId]. pub trait HashingAppExt: sealed::Sealed { /// Hashes `value` immediately. @@ -26,9 +29,4 @@ impl HashingAppExt for App { let mut hasher = self.world.resource_mut::(); value.hash(&mut hasher.state); } -} - -mod sealed { - pub trait Sealed {} - impl Sealed for bevy::app::App {} } \ No newline at end of file diff --git a/stardust/src/hashing/resource.rs b/stardust/src/hashing/resource.rs index cae829c36..489971714 100644 --- a/stardust/src/hashing/resource.rs +++ b/stardust/src/hashing/resource.rs @@ -4,7 +4,7 @@ use gxhash::GxHasher; use super::stablehash::STABLE_HASHER_SEED; /// A unique value generated during `App` creation, used to ensure two clients have consistent network setups. -#[derive(Debug, Resource)] +#[derive(Resource)] pub struct ProtocolConfigHash { int: u64, } diff --git a/stardust/src/hashing/stablehash.rs b/stardust/src/hashing/stablehash.rs index ac801677a..fd2f479d5 100644 --- a/stardust/src/hashing/stablehash.rs +++ b/stardust/src/hashing/stablehash.rs @@ -1,23 +1,18 @@ use std::hash::Hasher; /// Pre-defined seed used in GxHasher. -pub const STABLE_HASHER_SEED: i64 = 0x68066CFE6F752C27; +pub(super) const STABLE_HASHER_SEED: i64 = 0x68066CFE6F752C27; -/// A stably hashable type, for comparing data across machines. +/// A stably hashable type, for comparing configurations across the network. +/// Since `#[derive(Hash)]` does not guarantee stability, this trait exists instead. +/// You should implement it manually. /// -/// Implementors must uphold the following guarantees: -/// - The hash is fully deterministic and only changes with crate versions. -/// - Derive macros should not be used without `#[repr(C)]`, as the Rust compiler can reorder fields. -/// - Semantic versioning applies: a change in a hashed value should be considered a breaking change. -/// - The hash must be the same regardless of CPU architecture, platform, Rust version, or compilation. -/// - Endianness must be taken into account when using the `Hasher`. -/// - `write` and `write_u8` are fine and don't change with endianness. -/// - Other functions like `write_u32` should have consistent endianness. -/// - Use `to_be` or `to_le` on integer types. -/// -/// While `StableHash` is generic over the `Hasher`, -/// you should only use a hasher that is known to be stable. -/// Currently, this is `gxhash`, but this may change in future. +/// This must always feed the same bytes into the hasher no matter the architecture, platform, Rust version, or compilation. +/// If this guarantee is not upheld, different compilations of the same application may become incompatible. +/// If possible, you should always go through the `StableHash` implementation of a type, rather than using the `Hasher`'s API. +/// +/// Notes for implementors: +/// - Only write bytes (`write`, `write_u8`) - don't use other functions pub trait StableHash { /// Hashes the type through `H`. fn hash(&self, state: &mut H); diff --git a/stardust/src/lib.rs b/stardust/src/lib.rs index 7c6a3b7c5..7a9fe5c41 100644 --- a/stardust/src/lib.rs +++ b/stardust/src/lib.rs @@ -1,15 +1,12 @@ #![doc = include_str!("../README.md")] -#![forbid(unsafe_code)] #![warn(missing_docs)] pub mod channels; pub mod connections; -pub mod diagnostics; pub mod messages; pub mod plugin; pub mod prelude; pub mod scheduling; -pub mod testing; #[cfg(feature="hashing")] pub mod hashing; \ No newline at end of file diff --git a/stardust/src/messages/direction.rs b/stardust/src/messages/direction.rs index afee74f17..1ad88d619 100644 --- a/stardust/src/messages/direction.rs +++ b/stardust/src/messages/direction.rs @@ -1,9 +1,8 @@ -use std::fmt::Debug; -use bevy::reflect::Reflect; +use std::{any::Any, fmt::Debug}; /// The direction a message is going, as an enum for dynamic use. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Reflect)] -#[reflect(Debug, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] pub enum Direction { /// Messages being sent to a remote peer. Outgoing, @@ -16,14 +15,26 @@ pub enum Direction { /// Implemented by: /// - [`Outgoing`], corresponding to [`Direction::Outgoing`] /// - [`Incoming`], corresponding to [`Direction::Incoming`] -pub trait DirectionType: Debug + Send + Sync + Reflect + sealed::Sealed { +#[cfg(not(feature="reflect"))] +pub trait DirectionType: Debug + Send + Sync + Any + sealed::Sealed { + /// Returns the corresponding [`Direction`]. + fn as_enum() -> Direction; +} + +/// The direction a message is going, as a trait for use in the type system. +/// +/// Implemented by: +/// - [`Outgoing`], corresponding to [`Direction::Outgoing`] +/// - [`Incoming`], corresponding to [`Direction::Incoming`] +#[cfg(feature="reflect")] +pub trait DirectionType: Debug + Send + Sync + Any + bevy::reflect::Reflect + sealed::Sealed { /// Returns the corresponding [`Direction`]. fn as_enum() -> Direction; } /// Messages being sent to a remote peer. Counterpart to [`Incoming`]. -#[derive(Debug, Clone, Copy, Reflect)] -#[reflect(Debug)] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] pub struct Outgoing; impl DirectionType for Outgoing { fn as_enum() -> Direction { @@ -32,8 +43,8 @@ impl DirectionType for Outgoing { } /// Messages being received from a remote peer. Counterpart to [`Outgoing`]. -#[derive(Debug, Clone, Copy, Reflect)] -#[reflect(Debug)] +#[derive(Debug, Clone, Copy)] +#[cfg_attr(feature="reflect", derive(bevy::reflect::Reflect))] pub struct Incoming; impl DirectionType for Incoming { fn as_enum() -> Direction { diff --git a/stardust/src/messages/queue.rs b/stardust/src/messages/queue.rs index 3a57c33b5..d8451c728 100644 --- a/stardust/src/messages/queue.rs +++ b/stardust/src/messages/queue.rs @@ -9,13 +9,10 @@ static EMPTY_SLICE: &[Bytes] = &[]; /// A queue-like structure for storing messages, separated by channels. /// /// The items in this queue **do not** persist across frames. -/// They are cleared in [`NetworkWrite::Clear`] in [`PostUpdate`]. -#[derive(Component, Reflect)] -#[reflect(Debug, Component)] +/// They are cleared in [`NetworkWrite::Clear`]. +#[derive(Component)] pub struct NetworkMessages { - #[reflect(ignore)] pub(crate) queue_map: HashMap>, - #[reflect(ignore)] phantom: PhantomData } @@ -52,7 +49,7 @@ impl NetworkMessages { } /// Returns a slice of the queue for channel `channel`. - pub fn get(&self, channel: ChannelId) -> &[Bytes] { + pub fn channel_queue(&self, channel: ChannelId) -> &[Bytes] { self.queue_map .get(&channel) .map_or(EMPTY_SLICE, |v| v.as_slice()) @@ -60,48 +57,10 @@ impl NetworkMessages { /// Returns an iterator over all queues, including their channel ids. /// The iterator does not contain empty queues. - pub fn iter(&self) -> impl Iterator { + pub fn all_queues(&self) -> impl Iterator { self.queue_map .iter() .filter(|(_,v)| v.len() != 0) .map(|(k,v)| (k.clone(), v.as_slice())) } - - /// Resizes all buffers based on `func`. - /// - /// `func` takes the current capacity of the buffer as an input, - /// and outputs the new size of the buffer. Nothing happens if the two values are equal. - /// If the returned value is lesser, the buffer will resize to either the length or the target, - /// whichever is greater. If the returned value is greater, the length will be exactly so. - pub fn resize(&mut self, func: impl Fn(usize) -> usize) { - self.queue_map.iter_mut().for_each(|(_, buf)| { - use std::cmp::Ordering; - - let cur_len = buf.len(); - let new_len = func(cur_len); - match new_len.cmp(&cur_len) { - Ordering::Equal => {}, - Ordering::Less => { - buf.shrink_to(new_len); - }, - Ordering::Greater => { - let diff = new_len - cur_len; - buf.reserve(diff); - }, - } - }); - } -} - -impl Default for NetworkMessages { - #[inline] - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for NetworkMessages { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("NetworkMessages<{}>", std::any::type_name::())) - } } \ No newline at end of file diff --git a/stardust/src/plugin.rs b/stardust/src/plugin.rs index 7f00ea43c..86782dabb 100644 --- a/stardust/src/plugin.rs +++ b/stardust/src/plugin.rs @@ -1,6 +1,5 @@ //! The Stardust core plugin. -use std::sync::Arc; use bevy::prelude::*; use crate::prelude::*; @@ -10,32 +9,7 @@ pub struct StardustPlugin; impl Plugin for StardustPlugin { fn build(&self, app: &mut App) { - // Register connection types - app.register_type::(); - app.register_type::(); - app.register_type::(); - app.register_type::(); - app.register_type::(); - app.register_type::(); - - // Register channel types - app.register_type::(); - app.register_type::(); - app.register_type::(); - app.register_type::(); - - // Register messaging types - app.register_type::(); - app.register_type::(); - app.register_type::(); - app.register_type::>(); - app.register_type::>(); - - // Setup orderings - crate::scheduling::configure_scheduling(app); - - // Add ChannelRegistryMut - app.insert_resource(ChannelRegistryMut(Box::new(ChannelRegistryInner::new()))); + crate::channels::channel_build(app); // Add systems app.add_systems(Last, crate::connections::systems::despawn_closed_connections_system); @@ -44,6 +18,9 @@ impl Plugin for StardustPlugin { crate::messages::systems::clear_message_queue_system::, ).in_set(NetworkWrite::Clear)); + // Setup orderings + crate::scheduling::configure_scheduling(app); + // Hashing-related functionality #[cfg(feature="hashing")] { use crate::hashing::*; @@ -53,8 +30,6 @@ impl Plugin for StardustPlugin { } fn finish(&self, app: &mut App) { - // Remove SetupChannelRegistry and put the inner into an Arc inside ChannelRegistry - let registry = app.world.remove_resource::().unwrap(); - app.insert_resource(ChannelRegistry(Arc::from(registry.0))); + crate::channels::channel_finish(app); } } \ No newline at end of file diff --git a/stardust/src/testing/mod.rs b/stardust/src/testing/mod.rs deleted file mode 100644 index 89b256d11..000000000 --- a/stardust/src/testing/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Utilities for testing - -pub mod transport; \ No newline at end of file diff --git a/stardust/src/testing/transport.rs b/stardust/src/testing/transport.rs deleted file mode 100644 index 8493d1262..000000000 --- a/stardust/src/testing/transport.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! A simple transport layer using inter-thread communications, intended for use in tests and examples. -//! -//! Usage is simple, just add [`LinkTransportPlugin`] to all involved apps. -//! Then, use [`pair`] to create two [`Link`] components that communicate with eachother. -//! These 'links' don't do any kind of handshake. Once added to an entity, they communicate immediately. - -use std::sync::{mpsc::{channel, Receiver, Sender, TryRecvError}, Mutex}; -use bevy::prelude::*; -use bytes::Bytes; -use crate::prelude::*; - -/// Adds a simple transport plugin for apps part of the same process. -/// See the [top level documentation](self) for more information. -pub struct LinkTransportPlugin; - -impl Plugin for LinkTransportPlugin { - fn build(&self, app: &mut App) { - app.add_systems(PreUpdate, (recv_link_data, remove_disconnected) - .chain().in_set(NetworkRead::Receive)); - - app.add_systems(PostUpdate, (send_link_data, remove_disconnected) - .chain().in_set(NetworkWrite::Send)); - } -} - -/// A connection to another `Link`, made with [`pair`]. -/// -/// A `Link` will only communicate with its counterpart. -#[derive(Component)] -pub struct Link(SideInner); - -/// Creates two connected [`Link`] objects. -pub fn pair() -> (Link, Link) { - let (left_tx, left_rx) = channel(); - let (right_tx, right_rx) = channel(); - - let left = Link(SideInner { - sender: left_tx, - receiver: Mutex::new(right_rx), - disconnected: false, - }); - - let right = Link(SideInner { - sender: right_tx, - receiver: Mutex::new(left_rx), - disconnected: false, - }); - - return (left, right); -} - -struct SideInner { - sender: Sender, - // Makes the struct Sync, so it can be in a Component. - // Use Exclusive when it's stabilised. - receiver: Mutex>, - disconnected: bool, -} - -struct Message { - channel: ChannelId, - payload: Bytes, -} - -fn recv_link_data( - mut query: Query<(&mut Link, &mut NetworkMessages), With>, -) { - query.par_iter_mut().for_each(|(mut link, mut queue)| { - let receiver = link.0.receiver.get_mut().unwrap(); - loop { - match receiver.try_recv() { - Ok(message) => { - queue.push(message.channel, message.payload); - }, - Err(TryRecvError::Empty) => { break }, - Err(TryRecvError::Disconnected) => { - link.0.disconnected = true; - break; - }, - } - } - }); -} - -fn send_link_data( - mut query: Query<(&mut Link, &NetworkMessages), With>, -) { - query.par_iter_mut().for_each(|(mut link, queue)| { - let sender = &link.0.sender; - 'outer: for (channel, queue) in queue.iter() { - for payload in queue.iter().cloned() { - match sender.send(Message { channel, payload }) { - Ok(_) => {}, - Err(_) => { - link.0.disconnected = true; - break 'outer; - }, - } - } - } - }); -} - -fn remove_disconnected( - mut commands: Commands, - mut query: Query<(Entity, &Link, Option<&mut NetworkPeerLifestage>)>, -) { - for (entity, link, stage) in query.iter_mut() { - if link.0.disconnected { - debug!("Link on entity {entity:?} disconnected"); - commands.entity(entity).remove::(); - if let Some(mut stage) = stage { - *stage = NetworkPeerLifestage::Closed; - } - } - } -} \ No newline at end of file