From d0d4eca4452ea6977dd72bd9c4aa53fbf26596ef Mon Sep 17 00:00:00 2001 From: "TONG, Zhigao" Date: Wed, 31 Jan 2024 15:58:54 +0800 Subject: [PATCH 1/5] executor: enhance memory tracking for IndexLookup (#49336) close pingcap/tidb#45901 --- pkg/executor/distsql.go | 47 ++++++++++++++++++++++++----------- pkg/executor/table_reader.go | 16 ++++++++++++ pkg/kv/BUILD.bazel | 2 ++ pkg/kv/key.go | 48 ++++++++++++++++++++++++++++++++++++ pkg/kv/key_test.go | 22 +++++++++++++++++ 5 files changed, 121 insertions(+), 14 deletions(-) diff --git a/pkg/executor/distsql.go b/pkg/executor/distsql.go index 129452a79e88b..d0f73e5b08806 100644 --- a/pkg/executor/distsql.go +++ b/pkg/executor/distsql.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" + "github.com/pingcap/tidb/pkg/util/size" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -797,7 +798,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha } } -func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (exec.Executor, error) { +func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) { table := e.table if e.partitionTableMode && task.partitionTable != nil { table = task.partitionTable @@ -1474,10 +1475,22 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er return w.compareData(ctx, task, tableReader) } - task.memTracker = w.memTracker - memUsage := int64(cap(task.handles) * 8) - task.memUsage = memUsage - task.memTracker.Consume(memUsage) + { + task.memTracker = w.memTracker + memUsage := int64(cap(task.handles))*size.SizeOfInterface + tableReader.memUsage() + for _, h := range task.handles { + memUsage += int64(h.MemUsage()) + } + if task.indexOrder != nil { + memUsage += task.indexOrder.MemUsage() + } + if task.duplicatedIndexOrder != nil { + memUsage += task.duplicatedIndexOrder.MemUsage() + } + memUsage += task.idxRows.MemoryUsage() + task.memUsage = memUsage + task.memTracker.Consume(memUsage) + } handleCnt := len(task.handles) task.rows = make([]chunk.Row, 0, handleCnt) for { @@ -1490,9 +1503,11 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er if chk.NumRows() == 0 { break } - memUsage = chk.MemoryUsage() - task.memUsage += memUsage - task.memTracker.Consume(memUsage) + { + memUsage := chk.MemoryUsage() + task.memUsage += memUsage + task.memTracker.Consume(memUsage) + } iter := chunk.NewIterator4Chunk(chk) for row := iter.Begin(); row != iter.End(); row = iter.Next() { task.rows = append(task.rows, row) @@ -1500,9 +1515,11 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er } defer trace.StartRegion(ctx, "IndexLookUpTableCompute").End() - memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{})) - task.memUsage += memUsage - task.memTracker.Consume(memUsage) + { + memUsage := int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{})) + task.memUsage += memUsage + task.memTracker.Consume(memUsage) + } if w.keepOrder { task.rowIdx = make([]int, 0, len(task.rows)) for i := range task.rows { @@ -1513,9 +1530,11 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er rowIdx, _ := task.indexOrder.Get(handle) task.rowIdx = append(task.rowIdx, rowIdx.(int)) } - memUsage = int64(cap(task.rowIdx) * 4) - task.memUsage += memUsage - task.memTracker.Consume(memUsage) + 
{ + memUsage := int64(cap(task.rowIdx) * int(size.SizeOfInt)) + task.memUsage += memUsage + task.memTracker.Consume(memUsage) + } sort.Sort(task) } diff --git a/pkg/executor/table_reader.go b/pkg/executor/table_reader.go index 4d87861c9511b..8d7624f7c23e9 100644 --- a/pkg/executor/table_reader.go +++ b/pkg/executor/table_reader.go @@ -20,6 +20,7 @@ import ( "context" "slices" "time" + "unsafe" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/distsql" @@ -42,6 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/ranger" + "github.com/pingcap/tidb/pkg/util/size" "github.com/pingcap/tidb/pkg/util/stringutil" "github.com/pingcap/tidb/pkg/util/tracing" "github.com/pingcap/tipb/go-tipb" @@ -139,6 +141,20 @@ func (e *TableReaderExecutor) setDummy() { e.dummy = true } +func (e *TableReaderExecutor) memUsage() int64 { + const sizeofTableReaderExecutor = int64(unsafe.Sizeof(*(*TableReaderExecutor)(nil))) + + res := sizeofTableReaderExecutor + res += size.SizeOfPointer * int64(cap(e.ranges)) + for _, v := range e.ranges { + res += v.MemUsage() + } + res += kv.KeyRangeSliceMemUsage(e.kvRanges) + res += int64(e.dagPB.Size()) + // TODO: add more statistics + return res +} + // Open initializes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { r, ctx := tracing.StartRegionEx(ctx, "TableReaderExecutor.Open") diff --git a/pkg/kv/BUILD.bazel b/pkg/kv/BUILD.bazel index 61433c6824390..62a5c6f210694 100644 --- a/pkg/kv/BUILD.bazel +++ b/pkg/kv/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/set", + "//pkg/util/size", "//pkg/util/tiflash", "//pkg/util/tiflashcompute", "//pkg/util/trxevents", @@ -87,6 +88,7 @@ go_test( "//pkg/testkit/testutil", "//pkg/types", "//pkg/util/codec", + "//pkg/util/size", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/deadlock", diff --git a/pkg/kv/key.go b/pkg/kv/key.go index 5b1ad8baccde2..4536caab5118f 100644 --- a/pkg/kv/key.go +++ b/pkg/kv/key.go @@ -20,10 +20,12 @@ import ( "fmt" "strconv" "strings" + "unsafe" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/set" + "github.com/pingcap/tidb/pkg/util/size" ) // Key represents high-level Key type. @@ -102,6 +104,18 @@ type KeyRange struct { XXXsizecache int32 } +// KeyRangeSliceMemUsage return the memory usage of []KeyRange +func KeyRangeSliceMemUsage(k []KeyRange) int64 { + const sizeofKeyRange = int64(unsafe.Sizeof(*(*KeyRange)(nil))) + + res := sizeofKeyRange * int64(cap(k)) + for _, m := range k { + res += int64(cap(m.StartKey)) + int64(cap(m.EndKey)) + int64(cap(m.XXXunrecognized)) + } + + return res +} + // IsPoint checks if the key range represents a point. func (r *KeyRange) IsPoint() bool { if len(r.StartKey) != len(r.EndKey) { @@ -404,6 +418,12 @@ type strHandleVal struct { val any } +// SizeofHandleMap presents the memory size of struct HandleMap +const SizeofHandleMap = int64(unsafe.Sizeof(*(*HandleMap)(nil))) + +// SizeofStrHandleVal presents the memory size of struct strHandleVal +const SizeofStrHandleVal = int64(unsafe.Sizeof(*(*strHandleVal)(nil))) + // NewHandleMap creates a new map for handle. 
func NewHandleMap() *HandleMap { return &HandleMap{ @@ -436,6 +456,34 @@ func (m *HandleMap) Get(h Handle) (v any, ok bool) { return } +func calcStrsMemUsage(strs map[string]strHandleVal) int64 { + res := int64(0) + for key := range strs { + res += size.SizeOfString + int64(len(key)) + SizeofStrHandleVal + } + return res +} + +func calcIntsMemUsage(ints map[int64]interface{}) int64 { + return int64(len(ints)) * (size.SizeOfInt64 + size.SizeOfInterface) +} + +// MemUsage gets the memory usage. +func (m *HandleMap) MemUsage() int64 { + res := SizeofHandleMap + res += int64(len(m.partitionInts)) * (size.SizeOfInt64 + size.SizeOfMap) + for _, v := range m.partitionInts { + res += calcIntsMemUsage(v) + } + res += int64(len(m.partitionStrs)) * (size.SizeOfInt64 + size.SizeOfMap) + for _, v := range m.partitionStrs { + res += calcStrsMemUsage(v) + } + res += calcIntsMemUsage(m.ints) + res += calcStrsMemUsage(m.strs) + return res +} + // Set sets a value with a Handle. func (m *HandleMap) Set(h Handle, val any) { ints, strs := m.ints, m.strs diff --git a/pkg/kv/key_test.go b/pkg/kv/key_test.go index 8924167dce380..afd2b0cb65a7a 100644 --- a/pkg/kv/key_test.go +++ b/pkg/kv/key_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit/testutil" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/size" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -167,22 +168,34 @@ func TestHandleMap(t *testing.T) { m := NewHandleMap() h := IntHandle(1) + assert.Equal(t, SizeofHandleMap, m.MemUsage()) + m.Set(h, 1) v, ok := m.Get(h) assert.True(t, ok) assert.Equal(t, 1, v) + assert.Equal(t, SizeofHandleMap+size.SizeOfInt64+size.SizeOfInterface, m.MemUsage()) + m.Delete(h) v, ok = m.Get(h) assert.False(t, ok) assert.Nil(t, v) + assert.Equal(t, SizeofHandleMap, m.MemUsage()) + ch := testutil.MustNewCommonHandle(t, 100, "abc") m.Set(ch, "a") v, ok = m.Get(ch) assert.True(t, ok) assert.Equal(t, "a", v) + { + key := string(ch.Encoded()) + sz := size.SizeOfString + int64(len(key)) + SizeofStrHandleVal + assert.Equal(t, SizeofHandleMap+sz, m.MemUsage()) + } + m.Delete(ch) v, ok = m.Get(ch) assert.False(t, ok) @@ -324,6 +337,15 @@ func TestKeyRangeDefinition(t *testing.T) { // And same default value. require.Equal(t, (*coprocessor.KeyRange)(unsafe.Pointer(&r1)), &r2) require.Equal(t, &r1, (*KeyRange)(unsafe.Pointer(&r2))) + + s := []KeyRange{{ + StartKey: []byte("s1"), + EndKey: []byte("e1"), + }, { + StartKey: []byte("s2"), + EndKey: []byte("e2"), + }} + require.Equal(t, int64(168), KeyRangeSliceMemUsage(s)) } func BenchmarkIsPoint(b *testing.B) { From 732fa8c986952bf26c17b33ba0a1e07a160eecf9 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Wed, 31 Jan 2024 16:09:24 +0800 Subject: [PATCH 2/5] doc: add batch coprocessor rfc (#39362) ref pingcap/tidb#39361 --- docs/design/2022-11-23-batch-cop.md | 149 ++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 docs/design/2022-11-23-batch-cop.md diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md new file mode 100644 index 0000000000000..34f81fea82f46 --- /dev/null +++ b/docs/design/2022-11-23-batch-cop.md @@ -0,0 +1,149 @@ +# Proposal: support batch coprocessor for tikv + +* Authors: [cfzjywxk](https://github.com/cfzjywxk) +* Tracking issue: [39361](https://github.com/pingcap/tidb/issues/39361) + +## Motivation + +The fanout issue in index lookup queries is one cause of increased query latency and cost. 
If there are
+1,000 handles and they are distributed in 1,000 regions, TiDB would construct 1,000 small tasks to retrieve
+the 1,000 related row contents, even when all the region leaders are in the same store. This results in the following problems:
+1. Each task requires a single RPC request, so there could be too many tasks or RPC requests even though each
+request fetches only a few rows. Sometimes the cost of the RPCs cannot be ignored.
+2. A larger number of tasks may lead to more queueing. Tuning the related concurrency parameters or task scheduling
+policies becomes more complex, and it is difficult to get the best performance.
+
+In the current coprocessor implementation, key ranges in the same region are already batched into a single
+task (there is a hard-coded upper limit of 25000 ranges), so how about batching all the cop tasks that would
+be sent to the same store?
+
+In one user scenario, the index range scan returns 4000000 rows, and 400000 coprocessor table-lookup
+tasks are finally generated, which means the key ranges are scattered across different regions.
+
+## Optimization
+
+### The IndexLookUp Execution Review
+
+Usually, the IndexLookUp executor has an index worker which tries to read index keys and related row handles
+according to the index filter conditions. Each time it fetches enough row handle data, it creates a
+coprocessor table lookup task and sends it to the table workers. The handle data size limit for one task can be configured
+by the [tidb_index_lookup_size](https://docs.pingcap.com/tidb/dev/system-variables#tidb_index_lookup_size)
+system variable.
+
+When a table worker gets a coprocessor task, it splits the handle ranges according to the region
+information from the region cache. Then these region-aware tasks are processed by the coprocessor client,
+which has a default concurrency limit configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system
+variable.
+
+### Batching Strategy
+
+As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To keep the design
+simple, we could just do the batching for each coprocessor table task separately. Different coprocessor table
+tasks may still require different RPC requests, while row handle ranges within one task can be batched if
+their region leaders are in the same store. The main idea is to batch the sending of tasks, using one
+RPC for each original `copTask` when the row handle range-related region leaders are located in the same TiKV store.
+
+With the batching optimization, the number of RPC requests for each table lookup task is at most the number of
+store nodes. Consider an extreme case: if the index scan returns 4000000 rows and each task range is one row,
+there could be as many as `4000000/25000=160` table lookup tasks, each containing 25000 key ranges. But now the RPC number
+becomes at most `160 * store_number`; for example, if `store_number` is 10, the total request number is
+1600, which is much less than the previous 400000.
+
+### Proto Change
+
+Create a new structure for the batched tasks, including the request `StoreBatchTask` and response `StoreBatchTaskResponse` types.
+
+```protobuf
+message StoreBatchTask {
+    uint64 region_id = 1;
+    metapb.RegionEpoch region_epoch = 2;
+    metapb.Peer peer = 3;
+    repeated KeyRange ranges = 4;
+    uint64 task_id = 5;
+}
+```
+
+```protobuf
+message StoreBatchTaskResponse {
+    bytes data = 1 [(gogoproto.customtype) = "github.com/pingcap/kvproto/pkg/sharedbytes.SharedBytes", (gogoproto.nullable) = false];
+    errorpb.Error region_error = 2;
+    kvrpcpb.LockInfo locked = 3;
+    string other_error = 4;
+    uint64 task_id = 5;
+    kvrpcpb.ExecDetailsV2 exec_details_v2 = 6;
+}
+```
+
+Attach the batched tasks to the `Coprocessor` request. Reuse the `StoreBatchTask` structure defined above to store tasks
+that belong to different regions but the same store.
+```protobuf
+message Request {
+    …
+
+    // Store the batched tasks belonging to other regions.
+    repeated StoreBatchTask tasks = 11;
+}
+```
+
+Add the batched task results to `Response`. Different tasks may encounter different kinds of errors, so collect them
+together.
+```protobuf
+message Response {
+    …
+    repeated StoreBatchTaskResponse batch_responses = 13;
+}
+```
+
+### The TiDB Side
+
+Add a flag in `kv.Request` to indicate whether the batch strategy is enabled.
+```golang
+type Request struct {
+    …
+    // EnableStoreBatch indicates if the tasks are batched.
+    EnableStoreBatch bool
+}
+```
+
+Add batch-task-related fields to `copr.copTask`. They are collected when the `copTask` is being
+prepared and store batching is enabled.
+```golang
+type copTask struct {
+    …
+    // batchTaskList stores the tasks batched into this one; they target
+    // different regions whose leaders are in the same store.
+    batchTaskList []coprocessor.StoreBatchTask
+}
+```
+
+When building coprocessor tasks in the `buildCopTasks` function, try to fill the `batchTaskList` if
+necessary. The steps are as follows (a simplified sketch of this grouping is given at the end of this proposal):
+1. Create a map that records `store address => *copTask`. If store batching is enabled, tasks are appended
+to an existing `copTask` when the store address is the same.
+2. Split the ranges according to the region information as usual. After this, each task corresponds
+to a single region.
+3. When processing a new `KeyLocation`, try to append it as a batched task to the existing coprocessor task
+if possible.
+
+The coprocessor client just sends the tasks as usual; the `Coprocessor` request is still a unary RPC
+request even though it may be batched. When handling `CopResponse`, if the batch path is enabled and
+there are region errors or other errors while processing the batched tasks, reschedule the cop tasks or
+report the errors to the upper layer.
+
+Note that if `keepOrder` is required, the partial query result cannot be sent back until all the reads
+have succeeded.
+
+### The TiKV Side
+
+A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`: after parsing the
+original request, the batched task-related builders and handlers can also be generated from the RPC
+context, region information, and key ranges, as long as they are properly passed in the `Coprocessor` request.
+
+All the request handling can be scheduled to the read pool at the same time, so before finishing,
+something like `join_all` is needed to wait for the results of the different tasks. If any error is
+returned, fill in the error fields in the `Response`.
+
+For execution tracking, create separate trackers for the requests; all the execution details are returned
+to the client.
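As an illustration of the store-address grouping described in "The TiDB Side" section, here is a minimal, self-contained sketch. It is not the real `copr` implementation: `regionTask`, the trimmed-down `copTask`, and `groupByStore` are hypothetical stand-ins invented for this example, real tasks carry key ranges, region epochs, and peers as in the proto messages above, and only steps 1 and 3 of the build flow are modeled.

```golang
package main

import "fmt"

// regionTask is a simplified stand-in for a single-region coprocessor task
// produced by range splitting (step 2 above).
type regionTask struct {
	regionID  int64
	storeAddr string
}

// copTask is a trimmed-down stand-in for copr.copTask, with only the fields
// needed to show the batching idea.
type copTask struct {
	regionID  int64
	storeAddr string
	// batchTaskList holds tasks for other regions whose leaders live in the
	// same store, so they can ride on this task's single RPC.
	batchTaskList []regionTask
}

// groupByStore batches single-region tasks per store address: the first task
// seen for a store becomes the primary copTask, and later tasks for the same
// store are appended to its batchTaskList (steps 1 and 3 above).
func groupByStore(tasks []regionTask) []*copTask {
	byStore := make(map[string]*copTask) // store address => *copTask
	var result []*copTask
	for _, t := range tasks {
		if primary, ok := byStore[t.storeAddr]; ok {
			primary.batchTaskList = append(primary.batchTaskList, t)
			continue
		}
		primary := &copTask{regionID: t.regionID, storeAddr: t.storeAddr}
		byStore[t.storeAddr] = primary
		result = append(result, primary)
	}
	return result
}

func main() {
	tasks := []regionTask{
		{regionID: 1, storeAddr: "store-a"},
		{regionID: 2, storeAddr: "store-a"},
		{regionID: 3, storeAddr: "store-b"},
	}
	for _, t := range groupByStore(tasks) {
		fmt.Printf("store %s: primary region %d, %d batched task(s)\n",
			t.storeAddr, t.regionID, len(t.batchTaskList))
	}
}
```

With this grouping, regions 1 and 2 (both on store-a) share one RPC while region 3 gets its own, so the number of requests per table lookup task is bounded by the number of stores rather than the number of regions.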
From 0c9b0f4d2b2dfa4bf87db8be4031251b20953cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 31 Jan 2024 17:43:55 +0800 Subject: [PATCH 3/5] *: delete useless `SessionVars.IDAllocator` (#50844) close pingcap/tidb#50842 --- pkg/sessionctx/variable/BUILD.bazel | 1 - pkg/sessionctx/variable/session.go | 5 ----- pkg/table/tables/tables.go | 33 ++++++----------------------- 3 files changed, 7 insertions(+), 32 deletions(-) diff --git a/pkg/sessionctx/variable/BUILD.bazel b/pkg/sessionctx/variable/BUILD.bazel index 45c3bd30d5dfa..fd106213ea0ef 100644 --- a/pkg/sessionctx/variable/BUILD.bazel +++ b/pkg/sessionctx/variable/BUILD.bazel @@ -25,7 +25,6 @@ go_library( "//pkg/errno", "//pkg/keyspace", "//pkg/kv", - "//pkg/meta/autoid", "//pkg/metrics", "//pkg/parser", "//pkg/parser/ast", diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index a5b7d15ea1939..33394102e310b 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/domain/resourcegroup" "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" @@ -978,10 +977,6 @@ type SessionVars struct { // BatchCommit indicates if we should split the transaction into multiple batches. BatchCommit bool - // IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using - // Table.alloc. - IDAllocator autoid.Allocator - // OptimizerSelectivityLevel defines the level of the selectivity estimation in plan. OptimizerSelectivityLevel int diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 6298885ebfbce..e864ef1ce353a 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -1738,36 +1738,17 @@ func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uin func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators { if ctx == nil { return t.allocs - } else if ctx.GetSessionVars().IDAllocator == nil { - // Use an independent allocator for global temporary tables. - if t.meta.TempTableType == model.TempTableGlobal { - if alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator(); alloc != nil { - return autoid.NewAllocators(false, alloc) - } - // If the session is not in a txn, for example, in "show create table", use the original allocator. - // Otherwise the would be a nil pointer dereference. - } - return t.allocs } - // Replace the row id allocator with the one in session variables. - sessAlloc := ctx.GetSessionVars().IDAllocator - allocs := t.allocs.Allocs - retAllocs := make([]autoid.Allocator, 0, len(allocs)) - copy(retAllocs, allocs) - - overwritten := false - for i, a := range retAllocs { - if a.GetType() == autoid.RowIDAllocType { - retAllocs[i] = sessAlloc - overwritten = true - break + // Use an independent allocator for global temporary tables. + if t.meta.TempTableType == model.TempTableGlobal { + if alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator(); alloc != nil { + return autoid.NewAllocators(false, alloc) } + // If the session is not in a txn, for example, in "show create table", use the original allocator. + // Otherwise the would be a nil pointer dereference. 
} - if !overwritten { - retAllocs = append(retAllocs, sessAlloc) - } - return autoid.NewAllocators(t.allocs.SepAutoInc, retAllocs...) + return t.allocs } // Type implements table.Table Type interface. From acd4999594d2428cf84a8d7bfa4639e75ab27c2d Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Wed, 31 Jan 2024 18:24:24 +0800 Subject: [PATCH 4/5] executor: fix negative consumedbyte for hashRowcontainer (#50843) close pingcap/tidb#50841 --- pkg/executor/hash_table.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/executor/hash_table.go b/pkg/executor/hash_table.go index 0537a42fdc352..067c1ecb23b06 100644 --- a/pkg/executor/hash_table.go +++ b/pkg/executor/hash_table.go @@ -293,9 +293,10 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk memDelta int64 ) c.memTracker.Consume(-c.chkBufSizeForOneProbe) + defer func() { c.memTracker.Consume(memDelta) }() if needTrackMemUsage { c.memTracker.Consume(int64(cap(innerPtrs)) * rowPtrSize) - defer c.memTracker.Consume(-int64(cap(innerPtrs))*rowPtrSize + memDelta) + defer c.memTracker.Consume(-int64(cap(innerPtrs)) * rowPtrSize) } c.chkBufSizeForOneProbe = 0 From 2c25e893be0108589d62bd69e2e237effa19bbfd Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Wed, 31 Jan 2024 19:01:54 +0800 Subject: [PATCH 5/5] disttask: refactor metrics, collect metrics in scheduler manager (#50634) close pingcap/tidb#49615 --- pkg/disttask/framework/mock/scheduler_mock.go | 15 + pkg/disttask/framework/proto/subtask.go | 10 + pkg/disttask/framework/scheduler/BUILD.bazel | 2 + pkg/disttask/framework/scheduler/collector.go | 126 ++++++ pkg/disttask/framework/scheduler/interface.go | 2 + .../framework/scheduler/scheduler_manager.go | 29 +- pkg/disttask/framework/storage/converter.go | 21 +- .../framework/storage/subtask_state.go | 2 +- pkg/disttask/framework/storage/task_table.go | 20 +- .../framework/taskexecutor/task_executor.go | 60 +-- .../taskexecutor/task_executor_test.go | 100 +---- pkg/metrics/disttask.go | 60 +-- pkg/metrics/grafana/tidb.json | 380 +++++++++++++++++- pkg/metrics/grafana/tidb_runtime.json | 308 -------------- pkg/metrics/metrics.go | 3 - 15 files changed, 598 insertions(+), 540 deletions(-) create mode 100644 pkg/disttask/framework/scheduler/collector.go diff --git a/pkg/disttask/framework/mock/scheduler_mock.go b/pkg/disttask/framework/mock/scheduler_mock.go index ad578ebd3249f..9412bb04fa57d 100644 --- a/pkg/disttask/framework/mock/scheduler_mock.go +++ b/pkg/disttask/framework/mock/scheduler_mock.go @@ -323,6 +323,21 @@ func (mr *MockTaskManagerMockRecorder) GetAllNodes(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllNodes", reflect.TypeOf((*MockTaskManager)(nil).GetAllNodes), arg0) } +// GetAllSubtasks mocks base method. +func (m *MockTaskManager) GetAllSubtasks(arg0 context.Context) ([]*proto.Subtask, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllSubtasks", arg0) + ret0, _ := ret[0].([]*proto.Subtask) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllSubtasks indicates an expected call of GetAllSubtasks. +func (mr *MockTaskManagerMockRecorder) GetAllSubtasks(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllSubtasks", reflect.TypeOf((*MockTaskManager)(nil).GetAllSubtasks), arg0) +} + // GetAllSubtasksByStepAndState mocks base method. 
func (m *MockTaskManager) GetAllSubtasksByStepAndState(arg0 context.Context, arg1 int64, arg2 proto.Step, arg3 proto.SubtaskState) ([]*proto.Subtask, error) { m.ctrl.T.Helper() diff --git a/pkg/disttask/framework/proto/subtask.go b/pkg/disttask/framework/proto/subtask.go index 401888ac67ab2..9b9d7d9b4fabc 100644 --- a/pkg/disttask/framework/proto/subtask.go +++ b/pkg/disttask/framework/proto/subtask.go @@ -51,6 +51,16 @@ const ( SubtaskStatePaused SubtaskState = "paused" ) +// AllSubtaskStates is all subtask state. +var AllSubtaskStates = []SubtaskState{ + SubtaskStatePending, + SubtaskStateRunning, + SubtaskStateSucceed, + SubtaskStateFailed, + SubtaskStateCanceled, + SubtaskStatePaused, +} + type ( // SubtaskState is the state of subtask. SubtaskState string diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel index ffb84e2d9f131..5b2caec655c21 100644 --- a/pkg/disttask/framework/scheduler/BUILD.bazel +++ b/pkg/disttask/framework/scheduler/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "scheduler", srcs = [ "balancer.go", + "collector.go", "interface.go", "nodes.go", "scheduler.go", @@ -32,6 +33,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", + "@com_github_prometheus_client_golang//prometheus", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/disttask/framework/scheduler/collector.go b/pkg/disttask/framework/scheduler/collector.go new file mode 100644 index 0000000000000..0f4f7fed46400 --- /dev/null +++ b/pkg/disttask/framework/scheduler/collector.go @@ -0,0 +1,126 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "strconv" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/pkg/disttask/framework/proto" + "github.com/prometheus/client_golang/prometheus" +) + +var subtaskCollector = newCollector() + +func init() { + prometheus.MustRegister(subtaskCollector) +} + +// Because the exec_id of a subtask may change, after all tasks +// are successful, subtasks will be migrated from tidb_subtask_background +// to tidb_subtask_background_history. In the above situation, +// the built-in collector of Prometheus needs to delete the previously +// added metrics, which is quite troublesome. +// Therefore, a custom collector is used. +type collector struct { + subtaskInfo atomic.Pointer[[]*proto.Subtask] + + subtasks *prometheus.Desc + subtaskDuration *prometheus.Desc +} + +func newCollector() *collector { + return &collector{ + subtasks: prometheus.NewDesc( + "tidb_disttask_subtasks", + "Number of subtasks.", + []string{"task_type", "task_id", "status", "exec_id"}, nil, + ), + subtaskDuration: prometheus.NewDesc( + "tidb_disttask_subtask_duration", + "Duration of subtasks in different states.", + []string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil, + ), + } +} + +// Describe implements the prometheus.Collector interface. 
+func (c *collector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.subtasks + ch <- c.subtaskDuration +} + +// Collect implements the prometheus.Collector interface. +func (c *collector) Collect(ch chan<- prometheus.Metric) { + p := c.subtaskInfo.Load() + if p == nil { + return + } + subtasks := *p + + // taskID => execID => state => cnt + subtaskCnt := make(map[int64]map[string]map[proto.SubtaskState]int) + taskType := make(map[int64]proto.TaskType) + for _, subtask := range subtasks { + if _, ok := subtaskCnt[subtask.TaskID]; !ok { + subtaskCnt[subtask.TaskID] = make(map[string]map[proto.SubtaskState]int) + } + if _, ok := subtaskCnt[subtask.TaskID][subtask.ExecID]; !ok { + subtaskCnt[subtask.TaskID][subtask.ExecID] = make(map[proto.SubtaskState]int) + } + + subtaskCnt[subtask.TaskID][subtask.ExecID][subtask.State]++ + taskType[subtask.TaskID] = subtask.Type + + c.setDistSubtaskDuration(ch, subtask) + } + for taskID, execIDMap := range subtaskCnt { + for execID, stateMap := range execIDMap { + for state, cnt := range stateMap { + ch <- prometheus.MustNewConstMetric(c.subtasks, prometheus.GaugeValue, + float64(cnt), + taskType[taskID].String(), + strconv.Itoa(int(taskID)), + state.String(), + execID, + ) + } + } + } +} + +func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.Subtask) { + switch subtask.State { + case proto.SubtaskStatePending: + ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue, + time.Since(subtask.CreateTime).Seconds(), + subtask.Type.String(), + strconv.Itoa(int(subtask.TaskID)), + subtask.State.String(), + strconv.Itoa(int(subtask.ID)), + subtask.ExecID, + ) + case proto.SubtaskStateRunning: + ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue, + time.Since(subtask.StartTime).Seconds(), + subtask.Type.String(), + strconv.Itoa(int(subtask.TaskID)), + subtask.State.String(), + strconv.Itoa(int(subtask.ID)), + subtask.ExecID, + ) + } +} diff --git a/pkg/disttask/framework/scheduler/interface.go b/pkg/disttask/framework/scheduler/interface.go index 131dc6493ba1e..542897277fe17 100644 --- a/pkg/disttask/framework/scheduler/interface.go +++ b/pkg/disttask/framework/scheduler/interface.go @@ -30,6 +30,8 @@ type TaskManager interface { // The returned tasks are sorted by task order, see proto.Task, and only contains // some fields, see row2TaskBasic. GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error) + // GetAllSubtasks gets all subtasks with basic columns. + GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error) GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error) GCSubtasks(ctx context.Context) error diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go index 4584670c878b0..f6ea8b0587e53 100644 --- a/pkg/disttask/framework/scheduler/scheduler_manager.go +++ b/pkg/disttask/framework/scheduler/scheduler_manager.go @@ -37,7 +37,8 @@ var ( // defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table. defaultHistorySubtaskTableGcInterval = 24 * time.Hour // DefaultCleanUpInterval is the interval of cleanup routine. - DefaultCleanUpInterval = 10 * time.Minute + DefaultCleanUpInterval = 10 * time.Minute + defaultCollectMetricsInterval = 5 * time.Second ) // WaitTaskFinished is used to sync the test. 
@@ -162,6 +163,7 @@ func (sm *Manager) Start() { sm.wg.Run(sm.scheduleTaskLoop) sm.wg.Run(sm.gcSubtaskHistoryTableLoop) sm.wg.Run(sm.cleanupTaskLoop) + sm.wg.Run(sm.collectLoop) sm.wg.Run(func() { sm.nodeMgr.maintainLiveNodesLoop(sm.ctx, sm.taskMgr) }) @@ -419,3 +421,28 @@ func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler { serverID: sm.serverID, }) } + +func (sm *Manager) collectLoop() { + sm.logger.Info("collect loop start") + ticker := time.NewTicker(defaultCollectMetricsInterval) + defer ticker.Stop() + for { + select { + case <-sm.ctx.Done(): + sm.logger.Info("collect loop exits") + return + case <-ticker.C: + sm.collect() + } + } +} + +func (sm *Manager) collect() { + subtasks, err := sm.taskMgr.GetAllSubtasks(sm.ctx) + if err != nil { + sm.logger.Warn("get all subtasks failed", zap.Error(err)) + return + } + + subtaskCollector.subtaskInfo.Store(&subtasks) +} diff --git a/pkg/disttask/framework/storage/converter.go b/pkg/disttask/framework/storage/converter.go index 4f43f27401be5..4d05122f2feb7 100644 --- a/pkg/disttask/framework/storage/converter.go +++ b/pkg/disttask/framework/storage/converter.go @@ -79,6 +79,15 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask { if !r.IsNull(8) { ordinal = int(r.GetInt64(8)) } + + // subtask defines start time as bigint, to ensure backward compatible, + // we keep it that way, and we convert it here. + var startTime time.Time + if !r.IsNull(9) { + ts := r.GetInt64(9) + startTime = time.Unix(ts, 0) + } + subtask := &proto.Subtask{ ID: r.GetInt64(0), Step: proto.Step(r.GetInt64(1)), @@ -89,6 +98,7 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask { Concurrency: int(r.GetInt64(6)), CreateTime: createTime, Ordinal: ordinal, + StartTime: startTime, } return subtask } @@ -96,18 +106,15 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask { // Row2SubTask converts a row to a subtask. func Row2SubTask(r chunk.Row) *proto.Subtask { subtask := row2BasicSubTask(r) - // subtask defines start/update time as bigint, to ensure backward compatible, + + // subtask defines update time as bigint, to ensure backward compatible, // we keep it that way, and we convert it here. - var startTime, updateTime time.Time - if !r.IsNull(9) { - ts := r.GetInt64(9) - startTime = time.Unix(ts, 0) - } + var updateTime time.Time if !r.IsNull(10) { ts := r.GetInt64(10) updateTime = time.Unix(ts, 0) } - subtask.StartTime = startTime + subtask.UpdateTime = updateTime subtask.Meta = r.GetBytes(11) subtask.Summary = r.GetJSON(12).String() diff --git a/pkg/disttask/framework/storage/subtask_state.go b/pkg/disttask/framework/storage/subtask_state.go index d3cd853fe551f..ac589671fbe67 100644 --- a/pkg/disttask/framework/storage/subtask_state.go +++ b/pkg/disttask/framework/storage/subtask_state.go @@ -123,7 +123,7 @@ func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtask for _, subtask := range subtasks { _, err := sqlexec.ExecSQL(ctx, se, ` update mysql.tidb_background_subtask - set state = %?, state_update_time = CURRENT_TIMESTAMP() + set state = %?, state_update_time = unix_timestamp() where id = %? and exec_id = %? 
and state = %?`, proto.SubtaskStatePending, subtask.ID, subtask.ExecID, proto.SubtaskStateRunning) if err != nil { diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go index c54af6b2f44de..e5f4ca5c56a80 100644 --- a/pkg/disttask/framework/storage/task_table.go +++ b/pkg/disttask/framework/storage/task_table.go @@ -42,9 +42,9 @@ const ( TaskColumns = basicTaskColumns + `, t.start_time, t.state_update_time, t.meta, t.dispatcher_id, t.error` // InsertTaskColumns is the columns used in insert task. InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time` - basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal` + basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal, start_time` // SubtaskColumns is the columns for subtask. - SubtaskColumns = basicSubtaskColumns + `, start_time, state_update_time, meta, summary` + SubtaskColumns = basicSubtaskColumns + `, state_update_time, meta, summary` // InsertSubtaskColumns is the columns used in insert subtask. InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary` ) @@ -730,3 +730,19 @@ func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64 } return subtasks, nil } + +// GetAllSubtasks gets all subtasks with basic columns. +func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error) { + rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicSubtaskColumns+` from mysql.tidb_background_subtask`) + if err != nil { + return nil, err + } + if len(rs) == 0 { + return nil, nil + } + subtasks := make([]*proto.Subtask, 0, len(rs)) + for _, r := range rs { + subtasks = append(subtasks, row2BasicSubTask(r)) + } + return subtasks, nil +} diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go index 39dbccdbed210..09cc122b74338 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor.go +++ b/pkg/disttask/framework/taskexecutor/task_executor.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute" "github.com/pingcap/tidb/pkg/domain/infosync" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/gctuner" @@ -143,7 +142,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context) { continue } if !e.IsIdempotent(st) { - e.updateSubtaskStateAndError(ctx, st, proto.SubtaskStateFailed, ErrNonIdempotentSubtask) + e.updateSubtaskStateAndErrorImpl(ctx, st.ExecID, st.ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask) return } extraRunningSubtasks = append(extraRunningSubtasks, st) @@ -316,18 +315,6 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) } }() - subtasks, err := e.taskTable.GetSubtasksByExecIDAndStepAndStates( - runStepCtx, e.id, task.ID, task.Step, - proto.SubtaskStatePending, proto.SubtaskStateRunning) - if err != nil { - e.onError(err) - return e.getError() - } - for _, subtask := range subtasks { - metrics.IncDistTaskSubTaskCnt(subtask) - metrics.StartDistTaskSubTask(subtask) - } - for { // check if any error occurs. 
if err := e.getError(); err != nil { @@ -352,15 +339,15 @@ func (e *BaseTaskExecutor) runStep(resource *proto.StepResource) (resErr error) e.logger.Info("subtask in running state and is not idempotent, fail it", zap.Int64("subtask-id", subtask.ID)) e.onError(ErrNonIdempotentSubtask) - e.updateSubtaskStateAndError(runStepCtx, subtask, proto.SubtaskStateFailed, ErrNonIdempotentSubtask) + e.updateSubtaskStateAndErrorImpl(runStepCtx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, ErrNonIdempotentSubtask) e.markErrorHandled() break } } else { // subtask.State == proto.SubtaskStatePending - err := e.startSubtaskAndUpdateState(runStepCtx, subtask) + err := e.startSubtask(runStepCtx, subtask.ID) if err != nil { - e.logger.Warn("startSubtaskAndUpdateState meets error", zap.Error(err)) + e.logger.Warn("startSubtask meets error", zap.Error(err)) // should ignore ErrSubtaskNotFound // since the err only indicate that the subtask not owned by current task executor. if err == storage.ErrSubtaskNotFound { @@ -496,7 +483,7 @@ func (e *BaseTaskExecutor) onSubtaskFinished(ctx context.Context, executor execu return } - e.finishSubtaskAndUpdateState(ctx, subtask) + e.finishSubtask(ctx, subtask) finished = e.markSubTaskCanceledOrFailed(ctx, subtask) if finished { @@ -531,7 +518,7 @@ func (e *BaseTaskExecutor) Rollback() error { break } - e.updateSubtaskStateAndError(e.ctx, subtask, proto.SubtaskStateCanceled, nil) + e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateCanceled, nil) if err = e.getError(); err != nil { return err } @@ -653,18 +640,6 @@ func (e *BaseTaskExecutor) resetError() { e.mu.handled = false } -func (e *BaseTaskExecutor) startSubtaskAndUpdateState(ctx context.Context, subtask *proto.Subtask) error { - err := e.startSubtask(ctx, subtask.ID) - if err == nil { - metrics.DecDistTaskSubTaskCnt(subtask) - metrics.EndDistTaskSubTask(subtask) - subtask.State = proto.SubtaskStateRunning - metrics.IncDistTaskSubTaskCnt(subtask) - metrics.StartDistTaskSubTask(subtask) - } - return err -} - func (e *BaseTaskExecutor) updateSubtaskStateAndErrorImpl(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, subTaskErr error) { // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes backoffer := backoff.NewExponential(scheduler.RetrySQLInterval, 2, scheduler.RetrySQLMaxInterval) @@ -708,25 +683,6 @@ func (e *BaseTaskExecutor) finishSubtask(ctx context.Context, subtask *proto.Sub } } -func (e *BaseTaskExecutor) updateSubtaskStateAndError(ctx context.Context, subtask *proto.Subtask, state proto.SubtaskState, subTaskErr error) { - metrics.DecDistTaskSubTaskCnt(subtask) - metrics.EndDistTaskSubTask(subtask) - e.updateSubtaskStateAndErrorImpl(ctx, subtask.ExecID, subtask.ID, state, subTaskErr) - subtask.State = state - metrics.IncDistTaskSubTaskCnt(subtask) - if !subtask.IsDone() { - metrics.StartDistTaskSubTask(subtask) - } -} - -func (e *BaseTaskExecutor) finishSubtaskAndUpdateState(ctx context.Context, subtask *proto.Subtask) { - metrics.DecDistTaskSubTaskCnt(subtask) - metrics.EndDistTaskSubTask(subtask) - e.finishSubtask(ctx, subtask) - subtask.State = proto.SubtaskStateSucceed - metrics.IncDistTaskSubTaskCnt(subtask) -} - // markSubTaskCanceledOrFailed check the error type and decide the subtasks' state. // 1. Only cancel subtasks when meet ErrCancelSubtask. // 2. Only fail subtasks when meet non retryable error. 
@@ -736,14 +692,14 @@ func (e *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subt err := errors.Cause(err) if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask { e.logger.Warn("subtask canceled", zap.Error(err)) - e.updateSubtaskStateAndError(e.ctx, subtask, proto.SubtaskStateCanceled, nil) + e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateCanceled, nil) } else if e.IsRetryableError(err) { e.logger.Warn("meet retryable error", zap.Error(err)) } else if common.IsContextCanceledError(err) { e.logger.Info("meet context canceled for gracefully shutdown", zap.Error(err)) } else { e.logger.Warn("subtask failed", zap.Error(err)) - e.updateSubtaskStateAndError(e.ctx, subtask, proto.SubtaskStateFailed, err) + e.updateSubtaskStateAndErrorImpl(e.ctx, subtask.ExecID, subtask.ID, proto.SubtaskStateFailed, err) } e.markErrorHandled() return true diff --git a/pkg/disttask/framework/taskexecutor/task_executor_test.go b/pkg/disttask/framework/taskexecutor/task_executor_test.go index 2e2a1142aae02..8bdc21925974b 100644 --- a/pkg/disttask/framework/taskexecutor/task_executor_test.go +++ b/pkg/disttask/framework/taskexecutor/task_executor_test.go @@ -78,9 +78,6 @@ func TestTaskExecutorRun(t *testing.T) { // 3. run subtask failed runSubtaskErr := errors.New("run subtask error") mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) @@ -95,9 +92,6 @@ func TestTaskExecutorRun(t *testing.T) { // 4. run subtask success mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) @@ -114,12 +108,6 @@ func TestTaskExecutorRun(t *testing.T) { // 5. run subtask one by one mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return( - []*proto.Subtask{ - {ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, - {ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, - }, nil) // first round of the run loop mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -148,8 +136,6 @@ func TestTaskExecutorRun(t *testing.T) { // idempotent, so fail it. 
subtaskID := int64(2) theSubtask := &proto.Subtask{ID: subtaskID, Type: tp, Step: proto.StepOne, State: proto.SubtaskStateRunning, ExecID: "id"} - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{theSubtask}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(theSubtask, nil) @@ -163,8 +149,6 @@ func TestTaskExecutorRun(t *testing.T) { // run previous left subtask in running state again, but the subtask idempotent, // run it again. theSubtask = &proto.Subtask{ID: subtaskID, Type: tp, Step: proto.StepOne, State: proto.SubtaskStateRunning, ExecID: "id"} - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{theSubtask}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) // first round of the run loop mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, @@ -182,9 +166,6 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 6. cancel - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -198,9 +179,6 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 7. RunSubtask return context.Canceled - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -213,9 +191,6 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 8. grpc cancel - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -229,9 +204,6 @@ func TestTaskExecutorRun(t *testing.T) { require.True(t, ctrl.Satisfied()) // 9. 
annotate grpc cancel - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 2, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ @@ -251,9 +223,6 @@ func TestTaskExecutorRun(t *testing.T) { // 10. subtask owned by other executor mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) @@ -300,8 +269,6 @@ func TestTaskExecutorRun(t *testing.T) { mockSubtaskTable.EXPECT().HasSubtasksInStates(gomock.Any(), "id", task1.ID, task1.Step, unfinishedNormalSubtaskStates...).Return(true, nil) mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", task1.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task1.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(nil, nil) mockStepExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) @@ -406,8 +373,6 @@ func TestTaskExecutor(t *testing.T) { subtasks := []*proto.Subtask{ {ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, } - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), taskID, "id").Return(nil) @@ -426,8 +391,6 @@ func TestTaskExecutor(t *testing.T) { // 3. run one subtask, then task moved to history(ErrTaskNotFound). 
mockStepExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", taskID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), taskID, "id").Return(nil) @@ -463,8 +426,6 @@ func TestRunStepCurrentSubtaskScheduledAway(t *testing.T) { // mock for runStep mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockStepExecutor, nil) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(subtasks, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "tidb1", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(subtasks[0], nil) mockSubtaskTable.EXPECT().StartSubtask(gomock.Any(), task.ID, "tidb1").Return(nil) @@ -581,47 +542,10 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 5. GetSubtasksByStepAndStates meet retryable error. - getSubtasksByExecIDAndStepAndStatesErr := errors.New("get subtasks err") - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( - gomock.Any(), - taskExecutor.id, - gomock.Any(), - proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, getSubtasksByExecIDAndStepAndStatesErr) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // 6. GetSubtasksByExecIDAndStepAndStates meet non retryable error. - mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( - gomock.Any(), - taskExecutor.id, - gomock.Any(), - proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, getSubtasksByExecIDAndStepAndStatesErr) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) - mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), getSubtasksByExecIDAndStepAndStatesErr) - require.NoError(t, taskExecutor.RunStep(nil)) - require.True(t, ctrl.Satisfied()) - - // 7. Cleanup meet retryable error. + // 5. Cleanup meet retryable error. 
cleanupErr := errors.New("cleanup err") mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( - gomock.Any(), - taskExecutor.id, - gomock.Any(), - proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) @@ -636,16 +560,9 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 8. Cleanup meet non retryable error. + // 6. Cleanup meet non retryable error. mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( - gomock.Any(), - taskExecutor.id, - gomock.Any(), - proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) @@ -661,29 +578,22 @@ func TestExecutorErrHandling(t *testing.T) { require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 9. runSummaryCollectLoop meet retryable error. + // 7. runSummaryCollectLoop meet retryable error. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr", "return()")) mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(true) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) - // 10. runSummaryCollectLoop meet non retryable error. + // 8. runSummaryCollectLoop meet non retryable error. mockExtension.EXPECT().IsRetryableError(gomock.Any()).Return(false) mockSubtaskTable.EXPECT().FailSubtask(taskExecutor.ctx, taskExecutor.id, gomock.Any(), gomock.Any()) require.NoError(t, taskExecutor.RunStep(nil)) require.True(t, ctrl.Satisfied()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/mockSummaryCollectErr")) - // 13. subtask succeed. + // 9. subtask succeed. 
mockExtension.EXPECT().GetStepExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil) mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksByExecIDAndStepAndStates( - gomock.Any(), - taskExecutor.id, - gomock.Any(), - proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates(gomock.Any(), "id", task.ID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.SubtaskStatePending, ExecID: "id"}, nil) diff --git a/pkg/metrics/disttask.go b/pkg/metrics/disttask.go index dc8680d84846e..2e166c49b2d20 100644 --- a/pkg/metrics/disttask.go +++ b/pkg/metrics/disttask.go @@ -16,7 +16,6 @@ package metrics import ( "fmt" - "strconv" "time" "github.com/pingcap/tidb/pkg/disttask/framework/proto" @@ -28,6 +27,7 @@ const ( lblTaskType = "task_type" lblTaskID = "task_id" lblSubTaskID = "subtask_id" + lblExecID = "exec_id" ) // status for task @@ -45,8 +45,8 @@ var ( DistTaskStarttimeGauge *prometheus.GaugeVec // DistTaskSubTaskCntGauge is the gauge of dist task subtask count. DistTaskSubTaskCntGauge *prometheus.GaugeVec - // DistTaskSubTaskStartTimeGauge is the gauge of dist task subtask start time. - DistTaskSubTaskStartTimeGauge *prometheus.GaugeVec + // DistTaskSubTaskDurationGauge is the gauge of dist task subtask duration. + DistTaskSubTaskDurationGauge *prometheus.GaugeVec ) // InitDistTaskMetrics initializes disttask metrics. @@ -66,60 +66,6 @@ func InitDistTaskMetrics() { Name: "start_time", Help: "Gauge of start_time of disttask.", }, []string{lblTaskType, lblTaskStatus, lblTaskID}) - - DistTaskSubTaskCntGauge = NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "tidb", - Subsystem: "disttask", - Name: "subtask_cnt", - Help: "Gauge of subtask count.", - }, []string{lblTaskType, lblTaskID, lblTaskStatus}) - - DistTaskSubTaskStartTimeGauge = NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "tidb", - Subsystem: "disttask", - Name: "subtask_start_time", - Help: "Gauge of subtask start time.", - }, []string{lblTaskType, lblTaskID, lblTaskStatus, lblSubTaskID}) -} - -// IncDistTaskSubTaskCnt increases the count of dist task subtask. -func IncDistTaskSubTaskCnt(subtask *proto.Subtask) { - DistTaskSubTaskCntGauge.WithLabelValues( - subtask.Type.String(), - strconv.Itoa(int(subtask.TaskID)), - subtask.State.String(), - ).Inc() -} - -// DecDistTaskSubTaskCnt decreases the count of dist task subtask. -func DecDistTaskSubTaskCnt(subtask *proto.Subtask) { - DistTaskSubTaskCntGauge.WithLabelValues( - subtask.Type.String(), - strconv.Itoa(int(subtask.TaskID)), - subtask.State.String(), - ).Dec() -} - -// StartDistTaskSubTask sets the start time of dist task subtask. -func StartDistTaskSubTask(subtask *proto.Subtask) { - DistTaskSubTaskStartTimeGauge.WithLabelValues( - subtask.Type.String(), - strconv.Itoa(int(subtask.TaskID)), - subtask.State.String(), - strconv.Itoa(int(subtask.ID)), - ).SetToCurrentTime() -} - -// EndDistTaskSubTask deletes the start time of dist task subtask. 
-func EndDistTaskSubTask(subtask *proto.Subtask) { - DistTaskSubTaskStartTimeGauge.DeleteLabelValues( - subtask.Type.String(), - strconv.Itoa(int(subtask.TaskID)), - subtask.State.String(), - strconv.Itoa(int(subtask.ID)), - ) } // UpdateMetricsForAddTask update metrics when a task is added diff --git a/pkg/metrics/grafana/tidb.json b/pkg/metrics/grafana/tidb.json index 7ff82e29948b3..7a1a39e52a97f 100644 --- a/pkg/metrics/grafana/tidb.json +++ b/pkg/metrics/grafana/tidb.json @@ -14584,7 +14584,7 @@ "h": 8, "w": 12, "x": 0, - "y": 1 + "y": 20 }, "hiddenSeries": false, "id": 322, @@ -14624,7 +14624,7 @@ "expr": "sum(tidb_disttask_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\"}) by (status, task_type)", "hide": false, "interval": "", - "legendFormat": "{{task_type}}_{{status}}", + "legendFormat": "{{task_type}}-{{status}}", "queryType": "randomWalk", "refId": "A" } @@ -14685,7 +14685,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, + "x": 0, "y": 20 }, "hiddenSeries": false, @@ -14717,17 +14717,17 @@ "targets": [ { "exemplar": true, - "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"succeed|failed|canceled|reverted|revert_failed\"}) by (task_id, task_type)", + "expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"succeed|failed|canceled|reverted|revert_failed\"}) by (task_id, task_type)", "interval": "", - "legendFormat": "{{task_type}}_{{task_id}}_completed_cnt", + "legendFormat": "{{task_type}}-task{{task_id}}-completed", "queryType": "randomWalk", "refId": "A" }, { "exemplar": true, - "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (task_id, task_type)", + "expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (task_id, task_type)", "interval": "", - "legendFormat": "{{task_type}}_{{task_id}}_total_cnt", + "legendFormat": "{{task_type}}-task{{task_id}}-total", "queryType": "randomWalk", "refId": "B" } @@ -14736,7 +14736,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Dist Task Subtasks", + "title": "Completed/Total Subtask Count", "tooltip": { "shared": true, "sort": 0, @@ -14773,6 +14773,357 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 30 + }, + "hiddenSeries": false, + "id": 329, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending\"}) by (exec_id)", + "interval": "", + "legendFormat": "{{exec_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + 
"timeShift": null, + "title": "Pending Subtask Count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 28 + }, + "hiddenSeries": false, + "id": 330, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "tidb_disttask_subtask_duration{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"running\"}", + "interval": "", + "legendFormat": "{{task_type}}-task{{task_id}}-subtask{{subtask_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "SubTask Running Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "mappings": [ + { + "from": "", + "id": 1, + "text": "", + "to": "", + "type": 1, + "value": "" + } + ], + "color": { + "mode": "thresholds" + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "status" + }, + "properties": [ + { + "id": "custom.width", + "value": 84 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "exec_id" + }, + "properties": [ + { + "id": "custom.width", + "value": 112 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "subtask_id" + }, + "properties": [ + { + "id": "custom.width", + "value": 91 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "task_id" + }, + "properties": [ + { + "id": "custom.width", + "value": 76 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "task_type" + }, + "properties": [ + { + "id": "custom.width", + "value": 108 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "pending_duration" + }, + "properties": [ + { + "id": "unit", + "value": "s" + } + ] + } + ] + }, + 
"gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 36 + }, + "id": 331, + "options": { + "showHeader": true, + "sortBy": [ + { + "desc": true, + "displayName": "pending_duration" + } + ] + }, + "pluginVersion": "7.5.11", + "targets": [ + { + "exemplar": true, + "expr": "tidb_disttask_subtask_duration{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"pending\"}", + "format": "table", + "instant": true, + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "Subtask Pending Duration", + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true + }, + "indexByName": {}, + "renameByName": { + "Time": "", + "Value": "pending_duration", + "__name__": "" + } + } + } + ], + "type": "table" + }, { "datasource": "${DS_TEST-CLUSTER}", "fieldConfig": { @@ -14781,6 +15132,7 @@ "mode": "palette-classic" }, "mappings": [], + "nullValueMode": "null as zero", "thresholds": { "mode": "absolute", "steps": [ @@ -14801,10 +15153,10 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, - "y": 36 + "x": 12, + "y": 20 }, - "id": 23763571993, + "id": 332, "options": { "displayLabels": [], "legend": { @@ -14815,7 +15167,7 @@ "pieType": "pie", "reduceOptions": { "calcs": [ - "lastNotNull" + "last" ], "fields": "", "values": false @@ -14826,14 +15178,14 @@ "targets": [ { "exemplar": true, - "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|reverting|revert_pending|paused\"}) by (instance)", + "expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|reverting|revert_pending|paused\"}) by (exec_id)", "interval": "", "legendFormat": "", "queryType": "randomWalk", "refId": "A" } ], - "title": "Distributed Task Running Subtask Distribution on TiDB Nodes", + "title": "Uncompleted Subtask Distribution on TiDB Nodes", "type": "piechart" } ], diff --git a/pkg/metrics/grafana/tidb_runtime.json b/pkg/metrics/grafana/tidb_runtime.json index ab97eaa879118..7802db860356d 100644 --- a/pkg/metrics/grafana/tidb_runtime.json +++ b/pkg/metrics/grafana/tidb_runtime.json @@ -1517,314 +1517,6 @@ "align": false, "alignLevel": null } - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 7, - "w": 12, - "x": 0, - "y": 50 - }, - "hiddenSeries": false, - "id": 31, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "percentage": false, - "pluginVersion": "7.5.11", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "time()-tidb_disttask_subtask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"pending\"}", - "interval": "", - "legendFormat": "{{status}}_subtask_id_{{subtask_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - 
"timeRegions": [], - "timeShift": null, - "title": "Distributed Task SubTask Pending Duration", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - }, - "scopedVars": { - "instance": { - "text": "127.0.0.1:10080", - "value": "127.0.0.1:10080", - "selected": false - } - } - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 7, - "w": 12, - "x": 12, - "y": 43 - }, - "hiddenSeries": false, - "id": 32, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "percentage": false, - "pluginVersion": "7.5.11", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"pending\"}", - "interval": "", - "legendFormat": "pending_subtasks", - "queryType": "randomWalk", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Distributed Task Pending SubTask Cnt", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - }, - "scopedVars": { - "instance": { - "text": "127.0.0.1:10080", - "value": "127.0.0.1:10080", - "selected": false - } - } - }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "${DS_TEST-CLUSTER}", - "description": "", - "fieldConfig": { - "defaults": {}, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 7, - "w": 12, - "x": 12, - "y": 50 - }, - "hiddenSeries": false, - "id": 33, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": true, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "nullPointMode": "null", - "options": { - "alertThreshold": true - }, - "percentage": false, - "pluginVersion": "7.5.11", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "exemplar": true, - "expr": "time()-tidb_disttask_subtask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"running\"}", - "interval": "", - "legendFormat": 
"{{status}}_subtask_id_{{subtask_id}}", - "queryType": "randomWalk", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Distributed Task SubTask Running Duration", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "format": "s", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - }, - "scopedVars": { - "instance": { - "text": "127.0.0.1:10080", - "value": "127.0.0.1:10080", - "selected": false - } - } } ], "repeat": "instance", diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index febefcba69075..0ba2630c656b1 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -270,9 +270,6 @@ func RegisterMetrics() { prometheus.MustRegister(DistTaskGauge) prometheus.MustRegister(DistTaskStarttimeGauge) - prometheus.MustRegister(DistTaskSubTaskCntGauge) - prometheus.MustRegister(DistTaskSubTaskStartTimeGauge) - prometheus.MustRegister(RunawayCheckerCounter) prometheus.MustRegister(GlobalSortWriteToCloudStorageDuration) prometheus.MustRegister(GlobalSortWriteToCloudStorageRate)