diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index a4d7b4f58e4e6..c0f7caa4a9d01 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -423,6 +423,9 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 { return lockTTL + uint64(elapsed) } +var preSplitDetectThreshold uint32 = 100000 +var preSplitSizeThreshold uint32 = 32 << 20 + // doActionOnMutations groups keys into primary batch and secondary batches, if primary batch exists in the key, // it does action on primary batch first, then on secondary batches. If action is commit, secondary batches // is done in background goroutine. @@ -435,6 +438,68 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo return errors.Trace(err) } + // Pre-split regions to avoid too much write workload into a single region. + // In the large transaction case, this operation is important to avoid TiKV 'server is busy' error. + var preSplited bool + preSplitDetectThresholdVal := atomic.LoadUint32(&preSplitDetectThreshold) + for _, group := range groups { + if uint32(group.mutations.len()) >= preSplitDetectThresholdVal { + logutil.BgLogger().Info("2PC detect large amount of mutations on a single region", + zap.Uint64("region", group.region.GetID()), + zap.Int("mutations count", group.mutations.len())) + // Use context.Background, this time should not add up to Backoffer. + if preSplitAndScatterIn2PC(context.Background(), c.store, group) { + preSplited = true + } + } + } + // Reload region cache again. + if preSplited { + groups, err = c.store.regionCache.GroupSortedMutationsByRegion(bo, mutations) + if err != nil { + return errors.Trace(err) + } + } + + return c.doActionOnGroupMutations(bo, action, groups) +} + +func preSplitAndScatterIn2PC(ctx context.Context, store *tikvStore, group groupedMutations) bool { + length := group.mutations.len() + splitKeys := make([][]byte, 0, 4) + + preSplitSizeThresholdVal := atomic.LoadUint32(&preSplitSizeThreshold) + regionSize := 0 + for i := 0; i < length; i++ { + regionSize = regionSize + len(group.mutations.keys[i]) + len(group.mutations.values[i]) + // The second condition is used for testing. + if regionSize >= int(preSplitSizeThresholdVal) { + regionSize = 0 + splitKeys = append(splitKeys, group.mutations.keys[i]) + } + } + if len(splitKeys) == 0 { + return false + } + + regionIDs, err := store.SplitRegions(ctx, splitKeys, true) + if err != nil { + logutil.BgLogger().Warn("2PC split regions failed", zap.Uint64("regionID", group.region.id), zap.Int("keys count", length), zap.Error(err)) + return false + } + + for _, regionID := range regionIDs { + err := store.WaitScatterRegionFinish(regionID, 0) + if err != nil { + logutil.BgLogger().Warn("2PC wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err)) + } + } + // Invalidate the old region cache information. + store.regionCache.InvalidateCachedRegion(group.region) + return true +} + +func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error { action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups))) var batches []batchMutations @@ -467,6 +532,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo _, actionIsCleanup := action.(actionCleanup) _, actionIsPessimiticLock := action.(actionPessimisticLock) + var err error failpoint.Inject("skipKeyReturnOK", func(val failpoint.Value) { valStr, ok := val.(string) if ok && c.connID > 0 { diff --git a/store/tikv/2pc_slow_test.go b/store/tikv/2pc_slow_test.go index bf96a24485ac3..c7a70a188306e 100644 --- a/store/tikv/2pc_slow_test.go +++ b/store/tikv/2pc_slow_test.go @@ -15,7 +15,12 @@ package tikv -import . "github.com/pingcap/check" +import ( + "context" + "sync/atomic" + + . "github.com/pingcap/check" +) // TestCommitMultipleRegions tests commit multiple regions. // The test takes too long under the race detector. @@ -35,3 +40,38 @@ func (s *testCommitterSuite) TestCommitMultipleRegions(c *C) { } s.mustCommit(c, m) } + +func (s *testTiclientSuite) TestSplitRegionIn2PC(c *C) { + const preSplitThresholdInTest = 500 + old := atomic.LoadUint32(&preSplitDetectThreshold) + defer atomic.StoreUint32(&preSplitDetectThreshold, old) + atomic.StoreUint32(&preSplitDetectThreshold, preSplitThresholdInTest) + + old = atomic.LoadUint32(&preSplitSizeThreshold) + defer atomic.StoreUint32(&preSplitSizeThreshold, old) + atomic.StoreUint32(&preSplitSizeThreshold, 5000) + + bo := NewBackoffer(context.Background(), 1) + startKey := encodeKey(s.prefix, s08d("key", 0)) + endKey := encodeKey(s.prefix, s08d("key", preSplitThresholdInTest)) + checkKeyRegion := func(bo *Backoffer, start, end []byte, checker Checker) { + // Check regions after split. + loc1, err := s.store.regionCache.LocateKey(bo, start) + c.Assert(err, IsNil) + loc2, err := s.store.regionCache.LocateKey(bo, end) + c.Assert(err, IsNil) + c.Assert(loc1.Region.id, checker, loc2.Region.id) + } + + // Check before test. + checkKeyRegion(bo, startKey, endKey, Equals) + txn := s.beginTxn(c) + for i := 0; i < preSplitThresholdInTest; i++ { + err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i)) + c.Assert(err, IsNil) + } + err := txn.Commit(context.Background()) + c.Assert(err, IsNil) + // Check region split after test. + checkKeyRegion(bo, startKey, endKey, Not(Equals)) +}