Skip to content

Commit

Permalink
server: graceful shutdown check Incoming::Closed (#1216)
Browse files Browse the repository at this point in the history
* server: graceful shutdown check `Incoming::Closed`

* Update server/src/transport/ws.rs
  • Loading branch information
niklasad1 committed Oct 24, 2023
1 parent 83fd6b1 commit 5ef8a87
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions server/src/transport/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use jsonrpsee_types::error::{
use jsonrpsee_types::{ErrorObject, Id, InvalidRequest, Notification, Params, Request};
use soketto::connection::Error as SokettoError;
use soketto::data::ByteSlice125;
use soketto::Incoming;

use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
Expand Down Expand Up @@ -535,10 +536,13 @@ async fn graceful_shutdown(
//
// The receiver is not cancel-safe such that it's used in a stream to enforce that.
let disconnect_stream = futures_util::stream::unfold((receiver, data), |(mut receiver, mut data)| async {
if let Err(SokettoError::Closed) = receiver.receive(&mut data).await {
None
} else {
Some(((), (receiver, data)))
match receiver.receive(&mut data).await {
Ok(Incoming::Closed(_)) | Err(SokettoError::Closed) => None,
Ok(Incoming::Data(_) | Incoming::Pong(_)) => Some(((), (receiver, data))),
Err(e) => {
tracing::warn!("Graceful shutdown got WebSocket error: {e} but polling until the connection is closed or all pending calls has been executed");
Some(((), (receiver, data)))
}
}
});

Expand Down

0 comments on commit 5ef8a87

Please sign in to comment.