Skip to content

Commit

Permalink
src/connection: Flush socket while waiting for next frame
Browse files Browse the repository at this point in the history
In #112, flushing is no longer awaited, but instead, is called at each iteration
of the next loop in Connection. Unfortunately, when using wasm-ext with Noise,
frames may never get written until the next iteration is triggered by another
event (incoming frame or stream/control events).

The fix here is to make sure that flushing the I/O socket gets progressed in the
main loop.
  • Loading branch information
appaquet authored Feb 25, 2022
1 parent 25e7d27 commit 0580101
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

- Update `parking_lot` dependency. See [PR 126].

- Flush socket while waiting for next frame. See [PR 130].

[PR 126]: https://github.com/libp2p/rust-yamux/pull/126
[PR 130]: https://github.com/libp2p/rust-yamux/pull/130

# 0.10.0

Expand Down
39 changes: 25 additions & 14 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {

if let IoEvent::Inbound(frame) = next_io_event.await? {
if let Some(stream) = self.on_frame(frame).await? {
self.flush_nowait().await.or(Err(ConnectionError::Closed))?;
return Ok(Some(stream));
}
continue; // The socket sink still has a pending write.
Expand All @@ -432,7 +431,31 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
num_terminated += 1;
Either::Left(future::pending())
} else {
Either::Right(self.socket.next())
// Poll socket for next incoming frame, but also make sure any pending writes are properly flushed.
let socket = &mut self.socket;
let mut flush_done = false;
let next_frame = future::poll_fn(move |cx| {
if let Poll::Ready(res) = socket.poll_next_unpin(cx) {
return Poll::Ready(res);
}

// Prevent calling potentially heavy `flush` once it has completed.
if !flush_done {
match socket.poll_flush_unpin(cx) {
Poll::Ready(Ok(_)) => {
flush_done = true;
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Some(Err(err.into())));
}
Poll::Pending => {}
}
}

Poll::Pending
});

Either::Right(next_frame)
};

let mut next_stream_command = if self.stream_receiver.is_terminated() {
Expand Down Expand Up @@ -476,12 +499,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {

if let Poll::Ready(frame) = frame {
if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into)).await? {
self.flush_nowait().await.or(Err(ConnectionError::Closed))?;
return Ok(Some(stream));
}
}

self.flush_nowait().await.or(Err(ConnectionError::Closed))?;
}
}

Expand Down Expand Up @@ -962,15 +982,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
}
}

/// Try to flush the underlying I/O stream, without waiting for it.
async fn flush_nowait(&mut self) -> Result<()> {
future::poll_fn(|cx| {
let _ = self.socket.get_mut().poll_flush_unpin(cx)?;
Poll::Ready(Ok(()))
})
.await
}

/// Remove stale streams and send necessary messages to the remote.
///
/// If we ever get async destructors we can replace this with streams
Expand Down

0 comments on commit 0580101

Please sign in to comment.