Skip to content

Commit

Permalink
etcd_worker: fix missed watch events caused by progress notifications (
Browse files Browse the repository at this point in the history
…pingcap#3848)

(cherry picked from commit 06547d9)
  • Loading branch information
liuzix authored and asddongmen committed Dec 20, 2021
1 parent ea04f44 commit 7f6507b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
77 changes: 75 additions & 2 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package etcd

import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -41,6 +43,14 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no message arrives on an etcd watchCh for etcdWatchChTimeoutDuration,
// we cancel the watchCh and request a new watchCh from the etcd client.
etcdWatchChTimeoutDuration = 10 * time.Second
// etcdRequestProgressDuration is the interval at which we call
// RequestProgress of the etcd client while a watchCh is being maintained.
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
)

// set to var instead of const for mocking the value to speedup test
Expand All @@ -50,11 +60,13 @@ var maxTries int64 = 8
type Client struct {
	// cli is the underlying etcd v3 client that requests are delegated to.
	cli *clientv3.Client
	// metrics holds the prometheus counters handed to Wrap.
	// NOTE(review): presumably keyed by etcd operation name — confirm against callers.
	metrics map[string]prometheus.Counter
	// clock supplies tickers and timestamps; keeping it as a field lets unit
	// tests substitute a mock clock for time-related behaviour.
	clock clock.Clock
}

// Wrap wraps a clientv3.Client and provides the etcd APIs required by TiCDC.
//
// The returned Client keeps cli and metrics as given and gets its clock from
// clock.New(). The clock must be set here: WatchWithChan calls c.clock.Ticker
// and c.clock.Now, which would panic on a nil clock.
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
	return &Client{cli: cli, metrics: metrics, clock: clock.New()}
}

// Unwrap returns a clientv3.Client
Expand Down Expand Up @@ -165,7 +177,68 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

// Watch starts watching key with the given options and returns a channel on
// which the watch responses are delivered.
//
// Instead of handing out the channel of clientv3.Watcher.Watch directly, it
// creates a buffered channel (etcdWatchChBufferSize) and starts WatchWithChan
// in a new goroutine to feed it. That goroutine runs until ctx is done and
// closes the returned channel when it exits, so callers must cancel ctx to
// release it.
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
	watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
	go c.WatchWithChan(ctx, watchCh, key, opts...)
	return watchCh
}

// WatchWithChan maintains an etcd watchCh on key and forwards every response
// received from it to outCh.
//
// It blocks until ctx is done or the etcd client closes the watchCh, and it
// closes outCh before returning. While running, it ticks every
// etcdRequestProgressDuration: each tick of the main loop calls
// RequestProgress, and if no response has arrived for
// etcdWatchChTimeoutDuration the watchCh is cancelled and re-created.
//
// A re-created watch uses the caller's opts. If at least one non-progress
// response has been received, WithRev(lastRevision+1) is appended so the watch
// resumes right after it; otherwise opts are reused unchanged so the watch
// restarts from the position the caller originally asked for.
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
	defer func() {
		close(outCh)
		log.Info("WatchWithChan exited")
	}()
	// lastRevision is the header revision of the last error-free, non-progress
	// response; 0 means no such response has been received yet.
	var lastRevision int64
	watchCtx, cancel := context.WithCancel(ctx)
	// cancel is reassigned whenever the watch is reset, so defer a closure that
	// reads the variable at exit time instead of binding the first cancel func.
	defer func() { cancel() }()
	watchCh := c.cli.Watch(watchCtx, key, opts...)

	ticker := c.clock.Ticker(etcdRequestProgressDuration)
	defer ticker.Stop()
	lastReceivedResponseTime := c.clock.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case response, ok := <-watchCh:
			if !ok {
				// A closed watchCh yields zero-value responses forever; forwarding
				// them would spin this loop, keep refreshing
				// lastReceivedResponseTime and reset lastRevision to 0. Exit
				// instead, which closes outCh just like the underlying channel.
				log.Warn("etcd client watchCh closed, exit WatchWithChan")
				return
			}
			lastReceivedResponseTime = c.clock.Now()
			if response.Err() == nil && !response.IsProgressNotify() {
				lastRevision = response.Header.Revision
			}

		Loop:
			// we must loop here until the response is sent to outCh
			// or otherwise the response will be lost
			for {
				select {
				case <-ctx.Done():
					return
				case outCh <- response: // it may block here
					break Loop
				case <-ticker.C:
					if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
					}
				}
			}
		case <-ticker.C:
			if err := c.RequestProgress(ctx); err != nil {
				log.Warn("failed to request progress for etcd watcher", zap.Error(err))
			}
			if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
				// cancel the last cancel func to reset it
				log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
				cancel()
				watchCtx, cancel = context.WithCancel(ctx)

				// Keep the caller's options (prefix, filters, ...) for the new
				// watch. Only override the start revision once something has been
				// received; with lastRevision == 0, WithRev(1) would replay the
				// whole history instead of resuming.
				resetOpts := opts
				if lastRevision > 0 {
					// The full slice expression forces append to copy, so the
					// caller's opts slice is never modified.
					resetOpts = append(opts[:len(opts):len(opts)], clientv3.WithRev(lastRevision+1))
				}
				watchCh = c.cli.Watch(watchCtx, key, resetOpts...)
				// we need to reset lastReceivedResponseTime after reset Watch
				lastReceivedResponseTime = c.clock.Now()
			}
		}
	}
}

// RequestProgress requests a progress notify response be sent in all watch channels.
Expand Down
7 changes: 7 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator/util"
Expand Down Expand Up @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R

func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
defer testleak.AfterTest(c)()

_ = failpoint.Enable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit")
}()

totalAccountNumber := 25
workerNumber := 10
var wg sync.WaitGroup
Expand Down

0 comments on commit 7f6507b

Please sign in to comment.