Skip to content

Commit

Permalink
Fix for a fuzzer-discovered integer underflow of the flow control win…
Browse files Browse the repository at this point in the history
…dow size (#692)

Removed the SubAssign, etc. syntactic sugar functions and switched to return Result on over/underflow

Whenever possible, switched to returning a library GoAway protocol
error. Otherwise we check for over/underflow only with `debug_assert!`,
assuming that those code paths do not over/underflow.


Signed-off-by: Michael Rodler <mrodler@amazon.de>
Signed-off-by: Daniele Ahmed <ahmeddan@amazon.de>
Co-authored-by: Michael Rodler <mrodler@amazon.de>
Co-authored-by: Daniele Ahmed <ahmeddan@amazon.de>
  • Loading branch information
3 people authored Jun 26, 2023
1 parent 478f7b9 commit 0189722
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 65 deletions.
4 changes: 3 additions & 1 deletion src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ where

/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.inner.streams.set_target_connection_window_size(size);
let _res = self.inner.streams.set_target_connection_window_size(size);
// TODO: proper error handling
debug_assert!(_res.is_ok());
}

/// Send a new SETTINGS frame with an updated initial window size.
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub type PingPayload = [u8; 8];
pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
Expand Down
73 changes: 40 additions & 33 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl FlowControl {
self.window_size > self.available
}

pub fn claim_capacity(&mut self, capacity: WindowSize) {
self.available -= capacity;
pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.decrease_by(capacity)
}

pub fn assign_capacity(&mut self, capacity: WindowSize) {
self.available += capacity;
pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.increase_by(capacity)
}

/// If a WINDOW_UPDATE frame should be sent, returns a positive number
Expand Down Expand Up @@ -136,36 +136,38 @@ impl FlowControl {
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_send_window(&mut self, sz: WindowSize) {
pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
// ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
self.window_size.decrease_by(sz)?;
Ok(())
}

/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
self.window_size.decrease_by(sz)?;
self.available.decrease_by(sz)?;
Ok(())
}

/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"send_data; sz={}; window={}; available={}",
sz,
Expand All @@ -176,12 +178,13 @@ impl FlowControl {
// If send size is zero it's meaningless to update flow control window
if sz > 0 {
// Ensure that the argument is correct
assert!(self.window_size >= sz as usize);
assert!(self.window_size.0 >= sz as i32);

// Update values
self.window_size -= sz;
self.available -= sz;
self.window_size.decrease_by(sz)?;
self.available.decrease_by(sz)?;
}
Ok(())
}
}

Expand All @@ -208,6 +211,29 @@ impl Window {
assert!(self.0 >= 0, "negative Window");
self.0 as WindowSize
}

pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
if let Some(v) = self.0.checked_sub(other as i32) {
self.0 = v;
Ok(())
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}

pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
let other = self.add(other)?;
self.0 = other.0;
Ok(())
}

pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
if let Some(v) = self.0.checked_add(other as i32) {
Ok(Self(v))
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}
}

impl PartialEq<usize> for Window {
Expand All @@ -230,25 +256,6 @@ impl PartialOrd<usize> for Window {
}
}

impl ::std::ops::SubAssign<WindowSize> for Window {
fn sub_assign(&mut self, other: WindowSize) {
self.0 -= other as i32;
}
}

impl ::std::ops::Add<WindowSize> for Window {
type Output = Self;
fn add(self, other: WindowSize) -> Self::Output {
Window(self.0 + other as i32)
}
}

impl ::std::ops::AddAssign<WindowSize> for Window {
fn add_assign(&mut self, other: WindowSize) {
self.0 += other as i32;
}
}

impl fmt::Display for Window {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
Expand Down
32 changes: 24 additions & 8 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ impl Prioritize {
flow.inc_window(config.remote_init_window_sz)
.expect("invalid initial window size");

flow.assign_capacity(config.remote_init_window_sz);
// TODO: proper error handling
let _res = flow.assign_capacity(config.remote_init_window_sz);
debug_assert!(_res.is_ok());

tracing::trace!("Prioritize::new; flow={:?}", flow);

Expand Down Expand Up @@ -253,7 +255,9 @@ impl Prioritize {
if available as usize > capacity {
let diff = available - capacity as WindowSize;

stream.send_flow.claim_capacity(diff);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(diff);
debug_assert!(_res.is_ok());

self.assign_connection_capacity(diff, stream, counts);
}
Expand Down Expand Up @@ -324,7 +328,9 @@ impl Prioritize {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
if available > 0 {
stream.send_flow.claim_capacity(available);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(available);
debug_assert!(_res.is_ok());
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
Expand All @@ -337,7 +343,9 @@ impl Prioritize {
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

stream.send_flow.claim_capacity(reserved);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(reserved);
debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
Expand All @@ -363,7 +371,9 @@ impl Prioritize {
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();

self.flow.assign_capacity(inc);
// TODO: proper error handling
let _res = self.flow.assign_capacity(inc);
debug_assert!(_res.is_ok());

// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
Expand Down Expand Up @@ -443,7 +453,9 @@ impl Prioritize {
stream.assign_capacity(assign, self.max_buffer_size);

// Claim the capacity from the connection
self.flow.claim_capacity(assign);
// TODO: proper error handling
let _res = self.flow.claim_capacity(assign);
debug_assert!(_res.is_ok());
}

tracing::trace!(
Expand Down Expand Up @@ -763,12 +775,16 @@ impl Prioritize {
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len);
// TODO: proper error handling
let _res = self.flow.assign_capacity(len);
debug_assert!(_res.is_ok());
});

let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
self.flow.send_data(len);
// TODO: proper error handling
let _res = self.flow.send_data(len);
debug_assert!(_res.is_ok());

// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
Expand Down
49 changes: 36 additions & 13 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Recv {
// settings
flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
.expect("invalid initial remote window size");
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();

Recv {
init_window_sz: config.local_init_window_sz,
Expand Down Expand Up @@ -363,7 +363,9 @@ impl Recv {
self.in_flight_data -= capacity;

// Assign capacity to connection
self.flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = self.flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if self.flow.unclaimed_capacity().is_some() {
if let Some(task) = task.take() {
Expand Down Expand Up @@ -391,7 +393,9 @@ impl Recv {
stream.in_flight_recv_data -= capacity;

// Assign capacity to stream
stream.recv_flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = stream.recv_flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if stream.recv_flow.unclaimed_capacity().is_some() {
// Queue the stream for sending the WINDOW_UPDATE frame.
Expand Down Expand Up @@ -437,7 +441,11 @@ impl Recv {
///
/// The `task` is an optional parked task for the `Connection` that might
/// be blocked on needing more window capacity.
pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
pub fn set_target_connection_window(
&mut self,
target: WindowSize,
task: &mut Option<Waker>,
) -> Result<(), Reason> {
tracing::trace!(
"set_target_connection_window; target={}; available={}, reserved={}",
target,
Expand All @@ -450,11 +458,15 @@ impl Recv {
//
// Update the flow controller with the difference between the new
// target and the current target.
let current = (self.flow.available() + self.in_flight_data).checked_size();
let current = self
.flow
.available()
.add(self.in_flight_data)?
.checked_size();
if target > current {
self.flow.assign_capacity(target - current);
self.flow.assign_capacity(target - current)?;
} else {
self.flow.claim_capacity(current - target);
self.flow.claim_capacity(current - target)?;
}

// If changing the target capacity means we gained a bunch of capacity,
Expand All @@ -465,6 +477,7 @@ impl Recv {
task.wake();
}
}
Ok(())
}

pub(crate) fn apply_local_settings(
Expand Down Expand Up @@ -504,9 +517,13 @@ impl Recv {
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);

store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
store.try_for_each(|mut stream| {
stream
.recv_flow
.dec_recv_window(dec)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Ordering::Greater => {
// We must increase the (local) window on every open stream.
Expand All @@ -519,7 +536,10 @@ impl Recv {
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
stream
.recv_flow
.assign_capacity(inc)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Expand Down Expand Up @@ -626,7 +646,10 @@ impl Recv {
}

// Update stream level flow control
stream.recv_flow.send_data(sz);
stream
.recv_flow
.send_data(sz)
.map_err(proto::Error::library_go_away)?;

// Track the data as in-flight
stream.in_flight_recv_data += sz;
Expand Down Expand Up @@ -667,7 +690,7 @@ impl Recv {
}

// Update connection level flow control
self.flow.send_data(sz);
self.flow.send_data(sz).map_err(Error::library_go_away)?;

// Track the data as in-flight
self.in_flight_data += sz;
Expand Down
Loading

0 comments on commit 0189722

Please sign in to comment.