Skip to content

Commit

Permalink
kv/client: don't resolve lock immediately after a region is initializ…
Browse files Browse the repository at this point in the history
…ed (#2235)
  • Loading branch information
amyangfei authored Jul 12, 2021
1 parent 526d5de commit bda60ed
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 8 deletions.
21 changes: 14 additions & 7 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
// tikv resolved update interval is 1s, use half of the resolck lock interval
// as lock penalty.
resolveLockPenalty := 10
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
Expand Down Expand Up @@ -304,6 +307,17 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs))
return errReconnect
}
// Only resolve lock if the resovled-ts keeps unchanged for
// more than resolveLockPenalty times.
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
}
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
Expand Down Expand Up @@ -586,13 +600,6 @@ func (w *regionWorker) handleEventEntry(
}
w.metrics.metricPullEventInitializedCounter.Inc()

select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(state.sri.ts)}:
default:
// rtsUpdateCh block often means too many regions are suffering
// lock resolve, the kv client status is not very healthy.
log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID))
}
state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
Expand Down
8 changes: 7 additions & 1 deletion cdc/kv/resolvedts_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type tsItem struct {
sortByEvTime bool
resolvedTs uint64
eventTime time.Time
penalty int
}

func newResolvedTsItem(ts uint64) tsItem {
Expand Down Expand Up @@ -92,9 +93,14 @@ func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, the resolved ts of a region should not be fallen back
if !item.ts.sortByEvTime {
if item.ts.resolvedTs > old.ts.resolvedTs || item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
Expand Down
29 changes: 29 additions & 0 deletions cdc/kv/resolvedts_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) {
c.Assert(rts, check.IsNil)
}

func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
initRegions := []*regionTsInfo{
{regionID: 100, ts: newResolvedTsItem(1000)},
}
for _, rts := range initRegions {
mgr.Upsert(rts)
}
c.Assert(mgr.Len(), check.Equals, 1)

// test penalty increases if resolved ts keeps unchanged
for i := 0; i < 6; i++ {
rts := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000)}
mgr.Upsert(rts)
}
rts := mgr.Pop()
c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000))
c.Assert(rts.ts.penalty, check.Equals, 6)

// test penalty is cleared to zero if resolved ts is advanced
mgr.Upsert(rts)
rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)}
mgr.Upsert(rtsNew)
rts = mgr.Pop()
c.Assert(rts.ts.penalty, check.DeepEquals, 0)
c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000))
}

func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
Expand Down

0 comments on commit bda60ed

Please sign in to comment.