Skip to content

Commit

Permalink
disttask: refactor metrics, collect metrics in scheduler manager (#50634)
Browse files Browse the repository at this point in the history

close #49615
  • Loading branch information
okJiang authored Jan 31, 2024
1 parent acd4999 commit 2c25e89
Show file tree
Hide file tree
Showing 15 changed files with 598 additions and 540 deletions.
15 changes: 15 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/disttask/framework/proto/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ const (
SubtaskStatePaused SubtaskState = "paused"
)

// AllSubtaskStates enumerates the SubtaskState constants, so callers can
// iterate over every state without listing them by hand.
var AllSubtaskStates = []SubtaskState{
SubtaskStatePending,
SubtaskStateRunning,
SubtaskStateSucceed,
SubtaskStateFailed,
SubtaskStateCanceled,
SubtaskStatePaused,
}

type (
// SubtaskState is the state of subtask.
SubtaskState string
Expand Down
2 changes: 2 additions & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "scheduler",
srcs = [
"balancer.go",
"collector.go",
"interface.go",
"nodes.go",
"scheduler.go",
Expand Down Expand Up @@ -32,6 +33,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_zap//:zap",
],
)
Expand Down
126 changes: 126 additions & 0 deletions pkg/disttask/framework/scheduler/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"strconv"
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/prometheus/client_golang/prometheus"
)

// subtaskCollector is the package-wide collector instance. The scheduler
// manager stores subtask snapshots into it and Prometheus reads them back
// through its Collect method.
var subtaskCollector = newCollector()

// init registers subtaskCollector with the default Prometheus registry.
// MustRegister panics if the registration fails.
func init() {
prometheus.MustRegister(subtaskCollector)
}

// collector is a custom prometheus.Collector that exposes subtask metrics.
//
// The exec_id of a subtask may change, and once a task has succeeded its
// subtasks are migrated from mysql.tidb_background_subtask to the subtask
// history table. With the built-in Prometheus metric types, every such change
// would require deleting the previously added label combinations, which is
// quite troublesome. Therefore a custom collector is used: the metrics are
// rebuilt from the latest subtask snapshot on every Collect call.
type collector struct {
// subtaskInfo points to the most recently stored list of subtasks; it is
// nil until the first snapshot has been stored.
subtaskInfo atomic.Pointer[[]*proto.Subtask]

// subtasks describes the "tidb_disttask_subtasks" metric.
subtasks *prometheus.Desc
// subtaskDuration describes the "tidb_disttask_subtask_duration" metric.
subtaskDuration *prometheus.Desc
}

// newCollector builds a collector with the descriptors of the two subtask
// metrics it exposes. No subtask snapshot is stored yet, so the collector
// reports nothing until one is set.
func newCollector() *collector {
	// Subtask count, labelled per task, state and executor.
	countDesc := prometheus.NewDesc(
		"tidb_disttask_subtasks",
		"Number of subtasks.",
		[]string{"task_type", "task_id", "status", "exec_id"}, nil,
	)
	// Time a single subtask has spent in its current state.
	durationDesc := prometheus.NewDesc(
		"tidb_disttask_subtask_duration",
		"Duration of subtasks in different states.",
		[]string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil,
	)
	return &collector{
		subtasks:        countDesc,
		subtaskDuration: durationDesc,
	}
}

// Describe implements the prometheus.Collector interface. It sends the
// descriptors of both subtask metrics to ch.
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range [...]*prometheus.Desc{c.subtasks, c.subtaskDuration} {
		ch <- desc
	}
}

// Collect implements the prometheus.Collector interface.
//
// It loads the most recently stored subtask snapshot and sends to ch:
//   - one duration sample per pending or running subtask (emitted by
//     setDistSubtaskDuration), and
//   - one "tidb_disttask_subtasks" gauge per (task, executor, state)
//     combination, whose value is the number of subtasks in that combination.
//
// Nothing is sent while no snapshot has been stored.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
	p := c.subtaskInfo.Load()
	if p == nil {
		return
	}

	// countKey identifies one time series of the subtask-count gauge.
	type countKey struct {
		taskID int64
		execID string
		state  proto.SubtaskState
	}
	counts := make(map[countKey]int)
	taskTypes := make(map[int64]proto.TaskType)
	for _, subtask := range *p {
		counts[countKey{
			taskID: subtask.TaskID,
			execID: subtask.ExecID,
			state:  subtask.State,
		}]++
		taskTypes[subtask.TaskID] = subtask.Type

		c.setDistSubtaskDuration(ch, subtask)
	}

	for key, cnt := range counts {
		ch <- prometheus.MustNewConstMetric(c.subtasks, prometheus.GaugeValue,
			float64(cnt),
			taskTypes[key.taskID].String(),
			// FormatInt keeps the full int64 value; converting to int first
			// would truncate the ID on 32-bit platforms.
			strconv.FormatInt(key.taskID, 10),
			key.state.String(),
			key.execID,
		)
	}
}

// setDistSubtaskDuration sends a "tidb_disttask_subtask_duration" sample for
// subtask to ch, reporting how long the subtask has been in its current state:
//   - pending subtasks are measured from CreateTime,
//   - running subtasks are measured from StartTime.
//
// Subtasks in any other state produce no sample.
func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.Subtask) {
	var since time.Time
	switch subtask.State {
	case proto.SubtaskStatePending:
		since = subtask.CreateTime
	case proto.SubtaskStateRunning:
		since = subtask.StartTime
	default:
		return
	}

	ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
		time.Since(since).Seconds(),
		subtask.Type.String(),
		// FormatInt keeps the full int64 value; converting to int first would
		// truncate the IDs on 32-bit platforms.
		strconv.FormatInt(subtask.TaskID, 10),
		subtask.State.String(),
		strconv.FormatInt(subtask.ID, 10),
		subtask.ExecID,
	)
}
2 changes: 2 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type TaskManager interface {
// The returned tasks are sorted by task order, see proto.Task, and only contains
// some fields, see row2TaskBasic.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
// GetAllSubtasks gets all subtasks with basic columns.
GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error)
GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GCSubtasks(ctx context.Context) error
Expand Down
29 changes: 28 additions & 1 deletion pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ var (
// defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table.
defaultHistorySubtaskTableGcInterval = 24 * time.Hour
// DefaultCleanUpInterval is the interval of cleanup routine.
DefaultCleanUpInterval = 10 * time.Minute
DefaultCleanUpInterval = 10 * time.Minute
defaultCollectMetricsInterval = 5 * time.Second
)

// WaitTaskFinished is used to sync the test.
Expand Down Expand Up @@ -162,6 +163,7 @@ func (sm *Manager) Start() {
sm.wg.Run(sm.scheduleTaskLoop)
sm.wg.Run(sm.gcSubtaskHistoryTableLoop)
sm.wg.Run(sm.cleanupTaskLoop)
sm.wg.Run(sm.collectLoop)
sm.wg.Run(func() {
sm.nodeMgr.maintainLiveNodesLoop(sm.ctx, sm.taskMgr)
})
Expand Down Expand Up @@ -419,3 +421,28 @@ func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler {
serverID: sm.serverID,
})
}

// collectLoop refreshes the subtask snapshot used for metrics by calling
// collect once every defaultCollectMetricsInterval. It returns when the
// manager's context is done.
func (sm *Manager) collectLoop() {
	sm.logger.Info("collect loop start")

	tick := time.NewTicker(defaultCollectMetricsInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			sm.collect()
		case <-sm.ctx.Done():
			sm.logger.Info("collect loop exits")
			return
		}
	}
}

// collect fetches all subtasks from the task manager and publishes them to
// subtaskCollector. If the fetch fails, a warning is logged and the previously
// published snapshot is left untouched.
func (sm *Manager) collect() {
	all, err := sm.taskMgr.GetAllSubtasks(sm.ctx)
	if err != nil {
		sm.logger.Warn("get all subtasks failed", zap.Error(err))
		return
	}
	// Publish the fresh snapshot for the next Collect call.
	subtaskCollector.subtaskInfo.Store(&all)
}
21 changes: 14 additions & 7 deletions pkg/disttask/framework/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask {
if !r.IsNull(8) {
ordinal = int(r.GetInt64(8))
}

// subtask defines start time as bigint, to ensure backward compatible,
// we keep it that way, and we convert it here.
var startTime time.Time
if !r.IsNull(9) {
ts := r.GetInt64(9)
startTime = time.Unix(ts, 0)
}

subtask := &proto.Subtask{
ID: r.GetInt64(0),
Step: proto.Step(r.GetInt64(1)),
Expand All @@ -89,25 +98,23 @@ func row2BasicSubTask(r chunk.Row) *proto.Subtask {
Concurrency: int(r.GetInt64(6)),
CreateTime: createTime,
Ordinal: ordinal,
StartTime: startTime,
}
return subtask
}

// Row2SubTask converts a row to a subtask.
func Row2SubTask(r chunk.Row) *proto.Subtask {
subtask := row2BasicSubTask(r)
// subtask defines start/update time as bigint, to ensure backward compatible,

// subtask defines update time as bigint, to ensure backward compatible,
// we keep it that way, and we convert it here.
var startTime, updateTime time.Time
if !r.IsNull(9) {
ts := r.GetInt64(9)
startTime = time.Unix(ts, 0)
}
var updateTime time.Time
if !r.IsNull(10) {
ts := r.GetInt64(10)
updateTime = time.Unix(ts, 0)
}
subtask.StartTime = startTime

subtask.UpdateTime = updateTime
subtask.Meta = r.GetBytes(11)
subtask.Summary = r.GetJSON(12).String()
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/subtask_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtask
for _, subtask := range subtasks {
_, err := sqlexec.ExecSQL(ctx, se, `
update mysql.tidb_background_subtask
set state = %?, state_update_time = CURRENT_TIMESTAMP()
set state = %?, state_update_time = unix_timestamp()
where id = %? and exec_id = %? and state = %?`,
proto.SubtaskStatePending, subtask.ID, subtask.ExecID, proto.SubtaskStateRunning)
if err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ const (
TaskColumns = basicTaskColumns + `, t.start_time, t.state_update_time, t.meta, t.dispatcher_id, t.error`
// InsertTaskColumns is the columns used in insert task.
InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time`
basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal`
basicSubtaskColumns = `id, step, task_key, type, exec_id, state, concurrency, create_time, ordinal, start_time`
// SubtaskColumns is the columns for subtask.
SubtaskColumns = basicSubtaskColumns + `, start_time, state_update_time, meta, summary`
SubtaskColumns = basicSubtaskColumns + `, state_update_time, meta, summary`
// InsertSubtaskColumns is the columns used in insert subtask.
InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary`
)
Expand Down Expand Up @@ -730,3 +730,19 @@ func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64
}
return subtasks, nil
}

// GetAllSubtasks gets all subtasks with basic columns.
//
// It reads every row of mysql.tidb_background_subtask in a new session and
// converts each one with row2BasicSubTask. A nil slice is returned when the
// table has no rows.
func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error) {
	query := `select ` + basicSubtaskColumns + ` from mysql.tidb_background_subtask`
	rows, err := mgr.ExecuteSQLWithNewSession(ctx, query)
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, nil
	}

	result := make([]*proto.Subtask, len(rows))
	for i := range rows {
		result[i] = row2BasicSubTask(rows[i])
	}
	return result, nil
}
Loading

0 comments on commit 2c25e89

Please sign in to comment.