Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/poc_middleware_layer_v4' into re…
Browse files Browse the repository at this point in the history
…move_cors_v2

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Aug 12, 2022
2 parents fa2a091 + 5685da9 commit e14e25b
Show file tree
Hide file tree
Showing 19 changed files with 147 additions and 99 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ jobs:
command: fmt
args: --all -- --check

- name: Cargo clippy
uses: actions-rs/cargo@v1.0.3
- name: Check clippy
uses: actions-rs/clippy-check@v1
with:
command: clippy
args: --all-targets
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features

check-docs:
name: Check rustdoc
Expand Down
53 changes: 51 additions & 2 deletions benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;

use crate::helpers::{ws_handshake, KIB};
use criterion::*;
use futures_util::future::{join_all, FutureExt};
use futures_util::stream::FuturesUnordered;
use helpers::{http_client, ws_client, SUB_METHOD_NAME, UNSUB_METHOD_NAME};
use jsonrpsee::core::client::{ClientT, SubscriptionClientT};
use jsonrpsee::http_client::HeaderMap;
use jsonrpsee::types::{Id, ParamsSer, RequestSer};
use pprof::criterion::{Output, PProfProfiler};
use tokio::runtime::Runtime as TokioRuntime;
Expand Down Expand Up @@ -85,10 +87,11 @@ trait RequestBencher {
fn http_benches(crit: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let (url, _server) = rt.block_on(helpers::http_server(rt.handle().clone()));
let client = Arc::new(http_client(&url));
let client = Arc::new(http_client(&url, HeaderMap::new()));
round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "http_batch_requests", Self::REQUEST_TYPE);
http_custom_headers_round_trip(&rt, crit, &url, "http_custom_headers_round_trip", Self::REQUEST_TYPE);
}

fn websocket_benches(crit: &mut Criterion) {
Expand All @@ -99,6 +102,7 @@ trait RequestBencher {
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE);
ws_concurrent_conn_subs(&rt, crit, &url, "ws_concurrent_conn_subs", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);
ws_custom_headers_handshake(&rt, crit, &url, "ws_custom_headers_handshake", Self::REQUEST_TYPE);
}

fn subscriptions(crit: &mut Criterion) {
Expand Down Expand Up @@ -293,7 +297,7 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
for conns in [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] {
group.bench_function(format!("{}", conns), |b| {
b.to_async(rt).iter_with_setup(
|| (0..conns).map(|_| http_client(url)),
|| (0..conns).map(|_| http_client(url, HeaderMap::new())),
|clients| async {
let tasks = clients.map(|client| {
rt.spawn(async move {
Expand All @@ -307,3 +311,48 @@ fn http_concurrent_conn_calls(rt: &TokioRuntime, crit: &mut Criterion, url: &str
}
group.finish();
}

/// Bench `round_trip` with different header sizes.
fn http_custom_headers_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
url: &str,
name: &str,
request: RequestType,
) {
let method_name = request.methods()[0];

for header_size in [0, KIB, 5 * KIB, 25 * KIB, 100 * KIB] {
let mut headers = HeaderMap::new();
if header_size != 0 {
headers.insert("key", "A".repeat(header_size).parse().unwrap());
}

let client = Arc::new(http_client(url, headers));
let bench_name = format!("{}/{}kb", name, header_size / KIB);

crit.bench_function(&request.group_name(&bench_name), |b| {
b.to_async(rt).iter(|| async {
black_box(client.request::<String>(method_name, None).await.unwrap());
})
});
}
}

/// Bench WS handshake with different header sizes.
fn ws_custom_headers_handshake(rt: &TokioRuntime, crit: &mut Criterion, url: &str, name: &str, request: RequestType) {
let mut group = crit.benchmark_group(request.group_name(name));
for header_size in [0, KIB, 2 * KIB, 4 * KIB] {
group.bench_function(format!("{}kb", header_size / KIB), |b| {
b.to_async(rt).iter(|| async move {
let mut headers = HeaderMap::new();
if header_size != 0 {
headers.insert("key", "A".repeat(header_size).parse().unwrap());
}

ws_handshake(url, headers).await;
})
});
}
group.finish();
}
18 changes: 14 additions & 4 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::client_transport::ws::{Uri, WsTransportClientBuilder};
use jsonrpsee::http_client::{HeaderMap, HttpClient, HttpClientBuilder};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};

pub(crate) const SYNC_FAST_CALL: &str = "fast_call";
Expand All @@ -13,6 +14,9 @@ pub(crate) const UNSUB_METHOD_NAME: &str = "unsub";
pub(crate) const SYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];
pub(crate) const ASYNC_METHODS: [&str; 3] = [SYNC_FAST_CALL, SYNC_MEM_CALL, SYNC_SLOW_CALL];

// 1 KiB = 1024 bytes
pub(crate) const KIB: usize = 1024;

/// Run jsonrpc HTTP server for benchmarks.
#[cfg(feature = "jsonrpc-crate")]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_http_server::Server) {
Expand Down Expand Up @@ -160,9 +164,9 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
module.register_method(SYNC_FAST_CALL, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { Ok("lo") }).unwrap();

module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1 * 1024 * 1024))).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _| Ok("A".repeat(1024 * 1024))).unwrap();

module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1 * 1024 * 1024)) }).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { Ok("A".repeat(1024 * 1024)) }).unwrap();

module
.register_method(SYNC_SLOW_CALL, |_, _| {
Expand All @@ -181,10 +185,11 @@ fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
module
}

pub(crate) fn http_client(url: &str) -> HttpClient {
pub(crate) fn http_client(url: &str, headers: HeaderMap) -> HttpClient {
HttpClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.set_headers(headers)
.build(url)
.unwrap()
}
Expand All @@ -197,3 +202,8 @@ pub(crate) async fn ws_client(url: &str) -> WsClient {
.await
.unwrap()
}

pub(crate) async fn ws_handshake(url: &str, headers: HeaderMap) {
let uri: Uri = url.parse().unwrap();
WsTransportClientBuilder::default().max_request_body_size(u32::MAX).set_headers(headers).build(uri).await.unwrap();
}
4 changes: 3 additions & 1 deletion client/http-client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ impl HttpTransportClient {
}

let mut req = hyper::Request::post(&self.target);
req.headers_mut().map(|headers| *headers = self.headers.clone());
if let Some(headers) = req.headers_mut() {
*headers = self.headers.clone();
}
let req = req.body(From::from(body)).expect("URI and request headers are valid; qed");

let response = self.client.request(req).await.map_err(|e| Error::Http(Box::new(e)))?;
Expand Down
1 change: 1 addition & 0 deletions client/transport/src/ws/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
/// Stream to represent either a unencrypted or encrypted socket stream.
#[pin_project(project = EitherStreamProj)]
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum EitherStream {
/// Unencrypted socket stream.
Plain(#[pin] TcpStream),
Expand Down
4 changes: 1 addition & 3 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,7 @@ impl ClientT for Client {

rx_log_from_json(&json_values, self.max_log_length);

let values: Result<_, _> =
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect();
Ok(values?)
json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect()
}.instrument(trace.into_span()).await
}
}
Expand Down
20 changes: 10 additions & 10 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,17 @@ impl Error {
}
}

impl Into<ErrorObjectOwned> for Error {
fn into(self) -> ErrorObjectOwned {
match self {
impl From<Error> for ErrorObjectOwned {
fn from(err: Error) -> Self {
match err {
Error::Call(CallError::Custom(err)) => err,
Error::Call(CallError::InvalidParams(e)) => {
ErrorObject::owned(INVALID_PARAMS_CODE, e.to_string(), None::<()>)
}
Error::Call(CallError::Failed(e)) => {
ErrorObject::owned(CALL_EXECUTION_FAILED_CODE, e.to_string(), None::<()>)
}
_ => ErrorObject::owned(UNKNOWN_ERROR_CODE, self.to_string(), None::<()>),
_ => ErrorObject::owned(UNKNOWN_ERROR_CODE, err.to_string(), None::<()>),
}
}
}
Expand All @@ -173,18 +173,18 @@ pub enum SubscriptionClosed {
Failed(ErrorObject<'static>),
}

impl Into<ErrorObjectOwned> for SubscriptionClosed {
fn into(self) -> ErrorObjectOwned {
match self {
Self::RemotePeerAborted => {
impl From<SubscriptionClosed> for ErrorObjectOwned {
fn from(err: SubscriptionClosed) -> Self {
match err {
SubscriptionClosed::RemotePeerAborted => {
ErrorObject::owned(SUBSCRIPTION_CLOSED, "Subscription was closed by the remote peer", None::<()>)
}
Self::Success => ErrorObject::owned(
SubscriptionClosed::Success => ErrorObject::owned(
SUBSCRIPTION_CLOSED,
"Subscription was completed by the server successfully",
None::<()>,
),
Self::Failed(err) => err,
SubscriptionClosed::Failed(err) => err,
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ where
type Instant = (A::Instant, B::Instant);

fn on_connect(&self, remote_addr: std::net::SocketAddr, headers: &Headers) {
(self.0.on_connect(remote_addr, headers), self.1.on_connect(remote_addr, headers));
self.0.on_connect(remote_addr, headers);
self.1.on_connect(remote_addr, headers);
}

fn on_request(&self) -> Self::Instant {
Expand All @@ -170,7 +171,8 @@ where
}

fn on_disconnect(&self, remote_addr: std::net::SocketAddr) {
(self.0.on_disconnect(remote_addr), self.1.on_disconnect(remote_addr));
self.0.on_disconnect(remote_addr);
self.1.on_disconnect(remote_addr);
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ impl SubscriptionSink {

fn is_active_subscription(&self) -> bool {
match self.unsubscribe.as_ref() {
Some(unsubscribe) => !unsubscribe.has_changed().is_err(),
Some(unsubscribe) => unsubscribe.has_changed().is_ok(),
_ => false,
}
}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion examples/examples/multi_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn main() -> anyhow::Result<()> {
println!("response: {:?}", response);
let _response: Result<String, _> = client.request("unknown_method", None).await;
let _ = client.request::<String>("say_hello", None).await?;
let _ = client.request::<()>("thready", rpc_params![4]).await?;
client.request::<()>("thready", rpc_params![4]).await?;

Ok(())
}
Expand Down
18 changes: 6 additions & 12 deletions examples/examples/ws_pubsub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,9 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
_ => (),
};
if let SubscriptionClosed::Failed(err) = sink.pipe_from_stream(stream).await {
sink.close(err);
}
});
Ok(())
})
Expand All @@ -94,12 +91,9 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let stream = IntervalStream::new(interval).map(move |_| item);

tokio::spawn(async move {
match sink.pipe_from_stream(stream).await {
SubscriptionClosed::Failed(err) => {
sink.close(err);
}
_ => (),
};
if let SubscriptionClosed::Failed(err) = sink.pipe_from_stream(stream).await {
sink.close(err);
}
});

Ok(())
Expand Down
Loading

0 comments on commit e14e25b

Please sign in to comment.