Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): wait for resumption token #1676

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 32 additions & 42 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1170,12 +1170,12 @@ mod old {
Ok(())
}

fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<bool> {
fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> {
let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id);
match &mut maybe_maybe_out_file {
None => {
println!("Data on unexpected stream: {stream_id}");
return Ok(false);
return Ok(());
}
Some(maybe_out_file) => {
let fin_recvd = Self::read_from_stream(
Expand All @@ -1191,37 +1191,23 @@ mod old {
}
self.streams.remove(&stream_id);
self.download_urls(client);
if self.streams.is_empty() && self.url_queue.is_empty() {
return Ok(false);
}
}
}
}
Ok(true)
}

/// Just in case we didn't get a resumption token event, this
/// iterates through events until one is found.
fn get_token(&mut self, client: &mut Connection) {
for event in client.events() {
if let ConnectionEvent::ResumptionToken(token) = event {
self.token = Some(token);
}
}
Ok(())
}

/// Handle events on the connection.
///
/// Returns `Ok(true)` when done, i.e. url queue is empty and streams are closed.
fn handle(&mut self, client: &mut Connection) -> Res<bool> {
while let Some(event) = client.next_event() {
match event {
ConnectionEvent::AuthenticationNeeded => {
client.authenticated(AuthenticationStatus::Ok, Instant::now());
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
if !self.read(client, stream_id)? {
self.get_token(client);
client.close(Instant::now(), 0, "kthxbye!");
return Ok(false);
};
self.read(client, stream_id)?;
}
ConnectionEvent::SendStreamWritable { stream_id } => {
println!("stream {stream_id} writable");
Expand Down Expand Up @@ -1253,7 +1239,12 @@ mod old {
}
}

Ok(true)
if self.streams.is_empty() && self.url_queue.is_empty() {
// Handler is done.
return Ok(true);
}

Ok(false)
}
}

Expand Down Expand Up @@ -1324,12 +1315,29 @@ mod old {

pub async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
if !self.handler.handle(&mut self.client)? {
break;
let handler_done = self.handler.handle(&mut self.client)?;

match (handler_done, self.args.resume, self.handler.token.is_some()) {
// Handler isn't done. Continue.
(false, _, _) => {},
// Handler done. Resumption token needed but not present. Continue.
(true, true, false) => {
qdebug!("Handler done. Waiting for resumption token.");
}
// Handler is done, no resumption token needed. Close.
(true, false, _) |
// Handler is done, resumption token needed and present. Close.
(true, true, true) => {
self.client.close(Instant::now(), 0, "kthxbye!");
}
}

self.process(None).await?;

if let State::Closed(..) = self.client.state() {
return Ok(self.handler.token.take());
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
Expand All @@ -1343,25 +1351,7 @@ mod old {
self.timeout = None;
}
}

if let State::Closed(..) = self.client.state() {
break;
}
}

let token = if self.args.resume {
// If we haven't received an event, take a token if there is one.
// Lots of servers don't provide NEW_TOKEN, but a session ticket
// without NEW_TOKEN is better than nothing.
self.handler
.token
.take()
.or_else(|| self.client.take_resumption_token(Instant::now()))
} else {
None
};

Ok(token)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
Expand Down
Loading