Skip to content

Commit

Permalink
support sample step option in KvScanRequest (#401)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jun 9, 2020
1 parent d8e9dc0 commit 52002aa
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/pingcap/badger v1.5.1-0.20200604041313-19c397305fcc
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29
github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181
github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677 h1:90pbLYmkk7bXLgy
github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4=
github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956 h1:G4ekqFoMUusFnlPglbhifaZKcuJjDgkpdgaDywLL11E=
github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM=
Expand Down
29 changes: 22 additions & 7 deletions tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,11 +1239,19 @@ func isResolved(startTS uint64, resolved []uint64) bool {
}

type kvScanProcessor struct {
buf []byte
pairs []*kvrpcpb.KvPair
buf []byte
pairs []*kvrpcpb.KvPair
sampleStep uint32
scanCnt uint32
}

func (p *kvScanProcessor) Process(key, value []byte) (err error) {
if p.sampleStep > 0 {
p.scanCnt++
if (p.scanCnt-1)%p.sampleStep != 0 {
return nil
}
}
p.pairs = append(p.pairs, &kvrpcpb.KvPair{
Key: safeCopy(key),
Value: safeCopy(value),
Expand Down Expand Up @@ -1274,15 +1282,22 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv
endKey = InternalKeyPrefix
}
}
lockPairs := store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks)

var scanProc = &kvScanProcessor{}
var lockPairs []*kvrpcpb.KvPair
limit := req.GetLimit()
if req.SampleStep == 0 {
lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks)
} else {
limit = req.SampleStep * limit
}
var scanProc = &kvScanProcessor{
sampleStep: req.SampleStep,
}
reader := reqCtx.getDBReader()
var err error
if req.Reverse {
err = reader.ReverseScan(startKey, endKey, int(req.GetLimit()), req.GetVersion(), scanProc)
err = reader.ReverseScan(startKey, endKey, int(limit), req.GetVersion(), scanProc)
} else {
err = reader.Scan(startKey, endKey, int(req.GetLimit()), req.GetVersion(), scanProc)
err = reader.Scan(startKey, endKey, int(limit), req.GetVersion(), scanProc)
}
if err != nil {
scanProc.pairs = append(scanProc.pairs[:0], &kvrpcpb.KvPair{
Expand Down
34 changes: 34 additions & 0 deletions tikv/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,3 +1475,37 @@ func (s *testMvccSuite) TestPessimisticLockForce(c *C) {
MustUnLocked(k, store)
MustGetVal(k, v2, 13, store)
}

func (s *testMvccSuite) TestScanSampleStep(c *C) {
store, err := NewTestStore("TestScanSampleStep", "TestScanSampleStep", c)
c.Assert(err, IsNil)
defer CleanTestStore(store)
for i := 0; i < 1000; i++ {
k := genScanSampleStepKey(i)
MustPrewritePut(k, k, k, 1, store)
MustCommit(k, 1, 2, store)
}
sampleStep := 10
scanReq := &kvrpcpb.ScanRequest{
StartKey: genScanSampleStepKey(100),
EndKey: genScanSampleStepKey(900),
Limit: 100,
Version: 2,
SampleStep: uint32(sampleStep),
}
pairs := store.MvccStore.Scan(store.newReqCtx(), scanReq)
c.Assert(len(pairs), Equals, 80)
for i, pair := range pairs {
c.Assert(genScanSampleStepKey(100+i*sampleStep), BytesEquals, pair.Key)
}
scanReq.Limit = 20
pairs = store.MvccStore.Scan(store.newReqCtx(), scanReq)
c.Assert(len(pairs), Equals, 20)
for i, pair := range pairs {
c.Assert(genScanSampleStepKey(100+i*sampleStep), BytesEquals, pair.Key)
}
}

func genScanSampleStepKey(i int) []byte {
return []byte(fmt.Sprintf("t%0.4d", i))
}
4 changes: 4 additions & 0 deletions tikv/raftstore/snap_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ func (b *snapBuilder) addDBEntry() error {
if len(val) == 0 {
writeType = byte(kvrpcpb.Op_Del)
}
if len(meta) == 0 {
// delete range entry.
meta = mvcc.NewDBUserMeta(item.Version(), item.Version())
}
err = b.addSSTKey(b.curDBKey, meta.StartTS(), meta.CommitTS(), val, writeType)
if err != nil {
return err
Expand Down

0 comments on commit 52002aa

Please sign in to comment.