Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engineio/socket): atomically adjacent packet requirement for binary payloads #287

Merged
merged 5 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
base64 = "0.22.0"
bytes = "1.4.0"
rand = "0.8.5"
smallvec = { version = "1.13.1", features = ["union"] }

# Tracing
tracing = { workspace = true, optional = true }
Expand Down
67 changes: 33 additions & 34 deletions engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use std::{
};

use http::request::Parts;
use smallvec::{smallvec, SmallVec};
use tokio::{
sync::{
mpsc::{self},
Expand Down Expand Up @@ -118,44 +119,35 @@ impl From<&Error> for Option<DisconnectReason> {
/// A permit to emit a message to the client.
/// A permit holds a place in the internal channel to send one packet to the client.
pub struct Permit<'a> {
inner: mpsc::Permit<'a, Packet>,
inner: mpsc::Permit<'a, PacketBuf>,
}
impl Permit<'_> {
/// Consume the permit and emit a message to the client.
#[inline]
pub fn emit(self, msg: String) {
self.inner.send(Packet::Message(msg));
self.inner.send(smallvec![Packet::Message(msg)]);
}
/// Consume the permit and emit a binary message to the client.
#[inline]
pub fn emit_binary(self, data: Vec<u8>) {
self.inner.send(Packet::Binary(data));
self.inner.send(smallvec![Packet::Binary(data)]);
}
}

/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
#[derive(Debug)]
pub struct PermitIterator<'a> {
inner: mpsc::PermitIterator<'a, Packet>,
}

impl<'a> Iterator for PermitIterator<'a> {
type Item = Permit<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let inner = self.inner.next()?;
Some(Permit { inner })
}
}
impl ExactSizeIterator for PermitIterator<'_> {
#[inline]
fn len(&self) -> usize {
self.inner.len()
/// Consume the permit and emit a message with multiple binary data to the client.
///
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
let mut packets = SmallVec::with_capacity(data.len() + 1);
packets.push(Packet::Message(msg));
for d in data {
packets.push(Packet::Binary(d));
}
self.inner.send(packets);
}
}
impl std::iter::FusedIterator for PermitIterator<'_> {}

/// Buffered packets to send to the client
pub(crate) type PacketBuf = SmallVec<[Packet; 10]>;
/// A [`Socket`] represents a client connection to the server.
/// It is agnostic to the [`TransportType`].
///
Expand All @@ -179,7 +171,7 @@ where
/// without any mutex
transport: AtomicU8,

/// Channel to receive [`Packet`] from the connection
/// Channel to send [`PacketBuf`] to the connection
///
/// It is used and managed by the [`EngineIo`](crate::engine) struct depending on the transport type
///
Expand All @@ -192,10 +184,12 @@ where
/// * From the fn [`on_ws_req_init`](crate::engine::EngineIo) if the transport is websocket
/// * Automatically via the [`close_session fn`](crate::engine::EngineIo::close_session) as a fallback.
/// Because with polling transport, if the client is not currently polling then the encoder will never be able to close the channel
pub(crate) internal_rx: Mutex<PeekableReceiver<Packet>>,
///
/// The channel is made of a [`SmallVec`] of [`Packet`]s so that adjacent packets can be sent atomically.
pub(crate) internal_rx: Mutex<PeekableReceiver<PacketBuf>>,

/// Channel to send [Packet] to the internal connection
internal_tx: mpsc::Sender<Packet>,
/// Channel to send [PacketBuf] to the internal connection
internal_tx: mpsc::Sender<PacketBuf>,

/// Internal channel to receive Pong [`Packets`](Packet) (v4 protocol) or Ping (v3 protocol) in the heartbeat job
/// which is running in a separate task
Expand Down Expand Up @@ -266,7 +260,12 @@ where
pub(crate) fn send(&self, packet: Packet) -> Result<(), TrySendError<Packet>> {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] sending packet: {:?}", self.id, packet);
self.internal_tx.try_send(packet)?;
self.internal_tx
.try_send(smallvec![packet])
.map_err(|p| match p {
TrySendError::Full(mut p) => TrySendError::Full(p.pop().unwrap()),
TrySendError::Closed(mut p) => TrySendError::Closed(p.pop().unwrap()),
})?;
Ok(())
}

Expand Down Expand Up @@ -334,7 +333,7 @@ where
heartbeat_rx.try_recv().ok();

self.internal_tx
.try_send(Packet::Ping)
.try_send(smallvec![Packet::Ping])
.map_err(|_| Error::HeartbeatTimeout)?;
tokio::time::timeout(timeout, heartbeat_rx.recv())
.await
Expand Down Expand Up @@ -363,7 +362,7 @@ where
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] ping received, sending pong", self.id);
self.internal_tx
.try_send(Packet::Pong)
.try_send(smallvec![Packet::Pong])
.map_err(|_| Error::HeartbeatTimeout)?;
}
}
Expand Down Expand Up @@ -395,9 +394,9 @@ where
/// If the internal chan is full, the function will return a [`TrySendError::Full`] error.
/// If the socket is closed, the function will return a [`TrySendError::Closed`] error.
#[inline]
pub fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, TrySendError<()>> {
let inner = self.internal_tx.try_reserve_many(n)?;
Ok(PermitIterator { inner })
pub fn reserve(&self) -> Result<Permit<'_>, TrySendError<()>> {
let permit = self.internal_tx.try_reserve()?;
Ok(Permit { inner: permit })
}

/// Emits a message to the client.
Expand Down
Loading
Loading