Merge branch 'master' into add_lint_check
CabinfeverB committed Jan 31, 2024
2 parents a7ce8e5 + 2c25e89 commit ee1b2a1
Showing 25 changed files with 877 additions and 587 deletions.
149 changes: 149 additions & 0 deletions docs/design/2022-11-23-batch-cop.md
@@ -0,0 +1,149 @@
# Proposal: Support batch coprocessor for TiKV

* Authors: [cfzjywxk](https://github.com/cfzjywxk)
* Tracking issue: [39361](https://github.com/pingcap/tidb/issues/39361)

## Motivation

The fan-out issue in index lookup queries is one cause of increased query latency and cost. If there are
1,000 handles distributed across 1,000 regions, TiDB constructs 1,000 small tasks to retrieve
the 1,000 related rows, even when all the region leaders are in the same store. This causes the following problems:
1. Each task requires its own RPC request, so there can be far too many RPC requests even though each
one fetches only a few rows. The RPC overhead is sometimes significant.
2. A larger number of tasks leads to more queueing. Tuning the related concurrency parameters or task-scheduling
policies becomes more complex, and it is difficult to get the best performance.

In the current coprocessor implementation, key ranges in the same region are already batched into a single
task (with a hard-coded upper limit of 25,000 ranges). Why not also batch all the cop tasks that would
be sent to the same store?

In one user scenario, an index range scan returned 4,000,000 rows and 400,000 coprocessor table-lookup
tasks were generated, meaning the key ranges were scattered across many different regions.

## Optimization

### The IndexLookUp Execution Review

The IndexLookUp executor has an index worker that reads index keys and the related row handles
according to the index filter conditions. Each time it fetches enough row-handle data, it creates a
coprocessor table-lookup task and sends it to the table workers. The handle-data size limit for one task can be configured
with the [tidb_index_lookup_size](https://docs.pingcap.com/tidb/dev/system-variables#tidb_index_lookup_size)
system variable.

When a table worker gets a coprocessor task, it splits the handle ranges according to the region
information from the region cache. These region-aware tasks are then processed by the coprocessor client,
whose default concurrency limit is configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system
variable.

### Batching Strategy

As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To keep the design
simple, we could do the batching for each coprocessor table task separately. Different coprocessor table
tasks may still require different RPC requests, while row-handle ranges within one task can be batched if
their region leaders are in the same store. The main idea is to send each original `copTask` in a single
RPC whenever the region leaders for its row-handle ranges are located in the same TiKV store.

With this batching optimization, the number of RPC requests for each table-lookup task is at most the number
of store nodes. Consider an extreme case: if the index scan returns 4,000,000 rows and each task range is one row,
there could be as many as `4000000/25000 = 160` table-lookup tasks, each containing 25,000 key ranges. The RPC count
then becomes at most `160 * store_number`; for example, if `store_number` is 10, the total request count is
1,600, which is much less than the previous 400,000.

### Proto Change

Create new structures for the batched tasks: the request type `StoreBatchTask` and the response type `StoreBatchTaskResponse`.

```protobuf
message StoreBatchTask {
uint64 region_id = 1;
metapb.RegionEpoch region_epoch = 2;
metapb.Peer peer = 3;
repeated KeyRange ranges = 4;
uint64 task_id = 5;
}
```

```protobuf
message StoreBatchTaskResponse {
bytes data = 1 [(gogoproto.customtype) = "github.com/pingcap/kvproto/pkg/sharedbytes.SharedBytes", (gogoproto.nullable) = false];
errorpb.Error region_error = 2;
kvrpcpb.LockInfo locked = 3;
string other_error = 4;
uint64 task_id = 5;
kvrpcpb.ExecDetailsV2 exec_details_v2 = 6;
}
```

Attach the batched tasks to the `Coprocessor` request, reusing the `StoreBatchTask` structure above to carry tasks
that belong to different regions but the same store.
```protobuf
message Request {
// Store the batched tasks belonging to other regions.
repeated StoreBatchTask tasks = 11;
}
```

Add the batched task results to `Response`. Different tasks may encounter different kinds of errors, so they are
collected per task.
```protobuf
message Response {
repeated StoreBatchTaskResponse batch_responses = 13;
}
```

### The TiDB Side

Add a flag to `kv.Request` indicating whether the batch strategy is enabled.
```golang
type Request struct {
// EnableStoreBatch indicates if the tasks are batched.
EnableStoreBatch bool
}
```

Add batch-task-related fields to `copr.copTask`. They are collected while the `copTask` is being
prepared, when store batching is enabled.
```golang
type copTask struct {
	// ... existing fields ...

	// batchTaskList holds tasks for other regions whose leaders are in
	// the same store as this task's region; filled only when store
	// batching is enabled.
	batchTaskList []*coprocessor.StoreBatchTask
}
```

When building coprocessor tasks in the `buildCopTasks` function, try to fill the `batchTaskList` if
necessary. The steps are:
1. Create a map recording `store address => *copTask`. If store batching is enabled, tasks are appended
to the existing `copTask` for the same store address.
2. Split the ranges according to the region information as usual. After this, every task corresponds
to a single region.
3. When processing a new `KeyLocation`, try to append it as a batch task to the existing coprocessor task
if possible.

The coprocessor client sends the tasks as usual; the `Coprocessor` request is still a unary RPC
request even though it may be batched. When handling `CopResponse`, if the batch path is enabled and
region errors or other errors occur while processing batch tasks, the affected cop tasks are rescheduled
or the errors are reported to the upper layer.

Note that if `keepOrder` is required, a partial query result cannot be sent back until all the reads
have succeeded.



### The TiKV Side

A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`: after parsing the
original request, the builder and handler for the batched tasks can also be generated from the input
information (the RPC context, region information, and key ranges), as long as they are properly passed in
the `Coprocessor` request.

All the request handling can be scheduled to the read pool at the same time,
so before finishing, something like `join_all` is needed to wait for the results of all the
tasks. If any error is returned, the corresponding error field in the `Response` is filled in.

For execution tracking, separate trackers are created for the requests, and all the execution details are returned
to the client.
15 changes: 15 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go


10 changes: 10 additions & 0 deletions pkg/disttask/framework/proto/subtask.go
@@ -51,6 +51,16 @@ const (
SubtaskStatePaused SubtaskState = "paused"
)

// AllSubtaskStates lists all the subtask states.
var AllSubtaskStates = []SubtaskState{
SubtaskStatePending,
SubtaskStateRunning,
SubtaskStateSucceed,
SubtaskStateFailed,
SubtaskStateCanceled,
SubtaskStatePaused,
}

type (
// SubtaskState is the state of subtask.
SubtaskState string
2 changes: 2 additions & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
name = "scheduler",
srcs = [
"balancer.go",
"collector.go",
"interface.go",
"nodes.go",
"scheduler.go",
@@ -32,6 +33,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_zap//:zap",
],
)
126 changes: 126 additions & 0 deletions pkg/disttask/framework/scheduler/collector.go
@@ -0,0 +1,126 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"strconv"
"sync/atomic"
"time"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/prometheus/client_golang/prometheus"
)

var subtaskCollector = newCollector()

func init() {
prometheus.MustRegister(subtaskCollector)
}

// The exec_id of a subtask may change, and once all tasks have succeeded,
// subtasks are migrated from tidb_subtask_background to
// tidb_subtask_background_history. In both situations, a built-in
// Prometheus collector would have to delete the previously added metrics,
// which is quite troublesome. Therefore, a custom collector is used.
type collector struct {
subtaskInfo atomic.Pointer[[]*proto.Subtask]

subtasks *prometheus.Desc
subtaskDuration *prometheus.Desc
}

func newCollector() *collector {
return &collector{
subtasks: prometheus.NewDesc(
"tidb_disttask_subtasks",
"Number of subtasks.",
[]string{"task_type", "task_id", "status", "exec_id"}, nil,
),
subtaskDuration: prometheus.NewDesc(
"tidb_disttask_subtask_duration",
"Duration of subtasks in different states.",
[]string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil,
),
}
}

// Describe implements the prometheus.Collector interface.
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.subtasks
ch <- c.subtaskDuration
}

// Collect implements the prometheus.Collector interface.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
p := c.subtaskInfo.Load()
if p == nil {
return
}
subtasks := *p

// taskID => execID => state => cnt
subtaskCnt := make(map[int64]map[string]map[proto.SubtaskState]int)
taskType := make(map[int64]proto.TaskType)
for _, subtask := range subtasks {
if _, ok := subtaskCnt[subtask.TaskID]; !ok {
subtaskCnt[subtask.TaskID] = make(map[string]map[proto.SubtaskState]int)
}
if _, ok := subtaskCnt[subtask.TaskID][subtask.ExecID]; !ok {
subtaskCnt[subtask.TaskID][subtask.ExecID] = make(map[proto.SubtaskState]int)
}

subtaskCnt[subtask.TaskID][subtask.ExecID][subtask.State]++
taskType[subtask.TaskID] = subtask.Type

c.setDistSubtaskDuration(ch, subtask)
}
for taskID, execIDMap := range subtaskCnt {
for execID, stateMap := range execIDMap {
for state, cnt := range stateMap {
ch <- prometheus.MustNewConstMetric(c.subtasks, prometheus.GaugeValue,
float64(cnt),
taskType[taskID].String(),
strconv.Itoa(int(taskID)),
state.String(),
execID,
)
}
}
}
}

func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.Subtask) {
switch subtask.State {
case proto.SubtaskStatePending:
ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
time.Since(subtask.CreateTime).Seconds(),
subtask.Type.String(),
strconv.Itoa(int(subtask.TaskID)),
subtask.State.String(),
strconv.Itoa(int(subtask.ID)),
subtask.ExecID,
)
case proto.SubtaskStateRunning:
ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
time.Since(subtask.StartTime).Seconds(),
subtask.Type.String(),
strconv.Itoa(int(subtask.TaskID)),
subtask.State.String(),
strconv.Itoa(int(subtask.ID)),
subtask.ExecID,
)
}
}
2 changes: 2 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
@@ -30,6 +30,8 @@ type TaskManager interface {
// The returned tasks are sorted by task order, see proto.Task, and only contains
// some fields, see row2TaskBasic.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.Task, error)
// GetAllSubtasks gets all subtasks with basic columns.
GetAllSubtasks(ctx context.Context) ([]*proto.Subtask, error)
GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GCSubtasks(ctx context.Context) error
29 changes: 28 additions & 1 deletion pkg/disttask/framework/scheduler/scheduler_manager.go
@@ -37,7 +37,8 @@ var (
// defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table.
defaultHistorySubtaskTableGcInterval = 24 * time.Hour
// DefaultCleanUpInterval is the interval of cleanup routine.
DefaultCleanUpInterval = 10 * time.Minute
DefaultCleanUpInterval = 10 * time.Minute
defaultCollectMetricsInterval = 5 * time.Second
)

// WaitTaskFinished is used to sync the test.
@@ -162,6 +163,7 @@ func (sm *Manager) Start() {
sm.wg.Run(sm.scheduleTaskLoop)
sm.wg.Run(sm.gcSubtaskHistoryTableLoop)
sm.wg.Run(sm.cleanupTaskLoop)
sm.wg.Run(sm.collectLoop)
sm.wg.Run(func() {
sm.nodeMgr.maintainLiveNodesLoop(sm.ctx, sm.taskMgr)
})
@@ -419,3 +421,28 @@ func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler {
serverID: sm.serverID,
})
}

func (sm *Manager) collectLoop() {
sm.logger.Info("collect loop start")
ticker := time.NewTicker(defaultCollectMetricsInterval)
defer ticker.Stop()
for {
select {
case <-sm.ctx.Done():
sm.logger.Info("collect loop exits")
return
case <-ticker.C:
sm.collect()
}
}
}

func (sm *Manager) collect() {
subtasks, err := sm.taskMgr.GetAllSubtasks(sm.ctx)
if err != nil {
sm.logger.Warn("get all subtasks failed", zap.Error(err))
return
}

subtaskCollector.subtaskInfo.Store(&subtasks)
}