Skip to content

Commit

Permalink
lightning: reuse dupDetector to implement in-memory ingest data (#46270)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
lance6716 authored Aug 23, 2023
1 parent 9466e45 commit 6169cce
Show file tree
Hide file tree
Showing 12 changed files with 615 additions and 109 deletions.
8 changes: 7 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ go_library(
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
"//br/pkg/storage",
"//kv",
"//util/hack",
"//util/logutil",
"//util/mathutil",
"//util/size",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
Expand All @@ -47,11 +50,14 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 19,
shard_count = 20,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//util/codec",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
Expand Down
202 changes: 202 additions & 0 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
package external

import (
"bytes"
"context"
"encoding/hex"
"sort"

"github.com/cockroachdb/pebble"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -58,3 +63,200 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
}
return iter, nil
}

// MemoryIngestData is the in-memory implementation of IngestData.
type MemoryIngestData struct {
keyAdapter local.KeyAdapter
duplicateDetection bool
duplicateDB *pebble.DB
dupDetectOpt local.DupDetectOpt

keys [][]byte
values [][]byte
ts uint64
}

var _ local.IngestData = (*MemoryIngestData)(nil)

func (m *MemoryIngestData) firstAndLastKeyIndex(lowerBound, upperBound []byte) (int, int) {
firstKeyIdx := 0
if len(lowerBound) > 0 {
lowerBound = m.keyAdapter.Encode(nil, lowerBound, local.MinRowID)
firstKeyIdx = sort.Search(len(m.keys), func(i int) bool {
return bytes.Compare(lowerBound, m.keys[i]) <= 0
})
if firstKeyIdx == len(m.keys) {
return -1, -1
}
}

lastKeyIdx := len(m.keys) - 1
if len(upperBound) > 0 {
upperBound = m.keyAdapter.Encode(nil, upperBound, local.MinRowID)
i := sort.Search(len(m.keys), func(i int) bool {
reverseIdx := len(m.keys) - 1 - i
return bytes.Compare(upperBound, m.keys[reverseIdx]) > 0
})
if i == len(m.keys) {
// should not happen
return -1, -1
}
lastKeyIdx = len(m.keys) - 1 - i
}
return firstKeyIdx, lastKeyIdx
}

// GetFirstAndLastKey implements IngestData.GetFirstAndLastKey.
func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
if firstKeyIdx < 0 || firstKeyIdx > lastKeyIdx {
return nil, nil, nil
}
firstKey, err := m.keyAdapter.Decode(nil, m.keys[firstKeyIdx])
if err != nil {
return nil, nil, err
}
lastKey, err := m.keyAdapter.Decode(nil, m.keys[lastKeyIdx])
if err != nil {
return nil, nil, err
}
return firstKey, lastKey, nil
}

type memoryDataIter struct {
keys [][]byte
values [][]byte

firstKeyIdx int
lastKeyIdx int
curIdx int
}

// First implements ForwardIter.
func (m *memoryDataIter) First() bool {
if m.firstKeyIdx < 0 {
return false
}
m.curIdx = m.firstKeyIdx
return true
}

// Valid implements ForwardIter.
func (m *memoryDataIter) Valid() bool {
return m.firstKeyIdx <= m.curIdx && m.curIdx <= m.lastKeyIdx
}

// Next implements ForwardIter.
func (m *memoryDataIter) Next() bool {
m.curIdx++
return m.Valid()
}

// Key implements ForwardIter.
func (m *memoryDataIter) Key() []byte {
return m.keys[m.curIdx]
}

// Value implements ForwardIter.
func (m *memoryDataIter) Value() []byte {
return m.values[m.curIdx]
}

// Close implements ForwardIter.
func (m *memoryDataIter) Close() error {
return nil
}

// Error implements ForwardIter.
func (m *memoryDataIter) Error() error {
return nil
}

type memoryDataDupDetectIter struct {
iter *memoryDataIter
dupDetector *local.DupDetector
err error
curKey, curVal []byte
}

// First implements ForwardIter.
func (m *memoryDataDupDetectIter) First() bool {
if m.err != nil || !m.iter.First() {
return false
}
m.curKey, m.curVal, m.err = m.dupDetector.Init(m.iter)
return m.Valid()
}

// Valid implements ForwardIter.
func (m *memoryDataDupDetectIter) Valid() bool {
return m.err == nil && m.iter.Valid()
}

// Next implements ForwardIter.
func (m *memoryDataDupDetectIter) Next() bool {
if m.err != nil {
return false
}
key, val, ok, err := m.dupDetector.Next(m.iter)
if err != nil {
m.err = err
return false
}
if !ok {
return false
}
m.curKey, m.curVal = key, val
return true
}

// Key implements ForwardIter.
func (m *memoryDataDupDetectIter) Key() []byte {
return m.curKey
}

// Value implements ForwardIter.
func (m *memoryDataDupDetectIter) Value() []byte {
return m.curVal
}

// Close implements ForwardIter.
func (m *memoryDataDupDetectIter) Close() error {
return m.dupDetector.Close()
}

// Error implements ForwardIter.
func (m *memoryDataDupDetectIter) Error() error {
return m.err
}

// NewIter implements IngestData.NewIter.
func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) local.ForwardIter {
firstKeyIdx, lastKeyIdx := m.firstAndLastKeyIndex(lowerBound, upperBound)
iter := &memoryDataIter{
keys: m.keys,
values: m.values,
firstKeyIdx: firstKeyIdx,
lastKeyIdx: lastKeyIdx,
}
if !m.duplicateDetection {
return iter
}
logger := log.FromContext(ctx)
detector := local.NewDupDetector(m.keyAdapter, m.duplicateDB.NewBatch(), logger, m.dupDetectOpt)
return &memoryDataDupDetectIter{
iter: iter,
dupDetector: detector,
}
}

// GetTS implements IngestData.GetTS.
func (m *MemoryIngestData) GetTS() uint64 {
return m.ts
}

// Finish implements IngestData.Finish.
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64) {
//TODO implement me
panic("implement me")
}
Loading

0 comments on commit 6169cce

Please sign in to comment.