diff --git a/CHANGELOG.md b/CHANGELOG.md index f4c9339f..6db29d3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,10 @@ - Update `parking_lot` dependency. See [PR 126]. +- Flush socket while waiting for next frame. See [PR 130]. + [PR 126]: https://github.com/libp2p/rust-yamux/pull/126 +[PR 130]: https://github.com/libp2p/rust-yamux/pull/130 # 0.10.0 diff --git a/src/connection.rs b/src/connection.rs index e87a76f2..c902835a 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -408,7 +408,6 @@ impl Connection { if let IoEvent::Inbound(frame) = next_io_event.await? { if let Some(stream) = self.on_frame(frame).await? { - self.flush_nowait().await.or(Err(ConnectionError::Closed))?; return Ok(Some(stream)); } continue; // The socket sink still has a pending write. @@ -432,7 +431,31 @@ impl Connection { num_terminated += 1; Either::Left(future::pending()) } else { - Either::Right(self.socket.next()) + // Poll socket for next incoming frame, but also make sure any pending writes are properly flushed. + let socket = &mut self.socket; + let mut flush_done = false; + let next_frame = future::poll_fn(move |cx| { + if let Poll::Ready(res) = socket.poll_next_unpin(cx) { + return Poll::Ready(res); + } + + // Prevent calling potentially heavy `flush` once it has completed. + if !flush_done { + match socket.poll_flush_unpin(cx) { + Poll::Ready(Ok(_)) => { + flush_done = true; + } + Poll::Ready(Err(err)) => { + return Poll::Ready(Some(Err(err.into()))); + } + Poll::Pending => {} + } + } + + Poll::Pending + }); + + Either::Right(next_frame) }; let mut next_stream_command = if self.stream_receiver.is_terminated() { @@ -476,12 +499,9 @@ impl Connection { if let Poll::Ready(frame) = frame { if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into)).await? { - self.flush_nowait().await.or(Err(ConnectionError::Closed))?; return Ok(Some(stream)); } } - - self.flush_nowait().await.or(Err(ConnectionError::Closed))?; } } @@ -962,15 +982,6 @@ impl Connection { } } - /// Try to flush the underlying I/O stream, without waiting for it. - async fn flush_nowait(&mut self) -> Result<()> { - future::poll_fn(|cx| { - let _ = self.socket.get_mut().poll_flush_unpin(cx)?; - Poll::Ready(Ok(())) - }) - .await - } - /// Remove stale streams and send necessary messages to the remote. /// /// If we ever get async destructors we can replace this with streams