Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): support batched notifications #1327

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,26 @@ async fn notification_handler_works() {
}
}

#[tokio::test]
async fn batched_notification_handler_works() {
let server = WebSocketTestServer::with_hardcoded_notification(
"127.0.0.1:0".parse().unwrap(),
server_batched_notification("test", "batched server originated notification works".into()),
)
.with_default_timeout()
.await
.unwrap();

let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
{
let mut nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();
let response: String = nh.next().with_default_timeout().await.unwrap().unwrap().unwrap();
assert_eq!("batched server originated notification works".to_owned(), response);
}
}

#[tokio::test]
async fn notification_close_on_lagging() {
init_logger();
Expand Down
36 changes: 20 additions & 16 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,32 +767,36 @@ fn handle_backend_messages<R: TransportReceiverT>(
let mut batch = Vec::with_capacity(raw_responses.len());

let mut range = None;
let mut got_notif = false;

for r in raw_responses {
let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) else {
if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
} else if let Ok(notif) = serde_json::from_str::<Notification<_>>(r.get()) {
got_notif = true;
process_notification(&mut manager.lock(), notif);
} else {
return Err(unparse_error(raw));
};

let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
}

if let Some(mut range) = range {
// the range is exclusive so need to add one.
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
} else {
} else if !got_notif {
return Err(EmptyBatchRequest.into());
}
} else {
Expand Down
5 changes: 5 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ pub fn server_notification(method: &str, params: Value) -> String {
format!(r#"{{"jsonrpc":"2.0","method":"{}", "params":{} }}"#, method, serde_json::to_string(&params).unwrap())
}

/// Batched server originated notification
pub fn server_batched_notification(method: &str, params: Value) -> String {
format!(r#"[{{"jsonrpc":"2.0","method":"{}", "params":{} }}]"#, method, serde_json::to_string(&params).unwrap())
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper::Client::new();
http_post(client, body, uri).await
Expand Down
Loading