Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd_worker: add timeout for etcd txn and watchCh (#3667) #3758

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 77 additions & 2 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ package etcd

import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
Expand All @@ -41,6 +43,14 @@ const (
backoffBaseDelayInMs = 500
// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
backoffMaxDelayInMs = 60 * 1000
// If no message arrives from an etcd watchCh within etcdWatchChTimeoutDuration,
// we should cancel the watchCh and request a new watchCh from the etcd client.
etcdWatchChTimeoutDuration = 10 * time.Second
// If no message arrives from an etcd watchCh within etcdRequestProgressDuration,
// we should call RequestProgress on the etcd client.
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified; it will be modified in the future.
etcdWatchChBufferSize = 16
)

// set to var instead of const for mocking the value to speedup test
Expand All @@ -50,11 +60,13 @@ var maxTries int64 = 8
type Client struct {
// cli is the underlying etcd v3 client that requests are delegated to.
cli *clientv3.Client
// metrics holds Prometheus counters keyed by string.
// NOTE(review): presumably keyed by etcd operation name — confirm against the call sites.
metrics map[string]prometheus.Counter
// clock is for making it easier to mock time-related data structures in unit tests
clock clock.Clock
}

// Wrap wraps a clientv3.Client that provides etcd APIs required by TiCDC.
// It returns a *Client that stores the given cli and metrics.
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
// NOTE(review): the two return statements below look like the removed/added
// pair of a diff hunk shown without +/- markers. Only the second one also
// initializes the clock field (with clock.New()); as written, the first
// return makes the second unreachable — confirm which side is intended.
return &Client{cli: cli, metrics: metrics}
return &Client{cli: cli, metrics: metrics, clock: clock.New()}
}

// Unwrap returns a clientv3.Client
Expand Down Expand Up @@ -165,7 +177,70 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

// Watch delegates request to clientv3.Watcher.Watch.
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
// NOTE(review): this body appears to contain both sides of a diff hunk shown
// without +/- markers. The first return is a direct delegation to the wrapped
// client; as written it makes the statements after it unreachable.
return c.cli.Watch(ctx, key, opts...)
// Create a buffered channel of capacity etcdWatchChBufferSize, let
// WatchWithChan feed it from a separate goroutine, and hand it to the caller.
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh.
//
// It blocks until ctx is done and closes outCh right before returning, so it
// is meant to be run in its own goroutine. While running it:
//   - forwards every response received from the etcd watch channel to outCh,
//     never dropping a response even if outCh is blocked;
//   - calls RequestProgress every etcdRequestProgressDuration without traffic;
//   - cancels and re-creates the etcd watch if nothing has been received for
//     etcdWatchChTimeoutDuration, resuming right after the last revision seen.
//
// ctx controls the lifetime of the whole loop, outCh receives the forwarded
// responses, and key/opts are passed to the underlying clientv3 Watch call.
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
	defer func() {
		close(outCh)
		log.Info("WatchWithChan exited")
	}()

	// lastRevision is the header revision of the last error-free response;
	// zero means no such response has been received yet.
	var lastRevision int64
	watchCtx, cancel := context.WithCancel(ctx)
	// cancel is reassigned whenever the watch is reset, so defer a closure
	// to make sure the cancel func that is current at return time is called.
	defer func() { cancel() }()
	watchCh := c.cli.Watch(watchCtx, key, opts...)

	ticker := c.clock.Ticker(etcdRequestProgressDuration)
	defer ticker.Stop()
	lastReceivedResponseTime := c.clock.Now()

	for {
		select {
		case <-ctx.Done():
			return
		case response, ok := <-watchCh:
			if !ok {
				// The etcd client closed the watch channel. Receiving from a
				// closed channel never blocks, so keep it out of the select by
				// setting it to nil; the timeout branch below re-creates the
				// watch once etcdWatchChTimeoutDuration has elapsed.
				log.Warn("etcd client watchCh closed, waiting for the watchCh to be reset")
				watchCh = nil
				continue
			}
			lastReceivedResponseTime = c.clock.Now()
			if response.Err() == nil {
				lastRevision = response.Header.Revision
			}

		Loop:
			// we must loop here until the response is sent to outCh
			// or otherwise the response will be lost
			for {
				select {
				case <-ctx.Done():
					return
				case outCh <- response: // it may block here
					break Loop
				case <-ticker.C:
					if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
					}
				}
			}

			ticker.Reset(etcdRequestProgressDuration)
		case <-ticker.C:
			if err := c.RequestProgress(ctx); err != nil {
				log.Warn("failed to request progress for etcd watcher", zap.Error(err))
			}
			if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
				// cancel the last cancel func to reset it
				log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
				cancel()
				watchCtx, cancel = context.WithCancel(ctx)

				// Re-issue the watch with the caller's options so that the
				// scope of the watch does not change across a reset. If a
				// revision has been observed, resume right after it; options
				// are applied in order, so the trailing WithRev overrides any
				// start revision in opts. If nothing has been observed yet,
				// the caller's original start revision is still the right one.
				resetOpts := opts
				if lastRevision > 0 {
					// Copy to avoid appending into the caller's backing array.
					resetOpts = make([]clientv3.OpOption, 0, len(opts)+1)
					resetOpts = append(resetOpts, opts...)
					resetOpts = append(resetOpts, clientv3.WithRev(lastRevision+1))
				}
				watchCh = c.cli.Watch(watchCtx, key, resetOpts...)
				// we need to reset lastReceivedResponseTime after reset Watch
				lastReceivedResponseTime = c.clock.Now()
			}
		}
	}
}

// RequestProgress requests a progress notify response be sent in all watch channels.
Expand Down
128 changes: 128 additions & 0 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/ticdc/pkg/util/testleak"
Expand Down Expand Up @@ -45,6 +46,23 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

// mockWatcher is a test double that embeds clientv3.Watcher and overrides
// Watch and RequestProgress to count how often each one is invoked.
type mockWatcher struct {
	clientv3.Watcher

	// watchCh is the channel handed back by every Watch call.
	watchCh chan clientv3.WatchResponse
	// The counters are pointers because the methods use value receivers;
	// the test reads them back through the same pointers.
	resetCount, requestCount *int
}

// Watch records one more watch creation and returns the shared mock channel;
// ctx, key and opts are ignored.
func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
	*m.resetCount += 1
	return m.watchCh
}

// RequestProgress records one more progress request and always succeeds.
func (m mockWatcher) RequestProgress(ctx context.Context) error {
	*m.requestCount += 1
	return nil
}

func (s *clientSuite) TestRetry(c *check.C) {
defer testleak.AfterTest(c)()
originValue := maxTries
Expand Down Expand Up @@ -91,3 +109,113 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(ttlResp.TTL, check.Equals, int64(-1))
}

// TestWatchChBlocked checks that no response is lost when the etcd watch
// channel goes quiet long enough for WatchWithChan to reset it, and that both
// the reset and the progress requests actually happened.
func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
	defer testleak.AfterTest(c)()
	defer s.TearDownTest(c)

	// Wire a client whose Watcher is the counting mock.
	var resetCount, requestCount int
	mockCh := make(chan clientv3.WatchResponse, 1)
	watcher := mockWatcher{watchCh: mockCh, resetCount: &resetCount, requestCount: &requestCount}
	cli := clientv3.NewCtxClient(context.TODO())
	cli.Watcher = watcher

	// Six responses distinguished by CompactRevision 1..6.
	sentRes := make([]clientv3.WatchResponse, 0, 6)
	for rev := int64(1); rev <= 6; rev++ {
		sentRes = append(sentRes, clientv3.WatchResponse{CompactRevision: rev})
	}

	// Feed the mock watch channel in the background.
	go func() {
		for i := range sentRes {
			mockCh <- sentRes[i]
		}
	}()

	mockClock := clock.NewMock()
	watchCli := Wrap(cli, nil)
	watchCli.clock = mockClock

	// outCh is large enough to hold every response, so only the watch side
	// can appear blocked in this test.
	outCh := make(chan clientv3.WatchResponse, 6)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	go watchCli.WatchWithChan(ctx, outCh, "testWatchChBlocked", clientv3.WithPrefix(), clientv3.WithRev(int64(1)))

	// The first receive doubles as a barrier: WatchWithChan is set up once
	// it has delivered a response.
	receivedRes := []clientv3.WatchResponse{<-outCh}
	// Jump the mock clock past the watch timeout.
	mockClock.Add(30 * time.Second)

	// Drain until WatchWithChan closes outCh when ctx times out.
	for resp := range outCh {
		receivedRes = append(receivedRes, resp)
	}

	c.Check(sentRes, check.DeepEquals, receivedRes)
	// The watch must have been created more than once, i.e. reset on timeout.
	c.Assert(*watcher.resetCount > 1, check.IsTrue)
	// RequestProgress must have been called more than once as well.
	c.Assert(*watcher.requestCount > 1, check.IsTrue)
	// The progress interval has to be shorter than the watch timeout.
	c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
}

// TestOutChBlocked checks that no response is lost when the consumer side
// (outCh) is the bottleneck: outCh has room for a single response only.
func (s *etcdSuite) TestOutChBlocked(c *check.C) {
	defer testleak.AfterTest(c)()
	defer s.TearDownTest(c)

	// Wire a client whose Watcher is the counting mock.
	var resetCount, requestCount int
	mockCh := make(chan clientv3.WatchResponse, 1)
	watcher := mockWatcher{watchCh: mockCh, resetCount: &resetCount, requestCount: &requestCount}
	cli := clientv3.NewCtxClient(context.TODO())
	cli.Watcher = watcher

	mockClock := clock.NewMock()
	watchCli := Wrap(cli, nil)
	watchCli.clock = mockClock

	// Three responses distinguished by CompactRevision 1..3.
	sentRes := make([]clientv3.WatchResponse, 0, 3)
	for rev := int64(1); rev <= 3; rev++ {
		sentRes = append(sentRes, clientv3.WatchResponse{CompactRevision: rev})
	}

	// Feed the mock watch channel in the background.
	go func() {
		for i := range sentRes {
			mockCh <- sentRes[i]
		}
	}()

	outCh := make(chan clientv3.WatchResponse, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	go watchCli.WatchWithChan(ctx, outCh, "testOutChBlocked", clientv3.WithPrefix(), clientv3.WithRev(int64(1)))

	// The first receive doubles as a barrier: WatchWithChan is set up once
	// it has delivered a response.
	receivedRes := []clientv3.WatchResponse{<-outCh}
	// Jump the mock clock past the watch timeout.
	mockClock.Add(30 * time.Second)

	// Drain until WatchWithChan closes outCh when ctx times out.
	for resp := range outCh {
		receivedRes = append(receivedRes, resp)
	}

	c.Check(sentRes, check.DeepEquals, receivedRes)
}
Loading