diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel index 62600abaec..b0a9cb759b 100644 --- a/disttask/importinto/BUILD.bazel +++ b/disttask/importinto/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "importinto", srcs = [ "dispatcher.go", + "encode_and_sort_operator.go", "job.go", "planner.go", "proto.go", @@ -32,6 +33,7 @@ go_library( "//disttask/framework/scheduler", "//disttask/framework/scheduler/execute", "//disttask/framework/storage", + "//disttask/operator", "//domain/infosync", "//errno", "//executor/asyncloaddata", @@ -40,9 +42,12 @@ go_library( "//meta/autoid", "//parser/ast", "//parser/mysql", + "//resourcemanager/pool/workerpool", + "//resourcemanager/util", "//sessionctx", "//sessionctx/variable", "//table/tables", + "//util", "//util/dbterror/exeerrors", "//util/etcd", "//util/logutil", @@ -63,6 +68,7 @@ go_test( timeout = "short", srcs = [ "dispatcher_test.go", + "encode_and_sort_operator_test.go", "planner_test.go", "subtask_executor_test.go", "wrapper_test.go", @@ -70,14 +76,17 @@ go_test( embed = [":importinto"], flaky = True, race = "on", - shard_count = 5, + shard_count = 6, deps = [ "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/mydump", "//br/pkg/lightning/verification", + "//disttask/framework/mock/execute", "//disttask/framework/planner", "//disttask/framework/proto", + "//disttask/framework/scheduler/execute", "//disttask/framework/storage", + "//disttask/operator", "//domain/infosync", "//executor/importer", "//meta/autoid", @@ -86,9 +95,12 @@ go_test( "//testkit", "//util/logutil", "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", "@com_github_tikv_client_go_v2//util", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//:zap", ], ) diff --git a/disttask/importinto/encode_and_sort_operator.go b/disttask/importinto/encode_and_sort_operator.go new file mode 
100644 index 0000000000..fa5e32e7b4 --- /dev/null +++ b/disttask/importinto/encode_and_sort_operator.go @@ -0,0 +1,134 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importinto + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/disttask/operator" + "github.com/pingcap/tidb/resourcemanager/pool/workerpool" + "github.com/pingcap/tidb/resourcemanager/util" + tidbutil "github.com/pingcap/tidb/util" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +// encodeAndSortOperator is an operator that encodes and sorts data. +// this operator processes the data of a subtask, i.e. one engine, which contains +// a lot of data chunks; each chunk is a data file or part of one. +// we don't split it into chunk-level encode and sort operators, we parallelize +// them inside. 
+type encodeAndSortOperator struct { + *operator.AsyncOperator[*importStepMinimalTask, workerpool.None] + wg tidbutil.WaitGroupWrapper + firstErr atomic.Error + + ctx context.Context + cancel context.CancelFunc + + logger *zap.Logger + errCh chan error +} + +var _ operator.Operator = (*encodeAndSortOperator)(nil) +var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil) +var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil) + +func newEncodeAndSortOperator(ctx context.Context, concurrency int, logger *zap.Logger) *encodeAndSortOperator { + subCtx, cancel := context.WithCancel(ctx) + op := &encodeAndSortOperator{ + ctx: subCtx, + cancel: cancel, + logger: logger, + errCh: make(chan error), + } + pool := workerpool.NewWorkerPool( + "encodeAndSortOperator", + util.ImportInto, + concurrency, + func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { + return &chunkWorker{ + ctx: subCtx, + op: op, + } + }, + ) + op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool) + return op +} + +func (op *encodeAndSortOperator) Open() error { + op.wg.Run(func() { + for err := range op.errCh { + if op.firstErr.CompareAndSwap(nil, err) { + op.cancel() + } else { + if errors.Cause(err) != context.Canceled { + op.logger.Error("error on encode and sort", zap.Error(err)) + } + } + } + }) + return op.AsyncOperator.Open() +} + +func (op *encodeAndSortOperator) Close() error { + // TODO: handle the close error after we separate the wait part from the close part. + // right now AsyncOperator.Close always returns nil, so it's ok to ignore it. + // nolint:errcheck + op.AsyncOperator.Close() + op.cancel() + close(op.errCh) + op.wg.Wait() + // see the comment on the Operator interface definition: this Close is actually WaitAndClose. 
+ return op.firstErr.Load() +} + +func (*encodeAndSortOperator) String() string { + return "encodeAndSortOperator" +} + +func (op *encodeAndSortOperator) hasError() bool { + return op.firstErr.Load() != nil +} + +func (op *encodeAndSortOperator) onError(err error) { + op.errCh <- err +} + +func (op *encodeAndSortOperator) Done() <-chan struct{} { + return op.ctx.Done() +} + +type chunkWorker struct { + ctx context.Context + op *encodeAndSortOperator +} + +func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.None)) { + if w.op.hasError() { + return + } + // we don't use the input send function since it makes the workflow more complex; + // errors are sent to errCh and handled by the goroutine started in Open. + executor := newImportMinimalTaskExecutor(task) + if err := executor.Run(w.ctx); err != nil { + w.op.onError(err) + } +} + +func (*chunkWorker) Close() { +} diff --git a/disttask/importinto/encode_and_sort_operator_test.go b/disttask/importinto/encode_and_sort_operator_test.go new file mode 100644 index 0000000000..1f7b347e66 --- /dev/null +++ b/disttask/importinto/encode_and_sort_operator_test.go @@ -0,0 +1,118 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importinto + +import ( + "context" + "os" + "path" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/errors" + mockexecute "github.com/pingcap/tidb/disttask/framework/mock/execute" + "github.com/pingcap/tidb/disttask/framework/scheduler/execute" + "github.com/pingcap/tidb/disttask/operator" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +func TestEncodeAndSortOperator(t *testing.T) { + bak := os.Stdout + logFileName := path.Join(t.TempDir(), "test.log") + file, err := os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + require.NoError(t, err) + os.Stdout = file + t.Cleanup(func() { + require.NoError(t, os.Stdout.Close()) + os.Stdout = bak + }) + logger := zap.NewExample() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + executor := mockexecute.NewMockMiniTaskExecutor(ctrl) + backup := newImportMinimalTaskExecutor + t.Cleanup(func() { + newImportMinimalTaskExecutor = backup + }) + newImportMinimalTaskExecutor = func(t *importStepMinimalTask) execute.MiniTaskExecutor { + return executor + } + + source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) + op := newEncodeAndSortOperator(context.Background(), 3, logger) + op.SetSource(source) + require.NoError(t, op.Open()) + require.Greater(t, len(op.String()), 0) + + // cancel on error + mockErr := errors.New("mock err") + executor.EXPECT().Run(gomock.Any()).Return(mockErr) + source.Channel() <- &importStepMinimalTask{} + require.Eventually(t, func() bool { + return op.hasError() + }, 3*time.Second, 300*time.Millisecond) + require.Equal(t, mockErr, op.firstErr.Load()) + // should not block + <-op.ctx.Done() + require.ErrorIs(t, op.Close(), mockErr) + + // cancel on error and log other errors + mockErr2 := errors.New("mock err 2") + source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) + op = newEncodeAndSortOperator(context.Background(), 2, logger) + op.SetSource(source) + 
executor1 := mockexecute.NewMockMiniTaskExecutor(ctrl) + executor2 := mockexecute.NewMockMiniTaskExecutor(ctrl) + var id atomic.Int32 + newImportMinimalTaskExecutor = func(t *importStepMinimalTask) execute.MiniTaskExecutor { + if id.Add(1) == 1 { + return executor1 + } + return executor2 + } + var wg sync.WaitGroup + wg.Add(2) + // wait until both executors start running, otherwise the worker pool will be cancelled. + executor1.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error { + wg.Done() + wg.Wait() + return mockErr2 + }) + executor2.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error { + wg.Done() + wg.Wait() + // wait until the error from executor1 has been processed + require.Eventually(t, func() bool { + return op.hasError() + }, 3*time.Second, 300*time.Millisecond) + return errors.New("mock error should be logged") + }) + require.NoError(t, op.Open()) + // send 2 tasks + source.Channel() <- &importStepMinimalTask{} + source.Channel() <- &importStepMinimalTask{} + // should not block + <-op.ctx.Done() + require.ErrorIs(t, op.Close(), mockErr2) + require.NoError(t, os.Stdout.Sync()) + content, err := os.ReadFile(logFileName) + require.NoError(t, err) + require.Contains(t, string(content), "mock error should be logged") +} diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index e242978581..deb2050a26 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler" "github.com/pingcap/tidb/disttask/framework/scheduler/execute" + "github.com/pingcap/tidb/disttask/operator" "github.com/pingcap/tidb/executor/asyncloaddata" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/meta/autoid" @@ -98,7 +99,7 @@ func (s *importStepExecutor) SplitSubtask(ctx context.Context, subtask *proto.Su if err != nil { return nil, err } - s.logger.Info("split subtask", 
zap.Int32("engine-id", subtaskMeta.ID)) + s.logger.Info("split and run subtask", zap.Int32("engine-id", subtaskMeta.ID)) dataEngine, err := s.tableImporter.OpenDataEngine(ctx, subtaskMeta.ID) if err != nil { @@ -123,15 +124,31 @@ func (s *importStepExecutor) SplitSubtask(ctx context.Context, subtask *proto.Su } s.sharedVars.Store(subtaskMeta.ID, sharedVars) - miniTask := make([]proto.MinimalTask, 0, len(subtaskMeta.Chunks)) + source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) + op := newEncodeAndSortOperator(ctx, int(s.taskMeta.Plan.ThreadCnt), s.logger) + op.SetSource(source) + pipeline := operator.NewAsyncPipeline(op) + if err = pipeline.Execute(); err != nil { + return nil, err + } + +outer: for _, chunk := range subtaskMeta.Chunks { - miniTask = append(miniTask, &importStepMinimalTask{ + // TODO: the current worker pool implementation doesn't drain the input channel; it + // just returns on context cancellation (i.e. when an error happens), so we add this select. + select { + case source.Channel() <- &importStepMinimalTask{ Plan: s.taskMeta.Plan, Chunk: chunk, SharedVars: sharedVars, - }) + }: + case <-op.Done(): + break outer + } } - return miniTask, nil + source.Finish() + + return nil, pipeline.Close() } func (s *importStepExecutor) OnFinished(ctx context.Context, subtaskMetaBytes []byte) ([]byte, error) { @@ -266,14 +283,7 @@ func (*importScheduler) GetSubtaskExecutor(_ context.Context, task *proto.Task, func (*importScheduler) GetMiniTaskExecutor(minimalTask proto.MinimalTask, _ string, step int64) (execute.MiniTaskExecutor, error) { switch step { - case StepImport: - task, ok := minimalTask.(*importStepMinimalTask) - if !ok { - return nil, errors.Errorf("invalid task type %T", minimalTask) - } - return &ImportMinimalTaskExecutor{mTtask: task}, nil case StepPostProcess: - mTask, ok := minimalTask.(*postProcessStepMinimalTask) if !ok { return nil, errors.Errorf("invalid task type %T", minimalTask) diff --git a/disttask/importinto/subtask_executor.go 
b/disttask/importinto/subtask_executor.go index 842cb71ee4..18a9e78d75 100644 --- a/disttask/importinto/subtask_executor.go +++ b/disttask/importinto/subtask_executor.go @@ -29,6 +29,7 @@ import ( verify "github.com/pingcap/tidb/br/pkg/lightning/verification" tidb "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/disttask/framework/scheduler/execute" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/kv" @@ -43,13 +44,20 @@ import ( // TestSyncChan is used to test. var TestSyncChan = make(chan struct{}) -// ImportMinimalTaskExecutor is a minimal task executor for IMPORT INTO. -type ImportMinimalTaskExecutor struct { +// importMinimalTaskExecutor is a minimal task executor for IMPORT INTO. +type importMinimalTaskExecutor struct { mTtask *importStepMinimalTask } -// Run implements the MiniTaskExecutor.Run interface. -func (e *ImportMinimalTaskExecutor) Run(ctx context.Context) error { +var newImportMinimalTaskExecutor = newImportMinimalTaskExecutor0 + +func newImportMinimalTaskExecutor0(t *importStepMinimalTask) execute.MiniTaskExecutor { + return &importMinimalTaskExecutor{ + mTtask: t, + } +} + +func (e *importMinimalTaskExecutor) Run(ctx context.Context) error { logger := logutil.BgLogger().With(zap.String("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID)) logger.Info("run minimal task") failpoint.Inject("waitBeforeSortChunk", func() { diff --git a/disttask/operator/operator.go b/disttask/operator/operator.go index df6480362c..2e3bb5a68c 100644 --- a/disttask/operator/operator.go +++ b/disttask/operator/operator.go @@ -25,6 +25,8 @@ import ( // Operator is the basic operation unit in the task execution. type Operator interface { Open() error + // Close waits for the tasks to finish and closes the operator. + // TODO: the wait part should be separated from the close part. 
 Close() error String() string } diff --git a/disttask/operator/pipeline.go b/disttask/operator/pipeline.go index 3c9f32e2a9..a920eddb78 100644 --- a/disttask/operator/pipeline.go +++ b/disttask/operator/pipeline.go @@ -22,7 +22,7 @@ type AsyncPipeline struct { ops []Operator } -// Execute starts all operators waiting to handle tasks. +// Execute opens all operators; the pipeline runs asynchronously. func (p *AsyncPipeline) Execute() error { // Start running each operator. for i, op := range p.ops { diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go index 3ddbd024b5..138cfbef6f 100644 --- a/resourcemanager/util/util.go +++ b/resourcemanager/util/util.go @@ -57,4 +57,6 @@ const ( DistTask // CheckTable is for admin check table component. CheckTable + // ImportInto is for import into component. + ImportInto )