Skip to content

Commit

Permalink
using clouser
Browse files Browse the repository at this point in the history
  • Loading branch information
shenjackyuanjie committed Apr 17, 2024
1 parent cbcb35d commit 21a6407
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 72 deletions.
18 changes: 0 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion engineio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ async-stream = "0.3.5"
thiserror = "1.0"
native-tls = "0.2.11"
url = "2.5.0"
rmp = "0.8"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand Down
17 changes: 2 additions & 15 deletions engineio/src/asynchronous/async_transports/polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,11 @@ impl Stream for PollingTransport {

#[async_trait]
impl AsyncTransport for PollingTransport {
async fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
let data_to_send = if is_binary_att {
// the binary attachment gets `base64` encoded
let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
packet_bytes.put_u8(b'b');

let encoded_data = general_purpose::STANDARD.encode(data);
packet_bytes.put(encoded_data.as_bytes());

packet_bytes.freeze()
} else {
data
};

async fn emit(&self, data: Bytes) -> Result<()> {
let status = self
.client
.post(self.address().await?)
.body(data_to_send)
.body(data)
.send()
.await?
.status()
Expand Down
103 changes: 81 additions & 22 deletions engineio/src/packet.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,93 @@
use base64::{engine::general_purpose, Engine as _};
use bytes::{BufMut, Bytes, BytesMut};
use rmp::{decode, encode};
use serde::{Deserialize, Serialize};
use std::char;
use std::fmt::{Display, Formatter, Result as FmtResult, Write};
use std::ops::Index;

use crate::error::{Error, Result};

pub mod normal;
pub mod message_pack;

// /// Serializer of Engine.IO packet
// #[derive(Copy, Clone, Debug, Eq, PartialEq)]
// pub enum PacketSerializer {
// /// Normal serializer
// Normal,
// /// MessagePack serializer
// MessagePack,
// }

// impl Default for PacketSerializer {
// fn default() -> Self {
// PacketSerializer::Normal
// }
// }

pub trait PacketSerializer{
fn from_bytes(bytes: &[u8]) -> Result<Packet>;
fn to_bytes(&self) -> Result<Bytes>;
pub struct PacketSerializer {
decode: Box<dyn Fn(Bytes) -> Result<Packet> + Send + Sync>,
encode: Box<dyn Fn(Packet) -> Bytes + Send + Sync>,
}

fn default_decode(bytes: Bytes) -> Result<Packet> {
if bytes.is_empty() {
return Err(Error::IncompletePacket());
}

let is_base64 = *bytes.first().ok_or(Error::IncompletePacket())? == b'b';

// only 'messages' packets could be encoded
let packet_id = if is_base64 {
PacketId::MessageBinary
} else {
(*bytes.first().ok_or(Error::IncompletePacket())?).try_into()?
};

if bytes.len() == 1 && packet_id == PacketId::Message {
return Err(Error::IncompletePacket());
}

let data: Bytes = bytes.slice(1..);

Ok(Packet {
packet_id,
data: if is_base64 {
Bytes::from(general_purpose::STANDARD.decode(data.as_ref())?)
} else {
data
},
})
}

fn default_encode(packet: Packet) -> Bytes {
let mut result = BytesMut::with_capacity(packet.data.len() + 1);
result.put_u8(packet.packet_id.to_string_byte());
if packet.packet_id == PacketId::MessageBinary {
result.extend(general_purpose::STANDARD.encode(packet.data).into_bytes());
} else {
result.put(packet.data);
}
result.freeze()
}


impl PacketSerializer {
const SEPARATOR: char = '\x1e';

pub fn new(
decode: Box<dyn Fn(Bytes) -> Result<Packet> + Send + Sync>,
encode: Box<dyn Fn(Packet) -> Bytes + Send + Sync>,
) -> Self {
Self {
decode,
encode,
}
}

pub fn default() -> Self {
let decode = Box::new(default_decode);
let encode = Box::new(default_encode);
Self::new(decode, encode)
}

pub fn decode(&self, datas: Bytes) -> Result<Packet> {
(self.decode)(datas)
}

pub fn decode_payload(&self, datas: Bytes) -> Result<Payload> {
datas
.split(|&c| c as char == PacketSerializer::SEPARATOR)
.map(|slice| self.decode(datas.slice_ref(slice)))
.collect::<Result<Vec<Packet>>>()
.map(Payload)
}

pub fn encode(&self, packet: Packet) -> Bytes {
(self.encode)(packet)
}
}

/// Enumeration of the `engine.io` `Packet` types.
Expand Down
5 changes: 3 additions & 2 deletions engineio/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub const DEFAULT_MAX_POLL_TIMEOUT: Duration = Duration::from_secs(45);
#[derive(Clone)]
pub struct Socket {
transport: Arc<TransportType>,
serializer: dyn PacketSerializer,
serializer: PacketSerializer,
on_close: OptionalCallback<()>,
on_data: OptionalCallback<Bytes>,
on_error: OptionalCallback<String>,
Expand Down Expand Up @@ -150,7 +150,8 @@ impl Socket {
continue;
}

let payload = Payload::try_from(data)?;
// let payload = Payload::try_from(data)?;
let payload = self.serializer.decode_payload(data)?;
let mut iter = payload.into_iter();

if let Some(packet) = iter.next() {
Expand Down
16 changes: 2 additions & 14 deletions engineio/src/transports/polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,11 @@ impl PollingTransport {
}

impl Transport for PollingTransport {
fn emit(&self, data: Bytes, is_binary_att: bool) -> Result<()> {
let data_to_send = if is_binary_att {
// the binary attachment gets `base64` encoded
let mut packet_bytes = BytesMut::with_capacity(data.len() + 1);
packet_bytes.put_u8(b'b');

let encoded_data = general_purpose::STANDARD.encode(data);
packet_bytes.put(encoded_data.as_bytes());

packet_bytes.freeze()
} else {
data
};
fn emit(&self, data: Bytes, _is_binary_att: bool) -> Result<()> {
let status = self
.client
.post(self.address()?)
.body(data_to_send)
.body(data)
.send()?
.status()
.as_u16();
Expand Down

0 comments on commit 21a6407

Please sign in to comment.