diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index 9ece3a867b954..8c833ca6d49f8 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -57,8 +57,8 @@ type GCWorker struct {
 	cancel       context.CancelFunc
 	done         chan error
 	testingKnobs struct {
-		scanLocks    func(key []byte) []*tikv.Lock
-		resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error)
+		scanLocks    func(key []byte, regionID uint64) []*tikv.Lock
+		resolveLocks func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error)
 	}
 }
 
@@ -868,12 +868,18 @@ retryScanAndResolve:
 			locks[i] = tikv.NewLock(locksInfo[i])
 		}
 		if w.testingKnobs.scanLocks != nil {
-			locks = append(locks, w.testingKnobs.scanLocks(key)...)
+			locks = append(locks, w.testingKnobs.scanLocks(key, loc.Region.GetID())...)
 		}
+		locForResolve := loc
 		for {
-			ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
+			var (
+				ok   bool
+				err1 error
+			)
 			if w.testingKnobs.resolveLocks != nil {
-				ok, err1 = w.testingKnobs.resolveLocks(loc.Region)
+				ok, err1 = w.testingKnobs.resolveLocks(locks, locForResolve.Region)
+			} else {
+				ok, err1 = w.store.GetLockResolver().BatchResolveLocks(bo, locks, locForResolve.Region)
 			}
 			if err1 != nil {
 				return regions, errors.Trace(err1)
@@ -888,7 +894,7 @@
 					return regions, errors.Trace(err)
 				}
 				if stillInSame {
-					loc = refreshedLoc
+					locForResolve = refreshedLoc
 					continue
 				}
 				continue retryScanAndResolve
@@ -901,7 +907,7 @@
 		} else {
 			logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
 				zap.String("uuid", w.uuid),
-				zap.Uint64("region", loc.Region.GetID()),
+				zap.Uint64("region", locForResolve.Region.GetID()),
 				zap.Int("scan lock limit", gcScanLockLimit))
 			metrics.GCRegionTooManyLocksCounter.Inc()
 			key = locks[len(locks)-1].Key
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index bfe06ad9ef367..e77c85b100793 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -14,6 +14,7 @@
 package gcworker
 
 import (
+	"bytes"
 	"context"
 	"math"
 	"strconv"
@@ -23,6 +24,7 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	pd "github.com/pingcap/pd/client"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
@@ -38,12 +40,17 @@ func TestT(t *testing.T) {
 }
 
 type testGCWorkerSuite struct {
-	store    tikv.Storage
-	cluster  *mocktikv.Cluster
-	oracle   *mockoracle.MockOracle
-	gcWorker *GCWorker
-	dom      *domain.Domain
-	pdClient pd.Client
+	store      tikv.Storage
+	cluster    *mocktikv.Cluster
+	oracle     *mockoracle.MockOracle
+	gcWorker   *GCWorker
+	dom        *domain.Domain
+	pdClient   pd.Client
+	initRegion struct {
+		storeIDs []uint64
+		peerIDs  []uint64
+		regionID uint64
+	}
 }
 
 var _ = Suite(&testGCWorkerSuite{})
@@ -52,7 +59,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
 	tikv.NewGCHandlerFunc = NewGCWorker
 
 	s.cluster = mocktikv.NewCluster()
-	mocktikv.BootstrapWithSingleStore(s.cluster)
+	s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mocktikv.BootstrapWithMultiStores(s.cluster, 3)
 	store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(s.cluster))
 
 	s.store = store.(tikv.Storage)
@@ -346,7 +353,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
 		resolveCnt    int
 		resolveCntRef = &resolveCnt
 	)
-	s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock {
+	s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
 		*scanCntRef++
 		return []*tikv.Lock{
 			{
@@ -357,7 +364,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
 			},
 		}
 	}
-	s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) {
+	s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
 		*resolveCntRef++
 		if *resolveCntRef == 1 {
 			s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID)
@@ -372,6 +379,74 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
 	c.Assert(scanCnt, Equals, 1)
 }
 
+func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(c *C) {
+	var (
+		firstAccess    = true
+		firstAccessRef = &firstAccess
+		resolvedLock   [][]byte
+	)
+
+	// key range: ['' - 'm' - 'z']
+	region2 := s.cluster.AllocID()
+	newPeers := []uint64{s.cluster.AllocID(), s.cluster.AllocID(), s.cluster.AllocID()}
+	s.cluster.Split(s.initRegion.regionID, region2, []byte("m"), newPeers, newPeers[0])
+
+	// init a, b locks in region1 and o, p locks in region2
+	s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
+		if regionID == s.initRegion.regionID {
+			return []*tikv.Lock{{Key: []byte("a")}, {Key: []byte("b")}}
+		}
+		if regionID == region2 {
+			return []*tikv.Lock{{Key: []byte("o")}, {Key: []byte("p")}}
+		}
+		return []*tikv.Lock{}
+	}
+
+	s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
+		if regionID.GetID() == s.initRegion.regionID && *firstAccessRef {
+			*firstAccessRef = false
+			// merge region2 into region1 and return an EpochNotMatch error.
+			mCluster := s.cluster
+			mCluster.Merge(s.initRegion.regionID, region2)
+			regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
+			s.store.GetRegionCache().OnRegionEpochNotMatch(
+				tikv.NewNoopBackoff(context.Background()),
+				&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
+				[]*metapb.Region{regionMeta})
+			// also let region1 contain all 4 locks
+			s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
+				if regionID == s.initRegion.regionID {
+					locks := []*tikv.Lock{
+						{Key: []byte("a")},
+						{Key: []byte("b")},
+						{Key: []byte("o")},
+						{Key: []byte("p")},
+					}
+					for i, lock := range locks {
+						if bytes.Compare(key, lock.Key) <= 0 {
+							return locks[i:]
+						}
+					}
+				}
+				return []*tikv.Lock{}
+			}
+			return false, nil
+		}
+		for _, lock := range locks {
+			resolvedLock = append(resolvedLock, lock.Key)
+		}
+		return true, nil
+	}
+
+	_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte(""), []byte("z"))
+	c.Assert(err, IsNil)
+	c.Assert(len(resolvedLock), Equals, 4)
+	expects := [][]byte{[]byte("a"), []byte("b"), []byte("o"), []byte("p")}
+	for i, l := range resolvedLock {
+		c.Assert(l, BytesEquals, expects[i])
+	}
+}
+
 func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
 	gcSafePointCacheInterval = 0
 	err := RunGCJob(context.Background(), s.store, 0, "mock", 1)
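For readers skimming the diff, the essential change in `resolveLocksForRange` is that the region used for *resolving* locks (`locForResolve`) is now tracked separately from the location driving the *scan* loop (`loc`), so a refreshed, possibly merge-enlarged region no longer clobbers the scan cursor. The following is a minimal, self-contained Go sketch of that pattern; the `region` type and `resolveOnce` helper are hypothetical stand-ins for illustration, not the real `tikv` package API.

```go
package main

import "fmt"

// region is a hypothetical stand-in for tikv.KeyLocation / tikv.RegionVerID.
type region struct {
	id       uint64
	startKey string
	endKey   string // "" means unbounded on the right
}

// contains reports whether key falls inside the region's key range.
func (r region) contains(key string) bool {
	return key >= r.startKey && (r.endKey == "" || key < r.endKey)
}

// firstAccess makes resolveOnce fail exactly once, simulating the stale
// epoch the GC worker sees after a concurrent region merge.
var firstAccess = true

// resolveOnce is a stand-in for BatchResolveLocks: ok == false means the
// region epoch did not match and the caller must refresh and retry.
func resolveOnce(r region, locks []string) (ok bool) {
	if firstAccess {
		firstAccess = false
		return false
	}
	fmt.Printf("resolved %v in region %d\n", locks, r.id)
	return true
}

func main() {
	// loc drives the scan cursor; it must survive the retry loop unchanged.
	loc := region{id: 1, startKey: "", endKey: "m"}
	locks := []string{"a", "b"}

	// The patch's core idea: retry resolution against a refreshed location
	// (locForResolve) while leaving loc, and hence the scan cursor, alone.
	locForResolve := loc
	for {
		if resolveOnce(locForResolve, locks) {
			break
		}
		// Refresh the location; after a merge the region may have grown.
		refreshedLoc := region{id: 1, startKey: "", endKey: ""}
		if refreshedLoc.contains(locks[len(locks)-1]) {
			locForResolve = refreshedLoc // still one region: retry resolving
			continue
		}
		break // range now spans regions; the real code rescans from `key`
	}

	// loc is untouched, so the next scan still starts from loc.endKey.
	fmt.Printf("next scan starts at %q\n", loc.endKey)
}
```

Keeping the two variables separate means a failed resolution can be retried against the refreshed epoch without disturbing how `key` advances for the next scan, which is exactly the bug the new `TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge` test exercises.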