From 52002aaff69c0f2bfd1172dc57c93109a15736c7 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Tue, 9 Jun 2020 11:40:41 +0800 Subject: [PATCH] support sample step option in KvScanRequest (#401) --- go.mod | 2 +- go.sum | 2 ++ tikv/mvcc.go | 29 ++++++++++++++++++++++------- tikv/mvcc_test.go | 34 ++++++++++++++++++++++++++++++++++ tikv/raftstore/snap_builder.go | 4 ++++ 5 files changed, 63 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index fc5b689b..18bfa881 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/pingcap/badger v1.5.1-0.20200604041313-19c397305fcc github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 - github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 + github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 github.com/pingcap/tidb v1.1.0-beta.0.20200604055950-efc1c154d098 diff --git a/go.sum b/go.sum index 013dfc40..fdeff357 100644 --- a/go.sum +++ b/go.sum @@ -475,6 +475,8 @@ github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677 h1:90pbLYmkk7bXLgy github.com/pingcap/kvproto v0.0.0-20200428135407-0f5ffe459677/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29 h1:NpW1OuYrIl+IQrSsVbtyHpHpazmSCHy+ysrOixY0xY4= github.com/pingcap/kvproto v0.0.0-20200518112156-d4aeb467de29/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956 h1:G4ekqFoMUusFnlPglbhifaZKcuJjDgkpdgaDywLL11E= +github.com/pingcap/kvproto v0.0.0-20200608081027-d02a6f65e956/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= diff --git a/tikv/mvcc.go b/tikv/mvcc.go index a43acf8e..6a0d3849 100644 --- a/tikv/mvcc.go +++ b/tikv/mvcc.go @@ -1239,11 +1239,19 @@ func isResolved(startTS uint64, resolved []uint64) bool { } type kvScanProcessor struct { - buf []byte - pairs []*kvrpcpb.KvPair + buf []byte + pairs []*kvrpcpb.KvPair + sampleStep uint32 + scanCnt uint32 } func (p *kvScanProcessor) Process(key, value []byte) (err error) { + if p.sampleStep > 0 { + p.scanCnt++ + if (p.scanCnt-1)%p.sampleStep != 0 { + return nil + } + } p.pairs = append(p.pairs, &kvrpcpb.KvPair{ Key: safeCopy(key), Value: safeCopy(value), @@ -1274,15 +1282,22 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv endKey = InternalKeyPrefix } } - lockPairs := store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks) - - var scanProc = &kvScanProcessor{} + var lockPairs []*kvrpcpb.KvPair + limit := req.GetLimit() + if req.SampleStep == 0 { + lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks) + } else { + limit = req.SampleStep * limit + } + var scanProc = &kvScanProcessor{ + sampleStep: req.SampleStep, + } reader := reqCtx.getDBReader() var err error if req.Reverse { - err = reader.ReverseScan(startKey, endKey, int(req.GetLimit()), req.GetVersion(), scanProc) + err = reader.ReverseScan(startKey, endKey, int(limit), req.GetVersion(), scanProc) } else { - err = reader.Scan(startKey, endKey, int(req.GetLimit()), req.GetVersion(), scanProc) + err = reader.Scan(startKey, endKey, int(limit), req.GetVersion(), scanProc) } if err != nil { scanProc.pairs = append(scanProc.pairs[:0], &kvrpcpb.KvPair{ diff --git a/tikv/mvcc_test.go b/tikv/mvcc_test.go index 993f712e..e17c8328 100644 --- a/tikv/mvcc_test.go +++ b/tikv/mvcc_test.go @@ -1475,3 +1475,37 @@ func (s *testMvccSuite) TestPessimisticLockForce(c *C) { MustUnLocked(k, store) MustGetVal(k, v2, 13, store) } + +func (s *testMvccSuite) TestScanSampleStep(c *C) { + store, err := NewTestStore("TestScanSampleStep", "TestScanSampleStep", c) + c.Assert(err, IsNil) + defer CleanTestStore(store) + for i := 0; i < 1000; i++ { + k := genScanSampleStepKey(i) + MustPrewritePut(k, k, k, 1, store) + MustCommit(k, 1, 2, store) + } + sampleStep := 10 + scanReq := &kvrpcpb.ScanRequest{ + StartKey: genScanSampleStepKey(100), + EndKey: genScanSampleStepKey(900), + Limit: 100, + Version: 2, + SampleStep: uint32(sampleStep), + } + pairs := store.MvccStore.Scan(store.newReqCtx(), scanReq) + c.Assert(len(pairs), Equals, 80) + for i, pair := range pairs { + c.Assert(genScanSampleStepKey(100+i*sampleStep), BytesEquals, pair.Key) + } + scanReq.Limit = 20 + pairs = store.MvccStore.Scan(store.newReqCtx(), scanReq) + c.Assert(len(pairs), Equals, 20) + for i, pair := range pairs { + c.Assert(genScanSampleStepKey(100+i*sampleStep), BytesEquals, pair.Key) + } +} + +func genScanSampleStepKey(i int) []byte { + return []byte(fmt.Sprintf("t%0.4d", i)) +} diff --git a/tikv/raftstore/snap_builder.go b/tikv/raftstore/snap_builder.go index e7c6e265..c4defa9a 100644 --- a/tikv/raftstore/snap_builder.go +++ b/tikv/raftstore/snap_builder.go @@ -205,6 +205,10 @@ func (b *snapBuilder) addDBEntry() error { if len(val) == 0 { writeType = byte(kvrpcpb.Op_Del) } + if len(meta) == 0 { + // delete range entry. + meta = mvcc.NewDBUserMeta(item.Version(), item.Version()) + } err = b.addSSTKey(b.curDBKey, meta.StartTS(), meta.CommitTS(), val, writeType) if err != nil { return err