From 85ea1e93ffd8933871cffec89224dd8fdca9e9ee Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sun, 25 Feb 2024 17:19:43 +0100 Subject: [PATCH] fix(client): wait for resumption token The QUIC Interop testcase `resumption` requires a client to download two files on two consecutive connections, establishing the second connection with the resumption token of the first. Previously the client would close the first connection once the first file is downloaded. There is a race condition where the first file might be finished downloading before receiving the resumption token from the server. With this commit, the client will wait for both (1) the file being downloaded and (2) receiving the resumption token from the server. --- neqo-client/src/main.rs | 74 ++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index 7584697a47..17cae92da1 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -1170,12 +1170,12 @@ mod old { Ok(()) } - fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res { + fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> { let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id); match &mut maybe_maybe_out_file { None => { println!("Data on unexpected stream: {stream_id}"); - return Ok(false); + return Ok(()); } Some(maybe_out_file) => { let fin_recvd = Self::read_from_stream( @@ -1191,25 +1191,15 @@ mod old { } self.streams.remove(&stream_id); self.download_urls(client); - if self.streams.is_empty() && self.url_queue.is_empty() { - return Ok(false); - } } } } - Ok(true) - } - - /// Just in case we didn't get a resumption token event, this - /// iterates through events until one is found. - fn get_token(&mut self, client: &mut Connection) { - for event in client.events() { - if let ConnectionEvent::ResumptionToken(token) = event { - self.token = Some(token); - } - } + Ok(()) } + /// Handle events on the connection. + /// + /// Returns `Ok(true)` when done, i.e. url queue is empty and streams are closed. fn handle(&mut self, client: &mut Connection) -> Res { while let Some(event) = client.next_event() { match event { @@ -1217,11 +1207,7 @@ mod old { client.authenticated(AuthenticationStatus::Ok, Instant::now()); } ConnectionEvent::RecvStreamReadable { stream_id } => { - if !self.read(client, stream_id)? { - self.get_token(client); - client.close(Instant::now(), 0, "kthxbye!"); - return Ok(false); - }; + self.read(client, stream_id)?; } ConnectionEvent::SendStreamWritable { stream_id } => { println!("stream {stream_id} writable"); @@ -1253,7 +1239,12 @@ mod old { } } - Ok(true) + if self.streams.is_empty() && self.url_queue.is_empty() { + // Handler is done. + return Ok(true); + } + + Ok(false) } } @@ -1324,12 +1315,29 @@ mod old { pub async fn run(mut self) -> Res> { loop { - if !self.handler.handle(&mut self.client)? { - break; + let handler_done = self.handler.handle(&mut self.client)?; + + match (handler_done, self.args.resume, self.handler.token.is_some()) { + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); + } } self.process(None).await?; + if let State::Closed(..) = self.client.state() { + return Ok(self.handler.token.take()); + } + match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { let dgram = self.socket.recv(&self.local_addr)?; @@ -1343,25 +1351,7 @@ mod old { self.timeout = None; } } - - if let State::Closed(..) = self.client.state() { - break; - } } - - let token = if self.args.resume { - // If we haven't received an event, take a token if there is one. - // Lots of servers don't provide NEW_TOKEN, but a session ticket - // without NEW_TOKEN is better than nothing. - self.handler - .token - .take() - .or_else(|| self.client.take_resumption_token(Instant::now())) - } else { - None - }; - - Ok(token) } async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {