fix: unsubscription test #1244

Merged · 4 commits · Nov 22, 2023

Changes from 2 commits
tests/tests/helpers.rs — 2 changes: 0 additions & 2 deletions

```diff
@@ -131,9 +131,7 @@ pub async fn server_with_subscription_and_handle() -> (SocketAddr, ServerHandle)
 pub async fn server_with_subscription() -> SocketAddr {
 	let (addr, handle) = server_with_subscription_and_handle().await;
-
 	tokio::spawn(handle.stopped());
-
 	addr
 }
```
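For context, this helper returns only the server's address and parks the shutdown future on the runtime via `tokio::spawn(handle.stopped())`, so the server keeps serving after the helper returns. A minimal sketch of that shape in plain tokio — not jsonrpsee's actual API; `spawn_test_server` and its accept loop are hypothetical stand-ins:

```rust
use std::net::SocketAddr;

use tokio::net::TcpListener;

// Generic illustration of the helper's shape: bind an ephemeral port, detach
// the serving future onto the runtime, and hand the caller only the address.
async fn spawn_test_server() -> SocketAddr {
	let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
	let addr = listener.local_addr().unwrap();

	// The spawned task owns the listener, so the server keeps accepting
	// connections after this function returns the address.
	tokio::spawn(async move {
		loop {
			// A real server would serve the connection; here we just accept.
			let _ = listener.accept().await;
		}
	});

	addr
}

#[tokio::main]
async fn main() {
	let addr = spawn_test_server().await;
	println!("test server listening on {addr}");
}
```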
tests/tests/integration_tests.rs — 70 changes: 19 additions & 51 deletions

```diff
@@ -55,6 +55,8 @@ use tokio::time::interval;
 use tokio_stream::wrappers::IntervalStream;
 use tower_http::cors::CorsLayer;
 
+use crate::helpers::server_with_sleeping_subscription;
+
 #[tokio::test]
 async fn ws_subscription_works() {
 	init_logger();
@@ -104,74 +106,40 @@ async fn ws_subscription_works_over_proxy_stream() {
 async fn ws_unsubscription_works() {
 	init_logger();
 
-	let server_addr = server_with_subscription().await;
+	let (tx, mut rx) = futures::channel::mpsc::channel(1);
+	let server_addr = server_with_sleeping_subscription(tx).await;
 	let server_url = format!("ws://{}", server_addr);
-	let client = WsClientBuilder::default().max_concurrent_requests(1).build(&server_url).await.unwrap();
-
-	let mut sub: Subscription<usize> =
-		client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap();
+	let client = WsClientBuilder::default().build(&server_url).await.unwrap();
 
-	// It's technically possible to have race-conditions between the notifications and the unsubscribe message.
-	// So let's wait for the first notification and then unsubscribe.
-	let _item = sub.next().await.unwrap().unwrap();
+	let sub: Subscription<usize> =
+		client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap();
 
 	sub.unsubscribe().await.unwrap();
 
-	let mut success = false;
-
-	// Wait until a slot is available, as only one concurrent call is allowed.
-	// Then when this finishes we know that unsubscribe call has been finished.
-	for _ in 0..30 {
-		let res: Result<String, _> = client.request("say_hello", rpc_params![]).await;
-		if res.is_ok() {
-			success = true;
-			break;
-		}
-		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-	}
-
-	assert!(success);
+	// When the subscription is closed a message is sent out on this channel.
+	let res = rx.next().with_timeout(std::time::Duration::from_secs(10)).await.expect("Timeout 10 secs exceeded");
```
Review comment from a Contributor on the `with_timeout` line above:

> Would there be a way to avoid relying on the sleep here? Might be possible that the CI will lag at some point and we'd exceed this?

@niklasad1 (Member, Author) replied on Nov 22, 2023:

> I can remove it; it's just a way to ensure that the test fails if the future never completes.
>
> The underlying subscription will run forever and can only be cancelled if the connection is closed or unsubscribed. https://github.com/paritytech/jsonrpsee/blob/d3a34030f04c43359e380a9938430d515c06d794/tests/tests/helpers.rs#L176-#L188
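The mechanism described here — a server-side future that sleeps indefinitely, is only ever ended by cancellation, and signals a channel on the way out — can be sketched without jsonrpsee at all. Everything below is illustrative; `NotifyOnDrop` is an invented stand-in for whatever the linked helper does when the subscription closes:

```rust
use std::time::Duration;

use futures::channel::mpsc;
use futures::StreamExt;

// Fires the channel when the surrounding future is dropped, i.e. when the
// "subscription" is cancelled.
struct NotifyOnDrop(Option<mpsc::Sender<()>>);

impl Drop for NotifyOnDrop {
	fn drop(&mut self) {
		if let Some(mut tx) = self.0.take() {
			// Synchronous send; the test side keeps the receiver alive.
			let _ = tx.try_send(());
		}
	}
}

#[tokio::main]
async fn main() {
	let (tx, mut rx) = mpsc::channel::<()>(1);

	// Stand-in for the server-side subscription future: it sleeps "forever"
	// and can only end by being dropped (unsubscribe or connection close).
	let task = tokio::spawn(async move {
		let _guard = NotifyOnDrop(Some(tx));
		tokio::time::sleep(Duration::from_secs(60 * 60)).await;
	});

	// Simulate the unsubscribe: abort the task, then observe the signal the
	// way the test does with `rx.next()`.
	task.abort();
	assert!(rx.next().await.is_some());
}
```

Under this reading, the test's `rx.next()` completes only once the subscription future has actually been torn down, which is why it can stand in as proof that the unsubscription went through.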

The hunk then continues:

```diff
+	assert!(res.is_some());
 }
 
 #[tokio::test]
 async fn ws_unsubscription_works_over_proxy_stream() {
 	init_logger();
 
-	let server_addr = server_with_subscription().await;
+	let (tx, mut rx) = futures::channel::mpsc::channel(1);
+	let server_addr = server_with_sleeping_subscription(tx).await;
 	let server_url = format!("ws://{}", server_addr);
 
-	let socks_stream = connect_over_socks_stream(server_addr).await;
-	let data_stream = DataStream::new(socks_stream);
-
-	let client = WsClientBuilder::default()
-		.max_concurrent_requests(1)
-		.build_with_stream(&server_url, data_stream)
-		.await
-		.unwrap();
-
-	let mut sub: Subscription<usize> =
-		client.subscribe("subscribe_foo", rpc_params![], "unsubscribe_foo").await.unwrap();
+	let stream = DataStream::new(connect_over_socks_stream(server_addr).await);
+	let client = WsClientBuilder::default().build_with_stream(&server_url, stream).await.unwrap();
 
-	// It's technically possible to have race-conditions between the notifications and the unsubscribe message.
-	// So let's wait for the first notification and then unsubscribe.
-	let _item = sub.next().await.unwrap().unwrap();
+	let sub: Subscription<usize> =
+		client.subscribe("subscribe_sleep", rpc_params![], "unsubscribe_sleep").await.unwrap();
 
 	sub.unsubscribe().await.unwrap();
 
-	let mut success = false;
-
-	// Wait until a slot is available, as only one concurrent call is allowed.
-	// Then when this finishes we know that unsubscribe call has been finished.
-	for _ in 0..30 {
-		let res: Result<String, _> = client.request("say_hello", rpc_params![]).await;
-		if res.is_ok() {
-			success = true;
-			break;
-		}
-		tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-	}
-
-	assert!(success);
+	// When the subscription is closed a message is sent out on this channel.
+	let res = rx.next().with_timeout(std::time::Duration::from_secs(10)).await.expect("Timeout 10 secs exceeded");
+	assert!(res.is_some());
 }
 
 #[tokio::test]
```
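Both rewritten tests hinge on the `with_timeout` combinator bounding how long `rx.next()` may pend; its definition is not shown in this diff. A plausible shape, assuming it is a small extension trait over `tokio::time::timeout` (the trait name `WithTimeout` is invented here):

```rust
use std::future::Future;
use std::time::Duration;

use tokio::time::Timeout;

// Hypothetical reconstruction: any future gets a `with_timeout` adapter that
// wraps it in `tokio::time::timeout`, so awaiting yields
// `Result<F::Output, Elapsed>` and `.expect(..)` fails the test on a hang.
trait WithTimeout: Future + Sized {
	fn with_timeout(self, dur: Duration) -> Timeout<Self> {
		tokio::time::timeout(dur, self)
	}
}

impl<F: Future> WithTimeout for F {}
```

Under that reading, `rx.next().with_timeout(std::time::Duration::from_secs(10))` fails loudly after ten seconds instead of letting a never-completing future hang CI — exactly the trade-off the review thread discusses.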