Skip to content

Commit

Permalink
workerpool: limit the rate to output deadlock warning (#3775)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Dec 8, 2021
1 parent bab536b commit efe0c36
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
20 changes: 17 additions & 3 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/notify"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -298,13 +299,19 @@ type worker struct {
isRunning int32
// notifies exits of run()
stopNotifier notify.Notifier

slowSynchronizeThreshold time.Duration
slowSynchronizeLimiter *rate.Limiter
}

func newWorker() *worker {
return &worker{
taskCh: make(chan task, 128),
handles: make(map[*defaultEventHandle]struct{}),
handleCancelCh: make(chan struct{}), // this channel must be unbuffered, i.e. blocking

slowSynchronizeThreshold: 10 * time.Second,
slowSynchronizeLimiter: rate.NewLimiter(rate.Every(time.Second*5), 1),
}
}

Expand Down Expand Up @@ -398,13 +405,20 @@ func (w *worker) synchronize() {
break
}

if time.Since(startTime) > time.Second*10 {
// likely the workerpool has deadlocked, or there is a bug in the event handlers.
log.Warn("synchronize is taking too long, report a bug", zap.Duration("elapsed", time.Since(startTime)))
if time.Since(startTime) > w.slowSynchronizeThreshold &&
w.slowSynchronizeLimiter.Allow() {
// likely the workerpool has deadlocked, or there is a bug
// in the event handlers.
logWarn("synchronize is taking too long, report a bug",
zap.Duration("elapsed", time.Since(startTime)),
zap.Stack("stacktrace"))
}
}
}

// A delegate to log.Warn. It exists only for testing.
var logWarn = log.Warn

func (w *worker) addHandle(handle *defaultEventHandle) {
w.handleRWLock.Lock()
defer w.handleRWLock.Unlock()
Expand Down
33 changes: 33 additions & 0 deletions pkg/workerpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

func TestTaskError(t *testing.T) {
Expand Down Expand Up @@ -507,6 +508,38 @@ func TestGracefulUnregisterTimeout(t *testing.T) {
require.Truef(t, cerror.ErrWorkerPoolGracefulUnregisterTimedOut.Equal(err), "%s", err.Error())
}

func TestSynchronizeLog(t *testing.T) {
w := newWorker()
w.isRunning = 1
// Always report "synchronize is taking too long".
w.slowSynchronizeThreshold = time.Duration(0)
w.slowSynchronizeLimiter = rate.NewLimiter(rate.Every(100*time.Minute), 1)

counter := int32(0)
logWarn = func(msg string, fields ...zap.Field) {
atomic.AddInt32(&counter, 1)
}
defer func() { logWarn = log.Warn }()

doneCh := make(chan struct{})
go func() {
w.synchronize()
close(doneCh)
}()

time.Sleep(300 * time.Millisecond)
w.stopNotifier.Notify()
time.Sleep(300 * time.Millisecond)
w.stopNotifier.Notify()

// Close worker.
atomic.StoreInt32(&w.isRunning, 0)
w.stopNotifier.Close()
<-doneCh

require.EqualValues(t, 1, atomic.LoadInt32(&counter))
}

// Benchmark workerpool with ping-pong workflow.
// go test -benchmem -run='^$' -bench '^(BenchmarkWorkerpool)$' github.com/pingcap/ticdc/pkg/workerpool
func BenchmarkWorkerpool(b *testing.B) {
Expand Down

0 comments on commit efe0c36

Please sign in to comment.