Skip to content

Commit

Permalink
Merge branch 'master' into supplyCasesForPCParamLimit
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 authored Feb 1, 2023
2 parents afb3be2 + bdc2f1b commit 95bfcd8
Show file tree
Hide file tree
Showing 79 changed files with 1,051 additions and 337 deletions.
20 changes: 10 additions & 10 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ def go_deps():
name = "com_github_chavacava_garif",
build_file_proto_mode = "disable",
importpath = "github.com/chavacava/garif",
sum = "h1:E7LT642ysztPWE0dfz43cWOvMiF42DyTRC+eZIaO4yI=",
version = "v0.0.0-20220630083739-93517212f375",
sum = "h1:cy5GCEZLUCshCGCRRUjxHrDUqkB4l5cuUt3ShEckQEo=",
version = "v0.0.0-20221024190013-b3ef35877348",
)

go_repository(
Expand Down Expand Up @@ -1040,8 +1040,8 @@ def go_deps():
name = "com_github_fatih_color",
build_file_proto_mode = "disable_global",
importpath = "github.com/fatih/color",
sum = "h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=",
version = "v1.13.0",
sum = "h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=",
version = "v1.14.1",
)
go_repository(
name = "com_github_fatih_structs",
Expand Down Expand Up @@ -2763,8 +2763,8 @@ def go_deps():
name = "com_github_mattn_go_isatty",
build_file_proto_mode = "disable_global",
importpath = "github.com/mattn/go-isatty",
sum = "h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=",
version = "v0.0.16",
sum = "h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=",
version = "v0.0.17",
)
go_repository(
name = "com_github_mattn_go_runewidth",
Expand Down Expand Up @@ -2844,8 +2844,8 @@ def go_deps():
name = "com_github_mgechev_revive",
build_file_proto_mode = "disable",
importpath = "github.com/mgechev/revive",
sum = "h1:+2Hd/S8oO2H0Ikq2+egtNwQsVhAeELHjxjIUFX5ajLI=",
version = "v1.2.4",
sum = "h1:UF9AR8pOAuwNmhXj2odp4mxv9Nx2qUIwVz8ZsU+Mbec=",
version = "v1.2.5",
)

go_repository(
Expand Down Expand Up @@ -3544,8 +3544,8 @@ def go_deps():
name = "com_github_rivo_uniseg",
build_file_proto_mode = "disable_global",
importpath = "github.com/rivo/uniseg",
sum = "h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8=",
version = "v0.4.2",
sum = "h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=",
version = "v0.4.3",
)
go_repository(
name = "com_github_rlmcpherson_s3gof3r",
Expand Down
14 changes: 7 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ bazel_test: failpoint-enable bazel_ci_prepare

bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock \
--build_event_json_file=bazel_1.json --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

Expand Down Expand Up @@ -444,27 +444,27 @@ bazel_golangcilinter:
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.golangci.yaml

bazel_brietest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/brietest/...

bazel_pessimistictest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/pessimistictest/...

bazel_sessiontest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/sessiontest/...

bazel_statisticstest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/statisticstest/...

bazel_txntest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/txntest/...

bazel_addindextest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv --define gotags=deadlock,intest \
-- //tests/realtikvtest/addindextest/...

bazel_lint: bazel_prepare
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ See the [Get Started](https://pingcap.github.io/tidb-dev-guide/get-started/intro
You can join the following groups or channels to discuss or ask questions about TiDB, and to keep yourself informed of the latest TiDB updates:

- Seek help when you use TiDB
- [TiDB Forum](https://ask.pingcap.com/)
- [Chinese TiDB Forum](https://asktug.com)
- TiDB Forum: [English](https://ask.pingcap.com/), [Chinese](https://asktug.com)
- Slack channels: [#everyone](https://slack.tidb.io/invite?team=tidb-community&channel=everyone&ref=pingcap-tidb) (English), [#tidb-japan](https://slack.tidb.io/invite?team=tidb-community&channel=tidb-japan&ref=github-tidb) (Japanese)
- [Stack Overflow](https://stackoverflow.com/questions/tagged/tidb) (questions tagged with #tidb)
- Discuss TiDB's implementation and design
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ go_test(
"//br/pkg/restore/split",
"//br/pkg/utils",
"//ddl",
"//keyspace",
"//kv",
"//parser",
"//parser/ast",
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type DuplicateManager struct {
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
Expand All @@ -401,6 +402,7 @@ func NewDuplicateManager(
tableName string,
splitCli split.SplitClient,
tikvCli *tikv.KVStore,
tikvCodec tikv.Codec,
errMgr *errormanager.ErrorManager,
sessOpts *kv.SessionOptions,
concurrency int,
Expand All @@ -417,6 +419,7 @@ func NewDuplicateManager(
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
Expand All @@ -439,6 +442,10 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromRowKey(key)
Expand Down Expand Up @@ -504,6 +511,10 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromIndex(indexInfo, key, val)
Expand Down Expand Up @@ -581,6 +592,11 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
putToTaskFunc(ranges, indexInfo)
})
}

// Encode all the tasks
for i := range tasks {
tasks[i].StartKey, tasks[i].EndKey = m.tikvCodec.EncodeRange(tasks[i].StartKey, tasks[i].EndKey)
}
return tasks, nil
}

Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestBuildDupTask(t *testing.T) {
{&lkv.SessionOptions{IndexID: info.Indices[1].ID}, false},
}
for _, tc := range testCases {
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, nil,
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background()))
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/hack"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -1045,6 +1046,8 @@ type Writer struct {
batchSize int64

lastMetaSeq int32

tikvCodec tikv.Codec
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand Down Expand Up @@ -1127,6 +1130,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
return errorEngineClosed
}

for i := range kvs {
kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key)
}

w.Lock()
defer w.Unlock()

Expand Down
54 changes: 35 additions & 19 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che
type local struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine

pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
tikvCodec tikvclient.Codec

localStoreDir string

Expand Down Expand Up @@ -419,6 +420,7 @@ func NewLocalBackend(
g glue.Glue,
maxOpenFiles int,
errorMgr *errormanager.ErrorManager,
keyspaceName string,
) (backend.Backend, error) {
localFile := cfg.TikvImporter.SortedKVDir
rangeConcurrency := cfg.TikvImporter.RangeConcurrency
Expand Down Expand Up @@ -460,8 +462,19 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())

var pdCliForTiKV *tikvclient.CodecPDClient
if keyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
}
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
Expand All @@ -484,13 +497,14 @@ func NewLocalBackend(
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
tikvCodec: tikvCodec,

localStoreDir: localFile,
rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
Expand Down Expand Up @@ -975,6 +989,7 @@ func (local *local) WriteToTiKV(
Start: firstKey,
End: lastKey,
},
ApiVersion: local.tikvCodec.GetAPIVersion(),
}

leaderID := region.Leader.GetId()
Expand Down Expand Up @@ -1676,7 +1691,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand All @@ -1694,7 +1709,7 @@ func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Ta
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -1908,16 +1923,17 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
tikvCodec: tikvCodec,
}
// pre-allocate a long enough buffer to avoid a lot of runtime.growslice
// this can help save about 3% of CPU.
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -332,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, keyspace.CodecV1, 1024, kvBuffer)
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -1290,6 +1291,7 @@ func TestCheckPeersBusy(t *testing.T) {
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
shouldCheckWriteStall: true,
tikvCodec: keyspace.CodecV1,
}

db, tmpPath := makePebbleDB(t, nil)
Expand Down
Loading

0 comments on commit 95bfcd8

Please sign in to comment.