Skip to content

Commit

Permalink
fix(iroh-gossip): connection loop misuses tokio::select! leading to…
Browse files Browse the repository at this point in the history
… read errors (#2572)

## Description

The connection loop of iroh-gossip misused tokio-select by selecting
over a future that is not cancellation safe. This means that if the
timings are bad, the message reading future would be aborted midway in a
message, and then restart by reading a length, which would then yield
some random number because it would be reading some random bytes in the
middle of a message. This means it would lead to random connection
drops.

## Breaking Changes

Backport from #2570 

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [ ] ~~Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.~~
- [ ] ~~Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
Frando authored Aug 1, 2024
1 parent 8e4e586 commit 32bb0f3
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,27 +652,26 @@ async fn connection_loop(
};
let mut send_buf = BytesMut::new();
let mut recv_buf = BytesMut::new();
loop {
tokio::select! {
biased;
// If `send_rx` is closed,
// stop selecting it but don't quit.
// We are not going to use connection for sending anymore,
// but the other side may still want to use it to
// send data to us.
Some(msg) = send_rx.recv(), if !send_rx.is_closed() => {
write_message(&mut send, &mut send_buf, &msg, max_message_size).await?
}
let send_loop = async {
while let Some(msg) = send_rx.recv().await {
write_message(&mut send, &mut send_buf, &msg, max_message_size).await?
}
Ok::<_, anyhow::Error>(())
};

msg = read_message(&mut recv, &mut recv_buf, max_message_size) => {
let msg = msg?;
match msg {
None => break,
Some(msg) => in_event_tx.send(InEvent::RecvMessage(from, msg)).await?
}
let recv_loop = async {
loop {
let msg = read_message(&mut recv, &mut recv_buf, max_message_size).await?;
match msg {
None => break,
Some(msg) => in_event_tx.send(InEvent::RecvMessage(from, msg)).await?,
}
}
}
Ok::<_, anyhow::Error>(())
};

tokio::try_join!(send_loop, recv_loop)?;

Ok(())
}

Expand Down

0 comments on commit 32bb0f3

Please sign in to comment.