Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc module): stream API for SubscriptionSink #639

Merged
merged 24 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
72024ea
feat(rpc module): add_stream to subscription sink
niklasad1 Jan 5, 2022
fe176df
fix some nits
niklasad1 Jan 5, 2022
eebc73d
Merge remote-tracking branch 'origin/master' into na-hacky-sink-add-s…
niklasad1 Jan 5, 2022
a29f988
unify parameters to rpc methods
niklasad1 Jan 12, 2022
6aa22e2
Update core/src/server/rpc_module.rs
niklasad1 Jan 12, 2022
9bdea0d
Update tests/tests/integration_tests.rs
niklasad1 Jan 13, 2022
79a8e55
address grumbles
niklasad1 Jan 13, 2022
7e81acf
fix subscription tests
niklasad1 Jan 13, 2022
6982598
new type for `SubscriptionCallback` and glue code
niklasad1 Jan 17, 2022
d589d24
remove unsed code
niklasad1 Jan 17, 2022
92bb97e
remove todo
niklasad1 Jan 18, 2022
b9598ca
add missing feature tokio/macros
niklasad1 Jan 18, 2022
bce48da
make `add_stream` cancel-safe
niklasad1 Jan 18, 2022
6d16927
rename add_stream and return status
niklasad1 Jan 19, 2022
a47a965
fix nits
niklasad1 Jan 19, 2022
07f80e2
rename stream API -> streamify
niklasad1 Jan 19, 2022
bedf808
Update core/src/server/rpc_module.rs
niklasad1 Jan 19, 2022
03ef669
provide proper close reason
niklasad1 Jan 19, 2022
ef7b965
Merge remote-tracking branch 'origin/na-hacky-sink-add-stream' into n…
niklasad1 Jan 19, 2022
f7aa544
spelling
niklasad1 Jan 19, 2022
72f4726
consume_and_streamify + docs
niklasad1 Jan 20, 2022
7a20a52
fmt
niklasad1 Jan 20, 2022
de4ac6a
rename API pipe_from_stream
niklasad1 Jan 20, 2022
fa15cfb
improve logging; indicate which subscription method that failed
niklasad1 Jan 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1"
arrayvec = "0.7.1"
async-trait = "0.1"
beef = { version = "0.5.1", features = ["impl_serde"] }
async-channel = { version = "1.6", optional = true }
thiserror = "1"
futures-channel = { version = "0.3.14", default-features = false }
futures-util = { version = "0.3.14", default-features = false, optional = true }
Expand All @@ -29,6 +30,7 @@ tokio = { version = "1.8", features = ["rt"], optional = true }
default = []
http-helpers = ["futures-util"]
server = [
"async-channel",
"futures-util",
"rustc-hash",
"tracing",
Expand Down
2 changes: 1 addition & 1 deletion core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::io;

use crate::{to_json_raw_value, Error};
use futures_channel::mpsc;
use futures_util::stream::StreamExt;
use futures_util::StreamExt;
use jsonrpsee_types::error::{
CallError, ErrorCode, ErrorObject, ErrorResponse, CALL_EXECUTION_FAILED_CODE, OVERSIZED_RESPONSE_CODE,
OVERSIZED_RESPONSE_MSG, UNKNOWN_ERROR_CODE,
Expand Down
206 changes: 93 additions & 113 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ use crate::server::helpers::MethodSink;
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use crate::to_json_raw_value;
use crate::traits::{IdProvider, ToRpcParams};
use beef::Cow;
use futures_channel::{mpsc, oneshot};
use futures_util::{future::BoxFuture, FutureExt, StreamExt};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use jsonrpsee_types::error::{invalid_subscription_err, ErrorCode, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse,
Expand All @@ -51,16 +50,35 @@ use serde::{de::DeserializeOwned, Serialize};
/// implemented as a function pointer to a `Fn` function taking four arguments:
/// the `id`, `params`, a channel the function uses to communicate the result (or error)
/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnectionId, &dyn IdProvider) -> bool>;
pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink) -> bool>;
/// Similar to [`SyncMethod`], but represents an asynchronous handler and takes an additional argument containing a [`ResourceGuard`] if configured.
pub type AsyncMethod<'a> = Arc<
dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, Option<ResourceGuard>, &dyn IdProvider) -> BoxFuture<'a, bool>,
dyn Send + Sync + Fn(Id<'a>, Params<'a>, MethodSink, ConnectionId, Option<ResourceGuard>) -> BoxFuture<'a, bool>,
>;
/// Method callback for subscriptions.
pub type SubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, &MethodSink, ConnState) -> bool>;

/// Connection ID, used for stateful protocol such as WebSockets.
/// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value.
pub type ConnectionId = usize;
/// Raw RPC response.
pub type RawRpcResponse = (String, mpsc::UnboundedReceiver<String>, mpsc::UnboundedSender<String>);
pub type RawRpcResponse = (String, mpsc::UnboundedReceiver<String>, async_channel::Sender<()>);

/// Data for stateful connections.
pub struct ConnState<'a> {
/// Connection ID
pub conn_id: ConnectionId,
/// Channel to know whether the connection is closed or not.
pub close: async_channel::Receiver<()>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: this panics if count > usize::MAX / 2

but if we reach that we likely have other problems such as OOM :)

/// ID provider.
pub id_provider: &'a dyn IdProvider,
}

impl<'a> std::fmt::Debug for ConnState<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnState").field("conn_id", &self.conn_id).field("close", &self.close).finish()
}
}

type Subscribers = Arc<Mutex<FxHashMap<SubscriptionKey, (MethodSink, oneshot::Receiver<()>)>>>;

Expand All @@ -73,11 +91,13 @@ struct SubscriptionKey {

/// Callback wrapper that can be either sync or async.
#[derive(Clone)]
enum MethodKind {
pub enum MethodKind {
/// Synchronous method handler.
Sync(SyncMethod),
/// Asynchronous method handler.
Async(AsyncMethod<'static>),
/// Subscription method handler
Subscription(SubscriptionMethod),
}

/// Information about resources the method uses during its execution. Initialized when the the server starts.
Expand Down Expand Up @@ -144,6 +164,13 @@ impl MethodCallback {
MethodCallback { callback: MethodKind::Async(callback), resources: MethodResources::Uninitialized([].into()) }
}

fn new_subscription(callback: SubscriptionMethod) -> Self {
MethodCallback {
callback: MethodKind::Subscription(callback),
resources: MethodResources::Uninitialized([].into()),
}
}

/// Attempt to claim resources prior to executing a method. On success returns a guard that releases
/// claimed resources when dropped.
pub fn claim(&self, name: &str, resources: &Resources) -> Result<ResourceGuard, Error> {
Expand All @@ -153,50 +180,9 @@ impl MethodCallback {
}
}

/// Execute the callback, sending the resulting JSON (success or error) to the specified sink.
pub fn execute(
&self,
sink: &MethodSink,
req: Request<'_>,
conn_id: ConnectionId,
claimed: Option<ResourceGuard>,
id_gen: &dyn IdProvider,
) -> MethodResult<bool> {
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));

let result = match &self.callback {
MethodKind::Sync(callback) => {
tracing::trace!(
"[MethodCallback::execute] Executing sync callback, params={:?}, req.id={:?}, conn_id={:?}",
params,
id,
conn_id
);

let result = (callback)(id, params, sink, conn_id, id_gen);

// Release claimed resources
drop(claimed);

MethodResult::Sync(result)
}
MethodKind::Async(callback) => {
let sink = sink.clone();
let params = params.into_owned();
let id = id.into_owned();
tracing::trace!(
"[MethodCallback::execute] Executing async callback, params={:?}, req.id={:?}, conn_id={:?}",
params,
id,
conn_id
);

MethodResult::Async((callback)(id, params, sink, claimed, id_gen))
}
};

result
/// Get handle to the callback.
pub fn inner(&self) -> &MethodKind {
&self.callback
}
}

Expand All @@ -205,6 +191,7 @@ impl Debug for MethodKind {
match self {
Self::Async(_) => write!(f, "Async"),
Self::Sync(_) => write!(f, "Sync"),
Self::Subscription(_) => write!(f, "Subscription"),
}
}
}
Expand Down Expand Up @@ -306,51 +293,6 @@ impl Methods {
self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
}

/// Attempt to execute a callback, sending the resulting JSON (success or error) to the specified sink.
pub fn execute(
&self,
sink: &MethodSink,
req: Request,
conn_id: ConnectionId,
id_gen: &dyn IdProvider,
) -> MethodResult<bool> {
tracing::trace!("[Methods::execute] Executing request: {:?}", req);
match self.callbacks.get(&*req.method) {
Some(callback) => callback.execute(sink, req, conn_id, None, id_gen),
None => {
sink.send_error(req.id, ErrorCode::MethodNotFound.into());
MethodResult::Sync(false)
}
}
}

/// Attempt to execute a callback while checking that the call does not exhaust the available resources,
// sending the resulting JSON (success or error) to the specified sink.
pub fn execute_with_resources<'r>(
&self,
sink: &MethodSink,
req: Request<'r>,
conn_id: ConnectionId,
resources: &Resources,
id_gen: &dyn IdProvider,
) -> Result<(&'static str, MethodResult<bool>), Cow<'r, str>> {
tracing::trace!("[Methods::execute_with_resources] Executing request: {:?}", req);
match self.callbacks.get_key_value(&*req.method) {
Some((&name, callback)) => match callback.claim(&req.method, resources) {
Ok(guard) => Ok((name, callback.execute(sink, req, conn_id, Some(guard), id_gen))),
Err(err) => {
tracing::error!("[Methods::execute_with_resources] failed to lock resources: {:?}", err);
sink.send_error(req.id, ErrorCode::ServerIsBusy.into());
Ok((name, MethodResult::Sync(false)))
}
},
None => {
sink.send_error(req.id, ErrorCode::MethodNotFound.into());
Err(req.method)
}
}
}

/// Helper to call a method on the `RPC module` without having to spin up a server.
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
Expand Down Expand Up @@ -425,14 +367,24 @@ impl Methods {
/// Wrapper over [`Methods::execute`] to execute a callback.
async fn inner_call(&self, req: Request<'_>) -> RawRpcResponse {
let (tx, mut rx) = mpsc::unbounded();
let sink = MethodSink::new(tx.clone());
let sink = MethodSink::new(tx);
let (close_tx, close_rx) = async_channel::unbounded();

if let MethodResult::Async(fut) = self.execute(&sink, req, 0, &RandomIntegerIdProvider) {
fut.await;
}
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));

let _result = match self.method(&req.method).map(|c| &c.callback) {
None => todo!(),
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
Some(MethodKind::Sync(cb)) => (cb)(id, params, &sink),
Some(MethodKind::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), sink, 0, None).await,
Some(MethodKind::Subscription(cb)) => {
let conn_state = ConnState { conn_id: 0, close: close_rx, id_provider: &RandomIntegerIdProvider };
(cb)(id, params, &sink, conn_state)
}
};

let resp = rx.next().await.expect("tx and rx still alive; qed");
(resp, rx, tx)
(resp, rx, close_tx)
}

/// Helper to create a subscription on the `RPC module` without having to spin up a server.
Expand Down Expand Up @@ -527,7 +479,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let ctx = self.ctx.clone();
let callback = self.methods.verify_and_insert(
method_name,
MethodCallback::new_sync(Arc::new(move |id, params, sink, _, _| match callback(params, &*ctx) {
MethodCallback::new_sync(Arc::new(move |id, params, sink| match callback(params, &*ctx) {
Ok(res) => sink.send_response(id, res),
Err(err) => sink.send_call_error(id, err),
})),
Expand All @@ -550,7 +502,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let ctx = self.ctx.clone();
let callback = self.methods.verify_and_insert(
method_name,
MethodCallback::new_async(Arc::new(move |id, params, sink, claimed, _| {
MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| {
let ctx = ctx.clone();
let future = async move {
let result = match callback(params, ctx).await {
Expand Down Expand Up @@ -585,7 +537,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let ctx = self.ctx.clone();
let callback = self.methods.verify_and_insert(
method_name,
MethodCallback::new_async(Arc::new(move |id, params, sink, claimed, _| {
MethodCallback::new_async(Arc::new(move |id, params, sink, _, claimed| {
let ctx = ctx.clone();

tokio::task::spawn_blocking(move || {
Expand Down Expand Up @@ -671,12 +623,12 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
let subscribers = subscribers.clone();
self.methods.mut_callbacks().insert(
subscribe_method_name,
MethodCallback::new_sync(Arc::new(move |id, params, method_sink, conn_id, id_provider| {
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn| {
let (conn_tx, conn_rx) = oneshot::channel::<()>();

let sub_id = {
let sub_id: RpcSubscriptionId = id_provider.next_id().into_owned();
let uniq_sub = SubscriptionKey { conn_id, sub_id: sub_id.clone() };
let sub_id: RpcSubscriptionId = conn.id_provider.next_id().into_owned();
let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() };

subscribers.lock().insert(uniq_sub, (method_sink.clone(), conn_rx));

Expand All @@ -687,9 +639,10 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

let sink = SubscriptionSink {
inner: method_sink.clone(),
close: conn.close,
method: notif_method_name,
subscribers: subscribers.clone(),
uniq_sub: SubscriptionKey { conn_id, sub_id },
uniq_sub: SubscriptionKey { conn_id: conn.conn_id, sub_id },
is_connected: Some(conn_tx),
};
if let Err(err) = callback(params, sink, ctx.clone()) {
Expand All @@ -710,7 +663,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
{
self.methods.mut_callbacks().insert(
unsubscribe_method_name,
MethodCallback::new_sync(Arc::new(move |id, params, sink, conn_id, _| {
MethodCallback::new_subscription(Arc::new(move |id, params, sink, conn| {
let sub_id = match params.one::<RpcSubscriptionId>() {
Ok(sub_id) => sub_id,
Err(_) => {
Expand All @@ -727,7 +680,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
};
let sub_id = sub_id.into_owned();

if subscribers.lock().remove(&SubscriptionKey { conn_id, sub_id: sub_id.clone() }).is_some() {
if subscribers
.lock()
.remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() })
.is_some()
{
sink.send_response(id, "Unsubscribed")
} else {
let err = to_json_raw_value(&format!(
Expand Down Expand Up @@ -764,6 +721,8 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
pub struct SubscriptionSink {
/// Sink.
inner: MethodSink,
/// Close
close: async_channel::Receiver<()>,
/// MethodCallback.
method: &'static str,
/// Unique subscription.
Expand All @@ -786,9 +745,30 @@ impl SubscriptionSink {
self.inner_send(msg).map_err(Into::into)
}

/// Consume the sink by passing a stream to be sent via the sink.
pub async fn add_stream<S, T>(mut self, mut stream: S)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about calling this into_stream? I think "add" implies there could be more than one and that it doesn't quite relay the information about the important changes that this call makes to the sink.

Copy link
Member Author

@niklasad1 niklasad1 Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like add_stream either, but into_stream is not really great either it doesn't return the stream....

maybe run_stream, from_stream, spawn_stream or something else?!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair; I'd have liked as_stream but as_ is "taken" with different semantics so can't do that.

Of your suggestions I like from_stream the best.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this method consumes a stream, feeding the items into the subscription?

I guess I'd go with something like consume_stream or read_from_stream. into_, as_, and from_ all sortof feel like I should expect some result back from this call to me!

Copy link
Member Author

@niklasad1 niklasad1 Jan 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this method consumes a stream, feeding the items into the subscription?

yes

I guess we could return a type that impls Sink/SinkExt instead here to make it more readable and flexible i.e, to deal with errors and so on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamify()?
I think consume_stream is so-so. Yes, we do consume it, but that's not really the point. Rather we're "hooking up" the stream to the sink and leave it there for the duration of the subscription.
with_stream?

Copy link
Collaborator

@jsdw jsdw Jan 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipe, maybe? we're piping a stream into the subscription.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sink.pipe_from_stream? I quite like pipe!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like pipe_from_stream, let's settle for that?

where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
loop {
tokio::select! {
Some(item) = stream.next() => {
if let Err(Error::SubscriptionClosed(_)) = self.send(&item) {
break;
}
},
// No messages should be sent over this channel (just ignore and continue)
Some(_) = self.close.next() => {},
// Stream or connection was dropped => close stream.
else => break,
}
}
}

/// Returns whether this channel is closed without needing a context.
pub fn is_closed(&self) -> bool {
self.inner.is_closed()
self.inner.is_closed() || self.close.is_closed()
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
}

fn build_message<T: Serialize>(&self, result: &T) -> Result<String, Error> {
Expand All @@ -806,7 +786,7 @@ impl SubscriptionSink {
self.inner.send_raw(msg).map_err(|_| Some(SubscriptionClosedReason::ConnectionReset))
}
Some(_) => Err(Some(SubscriptionClosedReason::Unsubscribed)),
// NOTE(niklasad1): this should be unreachble, after the first error is detected the subscription is closed.
// NOTE(niklasad1): this should be unreachable, after the first error is detected the subscription is closed.
None => Err(None),
};

Expand Down Expand Up @@ -850,15 +830,15 @@ impl Drop for SubscriptionSink {
/// Wrapper struct that maintains a subscription "mainly" for testing.
#[derive(Debug)]
pub struct Subscription {
tx: mpsc::UnboundedSender<String>,
tx: async_channel::Sender<()>,
rx: mpsc::UnboundedReceiver<String>,
sub_id: RpcSubscriptionId<'static>,
}

impl Subscription {
/// Close the subscription channel.
pub fn close(&mut self) {
self.tx.close_channel();
self.tx.close();
}

/// Get the subscription ID
Expand Down
Loading