diff --git a/pkg/disttask/framework/scheduler/manager.go b/pkg/disttask/framework/scheduler/manager.go index 759e70612ab25..dabb57c273011 100644 --- a/pkg/disttask/framework/scheduler/manager.go +++ b/pkg/disttask/framework/scheduler/manager.go @@ -333,7 +333,12 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { return } scheduler := factory(ctx, m.id, task, m.taskTable) - err := scheduler.Init(ctx) + + taskCtx, taskCancel := context.WithCancelCause(ctx) + m.registerCancelFunc(task.ID, taskCancel) + defer taskCancel(nil) + + err := scheduler.Init(taskCtx) if err != nil { m.logErrAndPersist(err, task.ID) return @@ -380,14 +385,11 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { } switch task.State { case proto.TaskStateRunning: - runCtx, runCancel := context.WithCancelCause(ctx) - m.registerCancelFunc(task.ID, runCancel) - err = scheduler.Run(runCtx, task) - runCancel(nil) + err = scheduler.Run(taskCtx, task) case proto.TaskStatePausing: - err = scheduler.Pause(ctx, task) + err = scheduler.Pause(taskCtx, task) case proto.TaskStateReverting: - err = scheduler.Rollback(ctx, task) + err = scheduler.Rollback(taskCtx, task) } if err != nil { logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err)) diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 2af66f498c26f..99cea1918b995 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -124,9 +124,9 @@ func (*BaseScheduler) Init(_ context.Context) error { func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { defer func() { if r := recover(); r != nil { - logutil.Logger(ctx).Error("BaseScheduler panicked", zap.Any("recover", r), zap.Stack("stack")) + logutil.Logger(s.logCtx).Error("BaseScheduler panicked", zap.Any("recover", r), zap.Stack("stack")) err4Panic := errors.Errorf("%v", r) - err1 := s.updateErrorToSubtask(ctx, task.ID, err4Panic) + err1 := s.updateErrorToSubtask(task.ID, err4Panic) if err == nil { err = err1 } @@ -139,7 +139,7 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { if err == nil { return nil } - return s.updateErrorToSubtask(ctx, task.ID, err) + return s.updateErrorToSubtask(task.ID, err) } func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) (resErr error) { @@ -622,11 +622,11 @@ func (s *BaseScheduler) markSubTaskCanceledOrFailed(ctx context.Context, subtask return false } -func (s *BaseScheduler) updateErrorToSubtask(ctx context.Context, taskID int64, err error) error { +func (s *BaseScheduler) updateErrorToSubtask(taskID int64, err error) error { logger := logutil.Logger(s.logCtx) backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) - err1 := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, - func(ctx context.Context) (bool, error) { + err1 := handle.RunWithRetry(s.logCtx, dispatcher.RetrySQLTimes, backoffer, logger, + func(_ context.Context) (bool, error) { return true, s.taskTable.UpdateErrorToSubtask(s.id, taskID, err) }, ) diff --git a/tests/realtikvtest/addindextest1/BUILD.bazel b/tests/realtikvtest/addindextest1/BUILD.bazel index 2e806d9556b52..cf4b7e113d7ce 100644 --- a/tests/realtikvtest/addindextest1/BUILD.bazel +++ b/tests/realtikvtest/addindextest1/BUILD.bazel @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_test( name = "addindextest1_test", - timeout = "short", + timeout = "long", srcs = [ "disttask_test.go", "main_test.go", @@ -11,6 +11,10 @@ go_test( deps = [ "//pkg/config", "//pkg/ddl", + "//pkg/ddl/util/callback", + "//pkg/disttask/framework/dispatcher", + "//pkg/disttask/framework/proto", + "//pkg/parser/model", "//pkg/testkit", "//tests/realtikvtest", "@com_github_pingcap_failpoint//:failpoint", diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index fa5c07861ad9d..c8e7c603cc2dd 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -20,6 +20,10 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/util/callback" + "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" @@ -118,74 +122,73 @@ func TestAddIndexDistCancel(t *testing.T) { tk.MustExec(`set global tidb_enable_dist_task=0;`) } -// TODO: flaky test which can't find the root cause, will run it later. -// func TestAddIndexDistPauseAndResume(t *testing.T) { -// store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) -// if store.Name() != "TiKV" { -// t.Skip("TiKV store only") -// } - -// tk := testkit.NewTestKit(t, store) -// tk1 := testkit.NewTestKit(t, store) -// tk.MustExec("drop database if exists test;") -// tk.MustExec("create database test;") -// tk.MustExec("use test;") - -// tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 8;") -// tk.MustExec("insert into t values (), (), (), (), (), ()") -// tk.MustExec("insert into t values (), (), (), (), (), ()") -// tk.MustExec("insert into t values (), (), (), (), (), ()") -// tk.MustExec("insert into t values (), (), (), (), (), ()") -// tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;") - -// ddl.MockDMLExecutionAddIndexSubTaskFinish = func() { -// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() -// require.Equal(t, 1, len(row)) -// jobID := row[0][0].(string) -// tk1.MustExec("admin pause ddl jobs " + jobID) -// <-ddl.TestSyncChan -// } - -// dispatcher.MockDMLExecutionOnPausedState = func(task *proto.Task) { -// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() -// require.Equal(t, 1, len(row)) -// jobID := row[0][0].(string) -// tk1.MustExec("admin resume ddl jobs " + jobID) -// } - -// ddl.MockDMLExecutionOnTaskFinished = func() { -// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() -// require.Equal(t, 1, len(row)) -// jobID := row[0][0].(string) -// tk1.MustExec("admin pause ddl jobs " + jobID) -// } - -// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", "3*return(true)")) -// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState", "return(true)")) -// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause", "return()")) -// tk.MustExec(`set global tidb_enable_dist_task=1;`) -// tk.MustExec("alter table t add index idx1(a);") -// tk.MustExec("admin check table t;") -// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish")) -// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState")) - -// // dist task succeed, job paused and resumed. -// var hook = &callback.TestDDLCallback{Do: dom} -// var resumeFunc = func(job *model.Job) { -// if job.IsPaused() { -// row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() -// require.Equal(t, 1, len(row)) -// jobID := row[0][0].(string) -// tk1.MustExec("admin resume ddl jobs " + jobID) -// } -// } -// hook.OnJobUpdatedExported.Store(&resumeFunc) -// dom.DDL().SetHook(hook.Clone()) -// require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished", "1*return(true)")) -// tk.MustExec("alter table t add index idx3(a);") -// tk.MustExec("admin check table t;") -// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished")) -// require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause")) - -// tk.MustExec(`set global tidb_enable_dist_task=0;`) -// } +func TestAddIndexDistPauseAndResume(t *testing.T) { + store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) + if store.Name() != "TiKV" { + t.Skip("TiKV store only") + } + + tk := testkit.NewTestKit(t, store) + tk1 := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists test;") + tk.MustExec("create database test;") + tk.MustExec("use test;") + + tk.MustExec("create table t(a bigint auto_random primary key) partition by hash(a) partitions 8;") + tk.MustExec("insert into t values (), (), (), (), (), ()") + tk.MustExec("insert into t values (), (), (), (), (), ()") + tk.MustExec("insert into t values (), (), (), (), (), ()") + tk.MustExec("insert into t values (), (), (), (), (), ()") + tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;") + + ddl.MockDMLExecutionAddIndexSubTaskFinish = func() { + row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() + require.Equal(t, 1, len(row)) + jobID := row[0][0].(string) + tk1.MustExec("admin pause ddl jobs " + jobID) + <-ddl.TestSyncChan + } + + dispatcher.MockDMLExecutionOnPausedState = func(task *proto.Task) { + row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() + require.Equal(t, 1, len(row)) + jobID := row[0][0].(string) + tk1.MustExec("admin resume ddl jobs " + jobID) + } + + ddl.MockDMLExecutionOnTaskFinished = func() { + row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() + require.Equal(t, 1, len(row)) + jobID := row[0][0].(string) + tk1.MustExec("admin pause ddl jobs " + jobID) + } + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish", "3*return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState", "return(true)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause", "return()")) + tk.MustExec(`set global tidb_enable_dist_task=1;`) + tk.MustExec("alter table t add index idx1(a);") + tk.MustExec("admin check table t;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionAddIndexSubTaskFinish")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/dispatcher/mockDMLExecutionOnPausedState")) + + // dist task succeed, job paused and resumed. + var hook = &callback.TestDDLCallback{Do: dom} + var resumeFunc = func(job *model.Job) { + if job.IsPaused() { + row := tk1.MustQuery("select job_id from mysql.tidb_ddl_job").Rows() + require.Equal(t, 1, len(row)) + jobID := row[0][0].(string) + tk1.MustExec("admin resume ddl jobs " + jobID) + } + } + hook.OnJobUpdatedExported.Store(&resumeFunc) + dom.DDL().SetHook(hook.Clone()) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished", "1*return(true)")) + tk.MustExec("alter table t add index idx3(a);") + tk.MustExec("admin check table t;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/pauseAfterDistTaskFinished")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/syncDDLTaskPause")) + + tk.MustExec(`set global tidb_enable_dist_task=0;`) +}