diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index 7584697a47..17cae92da1 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -1170,12 +1170,12 @@ mod old { Ok(()) } - fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res { + fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> { let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id); match &mut maybe_maybe_out_file { None => { println!("Data on unexpected stream: {stream_id}"); - return Ok(false); + return Ok(()); } Some(maybe_out_file) => { let fin_recvd = Self::read_from_stream( @@ -1191,25 +1191,15 @@ mod old { } self.streams.remove(&stream_id); self.download_urls(client); - if self.streams.is_empty() && self.url_queue.is_empty() { - return Ok(false); - } } } } - Ok(true) - } - - /// Just in case we didn't get a resumption token event, this - /// iterates through events until one is found. - fn get_token(&mut self, client: &mut Connection) { - for event in client.events() { - if let ConnectionEvent::ResumptionToken(token) = event { - self.token = Some(token); - } - } + Ok(()) } + /// Handle events on the connection. + /// + /// Returns `Ok(true)` when done, i.e. url queue is empty and streams are closed. fn handle(&mut self, client: &mut Connection) -> Res { while let Some(event) = client.next_event() { match event { @@ -1217,11 +1207,7 @@ mod old { client.authenticated(AuthenticationStatus::Ok, Instant::now()); } ConnectionEvent::RecvStreamReadable { stream_id } => { - if !self.read(client, stream_id)? { - self.get_token(client); - client.close(Instant::now(), 0, "kthxbye!"); - return Ok(false); - }; + self.read(client, stream_id)?; } ConnectionEvent::SendStreamWritable { stream_id } => { println!("stream {stream_id} writable"); @@ -1253,7 +1239,12 @@ mod old { } } - Ok(true) + if self.streams.is_empty() && self.url_queue.is_empty() { + // Handler is done. + return Ok(true); + } + + Ok(false) } } @@ -1324,12 +1315,29 @@ mod old { pub async fn run(mut self) -> Res> { loop { - if !self.handler.handle(&mut self.client)? { - break; + let handler_done = self.handler.handle(&mut self.client)?; + + match (handler_done, self.args.resume, self.handler.token.is_some()) { + // Handler isn't done. Continue. + (false, _, _) => {}, + // Handler done. Resumption token needed but not present. Continue. + (true, true, false) => { + qdebug!("Handler done. Waiting for resumption token."); + } + // Handler is done, no resumption token needed. Close. + (true, false, _) | + // Handler is done, resumption token needed and present. Close. + (true, true, true) => { + self.client.close(Instant::now(), 0, "kthxbye!"); + } } self.process(None).await?; + if let State::Closed(..) = self.client.state() { + return Ok(self.handler.token.take()); + } + match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { let dgram = self.socket.recv(&self.local_addr)?; @@ -1343,25 +1351,7 @@ mod old { self.timeout = None; } } - - if let State::Closed(..) = self.client.state() { - break; - } } - - let token = if self.args.resume { - // If we haven't received an event, take a token if there is one. - // Lots of servers don't provide NEW_TOKEN, but a session ticket - // without NEW_TOKEN is better than nothing. - self.handler - .token - .take() - .or_else(|| self.client.take_resumption_token(Instant::now())) - } else { - None - }; - - Ok(token) } async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {