From dfe962ad166ae4b6e55f02c1f050011a4e9ba8a5 Mon Sep 17 00:00:00 2001 From: max143672 Date: Wed, 21 Jun 2023 11:26:03 +0400 Subject: [PATCH 1/2] handling send to socket error --- socketioxide/src/adapter.rs | 13 +++++++++---- socketioxide/src/errors.rs | 35 +++++++++++++++++++++++++++++++++++ socketioxide/src/handler.rs | 10 +++++++++- socketioxide/src/packet.rs | 6 +++--- socketioxide/src/socket.rs | 23 ++++++++++++++++++----- 5 files changed, 74 insertions(+), 13 deletions(-) diff --git a/socketioxide/src/adapter.rs b/socketioxide/src/adapter.rs index 8d980e70..fb764b75 100644 --- a/socketioxide/src/adapter.rs +++ b/socketioxide/src/adapter.rs @@ -17,6 +17,7 @@ use futures::{ use itertools::Itertools; use serde::de::DeserializeOwned; +use crate::errors::SendError; use crate::{ errors::{AckError, Error}, handler::AckResponse, @@ -65,7 +66,6 @@ impl BroadcastOptions { //TODO: Make an AsyncAdapter trait pub trait Adapter: std::fmt::Debug + Send + Sync + 'static { - /// Create a new adapter and give the namespace ref to retrieve sockets. fn new(ns: Weak>) -> Self where @@ -173,9 +173,14 @@ impl Adapter for LocalAdapter { let sockets = self.apply_opts(opts); tracing::debug!("broadcasting packet to {} sockets", sockets.len()); - sockets - .into_iter() - .try_for_each(|socket| socket.send(packet.clone())) + sockets.into_iter().try_for_each(|socket| { + if let Err(SendError::SocketFull { .. }) = socket.send(packet.clone()) { + // todo skip message? return err? try send later? create delayed queue? + Ok(()) + } else { + Ok(()) + } + }) } fn broadcast_with_ack( diff --git a/socketioxide/src/errors.rs b/socketioxide/src/errors.rs index 8e8729ec..f65a16ab 100644 --- a/socketioxide/src/errors.rs +++ b/socketioxide/src/errors.rs @@ -1,5 +1,8 @@ use engineioxide::sid_generator::Sid; +use std::fmt::Debug; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::oneshot; +use tracing::warn; /// Error type for socketio #[derive(thiserror::Error, Debug)] @@ -22,6 +25,9 @@ pub enum Error { /// An engineio error #[error("engineio error: {0}")] EngineIoError(#[from] engineioxide::errors::Error), + + #[error("send channel error: {0:?}")] + SendChannel(#[from] SendError), } /// Error type for ack responses @@ -42,4 +48,33 @@ pub enum AckError { /// Internal error #[error("internal error: {0}")] InternalError(#[from] Error), + + #[error("send channel error: {0:?}")] + SendChannel(#[from] SendError), +} + +/// Error type for ack responses +#[derive(thiserror::Error, Debug)] +pub enum SendError { + #[error("sent to full socket chan, sid: {sid}, packet: {packet:?}")] + SocketFull { sid: Sid, packet: T }, + #[error("sent to closed socket chan, sid: {sid}, packet: {packet:?}")] + SocketClosed { sid: Sid, packet: T }, + #[error("error serializing json packet: {0:?}")] + Serialize(#[from] serde_json::Error), +} + +impl From<(TrySendError, Sid)> for SendError { + fn from((err, sid): (TrySendError, Sid)) -> Self { + match err { + TrySendError::Closed(packet) => { + warn!("try to send to closed socket, sid: {sid}, packet: {packet:?}"); + Self::SocketClosed { sid, packet } + } + TrySendError::Full(packet) => { + warn!("try to send to full socket, sid: {sid}, packet: {packet:?}"); + Self::SocketFull { sid, packet } + } + } + } } diff --git a/socketioxide/src/handler.rs b/socketioxide/src/handler.rs index 4f1f9582..b177160f 100644 --- a/socketioxide/src/handler.rs +++ b/socketioxide/src/handler.rs @@ -4,6 +4,7 @@ use futures::future::BoxFuture; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; +use crate::errors::SendError; use crate::{adapter::Adapter, errors::Error, packet::Packet, Socket}; pub type AckResponse = (T, Vec>); @@ -114,7 +115,14 @@ impl AckSender { } else { Packet::bin_ack(ns, data, self.binary, ack_id) }; - self.socket.send(packet) + match self.socket.send(packet) { + Err(err @ SendError::SocketFull { .. }) => { + // todo skip message? return err? try send later? create delayed queue? + Err(err.into()) + } + Err(err) => Err(err.into()), + Ok(_) => Ok(()), + } } else { Ok(()) } diff --git a/socketioxide/src/packet.rs b/socketioxide/src/packet.rs index c3d14bbd..672a329a 100644 --- a/socketioxide/src/packet.rs +++ b/socketioxide/src/packet.rs @@ -187,7 +187,7 @@ impl BinaryPacket { } impl TryInto for Packet { - type Error = Error; + type Error = serde_json::Error; fn try_into(self) -> Result { let mut res = self.inner.index().to_string(); @@ -371,8 +371,8 @@ impl TryFrom for Packet { } impl TryInto for Packet { - type Error = Error; - fn try_into(self) -> Result { + type Error = serde_json::Error; + fn try_into(self) -> Result { Ok(EnginePacket::Message(self.try_into()?)) } } diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index cea7a747..7f7b8e45 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -15,6 +15,7 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use tokio::sync::oneshot; +use crate::errors::SendError; use crate::{ adapter::{Adapter, Room}, errors::{AckError, Error}, @@ -141,7 +142,8 @@ impl Socket { pub fn emit(&self, event: impl Into, data: impl Serialize) -> Result<(), Error> { let ns = self.ns.path.clone(); let data = serde_json::to_value(data)?; - self.send(Packet::event(ns, event.into(), data)) + self.send(Packet::event(ns, event.into(), data))?; + Ok(()) } /// Emit a message to the client and wait for acknowledgement. @@ -350,7 +352,7 @@ impl Socket { &self.ns.path } - pub(crate) fn send(&self, mut packet: Packet) -> Result<(), Error> { + pub(crate) fn send(&self, mut packet: Packet) -> Result<(), SendError> { let payload = match packet.inner { PacketData::BinaryEvent(_, ref mut bin, _) | PacketData::BinaryAck(ref mut bin, _) => { std::mem::take(&mut bin.bin) @@ -359,10 +361,14 @@ impl Socket { }; //TODO: fix unwrap - self.tx.try_send(packet.try_into()?).unwrap(); + self.tx + .try_send(packet.try_into()?) + .map_err(|err| (err, self.sid))?; for bin in payload { - self.tx.try_send(EnginePacket::Binary(bin)).unwrap(); + self.tx + .try_send(EnginePacket::Binary(bin)) + .map_err(|err| (err, self.sid))?; } Ok(()) } @@ -376,7 +382,14 @@ impl Socket { let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1; self.ack_message.write().unwrap().insert(ack, tx); packet.inner.set_ack_id(ack); - self.send(packet)?; + match self.send(packet) { + Err(err @ SendError::SocketFull { .. }) => { + // todo skip message? return err? try send later? create delayed queue? + Err(err) + } + Err(err) => Err(err), + Ok(_) => Ok(()), + }?; let timeout = timeout.unwrap_or(self.config.ack_timeout); let v = tokio::time::timeout(timeout, rx).await??; Ok((serde_json::from_value(v.0)?, v.1)) From 24044e5983896b8f40e8dbfaf975641d3ac028a8 Mon Sep 17 00:00:00 2001 From: max143672 Date: Wed, 21 Jun 2023 13:50:29 +0400 Subject: [PATCH 2/2] handle broadcast error --- socketioxide/src/adapter.rs | 24 ++++++++++++------------ socketioxide/src/errors.rs | 27 ++++++++++++++++++++++++--- socketioxide/src/handler.rs | 11 ++--------- socketioxide/src/operators.rs | 3 ++- socketioxide/src/socket.rs | 13 ++----------- 5 files changed, 42 insertions(+), 36 deletions(-) diff --git a/socketioxide/src/adapter.rs b/socketioxide/src/adapter.rs index fb764b75..a222d105 100644 --- a/socketioxide/src/adapter.rs +++ b/socketioxide/src/adapter.rs @@ -17,9 +17,8 @@ use futures::{ use itertools::Itertools; use serde::de::DeserializeOwned; -use crate::errors::SendError; use crate::{ - errors::{AckError, Error}, + errors::{AckError, BroadcastError, Error}, handler::AckResponse, ns::Namespace, operators::RoomParam, @@ -87,7 +86,7 @@ pub trait Adapter: std::fmt::Debug + Send + Sync + 'static { fn del_all(&self, sid: Sid); /// Broadcast the packet to the sockets that match the [`BroadcastOptions`]. - fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), Error>; + fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), BroadcastError>; /// Broadcast the packet to the sockets that match the [`BroadcastOptions`] and return a stream of ack responses. fn broadcast_with_ack( @@ -169,18 +168,19 @@ impl Adapter for LocalAdapter { } } - fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), Error> { + fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), BroadcastError> { let sockets = self.apply_opts(opts); tracing::debug!("broadcasting packet to {} sockets", sockets.len()); - sockets.into_iter().try_for_each(|socket| { - if let Err(SendError::SocketFull { .. }) = socket.send(packet.clone()) { - // todo skip message? return err? try send later? create delayed queue? - Ok(()) - } else { - Ok(()) - } - }) + let errors: Vec<_> = sockets + .into_iter() + .filter_map(|socket| socket.send(packet.clone()).err()) + .collect(); + if errors.is_empty() { + Ok(()) + } else { + Err(errors.into()) + } } fn broadcast_with_ack( diff --git a/socketioxide/src/errors.rs b/socketioxide/src/errors.rs index f65a16ab..5419bb8c 100644 --- a/socketioxide/src/errors.rs +++ b/socketioxide/src/errors.rs @@ -1,8 +1,9 @@ use engineioxide::sid_generator::Sid; use std::fmt::Debug; +use std::ops::Deref; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::oneshot; -use tracing::warn; +use tracing::debug; /// Error type for socketio #[derive(thiserror::Error, Debug)] @@ -28,6 +29,8 @@ pub enum Error { #[error("send channel error: {0:?}")] SendChannel(#[from] SendError), + #[error("broadcast packet error: {0:?}")] + BroadcastError(#[from] BroadcastError), } /// Error type for ack responses @@ -53,6 +56,24 @@ pub enum AckError { SendChannel(#[from] SendError), } +#[derive(Debug, thiserror::Error)] +#[error("send channel error: {0:?}")] +pub struct BroadcastError(Vec>); + +impl From>> for BroadcastError { + fn from(value: Vec>) -> Self { + Self(value) + } +} + +impl Deref for BroadcastError { + type Target = Vec>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + /// Error type for ack responses #[derive(thiserror::Error, Debug)] pub enum SendError { @@ -68,11 +89,11 @@ impl From<(TrySendError, Sid)> for SendError { fn from((err, sid): (TrySendError, Sid)) -> Self { match err { TrySendError::Closed(packet) => { - warn!("try to send to closed socket, sid: {sid}, packet: {packet:?}"); + debug!("try to send to closed socket, sid: {sid}, packet: {packet:?}"); Self::SocketClosed { sid, packet } } TrySendError::Full(packet) => { - warn!("try to send to full socket, sid: {sid}, packet: {packet:?}"); + debug!("try to send to full socket, sid: {sid}, packet: {packet:?}"); Self::SocketFull { sid, packet } } } diff --git a/socketioxide/src/handler.rs b/socketioxide/src/handler.rs index b177160f..82cfdb9b 100644 --- a/socketioxide/src/handler.rs +++ b/socketioxide/src/handler.rs @@ -4,7 +4,6 @@ use futures::future::BoxFuture; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; -use crate::errors::SendError; use crate::{adapter::Adapter, errors::Error, packet::Packet, Socket}; pub type AckResponse = (T, Vec>); @@ -115,14 +114,8 @@ impl AckSender { } else { Packet::bin_ack(ns, data, self.binary, ack_id) }; - match self.socket.send(packet) { - Err(err @ SendError::SocketFull { .. }) => { - // todo skip message? return err? try send later? create delayed queue? - Err(err.into()) - } - Err(err) => Err(err.into()), - Ok(_) => Ok(()), - } + self.socket.send(packet)?; + Ok(()) } else { Ok(()) } diff --git a/socketioxide/src/operators.rs b/socketioxide/src/operators.rs index 80363545..3ab198d5 100644 --- a/socketioxide/src/operators.rs +++ b/socketioxide/src/operators.rs @@ -231,7 +231,8 @@ impl Operators { data: impl serde::Serialize, ) -> Result<(), Error> { let packet = self.get_packet(event, data)?; - self.ns.adapter.broadcast(packet, self.opts) + self.ns.adapter.broadcast(packet, self.opts)?; + Ok(()) } /// Emit a message to all clients selected with the previous operators and return a stream of acknowledgements. diff --git a/socketioxide/src/socket.rs b/socketioxide/src/socket.rs index 7f7b8e45..62f6a34e 100644 --- a/socketioxide/src/socket.rs +++ b/socketioxide/src/socket.rs @@ -8,8 +8,7 @@ use std::{ time::Duration, }; -use engineioxide::sid_generator::Sid; -use engineioxide::SendPacket as EnginePacket; +use engineioxide::{sid_generator::Sid, SendPacket as EnginePacket}; use futures::Future; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; @@ -360,7 +359,6 @@ impl Socket { _ => vec![], }; - //TODO: fix unwrap self.tx .try_send(packet.try_into()?) .map_err(|err| (err, self.sid))?; @@ -382,14 +380,7 @@ impl Socket { let ack = self.ack_counter.fetch_add(1, Ordering::SeqCst) + 1; self.ack_message.write().unwrap().insert(ack, tx); packet.inner.set_ack_id(ack); - match self.send(packet) { - Err(err @ SendError::SocketFull { .. }) => { - // todo skip message? return err? try send later? create delayed queue? - Err(err) - } - Err(err) => Err(err), - Ok(_) => Ok(()), - }?; + self.send(packet)?; let timeout = timeout.unwrap_or(self.config.ack_timeout); let v = tokio::time::timeout(timeout, rx).await??; Ok((serde_json::from_value(v.0)?, v.1))