From 6efd60df3d485c9ce1cc1dac3cbbf87220d96e66 Mon Sep 17 00:00:00 2001 From: Totodore Date: Sun, 17 Sep 2023 16:01:17 +0200 Subject: [PATCH] tests(socketio): add integration tests for `DisconnectReason` --- Cargo.lock | 2 + engineioxide/tests/disconnect_reason.rs | 9 +- socketioxide/Cargo.toml | 12 +- socketioxide/tests/disconnect_reason.rs | 236 ++++++++++++++++++++++++ socketioxide/tests/fixture.rs | 92 +++++++++ 5 files changed, 349 insertions(+), 2 deletions(-) create mode 100644 socketioxide/tests/disconnect_reason.rs create mode 100644 socketioxide/tests/fixture.rs diff --git a/Cargo.lock b/Cargo.lock index 46057a97..014334bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1264,11 +1264,13 @@ dependencies = [ "futures", "http", "http-body", + "hyper", "itertools 0.11.0", "serde", "serde_json", "thiserror", "tokio", + "tokio-tungstenite 0.20.0", "tower", "tower-http", "tracing", diff --git a/engineioxide/tests/disconnect_reason.rs b/engineioxide/tests/disconnect_reason.rs index 441f998d..0fceaa2a 100644 --- a/engineioxide/tests/disconnect_reason.rs +++ b/engineioxide/tests/disconnect_reason.rs @@ -1,3 +1,10 @@ +//! Tests for disconnect reasons +//! Test are made on polling and websocket transports: +//! * Heartbeat timeout +//! * Transport close +//! * Multiple http polling +//! * Packet parsing + use std::time::Duration; use engineioxide::{ @@ -129,7 +136,7 @@ pub async fn multiple_http_polling() { ), ])); - let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + let data = tokio::time::timeout(Duration::from_millis(10), rx.recv()) .await .expect("timeout waiting for DisconnectReason::DisconnectError::MultipleHttpPolling") .unwrap(); diff --git a/socketioxide/Cargo.toml b/socketioxide/Cargo.toml index d4e4e6b0..1cdc8961 100644 --- a/socketioxide/Cargo.toml +++ b/socketioxide/Cargo.toml @@ -34,4 +34,14 @@ dashmap = "5.4.0" [dev-dependencies] axum = "0.6.18" -tracing-subscriber = "0.3.17" +tokio = { version = "1.26.0", features = ["macros", "parking_lot"] } +tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +tokio-tungstenite = "0.20.0" +hyper = { version = "0.14.25", features = [ + "http1", + "http2", + "server", + "stream", + "runtime", + "client", +] } diff --git a/socketioxide/tests/disconnect_reason.rs b/socketioxide/tests/disconnect_reason.rs new file mode 100644 index 00000000..035ac82c --- /dev/null +++ b/socketioxide/tests/disconnect_reason.rs @@ -0,0 +1,236 @@ +//! Tests for disconnect reasons +//! Test are made on polling and websocket transports for engine.io errors and only websocket for socket.io errors: +//! * Heartbeat timeout +//! * Transport close +//! * Multiple http polling +//! * Packet parsing +//! +//! * Client namespace disconnect +//! * Server namespace disconnect + +use std::time::Duration; + +use futures::SinkExt; +use socketioxide::{adapter::LocalAdapter, DisconnectReason, Namespace, NsHandlers}; +use tokio::sync::mpsc; + +mod fixture; + +use fixture::{create_server, send_req}; +use tokio_tungstenite::tungstenite::Message; + +use crate::fixture::{create_polling_connection, create_ws_connection}; + +fn create_handler() -> (NsHandlers, mpsc::Receiver) { + let (tx, rx) = mpsc::channel::(1); + let ns = Namespace::builder() + .add("/", move |socket| { + println!("Socket connected on / namespace with id: {}", socket.sid); + let tx = tx.clone(); + socket.on_disconnect(move |socket, reason| { + println!("Socket.IO disconnected: {} {}", socket.sid, reason); + tx.try_send(reason).unwrap(); + async move {} + }); + + async move {} + }) + .build(); + (ns, rx) +} + +// Engine IO Disconnect Reason Tests + +#[tokio::test] +pub async fn polling_heartbeat_timeout() { + let (ns, mut rx) = create_handler(); + create_server(ns, 1234); + create_polling_connection(1234).await; + + let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::HeartbeatTimeout") + .unwrap(); + + assert_eq!(data, DisconnectReason::HeartbeatTimeout); +} + +#[tokio::test] +pub async fn ws_heartbeat_timeout() { + let (ns, mut rx) = create_handler(); + create_server(ns, 12344); + let _stream = create_ws_connection(12344).await; + + let data = tokio::time::timeout(Duration::from_millis(500), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::HeartbeatTimeout") + .unwrap(); + + assert_eq!(data, DisconnectReason::HeartbeatTimeout); +} + +#[tokio::test] +pub async fn polling_transport_closed() { + let (ns, mut rx) = create_handler(); + create_server(ns, 1235); + let sid = create_polling_connection(1235).await; + + send_req( + 1235, + format!("transport=polling&sid={sid}"), + http::Method::POST, + Some("1".into()), + ) + .await; + + let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::TransportClose") + .unwrap(); + + assert_eq!(data, DisconnectReason::TransportClose); +} + +#[tokio::test] +pub async fn ws_transport_closed() { + let (ns, mut rx) = create_handler(); + create_server(ns, 12345); + let mut stream = create_ws_connection(12345).await; + + stream.send(Message::Text("1".into())).await.unwrap(); + + let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::TransportClose") + .unwrap(); + + assert_eq!(data, DisconnectReason::TransportClose); +} + +#[tokio::test] +pub async fn multiple_http_polling() { + let (ns, mut rx) = create_handler(); + create_server(ns, 1236); + let sid = create_polling_connection(1236).await; + + // First request to flush the server buffer containing the open packet + send_req( + 1236, + format!("transport=polling&sid={sid}"), + http::Method::GET, + None, + ) + .await; + + tokio::spawn(futures::future::join_all(vec![ + send_req( + 1236, + format!("transport=polling&sid={sid}"), + http::Method::GET, + None, + ), + send_req( + 1236, + format!("transport=polling&sid={sid}"), + http::Method::GET, + None, + ), + ])); + + let data = tokio::time::timeout(Duration::from_millis(10), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::DisconnectError::MultipleHttpPolling") + .unwrap(); + + assert_eq!(data, DisconnectReason::MultipleHttpPollingError); +} + +#[tokio::test] +pub async fn polling_packet_parsing() { + let (ns, mut rx) = create_handler(); + create_server(ns, 1237); + let sid = create_polling_connection(1237).await; + send_req( + 1237, + format!("transport=polling&sid={sid}"), + http::Method::POST, + Some("aizdunazidaubdiz".into()), + ) + .await; + + let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::PacketParsingError") + .unwrap(); + + assert_eq!(data, DisconnectReason::PacketParsingError); +} + +#[tokio::test] +pub async fn ws_packet_parsing() { + let (ns, mut rx) = create_handler(); + create_server(ns, 12347); + let mut stream = create_ws_connection(12347).await; + stream + .send(Message::Text("aizdunazidaubdiz".into())) + .await + .unwrap(); + + let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::PacketParsingError") + .unwrap(); + + assert_eq!(data, DisconnectReason::PacketParsingError); +} + +// Socket IO Disconnect Reason Tests + +#[tokio::test] +pub async fn client_ns_disconnect() { + let (ns, mut rx) = create_handler(); + create_server(ns, 12348); + let mut stream = create_ws_connection(12348).await; + + stream.send(Message::Text("41".into())).await.unwrap(); + + let data = tokio::time::timeout(Duration::from_millis(1), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::ClientNSDisconnect") + .unwrap(); + + assert_eq!(data, DisconnectReason::ClientNSDisconnect); +} + +#[tokio::test] +pub async fn server_ns_disconnect() { + let (tx, mut rx) = mpsc::channel::(1); + let ns = Namespace::builder() + .add("/", move |socket| { + println!("Socket connected on / namespace with id: {}", socket.sid); + let sock = socket.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(10)).await; + sock.disconnect().unwrap(); + }); + + socket.on_disconnect(move |socket, reason| { + println!("Socket.IO disconnected: {} {}", socket.sid, reason); + tx.try_send(reason).unwrap(); + async move {} + }); + + async move {} + }) + .build(); + + create_server(ns, 12349); + let _stream = create_ws_connection(12349).await; + + let data = tokio::time::timeout(Duration::from_millis(20), rx.recv()) + .await + .expect("timeout waiting for DisconnectReason::ServerNSDisconnect") + .unwrap(); + assert_eq!(data, DisconnectReason::ServerNSDisconnect); +} diff --git a/socketioxide/tests/fixture.rs b/socketioxide/tests/fixture.rs new file mode 100644 index 00000000..e6a675ec --- /dev/null +++ b/socketioxide/tests/fixture.rs @@ -0,0 +1,92 @@ +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use futures::SinkExt; +use http::Request; +use hyper::{body::Buf, Server}; +use serde::{Deserialize, Serialize}; +use socketioxide::{adapter::LocalAdapter, NsHandlers, SocketIoConfig, SocketIoService}; +use tokio::net::TcpStream; +use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; +/// An OpenPacket is used to initiate a connection +#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)] +#[serde(rename_all = "camelCase")] +struct OpenPacket { + sid: String, + upgrades: Vec, + ping_interval: u64, + ping_timeout: u64, + max_payload: u64, +} + +/// Params should be in the form of `key1=value1&key2=value2` +pub async fn send_req( + port: u16, + params: String, + method: http::Method, + body: Option, +) -> String { + let body = body + .map(|b| hyper::Body::from(b)) + .unwrap_or_else(hyper::Body::empty); + let req = Request::builder() + .method(method) + .uri(format!( + "http://127.0.0.1:{port}/socket.io/?EIO=4&{}", + params + )) + .body(body) + .unwrap(); + let mut res = hyper::Client::new().request(req).await.unwrap(); + let body = hyper::body::aggregate(res.body_mut()).await.unwrap(); + String::from_utf8(body.chunk().to_vec()) + .unwrap() + .chars() + .skip(1) + .collect() +} + +pub async fn create_polling_connection(port: u16) -> String { + let body = send_req(port, format!("transport=polling"), http::Method::GET, None).await; + let open_packet: OpenPacket = serde_json::from_str(&body).unwrap(); + + send_req( + port, + format!("transport=polling&sid={}", open_packet.sid), + http::Method::POST, + Some("40{}".to_string()), + ) + .await; + + open_packet.sid +} +pub async fn create_ws_connection(port: u16) -> WebSocketStream> { + let mut ws = tokio_tungstenite::connect_async(format!( + "ws://127.0.0.1:{port}/socket.io/?EIO=4&transport=websocket" + )) + .await + .unwrap() + .0; + + ws.send(Message::Text("40{}".to_string())).await.unwrap(); + + ws +} + +pub fn create_server(ns: NsHandlers, port: u16) { + let config = SocketIoConfig::builder() + .ping_interval(Duration::from_millis(300)) + .ping_timeout(Duration::from_millis(200)) + .max_payload(1e6 as u64) + .build(); + + let addr = &SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + + let svc = SocketIoService::with_config(ns, config); + + let server = Server::bind(addr).serve(svc.into_make_service()); + + tokio::spawn(server); +}