Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(socketio): improve testing by directly using the rust_socketio client #219

Merged
merged 2 commits into from
Dec 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
axum = "0.7.2"
salvo = { version = "0.63.0", features = ["tower-compat"] }
rust_socketio = { version = "0.4.2", features = ["async"] }

[workspace.package]
version = "0.9.1"
Expand Down
1 change: 1 addition & 0 deletions socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ engineioxide = { path = "../engineioxide", features = [
"test-utils",
] }
tokio-tungstenite.workspace = true
rust_socketio.workspace = true
axum.workspace = true
salvo.workspace = true
tokio = { workspace = true, features = [
Expand Down
152 changes: 76 additions & 76 deletions socketioxide/tests/extractors.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,99 @@
use futures::SinkExt;
use socketioxide::extract::{Data, SocketRef, State};
//! Tests for extractors
use serde_json::json;
use socketioxide::extract::{Data, SocketRef, State, TryData};
use tokio::sync::mpsc;

use fixture::{create_server, create_server_with_state};

use crate::fixture::socketio_client;

mod fixture;
use fixture::{create_server, create_server_with_state, create_ws_connection};
use tokio_tungstenite::tungstenite::Message;
mod utils;

#[tokio::test]
pub async fn state_extractor() {
const PORT: u16 = 2000;
let state = 1112i32;
let io = create_server_with_state(2000, state).await;
let io = create_server_with_state(PORT, state).await;
let (tx, mut rx) = mpsc::channel::<i32>(4);
io.ns("/", move |socket: SocketRef, state: State<i32>| {
println!("Socket connected on / namespace with id: {}", socket.id);
tx.try_send(*state).unwrap();

let tx1 = tx.clone();
let tx2 = tx.clone();
let tx3 = tx.clone();
assert_ok!(tx.try_send(*state));
socket.on("test", move |State(state): State<i32>| {
println!("test event received");
tx.try_send(*state).unwrap();
});
socket.on("async_test", move |State(state): State<i32>| async move {
println!("async_test event received");
tx2.try_send(*state).unwrap();
});
// This handler should not be called
socket.on("ko_test", move |State(_): State<String>| {
println!("ko_test event received");
tx3.try_send(1213231).unwrap();
});
socket.on_disconnect(move |State(state): State<i32>| {
println!("close event received");
tx1.try_send(*state).unwrap();
assert_ok!(tx.try_send(*state))
});
});

let mut stream = create_ws_connection(2000).await;
stream
.send(Message::Text("42[\"test\", 1]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"async_test\", 2]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"ko_test\", 2]".to_string()))
.await
.unwrap();
let client = assert_ok!(socketio_client(PORT, ()).await);
assert_eq!(rx.recv().await.unwrap(), state);

assert_ok!(client.emit("test", json!("foo")).await);
assert_eq!(rx.recv().await.unwrap(), state);
assert_eq!(rx.recv().await.unwrap(), state);
stream.close(None).await.unwrap();
assert_eq!(rx.recv().await.unwrap(), state);

assert_ok!(client.disconnect().await);
}

#[tokio::test]
pub async fn data_extractor() {
let io = create_server(2001).await;
let (tx, mut rx) = mpsc::channel::<i32>(4);
io.ns("/", move |socket: SocketRef| {
println!("Socket connected on / namespace with id: {}", socket.id);
let tx1 = tx.clone();
let tx2 = tx.clone();
socket.on("test", move |Data(data): Data<i32>| {
println!("test event received");
tx1.try_send(data).unwrap();
});
socket.on("async_test", move |Data(data): Data<i32>| async move {
println!("async_test event received");
tx2.try_send(data).unwrap();
const PORT: u16 = 2001;
let io = create_server(PORT).await;
let (tx, mut rx) = mpsc::channel::<String>(4);
let tx1 = tx.clone();
io.ns("/", move |socket: SocketRef, Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
socket.on("test", move |Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
});
// This handler should not be called
socket.on("ko_test", move |Data(_): Data<String>| {
println!("ko_test event received");
tx.try_send(1213231).unwrap();
});

assert_ok!(socketio_client(PORT, ()).await);
assert_ok!(socketio_client(PORT, 1321).await);

// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

let client = assert_ok!(socketio_client(PORT, "foo").await);
assert_eq!(rx.recv().await.unwrap(), "foo");

assert_ok!(client.emit("test", json!("oof")).await);
assert_eq!(rx.recv().await.unwrap(), "oof");

assert_ok!(client.emit("test", json!({ "test": 132 })).await);
// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

assert_ok!(client.disconnect().await);
}

#[tokio::test]
pub async fn try_data_extractor() {
const PORT: u16 = 2002;
let io = create_server(PORT).await;
let (tx, mut rx) = mpsc::channel::<Result<String, serde_json::Error>>(4);
io.ns("/", move |s: SocketRef, TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
s.on("test", move |TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
});
});

let mut stream = create_ws_connection(2001).await;
stream
.send(Message::Text("42[\"test\", 1]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"async_test\", 2]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"ko_test\", 2]".to_string()))
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), 1);
assert_eq!(rx.recv().await.unwrap(), 2);
stream.close(None).await.unwrap();
// Non deserializable data
assert_ok!(socketio_client(PORT, ()).await);
assert_err!(rx.recv().await.unwrap());

// Non deserializable data
assert_ok!(socketio_client(PORT, 1321).await);
assert_err!(rx.recv().await.unwrap());

let client = assert_ok!(socketio_client(PORT, "foo").await);
let res = assert_ok!(rx.recv().await.unwrap());
assert_eq!(res, "foo");

assert_ok!(client.emit("test", json!("oof")).await);
let res = assert_ok!(rx.recv().await.unwrap());
assert_eq!(res, "oof");

// Non deserializable data
assert_ok!(client.emit("test", json!({ "test": 132 })).await);
assert_err!(rx.recv().await.unwrap());

assert_ok!(client.disconnect().await);
}
32 changes: 31 additions & 1 deletion socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ use std::{
};

use engineioxide::service::NotFoundService;
use futures::SinkExt;
use futures::{future::BoxFuture, SinkExt};
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
use hyper_util::{
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use rust_socketio::{
asynchronous::{Client as SocketIoClient, ClientBuilder},
Payload,
};
use serde::{Deserialize, Serialize};
use socketioxide::{adapter::LocalAdapter, service::SocketIoService, SocketIo};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -120,6 +125,31 @@ pub async fn create_server(port: u16) -> SocketIo {
io
}

pub async fn socketio_client_with_handler<F>(
port: u16,
event: &str,
callback: F,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error>
where
F: FnMut(Payload, SocketIoClient) -> BoxFuture<'static, ()> + Send + Sync + 'static,
{
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.on(event, callback)
.auth(auth)
.connect()
.await
}
pub async fn socketio_client(
port: u16,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error> {
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.auth(auth)
.connect()
.await
}

async fn spawn_server(port: u16, svc: SocketIoService<NotFoundService, LocalAdapter>) {
let addr = &SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let listener = TcpListener::bind(&addr).await.unwrap();
Expand Down
43 changes: 43 additions & 0 deletions socketioxide/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#![allow(dead_code)]

#[macro_export]
macro_rules! assert_ok {
($e:expr) => {
assert_ok!($e,)
};
($e:expr,) => {{
use std::result::Result::*;
match $e {
Ok(v) => v,
Err(e) => panic!("assertion failed: Err({:?})", e),
}
}};
($e:expr, $($arg:tt)+) => {{
use std::result::Result::*;
match $e {
Ok(v) => v,
Err(e) => panic!("assertion failed: Err({:?}): {}", e, format_args!($($arg)+)),
}
}};
}

#[macro_export]
macro_rules! assert_err {
($e:expr) => {
assert_err!($e,);
};
($e:expr,) => {{
use std::result::Result::*;
match $e {
Ok(v) => panic!("assertion failed: Ok({:?})", v),
Err(e) => e,
}
}};
($e:expr, $($arg:tt)+) => {{
use std::result::Result::*;
match $e {
Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)),
Err(e) => e,
}
}};
}