Skip to content

Commit

Permalink
gcworker: fix gc miss locks when region merged during scanning & reso…
Browse files Browse the repository at this point in the history
…lving locks (#22252) (#22266)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: lysu <sulifx@gmail.com>
  • Loading branch information
ti-srebot authored Jan 11, 2021
1 parent 9ab0bbf commit 0cc3d77
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 16 deletions.
20 changes: 13 additions & 7 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type GCWorker struct {
cancel context.CancelFunc
done chan error
testingKnobs struct {
scanLocks func(key []byte) []*tikv.Lock
resolveLocks func(regionID tikv.RegionVerID) (ok bool, err error)
scanLocks func(key []byte, regionID uint64) []*tikv.Lock
resolveLocks func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error)
}
}

Expand Down Expand Up @@ -868,12 +868,18 @@ retryScanAndResolve:
locks[i] = tikv.NewLock(locksInfo[i])
}
if w.testingKnobs.scanLocks != nil {
locks = append(locks, w.testingKnobs.scanLocks(key)...)
locks = append(locks, w.testingKnobs.scanLocks(key, loc.Region.GetID())...)
}
locForResolve := loc
for {
ok, err1 := w.store.GetLockResolver().BatchResolveLocks(bo, locks, loc.Region)
var (
ok bool
err1 error
)
if w.testingKnobs.resolveLocks != nil {
ok, err1 = w.testingKnobs.resolveLocks(loc.Region)
ok, err1 = w.testingKnobs.resolveLocks(locks, locForResolve.Region)
} else {
ok, err1 = w.store.GetLockResolver().BatchResolveLocks(bo, locks, locForResolve.Region)
}
if err1 != nil {
return regions, errors.Trace(err1)
Expand All @@ -888,7 +894,7 @@ retryScanAndResolve:
return regions, errors.Trace(err)
}
if stillInSame {
loc = refreshedLoc
locForResolve = refreshedLoc
continue
}
continue retryScanAndResolve
Expand All @@ -901,7 +907,7 @@ retryScanAndResolve:
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
zap.String("uuid", w.uuid),
zap.Uint64("region", loc.Region.GetID()),
zap.Uint64("region", locForResolve.Region.GetID()),
zap.Int("scan lock limit", gcScanLockLimit))
metrics.GCRegionTooManyLocksCounter.Inc()
key = locks[len(locks)-1].Key
Expand Down
93 changes: 84 additions & 9 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package gcworker

import (
"bytes"
"context"
"math"
"strconv"
Expand All @@ -23,6 +24,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
Expand All @@ -38,12 +40,17 @@ func TestT(t *testing.T) {
}

type testGCWorkerSuite struct {
store tikv.Storage
cluster *mocktikv.Cluster
oracle *mockoracle.MockOracle
gcWorker *GCWorker
dom *domain.Domain
pdClient pd.Client
store tikv.Storage
cluster *mocktikv.Cluster
oracle *mockoracle.MockOracle
gcWorker *GCWorker
dom *domain.Domain
pdClient pd.Client
initRegion struct {
storeIDs []uint64
peerIDs []uint64
regionID uint64
}
}

var _ = Suite(&testGCWorkerSuite{})
Expand All @@ -52,7 +59,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
tikv.NewGCHandlerFunc = NewGCWorker

s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.initRegion.storeIDs, s.initRegion.peerIDs, s.initRegion.regionID, _ = mocktikv.BootstrapWithMultiStores(s.cluster, 3)
store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(s.cluster))

s.store = store.(tikv.Storage)
Expand Down Expand Up @@ -346,7 +353,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
resolveCnt int
resolveCntRef = &resolveCnt
)
s.gcWorker.testingKnobs.scanLocks = func(key []byte) []*tikv.Lock {
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
*scanCntRef++
return []*tikv.Lock{
{
Expand All @@ -357,7 +364,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
},
}
}
s.gcWorker.testingKnobs.resolveLocks = func(regionID tikv.RegionVerID) (ok bool, err error) {
s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
*resolveCntRef++
if *resolveCntRef == 1 {
s.gcWorker.store.GetRegionCache().InvalidateCachedRegion(regionID)
Expand All @@ -372,6 +379,74 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionCacheMiss(c *C) {
c.Assert(scanCnt, Equals, 1)
}

func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionMerge(c *C) {
var (
firstAccess = true
firstAccessRef = &firstAccess
resolvedLock [][]byte
)

// key range: ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
newPeers := []uint64{s.cluster.AllocID(), s.cluster.AllocID(), s.cluster.AllocID()}
s.cluster.Split(s.initRegion.regionID, region2, []byte("m"), newPeers, newPeers[0])

// init a, b lock in region1 and o, p locks in region2
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
if regionID == s.initRegion.regionID {
return []*tikv.Lock{{Key: []byte("a")}, {Key: []byte("b")}}
}
if regionID == region2 {
return []*tikv.Lock{{Key: []byte("o")}, {Key: []byte("p")}}
}
return []*tikv.Lock{}
}

s.gcWorker.testingKnobs.resolveLocks = func(locks []*tikv.Lock, regionID tikv.RegionVerID) (ok bool, err error) {
if regionID.GetID() == s.initRegion.regionID && *firstAccessRef {
*firstAccessRef = false
// merge region2 into region1 and return EpochNotMatch error.
mCluster := s.cluster
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
s.store.GetRegionCache().OnRegionEpochNotMatch(
tikv.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
// also let region1 contains all 4 locks
s.gcWorker.testingKnobs.scanLocks = func(key []byte, regionID uint64) []*tikv.Lock {
if regionID == s.initRegion.regionID {
locks := []*tikv.Lock{
{Key: []byte("a")},
{Key: []byte("b")},
{Key: []byte("o")},
{Key: []byte("p")},
}
for i, lock := range locks {
if bytes.Compare(key, lock.Key) <= 0 {
return locks[i:]
}
}
}
return []*tikv.Lock{}
}
return false, nil
}
for _, lock := range locks {
resolvedLock = append(resolvedLock, lock.Key)
}
return true, nil
}

_, err := s.gcWorker.resolveLocksForRange(context.Background(), 1, []byte(""), []byte("z"))
c.Assert(err, IsNil)
c.Assert(len(resolvedLock), Equals, 4)
expects := [][]byte{[]byte("a"), []byte("b"), []byte("o"), []byte("p")}
for i, l := range resolvedLock {
c.Assert(l, BytesEquals, expects[i])
}
}

func (s *testGCWorkerSuite) TestRunGCJob(c *C) {
gcSafePointCacheInterval = 0
err := RunGCJob(context.Background(), s.store, 0, "mock", 1)
Expand Down

0 comments on commit 0cc3d77

Please sign in to comment.