Skip to content

Commit

Permalink
register_*_method Into<Error>
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Aug 14, 2021
1 parent 394a06b commit 33b4fa2
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 55 deletions.
9 changes: 5 additions & 4 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures_channel::oneshot;
use futures_util::future::FutureExt;
use jsonrpsee::{
http_server::HttpServerBuilder,
types::Error,
ws_server::{RpcModule, WsServerBuilder},
};

Expand All @@ -17,8 +18,8 @@ pub async fn http_server() -> String {
let server =
HttpServerBuilder::default().max_request_body_size(u32::MAX).build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
module.register_method::<_, _, Error>(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method::<_, _, Error>(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
server_started_tx.send(server.local_addr().unwrap()).unwrap();
server.start(module).await
});
Expand All @@ -31,8 +32,8 @@ pub async fn ws_server() -> String {
tokio::spawn(async move {
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();
let mut module = RpcModule::new(());
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
module.register_method::<_, _, Error>(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method::<_, _, Error>(ASYNC_METHOD_NAME, |_, _| (async { Ok("lo") }).boxed()).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
Expand Down
3 changes: 2 additions & 1 deletion examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, RpcModule},
types::Error,
types::{traits::Client, JsonValue},
};
use std::net::SocketAddr;
Expand All @@ -50,7 +51,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("lo"))?;

let addr = server.local_addr()?;
tokio::spawn(server.start(module));
Expand Down
2 changes: 1 addition & 1 deletion examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("state_getPairs", |_, _| Ok(vec![1, 2, 3]))?;
module.register_method::<_, _, Error>("state_getPairs", |_, _| Ok(vec![1, 2, 3]))?;

let addr = server.local_addr()?;
tokio::spawn(async move { server.start(RpcServerImpl.into_rpc()).await });
Expand Down
4 changes: 2 additions & 2 deletions examples/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
types::{traits::Client, v2::params::JsonRpcParams},
types::{traits::Client, v2::params::JsonRpcParams, Error},
ws_client::WsClientBuilder,
ws_server::{RpcModule, WsServerBuilder},
};
Expand All @@ -47,7 +47,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("lo"))?;
let addr = server.local_addr()?;
tokio::spawn(server.start(module));
Ok(addr)
Expand Down
24 changes: 12 additions & 12 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,38 +47,38 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<Result<(), Error>>, St
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
let addr = server.local_addr().unwrap();
module.register_method("say_hello", |_, _| Ok("lo")).unwrap();
module.register_async_method("say_hello_async", |_, _| async move { Ok("lo") }.boxed()).unwrap();
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("lo")).unwrap();
module.register_async_method::<_, _, Error>("say_hello_async", |_, _| async move { Ok("lo") }.boxed()).unwrap();
module
.register_method("add", |params, _| {
.register_method::<_, _, Error>("add", |params, _| {
let params: Vec<u64> = params.parse()?;
let sum: u64 = params.into_iter().sum();
Ok(sum)
})
.unwrap();
module
.register_method("multiparam", |params, _| {
.register_method::<_, _, Error>("multiparam", |params, _| {
let params: (String, String, Vec<u8>) = params.parse()?;
let r = format!("string1={}, string2={}, vec={}", params.0.len(), params.1.len(), params.2.len());
Ok(r)
})
.unwrap();
module.register_method("notif", |_, _| Ok("")).unwrap();
module.register_method::<_, _, Error>("notif", |_, _| Ok("")).unwrap();
module
.register_method("should_err", |_, ctx| {
.register_method::<_, _, Error>("should_err", |_, ctx| {
let _ = ctx.err().map_err(|e| CallError::Failed(e.into()))?;
Ok("err")
})
.unwrap();

module
.register_method("should_ok", |_, ctx| {
.register_method::<_, _, Error>("should_ok", |_, ctx| {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
Ok("ok")
})
.unwrap();
module
.register_async_method("should_ok_async", |_p, ctx| {
.register_async_method::<_, _, Error>("should_ok_async", |_p, ctx| {
async move {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
Ok("ok")
Expand Down Expand Up @@ -369,12 +369,12 @@ async fn can_register_modules() {
let mut mod2 = RpcModule::new(cx2);

assert_eq!(mod1.method_names().count(), 0);
mod1.register_method("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();
mod1.register_method::<_, _, Error>("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method::<_, _, Error>("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method::<_, _, Error>("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

// Won't register, name clashes
mod2.register_method("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();
mod2.register_method::<_, _, Error>("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

assert_eq!(mod1.method_names().count(), 2);

Expand Down
9 changes: 5 additions & 4 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use futures_channel::oneshot;
use jsonrpsee::{
http_server::HttpServerBuilder,
types::Error,
ws_server::{WsServerBuilder, WsStopHandle},
RpcModule,
};
Expand All @@ -42,7 +43,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsStopHandle)
let server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("hello")).unwrap();

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
Expand Down Expand Up @@ -102,7 +103,7 @@ pub async fn websocket_server() -> SocketAddr {
let rt = tokio::runtime::Runtime::new().unwrap();
let server = rt.block_on(WsServerBuilder::default().build("127.0.0.1:0")).unwrap();
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("hello")).unwrap();

rt.block_on(async move {
server_started_tx.send(server.local_addr().unwrap()).unwrap();
Expand All @@ -118,8 +119,8 @@ pub async fn http_server() -> SocketAddr {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
let addr = server.local_addr().unwrap();
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();
module.register_method("notif", |_, _| Ok("")).unwrap();
module.register_method::<_, _, Error>("say_hello", |_, _| Ok("hello")).unwrap();
module.register_method::<_, _, Error>("notif", |_, _| Ok("")).unwrap();

tokio::spawn(server.start(module));
addr
Expand Down
20 changes: 11 additions & 9 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,12 @@ impl<Context> From<RpcModule<Context>> for Methods {

impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// Register a new synchronous RPC method, which computes the response with the given callback.
pub fn register_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
pub fn register_method<R, F, E>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
Context: Send + Sync + 'static,
R: Serialize,
F: Fn(RpcParams, &Context) -> Result<R, Error> + Send + Sync + 'static,
F: Fn(RpcParams, &Context) -> Result<R, E> + Send + Sync + 'static,
E: Into<Error>,
{
self.methods.verify_method_name(method_name)?;

Expand All @@ -219,7 +220,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
self.methods.mut_callbacks().insert(
method_name,
MethodCallback::Sync(Arc::new(move |id, params, tx, _| {
match callback(params, &*ctx) {
match callback(params, &*ctx).map_err(Into::into) {
Ok(res) => send_response(id, &tx, res),
Err(Error::Call(CallError::InvalidParams)) => {
send_error(id, &tx, JsonRpcErrorCode::InvalidParams.into())
Expand Down Expand Up @@ -254,10 +255,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
}

/// Register a new asynchronous RPC method, which computes the response with the given callback.
pub fn register_async_method<R, F>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
pub fn register_async_method<R, F, E>(&mut self, method_name: &'static str, callback: F) -> Result<(), Error>
where
R: Serialize + Send + Sync + 'static,
F: Fn(RpcParams<'static>, Arc<Context>) -> BoxFuture<'static, Result<R, Error>> + Copy + Send + Sync + 'static,
F: Fn(RpcParams<'static>, Arc<Context>) -> BoxFuture<'static, Result<R, E>> + Copy + Send + Sync + 'static,
E: Into<Error>,
{
self.methods.verify_method_name(method_name)?;

Expand All @@ -268,7 +270,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
MethodCallback::Async(Arc::new(move |id, params, tx, _| {
let ctx = ctx.clone();
let future = async move {
match callback(params, ctx).await {
match callback(params, ctx).await.map_err(Into::into) {
Ok(res) => send_response(id, &tx, res),
Err(Error::Call(CallError::InvalidParams)) => {
send_error(id, &tx, JsonRpcErrorCode::InvalidParams.into())
Expand Down Expand Up @@ -490,9 +492,9 @@ mod tests {
fn rpc_modules_with_different_contexts_can_be_merged() {
let cx = Vec::<u8>::new();
let mut mod1 = RpcModule::new(cx);
mod1.register_method("bla with Vec context", |_: RpcParams, _| Ok(())).unwrap();
mod1.register_method::<_, _, Error>("bla with Vec context", |_: RpcParams, _| Ok(())).unwrap();
let mut mod2 = RpcModule::new(String::new());
mod2.register_method("bla with String context", |_: RpcParams, _| Ok(())).unwrap();
mod2.register_method::<_, _, Error>("bla with String context", |_: RpcParams, _| Ok(())).unwrap();

mod1.merge(mod2).unwrap();

Expand All @@ -514,7 +516,7 @@ mod tests {
fn rpc_register_alias() {
let mut module = RpcModule::new(());

module.register_method("hello_world", |_: RpcParams, _| Ok(())).unwrap();
module.register_method::<_, _, Error>("hello_world", |_: RpcParams, _| Ok(())).unwrap();
module.register_alias("hello_foobar", "hello_world").unwrap();

assert!(module.method("hello_world").is_some());
Expand Down
42 changes: 20 additions & 22 deletions ws-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) {
let server = WsServerBuilder::default().build("127.0.0.1:0").with_default_timeout().await.unwrap().unwrap();
let mut module = RpcModule::new(());
module
.register_method("say_hello", |_, _| {
.register_method::<_, _, Error>("say_hello", |_, _| {
log::debug!("server respond to hello");
Ok("hello")
})
.unwrap();
module
.register_method("add", |params, _| {
.register_method::<_, _, Error>("add", |params, _| {
let params: Vec<u64> = params.parse()?;
let sum: u64 = params.into_iter().sum();
Ok(sum)
})
.unwrap();
module
.register_async_method("say_hello_async", |_, _| {
.register_async_method::<_, _, Error>("say_hello_async", |_, _| {
async move {
log::debug!("server respond to hello");
// Call some async function inside.
Expand All @@ -86,7 +86,7 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) {
})
.unwrap();
module
.register_async_method("add_async", |params, _| {
.register_async_method::<_, _, Error>("add_async", |params, _| {
async move {
let params: Vec<u64> = params.parse()?;
let sum: u64 = params.into_iter().sum();
Expand All @@ -95,12 +95,10 @@ async fn server_with_handles() -> (SocketAddr, JoinHandle<()>, StopHandle) {
.boxed()
})
.unwrap();
module.register_method("invalid_params", |_params, _| Err::<(), _>(CallError::InvalidParams.into())).unwrap();
module.register_method("invalid_params", |_params, _| Err::<(), _>(CallError::InvalidParams)).unwrap();
module.register_method("call_fail", |_params, _| Err::<(), _>(CallError::Failed(Box::new(MyAppError)))).unwrap();
module
.register_method("call_fail", |_params, _| Err::<(), _>(CallError::Failed(Box::new(MyAppError)).into()))
.unwrap();
module
.register_method("sleep_for", |params, _| {
.register_method::<_, _, Error>("sleep_for", |params, _| {
let sleep: Vec<u64> = params.parse()?;
std::thread::sleep(std::time::Duration::from_millis(sleep[0]));
Ok("Yawn!")
Expand All @@ -122,21 +120,21 @@ async fn server_with_context() -> SocketAddr {
let mut rpc_module = RpcModule::new(ctx);

rpc_module
.register_method("should_err", |_p, ctx| {
.register_method::<_, _, Error>("should_err", |_p, ctx| {
let _ = ctx.err().map_err(|e| CallError::Failed(e.into()))?;
Ok("err")
})
.unwrap();

rpc_module
.register_method("should_ok", |_p, ctx| {
.register_method::<_, _, Error>("should_ok", |_p, ctx| {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
Ok("ok")
})
.unwrap();

rpc_module
.register_async_method("should_ok_async", |_p, ctx| {
.register_async_method::<_, _, Error>("should_ok_async", |_p, ctx| {
async move {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
// Call some async function inside.
Expand All @@ -151,7 +149,7 @@ async fn server_with_context() -> SocketAddr {
async move {
let _ = ctx.ok().map_err(|e| CallError::Failed(e.into()))?;
// Async work that returns an error
futures_util::future::err::<(), Error>(CallError::Failed(String::from("nah").into()).into()).await
futures_util::future::err::<(), _>(CallError::Failed(String::from("nah").into())).await
}
.boxed()
})
Expand All @@ -169,7 +167,7 @@ async fn can_set_the_max_request_body_size() {
// Rejects all requests larger than 10 bytes
let server = WsServerBuilder::default().max_request_body_size(10).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok(())).unwrap();
module.register_method::<_, _, Error>("anything", |_p, _cx| Ok(())).unwrap();
let addr = server.local_addr().unwrap();
tokio::spawn(server.start(module));

Expand All @@ -192,7 +190,7 @@ async fn can_set_max_connections() {
// Server that accepts max 2 connections
let server = WsServerBuilder::default().max_connections(2).build(addr).await.unwrap();
let mut module = RpcModule::new(());
module.register_method("anything", |_p, _cx| Ok(())).unwrap();
module.register_method::<_, _, Error>("anything", |_p, _cx| Ok(())).unwrap();
let addr = server.local_addr().unwrap();

tokio::spawn(server.start(module));
Expand Down Expand Up @@ -453,12 +451,12 @@ async fn invalid_request_object() {
#[tokio::test]
async fn register_methods_works() {
let mut module = RpcModule::new(());
assert!(module.register_method("say_hello", |_, _| Ok("lo")).is_ok());
assert!(module.register_method("say_hello", |_, _| Ok("lo")).is_err());
assert!(module.register_method::<_, _, Error>("say_hello", |_, _| Ok("lo")).is_ok());
assert!(module.register_method::<_, _, Error>("say_hello", |_, _| Ok("lo")).is_err());
assert!(module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, _, _| Ok(())).is_ok());
assert!(module.register_subscription("subscribe_hello_again", "unsubscribe_hello", |_, _, _| Ok(())).is_err());
assert!(
module.register_method("subscribe_hello_again", |_, _| Ok("lo")).is_ok(),
module.register_method::<_, _, Error>("subscribe_hello_again", |_, _| Ok("lo")).is_ok(),
"Failed register_subscription should not have side-effects"
);
}
Expand Down Expand Up @@ -529,12 +527,12 @@ async fn can_register_modules() {

assert_eq!(mod1.method_names().count(), 0);
assert_eq!(mod2.method_names().count(), 0);
mod1.register_method("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();
mod1.register_method::<_, _, Error>("bla", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod1.register_method::<_, _, Error>("bla2", |_, cx| Ok(format!("Gave me {}", cx))).unwrap();
mod2.register_method::<_, _, Error>("yada", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

// Won't register, name clashes
mod2.register_method("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();
mod2.register_method::<_, _, Error>("bla", |_, cx| Ok(format!("Gave me {:?}", cx))).unwrap();

assert_eq!(mod1.method_names().count(), 2);
let err = mod1.merge(mod2).unwrap_err();
Expand Down

0 comments on commit 33b4fa2

Please sign in to comment.