Skip to content

Commit

Permalink
Only acquire writeState exclusive lock when resetting connections (#2678)
Browse files Browse the repository at this point in the history

The writeState lock is shared across all goroutines, so ideally we limit the
number of synchronization points. There is no need to acquire the exclusive lock
when flushing a connection. Instead, the shared read lock can be acquired
and each connection's own exclusive lock can be taken to ensure only
a single writer is using that connection at a time.

This is part of a broader fix to prevent deadlocks in the m3msg communication.
Before this change and the addition of write timeouts, the producer and consumer could deadlock
and stop all messages from flowing. If a producer was sending more messages than
it was acking, it would eventually fill up the TCP buffers and writes would begin
to block. If the producer was flushing a connection while holding the exclusive lock,
the flush would block with the lock still held. Since the lock was held, the ACKing process
could not clear the buffer and the flush would block forever.
  • Loading branch information
ryanhall07 authored Sep 29, 2020
1 parent 192f611 commit 946161b
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions src/msg/producer/writer/consumer_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (w *consumerWriterImpl) Write(connIndex int, b []byte) error {
_, err := writeConn.w.Write(b)
writeConn.writeLock.Unlock()

// Hold onto the write state lock until done, since flushing and
// Hold onto the write state lock until done, since
// closing connections are done by acquiring the write state lock.
w.writeState.RUnlock()

Expand Down Expand Up @@ -254,13 +254,17 @@ func (w *consumerWriterImpl) flushUntilClose() {
for {
select {
case <-flushTicker.C:
w.writeState.Lock()
w.writeState.RLock()
for _, conn := range w.writeState.conns {
conn.writeLock.Lock()
if err := conn.w.Flush(); err != nil {
w.notifyReset(err)
}
conn.writeLock.Unlock()
}
w.writeState.Unlock()
// Hold onto the write state lock until done, since
// closing connections are done by acquiring the write state lock.
w.writeState.RUnlock()
case <-w.doneCh:
return
}
Expand Down Expand Up @@ -297,8 +301,8 @@ func (w *consumerWriterImpl) resetConnectionUntilClose() {
}

// resetTooSoon reports whether the current time (as returned by w.nowFn) is
// still earlier than the last reset time plus the configured reset delay.
//
// Only the shared read lock is taken because the function merely reads
// writeState.lastResetNanos. Taking the exclusive lock and then the read lock
// on the same lock from one goroutine would block forever.
func (w *consumerWriterImpl) resetTooSoon() bool {
	w.writeState.RLock()
	defer w.writeState.RUnlock()
	return w.nowFn().UnixNano() < w.writeState.lastResetNanos+int64(w.connOpts.ResetDelay())
}

Expand Down Expand Up @@ -393,8 +397,8 @@ func (w *consumerWriterImpl) notifyReset(err error) {
}

// isClosed reports the current value of writeState.closed.
//
// Only the shared read lock is taken because the function merely reads the
// flag. Taking the exclusive lock and then the read lock on the same lock
// from one goroutine would block forever.
func (w *consumerWriterImpl) isClosed() bool {
	w.writeState.RLock()
	defer w.writeState.RUnlock()
	return w.writeState.closed
}

Expand Down

0 comments on commit 946161b

Please sign in to comment.