diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index 94a693da8d0..78c0df71683 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -15,7 +15,9 @@ package etcd

 import (
 	"context"
+	"time"

+	"github.com/benbjohnson/clock"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	cerrors "github.com/pingcap/ticdc/pkg/errors"
@@ -41,6 +43,14 @@ const (
 	backoffBaseDelayInMs = 500
 	// in previous/backoff retry pkg, the DefaultMaxInterval = 60 * time.Second
 	backoffMaxDelayInMs = 60 * 1000
+	// If no msg comes from an etcd watchCh within etcdWatchChTimeoutDuration,
+	// we should cancel the watchCh and request a new watchCh from the etcd client
+	etcdWatchChTimeoutDuration = 10 * time.Second
+	// If no msg comes from an etcd watchCh within etcdRequestProgressDuration,
+	// we should call RequestProgress on the etcd client
+	etcdRequestProgressDuration = 1 * time.Second
+	// etcdWatchChBufferSize is specified arbitrarily and may be tuned in the future
+	etcdWatchChBufferSize = 16
 )

 // set to var instead of const for mocking the value to speed up tests
@@ -50,11 +60,13 @@ var maxTries int64 = 8
 type Client struct {
 	cli     *clientv3.Client
 	metrics map[string]prometheus.Counter
+	// clock makes it easier to mock time-related data structures in unit tests
+	clock clock.Clock
 }

 // Wrap wraps a clientv3.Client that provides etcd APIs required by TiCDC.
 func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
-	return &Client{cli: cli, metrics: metrics}
+	return &Client{cli: cli, metrics: metrics, clock: clock.New()}
 }

 // Unwrap returns a clientv3.Client
@@ -165,7 +177,70 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

 // Watch delegates request to clientv3.Watcher.Watch
 func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
-	return c.cli.Watch(ctx, key, opts...)
+	watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
+	go c.WatchWithChan(ctx, watchCh, key, opts...)
+	return watchCh
+}
+
+// WatchWithChan maintains a watchCh and forwards every msg it receives to outCh
+func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
+	defer func() {
+		close(outCh)
+		log.Info("WatchWithChan exited")
+	}()
+	var lastRevision int64
+	watchCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	watchCh := c.cli.Watch(watchCtx, key, opts...)
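+
+	// a single ticker does double duty here: it paces the periodic
+	// RequestProgress calls below, and it bounds how long a silent watchCh
+	// or a blocked outCh is tolerated before a warning or a watch reset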
+	ticker := c.clock.Ticker(etcdRequestProgressDuration)
+	defer ticker.Stop()
+	lastReceivedResponseTime := c.clock.Now()
+
+	for {
+		select {
+		case <-ctx.Done():
+			cancel()
+			return
+		case response := <-watchCh:
+			lastReceivedResponseTime = c.clock.Now()
+			if response.Err() == nil {
+				lastRevision = response.Header.Revision
+			}
+
+		Loop:
+			// we must loop here until the response is sent to outCh,
+			// otherwise the response would be lost
+			for {
+				select {
+				case <-ctx.Done():
+					cancel()
+					return
+				case outCh <- response: // it may block here
+					break Loop
+				case <-ticker.C:
+					if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
+						log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
+					}
+				}
+			}
+
+			ticker.Reset(etcdRequestProgressDuration)
+		case <-ticker.C:
+			if err := c.RequestProgress(ctx); err != nil {
+				log.Warn("failed to request progress for etcd watcher", zap.Error(err))
+			}
+			if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
+				log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
+				// cancel the previous watchCtx, then recreate the watchCh starting
+				// right after the last received revision so that no event is skipped
+				cancel()
+				watchCtx, cancel = context.WithCancel(ctx)
+				// NOTE: the caller's opts are not re-applied here; the watch is
+				// recreated with WithPrefix and the last received revision
+				watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
+				// we need to reset lastReceivedResponseTime after resetting the watchCh
+				lastReceivedResponseTime = c.clock.Now()
+			}
+		}
+	}
 }

 // RequestProgress requests a progress notify response be sent in all watch channels.
diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go
index 88d248fc409..9108a9029df 100644
--- a/pkg/etcd/client_test.go
+++ b/pkg/etcd/client_test.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"time"

+	"github.com/benbjohnson/clock"
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
@@ -45,6 +46,23 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
 	return nil, errors.New("mock error")
 }

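+// mockWatcher embeds clientv3.Watcher and counts how often Watch and
+// RequestProgress are invoked, so tests can assert that WatchWithChan
+// actually reset the watchCh and requested progress after a timeout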
+type mockWatcher struct {
+	clientv3.Watcher
+	watchCh      chan clientv3.WatchResponse
+	resetCount   *int
+	requestCount *int
+}
+
+func (m mockWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
+	*m.resetCount++
+	return m.watchCh
+}
+
+func (m mockWatcher) RequestProgress(ctx context.Context) error {
+	*m.requestCount++
+	return nil
+}
+
 func (s *clientSuite) TestRetry(c *check.C) {
 	defer testleak.AfterTest(c)()
 	originValue := maxTries
@@ -91,3 +109,113 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
 	c.Assert(err, check.IsNil)
 	c.Assert(ttlResp.TTL, check.Equals, int64(-1))
 }
+
+// test that no data is lost when the watchCh gets blocked
+func (s *etcdSuite) TestWatchChBlocked(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+	cli := clientv3.NewCtxClient(context.TODO())
+	resetCount := 0
+	requestCount := 0
+	watchCh := make(chan clientv3.WatchResponse, 1)
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	cli.Watcher = watcher
+
+	sentRes := []clientv3.WatchResponse{
+		{CompactRevision: 1},
+		{CompactRevision: 2},
+		{CompactRevision: 3},
+		{CompactRevision: 4},
+		{CompactRevision: 5},
+		{CompactRevision: 6},
+	}
+
+	go func() {
+		for _, r := range sentRes {
+			watchCh <- r
+		}
+	}()
+
+	mockClock := clock.NewMock()
+	watchCli := Wrap(cli, nil)
+	watchCli.clock = mockClock
+
+	key := "testWatchChBlocked"
+	outCh := make(chan clientv3.WatchResponse, 6)
+	revision := int64(1)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+
+	go func() {
+		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+	}()
+	receivedRes := make([]clientv3.WatchResponse, 0)
+	// wait for WatchWithChan to set up
+	r := <-outCh
+	receivedRes = append(receivedRes, r)
+	// move time forward
+	mockClock.Add(time.Second * 30)
+
+	for r := range outCh {
+		receivedRes = append(receivedRes, r)
+	}
+
+	c.Check(sentRes, check.DeepEquals, receivedRes)
+	// make sure the watchCh has been reset since the timeout was exceeded
+	c.Assert(*watcher.resetCount > 1, check.IsTrue)
+	// make sure RequestProgress has been called since the timeout was exceeded
+	c.Assert(*watcher.requestCount > 1, check.IsTrue)
+	// make sure etcdRequestProgressDuration is less than etcdWatchChTimeoutDuration
+	c.Assert(etcdRequestProgressDuration, check.Less, etcdWatchChTimeoutDuration)
+}
+
+// test that no data is lost when outCh gets blocked
+func (s *etcdSuite) TestOutChBlocked(c *check.C) {
+	defer testleak.AfterTest(c)()
+	defer s.TearDownTest(c)
+
+	cli := clientv3.NewCtxClient(context.TODO())
+	resetCount := 0
+	requestCount := 0
+	watchCh := make(chan clientv3.WatchResponse, 1)
+	watcher := mockWatcher{watchCh: watchCh, resetCount: &resetCount, requestCount: &requestCount}
+	cli.Watcher = watcher
+
+	mockClock := clock.NewMock()
+	watchCli := Wrap(cli, nil)
+	watchCli.clock = mockClock
+
+	sentRes := []clientv3.WatchResponse{
+		{CompactRevision: 1},
+		{CompactRevision: 2},
+		{CompactRevision: 3},
+	}
+
+	go func() {
+		for _, r := range sentRes {
+			watchCh <- r
+		}
+	}()
+
+	key := "testOutChBlocked"
+	outCh := make(chan clientv3.WatchResponse, 1)
+	revision := int64(1)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+	defer cancel()
+	go func() {
+		watchCli.WatchWithChan(ctx, outCh, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
+	}()
+	receivedRes := make([]clientv3.WatchResponse, 0)
+	// wait for WatchWithChan to set up
+	r := <-outCh
+	receivedRes = append(receivedRes, r)
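+	// outCh (capacity 1) is left undrained while the clock advances, so the
+	// next send inside WatchWithChan blocks; the inner retry loop must hold
+	// on to the blocked response instead of dropping it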
+	// move time forward
+	mockClock.Add(time.Second * 30)
+
+	for r := range outCh {
+		receivedRes = append(receivedRes, r)
+	}
+
+	c.Check(sentRes, check.DeepEquals, receivedRes)
+}
diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go
index 102fe84db11..e3da7bd5734 100644
--- a/pkg/orchestrator/etcd_worker.go
+++ b/pkg/orchestrator/etcd_worker.go
@@ -32,6 +32,19 @@ import (
 	"golang.org/x/time/rate"
 )

+const (
+	// etcdTxnTimeoutDuration represents the timeout duration for committing a
+	// transaction to Etcd
+	etcdTxnTimeoutDuration = 30 * time.Second
+	// etcdWorkerLogsWarnDuration is the duration threshold beyond which
+	// committing a txn to etcd or ticking the reactor triggers a warning log
+	etcdWorkerLogsWarnDuration = 1 * time.Second
+	// deletionCounterKey is the key under which the etcd key-deletion counter is kept
+	deletionCounterKey = "/meta/ticdc-delete-etcd-key-count"
+)
+
 // EtcdWorker handles all interactions with Etcd
 type EtcdWorker struct {
 	client *etcd.Client
@@ -58,6 +71,18 @@ type EtcdWorker struct {
 	// a `compare-and-swap` semantics, which is essential for implementing
 	// snapshot isolation for Reactor ticks.
 	deleteCounter int64
+
+	metrics *etcdWorkerMetrics
+}
+
+type etcdWorkerMetrics struct {
+	// etcd txn related metrics
+	metricEtcdTxnSize     prometheus.Observer
+	metricEtcdTxnDuration prometheus.Observer
+	// reactor tick related metrics
+	metricEtcdWorkerTickDuration prometheus.Observer
 }

 type etcdUpdate struct {
@@ -102,13 +127,13 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		return errors.Trace(err)
 	}

-	ctx1, cancel := context.WithCancel(ctx)
-	defer cancel()
-
 	ticker := time.NewTicker(timerInterval)
 	defer ticker.Stop()

-	watchCh := worker.client.Watch(ctx1, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
+	watchCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	watchCh := worker.client.Watch(watchCtx, worker.prefix.String(), clientv3.WithPrefix(), clientv3.WithRev(worker.revision+1))
+
 	var (
 		pendingPatches [][]DataPatch
 		exiting        bool
@@ -120,7 +145,6 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 		// should never be closed
 		sessionDone = make(chan struct{})
 	}
-	lastReceivedEventTime := time.Now()

 	// tickRate represents the number of times EtcdWorker can tick
 	// the reactor per second
@@ -135,18 +159,25 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			return cerrors.ErrEtcdSessionDone.GenWithStackByArgs()
 		case <-ticker.C:
 			// There is no new event to handle on timer ticks, so we have nothing here.
-			if time.Since(lastReceivedEventTime) > etcdRequestProgressDuration {
-				if err := worker.client.RequestProgress(ctx); err != nil {
-					log.Warn("failed to request progress for etcd watcher", zap.Error(err))
-				}
-			}
-		case response = <-watchCh:
+			// Requesting progress and resetting a stuck watchCh are now handled
+			// inside etcd.Client.WatchWithChan.
+		case response := <-watchCh:
 			// In this select case, we receive new events from Etcd, and call handleEvent if appropriate.
 			if err := response.Err(); err != nil {
 				return errors.Trace(err)
 			}
-			lastReceivedEventTime = time.Now()

 			// Check whether the response is stale.
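+			// (a stale revision may arrive, e.g. after the etcd client has
+			// reset its internal watchCh with WithRev(lastRevision+1), so such
+			// responses are skipped here)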
 			if worker.revision >= response.Header.GetRevision() {
 				continue
@@ -198,6 +229,14 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
 			}
+			startTime := time.Now()
 			// it is safe that a batch of updates has been applied to worker.state before worker.reactor.Tick
 			nextState, err := worker.reactor.Tick(ctx, worker.state)
+			costTime := time.Since(startTime)
+			if costTime > etcdWorkerLogsWarnDuration {
+				log.Warn("EtcdWorker reactor tick took too long", zap.Duration("duration", costTime))
+			}
+			worker.metrics.metricEtcdWorkerTickDuration.Observe(costTime.Seconds())
 			if err != nil {
 				if !cerrors.ErrReactorFinished.Equal(errors.Cause(err)) {
 					return errors.Trace(err)
 				}
@@ -295,6 +334,7 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
 	return patchGroups, nil
 }

-func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch) error {
-	state := worker.cloneRawState()
-	changedSet := make(map[util.EtcdKey]struct{})
@@ -309,6 +349,15 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
 	}
-	cmps := make([]clientv3.Cmp, 0, len(changedSet))
-	ops := make([]clientv3.Op, 0, len(changedSet))
+func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
+	if len(changedState) == 0 {
+		return nil
+	}
+
+	cmps := make([]clientv3.Cmp, 0, len(changedState))
+	ops := make([]clientv3.Op, 0, len(changedState))
 	hasDelete := false
-	for key := range changedSet {
+	for key := range changedState {
 		// make sure someone else has not updated the key after the last snapshot
@@ -344,7 +393,21 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
 		panic("unreachable")
 	}

-	resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
+	worker.metrics.metricEtcdTxnSize.Observe(float64(size))
+	startTime := time.Now()
+
+	txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
+	resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
+	cancel()
+	costTime := time.Since(startTime)
+	if costTime > etcdWorkerLogsWarnDuration {
+		log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
+	}
+	worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds())
 	if err != nil {
 		return errors.Trace(err)
 	}