Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3848
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
liuzix authored and ti-chi-bot committed Dec 14, 2021
1 parent 4127b84 commit 8a32dc4
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 6 deletions.
67 changes: 67 additions & 0 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,74 @@ func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ..

// Watch delegates request to clientv3.Watcher.Watch
func (c *Client) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
<<<<<<< HEAD
return c.cli.Watch(ctx, key, opts...)
=======
watchCh := make(chan clientv3.WatchResponse, etcdWatchChBufferSize)
go c.WatchWithChan(ctx, watchCh, key, opts...)
return watchCh
}

// WatchWithChan maintains a watchCh and sends all msg from the watchCh to outCh
func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchResponse, key string, opts ...clientv3.OpOption) {
defer func() {
close(outCh)
log.Info("WatchWithChan exited")
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
defer ticker.Stop()
lastReceivedResponseTime := c.clock.Now()

for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
if response.Err() == nil && !response.IsProgressNotify() {
lastRevision = response.Header.Revision
}

Loop:
// we must loop here until the response is sent to outCh
// or otherwise the response will be lost
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
case <-ticker.C:
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
log.Warn("etcd client outCh blocking too long, the etcdWorker may be stuck", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)))
}
}
}

ticker.Reset(etcdRequestProgressDuration)
case <-ticker.C:
if err := c.RequestProgress(ctx); err != nil {
log.Warn("failed to request progress for etcd watcher", zap.Error(err))
}
if c.clock.Since(lastReceivedResponseTime) >= etcdWatchChTimeoutDuration {
// cancel the last cancel func to reset it
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
}
}
}
>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848))
}

// RequestProgress requests a progress notify response be sent in all watch channels.
Expand Down
83 changes: 77 additions & 6 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -145,18 +147,47 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if err := response.Err(); err != nil {
return errors.Trace(err)
}
<<<<<<< HEAD
lastReceivedEventTime = time.Now()
=======

// ProgressNotify implies no new events.
if response.IsProgressNotify() {
log.Debug("Etcd progress notification",
zap.Int64("revision", response.Header.GetRevision()))
// Note that we don't need to update the revision here, and we
// should not do so, because the revision of the progress notification
// may not satisfy the strict monotonicity we have expected.
//
// Updating `worker.revision` can cause a useful event with the
// same revision to be dropped erroneously.
//
// Refer to https://etcd.io/docs/v3.3/dev-guide/interacting_v3/#watch-progress
// "Note: The revision number in the progress notify response is the revision
// from the local etcd server node that the watch stream is connected to. [...]"
// This implies that the progress notification will NOT go through the raft
// consensus, thereby NOT affecting the revision (index).
continue
}
>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848))

// Check whether the response is stale.
if worker.revision >= response.Header.GetRevision() {
log.Info("Stale Etcd event dropped",
zap.Int64("event-revision", response.Header.GetRevision()),
zap.Int64("previous-revision", worker.revision),
zap.Any("events", response.Events))
continue
}
worker.revision = response.Header.GetRevision()
<<<<<<< HEAD

// ProgressNotify implies no new events.
if response.IsProgressNotify() {
continue
}
=======
>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848))

for _, event := range response.Events {
// handleEvent will apply the event to our internal `rawState`.
Expand Down Expand Up @@ -344,7 +375,30 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
panic("unreachable")
}

<<<<<<< HEAD
resp, err := worker.client.Txn(ctx).If(cmps...).Then(ops...).Commit()
=======
worker.metrics.metricEtcdTxnSize.Observe(float64(size))
startTime := time.Now()

txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
cancel()

// For testing the situation where we have a progress notification that
// has the same revision as the committed Etcd transaction.
failpoint.Inject("InjectProgressRequestAfterCommit", func() {
if err := worker.client.RequestProgress(ctx); err != nil {
failpoint.Return(errors.Trace(err))
}
})

costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
}
worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds())
>>>>>>> 06547d9f9 (etcd_worker: fix missed watch events caused by progress notifications (#3848))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -355,6 +409,8 @@ func (worker *EtcdWorker) applyPatches(ctx context.Context, patches []DataPatch)
return nil
}

// Logs the conditions for the failed Etcd transaction.
worker.logEtcdCmps(cmps)
return cerrors.ErrEtcdTryAgain.GenWithStackByArgs()
}

Expand All @@ -370,19 +426,34 @@ func (worker *EtcdWorker) applyUpdates() error {
return nil
}

func logEtcdOps(ops []clientv3.Op, commited bool) {
if log.GetLevel() != zapcore.DebugLevel || len(ops) == 0 {
func logEtcdOps(ops []clientv3.Op, committed bool) {
if committed && (log.GetLevel() != zapcore.DebugLevel || len(ops) == 0) {
return
}
log.Debug("[etcd worker] ==========Update State to ETCD==========")
logFn := log.Debug
if !committed {
logFn = log.Info
}

logFn("[etcd worker] ==========Update State to ETCD==========")
for _, op := range ops {
if op.IsDelete() {
log.Debug("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
logFn("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
} else {
log.Debug("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
logFn("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
}
}
log.Debug("[etcd worker] ============State Commit=============", zap.Bool("committed", commited))
logFn("[etcd worker] ============State Commit=============", zap.Bool("committed", committed))
}

func (worker *EtcdWorker) logEtcdCmps(cmps []clientv3.Cmp) {
log.Info("[etcd worker] ==========Failed Etcd Txn Cmps==========")
for _, cmp := range cmps {
cmp := etcdserverpb.Compare(cmp)
log.Info("[etcd worker] compare",
zap.String("cmp", cmp.String()))
}
log.Info("[etcd worker] ============End Failed Etcd Txn Cmps=============")
}

func (worker *EtcdWorker) cleanUp() {
Expand Down
7 changes: 7 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
Expand Down Expand Up @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R

func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
defer testleak.AfterTest(c)()

_ = failpoint.Enable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit")
}()

totalAccountNumber := 25
workerNumber := 10
var wg sync.WaitGroup
Expand Down

0 comments on commit 8a32dc4

Please sign in to comment.