From 570d9d3eb42f524bb9e4dfb88d4f619ba4cc5798 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 21 Feb 2023 15:19:22 +1300 Subject: [PATCH 01/33] Add test for ACK backlog --- test-harness/tests/ack_backlog.rs | 256 ++++++++++++++++++++++++++++++ test-harness/tests/poll_api.rs | 9 +- 2 files changed, 262 insertions(+), 3 deletions(-) create mode 100644 test-harness/tests/ack_backlog.rs diff --git a/test-harness/tests/ack_backlog.rs b/test-harness/tests/ack_backlog.rs new file mode 100644 index 00000000..2f5dd931 --- /dev/null +++ b/test-harness/tests/ack_backlog.rs @@ -0,0 +1,256 @@ +use futures::channel::oneshot; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use futures::stream::FuturesUnordered; +use futures::{future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt}; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; +use test_harness::bind; +use tokio::net::TcpStream; +use tokio_util::compat::TokioAsyncReadCompatExt; +use yamux::{Config, Connection, ConnectionError, Mode, Stream}; + +#[tokio::test] +async fn honours_ack_backlog_of_256() { + let _ = env_logger::try_init(); + + let (tx, rx) = oneshot::channel(); + + let (listener, address) = bind().await.expect("bind"); + + let server = async { + let socket = listener.accept().await.expect("accept").0.compat(); + let connection = Connection::new(socket, Config::default(), Mode::Server); + + Server::new(connection, rx).await + }; + + let client = async { + let socket = TcpStream::connect(address).await.expect("connect").compat(); + let connection = Connection::new(socket, Config::default(), Mode::Client); + + Client::new(connection, tx).await + }; + + let (server_processed, client_processed) = future::try_join(server, client).await.unwrap(); + + assert_eq!(server_processed, 257); + assert_eq!(client_processed, 257); +} + +enum Server { + Idle { + connection: Connection, + trigger: oneshot::Receiver<()>, + }, + Accepting { + connection: Connection, + worker_streams: FuturesUnordered>>, + streams_processed: usize, + }, + Poisoned, +} + +impl Server { + fn new(connection: Connection, trigger: oneshot::Receiver<()>) -> Self { + Server::Idle { + connection, + trigger, + } + } +} + +impl Future for Server +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match mem::replace(this, Server::Poisoned) { + Server::Idle { + mut trigger, + connection, + } => match trigger.poll_unpin(cx) { + Poll::Ready(_) => { + *this = Server::Accepting { + connection, + worker_streams: Default::default(), + streams_processed: 0, + }; + continue; + } + Poll::Pending => { + *this = Server::Idle { + trigger, + connection, + }; + return Poll::Pending; + } + }, + Server::Accepting { + mut connection, + mut streams_processed, + mut worker_streams, + } => { + match connection.poll_next_inbound(cx)? { + Poll::Ready(Some(stream)) => { + worker_streams.push(pong_ping(stream).boxed()); + *this = Server::Accepting { + connection, + streams_processed, + worker_streams, + }; + continue; + } + Poll::Ready(None) => { + return Poll::Ready(Ok(streams_processed)); + } + Poll::Pending => {} + } + + match worker_streams.poll_next_unpin(cx)? 
{ + Poll::Ready(Some(())) => { + streams_processed += 1; + *this = Server::Accepting { + connection, + streams_processed, + worker_streams, + }; + continue; + } + Poll::Ready(None) | Poll::Pending => {} + } + + *this = Server::Accepting { + connection, + streams_processed, + worker_streams, + }; + return Poll::Pending; + } + Server::Poisoned => unreachable!(), + } + } + } +} + +struct Client { + connection: Connection, + worker_streams: FuturesUnordered>>, + trigger: Option>, + streams_processed: usize, +} + +impl Client { + fn new(connection: Connection, trigger: oneshot::Sender<()>) -> Self { + Self { + connection, + trigger: Some(trigger), + worker_streams: FuturesUnordered::default(), + streams_processed: 0, + } + } +} + +impl Future for Client +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + // First, try to open 256 streams + if this.worker_streams.len() < 256 && this.streams_processed == 0 { + match this.connection.poll_new_outbound(cx)? { + Poll::Ready(stream) => { + this.worker_streams.push(ping_pong(stream).boxed()); + continue; + } + Poll::Pending => { + panic!("Should be able to open 256 streams without yielding") + } + } + } + + if this.worker_streams.len() == 256 && this.streams_processed == 0 { + let poll_result = this.connection.poll_new_outbound(cx); + + match (poll_result, this.trigger.take()) { + (Poll::Pending, Some(trigger)) => { + // This is what we want, our task gets parked because we have hit the limit. + // Tell the server to start processing streams and wait until we get woken. + + trigger.send(()).unwrap(); + return Poll::Pending; + } + (Poll::Ready(stream), None) => { + // We got woken because the server has started to acknowledge streams. + this.worker_streams.push(ping_pong(stream.unwrap()).boxed()); + continue; + } + (Poll::Ready(_), Some(_)) => { + panic!("should not be able to open stream if server hasn't acknowledged existing streams") + } + (Poll::Pending, None) => {} + } + } + + match this.worker_streams.poll_next_unpin(cx)? { + Poll::Ready(Some(())) => { + this.streams_processed += 1; + continue; + } + Poll::Ready(None) if this.streams_processed > 0 => { + return Poll::Ready(Ok(this.streams_processed)); + } + Poll::Ready(None) | Poll::Pending => {} + } + + // Allow the connection to make progress + match this.connection.poll_next_inbound(cx)? 
{ + Poll::Ready(Some(_)) => { + panic!("server never opens stream") + } + Poll::Ready(None) => { + return Poll::Ready(Ok(this.streams_processed)); + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} + +async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> { + let mut buffer = [0u8; 4]; + stream.write_all(b"ping").await?; + stream.read_exact(&mut buffer).await?; + + assert_eq!(&buffer, b"pong"); + + stream.close().await?; + + Ok(()) +} + +async fn pong_ping(mut stream: Stream) -> Result<(), ConnectionError> { + let mut buffer = [0u8; 4]; + stream.write_all(b"pong").await?; + stream.read_exact(&mut buffer).await?; + + assert_eq!(&buffer, b"ping"); + + stream.close().await?; + + Ok(()) +} diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 0cbf3a9f..55fb6c4d 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,10 +1,13 @@ -use futures::future::BoxFuture; +use futures::future::{BoxFuture, Either}; use futures::stream::FuturesUnordered; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{ + future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, StreamExt, +}; use quickcheck::QuickCheck; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use test_harness::*; use tokio::net::TcpStream; use tokio::runtime::Runtime; @@ -36,7 +39,7 @@ fn prop_config_send_recv_multi() { }; let (server_processed, client_processed) = - futures::future::try_join(server, client).await.unwrap(); + future::try_join(server, client).await.unwrap(); assert_eq!(server_processed, num_messagses); assert_eq!(client_processed, num_messagses); From 639687ce9a56e3c3d7a75121fb5a06230f901bcb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 21 Feb 2023 15:58:33 +1300 Subject: [PATCH 02/33] Don't allow opening more than 256 without receiving an ACK --- yamux/src/connection.rs | 42 ++++++++++++++++++++++++++++++++--------- yamux/src/lib.rs | 5 +++++ 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index c2c19ea3..12d0adcd 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -91,19 +91,19 @@ mod cleanup; mod closing; mod stream; -use crate::Result; use crate::{ error::ConnectionError, frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID}, frame::{self, Frame}, Config, WindowUpdateMode, DEFAULT_CREDIT, MAX_COMMAND_BACKLOG, }; +use crate::{Result, MAX_ACK_BACKLOG}; use cleanup::Cleanup; use closing::Closing; use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse}; use nohash_hasher::IntMap; -use std::collections::VecDeque; -use std::task::Context; +use std::collections::{HashSet, VecDeque}; +use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; pub use stream::{Packet, State, Stream}; @@ -160,12 +160,16 @@ impl Connection { pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) { - ConnectionState::Active(mut active) => match active.new_outbound() { - Ok(stream) => { + ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) { + Poll::Ready(Ok(stream)) => { self.inner = ConnectionState::Active(active); return Poll::Ready(Ok(stream)); } - Err(e) => { + Poll::Pending => { + self.inner = ConnectionState::Active(active); + return Poll::Pending; 
+ } + Poll::Ready(Err(e)) => { self.inner = ConnectionState::Cleanup(active.cleanup(e)); continue; } @@ -352,6 +356,8 @@ struct Active { stream_receiver: mpsc::Receiver, dropped_streams: Vec, pending_frames: VecDeque>, + pending_acks: HashSet, + new_outbound_stream_waker: Option, } /// `Stream` to `Connection` commands. @@ -424,6 +430,8 @@ impl Active { }, dropped_streams: Vec::new(), pending_frames: VecDeque::default(), + pending_acks: HashSet::default(), + new_outbound_stream_waker: None, } } @@ -490,10 +498,16 @@ impl Active { } } - fn new_outbound(&mut self) -> Result { + fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll> { if self.streams.len() >= self.config.max_num_streams { log::error!("{}: maximum number of streams reached", self.id); - return Err(ConnectionError::TooManyStreams); + return Poll::Ready(Err(ConnectionError::TooManyStreams)); + } + + if self.pending_acks.len() >= MAX_ACK_BACKLOG { + log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, parking task until remote acknowledges at least one stream"); + self.new_outbound_stream_waker = Some(cx.waker().clone()); + return Poll::Pending; } log::trace!("{}: creating new outbound stream", self.id); @@ -522,7 +536,9 @@ impl Active { log::debug!("{}: new outbound {} of {}", self.id, stream, self); self.streams.insert(id, stream.clone()); - Ok(stream) + self.pending_acks.insert(id); + + Poll::Ready(Ok(stream)) } fn on_send_frame(&mut self, frame: Frame>) { @@ -549,6 +565,14 @@ impl Active { /// if one was opened by the remote. fn on_frame(&mut self, frame: Frame<()>) -> Result> { log::trace!("{}: received: {}", self.id, frame.header()); + + if frame.header().flags().contains(header::ACK) { + self.pending_acks.remove(&frame.header().stream_id()); + if let Some(waker) = self.new_outbound_stream_waker.take() { + waker.wake(); + } + } + let action = match frame.header().tag() { Tag::Data => self.on_data(frame.into_data()), Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()), diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index cafbf77b..918f43fd 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -49,6 +49,11 @@ pub type Result = std::result::Result; /// actual upper bound is this value + number of clones. const MAX_COMMAND_BACKLOG: usize = 32; +/// The maximum number of streams we will open without an acknowledgement from the other peer. +/// +/// This enables a very basic form of backpressure. +const MAX_ACK_BACKLOG: usize = 256; + /// Default maximum number of bytes a Yamux data frame might carry as its /// payload when being send. Larger Payloads will be split. 
/// From 4e477e4abc1b7039a04351ec3cbd8f664f61d7eb Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 10 May 2023 14:47:06 +0200 Subject: [PATCH 03/33] Add test for acknowledging streams --- test-harness/tests/ack_timing.rs | 243 +++++++++++++++++++++++++++++++ yamux/src/connection/stream.rs | 8 + 2 files changed, 251 insertions(+) create mode 100644 test-harness/tests/ack_timing.rs diff --git a/test-harness/tests/ack_timing.rs b/test-harness/tests/ack_timing.rs new file mode 100644 index 00000000..6afc8fe3 --- /dev/null +++ b/test-harness/tests/ack_timing.rs @@ -0,0 +1,243 @@ +use futures::future::BoxFuture; +use futures::future::FutureExt; +use futures::{future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; +use test_harness::bind; +use tokio::net::TcpStream; +use tokio_util::compat::TokioAsyncReadCompatExt; +use yamux::{Config, Connection, ConnectionError, Mode, Stream}; + +#[tokio::test] +async fn stream_is_acknowledged_on_first_use() { + let _ = env_logger::try_init(); + + let (listener, address) = bind().await.expect("bind"); + + let server = async { + let socket = listener.accept().await.expect("accept").0.compat(); + let connection = Connection::new(socket, Config::default(), Mode::Server); + + Server::new(connection).await + }; + + let client = async { + let socket = TcpStream::connect(address).await.expect("connect").compat(); + let connection = Connection::new(socket, Config::default(), Mode::Client); + + Client::new(connection).await + }; + + let ((), ()) = future::try_join(server, client).await.unwrap(); +} + +enum Server { + Accepting { + connection: Connection, + }, + Working { + connection: Connection, + stream: BoxFuture<'static, yamux::Result<()>>, + }, + Idle { + connection: Connection, + }, + Poisoned, +} + +impl Server { + fn new(connection: Connection) -> Self { + Server::Accepting { connection } + } +} + +impl Future for Server +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match mem::replace(this, Self::Poisoned) { + Self::Accepting { mut connection } => match connection.poll_next_inbound(cx)? { + Poll::Ready(Some(stream)) => { + *this = Self::Working { + connection, + stream: pong_ping(stream).boxed(), + }; + continue; + } + Poll::Ready(None) => { + panic!("connection closed before receiving a new stream") + } + Poll::Pending => { + *this = Self::Accepting { connection }; + return Poll::Pending; + } + }, + Self::Working { + mut connection, + mut stream, + } => { + match stream.poll_unpin(cx)? { + Poll::Ready(()) => { + *this = Self::Idle { connection }; + continue; + } + Poll::Pending => {} + } + + match connection.poll_next_inbound(cx)? { + Poll::Ready(Some(_)) => { + panic!("not expecting new stream"); + } + Poll::Ready(None) => { + panic!("connection closed before stream completed") + } + Poll::Pending => { + *this = Self::Working { connection, stream }; + return Poll::Pending; + } + } + } + Self::Idle { mut connection } => match connection.poll_next_inbound(cx)? 
{ + Poll::Ready(Some(_)) => { + panic!("not expecting new stream"); + } + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => { + *this = Self::Idle { connection }; + return Poll::Pending; + } + }, + Self::Poisoned => unreachable!(), + } + } + } +} + +enum Client { + Opening { + connection: Connection, + }, + Working { + connection: Connection, + stream: BoxFuture<'static, yamux::Result<()>>, + }, + Poisoned, +} + +impl Client { + fn new(connection: Connection) -> Self { + Self::Opening { connection } + } +} + +impl Future for Client +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match mem::replace(this, Self::Poisoned) { + Self::Opening { mut connection } => match connection.poll_new_outbound(cx)? { + Poll::Ready(stream) => { + *this = Self::Working { + connection, + stream: ping_pong(stream).boxed(), + }; + continue; + } + Poll::Pending => { + *this = Self::Opening { connection }; + return Poll::Pending; + } + }, + Self::Working { + mut connection, + mut stream, + } => { + match stream.poll_unpin(cx)? { + Poll::Ready(()) => { + return Poll::Ready(Ok(())); + } + Poll::Pending => {} + } + + match connection.poll_next_inbound(cx)? { + Poll::Ready(Some(_)) => { + panic!("not expecting new stream"); + } + Poll::Ready(None) => { + panic!("connection closed before stream completed") + } + Poll::Pending => { + *this = Self::Working { connection, stream }; + return Poll::Pending; + } + } + } + Self::Poisoned => unreachable!(), + } + } + } +} + +/// Handler for the **outbound** stream on the client. +/// +/// Initially, the stream is not acknowledged. The server will only acknowledge the stream with the first frame. +async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> { + assert!( + !stream.is_acknowledged(), + "newly returned stream should not be acknowledged" + ); + + let mut buffer = [0u8; 4]; + stream.write_all(b"ping").await?; + stream.read_exact(&mut buffer).await?; + + assert!( + stream.is_acknowledged(), + "stream should be acknowledged once we received the first data" + ); + assert_eq!(&buffer, b"pong"); + + stream.close().await?; + + Ok(()) +} + +/// Handler for the **inbound** stream on the server. +/// +/// Initially, the stream is not acknowledged. We only include the ACK flag in the first frame. +async fn pong_ping(mut stream: Stream) -> Result<(), ConnectionError> { + assert!( + !stream.is_acknowledged(), + "before sending anything we should not have acknowledged the stream to the remote" + ); + + let mut buffer = [0u8; 4]; + stream.write_all(b"pong").await?; + + assert!( + stream.is_acknowledged(), + "we should have sent an ACK flag with the first payload" + ); + + stream.read_exact(&mut buffer).await?; + + assert_eq!(&buffer, b"ping"); + + stream.close().await?; + + Ok(()) +} diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index d7405cd3..ab704010 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -131,6 +131,10 @@ impl Stream { matches!(self.shared().state(), State::Closed) } + pub fn is_acknowledged(&self) -> bool { + self.shared().acknowledged + } + /// Set the flag that should be set on the next outbound frame header. 
pub(crate) fn set_flag(&mut self, flag: Flag) { self.flag = flag @@ -396,6 +400,9 @@ pub(crate) struct Shared { pub(crate) reader: Option, pub(crate) writer: Option, config: Arc, + + /// Whether the stream has been acknowledged by the remote. + acknowledged: bool, } impl Shared { @@ -408,6 +415,7 @@ impl Shared { reader: None, writer: None, config, + acknowledged: false, } } From 0bd7e0e33d5d53e7c4ef7ed588033dde9555fef4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 10 May 2023 15:09:11 +0200 Subject: [PATCH 04/33] Correctly track acknowledged state --- yamux/src/connection.rs | 6 +++++- yamux/src/connection/stream.rs | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 12d0adcd..188341d1 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -567,7 +567,11 @@ impl Active { log::trace!("{}: received: {}", self.id, frame.header()); if frame.header().flags().contains(header::ACK) { - self.pending_acks.remove(&frame.header().stream_id()); + let id = frame.header().stream_id(); + self.pending_acks.remove(&id); + if let Some(stream) = self.streams.get(&id) { + stream.set_acknowledged(); + } if let Some(waker) = self.new_outbound_stream_waker.take() { waker.wake(); } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index ab704010..6513049d 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -8,6 +8,7 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. +use crate::frame::header::ACK; use crate::{ chunks::Chunks, connection::{self, StreamCommand}, @@ -135,6 +136,10 @@ impl Stream { self.shared().acknowledged } + pub(crate) fn set_acknowledged(&self) { + self.shared().acknowledged = true; + } + /// Set the flag that should be set on the next outbound frame header. pub(crate) fn set_flag(&mut self, flag: Flag) { self.flag = flag @@ -355,6 +360,12 @@ impl AsyncWrite for Stream { let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left(); self.add_flag(frame.header_mut()); log::trace!("{}/{}: write {} bytes", self.conn, self.id, n); + + // technically, the frame hasn't been sent yet on the wire but from the perspective of this data structure, we've queued the frame for sending + if frame.header().flags().contains(ACK) { + self.set_acknowledged(); + } + let cmd = StreamCommand::SendFrame(frame); self.sender .start_send(cmd) From 7c8f2f89f51dcc01d468cf6a03d9d1d2ac6b7406 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 18:35:31 +1000 Subject: [PATCH 05/33] Update yamux/src/lib.rs Co-authored-by: Max Inden --- yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index 918f43fd..ac90b360 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -51,7 +51,7 @@ const MAX_COMMAND_BACKLOG: usize = 32; /// The maximum number of streams we will open without an acknowledgement from the other peer. /// -/// This enables a very basic form of backpressure. +/// This enables a very basic form of backpressure on the creation of streams. 
const MAX_ACK_BACKLOG: usize = 256; /// Default maximum number of bytes a Yamux data frame might carry as its From 8f6748201083bec21ff19183b75558fc69a0d5f6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 May 2023 18:39:03 +1000 Subject: [PATCH 06/33] Update yamux/src/connection.rs Co-authored-by: Max Inden --- yamux/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 188341d1..8bbdea80 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -505,7 +505,7 @@ impl Active { } if self.pending_acks.len() >= MAX_ACK_BACKLOG { - log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, parking task until remote acknowledges at least one stream"); + log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream"); self.new_outbound_stream_waker = Some(cx.waker().clone()); return Poll::Pending; } From 2d92a773d8a0ca1f963016c5816b6875136c2733 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 May 2023 23:14:04 +0200 Subject: [PATCH 07/33] Don't duplicate data --- test-harness/tests/poll_api.rs | 7 ++----- yamux/src/connection.rs | 17 ++++++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 55fb6c4d..159c156c 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,13 +1,10 @@ -use futures::future::{BoxFuture, Either}; +use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{ - future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, StreamExt, -}; +use futures::{future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; use test_harness::*; use tokio::net::TcpStream; use tokio::runtime::Runtime; diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 188341d1..fa2958d4 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -102,7 +102,7 @@ use cleanup::Cleanup; use closing::Closing; use futures::{channel::mpsc, future::Either, prelude::*, sink::SinkExt, stream::Fuse}; use nohash_hasher::IntMap; -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; @@ -356,7 +356,6 @@ struct Active { stream_receiver: mpsc::Receiver, dropped_streams: Vec, pending_frames: VecDeque>, - pending_acks: HashSet, new_outbound_stream_waker: Option, } @@ -430,7 +429,6 @@ impl Active { }, dropped_streams: Vec::new(), pending_frames: VecDeque::default(), - pending_acks: HashSet::default(), new_outbound_stream_waker: None, } } @@ -504,7 +502,7 @@ impl Active { return Poll::Ready(Err(ConnectionError::TooManyStreams)); } - if self.pending_acks.len() >= MAX_ACK_BACKLOG { + if self.ack_backlog() >= MAX_ACK_BACKLOG { log::debug!("{MAX_ACK_BACKLOG} streams waiting for ACK, parking task until remote acknowledges at least one stream"); self.new_outbound_stream_waker = Some(cx.waker().clone()); return Poll::Pending; @@ -536,8 +534,6 @@ impl Active { log::debug!("{}: new outbound {} of {}", self.id, stream, self); self.streams.insert(id, stream.clone()); - self.pending_acks.insert(id); - Poll::Ready(Ok(stream)) } @@ -568,7 +564,6 @@ impl Active { if frame.header().flags().contains(header::ACK) { let id = 
frame.header().stream_id(); - self.pending_acks.remove(&id); if let Some(stream) = self.streams.get(&id) { stream.set_acknowledged(); } @@ -862,6 +857,14 @@ impl Active { Ok(proposed) } + /// The ACK backlog is defined as the number of streams that have not yet been acknowledged. + fn ack_backlog(&mut self) -> usize { + self.streams + .values() + .filter(|s| !s.is_acknowledged()) + .count() + } + // Check if the given stream ID is valid w.r.t. the provided tag and our connection mode. fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool { if tag == Tag::Ping || tag == Tag::GoAway { From 471edc73a5931899080ea4d915b178fb170c6fa8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 May 2023 23:16:19 +0200 Subject: [PATCH 08/33] Reduce diff --- test-harness/tests/poll_api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 159c156c..0cbf3a9f 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,6 +1,6 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; use std::pin::Pin; @@ -36,7 +36,7 @@ fn prop_config_send_recv_multi() { }; let (server_processed, client_processed) = - future::try_join(server, client).await.unwrap(); + futures::future::try_join(server, client).await.unwrap(); assert_eq!(server_processed, num_messagses); assert_eq!(client_processed, num_messagses); From fbcbff02278c418b04e4c3429cd58e52d3ddf9d8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 10:47:51 +0200 Subject: [PATCH 09/33] Fix bug where we count inbound and outbound streams for ack backlog --- yamux/src/connection.rs | 28 +++++++++++++++----- yamux/src/connection/stream.rs | 48 ++++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 8 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 8528d6e8..eb9c4ac5 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -107,6 +107,7 @@ use std::collections::VecDeque; use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; +use crate::connection::stream::Direction; use crate::tagged_stream::TaggedStream; pub use stream::{Packet, State, Stream}; @@ -523,7 +524,8 @@ impl Active { self.pending_frames.push_back(frame.into()); } - let mut stream = self.make_new_stream(id, self.config.receive_window, DEFAULT_CREDIT); + let mut stream = + self.make_new_outbound_stream(id, self.config.receive_window, DEFAULT_CREDIT); if extra_credit == 0 { stream.set_flag(stream::Flag::Syn) @@ -712,7 +714,8 @@ impl Active { log::error!("{}: maximum number of streams reached", self.id); return Action::Terminate(Frame::internal_error()); } - let mut stream = self.make_new_stream(stream_id, DEFAULT_CREDIT, DEFAULT_CREDIT); + let mut stream = + self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT, DEFAULT_CREDIT); let mut window_update = None; { let mut shared = stream.shared(); @@ -829,7 +832,7 @@ impl Active { } let credit = frame.header().credit() + DEFAULT_CREDIT; - let mut stream = self.make_new_stream(stream_id, DEFAULT_CREDIT, credit); + let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT, credit); stream.set_flag(stream::Flag::Ack); if is_finish { @@ -896,7 +899,7 @@ impl Active { 
Action::None } - fn make_new_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { + fn make_new_inbound_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { let config = self.config.clone(); let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. @@ -905,7 +908,19 @@ impl Active { waker.wake(); } - Stream::new(id, self.id, config, window, credit, sender) + Stream::new_inbound(id, self.id, config, window, credit, sender) + } + + fn make_new_outbound_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { + let config = self.config.clone(); + + let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. + self.stream_receivers.push(TaggedStream::new(id, receiver)); + if let Some(waker) = self.no_streams_waker.take() { + waker.wake(); + } + + Stream::new_outbound(id, self.id, config, window, credit, sender) } fn next_stream_id(&mut self) -> Result { @@ -921,10 +936,11 @@ impl Active { Ok(proposed) } - /// The ACK backlog is defined as the number of streams that have not yet been acknowledged. + /// The ACK backlog is defined as the number of outbound streams that have not yet been acknowledged. fn ack_backlog(&mut self) -> usize { self.streams .values() + .filter(|s| s.direction() == Direction::Outbound) .filter(|s| !s.is_acknowledged()) .count() } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 94c8a2dd..ea8368ef 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -83,6 +83,7 @@ pub struct Stream { sender: mpsc::Sender, flag: Flag, shared: Arc>, + direction: Direction, } impl fmt::Debug for Stream { @@ -101,7 +102,7 @@ impl fmt::Display for Stream { } impl Stream { - pub(crate) fn new( + pub(crate) fn new_inbound( id: StreamId, conn: connection::Id, config: Arc, @@ -109,13 +110,45 @@ impl Stream { credit: u32, sender: mpsc::Sender, ) -> Self { - Stream { + Self::new(id, conn, config, window, credit, sender, Direction::Inbound) + } + + pub(crate) fn new_outbound( + id: StreamId, + conn: connection::Id, + config: Arc, + window: u32, + credit: u32, + sender: mpsc::Sender, + ) -> Self { + Self::new( + id, + conn, + config, + window, + credit, + sender, + Direction::Outbound, + ) + } + + fn new( + id: StreamId, + conn: connection::Id, + config: Arc, + window: u32, + credit: u32, + sender: mpsc::Sender, + direction: Direction, + ) -> Self { + Self { id, conn, config: config.clone(), sender, flag: Flag::None, shared: Arc::new(Mutex::new(Shared::new(window, credit, config))), + direction, } } @@ -140,6 +173,10 @@ impl Stream { self.shared().acknowledged = true; } + pub(crate) fn direction(&self) -> Direction { + self.direction + } + /// Set the flag that should be set on the next outbound frame header. pub(crate) fn set_flag(&mut self, flag: Flag) { self.flag = flag @@ -157,6 +194,7 @@ impl Stream { sender: self.sender.clone(), flag: self.flag, shared: self.shared.clone(), + direction: self.direction, } } @@ -212,6 +250,12 @@ impl Stream { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Direction { + Inbound, + Outbound, +} + /// Byte data produced by the [`futures::stream::Stream`] impl of [`Stream`]. 
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Packet(Vec); From d7159807fcb1926c7c860d363f8eff9b298abbe2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 10:49:19 +0200 Subject: [PATCH 10/33] Window size for inbound streams always starts with DEFAULT_CREDIT --- yamux/src/connection.rs | 9 ++++----- yamux/src/connection/stream.rs | 13 ++++++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index eb9c4ac5..b40eae61 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -714,8 +714,7 @@ impl Active { log::error!("{}: maximum number of streams reached", self.id); return Action::Terminate(Frame::internal_error()); } - let mut stream = - self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT, DEFAULT_CREDIT); + let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT); let mut window_update = None; { let mut shared = stream.shared(); @@ -832,7 +831,7 @@ impl Active { } let credit = frame.header().credit() + DEFAULT_CREDIT; - let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT, credit); + let mut stream = self.make_new_inbound_stream(stream_id, credit); stream.set_flag(stream::Flag::Ack); if is_finish { @@ -899,7 +898,7 @@ impl Active { Action::None } - fn make_new_inbound_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { + fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream { let config = self.config.clone(); let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. @@ -908,7 +907,7 @@ impl Active { waker.wake(); } - Stream::new_inbound(id, self.id, config, window, credit, sender) + Stream::new_inbound(id, self.id, config, credit, sender) } fn make_new_outbound_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index ea8368ef..6a223903 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -16,7 +16,7 @@ use crate::{ header::{Data, Header, StreamId, WindowUpdate}, Frame, }, - Config, WindowUpdateMode, + Config, WindowUpdateMode, DEFAULT_CREDIT, }; use futures::{ channel::mpsc, @@ -106,11 +106,18 @@ impl Stream { id: StreamId, conn: connection::Id, config: Arc, - window: u32, credit: u32, sender: mpsc::Sender, ) -> Self { - Self::new(id, conn, config, window, credit, sender, Direction::Inbound) + Self::new( + id, + conn, + config, + DEFAULT_CREDIT, + credit, + sender, + Direction::Inbound, + ) } pub(crate) fn new_outbound( From 5793d27f5da10613ecef84335c850c0e1b1a8a31 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 10:51:03 +0200 Subject: [PATCH 11/33] Credit for outbound stream always starts with DEFAULT_CREDIT --- yamux/src/connection.rs | 7 +++---- yamux/src/connection/stream.rs | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index b40eae61..92e4f6cc 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -524,8 +524,7 @@ impl Active { self.pending_frames.push_back(frame.into()); } - let mut stream = - self.make_new_outbound_stream(id, self.config.receive_window, DEFAULT_CREDIT); + let mut stream = self.make_new_outbound_stream(id, self.config.receive_window); if extra_credit == 0 { stream.set_flag(stream::Flag::Syn) @@ -910,7 +909,7 @@ impl Active { Stream::new_inbound(id, self.id, config, credit, sender) } - fn 
make_new_outbound_stream(&mut self, id: StreamId, window: u32, credit: u32) -> Stream { + fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream { let config = self.config.clone(); let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. @@ -919,7 +918,7 @@ impl Active { waker.wake(); } - Stream::new_outbound(id, self.id, config, window, credit, sender) + Stream::new_outbound(id, self.id, config, window, sender) } fn next_stream_id(&mut self) -> Result { diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 6a223903..4dc9952b 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -125,7 +125,6 @@ impl Stream { conn: connection::Id, config: Arc, window: u32, - credit: u32, sender: mpsc::Sender, ) -> Self { Self::new( @@ -133,7 +132,7 @@ impl Stream { conn, config, window, - credit, + DEFAULT_CREDIT, sender, Direction::Outbound, ) From e1cbf7af186c2399ced32e559c3e6bdb902facea Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 10:53:32 +0200 Subject: [PATCH 12/33] Inline constructor This doesn't actually save any LoC. --- yamux/src/connection/stream.rs | 36 ++++++++-------------------------- 1 file changed, 8 insertions(+), 28 deletions(-) diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 4dc9952b..045596f0 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -109,15 +109,15 @@ impl Stream { credit: u32, sender: mpsc::Sender, ) -> Self { - Self::new( + Self { id, conn, - config, - DEFAULT_CREDIT, - credit, + config: config.clone(), sender, - Direction::Inbound, - ) + flag: Flag::None, + shared: Arc::new(Mutex::new(Shared::new(DEFAULT_CREDIT, credit, config))), + direction: Direction::Inbound, + } } pub(crate) fn new_outbound( @@ -126,26 +126,6 @@ impl Stream { config: Arc, window: u32, sender: mpsc::Sender, - ) -> Self { - Self::new( - id, - conn, - config, - window, - DEFAULT_CREDIT, - sender, - Direction::Outbound, - ) - } - - fn new( - id: StreamId, - conn: connection::Id, - config: Arc, - window: u32, - credit: u32, - sender: mpsc::Sender, - direction: Direction, ) -> Self { Self { id, @@ -153,8 +133,8 @@ impl Stream { config: config.clone(), sender, flag: Flag::None, - shared: Arc::new(Mutex::new(Shared::new(window, credit, config))), - direction, + shared: Arc::new(Mutex::new(Shared::new(window, DEFAULT_CREDIT, config))), + direction: Direction::Outbound, } } From bc615d49593fc9de46646f2ecf35dc554702db1d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 10:57:19 +0200 Subject: [PATCH 13/33] Introduce `is_outbound` function --- yamux/src/connection.rs | 5 ++--- yamux/src/connection/stream.rs | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 92e4f6cc..58180da0 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -91,6 +91,7 @@ mod cleanup; mod closing; mod stream; +use crate::tagged_stream::TaggedStream; use crate::{ error::ConnectionError, frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID}, @@ -107,8 +108,6 @@ use std::collections::VecDeque; use std::task::{Context, Waker}; use std::{fmt, sync::Arc, task::Poll}; -use crate::connection::stream::Direction; -use crate::tagged_stream::TaggedStream; pub use stream::{Packet, State, Stream}; /// How the connection is used. 
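(Editorial aside, not part of the patch.) The next hunk only counts outbound, not-yet-acknowledged streams towards the ACK backlog. A minimal sketch of how a caller of the poll API experiences that limit, assuming the `poll_new_outbound` signature introduced earlier in this series; the helper name and loop are illustrative only:

use std::task::{Context, Poll};
use futures::{AsyncRead, AsyncWrite};
use yamux::{Connection, Stream};

/// Keep opening outbound streams until the ACK-backlog limit (256) parks the task.
fn open_until_backpressure<T: AsyncRead + AsyncWrite + Unpin>(
    conn: &mut Connection<T>,
    cx: &mut Context<'_>,
) -> yamux::Result<Vec<Stream>> {
    let mut streams = Vec::new();
    loop {
        match conn.poll_new_outbound(cx) {
            // Every stream handed out here is "pending ACK" until the remote
            // sets the ACK flag on one of its frames; only outbound streams count.
            Poll::Ready(Ok(stream)) => streams.push(stream),
            Poll::Ready(Err(e)) => return Err(e),
            // Once the unacknowledged outbound streams reach the backlog limit,
            // the connection stores the waker and yields Pending until an ACK arrives.
            Poll::Pending => return Ok(streams),
        }
    }
}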
@@ -938,7 +937,7 @@ impl Active { fn ack_backlog(&mut self) -> usize { self.streams .values() - .filter(|s| s.direction() == Direction::Outbound) + .filter(|s| s.is_outbound()) .filter(|s| !s.is_acknowledged()) .count() } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 045596f0..fba3dc4a 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -159,8 +159,8 @@ impl Stream { self.shared().acknowledged = true; } - pub(crate) fn direction(&self) -> Direction { - self.direction + pub(crate) fn is_outbound(&self) -> bool { + matches!(self.direction, Direction::Outbound) } /// Set the flag that should be set on the next outbound frame header. From c07a5ad1025e3d7e9063fdaf3649858699fdbd31 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 11:01:53 +0200 Subject: [PATCH 14/33] Compute `is_outbound` based on `Mode` --- yamux/src/connection.rs | 2 +- yamux/src/connection/stream.rs | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 58180da0..281c06a1 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -937,7 +937,7 @@ impl Active { fn ack_backlog(&mut self) -> usize { self.streams .values() - .filter(|s| s.is_outbound()) + .filter(|s| s.is_outbound(self.mode)) .filter(|s| !s.is_acknowledged()) .count() } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index fba3dc4a..8f954174 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -16,7 +16,7 @@ use crate::{ header::{Data, Header, StreamId, WindowUpdate}, Frame, }, - Config, WindowUpdateMode, DEFAULT_CREDIT, + Config, Mode, WindowUpdateMode, DEFAULT_CREDIT, }; use futures::{ channel::mpsc, @@ -83,7 +83,6 @@ pub struct Stream { sender: mpsc::Sender, flag: Flag, shared: Arc>, - direction: Direction, } impl fmt::Debug for Stream { @@ -116,7 +115,6 @@ impl Stream { sender, flag: Flag::None, shared: Arc::new(Mutex::new(Shared::new(DEFAULT_CREDIT, credit, config))), - direction: Direction::Inbound, } } @@ -134,7 +132,6 @@ impl Stream { sender, flag: Flag::None, shared: Arc::new(Mutex::new(Shared::new(window, DEFAULT_CREDIT, config))), - direction: Direction::Outbound, } } @@ -159,8 +156,18 @@ impl Stream { self.shared().acknowledged = true; } - pub(crate) fn is_outbound(&self) -> bool { - matches!(self.direction, Direction::Outbound) + /// Whether this is an outbound stream. + /// + /// Clients use odd IDs and servers use even IDs. + /// A stream is outbound if: + /// + /// - Its ID is odd and we are the client. + /// - Its ID is even and we are the server. + pub(crate) fn is_outbound(&self, our_mode: Mode) -> bool { + match our_mode { + Mode::Client => self.id.is_client(), + Mode::Server => self.id.is_server(), + } } /// Set the flag that should be set on the next outbound frame header. @@ -180,7 +187,6 @@ impl Stream { sender: self.sender.clone(), flag: self.flag, shared: self.shared.clone(), - direction: self.direction, } } @@ -236,12 +242,6 @@ impl Stream { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Direction { - Inbound, - Outbound, -} - /// Byte data produced by the [`futures::stream::Stream`] impl of [`Stream`]. 
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Packet(Vec); From f377492264531c58b3bbdeb1c706e4140d49bd8a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 24 May 2023 11:25:58 +0200 Subject: [PATCH 15/33] Track acknowledged as part of state --- test-harness/tests/ack_timing.rs | 8 +++--- yamux/src/connection.rs | 8 +++--- yamux/src/connection/stream.rs | 46 ++++++++++++++++++++------------ 3 files changed, 38 insertions(+), 24 deletions(-) diff --git a/test-harness/tests/ack_timing.rs b/test-harness/tests/ack_timing.rs index 6afc8fe3..5f835e24 100644 --- a/test-harness/tests/ack_timing.rs +++ b/test-harness/tests/ack_timing.rs @@ -197,7 +197,7 @@ where /// Initially, the stream is not acknowledged. The server will only acknowledge the stream with the first frame. async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> { assert!( - !stream.is_acknowledged(), + stream.is_pending_ack(), "newly returned stream should not be acknowledged" ); @@ -206,7 +206,7 @@ async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> { stream.read_exact(&mut buffer).await?; assert!( - stream.is_acknowledged(), + !stream.is_pending_ack(), "stream should be acknowledged once we received the first data" ); assert_eq!(&buffer, b"pong"); @@ -221,7 +221,7 @@ async fn ping_pong(mut stream: Stream) -> Result<(), ConnectionError> { /// Initially, the stream is not acknowledged. We only include the ACK flag in the first frame. async fn pong_ping(mut stream: Stream) -> Result<(), ConnectionError> { assert!( - !stream.is_acknowledged(), + stream.is_pending_ack(), "before sending anything we should not have acknowledged the stream to the remote" ); @@ -229,7 +229,7 @@ async fn pong_ping(mut stream: Stream) -> Result<(), ConnectionError> { stream.write_all(b"pong").await?; assert!( - stream.is_acknowledged(), + !stream.is_pending_ack(), "we should have sent an ACK flag with the first payload" ); diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 281c06a1..f388a3e7 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -561,7 +561,7 @@ impl Active { let frame = match shared.update_state(self.id, stream_id, State::Closed) { // The stream was dropped without calling `poll_close`. // We reset the stream to inform the remote of the closure. - State::Open => { + State::Open { .. } => { let mut header = Header::data(stream_id, 0); header.rst(); Some(Frame::new(header)) @@ -626,7 +626,9 @@ impl Active { if frame.header().flags().contains(header::ACK) { let id = frame.header().stream_id(); if let Some(stream) = self.streams.get(&id) { - stream.set_acknowledged(); + stream + .shared() + .update_state(self.id, id, State::Open { acknowledged: true }); } if let Some(waker) = self.new_outbound_stream_waker.take() { waker.wake(); @@ -938,7 +940,7 @@ impl Active { self.streams .values() .filter(|s| s.is_outbound(self.mode)) - .filter(|s| !s.is_acknowledged()) + .filter(|s| s.is_pending_ack()) .count() } diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 8f954174..8d07d61f 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -37,7 +37,18 @@ use std::{ #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum State { /// Open bidirectionally. - Open, + Open { + /// Whether the stream is acknowledged. + /// + /// For outbound streams, this tracks whether the remote has acknowledged our stream. + /// For inbound streams, this tracks whether we have acknowledged the stream to the remote. 
+ /// + /// This starts out with `false` and is set to `true` when we receive an `ACK` flag for this stream. + /// We may also directly transition: + /// - from `Open` to `RecvClosed` if the remote immediately sends `FIN`. + /// - from `Open` to `Closed` if the remote immediately sends `RST`. + acknowledged: bool, + }, /// Open for incoming messages. SendClosed, /// Open for outgoing messages. @@ -148,12 +159,14 @@ impl Stream { matches!(self.shared().state(), State::Closed) } - pub fn is_acknowledged(&self) -> bool { - self.shared().acknowledged - } - - pub(crate) fn set_acknowledged(&self) { - self.shared().acknowledged = true; + /// Whether we are still waiting for the remote to acknowledge this stream. + pub fn is_pending_ack(&self) -> bool { + matches!( + self.shared().state(), + State::Open { + acknowledged: false + } + ) } /// Whether this is an outbound stream. @@ -389,7 +402,8 @@ impl AsyncWrite for Stream { // technically, the frame hasn't been sent yet on the wire but from the perspective of this data structure, we've queued the frame for sending if frame.header().flags().contains(ACK) { - self.set_acknowledged(); + self.shared() + .update_state(self.conn, self.id, State::Open { acknowledged: true }); } let cmd = StreamCommand::SendFrame(frame); @@ -439,22 +453,20 @@ pub(crate) struct Shared { pub(crate) reader: Option, pub(crate) writer: Option, config: Arc, - - /// Whether the stream has been acknowledged by the remote. - acknowledged: bool, } impl Shared { fn new(window: u32, credit: u32, config: Arc) -> Self { Shared { - state: State::Open, + state: State::Open { + acknowledged: false, + }, window, credit, buffer: Chunks::new(), reader: None, writer: None, config, - acknowledged: false, } } @@ -475,19 +487,19 @@ impl Shared { match (current, next) { (Closed, _) => {} - (Open, _) => self.state = next, + (Open { .. }, _) => self.state = next, (RecvClosed, Closed) => self.state = Closed, - (RecvClosed, Open) => {} + (RecvClosed, Open { .. }) => {} (RecvClosed, RecvClosed) => {} (RecvClosed, SendClosed) => self.state = Closed, (SendClosed, Closed) => self.state = Closed, - (SendClosed, Open) => {} + (SendClosed, Open { .. 
}) => {} (SendClosed, RecvClosed) => self.state = Closed, (SendClosed, SendClosed) => {} } log::trace!( - "{}/{}: update state: ({:?} {:?} {:?})", + "{}/{}: update state: (from {:?} to {:?} -> {:?})", cid, sid, current, From f2f5bc877c8027c5151563159ee4db2746fb4c49 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:44:18 +0100 Subject: [PATCH 16/33] Unify harness code --- test-harness/src/lib.rs | 60 +++++++++++++++++++---- test-harness/tests/concurrent.rs | 83 +++----------------------------- test-harness/tests/poll_api.rs | 4 +- test-harness/tests/tests.rs | 7 +-- 4 files changed, 63 insertions(+), 91 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 4c387b42..1e1771ab 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -10,7 +10,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io, mem}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use yamux::ConnectionError; use yamux::{Config, WindowUpdateMode}; @@ -19,8 +19,9 @@ use yamux::{Connection, Mode}; pub async fn connected_peers( server_config: Config, client_config: Config, + buffer_sizes: Option, ) -> io::Result<(Connection>, Connection>)> { - let (listener, addr) = bind().await?; + let (listener, addr) = bind(buffer_sizes).await?; let server = async { let (stream, _) = listener.accept().await?; @@ -31,7 +32,7 @@ pub async fn connected_peers( )) }; let client = async { - let stream = TcpStream::connect(addr).await?; + let stream = new_socket(buffer_sizes)?.connect(addr).await?; Ok(Connection::new( stream.compat(), client_config, @@ -42,12 +43,27 @@ pub async fn connected_peers( futures::future::try_join(server, client).await } -pub async fn bind() -> io::Result<(TcpListener, SocketAddr)> { - let i = Ipv4Addr::new(127, 0, 0, 1); - let s = SocketAddr::V4(SocketAddrV4::new(i, 0)); - let l = TcpListener::bind(&s).await?; - let a = l.local_addr()?; - Ok((l, a)) +pub async fn bind(buffer_sizes: Option) -> io::Result<(TcpListener, SocketAddr)> { + let socket = new_socket(buffer_sizes)?; + socket.bind(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 0, + )))?; + + let listener = socket.listen(1024)?; + let address = listener.local_addr()?; + + Ok((listener, address)) +} + +fn new_socket(buffer_sizes: Option) -> io::Result { + let socket = TcpSocket::new_v4()?; + if let Some(size) = buffer_sizes { + socket.set_send_buffer_size(size.send)?; + socket.set_recv_buffer_size(size.recv)?; + } + + Ok(socket) } /// For each incoming stream of `c` echo back to the sender. @@ -76,6 +92,32 @@ pub async fn noop_server(c: impl Stream Self { + let send = if bool::arbitrary(g) { + 16 * 1024 + } else { + 32 * 1024 + }; + + // Have receive buffer size be some multiple of send buffer size. 
+ let recv = if bool::arbitrary(g) { + send * 2 + } else { + send * 4 + }; + + TcpBufferSizes { send, recv } + } +} + pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io::Result<()> { let id = stream.id(); let (mut reader, mut writer) = AsyncReadExt::split(stream); diff --git a/test-harness/tests/concurrent.rs b/test-harness/tests/concurrent.rs index 46501158..04b8e6e2 100644 --- a/test-harness/tests/concurrent.rs +++ b/test-harness/tests/concurrent.rs @@ -10,16 +10,10 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; -use quickcheck::{Arbitrary, Gen, QuickCheck}; -use std::{ - io, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, -}; +use quickcheck::QuickCheck; use test_harness::*; -use tokio::net::{TcpListener, TcpStream}; -use tokio::{net::TcpSocket, runtime::Runtime, task}; -use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; -use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode}; +use tokio::{runtime::Runtime, task}; +use yamux::{Config, ConnectionError, Control, WindowUpdateMode}; const PAYLOAD_SIZE: usize = 128 * 1024; @@ -32,7 +26,9 @@ fn concurrent_streams() { let n_streams = 1000; Runtime::new().expect("new runtime").block_on(async move { - let (server, client) = connected_peers(tcp_buffer_sizes).await.unwrap(); + let (server, client) = connected_peers(config(), config(), tcp_buffer_sizes) + .await + .unwrap(); task::spawn(echo_server(server)); @@ -70,73 +66,6 @@ fn concurrent_streams() { QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) } -/// Send and receive buffer size for a TCP socket. -#[derive(Clone, Debug, Copy)] -struct TcpBufferSizes { - send: u32, - recv: u32, -} - -impl Arbitrary for TcpBufferSizes { - fn arbitrary(g: &mut Gen) -> Self { - let send = if bool::arbitrary(g) { - 16 * 1024 - } else { - 32 * 1024 - }; - - // Have receive buffer size be some multiple of send buffer size. - let recv = if bool::arbitrary(g) { - send * 2 - } else { - send * 4 - }; - - TcpBufferSizes { send, recv } - } -} - -async fn connected_peers( - buffer_sizes: Option, -) -> io::Result<(Connection>, Connection>)> { - let (listener, addr) = bind(buffer_sizes).await?; - - let server = async { - let (stream, _) = listener.accept().await?; - Ok(Connection::new(stream.compat(), config(), Mode::Server)) - }; - let client = async { - let stream = new_socket(buffer_sizes)?.connect(addr).await?; - - Ok(Connection::new(stream.compat(), config(), Mode::Client)) - }; - - futures::future::try_join(server, client).await -} - -async fn bind(buffer_sizes: Option) -> io::Result<(TcpListener, SocketAddr)> { - let socket = new_socket(buffer_sizes)?; - socket.bind(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 0, - )))?; - - let listener = socket.listen(1024)?; - let address = listener.local_addr()?; - - Ok((listener, address)) -} - -fn new_socket(buffer_sizes: Option) -> io::Result { - let socket = TcpSocket::new_v4()?; - if let Some(size) = buffer_sizes { - socket.set_send_buffer_size(size.send)?; - socket.set_recv_buffer_size(size.recv)?; - } - - Ok(socket) -} - fn config() -> Config { let mut server_cfg = Config::default(); // Use a large frame size to speed up the test. 
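(Editorial aside, not part of the patch.) With the harness unified, tests obtain both connections from the shared `connected_peers` helper instead of building sockets themselves. A minimal usage sketch, assuming the signature added to test-harness/src/lib.rs above; the test name and body are illustrative only:

use test_harness::connected_peers;
use yamux::Config;

#[tokio::test]
async fn smoke() -> std::io::Result<()> {
    // `None` keeps the OS socket-buffer defaults; quickcheck properties take an
    // `Option<TcpBufferSizes>` argument and forward it here instead.
    let (server, client) =
        connected_peers(Config::default(), Config::default(), None).await?;
    drop((server, client));
    Ok(())
}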
diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index a3ec5759..1b8a5b21 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -20,7 +20,7 @@ fn prop_config_send_recv_multi() { Runtime::new().unwrap().block_on(async move { let num_messagses = msgs.len(); - let (listener, address) = bind().await.expect("bind"); + let (listener, address) = bind(None).await.expect("bind"); let server = async { let socket = listener.accept().await.expect("accept").0.compat(); @@ -55,7 +55,7 @@ fn prop_max_streams() { cfg.set_max_num_streams(max_streams); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg.clone(), cfg).await?; + let (server, client) = connected_peers(cfg.clone(), cfg, None).await?; task::spawn(EchoServer::new(server)); diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 5a4be48f..9b04e271 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -33,7 +33,7 @@ fn prop_config_send_recv_single() { msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg1, cfg2).await?; + let (server, client) = connected_peers(cfg1, cfg2, None).await?; let server = echo_server(server); let client = async { @@ -62,7 +62,8 @@ fn prop_send_recv() { } Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(Config::default(), Config::default()).await?; + let (server, client) = + connected_peers(Config::default(), Config::default(), None).await?; let server = echo_server(server); let client = async { @@ -88,7 +89,7 @@ fn prop_send_recv_half_closed() { Runtime::new().unwrap().block_on(async move { let (mut server, client) = - connected_peers(Config::default(), Config::default()).await?; + connected_peers(Config::default(), Config::default(), None).await?; // Server should be able to write on a stream shutdown by the client. let server = async { From 3b355672e4e48e902be2e7d2ae842a932f442407 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:55:25 +0100 Subject: [PATCH 17/33] Port `concurrent_streams` test to poll-api --- test-harness/tests/concurrent.rs | 82 -------------------------------- test-harness/tests/poll_api.rs | 61 ++++++++++++++++++++---- 2 files changed, 52 insertions(+), 91 deletions(-) delete mode 100644 test-harness/tests/concurrent.rs diff --git a/test-harness/tests/concurrent.rs b/test-harness/tests/concurrent.rs deleted file mode 100644 index 04b8e6e2..00000000 --- a/test-harness/tests/concurrent.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. 
- -use futures::prelude::*; -use futures::stream::FuturesUnordered; -use quickcheck::QuickCheck; -use test_harness::*; -use tokio::{runtime::Runtime, task}; -use yamux::{Config, ConnectionError, Control, WindowUpdateMode}; - -const PAYLOAD_SIZE: usize = 128 * 1024; - -#[test] -fn concurrent_streams() { - let _ = env_logger::try_init(); - - fn prop(tcp_buffer_sizes: Option) { - let data = Msg(vec![0x42; PAYLOAD_SIZE]); - let n_streams = 1000; - - Runtime::new().expect("new runtime").block_on(async move { - let (server, client) = connected_peers(config(), config(), tcp_buffer_sizes) - .await - .unwrap(); - - task::spawn(echo_server(server)); - - let (mut ctrl, client) = Control::new(client); - task::spawn(noop_server(client)); - - let result = (0..n_streams) - .map(|_| { - let data = data.clone(); - let mut ctrl = ctrl.clone(); - - task::spawn(async move { - let mut stream = ctrl.open_stream().await?; - log::debug!("C: opened new stream {}", stream.id()); - - send_recv_message(&mut stream, data).await?; - stream.close().await?; - - Ok::<(), ConnectionError>(()) - }) - }) - .collect::>() - .try_collect::>() - .await - .unwrap() - .into_iter() - .collect::, ConnectionError>>(); - - ctrl.close().await.expect("close connection"); - - assert_eq!(result.unwrap().len(), n_streams); - }); - } - - QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) -} - -fn config() -> Config { - let mut server_cfg = Config::default(); - // Use a large frame size to speed up the test. - server_cfg.set_split_send_size(PAYLOAD_SIZE); - // Use `WindowUpdateMode::OnRead` so window updates are sent by the - // `Stream`s and subject to backpressure from the stream command channel. Thus - // the `Connection` I/O loop will not need to send window updates - // directly as a result of reading a frame, which can otherwise - // lead to mutual write deadlocks if the socket send buffers are too small. - // With `OnRead` the socket send buffer can even be smaller than the size - // of a single frame for this test. - server_cfg.set_window_update_mode(WindowUpdateMode::OnRead); - server_cfg -} diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 1b8a5b21..4225bb9a 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -3,6 +3,7 @@ use futures::stream::FuturesUnordered; use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; +use std::iter; use std::pin::Pin; use std::task::{Context, Poll}; use test_harness::*; @@ -33,7 +34,7 @@ fn prop_config_send_recv_multi() { let socket = TcpStream::connect(address).await.expect("connect").compat(); let connection = Connection::new(socket, cfg2.0, Mode::Client); - MessageSender::new(connection, msgs).await + MessageSender::new(connection, msgs, false).await }; let (server_processed, client_processed) = @@ -47,6 +48,40 @@ fn prop_config_send_recv_multi() { QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _) } +#[test] +fn concurrent_streams() { + let _ = env_logger::try_init(); + + fn prop(tcp_buffer_sizes: Option) { + const PAYLOAD_SIZE: usize = 128 * 1024; + + let data = Msg(vec![0x42; PAYLOAD_SIZE]); + let n_streams = 1000; + + let mut cfg = Config::default(); + cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test. 
+ + Runtime::new().expect("new runtime").block_on(async move { + let (server, client) = connected_peers(cfg.clone(), cfg, tcp_buffer_sizes) + .await + .unwrap(); + + task::spawn(echo_server(server)); + let client = MessageSender::new( + client, + iter::repeat(data).take(n_streams).collect::>(), + true, + ); + + let num_processed = client.await.unwrap(); + + assert_eq!(num_processed, n_streams); + }); + } + + QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) +} + #[test] fn prop_max_streams() { fn prop(n: usize) -> Result { @@ -76,15 +111,18 @@ struct MessageSender { pending_messages: Vec, worker_streams: FuturesUnordered>, streams_processed: usize, + /// Whether to spawn a new task for each stream. + spawn_tasks: bool, } impl MessageSender { - fn new(connection: Connection, messages: Vec) -> Self { + fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { Self { connection, pending_messages: messages, worker_streams: FuturesUnordered::default(), streams_processed: 0, + spawn_tasks, } } } @@ -108,13 +146,18 @@ where if let Some(message) = this.pending_messages.pop() { match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { - this.worker_streams.push( - async move { - send_recv_message(&mut stream, message).await.unwrap(); - stream.close().await.unwrap(); - } - .boxed(), - ); + let future = async move { + send_recv_message(&mut stream, message).await.unwrap(); + stream.close().await.unwrap(); + }; + + let worker_stream_future = if this.spawn_tasks { + async { task::spawn(future).await.unwrap() }.boxed() + } else { + future.boxed() + }; + + this.worker_streams.push(worker_stream_future); continue; } Poll::Pending => { From 384ade8ab8d62d93508d4d64d870f098e59e4b8a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:57:22 +0100 Subject: [PATCH 18/33] Replace custom channel with `futures_ringbuf` --- test-harness/Cargo.toml | 1 + test-harness/tests/tests.rs | 151 +----------------------------------- yamux/src/connection.rs | 2 +- 3 files changed, 4 insertions(+), 150 deletions(-) diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 92fe8a2f..55cead3e 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -16,3 +16,4 @@ log = "0.4.17" [dev-dependencies] env_logger = "0.10" constrained-connection = "0.1" +futures_ringbuf = "0.4.0" diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 9b04e271..7356e1dc 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -8,7 +8,6 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::executor::LocalPool; use futures::future::join; use futures::io::AsyncReadExt; @@ -16,9 +15,7 @@ use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; use quickcheck::{QuickCheck, TestResult}; use std::panic::panic_any; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Waker}; + use test_harness::*; use tokio::{runtime::Runtime, task}; use yamux::{Config, Connection, ConnectionError, Control, Mode}; @@ -160,7 +157,7 @@ fn write_deadlock() { // Create a bounded channel representing the underlying "connection". // Each endpoint gets a name and a bounded capacity for its outbound // channel (which is the other's inbound channel). 
- let (server_endpoint, client_endpoint) = bounded::channel(("S", capacity), ("C", capacity)); + let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); // Create and spawn a "server" that echoes every message back to the client. let server = Connection::new(server_endpoint, Config::default(), Mode::Server); @@ -247,147 +244,3 @@ async fn send_on_single_stream( Ok(()) } - -/// This module implements a duplex connection via channels with bounded -/// capacities. The channels used for the implementation are unbounded -/// as the operate at the granularity of variably-sized chunks of bytes -/// (`Vec`), whereas the capacity bounds (i.e. max. number of bytes -/// in transit in one direction) are enforced separately. -mod bounded { - use super::*; - use futures::ready; - use std::io::{Error, ErrorKind, Result}; - - pub struct Endpoint { - name: &'static str, - capacity: usize, - send: UnboundedSender>, - send_guard: Arc>, - recv: UnboundedReceiver>, - recv_buf: Vec, - recv_guard: Arc>, - } - - /// A `ChannelGuard` is used to enforce the maximum number of - /// bytes "in transit" across all chunks of an unbounded channel. - #[derive(Default)] - struct ChannelGuard { - size: usize, - waker: Option, - } - - pub fn channel( - (name_a, capacity_a): (&'static str, usize), - (name_b, capacity_b): (&'static str, usize), - ) -> (Endpoint, Endpoint) { - let (a_to_b_sender, a_to_b_receiver) = unbounded(); - let (b_to_a_sender, b_to_a_receiver) = unbounded(); - - let a_to_b_guard = Arc::new(Mutex::new(ChannelGuard::default())); - let b_to_a_guard = Arc::new(Mutex::new(ChannelGuard::default())); - - let a = Endpoint { - name: name_a, - capacity: capacity_a, - send: a_to_b_sender, - send_guard: a_to_b_guard.clone(), - recv: b_to_a_receiver, - recv_buf: Vec::new(), - recv_guard: b_to_a_guard.clone(), - }; - - let b = Endpoint { - name: name_b, - capacity: capacity_b, - send: b_to_a_sender, - send_guard: b_to_a_guard, - recv: a_to_b_receiver, - recv_buf: Vec::new(), - recv_guard: a_to_b_guard, - }; - - (a, b) - } - - impl AsyncRead for Endpoint { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if self.recv_buf.is_empty() { - match ready!(self.recv.poll_next_unpin(cx)) { - Some(bytes) => { - self.recv_buf = bytes; - } - None => return Poll::Ready(Ok(0)), - } - } - - let n = std::cmp::min(buf.len(), self.recv_buf.len()); - buf[0..n].copy_from_slice(&self.recv_buf[0..n]); - self.recv_buf = self.recv_buf.split_off(n); - - let mut guard = self.recv_guard.lock().unwrap(); - if let Some(waker) = guard.waker.take() { - log::debug!( - "{}: read: notifying waker after read of {} bytes", - self.name, - n - ); - waker.wake(); - } - guard.size -= n; - - log::debug!( - "{}: read: channel: {}/{}", - self.name, - guard.size, - self.capacity - ); - - Poll::Ready(Ok(n)) - } - } - - impl AsyncWrite for Endpoint { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - debug_assert!(!buf.is_empty()); - let mut guard = self.send_guard.lock().unwrap(); - let n = std::cmp::min(self.capacity - guard.size, buf.len()); - if n == 0 { - log::debug!("{}: write: channel full, registering waker", self.name); - guard.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - self.send - .unbounded_send(buf[0..n].to_vec()) - .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?; - - guard.size += n; - log::debug!( - "{}: write: channel: {}/{}", - self.name, - guard.size, - self.capacity - ); - - 
Poll::Ready(Ok(n)) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.send.poll_flush_unpin(cx)).unwrap(); - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.send.poll_close_unpin(cx)).unwrap(); - Poll::Ready(Ok(())) - } - } -} diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 43f76226..5389e9e1 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -458,7 +458,7 @@ impl Active { match self.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - self.on_send_frame(frame.into()); + self.on_send_frame(frame); continue; } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { From 9c090d3c29e5ac5fbe6847cb25c494e8d10f3b06 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:01:36 +0100 Subject: [PATCH 19/33] Remove `prop_send_recv` This test is covered by `prop_send_recv_multi`. --- test-harness/tests/tests.rs | 49 +------------------------------------ 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 7356e1dc..88f8a8a6 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -13,7 +13,7 @@ use futures::future::join; use futures::io::AsyncReadExt; use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; -use quickcheck::{QuickCheck, TestResult}; +use quickcheck::QuickCheck; use std::panic::panic_any; use test_harness::*; @@ -51,34 +51,6 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } -#[test] -fn prop_send_recv() { - fn prop(msgs: Vec) -> Result { - if msgs.is_empty() { - return Ok(TestResult::discard()); - } - - Runtime::new().unwrap().block_on(async move { - let (server, client) = - connected_peers(Config::default(), Config::default(), None).await?; - - let server = echo_server(server); - let client = async { - let (control, client) = Control::new(client); - task::spawn(noop_server(client)); - send_on_separate_streams(control, msgs).await?; - - Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(TestResult::passed()) - }) - } - QuickCheck::new().tests(1).quickcheck(prop as fn(_) -> _) -} - #[test] fn prop_send_recv_half_closed() { fn prop(msg: Msg) -> Result<(), ConnectionError> { @@ -206,25 +178,6 @@ fn write_deadlock() { ); } -/// Send all messages, opening a new stream for each one. -async fn send_on_separate_streams( - mut control: Control, - iter: impl IntoIterator, -) -> Result<(), ConnectionError> { - for msg in iter { - let mut stream = control.open_stream().await?; - log::debug!("C: new stream: {}", stream); - - send_recv_message(&mut stream, msg).await?; - stream.close().await?; - } - - log::debug!("C: closing connection"); - control.close().await?; - - Ok(()) -} - /// Send all messages, using only a single stream. 
async fn send_on_single_stream( mut control: Control, From 660e6c93ae51ae578aa47b4ec2c5e2c31923aff8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:09:04 +0100 Subject: [PATCH 20/33] Rewrite `prop_send_recv_half_closed` to poll-api and move --- test-harness/tests/poll_api.rs | 58 +++++++++++++++++++++++++++++++++- test-harness/tests/tests.rs | 53 ------------------------------- 2 files changed, 57 insertions(+), 54 deletions(-) diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 4225bb9a..a87048da 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,6 +1,6 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{future, stream, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; use std::iter; @@ -106,6 +106,62 @@ fn prop_max_streams() { QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) } +#[test] +fn prop_send_recv_half_closed() { + fn prop(msg: Msg) -> Result<(), ConnectionError> { + let msg_len = msg.0.len(); + + Runtime::new().unwrap().block_on(async move { + let (mut server, mut client) = + connected_peers(Config::default(), Config::default(), None).await?; + + // Server should be able to write on a stream shutdown by the client. + let server = async { + let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx)); + + let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??; + + task::spawn(noop_server(server)); + + let mut buf = vec![0; msg_len]; + first_stream.read_exact(&mut buf).await?; + first_stream.write_all(&buf).await?; + first_stream.close().await?; + + Result::<(), ConnectionError>::Ok(()) + }; + + // Client should be able to read after shutting down the stream. + let client = async { + let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + task::spawn(noop_server(stream::poll_fn(move |cx| { + client.poll_next_inbound(cx) + }))); + + stream.write_all(&msg.0).await?; + stream.close().await?; + + assert!(stream.is_write_closed()); + let mut buf = vec![0; msg_len]; + stream.read_exact(&mut buf).await?; + + assert_eq!(buf, msg.0); + assert_eq!(Some(0), stream.read(&mut buf).await.ok()); + assert!(stream.is_closed()); + + Result::<(), ConnectionError>::Ok(()) + }; + + futures::future::try_join(server, client).await?; + + Ok(()) + }) + } + QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 88f8a8a6..4ac8bb32 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -51,59 +51,6 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } -#[test] -fn prop_send_recv_half_closed() { - fn prop(msg: Msg) -> Result<(), ConnectionError> { - let msg_len = msg.0.len(); - - Runtime::new().unwrap().block_on(async move { - let (mut server, client) = - connected_peers(Config::default(), Config::default(), None).await?; - - // Server should be able to write on a stream shutdown by the client. 
- let server = async { - let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx)); - - let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??; - - task::spawn(noop_server(server)); - - let mut buf = vec![0; msg_len]; - first_stream.read_exact(&mut buf).await?; - first_stream.write_all(&buf).await?; - first_stream.close().await?; - - Result::<(), ConnectionError>::Ok(()) - }; - - // Client should be able to read after shutting down the stream. - let client = async { - let (mut control, client) = Control::new(client); - task::spawn(noop_server(client)); - - let mut stream = control.open_stream().await?; - stream.write_all(&msg.0).await?; - stream.close().await?; - - assert!(stream.is_write_closed()); - let mut buf = vec![0; msg_len]; - stream.read_exact(&mut buf).await?; - - assert_eq!(buf, msg.0); - assert_eq!(Some(0), stream.read(&mut buf).await.ok()); - assert!(stream.is_closed()); - - Result::<(), ConnectionError>::Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(()) - }) - } - QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) -} - /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints /// don't read in-between trying to finish their writes, a deadlock occurs. From 7c87447f2a793b7bf68832335b5d9eb9436e99f0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:29:44 +0100 Subject: [PATCH 21/33] Rewrite `prop_config_send_recv_single` to poll-api --- test-harness/tests/tests.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 4ac8bb32..8d60d4ca 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -15,9 +15,10 @@ use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; use quickcheck::QuickCheck; use std::panic::panic_any; +use std::pin::pin; use test_harness::*; -use tokio::{runtime::Runtime, task}; +use tokio::runtime::Runtime; use yamux::{Config, Connection, ConnectionError, Control, Mode}; #[test] @@ -30,13 +31,18 @@ fn prop_config_send_recv_single() { msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg1, cfg2, None).await?; - + let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; let server = echo_server(server); + let client = async { - let (control, client) = Control::new(client); - task::spawn(noop_server(client)); - send_on_single_stream(control, msgs).await?; + let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); + + future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; + + future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); Ok(()) }; @@ -127,10 +133,9 @@ fn write_deadlock() { /// Send all messages, using only a single stream. 
async fn send_on_single_stream( - mut control: Control, + mut stream: yamux::Stream, iter: impl IntoIterator, ) -> Result<(), ConnectionError> { - let mut stream = control.open_stream().await?; log::debug!("C: new stream: {}", stream); for msg in iter { @@ -139,8 +144,5 @@ async fn send_on_single_stream( stream.close().await?; - log::debug!("C: closing connection"); - control.close().await?; - Ok(()) } From 05eee85aeb2d8139b281e337d643d2c809ea1d61 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:31:19 +0100 Subject: [PATCH 22/33] Move test to `poll_api` --- test-harness/src/lib.rs | 16 ++++++++++ test-harness/tests/poll_api.rs | 42 +++++++++++++++++++++++-- test-harness/tests/tests.rs | 57 +--------------------------------- 3 files changed, 57 insertions(+), 58 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 1e1771ab..009f5e23 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -138,6 +138,22 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io: Ok(()) } +/// Send all messages, using only a single stream. +pub async fn send_on_single_stream( + mut stream: yamux::Stream, + iter: impl IntoIterator, +) -> Result<(), ConnectionError> { + log::debug!("C: new stream: {}", stream); + + for msg in iter { + send_recv_message(&mut stream, msg).await?; + } + + stream.close().await?; + + Ok(()) +} + pub struct EchoServer { connection: Connection, worker_streams: FuturesUnordered>>, diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index a87048da..cbb3ec14 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,10 +1,12 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{future, stream, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{ + future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, +}; use quickcheck::QuickCheck; use std::future::Future; use std::iter; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use test_harness::*; use tokio::net::TcpStream; @@ -162,6 +164,42 @@ fn prop_send_recv_half_closed() { QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) } +#[test] +fn prop_config_send_recv_single() { + fn prop( + mut msgs: Vec, + TestConfig(cfg1): TestConfig, + TestConfig(cfg2): TestConfig, + ) -> Result<(), ConnectionError> { + msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); + + Runtime::new().unwrap().block_on(async move { + let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; + let server = echo_server(server); + + let client = async { + let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); + + future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; + + future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); + + Ok(()) + }; + + futures::future::try_join(server, client).await?; + + Ok(()) + }) + } + QuickCheck::new() + .tests(10) + .quickcheck(prop as fn(_, _, _) -> _) +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 8d60d4ca..15a2745e 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -13,49 +13,10 @@ use futures::future::join; use futures::io::AsyncReadExt; use futures::prelude::*; use 
futures::task::{Spawn, SpawnExt}; -use quickcheck::QuickCheck; use std::panic::panic_any; -use std::pin::pin; use test_harness::*; -use tokio::runtime::Runtime; -use yamux::{Config, Connection, ConnectionError, Control, Mode}; - -#[test] -fn prop_config_send_recv_single() { - fn prop( - mut msgs: Vec, - TestConfig(cfg1): TestConfig, - TestConfig(cfg2): TestConfig, - ) -> Result<(), ConnectionError> { - msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); - - Runtime::new().unwrap().block_on(async move { - let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; - let server = echo_server(server); - - let client = async { - let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) - .await - .unwrap(); - let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); - - future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; - - future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); - - Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(()) - }) - } - QuickCheck::new() - .tests(10) - .quickcheck(prop as fn(_, _, _) -> _) -} +use yamux::{Config, Connection, Control, Mode}; /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints @@ -130,19 +91,3 @@ fn write_deadlock() { .unwrap(), ); } - -/// Send all messages, using only a single stream. -async fn send_on_single_stream( - mut stream: yamux::Stream, - iter: impl IntoIterator, -) -> Result<(), ConnectionError> { - log::debug!("C: new stream: {}", stream); - - for msg in iter { - send_recv_message(&mut stream, msg).await?; - } - - stream.close().await?; - - Ok(()) -} From efa73103ee471dd3086a83394c69c85ec332454c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:33:48 +0100 Subject: [PATCH 23/33] Rewrite deadlock test to use poll-api --- test-harness/tests/tests.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 15a2745e..bf521632 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -16,7 +16,7 @@ use futures::task::{Spawn, SpawnExt}; use std::panic::panic_any; use test_harness::*; -use yamux::{Config, Connection, Control, Mode}; +use yamux::{Config, Connection, Mode}; /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints @@ -57,12 +57,19 @@ fn write_deadlock() { // Create and spawn a "client" that sends messages expected to be echoed // by the server. - let client = Connection::new(client_endpoint, Config::default(), Mode::Client); - let (mut ctrl, client) = Control::new(client); + let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); + + let stream = pool + .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) + .unwrap(); // Continuously advance the Yamux connection of the client in a background task. pool.spawner() - .spawn_obj(noop_server(client).boxed().into()) + .spawn_obj( + noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) + .boxed() + .into(), + ) .unwrap(); // Send the message, expecting it to be echo'd. 
@@ -70,7 +77,6 @@ fn write_deadlock() { pool.spawner() .spawn_with_handle( async move { - let stream = ctrl.open_stream().await.unwrap(); let (mut reader, mut writer) = AsyncReadExt::split(stream); let mut b = vec![0; msg.len()]; // Write & read concurrently, so that the client is able From 0f10d51739596a97885501bb705f95b6dc6198f3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:34:59 +0100 Subject: [PATCH 24/33] Move final test to poll_api --- test-harness/tests/poll_api.rs | 86 ++++++++++++++++++++++++++++- test-harness/tests/tests.rs | 99 ---------------------------------- 2 files changed, 85 insertions(+), 100 deletions(-) delete mode 100644 test-harness/tests/tests.rs diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index cbb3ec14..9b4332d1 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,11 +1,15 @@ -use futures::future::BoxFuture; +use futures::executor::LocalPool; +use futures::future::{join, BoxFuture}; +use futures::prelude::*; use futures::stream::FuturesUnordered; +use futures::task::{Spawn, SpawnExt}; use futures::{ future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, }; use quickcheck::QuickCheck; use std::future::Future; use std::iter; +use std::panic::panic_any; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use test_harness::*; @@ -200,6 +204,86 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } +/// This test simulates two endpoints of a Yamux connection which may be unable to +/// write simultaneously but can make progress by reading. If both endpoints +/// don't read in-between trying to finish their writes, a deadlock occurs. +#[test] +fn write_deadlock() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + // We make the message to transmit large enough s.t. the "server" + // is forced to start writing (i.e. echoing) the bytes before + // having read the entire payload. + let msg = vec![1u8; 1024 * 1024]; + + // We choose a "connection capacity" that is artificially below + // the size of a receive window. If it were equal or greater, + // multiple concurrently writing streams would be needed to non-deterministically + // provoke the write deadlock. This is supposed to reflect the + // fact that the sum of receive windows of all open streams can easily + // be larger than the send capacity of the connection at any point in time. + // Using such a low capacity here therefore yields a more reproducible test. + let capacity = 1024; + + // Create a bounded channel representing the underlying "connection". + // Each endpoint gets a name and a bounded capacity for its outbound + // channel (which is the other's inbound channel). + let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); + + // Create and spawn a "server" that echoes every message back to the client. + let server = Connection::new(server_endpoint, Config::default(), Mode::Server); + pool.spawner() + .spawn_obj( + async move { echo_server(server).await.unwrap() } + .boxed() + .into(), + ) + .unwrap(); + + // Create and spawn a "client" that sends messages expected to be echoed + // by the server. + let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); + + let stream = pool + .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) + .unwrap(); + + // Continuously advance the Yamux connection of the client in a background task. 
+ pool.spawner() + .spawn_obj( + noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) + .boxed() + .into(), + ) + .unwrap(); + + // Send the message, expecting it to be echo'd. + pool.run_until( + pool.spawner() + .spawn_with_handle( + async move { + let (mut reader, mut writer) = AsyncReadExt::split(stream); + let mut b = vec![0; msg.len()]; + // Write & read concurrently, so that the client is able + // to start reading the echo'd bytes before it even finished + // sending them all. + let _ = join( + writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)), + reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)), + ) + .await; + let mut stream = reader.reunite(writer).unwrap(); + stream.close().await.unwrap(); + log::debug!("C: Stream {} done.", stream.id()); + assert_eq!(b, msg); + } + .boxed(), + ) + .unwrap(), + ); +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs deleted file mode 100644 index bf521632..00000000 --- a/test-harness/tests/tests.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use futures::executor::LocalPool; -use futures::future::join; -use futures::io::AsyncReadExt; -use futures::prelude::*; -use futures::task::{Spawn, SpawnExt}; -use std::panic::panic_any; - -use test_harness::*; -use yamux::{Config, Connection, Mode}; - -/// This test simulates two endpoints of a Yamux connection which may be unable to -/// write simultaneously but can make progress by reading. If both endpoints -/// don't read in-between trying to finish their writes, a deadlock occurs. -#[test] -fn write_deadlock() { - let _ = env_logger::try_init(); - let mut pool = LocalPool::new(); - - // We make the message to transmit large enough s.t. the "server" - // is forced to start writing (i.e. echoing) the bytes before - // having read the entire payload. - let msg = vec![1u8; 1024 * 1024]; - - // We choose a "connection capacity" that is artificially below - // the size of a receive window. If it were equal or greater, - // multiple concurrently writing streams would be needed to non-deterministically - // provoke the write deadlock. This is supposed to reflect the - // fact that the sum of receive windows of all open streams can easily - // be larger than the send capacity of the connection at any point in time. - // Using such a low capacity here therefore yields a more reproducible test. - let capacity = 1024; - - // Create a bounded channel representing the underlying "connection". - // Each endpoint gets a name and a bounded capacity for its outbound - // channel (which is the other's inbound channel). - let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); - - // Create and spawn a "server" that echoes every message back to the client. 
- let server = Connection::new(server_endpoint, Config::default(), Mode::Server); - pool.spawner() - .spawn_obj( - async move { echo_server(server).await.unwrap() } - .boxed() - .into(), - ) - .unwrap(); - - // Create and spawn a "client" that sends messages expected to be echoed - // by the server. - let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); - - let stream = pool - .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) - .unwrap(); - - // Continuously advance the Yamux connection of the client in a background task. - pool.spawner() - .spawn_obj( - noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) - .boxed() - .into(), - ) - .unwrap(); - - // Send the message, expecting it to be echo'd. - pool.run_until( - pool.spawner() - .spawn_with_handle( - async move { - let (mut reader, mut writer) = AsyncReadExt::split(stream); - let mut b = vec![0; msg.len()]; - // Write & read concurrently, so that the client is able - // to start reading the echo'd bytes before it even finished - // sending them all. - let _ = join( - writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)), - reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)), - ) - .await; - let mut stream = reader.reunite(writer).unwrap(); - stream.close().await.unwrap(); - log::debug!("C: Stream {} done.", stream.id()); - assert_eq!(b, msg); - } - .boxed(), - ) - .unwrap(), - ); -} From cb56b70e129571b105b93203cca37baa26fd0ab0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 11:53:06 +0100 Subject: [PATCH 25/33] Fix errors after merge --- test-harness/tests/ack_backlog.rs | 2 +- test-harness/tests/ack_timing.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test-harness/tests/ack_backlog.rs b/test-harness/tests/ack_backlog.rs index 2f5dd931..2aa4e33a 100644 --- a/test-harness/tests/ack_backlog.rs +++ b/test-harness/tests/ack_backlog.rs @@ -18,7 +18,7 @@ async fn honours_ack_backlog_of_256() { let (tx, rx) = oneshot::channel(); - let (listener, address) = bind().await.expect("bind"); + let (listener, address) = bind(None).await.expect("bind"); let server = async { let socket = listener.accept().await.expect("accept").0.compat(); diff --git a/test-harness/tests/ack_timing.rs b/test-harness/tests/ack_timing.rs index 5f835e24..9e071855 100644 --- a/test-harness/tests/ack_timing.rs +++ b/test-harness/tests/ack_timing.rs @@ -14,7 +14,7 @@ use yamux::{Config, Connection, ConnectionError, Mode, Stream}; async fn stream_is_acknowledged_on_first_use() { let _ = env_logger::try_init(); - let (listener, address) = bind().await.expect("bind"); + let (listener, address) = bind(None).await.expect("bind"); let server = async { let socket = listener.accept().await.expect("accept").0.compat(); From 749fd9ab323d4191f1190668ee3dfba4c13e89fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 11:56:00 +0100 Subject: [PATCH 26/33] Move `MessageSender` to test-harness --- test-harness/src/lib.rs | 85 ++++++++++++++++++++++++++++++ test-harness/tests/poll_api.rs | 95 ++-------------------------------- 2 files changed, 88 insertions(+), 92 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 009f5e23..b01169a1 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -11,6 +11,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io, mem}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use tokio::task; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use 
yamux::ConnectionError; use yamux::{Config, WindowUpdateMode}; @@ -83,6 +84,90 @@ where .await } +pub struct MessageSender { + connection: Connection, + pending_messages: Vec, + worker_streams: FuturesUnordered>, + streams_processed: usize, + /// Whether to spawn a new task for each stream. + spawn_tasks: bool, +} + +impl MessageSender { + pub fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { + Self { + connection, + pending_messages: messages, + worker_streams: FuturesUnordered::default(), + streams_processed: 0, + spawn_tasks, + } + } +} + +impl Future for MessageSender +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if this.pending_messages.is_empty() && this.worker_streams.is_empty() { + futures::ready!(this.connection.poll_close(cx)?); + + return Poll::Ready(Ok(this.streams_processed)); + } + + if let Some(message) = this.pending_messages.pop() { + match this.connection.poll_new_outbound(cx)? { + Poll::Ready(mut stream) => { + let future = async move { + send_recv_message(&mut stream, message).await.unwrap(); + stream.close().await.unwrap(); + }; + + let worker_stream_future = if this.spawn_tasks { + async { task::spawn(future).await.unwrap() }.boxed() + } else { + future.boxed() + }; + + this.worker_streams.push(worker_stream_future); + continue; + } + Poll::Pending => { + this.pending_messages.push(message); + } + } + } + + match this.worker_streams.poll_next_unpin(cx) { + Poll::Ready(Some(())) => { + this.streams_processed += 1; + continue; + } + Poll::Ready(None) | Poll::Pending => {} + } + + match this.connection.poll_next_inbound(cx)? { + Poll::Ready(Some(stream)) => { + drop(stream); + panic!("Did not expect remote to open a stream"); + } + Poll::Ready(None) => { + panic!("Did not expect remote to close the connection"); + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} + /// For each incoming stream, do nothing. pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 9b4332d1..37f0ad72 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,17 +1,12 @@ use futures::executor::LocalPool; -use futures::future::{join, BoxFuture}; +use futures::future::join; use futures::prelude::*; -use futures::stream::FuturesUnordered; use futures::task::{Spawn, SpawnExt}; -use futures::{ - future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, -}; +use futures::{future, stream, AsyncReadExt, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; -use std::future::Future; use std::iter; use std::panic::panic_any; -use std::pin::{pin, Pin}; -use std::task::{Context, Poll}; +use std::pin::pin; use test_harness::*; use tokio::net::TcpStream; use tokio::runtime::Runtime; @@ -283,87 +278,3 @@ fn write_deadlock() { .unwrap(), ); } - -struct MessageSender { - connection: Connection, - pending_messages: Vec, - worker_streams: FuturesUnordered>, - streams_processed: usize, - /// Whether to spawn a new task for each stream. 
- spawn_tasks: bool, -} - -impl MessageSender { - fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { - Self { - connection, - pending_messages: messages, - worker_streams: FuturesUnordered::default(), - streams_processed: 0, - spawn_tasks, - } - } -} - -impl Future for MessageSender -where - T: AsyncRead + AsyncWrite + Unpin, -{ - type Output = yamux::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - if this.pending_messages.is_empty() && this.worker_streams.is_empty() { - futures::ready!(this.connection.poll_close(cx)?); - - return Poll::Ready(Ok(this.streams_processed)); - } - - if let Some(message) = this.pending_messages.pop() { - match this.connection.poll_new_outbound(cx)? { - Poll::Ready(mut stream) => { - let future = async move { - send_recv_message(&mut stream, message).await.unwrap(); - stream.close().await.unwrap(); - }; - - let worker_stream_future = if this.spawn_tasks { - async { task::spawn(future).await.unwrap() }.boxed() - } else { - future.boxed() - }; - - this.worker_streams.push(worker_stream_future); - continue; - } - Poll::Pending => { - this.pending_messages.push(message); - } - } - } - - match this.worker_streams.poll_next_unpin(cx) { - Poll::Ready(Some(())) => { - this.streams_processed += 1; - continue; - } - Poll::Ready(None) | Poll::Pending => {} - } - - match this.connection.poll_next_inbound(cx)? { - Poll::Ready(Some(stream)) => { - drop(stream); - panic!("Did not expect remote to open a stream"); - } - Poll::Ready(None) => { - panic!("Did not expect remote to close the connection"); - } - Poll::Pending => {} - } - - return Poll::Pending; - } - } -} From 7ff430fa100b10f2d942acf4e34964ec0b4280cc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:07:39 +0100 Subject: [PATCH 27/33] Move benchmarks to test-harness --- test-harness/Cargo.toml | 9 +++++++++ {yamux => test-harness}/benches/concurrent.rs | 0 yamux/Cargo.toml | 12 ------------ 3 files changed, 9 insertions(+), 12 deletions(-) rename {yamux => test-harness}/benches/concurrent.rs (100%) diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 55cead3e..39b702d5 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -14,6 +14,15 @@ anyhow = "1" log = "0.4.17" [dev-dependencies] +criterion = "0.5" env_logger = "0.10" +futures = "0.3.4" +quickcheck = "1.0" +tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] } +tokio-util = { version = "0.7", features = ["compat"] } constrained-connection = "0.1" futures_ringbuf = "0.4.0" + +[[bench]] +name = "concurrent" +harness = false diff --git a/yamux/benches/concurrent.rs b/test-harness/benches/concurrent.rs similarity index 100% rename from yamux/benches/concurrent.rs rename to test-harness/benches/concurrent.rs diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index 09322d3c..b1407ac0 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -19,16 +19,4 @@ static_assertions = "1" pin-project = "1.1.0" [dev-dependencies] -anyhow = "1" -criterion = "0.5" -env_logger = "0.10" -futures = "0.3.4" quickcheck = "1.0" -tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] } -tokio-util = { version = "0.7", features = ["compat"] } -constrained-connection = "0.1" -futures_ringbuf = "0.4.0" - -[[bench]] -name = "concurrent" -harness = false From d3667e3c980dcb8b25a846dba8b6e3c671f0f0ca Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:13:51 +0100 Subject: 
[PATCH 28/33] Add message multiplier to `MessageSender` --- test-harness/src/lib.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index b01169a1..e4d0d3c9 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -91,6 +91,8 @@ pub struct MessageSender { streams_processed: usize, /// Whether to spawn a new task for each stream. spawn_tasks: bool, + /// How many times to send each message on the stream + message_multiplier: u64, } impl MessageSender { @@ -101,8 +103,14 @@ impl MessageSender { worker_streams: FuturesUnordered::default(), streams_processed: 0, spawn_tasks, + message_multiplier: 1, } } + + pub fn with_message_multiplier(mut self, multiplier: u64) -> Self { + self.message_multiplier = multiplier; + self + } } impl Future for MessageSender @@ -124,8 +132,13 @@ where if let Some(message) = this.pending_messages.pop() { match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { + let multiplier = this.message_multiplier; + let future = async move { - send_recv_message(&mut stream, message).await.unwrap(); + for _ in 0..multiplier { + send_recv_message(&mut stream, &message).await.unwrap(); + } + stream.close().await.unwrap(); }; @@ -203,13 +216,13 @@ impl Arbitrary for TcpBufferSizes { } } -pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io::Result<()> { +pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io::Result<()> { let id = stream.id(); let (mut reader, mut writer) = AsyncReadExt::split(stream); let len = msg.len(); let write_fut = async { - writer.write_all(&msg).await.unwrap(); + writer.write_all(msg).await.unwrap(); log::debug!("C: {}: sent {} bytes", id, len); }; let mut data = vec![0; msg.len()]; @@ -218,7 +231,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io: log::debug!("C: {}: received {} bytes", id, data.len()); }; futures::future::join(write_fut, read_fut).await; - assert_eq!(data, msg); + assert_eq!(&data, msg); Ok(()) } @@ -231,7 +244,7 @@ pub async fn send_on_single_stream( log::debug!("C: new stream: {}", stream); for msg in iter { - send_recv_message(&mut stream, msg).await?; + send_recv_message(&mut stream, &msg).await?; } stream.close().await?; From 358a7285a189d1b45423e438989e74d4f7057230 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:20:57 +0100 Subject: [PATCH 29/33] Migrate benchmark away from `Control` --- test-harness/benches/concurrent.rs | 73 +++++++----------------------- test-harness/src/lib.rs | 70 ++++++++++++++++++++++------ 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/test-harness/benches/concurrent.rs b/test-harness/benches/concurrent.rs index 5da8ce29..f7179cee 100644 --- a/test-harness/benches/concurrent.rs +++ b/test-harness/benches/concurrent.rs @@ -10,10 +10,11 @@ use constrained_connection::{new_unconstrained_connection, samples, Endpoint}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use futures::{channel::mpsc, future, io::AsyncReadExt, prelude::*}; +use std::iter; use std::sync::Arc; +use test_harness::{dev_null_server, MessageSender, MessageSenderStrategy, Msg}; use tokio::{runtime::Runtime, task}; -use yamux::{Config, Connection, Control, Mode}; +use yamux::{Config, Connection, Mode}; criterion_group!(benches, concurrent); criterion_main!(benches); @@ -86,62 +87,20 @@ async fn oneway( server: Endpoint, client: Endpoint, ) { - let 
msg_len = data.0.len(); - let (tx, rx) = mpsc::unbounded(); + let server = Connection::new(server, config(), Mode::Server); + let client = Connection::new(client, config(), Mode::Client); - let server = async move { - let mut connection = Connection::new(server, config(), Mode::Server); + task::spawn(dev_null_server(server)); - while let Some(Ok(mut stream)) = stream::poll_fn(|cx| connection.poll_next_inbound(cx)) - .next() - .await - { - let tx = tx.clone(); - - task::spawn(async move { - let mut n = 0; - let mut b = vec![0; msg_len]; - - // Receive `nmessages` messages. - for _ in 0..nmessages { - stream.read_exact(&mut b[..]).await.unwrap(); - n += b.len(); - } - - tx.unbounded_send(n).expect("unbounded_send"); - stream.close().await.unwrap(); - }); - } - }; - task::spawn(server); - - let conn = Connection::new(client, config(), Mode::Client); - let (mut ctrl, conn) = Control::new(conn); - - task::spawn(conn.for_each(|r| { - r.unwrap(); - future::ready(()) - })); - - for _ in 0..nstreams { - let data = data.clone(); - let mut ctrl = ctrl.clone(); - task::spawn(async move { - let mut stream = ctrl.open_stream().await.unwrap(); - - // Send `nmessages` messages. - for _ in 0..nmessages { - stream.write_all(data.as_ref()).await.unwrap(); - } - - stream.close().await.unwrap(); - }); - } - - let n = rx + let messages = iter::repeat(data) + .map(|b| Msg(b.0.to_vec())) .take(nstreams) - .fold(0, |acc, n| future::ready(acc + n)) - .await; - assert_eq!(n, nstreams * nmessages * msg_len); - ctrl.close().await.expect("close"); + .collect(); // `MessageSender` will use 1 stream per message. + let num_streams_used = MessageSender::new(client, messages, true) + .with_message_multiplier(nmessages as u64) + .with_strategy(MessageSenderStrategy::Send) + .await + .unwrap(); + + assert_eq!(num_streams_used, nstreams); } diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index e4d0d3c9..ff99093c 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -69,8 +69,8 @@ fn new_socket(buffer_sizes: Option) -> io::Result { /// For each incoming stream of `c` echo back to the sender. pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -84,6 +84,27 @@ where .await } +/// For each incoming stream of `c`, read to end but don't write back. 
+pub async fn dev_null_server(mut c: Connection) -> Result<(), ConnectionError> + where + T: AsyncRead + AsyncWrite + Unpin, +{ + stream::poll_fn(|cx| c.poll_next_inbound(cx)) + .try_for_each_concurrent(None, |mut stream| async move { + let mut buf = [0u8; 1024]; + + while let Ok(n) = stream.read(&mut buf).await { + if n == 0 { + break; + } + } + + stream.close().await?; + Ok(()) + }) + .await +} + pub struct MessageSender { connection: Connection, pending_messages: Vec, @@ -93,6 +114,13 @@ pub struct MessageSender { spawn_tasks: bool, /// How many times to send each message on the stream message_multiplier: u64, + strategy: MessageSenderStrategy, +} + +#[derive(Copy, Clone)] +pub enum MessageSenderStrategy { + SendRecv, + Send, } impl MessageSender { @@ -104,6 +132,7 @@ impl MessageSender { streams_processed: 0, spawn_tasks, message_multiplier: 1, + strategy: MessageSenderStrategy::SendRecv, } } @@ -111,11 +140,16 @@ impl MessageSender { self.message_multiplier = multiplier; self } + + pub fn with_strategy(mut self, strategy: MessageSenderStrategy) -> Self { + self.strategy = strategy; + self + } } impl Future for MessageSender -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -133,10 +167,18 @@ where match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { let multiplier = this.message_multiplier; + let strategy = this.strategy; let future = async move { for _ in 0..multiplier { - send_recv_message(&mut stream, &message).await.unwrap(); + match strategy { + MessageSenderStrategy::SendRecv => { + send_recv_message(&mut stream, &message).await.unwrap() + } + MessageSenderStrategy::Send => { + stream.write_all(&message.0).await.unwrap() + } + }; } stream.close().await.unwrap(); @@ -182,12 +224,12 @@ where } /// For each incoming stream, do nothing. -pub async fn noop_server(c: impl Stream>) { +pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { drop(maybe_stream); future::ready(()) }) - .await; + .await; } /// Send and receive buffer size for a TCP socket. @@ -239,7 +281,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io /// Send all messages, using only a single stream. 
pub async fn send_on_single_stream( mut stream: yamux::Stream, - iter: impl IntoIterator, + iter: impl IntoIterator, ) -> Result<(), ConnectionError> { log::debug!("C: new stream: {}", stream); @@ -271,8 +313,8 @@ impl EchoServer { } impl Future for EchoServer -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -308,7 +350,7 @@ where stream.close().await?; Ok(()) } - .boxed(), + .boxed(), ); continue; } @@ -342,8 +384,8 @@ impl OpenStreamsClient { } impl Future for OpenStreamsClient -where - T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, + where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = yamux::Result<(Connection, Vec)>; @@ -395,7 +437,7 @@ impl Arbitrary for Msg { msg } - fn shrink(&self) -> Box> { + fn shrink(&self) -> Box> { Box::new(self.0.shrink().filter(|v| !v.is_empty()).map(Msg)) } } From d5d5972cbdec1b6d045c1b1d11adcd453b719eb0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:41:34 +0100 Subject: [PATCH 30/33] Remove `Control` and `ControlledConnection` --- CHANGELOG.md | 6 ++ yamux/Cargo.toml | 2 +- yamux/src/control.rs | 246 ------------------------------------------- yamux/src/lib.rs | 8 -- 4 files changed, 7 insertions(+), 255 deletions(-) delete mode 100644 yamux/src/control.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f5cbde49..e0a3f271 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.12.0 - unreleased + +- Remove `Control` and `ControlledConnection`. + Users are encouraged to move to the `poll_` functions of `Connection`. + See [PR #164](https://github.com/libp2p/rust-yamux/pull/164). + # 0.11.1 - Avoid race condition between pending frames and closing stream. diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index b1407ac0..431bd6c7 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yamux" -version = "0.11.1" +version = "0.12.0" authors = ["Parity Technologies "] license = "Apache-2.0 OR MIT" description = "Multiplexer over reliable, ordered connections" diff --git a/yamux/src/control.rs b/yamux/src/control.rs deleted file mode 100644 index 48c9aa86..00000000 --- a/yamux/src/control.rs +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use crate::MAX_COMMAND_BACKLOG; -use crate::{error::ConnectionError, Connection, Result, Stream}; -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, -}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A Yamux [`Connection`] controller. -/// -/// This presents an alternative API for using a yamux [`Connection`]. -/// -/// A [`Control`] communicates with a [`ControlledConnection`] via a channel. This allows -/// a [`Control`] to be cloned and shared between tasks and threads. -#[derive(Clone, Debug)] -pub struct Control { - /// Command channel to [`ControlledConnection`]. 
- sender: mpsc::Sender, -} - -impl Control { - pub fn new(connection: Connection) -> (Self, ControlledConnection) { - let (sender, receiver) = mpsc::channel(MAX_COMMAND_BACKLOG); - - let control = Control { sender }; - let connection = ControlledConnection { - state: State::Idle(connection), - commands: receiver, - }; - - (control, connection) - } - - /// Open a new stream to the remote. - pub async fn open_stream(&mut self) -> Result { - let (tx, rx) = oneshot::channel(); - self.sender.send(ControlCommand::OpenStream(tx)).await?; - rx.await? - } - - /// Close the connection. - pub async fn close(&mut self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - if self - .sender - .send(ControlCommand::CloseConnection(tx)) - .await - .is_err() - { - // The receiver is closed which means the connection is already closed. - return Ok(()); - } - // A dropped `oneshot::Sender` means the `Connection` is gone, - // so we do not treat receive errors differently here. - let _ = rx.await; - Ok(()) - } -} - -/// Wraps a [`Connection`] which can be controlled with a [`Control`]. -pub struct ControlledConnection { - state: State, - commands: mpsc::Receiver, -} - -impl ControlledConnection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - loop { - match std::mem::replace(&mut self.state, State::Poisoned) { - State::Idle(mut connection) => { - match connection.poll_next_inbound(cx) { - Poll::Ready(maybe_stream) => { - self.state = State::Idle(connection); - return Poll::Ready(maybe_stream); - } - Poll::Pending => {} - } - - match self.commands.poll_next_unpin(cx) { - Poll::Ready(Some(ControlCommand::OpenStream(reply))) => { - self.state = State::OpeningNewStream { reply, connection }; - continue; - } - Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => { - self.commands.close(); - - self.state = State::Closing { - reply: Some(reply), - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(None) => { - // Last `Control` sender was dropped, close te connection. 
- self.state = State::Closing { - reply: None, - inner: Closing::ClosingConnection { connection }, - }; - continue; - } - Poll::Pending => {} - } - - self.state = State::Idle(connection); - return Poll::Pending; - } - State::OpeningNewStream { - reply, - mut connection, - } => match connection.poll_new_outbound(cx) { - Poll::Ready(stream) => { - let _ = reply.send(stream); - - self.state = State::Idle(connection); - continue; - } - Poll::Pending => { - self.state = State::OpeningNewStream { reply, connection }; - return Poll::Pending; - } - }, - State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - } => match self.commands.poll_next_unpin(cx) { - Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => { - let _ = new_reply.send(Err(ConnectionError::Closed)); - - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => { - let _ = new_reply.send(()); - - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(None) => { - self.state = State::Closing { - reply, - inner: Closing::ClosingConnection { connection }, - }; - continue; - } - Poll::Pending => { - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - return Poll::Pending; - } - }, - State::Closing { - reply, - inner: Closing::ClosingConnection { mut connection }, - } => match connection.poll_close(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => { - if let Some(reply) = reply { - let _ = reply.send(()); - } - return Poll::Ready(None); - } - Poll::Ready(Err(other)) => { - if let Some(reply) = reply { - let _ = reply.send(()); - } - return Poll::Ready(Some(Err(other))); - } - Poll::Pending => { - self.state = State::Closing { - reply, - inner: Closing::ClosingConnection { connection }, - }; - return Poll::Pending; - } - }, - State::Poisoned => unreachable!(), - } - } - } -} - -#[derive(Debug)] -enum ControlCommand { - /// Open a new stream to the remote end. - OpenStream(oneshot::Sender>), - /// Close the whole connection. - CloseConnection(oneshot::Sender<()>), -} - -/// The state of a [`ControlledConnection`]. -enum State { - Idle(Connection), - OpeningNewStream { - reply: oneshot::Sender>, - connection: Connection, - }, - Closing { - /// A channel to the [`Control`] in case the close was requested. `None` if we are closing because the last [`Control`] was dropped. - reply: Option>, - inner: Closing, - }, - Poisoned, -} - -/// A sub-state of our larger state machine for a [`ControlledConnection`]. -/// -/// Closing connection involves two steps: -/// -/// 1. Draining and answered all remaining [`ControlCommands`]. -/// 1. Closing the underlying [`Connection`]. 
-enum Closing { - DrainingControlCommands { connection: Connection }, - ClosingConnection { connection: Connection }, -} - -impl futures::Stream for ControlledConnection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().poll_next(cx) - } -} diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index 040bd8d7..8de20344 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -25,7 +25,6 @@ #![forbid(unsafe_code)] mod chunks; -mod control; mod error; mod frame; @@ -33,7 +32,6 @@ pub(crate) mod connection; mod tagged_stream; pub use crate::connection::{Connection, Mode, Packet, Stream}; -pub use crate::control::{Control, ControlledConnection}; pub use crate::error::ConnectionError; pub use crate::frame::{ header::{HeaderDecodeError, StreamId}, @@ -44,12 +42,6 @@ pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification pub type Result = std::result::Result; -/// Arbitrary limit of our internal command channels. -/// -/// Since each [`mpsc::Sender`] gets a guaranteed slot in a channel the -/// actual upper bound is this value + number of clones. -const MAX_COMMAND_BACKLOG: usize = 32; - /// Default maximum number of bytes a Yamux data frame might carry as its /// payload when being send. Larger Payloads will be split. /// From 40f01c62c82eec0dfee72824f23aaf4786d0d65a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:44:01 +0100 Subject: [PATCH 31/33] Fix formatting --- test-harness/src/lib.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index ff99093c..4c4820cc 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -69,8 +69,8 @@ fn new_socket(buffer_sizes: Option) -> io::Result { /// For each incoming stream of `c` echo back to the sender. pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -86,8 +86,8 @@ pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> /// For each incoming stream of `c`, read to end but don't write back. pub async fn dev_null_server(mut c: Connection) -> Result<(), ConnectionError> - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -148,8 +148,8 @@ impl MessageSender { } impl Future for MessageSender - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -224,12 +224,12 @@ impl Future for MessageSender } /// For each incoming stream, do nothing. -pub async fn noop_server(c: impl Stream>) { +pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { drop(maybe_stream); future::ready(()) }) - .await; + .await; } /// Send and receive buffer size for a TCP socket. @@ -281,7 +281,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io /// Send all messages, using only a single stream. 
pub async fn send_on_single_stream( mut stream: yamux::Stream, - iter: impl IntoIterator, + iter: impl IntoIterator, ) -> Result<(), ConnectionError> { log::debug!("C: new stream: {}", stream); @@ -313,8 +313,8 @@ impl EchoServer { } impl Future for EchoServer - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -350,7 +350,7 @@ impl Future for EchoServer stream.close().await?; Ok(()) } - .boxed(), + .boxed(), ); continue; } @@ -384,8 +384,8 @@ impl OpenStreamsClient { } impl Future for OpenStreamsClient - where - T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, +where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = yamux::Result<(Connection, Vec)>; @@ -437,7 +437,7 @@ impl Arbitrary for Msg { msg } - fn shrink(&self) -> Box> { + fn shrink(&self) -> Box> { Box::new(self.0.shrink().filter(|v| !v.is_empty()).map(Msg)) } } From d6a4261b609646b5877f3ab2a78d5c438e2773d2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Jul 2023 15:36:46 +0100 Subject: [PATCH 32/33] Update docs --- yamux/src/connection/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 8d07d61f..4368548a 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -43,7 +43,7 @@ pub enum State { /// For outbound streams, this tracks whether the remote has acknowledged our stream. /// For inbound streams, this tracks whether we have acknowledged the stream to the remote. /// - /// This starts out with `false` and is set to `true` when we receive an `ACK` flag for this stream. + /// This starts out with `false` and is set to `true` when we receive or send an `ACK` flag for this stream. /// We may also directly transition: /// - from `Open` to `RecvClosed` if the remote immediately sends `FIN`. /// - from `Open` to `Closed` if the remote immediately sends `RST`. From 7b4fdba7105c0e1937265b6753cc5954f978a8b9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Jul 2023 15:39:48 +0100 Subject: [PATCH 33/33] Add docs --- yamux/src/connection/stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index 4368548a..d9978ca0 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -401,6 +401,9 @@ impl AsyncWrite for Stream { log::trace!("{}/{}: write {} bytes", self.conn, self.id, n); // technically, the frame hasn't been sent yet on the wire but from the perspective of this data structure, we've queued the frame for sending + // We are tracking this information: + // a) to be consistent with outbound streams + // b) to correctly test our behaviour around timing of when ACKs are sent. See `ack_timing.rs` test. if frame.header().flags().contains(ACK) { self.shared() .update_state(self.conn, self.id, State::Open { acknowledged: true });
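Usage note (not part of the patch series): the CHANGELOG entry in PATCH 30/33 points users of the removed `Control`/`ControlledConnection` API at the `poll_` functions of `Connection`. The sketch below shows one way that migration might look, assuming the `poll_new_outbound`, `poll_next_inbound` and `poll_close` functions referenced throughout this series; the function name `run` and the `"ping"` payload are illustrative only, and the combinator style mirrors the `stream::poll_fn(...).try_for_each_concurrent(...)` pattern used by the test-harness `echo_server` in this series.

```rust
// Sketch only: driving a `Connection` without `Control`, under the
// assumptions stated above. The connection only makes progress while one of
// its `poll_` functions is being polled, so stream I/O is joined with the
// inbound loop rather than awaited in isolation.
use futures::{future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, TryStreamExt};
use yamux::{Connection, ConnectionError};

async fn run<T>(mut connection: Connection<T>) -> Result<(), ConnectionError>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    // Was: `control.open_stream().await?`.
    let mut outbound = future::poll_fn(|cx| connection.poll_new_outbound(cx)).await?;

    // Outbound work as a plain future; it is polled concurrently with the
    // inbound loop below, which is what keeps the connection moving.
    let ping = async move {
        outbound.write_all(b"ping").await?;
        outbound.close().await?;
        Ok::<_, ConnectionError>(())
    };

    // Was: consuming `ControlledConnection` as a `Stream` of inbound streams.
    let inbound = stream::poll_fn(|cx| connection.poll_next_inbound(cx))
        .try_for_each_concurrent(None, |mut stream| async move {
            let mut buf = Vec::new();
            stream.read_to_end(&mut buf).await?;
            stream.close().await?;
            Ok(())
        });

    let (ping_result, inbound_result) = future::join(ping, inbound).await;
    ping_result?;
    inbound_result?;

    // Was: `control.close().await?`.
    future::poll_fn(|cx| connection.poll_close(cx)).await?;

    Ok(())
}
```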