diff --git a/Makefile b/Makefile
index 321c05fd636c7..6d15aaef2fbe5 100644
--- a/Makefile
+++ b/Makefile
@@ -385,7 +385,7 @@ mock_lightning: tools/bin/mockgen
 gen_mock: tools/bin/mockgen
 	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,Pool,Scheduler,Extension > disttask/framework/mock/scheduler_mock.go
-	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher > disttask/framework/mock/dispatcher_mock.go
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/dispatcher Dispatcher,CleanUpRoutine > disttask/framework/mock/dispatcher_mock.go
 	tools/bin/mockgen -package execute github.com/pingcap/tidb/disttask/framework/scheduler/execute SubtaskExecutor > disttask/framework/mock/execute/execute_mock.go
 	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/importinto MiniTaskExecutor > disttask/importinto/mock/import_mock.go
 	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel
index b35206b6be329..771a8a5708cbe 100644
--- a/ddl/BUILD.bazel
+++ b/ddl/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
     name = "ddl",
     srcs = [
         "backfilling.go",
+        "backfilling_clean_s3.go",
         "backfilling_dispatcher.go",
         "backfilling_dist_scheduler.go",
         "backfilling_import_cloud.go",
diff --git a/ddl/backfilling_clean_s3.go b/ddl/backfilling_clean_s3.go
new file mode 100644
index 0000000000000..24523e94653c5
--- /dev/null
+++ b/ddl/backfilling_clean_s3.go
@@ -0,0 +1,83 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
+	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/disttask/framework/dispatcher"
+	"github.com/pingcap/tidb/disttask/framework/proto"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+var _ dispatcher.CleanUpRoutine = (*BackfillCleanUpS3)(nil)
+
+// BackfillCleanUpS3 implements dispatcher.CleanUpRoutine.
+type BackfillCleanUpS3 struct {
+}
+
+func newBackfillCleanUpS3() dispatcher.CleanUpRoutine {
+	return &BackfillCleanUpS3{}
+}
+
+// CleanUp implements the CleanUpRoutine.CleanUp interface.
+func (*BackfillCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error {
+	var gTaskMeta BackfillGlobalMeta
+	if err := json.Unmarshal(task.Meta, &gTaskMeta); err != nil {
+		return err
+	}
+	// Cloud storage is not used, so there is nothing to clean up.
+ if len(gTaskMeta.CloudStorageURI) == 0 { + return nil + } + backend, err := storage.ParseBackend(gTaskMeta.CloudStorageURI, nil) + if err != nil { + logutil.Logger(ctx).Warn("failed to parse cloud storage uri", zap.Error(err)) + return err + } + extStore, err := storage.NewWithDefaultOpt(ctx, backend) + if err != nil { + logutil.Logger(ctx).Warn("failed to create cloud storage", zap.Error(err)) + return err + } + prefix := strconv.Itoa(int(gTaskMeta.Job.ID)) + err = external.CleanUpFiles(ctx, extStore, prefix) + if err != nil { + logutil.Logger(ctx).Warn("cannot cleanup cloud storage files", zap.Error(err)) + return err + } + redactCloudStorageURI(ctx, task, &gTaskMeta) + return nil +} + +func redactCloudStorageURI( + ctx context.Context, + gTask *proto.Task, + origin *BackfillGlobalMeta, +) { + origin.CloudStorageURI = ast.RedactURL(origin.CloudStorageURI) + metaBytes, err := json.Marshal(origin) + if err != nil { + logutil.Logger(ctx).Warn("failed to marshal task meta", zap.Error(err)) + return + } + gTask.Meta = metaBytes +} diff --git a/ddl/backfilling_dispatcher.go b/ddl/backfilling_dispatcher.go index 63045ae0a3e94..ccbac822d4ef4 100644 --- a/ddl/backfilling_dispatcher.go +++ b/ddl/backfilling_dispatcher.go @@ -21,7 +21,6 @@ import ( "encoding/json" "math" "sort" - "strconv" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" @@ -34,7 +33,6 @@ import ( "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/table" @@ -81,13 +79,6 @@ func (h *backfillingDispatcherExt) OnNextSubtasksBatch( job := &gTaskMeta.Job useExtStore := len(gTaskMeta.CloudStorageURI) > 0 - defer func() { - // Only redact when the task is complete. - if len(taskMeta) == 0 && useExtStore { - cleanupCloudStorageFiles(ctx, &gTaskMeta) - redactCloudStorageURI(ctx, gTask, &gTaskMeta) - } - }() tblInfo, err := getTblInfo(h.d, job) if err != nil { @@ -182,7 +173,7 @@ func (h *backfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ * return serverInfos, nil } -// IsRetryableErr implements TaskFlowHandle.IsRetryableErr interface. +// IsRetryableErr implements dispatcher.Extension.IsRetryableErr interface. 
func (*backfillingDispatcherExt) IsRetryableErr(error) bool { return true } @@ -494,36 +485,3 @@ func getSummaryFromLastStep( } return minKey, maxKey, totalKVSize, allDataFiles, allStatFiles, nil } - -func cleanupCloudStorageFiles(ctx context.Context, gTaskMeta *BackfillGlobalMeta) { - backend, err := storage.ParseBackend(gTaskMeta.CloudStorageURI, nil) - if err != nil { - logutil.Logger(ctx).Warn("failed to parse cloud storage uri", zap.Error(err)) - return - } - extStore, err := storage.NewWithDefaultOpt(ctx, backend) - if err != nil { - logutil.Logger(ctx).Warn("failed to create cloud storage", zap.Error(err)) - return - } - prefix := strconv.Itoa(int(gTaskMeta.Job.ID)) - err = external.CleanUpFiles(ctx, extStore, prefix) - if err != nil { - logutil.Logger(ctx).Warn("cannot cleanup cloud storage files", zap.Error(err)) - return - } -} - -func redactCloudStorageURI( - ctx context.Context, - gTask *proto.Task, - origin *BackfillGlobalMeta, -) { - origin.CloudStorageURI = ast.RedactURL(origin.CloudStorageURI) - metaBytes, err := json.Marshal(origin) - if err != nil { - logutil.Logger(ctx).Warn("fail to marshal task meta", zap.Error(err)) - return - } - gTask.Meta = metaBytes -} diff --git a/ddl/ddl.go b/ddl/ddl.go index b3e155bf746ed..a288fa1fd9e55 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -706,6 +706,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher { return newLitBackfillDispatcher(ctx, taskMgr, serverID, task, backFillDsp) }) + dispatcher.RegisterDispatcherCleanUpFactory(BackfillTaskType, newBackfillCleanUpS3) } // Register functions for enable/disable ddl when changing system variable `tidb_enable_ddl`. diff --git a/disttask/framework/BUILD.bazel b/disttask/framework/BUILD.bazel index 193811f8781b7..a974d5bb41ff3 100644 --- a/disttask/framework/BUILD.bazel +++ b/disttask/framework/BUILD.bazel @@ -13,7 +13,7 @@ go_test( ], flaky = True, race = "off", - shard_count = 30, + shard_count = 31, deps = [ "//disttask/framework/dispatcher", "//disttask/framework/handle", diff --git a/disttask/framework/dispatcher/BUILD.bazel b/disttask/framework/dispatcher/BUILD.bazel index 7389f57d380ad..bb4f1206363f0 100644 --- a/disttask/framework/dispatcher/BUILD.bazel +++ b/disttask/framework/dispatcher/BUILD.bazel @@ -37,7 +37,7 @@ go_test( embed = [":dispatcher"], flaky = True, race = "off", - shard_count = 13, + shard_count = 14, deps = [ "//disttask/framework/mock", "//disttask/framework/proto", diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index b3c2502369d16..bcc1654be0c0f 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -39,8 +39,6 @@ const ( MaxSubtaskConcurrency = 256 // DefaultLiveNodesCheckInterval is the tick interval of fetching all server infos from etcd. DefaultLiveNodesCheckInterval = 2 - // defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table. - defaultHistorySubtaskTableGcInterval = 24 * time.Hour ) var ( diff --git a/disttask/framework/dispatcher/dispatcher_manager.go b/disttask/framework/dispatcher/dispatcher_manager.go index 25ea508aeec86..84445117ab0af 100644 --- a/disttask/framework/dispatcher/dispatcher_manager.go +++ b/disttask/framework/dispatcher/dispatcher_manager.go @@ -32,8 +32,13 @@ import ( var ( // DefaultDispatchConcurrency is the default concurrency for dispatching task. 
- DefaultDispatchConcurrency = 3 - checkTaskRunningInterval = 3 * time.Second + DefaultDispatchConcurrency = 4 + // checkTaskRunningInterval is the interval for loading tasks. + checkTaskRunningInterval = 3 * time.Second + // defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table. + defaultHistorySubtaskTableGcInterval = 24 * time.Hour + // defaultCleanUpInterval is the interval of cleanUp routine. + defaultCleanUpInterval = 10 * time.Minute ) // WaitTaskFinished is used to sync the test. @@ -89,20 +94,20 @@ type Manager struct { inited bool serverID string + finishCh chan struct{} + runningTasks struct { syncutil.RWMutex taskIDs map[int64]struct{} dispatchers map[int64]Dispatcher } - finishedTaskCh chan *proto.Task } // NewManager creates a dispatcher struct. func NewManager(ctx context.Context, taskTable *storage.TaskManager, serverID string) (*Manager, error) { dispatcherManager := &Manager{ - taskMgr: taskTable, - finishedTaskCh: make(chan *proto.Task, DefaultDispatchConcurrency), - serverID: serverID, + taskMgr: taskTable, + serverID: serverID, } gPool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true)) if err != nil { @@ -112,6 +117,7 @@ func NewManager(ctx context.Context, taskTable *storage.TaskManager, serverID st dispatcherManager.ctx, dispatcherManager.cancel = context.WithCancel(ctx) dispatcherManager.runningTasks.taskIDs = make(map[int64]struct{}) dispatcherManager.runningTasks.dispatchers = make(map[int64]Dispatcher) + dispatcherManager.finishCh = make(chan struct{}, DefaultDispatchConcurrency) return dispatcherManager, nil } @@ -119,7 +125,8 @@ func NewManager(ctx context.Context, taskTable *storage.TaskManager, serverID st // Start the dispatcherManager, start the dispatchTaskLoop to start multiple dispatchers. func (dm *Manager) Start() { dm.wg.Run(dm.dispatchTaskLoop) - dm.wg.Run(dm.gcSubtaskHistoryTable) + dm.wg.Run(dm.gcSubtaskHistoryTableLoop) + dm.wg.Run(dm.cleanUpLoop) dm.inited = true } @@ -130,6 +137,7 @@ func (dm *Manager) Stop() { dm.wg.Wait() dm.clearRunningTasks() dm.inited = false + close(dm.finishCh) } // Inited check the manager inited. @@ -179,7 +187,7 @@ func (dm *Manager) dispatchTaskLoop() { // see startDispatcher. // this should not happen normally, unless user modify system table // directly. 
-		if GetDispatcherFactory(task.Type) == nil {
+		if getDispatcherFactory(task.Type) == nil {
 			logutil.BgLogger().Warn("unknown task type", zap.Int64("task-id", task.ID),
 				zap.String("task-type", task.Type))
 			dm.failTask(task, errors.New("unknown task type"))
@@ -212,7 +220,7 @@ func (dm *Manager) failTask(task *proto.Task, err error) {
 	}
 }
 
-func (dm *Manager) gcSubtaskHistoryTable() {
+func (dm *Manager) gcSubtaskHistoryTableLoop() {
 	historySubtaskTableGcInterval := defaultHistorySubtaskTableGcInterval
 	failpoint.Inject("historySubtaskTableGcInterval", func(val failpoint.Value) {
 		if seconds, ok := val.(int); ok {
@@ -222,7 +230,7 @@
 		<-WaitTaskFinished
 	})
 
-	logutil.Logger(dm.ctx).Info("task table gc loop start")
+	logutil.Logger(dm.ctx).Info("subtask table gc loop start")
 	ticker := time.NewTicker(historySubtaskTableGcInterval)
 	defer ticker.Stop()
 	for {
@@ -231,11 +239,11 @@
 			logutil.BgLogger().Info("subtask history table gc loop exits", zap.Error(dm.ctx.Err()))
 			return
 		case <-ticker.C:
-			err := dm.taskMgr.GC()
+			err := dm.taskMgr.GCSubtasks()
 			if err != nil {
 				logutil.BgLogger().Warn("subtask history table gc failed", zap.Error(err))
 			} else {
-				logutil.Logger(dm.ctx).Info("subtask history table gc")
+				logutil.Logger(dm.ctx).Info("subtask history table gc success")
 			}
 		}
 	}
@@ -253,7 +261,7 @@ func (*Manager) checkConcurrencyOverflow(cnt int) bool {
 func (dm *Manager) startDispatcher(task *proto.Task) {
 	// Using the pool with block, so it wouldn't return an error.
 	_ = dm.gPool.Run(func() {
-		dispatcherFactory := GetDispatcherFactory(task.Type)
+		dispatcherFactory := getDispatcherFactory(task.Type)
 		dispatcher := dispatcherFactory(dm.ctx, dm.taskMgr, dm.serverID, task)
 		if err := dispatcher.Init(); err != nil {
 			logutil.BgLogger().Error("init dispatcher failed", zap.Error(err))
@@ -264,9 +272,81 @@ func (dm *Manager) startDispatcher(task *proto.Task) {
 		dm.setRunningTask(task, dispatcher)
 		dispatcher.ExecuteTask()
 		dm.delRunningTask(task.ID)
+		dm.finishCh <- struct{}{}
 	})
 }
 
+func (dm *Manager) cleanUpLoop() {
+	logutil.Logger(dm.ctx).Info("cleanUp loop start")
+	ticker := time.NewTicker(defaultCleanUpInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-dm.ctx.Done():
+			logutil.BgLogger().Info("cleanUp loop exits", zap.Error(dm.ctx.Err()))
+			return
+		case <-dm.finishCh:
+			dm.doCleanUpRoutine()
+		case <-ticker.C:
+			dm.doCleanUpRoutine()
+		}
+	}
+}
+
+// WaitCleanUpFinished is used to sync the test.
+var WaitCleanUpFinished = make(chan struct{})
+
+// doCleanUpRoutine processes the clean up routine defined by each type of task.
+// For example:
+//
+//	tasks with global sort should clean up tmp files stored on S3.
+func (dm *Manager) doCleanUpRoutine() { + logutil.Logger(dm.ctx).Info("cleanUp routine start") + tasks, err := dm.taskMgr.GetGlobalTasksInStates( + proto.TaskStateFailed, + proto.TaskStateReverted, + proto.TaskStateSucceed, + ) + if err != nil { + logutil.BgLogger().Warn("cleanUp routine failed", zap.Error(err)) + return + } + err = dm.cleanUpFinishedTasks(tasks) + if err != nil { + logutil.BgLogger().Warn("cleanUp routine failed", zap.Error(err)) + return + } + failpoint.Inject("WaitCleanUpFinished", func() { + WaitCleanUpFinished <- struct{}{} + }) + logutil.Logger(dm.ctx).Info("cleanUp routine success") +} + +func (dm *Manager) cleanUpFinishedTasks(tasks []*proto.Task) error { + cleanedTasks := make([]*proto.Task, 0) + var firstErr error + for _, task := range tasks { + cleanUpFactory := getDispatcherCleanUpFactory(task.Type) + if cleanUpFactory != nil { + cleanUp := cleanUpFactory() + err := cleanUp.CleanUp(dm.ctx, task) + if err != nil { + firstErr = err + break + } + cleanedTasks = append(cleanedTasks, task) + } else { + // if task doesn't register cleanUp function, mark it as cleaned. + cleanedTasks = append(cleanedTasks, task) + } + } + if firstErr != nil { + logutil.BgLogger().Warn("cleanUp routine failed", zap.Error(errors.Trace(firstErr))) + } + + return dm.taskMgr.TransferTasks2History(cleanedTasks) +} + // MockDispatcher mock one dispatcher for one task, only used for tests. func (dm *Manager) MockDispatcher(task *proto.Task) *BaseDispatcher { return NewBaseDispatcher(dm.ctx, dm.taskMgr, dm.serverID, task) diff --git a/disttask/framework/dispatcher/dispatcher_test.go b/disttask/framework/dispatcher/dispatcher_test.go index ae9117065308e..b3f7ef9e62de7 100644 --- a/disttask/framework/dispatcher/dispatcher_test.go +++ b/disttask/framework/dispatcher/dispatcher_test.go @@ -343,6 +343,11 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, if len(tasks) == taskCnt { break } + historyTasks, err := mgr.GetGlobalTasksFromHistoryInStates(expectedState) + require.NoError(t, err) + if len(tasks)+len(historyTasks) == taskCnt { + break + } time.Sleep(time.Millisecond * 50) } require.Less(t, i, cnt) @@ -423,7 +428,6 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel, } checkGetTaskState(proto.TaskStateReverted) require.Len(t, tasks, taskCnt) - checkGetRunningTaskCnt(0) } func TestSimple(t *testing.T) { @@ -487,3 +491,70 @@ func TestVerifyTaskStateTransform(t *testing.T) { require.Equal(t, tc.expect, dispatcher.VerifyTaskStateTransform(tc.oldState, tc.newState)) } } + +func TestCleanUpRoutine(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/MockDisableDistTask", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/MockDisableDistTask")) + }() + store := testkit.CreateMockStore(t) + gtk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return gtk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dsp, mgr := MockDispatcherManager(t, pool) + mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl) + mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + dispatcher.RegisterDispatcherFactory(proto.TaskTypeExample, + func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) dispatcher.Dispatcher { + mockDispatcher := dsp.MockDispatcher(task) + 
mockDispatcher.Extension = &numberExampleDispatcherExt{} + return mockDispatcher + }) + dispatcher.RegisterDispatcherCleanUpFactory(proto.TaskTypeExample, + func() dispatcher.CleanUpRoutine { + return mockCleanupRountine + }) + dsp.Start() + defer dsp.Stop() + require.NoError(t, mgr.StartManager(":4000", "background")) + + taskID, err := mgr.AddNewGlobalTask("test", proto.TaskTypeExample, 1, nil) + require.NoError(t, err) + + checkTaskRunningCnt := func() []*proto.Task { + var tasks []*proto.Task + require.Eventually(t, func() bool { + var err error + tasks, err = mgr.GetGlobalTasksInStates(proto.TaskStateRunning) + require.NoError(t, err) + return len(tasks) == 1 + }, time.Second, 50*time.Millisecond) + return tasks + } + + checkSubtaskCnt := func(tasks []*proto.Task, taskID int64) { + require.Eventually(t, func() bool { + cnt, err := mgr.GetSubtaskInStatesCnt(taskID, proto.TaskStatePending) + require.NoError(t, err) + return int64(subtaskCnt) == cnt + }, time.Second, 50*time.Millisecond) + } + + tasks := checkTaskRunningCnt() + checkSubtaskCnt(tasks, taskID) + for i := 1; i <= subtaskCnt; i++ { + err = mgr.UpdateSubtaskStateAndError(int64(i), proto.TaskStateSucceed, nil) + require.NoError(t, err) + } + dsp.DoCleanUpRoutine() + require.Eventually(t, func() bool { + tasks, err := mgr.GetGlobalTasksFromHistoryInStates(proto.TaskStateSucceed) + require.NoError(t, err) + return len(tasks) != 0 + }, time.Second*10, time.Millisecond*300) +} diff --git a/disttask/framework/dispatcher/interface.go b/disttask/framework/dispatcher/interface.go index ebbd1c3e2f167..7a9b627ae257a 100644 --- a/disttask/framework/dispatcher/interface.go +++ b/disttask/framework/dispatcher/interface.go @@ -59,14 +59,14 @@ type Extension interface { GetNextStep(h TaskHandle, task *proto.Task) int64 } -// FactoryFn is used to create a dispatcher. -type FactoryFn func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) Dispatcher +// dispatcherFactoryFn is used to create a dispatcher. +type dispatcherFactoryFn func(ctx context.Context, taskMgr *storage.TaskManager, serverID string, task *proto.Task) Dispatcher var dispatcherFactoryMap = struct { syncutil.RWMutex - m map[string]FactoryFn + m map[string]dispatcherFactoryFn }{ - m: make(map[string]FactoryFn), + m: make(map[string]dispatcherFactoryFn), } // RegisterDispatcherFactory is used to register the dispatcher factory. @@ -75,22 +75,59 @@ var dispatcherFactoryMap = struct { // after the server start, there's should be no write to the map. // but for index backfill, the register call stack is so deep, not sure // if it's safe to do so, so we use a lock here. -func RegisterDispatcherFactory(taskType string, ctor FactoryFn) { +func RegisterDispatcherFactory(taskType string, ctor dispatcherFactoryFn) { dispatcherFactoryMap.Lock() defer dispatcherFactoryMap.Unlock() dispatcherFactoryMap.m[taskType] = ctor } -// GetDispatcherFactory is used to get the dispatcher factory. -func GetDispatcherFactory(taskType string) FactoryFn { +// getDispatcherFactory is used to get the dispatcher factory. +func getDispatcherFactory(taskType string) dispatcherFactoryFn { dispatcherFactoryMap.RLock() defer dispatcherFactoryMap.RUnlock() return dispatcherFactoryMap.m[taskType] } -// ClearDispatcherFactory is only used in test +// ClearDispatcherFactory is only used in test. 
 func ClearDispatcherFactory() {
 	dispatcherFactoryMap.Lock()
 	defer dispatcherFactoryMap.Unlock()
-	dispatcherFactoryMap.m = make(map[string]FactoryFn)
+	dispatcherFactoryMap.m = make(map[string]dispatcherFactoryFn)
+}
+
+// CleanUpRoutine is used for the framework to do some clean up work if the task is finished.
+type CleanUpRoutine interface {
+	// CleanUp does the clean up work.
+	CleanUp(ctx context.Context, task *proto.Task) error
+}
+type cleanUpFactoryFn func() CleanUpRoutine
+
+var cleanUpFactoryMap = struct {
+	syncutil.RWMutex
+	m map[string]cleanUpFactoryFn
+}{
+	m: make(map[string]cleanUpFactoryFn),
+}
+
+// RegisterDispatcherCleanUpFactory is used to register the dispatcher clean up factory.
+// Normally the registered routine is invoked by the dispatcher manager's cleanUpLoop
+// to do clean up work when tasks are finished.
+func RegisterDispatcherCleanUpFactory(taskType string, ctor cleanUpFactoryFn) {
+	cleanUpFactoryMap.Lock()
+	defer cleanUpFactoryMap.Unlock()
+	cleanUpFactoryMap.m[taskType] = ctor
+}
+
+// getDispatcherCleanUpFactory is used to get the dispatcher clean up factory.
+func getDispatcherCleanUpFactory(taskType string) cleanUpFactoryFn {
+	cleanUpFactoryMap.RLock()
+	defer cleanUpFactoryMap.RUnlock()
+	return cleanUpFactoryMap.m[taskType]
+}
+
+// ClearDispatcherCleanUpFactory is only used in test.
+func ClearDispatcherCleanUpFactory() {
+	cleanUpFactoryMap.Lock()
+	defer cleanUpFactoryMap.Unlock()
+	cleanUpFactoryMap.m = make(map[string]cleanUpFactoryFn)
 }
diff --git a/disttask/framework/dispatcher/main_test.go b/disttask/framework/dispatcher/main_test.go
index 93ba8964271fd..4811add10ca95 100644
--- a/disttask/framework/dispatcher/main_test.go
+++ b/disttask/framework/dispatcher/main_test.go
@@ -25,6 +25,7 @@ import (
 type DispatcherManagerForTest interface {
 	GetRunningTaskCnt() int
 	DelRunningTask(globalTaskID int64)
+	DoCleanUpRoutine()
 }
 
 // GetRunningGTaskCnt implements Dispatcher.GetRunningGTaskCnt interface.
@@ -37,6 +38,11 @@ func (dm *Manager) DelRunningTask(globalTaskID int64) {
 	dm.delRunningTask(globalTaskID)
 }
 
+// DoCleanUpRoutine implements the DispatcherManagerForTest.DoCleanUpRoutine interface.
+func (dm *Manager) DoCleanUpRoutine() { + dm.doCleanUpRoutine() +} + func TestMain(m *testing.M) { testsetup.SetupForCommonTest() diff --git a/disttask/framework/framework_rollback_test.go b/disttask/framework/framework_rollback_test.go index 1ed99440e8c49..c05ee5e0c03e7 100644 --- a/disttask/framework/framework_rollback_test.go +++ b/disttask/framework/framework_rollback_test.go @@ -77,6 +77,8 @@ func (dsp *rollbackDispatcherExt) GetNextStep(_ dispatcher.TaskHandle, task *pro func registerRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map) { mockExtension := mock.NewMockExtension(ctrl) mockExecutor := mockexecute.NewMockSubtaskExecutor(ctrl) + mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl) + mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockExecutor.EXPECT().Init(gomock.Any()).Return(nil).AnyTimes() mockExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil).AnyTimes() mockExecutor.EXPECT().Rollback(gomock.Any()).DoAndReturn( @@ -92,7 +94,7 @@ func registerRollbackTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map }).AnyTimes() mockExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockExecutor, nil).AnyTimes() - registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, &rollbackDispatcherExt{}) + registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, &rollbackDispatcherExt{}) rollbackCnt.Store(0) } diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go index ec5f61ed49a5d..39349b3d950c4 100644 --- a/disttask/framework/framework_test.go +++ b/disttask/framework/framework_test.go @@ -109,6 +109,8 @@ func getMockSubtaskExecutor(ctrl *gomock.Controller) *mockexecute.MockSubtaskExe func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map, dispatcherHandle dispatcher.Extension) { mockExtension := mock.NewMockExtension(ctrl) + mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl) + mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockSubtaskExecutor := getMockSubtaskExecutor(ctrl) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { @@ -123,12 +125,13 @@ func RegisterTaskMeta(t *testing.T, ctrl *gomock.Controller, m *sync.Map, dispat return nil }).AnyTimes() mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes() - registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, dispatcherHandle) + registerTaskMetaInner(t, proto.TaskTypeExample, mockExtension, mockCleanupRountine, dispatcherHandle) } -func registerTaskMetaInner(t *testing.T, taskType string, mockExtension scheduler.Extension, dispatcherHandle dispatcher.Extension) { +func registerTaskMetaInner(t *testing.T, taskType string, mockExtension scheduler.Extension, mockCleanup dispatcher.CleanUpRoutine, dispatcherHandle dispatcher.Extension) { t.Cleanup(func() { dispatcher.ClearDispatcherFactory() + dispatcher.ClearDispatcherCleanUpFactory() scheduler.ClearSchedulers() }) dispatcher.RegisterDispatcherFactory(taskType, @@ -137,6 +140,12 @@ func registerTaskMetaInner(t *testing.T, taskType string, mockExtension schedule baseDispatcher.Extension = dispatcherHandle return baseDispatcher }) + + dispatcher.RegisterDispatcherCleanUpFactory(taskType, + func() 
dispatcher.CleanUpRoutine { + return mockCleanup + }) + scheduler.RegisterTaskType(taskType, func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler { s := scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable) @@ -148,6 +157,8 @@ func registerTaskMetaInner(t *testing.T, taskType string, mockExtension schedule func RegisterTaskMetaForExample2(t *testing.T, ctrl *gomock.Controller, m *sync.Map, dispatcherHandle dispatcher.Extension) { mockExtension := mock.NewMockExtension(ctrl) + mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl) + mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockSubtaskExecutor := getMockSubtaskExecutor(ctrl) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { @@ -162,11 +173,13 @@ func RegisterTaskMetaForExample2(t *testing.T, ctrl *gomock.Controller, m *sync. return nil }).AnyTimes() mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes() - registerTaskMetaInner(t, proto.TaskTypeExample2, mockExtension, dispatcherHandle) + registerTaskMetaInner(t, proto.TaskTypeExample2, mockExtension, mockCleanupRountine, dispatcherHandle) } func RegisterTaskMetaForExample3(t *testing.T, ctrl *gomock.Controller, m *sync.Map, dispatcherHandle dispatcher.Extension) { mockExtension := mock.NewMockExtension(ctrl) + mockCleanupRountine := mock.NewMockCleanUpRoutine(ctrl) + mockCleanupRountine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockSubtaskExecutor := getMockSubtaskExecutor(ctrl) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, subtask *proto.Subtask) error { @@ -181,7 +194,7 @@ func RegisterTaskMetaForExample3(t *testing.T, ctrl *gomock.Controller, m *sync. return nil }).AnyTimes() mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes() - registerTaskMetaInner(t, proto.TaskTypeExample3, mockExtension, dispatcherHandle) + registerTaskMetaInner(t, proto.TaskTypeExample3, mockExtension, mockCleanupRountine, dispatcherHandle) } func DispatchTask(taskKey string, t *testing.T) *proto.Task { @@ -203,8 +216,7 @@ func WaitTaskExit(t *testing.T, taskKey string) *proto.Task { } time.Sleep(time.Second) - task, err = mgr.GetGlobalTaskByKey(taskKey) - + task, err = mgr.GetGlobalTaskByKeyWithHistory(taskKey) require.NoError(t, err) require.NotNil(t, task) if task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning && task.State != proto.TaskStateCancelling && task.State != proto.TaskStateReverting && task.State != proto.TaskStatePausing { @@ -270,10 +282,10 @@ func DispatchMultiTasksAndOneFail(t *testing.T, num int, m []sync.Map) []*proto. require.FailNow(t, "timeout") } time.Sleep(time.Second) - task, err = mgr.GetGlobalTaskByID(taskID[0]) - tasks[0] = task + task, err = mgr.GetTaskByIDWithHistory(taskID[0]) require.NoError(t, err) require.NotNil(t, task) + tasks[0] = task if task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning && task.State != proto.TaskStateCancelling && task.State != proto.TaskStateReverting { break } @@ -293,10 +305,11 @@ func DispatchMultiTasksAndOneFail(t *testing.T, num int, m []sync.Map) []*proto. 
require.FailNow(t, "timeout") } time.Sleep(time.Second) - task, err = mgr.GetGlobalTaskByID(taskID[i]) - tasks[i] = task + task, err = mgr.GetTaskByIDWithHistory(taskID[i]) require.NoError(t, err) require.NotNil(t, task) + tasks[i] = task + if task.State != proto.TaskStatePending && task.State != proto.TaskStateRunning && task.State != proto.TaskStateCancelling && task.State != proto.TaskStateReverting { break } @@ -642,3 +655,20 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel")) distContext.Close() } + +func TestFrameworkCleanUpRoutine(t *testing.T) { + var m sync.Map + ctrl := gomock.NewController(t) + defer ctrl.Finish() + RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{}) + distContext := testkit.NewDistExecutionContext(t, 3) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished", "return()")) + DispatchTaskAndCheckSuccess("key1", t, &m) + <-dispatcher.WaitCleanUpFinished + mgr, err := storage.GetTaskManager() + require.NoError(t, err) + tasks, err := mgr.GetGlobalTaskByKeyWithHistory("key1") + require.NoError(t, err) + require.NotEmpty(t, tasks) + distContext.Close() +} diff --git a/disttask/framework/handle/handle.go b/disttask/framework/handle/handle.go index 86bf9fe792329..a2df40176c493 100644 --- a/disttask/framework/handle/handle.go +++ b/disttask/framework/handle/handle.go @@ -73,7 +73,7 @@ func WaitGlobalTask(ctx context.Context, globalTask *proto.Task) error { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - found, err := globalTaskManager.GetGlobalTaskByID(globalTask.ID) + found, err := globalTaskManager.GetTaskByIDWithHistory(globalTask.ID) if err != nil { return errors.Errorf("cannot get global task with ID %d, err %s", globalTask.ID, err.Error()) } diff --git a/disttask/framework/mock/dispatcher_mock.go b/disttask/framework/mock/dispatcher_mock.go index abb3ad16df85f..3a820029df9b0 100644 --- a/disttask/framework/mock/dispatcher_mock.go +++ b/disttask/framework/mock/dispatcher_mock.go @@ -1,12 +1,14 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/pingcap/tidb/disttask/framework/dispatcher (interfaces: Dispatcher) +// Source: github.com/pingcap/tidb/disttask/framework/dispatcher (interfaces: Dispatcher,CleanUpRoutine) // Package mock is a generated GoMock package. package mock import ( + context "context" reflect "reflect" + proto "github.com/pingcap/tidb/disttask/framework/proto" gomock "go.uber.org/mock/gomock" ) @@ -70,3 +72,40 @@ func (mr *MockDispatcherMockRecorder) Init() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockDispatcher)(nil).Init)) } + +// MockCleanUpRoutine is a mock of CleanUpRoutine interface. +type MockCleanUpRoutine struct { + ctrl *gomock.Controller + recorder *MockCleanUpRoutineMockRecorder +} + +// MockCleanUpRoutineMockRecorder is the mock recorder for MockCleanUpRoutine. +type MockCleanUpRoutineMockRecorder struct { + mock *MockCleanUpRoutine +} + +// NewMockCleanUpRoutine creates a new mock instance. +func NewMockCleanUpRoutine(ctrl *gomock.Controller) *MockCleanUpRoutine { + mock := &MockCleanUpRoutine{ctrl: ctrl} + mock.recorder = &MockCleanUpRoutineMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. 
+func (m *MockCleanUpRoutine) EXPECT() *MockCleanUpRoutineMockRecorder {
+	return m.recorder
+}
+
+// CleanUp mocks base method.
+func (m *MockCleanUpRoutine) CleanUp(arg0 context.Context, arg1 *proto.Task) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CleanUp", arg0, arg1)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// CleanUp indicates an expected call of CleanUp.
+func (mr *MockCleanUpRoutineMockRecorder) CleanUp(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanUp", reflect.TypeOf((*MockCleanUpRoutine)(nil).CleanUp), arg0, arg1)
+}
diff --git a/disttask/framework/proto/task.go b/disttask/framework/proto/task.go
index 2403f76304586..0798a5149a9f0 100644
--- a/disttask/framework/proto/task.go
+++ b/disttask/framework/proto/task.go
@@ -132,6 +132,8 @@ const (
 	TaskTypeExample3 = "Example2"
 	// ImportInto is TaskType of ImportInto.
 	ImportInto = "ImportInto"
+	// Backfill is the TaskType of the add-index backfilling process.
+	Backfill = "backfill"
 )
 
 // Type2Int converts task type to int.
diff --git a/disttask/framework/storage/BUILD.bazel b/disttask/framework/storage/BUILD.bazel
index c360f3e7bb71f..6a65604959789 100644
--- a/disttask/framework/storage/BUILD.bazel
+++ b/disttask/framework/storage/BUILD.bazel
@@ -31,7 +31,7 @@ go_test(
     srcs = ["table_test.go"],
    flaky = True,
     race = "on",
-    shard_count = 6,
+    shard_count = 7,
     deps = [
         ":storage",
         "//disttask/framework/proto",
diff --git a/disttask/framework/storage/table_test.go b/disttask/framework/storage/table_test.go
index 09f4d14f3ff43..c38f7fd23456d 100644
--- a/disttask/framework/storage/table_test.go
+++ b/disttask/framework/storage/table_test.go
@@ -481,7 +481,7 @@ func TestSubtaskHistoryTable(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, subTasks, 3)
 
-	// test GC
+	// test GC history table.
 	failpoint.Enable("github.com/pingcap/tidb/disttask/framework/storage/subtaskHistoryKeepSeconds", "return(1)")
 	defer func() {
 		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/storage/subtaskHistoryKeepSeconds"))
@@ -492,13 +492,57 @@
 	require.NoError(t, sm.UpdateSubtaskStateAndError(subTask4, proto.TaskStateFailed, nil))
 	require.NoError(t, sm.TransferSubTasks2History(taskID2))
 
-	require.NoError(t, sm.GC())
+	require.NoError(t, sm.GCSubtasks())
 
 	historySubTasksCnt, err = storage.GetSubtasksFromHistoryForTest(sm)
 	require.NoError(t, err)
 	require.Equal(t, 1, historySubTasksCnt)
 }
 
+func TestTaskHistoryTable(t *testing.T) {
+	pool := GetResourcePool(t)
+	gm := GetTaskManager(t, pool)
+	defer pool.Close()
+
+	_, err := gm.AddNewGlobalTask("1", proto.TaskTypeExample, 1, nil)
+	require.NoError(t, err)
+	taskID, err := gm.AddNewGlobalTask("2", proto.TaskTypeExample, 1, nil)
+	require.NoError(t, err)
+
+	tasks, err := gm.GetGlobalTasksInStates(proto.TaskStatePending)
+	require.NoError(t, err)
+	require.Equal(t, 2, len(tasks))
+
+	require.NoError(t, gm.TransferTasks2History(tasks))
+
+	tasks, err = gm.GetGlobalTasksInStates(proto.TaskStatePending)
+	require.NoError(t, err)
+	require.Equal(t, 0, len(tasks))
+	num, err := storage.GetTasksFromHistoryForTest(gm)
+	require.NoError(t, err)
+	require.Equal(t, 2, num)
+
+	task, err := gm.GetTaskByIDWithHistory(taskID)
+	require.NoError(t, err)
+	require.NotNil(t, task)
+
+	task, err = gm.GetGlobalTaskByKeyWithHistory("1")
+	require.NoError(t, err)
+	require.NotNil(t, task)
+
+	// transfer a task that carries an error
+	_, err = gm.AddNewGlobalTask("3", proto.TaskTypeExample, 1, nil)
+	require.NoError(t, err)
+	tasks, err = gm.GetGlobalTasksInStates(proto.TaskStatePending)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(tasks))
+	tasks[0].Error = errors.New("mock err")
+	require.NoError(t, gm.TransferTasks2History(tasks))
+	num, err = storage.GetTasksFromHistoryForTest(gm)
+	require.NoError(t, err)
+	require.Equal(t, 3, num)
+}
+
 func TestPauseAndResume(t *testing.T) {
 	pool := GetResourcePool(t)
 	sm := GetTaskManager(t, pool)
diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go
index 1e71846da198b..0feab6022bdcd 100644
--- a/disttask/framework/storage/task_table.go
+++ b/disttask/framework/storage/task_table.go
@@ -199,9 +199,9 @@ func (stm *TaskManager) AddNewGlobalTask(key, tp string, concurrency int, meta [
 // AddGlobalTaskWithSession adds a new task to global task table with session.
 func (stm *TaskManager) AddGlobalTaskWithSession(se sessionctx.Context, key, tp string, concurrency int, meta []byte) (taskID int64, err error) {
 	_, err = ExecSQL(stm.ctx, se,
-		`insert into mysql.tidb_global_task(task_key, type, state, concurrency, step, meta, state_update_time)
-			values (%?, %?, %?, %?, %?, %?, %?)`,
-		key, tp, proto.TaskStatePending, concurrency, proto.StepInit, meta, time.Now().UTC().String())
+		`insert into mysql.tidb_global_task(task_key, type, state, concurrency, step, meta, start_time, state_update_time)
+			values (%?, %?, %?, %?, %?, %?, CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP())`,
+		key, tp, proto.TaskStatePending, concurrency, proto.StepInit, meta)
 	if err != nil {
 		return 0, err
 	}
@@ -248,6 +248,23 @@ func (stm *TaskManager) GetGlobalTasksInStates(states ...interface{}) (task []*p
 	return task, nil
 }
 
+// GetGlobalTasksFromHistoryInStates gets the tasks in the given states from the history table.
+func (stm *TaskManager) GetGlobalTasksFromHistoryInStates(states ...interface{}) (task []*proto.Task, err error) {
+	if len(states) == 0 {
+		return task, nil
+	}
+
+	rs, err := stm.executeSQLWithNewSession(stm.ctx, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task_history where state in ("+strings.Repeat("%?,", len(states)-1)+"%?)", states...)
+	if err != nil {
+		return task, err
+	}
+
+	for _, r := range rs {
+		task = append(task, row2GlobeTask(r))
+	}
+	return task, nil
+}
+
 // GetGlobalTaskByID gets the task by the global task ID.
 func (stm *TaskManager) GetGlobalTaskByID(taskID int64) (task *proto.Task, err error) {
 	rs, err := stm.executeSQLWithNewSession(stm.ctx, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task where id = %?", taskID)
@@ -261,7 +278,21 @@
 	return row2GlobeTask(rs[0]), nil
 }
 
-// GetGlobalTaskByKey gets the task by the task key
+// GetTaskByIDWithHistory gets the task by the global task ID from both tidb_global_task and tidb_global_task_history.
+func (stm *TaskManager) GetTaskByIDWithHistory(taskID int64) (task *proto.Task, err error) {
+	rs, err := stm.executeSQLWithNewSession(stm.ctx, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task where id = %? "+
+		"union select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task_history where id = %?", taskID, taskID)
+	if err != nil {
+		return task, err
+	}
+	if len(rs) == 0 {
+		return nil, nil
+	}
+
+	return row2GlobeTask(rs[0]), nil
+}
+
+// GetGlobalTaskByKey gets the task by the task key.
 func (stm *TaskManager) GetGlobalTaskByKey(key string) (task *proto.Task, err error) {
 	rs, err := stm.executeSQLWithNewSession(stm.ctx, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task where task_key = %?", key)
@@ -274,6 +305,20 @@ func (stm *TaskManager) GetGlobalTaskByKey(key string) (task *proto.Task, err er
 	return row2GlobeTask(rs[0]), nil
 }
 
+// GetGlobalTaskByKeyWithHistory gets the task by the task key from both tidb_global_task and tidb_global_task_history.
+func (stm *TaskManager) GetGlobalTaskByKeyWithHistory(key string) (task *proto.Task, err error) {
+	rs, err := stm.executeSQLWithNewSession(stm.ctx, "select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task where task_key = %? "+
+		"union select id, task_key, type, dispatcher_id, state, start_time, state_update_time, meta, concurrency, step, error from mysql.tidb_global_task_history where task_key = %?", key, key)
+	if err != nil {
+		return task, err
+	}
+	if len(rs) == 0 {
+		return nil, nil
+	}
+
+	return row2GlobeTask(rs[0]), nil
+}
+
 // row2SubTask converts a row to a subtask.
 func row2SubTask(r chunk.Row) *proto.Subtask {
 	// subtask defines start/update time as bigint, to ensure backward compatible,
@@ -845,8 +890,8 @@ func (stm *TaskManager) TransferSubTasks2History(taskID int64) error {
 	})
 }
 
-// GC deletes the history subtask which is older than the given days.
-func (stm *TaskManager) GC() error {
+// GCSubtasks deletes history subtasks that are older than the given days.
+func (stm *TaskManager) GCSubtasks() error {
 	subtaskHistoryKeepSeconds := defaultSubtaskKeepDays * 24 * 60 * 60
 	failpoint.Inject("subtaskHistoryKeepSeconds", func(val failpoint.Value) {
 		if val, ok := val.(int); ok {
@@ -860,6 +905,54 @@
 	return err
 }
 
+// TransferTasks2History transfers the selected tasks into the tidb_global_task_history table and removes them from tidb_global_task.
+func (stm *TaskManager) TransferTasks2History(tasks []*proto.Task) error {
+	if len(tasks) == 0 {
+		return nil
+	}
+	return stm.WithNewTxn(stm.ctx, func(se sessionctx.Context) error {
+		insertSQL := new(strings.Builder)
+		if err := sqlexec.FormatSQL(insertSQL, "insert into mysql.tidb_global_task_history"+
+			"(id, task_key, type, dispatcher_id, state, start_time, state_update_time,"+
+			"meta, concurrency, step, error) values"); err != nil {
+			return err
+		}
+
+		for i, task := range tasks {
+			if i != 0 {
+				if err := sqlexec.FormatSQL(insertSQL, ","); err != nil {
+					return err
+				}
+			}
+			if err := sqlexec.FormatSQL(insertSQL, "(%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
+				task.ID, task.Key, task.Type, task.DispatcherID,
+				task.State, task.StartTime, task.StateUpdateTime,
+				task.Meta, task.Concurrency, task.Step, serializeErr(task.Error)); err != nil {
+				return err
+			}
+		}
+		_, err := ExecSQL(stm.ctx, se, insertSQL.String())
+		if err != nil {
+			return err
+		}
+
+		// delete the transferred tasks from tidb_global_task
+		deleteSQL := new(strings.Builder)
+		if err := sqlexec.FormatSQL(deleteSQL, "delete from mysql.tidb_global_task where id in("); err != nil {
+			return err
+		}
+		deleteElems := make([]string, 0, len(tasks))
+		for _, task := range tasks {
+			deleteElems = append(deleteElems, fmt.Sprintf("%d", task.ID))
+		}
+
+		deleteSQL.WriteString(strings.Join(deleteElems, ", "))
+		deleteSQL.WriteString(")")
+		_, err = ExecSQL(stm.ctx, se, deleteSQL.String())
+		return err
+	})
+}
+
 // GetNodesByRole gets nodes map from dist_framework_meta by role.
 func (stm *TaskManager) GetNodesByRole(role string) (map[string]bool, error) {
 	rs, err := stm.executeSQLWithNewSession(stm.ctx,
diff --git a/disttask/framework/storage/util.go b/disttask/framework/storage/util.go
index 98ef4df4e9f72..c9feff2ce73be 100644
--- a/disttask/framework/storage/util.go
+++ b/disttask/framework/storage/util.go
@@ -52,3 +52,13 @@ func GetSubtasksByTaskIDForTest(stm *TaskManager, taskID int64) ([]*proto.Subtas
 	}
 	return subtasks, nil
 }
+
+// GetTasksFromHistoryForTest gets tasks from history table for test.
+func GetTasksFromHistoryForTest(stm *TaskManager) (int, error) {
+	rs, err := stm.executeSQLWithNewSession(stm.ctx,
+		"select * from mysql.tidb_global_task_history")
+	if err != nil {
+		return 0, err
+	}
+	return len(rs), nil
+}
diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel
index 1133667f891c9..d409a31dfc769 100644
--- a/disttask/importinto/BUILD.bazel
+++ b/disttask/importinto/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "importinto",
     srcs = [
+        "clean_s3.go",
         "dispatcher.go",
         "encode_and_sort_operator.go",
         "job.go",
diff --git a/disttask/importinto/clean_s3.go b/disttask/importinto/clean_s3.go
new file mode 100644
index 0000000000000..d4c05cf893bae
--- /dev/null
+++ b/disttask/importinto/clean_s3.go
@@ -0,0 +1,77 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importinto
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
+	"github.com/pingcap/tidb/disttask/framework/dispatcher"
+	"github.com/pingcap/tidb/disttask/framework/proto"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+var _ dispatcher.CleanUpRoutine = (*ImportCleanUpS3)(nil)
+
+// ImportCleanUpS3 implements dispatcher.CleanUpRoutine.
+type ImportCleanUpS3 struct {
+}
+
+func newImportCleanUpS3() dispatcher.CleanUpRoutine {
+	return &ImportCleanUpS3{}
+}
+
+// CleanUp implements the CleanUpRoutine.CleanUp interface.
+func (*ImportCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error {
+	// we can only clean up files after all write&ingest subtasks are finished,
+	// since they might share the same file.
+	taskMeta := &TaskMeta{}
+	err := json.Unmarshal(task.Meta, taskMeta)
+	if err != nil {
+		return err
+	}
+	defer redactSensitiveInfo(task, taskMeta)
+	// Cloud storage is not used, so there is nothing to clean up.
+	if taskMeta.Plan.CloudStorageURI == "" {
+		return nil
+	}
+	logger := logutil.BgLogger().With(zap.Int64("task-id", task.ID))
+	callLog := log.BeginTask(logger, "cleanup global sorted data")
+	defer callLog.End(zap.InfoLevel, nil)
+
+	controller, err := buildController(&taskMeta.Plan, taskMeta.Stmt)
+	if err != nil {
+		logger.Warn("failed to build controller", zap.Error(err))
+		return err
+	}
+	if err = controller.InitDataStore(ctx); err != nil {
+		logger.Warn("failed to init data store", zap.Error(err))
+		return err
+	}
+	if err = external.CleanUpFiles(ctx, controller.GlobalSortStore,
+		strconv.Itoa(int(task.ID))); err != nil {
+		logger.Warn("failed to clean up files of task", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+func init() {
+	dispatcher.RegisterDispatcherCleanUpFactory(proto.ImportInto, newImportCleanUpS3)
+}
diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go
index f979bbdbb2b9d..cf3810a7a7d58 100644
--- a/disttask/importinto/dispatcher.go
+++ b/disttask/importinto/dispatcher.go
@@ -25,11 +25,9 @@ import (
 	dmysql "github.com/go-sql-driver/mysql"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
-	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
-	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/lightning/metric"
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/disttask/framework/dispatcher"
@@ -662,10 +660,6 @@ func job2Step(ctx context.Context, logger *zap.Logger, taskMeta *TaskMeta, step
 func (dsp *ImportDispatcherExt) finishJob(ctx context.Context, logger *zap.Logger,
 	taskHandle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error {
 	dsp.unregisterTask(ctx, gTask)
-	if dsp.GlobalSort {
-		cleanUpGlobalSortedData(ctx, gTask, taskMeta)
-	}
-	redactSensitiveInfo(gTask, taskMeta)
 	summary := &importer.JobSummary{ImportedRows:
taskMeta.Result.LoadedRowCnt} // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) @@ -683,10 +677,6 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch taskMeta *TaskMeta, logger *zap.Logger, errorMsg string) error { dsp.switchTiKV2NormalMode(ctx, gTask, logger) dsp.unregisterTask(ctx, gTask) - if dsp.GlobalSort { - cleanUpGlobalSortedData(ctx, gTask, taskMeta) - } - redactSensitiveInfo(gTask, taskMeta) // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) return handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, @@ -699,29 +689,6 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch ) } -func cleanUpGlobalSortedData(ctx context.Context, gTask *proto.Task, taskMeta *TaskMeta) { - // we can only clean up files after all write&ingest subtasks are finished, - // since they might share the same file. - // we don't return error here, since the task is already done, we should - // return success if the task is success. - // TODO: maybe add a way to notify user that there are files left in global sorted storage. - logger := logutil.BgLogger().With(zap.Int64("task-id", gTask.ID)) - callLog := log.BeginTask(logger, "cleanup global sorted data") - defer callLog.End(zap.InfoLevel, nil) - - controller, err := buildController(&taskMeta.Plan, taskMeta.Stmt) - if err != nil { - logger.Warn("failed to build controller", zap.Error(err)) - } - if err = controller.InitDataStore(ctx); err != nil { - logger.Warn("failed to init data store", zap.Error(err)) - } - if err = external.CleanUpFiles(ctx, controller.GlobalSortStore, - strconv.Itoa(int(gTask.ID))); err != nil { - logger.Warn("failed to clean up files of task", zap.Error(err)) - } -} - func redactSensitiveInfo(gTask *proto.Task, taskMeta *TaskMeta) { taskMeta.Stmt = "" taskMeta.Plan.Path = ast.RedactURL(taskMeta.Plan.Path) diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel index 0cafae1d93802..f9723b948186c 100644 --- a/tests/realtikvtest/addindextest/BUILD.bazel +++ b/tests/realtikvtest/addindextest/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//ddl/ingest", "//ddl/testutil", "//ddl/util/callback", + "//disttask/framework/dispatcher", "//disttask/operator", "//domain", "//errno", diff --git a/tests/realtikvtest/addindextest/global_sort_test.go b/tests/realtikvtest/addindextest/global_sort_test.go index 167e7094a000d..145a8ec8b0da4 100644 --- a/tests/realtikvtest/addindextest/global_sort_test.go +++ b/tests/realtikvtest/addindextest/global_sort_test.go @@ -22,9 +22,11 @@ import ( "testing" "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/ddl/util/callback" + "github.com/pingcap/tidb/disttask/framework/dispatcher" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" @@ -55,6 +57,7 @@ func TestGlobalSortCleanupCloudFiles(t *testing.T) { store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t) tk := testkit.NewTestKit(t, store) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished", "return()")) tk.MustExec("drop database if exists addindexlit;") 
tk.MustExec("create database addindexlit;") tk.MustExec("use addindexlit;") @@ -91,7 +94,7 @@ func TestGlobalSortCleanupCloudFiles(t *testing.T) { tk.MustExec("alter table t add index idx(a);") dom.DDL().SetHook(origin) tk.MustExec("admin check table t;") - + <-dispatcher.WaitCleanUpFinished storeBackend, err := storage.ParseBackend(sortStorageURI, nil) require.NoError(t, err) opts := &storage.ExternalStorageOptions{NoCredentials: true} @@ -103,4 +106,5 @@ func TestGlobalSortCleanupCloudFiles(t *testing.T) { require.Greater(t, jobID, int64(0)) require.Equal(t, 0, len(dataFiles)) require.Equal(t, 0, len(statFiles)) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished")) } diff --git a/tests/realtikvtest/importintotest/BUILD.bazel b/tests/realtikvtest/importintotest/BUILD.bazel index 0317cb205284e..f7e1bf8c9bc84 100644 --- a/tests/realtikvtest/importintotest/BUILD.bazel +++ b/tests/realtikvtest/importintotest/BUILD.bazel @@ -24,6 +24,7 @@ go_test( "//br/pkg/streamhelper", "//br/pkg/utils", "//config", + "//disttask/framework/dispatcher", "//disttask/framework/proto", "//disttask/framework/scheduler", "//disttask/framework/storage", diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index b746eca539b7c..ebf942c9c7749 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/mock/mocklocal" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/disttask/framework/dispatcher" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/disttask/importinto" "github.com/pingcap/tidb/domain/infosync" @@ -777,7 +778,7 @@ func (s *mockGCSSuite) checkTaskMetaRedacted(jobID int64) { s.NoError(err) taskKey := importinto.TaskKey(jobID) s.NoError(err) - globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey) + globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey) s.NoError(err2) s.Regexp(`[?&]access-key=xxxxxx`, string(globalTask.Meta)) s.Contains(string(globalTask.Meta), "secret-access-key=xxxxxx") @@ -820,6 +821,7 @@ func (s *mockGCSSuite) TestImportMode() { // NOTE: this case only runs when current instance is TiDB owner, if you run it locally, // better start a cluster without TiDB instance. s.enableFailpoint("github.com/pingcap/tidb/parser/ast/forceRedactURL", "return(true)") + s.enableFailpoint("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished", "return()") sql := fmt.Sprintf(`IMPORT INTO load_data.import_mode FROM 'gs://test-load/import_mode-*.tsv?access-key=aaaaaa&secret-access-key=bbbbbb&endpoint=%s'`, gcsEndpoint) rows := s.tk.MustQuery(sql).Rows() s.Len(rows, 1) @@ -827,8 +829,8 @@ func (s *mockGCSSuite) TestImportMode() { s.NoError(err) s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111")) s.Greater(intoNormalTime, intoImportTime) + <-dispatcher.WaitCleanUpFinished s.checkTaskMetaRedacted(int64(jobID)) - // after import step, we should enter normal mode, i.e. 
 we only call ToImportMode once
 	intoNormalTime, intoImportTime = time.Time{}, time.Time{}
 	switcher.EXPECT().ToImportMode(gomock.Any(), gomock.Any()).DoAndReturn(toImportModeFn).Times(1)
@@ -843,6 +845,7 @@ func (s *mockGCSSuite) TestImportMode() {
 	s.Greater(intoNormalTime, intoImportTime)
 	s.NoError(failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/clearLastSwitchTime"))
 	s.NoError(failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/waitBeforePostProcess"))
+	<-dispatcher.WaitCleanUpFinished

 	// test disable_tikv_import_mode, should not call ToImportMode and ToNormalMode
 	s.tk.MustExec("truncate table load_data.import_mode;")
@@ -850,6 +853,7 @@ func (s *mockGCSSuite) TestImportMode() {
 	s.tk.MustQuery(sql)
 	s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
 	s.tk.MustExec("truncate table load_data.import_mode;")
+	<-dispatcher.WaitCleanUpFinished

 	// test with multirocksdb
 	s.enableFailpoint("github.com/pingcap/tidb/ddl/util/IsRaftKv2", "return(true)")
@@ -858,6 +862,8 @@ func (s *mockGCSSuite) TestImportMode() {
 	s.tk.MustQuery(sql)
 	s.tk.MustQuery("SELECT * FROM load_data.import_mode;").Sort().Check(testkit.Rows("1 11 111"))
 	s.tk.MustExec("truncate table load_data.import_mode;")
+	<-dispatcher.WaitCleanUpFinished
+
 	s.NoError(failpoint.Disable("github.com/pingcap/tidb/ddl/util/IsRaftKv2"))

 	// to normal mode should be called on error
@@ -867,11 +873,14 @@ func (s *mockGCSSuite) TestImportMode() {
 	s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/waitBeforeSortChunk", "return(true)")
 	s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/errorWhenSortChunk", "return(true)")
 	s.enableFailpoint("github.com/pingcap/tidb/executor/importer/setLastImportJobID", `return(true)`)
+
 	sql = fmt.Sprintf(`IMPORT INTO load_data.import_mode FROM 'gs://test-load/import_mode-*.tsv?access-key=aaaaaa&secret-access-key=bbbbbb&endpoint=%s'`, gcsEndpoint)
 	err = s.tk.QueryToErr(sql)
 	s.Error(err)
 	s.Greater(intoNormalTime, intoImportTime)
+	<-dispatcher.WaitCleanUpFinished
 	s.checkTaskMetaRedacted(importer.TestLastImportJobID.Load())
+	s.NoError(failpoint.Disable("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished"))
 }

 func (s *mockGCSSuite) TestRegisterTask() {
diff --git a/tests/realtikvtest/importintotest/job_test.go b/tests/realtikvtest/importintotest/job_test.go
index 98d623b77be54..3154d07ac30bd 100644
--- a/tests/realtikvtest/importintotest/job_test.go
+++ b/tests/realtikvtest/importintotest/job_test.go
@@ -401,7 +401,7 @@ func (s *mockGCSSuite) TestCancelJob() {
 		globalTaskManager, err := storage.GetTaskManager()
 		s.NoError(err)
 		taskKey := importinto.TaskKey(jobID)
-		globalTask, err := globalTaskManager.GetGlobalTaskByKey(taskKey)
+		globalTask, err := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
 		s.NoError(err)
 		return globalTask
 	}
@@ -493,7 +493,7 @@ func (s *mockGCSSuite) TestCancelJob() {
 	taskKey := importinto.TaskKey(int64(jobID2))
 	s.NoError(err)
 	s.Require().Eventually(func() bool {
-		globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey)
+		globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
 		s.NoError(err2)
 		subtasks, err2 := globalTaskManager.GetSubtasksForImportInto(globalTask.ID, importinto.StepPostProcess)
 		s.NoError(err2)
@@ -622,7 +622,7 @@ func (s *mockGCSSuite) TestKillBeforeFinish() {
 	taskKey := importinto.TaskKey(jobID)
 	s.NoError(err)
 	s.Require().Eventually(func() bool {
-		globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey)
+		globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
 		s.NoError(err2)
 		return globalTask.State == proto.TaskStateReverted
 	}, 5*time.Second, 1*time.Second)
diff --git a/tests/realtikvtest/importintotest4/BUILD.bazel b/tests/realtikvtest/importintotest4/BUILD.bazel
index 243c775e6acf7..9ae963e045b25 100644
--- a/tests/realtikvtest/importintotest4/BUILD.bazel
+++ b/tests/realtikvtest/importintotest4/BUILD.bazel
@@ -13,6 +13,7 @@ go_test(
     deps = [
         "//br/pkg/lightning/config",
         "//config",
+        "//disttask/framework/dispatcher",
         "//disttask/framework/storage",
         "//disttask/importinto",
         "//executor/importer",
diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go
index 7119c7a2f8768..66b01e234f911 100644
--- a/tests/realtikvtest/importintotest4/global_sort_test.go
+++ b/tests/realtikvtest/importintotest4/global_sort_test.go
@@ -25,6 +25,7 @@ import (
 	"time"

 	"github.com/fsouza/fake-gcs-server/fakestorage"
+	"github.com/pingcap/tidb/disttask/framework/dispatcher"
 	"github.com/pingcap/tidb/disttask/framework/storage"
 	"github.com/pingcap/tidb/disttask/importinto"
 	"github.com/pingcap/tidb/executor/importer"
@@ -57,6 +58,8 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 	s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int, key(a), key(c,d), key(d));`)
 	s.enableFailpoint("github.com/pingcap/tidb/parser/ast/forceRedactURL", "return(true)")
+	s.enableFailpoint("github.com/pingcap/tidb/disttask/framework/dispatcher/WaitCleanUpFinished", "return()")
+
 	sortStorageURI := fmt.Sprintf("gs://sorted/import?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
 	importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-basic/t.*.csv?endpoint=%s'
 		with __max_engine_size = '1', cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI)
@@ -70,6 +73,8 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 	))

 	// check all sorted data cleaned up
+	<-dispatcher.WaitCleanUpFinished
+
 	_, files, err := s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"})
 	s.NoError(err)
 	s.Len(files, 0)
@@ -82,7 +87,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 	globalTaskManager, err := storage.GetTaskManager()
 	s.NoError(err)
 	taskKey := importinto.TaskKey(int64(jobID))
-	globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey)
+	globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
 	s.NoError(err2)
 	taskMeta := importinto.TaskMeta{}
 	s.NoError(json.Unmarshal(globalTask.Meta, &taskMeta))
@@ -97,6 +102,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 		"1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789",
 		"4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323",
 	))
+	<-dispatcher.WaitCleanUpFinished

 	// failed task, should clean up all sorted data too.
 	s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/failWhenDispatchWriteIngestSubtask", "return(true)")
@@ -106,11 +112,13 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
 	jobID, err = strconv.Atoi(result[0][0].(string))
 	s.NoError(err)
 	s.Eventually(func() bool {
-		globalTask, err2 = globalTaskManager.GetGlobalTaskByKey(importinto.TaskKey(int64(jobID)))
+		globalTask, err2 = globalTaskManager.GetGlobalTaskByKeyWithHistory(importinto.TaskKey(int64(jobID)))
 		s.NoError(err2)
 		return globalTask.State == "failed"
 	}, 10*time.Second, 300*time.Millisecond)
 	// check all sorted data cleaned up
+	<-dispatcher.WaitCleanUpFinished
+
 	_, files, err = s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"})
 	s.NoError(err)
 	s.Len(files, 0)
diff --git a/tests/realtikvtest/importintotest4/split_file_test.go b/tests/realtikvtest/importintotest4/split_file_test.go
index c4ca398f2fd8e..1226073823eb9 100644
--- a/tests/realtikvtest/importintotest4/split_file_test.go
+++ b/tests/realtikvtest/importintotest4/split_file_test.go
@@ -58,8 +58,9 @@ func (s *mockGCSSuite) TestSplitFile() {
 	s.NoError(err)
 	taskKey := importinto.TaskKey(int64(jobID))
 	s.NoError(err)
-	globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey)
+	globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
 	s.NoError(err2)
+
 	subtasks, err2 := globalTaskManager.GetSubtasksForImportInto(globalTask.ID, importinto.StepImport)
 	s.NoError(err2)
 	s.Len(subtasks, 3)