Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom parser in socketio packet #436

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions socketio/src/asynchronous/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rust_engineio::{
use std::collections::HashMap;
use url::Url;

use crate::{error::Result, Event, Payload, TransportType};
use crate::{error::Result, packet::PacketParser, Event, Payload, TransportType};

use super::{
callback::{
Expand All @@ -31,6 +31,7 @@ pub struct ClientBuilder {
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
packet_parser: PacketParser,
pub(crate) auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
Expand Down Expand Up @@ -90,6 +91,7 @@ impl ClientBuilder {
tls_config: None,
opening_headers: None,
transport_type: TransportType::Any,
packet_parser: PacketParser::default(),
auth: None,
reconnect: true,
reconnect_on_disconnect: false,
Expand Down Expand Up @@ -453,7 +455,7 @@ impl ClientBuilder {
TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade().await?,
};

let inner_socket = InnerSocket::new(engine_client)?;
let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?;
Ok(inner_socket)
}

Expand Down
22 changes: 15 additions & 7 deletions socketio/src/asynchronous/socket.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::generator::StreamGenerator;
use crate::{
error::Result,
packet::{Packet, PacketId},
packet::{Packet, PacketId, PacketParser},
Error, Event, Payload,
};
use async_stream::try_stream;
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use rust_engineio::{
asynchronous::Client as EngineClient, Packet as EnginePacket, PacketId as EnginePacketId,
Expand All @@ -24,16 +23,22 @@ pub(crate) struct Socket {
engine_client: Arc<EngineClient>,
connected: Arc<AtomicBool>,
generator: StreamGenerator<Packet>,
packet_parser: PacketParser,
}

impl Socket {
/// Creates an instance of `Socket`.
pub(super) fn new(engine_client: EngineClient) -> Result<Self> {
pub(super) fn new(engine_client: EngineClient, packet_parser: PacketParser) -> Result<Self> {
let connected = Arc::new(AtomicBool::default());
Ok(Socket {
engine_client: Arc::new(engine_client.clone()),
connected: connected.clone(),
generator: StreamGenerator::new(Self::stream(engine_client, connected)),
generator: StreamGenerator::new(Self::stream(
engine_client,
connected,
packet_parser.clone(),
)),
packet_parser,
})
}

Expand Down Expand Up @@ -68,7 +73,8 @@ impl Socket {
}

// the packet, encoded as an engine.io message packet
let engine_packet = EnginePacket::new(EnginePacketId::Message, Bytes::from(&packet));
let engine_packet =
EnginePacket::new(EnginePacketId::Message, self.packet_parser.encode(&packet));
self.engine_client.emit(engine_packet).await?;

if let Some(attachments) = packet.attachments {
Expand All @@ -92,6 +98,7 @@ impl Socket {
fn stream(
client: EngineClient,
is_connected: Arc<AtomicBool>,
parser: PacketParser,
) -> Pin<Box<impl Stream<Item = Result<Packet>> + Send>> {
Box::pin(try_stream! {
for await received_data in client.clone() {
Expand All @@ -100,7 +107,7 @@ impl Socket {
if packet.packet_id == EnginePacketId::Message
|| packet.packet_id == EnginePacketId::MessageBinary
{
let packet = Self::handle_engineio_packet(packet, client.clone()).await?;
let packet = Self::handle_engineio_packet(packet, client.clone(), &parser).await?;
Self::handle_socketio_packet(&packet, is_connected.clone());

yield packet;
Expand Down Expand Up @@ -130,8 +137,9 @@ impl Socket {
async fn handle_engineio_packet(
packet: EnginePacket,
mut client: EngineClient,
parser: &PacketParser,
) -> Result<Packet> {
let mut socket_packet = Packet::try_from(&packet.data)?;
let mut socket_packet = parser.decode(&packet.data)?;

// Only handle attachments if there are any
if socket_packet.attachment_count > 0 {
Expand Down
12 changes: 11 additions & 1 deletion socketio/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::super::{event::Event, payload::Payload};
use super::callback::Callback;
use super::client::Client;
use crate::packet::PacketParser;
use crate::RawClient;
use native_tls::TlsConnector;
use rust_engineio::client::ClientBuilder as EngineIoClientBuilder;
Expand Down Expand Up @@ -40,6 +41,7 @@ pub struct ClientBuilder {
tls_config: Option<TlsConnector>,
opening_headers: Option<HeaderMap>,
transport_type: TransportType,
packet_parser: PacketParser,
auth: Option<serde_json::Value>,
pub(crate) reconnect: bool,
pub(crate) reconnect_on_disconnect: bool,
Expand Down Expand Up @@ -91,6 +93,7 @@ impl ClientBuilder {
tls_config: None,
opening_headers: None,
transport_type: TransportType::Any,
packet_parser: PacketParser::default(),
auth: None,
reconnect: true,
reconnect_on_disconnect: false,
Expand Down Expand Up @@ -306,6 +309,13 @@ impl ClientBuilder {
self
}

/// Specifies how to parser Packet
pub fn packet_parser(mut self, packet_parser: PacketParser) -> Self {
self.packet_parser = packet_parser;

self
}

/// Connects the socket to a certain endpoint. This returns a connected
/// [`Client`] instance. This method returns an [`std::result::Result::Err`]
/// value if something goes wrong during connection. Also starts a separate
Expand Down Expand Up @@ -357,7 +367,7 @@ impl ClientBuilder {
TransportType::WebsocketUpgrade => builder.build_websocket_with_upgrade()?,
};

let inner_socket = InnerSocket::new(engine_client)?;
let inner_socket = InnerSocket::new(engine_client, self.packet_parser.clone())?;

let socket = RawClient::new(
inner_socket,
Expand Down
2 changes: 2 additions & 0 deletions socketio/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum Error {
IncompletePacket(),
#[error("Got an invalid packet which did not follow the protocol format")]
InvalidPacket(),
#[error("Error while parsing an incomplete packet: {0}")]
ParsePacketFailed(String),
#[error("An error occurred while decoding the utf-8 text: {0}")]
InvalidUtf8(#[from] Utf8Error),
#[error("An error occurred while encoding/decoding base64: {0}")]
Expand Down
Loading
Loading