Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make it possible to override method name in subscriptions #568

Merged
merged 23 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
df60861
feat: override `method` subscription notif
niklasad1 Nov 17, 2021
d6eb4ea
Arrow syntax for overwrites (#569)
maciejhirsz Nov 18, 2021
86ea9db
Merge remote-tracking branch 'origin/master' into na-override-notif-m…
niklasad1 Nov 18, 2021
84aad42
check that unique notifs are used
niklasad1 Nov 18, 2021
eea1b2b
check that custom sub name is unique
niklasad1 Nov 18, 2021
e72a33a
cargo fmt
niklasad1 Nov 18, 2021
673699a
address grumbles
niklasad1 Nov 18, 2021
2913c0b
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 18, 2021
37f77d1
commit added tests
niklasad1 Nov 18, 2021
70477f9
Merge remote-tracking branch 'origin/na-override-notif-method-name' i…
niklasad1 Nov 18, 2021
66972ef
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
5067f0d
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
6886c2b
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
1eb0a80
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
2be32e9
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
b67be8f
grumbles
niklasad1 Nov 19, 2021
3e8a18e
fix long lines
niklasad1 Nov 19, 2021
48ebf92
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
b01fbe8
Update utils/src/server/rpc_module.rs
niklasad1 Nov 19, 2021
a1a9577
Update proc-macros/src/rpc_macro.rs
niklasad1 Nov 19, 2021
86b06ed
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
b76efe8
Update proc-macros/src/render_server.rs
niklasad1 Nov 19, 2021
b20a0c3
more grumbles
niklasad1 Nov 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws
module.register_method(SYNC_METHOD_NAME, |_, _| Ok("lo")).unwrap();
module.register_async_method(ASYNC_METHOD_NAME, |_, _| async { Ok("lo") }).unwrap();
module
.register_subscription(SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
.register_subscription(SUB_METHOD_NAME, SUB_METHOD_NAME, UNSUB_METHOD_NAME, |_params, mut sink, _ctx| {
let x = "Hello";
tokio::spawn(async move { sink.send(&x) });
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
async fn storage_keys(&self, storage_key: StorageKey, hash: Option<Hash>) -> Result<Vec<StorageKey>, Error>;

/// Subscription that takes a `StorageKey` as input and produces a `Vec<Hash>`.
#[subscription(name = "subscribeStorage", item = Vec<Hash>)]
#[subscription(name = "subscribeStorage" => "override", item = Vec<Hash>)]
fn subscribe_storage(&self, keys: Option<Vec<StorageKey>>) -> Result<(), Error>;
}

Expand Down
4 changes: 2 additions & 2 deletions examples/ws_sub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module
.register_subscription("sub_one_param", "unsub_one_param", |params, mut sink, _| {
.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, mut sink, _| {
let idx: usize = params.one()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS.chars().nth(idx));
Expand All @@ -72,7 +72,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
})
.unwrap();
module
.register_subscription("sub_params_two", "unsub_params_two", |params, mut sink, _| {
.register_subscription("sub_params_two", "params_two", "unsub_params_two", |params, mut sink, _| {
let (one, two): (usize, usize) = params.parse()?;
std::thread::spawn(move || loop {
let _ = sink.send(&LETTERS[one..two].to_string());
Expand Down
2 changes: 1 addition & 1 deletion examples/ws_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
async fn run_server() -> anyhow::Result<SocketAddr> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
module.register_subscription("subscribe_hello", "s_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello my friend") {
return;
Expand Down
39 changes: 31 additions & 8 deletions proc-macros/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use proc_macro2::{Span, TokenStream as TokenStream2, TokenTree};
use std::{fmt, iter};
use syn::parse::{Parse, ParseStream, Parser};
use syn::punctuated::Punctuated;
use syn::{spanned::Spanned, Attribute, Error, Token};
use syn::{spanned::Spanned, Attribute, Error, LitInt, LitStr, Token};

pub(crate) struct AttributeMeta {
pub path: syn::Path,
Expand All @@ -48,15 +48,22 @@ pub enum ParamKind {

#[derive(Debug, Clone)]
pub struct Resource {
pub name: syn::LitStr,
pub name: LitStr,
pub assign: Token![=],
pub value: syn::LitInt,
pub value: LitInt,
}

pub struct Aliases {
pub list: Punctuated<syn::LitStr, Token![,]>,
pub struct NameMapping {
pub name: String,
pub mapped: Option<String>,
}

pub struct Bracketed<T> {
pub list: Punctuated<T, Token![,]>,
}

pub type Aliases = Bracketed<LitStr>;

impl Parse for Argument {
fn parse(input: ParseStream) -> syn::Result<Self> {
let label = input.parse()?;
Expand Down Expand Up @@ -91,15 +98,31 @@ impl Parse for Resource {
}
}

impl Parse for Aliases {
impl Parse for NameMapping {
fn parse(input: ParseStream) -> syn::Result<Self> {
let name = input.parse::<LitStr>()?.value();

let mapped = if input.peek(Token![=>]) {
input.parse::<Token![=>]>()?;

Some(input.parse::<LitStr>()?.value())
} else {
None
};

Ok(NameMapping { name, mapped })
}
}

impl<T: Parse> Parse for Bracketed<T> {
fn parse(input: ParseStream) -> syn::Result<Self> {
let content;

syn::bracketed!(content in input);

let list = content.parse_terminated(Parse::parse)?;

Ok(Aliases { list })
Ok(Bracketed { list })
}
}

Expand Down Expand Up @@ -201,7 +224,7 @@ impl Argument {

/// Asserts that the argument is `key = "string"` and gets the value of the string
pub fn string(self) -> syn::Result<String> {
self.value::<syn::LitStr>().map(|lit| lit.value())
self.value::<LitStr>().map(|lit| lit.value())
}
}

Expand Down
12 changes: 11 additions & 1 deletion proc-macros/src/render_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ impl RpcDescription {
let rust_method_name = &sub.signature.sig.ident;
// Name of the RPC method to subscribe to (e.g. `foo_sub`).
let rpc_sub_name = self.rpc_identifier(&sub.name);
// Custom method name to use when sending notifs on the subscription if configured.
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let maybe_custom_notif = sub.override_notif_method.as_ref().map(|m| self.rpc_identifier(m));
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
// Name of the RPC method to unsubscribe (e.g. `foo_sub`).
let rpc_unsub_name = self.rpc_identifier(&sub.unsubscribe);
// `parsing` is the code associated with parsing structure from the
Expand All @@ -184,8 +186,16 @@ impl RpcDescription {
check_name(&rpc_sub_name, rust_method_name.span());
check_name(&rpc_unsub_name, rust_method_name.span());

let notif_name = match maybe_custom_notif {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Some(notif) => {
check_name(&notif, rust_method_name.span());
notif
}
None => rpc_sub_name.clone(),
};

handle_register_result(quote! {
rpc.register_subscription(#rpc_sub_name, #rpc_unsub_name, |params, sink, context| {
rpc.register_subscription(#rpc_sub_name, #notif_name, #rpc_unsub_name, |params, sink, context| {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
#parsing
context.as_ref().#rust_method_name(sink, #params_seq)
})
Expand Down
24 changes: 21 additions & 3 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
//! Declaration of the JSON RPC generator procedural macros.

use crate::{
attributes::{optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, ParamKind, Resource},
attributes::{
optional, parse_param_kind, Aliases, Argument, AttributeMeta, MissingArgument, NameMapping, ParamKind, Resource,
},
helpers::extract_doc_comments,
};

Expand Down Expand Up @@ -95,6 +97,9 @@ impl RpcMethod {
#[derive(Debug, Clone)]
pub struct RpcSubscription {
pub name: String,
/// By default the server will send out subscriptions `method` in `name` above
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// but it's possible to override it with another name that this field does.
pub override_notif_method: Option<String>,
pub docs: TokenStream2,
pub unsubscribe: String,
pub params: Vec<(syn::PatIdent, syn::Type)>,
Expand All @@ -111,7 +116,9 @@ impl RpcSubscription {
AttributeMeta::parse(attr)?.retain(["aliases", "item", "name", "param_kind", "unsubscribe_aliases"])?;

let aliases = parse_aliases(aliases)?;
let name = name?.string()?;
let map = name?.value::<NameMapping>()?;
let name = map.name;
let override_notif_method = map.mapped;
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
let item = item?.value()?;
let param_kind = parse_param_kind(param_kind)?;
let unsubscribe_aliases = parse_aliases(unsubscribe_aliases)?;
Expand All @@ -135,7 +142,18 @@ impl RpcSubscription {
// We've analyzed attributes and don't need them anymore.
sub.attrs.clear();

Ok(Self { name, unsubscribe, unsubscribe_aliases, params, param_kind, item, signature: sub, aliases, docs })
Ok(Self {
name,
override_notif_method,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
unsubscribe,
unsubscribe_aliases,
params,
param_kind,
item,
signature: sub,
aliases,
docs,
})
}
}

Expand Down
12 changes: 12 additions & 0 deletions proc-macros/tests/ui/correct/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ pub trait Rpc {

#[subscription(name = "echo", aliases = ["ECHO"], item = u32, unsubscribe_aliases = ["NotInterested", "listenNoMore"])]
fn sub_with_params(&self, val: u32) -> RpcResult<()>;

// This will send notifications to the client with `method=subscribe_override`
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
#[subscription(name = "subscribe_method" => "subscribe_override", item = u32)]
fn sub_with_override_notif_method(&self) -> RpcResult<()>;
}

pub struct RpcServerImpl;
Expand Down Expand Up @@ -68,6 +72,10 @@ impl RpcServer for RpcServerImpl {
sink.send(&val)?;
sink.send(&val)
}

fn sub_with_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
sink.send(&1)
}
}

pub async fn websocket_server() -> SocketAddr {
Expand Down Expand Up @@ -102,4 +110,8 @@ async fn main() {
assert_eq!(first_recv, Some("Response_A".to_string()));
let second_recv = sub.next().await.unwrap();
assert_eq!(second_recv, Some("Response_B".to_string()));

let mut sub = client.sub_with_override_notif_method().await.unwrap();
let recv = sub.next().await.unwrap();
assert_eq!(recv, Some(1));
}
33 changes: 19 additions & 14 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
module.register_method("say_hello", |_, _| Ok("hello")).unwrap();

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"hello from subscription") {
break;
Expand All @@ -52,7 +52,7 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
.register_subscription("subscribe_foo", "subscribe_foo", "unsubscribe_foo", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&1337) {
break;
Expand All @@ -64,21 +64,26 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle
.unwrap();

module
.register_subscription("subscribe_add_one", "unsubscribe_add_one", |params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
})
.register_subscription(
"subscribe_add_one",
"subscribe_add_one",
"unsubscribe_add_one",
|params, mut sink, _| {
let mut count: usize = params.one()?;
std::thread::spawn(move || loop {
count = count.wrapping_add(1);
if let Err(Error::SubscriptionClosed(_)) = sink.send(&count) {
break;
}
std::thread::sleep(Duration::from_millis(100));
});
Ok(())
},
)
.unwrap();

module
.register_subscription("subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, mut sink, _| {
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
sink.close("Server closed the stream because it was lazy")
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
let mut module = RpcModule::new(tx);

module
.register_subscription("subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
.register_subscription("subscribe_hello", "subscribe_hello", "unsubscribe_hello", |_, mut sink, mut tx| {
tokio::spawn(async move {
let close_err = loop {
if let Err(Error::SubscriptionClosed(err)) = sink.send(&1) {
Expand Down
2 changes: 1 addition & 1 deletion tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ async fn multiple_blocking_calls_overlap() {

#[tokio::test]
async fn subscriptions_do_not_work_for_http_servers() {
let htserver = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let htserver = HttpServerBuilder::default().build("127.0.0.1:0").unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let addr = htserver.local_addr().unwrap();
let htserver_url = format!("http://{}", addr);
let _handle = htserver.start(RpcServerImpl.into_rpc()).unwrap();
Expand Down
12 changes: 7 additions & 5 deletions utils/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// use jsonrpsee_utils::server::rpc_module::RpcModule;
///
/// let mut ctx = RpcModule::new(99_usize);
/// ctx.register_subscription("sub", "unsub", |params, mut sink, ctx| {
/// ctx.register_subscription("sub", "sub_notif", "unsub", |params, mut sink, ctx| {
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
/// let x: usize = params.one()?;
/// std::thread::spawn(move || {
/// let sum = x + (*ctx);
Expand All @@ -541,6 +541,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
pub fn register_subscription<F>(
&mut self,
subscribe_method_name: &'static str,
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<(), Error>
Expand All @@ -554,6 +555,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

self.methods.verify_method_name(subscribe_method_name)?;
self.methods.verify_method_name(unsubscribe_method_name)?;

let ctx = self.ctx.clone();
let subscribers = Subscribers::default();

Expand All @@ -577,7 +579,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

let sink = SubscriptionSink {
inner: method_sink.clone(),
method: subscribe_method_name,
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id, sub_id },
is_connected: Some(conn_tx),
Expand Down Expand Up @@ -784,7 +786,7 @@ mod tests {
fn rpc_context_modules_can_register_subscriptions() {
let cx = ();
let mut cxmodule = RpcModule::new(cx);
let _subscription = cxmodule.register_subscription("hi", "goodbye", |_, _, _| Ok(()));
let _subscription = cxmodule.register_subscription("hi", "hi", "goodbye", |_, _, _| Ok(()));

assert!(cxmodule.method("hi").is_some());
assert!(cxmodule.method("goodbye").is_some());
Expand Down Expand Up @@ -922,7 +924,7 @@ mod tests {
async fn subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_unsub", |_, mut sink, _| {
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
let mut stream_data = vec!['0', '1', '2'];
std::thread::spawn(move || loop {
tracing::debug!("This is your friendly subscription sending data.");
Expand Down Expand Up @@ -956,7 +958,7 @@ mod tests {
async fn close_test_subscribing_without_server() {
let mut module = RpcModule::new(());
module
.register_subscription("my_sub", "my_unsub", |_, mut sink, _| {
.register_subscription("my_sub", "my_sub", "my_unsub", |_, mut sink, _| {
std::thread::spawn(move || loop {
if let Err(Error::SubscriptionClosed(_)) = sink.send(&"lo") {
return;
Expand Down
Loading