Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Extensions to Request/MethodResponse #1306

Merged
merged 9 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ After this release one must do something like:


module
.register_subscription("sub", "s", "unsub", |_, pending, _| async move {
.register_subscription("sub", "s", "unsub", |_, pending, _, _| async move {
let stream = stream();
pipe_from_stream(sink, stream).await
})
Expand Down Expand Up @@ -493,7 +493,7 @@ Example:

```rust
module
.register_subscription::<RpcResult<(), _, _>::("sub", "s", "unsub", |_, pending, _| async move {
.register_subscription::<RpcResult<(), _, _>::("sub", "s", "unsub", |_, pending, _, _| async move {
// This just answers the RPC call and if this fails => no close notification is sent out.
pending.accept().await?;
// This is sent out as a `close notification/message`.
Expand Down
14 changes: 7 additions & 7 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
SUB_METHOD_NAME,
SUB_METHOD_NAME,
UNSUB_METHOD_NAME,
|_params, pending, _ctx| async move {
|_params, pending, _ctx, _| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from("Hello");
sink.send(msg).await?;
Expand All @@ -169,22 +169,22 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
fn gen_rpc_module() -> jsonrpsee::RpcModule<()> {
let mut module = jsonrpsee::RpcModule::new(());

module.register_method(SYNC_FAST_CALL, |_, _| "lo").unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _| async { "lo" }).unwrap();
module.register_method(SYNC_FAST_CALL, |_, _, _| "lo").unwrap();
module.register_async_method(ASYNC_FAST_CALL, |_, _, _| async { "lo" }).unwrap();

module.register_method(SYNC_MEM_CALL, |_, _| "A".repeat(MIB)).unwrap();
module.register_method(SYNC_MEM_CALL, |_, _, _| "A".repeat(MIB)).unwrap();

module.register_async_method(ASYNC_MEM_CALL, |_, _| async move { "A".repeat(MIB) }).unwrap();
module.register_async_method(ASYNC_MEM_CALL, |_, _, _| async move { "A".repeat(MIB) }).unwrap();

module
.register_method(SYNC_SLOW_CALL, |_, _| {
.register_method(SYNC_SLOW_CALL, |_, _, _| {
std::thread::sleep(SLOW_CALL);
"slow call"
})
.unwrap();

module
.register_async_method(ASYNC_SLOW_CALL, |_, _| async move {
.register_async_method(ASYNC_SLOW_CALL, |_, _, _| async move {
tokio::time::sleep(SLOW_CALL).await;
"slow call async"
})
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pin-project = { version = "1", optional = true }
[features]
default = []
http-helpers = ["bytes", "futures-util", "http-body", "http-body-util", "http"]
server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time"]
server = ["futures-util/alloc", "rustc-hash/std", "parking_lot", "rand", "tokio/rt", "tokio/sync", "tokio/macros", "tokio/time", "http"]
client = ["futures-util/sink", "tokio/sync"]
async-client = [
"client",
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ cfg_http_helpers! {
cfg_server! {
pub mod id_providers;
pub mod server;

}

cfg_client! {
Expand Down
41 changes: 38 additions & 3 deletions core/src/server/method_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::server::{BoundedWriter, LOG_TARGET};
use std::task::Poll;

use futures_util::{Future, FutureExt};
use http::Extensions;
use jsonrpsee_types::error::{
reject_too_big_batch_response, ErrorCode, ErrorObject, OVERSIZED_RESPONSE_CODE, OVERSIZED_RESPONSE_MSG,
};
Expand Down Expand Up @@ -59,6 +60,8 @@ pub struct MethodResponse {
/// Optional callback that may be utilized to notif
/// that the method response has been processed
on_close: Option<MethodResponseNotifyTx>,
/// The response's extensions.
extensions: Extensions,
}

impl MethodResponse {
Expand Down Expand Up @@ -121,6 +124,7 @@ impl MethodResponse {
success_or_error: MethodResponseResult::Success,
kind: ResponseKind::Batch,
on_close: None,
extensions: Extensions::new(),
}
}

Expand All @@ -140,6 +144,19 @@ impl MethodResponse {
/// If the serialization of `result` exceeds `max_response_size` then
/// the response is changed to an JSON-RPC error object.
pub fn response<T>(id: Id, rp: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
Self::response_with_extensions(id, rp, max_response_size, Extensions::new())
}

/// Similar to [`MethodResponse::response`] but with extensions.
pub fn response_with_extensions<T>(
id: Id,
rp: ResponsePayload<T>,
max_response_size: usize,
extensions: Extensions,
) -> Self
where
T: Serialize + Clone,
{
Expand All @@ -158,7 +175,7 @@ impl MethodResponse {
// Safety - serde_json does not emit invalid UTF-8.
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };

Self { result, success_or_error, kind, on_close: rp.on_exit }
Self { result, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() }
}
Err(err) => {
tracing::error!(target: LOG_TARGET, "Error serializing response: {:?}", err);
Expand All @@ -180,6 +197,7 @@ impl MethodResponse {
success_or_error: MethodResponseResult::Failed(err_code),
kind,
on_close: rp.on_exit,
extensions: Extensions::new(),
}
} else {
let err = ErrorCode::InternalError;
Expand All @@ -191,6 +209,7 @@ impl MethodResponse {
success_or_error: MethodResponseResult::Failed(err.code()),
kind,
on_close: rp.on_exit,
extensions,
}
}
}
Expand All @@ -205,8 +224,8 @@ impl MethodResponse {
rp
}

/// Create a [`MethodResponse`] from a JSON-RPC error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
/// Similar to [`MethodResponse::error`] but with extensions.
pub fn error_with_extensions<'a>(id: Id, err: impl Into<ErrorObject<'a>>, extensions: Extensions) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = InnerResponsePayload::<()>::error_borrowed(err);
Expand All @@ -216,8 +235,24 @@ impl MethodResponse {
success_or_error: MethodResponseResult::Failed(err_code),
kind: ResponseKind::MethodCall,
on_close: None,
extensions,
}
}

/// Create a [`MethodResponse`] from a JSON-RPC error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
Self::error_with_extensions(id, err, Extensions::new())
}

/// Returns a reference to the associated extensions.
pub fn extensions(&self) -> &Extensions {
&self.extensions
}

/// Returns a reference to the associated extensions.
pub fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.extensions
}
}

/// Represent the outcome of a method call success or failed.
Expand Down
1 change: 1 addition & 0 deletions core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod subscription;

pub use error::*;
pub use helpers::*;
pub use http::Extensions;
pub use method_response::*;
pub use rpc_module::*;
pub use subscription::*;
Expand Down
Loading
Loading