Skip to content

Commit

Permalink
test(socketio): switch from rust_socketio client to custom test har…
Browse files Browse the repository at this point in the history
…ness
  • Loading branch information
Totodore committed Apr 4, 2024
1 parent 7c26a31 commit b645663
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 100 deletions.
2 changes: 1 addition & 1 deletion engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ where
sid: Sid,
close_fn: Box<dyn Fn(Sid, DisconnectReason) + Send + Sync>,
) -> Arc<Socket<D>> {
self.new_dummy_piped(sid, close_fn).0
Socket::new_dummy_piped(sid, close_fn).0
}

/// Create a dummy socket for testing purpose with a
Expand Down
5 changes: 4 additions & 1 deletion socketioxide/src/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ impl<A: Adapter> Namespace<A> {
}

self.sockets.write().unwrap().insert(sid, socket.clone());
#[cfg(feature = "tracing")]
tracing::trace!(?socket.id, ?self.path, "socket added to namespace");

let protocol = esocket.protocol.into();

if let Err(_e) = socket.send(Packet::connect(&self.path, socket.id, protocol)) {
Expand Down Expand Up @@ -129,7 +132,7 @@ impl<A: Adapter> Namespace<A> {
}
}

#[cfg(test)]
#[cfg(any(test, socketioxide_test))]
impl<A: Adapter> Namespace<A> {
pub fn new_dummy<const S: usize>(sockets: [Sid; S]) -> Arc<Self> {
let ns = Namespace::new(Cow::Borrowed("/"), || {});
Expand Down
1 change: 1 addition & 0 deletions socketioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ impl<A: Adapter> PartialEq for Socket<A> {

#[cfg(any(test, socketioxide_test))]
impl<A: Adapter> Socket<A> {
/// Creates a dummy socket for testing purposes
pub fn new_dummy(sid: Sid, ns: Arc<Namespace<A>>) -> Socket<A> {
let close_fn = Box::new(move |_, _| ());
let s = Socket::new(
Expand Down
118 changes: 62 additions & 56 deletions socketioxide/tests/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,83 +5,89 @@ use serde_json::json;
use socketioxide::extract::{Data, SocketRef, State, TryData};
use tokio::sync::mpsc;

use fixture::{create_server, create_server_with_state};

use crate::fixture::socketio_client;

use engineioxide::Packet as EioPacket;
use socketioxide::packet::Packet;
use socketioxide::SocketIo;
mod fixture;
mod utils;

async fn timeout_rcv<T: std::fmt::Debug>(srx: &mut tokio::sync::mpsc::Receiver<T>) -> T {
tokio::time::timeout(Duration::from_millis(200), srx.recv())
.await
.unwrap()
.unwrap()
}

#[tokio::test]
pub async fn state_extractor() {
const PORT: u16 = 2000;
const TIMEOUT: Duration = Duration::from_millis(200);
let state = 1112i32;
let io = create_server_with_state(PORT, state).await;
let (tx, mut rx) = mpsc::channel::<i32>(4);
io.ns("/", move |socket: SocketRef, state: State<i32>| {
assert_ok!(tx.try_send(*state));
socket.on("test", move |State(state): State<i32>| {
assert_ok!(tx.try_send(*state))
let (_, io) = SocketIo::builder().with_state(state).build_svc();

io.ns("/", |socket: SocketRef, State(state): State<i32>| {
assert_ok!(socket.emit("state", state));
socket.on("test", |socket: SocketRef, State(state): State<i32>| {
assert_ok!(socket.emit("state", state));
});
});
let client = assert_ok!(socketio_client(PORT, ()).await);
assert_eq!(
tokio::time::timeout(TIMEOUT, rx.recv())
.await
.unwrap()
.unwrap(),
state
);

assert_ok!(client.emit("test", json!("foo")).await);
assert_eq!(
tokio::time::timeout(TIMEOUT, rx.recv())
.await
.unwrap()
.unwrap(),
state
);

assert_ok!(client.disconnect().await);
let res_packet = EioPacket::Message(Packet::event("/", "state", state.into()).into());

// Connect packet
let (stx, mut srx) = io.new_dummy_sock("/", ()).await;
srx.recv().await;

// First echoed res packet from connect handler
assert_eq!(timeout_rcv(&mut srx).await, res_packet);

let packet = EioPacket::Message(Packet::event("/", "test", json!("foo")).into());
assert_ok!(stx.try_send(packet));

// second echoed res packet from test event handler
assert_eq!(timeout_rcv(&mut srx).await, res_packet);
}

#[tokio::test]
pub async fn data_extractor() {
const PORT: u16 = 2001;
let io = create_server(PORT).await;
let (_, io) = SocketIo::new_svc();
let (tx, mut rx) = mpsc::channel::<String>(4);
let tx1 = tx.clone();

io.ns("/", move |socket: SocketRef, Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
socket.on("test", move |Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
});
});

assert_ok!(socketio_client(PORT, ()).await);
assert_ok!(socketio_client(PORT, 1321).await);
io.new_dummy_sock("/", ()).await;
assert!(matches!(
rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));
io.new_dummy_sock("/", 1321).await;
assert!(matches!(
rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
));

// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

let client = assert_ok!(socketio_client(PORT, "foo").await);
assert_eq!(rx.recv().await.unwrap(), "foo");
let (stx, _rtx) = io.new_dummy_sock("/", "foo").await;
assert_eq!(timeout_rcv(&mut rx).await, "foo");

assert_ok!(client.emit("test", json!("oof")).await);
assert_eq!(rx.recv().await.unwrap(), "oof");
let packet = EioPacket::Message(Packet::event("/", "test", json!("oof")).into());
assert_ok!(stx.try_send(packet));
assert_eq!(timeout_rcv(&mut rx).await, "oof");

assert_ok!(client.emit("test", json!({ "test": 132 })).await);
let packet = EioPacket::Message(Packet::event("/", "test", json!({ "test": 132 })).into());
assert_ok!(stx.try_send(packet));
// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

assert_ok!(client.disconnect().await);
}

#[tokio::test]
pub async fn try_data_extractor() {
const PORT: u16 = 2002;
let io = create_server(PORT).await;
let (_, io) = SocketIo::new_svc();
let (tx, mut rx) = mpsc::channel::<Result<String, serde_json::Error>>(4);
io.ns("/", move |s: SocketRef, TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
Expand All @@ -91,24 +97,24 @@ pub async fn try_data_extractor() {
});

// Non deserializable data
assert_ok!(socketio_client(PORT, ()).await);
assert_err!(rx.recv().await.unwrap());
io.new_dummy_sock("/", ()).await;
assert_err!(timeout_rcv(&mut rx).await);

// Non deserializable data
assert_ok!(socketio_client(PORT, 1321).await);
assert_err!(rx.recv().await.unwrap());
io.new_dummy_sock("/", 1321).await;
assert_err!(timeout_rcv(&mut rx).await);

let client = assert_ok!(socketio_client(PORT, "foo").await);
let res = assert_ok!(rx.recv().await.unwrap());
let (stx, _rtx) = io.new_dummy_sock("/", "foo").await;
let res = assert_ok!(timeout_rcv(&mut rx).await);
assert_eq!(res, "foo");

assert_ok!(client.emit("test", json!("oof")).await);
let res = assert_ok!(rx.recv().await.unwrap());
let packet = EioPacket::Message(Packet::event("/", "test", json!("oof")).into());
assert_ok!(stx.try_send(packet));
let res = assert_ok!(timeout_rcv(&mut rx).await);
assert_eq!(res, "oof");

// Non deserializable data
assert_ok!(client.emit("test", json!({ "test": 132 })).await);
assert_err!(rx.recv().await.unwrap());

assert_ok!(client.disconnect().await);
let packet = EioPacket::Message(Packet::event("/", "test", json!({ "test": 132 })).into());
assert_ok!(stx.try_send(packet));
assert_err!(timeout_rcv(&mut rx).await);
}
44 changes: 2 additions & 42 deletions socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@ use std::{
};

use engineioxide::service::NotFoundService;
use futures::{future::BoxFuture, SinkExt};
use futures::SinkExt;
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
use hyper_util::{
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use rust_socketio::{
asynchronous::{Client as SocketIoClient, ClientBuilder},
Payload,
};

use serde::{Deserialize, Serialize};
use socketioxide::{adapter::LocalAdapter, service::SocketIoService, SocketIo};
use tokio::net::{TcpListener, TcpStream};
Expand Down Expand Up @@ -103,18 +100,6 @@ pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<T
ws
}

pub async fn create_server_with_state<T: Send + Sync + 'static>(port: u16, state: T) -> SocketIo {
let (svc, io) = SocketIo::builder()
.ping_interval(Duration::from_millis(300))
.ping_timeout(Duration::from_millis(200))
.with_state(state)
.build_svc();

spawn_server(port, svc).await;

io
}

pub async fn create_server(port: u16) -> SocketIo {
let (svc, io) = SocketIo::builder()
.ping_interval(Duration::from_millis(300))
Expand All @@ -126,31 +111,6 @@ pub async fn create_server(port: u16) -> SocketIo {
io
}

pub async fn socketio_client_with_handler<F>(
port: u16,
event: &str,
callback: F,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error>
where
F: FnMut(Payload, SocketIoClient) -> BoxFuture<'static, ()> + Send + Sync + 'static,
{
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.on(event, callback)
.auth(auth)
.connect()
.await
}
pub async fn socketio_client(
port: u16,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error> {
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.auth(auth)
.connect()
.await
}

async fn spawn_server(port: u16, svc: SocketIoService<NotFoundService, LocalAdapter>) {
let addr = &SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let listener = TcpListener::bind(&addr).await.unwrap();
Expand Down

0 comments on commit b645663

Please sign in to comment.