diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 3cb8687ab7..2edfb0e4f5 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -131,9 +131,7 @@ pub async fn server_with_subscription_and_handle() -> (SocketAddr, ServerHandle) pub async fn server_with_subscription() -> SocketAddr { let (addr, handle) = server_with_subscription_and_handle().await; - tokio::spawn(handle.stopped()); - addr } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 6f92780a60..d4041cf6ae 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -55,6 +55,8 @@ use tokio::time::interval; use tokio_stream::wrappers::IntervalStream; use tower_http::cors::CorsLayer; +use crate::helpers::server_with_sleeping_subscription; + #[tokio::test] async fn ws_subscription_works() { init_logger(); @@ -104,74 +106,40 @@ async fn ws_subscription_works_over_proxy_stream() { async fn ws_unsubscription_works() { init_logger(); - let server_addr = server_with_subscription().await; + let (tx, mut rx) = futures::channel::mpsc::channel(1); + let server_addr = server_with_sleeping_subscription(tx).await; let server_url = format!("ws://{}", server_addr); - let client = WsClientBuilder::default().max_concurrent_requests(1).build(&server_url).await.unwrap(); - - let mut sub: Subscription = - client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); + let client = WsClientBuilder::default().build(&server_url).await.unwrap(); - // It's technically possible to have race-conditions between the notifications and the unsubscribe message. - // So let's wait for the first notification and then unsubscribe. - let _item = sub.next().await.unwrap().unwrap(); + let sub: Subscription = + client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().await.unwrap(); - let mut success = false; - - // Wait until a slot is available, as only one concurrent call is allowed. - // Then when this finishes we know that unsubscribe call has been finished. - for _ in 0..30 { - let res: Result = client.request("say_hello", rpc_params![]).await; - if res.is_ok() { - success = true; - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - assert!(success); + let res = rx.next().with_default_timeout().await.expect("Test must complete in 1 min"); + // When the subscription is closed a message is sent out on this channel. + assert!(res.is_some()); } #[tokio::test] async fn ws_unsubscription_works_over_proxy_stream() { init_logger(); - let server_addr = server_with_subscription().await; + let (tx, mut rx) = futures::channel::mpsc::channel(1); + let server_addr = server_with_sleeping_subscription(tx).await; let server_url = format!("ws://{}", server_addr); - let socks_stream = connect_over_socks_stream(server_addr).await; - let data_stream = DataStream::new(socks_stream); - - let client = WsClientBuilder::default() - .max_concurrent_requests(1) - .build_with_stream(&server_url, data_stream) - .await - .unwrap(); - - let mut sub: Subscription = - client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap(); + let stream = DataStream::new(connect_over_socks_stream(server_addr).await); + let client = WsClientBuilder::default().build_with_stream(&server_url, stream).await.unwrap(); - // It's technically possible to have race-conditions between the notifications and the unsubscribe message. - // So let's wait for the first notification and then unsubscribe. - let _item = sub.next().await.unwrap().unwrap(); + let sub: Subscription = + client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap(); sub.unsubscribe().await.unwrap(); - let mut success = false; - - // Wait until a slot is available, as only one concurrent call is allowed. - // Then when this finishes we know that unsubscribe call has been finished. - for _ in 0..30 { - let res: Result = client.request("say_hello", rpc_params![]).await; - if res.is_ok() { - success = true; - break; - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } - - assert!(success); + let res = rx.next().with_default_timeout().await.expect("Test must complete in 1 min"); + // When the subscription is closed a message is sent out on this channel. + assert!(res.is_some()); } #[tokio::test]