Skip to content

Commit

Permalink
Merge pull request #34 from biryukovmaxim/handle_send_error
Browse files Browse the repository at this point in the history
Handle send error
  • Loading branch information
Totodore authored Jun 23, 2023
2 parents 96668c5 + b6d5e7c commit 6a1a642
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 58 deletions.
3 changes: 1 addition & 2 deletions e2e/src/engineioxide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.max_payload(1e6 as u64)
.build();


let addr = &"127.0.0.1:3000".parse().unwrap();
let handler = Arc::new(MyHandler);
let svc = EngineIoService::with_config(handler, config);
Expand All @@ -60,4 +59,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
server.await?;

Ok(())
}
}
4 changes: 2 additions & 2 deletions engineioxide/benches/benchmark_polling.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::{sync::Arc, time::Duration};
use std::str::FromStr;
use std::{sync::Arc, time::Duration};

use bytes::{Buf, Bytes};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use engineioxide::{handler::EngineIoHandler, service::EngineIoService, socket::Socket};

use engineioxide::sid_generator::Sid;
use http::Request;
use http_body::{Empty, Full};
use serde::{Deserialize, Serialize};
use tower::Service;
use engineioxide::sid_generator::Sid;

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
Expand Down
2 changes: 1 addition & 1 deletion engineioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct EngineIoConfig {
/// The maximum number of packets that can be buffered per connection before being emitted to the client.
///
/// If the buffer if full the `emit()` method will return an error
///
///
/// Defaults to 128 packets
pub max_buffer_size: usize,

Expand Down
41 changes: 28 additions & 13 deletions socketioxide/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ use itertools::Itertools;
use serde::de::DeserializeOwned;

use crate::{
errors::{AckError, Error},
errors::{
AckError,
BroadcastError,
},
handler::AckResponse,
ns::Namespace,
operators::RoomParam,
packet::Packet,
socket::Socket,
socket::Socket
};

/// A room identifier
Expand Down Expand Up @@ -65,7 +68,6 @@ impl BroadcastOptions {

//TODO: Make an AsyncAdapter trait
pub trait Adapter: std::fmt::Debug + Send + Sync + 'static {

/// Create a new adapter and give the namespace ref to retrieve sockets.
fn new(ns: Weak<Namespace<Self>>) -> Self
where
Expand All @@ -87,7 +89,7 @@ pub trait Adapter: std::fmt::Debug + Send + Sync + 'static {
fn del_all(&self, sid: Sid);

/// Broadcast the packet to the sockets that match the [`BroadcastOptions`].
fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), Error>;
fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), BroadcastError>;

/// Broadcast the packet to the sockets that match the [`BroadcastOptions`] and return a stream of ack responses.
fn broadcast_with_ack<V: DeserializeOwned>(
Expand All @@ -112,7 +114,7 @@ pub trait Adapter: std::fmt::Debug + Send + Sync + 'static {
/// Remove the sockets that match the [`BroadcastOptions`] from the rooms.
fn del_sockets(&self, opts: BroadcastOptions, rooms: impl RoomParam);
/// Disconnect the sockets that match the [`BroadcastOptions`].
fn disconnect_socket(&self, opts: BroadcastOptions) -> Result<(), Error>;
fn disconnect_socket(&self, opts: BroadcastOptions) -> Result<(), BroadcastError>;

//TODO: implement
// fn server_side_emit(&self, packet: Packet, opts: BroadcastOptions) -> Result<u64, Error>;
Expand Down Expand Up @@ -169,13 +171,19 @@ impl Adapter for LocalAdapter {
}
}

fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), Error> {
fn broadcast(&self, packet: Packet, opts: BroadcastOptions) -> Result<(), BroadcastError> {
let sockets = self.apply_opts(opts);

tracing::debug!("broadcasting packet to {} sockets", sockets.len());
sockets
let errors: Vec<_> = sockets
.into_iter()
.try_for_each(|socket| socket.send(packet.clone()))
.filter_map(|socket| socket.send(packet.clone()).err())
.collect();
if errors.is_empty() {
Ok(())
} else {
Err(errors.into())
}
}

fn broadcast_with_ack<V: DeserializeOwned>(
Expand Down Expand Up @@ -238,10 +246,17 @@ impl Adapter for LocalAdapter {
}
}

fn disconnect_socket(&self, opts: BroadcastOptions) -> Result<(), Error> {
self.apply_opts(opts)
fn disconnect_socket(&self, opts: BroadcastOptions) -> Result<(), BroadcastError> {
let errors: Vec<_> = self
.apply_opts(opts)
.into_iter()
.try_for_each(|socket| socket.disconnect())
.filter_map(|socket| socket.disconnect().err())
.collect();
if errors.is_empty() {
Ok(())
} else {
Err(errors.into())
}
}
}

Expand Down Expand Up @@ -292,7 +307,6 @@ impl LocalAdapter {

#[cfg(test)]
mod test {

use super::*;

#[tokio::test]
Expand Down Expand Up @@ -443,7 +457,8 @@ mod test {
let mut opts = BroadcastOptions::new(socket0);
opts.rooms = vec!["room5".to_string()];
match adapter.disconnect_socket(opts) {
Err(Error::EngineGone) | Ok(_) => {}
// todo it returns Ok, in previous commits it also returns Ok
Err(BroadcastError::SendError(_)) | Ok(_) => {}
e => panic!(
"should return an EngineGone error as it is a stub namespace: {:?}",
e
Expand Down
21 changes: 16 additions & 5 deletions socketioxide/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::debug;
use tracing::error;

use crate::adapter::Adapter;
use crate::errors::SendError;
use crate::handshake::Handshake;
use crate::{
config::SocketIoConfig,
Expand Down Expand Up @@ -61,7 +62,7 @@ impl<A: Adapter> Client<A> {
auth: Value,
ns_path: String,
socket: &EIoSocket<Self>,
) -> Result<(), Error> {
) -> Result<(), SendError> {
debug!("auth: {:?}", auth);
let handshake = Handshake::new(auth, socket.req_data.clone());
let sid = socket.sid;
Expand Down Expand Up @@ -135,16 +136,26 @@ impl<A: Adapter> EngineIoHandler for Client<A> {
}
};
debug!("Packet: {:?}", packet);
let res = match packet.inner {
PacketData::Connect(auth) => self.sock_connect(auth, packet.ns, socket),

#[derive(Debug)]
enum CurrentError {
SendError(SendError),
CommonError(Error),
}
let res: Result<(), CurrentError> = match packet.inner {
PacketData::Connect(auth) => self
.sock_connect(auth, packet.ns, socket)
.map_err(CurrentError::SendError),
PacketData::BinaryEvent(_, _, _) | PacketData::BinaryAck(_, _) => {
self.sock_recv_bin_packet(socket, packet);
Ok(())
}
_ => self.sock_propagate_packet(packet, socket.sid),
_ => self
.sock_propagate_packet(packet, socket.sid)
.map_err(CurrentError::CommonError),
};
if let Err(err) = res {
error!("error while processing packet: {}", err);
error!("error while processing packet: {:?}", err);
socket.close();
}
}
Expand Down
15 changes: 7 additions & 8 deletions socketioxide/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub struct SocketIoConfigBuilder {
}

impl SocketIoConfigBuilder {

/// Create a new config builder
pub fn new() -> Self {
Self {
Expand All @@ -17,23 +16,23 @@ impl SocketIoConfigBuilder {
}

/// The path to listen for socket.io requests on.
///
///
/// Defaults to "/socket.io".
pub fn req_path(mut self, req_path: String) -> Self {
self.config.engine_config.req_path = req_path;
self
}

/// The interval at which the server will send a ping packet to the client.
///
///
/// Defaults to 25 seconds.
pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
self.config.engine_config.ping_interval = ping_interval;
self
}

/// The amount of time the server will wait for a ping response from the client before closing the connection.
///
///
/// Defaults to 20 seconds.
pub fn ping_timeout(mut self, ping_timeout: Duration) -> Self {
self.config.engine_config.ping_timeout = ping_timeout;
Expand All @@ -42,7 +41,7 @@ impl SocketIoConfigBuilder {

/// The maximum number of packets that can be buffered per connection before being emitted to the client.
/// If the buffer if full the `emit()` method will return an error
///
///
/// Defaults to 128 packets.
pub fn max_buffer_size(mut self, max_buffer_size: usize) -> Self {
self.config.engine_config.max_buffer_size = max_buffer_size;
Expand All @@ -51,15 +50,15 @@ impl SocketIoConfigBuilder {

/// The maximum size of a payload in bytes.
/// If a payload is bigger than this value the `emit()` method will return an error.
///
///
/// Defaults to 100 kb.
pub fn max_payload(mut self, max_payload: u64) -> Self {
self.config.engine_config.max_payload = max_payload;
self
}

/// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
///
///
/// Defaults to 5 seconds.
pub fn ack_timeout(mut self, ack_timeout: Duration) -> Self {
self.config.ack_timeout = ack_timeout;
Expand All @@ -85,7 +84,7 @@ pub struct SocketIoConfig {
pub(crate) engine_config: EngineIoConfig,

/// The amount of time the server will wait for an acknowledgement from the client before closing the connection.
///
///
/// Defaults to 5 seconds.
pub(crate) ack_timeout: Duration,
}
Expand Down
54 changes: 54 additions & 0 deletions socketioxide/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::retryer::Retryer;
use engineioxide::sid_generator::Sid;
use std::fmt::Debug;
use tokio::sync::oneshot;

/// Error type for socketio
Expand Down Expand Up @@ -42,4 +44,56 @@ pub enum AckError {
/// Internal error
#[error("internal error: {0}")]
InternalError(#[from] Error),

#[error("send channel error: {0:?}")]
SendChannel(#[from] SendError),
}

/// Error type for broadcast operations.
#[derive(Debug, thiserror::Error)]
pub enum BroadcastError {
/// An error occurred while sending packets.
#[error("Sending error: {0:?}")]
SendError(Vec<SendError>),

/// An error occurred while serializing the JSON packet.
#[error("Error serializing JSON packet: {0:?}")]
Serialize(#[from] serde_json::Error),
}

impl From<Vec<SendError>> for BroadcastError {
/// Converts a vector of `SendError` into a `BroadcastError`.
///
/// # Arguments
///
/// * `value` - A vector of `SendError` representing the sending errors.
///
/// # Returns
///
/// A `BroadcastError` containing the sending errors.
fn from(value: Vec<SendError>) -> Self {
Self::SendError(value)
}
}

/// Error type for sending operations.
#[derive(thiserror::Error, Debug)]
pub enum SendError {
/// An error occurred while serializing the JSON packet.
#[error("Error serializing JSON packet: {0:?}")]
Serialize(#[from] serde_json::Error),
/// An error occurred during the retry process in the `Retryer`.
#[error("Send error: {0:?}")]
RetryerError(#[from] RetryerError),
}

/// Error type for the `Retryer` struct indicating various failure scenarios during the retry process.
#[derive(thiserror::Error, Debug)]
pub enum RetryerError {
/// The packet was sent to a closed socket channel.
#[error("Sent to a closed socket channel, sid: {sid}")]
SocketClosed { sid: Sid },
/// There are remaining packets to be sent, indicating that the socket channel is full.
#[error("Sent to a full socket channel")]
Remaining(Retryer),
}
3 changes: 2 additions & 1 deletion socketioxide/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::future::BoxFuture;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;

use crate::errors::SendError;
use crate::{adapter::Adapter, errors::Error, packet::Packet, Socket};

pub type AckResponse<T> = (T, Vec<Vec<u8>>);
Expand Down Expand Up @@ -105,7 +106,7 @@ impl<A: Adapter> AckSender<A> {
}

/// Send the ack response to the client.
pub fn send(self, data: impl Serialize) -> Result<(), Error> {
pub fn send(self, data: impl Serialize) -> Result<(), SendError> {
if let Some(ack_id) = self.ack_id {
let ns = self.socket.ns().clone();
let data = serde_json::to_value(&data)?;
Expand Down
Loading

0 comments on commit 6a1a642

Please sign in to comment.