diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index b09a1abad85ba..2cd903d3bc290 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -111,6 +111,7 @@ go_test(
         "//br/pkg/restore/split",
         "//br/pkg/utils",
         "//ddl",
+        "//keyspace",
         "//kv",
         "//parser",
         "//parser/ast",
diff --git a/br/pkg/lightning/backend/local/duplicate.go b/br/pkg/lightning/backend/local/duplicate.go
index 8877c16ae7740..fe6cd110a026c 100644
--- a/br/pkg/lightning/backend/local/duplicate.go
+++ b/br/pkg/lightning/backend/local/duplicate.go
@@ -387,6 +387,7 @@ type DuplicateManager struct {
 	tableName string
 	splitCli  split.SplitClient
 	tikvCli   *tikv.KVStore
+	tikvCodec tikv.Codec
 	errorMgr  *errormanager.ErrorManager
 	decoder   *kv.TableKVDecoder
 	logger    log.Logger
@@ -401,6 +402,7 @@ func NewDuplicateManager(
 	tableName string,
 	splitCli split.SplitClient,
 	tikvCli *tikv.KVStore,
+	tikvCodec tikv.Codec,
 	errMgr *errormanager.ErrorManager,
 	sessOpts *kv.SessionOptions,
 	concurrency int,
@@ -417,6 +419,7 @@ func NewDuplicateManager(
 		tableName: tableName,
 		splitCli:  splitCli,
 		tikvCli:   tikvCli,
+		tikvCodec: tikvCodec,
 		errorMgr:  errMgr,
 		decoder:   decoder,
 		logger:    logger,
@@ -439,6 +442,10 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D
 		if err != nil {
 			return errors.Trace(err)
 		}
+		key, err = m.tikvCodec.DecodeKey(key)
+		if err != nil {
+			return errors.Trace(err)
+		}
 		m.hasDupe.Store(true)
 
 		h, err := m.decoder.DecodeHandleFromRowKey(key)
@@ -504,6 +511,10 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream
 		if err != nil {
 			return errors.Trace(err)
 		}
+		key, err = m.tikvCodec.DecodeKey(key)
+		if err != nil {
+			return errors.Trace(err)
+		}
 		m.hasDupe.Store(true)
 
 		h, err := m.decoder.DecodeHandleFromIndex(indexInfo, key, val)
@@ -581,6 +592,11 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
 			putToTaskFunc(ranges, indexInfo)
 		})
 	}
+
+	// Encode all the tasks
+	for i := range tasks {
+		tasks[i].StartKey, tasks[i].EndKey = m.tikvCodec.EncodeRange(tasks[i].StartKey, tasks[i].EndKey)
+	}
 	return tasks, nil
 }
 
diff --git a/br/pkg/lightning/backend/local/duplicate_test.go b/br/pkg/lightning/backend/local/duplicate_test.go
index d1db76aae92f8..7c7d8a6182e25 100644
--- a/br/pkg/lightning/backend/local/duplicate_test.go
+++ b/br/pkg/lightning/backend/local/duplicate_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
 	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/ddl"
+	"github.com/pingcap/tidb/keyspace"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
@@ -52,7 +53,7 @@ func TestBuildDupTask(t *testing.T) {
 		{&lkv.SessionOptions{IndexID: info.Indices[1].ID}, false},
 	}
 	for _, tc := range testCases {
-		dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, nil,
+		dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil,
 			tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background()))
 		require.NoError(t, err)
 		tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index 9b757ed91fde4..04ac7dce7a7fe 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -42,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/membuf"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/util/hack"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"golang.org/x/exp/slices"
@@ -1045,6 +1046,8 @@ type Writer struct {
 	batchSize  int64
 
 	lastMetaSeq int32
+
+	tikvCodec tikv.Codec
 }
 
 func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
@@ -1127,6 +1130,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
 		return errorEngineClosed
 	}
 
+	for i := range kvs {
+		kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key)
+	}
+
 	w.Lock()
 	defer w.Unlock()
 
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 5b59ce5b37d65..05ae60c4497d7 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -349,12 +349,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che
 type local struct {
 	engines sync.Map // sync version of map[uuid.UUID]*Engine
 
-	pdCtl    *pdutil.PdController
-	splitCli split.SplitClient
-	tikvCli  *tikvclient.KVStore
-	tls      *common.TLS
-	pdAddr   string
-	g        glue.Glue
+	pdCtl     *pdutil.PdController
+	splitCli  split.SplitClient
+	tikvCli   *tikvclient.KVStore
+	tls       *common.TLS
+	pdAddr    string
+	g         glue.Glue
+	tikvCodec tikvclient.Codec
 
 	localStoreDir string
 
@@ -419,6 +420,7 @@ func NewLocalBackend(
 	g glue.Glue,
 	maxOpenFiles int,
 	errorMgr *errormanager.ErrorManager,
+	keyspaceName string,
 ) (backend.Backend, error) {
 	localFile := cfg.TikvImporter.SortedKVDir
 	rangeConcurrency := cfg.TikvImporter.RangeConcurrency
@@ -460,8 +462,19 @@ func NewLocalBackend(
 	if err != nil {
 		return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
 	}
-	rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
-	pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
+
+	var pdCliForTiKV *tikvclient.CodecPDClient
+	if keyspaceName == "" {
+		pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
+	} else {
+		pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName)
+		if err != nil {
+			return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
+		}
+	}
+
+	tikvCodec := pdCliForTiKV.GetCodec()
+	rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
 	tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
 	if err != nil {
 		return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
@@ -484,13 +497,14 @@ func NewLocalBackend(
 		LastAlloc = alloc
 	}
 	local := &local{
-		engines:  sync.Map{},
-		pdCtl:    pdCtl,
-		splitCli: splitCli,
-		tikvCli:  tikvCli,
-		tls:      tls,
-		pdAddr:   cfg.TiDB.PdAddr,
-		g:        g,
+		engines:   sync.Map{},
+		pdCtl:     pdCtl,
+		splitCli:  splitCli,
+		tikvCli:   tikvCli,
+		tls:       tls,
+		pdAddr:    cfg.TiDB.PdAddr,
+		g:         g,
+		tikvCodec: tikvCodec,
 
 		localStoreDir:    localFile,
 		rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
@@ -975,6 +989,7 @@ func (local *local) WriteToTiKV(
 			Start: firstKey,
 			End:   lastKey,
 		},
+		ApiVersion: local.tikvCodec.GetAPIVersion(),
 	}
 
 	leaderID := region.Leader.GetId()
@@ -1676,7 +1691,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab
 	}()
 	atomicHasDupe := atomic.NewBool(false)
-	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
+	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
 		local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
 	if err != nil {
 		return false, errors.Trace(err)
 	}
@@ -1694,7 +1709,7 @@ func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Ta
 	}()
 	atomicHasDupe := atomic.NewBool(false)
-	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
+	duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
 		local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
 	if err != nil {
 		return false, errors.Trace(err)
 	}
@@ -1908,16 +1923,17 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
 		return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
 	}
 	engine := e.(*Engine)
-	return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
+	return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
 }
 
-func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
+func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
 	w := &Writer{
 		engine:             engine,
 		memtableSizeLimit:  cacheSize,
 		kvBuffer:           kvBuffer,
 		isKVSorted:         cfg.IsKVSorted,
 		isWriteBatchSorted: true,
+		tikvCodec:          tikvCodec,
 	}
 	// pre-allocate a long enough buffer to avoid a lot of runtime.growslice
 	// this can help save about 3% of CPU.
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index a485bdecaca4a..04d63ffb7479b 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -47,6 +47,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/pdutil"
 	"github.com/pingcap/tidb/br/pkg/restore/split"
 	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/keyspace"
 	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/tablecodec"
@@ -332,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
 	pool := membuf.NewPool()
 	defer pool.Destroy()
 	kvBuffer := pool.NewBuffer()
-	w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
+	w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, keyspace.CodecV1, 1024, kvBuffer)
 	require.NoError(t, err)
 
 	ctx := context.Background()
@@ -1290,6 +1291,7 @@ func TestCheckPeersBusy(t *testing.T) {
 		bufferPool:            membuf.NewPool(),
 		supportMultiIngest:    true,
 		shouldCheckWriteStall: true,
+		tikvCodec:             keyspace.CodecV1,
 	}
 
 	db, tmpPath := makePebbleDB(t, nil)
diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go
index 46c38e112b57d..ae83d41efd7f6 100644
--- a/br/pkg/lightning/lightning.go
+++ b/br/pkg/lightning/lightning.go
@@ -412,6 +412,38 @@ var (
 	taskCfgRecorderKey = "taskCfgRecorderKey"
 )
 
+func getKeyspaceName(g glue.Glue) (string, error) {
+	db, err := g.GetDB()
+	if err != nil {
+		return "", err
+	}
+	if db == nil {
+		return "", nil
+	}
+
+	rows, err := db.Query("show config where Type = 'tidb' and name = 'keyspace-name'")
+	if err != nil {
+		return "", err
+	}
+	//nolint: errcheck
+	defer rows.Close()
+
+	var (
+		_type     string
+		_instance string
+		_name     string
+		value     string
+	)
+	if rows.Next() {
+		err = rows.Scan(&_type, &_instance, &_name, &value)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	return value, rows.Err()
+}
+
 func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) {
 	build.LogInfo(build.Lightning)
 	o.logger.Info("cfg", zap.Stringer("cfg", taskCfg))
@@ -541,6 +573,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
 	dbMetas := mdl.GetDatabases()
 	web.BroadcastInitProgress(dbMetas)
 
+	keyspaceName, err := getKeyspaceName(g)
+	if err != nil {
+		o.logger.Error("fail to get keyspace name", zap.Error(err))
+		return errors.Trace(err)
+	}
+	o.logger.Info("acquired keyspace name", zap.String("keyspaceName", keyspaceName))
+
 	param := &restore.ControllerParam{
 		DBMetas: dbMetas,
 		Status:  &l.status,
@@ -550,6 +589,7 @@
 		CheckpointStorage: o.checkpointStorage,
 		CheckpointName:    o.checkpointName,
 		DupIndicator:      o.dupIndicator,
+		KeyspaceName:      keyspaceName,
 	}
 
 	var procedure *restore.Controller
diff --git a/br/pkg/lightning/restore/BUILD.bazel b/br/pkg/lightning/restore/BUILD.bazel
index 06e503e0519db..647c4cea4d191 100644
--- a/br/pkg/lightning/restore/BUILD.bazel
+++ b/br/pkg/lightning/restore/BUILD.bazel
@@ -45,6 +45,7 @@ go_library(
         "//br/pkg/version/build",
         "//ddl",
         "//errno",
+        "//keyspace",
        "//kv",
         "//meta/autoid",
         "//parser",
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 543eddc3435fd..1032419ab777f 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -54,6 +54,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/br/pkg/version"
 	"github.com/pingcap/tidb/br/pkg/version/build"
+	"github.com/pingcap/tidb/keyspace"
 	tidbkv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/parser/model"
@@ -231,6 +232,8 @@ type Controller struct {
 
 	preInfoGetter       PreRestoreInfoGetter
 	precheckItemBuilder *PrecheckItemBuilder
+
+	keyspaceName string
 }
 
 // LightningStatus provides the finished bytes and total bytes of the current task.
@@ -266,6 +269,8 @@ type ControllerParam struct {
 	CheckpointName string
 	// DupIndicator can expose the duplicate detection result to the caller
 	DupIndicator *atomic.Bool
+	// Keyspace name
+	KeyspaceName string
 }
 
 func NewRestoreController(
@@ -353,7 +358,7 @@
 		}
 	}
 
-	backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr)
+	backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr, p.KeyspaceName)
 	if err != nil {
 		return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
 	}
@@ -437,6 +442,8 @@ func NewRestoreControllerWithPauser(
 
 		preInfoGetter:       preInfoGetter,
 		precheckItemBuilder: preCheckBuilder,
+
+		keyspaceName: p.KeyspaceName,
 	}
 
 	return rc, nil
@@ -1500,7 +1507,7 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 
 		// Disable GC because TiDB enables GC already.
 		kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
-			fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr),
+			fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName),
 			driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
 		)
 		if err != nil {
@@ -2333,7 +2340,14 @@ func (cr *chunkRestore) deliverLoop(
 	var startRealOffset, currRealOffset int64 // save to 0 at first
 
 	for hasMoreKVs {
-		var dataChecksum, indexChecksum verify.KVChecksum
+		c := keyspace.CodecV1
+		if t.kvStore != nil {
+			c = t.kvStore.GetCodec()
+		}
+		var (
+			dataChecksum  = verify.NewKVChecksumWithKeyspace(c)
+			indexChecksum = verify.NewKVChecksumWithKeyspace(c)
+		)
 		var columns []string
 		var kvPacket []deliveredKVs
 		// init these two field as checkpoint current value, so even if there are no kv pairs delivered,
@@ -2360,7 +2374,7 @@ func (cr *chunkRestore) deliverLoop(
 				hasMoreKVs = false
 				break populate
 			}
-			p.kvs.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
+			p.kvs.ClassifyAndAppend(&dataKVs, dataChecksum, &indexKVs, indexChecksum)
 			columns = p.columns
 			currOffset = p.offset
 			currRealOffset = p.realOffset
@@ -2427,8 +2441,8 @@ func (cr *chunkRestore) deliverLoop(
 		// No need to apply a lock since this is the only thread updating `cr.chunk.**`.
 		// In local mode, we should write these checkpoints after engine flushed.
 		lastOffset := cr.chunk.Chunk.Offset
-		cr.chunk.Checksum.Add(&dataChecksum)
-		cr.chunk.Checksum.Add(&indexChecksum)
+		cr.chunk.Checksum.Add(dataChecksum)
+		cr.chunk.Checksum.Add(indexChecksum)
 		cr.chunk.Chunk.Offset = currOffset
 		cr.chunk.Chunk.RealOffset = currRealOffset
 		cr.chunk.Chunk.PrevRowIDMax = rowID
diff --git a/br/pkg/lightning/verification/BUILD.bazel b/br/pkg/lightning/verification/BUILD.bazel
index 9f308aeb81e11..122e4b0e8535f 100644
--- a/br/pkg/lightning/verification/BUILD.bazel
+++ b/br/pkg/lightning/verification/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//br/pkg/lightning/common",
+        "@com_github_tikv_client_go_v2//tikv",
        "@org_uber_go_zap//zapcore",
     ],
 )
diff --git a/br/pkg/lightning/verification/checksum.go b/br/pkg/lightning/verification/checksum.go
index 0a44177f80fe4..9eb2130074eec 100644
--- a/br/pkg/lightning/verification/checksum.go
+++ b/br/pkg/lightning/verification/checksum.go
@@ -19,15 +19,18 @@ import (
 	"hash/crc64"
 
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
+	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap/zapcore"
 )
 
 var ecmaTable = crc64.MakeTable(crc64.ECMA)
 
 type KVChecksum struct {
-	bytes    uint64
-	kvs      uint64
-	checksum uint64
+	base      uint64
+	prefixLen int
+	bytes     uint64
+	kvs       uint64
+	checksum  uint64
 }
 
 func NewKVChecksum(checksum uint64) *KVChecksum {
@@ -36,6 +39,14 @@
 	}
 }
 
+func NewKVChecksumWithKeyspace(k tikv.Codec) *KVChecksum {
+	ks := k.GetKeyspace()
+	return &KVChecksum{
+		base:      crc64.Update(0, ecmaTable, ks),
+		prefixLen: len(ks),
+	}
+}
+
 func MakeKVChecksum(bytes uint64, kvs uint64, checksum uint64) KVChecksum {
 	return KVChecksum{
 		bytes:    bytes,
@@ -45,10 +56,10 @@ func MakeKVChecksum(bytes uint64, kvs uint64, checksum uint64) KVChecksum {
 }
 
 func (c *KVChecksum) UpdateOne(kv common.KvPair) {
-	sum := crc64.Update(0, ecmaTable, kv.Key)
+	sum := crc64.Update(c.base, ecmaTable, kv.Key)
 	sum = crc64.Update(sum, ecmaTable, kv.Val)
 
-	c.bytes += uint64(len(kv.Key) + len(kv.Val))
+	c.bytes += uint64(c.prefixLen + len(kv.Key) + len(kv.Val))
 	c.kvs++
 	c.checksum ^= sum
 }
@@ -62,11 +73,12 @@ func (c *KVChecksum) Update(kvs []common.KvPair) {
 	)
 
 	for _, pair := range kvs {
-		sum = crc64.Update(0, ecmaTable, pair.Key)
+		sum = crc64.Update(c.base, ecmaTable, pair.Key)
 		sum = crc64.Update(sum, ecmaTable, pair.Val)
 		checksum ^= sum
 		kvNum++
-		bytes += (len(pair.Key) + len(pair.Val))
+		bytes += c.prefixLen
+		bytes += len(pair.Key) + len(pair.Val)
 	}
 
 	c.bytes += uint64(bytes)
diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go
index 26344359dd6b9..a94639d89d6bb 100644
--- a/ddl/ingest/backend.go
+++ b/ddl/ingest/backend.go
@@ -19,7 +19,7 @@ import (
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
-	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
 	tikv "github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/table"
@@ -33,7 +33,7 @@ type BackendContext struct {
 	jobID    int64
 	backend  *backend.Backend
 	ctx      context.Context
-	cfg      *config.Config
+	cfg      *lightning.Config
 	EngMgr   engineManager
 	sysVars  map[string]string
 	diskRoot DiskRoot
@@ -99,7 +99,7 @@ func (bc *BackendContext) Flush(indexID int64) error {
 		logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
 			zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
 			zap.Uint64("max disk quota", bc.diskRoot.MaxQuota()))
-		err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys))
+		err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
 		if err != nil {
 			logutil.BgLogger().Error(LitErrIngestDataErr, zap.Int64("index ID", indexID), zap.Error(err),
 				zap.Uint64("current disk usage", bc.diskRoot.CurrentUsage()),
diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go
index 14bb4fb3aa67a..49788c9760b58 100644
--- a/ddl/ingest/backend_mgr.go
+++ b/ddl/ingest/backend_mgr.go
@@ -56,7 +56,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int
 	if !ok {
 		return nil, genBackendAllocMemFailedErr(m.memRoot, jobID)
 	}
-	cfg, err := generateLightningConfig(m.memRoot, jobID, unique)
+	cfg, err := genConfig(m.memRoot, jobID, unique)
 	if err != nil {
 		logutil.BgLogger().Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
 		return nil, err
@@ -67,7 +67,7 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int
 		return nil, err
 	}
 
-	bcCtx := newBackendContext(ctx, jobID, &bd, cfg, defaultImportantVariables, m.memRoot, m.diskRoot)
+	bcCtx := newBackendContext(ctx, jobID, &bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot)
 	m.Store(jobID, bcCtx)
 	m.memRoot.Consume(StructSizeBackendCtx)
 
@@ -80,15 +80,16 @@ func (m *backendCtxManager) Register(ctx context.Context, unique bool, jobID int
 	return bc, nil
 }
 
-func createLocalBackend(ctx context.Context, cfg *config.Config, glue glue.Glue) (backend.Backend, error) {
-	tls, err := cfg.ToTLS()
+func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backend.Backend, error) {
+	tls, err := cfg.Lightning.ToTLS()
 	if err != nil {
 		logutil.BgLogger().Error(LitErrCreateBackendFail, zap.Error(err))
 		return backend.Backend{}, err
 	}
 
-	errorMgr := errormanager.New(nil, cfg, log.Logger{Logger: logutil.BgLogger()})
-	return local.NewLocalBackend(ctx, tls, cfg, glue, int(LitRLimit), errorMgr)
+	logutil.BgLogger().Info("[ddl-ingest] create local backend for adding index", zap.String("keyspaceName", cfg.KeyspaceName))
+	errorMgr := errormanager.New(nil, cfg.Lightning, log.Logger{Logger: logutil.BgLogger()})
+	return local.NewLocalBackend(ctx, tls, cfg.Lightning, glue, int(LitRLimit), errorMgr, cfg.KeyspaceName)
 }
 
 func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend,
diff --git a/ddl/ingest/config.go b/ddl/ingest/config.go
index 7fd251a361939..7362c2fbc5823 100644
--- a/ddl/ingest/config.go
+++ b/ddl/ingest/config.go
@@ -20,8 +20,8 @@ import (
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
-	"github.com/pingcap/tidb/br/pkg/lightning/config"
-	tidbconf "github.com/pingcap/tidb/config"
+	lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
+	tidb "github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/size"
 	"go.uber.org/zap"
@@ -30,10 +30,16 @@ import (
 // ImporterRangeConcurrencyForTest is only used for test.
 var ImporterRangeConcurrencyForTest *atomic.Int32
 
-func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) {
-	tidbCfg := tidbconf.GetGlobalConfig()
-	cfg := config.NewConfig()
-	cfg.TikvImporter.Backend = config.BackendLocal
+// Config is the configuration for the lightning local backend used in DDL.
+type Config struct {
+	Lightning    *lightning.Config
+	KeyspaceName string
+}
+
+func genConfig(memRoot MemRoot, jobID int64, unique bool) (*Config, error) {
+	tidbCfg := tidb.GetGlobalConfig()
+	cfg := lightning.NewConfig()
+	cfg.TikvImporter.Backend = lightning.BackendLocal
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID))
 	if ImporterRangeConcurrencyForTest != nil {
@@ -47,9 +53,9 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config
 	adjustImportMemory(memRoot, cfg)
 	cfg.Checkpoint.Enable = true
 	if unique {
-		cfg.TikvImporter.DuplicateResolution = config.DupeResAlgErr
+		cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgErr
 	} else {
-		cfg.TikvImporter.DuplicateResolution = config.DupeResAlgNone
+		cfg.TikvImporter.DuplicateResolution = lightning.DupeResAlgNone
 	}
 	cfg.TiDB.PdAddr = tidbCfg.Path
 	cfg.TiDB.Host = "127.0.0.1"
@@ -59,7 +65,12 @@ func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config
 	cfg.Security.CertPath = tidbCfg.Security.ClusterSSLCert
 	cfg.Security.KeyPath = tidbCfg.Security.ClusterSSLKey
 
-	return cfg, err
+	c := &Config{
+		Lightning:    cfg,
+		KeyspaceName: tidb.GetGlobalKeyspaceName(),
+	}
+
+	return c, err
 }
 
 var (
@@ -83,7 +94,7 @@ func generateLocalEngineConfig(id int64, dbName, tbName string) *backend.EngineC
 }
 
 // adjustImportMemory adjusts the lightning memory parameters according to the memory root's max limitation.
-func adjustImportMemory(memRoot MemRoot, cfg *config.Config) {
+func adjustImportMemory(memRoot MemRoot, cfg *lightning.Config) {
 	var scale int64
 	// Try aggressive resource usage successful.
 	if tryAggressiveMemory(memRoot, cfg) {
@@ -104,8 +115,8 @@ func adjustImportMemory(memRoot MemRoot, cfg *config.Config) {
 		return
 	}
 
-	cfg.TikvImporter.LocalWriterMemCacheSize /= config.ByteSize(scale)
-	cfg.TikvImporter.EngineMemCacheSize /= config.ByteSize(scale)
+	cfg.TikvImporter.LocalWriterMemCacheSize /= lightning.ByteSize(scale)
+	cfg.TikvImporter.EngineMemCacheSize /= lightning.ByteSize(scale)
 	// TODO: adjust range concurrency number to control total concurrency in the future.
 	logutil.BgLogger().Info(LitInfoChgMemSetting,
 		zap.Int64("local writer memory cache size", int64(cfg.TikvImporter.LocalWriterMemCacheSize)),
@@ -114,7 +125,7 @@ func adjustImportMemory(memRoot MemRoot, cfg *config.Config) {
 }
 
 // tryAggressiveMemory lightning memory parameters according memory root's max limitation.
-func tryAggressiveMemory(memRoot MemRoot, cfg *config.Config) bool {
+func tryAggressiveMemory(memRoot MemRoot, cfg *lightning.Config) bool {
 	var defaultMemSize int64
 	defaultMemSize = int64(int(cfg.TikvImporter.LocalWriterMemCacheSize) * cfg.TikvImporter.RangeConcurrency)
 	defaultMemSize += int64(cfg.TikvImporter.EngineMemCacheSize)
diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go
index d732665bb45a4..8a28b91b6cb2c 100644
--- a/store/driver/tikv_driver.go
+++ b/store/driver/tikv_driver.go
@@ -113,7 +113,6 @@ func TrySetupGlobalResourceController(ctx context.Context, serverID uint64, s kv
 
 // TiKVDriver implements engine TiKV.
 type TiKVDriver struct {
-	keyspaceName string
 	pdConfig     config.PDClient
 	security     config.Security
 	tikvConfig   config.TiKVClient
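
Reviewer note, not part of the patch: the change to br/pkg/lightning/verification/checksum.go folds the keyspace prefix into every per-KV CRC. Below is a minimal, self-contained Go sketch of that behaviour; it uses only hash/crc64 from the standard library, and the keyspace bytes are an invented example value rather than anything taken from the patch.

package main

import (
	"fmt"
	"hash/crc64"
)

var ecmaTable = crc64.MakeTable(crc64.ECMA)

// keyspaceChecksum mirrors what NewKVChecksumWithKeyspace plus UpdateOne do in the
// patch: the keyspace prefix is hashed once into a base sum, and each key/value
// pair is then chained onto that base before the caller XOR-folds the results.
func keyspaceChecksum(keyspace, key, val []byte) uint64 {
	base := crc64.Update(0, ecmaTable, keyspace) // precomputed once in the constructor
	sum := crc64.Update(base, ecmaTable, key)
	return crc64.Update(sum, ecmaTable, val)
}

func main() {
	ks := []byte{'x', 0, 0, 1} // hypothetical keyspace prefix, for illustration only
	key, val := []byte("t\x80\x00\x00\x00\x00\x00\x00\x01_r"), []byte("row-value")

	// An empty keyspace leaves the base at 0, so the result degenerates to the
	// original per-KV checksum.
	fmt.Printf("with keyspace:    %016x\n", keyspaceChecksum(ks, key, val))
	fmt.Printf("without keyspace: %016x\n", keyspaceChecksum(nil, key, val))
}

Assuming keyspace.CodecV1 carries an empty keyspace, this also illustrates why the callers that pass CodecV1 keep producing the pre-patch checksum values.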