From 0565fce21b9e98302756fd0866735c452a2857f9 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 9 Sep 2021 14:16:33 +0200 Subject: [PATCH 1/5] Reject overflowing connection with status code 429 --- test-utils/src/types.rs | 23 ++++++++--- ws-server/src/server.rs | 88 ++++++++++++++++++++++++----------------- ws-server/src/tests.rs | 9 +---- 3 files changed, 71 insertions(+), 49 deletions(-) diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index 49f98f9fa6..efafdddc35 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -88,8 +88,21 @@ impl std::fmt::Debug for WebSocketTestClient { } } +#[derive(Debug)] +pub enum WebSocketTestError { + Redirect, + RejectedWithStatusCode(u16), + Soketto(SokettoError) +} + +impl From for WebSocketTestError { + fn from(err: io::Error) -> Self { + WebSocketTestError::Soketto(SokettoError::Io(err)) + } +} + impl WebSocketTestClient { - pub async fn new(url: SocketAddr) -> Result { + pub async fn new(url: SocketAddr) -> Result { let socket = TcpStream::connect(url).await?; let mut client = handshake::Client::new(BufReader::new(BufWriter::new(socket.compat())), "test-client", "/"); match client.handshake().await { @@ -98,12 +111,12 @@ impl WebSocketTestClient { Ok(Self { tx, rx }) } Ok(handshake::ServerResponse::Redirect { .. }) => { - Err(SokettoError::Io(io::Error::new(io::ErrorKind::Other, "Redirection not supported in tests"))) + Err(WebSocketTestError::Redirect) } - Ok(handshake::ServerResponse::Rejected { .. }) => { - Err(SokettoError::Io(io::Error::new(io::ErrorKind::Other, "Rejected"))) + Ok(handshake::ServerResponse::Rejected { status_code }) => { + Err(WebSocketTestError::RejectedWithStatusCode(status_code)) } - Err(err) => Err(err), + Err(err) => Err(WebSocketTestError::Soketto(err)), } } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index a421fb35ce..0d0803c2e8 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -86,13 +86,17 @@ impl Server { if connections.count() >= self.cfg.max_connections as usize { log::warn!("Too many connections. Try again in a while."); + connections.add(Box::pin(handshake(socket, Handshake::Reject { status_code: 429 }))); continue; } let methods = &methods; let cfg = &self.cfg; - connections.add(Box::pin(handshake(socket, id, methods, cfg, &stop_monitor))); + connections.add(Box::pin(handshake( + socket, + Handshake::Accept { conn_id: id, methods, cfg, stop_monitor: &stop_monitor }, + ))); id = id.wrapping_add(1); } @@ -139,49 +143,59 @@ impl<'a> Future for Incoming<'a> { } } -async fn handshake( - socket: tokio::net::TcpStream, - conn_id: ConnectionId, - methods: &Methods, - cfg: &Settings, - stop_monitor: &StopMonitor, -) -> Result<(), Error> { +enum Handshake<'a> { + Reject { status_code: u16 }, + Accept { conn_id: ConnectionId, methods: &'a Methods, cfg: &'a Settings, stop_monitor: &'a StopMonitor }, +} + +async fn handshake(socket: tokio::net::TcpStream, details: Handshake<'_>) -> Result<(), Error> { // For each incoming background_task we perform a handshake. let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat()))); - let key = { - let req = server.receive_request().await?; - let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host)); - let origin_check = cfg.allowed_origins.verify("Origin", req.headers().origin); - - host_check.and(origin_check).map(|()| req.key()) - }; - - match key { - Ok(key) => { - let accept = Response::Accept { key, protocol: None }; - server.send_response(&accept).await?; - } - Err(error) => { - let reject = Response::Reject { status_code: 403 }; + match details { + Handshake::Reject { status_code } => { + // Forced rejection, don't need to read anything from the socket + let reject = Response::Reject { status_code }; server.send_response(&reject).await?; - return Err(error); + Ok(()) } - } + Handshake::Accept { conn_id, methods, cfg, stop_monitor } => { + let key = { + let req = server.receive_request().await?; + let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host)); + let origin_check = cfg.allowed_origins.verify("Origin", req.headers().origin); - let join_result = tokio::spawn(background_task( - server, - conn_id, - methods.clone(), - cfg.max_request_body_size, - stop_monitor.clone(), - )) - .await; - - match join_result { - Err(_) => Err(Error::Custom("Background task was aborted".into())), - Ok(result) => result, + host_check.and(origin_check).map(|()| req.key()) + }; + + match key { + Ok(key) => { + let accept = Response::Accept { key, protocol: None }; + server.send_response(&accept).await?; + } + Err(error) => { + let reject = Response::Reject { status_code: 403 }; + server.send_response(&reject).await?; + + return Err(error); + } + } + + let join_result = tokio::spawn(background_task( + server, + conn_id, + methods.clone(), + cfg.max_request_body_size, + stop_monitor.clone(), + )) + .await; + + match join_result { + Err(_) => Err(Error::Custom("Background task was aborted".into())), + Ok(result) => result, + } + } } } diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index ab8213aabe..4c9d39d2cc 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -31,7 +31,7 @@ use crate::{future::StopHandle, RpcModule, WsServerBuilder}; use anyhow::anyhow; use futures_util::FutureExt; use jsonrpsee_test_utils::helpers::*; -use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient}; +use jsonrpsee_test_utils::types::{Id, TestContext, WebSocketTestClient, WebSocketTestError}; use jsonrpsee_test_utils::TimeoutFutureExt; use serde_json::Value as JsonValue; use std::fmt; @@ -203,12 +203,7 @@ async fn can_set_max_connections() { assert!(conn2.is_ok()); // Third connection is rejected assert!(conn3.is_err()); - - let err = match conn3 { - Err(soketto::handshake::Error::Io(err)) => err, - _ => panic!("Invalid error kind; expected std::io::Error"), - }; - assert_eq!(err.kind(), std::io::ErrorKind::ConnectionReset); + assert!(matches!(conn3, Err(WebSocketTestError::RejectedWithStatusCode(429)))); // Decrement connection count drop(conn2); From 1bb41512ad08d1eb887b27453e1eb78eed62def3 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 9 Sep 2021 14:30:43 +0200 Subject: [PATCH 2/5] fmt --- test-utils/src/types.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/test-utils/src/types.rs b/test-utils/src/types.rs index efafdddc35..c1af1c0a1d 100644 --- a/test-utils/src/types.rs +++ b/test-utils/src/types.rs @@ -92,7 +92,7 @@ impl std::fmt::Debug for WebSocketTestClient { pub enum WebSocketTestError { Redirect, RejectedWithStatusCode(u16), - Soketto(SokettoError) + Soketto(SokettoError), } impl From for WebSocketTestError { @@ -110,9 +110,7 @@ impl WebSocketTestClient { let (tx, rx) = client.into_builder().finish(); Ok(Self { tx, rx }) } - Ok(handshake::ServerResponse::Redirect { .. }) => { - Err(WebSocketTestError::Redirect) - } + Ok(handshake::ServerResponse::Redirect { .. }) => Err(WebSocketTestError::Redirect), Ok(handshake::ServerResponse::Rejected { status_code }) => { Err(WebSocketTestError::RejectedWithStatusCode(status_code)) } From b777d2ea1e85f5a0bcf3b1463a1cbbfb3a26a4c7 Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 9 Sep 2021 15:18:01 +0200 Subject: [PATCH 3/5] rename Handshake -> HandshakeMode for clarity; verbose test --- ws-server/src/server.rs | 14 +++++++------- ws-server/src/tests.rs | 4 +++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 0d0803c2e8..53789b99f8 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -86,7 +86,7 @@ impl Server { if connections.count() >= self.cfg.max_connections as usize { log::warn!("Too many connections. Try again in a while."); - connections.add(Box::pin(handshake(socket, Handshake::Reject { status_code: 429 }))); + connections.add(Box::pin(handshake(socket, HandshakeMode::Reject { status_code: 429 }))); continue; } @@ -95,7 +95,7 @@ impl Server { connections.add(Box::pin(handshake( socket, - Handshake::Accept { conn_id: id, methods, cfg, stop_monitor: &stop_monitor }, + HandshakeMode::Accept { conn_id: id, methods, cfg, stop_monitor: &stop_monitor }, ))); id = id.wrapping_add(1); @@ -143,24 +143,24 @@ impl<'a> Future for Incoming<'a> { } } -enum Handshake<'a> { +enum HandshakeMode<'a> { Reject { status_code: u16 }, Accept { conn_id: ConnectionId, methods: &'a Methods, cfg: &'a Settings, stop_monitor: &'a StopMonitor }, } -async fn handshake(socket: tokio::net::TcpStream, details: Handshake<'_>) -> Result<(), Error> { +async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeMode<'_>) -> Result<(), Error> { // For each incoming background_task we perform a handshake. let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat()))); - match details { - Handshake::Reject { status_code } => { + match mode { + HandshakeMode::Reject { status_code } => { // Forced rejection, don't need to read anything from the socket let reject = Response::Reject { status_code }; server.send_response(&reject).await?; Ok(()) } - Handshake::Accept { conn_id, methods, cfg, stop_monitor } => { + HandshakeMode::Accept { conn_id, methods, cfg, stop_monitor } => { let key = { let req = server.receive_request().await?; let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host)); diff --git a/ws-server/src/tests.rs b/ws-server/src/tests.rs index 4c9d39d2cc..bf2f35d29f 100644 --- a/ws-server/src/tests.rs +++ b/ws-server/src/tests.rs @@ -203,7 +203,9 @@ async fn can_set_max_connections() { assert!(conn2.is_ok()); // Third connection is rejected assert!(conn3.is_err()); - assert!(matches!(conn3, Err(WebSocketTestError::RejectedWithStatusCode(429)))); + if !matches!(conn3, Err(WebSocketTestError::RejectedWithStatusCode(429))) { + panic!("Expected RejectedWithStatusCode(429), got: {:#?}", conn3); + } // Decrement connection count drop(conn2); From a92c73d3d603cee200e8dcecda2eb0d2966279ab Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Thu, 9 Sep 2021 17:07:23 +0200 Subject: [PATCH 4/5] Gracefully shutdown after rejecting to hopefully fix the errors on windows --- ws-server/src/server.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 53789b99f8..9ffb03ba1b 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -35,7 +35,6 @@ use crate::types::{ }; use futures_channel::mpsc; use futures_util::io::{BufReader, BufWriter}; -// use futures_util::future::FutureExt; use futures_util::stream::StreamExt; use soketto::handshake::{server::Response, Server as SokettoServer}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; @@ -158,6 +157,11 @@ async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeMode<'_>) -> Re let reject = Response::Reject { status_code }; server.send_response(&reject).await?; + let (mut sender, _) = server.into_builder().finish(); + + // Gracefully shut down the connection + sender.close().await?; + Ok(()) } HandshakeMode::Accept { conn_id, methods, cfg, stop_monitor } => { From 960daa17d094af0c8bebbde4b3889acbeac9a00c Mon Sep 17 00:00:00 2001 From: Maciej Hirsz Date: Tue, 14 Sep 2021 08:21:36 +0200 Subject: [PATCH 5/5] HandshakeMode -> HandshakeResponse; tweak pending subscriptions on shutdown test --- tests/tests/integration_tests.rs | 14 +++++++++++--- ws-server/src/server.rs | 12 ++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 87ed638de4..72e7dab58c 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -281,8 +281,16 @@ async fn ws_close_pending_subscription_when_server_terminated() { // no new request should be accepted. assert!(matches!(sub2, Err(_))); + // consume final message - assert!(matches!(sub.next().await, Ok(Some(_)))); - // the already established subscription should also be closed. - assert!(matches!(sub.next().await, Ok(None))); + for _ in 0..2 { + match sub.next().await { + // All good, exit test + Ok(None) => return, + // Try again + _ => continue, + } + } + + panic!("subscription keeps sending messages after server shutdown"); } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 9ffb03ba1b..502a53735d 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -85,7 +85,7 @@ impl Server { if connections.count() >= self.cfg.max_connections as usize { log::warn!("Too many connections. Try again in a while."); - connections.add(Box::pin(handshake(socket, HandshakeMode::Reject { status_code: 429 }))); + connections.add(Box::pin(handshake(socket, HandshakeResponse::Reject { status_code: 429 }))); continue; } @@ -94,7 +94,7 @@ impl Server { connections.add(Box::pin(handshake( socket, - HandshakeMode::Accept { conn_id: id, methods, cfg, stop_monitor: &stop_monitor }, + HandshakeResponse::Accept { conn_id: id, methods, cfg, stop_monitor: &stop_monitor }, ))); id = id.wrapping_add(1); @@ -142,17 +142,17 @@ impl<'a> Future for Incoming<'a> { } } -enum HandshakeMode<'a> { +enum HandshakeResponse<'a> { Reject { status_code: u16 }, Accept { conn_id: ConnectionId, methods: &'a Methods, cfg: &'a Settings, stop_monitor: &'a StopMonitor }, } -async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeMode<'_>) -> Result<(), Error> { +async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeResponse<'_>) -> Result<(), Error> { // For each incoming background_task we perform a handshake. let mut server = SokettoServer::new(BufReader::new(BufWriter::new(socket.compat()))); match mode { - HandshakeMode::Reject { status_code } => { + HandshakeResponse::Reject { status_code } => { // Forced rejection, don't need to read anything from the socket let reject = Response::Reject { status_code }; server.send_response(&reject).await?; @@ -164,7 +164,7 @@ async fn handshake(socket: tokio::net::TcpStream, mode: HandshakeMode<'_>) -> Re Ok(()) } - HandshakeMode::Accept { conn_id, methods, cfg, stop_monitor } => { + HandshakeResponse::Accept { conn_id, methods, cfg, stop_monitor } => { let key = { let req = server.receive_request().await?; let host_check = cfg.allowed_hosts.verify("Host", Some(req.headers().host));