Skip to content

Commit

Permalink
disttask: add notify when task submitted or changed & refactor test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Dec 28, 2023
1 parent 37b61fb commit 60dbb61
Show file tree
Hide file tree
Showing 23 changed files with 297 additions and 337 deletions.
13 changes: 11 additions & 2 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
if err != nil {
return err
}
err = handle.WaitTask(ctx, task.ID)
err = handle.WaitTaskDoneOrPaused(ctx, task.ID)
if err := w.isReorgRunnable(reorgInfo.Job.ID, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
logutil.BgLogger().Warn("job paused by user", zap.String("category", "ddl"), zap.Error(err))
Expand Down Expand Up @@ -2136,7 +2136,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {

g.Go(func() error {
defer close(done)
err := handle.SubmitAndWaitTask(ctx, taskKey, taskType, concurrency, metaData)
err := submitAndWaitTask(ctx, taskKey, taskType, concurrency, metaData)
failpoint.Inject("pauseAfterDistTaskFinished", func() {
MockDMLExecutionOnTaskFinished()
})
Expand Down Expand Up @@ -2209,6 +2209,15 @@ func (w *worker) updateJobRowCount(taskKey string, jobID int64) {
w.getReorgCtx(jobID).setRowCount(rowCount)
}

// submitAndWaitTask submits a task through handle.SubmitTask and then blocks in
// handle.WaitTaskDoneOrPaused on the ID of the task that was just created.
//
// If the submission itself fails, that error is returned immediately and no
// wait is attempted; otherwise the result of the wait is returned unchanged.
func submitAndWaitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, taskMeta []byte) error {
	submitted, submitErr := handle.SubmitTask(ctx, taskKey, taskType, concurrency, taskMeta)
	if submitErr == nil {
		// Submission succeeded: hand over to the waiter using the new task's ID.
		return handle.WaitTaskDoneOrPaused(ctx, submitted.ID)
	}
	return submitErr
}

func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysicalTableID int64) (int64, kv.Key, kv.Key, error) {
pi := t.Meta().GetPartitionInfo()
if pi == nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ go_test(
],
flaky = True,
race = "off",
shard_count = 29,
shard_count = 25,
deps = [
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
Expand Down
20 changes: 6 additions & 14 deletions pkg/disttask/framework/framework_err_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,23 @@ import (

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/stretchr/testify/require"
)

func TestPlanErr(t *testing.T) {
func TestRetryErrOnNextSubtasksBatch(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanErrSchedulerExt(ctrl, testContext), testContext, nil)
testutil.DispatchTaskAndCheckSuccess(ctx, t, "key1", testContext, nil)
testContext.CallTime = 0
submitTaskAndCheckSuccessForBasic(ctx, t, "key1", testContext)
distContext.Close()
}

func TestRevertPlanErr(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()
testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanErrSchedulerExt(ctrl, testContext), testContext, nil)
testutil.DispatchTaskAndCheckSuccess(ctx, t, "key1", testContext, nil)
testContext.CallTime = 0
distContext.Close()
}

func TestPlanNotRetryableErr(t *testing.T) {
func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 2)
defer ctrl.Finish()

testutil.RegisterTaskMeta(t, ctrl, testutil.GetPlanNotRetryableErrSchedulerExt(ctrl), testContext, nil)
testutil.DispatchTaskAndCheckState(ctx, t, "key1", testContext, proto.TaskStateFailed)
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
require.Equal(t, proto.TaskStateFailed, task.State)
distContext.Close()
}
21 changes: 15 additions & 6 deletions pkg/disttask/framework/framework_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,22 @@
package framework_test

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/stretchr/testify/require"
)

// submitTaskAndCheckSuccessForHA is the HA-test flavour of
// submitTaskAndCheckSuccess: it forwards its arguments together with a fixed
// per-step table (StepOne -> 10, StepTwo -> 5).
//
// NOTE(review): the table presumably holds the expected subtask count for each
// step of the mock HA scheduler — confirm against submitTaskAndCheckSuccess.
func submitTaskAndCheckSuccessForHA(ctx context.Context, t *testing.T, taskKey string, testContext *testutil.TestContext) {
	perStep := map[proto.Step]int{
		proto.StepOne: 10,
		proto.StepTwo: 5,
	}
	submitTaskAndCheckSuccess(ctx, t, taskKey, testContext, perStep)
}

func TestHABasic(t *testing.T) {
ctx, ctrl, testContext, distContext := testutil.InitTestContext(t, 4)
defer ctrl.Finish()
Expand All @@ -30,7 +39,7 @@ func TestHABasic(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockCleanExecutor", "return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager", "4*return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown", "return(\":4000\")"))
testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockCleanExecutor"))
Expand All @@ -45,7 +54,7 @@ func TestHAManyNodes(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockCleanExecutor", "return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager", "30*return()"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown", "return(\":4000\")"))
testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockCleanExecutor"))
Expand All @@ -64,7 +73,7 @@ func TestHAFailInDifferentStage(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown", "return(\":4000\")"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown2", "return()"))

testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown2"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager"))
Expand All @@ -84,7 +93,7 @@ func TestHAFailInDifferentStageManyNodes(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown", "return(\":4000\")"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown2", "return()"))

testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBDown2"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockStopManager"))
Expand All @@ -98,7 +107,7 @@ func TestHAReplacedButRunning(t *testing.T) {

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockHATestSchedulerExt(ctrl), testContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume", "10*return(true)"))
testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume"))
distContext.Close()
}
Expand All @@ -109,7 +118,7 @@ func TestHAReplacedButRunningManyNodes(t *testing.T) {

testutil.RegisterTaskMeta(t, ctrl, testutil.GetMockHATestSchedulerExt(ctrl), testContext, nil)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume", "30*return(true)"))
testutil.DispatchTaskAndCheckSuccess(ctx, t, "😊", testContext, nil)
submitTaskAndCheckSuccessForHA(ctx, t, "😊", testContext)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockTiDBPartitionThenResume"))
distContext.Close()
}
11 changes: 6 additions & 5 deletions pkg/disttask/framework/framework_pause_and_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ func TestFrameworkPauseAndResume(t *testing.T) {
// 1. schedule and pause one running task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "return()"))
testutil.DispatchTaskAndCheckState(ctx, t, "key1", testContext, proto.TaskStatePaused)
task1 := testutil.SubmitAndWaitTask(ctx, t, "key1")
require.Equal(t, proto.TaskStatePaused, task1.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pauseTaskAfterRefreshTask"))
// 4 subtasks scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key1"))
<-scheduler.TestSyncChan
testutil.WaitTaskExit(ctx, t, "key1")
testutil.WaitTaskDoneOrPaused(ctx, t, task1.Key)
CheckSubtasksState(ctx, t, 1, proto.TaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))

Expand All @@ -64,13 +65,13 @@ func TestFrameworkPauseAndResume(t *testing.T) {
// 2. pause pending task.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask", "2*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume", "1*return()"))
testutil.DispatchTaskAndCheckState(ctx, t, "key2", testContext, proto.TaskStatePaused)

task2 := testutil.SubmitAndWaitTask(ctx, t, "key2")
require.Equal(t, proto.TaskStatePaused, task2.State)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/pausePendingTask"))
// 4 subtasks scheduled.
require.NoError(t, handle.ResumeTask(ctx, "key2"))
<-scheduler.TestSyncChan
testutil.WaitTaskExit(ctx, t, "key2")
testutil.WaitTaskDoneOrPaused(ctx, t, task2.Key)
CheckSubtasksState(ctx, t, 1, proto.TaskStateSucceed, 4)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/syncAfterResume"))

Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/framework_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestFrameworkRollback(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/cancelTaskAfterRefreshTask"))
}()

testutil.DispatchTaskAndCheckState(ctx, t, "key1", testContext, proto.TaskStateReverted)
task := testutil.SubmitAndWaitTask(ctx, t, "key1")
require.Equal(t, proto.TaskStateReverted, task.State)
require.Equal(t, int32(2), testContext.RollbackCnt.Load())
testContext.RollbackCnt.Store(0)
distContext.Close()
Expand Down
Loading

0 comments on commit 60dbb61

Please sign in to comment.