Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set additive flag in initial window update. #95

Merged
merged 6 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
# 0.6.0

Upgrade step 2 of 4. This version sets the non-standard flag, version 0.5.0
already recognises.

# 0.5.0

This version begins the upgrade process spawning multiple versions that
changes the meaning of the initial window update from *"This is the total
size of the receive window."* to *"This is the size of the receive window
in addition to the default size."* This is necessary for compatibility
with other yamux implementations. See issue #92 for details.

As a first step, version 0.5.0 interprets a non-standard flag to imply the
new meaning. Future versions will set this flag and eventually the new
meaning will always be assumed. Upgrading from the current implemention to
the new semantics requires deployment of every intermediate version, each of
which is only compatible with its immediate predecessor. Alternatively, if
the default configuration together with `lazy_open` set to `true` is
deployed on all communicating endpoints, one can skip directly to the end
of the transition.

# 0.4.9

- Bugfixes (#93).
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.4.9"
version = "0.6.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand Down
20 changes: 9 additions & 11 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,23 +463,20 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
}
log::trace!("{}: creating new outbound stream", self.id);
let id = self.next_stream_id()?;
if !self.config.lazy_open {
let mut frame = Frame::window_update(id, self.config.receive_window);
let extra_credit = self.config.receive_window - DEFAULT_CREDIT;
if extra_credit > 0 {
let mut frame = Frame::window_update(id, extra_credit);
frame.header_mut().syn();
frame.header_mut().additive();
log::trace!("{}: sending initial {}", self.id, frame.header());
self.socket.get_mut().send(&frame).await.or(Err(ConnectionError::Closed))?
}
let stream = {
let config = self.config.clone();
let sender = self.stream_sender.clone();
let window =
if self.config.lazy_open {
DEFAULT_CREDIT
} else {
self.config.receive_window
};
let window = self.config.receive_window;
let mut stream = Stream::new(id, self.id, config, window, DEFAULT_CREDIT, sender);
if self.config.lazy_open {
if extra_credit == 0 {
stream.set_flag(stream::Flag::Syn)
}
stream
Expand All @@ -489,7 +486,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
self.streams.insert(id, stream);
} else {
log::debug!("{}: open stream {} has been cancelled", self.id, id);
if !self.config.lazy_open {
if extra_credit > 0 {
let mut header = Header::data(id, 0);
header.rst();
let frame = Frame::new(header);
Expand Down Expand Up @@ -746,6 +743,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
}

let is_finish = frame.header().flags().contains(header::FIN); // half-close
let is_additive = frame.header().flags().contains(header::ADD); // additive window update

if frame.header().flags().contains(header::SYN) { // new stream
if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) {
Expand All @@ -761,7 +759,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
return Action::Terminate(Frame::protocol_error())
}
let stream = {
let credit = frame.header().credit();
let credit = frame.header().credit() + if is_additive { DEFAULT_CREDIT } else { 0 };
let config = self.config.clone();
let sender = self.stream_sender.clone();
let mut stream = Stream::new(stream_id, self.id, config, DEFAULT_CREDIT, credit, sender);
Expand Down
24 changes: 12 additions & 12 deletions src/frame/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl<T: HasSyn> Header<T> {
pub fn syn(&mut self) {
self.flags.0 |= SYN.0
}

/// Set the [`ADD`] flag.
pub fn additive(&mut self) {
self.flags.0 |= ADD.0
}
}

impl<T: HasAck> Header<T> {
Expand Down Expand Up @@ -332,8 +337,6 @@ impl Flags {
}
}

const MAX_FLAG_VAL: u16 = 8;

/// Indicates the start of a new stream.
pub const SYN: Flags = Flags(1);

Expand All @@ -346,6 +349,10 @@ pub const FIN: Flags = Flags(4);
/// Indicates an immediate stream reset.
pub const RST: Flags = Flags(8);

/// Temporary flag indicating that the initial window update is additive.
/// (See https://github.com/paritytech/yamux/issues/92)
pub const ADD: Flags = Flags(0x8000);

/// The serialised header size in bytes.
pub const HEADER_SIZE: usize = 12;

Expand Down Expand Up @@ -381,10 +388,6 @@ pub fn decode(buf: &[u8; HEADER_SIZE]) -> Result<Header<()>, HeaderDecodeError>
_marker: std::marker::PhantomData
};

if hdr.flags.0 > MAX_FLAG_VAL {
return Err(HeaderDecodeError::Flags(hdr.flags.0))
}

Ok(hdr)
}

Expand All @@ -395,17 +398,14 @@ pub enum HeaderDecodeError {
/// Unknown version.
Version(u8),
/// An unknown frame type.
Type(u8),
/// Unknown flags.
Flags(u16)
Type(u8)
}

impl std::fmt::Display for HeaderDecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
HeaderDecodeError::Version(v) => write!(f, "unknown version: {}", v),
HeaderDecodeError::Type(t) => write!(f, "unknown frame type: {}", t),
HeaderDecodeError::Flags(x) => write!(f, "unknown flags type: {}", x)
HeaderDecodeError::Type(t) => write!(f, "unknown frame type: {}", t)
}
}
}
Expand All @@ -428,7 +428,7 @@ mod tests {
Header {
version: Version(0),
tag,
flags: Flags(std::cmp::min(g.gen(), MAX_FLAG_VAL)),
flags: Flags(g.gen()),
stream_id: StreamId(g.gen()),
length: Len(g.gen()),
_marker: std::marker::PhantomData
Expand Down
11 changes: 4 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,13 @@ pub enum WindowUpdateMode {
/// - max. number of streams = 8192
/// - window update mode = on receive
/// - read after close = true
/// - lazy open = false
#[derive(Debug, Clone)]
pub struct Config {
receive_window: u32,
max_buffer_size: usize,
max_num_streams: usize,
window_update_mode: WindowUpdateMode,
read_after_close: bool,
lazy_open: bool
read_after_close: bool
}

impl Default for Config {
Expand All @@ -94,8 +92,7 @@ impl Default for Config {
max_buffer_size: 1024 * 1024,
max_num_streams: 8192,
window_update_mode: WindowUpdateMode::OnReceive,
read_after_close: true,
lazy_open: false
read_after_close: true
}
}
}
Expand Down Expand Up @@ -149,8 +146,8 @@ impl Config {
/// to the remote. This allows opening a stream with a custom receive
/// window size (cf. [`Config::set_receive_window`]) which the remote
/// can directly make use of.
pub fn set_lazy_open(&mut self, b: bool) -> &mut Self {
self.lazy_open = b;
#[deprecated(since = "0.6.0")]
pub fn set_lazy_open(&mut self, _: bool) -> &mut Self {
self
}
}
Expand Down
1 change: 0 additions & 1 deletion src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ impl Arbitrary for TestConfig {
} else {
WindowUpdateMode::OnReceive
});
c.set_lazy_open(g.gen());
c.set_read_after_close(g.gen());
c.set_receive_window(g.gen_range(256 * 1024, 1024 * 1024));
TestConfig(c)
Expand Down