Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into issue46197
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate committed Aug 22, 2023
2 parents 03b6c32 + cb248a1 commit b825a6d
Show file tree
Hide file tree
Showing 228 changed files with 3,938 additions and 5,163 deletions.
21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,27 @@ bazel_importintotest: failpoint-enable bazel_ci_simple_prepare
-- //tests/realtikvtest/importintotest/...
./build/jenkins_collect_coverage.sh

# On timeout, bazel sometimes does not print the test log, so we always print it with --test_output=all.
bazel_importintotest2: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest2/...
./build/jenkins_collect_coverage.sh

# On timeout, bazel sometimes does not print the test log, so we always print it with --test_output=all.
bazel_importintotest3: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest3/...
./build/jenkins_collect_coverage.sh

# On timeout, bazel sometimes does not print the test log, so we always print it with --test_output=all.
bazel_importintotest4: failpoint-enable bazel_ci_simple_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --test_output=all --test_arg=-with-real-tikv --define gotags=deadlock,intest \
--@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //tests/realtikvtest/importintotest4/...
./build/jenkins_collect_coverage.sh

bazel_lint: bazel_prepare
bazel build //... --//build:with_nogo_flag=true

Expand Down
18 changes: 16 additions & 2 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
}

retRanges := make([]kv.KeyRange, 0, 1+len(tbl.Indices))
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -482,7 +482,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
continue
}
ranges = ranger.FullRange()
idxRanges, err := distsql.IndexRangesToKVRanges(nil, tblID, index.ID, ranges, nil)
idxRanges, err := distsql.IndexRangesToKVRanges(nil, tblID, index.ID, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -633,6 +633,20 @@ func BuildBackupSchemas(
default:
if tableInfo.SepAutoInc() {
globalAutoID, err = autoIDAccess.IncrementID(tableInfo.Version).Get()
// For a nonclustered table with auto_increment column, both auto_increment_id and _tidb_rowid are required.
// See also https://github.com/pingcap/tidb/issues/46093
if rowID, err1 := autoIDAccess.RowID().Get(); err1 == nil {
tableInfo.AutoIncIDExtra = rowID + 1
} else {
// It is possible that the rowid meta key does not exist (i.e. the table has auto_increment_id but no _rowid),
// so err1 != nil might be expected.
if globalAutoID == 0 {
// When both auto_increment_id and _rowid are missing, something must be wrong.
return errors.Trace(err1)
}
// In all other cases only print a warning. Should it be an INFO log instead?
log.Warn("get rowid error", zap.Error(err1))
}
} else {
globalAutoID, err = autoIDAccess.RowID().Get()
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func buildTableRequest(
var builder distsql.RequestBuilder
// Use low priority to reduce the impact on other requests.
builder.Request.Priority = kv.PriorityLow
return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, ranges, nil).
return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, ranges).
SetStartTS(startTS).
SetChecksumRequest(checksum).
SetConcurrency(int(concurrency)).
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go_library(
srcs = [
"byte_reader.go",
"codec.go",
"engine.go",
"file.go",
"iter.go",
"kv_reader.go",
"stat_reader.go",
"util.go",
"writer.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
Expand All @@ -21,6 +23,7 @@ go_library(
"//br/pkg/membuf",
"//br/pkg/storage",
"//kv",
"//util/hack",
"//util/logutil",
"//util/mathutil",
"//util/size",
Expand All @@ -36,21 +39,22 @@ go_test(
srcs = [
"byte_reader_test.go",
"codec_test.go",
"engine_test.go",
"file_test.go",
"iter_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 15,
shard_count = 19,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
],
)
60 changes: 60 additions & 0 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"encoding/hex"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// Engine stores sorted key/value pairs in an external storage.
type Engine struct {
// storage is the external storage passed to seekPropsOffsets and NewMergeKVIter.
storage storage.ExternalStorage
// dataFiles lists the data files that createMergeIter merges.
dataFiles []string
// statsFiles lists the stat files used to seek the start offsets; when it is
// empty, createMergeIter uses a zero offset for every data file.
statsFiles []string
}

// createMergeIter builds a MergeKVIter over all data files of the engine.
//
// When the engine has stat files, seekPropsOffsets is asked for the per-file
// offsets that correspond to start; without stat files every data file gets a
// zero offset. Any error is returned wrapped with errors.Trace.
func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
	lg := logutil.Logger(ctx)

	var fileOffsets []uint64
	if len(e.statsFiles) != 0 {
		sought, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage)
		if err != nil {
			return nil, errors.Trace(err)
		}
		fileOffsets = sought
		lg.Info("seek props offsets",
			zap.Uint64s("offsets", fileOffsets),
			zap.String("startKey", hex.EncodeToString(start)),
			zap.Strings("dataFiles", prettyFileNames(e.dataFiles)),
			zap.Strings("statsFiles", prettyFileNames(e.statsFiles)))
	} else {
		// make() zero-fills the slice, so each data file gets offset 0.
		fileOffsets = make([]uint64, len(e.dataFiles))
		lg.Info("no stats files",
			zap.String("startKey", hex.EncodeToString(start)))
	}

	mergeIter, err := NewMergeKVIter(ctx, e.dataFiles, fileOffsets, e.storage, 64*1024)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return mergeIter, nil
}
109 changes: 109 additions & 0 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"bytes"
"context"
"slices"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

// TestIter writes random, unique key/value pairs through three writers and
// then checks Engine.createMergeIter in two ways:
//   - iterating from the smallest key must return every pair in sorted order;
//   - iterating from a randomly picked key must return strictly ascending keys
//     whose first key is not greater than the picked start key.
func TestIter(t *testing.T) {
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	// Log the seed used for this run.
	t.Logf("seed: %d", seed)

	totalKV := 300
	kvPairs := make([]common.KvPair, totalKV)
	for i := range kvPairs {
		keyBuf := make([]byte, rand.Intn(10)+1)
		rand.Read(keyBuf)
		// make sure the key is unique
		kvPairs[i].Key = append(keyBuf, byte(i/255), byte(i%255))
		valBuf := make([]byte, rand.Intn(10)+1)
		rand.Read(valBuf)
		kvPairs[i].Val = valBuf
	}

	sortedKVPairs := make([]common.KvPair, totalKV)
	copy(sortedKVPairs, kvPairs)
	slices.SortFunc(sortedKVPairs, func(i, j common.KvPair) int {
		return bytes.Compare(i.Key, j.Key)
	})

	ctx := context.Background()
	store := storage.NewMemStorage()

	// Spread the pairs over three writers, 100 pairs each, with randomized
	// writer settings.
	for i := 0; i < 3; i++ {
		w := NewWriterBuilder().
			SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
			SetPropSizeDistance(uint64(rand.Intn(50)+1)).
			SetPropKeysDistance(uint64(rand.Intn(10)+1)).
			Build(store, i, "/subtask")
		kvStart := i * 100
		kvEnd := (i + 1) * 100
		err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
		require.NoError(t, err)
		_, err = w.Close(ctx)
		require.NoError(t, err)
	}

	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
	require.NoError(t, err)

	engine := Engine{
		storage:    store,
		dataFiles:  dataFiles,
		statsFiles: statFiles,
	}

	// readAll creates a merge iterator at start and drains it, failing the
	// test on any creation or iteration error.
	readAll := func(start []byte) []common.KvPair {
		iter, err := engine.createMergeIter(ctx, start)
		require.NoError(t, err)
		pairs := make([]common.KvPair, 0, totalKV)
		for iter.Next() {
			pairs = append(pairs, common.KvPair{
				Key: iter.Key(),
				Val: iter.Value(),
			})
		}
		require.NoError(t, iter.Error())
		return pairs
	}

	got := readAll(sortedKVPairs[0].Key)
	require.Equal(t, sortedKVPairs, got)

	pickStartIdx := rand.Intn(len(sortedKVPairs))
	startKey := sortedKVPairs[pickStartIdx].Key
	got = readAll(startKey)
	// Fail with a clear assertion instead of an index-out-of-range panic on
	// got[0] below if the iterator unexpectedly yields nothing.
	require.NotEmpty(t, got)
	// got keys must be ascending
	for i := 1; i < len(got); i++ {
		require.True(t, bytes.Compare(got[i-1].Key, got[i].Key) < 0)
	}
	// the first key must be less than or equal to startKey
	require.True(t, bytes.Compare(got[0].Key, startKey) <= 0)
}
78 changes: 1 addition & 77 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package external
import (
"context"
"encoding/binary"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/tidb/br/pkg/storage"
)
Expand Down Expand Up @@ -117,77 +114,4 @@ func (s *KeyValueStore) Close() {
}
}

// statSuffix is the path suffix ("_stat" joined with "0") that GetAllFileNames
// uses to tell stat files apart from data files.
var statSuffix = filepath.Join("_stat", "0")

// GetAllFileNames walks subDir in store and sorts every visited path into one
// of two results: paths ending with statSuffix are collected as stat files,
// all other paths are recorded in the returned FilePathHandle.
//
// A data file path must end in "<writerID>/<seq>", both decimal integers; a
// path that does not parse aborts the walk with the strconv error. On error
// the stat slice is nil.
func GetAllFileNames(
	ctx context.Context,
	store storage.ExternalStorage,
	subDir string,
) (FilePathHandle, []string, error) {
	var (
		handle    FilePathHandle
		statPaths []string
	)

	visit := func(path string, _ int64) error {
		if strings.HasSuffix(path, statSuffix) {
			statPaths = append(statPaths, path)
			return nil
		}
		// The parent directory name is the writer ID and the file name is
		// the sequence number.
		dir, name := filepath.Split(path)
		writerID, err := strconv.Atoi(filepath.Base(dir))
		if err != nil {
			return err
		}
		seq, err := strconv.Atoi(name)
		if err != nil {
			return err
		}
		handle.set(writerID, seq, path)
		return nil
	}

	if err := store.WalkDir(ctx, &storage.WalkOption{SubDir: subDir}, visit); err != nil {
		return handle, nil, err
	}
	return handle, statPaths, nil
}

// FilePathHandle keeps data file paths in a two-level table indexed first by
// writer ID and then by sequence number.
type FilePathHandle struct {
	paths [][]string
}

// set stores path at (writerID, seq), growing the table as needed. Slots
// created by growing and never assigned hold the empty string.
func (p *FilePathHandle) set(writerID, seq int, path string) {
	if n := len(p.paths); writerID >= n {
		p.paths = append(p.paths, make([][]string, writerID-n+1)...)
	}
	row := p.paths[writerID]
	if n := len(row); seq >= n {
		row = append(row, make([]string, seq-n+1)...)
	}
	row[seq] = path
	p.paths[writerID] = row
}

// Get returns the path of the data file with the given writerID and seq.
// It panics when either index is out of range.
func (p *FilePathHandle) Get(writerID, seq int) string {
	row := p.paths[writerID]
	return row[seq]
}

// ForEach calls f for every slot of the table, in ascending writerID and then
// ascending seq order. Slots that were never set are visited with an empty
// path.
func (p *FilePathHandle) ForEach(f func(writerID, seq int, path string)) {
	rows := p.paths
	for writerID := 0; writerID < len(rows); writerID++ {
		row := rows[writerID]
		for seq := 0; seq < len(row); seq++ {
			f(writerID, seq, row[seq])
		}
	}
}

// FlatSlice returns all slots of the table as one slice, in the same order as
// ForEach visits them. It returns nil when the table has no slots.
func (p *FilePathHandle) FlatSlice() []string {
	var flat []string
	for _, row := range p.paths {
		flat = append(flat, row...)
	}
	return flat
}
// statSuffix is the "_stat" name suffix.
// NOTE(review): presumably it marks stat file paths — confirm against its users.
const statSuffix = "_stat"
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ func (i *MergePropIter) prop() *rangeProperty {
return i.iter.curr
}

// readerIndex returns the lastReaderIdx recorded by the underlying iterator.
// NOTE(review): presumably this is the index of the reader that produced the
// current property — confirm against the iterator implementation.
func (i *MergePropIter) readerIndex() int {
return i.iter.lastReaderIdx
}

// Close closes the iterator.
func (i *MergePropIter) Close() error {
return i.iter.close()
Expand Down
Loading

0 comments on commit b825a6d

Please sign in to comment.