Skip to content

Commit

Permalink
tests(socketio): add integration tests for DisconnectReason
Browse files Browse the repository at this point in the history
  • Loading branch information
Totodore committed Sep 17, 2023
1 parent 6605011 commit 6efd60d
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion engineioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! Tests for disconnect reasons
//! Test are made on polling and websocket transports:
//! * Heartbeat timeout
//! * Transport close
//! * Multiple http polling
//! * Packet parsing

use std::time::Duration;

use engineioxide::{
Expand Down Expand Up @@ -129,7 +136,7 @@ pub async fn multiple_http_polling() {
),
]));

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
let data = tokio::time::timeout(Duration::from_millis(10), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::DisconnectError::MultipleHttpPolling")
.unwrap();
Expand Down
12 changes: 11 additions & 1 deletion socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ dashmap = "5.4.0"

[dev-dependencies]
axum = "0.6.18"
tracing-subscriber = "0.3.17"
tokio = { version = "1.26.0", features = ["macros", "parking_lot"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
tokio-tungstenite = "0.20.0"
hyper = { version = "0.14.25", features = [
"http1",
"http2",
"server",
"stream",
"runtime",
"client",
] }
236 changes: 236 additions & 0 deletions socketioxide/tests/disconnect_reason.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
//! Tests for disconnect reasons
//! Test are made on polling and websocket transports for engine.io errors and only websocket for socket.io errors:
//! * Heartbeat timeout
//! * Transport close
//! * Multiple http polling
//! * Packet parsing
//!
//! * Client namespace disconnect
//! * Server namespace disconnect

use std::time::Duration;

use futures::SinkExt;
use socketioxide::{adapter::LocalAdapter, DisconnectReason, Namespace, NsHandlers};
use tokio::sync::mpsc;

mod fixture;

use fixture::{create_server, send_req};
use tokio_tungstenite::tungstenite::Message;

use crate::fixture::{create_polling_connection, create_ws_connection};

fn create_handler() -> (NsHandlers<LocalAdapter>, mpsc::Receiver<DisconnectReason>) {
let (tx, rx) = mpsc::channel::<DisconnectReason>(1);
let ns = Namespace::builder()
.add("/", move |socket| {
println!("Socket connected on / namespace with id: {}", socket.sid);
let tx = tx.clone();
socket.on_disconnect(move |socket, reason| {
println!("Socket.IO disconnected: {} {}", socket.sid, reason);
tx.try_send(reason).unwrap();
async move {}
});

async move {}
})
.build();
(ns, rx)
}

// Engine IO Disconnect Reason Tests

#[tokio::test]
pub async fn polling_heartbeat_timeout() {
let (ns, mut rx) = create_handler();
create_server(ns, 1234);
create_polling_connection(1234).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::HeartbeatTimeout")
.unwrap();

assert_eq!(data, DisconnectReason::HeartbeatTimeout);
}

#[tokio::test]
pub async fn ws_heartbeat_timeout() {
let (ns, mut rx) = create_handler();
create_server(ns, 12344);
let _stream = create_ws_connection(12344).await;

let data = tokio::time::timeout(Duration::from_millis(500), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::HeartbeatTimeout")
.unwrap();

assert_eq!(data, DisconnectReason::HeartbeatTimeout);
}

#[tokio::test]
pub async fn polling_transport_closed() {
let (ns, mut rx) = create_handler();
create_server(ns, 1235);
let sid = create_polling_connection(1235).await;

send_req(
1235,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("1".into()),
)
.await;

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::TransportClose")
.unwrap();

assert_eq!(data, DisconnectReason::TransportClose);
}

#[tokio::test]
pub async fn ws_transport_closed() {
let (ns, mut rx) = create_handler();
create_server(ns, 12345);
let mut stream = create_ws_connection(12345).await;

stream.send(Message::Text("1".into())).await.unwrap();

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::TransportClose")
.unwrap();

assert_eq!(data, DisconnectReason::TransportClose);
}

#[tokio::test]
pub async fn multiple_http_polling() {
let (ns, mut rx) = create_handler();
create_server(ns, 1236);
let sid = create_polling_connection(1236).await;

// First request to flush the server buffer containing the open packet
send_req(
1236,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
)
.await;

tokio::spawn(futures::future::join_all(vec![
send_req(
1236,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
send_req(
1236,
format!("transport=polling&sid={sid}"),
http::Method::GET,
None,
),
]));

let data = tokio::time::timeout(Duration::from_millis(10), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::DisconnectError::MultipleHttpPolling")
.unwrap();

assert_eq!(data, DisconnectReason::MultipleHttpPollingError);
}

#[tokio::test]
pub async fn polling_packet_parsing() {
let (ns, mut rx) = create_handler();
create_server(ns, 1237);
let sid = create_polling_connection(1237).await;
send_req(
1237,
format!("transport=polling&sid={sid}"),
http::Method::POST,
Some("aizdunazidaubdiz".into()),
)
.await;

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::PacketParsingError")
.unwrap();

assert_eq!(data, DisconnectReason::PacketParsingError);
}

#[tokio::test]
pub async fn ws_packet_parsing() {
let (ns, mut rx) = create_handler();
create_server(ns, 12347);
let mut stream = create_ws_connection(12347).await;
stream
.send(Message::Text("aizdunazidaubdiz".into()))
.await
.unwrap();

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::PacketParsingError")
.unwrap();

assert_eq!(data, DisconnectReason::PacketParsingError);
}

// Socket IO Disconnect Reason Tests

#[tokio::test]
pub async fn client_ns_disconnect() {
let (ns, mut rx) = create_handler();
create_server(ns, 12348);
let mut stream = create_ws_connection(12348).await;

stream.send(Message::Text("41".into())).await.unwrap();

let data = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ClientNSDisconnect")
.unwrap();

assert_eq!(data, DisconnectReason::ClientNSDisconnect);
}

#[tokio::test]
pub async fn server_ns_disconnect() {
let (tx, mut rx) = mpsc::channel::<DisconnectReason>(1);
let ns = Namespace::builder()
.add("/", move |socket| {
println!("Socket connected on / namespace with id: {}", socket.sid);
let sock = socket.clone();
let tx = tx.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
sock.disconnect().unwrap();
});

socket.on_disconnect(move |socket, reason| {
println!("Socket.IO disconnected: {} {}", socket.sid, reason);
tx.try_send(reason).unwrap();
async move {}
});

async move {}
})
.build();

create_server(ns, 12349);
let _stream = create_ws_connection(12349).await;

let data = tokio::time::timeout(Duration::from_millis(20), rx.recv())
.await
.expect("timeout waiting for DisconnectReason::ServerNSDisconnect")
.unwrap();
assert_eq!(data, DisconnectReason::ServerNSDisconnect);
}
92 changes: 92 additions & 0 deletions socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};

use futures::SinkExt;
use http::Request;
use hyper::{body::Buf, Server};
use serde::{Deserialize, Serialize};
use socketioxide::{adapter::LocalAdapter, NsHandlers, SocketIoConfig, SocketIoService};
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all = "camelCase")]
struct OpenPacket {
sid: String,
upgrades: Vec<String>,
ping_interval: u64,
ping_timeout: u64,
max_payload: u64,
}

/// Params should be in the form of `key1=value1&key2=value2`
pub async fn send_req(
port: u16,
params: String,
method: http::Method,
body: Option<String>,
) -> String {
let body = body
.map(|b| hyper::Body::from(b))
.unwrap_or_else(hyper::Body::empty);
let req = Request::builder()
.method(method)
.uri(format!(
"http://127.0.0.1:{port}/socket.io/?EIO=4&{}",
params
))
.body(body)
.unwrap();
let mut res = hyper::Client::new().request(req).await.unwrap();
let body = hyper::body::aggregate(res.body_mut()).await.unwrap();
String::from_utf8(body.chunk().to_vec())
.unwrap()
.chars()
.skip(1)
.collect()
}

pub async fn create_polling_connection(port: u16) -> String {
let body = send_req(port, format!("transport=polling"), http::Method::GET, None).await;
let open_packet: OpenPacket = serde_json::from_str(&body).unwrap();

send_req(
port,
format!("transport=polling&sid={}", open_packet.sid),
http::Method::POST,
Some("40{}".to_string()),
)
.await;

open_packet.sid
}
pub async fn create_ws_connection(port: u16) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
let mut ws = tokio_tungstenite::connect_async(format!(
"ws://127.0.0.1:{port}/socket.io/?EIO=4&transport=websocket"
))
.await
.unwrap()
.0;

ws.send(Message::Text("40{}".to_string())).await.unwrap();

ws
}

pub fn create_server(ns: NsHandlers<LocalAdapter>, port: u16) {
let config = SocketIoConfig::builder()
.ping_interval(Duration::from_millis(300))
.ping_timeout(Duration::from_millis(200))
.max_payload(1e6 as u64)
.build();

let addr = &SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);

let svc = SocketIoService::with_config(ns, config);

let server = Server::bind(addr).serve(svc.into_make_service());

tokio::spawn(server);
}

0 comments on commit 6efd60d

Please sign in to comment.