ddl: support session level tidb_ddl_reorg_worker_cnt and batch_size #55334

Merged: 10 commits, Aug 13, 2024
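The PR threads two session variables, tidb_ddl_reorg_worker_cnt and tidb_ddl_reorg_batch_size, into the DDL job's reorg metadata so each job carries its own hints; a zero hint means "not set" and the code falls back to the global variable. A minimal sketch of that fallback rule (the struct and getters below are stand-ins for model.DDLReorgMeta and the variable package, not the real API):

package main

import "fmt"

// reorgMeta mirrors the two hint fields the PR adds to the job's reorg metadata.
type reorgMeta struct {
	Concurrency int // 0 means the session did not set tidb_ddl_reorg_worker_cnt
	BatchSize   int // 0 means the session did not set tidb_ddl_reorg_batch_size
}

// Stand-ins for variable.GetDDLReorgWorkerCounter and variable.GetDDLReorgBatchSize.
func globalWorkerCnt() int { return 4 }
func globalBatchSize() int { return 256 }

// resolve prefers the session-level hint and falls back to the global value.
func resolve(hint, global int) int {
	if hint > 0 {
		return hint
	}
	return global
}

func main() {
	m := reorgMeta{Concurrency: 8} // worker cnt set in the session, batch size left alone
	fmt.Println(resolve(m.Concurrency, globalWorkerCnt())) // 8: session hint wins
	fmt.Println(resolve(m.BatchSize, globalBatchSize()))   // 256: falls back to the global
}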
21 changes: 15 additions & 6 deletions pkg/ddl/backfilling.go
@@ -178,6 +178,10 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
}

exprCtx := sessCtx.GetExprCtx()
batchCnt := rInfo.ReorgMeta.BatchSize
if batchCnt == 0 {
batchCnt = int(variable.GetDDLReorgBatchSize())
}
return &backfillCtx{
id: id,
ddlCtx: rInfo.d,
@@ -188,7 +192,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
loc: exprCtx.GetEvalCtx().Location(),
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
batchCnt: batchCnt,
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
@@ -415,7 +419,11 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
})

// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
newBatchCnt := job.ReorgMeta.BatchSize
if newBatchCnt == 0 {
newBatchCnt = int(variable.GetDDLReorgBatchSize())
}
w.GetCtx().batchCnt = newBatchCnt
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)

@@ -675,8 +683,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc)
if err != nil {
return errors.Trace(err)
}
@@ -705,16 +714,15 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
bcCtx.AttachCheckpointManager(cpMgr)
}

reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
reorgCtx := dc.getReorgCtx(job.ID)
rowCntListener := &localRowCntListener{
prevPhysicalRowCnt: reorgCtx.getRowCount(),
reorgCtx: dc.getReorgCtx(reorgInfo.Job.ID),
reorgCtx: reorgCtx,
counter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
}

avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
concurrency := int(variable.GetDDLReorgWorkerCounter())

engines, err := bcCtx.Register(indexIDs, uniques, t)
if err != nil {
@@ -725,6 +733,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return errors.Trace(err)
}

concurrency := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
pipe, err := NewAddIndexIngestPipeline(
opCtx,
dc.store,
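In backfilling.go the worker now re-evaluates its batch size before each task ("Change the batch size dynamically"), preferring job.ReorgMeta.BatchSize and otherwise reading the global variable again. A self-contained sketch of that per-task refresh, with illustrative names rather than the real TiDB types:

package main

import "fmt"

var globalBatchSize = 256 // stands in for variable.GetDDLReorgBatchSize()

type worker struct {
	jobHint  int // job.ReorgMeta.BatchSize; 0 = unset
	batchCnt int
}

// handleTask refreshes the batch size before every task so a changed setting
// takes effect without restarting the backfill.
func (w *worker) handleTask(id int) {
	if w.jobHint > 0 {
		w.batchCnt = w.jobHint
	} else {
		w.batchCnt = globalBatchSize
	}
	fmt.Printf("task %d uses batch size %d\n", id, w.batchCnt)
}

func main() {
	w := &worker{} // no session hint on this job
	w.handleTask(1)      // 256
	globalBatchSize = 64 // operator lowers the global variable mid-reorg
	w.handleTask(2)      // 64
}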
9 changes: 8 additions & 1 deletion pkg/ddl/backfilling_dist_executor.go
@@ -147,7 +147,14 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
ddlObj := s.d
discovery := ddlObj.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()

return ingest.LitBackCtxMgr.Register(s.BaseTaskExecutor.Ctx(), job.ID, hasUnique, ddlObj.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
return ingest.LitBackCtxMgr.Register(
s.BaseTaskExecutor.Ctx(),
job.ID, hasUnique,
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
ingest.ResolveConcurrency(job.ReorgMeta.Concurrency),
)
}

func hasUniqueIndex(job *model.Job) (bool, error) {
45 changes: 25 additions & 20 deletions pkg/ddl/backfilling_operators.go
@@ -171,15 +171,11 @@ func NewAddIndexIngestPipeline(
if err != nil {
return nil, err
}
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
@@ -226,11 +222,7 @@ func NewWriteIndexToExternalStoragePipeline(
if err != nil {
return nil, err
}
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize())
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
@@ -248,7 +240,7 @@
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSizePerIndex, reorgMeta)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, nil, rowCntListener)
Expand All @@ -270,6 +262,16 @@ func NewWriteIndexToExternalStoragePipeline(
), nil
}

func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
poolSize := ingest.CopReadChunkPoolSize(hintConc)
batchSize := ingest.CopReadBatchSize(hintBatchSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
}
return srcChkPool
}

// TableScanTask contains the start key and the end key of a region.
type TableScanTask struct {
ID int
@@ -457,19 +459,21 @@ func NewTableScanOperator(
srcChkPool chan *chunk.Chunk,
concurrency int,
cpMgr *ingest.CheckpointManager,
hintBatchSize int,
) *TableScanOperator {
pool := workerpool.NewWorkerPool(
"TableScanOperator",
util.DDL,
concurrency,
func() workerpool.Worker[TableScanTask, IndexRecordChunk] {
return &tableScanWorker{
ctx: ctx,
copCtx: copCtx,
sessPool: sessPool,
se: nil,
srcChkPool: srcChkPool,
cpMgr: cpMgr,
ctx: ctx,
copCtx: copCtx,
sessPool: sessPool,
se: nil,
srcChkPool: srcChkPool,
cpMgr: cpMgr,
hintBatchSize: hintBatchSize,
}
})
return &TableScanOperator{
@@ -484,7 +488,8 @@ type tableScanWorker struct {
se *session.Session
srcChkPool chan *chunk.Chunk

cpMgr *ingest.CheckpointManager
cpMgr *ingest.CheckpointManager
hintBatchSize int
}

func (w *tableScanWorker) HandleTask(task TableScanTask, sender func(IndexRecordChunk)) {
@@ -554,7 +559,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor

func (w *tableScanWorker) getChunk() *chunk.Chunk {
chk := <-w.srcChkPool
newCap := copReadBatchSize()
newCap := ingest.CopReadBatchSize(w.hintBatchSize)
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
}
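createChunkPool parks pre-allocated chunks in a buffered channel sized from the resolved concurrency, and tableScanWorker.getChunk re-checks the desired capacity each time it borrows one. A simplified borrow/return pool in the same style, using plain slices instead of chunk.Chunk:

package main

import "fmt"

// newPool pre-allocates size buffers of capacity batch and parks them in a
// buffered channel, the same borrow/return scheme createChunkPool sets up.
func newPool(size, batch int) chan []int {
	pool := make(chan []int, size)
	for i := 0; i < size; i++ {
		pool <- make([]int, 0, batch)
	}
	return pool
}

// get borrows a buffer and re-allocates it if the wanted capacity changed,
// mirroring what getChunk does when the batch-size hint differs.
func get(pool chan []int, want int) []int {
	buf := <-pool
	if cap(buf) != want {
		buf = make([]int, 0, want)
	}
	return buf
}

func main() {
	pool := newPool(3, 128)
	buf := get(pool, 256) // capacity changed, so a fresh buffer is allocated
	fmt.Println(cap(buf)) // 256
	pool <- buf[:0]       // return it for reuse
}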
13 changes: 10 additions & 3 deletions pkg/ddl/backfilling_scheduler.go
@@ -85,6 +85,10 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
if err != nil {
return nil, err
}
workerCnt := info.ReorgMeta.Concurrency
if workerCnt == 0 {
workerCnt = int(variable.GetDDLReorgWorkerCounter())
}
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
@@ -93,7 +97,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
tbl: tbl,
decodeColMap: decColMap,
jobCtx: jobCtx,
workers: make([]*backfillWorker, 0, variable.GetDDLReorgWorkerCounter()),
workers: make([]*backfillWorker, 0, workerCnt),
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
}, nil
@@ -230,8 +234,11 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
}
}

func (*txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := int(variable.GetDDLReorgWorkerCounter())
func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := b.reorgInfo.ReorgMeta.Concurrency
if workerCnt == 0 {
workerCnt = int(variable.GetDDLReorgWorkerCounter())
}
return min(workerCnt, maxBackfillWorkerSize)
}

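expectedWorkerSize now starts from the per-job concurrency, falls back to the global counter, and still clamps the result to maxBackfillWorkerSize. A small sketch of that resolution order; the cap value here is a placeholder, not TiDB's actual limit:

package main

import "fmt"

const maxBackfillWorkerSize = 16 // placeholder cap, not the real constant's value

// expectedWorkerSize prefers the per-job hint, falls back to the global
// counter, and never exceeds the hard cap.
func expectedWorkerSize(jobHint, globalCnt int) int {
	cnt := jobHint
	if cnt == 0 {
		cnt = globalCnt
	}
	return min(cnt, maxBackfillWorkerSize)
}

func main() {
	fmt.Println(expectedWorkerSize(0, 8))  // 8: global value used
	fmt.Println(expectedWorkerSize(32, 8)) // 16: session hint clamped to the cap
}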
17 changes: 17 additions & 0 deletions pkg/ddl/executor.go
@@ -4755,6 +4755,12 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
@@ -4770,6 +4776,17 @@ func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.
LastReorgMetaFastReorgDisabled = true
})
}

logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
)
return reorgMeta, nil
}

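newReorgMetaFromVariables copies the two session values into ReorgMeta via GetSystemVar and variable.TidbOptInt, defaulting to 0 so an unset variable later resolves to the global value. A rough stand-in for that step, assuming TidbOptInt parses the string and returns the supplied default on failure:

package main

import (
	"fmt"
	"strconv"
)

// optInt mimics the assumed behaviour of variable.TidbOptInt: parse the
// session value and return the supplied default when it is empty or invalid.
func optInt(val string, def int) int {
	n, err := strconv.Atoi(val)
	if err != nil {
		return def
	}
	return n
}

func main() {
	sessionVars := map[string]string{
		"tidb_ddl_reorg_worker_cnt": "8",
		// tidb_ddl_reorg_batch_size deliberately not set in this session
	}
	concurrency := optInt(sessionVars["tidb_ddl_reorg_worker_cnt"], 0)
	batchSize := optInt(sessionVars["tidb_ddl_reorg_batch_size"], 0)
	fmt.Println(concurrency, batchSize) // 8 0; the zero later resolves to the global value
}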
2 changes: 1 addition & 1 deletion pkg/ddl/export_test.go
@@ -47,7 +47,7 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey,
}
opCtx := ddl.NewLocalOperatorCtx(context.Background(), 1)
src := testutil.NewOperatorTestSource(ddl.TableScanTask{1, startKey, endKey})
scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil)
scanOp := ddl.NewTableScanOperator(opCtx, sessPool, copCtx, srcChkPool, 1, nil, 0)
sink := testutil.NewOperatorTestSink[ddl.IndexRecordChunk]()

operator.Compose[ddl.TableScanTask](src, scanOp)
4 changes: 2 additions & 2 deletions pkg/ddl/index.go
@@ -1954,7 +1954,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
if indexInfo.Unique {
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName)
bc, err = ingest.LitBackCtxMgr.Register(ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1)
if err != nil {
return err
}
@@ -2029,7 +2029,7 @@ func (w *worker) executeDistTask(t table.Table, reorgInfo *reorgInfo) error {
})
} else {
job := reorgInfo.Job
workerCntLimit := int(variable.GetDDLReorgWorkerCounter())
workerCntLimit := ingest.ResolveConcurrency(job.ReorgMeta.Concurrency)
cpuCount, err := handle.GetCPUCountOfNode(ctx)
if err != nil {
return err
15 changes: 0 additions & 15 deletions pkg/ddl/index_cop.go
@@ -28,7 +28,6 @@ import (
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -41,20 +40,6 @@ import (
kvutil "github.com/tikv/client-go/v2/util"
)

// copReadBatchSize is the batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
// sending too many cop requests for the same handle range.
func copReadBatchSize() int {
return 10 * int(variable.GetDDLReorgBatchSize())
}

// copReadChunkPoolSize is the size of chunk pool, which
// represents the max concurrent ongoing coprocessor requests.
// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func copReadChunkPoolSize() int {
return 10 * int(variable.GetDDLReorgWorkerCounter())
}

func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
err := se.Begin(context.Background())
if err != nil {
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -49,6 +49,7 @@ type BackendCtxMgr interface {
etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
) (BackendCtx, error)
Unregister(jobID int64)
// EncodeJobSortPath encodes the job ID to the local disk sort path.
@@ -114,6 +115,7 @@ func (m *litBackendCtxMgr) Register(
etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
if exist {
@@ -131,7 +133,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
29 changes: 27 additions & 2 deletions pkg/ddl/ingest/config.go
@@ -48,16 +48,16 @@ func genConfig(
memRoot MemRoot,
unique bool,
resourceGroup string,
concurrency int,
) (*litConfig, error) {
tidbCfg := tidb.GetGlobalConfig()
cfg := lightning.NewConfig()
cfg.TikvImporter.Backend = lightning.BackendLocal
// Each backend will build a single dir in lightning dir.
cfg.TikvImporter.SortedKVDir = jobSortPath
cfg.TikvImporter.RangeConcurrency = concurrency
if ImporterRangeConcurrencyForTest != nil {
cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
} else {
cfg.TikvImporter.RangeConcurrency = int(variable.GetDDLReorgWorkerCounter())
}
err := cfg.AdjustForDDL()
if err != nil {
@@ -91,6 +91,31 @@
return c, nil
}

// ResolveConcurrency gets the concurrency used for ingest mode of adding index.
func ResolveConcurrency(hintConc int) int {
if hintConc > 0 {
return hintConc
}
return int(variable.GetDDLReorgWorkerCounter())
}

// CopReadBatchSize is the batch size of coprocessor read.
// It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid
// sending too many cop requests for the same handle range.
func CopReadBatchSize(hintSize int) int {
if hintSize > 0 {
return hintSize
}
return 10 * int(variable.GetDDLReorgBatchSize())
}

// CopReadChunkPoolSize is the size of chunk pool, which
// represents the max concurrent ongoing coprocessor requests.
// It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func CopReadChunkPoolSize(hintConc int) int {
return ResolveConcurrency(hintConc) * 10
}

// NewDDLTLS creates a common.TLS from the tidb config for DDL.
func NewDDLTLS() (*common.TLS, error) {
tidbCfg := tidb.GetGlobalConfig()
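The three helpers added to this file share one rule: a positive hint wins, otherwise the global variable applies, scaled by 10 on the coprocessor read path to bound request count and in-flight chunks. A quick sketch of how they compose, with fixed numbers standing in for the global system variables:

package main

import "fmt"

const (
	globalWorkerCnt = 4   // stands in for tidb_ddl_reorg_worker_cnt
	globalBatchSize = 256 // stands in for tidb_ddl_reorg_batch_size
)

func resolveConcurrency(hint int) int {
	if hint > 0 {
		return hint
	}
	return globalWorkerCnt
}

func copReadBatchSize(hint int) int {
	if hint > 0 {
		return hint
	}
	return 10 * globalBatchSize // bigger batches to avoid too many cop requests per range
}

func copReadChunkPoolSize(hint int) int {
	return 10 * resolveConcurrency(hint) // bounds concurrent in-flight cop requests
}

func main() {
	// This session set worker cnt = 8 and left batch size unset.
	fmt.Println(resolveConcurrency(8))   // 8
	fmt.Println(copReadChunkPoolSize(8)) // 80
	fmt.Println(copReadBatchSize(0))     // 2560
}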
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/mock.go
@@ -56,7 +56,8 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {
}

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string) (BackendCtx, error) {
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil