From 946161b49f0a5513f5ba10ba29a9a6c652ea54be Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Tue, 29 Sep 2020 10:54:43 -0700 Subject: [PATCH] Only acquire writeState exclusive lock when resetting connections (#2678) This is a shared lock across all goroutines, so ideally we limit the number of sync points. There is no need to acquire the exclusive lock when flushing a connection. Instead, the shared read lock can be acquired and individual connection exclusive locks can be acquired to ensure only a single writer is using the connection at a time. This is part of a broader fix to prevent deadlocks in the m3msg communication. Before this change and adding write timeouts, the producer and consumer could deadlock and stop all messages from flowing. If a producer was sending more messages than it was ACKing, it would eventually fill up the TCP buffers and begin to block writes. If the producer was flushing a connection and holding the exclusive lock, it would end up blocking while holding the lock. Since the lock was held, the ACKing process could not clear the buffer and the flush would block forever. --- src/msg/producer/writer/consumer_writer.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/msg/producer/writer/consumer_writer.go b/src/msg/producer/writer/consumer_writer.go index f7cb7a3e43..b6afb578df 100644 --- a/src/msg/producer/writer/consumer_writer.go +++ b/src/msg/producer/writer/consumer_writer.go @@ -212,7 +212,7 @@ func (w *consumerWriterImpl) Write(connIndex int, b []byte) error { _, err := writeConn.w.Write(b) writeConn.writeLock.Unlock() - // Hold onto the write state lock until done, since flushing and + // Hold onto the write state lock until done, since // closing connections are done by acquiring the write state lock. 
w.writeState.RUnlock() @@ -254,13 +254,17 @@ func (w *consumerWriterImpl) flushUntilClose() { for { select { case <-flushTicker.C: - w.writeState.Lock() + w.writeState.RLock() for _, conn := range w.writeState.conns { + conn.writeLock.Lock() if err := conn.w.Flush(); err != nil { w.notifyReset(err) } + conn.writeLock.Unlock() } - w.writeState.Unlock() + // Hold onto the write state lock until done, since + // closing connections are done by acquiring the write state lock. + w.writeState.RUnlock() case <-w.doneCh: return } @@ -297,8 +301,8 @@ func (w *consumerWriterImpl) resetConnectionUntilClose() { } func (w *consumerWriterImpl) resetTooSoon() bool { - w.writeState.Lock() - defer w.writeState.Unlock() + w.writeState.RLock() + defer w.writeState.RUnlock() return w.nowFn().UnixNano() < w.writeState.lastResetNanos+int64(w.connOpts.ResetDelay()) } @@ -393,8 +397,8 @@ func (w *consumerWriterImpl) notifyReset(err error) { } func (w *consumerWriterImpl) isClosed() bool { - w.writeState.Lock() - defer w.writeState.Unlock() + w.writeState.RLock() + defer w.writeState.RUnlock() return w.writeState.closed }