diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index cb3077faaa631..2717bff72be16 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -50,14 +50,18 @@ import ( // GCWorker periodically triggers GC process on tikv server. type GCWorker struct { - uuid string - desc string - store tikv.Storage - pdClient pd.Client - gcIsRunning bool - lastFinish time.Time - cancel context.CancelFunc - done chan error + uuid string + desc string + store tikv.Storage + pdClient pd.Client + gcIsRunning bool + lastFinish time.Time + cancel context.CancelFunc + done chan error + testingKnobs struct { + scanLocks func(key []byte) []*tikv.Lock + resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error) + } } // NewGCWorker creates a GCWorker instance. @@ -1003,6 +1007,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s ctx = context.WithValue(ctx, "injectedBackoff", struct{}{}) bo = tikv.NewBackofferWithVars(ctx, sleep, nil) }) +retryScanAndResolve: for { select { case <-ctx.Done(): @@ -1042,17 +1047,33 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s for i := range locksInfo { locks[i] = tikv.NewLock(locksInfo[i]) } - - ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region) - if err1 != nil { - return stat, errors.Trace(err1) + if w.testingKnobs.scanLocks != nil { + locks = append(locks, w.testingKnobs.scanLocks(key)...) } - if !ok { - err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks))) - if err != nil { - return stat, errors.Trace(err) + for { + ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region) + if w.testingKnobs.resolveLocks != nil { + ok, err1 = w.testingKnobs.resolveLocks(loc.Region) } - continue + if err1 != nil { + return stat, errors.Trace(err1) + } + if !ok { + err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks))) + if err != nil { + return stat, errors.Trace(err) + } + stillInSame, refreshedLoc, err := w.tryRelocateLocksRegion(bo, locks) + if err != nil { + return stat, errors.Trace(err) + } + if stillInSame { + loc = refreshedLoc + continue + } + continue retryScanAndResolve + } + break } if len(locks) < gcScanLockLimit { stat.CompletedRegions++ @@ -1078,6 +1099,18 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s return stat, nil } +func (w *GCWorker) tryRelocateLocksRegion(bo *tikv.Backoffer, locks []*tikv.Lock) (stillInSameRegion bool, refreshedLoc *tikv.KeyLocation, err error) { + if len(locks) == 0 { + return + } + refreshedLoc, err = w.store.GetRegionCache().LocateKey(bo, locks[0].Key) + if err != nil { + return + } + stillInSameRegion = refreshedLoc.Contains(locks[len(locks)-1].Key) + return +} + // resolveLocksPhysical uses TiKV's `PhysicalScanLock` to scan stale locks in the cluster and resolve them. It tries to // ensure no lock whose ts <= safePoint is left. func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) error { diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index d0d91b431bc37..548a3f917b1ca 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -839,6 +839,39 @@ func (s *testGCWorkerSuite) TestResolveLockRangeInfine(c *C) { c.Assert(err, NotNil) } +func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) { + var ( + scanCnt int + scanCntRef = &scanCnt + resolveCnt int + resolveCntRef = &resolveCnt + ) + s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock { + *scanCntRef++ + return []*tikv.Lock{ + { + Key: []byte{1}, + }, + { + Key: []byte{1}, + }, + } + } + s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) { + *resolveCntRef++ + if *resolveCntRef == 1 { + s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID) + // mock the region cache miss error + return false, nil + } + return true, nil + } + _, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte{0}, []byte{10}) + c.Assert(err, IsNil) + c.Assert(resolveCnt, Equals, 2) + c.Assert(scanCnt, Equals, 1) +} + func (s *testGCWorkerSuite) TestRunGCJob(c *C) { gcSafePointCacheInterval = 0