diff --git a/e2e/engineioxide/Cargo.toml b/e2e/engineioxide/Cargo.toml index 4f8c33c7..685a09c4 100644 --- a/e2e/engineioxide/Cargo.toml +++ b/e2e/engineioxide/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true publish = false [dependencies] +bytes.workspace = true engineioxide = { path = "../../engineioxide", default-features = false, features = [ "tracing", ] } diff --git a/e2e/engineioxide/engineioxide.rs b/e2e/engineioxide/engineioxide.rs index 12b76b7f..f174be59 100644 --- a/e2e/engineioxide/engineioxide.rs +++ b/e2e/engineioxide/engineioxide.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; +use bytes::Bytes; use engineioxide::{ config::EngineIoConfig, handler::EngineIoHandler, @@ -32,7 +33,7 @@ impl EngineIoHandler for MyHandler { socket.emit(msg).ok(); } - fn on_binary(&self, data: Vec, socket: Arc>) { + fn on_binary(&self, data: Bytes, socket: Arc>) { println!("Ping pong binary message {:?}", data); socket.emit_binary(data).ok(); } diff --git a/engineioxide/Readme.md b/engineioxide/Readme.md index 228216f2..1d511526 100644 --- a/engineioxide/Readme.md +++ b/engineioxide/Readme.md @@ -20,6 +20,7 @@ engineioxide = { version = "0.3.0", features = ["v3"] } ## Basic example with axum : ```rust +use bytes::Bytes; use engineioxide::layer::EngineIoLayer; use engineioxide::handler::EngineIoHandler; use engineioxide::{Socket, DisconnectReason}; @@ -52,7 +53,7 @@ impl EngineIoHandler for MyHandler { fn on_message(&self, msg: String, socket: Arc>) { *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket } - fn on_binary(&self, data: Vec, socket: Arc>) { } + fn on_binary(&self, data: Bytes, socket: Arc>) { } } // Create a new engineio layer @@ -64,4 +65,4 @@ let app = axum::Router::<()>::new() // Spawn the axum server -``` \ No newline at end of file +``` diff --git a/engineioxide/src/config.rs b/engineioxide/src/config.rs index c97e12f6..ced83068 100644 --- a/engineioxide/src/config.rs +++ b/engineioxide/src/config.rs @@ -1,6 +1,7 @@ //! ## Configuration for the engine.io engine & transports //! #### Example : //! ```rust +//! # use bytes::Bytes; //! # use engineioxide::config::EngineIoConfig; //! # use engineioxide::service::EngineIoService; //! # use engineioxide::handler::EngineIoHandler; @@ -15,7 +16,7 @@ //! fn on_connect(&self, socket: Arc>) { } //! fn on_disconnect(&self, socket: Arc>, reason: DisconnectReason) { } //! fn on_message(&self, msg: String, socket: Arc>) { } -//! fn on_binary(&self, data: Vec, socket: Arc>) { } +//! fn on_binary(&self, data: Bytes, socket: Arc>) { } //! } //! //! let config = EngineIoConfig::builder() @@ -128,6 +129,7 @@ impl EngineIoConfigBuilder { /// /// If the buffer if full the `emit()` method will return an error /// ``` + /// # use bytes::Bytes; /// # use engineioxide::{ /// layer::EngineIoLayer, /// handler::EngineIoHandler, @@ -152,7 +154,7 @@ impl EngineIoConfigBuilder { /// socket.emit(msg).unwrap(); /// } /// - /// fn on_binary(&self, data: Vec, socket: Arc>) { + /// fn on_binary(&self, data: Bytes, socket: Arc>) { /// println!("Ping pong binary message {:?}", data); /// socket.emit_binary(data).unwrap(); /// } diff --git a/engineioxide/src/engine.rs b/engineioxide/src/engine.rs index 41eb3838..8271a50a 100644 --- a/engineioxide/src/engine.rs +++ b/engineioxide/src/engine.rs @@ -95,6 +95,7 @@ impl EngineIo { #[cfg(test)] mod tests { + use bytes::Bytes; use http::Request; use super::*; @@ -118,7 +119,7 @@ mod tests { socket.emit(msg).ok(); } - fn on_binary(&self, data: Vec, socket: Arc>) { + fn on_binary(&self, data: Bytes, socket: Arc>) { println!("Ping pong binary message {:?}", data); socket.emit_binary(data).ok(); } diff --git a/engineioxide/src/handler.rs b/engineioxide/src/handler.rs index 48d4c9f9..8dcd7d90 100644 --- a/engineioxide/src/handler.rs +++ b/engineioxide/src/handler.rs @@ -1,6 +1,7 @@ //! ## An [`EngineIoHandler`] to get event calls for any engine.io socket //! #### Example : //! ```rust +//! # use bytes::Bytes; //! # use engineioxide::service::EngineIoService; //! # use engineioxide::handler::EngineIoHandler; //! # use engineioxide::{Socket, DisconnectReason}; @@ -32,7 +33,7 @@ //! fn on_message(&self, msg: String, socket: Arc>) { //! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket //! } -//! fn on_binary(&self, data: Vec, socket: Arc>) { } +//! fn on_binary(&self, data: Bytes, socket: Arc>) { } //! } //! //! // Create an engine io service with the given handler @@ -40,6 +41,8 @@ //! ``` use std::sync::Arc; +use bytes::Bytes; + use crate::socket::{DisconnectReason, Socket}; /// The [`EngineIoHandler`] trait can be implemented on any struct to handle socket events @@ -59,7 +62,7 @@ pub trait EngineIoHandler: std::fmt::Debug + Send + Sync + 'static { fn on_message(&self, msg: String, socket: Arc>); /// Called when a binary message is received from the client. - fn on_binary(&self, data: Vec, socket: Arc>); + fn on_binary(&self, data: Bytes, socket: Arc>); } impl EngineIoHandler for Arc { @@ -77,7 +80,7 @@ impl EngineIoHandler for Arc { (**self).on_message(msg, socket) } - fn on_binary(&self, data: Vec, socket: Arc>) { + fn on_binary(&self, data: Bytes, socket: Arc>) { (**self).on_binary(data, socket) } } diff --git a/engineioxide/src/layer.rs b/engineioxide/src/layer.rs index ed1f09c6..b2578cab 100644 --- a/engineioxide/src/layer.rs +++ b/engineioxide/src/layer.rs @@ -2,6 +2,7 @@ //! //! #### Example with axum : //! ```rust +//! # use bytes::Bytes; //! # use engineioxide::layer::EngineIoLayer; //! # use engineioxide::handler::EngineIoHandler; //! # use engineioxide::{Socket, DisconnectReason}; @@ -15,7 +16,7 @@ //! fn on_connect(&self, socket: Arc>) { } //! fn on_disconnect(&self, socket: Arc>, reason: DisconnectReason) { } //! fn on_message(&self, msg: String, socket: Arc>) { } -//! fn on_binary(&self, data: Vec, socket: Arc>) { } +//! fn on_binary(&self, data: Bytes, socket: Arc>) { } //! } //! // Create a new engineio layer //! let layer = EngineIoLayer::new(MyHandler); diff --git a/engineioxide/src/packet.rs b/engineioxide/src/packet.rs index 4dda495d..e8c31bf9 100644 --- a/engineioxide/src/packet.rs +++ b/engineioxide/src/packet.rs @@ -1,4 +1,5 @@ use base64::{engine::general_purpose, Engine}; +use bytes::Bytes; use serde::Serialize; use crate::config::EngineIoConfig; @@ -38,7 +39,7 @@ pub enum Packet { /// Or to a websocket binary frame when using websocket connection /// /// When receiving, it is only used with polling connection, websocket use binary frame - Binary(Vec), // Not part of the protocol, used internally + Binary(Bytes), // Not part of the protocol, used internally /// Binary packet used to send binary data to the client /// Converts to a String using base64 encoding when using polling connection @@ -47,7 +48,7 @@ pub enum Packet { /// When receiving, it is only used with polling connection, websocket use binary frame /// /// This is a special packet, excepionally specific to the V3 protocol. - BinaryV3(Vec), // Not part of the protocol, used internally + BinaryV3(Bytes), // Not part of the protocol, used internally } impl Packet { @@ -65,7 +66,7 @@ impl Packet { } /// If the packet is a binary packet, it returns the binary data - pub(crate) fn into_binary(self) -> Vec { + pub(crate) fn into_binary(self) -> Bytes { match self { Packet::Binary(data) => data, Packet::BinaryV3(data) => data, @@ -159,10 +160,16 @@ impl TryFrom<&str> for Packet { b'4' => Packet::Message(value[1..].to_string()), b'5' => Packet::Upgrade, b'6' => Packet::Noop, - b'b' if value.as_bytes().get(1) == Some(&b'4') => { - Packet::BinaryV3(general_purpose::STANDARD.decode(value[2..].as_bytes())?) - } - b'b' => Packet::Binary(general_purpose::STANDARD.decode(value[1..].as_bytes())?), + b'b' if value.as_bytes().get(1) == Some(&b'4') => Packet::BinaryV3( + general_purpose::STANDARD + .decode(value[2..].as_bytes())? + .into(), + ), + b'b' => Packet::Binary( + general_purpose::STANDARD + .decode(value[1..].as_bytes())? + .into(), + ), c => Err(Error::InvalidPacketType(Some(*c as char)))?, }; Ok(res) @@ -241,7 +248,7 @@ mod tests { #[test] fn test_binary_packet() { - let packet = Packet::Binary(vec![1, 2, 3]); + let packet = Packet::Binary(vec![1, 2, 3].into()); let packet_str: String = packet.try_into().unwrap(); assert_eq!(packet_str, "bAQID"); } @@ -250,12 +257,12 @@ mod tests { fn test_binary_packet_deserialize() { let packet_str = "bAQID".to_string(); let packet: Packet = packet_str.try_into().unwrap(); - assert_eq!(packet, Packet::Binary(vec![1, 2, 3])); + assert_eq!(packet, Packet::Binary(vec![1, 2, 3].into())); } #[test] fn test_binary_packet_v3() { - let packet = Packet::BinaryV3(vec![1, 2, 3]); + let packet = Packet::BinaryV3(vec![1, 2, 3].into()); let packet_str: String = packet.try_into().unwrap(); assert_eq!(packet_str, "b4AQID"); } @@ -264,7 +271,7 @@ mod tests { fn test_binary_packet_v3_deserialize() { let packet_str = "b4AQID".to_string(); let packet: Packet = packet_str.try_into().unwrap(); - assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3])); + assert_eq!(packet, Packet::BinaryV3(vec![1, 2, 3].into())); } #[test] @@ -310,11 +317,11 @@ mod tests { let packet = Packet::Noop; assert_eq!(packet.get_size_hint(false), 1); - let packet = Packet::Binary(vec![1, 2, 3]); + let packet = Packet::Binary(vec![1, 2, 3].into()); assert_eq!(packet.get_size_hint(false), 4); assert_eq!(packet.get_size_hint(true), 5); - let packet = Packet::BinaryV3(vec![1, 2, 3]); + let packet = Packet::BinaryV3(vec![1, 2, 3].into()); assert_eq!(packet.get_size_hint(false), 4); assert_eq!(packet.get_size_hint(true), 6); } diff --git a/engineioxide/src/service/mod.rs b/engineioxide/src/service/mod.rs index f1a63892..4a5fae1b 100644 --- a/engineioxide/src/service/mod.rs +++ b/engineioxide/src/service/mod.rs @@ -4,6 +4,7 @@ //! #### Example with a `hyper` standalone service : //! //! ```rust +//! # use bytes::Bytes; //! # use engineioxide::layer::EngineIoLayer; //! # use engineioxide::handler::EngineIoHandler; //! # use engineioxide::service::EngineIoService; @@ -17,7 +18,7 @@ //! fn on_connect(&self, socket: Arc>) { } //! fn on_disconnect(&self, socket: Arc>, reason: DisconnectReason) { } //! fn on_message(&self, msg: String, socket: Arc>) { } -//! fn on_binary(&self, data: Vec, socket: Arc>) { } +//! fn on_binary(&self, data: Bytes, socket: Arc>) { } //! } //! //! // Create a new engine.io service that will return a 404 not found response for other requests diff --git a/engineioxide/src/socket.rs b/engineioxide/src/socket.rs index b5e9792f..29143b33 100644 --- a/engineioxide/src/socket.rs +++ b/engineioxide/src/socket.rs @@ -7,6 +7,7 @@ //! //! #### Example : //! ```rust +//! # use bytes::Bytes; //! # use engineioxide::service::EngineIoService; //! # use engineioxide::handler::EngineIoHandler; //! # use engineioxide::{Socket, DisconnectReason}; @@ -48,7 +49,7 @@ //! fn on_message(&self, msg: String, socket: Arc>) { //! *socket.data.id.lock().unwrap() = msg; // bind a provided user id to a socket //! } -//! fn on_binary(&self, data: Vec, socket: Arc>) { } +//! fn on_binary(&self, data: Bytes, socket: Arc>) { } //! } //! //! let svc = EngineIoService::new(MyHandler::default()); @@ -61,6 +62,7 @@ use std::{ time::Duration, }; +use bytes::Bytes; use http::request::Parts; use smallvec::{smallvec, SmallVec}; use tokio::{ @@ -129,14 +131,14 @@ impl Permit<'_> { } /// Consume the permit and emit a binary message to the client. #[inline] - pub fn emit_binary(self, data: Vec) { + pub fn emit_binary(self, data: Bytes) { self.inner.send(smallvec![Packet::Binary(data)]); } /// Consume the permit and emit a message with multiple binary data to the client. /// /// It can be used to ensure atomicity when sending a string packet with adjacent binary packets. - pub fn emit_many(self, msg: String, data: Vec>) { + pub fn emit_many(self, msg: String, data: Vec) { let mut packets = SmallVec::with_capacity(data.len() + 1); packets.push(Packet::Message(msg)); for d in data { @@ -438,7 +440,7 @@ where /// If the transport is in polling mode, the message is buffered and sent as a text frame **encoded in base64** to the next polling request. /// /// ⚠️ If the buffer is full or the socket is disconnected, an error will be returned with the original data - pub fn emit_binary(&self, data: Vec) -> Result<(), TrySendError>> { + pub fn emit_binary(&self, data: Bytes) -> Result<(), TrySendError> { if self.protocol == ProtocolVersion::V3 { self.send(Packet::BinaryV3(data)) } else { diff --git a/engineioxide/src/transport/polling/payload/decoder.rs b/engineioxide/src/transport/polling/payload/decoder.rs index 39855261..7f8120cf 100644 --- a/engineioxide/src/transport/polling/payload/decoder.rs +++ b/engineioxide/src/transport/polling/payload/decoder.rs @@ -205,7 +205,7 @@ where STRING_PACKET_IDENTIFIER_V3 => std::str::from_utf8(&packet_buf) .map_err(|_| Error::InvalidPacketLength) .and_then(Packet::try_from), // Convert the packet buffer to a Packet object - BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf)), + BINARY_PACKET_IDENTIFIER_V3 => Ok(Packet::BinaryV3(packet_buf.into())), _ => Err(Error::InvalidPacketLength), }; diff --git a/engineioxide/src/transport/polling/payload/encoder.rs b/engineioxide/src/transport/polling/payload/encoder.rs index 4d20edb3..839391fa 100644 --- a/engineioxide/src/transport/polling/payload/encoder.rs +++ b/engineioxide/src/transport/polling/payload/encoder.rs @@ -106,35 +106,51 @@ pub async fn v4_encoder( } } - Ok(Payload::new(data, false)) + Ok(Payload::new(data.into(), false)) } /// Encode one packet into a *binary* payload according to the /// [engine.io v3 protocol](https://github.com/socketio/engine.io-protocol/tree/v3#payload) #[cfg(feature = "v3")] -pub fn v3_bin_packet_encoder(packet: Packet, data: &mut Vec) -> Result<(), Error> { +pub fn v3_bin_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> { use crate::transport::polling::payload::BINARY_PACKET_SEPARATOR_V3; + use bytes::BufMut; + match packet { Packet::BinaryV3(bin) => { - data.push(0x1); - let len = (bin.len() + 1).to_string(); + let len_len = if let (_, Some(upper)) = len.chars().size_hint() { + upper + } else { + 0 + }; + + data.reserve(1 + len_len + 2 + bin.len()); + + data.put_u8(0x1); for char in len.chars() { - data.push(char as u8 - 48); + data.put_u8(char as u8 - 48); } - data.push(BINARY_PACKET_SEPARATOR_V3); // separator - data.push(0x04); // message packet type + data.put_u8(BINARY_PACKET_SEPARATOR_V3); // separator + data.put_u8(0x04); // message packet type data.extend_from_slice(&bin); // raw data } packet => { let packet: String = packet.try_into()?; - data.push(0x0); // 0 = string - let len = packet.len().to_string(); + let len_len = if let (_, Some(upper)) = len.chars().size_hint() { + upper + } else { + 0 + }; + + data.reserve(1 + len_len + 1 + packet.as_bytes().len()); + + data.put_u8(0x0); // 0 = string for char in len.chars() { - data.push(char as u8 - 48); + data.put_u8(char as u8 - 48); } - data.push(BINARY_PACKET_SEPARATOR_V3); // separator + data.put_u8(BINARY_PACKET_SEPARATOR_V3); // separator data.extend_from_slice(packet.as_bytes()); // packet } }; @@ -144,8 +160,9 @@ pub fn v3_bin_packet_encoder(packet: Packet, data: &mut Vec) -> Result<(), E /// Encode one packet into a *string* payload according to the /// [engine.io v3 protocol](https://github.com/socketio/engine.io-protocol/tree/v3#payload) #[cfg(feature = "v3")] -pub fn v3_string_packet_encoder(packet: Packet, data: &mut Vec) -> Result<(), Error> { +pub fn v3_string_packet_encoder(packet: Packet, data: &mut bytes::BytesMut) -> Result<(), Error> { use crate::transport::polling::payload::STRING_PACKET_SEPARATOR_V3; + use bytes::BufMut; let packet: String = packet.try_into()?; let packet = format!( "{}{}{}", @@ -153,7 +170,7 @@ pub fn v3_string_packet_encoder(packet: Packet, data: &mut Vec) -> Result<() STRING_PACKET_SEPARATOR_V3 as char, packet ); - data.extend_from_slice(packet.as_bytes()); + data.put_slice(packet.as_bytes()); Ok(()) } @@ -164,7 +181,7 @@ pub async fn v3_binary_encoder( mut rx: MutexGuard<'_, PeekableReceiver>, max_payload: u64, ) -> Result { - let mut data: Vec = Vec::new(); + let mut data = bytes::BytesMut::new(); let mut packet_buffer: Vec = Vec::new(); // estimated size of the `packet_buffer` in bytes @@ -218,7 +235,7 @@ pub async fn v3_binary_encoder( #[cfg(feature = "tracing")] tracing::debug!("sending packet: {:?}", &data); - Ok(Payload::new(data, has_binary)) + Ok(Payload::new(data.freeze(), has_binary)) } /// Encode multiple packet packet into a *string* payload according to the @@ -228,7 +245,7 @@ pub async fn v3_string_encoder( mut rx: MutexGuard<'_, PeekableReceiver>, max_payload: u64, ) -> Result { - let mut data: Vec = Vec::new(); + let mut data = bytes::BytesMut::new(); #[cfg(feature = "tracing")] tracing::debug!("encoding payload with v3 string encoder"); @@ -252,12 +269,12 @@ pub async fn v3_string_encoder( } } - Ok(Payload::new(data, false)) + Ok(Payload::new(data.freeze(), false)) } #[cfg(test)] mod tests { - + use bytes::Bytes; use tokio::sync::Mutex; use PacketBuf; @@ -273,8 +290,10 @@ mod tests { let rx = rx.lock().await; tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::Binary(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::Binary(Bytes::from_static(&[ + 1, 2, 3, 4 + ]))]) + .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); let Payload { data, .. } = v4_encoder(rx, MAX_PAYLOAD).await.unwrap(); @@ -288,8 +307,10 @@ mod tests { let mutex = Mutex::new(PeekableReceiver::new(rx)); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::Binary(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::Binary(Bytes::from_static(&[ + 1, 2, 3, 4 + ]))]) + .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) @@ -321,8 +342,10 @@ mod tests { tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::BinaryV3(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::BinaryV3(Bytes::from_static( + &[1, 2, 3, 4] + ))]) + .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); let Payload { @@ -341,8 +364,10 @@ mod tests { let mutex = Mutex::new(PeekableReceiver::new(rx)); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::BinaryV3(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::BinaryV3(Bytes::from_static( + &[1, 2, 3, 4] + ))]) + .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) @@ -371,12 +396,14 @@ mod tests { tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::BinaryV3(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::BinaryV3(Bytes::from_static( + &[1, 2, 3, 4] + ))]) + .unwrap(); let Payload { data, has_binary, .. } = v3_binary_encoder(rx, MAX_PAYLOAD).await.unwrap(); - assert_eq!(data, PAYLOAD); + assert_eq!(*data, PAYLOAD); assert!(has_binary); } @@ -393,8 +420,10 @@ mod tests { let mutex = Mutex::new(PeekableReceiver::new(rx)); tx.try_send(smallvec::smallvec![Packet::Message("hellooo€".into())]) .unwrap(); - tx.try_send(smallvec::smallvec![Packet::BinaryV3(vec![1, 2, 3, 4])]) - .unwrap(); + tx.try_send(smallvec::smallvec![Packet::BinaryV3(Bytes::from_static( + &[1, 2, 3, 4] + ))]) + .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) .unwrap(); tx.try_send(smallvec::smallvec![Packet::Message("hello€".into())]) @@ -402,7 +431,7 @@ mod tests { { let rx = mutex.lock().await; let Payload { data, .. } = v3_binary_encoder(rx, MAX_PAYLOAD).await.unwrap(); - assert_eq!(data, PAYLOAD); + assert_eq!(*data, PAYLOAD); } { let rx = mutex.lock().await; diff --git a/engineioxide/src/transport/polling/payload/mod.rs b/engineioxide/src/transport/polling/payload/mod.rs index 5aea15bf..63c7ef3a 100644 --- a/engineioxide/src/transport/polling/payload/mod.rs +++ b/engineioxide/src/transport/polling/payload/mod.rs @@ -4,6 +4,7 @@ use crate::{ errors::Error, packet::Packet, peekable::PeekableReceiver, service::ProtocolVersion, socket::PacketBuf, }; +use bytes::Bytes; use futures::Stream; use http::Request; use tokio::sync::MutexGuard; @@ -54,11 +55,11 @@ pub fn decoder( /// A payload to transmit to the client through http polling pub struct Payload { - pub data: Vec, + pub data: Bytes, pub has_binary: bool, } impl Payload { - pub fn new(data: impl Into>, has_binary: bool) -> Self { + pub fn new(data: Bytes, has_binary: bool) -> Self { Self { data: data.into(), has_binary, diff --git a/engineioxide/src/transport/ws.rs b/engineioxide/src/transport/ws.rs index f53cb096..96c99cb1 100644 --- a/engineioxide/src/transport/ws.rs +++ b/engineioxide/src/transport/ws.rs @@ -187,7 +187,7 @@ where // The first byte is the message type, which we don't need. let _ = data.remove(0); } - engine.handler.on_binary(data, socket.clone()); + engine.handler.on_binary(data.into(), socket.clone()); Ok(()) } Message::Close(_) => break, @@ -216,7 +216,8 @@ where macro_rules! map_fn { ($item:ident) => { let res = match $item { - Packet::Binary(mut bin) | Packet::BinaryV3(mut bin) => { + Packet::Binary(bin) | Packet::BinaryV3(bin) => { + let mut bin: Vec = bin.into(); if socket.protocol == ProtocolVersion::V3 { // v3 protocol requires packet type as the first byte bin.insert(0, 0x04); diff --git a/engineioxide/tests/disconnect_reason.rs b/engineioxide/tests/disconnect_reason.rs index a3247338..7d54df86 100644 --- a/engineioxide/tests/disconnect_reason.rs +++ b/engineioxide/tests/disconnect_reason.rs @@ -7,6 +7,7 @@ use std::{sync::Arc, time::Duration}; +use bytes::Bytes; use engineioxide::{ handler::EngineIoHandler, socket::{DisconnectReason, Socket}, @@ -42,7 +43,7 @@ impl EngineIoHandler for MyHandler { socket.emit(msg).ok(); } - fn on_binary(&self, data: Vec, socket: Arc>) { + fn on_binary(&self, data: Bytes, socket: Arc>) { println!("Ping pong binary message {:?}", data); socket.emit_binary(data).ok(); } diff --git a/socketioxide/src/client.rs b/socketioxide/src/client.rs index 181e8cc8..33fb1d84 100644 --- a/socketioxide/src/client.rs +++ b/socketioxide/src/client.rs @@ -296,8 +296,8 @@ impl EngineIoHandler for Client { /// When a binary payload is received from a socket, it is applied to the partial binary packet /// /// If the packet is complete, it is propagated to the namespace - fn on_binary(&self, data: Vec, socket: Arc>) { - if apply_payload_on_packet(Bytes::copy_from_slice(&data), &socket) { + fn on_binary(&self, data: Bytes, socket: Arc>) { + if apply_payload_on_packet(data, &socket) { if let Some(packet) = socket.data.partial_bin_packet.lock().unwrap().take() { if let Err(ref err) = self.sock_propagate_packet(packet, socket.id) { #[cfg(feature = "tracing")] diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 98b598eb..e82ec4fa 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -117,7 +117,7 @@ impl<'a> PermitExt<'a> for Permit<'a> { let msg = packet.into(); if let Some(bin_payloads) = bin_payloads { - self.emit_many(msg, bin_payloads.into_iter().map(Into::into).collect()); + self.emit_many(msg, bin_payloads); } else { self.emit(msg); } diff --git a/socketioxide/tests/connect.rs b/socketioxide/tests/connect.rs index 39320d84..b90d38d5 100644 --- a/socketioxide/tests/connect.rs +++ b/socketioxide/tests/connect.rs @@ -26,8 +26,7 @@ pub async fn connect_middleware() { )); assert!(matches!( - s.bin(vec![Bytes::from_static(&[0, 1, 2, 3])]) - .emit("test", ()), + s.bin(vec![Bytes::from_static(&[0, 1, 2, 3])]).emit("test", ()), Err(SendError::Socket(SocketError::Closed(()))) )); assert!(matches!(