From a85b148a17c2123d36e48f9a8e67bd932d062469 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 16:32:55 +0100 Subject: [PATCH 1/2] chore: remove `WindowUpdateMode::OnReceive` Follow up to previous deprecation https://github.com/libp2p/rust-yamux/pull/177. Fixes https://github.com/libp2p/rust-yamux/issues/175. --- CHANGELOG.md | 6 ++++ yamux/Cargo.toml | 3 +- yamux/src/connection.rs | 65 +++++++--------------------------- yamux/src/connection/stream.rs | 26 ++++---------- yamux/src/lib.rs | 34 ------------------ 5 files changed, 28 insertions(+), 106 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c60beba..6fca93f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.13.0 + +- Remove `WindowUpdateMode`. + Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure. + See [PR 178](https://github.com/libp2p/rust-yamux/pull/178). + # 0.12.0 - Remove `Control` and `ControlledConnection`. diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index 431bd6c7..1cb16302 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yamux" -version = "0.12.0" +version = "0.13.0" authors = ["Parity Technologies "] license = "Apache-2.0 OR MIT" description = "Multiplexer over reliable, ordered connections" @@ -19,4 +19,5 @@ static_assertions = "1" pin-project = "1.1.0" [dev-dependencies] +futures = { version = "0.3.12", default-features = false, features = ["executor"] } quickcheck = "1.0" diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 52004d04..4fc92721 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -21,7 +21,7 @@ use crate::{ error::ConnectionError, frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID}, frame::{self, Frame}, - Config, WindowUpdateMode, DEFAULT_CREDIT, + Config, DEFAULT_CREDIT, }; use crate::{Result, MAX_ACK_BACKLOG}; use cleanup::Cleanup; @@ -304,9 +304,7 @@ enum Action { /// Nothing to be done. None, /// A new stream has been opened by the remote. - New(Stream, Option>), - /// A window update should be sent to the remote. - Update(Frame), + New(Stream), /// A ping should be answered. Ping(Frame), /// A stream should be reset. @@ -504,23 +502,13 @@ impl Active { // The remote may be out of credit though and blocked on // writing more data. We may need to reset the stream. State::SendClosed => { - if self.config.window_update_mode == WindowUpdateMode::OnRead - && shared.window == 0 - { - // The remote may be waiting for a window update - // which we will never send, so reset the stream now. - let mut header = Header::data(stream_id, 0); - header.rst(); - Some(Frame::new(header)) - } else { - // The remote has either still credit or will be given more - // (due to an enqueued window update or because the update - // mode is `OnReceive`) or we already have inbound frames in - // the socket buffer which will be processed later. In any - // case we will reply with an RST in `Connection::on_data` - // because the stream will no longer be known. - None - } + // The remote has either still credit or will be given more + // (due to an enqueued window update or because the update + // mode is `OnReceive`) or we already have inbound frames in + // the socket buffer which will be processed later. In any + // case we will reply with an RST in `Connection::on_data` + // because the stream will no longer be known. + None } // The stream was properly closed. We already have sent our FIN frame. The // remote end has already done so in the past. @@ -569,18 +557,10 @@ impl Active { }; match action { Action::None => {} - Action::New(stream, update) => { + Action::New(stream) => { log::trace!("{}: new inbound {} of {}", self.id, stream, self); - if let Some(f) = update { - log::trace!("{}/{}: sending update", self.id, f.header().stream_id()); - self.pending_frames.push_back(f.into()); - } return Ok(Some(stream)); } - Action::Update(f) => { - log::trace!("{}: sending update: {:?}", self.id, f.header()); - self.pending_frames.push_back(f.into()); - } Action::Ping(f) => { log::trace!("{}/{}: pong", self.id, f.header().stream_id()); self.pending_frames.push_back(f.into()); @@ -641,7 +621,6 @@ impl Active { return Action::Terminate(Frame::internal_error()); } let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT); - let mut window_update = None; { let mut shared = stream.shared(); if is_finish { @@ -649,21 +628,10 @@ impl Active { } shared.window = shared.window.saturating_sub(frame.body_len()); shared.buffer.push(frame.into_body()); - - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - if let Some(credit) = shared.next_window_update() { - shared.window += credit; - let mut frame = Frame::window_update(stream_id, credit); - frame.header_mut().ack(); - window_update = Some(frame) - } - } - } - if window_update.is_none() { - stream.set_flag(stream::Flag::Ack) } + stream.set_flag(stream::Flag::Ack); self.streams.insert(stream_id, stream.clone_shared()); - return Action::New(stream, window_update); + return Action::New(stream); } if let Some(s) = self.streams.get_mut(&stream_id) { @@ -695,13 +663,6 @@ impl Active { if let Some(w) = shared.reader.take() { w.wake() } - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - if let Some(credit) = shared.next_window_update() { - shared.window += credit; - let frame = Frame::window_update(stream_id, credit); - return Action::Update(frame); - } - } } else { log::trace!( "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}", @@ -766,7 +727,7 @@ impl Active { .update_state(self.id, stream_id, State::RecvClosed); } self.streams.insert(stream_id, stream.clone_shared()); - return Action::New(stream, None); + return Action::New(stream); } if let Some(s) = self.streams.get_mut(&stream_id) { diff --git a/yamux/src/connection/stream.rs b/yamux/src/connection/stream.rs index c2d8396c..84ab08f8 100644 --- a/yamux/src/connection/stream.rs +++ b/yamux/src/connection/stream.rs @@ -16,7 +16,7 @@ use crate::{ header::{Data, Header, StreamId, WindowUpdate}, Frame, }, - Config, WindowUpdateMode, DEFAULT_CREDIT, + Config, DEFAULT_CREDIT, }; use futures::{ channel::mpsc, @@ -200,12 +200,6 @@ impl Stream { /// Send new credit to the sending side via a window update message if /// permitted. fn send_window_update(&mut self, cx: &mut Context) -> Poll> { - // When using [`WindowUpdateMode::OnReceive`] window update messages are - // send early on data receival (see [`crate::Connection::on_frame`]). - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - return Poll::Ready(Ok(())); - } - let mut shared = self.shared.lock(); if let Some(credit) = shared.next_window_update() { @@ -487,6 +481,7 @@ impl Shared { current // Return the previous stream state for informational purposes. } + // TODO: This does not need to live in shared any longer. /// Calculate the number of additional window bytes the receiving side /// should grant the sending side via a window update message. /// @@ -499,19 +494,12 @@ impl Shared { return None; } - let new_credit = match self.config.window_update_mode { - WindowUpdateMode::OnReceive => { - debug_assert!(self.config.receive_window >= self.window); + let new_credit = { + debug_assert!(self.config.receive_window >= self.window); + let bytes_received = self.config.receive_window.saturating_sub(self.window); + let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX); - self.config.receive_window.saturating_sub(self.window) - } - WindowUpdateMode::OnRead => { - debug_assert!(self.config.receive_window >= self.window); - let bytes_received = self.config.receive_window.saturating_sub(self.window); - let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX); - - bytes_received.saturating_sub(buffer_len) - } + bytes_received.saturating_sub(buffer_len) }; // Send WindowUpdate message when half or more of the configured receive diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs index 3ee8c1ad..20e0c38c 100644 --- a/yamux/src/lib.rs +++ b/yamux/src/lib.rs @@ -63,32 +63,6 @@ const MAX_ACK_BACKLOG: usize = 256; /// https://github.com/paritytech/yamux/issues/100. const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024; -/// Specifies when window update frames are sent. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum WindowUpdateMode { - /// Send window updates as soon as a [`Stream`]'s receive window drops to 0. - /// - /// This ensures that the sender can resume sending more data as soon as possible - /// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates - /// data in its buffer which may reach its limit (see `set_max_buffer_size`). - /// In this mode, window updates merely prevent head of line blocking but do not - /// effectively exercise back pressure on senders. - OnReceive, - - /// Send window updates only when data is read on the receiving end. - /// - /// This ensures that senders do not overwhelm receivers and keeps buffer usage - /// low. However, depending on the protocol, there is a risk of deadlock, namely - /// if both endpoints want to send data larger than the receivers window and they - /// do not read before finishing their writes. Use this mode only if you are sure - /// that this will never happen, i.e. if - /// - /// - Endpoints *A* and *B* never write at the same time, *or* - /// - Endpoints *A* and *B* write at most *n* frames concurrently such that the sum - /// of the frame lengths is less or equal to the available credit of *A* and *B* - /// respectively. - OnRead, -} /// Yamux configuration. /// @@ -105,7 +79,6 @@ pub struct Config { receive_window: u32, max_buffer_size: usize, max_num_streams: usize, - window_update_mode: WindowUpdateMode, read_after_close: bool, split_send_size: usize, } @@ -116,7 +89,6 @@ impl Default for Config { receive_window: DEFAULT_CREDIT, max_buffer_size: 1024 * 1024, max_num_streams: 8192, - window_update_mode: WindowUpdateMode::OnRead, read_after_close: true, split_send_size: DEFAULT_SPLIT_SEND_SIZE, } @@ -147,12 +119,6 @@ impl Config { self } - /// Set the window update mode to use. - pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self { - self.window_update_mode = m; - self - } - /// Allow or disallow streams to read from buffered data after /// the connection has been closed. pub fn set_read_after_close(&mut self, b: bool) -> &mut Self { From 2a6a14becc3ecd431e2a6de0d5df94f1e900f998 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 16:36:44 +0100 Subject: [PATCH 2/2] fix(harness): remove mode --- test-harness/benches/concurrent.rs | 10 ++-------- test-harness/src/lib.rs | 7 +------ 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/test-harness/benches/concurrent.rs b/test-harness/benches/concurrent.rs index f7179cee..e619de40 100644 --- a/test-harness/benches/concurrent.rs +++ b/test-harness/benches/concurrent.rs @@ -74,12 +74,6 @@ fn concurrent(c: &mut Criterion) { group.finish(); } -fn config() -> Config { - let mut c = Config::default(); - c.set_window_update_mode(yamux::WindowUpdateMode::OnRead); - c -} - async fn oneway( nstreams: usize, nmessages: usize, @@ -87,8 +81,8 @@ async fn oneway( server: Endpoint, client: Endpoint, ) { - let server = Connection::new(server, config(), Mode::Server); - let client = Connection::new(client, config(), Mode::Client); + let server = Connection::new(server, Config::default(), Mode::Server); + let client = Connection::new(client, Config::default(), Mode::Client); task::spawn(dev_null_server(server)); diff --git a/test-harness/src/lib.rs b/test-harness/src/lib.rs index 4c4820cc..e02c12e0 100644 --- a/test-harness/src/lib.rs +++ b/test-harness/src/lib.rs @@ -13,8 +13,8 @@ use std::{fmt, io, mem}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; use tokio::task; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; +use yamux::Config; use yamux::ConnectionError; -use yamux::{Config, WindowUpdateMode}; use yamux::{Connection, Mode}; pub async fn connected_peers( @@ -448,11 +448,6 @@ pub struct TestConfig(pub Config); impl Arbitrary for TestConfig { fn arbitrary(g: &mut Gen) -> Self { let mut c = Config::default(); - c.set_window_update_mode(if bool::arbitrary(g) { - WindowUpdateMode::OnRead - } else { - WindowUpdateMode::OnReceive - }); c.set_read_after_close(Arbitrary::arbitrary(g)); c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024)); TestConfig(c)