diff --git a/store/tikv/client_collapse.go b/store/tikv/client_collapse.go index 5b0386ef70bfa..5fc99420c0012 100644 --- a/store/tikv/client_collapse.go +++ b/store/tikv/client_collapse.go @@ -57,6 +57,10 @@ func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *t // can not collapse resolve lock lite return } + if len(resolveLock.TxnInfos) > 0 { + // can not collapse batch resolve locks which is only used by GC worker. + return + } canCollapse = true key := strconv.FormatUint(resolveLock.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10) resp, err = r.collapse(ctx, key, &resolveRegionSf, addr, req, timeout) diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index 6e1184c5c3624..7d2bde13a075b 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -16,11 +16,14 @@ package tikv import ( "context" "fmt" + "sync" "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -127,3 +130,102 @@ func (s *testClientSuite) TestSendWhenReconnect(c *C) { conn.Close() server.Stop() } + +// chanClient sends received requests to the channel. +type chanClient struct { + wg *sync.WaitGroup + ch chan<- *tikvrpc.Request +} + +func (c *chanClient) Close() error { + return nil +} + +func (c *chanClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + c.wg.Wait() + c.ch <- req + return nil, nil +} + +func (s *testClientSuite) TestCollapseResolveLock(c *C) { + buildResolveLockReq := func(regionID uint64, startTS uint64, commitTS uint64, keys [][]byte) *tikvrpc.Request { + region := &metapb.Region{Id: regionID} + req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{ + StartVersion: startTS, + CommitVersion: commitTS, + Keys: keys, + }) + tikvrpc.SetContext(req, region, nil) + return req + } + buildBatchResolveLockReq := func(regionID uint64, txnInfos []*kvrpcpb.TxnInfo) *tikvrpc.Request { + region := &metapb.Region{Id: regionID} + req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{ + TxnInfos: txnInfos, + }) + tikvrpc.SetContext(req, region, nil) + return req + } + + var wg sync.WaitGroup + reqCh := make(chan *tikvrpc.Request) + client := reqCollapse{&chanClient{wg: &wg, ch: reqCh}} + ctx := context.Background() + + // Collapse ResolveLock. + resolveLockReq := buildResolveLockReq(1, 10, 20, nil) + wg.Add(1) + go client.SendRequest(ctx, "", resolveLockReq, time.Second) + go client.SendRequest(ctx, "", resolveLockReq, time.Second) + time.Sleep(300 * time.Millisecond) + wg.Done() + req := <-reqCh + c.Assert(*req, DeepEquals, *resolveLockReq) + select { + case <-reqCh: + c.Fatal("fail to collapse ResolveLock") + default: + } + + // Don't collapse ResolveLockLite. + resolveLockLiteReq := buildResolveLockReq(1, 10, 20, [][]byte{[]byte("foo")}) + wg.Add(1) + go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second) + go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second) + time.Sleep(300 * time.Millisecond) + wg.Done() + for i := 0; i < 2; i++ { + req := <-reqCh + c.Assert(*req, DeepEquals, *resolveLockLiteReq) + } + + // Don't collapse BatchResolveLock. + batchResolveLockReq := buildBatchResolveLockReq(1, []*kvrpcpb.TxnInfo{ + {Txn: 10, Status: 20}, + }) + wg.Add(1) + go client.SendRequest(ctx, "", batchResolveLockReq, time.Second) + go client.SendRequest(ctx, "", batchResolveLockReq, time.Second) + time.Sleep(300 * time.Millisecond) + wg.Done() + for i := 0; i < 2; i++ { + req := <-reqCh + c.Assert(*req, DeepEquals, *batchResolveLockReq) + } + + // Mixed + wg.Add(1) + go client.SendRequest(ctx, "", resolveLockReq, time.Second) + go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second) + go client.SendRequest(ctx, "", batchResolveLockReq, time.Second) + time.Sleep(300 * time.Millisecond) + wg.Done() + for i := 0; i < 3; i++ { + <-reqCh + } + select { + case <-reqCh: + c.Fatal("unexpected request") + default: + } +}