Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use bytes::Bytes instead of Vec<u8> to represent binary payloads #285

Merged
merged 7 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ default-members = ["engineioxide", "socketioxide"]
resolver = "2"

[workspace.dependencies]
bytes = { version = "1.4.0", features = ["serde"] }
futures = "0.3.27"
tokio = "1.35.0"
tokio-tungstenite = "0.21.0"
Expand Down
1 change: 1 addition & 0 deletions e2e/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
publish = false

[dependencies]
bytes.workspace = true
engineioxide = { path = "../../engineioxide", default-features = false, features = [
"tracing",
] }
Expand Down
3 changes: 2 additions & 1 deletion e2e/engineioxide/engineioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use engineioxide::{
config::EngineIoConfig,
handler::EngineIoHandler,
Expand Down Expand Up @@ -32,7 +33,7 @@ impl EngineIoHandler for MyHandler {
socket.emit(msg).ok();
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
5 changes: 3 additions & 2 deletions engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ features = ["v3"]
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
http-body.workspace = true
Expand All @@ -34,14 +35,14 @@ pin-project-lite.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }

base64 = "0.22.0"
bytes = "1.4.0"
rand = "0.8.5"
smallvec = { version = "1.13.1", features = ["union"] }

# Tracing
tracing = { workspace = true, optional = true }

# Engine.io V3 payload
itoa = { workspace = true, optional = true }
memchr = { version = "2.5.0", optional = true }
unicode-segmentation = { version = "1.10.1", optional = true }

Expand All @@ -53,7 +54,7 @@ axum.workspace = true
hyper-util = { workspace = true, features = ["tokio", "client-legacy"] }

[features]
v3 = ["memchr", "unicode-segmentation"]
v3 = ["memchr", "unicode-segmentation", "itoa"]
tracing = ["dep:tracing"]

[[bench]]
Expand Down
5 changes: 3 additions & 2 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ engineioxide = { version = "0.3.0", features = ["v3"] }

## Basic example with axum :
```rust
use bytes::Bytes;
use engineioxide::layer::EngineIoLayer;
use engineioxide::handler::EngineIoHandler;
use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -52,7 +53,7 @@ impl EngineIoHandler for MyHandler {
fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
}
fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}

// Create a new engineio layer
Expand All @@ -64,4 +65,4 @@ let app = axum::Router::<()>::new()

// Spawn the axum server

```
```
6 changes: 4 additions & 2 deletions engineioxide/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! ## Configuration for the engine.io engine & transports
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::config::EngineIoConfig;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
Expand All @@ -15,7 +16,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! let config = EngineIoConfig::builder()
Expand Down Expand Up @@ -128,6 +129,7 @@ impl EngineIoConfigBuilder {
///
/// If the buffer if full the `emit()` method will return an error
/// ```
/// # use bytes::Bytes;
/// # use engineioxide::{
/// layer::EngineIoLayer,
/// handler::EngineIoHandler,
Expand All @@ -152,7 +154,7 @@ impl EngineIoConfigBuilder {
/// socket.emit(msg).unwrap();
/// }
///
/// fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) {
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
/// println!("Ping pong binary message {:?}", data);
/// socket.emit_binary(data).unwrap();
/// }
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl<H: EngineIoHandler> EngineIo<H> {

#[cfg(test)]
mod tests {
use bytes::Bytes;
use http::Request;

use super::*;
Expand All @@ -118,7 +119,7 @@ mod tests {
socket.emit(msg).ok();
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
9 changes: 6 additions & 3 deletions engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! ## An [`EngineIoHandler`] to get event calls for any engine.io socket
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -32,14 +33,16 @@
//! fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! // Create an engine io service with the given handler
//! let svc = EngineIoService::new(MyHandler::default());
//! ```
use std::sync::Arc;

use bytes::Bytes;

use crate::socket::{DisconnectReason, Socket};

/// The [`EngineIoHandler`] trait can be implemented on any struct to handle socket events
Expand All @@ -59,7 +62,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>);
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
}

impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
Expand All @@ -77,7 +80,7 @@ impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
(**self).on_message(msg, socket)
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
(**self).on_binary(data, socket)
}
}
3 changes: 2 additions & 1 deletion engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! #### Example with axum :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand All @@ -15,7 +16,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(MyHandler);
Expand Down
33 changes: 20 additions & 13 deletions engineioxide/src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use base64::{engine::general_purpose, Engine};
use bytes::Bytes;
use serde::Serialize;

use crate::config::EngineIoConfig;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub enum Packet {
/// Or to a websocket binary frame when using websocket connection
///
/// When receiving, it is only used with polling connection, websocket use binary frame
Binary(Vec<u8>), // Not part of the protocol, used internally
Binary(Bytes), // Not part of the protocol, used internally

/// Binary packet used to send binary data to the client
/// Converts to a String using base64 encoding when using polling connection
Expand All @@ -47,7 +48,7 @@ pub enum Packet {
/// When receiving, it is only used with polling connection, websocket use binary frame
///
/// This is a special packet, excepionally specific to the V3 protocol.
BinaryV3(Vec<u8>), // Not part of the protocol, used internally
BinaryV3(Bytes), // Not part of the protocol, used internally
}

impl Packet {
Expand All @@ -65,7 +66,7 @@ impl Packet {
}

/// If the packet is a binary packet, it returns the binary data
pub(crate) fn into_binary(self) -> Vec<u8> {
pub(crate) fn into_binary(self) -> Bytes {
match self {
Packet::Binary(data) => data,
Packet::BinaryV3(data) => data,
Expand Down Expand Up @@ -159,10 +160,16 @@ impl TryFrom<&str> for Packet {
b'4' => Packet::Message(value[1..].to_string()),
b'5' => Packet::Upgrade,
b'6' => Packet::Noop,
b'b' if value.as_bytes().get(1) == Some(&b'4') => {
Packet::BinaryV3(general_purpose::STANDARD.decode(value[2..].as_bytes())?)
}
b'b' => Packet::Binary(general_purpose::STANDARD.decode(value[1..].as_bytes())?),
b'b' if value.as_bytes().get(1) == Some(&b'4') => Packet::BinaryV3(
general_purpose::STANDARD
.decode(value[2..].as_bytes())?
.into(),
),
b'b' => Packet::Binary(
general_purpose::STANDARD
.decode(value[1..].as_bytes())?
.into(),
),
c => Err(Error::InvalidPacketType(Some(*c as char)))?,
};
Ok(res)
Expand Down Expand Up @@ -241,7 +248,7 @@ mod tests {

#[test]
fn test_binary_packet() {
let packet = Packet::Binary(vec![1, 2, 3]);
let packet = Packet::Binary(vec![1, 2, 3].into());
let packet_str: String = packet.try_into().unwrap();
assert_eq!(packet_str, "bAQID");
}
Expand All @@ -250,12 +257,12 @@ mod tests {
fn test_binary_packet_deserialize() {
let packet_str = "bAQID".to_string();
let packet: Packet = packet_str.try_into().unwrap();
assert_eq!(packet, Packet::Binary(vec![1, 2, 3]));
assert_eq!(packet, Packet::Binary(vec![1, 2, 3].into()));
}

#[test]
fn test_binary_packet_v3() {
let packet = Packet::BinaryV3(vec![1, 2, 3]);
let packet = Packet::BinaryV3(vec![1, 2, 3].into());
let packet_str: String = packet.try_into().unwrap();
assert_eq!(packet_str, "b4AQID");
}
Expand All @@ -264,7 +271,7 @@ mod tests {
fn test_binary_packet_v3_deserialize() {
let packet_str = "b4AQID".to_string();
let packet: Packet = packet_str.try_into().unwrap();
assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3]));
assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3].into()));
}

#[test]
Expand Down Expand Up @@ -310,11 +317,11 @@ mod tests {
let packet = Packet::Noop;
assert_eq!(packet.get_size_hint(false), 1);

let packet = Packet::Binary(vec![1, 2, 3]);
let packet = Packet::Binary(vec![1, 2, 3].into());
assert_eq!(packet.get_size_hint(false), 4);
assert_eq!(packet.get_size_hint(true), 5);

let packet = Packet::BinaryV3(vec![1, 2, 3]);
let packet = Packet::BinaryV3(vec![1, 2, 3].into());
assert_eq!(packet.get_size_hint(false), 4);
assert_eq!(packet.get_size_hint(true), 6);
}
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! #### Example with a `hyper` standalone service :
//!
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::service::EngineIoService;
Expand All @@ -17,7 +18,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
Expand Down
14 changes: 8 additions & 6 deletions engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//!
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -48,7 +49,7 @@
//! fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(MyHandler::default());
Expand All @@ -61,6 +62,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use http::request::Parts;
use smallvec::{smallvec, SmallVec};
use tokio::{
Expand Down Expand Up @@ -129,14 +131,14 @@ impl Permit<'_> {
}
/// Consume the permit and emit a binary message to the client.
#[inline]
pub fn emit_binary(self, data: Vec<u8>) {
pub fn emit_binary(self, data: Bytes) {
self.inner.send(smallvec![Packet::Binary(data)]);
}

/// Consume the permit and emit a message with multiple binary data to the client.
///
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
pub fn emit_many(self, msg: String, data: Vec<Bytes>) {
let mut packets = SmallVec::with_capacity(data.len() + 1);
packets.push(Packet::Message(msg));
for d in data {
Expand Down Expand Up @@ -438,11 +440,11 @@ where
/// If the transport is in polling mode, the message is buffered and sent as a text frame **encoded in base64** to the next polling request.
///
/// ⚠️ If the buffer is full or the socket is disconnected, an error will be returned with the original data
pub fn emit_binary(&self, data: Vec<u8>) -> Result<(), TrySendError<Vec<u8>>> {
pub fn emit_binary<B: Into<Bytes>>(&self, data: B) -> Result<(), TrySendError<Bytes>> {
if self.protocol == ProtocolVersion::V3 {
self.send(Packet::BinaryV3(data))
self.send(Packet::BinaryV3(data.into()))
} else {
self.send(Packet::Binary(data))
self.send(Packet::Binary(data.into()))
}
.map_err(|e| match e {
TrySendError::Full(p) => TrySendError::Full(p.into_binary()),
Expand Down
2 changes: 1 addition & 1 deletion engineioxide/src/transport/polling/payload/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ where
STRING_PACKET_IDENTIFIER_V3 => std::str::from_utf8(&packet_buf)
.map_err(|_| Error::InvalidPacketLength)
.and_then(Packet::try_from), // Convert the packet buffer to a Packet object
BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf)),
BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf.into())),
_ => Err(Error::InvalidPacketLength),
};

Expand Down
Loading