Skip to content

Commit

Permalink
fix(webocket): Avoid panic when polling quicksink after errors
Browse files Browse the repository at this point in the history
Pull-Request: libp2p#5482.
  • Loading branch information
lexnv authored Jul 9, 2024
1 parent 02fa7c8 commit c19c140
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 27 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ libp2p-upnp = { version = "0.2.2", path = "protocols/upnp" }
libp2p-webrtc = { version = "0.7.1-alpha", path = "transports/webrtc" }
libp2p-webrtc-utils = { version = "0.2.1", path = "misc/webrtc-utils" }
libp2p-webrtc-websys = { version = "0.3.0-alpha", path = "transports/webrtc-websys" }
libp2p-websocket = { version = "0.43.1", path = "transports/websocket" }
libp2p-websocket = { version = "0.43.2", path = "transports/websocket" }
libp2p-websocket-websys = { version = "0.3.2", path = "transports/websocket-websys" }
libp2p-webtransport-websys = { version = "0.3.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.45.2", path = "muxers/yamux" }
Expand Down
6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.43.2

- fix: Avoid websocket panic on polling after errors. See [PR 5482].

[PR 5482]: https://github.com/libp2p/rust-libp2p/pull/5482

## 0.43.1

## 0.43.0
Expand Down
3 changes: 2 additions & 1 deletion transports/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-websocket"
edition = "2021"
rust-version = { workspace = true }
description = "WebSocket transport for libp2p"
version = "0.43.1"
version = "0.43.2"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -21,6 +21,7 @@ pin-project-lite = "0.2.14"
rw-stream-sink = { workspace = true }
soketto = "0.8.0"
tracing = { workspace = true }
thiserror = "1.0.61"
url = "2.5"
webpki-roots = "0.25"

Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = quicksink::Error<connection::Error>> + Send>>,
_marker: std::marker::PhantomData<T>,
}

Expand Down
72 changes: 49 additions & 23 deletions transports/websocket/src/quicksink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@
// Ok::<_, io::Error>(stdout)
// });
// ```
//
// # Panics
//
// - If any of the [`Sink`] methods produce an error, the sink transitions
// to a failure state and none of its methods must be called afterwards or
// else a panic will occur.
// - If [`Sink::poll_close`] has been called, no other sink method must be
// called afterwards or else a panic will be caused.

use futures::{ready, sink::Sink};
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -102,6 +94,15 @@ enum State {
Failed,
}

/// Errors the `Sink` may return.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error<E> {
#[error("Error while sending over the sink, {0}")]
Send(E),
#[error("The Sink has closed")]
Closed,
}

pin_project! {
/// `SinkImpl` implements the `Sink` trait.
#[derive(Debug)]
Expand All @@ -119,7 +120,7 @@ where
F: FnMut(S, Action<A>) -> T,
T: Future<Output = Result<S, E>>,
{
type Error = E;
type Error = Error<E>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Expand All @@ -135,28 +136,27 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
}
}
State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
Ok(_) => {
this.future.set(None);
*this.state = State::Closed;
panic!("SinkImpl::poll_ready called on a closing sink.")
Poll::Ready(Err(Error::Closed))
}
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
},
State::Empty => {
assert!(this.param.is_some());
Poll::Ready(Ok(()))
}
State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
State::Failed => panic!("SinkImpl::poll_ready called after error."),
State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)),
}
}

Expand Down Expand Up @@ -193,7 +193,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -207,7 +207,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -221,11 +221,10 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_flush called after error."),
State::Closed | State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -253,7 +252,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -266,7 +265,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -280,11 +279,11 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_closed called after error."),
State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -347,4 +346,31 @@ mod tests {
assert_eq!(&expected[..], &actual[..])
});
}

#[test]
fn error_does_not_panic() {
task::block_on(async {
let sink = make_sink(io::stdout(), |mut _stdout, _action| async move {
Err(io::Error::new(io::ErrorKind::Other, "oh no"))
});

futures::pin_mut!(sink);

let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Send(e)) => {
assert_eq!(e.kind(), io::ErrorKind::Other);
assert_eq!(e.to_string(), "oh no")
}
_ => panic!("unexpected result: {:?}", result),
};

// Call send again, expect not to panic.
let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Closed) => {}
_ => panic!("unexpected result: {:?}", result),
};
})
}
}

0 comments on commit c19c140

Please sign in to comment.