Skip to content

Commit

Permalink
fix(netbench): correctly handle stream shutdown in duplex (#1898)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Aug 8, 2023
1 parent 1ff2cd2 commit bddeb95
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions netbench/netbench/src/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ impl<T: AsyncRead + AsyncWrite> super::Connection for Connection<T> {
Poll::Ready(result) => {
sent += result? as u64;
}
Poll::Pending if sent == 0 => {
return Poll::Pending;
}
Poll::Pending => {
break;
}
}
}

if sent == 0 {
return Poll::Pending;
}

// if the whole buffer was accepted, make sure it's flushed to the socket
if sent == bytes {
if let Poll::Ready(res) = self.inner.as_mut().poll_flush(cx) {
res?;
Expand All @@ -125,22 +125,31 @@ impl<T: AsyncRead + AsyncWrite> super::Connection for Connection<T> {

match self.inner.as_mut().poll_read(cx, &mut buf) {
Poll::Ready(_) => {
if buf.filled().is_empty() && self.stream_opened {
// we got at least one byte back so loop around and try to get some more
if !buf.filled().is_empty() {
received += buf.filled().len() as u64;
continue;
}

// when we get 0 bytes, it means we don't have any more data so close the
// stream
if self.stream_opened {
self.close_stream()?;
return Ok(0).into();
}
received += buf.filled().len() as u64;

break;
}
// we didn't get any data on any iterations so we're pending
Poll::Pending if received == 0 => {
return Poll::Pending;
}
// we got at least one byte previously so return that
Poll::Pending => {
break;
}
}
}

if received == 0 {
return Poll::Pending;
}

Ok(received).into()
}

Expand Down

0 comments on commit bddeb95

Please sign in to comment.