infoschema,planner,executor: support hot history region #27224

Closed
wants to merge 51 commits
Changes from 31 commits
Commits
dd87813
infoschema: add tidb_hot_regions_history virtual table
IcePigZDB Aug 15, 2021
fe67ab4
planner: add extractor for tidb_hot_regions_history
IcePigZDB Aug 15, 2021
07b0cdd
executor: add retriever for tidb_hot_regions_history
IcePigZDB Aug 15, 2021
2df8855
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 15, 2021
da5baab
planner: improve his hot regions extractor explanInfo, and use IntSet…
IcePigZDB Aug 16, 2021
88d132f
executor: unified use of ms in update_time, and move IntSet init to e…
IcePigZDB Aug 16, 2021
b1d877b
executor: fix time format bug
IcePigZDB Aug 16, 2021
84fd7a4
executor: fix history hot regions memtable reader test time bug
IcePigZDB Aug 16, 2021
bc5e3eb
executor: call DecodeBytes for region range key, and update test rang…
IcePigZDB Aug 18, 2021
8324c3e
infoschema: change UPDATE_TIME type to TypeTimestamp to support TIMES…
IcePigZDB Aug 19, 2021
33f4f51
planner: use context.timezone instead of time.local
IcePigZDB Aug 19, 2021
28d2400
executor: change UPDATE_TIME type to TypeTimestamp and add timezone c…
IcePigZDB Aug 19, 2021
ac05ea6
executor: devide read and write hot types into two http request to fi…
IcePigZDB Aug 22, 2021
6a4a248
infoschema: add is_leader flag
IcePigZDB Aug 22, 2021
25b6a5d
planner: add extractor for is_leader and add update extractor test
IcePigZDB Aug 22, 2021
f8a45b3
executor: add is_leader in request and update retriever test
IcePigZDB Aug 22, 2021
351bd5a
planner: change extractor code order
IcePigZDB Aug 22, 2021
9728584
executor: remove debug test case
IcePigZDB Aug 22, 2021
b5ecdb9
executor: close httpServers after test down
IcePigZDB Aug 22, 2021
7f07ae1
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 22, 2021
b8967a8
planner: roles from intset to slice
IcePigZDB Aug 23, 2021
e27bbf8
executor: roles from intset to slice
IcePigZDB Aug 23, 2021
bc8cba1
executor: use bool for IsLeader
IcePigZDB Aug 23, 2021
ca58a5a
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 24, 2021
9357d24
executor: use http.MethodGet
IcePigZDB Aug 24, 2021
19c0037
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 24, 2021
3c7b4e1
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 24, 2021
3c238ba
planner: remove extraction unnecessary columns
IcePigZDB Aug 25, 2021
f9b0b1c
executor: remove extraction unnecessary columns
IcePigZDB Aug 25, 2021
47ecb36
planner: remove unused function
IcePigZDB Aug 25, 2021
b258d65
planner: remove unused function
IcePigZDB Aug 25, 2021
a49182d
planner: review from @rleungx
IcePigZDB Aug 25, 2021
5947504
executor: review from @rleungx
IcePigZDB Aug 25, 2021
37484c3
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 25, 2021
d3917b1
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 25, 2021
9ed1f79
planner: add removed cluster_log test case
IcePigZDB Aug 25, 2021
a81eb94
executor: change PD-Allow-follower-handle to false
IcePigZDB Aug 25, 2021
6f2d228
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 25, 2021
13c044b
Merge branch 'master' into hishotregionpr
IcePigZDB Aug 28, 2021
f7f7810
infoschema: add is_learner field
IcePigZDB Aug 28, 2021
5092089
planner: add is_learner field
IcePigZDB Aug 28, 2021
826693b
executor: add is_learner field
IcePigZDB Aug 28, 2021
d81ccf8
planner: clear logic in parseUint64
IcePigZDB Sep 2, 2021
c881c52
planner: quantiles to uint64s and remove 3,4 in comment of IS_LEARNER
IcePigZDB Sep 14, 2021
361e891
planner: format test
IcePigZDB Sep 22, 2021
f5e8925
executor: format retriever
IcePigZDB Sep 22, 2021
6781bad
executor: add init of cancel
IcePigZDB Sep 22, 2021
160e34b
executor: format
IcePigZDB Sep 23, 2021
039220c
Update executor/memtable_reader.go
IcePigZDB Oct 8, 2021
6debfe2
executor: simplify code and move channel close to retrieve func
IcePigZDB Oct 8, 2021
c41c11c
executor: move close channel to startRetrieving
IcePigZDB Oct 10, 2021
8 changes: 8 additions & 0 deletions executor/builder.go
@@ -1523,6 +1523,14 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
extractor: v.Extractor.(*plannercore.ClusterLogTableExtractor),
},
}
case strings.ToLower(infoschema.TableTiDBHotRegionsHistory):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &hotRegionsHistoryRetriver{
extractor: v.Extractor.(*plannercore.HotRegionsHistoryTableExtractor),
},
}
case strings.ToLower(infoschema.TableInspectionResult):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
320 changes: 320 additions & 0 deletions executor/memtable_reader.go
@@ -15,6 +15,7 @@
package executor

import (
"bytes"
"container/heap"
"context"
"encoding/json"
@@ -39,18 +40,22 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/set"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const clusterLogBatchSize = 256
const hotRegionsHistoryBatchSize = 256

type dummyCloser struct{}

@@ -699,3 +704,318 @@ func (e *clusterLogRetriever) close() error {
func (e *clusterLogRetriever) getRuntimeStats() execdetails.RuntimeStats {
return nil
}

type hotRegionsStreamResult struct {
addr string
messages *HistoryHotRegions
err error
}

type hotRegionsResponseHeap []hotRegionsStreamResult

func (h hotRegionsResponseHeap) Len() int {
return len(h)
}

func (h hotRegionsResponseHeap) Less(i, j int) bool {
lhs, rhs := h[i].messages.HistoryHotRegion[0], h[j].messages.HistoryHotRegion[0]
if lhs.UpdateTime != rhs.UpdateTime {
return lhs.UpdateTime < rhs.UpdateTime
}
return lhs.HotDegree < rhs.HotDegree
}

func (h hotRegionsResponseHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *hotRegionsResponseHeap) Push(x interface{}) {
*h = append(*h, x.(hotRegionsStreamResult))
}

func (h *hotRegionsResponseHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
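// For illustration with hypothetical values (not part of the diff itself): the heap
// pops the result whose first region has the smallest UpdateTime, ties broken by HotDegree.
//
//	h := hotRegionsResponseHeap{
//		{addr: "pd-1", messages: &HistoryHotRegions{HistoryHotRegion: []*HistoryHotRegion{{UpdateTime: 200}}}},
//		{addr: "pd-2", messages: &HistoryHotRegions{HistoryHotRegion: []*HistoryHotRegion{{UpdateTime: 100}}}},
//	}
//	heap.Init(&h)
//	first := heap.Pop(&h).(hotRegionsStreamResult) // first.addr == "pd-2", the earlier UpdateTime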

type hotRegionsHistoryRetriver struct {
isDrained bool
retrieving bool
heap *hotRegionsResponseHeap
extractor *plannercore.HotRegionsHistoryTableExtractor
cancel context.CancelFunc
}

// HistoryHotRegionsRequest wraps the conditions pushed down to PD.
type HistoryHotRegionsRequest struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
RegionIDs []uint64 `json:"region_ids,omitempty"`
StoreIDs []uint64 `json:"store_ids,omitempty"`
PeerIDs []uint64 `json:"peer_ids,omitempty"`
Roles []uint64 `json:"roles,omitempty"`
HotRegionTypes []string `json:"hot_region_types,omitempty"`
}

// HistoryHotRegions records the filtered hot regions stored in each PD.
// It is the response from PD.
type HistoryHotRegions struct {
HistoryHotRegion []*HistoryHotRegion `json:"history_hot_region"`
}

// HistoryHotRegion records each hot region's statistics.
// It is an item of the PD response.
type HistoryHotRegion struct {
UpdateTime int64 `json:"update_time,omitempty"`
RegionID uint64 `json:"region_id,omitempty"`
StoreID uint64 `json:"store_id,omitempty"`
PeerID uint64 `json:"peer_id,omitempty"`
IsLeader bool `json:"is_leader,omitempty"`
HotRegionType string `json:"hot_region_type,omitempty"`
HotDegree int64 `json:"hot_degree,omitempty"`
FlowBytes float64 `json:"flow_bytes,omitempty"`
KeyRate float64 `json:"key_rate,omitempty"`
QueryRate float64 `json:"query_rate,omitempty"`
StartKey []byte `json:"start_key,omitempty"`
EndKey []byte `json:"end_key,omitempty"`
}

const (
// HotRegionTypeREAD hot read region.
HotRegionTypeREAD = "READ"
Member: The way these constants are named doesn't meet Go style.

Contributor Author: Changed to HotRegionTypeRead and HotRegionTypeWrite.

// HotRegionTypeWRITE hot write region.
HotRegionTypeWRITE = "WRITE"
)

func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx sessionctx.Context) ([]chan hotRegionsStreamResult, error) {
if !hasPriv(sctx, mysql.ProcessPriv) {
return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
pdServers, err := infoschema.GetPDServerInfo(sctx)
if err != nil {
return nil, err
}

// To avoid overloading the hot regions history interface, the user must specify the time range in the SQL.
if e.extractor.StartTime == 0 {
return nil, errors.New("denied to scan hot regions, please specify the start time, such as `update_time > '2020-01-01 00:00:00'`")
}
if e.extractor.EndTime == 0 {
return nil, errors.New("denied to scan hot regions, please specify the end time, such as `update_time < '2020-01-01 00:00:00'`")
}
}

// Divide read and write into two requests because their time ranges overlap,
// since PD uses [type, time] as the key of hot regions.
if e.extractor.HotRegionTypes.Count() == 0 {
e.extractor.HotRegionTypes.Insert(HotRegionTypeREAD)
e.extractor.HotRegionTypes.Insert(HotRegionTypeWRITE)
}
hotRegionTypes := make([]string, 0, e.extractor.HotRegionTypes.Count())
for typ := range e.extractor.HotRegionTypes {
hotRegionTypes = append(hotRegionTypes, typ)
}
// set hotType before request
historyHotRegionsRequest := &HistoryHotRegionsRequest{
StartTime: e.extractor.StartTime,
EndTime: e.extractor.EndTime,
RegionIDs: e.extractor.RegionIDs,
StoreIDs: e.extractor.StoreIDs,
PeerIDs: e.extractor.PeerIDs,
Roles: e.extractor.Roles,
}
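// For illustration with hypothetical values (not part of the diff itself): the request
// above marshals to a JSON body such as
//	{"start_time":1629000000000,"end_time":1629086400000,"region_ids":[1]}
// where empty fields are omitted; HotRegionTypes is set per hot region type right
// before each request in startRetrieving below.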

return e.startRetrieving(ctx, sctx, pdServers, historyHotRegionsRequest)
}

func (e *hotRegionsHistoryRetriver) startRetrieving(
ctx context.Context,
sctx sessionctx.Context,
serversInfo []infoschema.ServerInfo,
req *HistoryHotRegionsRequest,
) ([]chan hotRegionsStreamResult, error) {
var results []chan hotRegionsStreamResult
Member: It seems we don't use a stream here.

Contributor Author: Changed to hotRegionsResult.

for _, srv := range serversInfo {
for typ := range e.extractor.HotRegionTypes {
req.HotRegionTypes = []string{typ}
jsonBody, err := json.Marshal(req)
if err != nil {
return nil, err
}
body := bytes.NewBuffer(jsonBody)
ch := make(chan hotRegionsStreamResult)
Member: Does the channel need to be closed?

Contributor Author: Added a defer to close it:

    ch := make(chan hotRegionsResult)
    results = append(results, ch)
    go func(ch chan hotRegionsResult, address string, body *bytes.Buffer) {
        util.WithRecovery(func() {
            defer close(ch)

results = append(results, ch)
go func(address string, body *bytes.Buffer) {
util.WithRecovery(func() {
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory)
req, err := http.NewRequest(http.MethodGet, url, body)
if err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
req.Header.Add("PD-Allow-follower-handle", "true")
Member: It seems that we don't sync the hot region history among PDs.

Contributor Author: Yes.

Member: Then it should be false here?

Contributor Author: Some hot region history may be lost if a PD goes down; we discussed this at the design stage. The default retention interval is 7 days, and we think hot region history is not important enough to be worth synchronizing right now.

Member: Consider a case where we have 3 PDs and the PD leader never changes. If the request is sent to followers, the results here will be empty.

Contributor Author: The code below is used to get all PD servers; the results are merged in the retrieve function:

    pdServers, err := infoschema.GetPDServerInfo(sctx)

Contributor Author: Oh, sorry, I missed the context of this line of code, it should be false~
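A minimal sketch of the follow-up agreed in this thread (commit a81eb94 in the commit list above), assuming the header value simply flips to "false":

    req.Header.Add("PD-Allow-follower-handle", "false")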

resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
defer func() {
terror.Log(resp.Body.Close())
}()
if resp.StatusCode != http.StatusOK {
ch <- hotRegionsStreamResult{err: errors.Errorf("request %s failed: %s", url, resp.Status)}
return
}
var historyHotRegions HistoryHotRegions
if err = json.NewDecoder(resp.Body).Decode(&historyHotRegions); err != nil {
ch <- hotRegionsStreamResult{err: errors.Trace(err)}
return
}
ch <- hotRegionsStreamResult{addr: address, messages: &historyHotRegions}
}, nil)
}(srv.StatusAddr, body)
}
}
return results, nil
}

func (e *hotRegionsHistoryRetriver) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
if e.extractor.SkipRequest || e.isDrained {
return nil, nil
}

if !e.retrieving {
e.retrieving = true
results, err := e.initialize(ctx, sctx)
if err != nil {
e.isDrained = true
return nil, err
}
// Initialize the heap
e.heap = &hotRegionsResponseHeap{}
for _, ch := range results {
result := <-ch
if result.err != nil || len(result.messages.HistoryHotRegion) == 0 {
if result.err != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err)
}
continue
}
*e.heap = append(*e.heap, result)
}
heap.Init(e.heap)
}
// Merge the results
var finalRows [][]types.Datum
allSchemas := sctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
tikvStore, ok := sctx.GetStore().(helper.Storage)
tz := sctx.GetSessionVars().Location()
if !ok {
return nil, errors.New("Information about hot region can be gotten only when the storage is TiKV")
}
tikvHelper := &helper.Helper{
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
for e.heap.Len() > 0 && len(finalRows) < hotRegionsHistoryBatchSize {
minTimeItem := heap.Pop(e.heap).(hotRegionsStreamResult)
row, err := e.getHotRegionRowWithSchemaInfo(minTimeItem.messages.HistoryHotRegion[0], tikvHelper, allSchemas, tz)
if err != nil {
return nil, err
}
if row != nil {
finalRows = append(finalRows, row)
}
minTimeItem.messages.HistoryHotRegion = minTimeItem.messages.HistoryHotRegion[1:]
// Fetch next message item
if len(minTimeItem.messages.HistoryHotRegion) != 0 {
heap.Push(e.heap, minTimeItem)
}
}
// All streams are drained
e.isDrained = e.heap.Len() == 0
return finalRows, nil
}

func (e *hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo(
hisHotRegion *HistoryHotRegion,
tikvHelper *helper.Helper,
allSchemas []*model.DBInfo,
tz *time.Location,
) ([]types.Datum, error) {
_, startKey, _ := codec.DecodeBytes(hisHotRegion.StartKey, []byte{})
_, endKey, _ := codec.DecodeBytes(hisHotRegion.EndKey, []byte{})
region := &tikv.KeyLocation{StartKey: startKey, EndKey: endKey}
hotRange, err := helper.NewRegionFrameRange(region)
if err != nil {
return nil, err
}

f := tikvHelper.FindTableIndexOfRegion(allSchemas, hotRange)
// Ignore the row if there is no corresponding schema (f == nil).
if f == nil {
return nil, nil
}
row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsHistoryCols))
updateTimestamp := time.Unix(hisHotRegion.UpdateTime/1000, (hisHotRegion.UpdateTime%1000)*int64(time.Millisecond))
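// For illustration (hypothetical value, not from the PR): UpdateTime is in milliseconds,
// so UpdateTime = 1629000000123 becomes time.Unix(1629000000, 123000000),
// i.e. 2021-08-15 04:00:00.123 UTC.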

if updateTimestamp.Location() != tz {
updateTimestamp = updateTimestamp.In(tz)
}
updateTime := types.NewTime(types.FromGoTime(updateTimestamp), mysql.TypeTimestamp, types.MinFsp)
row[0].SetMysqlTime(updateTime)
row[1].SetString(strings.ToUpper(f.DBName), mysql.DefaultCollationName)
row[2].SetString(strings.ToUpper(f.TableName), mysql.DefaultCollationName)
row[3].SetInt64(f.TableID)
if f.IndexName != "" {
row[4].SetString(strings.ToUpper(f.IndexName), mysql.DefaultCollationName)
row[5].SetInt64(f.IndexID)
} else {
row[4].SetNull()
row[5].SetNull()
}
row[6].SetInt64(int64(hisHotRegion.RegionID))
row[7].SetInt64(int64(hisHotRegion.StoreID))
row[8].SetInt64(int64(hisHotRegion.PeerID))
if hisHotRegion.IsLeader {
row[9].SetInt64(1)
} else {
row[9].SetInt64(0)
}

row[10].SetString(strings.ToUpper(hisHotRegion.HotRegionType), mysql.DefaultCollationName)
if hisHotRegion.HotDegree != 0 {
row[11].SetInt64(hisHotRegion.HotDegree)
} else {
row[11].SetNull()
}
if hisHotRegion.FlowBytes != 0 {
row[12].SetFloat64(float64(hisHotRegion.FlowBytes))
} else {
row[12].SetNull()
}
if hisHotRegion.KeyRate != 0 {
row[13].SetFloat64(float64(hisHotRegion.KeyRate))
} else {
row[13].SetNull()
}
if hisHotRegion.QueryRate != 0 {
row[14].SetFloat64(float64(hisHotRegion.QueryRate))
} else {
row[14].SetNull()
}
return row, nil
}

func (e *hotRegionsHistoryRetriver) close() error {
if e.cancel != nil {
e.cancel()
}
return nil
}

func (e *hotRegionsHistoryRetriver) getRuntimeStats() execdetails.RuntimeStats {
return nil
}