From 72927fdfa1e2d74395a2a252a6b09eb1824e5f1a Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 18 May 2024 12:00:09 +0800 Subject: [PATCH 1/5] Add PacketParser and add default parser --- socketio/src/error.rs | 2 + socketio/src/packet.rs | 131 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/socketio/src/error.rs b/socketio/src/error.rs index cc25d897..c7049a59 100644 --- a/socketio/src/error.rs +++ b/socketio/src/error.rs @@ -20,6 +20,8 @@ pub enum Error { IncompletePacket(), #[error("Got an invalid packet which did not follow the protocol format")] InvalidPacket(), + #[error("Error while parsing an incomplete packet: {0}")] + ParsePacketFailed(String), #[error("An error occurred while decoding the utf-8 text: {0}")] InvalidUtf8(#[from] Utf8Error), #[error("An error occurred while encoding/decoding base64: {0}")] diff --git a/socketio/src/packet.rs b/socketio/src/packet.rs index e74dedb5..8407273f 100644 --- a/socketio/src/packet.rs +++ b/socketio/src/packet.rs @@ -1,11 +1,13 @@ use crate::error::{Error, Result}; use crate::{Event, Payload}; use bytes::Bytes; +use rust_engineio::packet; use serde::de::IgnoredAny; use std::convert::TryFrom; -use std::fmt::Write; +use std::fmt::{Display, Write}; use std::str::from_utf8 as str_from_utf8; +use std::sync::Arc; /// An enumeration of the different `Packet` types in the `socket.io` protocol. #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -30,6 +32,133 @@ pub struct Packet { pub attachments: Option>, } +#[derive(Clone)] +/// Use to serialize and deserialize packets +/// +/// support [Custom parser](https://socket.io/docs/v4/custom-parser/) +pub struct PacketParser { + encode: Arc Bytes>>, + decode: Arc Result>>, +} + +impl Display for PacketParser { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PacketSerializer") + } +} + +impl PacketParser { + pub fn default() -> Self { + Self { + encode: Arc::new(Box::new(Self::default_encode)), + decode: Arc::new(Box::new(Self::default_decode)), + } + } + + pub fn default_encode(packet: &Packet) -> Bytes { + // first the packet type + let mut buffer = String::new(); + buffer.push((packet.packet_type as u8 + b'0') as char); + + // eventually a number of attachments, followed by '-' + if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { + let _ = write!(buffer, "{}-", packet.attachment_count); + } + + // if the namespace is different from the default one append it as well, + // followed by ',' + if packet.nsp != "/" { + buffer.push_str(&packet.nsp); + buffer.push(','); + } + + // if an id is present append it... + if let Some(id) = packet.id { + let _ = write!(buffer, "{id}"); + } + + if packet.attachments.is_some() { + let num = packet.attachment_count - 1; + + // check if an event type is present + if let Some(event_type) = packet.data.as_ref() { + let _ = write!( + buffer, + "[{event_type},{{\"_placeholder\":true,\"num\":{num}}}]", + ); + } else { + let _ = write!(buffer, "[{{\"_placeholder\":true,\"num\":{num}}}]"); + } + } else if let Some(data) = packet.data.as_ref() { + buffer.push_str(data); + } + + Bytes::from(buffer) + } + + pub fn default_decode(payload: &Bytes) -> Result { + let mut payload = str_from_utf8(&payload).map_err(Error::InvalidUtf8)?; + let mut packet = Packet::default(); + + // packet_type + let id_char = payload.chars().next().ok_or(Error::IncompletePacket())?; + packet.packet_type = PacketId::try_from(id_char)?; + + payload = &payload[id_char.len_utf8()..]; + + // attachment_count + if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { + let (prefix, rest) = payload.split_once('-').ok_or(Error::IncompletePacket())?; + payload = rest; + packet.attachment_count = prefix.parse().map_err(|_| Error::InvalidPacket())?; + } + + // namespace + if payload.starts_with('/') { + let (prefix, rest) = payload.split_once(',').ok_or(Error::IncompletePacket())?; + payload = rest; + packet.nsp.clear(); // clearing the default + packet.nsp.push_str(prefix); + } + + // id + let Some((non_digit_idx, _)) = payload.char_indices().find(|(_, c)| !c.is_ascii_digit()) + else { + return Ok(packet); + }; + + if non_digit_idx > 0 { + let (prefix, rest) = payload.split_at(non_digit_idx); + payload = rest; + packet.id = Some(prefix.parse().map_err(|_| Error::InvalidPacket())?); + } + + // validate json + serde_json::from_str::(payload).map_err(Error::InvalidJson)?; + + match packet.packet_type { + PacketId::BinaryAck | PacketId::BinaryEvent => { + if payload.starts_with('[') && payload.ends_with(']') { + payload = &payload[1..payload.len() - 1]; + } + + let mut str = payload.replace("{\"_placeholder\":true,\"num\":0}", ""); + + if str.ends_with(',') { + str.pop(); + } + + if !str.is_empty() { + packet.data = Some(str); + } + } + _ => packet.data = Some(payload.to_string()), + } + + Ok(packet) + } +} + impl Packet { /// Returns a packet for a payload, could be used for both binary and non binary /// events and acks. Convenience method. From 521daf04ca5255cd974e13a01339e5896159ef1f Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 18 May 2024 13:18:52 +0800 Subject: [PATCH 2/5] Implment packet_parser --- socketio/src/client/builder.rs | 12 +- socketio/src/packet.rs | 354 +++++++++++++++++++-------------- socketio/src/socket.rs | 15 +- 3 files changed, 219 insertions(+), 162 deletions(-) diff --git a/socketio/src/client/builder.rs b/socketio/src/client/builder.rs index 724971f0..fe2a1dce 100644 --- a/socketio/src/client/builder.rs +++ b/socketio/src/client/builder.rs @@ -1,6 +1,7 @@ use super::super::{event::Event, payload::Payload}; use super::callback::Callback; use super::client::Client; +use crate::packet::PacketParser; use crate::RawClient; use native_tls::TlsConnector; use rust_engineio::client::ClientBuilder as EngineIoClientBuilder; @@ -40,6 +41,7 @@ pub struct ClientBuilder { tls_config: Option, opening_headers: Option, transport_type: TransportType, + packet_parser: PacketParser, auth: Option, pub(crate) reconnect: bool, pub(crate) reconnect_on_disconnect: bool, @@ -91,6 +93,7 @@ impl ClientBuilder { tls_config: None, opening_headers: None, transport_type: TransportType::Any, + packet_parser: PacketParser::default(), auth: None, reconnect: true, reconnect_on_disconnect: false, @@ -306,6 +309,13 @@ impl ClientBuilder { self } + /// Specifies how to parser Packet + pub fn packet_parser(mut self, packet_parser: PacketParser) -> Self { + self.packet_parser = packet_parser; + + self + } + /// Connects the socket to a certain endpoint. This returns a connected /// [`Client`] instance. This method returns an [`std::result::Result::Err`] /// value if something goes wrong during connection. Also starts a separate @@ -357,7 +367,7 @@ impl ClientBuilder { TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade()?, }; - let inner_socket = InnerSocket::new(engine_client)?; + let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?; let socket = RawClient::new( inner_socket, diff --git a/socketio/src/packet.rs b/socketio/src/packet.rs index 8407273f..a34789be 100644 --- a/socketio/src/packet.rs +++ b/socketio/src/packet.rs @@ -5,7 +5,7 @@ use rust_engineio::packet; use serde::de::IgnoredAny; use std::convert::TryFrom; -use std::fmt::{Display, Write}; +use std::fmt::{Debug, Display, Write}; use std::str::from_utf8 as str_from_utf8; use std::sync::Arc; @@ -37,8 +37,8 @@ pub struct Packet { /// /// support [Custom parser](https://socket.io/docs/v4/custom-parser/) pub struct PacketParser { - encode: Arc Bytes>>, - decode: Arc Result>>, + encode: Arc Bytes + Send + Sync>>, + decode: Arc Result + Send + Sync>>, } impl Display for PacketParser { @@ -47,13 +47,56 @@ impl Display for PacketParser { } } -impl PacketParser { - pub fn default() -> Self { +impl Debug for PacketParser { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PacketSerializer").finish() + } +} + +impl Default for PacketParser { + fn default() -> Self { Self { encode: Arc::new(Box::new(Self::default_encode)), decode: Arc::new(Box::new(Self::default_decode)), } } +} + +impl PacketParser { + /// Creates a new instance of `PacketSerializer` with both encode and decode functions. + pub fn new( + encode: Box Bytes + Send + Sync>, + decode: Box Result + Send + Sync>, + ) -> Self { + Self { + encode: Arc::new(encode), + decode: Arc::new(decode), + } + } + + /// Creates a new instance of `PacketSerializer` with only encode function. and a default decode function. + pub fn new_encode(encode: Box Bytes + Send + Sync>) -> Self { + Self { + encode: Arc::new(encode), + decode: Arc::new(Box::new(Self::default_decode)), + } + } + + /// Creates a new instance of `PacketSerializer` with only decode function. and a default encode function. + pub fn new_decode(decode: Box Result + Send + Sync>) -> Self { + Self { + encode: Arc::new(Box::new(Self::default_encode)), + decode: Arc::new(decode), + } + } + + pub fn encode(&self, packet: &Packet) -> Bytes { + (self.encode)(packet) + } + + pub fn decode(&self, payload: &Bytes) -> Result { + (self.decode)(payload) + } pub fn default_encode(packet: &Packet) -> Bytes { // first the packet type @@ -276,135 +319,135 @@ impl Packet { } } -impl From for Bytes { - fn from(packet: Packet) -> Self { - Bytes::from(&packet) - } -} - -impl From<&Packet> for Bytes { - /// Method for encoding from a `Packet` to a `u8` byte stream. - /// The binary payload of a packet is not put at the end of the - /// stream as it gets handled and send by it's own logic via the socket. - fn from(packet: &Packet) -> Bytes { - // first the packet type - let mut buffer = String::new(); - buffer.push((packet.packet_type as u8 + b'0') as char); - - // eventually a number of attachments, followed by '-' - if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { - let _ = write!(buffer, "{}-", packet.attachment_count); - } - - // if the namespace is different from the default one append it as well, - // followed by ',' - if packet.nsp != "/" { - buffer.push_str(&packet.nsp); - buffer.push(','); - } - - // if an id is present append it... - if let Some(id) = packet.id { - let _ = write!(buffer, "{id}"); - } - - if packet.attachments.is_some() { - let num = packet.attachment_count - 1; - - // check if an event type is present - if let Some(event_type) = packet.data.as_ref() { - let _ = write!( - buffer, - "[{event_type},{{\"_placeholder\":true,\"num\":{num}}}]", - ); - } else { - let _ = write!(buffer, "[{{\"_placeholder\":true,\"num\":{num}}}]"); - } - } else if let Some(data) = packet.data.as_ref() { - buffer.push_str(data); - } - - Bytes::from(buffer) - } -} - -impl TryFrom for Packet { - type Error = Error; - fn try_from(value: Bytes) -> Result { - Packet::try_from(&value) - } -} - -impl TryFrom<&Bytes> for Packet { - type Error = Error; - /// Decodes a packet given a `Bytes` type. - /// The binary payload of a packet is not put at the end of the - /// stream as it gets handled and send by it's own logic via the socket. - /// Therefore this method does not return the correct value for the - /// binary data, instead the socket is responsible for handling - /// this member. This is done because the attachment is usually - /// send in another packet. - fn try_from(payload: &Bytes) -> Result { - let mut payload = str_from_utf8(&payload).map_err(Error::InvalidUtf8)?; - let mut packet = Packet::default(); - - // packet_type - let id_char = payload.chars().next().ok_or(Error::IncompletePacket())?; - packet.packet_type = PacketId::try_from(id_char)?; - payload = &payload[id_char.len_utf8()..]; - - // attachment_count - if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { - let (prefix, rest) = payload.split_once('-').ok_or(Error::IncompletePacket())?; - payload = rest; - packet.attachment_count = prefix.parse().map_err(|_| Error::InvalidPacket())?; - } - - // namespace - if payload.starts_with('/') { - let (prefix, rest) = payload.split_once(',').ok_or(Error::IncompletePacket())?; - payload = rest; - packet.nsp.clear(); // clearing the default - packet.nsp.push_str(prefix); - } - - // id - let Some((non_digit_idx, _)) = payload.char_indices().find(|(_, c)| !c.is_ascii_digit()) - else { - return Ok(packet); - }; - - if non_digit_idx > 0 { - let (prefix, rest) = payload.split_at(non_digit_idx); - payload = rest; - packet.id = Some(prefix.parse().map_err(|_| Error::InvalidPacket())?); - } - - // validate json - serde_json::from_str::(payload).map_err(Error::InvalidJson)?; - - match packet.packet_type { - PacketId::BinaryAck | PacketId::BinaryEvent => { - if payload.starts_with('[') && payload.ends_with(']') { - payload = &payload[1..payload.len() - 1]; - } - - let mut str = payload.replace("{\"_placeholder\":true,\"num\":0}", ""); - - if str.ends_with(',') { - str.pop(); - } - - if !str.is_empty() { - packet.data = Some(str); - } - } - _ => packet.data = Some(payload.to_string()), - } - - Ok(packet) - } -} +// impl From for Bytes { +// fn from(packet: Packet) -> Self { +// Bytes::from(&packet) +// } +// } + +// impl From<&Packet> for Bytes { +// /// Method for encoding from a `Packet` to a `u8` byte stream. +// /// The binary payload of a packet is not put at the end of the +// /// stream as it gets handled and send by it's own logic via the socket. +// fn from(packet: &Packet) -> Bytes { +// // first the packet type +// let mut buffer = String::new(); +// buffer.push((packet.packet_type as u8 + b'0') as char); + +// // eventually a number of attachments, followed by '-' +// if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { +// let _ = write!(buffer, "{}-", packet.attachment_count); +// } + +// // if the namespace is different from the default one append it as well, +// // followed by ',' +// if packet.nsp != "/" { +// buffer.push_str(&packet.nsp); +// buffer.push(','); +// } + +// // if an id is present append it... +// if let Some(id) = packet.id { +// let _ = write!(buffer, "{id}"); +// } + +// if packet.attachments.is_some() { +// let num = packet.attachment_count - 1; + +// // check if an event type is present +// if let Some(event_type) = packet.data.as_ref() { +// let _ = write!( +// buffer, +// "[{event_type},{{\"_placeholder\":true,\"num\":{num}}}]", +// ); +// } else { +// let _ = write!(buffer, "[{{\"_placeholder\":true,\"num\":{num}}}]"); +// } +// } else if let Some(data) = packet.data.as_ref() { +// buffer.push_str(data); +// } + +// Bytes::from(buffer) +// } +// } + +// impl TryFrom for Packet { +// type Error = Error; +// fn try_from(value: Bytes) -> Result { +// Packet::try_from(&value) +// } +// } + +// impl TryFrom<&Bytes> for Packet { +// type Error = Error; +// /// Decodes a packet given a `Bytes` type. +// /// The binary payload of a packet is not put at the end of the +// /// stream as it gets handled and send by it's own logic via the socket. +// /// Therefore this method does not return the correct value for the +// /// binary data, instead the socket is responsible for handling +// /// this member. This is done because the attachment is usually +// /// send in another packet. +// fn try_from(payload: &Bytes) -> Result { +// let mut payload = str_from_utf8(&payload).map_err(Error::InvalidUtf8)?; +// let mut packet = Packet::default(); + +// // packet_type +// let id_char = payload.chars().next().ok_or(Error::IncompletePacket())?; +// packet.packet_type = PacketId::try_from(id_char)?; +// payload = &payload[id_char.len_utf8()..]; + +// // attachment_count +// if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { +// let (prefix, rest) = payload.split_once('-').ok_or(Error::IncompletePacket())?; +// payload = rest; +// packet.attachment_count = prefix.parse().map_err(|_| Error::InvalidPacket())?; +// } + +// // namespace +// if payload.starts_with('/') { +// let (prefix, rest) = payload.split_once(',').ok_or(Error::IncompletePacket())?; +// payload = rest; +// packet.nsp.clear(); // clearing the default +// packet.nsp.push_str(prefix); +// } + +// // id +// let Some((non_digit_idx, _)) = payload.char_indices().find(|(_, c)| !c.is_ascii_digit()) +// else { +// return Ok(packet); +// }; + +// if non_digit_idx > 0 { +// let (prefix, rest) = payload.split_at(non_digit_idx); +// payload = rest; +// packet.id = Some(prefix.parse().map_err(|_| Error::InvalidPacket())?); +// } + +// // validate json +// serde_json::from_str::(payload).map_err(Error::InvalidJson)?; + +// match packet.packet_type { +// PacketId::BinaryAck | PacketId::BinaryEvent => { +// if payload.starts_with('[') && payload.ends_with(']') { +// payload = &payload[1..payload.len() - 1]; +// } + +// let mut str = payload.replace("{\"_placeholder\":true,\"num\":0}", ""); + +// if str.ends_with(',') { +// str.pop(); +// } + +// if !str.is_empty() { +// packet.data = Some(str); +// } +// } +// _ => packet.data = Some(payload.to_string()), +// } + +// Ok(packet) +// } +// } #[cfg(test)] mod test { @@ -415,7 +458,7 @@ mod test { /// https://github.com/socketio/socket.io-protocol fn test_decode() { let payload = Bytes::from_static(b"0{\"token\":\"123\"}"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -433,7 +476,7 @@ mod test { let utf8_data = "{\"token™\":\"123\"}".to_owned(); let utf8_payload = format!("0/admin™,{}", utf8_data); let payload = Bytes::from(utf8_payload); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -449,7 +492,7 @@ mod test { ); let payload = Bytes::from_static(b"1/admin,"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -465,7 +508,7 @@ mod test { ); let payload = Bytes::from_static(b"2[\"hello\",1]"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -481,7 +524,7 @@ mod test { ); let payload = Bytes::from_static(b"2/admin,456[\"project:delete\",123]"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -497,7 +540,7 @@ mod test { ); let payload = Bytes::from_static(b"3/admin,456[]"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -513,7 +556,7 @@ mod test { ); let payload = Bytes::from_static(b"4/admin,{\"message\":\"Not authorized\"}"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -529,7 +572,7 @@ mod test { ); let payload = Bytes::from_static(b"51-[\"hello\",{\"_placeholder\":true,\"num\":0}]"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -547,7 +590,7 @@ mod test { let payload = Bytes::from_static( b"51-/admin,456[\"project:delete\",{\"_placeholder\":true,\"num\":0}]", ); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -563,7 +606,7 @@ mod test { ); let payload = Bytes::from_static(b"61-/admin,456[{\"_placeholder\":true,\"num\":0}]"); - let packet = Packet::try_from(&payload); + let packet = PacketParser::default_decode(&payload); assert!(packet.is_ok()); assert_eq!( @@ -593,7 +636,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "0{\"token\":\"123\"}".to_string().into_bytes() ); @@ -607,7 +650,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "0/admin,{\"token\":\"123\"}".to_string().into_bytes() ); @@ -620,7 +663,10 @@ mod test { None, ); - assert_eq!(Bytes::from(&packet), "1/admin,".to_string().into_bytes()); + assert_eq!( + PacketParser::default_encode(&packet), + "1/admin,".to_string().into_bytes() + ); let packet = Packet::new( PacketId::Event, @@ -632,7 +678,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "2[\"hello\",1]".to_string().into_bytes() ); @@ -646,7 +692,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "2/admin,456[\"project:delete\",123]" .to_string() .into_bytes() @@ -662,7 +708,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "3/admin,456[]".to_string().into_bytes() ); @@ -676,7 +722,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "4/admin,{\"message\":\"Not authorized\"}" .to_string() .into_bytes() @@ -692,7 +738,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "51-[\"hello\",{\"_placeholder\":true,\"num\":0}]" .to_string() .into_bytes() @@ -708,7 +754,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "51-/admin,456[\"project:delete\",{\"_placeholder\":true,\"num\":0}]" .to_string() .into_bytes() @@ -724,7 +770,7 @@ mod test { ); assert_eq!( - Bytes::from(&packet), + PacketParser::default_encode(&packet), "61-/admin,456[{\"_placeholder\":true,\"num\":0}]" .to_string() .into_bytes() diff --git a/socketio/src/socket.rs b/socketio/src/socket.rs index b881bad0..b5bc4159 100644 --- a/socketio/src/socket.rs +++ b/socketio/src/socket.rs @@ -1,8 +1,6 @@ use crate::error::{Error, Result}; -use crate::packet::{Packet, PacketId}; -use bytes::Bytes; +use crate::packet::{Packet, PacketId, PacketParser}; use rust_engineio::{Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId}; -use std::convert::TryFrom; use std::sync::{atomic::AtomicBool, Arc}; use std::{fmt::Debug, sync::atomic::Ordering}; @@ -11,18 +9,20 @@ use super::{event::Event, payload::Payload}; /// Handles communication in the `socket.io` protocol. #[derive(Clone, Debug)] pub(crate) struct Socket { - //TODO: 0.4.0 refactor this + // TODO: 0.4.0 refactor this engine_client: Arc, connected: Arc, + packet_parser: PacketParser, } impl Socket { /// Creates an instance of `Socket`. - pub(super) fn new(engine_client: EngineClient) -> Result { + pub(super) fn new(engine_client: EngineClient, packet_parser: PacketParser) -> Result { Ok(Socket { engine_client: Arc::new(engine_client), connected: Arc::new(AtomicBool::default()), + packet_parser, }) } @@ -57,7 +57,8 @@ impl Socket { } // the packet, encoded as an engine.io message packet - let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet)); + let engine_packet = + EnginePacket::new(EnginePacketId::Message, self.packet_parser.encode(&packet)); self.engine_client.emit(engine_packet)?; if let Some(attachments) = packet.attachments { @@ -119,7 +120,7 @@ impl Socket { /// Handles new incoming engineio packets fn handle_engineio_packet(&self, packet: EnginePacket) -> Result { - let mut socket_packet = Packet::try_from(&packet.data)?; + let mut socket_packet = self.packet_parser.decode(&packet.data)?; // Only handle attachments if there are any if socket_packet.attachment_count > 0 { From c6aabd23a44d9a86654ba76d16b18f272ae445cd Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 18 May 2024 13:36:40 +0800 Subject: [PATCH 3/5] Implment parser in async --- socketio/src/asynchronous/client/builder.rs | 6 ++++-- socketio/src/asynchronous/socket.rs | 21 +++++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index 44710e19..e5b6d628 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -8,7 +8,7 @@ use rust_engineio::{ use std::collections::HashMap; use url::Url; -use crate::{error::Result, Event, Payload, TransportType}; +use crate::{error::Result, packet::PacketParser, Event, Payload, TransportType}; use super::{ callback::{ @@ -31,6 +31,7 @@ pub struct ClientBuilder { tls_config: Option, opening_headers: Option, transport_type: TransportType, + packet_parser: PacketParser, pub(crate) auth: Option, pub(crate) reconnect: bool, pub(crate) reconnect_on_disconnect: bool, @@ -90,6 +91,7 @@ impl ClientBuilder { tls_config: None, opening_headers: None, transport_type: TransportType::Any, + packet_parser: PacketParser::default(), auth: None, reconnect: true, reconnect_on_disconnect: false, @@ -453,7 +455,7 @@ impl ClientBuilder { TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade().await?, }; - let inner_socket = InnerSocket::new(engine_client)?; + let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?; Ok(inner_socket) } diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 81a9ebcd..b34d3d62 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -1,7 +1,7 @@ use super::generator::StreamGenerator; use crate::{ error::Result, - packet::{Packet, PacketId}, + packet::{Packet, PacketId, PacketParser}, Error, Event, Payload, }; use async_stream::try_stream; @@ -24,16 +24,22 @@ pub(crate) struct Socket { engine_client: Arc, connected: Arc, generator: StreamGenerator, + packet_parser: PacketParser, } impl Socket { /// Creates an instance of `Socket`. - pub(super) fn new(engine_client: EngineClient) -> Result { + pub(super) fn new(engine_client: EngineClient, packet_parser: PacketParser) -> Result { let connected = Arc::new(AtomicBool::default()); Ok(Socket { engine_client: Arc::new(engine_client.clone()), connected: connected.clone(), - generator: StreamGenerator::new(Self::stream(engine_client, connected)), + generator: StreamGenerator::new(Self::stream( + engine_client, + connected, + packet_parser.clone(), + )), + packet_parser, }) } @@ -68,7 +74,8 @@ impl Socket { } // the packet, encoded as an engine.io message packet - let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet)); + let engine_packet = + EnginePacket::new(EnginePacketId::Message, self.packet_parser.encode(&packet)); self.engine_client.emit(engine_packet).await?; if let Some(attachments) = packet.attachments { @@ -92,6 +99,7 @@ impl Socket { fn stream( client: EngineClient, is_connected: Arc, + parser: PacketParser, ) -> Pin> + Send>> { Box::pin(try_stream! { for await received_data in client.clone() { @@ -100,7 +108,7 @@ impl Socket { if packet.packet_id == EnginePacketId::Message || packet.packet_id == EnginePacketId::MessageBinary { - let packet = Self::handle_engineio_packet(packet, client.clone()).await?; + let packet = Self::handle_engineio_packet(packet, client.clone(), &parser).await?; Self::handle_socketio_packet(&packet, is_connected.clone()); yield packet; @@ -130,8 +138,9 @@ impl Socket { async fn handle_engineio_packet( packet: EnginePacket, mut client: EngineClient, + parser: &PacketParser, ) -> Result { - let mut socket_packet = Packet::try_from(&packet.data)?; + let mut socket_packet = parser.decode(&packet.data)?; // Only handle attachments if there are any if socket_packet.attachment_count > 0 { From a80357a689455117099847f238d2714dd5bc4578 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 18 May 2024 13:39:06 +0800 Subject: [PATCH 4/5] remove useless use --- socketio/src/asynchronous/socket.rs | 1 - socketio/src/packet.rs | 1 - socketio/src/socket.rs | 4 ++-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index b34d3d62..b99affd3 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -5,7 +5,6 @@ use crate::{ Error, Event, Payload, }; use async_stream::try_stream; -use bytes::Bytes; use futures_util::{Stream, StreamExt}; use rust_engineio::{ asynchronous::Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId, diff --git a/socketio/src/packet.rs b/socketio/src/packet.rs index a34789be..ea9b7b35 100644 --- a/socketio/src/packet.rs +++ b/socketio/src/packet.rs @@ -1,7 +1,6 @@ use crate::error::{Error, Result}; use crate::{Event, Payload}; use bytes::Bytes; -use rust_engineio::packet; use serde::de::IgnoredAny; use std::convert::TryFrom; diff --git a/socketio/src/socket.rs b/socketio/src/socket.rs index b5bc4159..36b3c3d9 100644 --- a/socketio/src/socket.rs +++ b/socketio/src/socket.rs @@ -1,11 +1,11 @@ use crate::error::{Error, Result}; +use crate::event::Event; use crate::packet::{Packet, PacketId, PacketParser}; +use crate::payload::Payload; use rust_engineio::{Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId}; use std::sync::{atomic::AtomicBool, Arc}; use std::{fmt::Debug, sync::atomic::Ordering}; -use super::{event::Event, payload::Payload}; - /// Handles communication in the `socket.io` protocol. #[derive(Clone, Debug)] pub(crate) struct Socket { From cddbdb60cb750976dd2cd2770b5c4711e469fbe2 Mon Sep 17 00:00:00 2001 From: shenjack <3695888@qq.com> Date: Sat, 18 May 2024 13:48:23 +0800 Subject: [PATCH 5/5] remove comment --- socketio/src/packet.rs | 130 ----------------------------------------- 1 file changed, 130 deletions(-) diff --git a/socketio/src/packet.rs b/socketio/src/packet.rs index ea9b7b35..bbee50d4 100644 --- a/socketio/src/packet.rs +++ b/socketio/src/packet.rs @@ -318,136 +318,6 @@ impl Packet { } } -// impl From for Bytes { -// fn from(packet: Packet) -> Self { -// Bytes::from(&packet) -// } -// } - -// impl From<&Packet> for Bytes { -// /// Method for encoding from a `Packet` to a `u8` byte stream. -// /// The binary payload of a packet is not put at the end of the -// /// stream as it gets handled and send by it's own logic via the socket. -// fn from(packet: &Packet) -> Bytes { -// // first the packet type -// let mut buffer = String::new(); -// buffer.push((packet.packet_type as u8 + b'0') as char); - -// // eventually a number of attachments, followed by '-' -// if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { -// let _ = write!(buffer, "{}-", packet.attachment_count); -// } - -// // if the namespace is different from the default one append it as well, -// // followed by ',' -// if packet.nsp != "/" { -// buffer.push_str(&packet.nsp); -// buffer.push(','); -// } - -// // if an id is present append it... -// if let Some(id) = packet.id { -// let _ = write!(buffer, "{id}"); -// } - -// if packet.attachments.is_some() { -// let num = packet.attachment_count - 1; - -// // check if an event type is present -// if let Some(event_type) = packet.data.as_ref() { -// let _ = write!( -// buffer, -// "[{event_type},{{\"_placeholder\":true,\"num\":{num}}}]", -// ); -// } else { -// let _ = write!(buffer, "[{{\"_placeholder\":true,\"num\":{num}}}]"); -// } -// } else if let Some(data) = packet.data.as_ref() { -// buffer.push_str(data); -// } - -// Bytes::from(buffer) -// } -// } - -// impl TryFrom for Packet { -// type Error = Error; -// fn try_from(value: Bytes) -> Result { -// Packet::try_from(&value) -// } -// } - -// impl TryFrom<&Bytes> for Packet { -// type Error = Error; -// /// Decodes a packet given a `Bytes` type. -// /// The binary payload of a packet is not put at the end of the -// /// stream as it gets handled and send by it's own logic via the socket. -// /// Therefore this method does not return the correct value for the -// /// binary data, instead the socket is responsible for handling -// /// this member. This is done because the attachment is usually -// /// send in another packet. -// fn try_from(payload: &Bytes) -> Result { -// let mut payload = str_from_utf8(&payload).map_err(Error::InvalidUtf8)?; -// let mut packet = Packet::default(); - -// // packet_type -// let id_char = payload.chars().next().ok_or(Error::IncompletePacket())?; -// packet.packet_type = PacketId::try_from(id_char)?; -// payload = &payload[id_char.len_utf8()..]; - -// // attachment_count -// if let PacketId::BinaryAck | PacketId::BinaryEvent = packet.packet_type { -// let (prefix, rest) = payload.split_once('-').ok_or(Error::IncompletePacket())?; -// payload = rest; -// packet.attachment_count = prefix.parse().map_err(|_| Error::InvalidPacket())?; -// } - -// // namespace -// if payload.starts_with('/') { -// let (prefix, rest) = payload.split_once(',').ok_or(Error::IncompletePacket())?; -// payload = rest; -// packet.nsp.clear(); // clearing the default -// packet.nsp.push_str(prefix); -// } - -// // id -// let Some((non_digit_idx, _)) = payload.char_indices().find(|(_, c)| !c.is_ascii_digit()) -// else { -// return Ok(packet); -// }; - -// if non_digit_idx > 0 { -// let (prefix, rest) = payload.split_at(non_digit_idx); -// payload = rest; -// packet.id = Some(prefix.parse().map_err(|_| Error::InvalidPacket())?); -// } - -// // validate json -// serde_json::from_str::(payload).map_err(Error::InvalidJson)?; - -// match packet.packet_type { -// PacketId::BinaryAck | PacketId::BinaryEvent => { -// if payload.starts_with('[') && payload.ends_with(']') { -// payload = &payload[1..payload.len() - 1]; -// } - -// let mut str = payload.replace("{\"_placeholder\":true,\"num\":0}", ""); - -// if str.ends_with(',') { -// str.pop(); -// } - -// if !str.is_empty() { -// packet.data = Some(str); -// } -// } -// _ => packet.data = Some(payload.to_string()), -// } - -// Ok(packet) -// } -// } - #[cfg(test)] mod test { use super::*;