Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
Avoid closing the stream if server still open
Browse files Browse the repository at this point in the history
  • Loading branch information
olegbespalov committed Sep 4, 2023
1 parent 9da0f76 commit 5a49a61
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object {
instanceMetrics: mi.metrics,
builtinMetrics: mi.vu.State().BuiltinMetrics,
done: make(chan struct{}),
state: opened,
writing: opened,

writeQueueCh: make(chan message),

Expand Down
33 changes: 17 additions & 16 deletions grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type message struct {

const (
opened = iota + 1
closing
closed
)

Expand All @@ -51,8 +50,8 @@ type stream struct {

obj *goja.Object // the object that is given to js to interact with the stream

state int8
done chan struct{}
writing int8
done chan struct{}

writeQueueCh chan message

Expand Down Expand Up @@ -185,17 +184,19 @@ func (s *stream) readData(wg *sync.WaitGroup) {
return
}

if len(msg) == 0 && isRegularClosing(err) {
if len(msg) > 0 {
s.queueMessage(msg)
}

if isRegularClosing(err) {
s.logger.WithError(err).Debug("stream is cancelled/finished")

s.tq.Queue(func() error {
return s.closeWithError(nil)
return s.closeWithError(err)
})

return
}

s.queueMessage(msg)
}
}

Expand Down Expand Up @@ -299,7 +300,7 @@ func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error))

// write writes a message to the stream
func (s *stream) write(input goja.Value) {
if s.state != opened {
if s.writing != opened {
return
}

Expand All @@ -320,11 +321,13 @@ func (s *stream) write(input goja.Value) {

// end closes client the stream
func (s *stream) end() {
if s.state == closed || s.state == closing {
if s.writing == closed {
return
}

s.state = closing
s.logger.Debug("stream is closing")

s.writing = closed
s.writeQueueCh <- message{isClosing: true}
}

Expand All @@ -334,15 +337,13 @@ func (s *stream) closeWithError(err error) error {
return s.callErrorListeners(err)
}

// close changes the stream state to closed and triggers the end event listeners
// close closes the stream and call end event listeners
// Note: in the regular closing the io.EOF could come
func (s *stream) close(err error) {
if s.state == closed {
if err == nil {
return
}

s.logger.WithError(err).Debug("stream is closing")

s.state = closed
close(s.done)
s.tq.Queue(func() error {
return s.callEventListeners(eventEnd)
Expand All @@ -354,7 +355,7 @@ func (s *stream) close(err error) {
}

func (s *stream) callErrorListeners(e error) error {
if e == nil {
if e == nil || errors.Is(e, io.EOF) {
return nil
}

Expand Down

0 comments on commit 5a49a61

Please sign in to comment.