diff --git a/socketio/src/asynchronous/client/builder.rs b/socketio/src/asynchronous/client/builder.rs index 44710e19..e5b6d628 100644 --- a/socketio/src/asynchronous/client/builder.rs +++ b/socketio/src/asynchronous/client/builder.rs @@ -8,7 +8,7 @@ use rust_engineio::{ use std::collections::HashMap; use url::Url; -use crate::{error::Result, Event, Payload, TransportType}; +use crate::{error::Result, packet::PacketParser, Event, Payload, TransportType}; use super::{ callback::{ @@ -31,6 +31,7 @@ pub struct ClientBuilder { tls_config: Option, opening_headers: Option, transport_type: TransportType, + packet_parser: PacketParser, pub(crate) auth: Option, pub(crate) reconnect: bool, pub(crate) reconnect_on_disconnect: bool, @@ -90,6 +91,7 @@ impl ClientBuilder { tls_config: None, opening_headers: None, transport_type: TransportType::Any, + packet_parser: PacketParser::default(), auth: None, reconnect: true, reconnect_on_disconnect: false, @@ -453,7 +455,7 @@ impl ClientBuilder { TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade().await?, }; - let inner_socket = InnerSocket::new(engine_client)?; + let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?; Ok(inner_socket) } diff --git a/socketio/src/asynchronous/socket.rs b/socketio/src/asynchronous/socket.rs index 81a9ebcd..b34d3d62 100644 --- a/socketio/src/asynchronous/socket.rs +++ b/socketio/src/asynchronous/socket.rs @@ -1,7 +1,7 @@ use super::generator::StreamGenerator; use crate::{ error::Result, - packet::{Packet, PacketId}, + packet::{Packet, PacketId, PacketParser}, Error, Event, Payload, }; use async_stream::try_stream; @@ -24,16 +24,22 @@ pub(crate) struct Socket { engine_client: Arc, connected: Arc, generator: StreamGenerator, + packet_parser: PacketParser, } impl Socket { /// Creates an instance of `Socket`. - pub(super) fn new(engine_client: EngineClient) -> Result { + pub(super) fn new(engine_client: EngineClient, packet_parser: PacketParser) -> Result { let connected = Arc::new(AtomicBool::default()); Ok(Socket { engine_client: Arc::new(engine_client.clone()), connected: connected.clone(), - generator: StreamGenerator::new(Self::stream(engine_client, connected)), + generator: StreamGenerator::new(Self::stream( + engine_client, + connected, + packet_parser.clone(), + )), + packet_parser, }) } @@ -68,7 +74,8 @@ impl Socket { } // the packet, encoded as an engine.io message packet - let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet)); + let engine_packet = + EnginePacket::new(EnginePacketId::Message, self.packet_parser.encode(&packet)); self.engine_client.emit(engine_packet).await?; if let Some(attachments) = packet.attachments { @@ -92,6 +99,7 @@ impl Socket { fn stream( client: EngineClient, is_connected: Arc, + parser: PacketParser, ) -> Pin> + Send>> { Box::pin(try_stream! { for await received_data in client.clone() { @@ -100,7 +108,7 @@ impl Socket { if packet.packet_id == EnginePacketId::Message || packet.packet_id == EnginePacketId::MessageBinary { - let packet = Self::handle_engineio_packet(packet, client.clone()).await?; + let packet = Self::handle_engineio_packet(packet, client.clone(), &parser).await?; Self::handle_socketio_packet(&packet, is_connected.clone()); yield packet; @@ -130,8 +138,9 @@ impl Socket { async fn handle_engineio_packet( packet: EnginePacket, mut client: EngineClient, + parser: &PacketParser, ) -> Result { - let mut socket_packet = Packet::try_from(&packet.data)?; + let mut socket_packet = parser.decode(&packet.data)?; // Only handle attachments if there are any if socket_packet.attachment_count > 0 {