diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 94a693da8d0..2c8ba2c7f93 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -165,7 +165,74 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts .. // Watch delegates request to clientv3.Watcher.Watch func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { +<<<<<<< HEAD return c.cli.Watch(ctx, key, opts...) +======= + watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize) + go c.WatchWithChan(ctx, watchCh, key, opts...) + return watchCh +} + +// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh +func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) { + defer func() { + close(outCh) + log.Info("WatchWithChan exited") + }() + var lastRevision int64 + watchCtx, cancel := context.WithCancel(ctx) + defer cancel() + watchCh := c.cli.Watch(watchCtx, key, opts...) + + ticker := c.clock.Ticker(etcdRequestProgressDuration) + defer ticker.Stop() + lastReceivedResponseTime := c.clock.Now() + + for { + select { + case <-ctx.Done(): + cancel() + return + case response := <-watchCh: + lastReceivedResponseTime = c.clock.Now() + if response.Err() == nil && !response.IsProgressNotify() { + lastRevision = response.Header.Revision + } + + Loop: + // we must loop here until the response is sent to outCh + // or otherwise the response will be lost + for { + select { + case <-ctx.Done(): + cancel() + return + case outCh <- response: // it may block here + break Loop + case <-ticker.C: + if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { + log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime))) + } + } + } + + ticker.Reset(etcdRequestProgressDuration) + case <-ticker.C: + if err := c.RequestProgress(ctx); err != nil { + log.Warn("failed to request progress for etcd watcher", zap.Error(err)) + } + if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration { + // cancel the last cancel func to reset it + log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack")) + cancel() + watchCtx, cancel = context.WithCancel(ctx) + watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1)) + // we need to reset lastReceivedResponseTime after reset Watch + lastReceivedResponseTime = c.clock.Now() + } + } + } +>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848)) } // RequestProgress requests a progress notify response be sent in all watch channels. diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index acccfbdb21e..4aceb3d8886 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/etcd" @@ -27,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -163,16 +165,39 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err := response.Err(); err != nil { return errors.Trace(err) } +<<<<<<< HEAD lastReceivedEventTime = time.Now() +======= + + // ProgressNotify implies no new events. + if response.IsProgressNotify() { + log.Debug("Etcd progress notification", + zap.Int64("revision", response.Header.GetRevision())) + // Note that we don't need to update the revision here, and we + // should not do so, because the revision of the progress notification + // may not satisfy the strict monotonicity we have expected. + // + // Updating `worker.revision` can cause a useful event with the + // same revision to be dropped erroneously. + // + // Refer to https://etcd.io/docs/v3.3/dev-guide/interacting_v3/#watch-progress + // "Note: The revision number in the progress notify response is the revision + // from the local etcd server node that the watch stream is connected to. [...]" + // This implies that the progress notification will NOT go through the raft + // consensus, thereby NOT affecting the revision (index). + continue + } + +>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848)) // Check whether the response is stale. if worker.revision >= response.Header.GetRevision() { + log.Info("Stale Etcd event dropped", + zap.Int64("event-revision", response.Header.GetRevision()), + zap.Int64("previous-revision", worker.revision), + zap.Any("events", response.Events)) continue } worker.revision = response.Header.GetRevision() - // ProgressNotify implies no new events. - if response.IsProgressNotify() { - continue - } for _, event := range response.Events { // handleEvent will apply the event to our internal `rawState`. @@ -362,10 +387,29 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m worker.metrics.metricEtcdTxnSize.Observe(float64(size)) startTime := time.Now() +<<<<<<< HEAD resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit() costTime := time.Since(startTime).Seconds() if costTime > time.Second.Seconds()*1 { log.Warn("etcdWorker commit etcd txn cost time more than 1 second") +======= + + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() + cancel() + + // For testing the situation where we have a progress notification that + // has the same revision as the committed Etcd transaction. + failpoint.Inject("InjectProgressRequestAfterCommit", func() { + if err := worker.client.RequestProgress(ctx); err != nil { + failpoint.Return(errors.Trace(err)) + } + }) + + costTime := time.Since(startTime) + if costTime > etcdWorkerLogsWarnDuration { + log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime)) +>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848)) } worker.metrics.metricEtcdTxnDuration.Observe(costTime) if err != nil { @@ -378,6 +422,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m return nil } + // Logs the conditions for the failed Etcd transaction. + worker.logEtcdCmps(cmps) return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } @@ -393,19 +439,34 @@ func (worker *EtcdWorker) applyUpdates() error { return nil } -func logEtcdOps(ops []clientv3.Op, commited bool) { - if log.GetLevel() != zapcore.DebugLevel || len(ops) == 0 { +func logEtcdOps(ops []clientv3.Op, committed bool) { + if committed && (log.GetLevel() != zapcore.DebugLevel || len(ops) == 0) { return } - log.Debug("[etcd worker] ==========Update State to ETCD==========") + logFn := log.Debug + if !committed { + logFn = log.Info + } + + logFn("[etcd worker] ==========Update State to ETCD==========") for _, op := range ops { if op.IsDelete() { - log.Debug("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes())) + logFn("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes())) } else { - log.Debug("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes())) + logFn("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes())) } } - log.Debug("[etcd worker] ============State Commit=============", zap.Bool("committed", commited)) + logFn("[etcd worker] ============State Commit=============", zap.Bool("committed", committed)) +} + +func (worker *EtcdWorker) logEtcdCmps(cmps []clientv3.Cmp) { + log.Info("[etcd worker] ==========Failed Etcd Txn Cmps==========") + for _, cmp := range cmps { + cmp := etcdserverpb.Compare(cmp) + log.Info("[etcd worker] compare", + zap.String("cmp", cmp.String())) + } + log.Info("[etcd worker] ============End Failed Etcd Txn Cmps=============") } func (worker *EtcdWorker) cleanUp() { diff --git a/pkg/orchestrator/etcd_worker_bank_test.go b/pkg/orchestrator/etcd_worker_bank_test.go index 8785115ec31..622f69912e0 100644 --- a/pkg/orchestrator/etcd_worker_bank_test.go +++ b/pkg/orchestrator/etcd_worker_bank_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/log" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/orchestrator/util" @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) { defer testleak.AfterTest(c)() + + _ = failpoint.Enable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit") + }() + totalAccountNumber := 25 workerNumber := 10 var wg sync.WaitGroup