Skip to content

Commit

Permalink
etcd_worker: fix missed watch events caused by progress notifications (
Browse files Browse the repository at this point in the history
…pingcap#3848)

(cherry picked from commit 06547d9)
  • Loading branch information
liuzix authored and asddongmen committed Dec 15, 2021
1 parent 7992b85 commit 729e15b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
if response.Err() == nil {
if response.Err() == nil && !response.IsProgressNotify() {
lastRevision = response.Header.Revision
}

Expand Down
76 changes: 64 additions & 12 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -69,7 +71,8 @@ type EtcdWorker struct {
// a `compare-and-swap` semantics, which is essential for implementing
// snapshot isolation for Reactor ticks.
deleteCounter int64
metrics *etcdWorkerMetrics

metrics *etcdWorkerMetrics
}

type etcdWorkerMetrics struct {
Expand Down Expand Up @@ -163,21 +166,40 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
return errors.Trace(err)
}

// Check whether the response is stale.
if worker.revision >= response.Header.GetRevision() {
// ProgressNotify implies no new events.
if response.IsProgressNotify() {
log.Debug("Etcd progress notification",
zap.Int64("revision", response.Header.GetRevision()))
// Note that we don't need to update the revision here, and we
// should not do so, because the revision of the progress notification
// may not satisfy the strict monotonicity we have expected.
//
// Updating `worker.revision` can cause a useful event with the
// same revision to be dropped erroneously.
//
// Refer to https://etcd.io/docs/v3.3/dev-guide/interacting_v3/#watch-progress
// "Note: The revision number in the progress notify response is the revision
// from the local etcd server node that the watch stream is connected to. [...]"
// This implies that the progress notification will NOT go through the raft
// consensus, thereby NOT affecting the revision (index).
continue
}
worker.revision = response.Header.GetRevision()

// ProgressNotify implies no new events.
if response.IsProgressNotify() {
// Check whether the response is stale.
if worker.revision >= response.Header.GetRevision() {
log.Info("Stale Etcd event dropped",
zap.Int64("event-revision", response.Header.GetRevision()),
zap.Int64("previous-revision", worker.revision),
zap.Any("events", response.Events))
continue
}
worker.revision = response.Header.GetRevision()

for _, event := range response.Events {
// handleEvent will apply the event to our internal `rawState`.
worker.handleEvent(ctx, event)
}

}

if len(pendingPatches) > 0 {
Expand Down Expand Up @@ -322,6 +344,10 @@ func (worker *EtcdWorker) applyPatchGroups(ctx context.Context, patchGroups [][]
}

func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState map[util.EtcdKey][]byte, size int) error {
if len(changedState) == 0 {
return nil
}

cmps := make([]clientv3.Cmp, 0, len(changedState))
ops := make([]clientv3.Op, 0, len(changedState))
hasDelete := false
Expand Down Expand Up @@ -365,6 +391,15 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
cancel()

// For testing the situation where we have a progress notification that
// has the same revision as the committed Etcd transaction.
failpoint.Inject("InjectProgressRequestAfterCommit", func() {
if err := worker.client.RequestProgress(ctx); err != nil {
failpoint.Return(errors.Trace(err))
}
})

costTime := time.Since(startTime)
if costTime > etcdWorkerLogsWarnDuration {
log.Warn("Etcd transaction took too long", zap.Duration("duration", costTime))
Expand All @@ -380,6 +415,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
return nil
}

// Logs the conditions for the failed Etcd transaction.
worker.logEtcdCmps(cmps)
return cerrors.ErrEtcdTryAgain.GenWithStackByArgs()
}

Expand All @@ -395,19 +432,34 @@ func (worker *EtcdWorker) applyUpdates() error {
return nil
}

func logEtcdOps(ops []clientv3.Op, commited bool) {
if log.GetLevel() != zapcore.DebugLevel || len(ops) == 0 {
func logEtcdOps(ops []clientv3.Op, committed bool) {
if committed && (log.GetLevel() != zapcore.DebugLevel || len(ops) == 0) {
return
}
log.Debug("[etcd worker] ==========Update State to ETCD==========")
logFn := log.Debug
if !committed {
logFn = log.Info
}

logFn("[etcd worker] ==========Update State to ETCD==========")
for _, op := range ops {
if op.IsDelete() {
log.Debug("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
logFn("[etcd worker] delete key", zap.ByteString("key", op.KeyBytes()))
} else {
log.Debug("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
logFn("[etcd worker] put key", zap.ByteString("key", op.KeyBytes()), zap.ByteString("value", op.ValueBytes()))
}
}
log.Debug("[etcd worker] ============State Commit=============", zap.Bool("committed", commited))
logFn("[etcd worker] ============State Commit=============", zap.Bool("committed", committed))
}

func (worker *EtcdWorker) logEtcdCmps(cmps []clientv3.Cmp) {
log.Info("[etcd worker] ==========Failed Etcd Txn Cmps==========")
for _, cmp := range cmps {
cmp := etcdserverpb.Compare(cmp)
log.Info("[etcd worker] compare",
zap.String("cmp", cmp.String()))
}
log.Info("[etcd worker] ============End Failed Etcd Txn Cmps=============")
}

func (worker *EtcdWorker) cleanUp() {
Expand Down
7 changes: 7 additions & 0 deletions pkg/orchestrator/etcd_worker_bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
Expand Down Expand Up @@ -121,6 +122,12 @@ func (b *bankReactor) Tick(ctx context.Context, state ReactorState) (nextState R

func (s *etcdWorkerSuite) TestEtcdBank(c *check.C) {
defer testleak.AfterTest(c)()

_ = failpoint.Enable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit", "10%return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/pkg/orchestrator/InjectProgressRequestAfterCommit")
}()

totalAccountNumber := 25
workerNumber := 10
var wg sync.WaitGroup
Expand Down

0 comments on commit 729e15b

Please sign in to comment.