diff --git a/src/msg/producer/writer/consumer_writer.go b/src/msg/producer/writer/consumer_writer.go index f7cb7a3e43..b6afb578df 100644 --- a/src/msg/producer/writer/consumer_writer.go +++ b/src/msg/producer/writer/consumer_writer.go @@ -212,7 +212,7 @@ func (w *consumerWriterImpl) Write(connIndex int, b []byte) error { _, err := writeConn.w.Write(b) writeConn.writeLock.Unlock() - // Hold onto the write state lock until done, since flushing and + // Hold onto the write state lock until done, since // closing connections are done by acquiring the write state lock. w.writeState.RUnlock() @@ -254,13 +254,17 @@ func (w *consumerWriterImpl) flushUntilClose() { for { select { case <-flushTicker.C: - w.writeState.Lock() + w.writeState.RLock() for _, conn := range w.writeState.conns { + conn.writeLock.Lock() if err := conn.w.Flush(); err != nil { w.notifyReset(err) } + conn.writeLock.Unlock() } - w.writeState.Unlock() + // Hold onto the write state lock until done, since + // closing connections are done by acquiring the write state lock. + w.writeState.RUnlock() case <-w.doneCh: return } @@ -297,8 +301,8 @@ func (w *consumerWriterImpl) resetConnectionUntilClose() { } func (w *consumerWriterImpl) resetTooSoon() bool { - w.writeState.Lock() - defer w.writeState.Unlock() + w.writeState.RLock() + defer w.writeState.RUnlock() return w.nowFn().UnixNano() < w.writeState.lastResetNanos+int64(w.connOpts.ResetDelay()) } @@ -393,8 +397,8 @@ func (w *consumerWriterImpl) notifyReset(err error) { } func (w *consumerWriterImpl) isClosed() bool { - w.writeState.Lock() - defer w.writeState.Unlock() + w.writeState.RLock() + defer w.writeState.RUnlock() return w.writeState.closed }