-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(websocket client): drop subscriptions that can't keep up with the internal buffer size #166
Conversation
This commit changes the behavior in the `WebSocket Client` where each subscription channel is used in a non-blocking matter until it is determined as full or disconnected. When that occurs the channel is simply dropped and when the user `poll` the subscription it will return all sent subscriptions before it was and terminate (return None) once it's polled one last time. Similarly as `Streams` works in Rust. It also adds configuration for the `WebSocket Client` to configure capacity for the different internal channels to avoid filling the buffers when it's not expected.
Either::Right(Err(e)) => { | ||
// TODO: https://github.com/paritytech/jsonrpsee/issues/67 | ||
log::error!("Client Error: {:?}", e); | ||
log::error!("Client Error: {:?} terminating connection", e); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated, but it closes the event loop once an error is received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the sending ends of the channels going to shut down gracefully when we drop the receivers here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically, the senders are not gracefully terminated but once the receiver
is dropped here all senders will receive None -> Error::Internal once any of the operations (method call, notification, subscription) is invoked.
Thus, not possible for the user to know the exact failure reason without another channel or message (but we have logs lol)
EDIT:
I can revert this change if you want and handle it in a separate PR, I think it would make sense to incorporate this with https://github.com/paritytech/jsonrpsee/pull/134/files and fix better error messages for it.
Then add some similar tests that we have for the HTTP
such as an invalid request ID and so on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Up to you; it's fine to keep here too.
Make the repo structure more understable w.r.t testing.
…ch/jsonrpsee into v2-ws-client-drop-subscriptions
Ensure that if more than the requested channel buffer capacity is exceeded it should not deadlock. Such as spawning alot of concurrent requests, notifications or new subscriptions.
/// | ||
/// Ignores any malformed packet. | ||
pub async fn next(&mut self) -> Notif { | ||
pub async fn next(&mut self) -> Option<Notif> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the major change in this PR.
/// Configuration. | ||
pub struct Config { | ||
/// Backend channel for serving requests and notifications. | ||
pub request_channel_capacity: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this becomes useless in practice because we clone the sender every time, see PR description for more info.
Either::Right(Err(e)) => { | ||
// TODO: https://github.com/paritytech/jsonrpsee/issues/67 | ||
log::error!("Client Error: {:?}", e); | ||
log::error!("Client Error: {:?} terminating connection", e); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Up to you; it's fine to keep here too.
@@ -0,0 +1 @@ | |||
#![cfg(test)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there code missing here perhaps? There was a test removed in ws/tests.rs
– was it meant to be moved here? Or is it all in integration_tests
?
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment on why these tests are removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, some of the tests here were not really "unit tests" so I moved them to integration tests
instead. Thus, needed both the "non-mocked" server and client.
let notifs_tx = match active_subscriptions.get_mut(&request_id) { | ||
None => { | ||
log::debug!("Invalid subscription response: {:?}", request_id); | ||
continue; | ||
} | ||
Some((notifs_tx, _)) => notifs_tx, | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few questions and nitpicks, but overall lgtm. :)
Co-authored-by: David <dvdplm@gmail.com>
* update http example * ungeneric crate * update dependencies * [client]: add WebSocket client again. * [deps]: remove needless dev dependencies * nits: forgot to commit new files * [ws client]: `send_text` instead of `send_binary` This is temporary fix to work with the `server` which assumes that `WebSocket` resonses are `text` * chore: add a bunch of more logging * [ws]: port tests but some are failing. * chore: fmt * [server API]: expose `fn local_addr` as public API. * [tests]: make them less ugly by using `127.0.0.1:0` * fix: a bunch of compiler warnings. * [api]: uniform naming, `bind -> new` in transport. * [websocket server]: reply when deserial fails When the server receives an request with invalid JSON `-32700, Parse error shall be returned` * chore: fmt * [tests]: fix remaining tests * [websocket server]: support `binary` and `text` * [ws server]: fix bug in subscription response. `.await` was missing in RegisteredSubscription::send() and no responses were actually sent which this commit fixes. * [client API]: export `WsSubscription` * [examples]: use `localhost` instead of `127.0.0.1` Hostname is required when using `wss` and `127.0.0.1` is not valid hostname. * [examples]: add subscription example. * chore: fmt * [ws server]: fix bug register new subscription. Fixes newly introduced bug that causes `register_subscription` to have side-effects even if the subscription fails. * fmt * more uniform logs * [ws server]: simple subscription test. * [ws server]: subscription tests improved. * [tests]: extract test helpers to a separate crate (#125) * [ws server]: don't close connection when `deserialization` fails (#131) * [ws server]: don't close conn. when `deser` fails * Update src/ws/transport.rs * grumbles: prefer matching of if else. * chore: CI warn `intra_doc_link_resolution_failure` (#139) Since we have not updated the documentation properly it's annoying that the entire job fails. * chore: rustfmt.toml (#138) * chore: add `rustfmt.toml` for formatting * style: `cargo fmt --all` with new config * [server raw params]: fix debug implementation (#137) * [server]: simply raw params impl Use debug implementation of `common::Params` instead of doing something similar that doesn't work properly. * [raw params]: derive `Debug` impl. * [ws server]: parse subscription ID for unsubscription instead of hardcoding `JsonValue::Null` (#136) * [ws server]: fix broken unsubscribe. Try to parse the subscription ID as the first element of an Array or the `subscription` field of an Object/Map. If both of those fails then regard it as a error. * fmt * fix grumbles: remove space indentation * fix(ws server): sub/unsubscribe to same method should generate an error (#140) * fix(ws server): sub/unsubscribe to same method err Subscribe and unsubscribe to the same method should generate an error, which this commit fixed. This bug was introduced by myself in fc87889 * Update src/ws/server.rs Co-authored-by: David <dvdplm@gmail.com> Co-authored-by: David <dvdplm@gmail.com> * chore: add naive benches for request/response (#142) Co-authored-by: Niklas <niklasadolfsson1@gmail.com> * fix(ws server): remove faulty debug_assert (#145) The code assumed that `subscription id` is still in `active_subscriptions` when the connection was dropped. The list of subscriptions (kept in raw server) are not notified when a client dropped its subscription/unsubscribed thus it's possible that the actual subscriptions are closed before the entire client was dropped. * ci(benches): cargo check on benches. (#146) * fix(http client): implement `clone` uniform API. (#147) * chore(deps): update `futures v0.3.7` (#148) * chore(deps): update remaining crates (#149) * chore(deps): update `futures v0.3.7` * chore(deps): bump the rest of deps * Improve HTTP client background thread (#150) * refactor: resultify API + some crate reorg (#144) * [ws client]: resultify API and fix subscribe. * The commit changes the API to return `Err` when it's possible and to not ignore underlying errors. * Fix that `fn subscribe` doesn't accept the subscription and unsubscription to be same which causes errors in the server. * nits: Err::SubscriptionMetod -> Err::Subscription * refactor(client): common error type * refactor(http client): resultify * refactor(common): rename common -> types.. This commit renames the `common module` to `types` and tries to distinguish the types that is directly related to the `JSON-RPC v2 specification` from others. Somethings are a little big sloppy named as naming is hard. Also, as bonus a removed a bunch of needless stuff in http server related to subscription. * Update src/ws/tests.rs * style: cargo fmt * fix(grumble): matches -> assert(matches) * fix(grumbles): `jsonrpc_v2` -> `jsonrpc` * fix(nit): remove unused code. * fix(benches): make it compile again. * style: cargo fmt * fix nits (#151) * fix(ws client): send binary (1 byte less payload) * docs(ws server): fix bad comment. * chore: add `editorconfig` (#152) * chore: make `debug log` less verbose. (#153) * chore: make `debug log` less verbose. The debug logging was just too verbose and this commit simplies it as follows: ``` DEBUG recv: {"jsonrpc":"2.0","method":"<METHOD>","params":<PARAMS>,"id":<ID>} DEBUG send: {"jsonrpc":"2.0","result":"<RESULT>","id":<ID>} ``` * style: cargo fmt * fix: missed logs * [jsonrpc types]: implement Display for Request/Response (#160) * feat(jsonrpc response/request): impl `Display` * refactor(logging): use display impl * use serde_json for verbosity * [http client]: refactor with "syncronous-like" design (#156) * experimental * ci(benches): sync and concurrent roundtrips Improve benchmarks to take concurrent requests into account. * ci(benches): sync and concurrent roundtrips Improve benchmarks to take concurrent requests into account. * fix(nits) * feat(http client): limit max request body size * test(http transport): request limit test * test(http client): add tests. * fix typo * fix(benches): make it compile again. * fix(ws example): revert unintentional change. * test(http client): subscription response on call. * fix(cleanup) * fix(benches): make it compile again. * Update src/client/http/transport.rs * fix(http client): `&str` -> `AsRef<str>` * docs(client types): better docs for Mismatch type. * style: `Default::default` -> `HttpConfig::default` * fix(http client): read body size from header. Expermential to read number of bytes from `HTTP Content Length` to pre-allocate the number of bytes and bail early if the length is bigger than the `max_request_body size` Need to be benched with bigger requests. * test(raw http): enable tests to works again. * style: cargo fmt * benches: address grumbles * feat(jsonrpc response/request): impl `Display` * refactor(logging): use display impl * fix(http client): nits. * Update benches/benches.rs * fix bad merge. * chore(deps): update dependencies. (#164) * feat(http server): configurable request body limit (#162) * feat(http server): configurable request body limit * refactor(crate reorg): to have shared http helpers. * Merge client and server errors. * Move `http_server_utils` to `utils/http` * Minor cleanup * fix nits * fix(hyper helper): u64 -> u32 * Update src/utils/http/hyper_helpers.rs Co-authored-by: David <dvdplm@gmail.com> * Update src/utils/http/hyper_helpers.rs Co-authored-by: David <dvdplm@gmail.com> * fix: grumbles * Update src/utils/http/hyper_helpers.rs Co-authored-by: David <dvdplm@gmail.com> * Update src/http/server.rs Co-authored-by: David <dvdplm@gmail.com> Co-authored-by: David <dvdplm@gmail.com> * ci: remove nightly (#167) Use stabilized `broken_intra_doc_links` instead of `intra_doc_link_resolution_failure` * fix(websocket client): drop subscriptions that can't keep up with the internal buffer size (#166) * fix(ws client): drop subscriptions when full. This commit changes the behavior in the `WebSocket Client` where each subscription channel is used in a non-blocking matter until it is determined as full or disconnected. When that occurs the channel is simply dropped and when the user `poll` the subscription it will return all sent subscriptions before it was and terminate (return None) once it's polled one last time. Similarly as `Streams` works in Rust. It also adds configuration for the `WebSocket Client` to configure capacity for the different internal channels to avoid filling the buffers when it's not expected. * tests(ws client): simple subscription test. * fix: nits * Update src/client/ws/client.rs * refactor(tests): introduce integration_tests Make the repo structure more understable w.r.t testing. * chore(license): add missing license headers * Update src/client/ws/client.rs * Update src/client/ws/client.rs * style: remove unintended spaces. * tests: add concurrent deadlock test Ensure that if more than the requested channel buffer capacity is exceeded it should not deadlock. Such as spawning alot of concurrent requests, notifications or new subscriptions. * Update src/client/ws/client.rs * fix: review grumbles * fix nits: `remove needless closure` * fix: cargo fmt * Update src/client/ws/client.rs Co-authored-by: David <dvdplm@gmail.com> * fix more nits Co-authored-by: David <dvdplm@gmail.com> * fix(ws client): embed request id in `SubscriptionClosed` (#170) * fix(ws client): embed request id SubscriptClosed Fixes #169 * Update src/client/ws/client.rs * Update src/client/ws/client.rs Co-authored-by: David <dvdplm@gmail.com> * Update src/client/ws/client.rs Co-authored-by: David <dvdplm@gmail.com> Co-authored-by: David <dvdplm@gmail.com> * chore(deps): bump dependencies (#172) * [ws client]: add tests (#134) * [test utils]: add `internal_err` and consts [errors]: unify client/server errors [test utils]: fake WebSocket jsonrpc server [ws client]: export errors [ws client]: add some basic tests * fmt * remove log target * fix nits * [ws client]: add subscription test * revert unintendend changes. * fmt * [ws client]: fix panic in tests * cleanup * tests(ws client): test for invalid request ID. * fix nits * [ws client]: kill raw client (#171) * getting started * WIP WIP * cleanup * cleanup v2 * cleanup v3 * perf: use BufReader BufWriter * fix(request manager): resultify insert API The rationale behind this change is that the `insert_methods` takes ownership of the `send_back_oneshot` and if the operation fails it should be propagated the frontend. So returning the `Err(send_back_oneshot)` if it fails makes it possible. * fix nits * examples(ws): revert changes * Update tests/integration_tests.rs * nits: fix unwraps * Update src/client/ws/manager.rs Co-authored-by: David <dvdplm@gmail.com> * Update src/client/ws/transport.rs Co-authored-by: David <dvdplm@gmail.com> * Update src/client/ws/client.rs Co-authored-by: David <dvdplm@gmail.com> * fix build * refactor: simplify `Error::InvalidRequestId` It was hard to use when the expected id is not known. * fix(ws client): error handling. * fix(grumble error type): better error message. * fix(grumble): docs `JSONRPC WebSocket transport` * fix(ws manager): fix grumbles. * Add better documentation * Rename methods. * Add `proof` to unreachable! * fix(ws manager): fix nit in docs. * fix(grumbles): ws client * fix more nits * fix compile warning: export websocket transports. * Update src/client/ws/manager.rs Co-authored-by: David <dvdplm@gmail.com> * deps: tokio 1.0 and hyper 0.14 (#176) * deps: tokio 1.0 and hyper 0.14 * Update Cargo.toml * refactor: crate re-organization with separate crates (#177) * [ci]: feature `http` and `ws` removed. * refactor: re-org crate with smaller crates. * fmt * [ci]: remove default features Currently there are no features in the crates, so that check is not needed. * [http client]: remove unused dependency tokio * docs(http client): fix nits tokio 0.2 -> tokio 1.0 (#178) * docs(http client): tokio 0.2 -> tokio 1.0 * fix: better link * [ci]: github actions (#179) * docs(http client): tokio 0.2 -> tokio 1.0 * [ci]: remove travis * [ci]: add github actions. * [ci]: fix identation nits * [ci]: use cache for actions * [ci]: filter to clippy * [ci]: remove hacks * separate action for ci and benchmarks * [ci]: tweak to run on master branch. * examples/subscription -> examples/ws_subscription * force CI Co-authored-by: David <dvdplm@gmail.com> Co-authored-by: Maciej Hirsz <1096222+maciejhirsz@users.noreply.github.com> Co-authored-by: Atkins <atkinschang@gmail.com>
Closes #143
The major change in this PR it's using Sender::try_send for the internal message passing channel for each subscription within the client which will drop the channel if the buffer gets full.
Thus, I had to change the API a little bit for
Subscriptions
to behave asStreams
in Rust that now returnsNone
if the channel is full or is disconnected instead of waiting for successful responses.The capacity for the internal buffers are now user-configurable
NOTE
This PR doesn't use Sender::try_send for the tasks that depend on remote responses (method calls, notifications, and register "new" subscriptions). That's pointless because the sender is "cloned" because it takes
immutable ref
, (this we could maybe change to &mut self). ThusSender::clone
increases the buffer size by one and it may diverge significantly if the many requests, notifications are performed "concurrently".TLDR; It shouldn't deadlock for those but it might allow more
buffer size
than "configured"