-
Notifications
You must be signed in to change notification settings - Fork 236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements "CloseChan" functionality for "Stream" #115
base: master
Are you sure you want to change the base?
Conversation
@schmichael we noticed that this project has been resurrected a bit, sorry to ping you directly but is this something that could get implemented? we have to make a choice whether we fork yamux soon to implement a few missing pieces for us, so we were delighted to see the project getting a release! |
Here is our fork with a slightly different implementation: https://github.com/desertbit/yamux |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @hadi77ir. This would need some cleaning up before merging.
Even then I'm unsure about accepting this. I believe the same can be accomplished with:
remoteClosed := make(chan struct{})
go func() {
defer close(remoteClosed)
_, err := someStream.Read([]byte{})
if err != nil {
if errors.Is(err, io.EOF) {
// Expected way to close a stream
return
}
// Unexpected error, log or send on a chan
return
}
}
In this example remoteClosed
can be used like the Stream.CloseChan()
you're proposing. The example does not illustrate how to actually process the bytes being read, but I think this approach should still compose well with existing stream processing code.
If users who need a close chan can create one themselves, I'm inclined to not complicate yamux further by adding an additional chan to manage.
Let me know if I'm missing something! I'm happy to have more people using and contributing to yamux, but given the enormous amount of traffic HashiCorp alone sends over it every day we have to be careful what changes we accept.
select { | ||
case <-s.closeChan: | ||
default: | ||
close(s.closeChan) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this check-and-close is non-atomic, not guarded by a mutex, and potentially called from multiple goroutines, the close(s.closeChan)
may panic. Sadly this is a logical race (TOCTOU to be specific) and not a data race, so Go's race detector won't mark it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just learned it from you. Thank you 😅 I'll fix this asap.
defer wg.Done() | ||
_, err := server.AcceptStream() | ||
if err != nil { | ||
t.Fatalf("err: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You cannot call t.Fatalf
from a goroutine in a test: https://pkg.go.dev/testing#T.FailNow (Fatalf calls FailNow)
If you look at some other test funcs you'll see they use an error chan to send errors back to the main test goroutine for checking.
@@ -75,6 +78,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream { | |||
recvNotifyCh: make(chan struct{}, 1), | |||
sendNotifyCh: make(chan struct{}, 1), | |||
establishCh: make(chan struct{}, 1), | |||
closeChan: make(chan struct{}, 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closeChan is only ever closed, so there's no need for it to be buffered.
closeChan: make(chan struct{}, 1), | |
closeChan: make(chan struct{}), |
@schmichael if that’s an acceptable solution then we can work with that, would it make sense to capture that solution in a helper function? Like a /x/ or contrib package to indicate it is not part of the primary library but is more of a frame of reference? |
The problem is my solution doesn't compose with actually processing reads on that stream, so it's only appropriate when this stream is write/send-only (e.g. streaming logs or events and just wanting to gracefully detect when the other side has gone away). Taking a step back, yamux is designed to support this flow:
Since I'm happy to keep reviewing PRs on their merits and am always interested in hearing use cases, but I also want to be clear about yamux's intended scope. |
Alright well you’ve convinced me, thanks for taking the time to explain it, got a much better understanding of the library now 👍 |
For a personal project, I have an interface through which I implement wrappers for yamux, smux and other multiplexers.
The only difference between these were differences in the APIs surrounding the "CloseChan" that I tried to make consistent: "smux" had a channel that was closed once the stream was closed or reset but not for the "Session" (in the latest commit it has been implemented), while in "yamux" this is only implemented for "Session"s and not for "Stream"s.
So far, this is what I've done with the implementation and I think it is a nice thing to have, as it makes it possible to know when the stream has been closed and is of no use (without relying on
io.EOF
and other errors).