From f2f5bc877c8027c5151563159ee4db2746fb4c49 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:44:18 +0100 Subject: [PATCH 01/16] Unify harness code --- test-harness/src/lib.rs | 60 +++++++++++++++++++---- test-harness/tests/concurrent.rs | 83 +++----------------------------- test-harness/tests/poll_api.rs | 4 +- test-harness/tests/tests.rs | 7 +-- 4 files changed, 63 insertions(+), 91 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 4c387b42..1e1771ab 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -10,7 +10,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io, mem}; -use tokio::net::{TcpListener, TcpStream}; +use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use yamux::ConnectionError; use yamux::{Config, WindowUpdateMode}; @@ -19,8 +19,9 @@ use yamux::{Connection, Mode}; pub async fn connected_peers( server_config: Config, client_config: Config, + buffer_sizes: Option, ) -> io::Result<(Connection>, Connection>)> { - let (listener, addr) = bind().await?; + let (listener, addr) = bind(buffer_sizes).await?; let server = async { let (stream, _) = listener.accept().await?; @@ -31,7 +32,7 @@ pub async fn connected_peers( )) }; let client = async { - let stream = TcpStream::connect(addr).await?; + let stream = new_socket(buffer_sizes)?.connect(addr).await?; Ok(Connection::new( stream.compat(), client_config, @@ -42,12 +43,27 @@ pub async fn connected_peers( futures::future::try_join(server, client).await } -pub async fn bind() -> io::Result<(TcpListener, SocketAddr)> { - let i = Ipv4Addr::new(127, 0, 0, 1); - let s = SocketAddr::V4(SocketAddrV4::new(i, 0)); - let l = TcpListener::bind(&s).await?; - let a = l.local_addr()?; - Ok((l, a)) +pub async fn bind(buffer_sizes: Option) -> io::Result<(TcpListener, SocketAddr)> { + let socket = new_socket(buffer_sizes)?; + socket.bind(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 0, + )))?; + + let listener = socket.listen(1024)?; + let address = listener.local_addr()?; + + Ok((listener, address)) +} + +fn new_socket(buffer_sizes: Option) -> io::Result { + let socket = TcpSocket::new_v4()?; + if let Some(size) = buffer_sizes { + socket.set_send_buffer_size(size.send)?; + socket.set_recv_buffer_size(size.recv)?; + } + + Ok(socket) } /// For each incoming stream of `c` echo back to the sender. @@ -76,6 +92,32 @@ pub async fn noop_server(c: impl Stream Self { + let send = if bool::arbitrary(g) { + 16 * 1024 + } else { + 32 * 1024 + }; + + // Have receive buffer size be some multiple of send buffer size. + let recv = if bool::arbitrary(g) { + send * 2 + } else { + send * 4 + }; + + TcpBufferSizes { send, recv } + } +} + pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io::Result<()> { let id = stream.id(); let (mut reader, mut writer) = AsyncReadExt::split(stream); diff --git a/test-harness/tests/concurrent.rs b/test-harness/tests/concurrent.rs index 46501158..04b8e6e2 100644 --- a/test-harness/tests/concurrent.rs +++ b/test-harness/tests/concurrent.rs @@ -10,16 +10,10 @@ use futures::prelude::*; use futures::stream::FuturesUnordered; -use quickcheck::{Arbitrary, Gen, QuickCheck}; -use std::{ - io, - net::{Ipv4Addr, SocketAddr, SocketAddrV4}, -}; +use quickcheck::QuickCheck; use test_harness::*; -use tokio::net::{TcpListener, TcpStream}; -use tokio::{net::TcpSocket, runtime::Runtime, task}; -use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; -use yamux::{Config, Connection, ConnectionError, Control, Mode, WindowUpdateMode}; +use tokio::{runtime::Runtime, task}; +use yamux::{Config, ConnectionError, Control, WindowUpdateMode}; const PAYLOAD_SIZE: usize = 128 * 1024; @@ -32,7 +26,9 @@ fn concurrent_streams() { let n_streams = 1000; Runtime::new().expect("new runtime").block_on(async move { - let (server, client) = connected_peers(tcp_buffer_sizes).await.unwrap(); + let (server, client) = connected_peers(config(), config(), tcp_buffer_sizes) + .await + .unwrap(); task::spawn(echo_server(server)); @@ -70,73 +66,6 @@ fn concurrent_streams() { QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) } -/// Send and receive buffer size for a TCP socket. -#[derive(Clone, Debug, Copy)] -struct TcpBufferSizes { - send: u32, - recv: u32, -} - -impl Arbitrary for TcpBufferSizes { - fn arbitrary(g: &mut Gen) -> Self { - let send = if bool::arbitrary(g) { - 16 * 1024 - } else { - 32 * 1024 - }; - - // Have receive buffer size be some multiple of send buffer size. - let recv = if bool::arbitrary(g) { - send * 2 - } else { - send * 4 - }; - - TcpBufferSizes { send, recv } - } -} - -async fn connected_peers( - buffer_sizes: Option, -) -> io::Result<(Connection>, Connection>)> { - let (listener, addr) = bind(buffer_sizes).await?; - - let server = async { - let (stream, _) = listener.accept().await?; - Ok(Connection::new(stream.compat(), config(), Mode::Server)) - }; - let client = async { - let stream = new_socket(buffer_sizes)?.connect(addr).await?; - - Ok(Connection::new(stream.compat(), config(), Mode::Client)) - }; - - futures::future::try_join(server, client).await -} - -async fn bind(buffer_sizes: Option) -> io::Result<(TcpListener, SocketAddr)> { - let socket = new_socket(buffer_sizes)?; - socket.bind(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(127, 0, 0, 1), - 0, - )))?; - - let listener = socket.listen(1024)?; - let address = listener.local_addr()?; - - Ok((listener, address)) -} - -fn new_socket(buffer_sizes: Option) -> io::Result { - let socket = TcpSocket::new_v4()?; - if let Some(size) = buffer_sizes { - socket.set_send_buffer_size(size.send)?; - socket.set_recv_buffer_size(size.recv)?; - } - - Ok(socket) -} - fn config() -> Config { let mut server_cfg = Config::default(); // Use a large frame size to speed up the test. diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index a3ec5759..1b8a5b21 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -20,7 +20,7 @@ fn prop_config_send_recv_multi() { Runtime::new().unwrap().block_on(async move { let num_messagses = msgs.len(); - let (listener, address) = bind().await.expect("bind"); + let (listener, address) = bind(None).await.expect("bind"); let server = async { let socket = listener.accept().await.expect("accept").0.compat(); @@ -55,7 +55,7 @@ fn prop_max_streams() { cfg.set_max_num_streams(max_streams); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg.clone(), cfg).await?; + let (server, client) = connected_peers(cfg.clone(), cfg, None).await?; task::spawn(EchoServer::new(server)); diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 5a4be48f..9b04e271 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -33,7 +33,7 @@ fn prop_config_send_recv_single() { msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg1, cfg2).await?; + let (server, client) = connected_peers(cfg1, cfg2, None).await?; let server = echo_server(server); let client = async { @@ -62,7 +62,8 @@ fn prop_send_recv() { } Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(Config::default(), Config::default()).await?; + let (server, client) = + connected_peers(Config::default(), Config::default(), None).await?; let server = echo_server(server); let client = async { @@ -88,7 +89,7 @@ fn prop_send_recv_half_closed() { Runtime::new().unwrap().block_on(async move { let (mut server, client) = - connected_peers(Config::default(), Config::default()).await?; + connected_peers(Config::default(), Config::default(), None).await?; // Server should be able to write on a stream shutdown by the client. let server = async { From 3b355672e4e48e902be2e7d2ae842a932f442407 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:55:25 +0100 Subject: [PATCH 02/16] Port `concurrent_streams` test to poll-api --- test-harness/tests/concurrent.rs | 82 -------------------------------- test-harness/tests/poll_api.rs | 61 ++++++++++++++++++++---- 2 files changed, 52 insertions(+), 91 deletions(-) delete mode 100644 test-harness/tests/concurrent.rs diff --git a/test-harness/tests/concurrent.rs b/test-harness/tests/concurrent.rs deleted file mode 100644 index 04b8e6e2..00000000 --- a/test-harness/tests/concurrent.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use futures::prelude::*; -use futures::stream::FuturesUnordered; -use quickcheck::QuickCheck; -use test_harness::*; -use tokio::{runtime::Runtime, task}; -use yamux::{Config, ConnectionError, Control, WindowUpdateMode}; - -const PAYLOAD_SIZE: usize = 128 * 1024; - -#[test] -fn concurrent_streams() { - let _ = env_logger::try_init(); - - fn prop(tcp_buffer_sizes: Option) { - let data = Msg(vec![0x42; PAYLOAD_SIZE]); - let n_streams = 1000; - - Runtime::new().expect("new runtime").block_on(async move { - let (server, client) = connected_peers(config(), config(), tcp_buffer_sizes) - .await - .unwrap(); - - task::spawn(echo_server(server)); - - let (mut ctrl, client) = Control::new(client); - task::spawn(noop_server(client)); - - let result = (0..n_streams) - .map(|_| { - let data = data.clone(); - let mut ctrl = ctrl.clone(); - - task::spawn(async move { - let mut stream = ctrl.open_stream().await?; - log::debug!("C: opened new stream {}", stream.id()); - - send_recv_message(&mut stream, data).await?; - stream.close().await?; - - Ok::<(), ConnectionError>(()) - }) - }) - .collect::>() - .try_collect::>() - .await - .unwrap() - .into_iter() - .collect::, ConnectionError>>(); - - ctrl.close().await.expect("close connection"); - - assert_eq!(result.unwrap().len(), n_streams); - }); - } - - QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) -} - -fn config() -> Config { - let mut server_cfg = Config::default(); - // Use a large frame size to speed up the test. - server_cfg.set_split_send_size(PAYLOAD_SIZE); - // Use `WindowUpdateMode::OnRead` so window updates are sent by the - // `Stream`s and subject to backpressure from the stream command channel. Thus - // the `Connection` I/O loop will not need to send window updates - // directly as a result of reading a frame, which can otherwise - // lead to mutual write deadlocks if the socket send buffers are too small. - // With `OnRead` the socket send buffer can even be smaller than the size - // of a single frame for this test. - server_cfg.set_window_update_mode(WindowUpdateMode::OnRead); - server_cfg -} diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 1b8a5b21..4225bb9a 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -3,6 +3,7 @@ use futures::stream::FuturesUnordered; use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; +use std::iter; use std::pin::Pin; use std::task::{Context, Poll}; use test_harness::*; @@ -33,7 +34,7 @@ fn prop_config_send_recv_multi() { let socket = TcpStream::connect(address).await.expect("connect").compat(); let connection = Connection::new(socket, cfg2.0, Mode::Client); - MessageSender::new(connection, msgs).await + MessageSender::new(connection, msgs, false).await }; let (server_processed, client_processed) = @@ -47,6 +48,40 @@ fn prop_config_send_recv_multi() { QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _) } +#[test] +fn concurrent_streams() { + let _ = env_logger::try_init(); + + fn prop(tcp_buffer_sizes: Option) { + const PAYLOAD_SIZE: usize = 128 * 1024; + + let data = Msg(vec![0x42; PAYLOAD_SIZE]); + let n_streams = 1000; + + let mut cfg = Config::default(); + cfg.set_split_send_size(PAYLOAD_SIZE); // Use a large frame size to speed up the test. + + Runtime::new().expect("new runtime").block_on(async move { + let (server, client) = connected_peers(cfg.clone(), cfg, tcp_buffer_sizes) + .await + .unwrap(); + + task::spawn(echo_server(server)); + let client = MessageSender::new( + client, + iter::repeat(data).take(n_streams).collect::>(), + true, + ); + + let num_processed = client.await.unwrap(); + + assert_eq!(num_processed, n_streams); + }); + } + + QuickCheck::new().tests(3).quickcheck(prop as fn(_) -> _) +} + #[test] fn prop_max_streams() { fn prop(n: usize) -> Result { @@ -76,15 +111,18 @@ struct MessageSender { pending_messages: Vec, worker_streams: FuturesUnordered>, streams_processed: usize, + /// Whether to spawn a new task for each stream. + spawn_tasks: bool, } impl MessageSender { - fn new(connection: Connection, messages: Vec) -> Self { + fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { Self { connection, pending_messages: messages, worker_streams: FuturesUnordered::default(), streams_processed: 0, + spawn_tasks, } } } @@ -108,13 +146,18 @@ where if let Some(message) = this.pending_messages.pop() { match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { - this.worker_streams.push( - async move { - send_recv_message(&mut stream, message).await.unwrap(); - stream.close().await.unwrap(); - } - .boxed(), - ); + let future = async move { + send_recv_message(&mut stream, message).await.unwrap(); + stream.close().await.unwrap(); + }; + + let worker_stream_future = if this.spawn_tasks { + async { task::spawn(future).await.unwrap() }.boxed() + } else { + future.boxed() + }; + + this.worker_streams.push(worker_stream_future); continue; } Poll::Pending => { From 384ade8ab8d62d93508d4d64d870f098e59e4b8a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 18:57:22 +0100 Subject: [PATCH 03/16] Replace custom channel with `futures_ringbuf` --- test-harness/Cargo.toml | 1 + test-harness/tests/tests.rs | 151 +----------------------------------- yamux/src/connection.rs | 2 +- 3 files changed, 4 insertions(+), 150 deletions(-) diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 92fe8a2f..55cead3e 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -16,3 +16,4 @@ log = "0.4.17" [dev-dependencies] env_logger = "0.10" constrained-connection = "0.1" +futures_ringbuf = "0.4.0" diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 9b04e271..7356e1dc 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -8,7 +8,6 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::executor::LocalPool; use futures::future::join; use futures::io::AsyncReadExt; @@ -16,9 +15,7 @@ use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; use quickcheck::{QuickCheck, TestResult}; use std::panic::panic_any; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Waker}; + use test_harness::*; use tokio::{runtime::Runtime, task}; use yamux::{Config, Connection, ConnectionError, Control, Mode}; @@ -160,7 +157,7 @@ fn write_deadlock() { // Create a bounded channel representing the underlying "connection". // Each endpoint gets a name and a bounded capacity for its outbound // channel (which is the other's inbound channel). - let (server_endpoint, client_endpoint) = bounded::channel(("S", capacity), ("C", capacity)); + let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); // Create and spawn a "server" that echoes every message back to the client. let server = Connection::new(server_endpoint, Config::default(), Mode::Server); @@ -247,147 +244,3 @@ async fn send_on_single_stream( Ok(()) } - -/// This module implements a duplex connection via channels with bounded -/// capacities. The channels used for the implementation are unbounded -/// as the operate at the granularity of variably-sized chunks of bytes -/// (`Vec`), whereas the capacity bounds (i.e. max. number of bytes -/// in transit in one direction) are enforced separately. -mod bounded { - use super::*; - use futures::ready; - use std::io::{Error, ErrorKind, Result}; - - pub struct Endpoint { - name: &'static str, - capacity: usize, - send: UnboundedSender>, - send_guard: Arc>, - recv: UnboundedReceiver>, - recv_buf: Vec, - recv_guard: Arc>, - } - - /// A `ChannelGuard` is used to enforce the maximum number of - /// bytes "in transit" across all chunks of an unbounded channel. - #[derive(Default)] - struct ChannelGuard { - size: usize, - waker: Option, - } - - pub fn channel( - (name_a, capacity_a): (&'static str, usize), - (name_b, capacity_b): (&'static str, usize), - ) -> (Endpoint, Endpoint) { - let (a_to_b_sender, a_to_b_receiver) = unbounded(); - let (b_to_a_sender, b_to_a_receiver) = unbounded(); - - let a_to_b_guard = Arc::new(Mutex::new(ChannelGuard::default())); - let b_to_a_guard = Arc::new(Mutex::new(ChannelGuard::default())); - - let a = Endpoint { - name: name_a, - capacity: capacity_a, - send: a_to_b_sender, - send_guard: a_to_b_guard.clone(), - recv: b_to_a_receiver, - recv_buf: Vec::new(), - recv_guard: b_to_a_guard.clone(), - }; - - let b = Endpoint { - name: name_b, - capacity: capacity_b, - send: b_to_a_sender, - send_guard: b_to_a_guard, - recv: a_to_b_receiver, - recv_buf: Vec::new(), - recv_guard: a_to_b_guard, - }; - - (a, b) - } - - impl AsyncRead for Endpoint { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if self.recv_buf.is_empty() { - match ready!(self.recv.poll_next_unpin(cx)) { - Some(bytes) => { - self.recv_buf = bytes; - } - None => return Poll::Ready(Ok(0)), - } - } - - let n = std::cmp::min(buf.len(), self.recv_buf.len()); - buf[0..n].copy_from_slice(&self.recv_buf[0..n]); - self.recv_buf = self.recv_buf.split_off(n); - - let mut guard = self.recv_guard.lock().unwrap(); - if let Some(waker) = guard.waker.take() { - log::debug!( - "{}: read: notifying waker after read of {} bytes", - self.name, - n - ); - waker.wake(); - } - guard.size -= n; - - log::debug!( - "{}: read: channel: {}/{}", - self.name, - guard.size, - self.capacity - ); - - Poll::Ready(Ok(n)) - } - } - - impl AsyncWrite for Endpoint { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - debug_assert!(!buf.is_empty()); - let mut guard = self.send_guard.lock().unwrap(); - let n = std::cmp::min(self.capacity - guard.size, buf.len()); - if n == 0 { - log::debug!("{}: write: channel full, registering waker", self.name); - guard.waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - self.send - .unbounded_send(buf[0..n].to_vec()) - .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?; - - guard.size += n; - log::debug!( - "{}: write: channel: {}/{}", - self.name, - guard.size, - self.capacity - ); - - Poll::Ready(Ok(n)) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.send.poll_flush_unpin(cx)).unwrap(); - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.send.poll_close_unpin(cx)).unwrap(); - Poll::Ready(Ok(())) - } - } -} diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 43f76226..5389e9e1 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -458,7 +458,7 @@ impl Active { match self.stream_receivers.poll_next_unpin(cx) { Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - self.on_send_frame(frame.into()); + self.on_send_frame(frame); continue; } Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { From 9c090d3c29e5ac5fbe6847cb25c494e8d10f3b06 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:01:36 +0100 Subject: [PATCH 04/16] Remove `prop_send_recv` This test is covered by `prop_send_recv_multi`. --- test-harness/tests/tests.rs | 49 +------------------------------------ 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 7356e1dc..88f8a8a6 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -13,7 +13,7 @@ use futures::future::join; use futures::io::AsyncReadExt; use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; -use quickcheck::{QuickCheck, TestResult}; +use quickcheck::QuickCheck; use std::panic::panic_any; use test_harness::*; @@ -51,34 +51,6 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } -#[test] -fn prop_send_recv() { - fn prop(msgs: Vec) -> Result { - if msgs.is_empty() { - return Ok(TestResult::discard()); - } - - Runtime::new().unwrap().block_on(async move { - let (server, client) = - connected_peers(Config::default(), Config::default(), None).await?; - - let server = echo_server(server); - let client = async { - let (control, client) = Control::new(client); - task::spawn(noop_server(client)); - send_on_separate_streams(control, msgs).await?; - - Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(TestResult::passed()) - }) - } - QuickCheck::new().tests(1).quickcheck(prop as fn(_) -> _) -} - #[test] fn prop_send_recv_half_closed() { fn prop(msg: Msg) -> Result<(), ConnectionError> { @@ -206,25 +178,6 @@ fn write_deadlock() { ); } -/// Send all messages, opening a new stream for each one. -async fn send_on_separate_streams( - mut control: Control, - iter: impl IntoIterator, -) -> Result<(), ConnectionError> { - for msg in iter { - let mut stream = control.open_stream().await?; - log::debug!("C: new stream: {}", stream); - - send_recv_message(&mut stream, msg).await?; - stream.close().await?; - } - - log::debug!("C: closing connection"); - control.close().await?; - - Ok(()) -} - /// Send all messages, using only a single stream. async fn send_on_single_stream( mut control: Control, From 660e6c93ae51ae578aa47b4ec2c5e2c31923aff8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:09:04 +0100 Subject: [PATCH 05/16] Rewrite `prop_send_recv_half_closed` to poll-api and move --- test-harness/tests/poll_api.rs | 58 +++++++++++++++++++++++++++++++++- test-harness/tests/tests.rs | 53 ------------------------------- 2 files changed, 57 insertions(+), 54 deletions(-) diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 4225bb9a..a87048da 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,6 +1,6 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{future, stream, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; use std::future::Future; use std::iter; @@ -106,6 +106,62 @@ fn prop_max_streams() { QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) } +#[test] +fn prop_send_recv_half_closed() { + fn prop(msg: Msg) -> Result<(), ConnectionError> { + let msg_len = msg.0.len(); + + Runtime::new().unwrap().block_on(async move { + let (mut server, mut client) = + connected_peers(Config::default(), Config::default(), None).await?; + + // Server should be able to write on a stream shutdown by the client. + let server = async { + let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx)); + + let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??; + + task::spawn(noop_server(server)); + + let mut buf = vec![0; msg_len]; + first_stream.read_exact(&mut buf).await?; + first_stream.write_all(&buf).await?; + first_stream.close().await?; + + Result::<(), ConnectionError>::Ok(()) + }; + + // Client should be able to read after shutting down the stream. + let client = async { + let mut stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + task::spawn(noop_server(stream::poll_fn(move |cx| { + client.poll_next_inbound(cx) + }))); + + stream.write_all(&msg.0).await?; + stream.close().await?; + + assert!(stream.is_write_closed()); + let mut buf = vec![0; msg_len]; + stream.read_exact(&mut buf).await?; + + assert_eq!(buf, msg.0); + assert_eq!(Some(0), stream.read(&mut buf).await.ok()); + assert!(stream.is_closed()); + + Result::<(), ConnectionError>::Ok(()) + }; + + futures::future::try_join(server, client).await?; + + Ok(()) + }) + } + QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 88f8a8a6..4ac8bb32 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -51,59 +51,6 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } -#[test] -fn prop_send_recv_half_closed() { - fn prop(msg: Msg) -> Result<(), ConnectionError> { - let msg_len = msg.0.len(); - - Runtime::new().unwrap().block_on(async move { - let (mut server, client) = - connected_peers(Config::default(), Config::default(), None).await?; - - // Server should be able to write on a stream shutdown by the client. - let server = async { - let mut server = stream::poll_fn(move |cx| server.poll_next_inbound(cx)); - - let mut first_stream = server.next().await.ok_or(ConnectionError::Closed)??; - - task::spawn(noop_server(server)); - - let mut buf = vec![0; msg_len]; - first_stream.read_exact(&mut buf).await?; - first_stream.write_all(&buf).await?; - first_stream.close().await?; - - Result::<(), ConnectionError>::Ok(()) - }; - - // Client should be able to read after shutting down the stream. - let client = async { - let (mut control, client) = Control::new(client); - task::spawn(noop_server(client)); - - let mut stream = control.open_stream().await?; - stream.write_all(&msg.0).await?; - stream.close().await?; - - assert!(stream.is_write_closed()); - let mut buf = vec![0; msg_len]; - stream.read_exact(&mut buf).await?; - - assert_eq!(buf, msg.0); - assert_eq!(Some(0), stream.read(&mut buf).await.ok()); - assert!(stream.is_closed()); - - Result::<(), ConnectionError>::Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(()) - }) - } - QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) -} - /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints /// don't read in-between trying to finish their writes, a deadlock occurs. From 7c87447f2a793b7bf68832335b5d9eb9436e99f0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:29:44 +0100 Subject: [PATCH 06/16] Rewrite `prop_config_send_recv_single` to poll-api --- test-harness/tests/tests.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 4ac8bb32..8d60d4ca 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -15,9 +15,10 @@ use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; use quickcheck::QuickCheck; use std::panic::panic_any; +use std::pin::pin; use test_harness::*; -use tokio::{runtime::Runtime, task}; +use tokio::runtime::Runtime; use yamux::{Config, Connection, ConnectionError, Control, Mode}; #[test] @@ -30,13 +31,18 @@ fn prop_config_send_recv_single() { msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); Runtime::new().unwrap().block_on(async move { - let (server, client) = connected_peers(cfg1, cfg2, None).await?; - + let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; let server = echo_server(server); + let client = async { - let (control, client) = Control::new(client); - task::spawn(noop_server(client)); - send_on_single_stream(control, msgs).await?; + let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); + + future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; + + future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); Ok(()) }; @@ -127,10 +133,9 @@ fn write_deadlock() { /// Send all messages, using only a single stream. async fn send_on_single_stream( - mut control: Control, + mut stream: yamux::Stream, iter: impl IntoIterator, ) -> Result<(), ConnectionError> { - let mut stream = control.open_stream().await?; log::debug!("C: new stream: {}", stream); for msg in iter { @@ -139,8 +144,5 @@ async fn send_on_single_stream( stream.close().await?; - log::debug!("C: closing connection"); - control.close().await?; - Ok(()) } From 05eee85aeb2d8139b281e337d643d2c809ea1d61 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:31:19 +0100 Subject: [PATCH 07/16] Move test to `poll_api` --- test-harness/src/lib.rs | 16 ++++++++++ test-harness/tests/poll_api.rs | 42 +++++++++++++++++++++++-- test-harness/tests/tests.rs | 57 +--------------------------------- 3 files changed, 57 insertions(+), 58 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 1e1771ab..009f5e23 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -138,6 +138,22 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io: Ok(()) } +/// Send all messages, using only a single stream. +pub async fn send_on_single_stream( + mut stream: yamux::Stream, + iter: impl IntoIterator, +) -> Result<(), ConnectionError> { + log::debug!("C: new stream: {}", stream); + + for msg in iter { + send_recv_message(&mut stream, msg).await?; + } + + stream.close().await?; + + Ok(()) +} + pub struct EchoServer { connection: Connection, worker_streams: FuturesUnordered>>, diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index a87048da..cbb3ec14 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,10 +1,12 @@ use futures::future::BoxFuture; use futures::stream::FuturesUnordered; -use futures::{future, stream, AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; +use futures::{ + future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, +}; use quickcheck::QuickCheck; use std::future::Future; use std::iter; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use test_harness::*; use tokio::net::TcpStream; @@ -162,6 +164,42 @@ fn prop_send_recv_half_closed() { QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _) } +#[test] +fn prop_config_send_recv_single() { + fn prop( + mut msgs: Vec, + TestConfig(cfg1): TestConfig, + TestConfig(cfg2): TestConfig, + ) -> Result<(), ConnectionError> { + msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); + + Runtime::new().unwrap().block_on(async move { + let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; + let server = echo_server(server); + + let client = async { + let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) + .await + .unwrap(); + let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); + + future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; + + future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); + + Ok(()) + }; + + futures::future::try_join(server, client).await?; + + Ok(()) + }) + } + QuickCheck::new() + .tests(10) + .quickcheck(prop as fn(_, _, _) -> _) +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 8d60d4ca..15a2745e 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -13,49 +13,10 @@ use futures::future::join; use futures::io::AsyncReadExt; use futures::prelude::*; use futures::task::{Spawn, SpawnExt}; -use quickcheck::QuickCheck; use std::panic::panic_any; -use std::pin::pin; use test_harness::*; -use tokio::runtime::Runtime; -use yamux::{Config, Connection, ConnectionError, Control, Mode}; - -#[test] -fn prop_config_send_recv_single() { - fn prop( - mut msgs: Vec, - TestConfig(cfg1): TestConfig, - TestConfig(cfg2): TestConfig, - ) -> Result<(), ConnectionError> { - msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize])); - - Runtime::new().unwrap().block_on(async move { - let (server, mut client) = connected_peers(cfg1, cfg2, None).await?; - let server = echo_server(server); - - let client = async { - let stream = future::poll_fn(|cx| client.poll_new_outbound(cx)) - .await - .unwrap(); - let client_task = noop_server(stream::poll_fn(|cx| client.poll_next_inbound(cx))); - - future::select(pin!(client_task), pin!(send_on_single_stream(stream, msgs))).await; - - future::poll_fn(|cx| client.poll_close(cx)).await.unwrap(); - - Ok(()) - }; - - futures::future::try_join(server, client).await?; - - Ok(()) - }) - } - QuickCheck::new() - .tests(10) - .quickcheck(prop as fn(_, _, _) -> _) -} +use yamux::{Config, Connection, Control, Mode}; /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints @@ -130,19 +91,3 @@ fn write_deadlock() { .unwrap(), ); } - -/// Send all messages, using only a single stream. -async fn send_on_single_stream( - mut stream: yamux::Stream, - iter: impl IntoIterator, -) -> Result<(), ConnectionError> { - log::debug!("C: new stream: {}", stream); - - for msg in iter { - send_recv_message(&mut stream, msg).await?; - } - - stream.close().await?; - - Ok(()) -} From efa73103ee471dd3086a83394c69c85ec332454c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:33:48 +0100 Subject: [PATCH 08/16] Rewrite deadlock test to use poll-api --- test-harness/tests/tests.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs index 15a2745e..bf521632 100644 --- a/test-harness/tests/tests.rs +++ b/test-harness/tests/tests.rs @@ -16,7 +16,7 @@ use futures::task::{Spawn, SpawnExt}; use std::panic::panic_any; use test_harness::*; -use yamux::{Config, Connection, Control, Mode}; +use yamux::{Config, Connection, Mode}; /// This test simulates two endpoints of a Yamux connection which may be unable to /// write simultaneously but can make progress by reading. If both endpoints @@ -57,12 +57,19 @@ fn write_deadlock() { // Create and spawn a "client" that sends messages expected to be echoed // by the server. - let client = Connection::new(client_endpoint, Config::default(), Mode::Client); - let (mut ctrl, client) = Control::new(client); + let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); + + let stream = pool + .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) + .unwrap(); // Continuously advance the Yamux connection of the client in a background task. pool.spawner() - .spawn_obj(noop_server(client).boxed().into()) + .spawn_obj( + noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) + .boxed() + .into(), + ) .unwrap(); // Send the message, expecting it to be echo'd. @@ -70,7 +77,6 @@ fn write_deadlock() { pool.spawner() .spawn_with_handle( async move { - let stream = ctrl.open_stream().await.unwrap(); let (mut reader, mut writer) = AsyncReadExt::split(stream); let mut b = vec![0; msg.len()]; // Write & read concurrently, so that the client is able From 0f10d51739596a97885501bb705f95b6dc6198f3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 28 Jun 2023 19:34:59 +0100 Subject: [PATCH 09/16] Move final test to poll_api --- test-harness/tests/poll_api.rs | 86 ++++++++++++++++++++++++++++- test-harness/tests/tests.rs | 99 ---------------------------------- 2 files changed, 85 insertions(+), 100 deletions(-) delete mode 100644 test-harness/tests/tests.rs diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index cbb3ec14..9b4332d1 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,11 +1,15 @@ -use futures::future::BoxFuture; +use futures::executor::LocalPool; +use futures::future::{join, BoxFuture}; +use futures::prelude::*; use futures::stream::FuturesUnordered; +use futures::task::{Spawn, SpawnExt}; use futures::{ future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, }; use quickcheck::QuickCheck; use std::future::Future; use std::iter; +use std::panic::panic_any; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use test_harness::*; @@ -200,6 +204,86 @@ fn prop_config_send_recv_single() { .quickcheck(prop as fn(_, _, _) -> _) } +/// This test simulates two endpoints of a Yamux connection which may be unable to +/// write simultaneously but can make progress by reading. If both endpoints +/// don't read in-between trying to finish their writes, a deadlock occurs. +#[test] +fn write_deadlock() { + let _ = env_logger::try_init(); + let mut pool = LocalPool::new(); + + // We make the message to transmit large enough s.t. the "server" + // is forced to start writing (i.e. echoing) the bytes before + // having read the entire payload. + let msg = vec![1u8; 1024 * 1024]; + + // We choose a "connection capacity" that is artificially below + // the size of a receive window. If it were equal or greater, + // multiple concurrently writing streams would be needed to non-deterministically + // provoke the write deadlock. This is supposed to reflect the + // fact that the sum of receive windows of all open streams can easily + // be larger than the send capacity of the connection at any point in time. + // Using such a low capacity here therefore yields a more reproducible test. + let capacity = 1024; + + // Create a bounded channel representing the underlying "connection". + // Each endpoint gets a name and a bounded capacity for its outbound + // channel (which is the other's inbound channel). + let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); + + // Create and spawn a "server" that echoes every message back to the client. + let server = Connection::new(server_endpoint, Config::default(), Mode::Server); + pool.spawner() + .spawn_obj( + async move { echo_server(server).await.unwrap() } + .boxed() + .into(), + ) + .unwrap(); + + // Create and spawn a "client" that sends messages expected to be echoed + // by the server. + let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); + + let stream = pool + .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) + .unwrap(); + + // Continuously advance the Yamux connection of the client in a background task. + pool.spawner() + .spawn_obj( + noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) + .boxed() + .into(), + ) + .unwrap(); + + // Send the message, expecting it to be echo'd. + pool.run_until( + pool.spawner() + .spawn_with_handle( + async move { + let (mut reader, mut writer) = AsyncReadExt::split(stream); + let mut b = vec![0; msg.len()]; + // Write & read concurrently, so that the client is able + // to start reading the echo'd bytes before it even finished + // sending them all. + let _ = join( + writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)), + reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)), + ) + .await; + let mut stream = reader.reunite(writer).unwrap(); + stream.close().await.unwrap(); + log::debug!("C: Stream {} done.", stream.id()); + assert_eq!(b, msg); + } + .boxed(), + ) + .unwrap(), + ); +} + struct MessageSender { connection: Connection, pending_messages: Vec, diff --git a/test-harness/tests/tests.rs b/test-harness/tests/tests.rs deleted file mode 100644 index bf521632..00000000 --- a/test-harness/tests/tests.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use futures::executor::LocalPool; -use futures::future::join; -use futures::io::AsyncReadExt; -use futures::prelude::*; -use futures::task::{Spawn, SpawnExt}; -use std::panic::panic_any; - -use test_harness::*; -use yamux::{Config, Connection, Mode}; - -/// This test simulates two endpoints of a Yamux connection which may be unable to -/// write simultaneously but can make progress by reading. If both endpoints -/// don't read in-between trying to finish their writes, a deadlock occurs. -#[test] -fn write_deadlock() { - let _ = env_logger::try_init(); - let mut pool = LocalPool::new(); - - // We make the message to transmit large enough s.t. the "server" - // is forced to start writing (i.e. echoing) the bytes before - // having read the entire payload. - let msg = vec![1u8; 1024 * 1024]; - - // We choose a "connection capacity" that is artificially below - // the size of a receive window. If it were equal or greater, - // multiple concurrently writing streams would be needed to non-deterministically - // provoke the write deadlock. This is supposed to reflect the - // fact that the sum of receive windows of all open streams can easily - // be larger than the send capacity of the connection at any point in time. - // Using such a low capacity here therefore yields a more reproducible test. - let capacity = 1024; - - // Create a bounded channel representing the underlying "connection". - // Each endpoint gets a name and a bounded capacity for its outbound - // channel (which is the other's inbound channel). - let (server_endpoint, client_endpoint) = futures_ringbuf::Endpoint::pair(capacity, capacity); - - // Create and spawn a "server" that echoes every message back to the client. - let server = Connection::new(server_endpoint, Config::default(), Mode::Server); - pool.spawner() - .spawn_obj( - async move { echo_server(server).await.unwrap() } - .boxed() - .into(), - ) - .unwrap(); - - // Create and spawn a "client" that sends messages expected to be echoed - // by the server. - let mut client = Connection::new(client_endpoint, Config::default(), Mode::Client); - - let stream = pool - .run_until(future::poll_fn(|cx| client.poll_new_outbound(cx))) - .unwrap(); - - // Continuously advance the Yamux connection of the client in a background task. - pool.spawner() - .spawn_obj( - noop_server(stream::poll_fn(move |cx| client.poll_next_inbound(cx))) - .boxed() - .into(), - ) - .unwrap(); - - // Send the message, expecting it to be echo'd. - pool.run_until( - pool.spawner() - .spawn_with_handle( - async move { - let (mut reader, mut writer) = AsyncReadExt::split(stream); - let mut b = vec![0; msg.len()]; - // Write & read concurrently, so that the client is able - // to start reading the echo'd bytes before it even finished - // sending them all. - let _ = join( - writer.write_all(msg.as_ref()).map_err(|e| panic_any(e)), - reader.read_exact(&mut b[..]).map_err(|e| panic_any(e)), - ) - .await; - let mut stream = reader.reunite(writer).unwrap(); - stream.close().await.unwrap(); - log::debug!("C: Stream {} done.", stream.id()); - assert_eq!(b, msg); - } - .boxed(), - ) - .unwrap(), - ); -} From 749fd9ab323d4191f1190668ee3dfba4c13e89fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 11:56:00 +0100 Subject: [PATCH 10/16] Move `MessageSender` to test-harness --- test-harness/src/lib.rs | 85 ++++++++++++++++++++++++++++++ test-harness/tests/poll_api.rs | 95 ++-------------------------------- 2 files changed, 88 insertions(+), 92 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 009f5e23..b01169a1 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -11,6 +11,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io, mem}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; +use tokio::task; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use yamux::ConnectionError; use yamux::{Config, WindowUpdateMode}; @@ -83,6 +84,90 @@ where .await } +pub struct MessageSender { + connection: Connection, + pending_messages: Vec, + worker_streams: FuturesUnordered>, + streams_processed: usize, + /// Whether to spawn a new task for each stream. + spawn_tasks: bool, +} + +impl MessageSender { + pub fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { + Self { + connection, + pending_messages: messages, + worker_streams: FuturesUnordered::default(), + streams_processed: 0, + spawn_tasks, + } + } +} + +impl Future for MessageSender +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = yamux::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if this.pending_messages.is_empty() && this.worker_streams.is_empty() { + futures::ready!(this.connection.poll_close(cx)?); + + return Poll::Ready(Ok(this.streams_processed)); + } + + if let Some(message) = this.pending_messages.pop() { + match this.connection.poll_new_outbound(cx)? { + Poll::Ready(mut stream) => { + let future = async move { + send_recv_message(&mut stream, message).await.unwrap(); + stream.close().await.unwrap(); + }; + + let worker_stream_future = if this.spawn_tasks { + async { task::spawn(future).await.unwrap() }.boxed() + } else { + future.boxed() + }; + + this.worker_streams.push(worker_stream_future); + continue; + } + Poll::Pending => { + this.pending_messages.push(message); + } + } + } + + match this.worker_streams.poll_next_unpin(cx) { + Poll::Ready(Some(())) => { + this.streams_processed += 1; + continue; + } + Poll::Ready(None) | Poll::Pending => {} + } + + match this.connection.poll_next_inbound(cx)? { + Poll::Ready(Some(stream)) => { + drop(stream); + panic!("Did not expect remote to open a stream"); + } + Poll::Ready(None) => { + panic!("Did not expect remote to close the connection"); + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} + /// For each incoming stream, do nothing. pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { diff --git a/test-harness/tests/poll_api.rs b/test-harness/tests/poll_api.rs index 9b4332d1..37f0ad72 100644 --- a/test-harness/tests/poll_api.rs +++ b/test-harness/tests/poll_api.rs @@ -1,17 +1,12 @@ use futures::executor::LocalPool; -use futures::future::{join, BoxFuture}; +use futures::future::join; use futures::prelude::*; -use futures::stream::FuturesUnordered; use futures::task::{Spawn, SpawnExt}; -use futures::{ - future, stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt, -}; +use futures::{future, stream, AsyncReadExt, AsyncWriteExt, FutureExt, StreamExt}; use quickcheck::QuickCheck; -use std::future::Future; use std::iter; use std::panic::panic_any; -use std::pin::{pin, Pin}; -use std::task::{Context, Poll}; +use std::pin::pin; use test_harness::*; use tokio::net::TcpStream; use tokio::runtime::Runtime; @@ -283,87 +278,3 @@ fn write_deadlock() { .unwrap(), ); } - -struct MessageSender { - connection: Connection, - pending_messages: Vec, - worker_streams: FuturesUnordered>, - streams_processed: usize, - /// Whether to spawn a new task for each stream. - spawn_tasks: bool, -} - -impl MessageSender { - fn new(connection: Connection, messages: Vec, spawn_tasks: bool) -> Self { - Self { - connection, - pending_messages: messages, - worker_streams: FuturesUnordered::default(), - streams_processed: 0, - spawn_tasks, - } - } -} - -impl Future for MessageSender -where - T: AsyncRead + AsyncWrite + Unpin, -{ - type Output = yamux::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - if this.pending_messages.is_empty() && this.worker_streams.is_empty() { - futures::ready!(this.connection.poll_close(cx)?); - - return Poll::Ready(Ok(this.streams_processed)); - } - - if let Some(message) = this.pending_messages.pop() { - match this.connection.poll_new_outbound(cx)? { - Poll::Ready(mut stream) => { - let future = async move { - send_recv_message(&mut stream, message).await.unwrap(); - stream.close().await.unwrap(); - }; - - let worker_stream_future = if this.spawn_tasks { - async { task::spawn(future).await.unwrap() }.boxed() - } else { - future.boxed() - }; - - this.worker_streams.push(worker_stream_future); - continue; - } - Poll::Pending => { - this.pending_messages.push(message); - } - } - } - - match this.worker_streams.poll_next_unpin(cx) { - Poll::Ready(Some(())) => { - this.streams_processed += 1; - continue; - } - Poll::Ready(None) | Poll::Pending => {} - } - - match this.connection.poll_next_inbound(cx)? { - Poll::Ready(Some(stream)) => { - drop(stream); - panic!("Did not expect remote to open a stream"); - } - Poll::Ready(None) => { - panic!("Did not expect remote to close the connection"); - } - Poll::Pending => {} - } - - return Poll::Pending; - } - } -} From 7ff430fa100b10f2d942acf4e34964ec0b4280cc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:07:39 +0100 Subject: [PATCH 11/16] Move benchmarks to test-harness --- test-harness/Cargo.toml | 9 +++++++++ {yamux => test-harness}/benches/concurrent.rs | 0 yamux/Cargo.toml | 12 ------------ 3 files changed, 9 insertions(+), 12 deletions(-) rename {yamux => test-harness}/benches/concurrent.rs (100%) diff --git a/test-harness/Cargo.toml b/test-harness/Cargo.toml index 55cead3e..39b702d5 100644 --- a/test-harness/Cargo.toml +++ b/test-harness/Cargo.toml @@ -14,6 +14,15 @@ anyhow = "1" log = "0.4.17" [dev-dependencies] +criterion = "0.5" env_logger = "0.10" +futures = "0.3.4" +quickcheck = "1.0" +tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] } +tokio-util = { version = "0.7", features = ["compat"] } constrained-connection = "0.1" futures_ringbuf = "0.4.0" + +[[bench]] +name = "concurrent" +harness = false diff --git a/yamux/benches/concurrent.rs b/test-harness/benches/concurrent.rs similarity index 100% rename from yamux/benches/concurrent.rs rename to test-harness/benches/concurrent.rs diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index 09322d3c..b1407ac0 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -19,16 +19,4 @@ static_assertions = "1" pin-project = "1.1.0" [dev-dependencies] -anyhow = "1" -criterion = "0.5" -env_logger = "0.10" -futures = "0.3.4" quickcheck = "1.0" -tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] } -tokio-util = { version = "0.7", features = ["compat"] } -constrained-connection = "0.1" -futures_ringbuf = "0.4.0" - -[[bench]] -name = "concurrent" -harness = false From d3667e3c980dcb8b25a846dba8b6e3c671f0f0ca Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:13:51 +0100 Subject: [PATCH 12/16] Add message multiplier to `MessageSender` --- test-harness/src/lib.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index b01169a1..e4d0d3c9 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -91,6 +91,8 @@ pub struct MessageSender { streams_processed: usize, /// Whether to spawn a new task for each stream. spawn_tasks: bool, + /// How many times to send each message on the stream + message_multiplier: u64, } impl MessageSender { @@ -101,8 +103,14 @@ impl MessageSender { worker_streams: FuturesUnordered::default(), streams_processed: 0, spawn_tasks, + message_multiplier: 1, } } + + pub fn with_message_multiplier(mut self, multiplier: u64) -> Self { + self.message_multiplier = multiplier; + self + } } impl Future for MessageSender @@ -124,8 +132,13 @@ where if let Some(message) = this.pending_messages.pop() { match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { + let multiplier = this.message_multiplier; + let future = async move { - send_recv_message(&mut stream, message).await.unwrap(); + for _ in 0..multiplier { + send_recv_message(&mut stream, &message).await.unwrap(); + } + stream.close().await.unwrap(); }; @@ -203,13 +216,13 @@ impl Arbitrary for TcpBufferSizes { } } -pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io::Result<()> { +pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io::Result<()> { let id = stream.id(); let (mut reader, mut writer) = AsyncReadExt::split(stream); let len = msg.len(); let write_fut = async { - writer.write_all(&msg).await.unwrap(); + writer.write_all(msg).await.unwrap(); log::debug!("C: {}: sent {} bytes", id, len); }; let mut data = vec![0; msg.len()]; @@ -218,7 +231,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io: log::debug!("C: {}: received {} bytes", id, data.len()); }; futures::future::join(write_fut, read_fut).await; - assert_eq!(data, msg); + assert_eq!(&data, msg); Ok(()) } @@ -231,7 +244,7 @@ pub async fn send_on_single_stream( log::debug!("C: new stream: {}", stream); for msg in iter { - send_recv_message(&mut stream, msg).await?; + send_recv_message(&mut stream, &msg).await?; } stream.close().await?; From 358a7285a189d1b45423e438989e74d4f7057230 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:20:57 +0100 Subject: [PATCH 13/16] Migrate benchmark away from `Control` --- test-harness/benches/concurrent.rs | 73 +++++++----------------------- test-harness/src/lib.rs | 70 ++++++++++++++++++++++------ 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/test-harness/benches/concurrent.rs b/test-harness/benches/concurrent.rs index 5da8ce29..f7179cee 100644 --- a/test-harness/benches/concurrent.rs +++ b/test-harness/benches/concurrent.rs @@ -10,10 +10,11 @@ use constrained_connection::{new_unconstrained_connection, samples, Endpoint}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use futures::{channel::mpsc, future, io::AsyncReadExt, prelude::*}; +use std::iter; use std::sync::Arc; +use test_harness::{dev_null_server, MessageSender, MessageSenderStrategy, Msg}; use tokio::{runtime::Runtime, task}; -use yamux::{Config, Connection, Control, Mode}; +use yamux::{Config, Connection, Mode}; criterion_group!(benches, concurrent); criterion_main!(benches); @@ -86,62 +87,20 @@ async fn oneway( server: Endpoint, client: Endpoint, ) { - let msg_len = data.0.len(); - let (tx, rx) = mpsc::unbounded(); + let server = Connection::new(server, config(), Mode::Server); + let client = Connection::new(client, config(), Mode::Client); - let server = async move { - let mut connection = Connection::new(server, config(), Mode::Server); + task::spawn(dev_null_server(server)); - while let Some(Ok(mut stream)) = stream::poll_fn(|cx| connection.poll_next_inbound(cx)) - .next() - .await - { - let tx = tx.clone(); - - task::spawn(async move { - let mut n = 0; - let mut b = vec![0; msg_len]; - - // Receive `nmessages` messages. - for _ in 0..nmessages { - stream.read_exact(&mut b[..]).await.unwrap(); - n += b.len(); - } - - tx.unbounded_send(n).expect("unbounded_send"); - stream.close().await.unwrap(); - }); - } - }; - task::spawn(server); - - let conn = Connection::new(client, config(), Mode::Client); - let (mut ctrl, conn) = Control::new(conn); - - task::spawn(conn.for_each(|r| { - r.unwrap(); - future::ready(()) - })); - - for _ in 0..nstreams { - let data = data.clone(); - let mut ctrl = ctrl.clone(); - task::spawn(async move { - let mut stream = ctrl.open_stream().await.unwrap(); - - // Send `nmessages` messages. - for _ in 0..nmessages { - stream.write_all(data.as_ref()).await.unwrap(); - } - - stream.close().await.unwrap(); - }); - } - - let n = rx + let messages = iter::repeat(data) + .map(|b| Msg(b.0.to_vec())) .take(nstreams) - .fold(0, |acc, n| future::ready(acc + n)) - .await; - assert_eq!(n, nstreams * nmessages * msg_len); - ctrl.close().await.expect("close"); + .collect(); // `MessageSender` will use 1 stream per message. + let num_streams_used = MessageSender::new(client, messages, true) + .with_message_multiplier(nmessages as u64) + .with_strategy(MessageSenderStrategy::Send) + .await + .unwrap(); + + assert_eq!(num_streams_used, nstreams); } diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index e4d0d3c9..ff99093c 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -69,8 +69,8 @@ fn new_socket(buffer_sizes: Option) -> io::Result { /// For each incoming stream of `c` echo back to the sender. pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -84,6 +84,27 @@ where .await } +/// For each incoming stream of `c`, read to end but don't write back. +pub async fn dev_null_server(mut c: Connection) -> Result<(), ConnectionError> + where + T: AsyncRead + AsyncWrite + Unpin, +{ + stream::poll_fn(|cx| c.poll_next_inbound(cx)) + .try_for_each_concurrent(None, |mut stream| async move { + let mut buf = [0u8; 1024]; + + while let Ok(n) = stream.read(&mut buf).await { + if n == 0 { + break; + } + } + + stream.close().await?; + Ok(()) + }) + .await +} + pub struct MessageSender { connection: Connection, pending_messages: Vec, @@ -93,6 +114,13 @@ pub struct MessageSender { spawn_tasks: bool, /// How many times to send each message on the stream message_multiplier: u64, + strategy: MessageSenderStrategy, +} + +#[derive(Copy, Clone)] +pub enum MessageSenderStrategy { + SendRecv, + Send, } impl MessageSender { @@ -104,6 +132,7 @@ impl MessageSender { streams_processed: 0, spawn_tasks, message_multiplier: 1, + strategy: MessageSenderStrategy::SendRecv, } } @@ -111,11 +140,16 @@ impl MessageSender { self.message_multiplier = multiplier; self } + + pub fn with_strategy(mut self, strategy: MessageSenderStrategy) -> Self { + self.strategy = strategy; + self + } } impl Future for MessageSender -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -133,10 +167,18 @@ where match this.connection.poll_new_outbound(cx)? { Poll::Ready(mut stream) => { let multiplier = this.message_multiplier; + let strategy = this.strategy; let future = async move { for _ in 0..multiplier { - send_recv_message(&mut stream, &message).await.unwrap(); + match strategy { + MessageSenderStrategy::SendRecv => { + send_recv_message(&mut stream, &message).await.unwrap() + } + MessageSenderStrategy::Send => { + stream.write_all(&message.0).await.unwrap() + } + }; } stream.close().await.unwrap(); @@ -182,12 +224,12 @@ where } /// For each incoming stream, do nothing. -pub async fn noop_server(c: impl Stream>) { +pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { drop(maybe_stream); future::ready(()) }) - .await; + .await; } /// Send and receive buffer size for a TCP socket. @@ -239,7 +281,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io /// Send all messages, using only a single stream. pub async fn send_on_single_stream( mut stream: yamux::Stream, - iter: impl IntoIterator, + iter: impl IntoIterator, ) -> Result<(), ConnectionError> { log::debug!("C: new stream: {}", stream); @@ -271,8 +313,8 @@ impl EchoServer { } impl Future for EchoServer -where - T: AsyncRead + AsyncWrite + Unpin, + where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -308,7 +350,7 @@ where stream.close().await?; Ok(()) } - .boxed(), + .boxed(), ); continue; } @@ -342,8 +384,8 @@ impl OpenStreamsClient { } impl Future for OpenStreamsClient -where - T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, + where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = yamux::Result<(Connection, Vec)>; @@ -395,7 +437,7 @@ impl Arbitrary for Msg { msg } - fn shrink(&self) -> Box> { + fn shrink(&self) -> Box> { Box::new(self.0.shrink().filter(|v| !v.is_empty()).map(Msg)) } } From d5d5972cbdec1b6d045c1b1d11adcd453b719eb0 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:41:34 +0100 Subject: [PATCH 14/16] Remove `Control` and `ControlledConnection` --- CHANGELOG.md | 6 ++ yamux/Cargo.toml | 2 +- yamux/src/control.rs | 246 ------------------------------------------- yamux/src/lib.rs | 8 -- 4 files changed, 7 insertions(+), 255 deletions(-) delete mode 100644 yamux/src/control.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f5cbde49..e0a3f271 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.12.0 - unreleased + +- Remove `Control` and `ControlledConnection`. + Users are encouraged to move to the `poll_` functions of `Connection`. + See [PR #164](https://github.com/libp2p/rust-yamux/pull/164). + # 0.11.1 - Avoid race condition between pending frames and closing stream. diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index b1407ac0..431bd6c7 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yamux" -version = "0.11.1" +version = "0.12.0" authors = ["Parity Technologies "] license = "Apache-2.0 OR MIT" description = "Multiplexer over reliable, ordered connections" diff --git a/yamux/src/control.rs b/yamux/src/control.rs deleted file mode 100644 index 48c9aa86..00000000 --- a/yamux/src/control.rs +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use crate::MAX_COMMAND_BACKLOG; -use crate::{error::ConnectionError, Connection, Result, Stream}; -use futures::{ - channel::{mpsc, oneshot}, - prelude::*, -}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A Yamux [`Connection`] controller. -/// -/// This presents an alternative API for using a yamux [`Connection`]. -/// -/// A [`Control`] communicates with a [`ControlledConnection`] via a channel. This allows -/// a [`Control`] to be cloned and shared between tasks and threads. -#[derive(Clone, Debug)] -pub struct Control { - /// Command channel to [`ControlledConnection`]. - sender: mpsc::Sender, -} - -impl Control { - pub fn new(connection: Connection) -> (Self, ControlledConnection) { - let (sender, receiver) = mpsc::channel(MAX_COMMAND_BACKLOG); - - let control = Control { sender }; - let connection = ControlledConnection { - state: State::Idle(connection), - commands: receiver, - }; - - (control, connection) - } - - /// Open a new stream to the remote. - pub async fn open_stream(&mut self) -> Result { - let (tx, rx) = oneshot::channel(); - self.sender.send(ControlCommand::OpenStream(tx)).await?; - rx.await? - } - - /// Close the connection. - pub async fn close(&mut self) -> Result<()> { - let (tx, rx) = oneshot::channel(); - if self - .sender - .send(ControlCommand::CloseConnection(tx)) - .await - .is_err() - { - // The receiver is closed which means the connection is already closed. - return Ok(()); - } - // A dropped `oneshot::Sender` means the `Connection` is gone, - // so we do not treat receive errors differently here. - let _ = rx.await; - Ok(()) - } -} - -/// Wraps a [`Connection`] which can be controlled with a [`Control`]. -pub struct ControlledConnection { - state: State, - commands: mpsc::Receiver, -} - -impl ControlledConnection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - loop { - match std::mem::replace(&mut self.state, State::Poisoned) { - State::Idle(mut connection) => { - match connection.poll_next_inbound(cx) { - Poll::Ready(maybe_stream) => { - self.state = State::Idle(connection); - return Poll::Ready(maybe_stream); - } - Poll::Pending => {} - } - - match self.commands.poll_next_unpin(cx) { - Poll::Ready(Some(ControlCommand::OpenStream(reply))) => { - self.state = State::OpeningNewStream { reply, connection }; - continue; - } - Poll::Ready(Some(ControlCommand::CloseConnection(reply))) => { - self.commands.close(); - - self.state = State::Closing { - reply: Some(reply), - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(None) => { - // Last `Control` sender was dropped, close te connection. - self.state = State::Closing { - reply: None, - inner: Closing::ClosingConnection { connection }, - }; - continue; - } - Poll::Pending => {} - } - - self.state = State::Idle(connection); - return Poll::Pending; - } - State::OpeningNewStream { - reply, - mut connection, - } => match connection.poll_new_outbound(cx) { - Poll::Ready(stream) => { - let _ = reply.send(stream); - - self.state = State::Idle(connection); - continue; - } - Poll::Pending => { - self.state = State::OpeningNewStream { reply, connection }; - return Poll::Pending; - } - }, - State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - } => match self.commands.poll_next_unpin(cx) { - Poll::Ready(Some(ControlCommand::OpenStream(new_reply))) => { - let _ = new_reply.send(Err(ConnectionError::Closed)); - - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(Some(ControlCommand::CloseConnection(new_reply))) => { - let _ = new_reply.send(()); - - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - continue; - } - Poll::Ready(None) => { - self.state = State::Closing { - reply, - inner: Closing::ClosingConnection { connection }, - }; - continue; - } - Poll::Pending => { - self.state = State::Closing { - reply, - inner: Closing::DrainingControlCommands { connection }, - }; - return Poll::Pending; - } - }, - State::Closing { - reply, - inner: Closing::ClosingConnection { mut connection }, - } => match connection.poll_close(cx) { - Poll::Ready(Ok(())) | Poll::Ready(Err(ConnectionError::Closed)) => { - if let Some(reply) = reply { - let _ = reply.send(()); - } - return Poll::Ready(None); - } - Poll::Ready(Err(other)) => { - if let Some(reply) = reply { - let _ = reply.send(()); - } - return Poll::Ready(Some(Err(other))); - } - Poll::Pending => { - self.state = State::Closing { - reply, - inner: Closing::ClosingConnection { connection }, - }; - return Poll::Pending; - } - }, - State::Poisoned => unreachable!(), - } - } - } -} - -#[derive(Debug)] -enum ControlCommand { - /// Open a new stream to the remote end. - OpenStream(oneshot::Sender>), - /// Close the whole connection. - CloseConnection(oneshot::Sender<()>), -} - -/// The state of a [`ControlledConnection`]. -enum State { - Idle(Connection), - OpeningNewStream { - reply: oneshot::Sender>, - connection: Connection, - }, - Closing { - /// A channel to the [`Control`] in case the close was requested. `None` if we are closing because the last [`Control`] was dropped. - reply: Option>, - inner: Closing, - }, - Poisoned, -} - -/// A sub-state of our larger state machine for a [`ControlledConnection`]. -/// -/// Closing connection involves two steps: -/// -/// 1. Draining and answered all remaining [`ControlCommands`]. -/// 1. Closing the underlying [`Connection`]. -enum Closing { - DrainingControlCommands { connection: Connection }, - ClosingConnection { connection: Connection }, -} - -impl futures::Stream for ControlledConnection -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().poll_next(cx) - } -} diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index 040bd8d7..8de20344 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -25,7 +25,6 @@ #![forbid(unsafe_code)] mod chunks; -mod control; mod error; mod frame; @@ -33,7 +32,6 @@ pub(crate) mod connection; mod tagged_stream; pub use crate::connection::{Connection, Mode, Packet, Stream}; -pub use crate::control::{Control, ControlledConnection}; pub use crate::error::ConnectionError; pub use crate::frame::{ header::{HeaderDecodeError, StreamId}, @@ -44,12 +42,6 @@ pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification pub type Result = std::result::Result; -/// Arbitrary limit of our internal command channels. -/// -/// Since each [`mpsc::Sender`] gets a guaranteed slot in a channel the -/// actual upper bound is this value + number of clones. -const MAX_COMMAND_BACKLOG: usize = 32; - /// Default maximum number of bytes a Yamux data frame might carry as its /// payload when being send. Larger Payloads will be split. /// From 40f01c62c82eec0dfee72824f23aaf4786d0d65a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 30 Jun 2023 12:44:01 +0100 Subject: [PATCH 15/16] Fix formatting --- test-harness/src/lib.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index ff99093c..4c4820cc 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -69,8 +69,8 @@ fn new_socket(buffer_sizes: Option) -> io::Result { /// For each incoming stream of `c` echo back to the sender. pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -86,8 +86,8 @@ pub async fn echo_server(mut c: Connection) -> Result<(), ConnectionError> /// For each incoming stream of `c`, read to end but don't write back. pub async fn dev_null_server(mut c: Connection) -> Result<(), ConnectionError> - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { stream::poll_fn(|cx| c.poll_next_inbound(cx)) .try_for_each_concurrent(None, |mut stream| async move { @@ -148,8 +148,8 @@ impl MessageSender { } impl Future for MessageSender - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -224,12 +224,12 @@ impl Future for MessageSender } /// For each incoming stream, do nothing. -pub async fn noop_server(c: impl Stream>) { +pub async fn noop_server(c: impl Stream>) { c.for_each(|maybe_stream| { drop(maybe_stream); future::ready(()) }) - .await; + .await; } /// Send and receive buffer size for a TCP socket. @@ -281,7 +281,7 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): &Msg) -> io /// Send all messages, using only a single stream. pub async fn send_on_single_stream( mut stream: yamux::Stream, - iter: impl IntoIterator, + iter: impl IntoIterator, ) -> Result<(), ConnectionError> { log::debug!("C: new stream: {}", stream); @@ -313,8 +313,8 @@ impl EchoServer { } impl Future for EchoServer - where - T: AsyncRead + AsyncWrite + Unpin, +where + T: AsyncRead + AsyncWrite + Unpin, { type Output = yamux::Result; @@ -350,7 +350,7 @@ impl Future for EchoServer stream.close().await?; Ok(()) } - .boxed(), + .boxed(), ); continue; } @@ -384,8 +384,8 @@ impl OpenStreamsClient { } impl Future for OpenStreamsClient - where - T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, +where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = yamux::Result<(Connection, Vec)>; @@ -437,7 +437,7 @@ impl Arbitrary for Msg { msg } - fn shrink(&self) -> Box> { + fn shrink(&self) -> Box> { Box::new(self.0.shrink().filter(|v| !v.is_empty()).map(Msg)) } } From c05525b98cfdaf01f0a55e037219db1c35cffa8a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Jul 2023 07:53:01 +0100 Subject: [PATCH 16/16] Update CHANGELOG.md Co-authored-by: Max Inden --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0a3f271..b9389a98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # 0.12.0 - unreleased - Remove `Control` and `ControlledConnection`. - Users are encouraged to move to the `poll_` functions of `Connection`. + Users have to move to the `poll_` functions of `Connection`. See [PR #164](https://github.com/libp2p/rust-yamux/pull/164). # 0.11.1