Skip to content

Commit

Permalink
tikv: don't collapse batch resolve locks (#17025) (#17033)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored May 8, 2020
1 parent a8a460b commit 1c6b36d
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
4 changes: 4 additions & 0 deletions store/tikv/client_collapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (r reqCollapse) tryCollapseRequest(ctx context.Context, addr string, req *t
// can not collapse resolve lock lite
return
}
if len(resolveLock.TxnInfos) > 0 {
// can not collapse batch resolve locks which is only used by GC worker.
return
}
canCollapse = true
key := strconv.FormatUint(resolveLock.Context.RegionId, 10) + "-" + strconv.FormatUint(resolveLock.StartVersion, 10)
resp, err = r.collapse(ctx, key, &resolveRegionSf, addr, req, timeout)
Expand Down
102 changes: 102 additions & 0 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ package tikv
import (
"context"
"fmt"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -127,3 +130,102 @@ func (s *testClientSuite) TestSendWhenReconnect(c *C) {
conn.Close()
server.Stop()
}

// chanClient sends received requests to the channel.
type chanClient struct {
wg *sync.WaitGroup
ch chan<- *tikvrpc.Request
}

func (c *chanClient) Close() error {
return nil
}

func (c *chanClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
c.wg.Wait()
c.ch <- req
return nil, nil
}

func (s *testClientSuite) TestCollapseResolveLock(c *C) {
buildResolveLockReq := func(regionID uint64, startTS uint64, commitTS uint64, keys [][]byte) *tikvrpc.Request {
region := &metapb.Region{Id: regionID}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
StartVersion: startTS,
CommitVersion: commitTS,
Keys: keys,
})
tikvrpc.SetContext(req, region, nil)
return req
}
buildBatchResolveLockReq := func(regionID uint64, txnInfos []*kvrpcpb.TxnInfo) *tikvrpc.Request {
region := &metapb.Region{Id: regionID}
req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
TxnInfos: txnInfos,
})
tikvrpc.SetContext(req, region, nil)
return req
}

var wg sync.WaitGroup
reqCh := make(chan *tikvrpc.Request)
client := reqCollapse{&chanClient{wg: &wg, ch: reqCh}}
ctx := context.Background()

// Collapse ResolveLock.
resolveLockReq := buildResolveLockReq(1, 10, 20, nil)
wg.Add(1)
go client.SendRequest(ctx, "", resolveLockReq, time.Second)
go client.SendRequest(ctx, "", resolveLockReq, time.Second)
time.Sleep(300 * time.Millisecond)
wg.Done()
req := <-reqCh
c.Assert(*req, DeepEquals, *resolveLockReq)
select {
case <-reqCh:
c.Fatal("fail to collapse ResolveLock")
default:
}

// Don't collapse ResolveLockLite.
resolveLockLiteReq := buildResolveLockReq(1, 10, 20, [][]byte{[]byte("foo")})
wg.Add(1)
go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second)
go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second)
time.Sleep(300 * time.Millisecond)
wg.Done()
for i := 0; i < 2; i++ {
req := <-reqCh
c.Assert(*req, DeepEquals, *resolveLockLiteReq)
}

// Don't collapse BatchResolveLock.
batchResolveLockReq := buildBatchResolveLockReq(1, []*kvrpcpb.TxnInfo{
{Txn: 10, Status: 20},
})
wg.Add(1)
go client.SendRequest(ctx, "", batchResolveLockReq, time.Second)
go client.SendRequest(ctx, "", batchResolveLockReq, time.Second)
time.Sleep(300 * time.Millisecond)
wg.Done()
for i := 0; i < 2; i++ {
req := <-reqCh
c.Assert(*req, DeepEquals, *batchResolveLockReq)
}

// Mixed
wg.Add(1)
go client.SendRequest(ctx, "", resolveLockReq, time.Second)
go client.SendRequest(ctx, "", resolveLockLiteReq, time.Second)
go client.SendRequest(ctx, "", batchResolveLockReq, time.Second)
time.Sleep(300 * time.Millisecond)
wg.Done()
for i := 0; i < 3; i++ {
<-reqCh
}
select {
case <-reqCh:
c.Fatal("unexpected request")
default:
}
}

0 comments on commit 1c6b36d

Please sign in to comment.