-
Notifications
You must be signed in to change notification settings - Fork 236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements "CloseChan" functionality for "Stream" #115
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -57,6 +57,9 @@ type Stream struct { | |||||
// closeTimer is set with stateLock held to honor the StreamCloseTimeout | ||||||
// setting on Session. | ||||||
closeTimer *time.Timer | ||||||
|
||||||
// closeChan is closed in case of stream being shutdown. | ||||||
closeChan chan struct{} | ||||||
} | ||||||
|
||||||
// newStream is used to construct a new stream within | ||||||
|
@@ -75,6 +78,7 @@ func newStream(session *Session, id uint32, state streamState) *Stream { | |||||
recvNotifyCh: make(chan struct{}, 1), | ||||||
sendNotifyCh: make(chan struct{}, 1), | ||||||
establishCh: make(chan struct{}, 1), | ||||||
closeChan: make(chan struct{}, 1), | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. closeChan is only ever closed, so there's no need for it to be buffered.
Suggested change
|
||||||
} | ||||||
s.readDeadline.Store(time.Time{}) | ||||||
s.writeDeadline.Store(time.Time{}) | ||||||
|
@@ -106,11 +110,13 @@ START: | |||||
if s.recvBuf == nil || s.recvBuf.Len() == 0 { | ||||||
s.recvLock.Unlock() | ||||||
s.stateLock.Unlock() | ||||||
s.closeCloseChan() | ||||||
return 0, io.EOF | ||||||
} | ||||||
s.recvLock.Unlock() | ||||||
case streamReset: | ||||||
s.stateLock.Unlock() | ||||||
s.closeCloseChan() | ||||||
return 0, ErrConnectionReset | ||||||
} | ||||||
s.stateLock.Unlock() | ||||||
|
@@ -181,9 +187,11 @@ START: | |||||
fallthrough | ||||||
case streamClosed: | ||||||
s.stateLock.Unlock() | ||||||
s.closeCloseChan() | ||||||
return 0, ErrStreamClosed | ||||||
case streamReset: | ||||||
s.stateLock.Unlock() | ||||||
s.closeCloseChan() | ||||||
return 0, ErrConnectionReset | ||||||
} | ||||||
s.stateLock.Unlock() | ||||||
|
@@ -388,9 +396,29 @@ func (s *Stream) forceClose() { | |||||
s.stateLock.Lock() | ||||||
s.state = streamClosed | ||||||
s.stateLock.Unlock() | ||||||
s.closeCloseChan() | ||||||
s.notifyWaiting() | ||||||
} | ||||||
|
||||||
// CloseChan returns a read-only channel which is closed as | ||||||
// soon as the stream is closed. Note that when it is closed, | ||||||
// doesn't imply that the buffers are empty too. | ||||||
func (s *Stream) CloseChan() <-chan struct{} { | ||||||
return s.closeChan | ||||||
} | ||||||
|
||||||
// IsClosed returns true in case of the stream being closed. | ||||||
// Note that when it is closed, doesn't imply that the buffers | ||||||
// are empty too. | ||||||
func (s *Stream) IsClosed() bool { | ||||||
select { | ||||||
case <-s.closeChan: | ||||||
return true | ||||||
default: | ||||||
return false | ||||||
} | ||||||
} | ||||||
|
||||||
// processFlags is used to update the state of the stream | ||||||
// based on set flags, if any. Lock must be held | ||||||
func (s *Stream) processFlags(flags uint16) error { | ||||||
|
@@ -399,6 +427,7 @@ func (s *Stream) processFlags(flags uint16) error { | |||||
|
||||||
// Close the stream without holding the state lock | ||||||
closeStream := false | ||||||
closeCloseChan := false | ||||||
defer func() { | ||||||
if closeStream { | ||||||
if s.closeTimer != nil { | ||||||
|
@@ -408,6 +437,9 @@ func (s *Stream) processFlags(flags uint16) error { | |||||
|
||||||
s.session.closeStream(s.id) | ||||||
} | ||||||
if closeCloseChan { | ||||||
s.closeCloseChan() | ||||||
} | ||||||
}() | ||||||
|
||||||
if flags&flagACK == flagACK { | ||||||
|
@@ -438,6 +470,7 @@ func (s *Stream) processFlags(flags uint16) error { | |||||
if flags&flagRST == flagRST { | ||||||
s.state = streamReset | ||||||
closeStream = true | ||||||
closeCloseChan = true | ||||||
s.notifyWaiting() | ||||||
} | ||||||
return nil | ||||||
|
@@ -542,3 +575,13 @@ func (s *Stream) Shrink() { | |||||
} | ||||||
s.recvLock.Unlock() | ||||||
} | ||||||
|
||||||
// closeCloseChan closes the closeChan, to mark the end of | ||||||
// stream lifetime. | ||||||
func (s *Stream) closeCloseChan() { | ||||||
select { | ||||||
case <-s.closeChan: | ||||||
default: | ||||||
close(s.closeChan) | ||||||
} | ||||||
Comment on lines
+582
to
+586
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this check-and-close is non-atomic, not guarded by a mutex, and potentially called from multiple goroutines, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just learned it from you. Thank you 😅 I'll fix this asap. |
||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You cannot call
t.Fatalf
from a goroutine in a test: https://pkg.go.dev/testing#T.FailNow (Fatalf calls FailNow)If you look at some other test funcs you'll see they use an error chan to send errors back to the main test goroutine for checking.