Skip to content

Commit

Permalink
Implment parser in async
Browse files Browse the repository at this point in the history
  • Loading branch information
shenjackyuanjie committed May 18, 2024
1 parent 521daf0 commit c6aabd2
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
6 changes: 4 additions & 2 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rust_engineio::{
use std::collections::HashMap;
use url::Url;

use crate::{error::Result, Event, Payload, TransportType};
use crate::{error::Result, packet::PacketParser, Event, Payload, TransportType};

use super::{
callback::{
Expand All @@ -31,6 +31,7 @@ pub struct ClientBuilder {
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
packet_parser: PacketParser,
pub(crate) auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
Expand Down Expand Up @@ -90,6 +91,7 @@ impl ClientBuilder {
tls_config: None,
opening_headers: None,
transport_type: TransportType::Any,
packet_parser: PacketParser::default(),
auth: None,
reconnect: true,
reconnect_on_disconnect: false,
Expand Down Expand Up @@ -453,7 +455,7 @@ impl ClientBuilder {
TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade().await?,
};

let inner_socket = InnerSocket::new(engine_client)?;
let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?;
Ok(inner_socket)
}

Expand Down
21 changes: 15 additions & 6 deletions socketio/src/asynchronous/socket.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::generator::StreamGenerator;
use crate::{
error::Result,
packet::{Packet, PacketId},
packet::{Packet, PacketId, PacketParser},
Error, Event, Payload,
};
use async_stream::try_stream;
Expand All @@ -24,16 +24,22 @@ pub(crate) struct Socket {
engine_client: Arc<EngineClient>,
connected: Arc<AtomicBool>,
generator: StreamGenerator<Packet>,
packet_parser: PacketParser,
}

impl Socket {
/// Creates an instance of `Socket`.
pub(super) fn new(engine_client: EngineClient) -> Result<Self> {
pub(super) fn new(engine_client: EngineClient, packet_parser: PacketParser) -> Result<Self> {
let connected = Arc::new(AtomicBool::default());
Ok(Socket {
engine_client: Arc::new(engine_client.clone()),
connected: connected.clone(),
generator: StreamGenerator::new(Self::stream(engine_client, connected)),
generator: StreamGenerator::new(Self::stream(
engine_client,
connected,
packet_parser.clone(),
)),
packet_parser,
})
}

Expand Down Expand Up @@ -68,7 +74,8 @@ impl Socket {
}

// the packet, encoded as an engine.io message packet
let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet));
let engine_packet =
EnginePacket::new(EnginePacketId::Message, self.packet_parser.encode(&packet));
self.engine_client.emit(engine_packet).await?;

if let Some(attachments) = packet.attachments {
Expand All @@ -92,6 +99,7 @@ impl Socket {
fn stream(
client: EngineClient,
is_connected: Arc<AtomicBool>,
parser: PacketParser,
) -> Pin<Box<impl Stream<Item = Result<Packet>> + Send>> {
Box::pin(try_stream! {
for await received_data in client.clone() {
Expand All @@ -100,7 +108,7 @@ impl Socket {
if packet.packet_id == EnginePacketId::Message
|| packet.packet_id == EnginePacketId::MessageBinary
{
let packet = Self::handle_engineio_packet(packet, client.clone()).await?;
let packet = Self::handle_engineio_packet(packet, client.clone(), &parser).await?;
Self::handle_socketio_packet(&packet, is_connected.clone());

yield packet;
Expand Down Expand Up @@ -130,8 +138,9 @@ impl Socket {
async fn handle_engineio_packet(
packet: EnginePacket,
mut client: EngineClient,
parser: &PacketParser,
) -> Result<Packet> {
let mut socket_packet = Packet::try_from(&packet.data)?;
let mut socket_packet = parser.decode(&packet.data)?;

// Only handle attachments if there are any
if socket_packet.attachment_count > 0 {
Expand Down

0 comments on commit c6aabd2

Please sign in to comment.