Skip to content

Commit

Permalink
feat(engineioxide): use bytes::Bytes to represent a binary payload
Browse files Browse the repository at this point in the history
This replaces the use of Vec<u8>.
  • Loading branch information
kelnos committed Apr 14, 2024
1 parent 89b6553 commit 1109c80
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 71 deletions.
1 change: 1 addition & 0 deletions e2e/engineioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license.workspace = true
publish = false

[dependencies]
bytes.workspace = true
engineioxide = { path = "../../engineioxide", default-features = false, features = [
"tracing",
] }
Expand Down
3 changes: 2 additions & 1 deletion e2e/engineioxide/engineioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use engineioxide::{
config::EngineIoConfig,
handler::EngineIoHandler,
Expand Down Expand Up @@ -32,7 +33,7 @@ impl EngineIoHandler for MyHandler {
socket.emit(msg).ok();
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
5 changes: 3 additions & 2 deletions engineioxide/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ engineioxide = { version = "0.3.0", features = ["v3"] }

## Basic example with axum :
```rust
use bytes::Bytes;
use engineioxide::layer::EngineIoLayer;
use engineioxide::handler::EngineIoHandler;
use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -52,7 +53,7 @@ impl EngineIoHandler for MyHandler {
fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
*socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
}
fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
}

// Create a new engineio layer
Expand All @@ -64,4 +65,4 @@ let app = axum::Router::<()>::new()

// Spawn the axum server

```
```
6 changes: 4 additions & 2 deletions engineioxide/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! ## Configuration for the engine.io engine & transports
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::config::EngineIoConfig;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
Expand All @@ -15,7 +16,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! let config = EngineIoConfig::builder()
Expand Down Expand Up @@ -128,6 +129,7 @@ impl EngineIoConfigBuilder {
///
/// If the buffer if full the `emit()` method will return an error
/// ```
/// # use bytes::Bytes;
/// # use engineioxide::{
/// layer::EngineIoLayer,
/// handler::EngineIoHandler,
Expand All @@ -152,7 +154,7 @@ impl EngineIoConfigBuilder {
/// socket.emit(msg).unwrap();
/// }
///
/// fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) {
/// fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) {
/// println!("Ping pong binary message {:?}", data);
/// socket.emit_binary(data).unwrap();
/// }
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl<H: EngineIoHandler> EngineIo<H> {

#[cfg(test)]
mod tests {
use bytes::Bytes;
use http::Request;

use super::*;
Expand All @@ -118,7 +119,7 @@ mod tests {
socket.emit(msg).ok();
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
println!("Ping pong binary message {:?}", data);
socket.emit_binary(data).ok();
}
Expand Down
9 changes: 6 additions & 3 deletions engineioxide/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! ## An [`EngineIoHandler`] to get event calls for any engine.io socket
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -32,14 +33,16 @@
//! fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! // Create an engine io service with the given handler
//! let svc = EngineIoService::new(MyHandler::default());
//! ```
use std::sync::Arc;

use bytes::Bytes;

use crate::socket::{DisconnectReason, Socket};

/// The [`EngineIoHandler`] trait can be implemented on any struct to handle socket events
Expand All @@ -59,7 +62,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static {
fn on_message(&self, msg: String, socket: Arc<Socket<Self::Data>>);

/// Called when a binary message is received from the client.
fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>);
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>);
}

impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
Expand All @@ -77,7 +80,7 @@ impl<T: EngineIoHandler> EngineIoHandler for Arc<T> {
(**self).on_message(msg, socket)
}

fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<Self::Data>>) {
fn on_binary(&self, data: Bytes, socket: Arc<Socket<Self::Data>>) {
(**self).on_binary(data, socket)
}
}
3 changes: 2 additions & 1 deletion engineioxide/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! #### Example with axum :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand All @@ -15,7 +16,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//! // Create a new engineio layer
//! let layer = EngineIoLayer::new(MyHandler);
Expand Down
33 changes: 20 additions & 13 deletions engineioxide/src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use base64::{engine::general_purpose, Engine};
use bytes::Bytes;
use serde::Serialize;

use crate::config::EngineIoConfig;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub enum Packet {
/// Or to a websocket binary frame when using websocket connection
///
/// When receiving, it is only used with polling connection, websocket use binary frame
Binary(Vec<u8>), // Not part of the protocol, used internally
Binary(Bytes), // Not part of the protocol, used internally

/// Binary packet used to send binary data to the client
/// Converts to a String using base64 encoding when using polling connection
Expand All @@ -47,7 +48,7 @@ pub enum Packet {
/// When receiving, it is only used with polling connection, websocket use binary frame
///
/// This is a special packet, excepionally specific to the V3 protocol.
BinaryV3(Vec<u8>), // Not part of the protocol, used internally
BinaryV3(Bytes), // Not part of the protocol, used internally
}

impl Packet {
Expand All @@ -65,7 +66,7 @@ impl Packet {
}

/// If the packet is a binary packet, it returns the binary data
pub(crate) fn into_binary(self) -> Vec<u8> {
pub(crate) fn into_binary(self) -> Bytes {
match self {
Packet::Binary(data) => data,
Packet::BinaryV3(data) => data,
Expand Down Expand Up @@ -159,10 +160,16 @@ impl TryFrom<&str> for Packet {
b'4' => Packet::Message(value[1..].to_string()),
b'5' => Packet::Upgrade,
b'6' => Packet::Noop,
b'b' if value.as_bytes().get(1) == Some(&b'4') => {
Packet::BinaryV3(general_purpose::STANDARD.decode(value[2..].as_bytes())?)
}
b'b' => Packet::Binary(general_purpose::STANDARD.decode(value[1..].as_bytes())?),
b'b' if value.as_bytes().get(1) == Some(&b'4') => Packet::BinaryV3(
general_purpose::STANDARD
.decode(value[2..].as_bytes())?
.into(),
),
b'b' => Packet::Binary(
general_purpose::STANDARD
.decode(value[1..].as_bytes())?
.into(),
),
c => Err(Error::InvalidPacketType(Some(*c as char)))?,
};
Ok(res)
Expand Down Expand Up @@ -241,7 +248,7 @@ mod tests {

#[test]
fn test_binary_packet() {
let packet = Packet::Binary(vec![1, 2, 3]);
let packet = Packet::Binary(vec![1, 2, 3].into());
let packet_str: String = packet.try_into().unwrap();
assert_eq!(packet_str, "bAQID");
}
Expand All @@ -250,12 +257,12 @@ mod tests {
fn test_binary_packet_deserialize() {
let packet_str = "bAQID".to_string();
let packet: Packet = packet_str.try_into().unwrap();
assert_eq!(packet, Packet::Binary(vec![1, 2, 3]));
assert_eq!(packet, Packet::Binary(vec![1, 2, 3].into()));
}

#[test]
fn test_binary_packet_v3() {
let packet = Packet::BinaryV3(vec![1, 2, 3]);
let packet = Packet::BinaryV3(vec![1, 2, 3].into());
let packet_str: String = packet.try_into().unwrap();
assert_eq!(packet_str, "b4AQID");
}
Expand All @@ -264,7 +271,7 @@ mod tests {
fn test_binary_packet_v3_deserialize() {
let packet_str = "b4AQID".to_string();
let packet: Packet = packet_str.try_into().unwrap();
assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3]));
assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3].into()));
}

#[test]
Expand Down Expand Up @@ -310,11 +317,11 @@ mod tests {
let packet = Packet::Noop;
assert_eq!(packet.get_size_hint(false), 1);

let packet = Packet::Binary(vec![1, 2, 3]);
let packet = Packet::Binary(vec![1, 2, 3].into());
assert_eq!(packet.get_size_hint(false), 4);
assert_eq!(packet.get_size_hint(true), 5);

let packet = Packet::BinaryV3(vec![1, 2, 3]);
let packet = Packet::BinaryV3(vec![1, 2, 3].into());
assert_eq!(packet.get_size_hint(false), 4);
assert_eq!(packet.get_size_hint(true), 6);
}
Expand Down
3 changes: 2 additions & 1 deletion engineioxide/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! #### Example with a `hyper` standalone service :
//!
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::layer::EngineIoLayer;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::service::EngineIoService;
Expand All @@ -17,7 +18,7 @@
//! fn on_connect(&self, socket: Arc<Socket<()>>) { }
//! fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
//! fn on_message(&self, msg: String, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<()>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<()>>) { }
//! }
//!
//! // Create a new engine.io service that will return a 404 not found response for other requests
Expand Down
10 changes: 6 additions & 4 deletions engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//!
//! #### Example :
//! ```rust
//! # use bytes::Bytes;
//! # use engineioxide::service::EngineIoService;
//! # use engineioxide::handler::EngineIoHandler;
//! # use engineioxide::{Socket, DisconnectReason};
Expand Down Expand Up @@ -48,7 +49,7 @@
//! fn on_message(&self, msg: String, socket: Arc<Socket<SocketState>>) {
//! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket
//! }
//! fn on_binary(&self, data: Vec<u8>, socket: Arc<Socket<SocketState>>) { }
//! fn on_binary(&self, data: Bytes, socket: Arc<Socket<SocketState>>) { }
//! }
//!
//! let svc = EngineIoService::new(MyHandler::default());
Expand All @@ -61,6 +62,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use http::request::Parts;
use smallvec::{smallvec, SmallVec};
use tokio::{
Expand Down Expand Up @@ -129,14 +131,14 @@ impl Permit<'_> {
}
/// Consume the permit and emit a binary message to the client.
#[inline]
pub fn emit_binary(self, data: Vec<u8>) {
pub fn emit_binary(self, data: Bytes) {
self.inner.send(smallvec![Packet::Binary(data)]);
}

/// Consume the permit and emit a message with multiple binary data to the client.
///
/// It can be used to ensure atomicity when sending a string packet with adjacent binary packets.
pub fn emit_many(self, msg: String, data: Vec<Vec<u8>>) {
pub fn emit_many(self, msg: String, data: Vec<Bytes>) {
let mut packets = SmallVec::with_capacity(data.len() + 1);
packets.push(Packet::Message(msg));
for d in data {
Expand Down Expand Up @@ -438,7 +440,7 @@ where
/// If the transport is in polling mode, the message is buffered and sent as a text frame **encoded in base64** to the next polling request.
///
/// ⚠️ If the buffer is full or the socket is disconnected, an error will be returned with the original data
pub fn emit_binary(&self, data: Vec<u8>) -> Result<(), TrySendError<Vec<u8>>> {
pub fn emit_binary(&self, data: Bytes) -> Result<(), TrySendError<Bytes>> {
if self.protocol == ProtocolVersion::V3 {
self.send(Packet::BinaryV3(data))
} else {
Expand Down
2 changes: 1 addition & 1 deletion engineioxide/src/transport/polling/payload/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ where
STRING_PACKET_IDENTIFIER_V3 => std::str::from_utf8(&packet_buf)
.map_err(|_| Error::InvalidPacketLength)
.and_then(Packet::try_from), // Convert the packet buffer to a Packet object
BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf)),
BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf.into())),
_ => Err(Error::InvalidPacketLength),
};

Expand Down
Loading

0 comments on commit 1109c80

Please sign in to comment.