ddl: introduce a new system variable to control the `store-write-bwlimit` when ingesting (#57145)

close #57156
CbcWestwolf authored Nov 14, 2024
1 parent ae27921 commit 50dcee7
Showing 8 changed files with 66 additions and 15 deletions.
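Before the per-file hunks: the variable added here can be set like any other TiDB global system variable. A minimal client-side sketch (the DSN and the go-sql-driver import are illustrative assumptions; the variable name and the human-readable size format come from the hunks below):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL-protocol driver
)

func main() {
	// Assumed local TiDB endpoint; adjust the DSN for your deployment.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Cap ingest write bandwidth at 3 MiB/s; human-readable sizes are
	// accepted (parsed with go-units, per the sysvar.go hunk below).
	if _, err := db.Exec("SET @@global.tidb_ddl_reorg_max_write_speed = '3MiB'"); err != nil {
		log.Fatal(err)
	}
}
```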
14 changes: 7 additions & 7 deletions pkg/ddl/ddl.go
@@ -1306,7 +1306,7 @@ func get2JobsFromTable(sess *sess.Session) (*model.Job, *model.Job, error) {
}

// cancelRunningJob cancel a DDL job that is in the concurrent state.
-func cancelRunningJob(_ *sess.Session, job *model.Job,
+func cancelRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
// These states can't be cancelled.
if job.IsDone() || job.IsSynced() {
@@ -1327,7 +1327,7 @@ func cancelRunningJob(_ *sess.Session, job *model.Job,
}

// pauseRunningJob check and pause the running Job
-func pauseRunningJob(_ *sess.Session, job *model.Job,
+func pauseRunningJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if job.IsPausing() || job.IsPaused() {
return dbterror.ErrPausedDDLJob.GenWithStackByArgs(job.ID)
@@ -1346,7 +1346,7 @@ func pauseRunningJob(_ *sess.Session, job *model.Job,
}

// resumePausedJob check and resume the Paused Job
-func resumePausedJob(_ *sess.Session, job *model.Job,
+func resumePausedJob(job *model.Job,
byWho model.AdminCommandOperator) (err error) {
if !job.IsResumable() {
errMsg := fmt.Sprintf("job has not been paused, job state:%s, schema state:%s",
@@ -1368,7 +1368,7 @@ func resumePausedJob(_ *sess.Session, job *model.Job,
// processJobs command on the Job according to the process
func processJobs(
ctx context.Context,
-process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
+process func(*model.Job, model.AdminCommandOperator) (err error),
sessCtx sessionctx.Context,
ids []int64,
byWho model.AdminCommandOperator,
@@ -1415,7 +1415,7 @@ func processJobs(
}
delete(jobMap, job.ID)

-err = process(ns, job, byWho)
+err = process(job, byWho)
if err != nil {
jobErrs[i] = err
continue
@@ -1487,7 +1487,7 @@ func ResumeJobsBySystem(se sessionctx.Context, ids []int64) (errs []error, err e
// processAllJobs processes all the jobs in the job table, 100 jobs at a time in case of high memory usage.
func processAllJobs(
ctx context.Context,
-process func(*sess.Session, *model.Job, model.AdminCommandOperator) (err error),
+process func(*model.Job, model.AdminCommandOperator) (err error),
se sessionctx.Context,
byWho model.AdminCommandOperator,
) (map[int64]error, error) {
@@ -1515,7 +1515,7 @@ func processAllJobs(
}

for _, job := range jobs {
-err = process(ns, job, byWho)
+err = process(job, byWho)
if err != nil {
jobErrs[job.ID] = err
continue
1 change: 1 addition & 0 deletions pkg/ddl/ingest/config.go
@@ -68,6 +68,7 @@ func genConfig(
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
+StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
}
// Each backend will build a single dir in lightning dir.
if ImporterRangeConcurrencyForTest != nil {
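This is the only functional hook into the ingest path: the parsed value is copied into lightning's backend config as StoreWriteBWLimit. For intuition, a byte-rate cap of this kind is typically enforced with a token bucket; the sketch below uses golang.org/x/time/rate to illustrate the general pattern, and is not lightning's actual limiter code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// writeWithLimit illustrates a token-bucket byte limiter: each write of
// len(p) bytes must first reserve that many tokens, so sustained throughput
// stays at roughly the configured bytes-per-second rate.
func writeWithLimit(ctx context.Context, limiter *rate.Limiter, p []byte) error {
	if limiter != nil {
		if err := limiter.WaitN(ctx, len(p)); err != nil {
			return err
		}
	}
	// The actual write to the store would happen here.
	return nil
}

func main() {
	const bwLimit = 1 << 20 // 1 MiB/s
	limiter := rate.NewLimiter(rate.Limit(bwLimit), bwLimit)

	start := time.Now()
	for i := 0; i < 4; i++ {
		_ = writeWithLimit(context.Background(), limiter, make([]byte, 512<<10)) // 512 KiB each
	}
	fmt.Printf("wrote 2 MiB in %v under a 1 MiB/s cap\n", time.Since(start).Round(time.Millisecond))
}
```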
2 changes: 1 addition & 1 deletion pkg/ddl/job_submitter.go
@@ -324,7 +324,7 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error {
setJobStateToQueueing(job)

if s.serverStateSyncer.IsUpgradingState() && !hasSysDB(job) {
-if err = pauseRunningJob(sess.NewSession(se), job, model.AdminCommandBySystem); err != nil {
+if err = pauseRunningJob(job, model.AdminCommandBySystem); err != nil {
logutil.DDLUpgradingLogger().Warn("pause user DDL by system failed", zap.Stringer("job", job), zap.Error(err))
jobW.cacheErr = err
continue
12 changes: 6 additions & 6 deletions pkg/ddl/util/util.go
@@ -49,7 +49,7 @@ const (
completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %?`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?`
deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?`
-loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")"
+loadGlobalVarsSQL = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")"
// KeyOpDefaultTimeout is the default timeout for each key operation.
KeyOpDefaultTimeout = 2 * time.Second
// KeyOpRetryInterval is the interval between two key operations.
@@ -187,20 +187,20 @@ func UpdateDeleteRange(sctx sessionctx.Context, dr DelRangeTask, newStartKey, ol
func LoadDDLReorgVars(ctx context.Context, sctx sessionctx.Context) error {
// close issue #21391
// variable.TiDBRowFormatVersion is used to encode the new row for column type change.
-return LoadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion})
+return loadGlobalVars(ctx, sctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion})
}

// LoadDDLVars loads ddl variable from mysql.global_variables.
func LoadDDLVars(ctx sessionctx.Context) error {
-return LoadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit})
+return loadGlobalVars(context.Background(), ctx, []string{variable.TiDBDDLErrorCountLimit})
}

-// LoadGlobalVars loads global variable from mysql.global_variables.
-func LoadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error {
+// loadGlobalVars loads global variable from mysql.global_variables.
+func loadGlobalVars(ctx context.Context, sctx sessionctx.Context, varNames []string) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
e := sctx.GetRestrictedSQLExecutor()
var buf strings.Builder
-buf.WriteString(loadGlobalVars)
+buf.WriteString(loadGlobalVarsSQL)
paramNames := make([]any, 0, len(varNames))
for i, name := range varNames {
if i > 0 {
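loadGlobalVars builds its query by appending one placeholder per variable name after the renamed loadGlobalVarsSQL prefix. A standalone sketch of that IN-list builder, for illustration only (`%?` is TiDB's internal binding placeholder, and buildINQuery is a hypothetical helper, not a function in this diff):

```go
package main

import (
	"fmt"
	"strings"
)

// buildINQuery appends one placeholder per name, mirroring how
// loadGlobalVars assembles its IN (...) clause.
func buildINQuery(prefix string, names []string) (string, []any) {
	var buf strings.Builder
	buf.WriteString(prefix)
	args := make([]any, 0, len(names))
	for i, name := range names {
		if i > 0 {
			buf.WriteString(", ")
		}
		buf.WriteString("%?") // TiDB's restricted-SQL-executor placeholder
		args = append(args, name)
	}
	buf.WriteString(")")
	return buf.String(), args
}

func main() {
	q, args := buildINQuery(
		"SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (",
		[]string{"tidb_ddl_reorg_worker_cnt", "tidb_ddl_reorg_batch_size"},
	)
	fmt.Println(q, args)
}
```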
3 changes: 2 additions & 1 deletion pkg/executor/test/ddl/BUILD.bazel
@@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
-shard_count = 20,
+shard_count = 21,
deps = [
"//pkg/config",
"//pkg/ddl/schematracker",
@@ -32,6 +32,7 @@ go_test(
"//pkg/util/chunk",
"//pkg/util/dbterror",
"//pkg/util/dbterror/plannererrors",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
26 changes: 26 additions & 0 deletions pkg/executor/test/ddl/ddl_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/schematracker"
ddltestutil "github.com/pingcap/tidb/pkg/ddl/testutil"
@@ -888,6 +889,31 @@ func TestSetDDLErrorCountLimit(t *testing.T) {
res.Check(testkit.Rows("100"))
}

+func TestSetDDLReorgMaxWriteSpeed(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	require.Equal(t, int64(variable.DefTiDBDDLReorgMaxWriteSpeed), variable.DDLReorgMaxWriteSpeed.Load())
+
+	// valid values
+	for _, val := range []int64{1, 0, 100, 1024 * 1024, 2147483647, units.PiB} {
+		tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = %d", val))
+		require.Equal(t, val, variable.DDLReorgMaxWriteSpeed.Load())
+		tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(val, 10)))
+	}
+	for _, val := range []string{"1", "0", "100", "2KB", "3MiB", "4 gb", "2147483647", "1125899906842624" /* 1PiB */} {
+		tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_max_write_speed = '%s'", val))
+		expected, err := units.RAMInBytes(val)
+		require.NoError(t, err)
+		require.Equal(t, expected, variable.DDLReorgMaxWriteSpeed.Load())
+		tk.MustQuery("select @@global.tidb_ddl_reorg_max_write_speed").Check(testkit.Rows(strconv.FormatInt(expected, 10)))
+	}
+
+	// invalid values
+	tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = -1")
+	tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = invalid_val")
+	tk.MustExecToErr("set @@global.tidb_ddl_reorg_max_write_speed = %d", units.PiB+1)
+}

func TestLoadDDLDistributeVars(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
18 changes: 18 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
@@ -26,6 +26,7 @@ import (
"sync/atomic"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/executor/join/joinversion"
@@ -782,6 +783,23 @@ var defaultSysVars = []*SysVar{
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
return nil
}},
+	{Scope: ScopeGlobal, Name: TiDBDDLReorgMaxWriteSpeed, Value: strconv.Itoa(DefTiDBDDLReorgMaxWriteSpeed), Type: TypeStr,
+		SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+			i64, err := units.RAMInBytes(val)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			if i64 < 0 || i64 > units.PiB {
+				// Here we limit the max value to 1 PiB instead of math.MaxInt64, since:
+				// 1. it is large enough
+				// 2. units.RAMInBytes would first cast the size to a float, and may lose precision when the size is too large
+				return fmt.Errorf("invalid value for '%d', it should be within [%d, %d]", i64, 0, units.PiB)
+			}
+			DDLReorgMaxWriteSpeed.Store(i64)
+			return nil
+		}, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
+			return strconv.FormatInt(DDLReorgMaxWriteSpeed.Load(), 10), nil
+		}},
{Scope: ScopeGlobal, Name: TiDBDDLErrorCountLimit, Value: strconv.Itoa(DefTiDBDDLErrorCountLimit), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
SetDDLErrorCountLimit(TidbOptInt64(val, DefTiDBDDLErrorCountLimit))
return nil
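The SetGlobal hook above accepts both plain integers and human-readable sizes via units.RAMInBytes, then rejects anything outside [0, units.PiB]. A small standalone sketch of the parser's behavior (the printed values are assumptions based on go-units' binary, 1024-based parsing rules, not output captured from this repo):

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// RAMInBytes parses human-readable sizes with binary multipliers,
	// so "2KB" is treated as 2 KiB (2048 bytes). Plain integers pass
	// through unchanged.
	for _, s := range []string{"100", "2KB", "3MiB", "4 gb", "1PiB"} {
		n, err := units.RAMInBytes(s)
		fmt.Println(s, "=>", n, err)
	}
	// Negative or over-PiB results are then rejected by the SetGlobal
	// hook shown in the hunk above.
}
```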
5 changes: 5 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
@@ -525,6 +525,9 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

+// TiDBDDLReorgMaxWriteSpeed defines the max write limitation for the lightning local backend
+TiDBDDLReorgMaxWriteSpeed = "tidb_ddl_reorg_max_write_speed"

// TiDBEnableAutoIncrementInGenerated disables the mysql compatibility check on using auto-incremented columns in
// expression indexes and generated columns described here https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html for details.
TiDBEnableAutoIncrementInGenerated = "tidb_enable_auto_increment_in_generated"
@@ -1324,6 +1327,7 @@ const (
DefTiDBDDLReorgBatchSize = 256
DefTiDBDDLFlashbackConcurrency = 64
DefTiDBDDLErrorCountLimit = 512
+DefTiDBDDLReorgMaxWriteSpeed = 0
DefTiDBMaxDeltaSchemaCount = 1024
DefTiDBPlacementMode = PlacementModeStrict
DefTiDBEnableAutoIncrementInGenerated = false
@@ -1590,6 +1594,7 @@ var (
ddlFlashbackConcurrency int32 = DefTiDBDDLFlashbackConcurrency
ddlErrorCountLimit int64 = DefTiDBDDLErrorCountLimit
ddlReorgRowFormat int64 = DefTiDBRowFormatV2
+DDLReorgMaxWriteSpeed = atomic.NewInt64(DefTiDBDDLReorgMaxWriteSpeed)
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
// DDLSlowOprThreshold is the threshold for ddl slow operations, unit is millisecond.
DDLSlowOprThreshold = config.GetGlobalConfig().Instance.DDLSlowOprThreshold
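DDLReorgMaxWriteSpeed is a package-level atomic (the NewInt64 constructor suggests go.uber.org/atomic, which TiDB uses elsewhere), so SET GLOBAL can update it while reorg workers read it lock-free when building the ingest config. A minimal sketch of that pattern, with hypothetical names:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// Package-level knob mirroring DDLReorgMaxWriteSpeed above; 0 is the default.
var maxWriteSpeed = atomic.NewInt64(0)

func main() {
	// SET GLOBAL path: store the freshly parsed value.
	maxWriteSpeed.Store(3 << 20) // 3 MiB/s

	// Ingest path: read the current cap lock-free, as genConfig does
	// when it fills in StoreWriteBWLimit.
	fmt.Println("StoreWriteBWLimit would be:", int(maxWriteSpeed.Load()))
}
```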
