diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index b3e81f083..299728a45 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -1107,6 +1107,15 @@ impl Connection { self.streams.set_max_concurrent(dir, count); } + /// Current number of remotely initiated streams that may be concurrently open + /// + /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams), + /// it will not change immediately, even if fewer streams are open. Instead, it will + /// decrement by one for each time a remotely initiated stream of matching directionality is closed. + pub fn max_concurrent_streams(&self, dir: Dir) -> u64 { + self.streams.max_concurrent(dir) + } + /// See [`TransportConfig::receive_window()`] pub fn set_receive_window(&mut self, receive_window: VarInt) { if self.streams.set_receive_window(receive_window) { diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 9c7b7fa2a..e3a019157 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -56,6 +56,7 @@ impl<'a> Streams<'a> { /// Accept a remotely initiated stream of a certain directionality, if possible /// /// Returns `None` if there are no new incoming streams for this connection. + /// Has no impact on the data flow-control or stream concurrency limits. pub fn accept(&mut self, dir: Dir) -> Option { if self.state.next_remote[dir as usize] == self.state.next_reported_remote[dir as usize] { return None; @@ -79,6 +80,18 @@ impl<'a> Streams<'a> { pub fn send_streams(&self) -> usize { self.state.send_streams } + + /// The number of remotely initiated open streams of a certain directionality. + /// + /// Includes remotely initiated streams, which have not been accepted via [`accept`](Self::accept). + /// These streams count against the respective concurrency limit reported by + /// [`Connection::max_concurrent_streams`](super::Connection::max_concurrent_streams). + pub fn remote_open_streams(&self, dir: Dir) -> u64 { + // total opened - total closed = total opened - ( total permitted - total permitted unclosed ) + self.state.next_remote[dir as usize] + - (self.state.max_remote[dir as usize] + - self.state.allocated_remote_count[dir as usize]) + } } /// Access to streams diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 558e1d1c7..e82d6ab3d 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -31,9 +31,9 @@ pub struct StreamsState { pub(super) max: [u64; 2], /// Maximum number of remotely-initiated streams that may be opened over the lifetime of the /// connection so far, per direction - max_remote: [u64; 2], + pub(super) max_remote: [u64; 2], /// Number of streams that we've given the peer permission to open and which aren't fully closed - allocated_remote_count: [u64; 2], + pub(super) allocated_remote_count: [u64; 2], /// Size of the desired stream flow control window. May be smaller than `allocated_remote_count` /// due to `set_max_concurrent` calls. max_concurrent_remote_count: [u64; 2], @@ -763,6 +763,10 @@ impl StreamsState { self.ensure_remote_streams(dir); } + pub fn max_concurrent(&self, dir: Dir) -> u64 { + self.allocated_remote_count[dir as usize] + } + /// Set the receive_window and returns wether the receive_window has been /// expanded or shrunk: true if expanded, false if shrunk. pub fn set_receive_window(&mut self, receive_window: VarInt) -> bool {