Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return error from subscription callbacks #799

Merged
merged 72 commits into from
Jun 29, 2022
Merged

Conversation

lexnv
Copy link
Contributor

@lexnv lexnv commented Jun 20, 2022

Subscription callbacks are now returning errors for improved ergonimics.

The jsonrpsee proc-macro renders the provided subscription method with
an additional return type.

#[rpc(client, server, namespace = "foo")]
pub trait Rpc {
	#[subscription(name = "subscribe", item = String)]
	fn sub(&self);
}


#[async_trait]
impl RpcServer for RpcServerImpl {
	fn sub(&self, pending: PendingSubscription) -> ReturnTypeSubscription {
		Ok(())
	}

While at it, implement pipe_from_try_stream on the PendingSubscription for better ergonomics,
which wraps the pending.accept() and pipes from the provided stream.

Before

.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| {
    let idx = match params.one::<usize>() {
        Ok(idx) => idx,
        _ => {
            pending.reject("some special error);
            return;
        }
    };
    tokio::spawn(async move {
        if let Some(sink) = pending.accept() {
             sink.pipe_from_try_stream(stream).await;
             // ...
        }
    });
    Ok(())
}

After

.register_subscription("sub_one_param", "sub_one_param", "unsub_one_param", |params, pending, _| {
    let idx = match params.one::<usize>()?;

    tokio::spawn(async move {
        pending.pipe_from_try_stream(stream).await;
        // ...
    });
    Ok(())
}

Closes #734, #735.

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
…usion

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
@lexnv lexnv requested a review from a team as a code owner June 20, 2022 17:14
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
@lexnv lexnv self-assigned this Jun 20, 2022
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
…ubscription`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
types/src/error.rs Outdated Show resolved Hide resolved
niklasad1
niklasad1 previously approved these changes Jun 21, 2022
Copy link
Member

@niklasad1 niklasad1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice work

core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
@jsdw
Copy link
Collaborator

jsdw commented Jun 28, 2022

Looks good; the unified sink stuff def feels nicer API wise!

My only real blocker is that I don't think we should have accept() and maybe_accept() (the difference is confusing and one call can express enough info to use in both places). The other comment was an internal API nit so doesn't matter so much.

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Copy link
Contributor

@Xanewok Xanewok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks like a great ergonomics boost, nice! Left some drive-by notes, don't want to comment on the internals yet.

@@ -156,9 +156,10 @@ impl MethodSink {

if let Err(err) = self.send_raw(json) {
tracing::warn!("Error sending response {:?}", err);
false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a simple two-variant enum could help understand what the bool returned from send_error specifically means

Copy link
Member

@niklasad1 niklasad1 Jun 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree I think it was designed like that to be easy convert to "success/or not" in the middleware but it's unrelated this PR.

Let's tackle it another PR.

core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
Copy link
Member

@niklasad1 niklasad1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, some minor comments that are optional to fix.

Would be nice to have some abstraction that decouples the state of the SubscriptionSink which includes inner method sink, unsubscribe future and close_notify somehow.

I have been thinking that we could just use tokio::sync::mpsc::unbounded_channel instead futures::mpsc::unbounded_channel and tokio::sync::Notify as the sender in tokio has async fn closed which returns once the receiver has been dropped which would cancel each subscription on that connection.

That would simplify the SubscriptionSink slighty :P

lexnv and others added 5 commits June 29, 2022 17:02
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
core/src/server/rpc_module.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@jsdw jsdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, well done!

@lexnv lexnv merged commit 98c23fc into master Jun 29, 2022
@lexnv lexnv deleted the 734_error_subscriptions branch June 29, 2022 16:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[rpc module]: return error from subscriptions to make error handling more ergonomic
4 participants