diff --git a/grpc/grpc.go b/grpc/grpc.go index b83a485..9e35354 100644 --- a/grpc/grpc.go +++ b/grpc/grpc.go @@ -131,7 +131,7 @@ func (mi *ModuleInstance) stream(c goja.ConstructorCall) *goja.Object { instanceMetrics: mi.metrics, builtinMetrics: mi.vu.State().BuiltinMetrics, done: make(chan struct{}), - state: opened, + writing: opened, writeQueueCh: make(chan message), diff --git a/grpc/stream.go b/grpc/stream.go index e89eaf4..478e9be 100644 --- a/grpc/stream.go +++ b/grpc/stream.go @@ -28,7 +28,6 @@ type message struct { const ( opened = iota + 1 - closing closed ) @@ -51,8 +50,8 @@ type stream struct { obj *goja.Object // the object that is given to js to interact with the stream - state int8 - done chan struct{} + writing int8 + done chan struct{} writeQueueCh chan message @@ -185,17 +184,19 @@ func (s *stream) readData(wg *sync.WaitGroup) { return } - if len(msg) == 0 && isRegularClosing(err) { + if len(msg) > 0 { + s.queueMessage(msg) + } + + if isRegularClosing(err) { s.logger.WithError(err).Debug("stream is cancelled/finished") s.tq.Queue(func() error { - return s.closeWithError(nil) + return s.closeWithError(err) }) return } - - s.queueMessage(msg) } } @@ -299,7 +300,7 @@ func (s *stream) on(event string, listener func(goja.Value) (goja.Value, error)) // write writes a message to the stream func (s *stream) write(input goja.Value) { - if s.state != opened { + if s.writing != opened { return } @@ -320,11 +321,13 @@ func (s *stream) write(input goja.Value) { // end closes client the stream func (s *stream) end() { - if s.state == closed || s.state == closing { + if s.writing == closed { return } - s.state = closing + s.logger.Debug("stream is closing") + + s.writing = closed s.writeQueueCh <- message{isClosing: true} } @@ -334,15 +337,13 @@ func (s *stream) closeWithError(err error) error { return s.callErrorListeners(err) } -// close changes the stream state to closed and triggers the end event listeners +// close closes the stream and call end event listeners +// Note: in the regular closing the io.EOF could come func (s *stream) close(err error) { - if s.state == closed { + if err == nil { return } - s.logger.WithError(err).Debug("stream is closing") - - s.state = closed close(s.done) s.tq.Queue(func() error { return s.callEventListeners(eventEnd) @@ -354,7 +355,7 @@ func (s *stream) close(err error) { } func (s *stream) callErrorListeners(e error) error { - if e == nil { + if e == nil || errors.Is(e, io.EOF) { return nil }