Skip to content

Commit

Permalink
disttask: add metrics collection for dispatcher (#47018)
Browse files Browse the repository at this point in the history
close #47017
  • Loading branch information
JK1Zhang authored Sep 25, 2023
1 parent 48f6b35 commit 84fe6be
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 0 deletions.
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//sessionctx",
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
disttaskutil "github.com/pingcap/tidb/util/disttask"
"github.com/pingcap/tidb/util/intest"
Expand Down Expand Up @@ -338,6 +339,7 @@ func (d *BaseDispatcher) onRunning() error {
}

func (d *BaseDispatcher) onFinished() error {
metrics.UpdateMetricsForFinishTask(d.Task)
logutil.Logger(d.logCtx).Debug("schedule task, task is finished", zap.String("state", d.Task.State))
return d.taskMgr.TransferSubTasks2History(d.Task.ID)
}
Expand Down
5 changes: 5 additions & 0 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (dm *Manager) setRunningTask(task *proto.Task, dispatcher Dispatcher) {
defer dm.runningTasks.Unlock()
dm.runningTasks.taskIDs[task.ID] = struct{}{}
dm.runningTasks.dispatchers[task.ID] = dispatcher
metrics.UpdateMetricsForRunTask(task)
}

func (dm *Manager) isRunningTask(taskID int64) bool {
Expand Down Expand Up @@ -183,6 +185,7 @@ func (dm *Manager) dispatchTaskLoop() {
if dm.isRunningTask(task.ID) {
continue
}
metrics.DistTaskGauge.WithLabelValues(task.Type, metrics.DispatchingStatus).Inc()
// we check it before start dispatcher, so no need to check it again.
// see startDispatcher.
// this should not happen normally, unless user modify system table
Expand All @@ -196,13 +199,15 @@ func (dm *Manager) dispatchTaskLoop() {
// the task is not in runningTasks set when:
// owner changed or task is cancelled when status is pending.
if task.State == proto.TaskStateRunning || task.State == proto.TaskStateReverting || task.State == proto.TaskStateCancelling {
metrics.UpdateMetricsForDispatchTask(task)
dm.startDispatcher(task)
cnt++
continue
}
if dm.checkConcurrencyOverflow(cnt) {
break
}
metrics.UpdateMetricsForDispatchTask(task)
dm.startDispatcher(task)
cnt++
}
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//disttask/framework/proto",
"//disttask/framework/storage",
"//metrics",
"//util/backoff",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
Expand Down
2 changes: 2 additions & 0 deletions disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/backoff"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -55,6 +56,7 @@ func SubmitGlobalTask(taskKey, taskType string, concurrency int, taskMeta []byte
if globalTask == nil {
return nil, errors.Errorf("cannot find global task with ID %d", taskID)
}
metrics.UpdateMetricsForAddTask(globalTask)
}
return globalTask, nil
}
Expand Down
56 changes: 56 additions & 0 deletions metrics/disttask.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package metrics

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -28,7 +30,19 @@ const (
lblSubTaskID = "subtask_id"
)

// status for task
const (
DispatchingStatus = "dispatching"
WaitingStatus = "waiting"
RunningStatus = "running"
CompletedStatus = "completed"
)

var (
//DistTaskGauge is the gauge of dist task count.
DistTaskGauge *prometheus.GaugeVec
//DistTaskStarttimeGauge is the gauge of dist task count.
DistTaskStarttimeGauge *prometheus.GaugeVec
// DistTaskSubTaskCntGauge is the gauge of dist task subtask count.
DistTaskSubTaskCntGauge *prometheus.GaugeVec
// DistTaskSubTaskStartTimeGauge is the gauge of dist task subtask start time.
Expand All @@ -37,6 +51,22 @@ var (

// InitDistTaskMetrics initializes disttask metrics.
func InitDistTaskMetrics() {
DistTaskGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "task_status",
Help: "Gauge of disttask.",
}, []string{lblTaskType, lblTaskStatus})

DistTaskStarttimeGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "start_time",
Help: "Gauge of start_time of disttask.",
}, []string{lblTaskType, lblTaskStatus, lblTaskID})

DistTaskSubTaskCntGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Expand Down Expand Up @@ -91,3 +121,29 @@ func EndDistTaskSubTask(subtask *proto.Subtask) {
strconv.Itoa(int(subtask.ID)),
)
}

// UpdateMetricsForAddTask update metrics when a task is added
func UpdateMetricsForAddTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Inc()
DistTaskStarttimeGauge.WithLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID)).Set(float64(time.Now().UnixMicro()))
}

// UpdateMetricsForDispatchTask update metrics when a task is added
func UpdateMetricsForDispatchTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Dec()
DistTaskStarttimeGauge.DeleteLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID))
DistTaskStarttimeGauge.WithLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID)).SetToCurrentTime()
}

// UpdateMetricsForRunTask update metrics when a task starts running
func UpdateMetricsForRunTask(task *proto.Task) {
DistTaskStarttimeGauge.DeleteLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID))
DistTaskGauge.WithLabelValues(task.Type, DispatchingStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Inc()
}

// UpdateMetricsForFinishTask update metrics when a task is finished
func UpdateMetricsForFinishTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type, CompletedStatus).Inc()
}
Loading

0 comments on commit 84fe6be

Please sign in to comment.