fix(quic): allow closing stream after remote dropped it #3164
Conversation
- Don't call `quinn_proto::SendStream::finish` if the stream is already closing.
- Only stop a stream on drop if the remote did not finish it yet.
Thanks @elenaf9. I will test this out in a bit.
I deployed this to kademlia-exporter.max-inden.de/ and successfully connected to it via a patched https://github.com/mxinden/libp2p-lookup.

Nice! Do I understand it correctly that before it always failed due to the issue in identify? Or was it a flaky problem and we need to do some more testing before we can confirm #3144 is fixed?

Correct. It was consistently failing.
Small suggestion. Otherwise looks good to me. Thanks for the patch and the unit tests!
```diff
-/// `true` if the stream finished, i.e. the writing side closed.
+/// `true` if the writing side of the stream is closing, i.e. `AsyncWrite::poll_close`
+/// was called.
 pub is_finishing: bool,
```
```diff
-    pub is_finishing: bool,
+    pub is_write_closing: bool,
```
Would that not be more intuitive / consistent with `is_write_closed`?
An alternative would be to introduce an enum:

```rust
write_state: WriteState,

// ...

enum WriteState {
    Open,
    Closing,
    Closed,
}
```
I don't have a strong opinion.
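For illustration, here is a minimal, self-contained sketch of how such an enum could gate the `finish` call so it is issued exactly once. All names besides `WriteState` are assumed for this sketch; the real stream would wire this into `AsyncWrite::poll_close` and the `quinn_proto` event loop.

```rust
// Sketch only: a stand-in `SendStream`, not the real libp2p-quic type.
#[derive(Debug, PartialEq)]
enum WriteState {
    Open,
    Closing,
    Closed,
}

struct SendStream {
    write_state: WriteState,
}

impl SendStream {
    /// Returns `true` iff this call should actually invoke
    /// `quinn_proto::SendStream::finish` (a second call would always fail).
    fn close(&mut self) -> bool {
        if self.write_state == WriteState::Open {
            self.write_state = WriteState::Closing;
            true
        } else {
            false
        }
    }

    /// Would be driven by the connection's `StreamEvent::Finished`.
    fn on_finished(&mut self) {
        self.write_state = WriteState::Closed;
    }
}

fn main() {
    let mut s = SendStream { write_state: WriteState::Open };
    assert!(s.close()); // first close: call finish
    assert!(!s.close()); // already closing: skip finish
    s.on_finished();
    assert_eq!(s.write_state, WriteState::Closed);
    println!("finish gated to exactly one call");
}
```

The three-state shape also distinguishes "we asked to finish" from "the peer acknowledged the finish", which two booleans conflate.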
`finish` is QUIC terminology, right?
I like `WriteState`. Added in c6256e1.
> `finish` is QUIC terminology, right?

Not really QUIC terminology, but used in quinn. I stuck with it to avoid confusion around inner events like `StreamEvent::Finished`. But thinking about it again, I think using the `AsyncWrite` terminology of `close` is more intuitive.
transports/quic/tests/smoke.rs (Outdated)
```rust
let too_much_data = vec![0; 10];

assert!(
    async_std::future::timeout(Duration::from_secs(1), stream_a.write_all(&too_much_data))
```
Usage of a timeout in a unit test is unfortunate. That said, I can not think of an alternative, and I think it is worth it. Thus I suggest we keep it as is.
You could assert that it returns `Poll::Pending` instead, and `Poll::Ready` once you read from the other side again.
> You could assert that it returns `Poll::Pending` instead, and `Poll::Ready` once you read from the other side again.
👍. Done in 9f2a7ee.
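The poll-once pattern can be illustrated with a toy future and a no-op waker, using only the standard library. The `Write` type below is a stand-in for the real QUIC stream under flow control, not its actual API:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Toy write future: stays Pending while the peer's receive window is full.
struct Write<'a> {
    window: &'a mut usize,
    len: usize,
}

impl Future for Write<'_> {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // `Write` is Unpin
        if *this.window >= this.len {
            *this.window -= this.len;
            Poll::Ready(())
        } else {
            Poll::Pending // backpressure: wait until the reader frees the window
        }
    }
}

// Minimal no-op waker so we can poll by hand in a test.
fn noop_waker() -> Waker {
    fn clone(p: *const ()) -> RawWaker {
        RawWaker::new(p, &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut window = 0usize; // receive window exhausted: peer stopped reading
    let mut fut = Write { window: &mut window, len: 10 };
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Pending);

    *fut.window = 10; // the "reader" drained data; the window reopens
    assert_eq!(Pin::new(&mut fut).poll(&mut cx), Poll::Ready(()));
    println!("pending-then-ready asserted without a timeout");
}
```

Compared to a wall-clock timeout, asserting `Poll::Pending` directly makes the test deterministic and fast.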
```diff
@@ -182,6 +183,66 @@ fn concurrent_connections_and_streams_tokio() {
         .quickcheck(prop::<quic::tokio::Provider> as fn(_, _) -> _);
 }
 
+#[cfg(feature = "async-std")]
+#[async_std::test]
+async fn backpressure() {
```
🚀
Nice! A few suggestions!
```rust
#[cfg(feature = "async-std")]
#[async_std::test]
async fn read_after_peer_dropped_stream() {
    let _ = env_logger::try_init();
    let (mut stream_a, mut stream_b) = build_streams::<quic::async_std::Provider>().await;

    let data = vec![0; 10];

    stream_b.close().now_or_never();
    stream_a.write_all(&data).await.unwrap();
    stream_a.close().await.unwrap();
    drop(stream_a);

    stream_b.close().await.unwrap();
    let mut buf = Vec::new();
    stream_b.read_to_end(&mut buf).await.unwrap();
    assert_eq!(data, buf)
}

#[cfg(feature = "async-std")]
#[async_std::test]
#[should_panic]
async fn write_after_peer_dropped_stream() {
    let _ = env_logger::try_init();
    let (stream_a, mut stream_b) = build_streams::<quic::async_std::Provider>().await;
    drop(stream_a);
    futures_timer::Delay::new(Duration::from_millis(1)).await;

    let data = vec![0; 10];
    stream_b.write_all(&data).await.expect("Write failed.");
    stream_b.close().await.expect("Close failed.");
}
```
Can we move these to the muxer test harness? :)
I am in favor of moving them to the test harness.
However, contrary to the other tests in the harness, these tests exercise a specific sequential order in which Alice and Bob make their calls, rather than things running in parallel. So I think it would require some more "infrastructure" code, and ideally we'd also add it for the other muxers. Since I don't want to block this PR on that, I propose to do it in a follow-up PR.
Right, that makes sense. I didn't consider that :)
Pushing the boundaries of the harness here 😁
Tracked the idea in issue #3211.
```rust
let data = vec![0; 10];
stream_b.write_all(&data).await.expect("Write failed.");
stream_b.close().await.expect("Close failed.");
```
Is this the line that will panic? I'd like to see an `unwrap_err` then.
Unfortunately this is not deterministic; it may panic on either the `write_all` or the `close`, depending on whether the `STOP_SENDING` frame made it to B before B starts writing or not.
A few suggestions regarding the PR description/title:
I also often reference this blog post: https://cbea.ms/git-commit/ If we follow these guidelines, all squash commits will be nice commits.

Agree with all but nr. 3. I personally prefer bullet points to avoid unnecessary filler words, at least if no additional explanation is needed 🙂 Edit: Ah, you might be referring to the formatting and not the phrasing itself. I don't feel strongly about it, but why not use bullet points?

Thanks for prioritizing this @elenaf9 🙏

I totally get what you are saying and I agree with the gist of it. The issue with bullet points can be that they are often too concise. It is sometimes nice to read a little paragraph that summarizes what is happening and why. Often, I start the commit message with: "Previously, our behaviour was XYZ. That caused problem ABC. We fix this by doing 1, 2 and 3." That could or could not involve bullet points, but it makes clear what is the description of the original behaviour, what is the problem, and what is the fix. I don't think there are too many unnecessary filler words in there, but YMMV :) Here is a recent example: #3152. I don't feel too strongly about it, just noticed it and wanted to start a discussion to see where everyone is at!
Description

- Send `STOP_SENDING` on a stream when dropping it if the remote did not finish the stream yet.
- Only call `quinn_proto::SendStream::finish` once. (A second call to it will always fail. Though I don't think this was the issue in "transports/quic: `STOP_SENDING` fails `poll_close`" #3144.)

Fixes #3144.
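The described drop behaviour — send `STOP_SENDING` only if the remote did not finish the stream yet — can be sketched with a toy model. Every name below is assumed for illustration and is not the real libp2p-quic API; a shared flag stands in for queuing the frame so the effect is observable:

```rust
use std::cell::Cell;
use std::rc::Rc;

// Toy model of the drop logic; not the real libp2p-quic stream type.
struct Stream {
    remote_finished: bool,             // set when a FIN was received from the remote
    stop_sending_sent: Rc<Cell<bool>>, // records whether we "sent" STOP_SENDING
}

impl Drop for Stream {
    fn drop(&mut self) {
        // Only stop the stream if the remote did not finish it yet;
        // stopping an already-finished stream would be an error.
        if !self.remote_finished {
            self.stop_sending_sent.set(true); // would queue a STOP_SENDING frame
        }
    }
}

fn main() {
    let sent = Rc::new(Cell::new(false));

    // Remote already finished: dropping must NOT send STOP_SENDING.
    drop(Stream { remote_finished: true, stop_sending_sent: sent.clone() });
    assert!(!sent.get());

    // Remote still open: dropping sends STOP_SENDING.
    drop(Stream { remote_finished: false, stop_sending_sent: sent.clone() });
    assert!(sent.get());
    println!("drop sends STOP_SENDING only when the remote did not finish");
}
```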
Links to any relevant issues

Fixes #3144 following proposed idea nr. 2 from @mxinden.

I implemented it slightly differently compared to quinn, since quinn only sets the `all_data_read` flag when the stream finishes while reading. In identify, however, we might receive a FIN for a stream on which we are not reading anymore. Thus what we do now is: in the `drop` implementation, we manually check the receive stream again to see if it has finished.

Change checklist