Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ws client]: add tests #134

Merged
merged 14 commits into from
Dec 15, 2020
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "MIT"
edition = "2018"

[dependencies]
async-std = { version = "1.7.0", features = ["attributes"] }
async-std = { version = "1.8.0", features = ["attributes"] }
bs58 = "0.4.0"
fnv = "1.0.7"
futures = "0.3.8"
Expand All @@ -16,12 +16,12 @@ hashbrown = "0.9.1"
lazy_static = "1.4.0"
log = "0.4.11"
parking_lot = "0.11.1"
pin-project = "1.0.1"
pin-project = "1.0.2"
jsonrpsee-proc-macros = { path = "proc-macros" }
rand = "0.7.3"
serde = { version = "1.0.117", default-features = false, features = ["derive"] }
serde_json = "1.0.59"
smallvec = { version = "1.5.0", default-features = false }
serde = { version = "1.0.118", default-features = false, features = ["derive"] }
serde_json = "1.0.60"
smallvec = { version = "1.5.1", default-features = false }
thiserror = "1.0.22"

# used for tests and HTTP/hyper
Expand All @@ -33,11 +33,11 @@ hyper = { version = "0.13.9", features = ["stream"], optional = true }
unicase = { version = "2.6.0", optional = true }

# WS-related dependencies
async-tls = { version = "0.10.0", optional = true }
async-tls = { version = "0.11.0", optional = true }
bytes = { version = "0.6.0", optional = true }
soketto = { version = "0.4.2", optional = true }
url = { version = "2.2.0", optional = true }
webpki = { version = "0.21.3", optional = true }
webpki = { version = "0.21.4", optional = true }

[features]
default = ["http", "ws"]
Expand All @@ -46,7 +46,7 @@ ws = ["async-tls", "bytes", "soketto", "url", "webpki"]

[dev-dependencies]
criterion = "0.3.3"
env_logger = "0.8.1"
env_logger = "0.8.2"
jsonrpsee-test-utils = { path = "test-utils" }
num_cpus = "1.13.0"

Expand Down
124 changes: 124 additions & 0 deletions src/client/ws/tests.rs
Original file line number Diff line number Diff line change
@@ -1 +1,125 @@
#![cfg(test)]

use crate::client::{WsClient, WsConfig, WsSubscription};
use crate::types::error::Error;
use crate::types::jsonrpc;

use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};

fn assert_error_response(response: Result<jsonrpc::JsonValue, Error>, code: jsonrpc::ErrorCode, message: String) {
let expected = jsonrpc::Error { code, message, data: None };
match response {
Err(Error::Request(err)) => {
assert_eq!(err, expected);
}
e @ _ => panic!("Expected error: \"{}\", got: {:?}", expected, e),
};
}

#[tokio::test]
async fn method_call_works() {
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response("hello".into(), Id::Num(0_u64)),
)
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: jsonrpc::JsonValue = client.request("say_hello", jsonrpc::Params::None).await.unwrap();
let exp = jsonrpc::JsonValue::String("hello".to_string());
assert_eq!(response, exp);
}

#[tokio::test]
async fn notif_works() {
// this empty string shouldn't be read because the server shouldn't respond to notifications.
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), String::new()).await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
assert!(client.notification("notif", jsonrpc::Params::None).await.is_ok());
}

#[tokio::test]
async fn method_not_found_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), method_not_found(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::MethodNotFound, METHOD_NOT_FOUND.into());
}

#[tokio::test]
async fn parse_error_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), parse_error(Id::Num(0_u64))).await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::ParseError, PARSE_ERROR.into());
}

#[tokio::test]
async fn invalid_request_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_request(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InvalidRequest, INVALID_REQUEST.into());
}

#[tokio::test]
async fn invalid_params_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_params(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InvalidParams, INVALID_PARAMS.into());
}

#[tokio::test]
async fn internal_error_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InternalError, INTERNAL_ERROR.into());
}

#[tokio::test]
async fn subscription_works() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE, this doesn't test that subscription is unsubscribed when the subscription is dropped.

The MockedServer needs then parse the responses and maintain the requests so I prefer to have the MockedServer as simple as possible for now.

let server = WebSocketTestServer::with_hardcoded_subscription(
"127.0.0.1:0".parse().unwrap(),
server_subscription_id_response(Id::Num(0)),
server_subscription_response(jsonrpc::JsonValue::String("hello my friend".to_owned())),
)
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
{
let mut sub: WsSubscription<String> =
client.subscribe("subscribe_hello", jsonrpc::Params::None, "unsubscribe_hello").await.unwrap();
let response: String = sub.next().await.unwrap().into();
assert_eq!("hello my friend".to_owned(), response);
}
}

#[tokio::test]
async fn response_with_wrong_id() {
env_logger::init();
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(99_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let err: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert!(matches!(err, Err(Error::TransportError(e)) if e.to_string().contains("background task closed")));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is error is quite bad, we just terminate the background thread and the frontend will receive https://docs.rs/futures/0.3.8/futures/channel/oneshot/struct.Canceled.html.

We could introduce another oneshot channel to send error in or find the pending request/subscription and send an error on that channel.

}
6 changes: 4 additions & 2 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = "1.8.0"
futures = "0.3.8"
hyper = "0.13.9"
serde = { version = "1.0.117", default-features = false, features = ["derive"] }
serde_json = "1.0.59"
log = "0.4.11"
serde = { version = "1.0.118", default-features = false, features = ["derive"] }
serde_json = "1.0.60"
soketto = "0.4.2"
tokio = { version = "0.2.23", features = ["dns", "stream", "tcp", "rt-threaded", "macros"] }
tokio-util = { version = "0.3.1", features = ["compat"] }
18 changes: 18 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ pub fn internal_error(id: Id) -> String {
)
}

/// Hardcoded server response when a client initiates a new subscription.
///
/// NOTE: works only for one subscription because the subscription ID is hardcoded.
pub fn server_subscription_id_response(id: Id) -> String {
format!(
r#"{{"jsonrpc":"2.0","result":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","id":{}}}"#,
serde_json::to_string(&id).unwrap()
)
}

/// Server response to a hardcoded pending subscription
pub fn server_subscription_response(result: Value) -> String {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these functions be const these days?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.serde.rs/serde_json/fn.to_string.html is not const and I think format! doesn't work in const context.

format!(
r#"{{"jsonrpc":"2.0","method":"bar","params":{{"subscription":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","result":{}}}}}"#,
serde_json::to_string(&result).unwrap()
)
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper::Client::new();
let r = hyper::Request::post(uri)
Expand Down
2 changes: 2 additions & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Shared test helpers for JSONRPC v2.

#![recursion_limit = "256"]

pub mod helpers;
pub mod types;
148 changes: 148 additions & 0 deletions test-utils/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use futures::channel::mpsc::{self, Receiver, Sender};
use futures::future::FutureExt;
use futures::io::{BufReader, BufWriter};
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use soketto::handshake;
use soketto::handshake::{server::Response, Server};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};

Expand Down Expand Up @@ -68,3 +74,145 @@ impl WebSocketTestClient {
self.tx.close().await.map_err(Into::into)
}
}

#[derive(Debug, Clone)]
pub enum ServerMode {
// Send out a hardcoded response on every connection.
Response(String),
// Send out a subscription ID on a request and continuously send out data on the subscription.
Subscription { subscription_id: String, subscription_response: String },
}

/// JSONRPC v2 dummy WebSocket server that sends a hardcoded response.
pub struct WebSocketTestServer {
local_addr: SocketAddr,
exit: Sender<()>,
}

impl WebSocketTestServer {
// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured `hardcoded response` for every connection.
pub async fn with_hardcoded_response(sockaddr: SocketAddr, response: String) -> Self {
let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap();
let local_addr = listener.local_addr().unwrap();
let (tx, rx) = mpsc::channel::<()>(4);
tokio::spawn(server_backend(listener, rx, ServerMode::Response(response)));

Self { local_addr, exit: tx }
}

// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured subscription ID and subscription response.
//
// NOTE: ignores the actual subscription and unsubscription method.
pub async fn with_hardcoded_subscription(
sockaddr: SocketAddr,
subscription_id: String,
subscription_response: String,
) -> Self {
let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap();
let local_addr = listener.local_addr().unwrap();
let (tx, rx) = mpsc::channel::<()>(4);
tokio::spawn(server_backend(listener, rx, ServerMode::Subscription { subscription_id, subscription_response }));

Self { local_addr, exit: tx }
}

pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub async fn close(&mut self) {
self.exit.send(()).await.unwrap();
}
}

async fn server_backend(listener: async_std::net::TcpListener, mut exit: Receiver<()>, mode: ServerMode) {
let mut connections = Vec::new();

loop {
let conn_fut = listener.accept().fuse();
let exit_fut = exit.next();
futures::pin_mut!(exit_fut, conn_fut);

futures::select! {
_ = exit_fut => break,
conn = conn_fut => {
if let Ok((stream, _)) = conn {
let (tx, rx) = mpsc::channel::<()>(4);
let handle = tokio::spawn(connection_task(stream, mode.clone(), rx));
connections.push((handle, tx));
}
}
}
}

// close connections
for (handle, mut exit) in connections {
// If the actual connection was never established i.e., returned early
// It will most likely be caught on the client-side but just to be explicit.
exit.send(()).await.expect("WebSocket connection was never established");
handle.await.unwrap();
}
}

async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mut exit: Receiver<()>) {
let mut server = Server::new(socket);

let websocket_key = match server.receive_request().await {
Ok(req) => req.into_key(),
Err(_) => return,
};

let accept = server.send_response(&Response::Accept { key: &websocket_key, protocol: None }).await;

if accept.is_err() {
return;
}

let (mut sender, receiver) = server.into_builder().finish();

let ws_stream = stream::unfold(receiver, move |mut receiver| async {
let mut buf = Vec::new();
let ret = match receiver.receive_data(&mut buf).await {
Ok(_) => Ok(buf),
Err(err) => Err(err),
};
Some((ret, receiver))
});
futures::pin_mut!(ws_stream);

loop {
let next_ws = ws_stream.next().fuse();
let next_exit = exit.next().fuse();
let time_out = tokio::time::delay_for(Duration::from_secs(1)).fuse();
futures::pin_mut!(time_out, next_exit, next_ws);

futures::select! {
_ = time_out => {
if let ServerMode::Subscription { subscription_response, .. } = &mode {
if let Err(e) = sender.send_text(&subscription_response).await {
log::warn!("send response to subscription: {:?}", e);
}
}
}
ws = next_ws => {
// Got a request on the connection but don't care about the contents.
// Just send out the pre-configured hardcoded responses.
if let Some(Ok(_)) = ws {
match &mode {
ServerMode::Response(r) => {
if let Err(e) = sender.send_text(&r).await {
log::warn!("send response to request error: {:?}", e);
}
}
ServerMode::Subscription { subscription_id, .. } => {
if let Err(e) = sender.send_text(&subscription_id).await {
log::warn!("send subscription id error: {:?}", e);
}
}
}
}
}
_ = next_exit => break,
}
}
}