diff --git a/disttask/framework/dispatcher/BUILD.bazel b/disttask/framework/dispatcher/BUILD.bazel index bb4f1206363f0..254745ffe845f 100644 --- a/disttask/framework/dispatcher/BUILD.bazel +++ b/disttask/framework/dispatcher/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//disttask/framework/proto", "//disttask/framework/storage", "//domain/infosync", + "//metrics", "//resourcemanager/pool/spool", "//resourcemanager/util", "//sessionctx", diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index bcc1654be0c0f..179e4bf4c3722 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" disttaskutil "github.com/pingcap/tidb/util/disttask" "github.com/pingcap/tidb/util/intest" @@ -338,6 +339,7 @@ func (d *BaseDispatcher) onRunning() error { } func (d *BaseDispatcher) onFinished() error { + metrics.UpdateMetricsForFinishTask(d.Task) /* moves the task from the running gauge to the completed gauge. NOTE(review): this runs before TransferSubTasks2History; if onFinished can be re-invoked after that call fails, the gauges would be adjusted twice - confirm. */ logutil.Logger(d.logCtx).Debug("schedule task, task is finished", zap.String("state", d.Task.State)) return d.taskMgr.TransferSubTasks2History(d.Task.ID) } diff --git a/disttask/framework/dispatcher/dispatcher_manager.go b/disttask/framework/dispatcher/dispatcher_manager.go index 84445117ab0af..24bae373c9128 100644 --- a/disttask/framework/dispatcher/dispatcher_manager.go +++ b/disttask/framework/dispatcher/dispatcher_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/resourcemanager/pool/spool" "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" @@ -55,6 +56,7 @@ func (dm *Manager) 
setRunningTask(task *proto.Task, dispatcher Dispatcher) { defer dm.runningTasks.Unlock() dm.runningTasks.taskIDs[task.ID] = struct{}{} dm.runningTasks.dispatchers[task.ID] = dispatcher + metrics.UpdateMetricsForRunTask(task) } func (dm *Manager) isRunningTask(taskID int64) bool { @@ -196,6 +198,9 @@ func (dm *Manager) dispatchTaskLoop() { // the task is not in runningTasks set when: // owner changed or task is cancelled when status is pending. if task.State == proto.TaskStateRunning || task.State == proto.TaskStateReverting || task.State == proto.TaskStateCancelling { + // count the task as dispatching only when it is really handed to a dispatcher; the matching Dec is in metrics.UpdateMetricsForRunTask. + metrics.DistTaskGauge.WithLabelValues(task.Type, metrics.DispatchingStatus).Inc() + metrics.UpdateMetricsForDispatchTask(task) dm.startDispatcher(task) cnt++ continue @@ -203,6 +208,9 @@ func (dm *Manager) dispatchTaskLoop() { if dm.checkConcurrencyOverflow(cnt) { break } + // count the task as dispatching only after the concurrency check passed, so a task left pending by the break above is not re-counted on every loop. + metrics.DistTaskGauge.WithLabelValues(task.Type, metrics.DispatchingStatus).Inc() + metrics.UpdateMetricsForDispatchTask(task) dm.startDispatcher(task) cnt++ } diff --git a/disttask/framework/handle/BUILD.bazel b/disttask/framework/handle/BUILD.bazel index 45d49c70e3675..5d0928ab3582a 100644 --- a/disttask/framework/handle/BUILD.bazel +++ b/disttask/framework/handle/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//disttask/framework/proto", "//disttask/framework/storage", + "//metrics", "//util/backoff", "//util/logutil", "@com_github_pingcap_errors//:errors", diff --git a/disttask/framework/handle/handle.go b/disttask/framework/handle/handle.go index a2df40176c493..afc2b18c9f5da 100644 --- a/disttask/framework/handle/handle.go +++ b/disttask/framework/handle/handle.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" + 
"github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/backoff" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -55,6 +56,7 @@ func SubmitGlobalTask(taskKey, taskType string, concurrency int, taskMeta []byte if globalTask == nil { return nil, errors.Errorf("cannot find global task with ID %d", taskID) } + metrics.UpdateMetricsForAddTask(globalTask) } return globalTask, nil } diff --git a/metrics/disttask.go b/metrics/disttask.go index c9d1450b8ad47..fcd57664a855b 100644 --- a/metrics/disttask.go +++ b/metrics/disttask.go @@ -15,7 +15,9 @@ package metrics import ( + "fmt" "strconv" + "time" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +30,19 @@ const ( lblSubTaskID = "subtask_id" ) +// Status label values used by the dist task gauges. +const ( + DispatchingStatus = "dispatching" + WaitingStatus = "waiting" + RunningStatus = "running" + CompletedStatus = "completed" +) + var ( + // DistTaskGauge is the gauge of dist task count, labelled by task type and status. + DistTaskGauge *prometheus.GaugeVec + // DistTaskStarttimeGauge is the gauge of dist task start time, labelled by task type, status and task ID. + DistTaskStarttimeGauge *prometheus.GaugeVec // DistTaskSubTaskCntGauge is the gauge of dist task subtask count. DistTaskSubTaskCntGauge *prometheus.GaugeVec // DistTaskSubTaskStartTimeGauge is the gauge of dist task subtask start time. @@ -37,6 +51,22 @@ var ( // InitDistTaskMetrics initializes disttask metrics. 
func InitDistTaskMetrics() { + DistTaskGauge = NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "disttask", + Name: "task_status", + Help: "Gauge of disttask.", + }, []string{lblTaskType, lblTaskStatus}) + + DistTaskStarttimeGauge = NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "disttask", + Name: "start_time", + Help: "Gauge of start_time of disttask.", + }, []string{lblTaskType, lblTaskStatus, lblTaskID}) + DistTaskSubTaskCntGauge = NewGaugeVec( prometheus.GaugeOpts{ Namespace: "tidb", @@ -91,3 +121,29 @@ func EndDistTaskSubTask(subtask *proto.Subtask) { strconv.Itoa(int(subtask.ID)), ) } + +// UpdateMetricsForAddTask updates metrics when a task is added: it increments the waiting gauge and records the enqueue time in Unix seconds, the same unit SetToCurrentTime writes for the dispatching status. +func UpdateMetricsForAddTask(task *proto.Task) { + DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Inc() + DistTaskStarttimeGauge.WithLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID)).Set(float64(time.Now().UnixNano()) / 1e9) +} + +// UpdateMetricsForDispatchTask updates metrics when a task is dispatched: it decrements the waiting gauge and replaces the waiting start time with a dispatching start time. It does not increment the dispatching gauge; the caller does that. +func UpdateMetricsForDispatchTask(task *proto.Task) { + DistTaskGauge.WithLabelValues(task.Type, WaitingStatus).Dec() + DistTaskStarttimeGauge.DeleteLabelValues(task.Type, WaitingStatus, fmt.Sprint(task.ID)) + DistTaskStarttimeGauge.WithLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID)).SetToCurrentTime() +} + +// UpdateMetricsForRunTask updates metrics when a task starts running: it drops the dispatching start time and moves the task from the dispatching gauge to the running gauge. +func UpdateMetricsForRunTask(task *proto.Task) { + DistTaskStarttimeGauge.DeleteLabelValues(task.Type, DispatchingStatus, fmt.Sprint(task.ID)) + DistTaskGauge.WithLabelValues(task.Type, DispatchingStatus).Dec() + DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Inc() +} + +// UpdateMetricsForFinishTask updates metrics when a task is finished: it moves the task from the running gauge to the completed gauge. +func UpdateMetricsForFinishTask(task *proto.Task) { + DistTaskGauge.WithLabelValues(task.Type, RunningStatus).Dec() + DistTaskGauge.WithLabelValues(task.Type, CompletedStatus).Inc() +} diff --git a/metrics/grafana/tidb.json 
b/metrics/grafana/tidb.json index 06ade9cc86cb2..d09078d5c457e 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -13759,6 +13759,317 @@ }, "id": 153, "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 1 + }, + "hiddenSeries": false, + "id": 319, + "interval": "1s", + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "maxDataPoints": null, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "tidb_disttask_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\"}", + "hide": false, + "interval": "", + "legendFormat": "{{task_type}}_{{status}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Task Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": 
false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 1 + }, + "hiddenSeries": false, + "id": 321, + "interval": "1s", + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "time() - timestamp(tidb_disttask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",status=\"waiting\"})", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{task_type}}_{{task_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Task Waiting Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:121", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:122", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + 
"overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "hiddenSeries": false, + "id": 323, + "interval": "1s", + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "maxDataPoints": null, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "time() - timestamp(tidb_disttask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",status=\"dispatching\"})", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{task_type}}_{{task_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Task Dispatching Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:275", + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "$$hashKey": "object:276", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, diff --git a/metrics/metrics.go b/metrics/metrics.go index 1a438827c09f5..895aa656f6a19 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -265,6 +265,8 @@ func RegisterMetrics() { 
prometheus.MustRegister(PlanReplayerTaskCounter) prometheus.MustRegister(PlanReplayerRegisterTaskGauge) + prometheus.MustRegister(DistTaskGauge) + prometheus.MustRegister(DistTaskStarttimeGauge) prometheus.MustRegister(DistTaskSubTaskCntGauge) prometheus.MustRegister(DistTaskSubTaskStartTimeGauge)