Skip to content

Commit

Permalink
fix(engineio/socket): atomically adjacent packet requirement for bina…
Browse files Browse the repository at this point in the history
…ry payloads (#287)
  • Loading branch information
Totodore authored Mar 20, 2024
1 parent fc599c3 commit 00cf4b5
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 159 deletions.
1 change: 1 addition & 0 deletions engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ hyper-util = { workspace = true, features = ["tokio"] }
base64 = "0.22.0"
bytes = "1.4.0"
rand = "0.8.5"
smallvec = { version = "1.13.1", features = ["union"] }

# Tracing
tracing = { workspace = true, optional = true }
Expand Down
67 changes: 33 additions & 34 deletions engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use std::{
};

use http::request::Parts;
use smallvec::{smallvec, SmallVec};
use tokio::{
sync::{
mpsc::{self},
Expand Down Expand Up @@ -118,44 +119,35 @@ impl From<&Error> for Option<DisconnectReason> {
/// A permit to emit a message to the client.
/// A permit holds a place in the internal channel to send one packet to the client.
pub struct Permit<'a> {
inner: mpsc::Permit<'a, Packet>,
inner: mpsc::Permit<'a, PacketBuf>,
}
impl Permit<'_> {
/// Consume the permit and emit a message to the client.
#[inline]
pub fn emit(self, msg: String) {
self.inner.send(Packet::Message(msg));
self.inner.send(smallvec![Packet::Message(msg)]);
}
/// Consume the permit and emit a binary message to the client.
#[inline]
pub fn emit_binary(self, data: Vec<u8>) {
self.inner.send(Packet::Binary(data));
self.inner.send(smallvec![Packet::Binary(data)]);
}
}

/// An [`Iterator`] over the permits returned by the [`reserve`](Socket::reserve) function
#[derive(Debug)]
pub struct PermitIterator<'a> {
inner: mpsc::PermitIterator<'a, Packet>,
}

impl<'a> Iterator for PermitIterator<'a> {
type Item = Permit<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let inner = self.inner.next()?;
Some(Permit { inner })
}
}
impl ExactSizeIterator for PermitIterator<'_> {
#[inline]
fn len(&self) -> usize {
self.inner.len()
/// Consume the permit and emit a message with multiple binary data to the client.
///
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
let mut packets = SmallVec::with_capacity(data.len() + 1);
packets.push(Packet::Message(msg));
for d in data {
packets.push(Packet::Binary(d));
}
self.inner.send(packets);
}
}
impl std::iter::FusedIterator for PermitIterator<'_> {}

/// Buffered packets to send to the client
pub(crate) type PacketBuf = SmallVec<[Packet; 10]>;
/// A [`Socket`] represents a client connection to the server.
/// It is agnostic to the [`TransportType`].
///
Expand All @@ -179,7 +171,7 @@ where
/// without any mutex
transport: AtomicU8,

/// Channel to receive [`Packet`] from the connection
/// Channel to send [`PacketBuf`] to the connection
///
/// It is used and managed by the [`EngineIo`](crate::engine) struct depending on the transport type
///
Expand All @@ -192,10 +184,12 @@ where
/// * From the fn [`on_ws_req_init`](crate::engine::EngineIo) if the transport is websocket
/// * Automatically via the [`close_session fn`](crate::engine::EngineIo::close_session) as a fallback.
/// Because with polling transport, if the client is not currently polling then the encoder will never be able to close the channel
pub(crate) internal_rx: Mutex<PeekableReceiver<Packet>>,
///
/// The channel is made of a [`SmallVec`] of [`Packet`]s so that adjacent packets can be sent atomically.
pub(crate) internal_rx: Mutex<PeekableReceiver<PacketBuf>>,

/// Channel to send [Packet] to the internal connection
internal_tx: mpsc::Sender<Packet>,
/// Channel to send [PacketBuf] to the internal connection
internal_tx: mpsc::Sender<PacketBuf>,

/// Internal channel to receive Pong [`Packets`](Packet) (v4 protocol) or Ping (v3 protocol) in the heartbeat job
/// which is running in a separate task
Expand Down Expand Up @@ -266,7 +260,12 @@ where
pub(crate) fn send(&self, packet: Packet) -> Result<(), TrySendError<Packet>> {
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] sending packet: {:?}", self.id, packet);
self.internal_tx.try_send(packet)?;
self.internal_tx
.try_send(smallvec![packet])
.map_err(|p| match p {
TrySendError::Full(mut p) => TrySendError::Full(p.pop().unwrap()),
TrySendError::Closed(mut p) => TrySendError::Closed(p.pop().unwrap()),
})?;
Ok(())
}

Expand Down Expand Up @@ -334,7 +333,7 @@ where
heartbeat_rx.try_recv().ok();

self.internal_tx
.try_send(Packet::Ping)
.try_send(smallvec![Packet::Ping])
.map_err(|_| Error::HeartbeatTimeout)?;
tokio::time::timeout(timeout, heartbeat_rx.recv())
.await
Expand Down Expand Up @@ -363,7 +362,7 @@ where
#[cfg(feature = "tracing")]
tracing::debug!("[sid={}] ping received, sending pong", self.id);
self.internal_tx
.try_send(Packet::Pong)
.try_send(smallvec![Packet::Pong])
.map_err(|_| Error::HeartbeatTimeout)?;
}
}
Expand Down Expand Up @@ -395,9 +394,9 @@ where
/// If the internal chan is full, the function will return a [`TrySendError::Full`] error.
/// If the socket is closed, the function will return a [`TrySendError::Closed`] error.
#[inline]
pub fn reserve(&self, n: usize) -> Result<PermitIterator<'_>, TrySendError<()>> {
let inner = self.internal_tx.try_reserve_many(n)?;
Ok(PermitIterator { inner })
pub fn reserve(&self) -> Result<Permit<'_>, TrySendError<()>> {
let permit = self.internal_tx.try_reserve()?;
Ok(Permit { inner: permit })
}

/// Emits a message to the client.
Expand Down
Loading

0 comments on commit 00cf4b5

Please sign in to comment.