Skip to content

Commit

Permalink
[ws client]: add tests (#134)
Browse files Browse the repository at this point in the history
* [test utils]: add `internal_err` and consts
[errors]: unify client/server errors
[test utils]: fake WebSocket jsonrpc server
[ws client]: export errors
[ws client]: add some basic tests

* fmt

* remove log target

* fix nits

* [ws client]: add subscription test

* revert unintendend changes.

* fmt

* [ws client]: fix panic in tests

* cleanup

* tests(ws client): test for invalid request ID.

* fix nits
  • Loading branch information
niklasad1 authored Dec 15, 2020
1 parent 646a308 commit 1c5bdb0
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/client/ws/tests.rs
Original file line number Diff line number Diff line change
@@ -1 +1,124 @@
#![cfg(test)]

use crate::client::{WsClient, WsConfig, WsSubscription};
use crate::types::error::Error;
use crate::types::jsonrpc;

use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::types::{Id, WebSocketTestServer};

fn assert_error_response(response: Result<jsonrpc::JsonValue, Error>, code: jsonrpc::ErrorCode, message: String) {
let expected = jsonrpc::Error { code, message, data: None };
match response {
Err(Error::Request(err)) => {
assert_eq!(err, expected);
}
e @ _ => panic!("Expected error: \"{}\", got: {:?}", expected, e),
};
}

#[tokio::test]
async fn method_call_works() {
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response("hello".into(), Id::Num(0_u64)),
)
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: jsonrpc::JsonValue = client.request("say_hello", jsonrpc::Params::None).await.unwrap();
let exp = jsonrpc::JsonValue::String("hello".to_string());
assert_eq!(response, exp);
}

#[tokio::test]
async fn notif_works() {
// this empty string shouldn't be read because the server shouldn't respond to notifications.
let server = WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), String::new()).await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
assert!(client.notification("notif", jsonrpc::Params::None).await.is_ok());
}

#[tokio::test]
async fn method_not_found_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), method_not_found(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::MethodNotFound, METHOD_NOT_FOUND.into());
}

#[tokio::test]
async fn parse_error_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), parse_error(Id::Num(0_u64))).await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::ParseError, PARSE_ERROR.into());
}

#[tokio::test]
async fn invalid_request_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_request(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InvalidRequest, INVALID_REQUEST.into());
}

#[tokio::test]
async fn invalid_params_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), invalid_params(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InvalidParams, INVALID_PARAMS.into());
}

#[tokio::test]
async fn internal_error_works() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(0_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let response: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert_error_response(response, jsonrpc::ErrorCode::InternalError, INTERNAL_ERROR.into());
}

#[tokio::test]
async fn subscription_works() {
let server = WebSocketTestServer::with_hardcoded_subscription(
"127.0.0.1:0".parse().unwrap(),
server_subscription_id_response(Id::Num(0)),
server_subscription_response(jsonrpc::JsonValue::String("hello my friend".to_owned())),
)
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
{
let mut sub: WsSubscription<String> =
client.subscribe("subscribe_hello", jsonrpc::Params::None, "unsubscribe_hello").await.unwrap();
let response: String = sub.next().await.unwrap().into();
assert_eq!("hello my friend".to_owned(), response);
}
}

#[tokio::test]
async fn response_with_wrong_id() {
let server =
WebSocketTestServer::with_hardcoded_response("127.0.0.1:0".parse().unwrap(), internal_error(Id::Num(99_u64)))
.await;
let uri = to_ws_uri_string(server.local_addr());
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let err: Result<jsonrpc::JsonValue, Error> = client.request("say_hello", jsonrpc::Params::None).await;
assert!(matches!(err, Err(Error::TransportError(e)) if e.to_string().contains("background task closed")));
}
2 changes: 2 additions & 0 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = "1.8.0"
futures = "0.3.8"
hyper = "0.13.9"
log = "0.4.11"
serde = { version = "1.0.118", default-features = false, features = ["derive"] }
serde_json = "1.0.60"
soketto = "0.4.2"
Expand Down
18 changes: 18 additions & 0 deletions test-utils/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@ pub fn internal_error(id: Id) -> String {
)
}

/// Hardcoded server response when a client initiates a new subscription.
///
/// NOTE: works only for one subscription because the subscription ID is hardcoded.
pub fn server_subscription_id_response(id: Id) -> String {
format!(
r#"{{"jsonrpc":"2.0","result":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","id":{}}}"#,
serde_json::to_string(&id).unwrap()
)
}

/// Server response to a hardcoded pending subscription
pub fn server_subscription_response(result: Value) -> String {
format!(
r#"{{"jsonrpc":"2.0","method":"bar","params":{{"subscription":"D3wwzU6vvoUUYehv4qoFzq42DZnLoAETeFzeyk8swH4o","result":{}}}}}"#,
serde_json::to_string(&result).unwrap()
)
}

pub async fn http_request(body: Body, uri: Uri) -> Result<HttpResponse, String> {
let client = hyper::Client::new();
let r = hyper::Request::post(uri)
Expand Down
2 changes: 2 additions & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Shared test helpers for JSONRPC v2.
#![recursion_limit = "256"]

pub mod helpers;
pub mod types;
148 changes: 148 additions & 0 deletions test-utils/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use futures::channel::mpsc::{self, Receiver, Sender};
use futures::future::FutureExt;
use futures::io::{BufReader, BufWriter};
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use serde::{Deserialize, Serialize};
use soketto::handshake;
use soketto::handshake::{server::Response, Server};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};

Expand Down Expand Up @@ -68,3 +74,145 @@ impl WebSocketTestClient {
self.tx.close().await.map_err(Into::into)
}
}

#[derive(Debug, Clone)]
pub enum ServerMode {
// Send out a hardcoded response on every connection.
Response(String),
// Send out a subscription ID on a request and continuously send out data on the subscription.
Subscription { subscription_id: String, subscription_response: String },
}

/// JSONRPC v2 dummy WebSocket server that sends a hardcoded response.
pub struct WebSocketTestServer {
local_addr: SocketAddr,
exit: Sender<()>,
}

impl WebSocketTestServer {
// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured `hardcoded response` for every connection.
pub async fn with_hardcoded_response(sockaddr: SocketAddr, response: String) -> Self {
let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap();
let local_addr = listener.local_addr().unwrap();
let (tx, rx) = mpsc::channel::<()>(4);
tokio::spawn(server_backend(listener, rx, ServerMode::Response(response)));

Self { local_addr, exit: tx }
}

// Spawns a dummy `JSONRPC v2` WebSocket server that sends out a pre-configured subscription ID and subscription response.
//
// NOTE: ignores the actual subscription and unsubscription method.
pub async fn with_hardcoded_subscription(
sockaddr: SocketAddr,
subscription_id: String,
subscription_response: String,
) -> Self {
let listener = async_std::net::TcpListener::bind(sockaddr).await.unwrap();
let local_addr = listener.local_addr().unwrap();
let (tx, rx) = mpsc::channel::<()>(4);
tokio::spawn(server_backend(listener, rx, ServerMode::Subscription { subscription_id, subscription_response }));

Self { local_addr, exit: tx }
}

pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub async fn close(&mut self) {
self.exit.send(()).await.unwrap();
}
}

async fn server_backend(listener: async_std::net::TcpListener, mut exit: Receiver<()>, mode: ServerMode) {
let mut connections = Vec::new();

loop {
let conn_fut = listener.accept().fuse();
let exit_fut = exit.next();
futures::pin_mut!(exit_fut, conn_fut);

futures::select! {
_ = exit_fut => break,
conn = conn_fut => {
if let Ok((stream, _)) = conn {
let (tx, rx) = mpsc::channel::<()>(4);
let handle = tokio::spawn(connection_task(stream, mode.clone(), rx));
connections.push((handle, tx));
}
}
}
}

// close connections
for (handle, mut exit) in connections {
// If the actual connection was never established i.e., returned early
// It will most likely be caught on the client-side but just to be explicit.
exit.send(()).await.expect("WebSocket connection was never established");
handle.await.unwrap();
}
}

async fn connection_task(socket: async_std::net::TcpStream, mode: ServerMode, mut exit: Receiver<()>) {
let mut server = Server::new(socket);

let websocket_key = match server.receive_request().await {
Ok(req) => req.into_key(),
Err(_) => return,
};

let accept = server.send_response(&Response::Accept { key: &websocket_key, protocol: None }).await;

if accept.is_err() {
return;
}

let (mut sender, receiver) = server.into_builder().finish();

let ws_stream = stream::unfold(receiver, move |mut receiver| async {
let mut buf = Vec::new();
let ret = match receiver.receive_data(&mut buf).await {
Ok(_) => Ok(buf),
Err(err) => Err(err),
};
Some((ret, receiver))
});
futures::pin_mut!(ws_stream);

loop {
let next_ws = ws_stream.next().fuse();
let next_exit = exit.next().fuse();
let time_out = tokio::time::delay_for(Duration::from_secs(1)).fuse();
futures::pin_mut!(time_out, next_exit, next_ws);

futures::select! {
_ = time_out => {
if let ServerMode::Subscription { subscription_response, .. } = &mode {
if let Err(e) = sender.send_text(&subscription_response).await {
log::warn!("send response to subscription: {:?}", e);
}
}
}
ws = next_ws => {
// Got a request on the connection but don't care about the contents.
// Just send out the pre-configured hardcoded responses.
if let Some(Ok(_)) = ws {
match &mode {
ServerMode::Response(r) => {
if let Err(e) = sender.send_text(&r).await {
log::warn!("send response to request error: {:?}", e);
}
}
ServerMode::Subscription { subscription_id, .. } => {
if let Err(e) = sender.send_text(&subscription_id).await {
log::warn!("send subscription id error: {:?}", e);
}
}
}
}
}
_ = next_exit => break,
}
}
}

0 comments on commit 1c5bdb0

Please sign in to comment.