Skip to content

Commit

Permalink
only change subtask size for local ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Oct 26, 2023
1 parent 3be9b60 commit 4e089b9
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
if tblInfo.Partition != nil {
return generatePartitionPlan(tblInfo)
}
return generateNonPartitionPlan(dsp.d, tblInfo, job)
instancesForLocal := 0
if !dsp.GlobalSort {
is, err := dsp.GetEligibleInstances(ctx, gTask)
if err != nil {
return nil, err
}
instancesForLocal = len(is)
}
return generateNonPartitionPlan(dsp.d, tblInfo, job, instancesForLocal)
case StepMergeSort:
res, err := generateMergePlan(taskHandle, gTask, logger)
if err != nil {
Expand Down Expand Up @@ -270,7 +278,8 @@ func generatePartitionPlan(tblInfo *model.TableInfo) (metas [][]byte, err error)
return subTaskMetas, nil
}

func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job) (metas [][]byte, err error) {
func generateNonPartitionPlan(
d *ddl, tblInfo *model.TableInfo, job *model.Job, instanceForLocal int) (metas [][]byte, err error) {
tbl, err := getTable(d.store, job.SchemaID, tblInfo)
if err != nil {
return nil, err
Expand All @@ -293,9 +302,15 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return nil, err
}

// Make subtask large enough to reduce the overhead of local/global flush.
quota := variable.DDLDiskQuota.Load()
regionBatch := int(int64(quota) / int64(config.SplitRegionSize))
regionBatch := 20
if instanceForLocal > 0 {
// Make subtask large enough to reduce the overhead of local/global flush.
quota := variable.DDLDiskQuota.Load()
regionBatch = min(
int(int64(quota)/int64(config.SplitRegionSize)),
len(recordRegionMetas)/instanceForLocal,
)
}

subTaskMetas := make([][]byte, 0, 4)
sort.Slice(recordRegionMetas, func(i, j int) bool {
Expand Down

0 comments on commit 4e089b9

Please sign in to comment.