Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handling send to socket error #31

Merged
merged 2 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions socketioxide/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{
use itertools::Itertools;
use serde::de::DeserializeOwned;

use crate::errors::SendError;
use crate::{
errors::{AckError, Error},
handler::AckResponse,
Expand Down Expand Up @@ -65,7 +66,6 @@ impl BroadcastOptions {

//TODO: Make an AsyncAdapter trait
pub trait Adapter: std::fmt::Debug + Send + Sync + 'static {

/// Create a new adapter and give the namespace ref to retrieve sockets.
fn new(ns: Weak<Namespace<Self>>) -> Self
where
Expand Down Expand Up @@ -173,9 +173,14 @@ impl Adapter for LocalAdapter {
let sockets = self.apply_opts(opts);

tracing::debug!("broadcasting packet to {} sockets", sockets.len());
sockets
.into_iter()
.try_for_each(|socket| socket.send(packet.clone()))
sockets.into_iter().try_for_each(|socket| {
if let Err(SendError::SocketFull { .. }) = socket.send(packet.clone()) {
// todo skip message? return err? try send later? create delayed queue?
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
Ok(())
} else {
Ok(())
}
})
}

fn broadcast_with_ack<V: DeserializeOwned>(
Expand Down
35 changes: 35 additions & 0 deletions socketioxide/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use engineioxide::sid_generator::Sid;
use std::fmt::Debug;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::oneshot;
use tracing::warn;

/// Error type for socketio
#[derive(thiserror::Error, Debug)]
Expand All @@ -22,6 +25,9 @@ pub enum Error {
/// An engineio error
#[error("engineio error: {0}")]
EngineIoError(#[from] engineioxide::errors::Error),

#[error("send channel error: {0:?}")]
SendChannel(#[from] SendError<engineioxide::SendPacket>),
}

/// Error type for ack responses
Expand All @@ -42,4 +48,33 @@ pub enum AckError {
/// Internal error
#[error("internal error: {0}")]
InternalError(#[from] Error),

#[error("send channel error: {0:?}")]
SendChannel(#[from] SendError<engineioxide::SendPacket>),
}

/// Error type for ack responses
#[derive(thiserror::Error, Debug)]
pub enum SendError<T: Debug> {
#[error("sent to full socket chan, sid: {sid}, packet: {packet:?}")]
SocketFull { sid: Sid, packet: T },
#[error("sent to closed socket chan, sid: {sid}, packet: {packet:?}")]
SocketClosed { sid: Sid, packet: T },
#[error("error serializing json packet: {0:?}")]
Serialize(#[from] serde_json::Error),
}

impl<T: Debug> From<(TrySendError<T>, Sid)> for SendError<T> {
fn from((err, sid): (TrySendError<T>, Sid)) -> Self {
match err {
TrySendError::Closed(packet) => {
warn!("try to send to closed socket, sid: {sid}, packet: {packet:?}");
biryukovmaxim marked this conversation as resolved.
Show resolved Hide resolved
Self::SocketClosed { sid, packet }
}
TrySendError::Full(packet) => {
warn!("try to send to full socket, sid: {sid}, packet: {packet:?}");
Self::SocketFull { sid, packet }
}
}
}
}
10 changes: 9 additions & 1 deletion socketioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::future::BoxFuture;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;

use crate::errors::SendError;
use crate::{adapter::Adapter, errors::Error, packet::Packet, Socket};

pub type AckResponse<T> = (T, Vec<Vec<u8>>);
Expand Down Expand Up @@ -114,7 +115,14 @@ impl<A: Adapter> AckSender<A> {
} else {
Packet::bin_ack(ns, data, self.binary, ack_id)
};
self.socket.send(packet)
match self.socket.send(packet) {
Err(err @ SendError::SocketFull { .. }) => {
// todo skip message? return err? try send later? create delayed queue?
Err(err.into())
}
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
} else {
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions socketioxide/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl BinaryPacket {
}

impl TryInto<String> for Packet {
type Error = Error;
type Error = serde_json::Error;

fn try_into(self) -> Result<String, Self::Error> {
let mut res = self.inner.index().to_string();
Expand Down Expand Up @@ -371,8 +371,8 @@ impl TryFrom<String> for Packet {
}

impl TryInto<EnginePacket> for Packet {
type Error = Error;
fn try_into(self) -> Result<EnginePacket, Error> {
type Error = serde_json::Error;
fn try_into(self) -> Result<EnginePacket, Self::Error> {
Ok(EnginePacket::Message(self.try_into()?))
}
}
Expand Down
23 changes: 18 additions & 5 deletions socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use tokio::sync::oneshot;

use crate::errors::SendError;
use crate::{
adapter::{Adapter, Room},
errors::{AckError, Error},
Expand Down Expand Up @@ -141,7 +142,8 @@ impl<A: Adapter> Socket<A> {
pub fn emit(&self, event: impl Into<String>, data: impl Serialize) -> Result<(), Error> {
let ns = self.ns.path.clone();
let data = serde_json::to_value(data)?;
self.send(Packet::event(ns, event.into(), data))
self.send(Packet::event(ns, event.into(), data))?;
Ok(())
}

/// Emit a message to the client and wait for acknowledgement.
Expand Down Expand Up @@ -350,7 +352,7 @@ impl<A: Adapter> Socket<A> {
&self.ns.path
}

pub(crate) fn send(&self, mut packet: Packet) -> Result<(), Error> {
pub(crate) fn send(&self, mut packet: Packet) -> Result<(), SendError<EnginePacket>> {
let payload = match packet.inner {
PacketData::BinaryEvent(_, ref mut bin, _) | PacketData::BinaryAck(ref mut bin, _) => {
std::mem::take(&mut bin.bin)
Expand All @@ -359,10 +361,14 @@ impl<A: Adapter> Socket<A> {
};

//TODO: fix unwrap
self.tx.try_send(packet.try_into()?).unwrap();
self.tx
.try_send(packet.try_into()?)
.map_err(|err| (err, self.sid))?;

for bin in payload {
self.tx.try_send(EnginePacket::Binary(bin)).unwrap();
self.tx
.try_send(EnginePacket::Binary(bin))
.map_err(|err| (err, self.sid))?;
}
Ok(())
}
Expand All @@ -376,7 +382,14 @@ impl<A: Adapter> Socket<A> {
let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1;
self.ack_message.write().unwrap().insert(ack, tx);
packet.inner.set_ack_id(ack);
self.send(packet)?;
match self.send(packet) {
Err(err @ SendError::SocketFull { .. }) => {
// todo skip message? return err? try send later? create delayed queue?
Err(err)
}
Err(err) => Err(err),
Ok(_) => Ok(()),
}?;
let timeout = timeout.unwrap_or(self.config.ack_timeout);
let v = tokio::time::timeout(timeout, rx).await??;
Ok((serde_json::from_value(v.0)?, v.1))
Expand Down