Skip to content

Commit

Permalink
Don't pre-allocate memory for every possible stream per connection
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Dec 17, 2023
1 parent 1a57c84 commit 9156a3e
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 50 deletions.
24 changes: 19 additions & 5 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use bytes::Bytes;
use thiserror::Error;
use tracing::trace;

use self::state::get_or_insert_recv;

use super::spaces::{Retransmits, ThinRetransmits};
use crate::{frame, Dir, StreamId, VarInt};
use crate::{connection::streams::state::get_or_insert_send, frame, Dir, StreamId, VarInt};

mod recv;
use recv::Recv;
Expand Down Expand Up @@ -133,7 +135,7 @@ impl<'a> RecvStream<'a> {
hash_map::Entry::Occupied(s) => s,
hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }),
};
let stream = entry.get_mut();
let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());

let (read_credits, stop_sending) = stream.stop()?;
if stop_sending.should_transmit() {
Expand Down Expand Up @@ -207,11 +209,16 @@ impl<'a> SendStream<'a> {
}

let limit = self.state.write_limit();

let max_send_data = self.state.max_send_data(self.id);

let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(WriteError::UnknownStream)?;

if limit == 0 {
trace!(
stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
Expand All @@ -237,8 +244,9 @@ impl<'a> SendStream<'a> {

/// Check if this stream was stopped, get the reason if it was
pub fn stopped(&mut self) -> Result<Option<VarInt>, UnknownStream> {
match self.state.send.get(&self.id) {
Some(s) => Ok(s.stop_reason),
match self.state.send.get(&self.id).as_ref() {
Some(Some(s)) => Ok(s.stop_reason),
Some(None) => Ok(None),
None => Err(UnknownStream { _private: () }),
}
}
Expand All @@ -249,10 +257,12 @@ impl<'a> SendStream<'a> {
///
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
pub fn finish(&mut self) -> Result<(), FinishError> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(FinishError::UnknownStream)?;

let was_pending = stream.is_pending();
Expand All @@ -269,10 +279,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(UnknownStream { _private: () })?;

if matches!(stream.state, SendState::ResetSent) {
Expand All @@ -296,10 +308,12 @@ impl<'a> SendStream<'a> {
/// # Panics
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(UnknownStream { _private: () })?;

stream.priority = priority;
Expand All @@ -317,7 +331,7 @@ impl<'a> SendStream<'a> {
.get(&self.id)
.ok_or(UnknownStream { _private: () })?;

Ok(stream.priority)
Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
}
}

Expand Down
22 changes: 12 additions & 10 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::mem;
use thiserror::Error;
use tracing::debug;

use super::state::get_or_insert_recv;
use super::{Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState, UnknownStream};
use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
use crate::{frame, TransportError, VarInt};
Expand All @@ -18,14 +19,14 @@ pub(super) struct Recv {
}

impl Recv {
pub(super) fn new(initial_max_data: u64) -> Self {
Self {
pub(super) fn new(initial_max_data: u64) -> Box<Self> {
Box::new(Self {
state: RecvState::default(),
assembler: Assembler::new(),
sent_max_stream_data: initial_max_data,
end: 0,
stopped: false,
}
})
}

/// Process a STREAM frame
Expand Down Expand Up @@ -215,15 +216,16 @@ impl<'a> Chunks<'a> {
streams: &'a mut StreamsState,
pending: &'a mut Retransmits,
) -> Result<Self, ReadableError> {
let entry = match streams.recv.entry(id) {
let mut entry = match streams.recv.entry(id) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => return Err(ReadableError::UnknownStream),
};

let mut recv = match entry.get().stopped {
true => return Err(ReadableError::UnknownStream),
false => entry.remove(),
};
let mut recv =
match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
true => return Err(ReadableError::UnknownStream),
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
};

recv.assembler.ensure_ordering(ordered)?;
Ok(Self {
Expand Down Expand Up @@ -313,7 +315,7 @@ impl<'a> Chunks<'a> {
self.pending.max_stream_data.insert(self.id);
}
// Return the stream to storage for future use
self.streams.recv.insert(self.id, rs);
self.streams.recv.insert(self.id, Some(rs));
}

// Issue connection-level flow control credit for any data we read regardless of state
Expand All @@ -331,7 +333,7 @@ impl<'a> Drop for Chunks<'a> {
}

enum ChunksState {
Readable(Recv),
Readable(Box<Recv>),
Reset(VarInt),
Finished,
Finalized,
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ pub(super) struct Send {
}

impl Send {
pub(super) fn new(max_data: VarInt) -> Self {
Self {
pub(super) fn new(max_data: VarInt) -> Box<Self> {
Box::new(Self {
max_data: max_data.into(),
state: SendState::Ready,
pending: SendBuffer::new(),
priority: 0,
fin_pending: false,
connection_blocked: false,
stop_reason: None,
}
})
}

/// Whether the stream has been reset
Expand Down
Loading

0 comments on commit 9156a3e

Please sign in to comment.