From e4bc75895f0aae5ce9a7ddc4890f2dba2588879a Mon Sep 17 00:00:00 2001 From: crazycs Date: Tue, 18 Aug 2020 16:34:07 +0800 Subject: [PATCH 1/4] cherry pick #18916 to release-4.0 Signed-off-by: ti-srebot --- distsql/distsql_test.go | 6 -- distsql/select_result.go | 141 +++++++++++++++++++++++++-- distsql/select_result_test.go | 7 +- distsql/stream.go | 12 ++- kv/kv.go | 3 - planner/core/common_plans.go | 6 -- store/tikv/batch_coprocessor.go | 32 +++++- store/tikv/coprocessor.go | 27 ++--- store/tikv/region_request.go | 71 +++++++++++--- store/tikv/snapshot.go | 31 +++--- store/tikv/snapshot_test.go | 10 +- util/execdetails/execdetails.go | 70 +------------ util/execdetails/execdetails_test.go | 21 ---- 13 files changed, 269 insertions(+), 168 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 151b76e34713e..309997296067c 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" @@ -429,11 +428,6 @@ func (r *mockResultSubset) GetData() []byte { return r.data } // GetStartKey implements kv.ResultSubset interface. func (r *mockResultSubset) GetStartKey() kv.Key { return nil } -// GetExecDetails implements kv.ResultSubset interface. -func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { - return &execdetails.ExecDetails{} -} - // MemSize implements kv.ResultSubset interface. func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } diff --git a/distsql/select_result.go b/distsql/select_result.go index 1fcbbe605dc82..6b91a8a80f42e 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,8 +14,11 @@ package distsql import ( + "bytes" "context" "fmt" + "sort" + "strconv" "sync/atomic" "time" @@ -26,6 +29,8 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -83,6 +88,8 @@ type selectResult struct { fetchDuration time.Duration durationReported bool memTracker *memory.Tracker + + stats *selectResultRuntimeStats } func (r *selectResult) Fetch(ctx context.Context) { @@ -130,14 +137,18 @@ func (r *selectResult) fetchResp(ctx context.Context) error { for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) } - resultDetail := resultSubset.GetExecDetails() - r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime()) r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime()) + copStats.CopTime = duration + sc.MergeExecDetails(&copStats.ExecDetails, nil) + } } - sc.MergeExecDetails(resultDetail, nil) if len(r.selectResp.Chunks) != 0 { break } @@ -233,8 +244,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (r *selectResult) updateCopRuntimeStats(ctx 
context.Context, detail *execdetails.ExecDetails, respTime time.Duration) { - callee := detail.CalleeAddress +func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) { + callee := copStats.CalleeAddress if r.rootPlanID == nil || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } @@ -245,8 +256,19 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execde return } + if r.stats == nil { + stmtCtx := r.ctx.GetSessionVars().StmtCtx + id := r.rootPlanID.String() + originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id) + r.stats = &selectResultRuntimeStats{ + RuntimeStats: originRuntimeStats, + backoffSleep: make(map[string]time.Duration), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + } + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats) + } + r.stats.mergeCopRuntimeStats(copStats, respTime) - r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime, detail) for i, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { @@ -289,3 +311,106 @@ func (r *selectResult) Close() error { } return r.resp.Close() } + +// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats. +type CopRuntimeStats interface { + // GetCopRuntimeStats gets the cop runtime stats information. + GetCopRuntimeStats() *tikv.CopRuntimeStats +} + +type selectResultRuntimeStats struct { + execdetails.RuntimeStats + copRespTime []time.Duration + procKeys []int64 + backoffSleep map[string]time.Duration + totalProcessTime time.Duration + totalWaitTime time.Duration + rpcStat tikv.RegionRequestRuntimeStats +} + +func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) { + s.copRespTime = append(s.copRespTime, respTime) + s.procKeys = append(s.procKeys, copStats.ProcessedKeys) + + for k, v := range copStats.BackoffSleep { + s.backoffSleep[k] += v + } + s.totalProcessTime += copStats.ProcessTime + s.totalWaitTime += copStats.WaitTime + s.rpcStat.Merge(copStats.RegionRequestRuntimeStats) +} + +func (s *selectResultRuntimeStats) String() string { + buf := bytes.NewBuffer(nil) + if s.RuntimeStats != nil { + buf.WriteString(s.RuntimeStats.String()) + } + if len(s.copRespTime) > 0 { + size := len(s.copRespTime) + buf.WriteString(", ") + if size == 1 { + buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0])) + } else { + sort.Slice(s.copRespTime, func(i, j int) bool { + return s.copRespTime[i] < s.copRespTime[j] + }) + vMax, vMin := s.copRespTime[size-1], s.copRespTime[0] + vP95 := s.copRespTime[size*19/20] + sum := 0.0 + for _, t := range s.copRespTime { + sum += float64(t) + } + vAvg := time.Duration(sum / float64(size)) + + sort.Slice(s.procKeys, func(i, j int) bool { + return s.procKeys[i] < s.procKeys[j] + }) + keyMax := s.procKeys[size-1] + keyP95 := s.procKeys[size*19/20] + buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95)) + if keyMax > 0 { + buf.WriteString(", max_proc_keys: ") + buf.WriteString(strconv.FormatInt(keyMax, 10)) + buf.WriteString(", p95_proc_keys: ") + buf.WriteString(strconv.FormatInt(keyP95, 10)) + } + if s.totalProcessTime > 0 { + buf.WriteString(", tot_proc: ") + 
buf.WriteString(s.totalProcessTime.String()) + if s.totalWaitTime > 0 { + buf.WriteString(", tot_wait: ") + buf.WriteString(s.totalWaitTime.String()) + } + } + } + } + copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] + delete(s.rpcStat.Stats, tikvrpc.CmdCop) + if copRPC.Count > 0 { + buf.WriteString(", rpc_num: ") + buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) + buf.WriteString(", rpc_time: ") + buf.WriteString(time.Duration(copRPC.Consume).String()) + } + buf.WriteString("}") + + rpcStatsStr := s.rpcStat.String() + if len(rpcStatsStr) > 0 { + buf.WriteString(", ") + buf.WriteString(rpcStatsStr) + } + + if len(s.backoffSleep) > 0 { + buf.WriteString(", backoff{") + idx := 0 + for k, d := range s.backoffSleep { + if idx > 0 { + buf.WriteString(", ") + } + idx++ + buf.WriteString(fmt.Sprintf("%s: %s", k, d.String())) + } + buf.WriteString("}") + } + return buf.String() +} diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index 7ac21151f7205..0f470aec309da 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -19,6 +19,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tipb/go-tipb" @@ -30,7 +31,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) sr.rootPlanID = copPlan{} - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() t := uint64(1) @@ -40,13 +41,13 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { }, } c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse) sr.copPlanIDs = []fmt.Stringer{copPlan{}} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1") } diff --git a/distsql/stream.go b/distsql/stream.go index 934eb28e16885..f1817084cdf44 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -106,11 +106,15 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons } r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts) r.partialCount++ - resultDetail := resultSubset.GetExecDetails() - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + copStats.CopTime = duration + r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, 
nil) + } } - r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil) return false, nil } diff --git a/kv/kv.go b/kv/kv.go index 07af8ec6903d5..5bce2d7a7e30b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" ) @@ -339,8 +338,6 @@ type ResultSubset interface { GetData() []byte // GetStartKey gets the start key. GetStartKey() Key - // GetExecDetails gets the detail information. - GetExecDetails() *execdetails.ExecDetails // MemSize returns how many bytes of memory this result use for tracing memory usage. MemSize() int64 // RespTime returns the response time for the request. diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index aa709c5d51a66..da75e927c719d 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -998,12 +998,6 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor analyzeInfo = "time:0ns, loops:0" actRows = "0" } - switch p.(type) { - case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: - if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 { - analyzeInfo += ", " + s.String() - } - } memoryInfo = "N/A" memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index a4530c2521c9d..b8a773fdfa71e 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -42,7 +41,7 @@ type batchCopTask struct { type batchCopResponse struct { pbResp *coprocessor.BatchResponse - detail *execdetails.ExecDetails + detail *CopRuntimeStats // batch Cop Response is yet to return startKey. So batchCop cannot retry partially. startKey kv.Key @@ -63,8 +62,13 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. +<<<<<<< HEAD func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { return &execdetails.ExecDetails{} +======= +func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats { + return rs.detail +>>>>>>> ea3da25... *: record more rpc runtime information in cop runtime stats (#18916) } // MemSize returns how many bytes of memory this response use @@ -77,9 +81,6 @@ func (rs *batchCopResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -304,7 +305,11 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { +<<<<<<< HEAD resp := &batchCopResponse{err: errors.Trace(err)} +======= + resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)} +>>>>>>> ea3da25... 
*: record more rpc runtime information in cop runtime stats (#18916) b.sendToRespCh(resp) break } @@ -415,7 +420,24 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro b.sendToRespCh(&batchCopResponse{ pbResp: response, +<<<<<<< HEAD }) +======= + detail: new(CopRuntimeStats), + } + + resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) + resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) + for backoff := range bo.backoffTimes { + backoffName := backoff.String() + resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff] + resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond + } + resp.detail.CalleeAddress = task.storeAddr + + b.sendToRespCh(&resp) +>>>>>>> ea3da25... *: record more rpc runtime information in cop runtime stats (#18916) return } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 8a936ae58197e..69e3d18a3cb11 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -433,7 +433,7 @@ type copIteratorTaskSender struct { type copResponse struct { pbResp *coprocessor.Response - detail *execdetails.ExecDetails + detail *CopRuntimeStats startKey kv.Key err error respSize int64 @@ -455,7 +455,7 @@ func (rs *copResponse) GetStartKey() kv.Key { return rs.startKey } -func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { +func (rs *copResponse) GetCopRuntimeStats() *CopRuntimeStats { return rs.detail } @@ -469,9 +469,6 @@ func (rs *copResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -775,6 +772,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) req.StoreTp = task.storeType startTime := time.Now() + worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType, task.storeAddr) if err != nil { if task.storeType == kv.TiDB { @@ -840,7 +838,7 @@ type clientHelper struct { *minCommitTSPushed Client resolveLite bool - stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + RegionRequestRuntimeStats } // ResolveLocks wraps the ResolveLocks function and store the resolved result. 
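// Illustration (not part of the patch): clientHelper above now embeds
// RegionRequestRuntimeStats instead of holding a bare map, so its Stats
// field and its String/Merge methods are promoted. That is why the hunks
// below can write ch.Stats where the old code wrote ch.stats, and hand the
// collector along with sender.Stats = ch.Stats. A minimal sketch, assuming
// the NewRegionRequestRuntimeStats constructor added later in this patch:
//
//	cli := clientHelper{RegionRequestRuntimeStats: NewRegionRequestRuntimeStats()}
//	cli.Stats[tikvrpc.CmdGet] = &RPCRuntimeStats{Count: 1, Consume: int64(time.Millisecond)}
//	fmt.Println(cli.String()) // Get:{num_rpc:1, total_time:1ms}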
@@ -848,9 +846,9 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
 	var err error
 	var resolvedLocks []uint64
 	var msBeforeTxnExpired int64
-	if ch.stats != nil {
+	if ch.Stats != nil {
 		defer func(start time.Time) {
-			recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start))
+			recordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start))
 		}(time.Now())
 	}
 	if ch.resolveLite {
@@ -874,7 +872,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID
 	if len(directStoreAddr) > 0 {
 		sender.storeAddr = directStoreAddr
 	}
-	sender.stats = ch.stats
+	sender.Stats = ch.Stats
 	req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
 	resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
 	return resp, ctx, sender.storeAddr, err
@@ -1023,8 +1021,9 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
 		resp.startKey = task.ranges.at(0).StartKey
 	}
 	if resp.detail == nil {
-		resp.detail = new(execdetails.ExecDetails)
+		resp.detail = new(CopRuntimeStats)
 	}
+	resp.detail.Stats = worker.Stats
 	resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
 	resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes))
 	resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes))
@@ -1078,6 +1077,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
 	return nil, nil
 }
 
+// CopRuntimeStats contains execution detail information.
+type CopRuntimeStats struct {
+	execdetails.ExecDetails
+	RegionRequestRuntimeStats
+}
+
 func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
 	errCode := errno.ErrUnknown
 	errMsg := err.Error()
@@ -1101,7 +1106,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
 		pbResp: &coprocessor.Response{
 			Data: data,
 		},
-		detail: &execdetails.ExecDetails{},
+		detail: &CopRuntimeStats{},
 	}
 	worker.sendToRespCh(resp, ch, true)
 	return nil
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index 4e4a3bab710e5..0463a6bff6d77 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -14,7 +14,9 @@ package tikv
 import (
+	"bytes"
 	"context"
+	"fmt"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -61,14 +63,54 @@ type RegionRequestSender struct {
 	storeAddr    string
 	rpcError     error
 	failStoreIDs map[uint64]struct{}
-	stats        map[tikvrpc.CmdType]*RegionRequestRuntimeStats
+	RegionRequestRuntimeStats
 }
 
 // RegionRequestRuntimeStats records the runtime stats of send region requests.
 type RegionRequestRuntimeStats struct {
-	count int64
+	Stats map[tikvrpc.CmdType]*RPCRuntimeStats
+}
+
+// NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats.
+func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats {
+	return RegionRequestRuntimeStats{
+		Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats),
+	}
+}
+
+// RPCRuntimeStats indicates the RPC request count and consume time.
+type RPCRuntimeStats struct {
+	Count int64
 	// Send region request consume time.
-	consume int64
+	Consume int64
+}
+
+// String implements fmt.Stringer interface.
+func (r *RegionRequestRuntimeStats) String() string {
+	var buf bytes.Buffer
+	for k, v := range r.Stats {
+		if buf.Len() > 0 {
+			buf.WriteByte(',')
+		}
+		buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.Count, time.Duration(v.Consume)))
+	}
+	return buf.String()
+}
+
+// Merge merges other RegionRequestRuntimeStats.
+func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { + for cmd, v := range rs.Stats { + stat, ok := r.Stats[cmd] + if !ok { + r.Stats[cmd] = &RPCRuntimeStats{ + Count: v.Count, + Consume: v.Consume, + } + continue + } + stat.Count += v.Count + stat.Consume += v.Consume + } } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. @@ -92,9 +134,9 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } - if ss.stats != nil { + if ss.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) }(time.Now()) } resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) @@ -111,17 +153,17 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co return } -func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { +func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { stat, ok := stats[cmd] if !ok { - stats[cmd] = &RegionRequestRuntimeStats{ - count: 1, - consume: int64(d), + stats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), } return } - stat.count++ - stat.consume += int64(d) + stat.Count++ + stat.Consume += int64(d) } func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { @@ -344,9 +386,14 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, } defer s.releaseStoreToken(rpcCtx.Store) } +<<<<<<< HEAD if s.stats != nil { +======= + + if s.Stats != nil { +>>>>>>> ea3da25... 
*: record more rpc runtime information in cop runtime stats (#18916) defer func(start time.Time) { - recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) }(time.Now()) } ctx := bo.ctx diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 1b812392b1426..f78a9720fecf0 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -236,9 +236,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll Client: s.store.client, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -364,9 +364,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { resolveLite: true, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -591,30 +591,30 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { } } -func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { +func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats == nil { - s.mu.stats.rpcStats = stats + if s.mu.stats.rpcStats.Stats == nil { + s.mu.stats.rpcStats.Stats = stats return } for k, v := range stats { - stat, ok := s.mu.stats.rpcStats[k] + stat, ok := s.mu.stats.rpcStats.Stats[k] if !ok { - s.mu.stats.rpcStats[k] = v + s.mu.stats.rpcStats.Stats[k] = v continue } - stat.count += v.count - stat.consume += v.consume + stat.Count += v.Count + stat.Consume += v.Consume } } // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + rpcStats RegionRequestRuntimeStats backoffSleepMS map[backoffType]int backoffTimes map[backoffType]int } @@ -622,12 +622,7 @@ type SnapshotRuntimeStats struct { // String implements fmt.Stringer interface. 
func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - for k, v := range rs.rpcStats { - if buf.Len() > 0 { - buf.WriteByte(',') - } - buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume))) - } + buf.WriteString(rs.rpcStats.String()) for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 36fa86645ba9f..518ef35925951 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -303,13 +303,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { - reqStats := make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Second) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Millisecond) + reqStats := NewRegionRequestRuntimeStats() + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) - snapshot.mergeRegionRequestStats(reqStats) - snapshot.mergeRegionRequestStats(reqStats) + snapshot.mergeRegionRequestStats(reqStats.Stats) + snapshot.mergeRegionRequestStats(reqStats.Stats) bo := NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleep(boTxnLockFast, 30, errors.New("test")) c.Assert(err, IsNil) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index ddedcfaf8fef3..7530fe7889d4f 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -309,49 +309,6 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks) } -// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader -type ReaderRuntimeStats struct { - sync.Mutex - - copRespTime []time.Duration - procKeys []int64 -} - -// recordOneCopTask record once cop response time to update maxcopRespTime -func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration, detail *ExecDetails) { - rrs.Lock() - defer rrs.Unlock() - rrs.copRespTime = append(rrs.copRespTime, t) - rrs.procKeys = append(rrs.procKeys, detail.ProcessedKeys) -} - -func (rrs *ReaderRuntimeStats) String() string { - size := len(rrs.copRespTime) - if size == 0 { - return "" - } - if size == 1 { - return fmt.Sprintf("rpc num: 1, rpc time:%v, proc keys:%v", rrs.copRespTime[0], rrs.procKeys[0]) - } - sort.Slice(rrs.copRespTime, func(i, j int) bool { - return rrs.copRespTime[i] < rrs.copRespTime[j] - }) - vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0] - vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20] - sum := 0.0 - for _, t := range rrs.copRespTime { - sum += float64(t) - } - vAvg := time.Duration(sum / float64(size)) - - sort.Slice(rrs.procKeys, func(i, j int) bool { - return rrs.procKeys[i] < rrs.procKeys[j] - }) - keyMax := rrs.procKeys[size-1] - keyP95 := rrs.procKeys[size*19/20] - return fmt.Sprintf("rpc num: %v, rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v, proc keys max:%v, p95:%v", size, vMax, vMin, vAvg, vP80, vP95, keyMax, keyP95) -} - // RuntimeStats is used to express the executor runtime information. 
 type RuntimeStats interface {
 	GetActRows() int64
@@ -392,16 +349,15 @@ func (e *BasicRuntimeStats) String() string {
 
 // RuntimeStatsColl collects executors's execution info.
 type RuntimeStatsColl struct {
-	mu          sync.Mutex
-	rootStats   map[string]RuntimeStats
-	copStats    map[string]*CopRuntimeStats
-	readerStats map[string]*ReaderRuntimeStats
+	mu        sync.Mutex
+	rootStats map[string]RuntimeStats
+	copStats  map[string]*CopRuntimeStats
 }
 
 // NewRuntimeStatsColl creates new executor collector.
 func NewRuntimeStatsColl() *RuntimeStatsColl {
 	return &RuntimeStatsColl{rootStats: make(map[string]RuntimeStats),
-		copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)}
+		copStats: make(map[string]*CopRuntimeStats)}
 }
 
 // RegisterStats register execStat for a executor.
@@ -441,12 +397,6 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip
 	copStats.RecordOneCopTask(address, summary)
 }
 
-// RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader.
-func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration, detail *ExecDetails) {
-	readerStats := e.GetReaderStats(planID)
-	readerStats.recordOneCopTask(copRespTime, detail)
-}
-
 // ExistsRootStats checks if the planID exists in the rootStats collection.
 func (e *RuntimeStatsColl) ExistsRootStats(planID string) bool {
 	e.mu.Lock()
@@ -463,18 +413,6 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool {
 	return exists
 }
 
-// GetReaderStats gets the ReaderRuntimeStats specified by planID.
-func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	stats, exists := e.readerStats[planID]
-	if !exists {
-		stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)}
-		e.readerStats[planID] = stats
-	}
-	return stats
-}
-
 // ConcurrencyInfo is used to save the concurrency information of the executor operator
 type ConcurrencyInfo struct {
 	concurrencyName string
diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go
index 4805ee672c0df..616b262990f4a 100644
--- a/util/execdetails/execdetails_test.go
+++ b/util/execdetails/execdetails_test.go
@@ -111,24 +111,3 @@ func TestCopRuntimeStats(t *testing.T) {
 		t.Fatal("table_reader not exists")
 	}
 }
-
-func TestReaderStats(t *testing.T) {
-	r := new(ReaderRuntimeStats)
-	if r.String() != "" {
-		t.Fatal()
-	}
-
-	r.procKeys = append(r.procKeys, 100)
-	r.copRespTime = append(r.copRespTime, time.Millisecond*100)
-	if r.String() != "rpc num: 1, rpc time:100ms, proc keys:100" {
-		t.Fatal()
-	}
-
-	for i := 0; i < 100; i++ {
-		r.procKeys = append(r.procKeys, int64(i))
-		r.copRespTime = append(r.copRespTime, time.Millisecond*time.Duration(i))
-	}
-	if r.String() != "rpc num: 101, rpc max:100ms, min:0s, avg:50ms, p80:80ms, p95:95ms, proc keys max:100, p95:95" {
-		t.Fatal()
-	}
-}

From b046ee77fe7e0fdeaf773d91c051ab23986bde2e Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Wed, 2 Sep 2020 10:35:11 +0800
Subject: [PATCH 2/4] Revert "cherry pick #18916 to release-4.0"

This reverts commit e4bc75895f0aae5ce9a7ddc4890f2dba2588879a.
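For reference, the mechanism that the reverted commit introduced, and that
PATCH 3/4 below re-applies, is a per-command RPC accounting map: each sender
records a count and a total duration per tikvrpc.CmdType, and callers merge
these maps upward into the executor runtime stats printed by EXPLAIN ANALYZE.
The following self-contained sketch mirrors the RegionRequestRuntimeStats and
RPCRuntimeStats pattern from store/tikv/region_request.go in this series; it
is illustrative, not part of any patch, and keys by a plain string instead of
tikvrpc.CmdType so that it compiles outside TiDB.

package main

import (
	"bytes"
	"fmt"
	"time"
)

// RPCRuntimeStats counts RPCs of one command type and their total duration.
type RPCRuntimeStats struct {
	Count   int64
	Consume int64 // total time spent in this command type, in nanoseconds
}

// RegionRequestRuntimeStats aggregates per-command stats; the real code
// keys by tikvrpc.CmdType, a plain string stands in here.
type RegionRequestRuntimeStats struct {
	Stats map[string]*RPCRuntimeStats
}

func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats {
	return RegionRequestRuntimeStats{Stats: make(map[string]*RPCRuntimeStats)}
}

// Record mirrors recordRegionRequestRuntimeStats: create the bucket on
// first use, otherwise bump the counters.
func (r *RegionRequestRuntimeStats) Record(cmd string, d time.Duration) {
	stat, ok := r.Stats[cmd]
	if !ok {
		r.Stats[cmd] = &RPCRuntimeStats{Count: 1, Consume: int64(d)}
		return
	}
	stat.Count++
	stat.Consume += int64(d)
}

// Merge folds another collector into this one, as selectResultRuntimeStats
// does when it rolls per-cop-task stats up to the executor level.
func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) {
	for cmd, v := range rs.Stats {
		stat, ok := r.Stats[cmd]
		if !ok {
			r.Stats[cmd] = &RPCRuntimeStats{Count: v.Count, Consume: v.Consume}
			continue
		}
		stat.Count += v.Count
		stat.Consume += v.Consume
	}
}

// String renders "cmd:{num_rpc:N, total_time:D}" pairs, matching the format
// the patch adds to EXPLAIN ANALYZE output.
func (r *RegionRequestRuntimeStats) String() string {
	var buf bytes.Buffer
	for k, v := range r.Stats {
		if buf.Len() > 0 {
			buf.WriteByte(',')
		}
		fmt.Fprintf(&buf, "%s:{num_rpc:%d, total_time:%s}", k, v.Count, time.Duration(v.Consume))
	}
	return buf.String()
}

func main() {
	// One collector per worker, merged into a session-level total.
	worker := NewRegionRequestRuntimeStats()
	worker.Record("Cop", 120*time.Millisecond)
	worker.Record("Cop", 80*time.Millisecond)
	worker.Record("ResolveLock", 5*time.Millisecond)

	total := NewRegionRequestRuntimeStats()
	total.Merge(worker)
	fmt.Println(total.String()) // e.g. Cop:{num_rpc:2, total_time:200ms},ResolveLock:{num_rpc:1, total_time:5ms}
}

Recording costs one map lookup per RPC and merging is linear in the number of
distinct command types, which keeps the cost of collecting these stats on
every request small.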
--- distsql/distsql_test.go | 6 ++ distsql/select_result.go | 141 ++------------------------- distsql/select_result_test.go | 7 +- distsql/stream.go | 12 +-- kv/kv.go | 3 + planner/core/common_plans.go | 6 ++ store/tikv/batch_coprocessor.go | 32 +----- store/tikv/coprocessor.go | 27 +++-- store/tikv/region_request.go | 71 +++----------- store/tikv/snapshot.go | 31 +++--- store/tikv/snapshot_test.go | 10 +- util/execdetails/execdetails.go | 70 ++++++++++++- util/execdetails/execdetails_test.go | 21 ++++ 13 files changed, 168 insertions(+), 269 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 309997296067c..151b76e34713e 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" @@ -428,6 +429,11 @@ func (r *mockResultSubset) GetData() []byte { return r.data } // GetStartKey implements kv.ResultSubset interface. func (r *mockResultSubset) GetStartKey() kv.Key { return nil } +// GetExecDetails implements kv.ResultSubset interface. +func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { + return &execdetails.ExecDetails{} +} + // MemSize implements kv.ResultSubset interface. func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } diff --git a/distsql/select_result.go b/distsql/select_result.go index 6b91a8a80f42e..1fcbbe605dc82 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,11 +14,8 @@ package distsql import ( - "bytes" "context" "fmt" - "sort" - "strconv" "sync/atomic" "time" @@ -29,8 +26,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" - "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -88,8 +83,6 @@ type selectResult struct { fetchDuration time.Duration durationReported bool memTracker *memory.Tracker - - stats *selectResultRuntimeStats } func (r *selectResult) Fetch(ctx context.Context) { @@ -137,18 +130,14 @@ func (r *selectResult) fetchResp(ctx context.Context) error { for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) } + resultDetail := resultSubset.GetExecDetails() + r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime()) r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ - - hasStats, ok := resultSubset.(CopRuntimeStats) - if ok { - copStats := hasStats.GetCopRuntimeStats() - if copStats != nil { - r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime()) - copStats.CopTime = duration - sc.MergeExecDetails(&copStats.ExecDetails, nil) - } + if resultDetail != nil { + resultDetail.CopTime = duration } + sc.MergeExecDetails(resultDetail, nil) if len(r.selectResp.Chunks) != 0 { break } @@ -244,8 +233,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) { - callee := copStats.CalleeAddress +func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail 
*execdetails.ExecDetails, respTime time.Duration) { + callee := detail.CalleeAddress if r.rootPlanID == nil || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } @@ -256,19 +245,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv return } - if r.stats == nil { - stmtCtx := r.ctx.GetSessionVars().StmtCtx - id := r.rootPlanID.String() - originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id) - r.stats = &selectResultRuntimeStats{ - RuntimeStats: originRuntimeStats, - backoffSleep: make(map[string]time.Duration), - rpcStat: tikv.NewRegionRequestRuntimeStats(), - } - r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats) - } - r.stats.mergeCopRuntimeStats(copStats, respTime) + r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime, detail) for i, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { @@ -311,106 +289,3 @@ func (r *selectResult) Close() error { } return r.resp.Close() } - -// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats. -type CopRuntimeStats interface { - // GetCopRuntimeStats gets the cop runtime stats information. - GetCopRuntimeStats() *tikv.CopRuntimeStats -} - -type selectResultRuntimeStats struct { - execdetails.RuntimeStats - copRespTime []time.Duration - procKeys []int64 - backoffSleep map[string]time.Duration - totalProcessTime time.Duration - totalWaitTime time.Duration - rpcStat tikv.RegionRequestRuntimeStats -} - -func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) { - s.copRespTime = append(s.copRespTime, respTime) - s.procKeys = append(s.procKeys, copStats.ProcessedKeys) - - for k, v := range copStats.BackoffSleep { - s.backoffSleep[k] += v - } - s.totalProcessTime += copStats.ProcessTime - s.totalWaitTime += copStats.WaitTime - s.rpcStat.Merge(copStats.RegionRequestRuntimeStats) -} - -func (s *selectResultRuntimeStats) String() string { - buf := bytes.NewBuffer(nil) - if s.RuntimeStats != nil { - buf.WriteString(s.RuntimeStats.String()) - } - if len(s.copRespTime) > 0 { - size := len(s.copRespTime) - buf.WriteString(", ") - if size == 1 { - buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0])) - } else { - sort.Slice(s.copRespTime, func(i, j int) bool { - return s.copRespTime[i] < s.copRespTime[j] - }) - vMax, vMin := s.copRespTime[size-1], s.copRespTime[0] - vP95 := s.copRespTime[size*19/20] - sum := 0.0 - for _, t := range s.copRespTime { - sum += float64(t) - } - vAvg := time.Duration(sum / float64(size)) - - sort.Slice(s.procKeys, func(i, j int) bool { - return s.procKeys[i] < s.procKeys[j] - }) - keyMax := s.procKeys[size-1] - keyP95 := s.procKeys[size*19/20] - buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95)) - if keyMax > 0 { - buf.WriteString(", max_proc_keys: ") - buf.WriteString(strconv.FormatInt(keyMax, 10)) - buf.WriteString(", p95_proc_keys: ") - buf.WriteString(strconv.FormatInt(keyP95, 10)) - } - if s.totalProcessTime > 0 { - buf.WriteString(", tot_proc: ") - buf.WriteString(s.totalProcessTime.String()) - if s.totalWaitTime > 0 { - buf.WriteString(", tot_wait: ") - buf.WriteString(s.totalWaitTime.String()) - } - } - } - } - copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] - 
delete(s.rpcStat.Stats, tikvrpc.CmdCop) - if copRPC.Count > 0 { - buf.WriteString(", rpc_num: ") - buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) - buf.WriteString(", rpc_time: ") - buf.WriteString(time.Duration(copRPC.Consume).String()) - } - buf.WriteString("}") - - rpcStatsStr := s.rpcStat.String() - if len(rpcStatsStr) > 0 { - buf.WriteString(", ") - buf.WriteString(rpcStatsStr) - } - - if len(s.backoffSleep) > 0 { - buf.WriteString(", backoff{") - idx := 0 - for k, d := range s.backoffSleep { - if idx > 0 { - buf.WriteString(", ") - } - idx++ - buf.WriteString(fmt.Sprintf("%s: %s", k, d.String())) - } - buf.WriteString("}") - } - return buf.String() -} diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index 0f470aec309da..7ac21151f7205 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -19,7 +19,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tipb/go-tipb" @@ -31,7 +30,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) sr.rootPlanID = copPlan{} - sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0) + sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() t := uint64(1) @@ -41,13 +40,13 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { }, } c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) - sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) + sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse) sr.copPlanIDs = []fmt.Stringer{copPlan{}} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) - sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) + sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1") } diff --git a/distsql/stream.go b/distsql/stream.go index f1817084cdf44..934eb28e16885 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -106,15 +106,11 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons } r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts) r.partialCount++ - - hasStats, ok := resultSubset.(CopRuntimeStats) - if ok { - copStats := hasStats.GetCopRuntimeStats() - if copStats != nil { - copStats.CopTime = duration - r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil) - } + resultDetail := resultSubset.GetExecDetails() + if resultDetail != nil { + resultDetail.CopTime = duration } + r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil) return false, nil } diff --git a/kv/kv.go b/kv/kv.go index 5bce2d7a7e30b..07af8ec6903d5 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -20,6 
+20,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" ) @@ -338,6 +339,8 @@ type ResultSubset interface { GetData() []byte // GetStartKey gets the start key. GetStartKey() Key + // GetExecDetails gets the detail information. + GetExecDetails() *execdetails.ExecDetails // MemSize returns how many bytes of memory this result use for tracing memory usage. MemSize() int64 // RespTime returns the response time for the request. diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index da75e927c719d..aa709c5d51a66 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -998,6 +998,12 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor analyzeInfo = "time:0ns, loops:0" actRows = "0" } + switch p.(type) { + case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: + if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 { + analyzeInfo += ", " + s.String() + } + } memoryInfo = "N/A" memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index b8a773fdfa71e..a4530c2521c9d 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -41,7 +42,7 @@ type batchCopTask struct { type batchCopResponse struct { pbResp *coprocessor.BatchResponse - detail *CopRuntimeStats + detail *execdetails.ExecDetails // batch Cop Response is yet to return startKey. So batchCop cannot retry partially. startKey kv.Key @@ -62,13 +63,8 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. -<<<<<<< HEAD func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { return &execdetails.ExecDetails{} -======= -func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats { - return rs.detail ->>>>>>> ea3da25... *: record more rpc runtime information in cop runtime stats (#18916) } // MemSize returns how many bytes of memory this response use @@ -81,6 +77,9 @@ func (rs *batchCopResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) + if rs.detail.CommitDetail != nil { + rs.respSize += int64(sizeofCommitDetails) + } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -305,11 +304,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { -<<<<<<< HEAD resp := &batchCopResponse{err: errors.Trace(err)} -======= - resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)} ->>>>>>> ea3da25... 
*: record more rpc runtime information in cop runtime stats (#18916) b.sendToRespCh(resp) break } @@ -420,24 +415,7 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro b.sendToRespCh(&batchCopResponse{ pbResp: response, -<<<<<<< HEAD }) -======= - detail: new(CopRuntimeStats), - } - - resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond - resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) - resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) - for backoff := range bo.backoffTimes { - backoffName := backoff.String() - resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff] - resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond - } - resp.detail.CalleeAddress = task.storeAddr - - b.sendToRespCh(&resp) ->>>>>>> ea3da25... *: record more rpc runtime information in cop runtime stats (#18916) return } diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 69e3d18a3cb11..8a936ae58197e 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -433,7 +433,7 @@ type copIteratorTaskSender struct { type copResponse struct { pbResp *coprocessor.Response - detail *CopRuntimeStats + detail *execdetails.ExecDetails startKey kv.Key err error respSize int64 @@ -455,7 +455,7 @@ func (rs *copResponse) GetStartKey() kv.Key { return rs.startKey } -func (rs *copResponse) GetCopRuntimeStats() *CopRuntimeStats { +func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { return rs.detail } @@ -469,6 +469,9 @@ func (rs *copResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) + if rs.detail.CommitDetail != nil { + rs.respSize += int64(sizeofCommitDetails) + } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -772,7 +775,6 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) req.StoreTp = task.storeType startTime := time.Now() - worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType, task.storeAddr) if err != nil { if task.storeType == kv.TiDB { @@ -838,7 +840,7 @@ type clientHelper struct { *minCommitTSPushed Client resolveLite bool - RegionRequestRuntimeStats + stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats } // ResolveLocks wraps the ResolveLocks function and store the resolved result. 
@@ -846,9 +848,9 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 - if ch.Stats != nil { + if ch.stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } if ch.resolveLite { @@ -872,7 +874,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID if len(directStoreAddr) > 0 { sender.storeAddr = directStoreAddr } - sender.Stats = ch.Stats + sender.stats = ch.stats req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.storeAddr, err @@ -1021,9 +1023,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon resp.startKey = task.ranges.at(0).StartKey } if resp.detail == nil { - resp.detail = new(CopRuntimeStats) + resp.detail = new(execdetails.ExecDetails) } - resp.detail.Stats = worker.Stats resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) @@ -1077,12 +1078,6 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon return nil, nil } -// CopRuntimeStats contains execution detail information. -type CopRuntimeStats struct { - execdetails.ExecDetails - RegionRequestRuntimeStats -} - func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error { errCode := errno.ErrUnknown errMsg := err.Error() @@ -1106,7 +1101,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, pbResp: &coprocessor.Response{ Data: data, }, - detail: &CopRuntimeStats{}, + detail: &execdetails.ExecDetails{}, } worker.sendToRespCh(resp, ch, true) return nil diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 0463a6bff6d77..4e4a3bab710e5 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -14,9 +14,7 @@ package tikv import ( - "bytes" "context" - "fmt" "strconv" "sync" "sync/atomic" @@ -63,54 +61,14 @@ type RegionRequestSender struct { storeAddr string rpcError error failStoreIDs map[uint64]struct{} - RegionRequestRuntimeStats + stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats } // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - Stats map[tikvrpc.CmdType]*RPCRuntimeStats -} - -// NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. -func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { - return RegionRequestRuntimeStats{ - Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), - } -} - -// RPCRuntimeStats indicates the RPC request count and consume time. -type RPCRuntimeStats struct { - Count int64 + count int64 // Send region request consume time. - Consume int64 -} - -// String implements fmt.Stringer interface. -func (r *RegionRequestRuntimeStats) String() string { - var buf bytes.Buffer - for k, v := range r.Stats { - if buf.Len() > 0 { - buf.WriteByte(',') - } - buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.Count, time.Duration(v.Consume))) - } - return buf.String() -} - -// Merge merges other RegionRequestRuntimeStats. 
-func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { - for cmd, v := range rs.Stats { - stat, ok := r.Stats[cmd] - if !ok { - r.Stats[cmd] = &RPCRuntimeStats{ - Count: v.Count, - Consume: v.Consume, - } - continue - } - stat.Count += v.Count - stat.Consume += v.Consume - } + consume int64 } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. @@ -134,9 +92,9 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } - if ss.Stats != nil { + if ss.stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) }(time.Now()) } resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) @@ -153,17 +111,17 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co return } -func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { +func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { stat, ok := stats[cmd] if !ok { - stats[cmd] = &RPCRuntimeStats{ - Count: 1, - Consume: int64(d), + stats[cmd] = &RegionRequestRuntimeStats{ + count: 1, + consume: int64(d), } return } - stat.Count++ - stat.Consume += int64(d) + stat.count++ + stat.consume += int64(d) } func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { @@ -386,14 +344,9 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, } defer s.releaseStoreToken(rpcCtx.Store) } -<<<<<<< HEAD if s.stats != nil { -======= - - if s.Stats != nil { ->>>>>>> ea3da25... 
*: record more rpc runtime information in cop runtime stats (#18916) defer func(start time.Time) { - recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) }(time.Now()) } ctx := bo.ctx diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index f78a9720fecf0..1b812392b1426 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -236,9 +236,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll Client: s.store.client, } if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) + cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.Stats) + s.mergeRegionRequestStats(cli.stats) }() } @@ -364,9 +364,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { resolveLite: true, } if s.mu.stats != nil { - cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) + cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.Stats) + s.mergeRegionRequestStats(cli.stats) }() } @@ -591,30 +591,30 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { } } -func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { +func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats.Stats == nil { - s.mu.stats.rpcStats.Stats = stats + if s.mu.stats.rpcStats == nil { + s.mu.stats.rpcStats = stats return } for k, v := range stats { - stat, ok := s.mu.stats.rpcStats.Stats[k] + stat, ok := s.mu.stats.rpcStats[k] if !ok { - s.mu.stats.rpcStats.Stats[k] = v + s.mu.stats.rpcStats[k] = v continue } - stat.Count += v.Count - stat.Consume += v.Consume + stat.count += v.count + stat.consume += v.consume } } // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats RegionRequestRuntimeStats + rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats backoffSleepMS map[backoffType]int backoffTimes map[backoffType]int } @@ -622,7 +622,12 @@ type SnapshotRuntimeStats struct { // String implements fmt.Stringer interface. 
func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - buf.WriteString(rs.rpcStats.String()) + for k, v := range rs.rpcStats { + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume))) + } for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 518ef35925951..36fa86645ba9f 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -303,13 +303,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { - reqStats := NewRegionRequestRuntimeStats() - recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) - recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) + reqStats := make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Second) + recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Millisecond) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) - snapshot.mergeRegionRequestStats(reqStats.Stats) - snapshot.mergeRegionRequestStats(reqStats.Stats) + snapshot.mergeRegionRequestStats(reqStats) + snapshot.mergeRegionRequestStats(reqStats) bo := NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleep(boTxnLockFast, 30, errors.New("test")) c.Assert(err, IsNil) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 7530fe7889d4f..ddedcfaf8fef3 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -309,6 +309,49 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks) } +// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader +type ReaderRuntimeStats struct { + sync.Mutex + + copRespTime []time.Duration + procKeys []int64 +} + +// recordOneCopTask record once cop response time to update maxcopRespTime +func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration, detail *ExecDetails) { + rrs.Lock() + defer rrs.Unlock() + rrs.copRespTime = append(rrs.copRespTime, t) + rrs.procKeys = append(rrs.procKeys, detail.ProcessedKeys) +} + +func (rrs *ReaderRuntimeStats) String() string { + size := len(rrs.copRespTime) + if size == 0 { + return "" + } + if size == 1 { + return fmt.Sprintf("rpc num: 1, rpc time:%v, proc keys:%v", rrs.copRespTime[0], rrs.procKeys[0]) + } + sort.Slice(rrs.copRespTime, func(i, j int) bool { + return rrs.copRespTime[i] < rrs.copRespTime[j] + }) + vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0] + vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20] + sum := 0.0 + for _, t := range rrs.copRespTime { + sum += float64(t) + } + vAvg := time.Duration(sum / float64(size)) + + sort.Slice(rrs.procKeys, func(i, j int) bool { + return rrs.procKeys[i] < rrs.procKeys[j] + }) + keyMax := rrs.procKeys[size-1] + keyP95 := rrs.procKeys[size*19/20] + return fmt.Sprintf("rpc num: %v, rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v, proc keys max:%v, p95:%v", size, vMax, vMin, vAvg, vP80, vP95, keyMax, keyP95) +} + // RuntimeStats is used to express the executor runtime information. 
type RuntimeStats interface {
 	GetActRows() int64
@@ -349,15 +392,16 @@ func (e *BasicRuntimeStats) String() string {
 
 // RuntimeStatsColl collects executors's execution info.
 type RuntimeStatsColl struct {
-	mu        sync.Mutex
-	rootStats map[string]RuntimeStats
-	copStats  map[string]*CopRuntimeStats
+	mu          sync.Mutex
+	rootStats   map[string]RuntimeStats
+	copStats    map[string]*CopRuntimeStats
+	readerStats map[string]*ReaderRuntimeStats
 }
 
 // NewRuntimeStatsColl creates new executor collector.
 func NewRuntimeStatsColl() *RuntimeStatsColl {
 	return &RuntimeStatsColl{rootStats: make(map[string]RuntimeStats),
-		copStats: make(map[string]*CopRuntimeStats)}
+		copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)}
 }
 
 // RegisterStats register execStat for a executor.
@@ -397,6 +441,12 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip
 	copStats.RecordOneCopTask(address, summary)
 }
 
+// RecordOneReaderStats records the stats of one cop task for TableReader, IndexReader and IndexLookupReader.
+func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration, detail *ExecDetails) {
+	readerStats := e.GetReaderStats(planID)
+	readerStats.recordOneCopTask(copRespTime, detail)
+}
+
 // ExistsRootStats checks if the planID exists in the rootStats collection.
 func (e *RuntimeStatsColl) ExistsRootStats(planID string) bool {
 	e.mu.Lock()
@@ -413,6 +463,18 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool {
 	return exists
 }
 
+// GetReaderStats gets the ReaderRuntimeStats specified by planID.
+func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	stats, exists := e.readerStats[planID]
+	if !exists {
+		stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)}
+		e.readerStats[planID] = stats
+	}
+	return stats
+}
+
 // ConcurrencyInfo is used to save the concurrency information of the executor operator
 type ConcurrencyInfo struct {
 	concurrencyName string
diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go
index 616b262990f4a..4805ee672c0df 100644
--- a/util/execdetails/execdetails_test.go
+++ b/util/execdetails/execdetails_test.go
@@ -111,3 +111,24 @@ func TestCopRuntimeStats(t *testing.T) {
 		t.Fatal("table_reader not exists")
 	}
 }
+
+func TestReaderStats(t *testing.T) {
+	r := new(ReaderRuntimeStats)
+	if r.String() != "" {
+		t.Fatal()
+	}
+
+	r.procKeys = append(r.procKeys, 100)
+	r.copRespTime = append(r.copRespTime, time.Millisecond*100)
+	if r.String() != "rpc num: 1, rpc time:100ms, proc keys:100" {
+		t.Fatal()
+	}
+
+	for i := 0; i < 100; i++ {
+		r.procKeys = append(r.procKeys, int64(i))
+		r.copRespTime = append(r.copRespTime, time.Millisecond*time.Duration(i))
+	}
+	if r.String() != "rpc num: 101, rpc max:100ms, min:0s, avg:50ms, p80:80ms, p95:95ms, proc keys max:100, p95:95" {
+		t.Fatal()
+	}
+}
From 1688c20c2bf8d0c8c3fa557c34af957e4b16f841 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Tue, 18 Aug 2020 16:34:07 +0800
Subject: [PATCH 3/4] *: record more rpc runtime information in cop runtime stats (#18916)

---
 distsql/distsql_test.go              |   6 --
 distsql/select_result.go             | 142 +++++++++++++++++++++++++--
 distsql/select_result_test.go        |   7 +-
 distsql/stream.go                    |  12 ++-
 kv/kv.go                             |   3 -
 planner/core/common_plans.go         |   6 --
 store/tikv/batch_coprocessor.go      |  12 +--
 store/tikv/coprocessor.go            |  27 ++---
 store/tikv/region_request.go         |  69 ++++++++++---
 store/tikv/snapshot.go               |  31 +++---
 store/tikv/snapshot_test.go          |  10 +-
util/execdetails/execdetails.go | 70 +------------ util/execdetails/execdetails_test.go | 21 ---- 13 files changed, 244 insertions(+), 172 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 84a4664366994..e65d94224958f 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" ) @@ -422,11 +421,6 @@ func (r *mockResultSubset) GetData() []byte { return r.data } // GetStartKey implements kv.ResultSubset interface. func (r *mockResultSubset) GetStartKey() kv.Key { return nil } -// GetExecDetails implements kv.ResultSubset interface. -func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails { - return &execdetails.ExecDetails{} -} - // MemSize implements kv.ResultSubset interface. func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) } diff --git a/distsql/select_result.go b/distsql/select_result.go index 98f607d38d228..98281f529bca8 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -14,7 +14,11 @@ package distsql import ( + "bytes" "context" + "fmt" + "sort" + "strconv" "sync/atomic" "time" @@ -25,6 +29,8 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" @@ -82,6 +88,8 @@ type selectResult struct { fetchDuration time.Duration durationReported bool memTracker *memory.Tracker + + stats *selectResultRuntimeStats } func (r *selectResult) Fetch(ctx context.Context) { @@ -129,14 +137,18 @@ func (r *selectResult) fetchResp(ctx context.Context) error { for _, warning := range r.selectResp.Warnings { sc.AppendWarning(terror.ClassTiKV.Synthesize(terror.ErrCode(warning.Code), warning.Msg)) } - resultDetail := resultSubset.GetExecDetails() - r.updateCopRuntimeStats(ctx, resultDetail, resultSubset.RespTime()) r.feedback.Update(resultSubset.GetStartKey(), r.selectResp.OutputCounts) r.partialCount++ - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + r.updateCopRuntimeStats(ctx, copStats, resultSubset.RespTime()) + copStats.CopTime = duration + sc.MergeExecDetails(&copStats.ExecDetails, nil) + } } - sc.MergeExecDetails(resultDetail, nil) if len(r.selectResp.Chunks) != 0 { break } @@ -232,8 +244,8 @@ func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) erro return nil } -func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execdetails.ExecDetails, respTime time.Duration) { - callee := detail.CalleeAddress +func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv.CopRuntimeStats, respTime time.Duration) { + callee := copStats.CalleeAddress if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" { return } @@ -244,8 +256,19 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, detail *execde return } + if r.stats == nil { + stmtCtx := r.ctx.GetSessionVars().StmtCtx + id := r.rootPlanID + originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id) + r.stats = 
&selectResultRuntimeStats{
+			RuntimeStats: originRuntimeStats,
+			backoffSleep: make(map[string]time.Duration),
+			rpcStat:      tikv.NewRegionRequestRuntimeStats(),
+		}
+		r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
+	}
+	r.stats.mergeCopRuntimeStats(copStats, respTime)
 
-	r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID, respTime, detail)
 	for i, detail := range r.selectResp.GetExecutionSummaries() {
 		if detail != nil && detail.TimeProcessedNs != nil &&
 			detail.NumProducedRows != nil && detail.NumIterations != nil {
@@ -289,3 +312,106 @@ func (r *selectResult) Close() error {
 	}
 	return r.resp.Close()
 }
+
+// CopRuntimeStats is an interface used to check whether the result has cop runtime stats.
+type CopRuntimeStats interface {
+	// GetCopRuntimeStats gets the cop runtime stats information.
+	GetCopRuntimeStats() *tikv.CopRuntimeStats
+}
+
+type selectResultRuntimeStats struct {
+	execdetails.RuntimeStats
+	copRespTime      []time.Duration
+	procKeys         []int64
+	backoffSleep     map[string]time.Duration
+	totalProcessTime time.Duration
+	totalWaitTime    time.Duration
+	rpcStat          tikv.RegionRequestRuntimeStats
+}
+
+func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
+	s.copRespTime = append(s.copRespTime, respTime)
+	s.procKeys = append(s.procKeys, copStats.ProcessedKeys)
+
+	for k, v := range copStats.BackoffSleep {
+		s.backoffSleep[k] += v
+	}
+	s.totalProcessTime += copStats.ProcessTime
+	s.totalWaitTime += copStats.WaitTime
+	s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
+}
+
+func (s *selectResultRuntimeStats) String() string {
+	buf := bytes.NewBuffer(nil)
+	if s.RuntimeStats != nil {
+		buf.WriteString(s.RuntimeStats.String())
+	}
+	if len(s.copRespTime) > 0 {
+		size := len(s.copRespTime)
+		buf.WriteString(", ")
+		if size == 1 {
+			buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0]))
+		} else {
+			sort.Slice(s.copRespTime, func(i, j int) bool {
+				return s.copRespTime[i] < s.copRespTime[j]
+			})
+			vMax, vMin := s.copRespTime[size-1], s.copRespTime[0]
+			vP95 := s.copRespTime[size*19/20]
+			sum := 0.0
+			for _, t := range s.copRespTime {
+				sum += float64(t)
+			}
+			vAvg := time.Duration(sum / float64(size))
+
+			sort.Slice(s.procKeys, func(i, j int) bool {
+				return s.procKeys[i] < s.procKeys[j]
+			})
+			keyMax := s.procKeys[size-1]
+			keyP95 := s.procKeys[size*19/20]
+			buf.WriteString(fmt.Sprintf("cop_task: {num: %v, max: %v, min: %v, avg: %v, p95: %v", size, vMax, vMin, vAvg, vP95))
+			if keyMax > 0 {
+				buf.WriteString(", max_proc_keys: ")
+				buf.WriteString(strconv.FormatInt(keyMax, 10))
+				buf.WriteString(", p95_proc_keys: ")
+				buf.WriteString(strconv.FormatInt(keyP95, 10))
+			}
+			if s.totalProcessTime > 0 {
+				buf.WriteString(", tot_proc: ")
+				buf.WriteString(s.totalProcessTime.String())
+				if s.totalWaitTime > 0 {
+					buf.WriteString(", tot_wait: ")
+					buf.WriteString(s.totalWaitTime.String())
+				}
+			}
+		}
+	}
+	copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
+	delete(s.rpcStat.Stats, tikvrpc.CmdCop)
+	if copRPC.Count > 0 {
+		buf.WriteString(", rpc_num: ")
+		buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
+		buf.WriteString(", rpc_time: ")
+		buf.WriteString(time.Duration(copRPC.Consume).String())
+	}
+	buf.WriteString("}")
+
+	rpcStatsStr := s.rpcStat.String()
+	if len(rpcStatsStr) > 0 {
+		buf.WriteString(", ")
+		buf.WriteString(rpcStatsStr)
+	}
+
+	if len(s.backoffSleep) > 0 {
+		buf.WriteString(", backoff{")
+		idx := 0
+		for k, d := range
s.backoffSleep { + if idx > 0 { + buf.WriteString(", ") + } + idx++ + buf.WriteString(fmt.Sprintf("%s: %s", k, d.String())) + } + buf.WriteString("}") + } + return buf.String() +} diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go index ada163df3859c..469d687517752 100644 --- a/distsql/select_result_test.go +++ b/distsql/select_result_test.go @@ -18,6 +18,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tipb/go-tipb" @@ -29,7 +30,7 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { sr := selectResult{ctx: ctx} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) sr.rootPlanID = 1234 - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "a"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0) ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() t := uint64(1) @@ -39,12 +40,12 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { }, } c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234), IsFalse) sr.copPlanIDs = []int{sr.rootPlanID} c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) - sr.updateCopRuntimeStats(context.Background(), &execdetails.ExecDetails{CalleeAddress: "callee"}, 0) + sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0) c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "time:1ns, loops:1") } diff --git a/distsql/stream.go b/distsql/stream.go index 934eb28e16885..f1817084cdf44 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -106,11 +106,15 @@ func (r *streamResult) readDataFromResponse(ctx context.Context, resp kv.Respons } r.feedback.Update(resultSubset.GetStartKey(), stream.OutputCounts) r.partialCount++ - resultDetail := resultSubset.GetExecDetails() - if resultDetail != nil { - resultDetail.CopTime = duration + + hasStats, ok := resultSubset.(CopRuntimeStats) + if ok { + copStats := hasStats.GetCopRuntimeStats() + if copStats != nil { + copStats.CopTime = duration + r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil) + } } - r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(resultDetail, nil) return false, nil } diff --git a/kv/kv.go b/kv/kv.go index 3b814a8733096..314bb4ac4c296 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/store/tikv/oracle" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" ) @@ -341,8 +340,6 @@ type ResultSubset interface { GetData() []byte // GetStartKey gets the start key. GetStartKey() Key - // GetExecDetails gets the detail information. - GetExecDetails() *execdetails.ExecDetails // MemSize returns how many bytes of memory this result use for tracing memory usage. 
MemSize() int64 // RespTime returns the response time for the request. diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index d33f889d0dbb3..2d7d910490ee7 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -1024,12 +1024,6 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memor analyzeInfo = "time:0ns, loops:0" actRows = "0" } - switch p.(type) { - case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader: - if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 { - analyzeInfo += ", " + s.String() - } - } memoryInfo = "N/A" memTracker := ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index 0ee09c06f24da..59db552d4a0de 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -42,7 +41,7 @@ type batchCopTask struct { type batchCopResponse struct { pbResp *coprocessor.BatchResponse - detail *execdetails.ExecDetails + detail *CopRuntimeStats // batch Cop Response is yet to return startKey. So batchCop cannot retry partially. startKey kv.Key @@ -63,7 +62,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. -func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { +func (rs *batchCopResponse) GetCopRuntimeStats() *CopRuntimeStats { return rs.detail } @@ -77,9 +76,6 @@ func (rs *batchCopResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. 
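With GetExecDetails dropped from kv.ResultSubset, cop runtime stats now travel through an optional interface instead: callers type-assert the result against distsql.CopRuntimeStats and only read stats when the concrete response opts in. A hedged sketch of that optional-interface pattern, with Result and StatsProvider as assumed stand-ins for the TiDB interfaces:

package main

import "fmt"

// Result is the narrow interface every response implements,
// analogous to kv.ResultSubset after this change.
type Result interface {
	GetData() []byte
}

// StatsProvider is the optional extension, analogous to
// distsql.CopRuntimeStats.
type StatsProvider interface {
	GetStats() string
}

type plainResult struct{ data []byte }

func (r plainResult) GetData() []byte { return r.data }

// statsResult embeds plainResult and additionally opts in to stats.
type statsResult struct{ plainResult }

func (r statsResult) GetStats() string { return "proc_keys: 42" }

// consume only touches stats when the concrete type opts in,
// mirroring the resultSubset.(CopRuntimeStats) check above.
func consume(r Result) {
	if sp, ok := r.(StatsProvider); ok {
		fmt.Println("stats:", sp.GetStats())
		return
	}
	fmt.Println("no stats attached")
}

func main() {
	consume(plainResult{})
	consume(statsResult{})
}

Keeping the base interface narrow means mock implementations (like mockResultSubset in the tests above) no longer have to stub out stats they never produce.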
@@ -304,7 +300,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { - resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)} + resp := &batchCopResponse{err: errors.Trace(err), detail: new(CopRuntimeStats)} b.sendToRespCh(resp) break } @@ -415,7 +411,7 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro resp := batchCopResponse{ pbResp: response, - detail: new(execdetails.ExecDetails), + detail: new(CopRuntimeStats), } resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 8a936ae58197e..69e3d18a3cb11 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -433,7 +433,7 @@ type copIteratorTaskSender struct { type copResponse struct { pbResp *coprocessor.Response - detail *execdetails.ExecDetails + detail *CopRuntimeStats startKey kv.Key err error respSize int64 @@ -455,7 +455,7 @@ func (rs *copResponse) GetStartKey() kv.Key { return rs.startKey } -func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails { +func (rs *copResponse) GetCopRuntimeStats() *CopRuntimeStats { return rs.detail } @@ -469,9 +469,6 @@ func (rs *copResponse) MemSize() int64 { rs.respSize += int64(cap(rs.startKey)) if rs.detail != nil { rs.respSize += int64(sizeofExecDetails) - if rs.detail.CommitDetail != nil { - rs.respSize += int64(sizeofCommitDetails) - } } if rs.pbResp != nil { // Using a approximate size since it's hard to get a accurate value. @@ -775,6 +772,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) req.StoreTp = task.storeType startTime := time.Now() + worker.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) resp, rpcCtx, storeAddr, err := worker.SendReqCtx(bo, req, task.region, ReadTimeoutMedium, task.storeType, task.storeAddr) if err != nil { if task.storeType == kv.TiDB { @@ -840,7 +838,7 @@ type clientHelper struct { *minCommitTSPushed Client resolveLite bool - stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + RegionRequestRuntimeStats } // ResolveLocks wraps the ResolveLocks function and store the resolved result. 
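The clientHelper change swaps a private stats field for an embedded RegionRequestRuntimeStats, so the Stats map is promoted and both the helper and the request sender write to the same collection. A small sketch of field promotion via struct embedding, under assumed simplified types (Stats, helper):

package main

import "fmt"

// Stats plays the role of RegionRequestRuntimeStats.
type Stats struct {
	Counts map[string]int64
}

// Record accumulates one call; a nil map means collection is disabled,
// the same convention the real code uses for cli.Stats.
func (s *Stats) Record(cmd string) {
	if s.Counts == nil {
		return
	}
	s.Counts[cmd]++
}

// helper embeds Stats, so helper.Counts and helper.Record are
// promoted, the way clientHelper exposes ch.Stats above.
type helper struct {
	Stats
	addr string
}

func main() {
	h := helper{Stats: Stats{Counts: make(map[string]int64)}, addr: "tikv-1"}
	h.Record("Cop") // promoted method; no h.Stats.Record needed
	h.Record("Cop")
	fmt.Println(h.Counts["Cop"]) // 2
}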
@@ -848,9 +846,9 @@ func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks var err error var resolvedLocks []uint64 var msBeforeTxnExpired int64 - if ch.stats != nil { + if ch.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start)) + recordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) }(time.Now()) } if ch.resolveLite { @@ -874,7 +872,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID if len(directStoreAddr) > 0 { sender.storeAddr = directStoreAddr } - sender.stats = ch.stats + sender.Stats = ch.Stats req.Context.ResolvedLocks = ch.minCommitTSPushed.Get() resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType) return resp, ctx, sender.storeAddr, err @@ -1023,8 +1021,9 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon resp.startKey = task.ranges.at(0).StartKey } if resp.detail == nil { - resp.detail = new(execdetails.ExecDetails) + resp.detail = new(CopRuntimeStats) } + resp.detail.Stats = worker.Stats resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) @@ -1078,6 +1077,12 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon return nil, nil } +// CopRuntimeStats contains execution detail information. +type CopRuntimeStats struct { + execdetails.ExecDetails + RegionRequestRuntimeStats +} + func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error { errCode := errno.ErrUnknown errMsg := err.Error() @@ -1101,7 +1106,7 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, pbResp: &coprocessor.Response{ Data: data, }, - detail: &execdetails.ExecDetails{}, + detail: &CopRuntimeStats{}, } worker.sendToRespCh(resp, ch, true) return nil diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 4e4a3bab710e5..57ba5f0df8d90 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -14,7 +14,9 @@ package tikv import ( + "bytes" "context" + "fmt" "strconv" "sync" "sync/atomic" @@ -61,14 +63,54 @@ type RegionRequestSender struct { storeAddr string rpcError error failStoreIDs map[uint64]struct{} - stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + RegionRequestRuntimeStats } // RegionRequestRuntimeStats records the runtime stats of send region requests. type RegionRequestRuntimeStats struct { - count int64 + Stats map[tikvrpc.CmdType]*RPCRuntimeStats +} + +// NewRegionRequestRuntimeStats returns a new RegionRequestRuntimeStats. +func NewRegionRequestRuntimeStats() RegionRequestRuntimeStats { + return RegionRequestRuntimeStats{ + Stats: make(map[tikvrpc.CmdType]*RPCRuntimeStats), + } +} + +// RPCRuntimeStats indicates the RPC request count and consume time. +type RPCRuntimeStats struct { + Count int64 // Send region request consume time. - consume int64 + Consume int64 +} + +// String implements fmt.Stringer interface. +func (r *RegionRequestRuntimeStats) String() string { + var buf bytes.Buffer + for k, v := range r.Stats { + if buf.Len() > 0 { + buf.WriteByte(',') + } + buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.Count, time.Duration(v.Consume))) + } + return buf.String() +} + +// Merge merges other RegionRequestRuntimeStats. 
+func (r *RegionRequestRuntimeStats) Merge(rs RegionRequestRuntimeStats) { + for cmd, v := range rs.Stats { + stat, ok := r.Stats[cmd] + if !ok { + r.Stats[cmd] = &RPCRuntimeStats{ + Count: v.Count, + Consume: v.Consume, + } + continue + } + stat.Count += v.Count + stat.Consume += v.Consume + } } // RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way. @@ -92,9 +134,9 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil { ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx) } - if ss.stats != nil { + if ss.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start)) }(time.Now()) } resp, err = ss.client.SendRequest(ctx, rpcCtx.Addr, req, timout) @@ -111,17 +153,17 @@ func (ss *RegionBatchRequestSender) sendStreamReqToAddr(bo *Backoffer, ctxs []co return } -func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { +func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) { stat, ok := stats[cmd] if !ok { - stats[cmd] = &RegionRequestRuntimeStats{ - count: 1, - consume: int64(d), + stats[cmd] = &RPCRuntimeStats{ + Count: 1, + Consume: int64(d), } return } - stat.count++ - stat.consume += int64(d) + stat.Count++ + stat.Consume += int64(d) } func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctxs []copTaskAndRPCContext, err error) error { @@ -344,9 +386,10 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext, } defer s.releaseStoreToken(rpcCtx.Store) } - if s.stats != nil { + + if s.Stats != nil { defer func(start time.Time) { - recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start)) + recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start)) }(time.Now()) } ctx := bo.ctx diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 1b812392b1426..f78a9720fecf0 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -236,9 +236,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll Client: s.store.client, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -364,9 +364,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { resolveLite: true, } if s.mu.stats != nil { - cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) + cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) defer func() { - s.mergeRegionRequestStats(cli.stats) + s.mergeRegionRequestStats(cli.Stats) }() } @@ -591,30 +591,30 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { } } -func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { +func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() if s.mu.stats == nil { return } - if s.mu.stats.rpcStats == nil { - s.mu.stats.rpcStats = stats + if s.mu.stats.rpcStats.Stats == nil { + s.mu.stats.rpcStats.Stats = stats return } for k, v := range stats { - stat, ok := s.mu.stats.rpcStats[k] + stat, ok := s.mu.stats.rpcStats.Stats[k] if !ok { - 
s.mu.stats.rpcStats[k] = v + s.mu.stats.rpcStats.Stats[k] = v continue } - stat.count += v.count - stat.consume += v.consume + stat.Count += v.Count + stat.Consume += v.Consume } } // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats + rpcStats RegionRequestRuntimeStats backoffSleepMS map[backoffType]int backoffTimes map[backoffType]int } @@ -622,12 +622,7 @@ type SnapshotRuntimeStats struct { // String implements fmt.Stringer interface. func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer - for k, v := range rs.rpcStats { - if buf.Len() > 0 { - buf.WriteByte(',') - } - buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume))) - } + buf.WriteString(rs.rpcStats.String()) for k, v := range rs.backoffTimes { if buf.Len() > 0 { buf.WriteByte(',') diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 36fa86645ba9f..518ef35925951 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -303,13 +303,13 @@ func (s *testSnapshotSuite) TestSnapshotThreadSafe(c *C) { } func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { - reqStats := make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Second) - recordRegionRequestRuntimeStats(reqStats, tikvrpc.CmdGet, time.Millisecond) + reqStats := NewRegionRequestRuntimeStats() + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Second) + recordRegionRequestRuntimeStats(reqStats.Stats, tikvrpc.CmdGet, time.Millisecond) snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: 0}, 0) snapshot.SetOption(kv.CollectRuntimeStats, &SnapshotRuntimeStats{}) - snapshot.mergeRegionRequestStats(reqStats) - snapshot.mergeRegionRequestStats(reqStats) + snapshot.mergeRegionRequestStats(reqStats.Stats) + snapshot.mergeRegionRequestStats(reqStats.Stats) bo := NewBackofferWithVars(context.Background(), 2000, nil) err := bo.BackoffWithMaxSleep(boTxnLockFast, 30, errors.New("test")) c.Assert(err, IsNil) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 9bbc09fbdf40c..8f480e1354a20 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -323,49 +323,6 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks) } -// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader -type ReaderRuntimeStats struct { - sync.Mutex - - copRespTime []time.Duration - procKeys []int64 -} - -// recordOneCopTask record once cop response time to update maxcopRespTime -func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration, detail *ExecDetails) { - rrs.Lock() - defer rrs.Unlock() - rrs.copRespTime = append(rrs.copRespTime, t) - rrs.procKeys = append(rrs.procKeys, detail.ProcessedKeys) -} - -func (rrs *ReaderRuntimeStats) String() string { - size := len(rrs.copRespTime) - if size == 0 { - return "" - } - if size == 1 { - return fmt.Sprintf("rpc num: 1, rpc time:%v, proc keys:%v", rrs.copRespTime[0], rrs.procKeys[0]) - } - sort.Slice(rrs.copRespTime, func(i, j int) bool { - return rrs.copRespTime[i] < rrs.copRespTime[j] - }) - vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0] - vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20] - sum := 0.0 - for _, t := range rrs.copRespTime { - sum += float64(t) - } 
- vAvg := time.Duration(sum / float64(size)) - - sort.Slice(rrs.procKeys, func(i, j int) bool { - return rrs.procKeys[i] < rrs.procKeys[j] - }) - keyMax := rrs.procKeys[size-1] - keyP95 := rrs.procKeys[size*19/20] - return fmt.Sprintf("rpc num: %v, rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v, proc keys max:%v, p95:%v", size, vMax, vMin, vAvg, vP80, vP95, keyMax, keyP95) -} - // RuntimeStats is used to express the executor runtime information. type RuntimeStats interface { GetActRows() int64 @@ -406,16 +363,15 @@ func (e *BasicRuntimeStats) String() string { // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { - mu sync.Mutex - rootStats map[int]RuntimeStats - copStats map[int]*CopRuntimeStats - readerStats map[int]*ReaderRuntimeStats + mu sync.Mutex + rootStats map[int]RuntimeStats + copStats map[int]*CopRuntimeStats } // NewRuntimeStatsColl creates new executor collector. func NewRuntimeStatsColl() *RuntimeStatsColl { return &RuntimeStatsColl{rootStats: make(map[int]RuntimeStats), - copStats: make(map[int]*CopRuntimeStats), readerStats: make(map[int]*ReaderRuntimeStats)} + copStats: make(map[int]*CopRuntimeStats)} } // RegisterStats register execStat for a executor. @@ -455,12 +411,6 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, address string, summary copStats.RecordOneCopTask(address, summary) } -// RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader. -func (e *RuntimeStatsColl) RecordOneReaderStats(planID int, copRespTime time.Duration, detail *ExecDetails) { - readerStats := e.GetReaderStats(planID) - readerStats.recordOneCopTask(copRespTime, detail) -} - // ExistsRootStats checks if the planID exists in the rootStats collection. func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool { e.mu.Lock() @@ -477,18 +427,6 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool { return exists } -// GetReaderStats gets the ReaderRuntimeStats specified by planID. 
-func (e *RuntimeStatsColl) GetReaderStats(planID int) *ReaderRuntimeStats { - e.mu.Lock() - defer e.mu.Unlock() - stats, exists := e.readerStats[planID] - if !exists { - stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)} - e.readerStats[planID] = stats - } - return stats -} - // ConcurrencyInfo is used to save the concurrency information of the executor operator type ConcurrencyInfo struct { concurrencyName string diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index e59b1b80df84b..65a5c5b9797f4 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -114,24 +114,3 @@ func TestCopRuntimeStats(t *testing.T) { t.Fatal("table_reader not exists") } } - -func TestReaderStats(t *testing.T) { - r := new(ReaderRuntimeStats) - if r.String() != "" { - t.Fatal() - } - - r.procKeys = append(r.procKeys, 100) - r.copRespTime = append(r.copRespTime, time.Millisecond*100) - if r.String() != "rpc num: 1, rpc time:100ms, proc keys:100" { - t.Fatal() - } - - for i := 0; i < 100; i++ { - r.procKeys = append(r.procKeys, int64(i)) - r.copRespTime = append(r.copRespTime, time.Millisecond*time.Duration(i)) - } - if r.String() != "rpc num: 101, rpc max:100ms, min:0s, avg:50ms, p80:80ms, p95:95ms, proc keys max:100, p95:95" { - t.Fatal() - } -} From 813514136f07257ace9ded429f54da94a7fd4262 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 19 Aug 2020 12:05:57 +0800 Subject: [PATCH 4/4] distsql: fix panic on selectResultRuntimeStats.String (#19277) --- distsql/select_result.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 98281f529bca8..365b5cb6adb28 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -386,8 +386,8 @@ func (s *selectResultRuntimeStats) String() string { } } copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] - delete(s.rpcStat.Stats, tikvrpc.CmdCop) - if copRPC.Count > 0 { + if copRPC != nil && copRPC.Count > 0 { + delete(s.rpcStat.Stats, tikvrpc.CmdCop) buf.WriteString(", rpc_num: ") buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) buf.WriteString(", rpc_time: ")
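Patch 4/4 exists because indexing a Go map whose values are pointers returns nil for a missing key, and the previous code dereferenced copRPC.Count unconditionally whenever no CmdCop request had been recorded. A minimal sketch of the failure mode and the guard (rpcStat and the map literal are assumed stand-ins for the tikvrpc-keyed stats map):

package main

import "fmt"

type rpcStat struct {
	Count   int64
	Consume int64
}

func main() {
	stats := map[string]*rpcStat{"Get": {Count: 2, Consume: 1000000}}

	// Lookup of a missing key returns the zero value for the map's
	// value type, which for a pointer is nil.
	cop := stats["Cop"]
	// Reading cop.Count here would panic with a nil pointer
	// dereference, which is the bug #19277 fixes.

	// The guarded form checks for nil before touching the fields,
	// and only then removes the entry, as the patched code does.
	if cop != nil && cop.Count > 0 {
		delete(stats, "Cop")
		fmt.Println("rpc_num:", cop.Count)
	} else {
		fmt.Println("no cop RPCs recorded")
	}
}

Moving the delete inside the guard also keeps String() idempotent when no cop RPC stats exist, since the map is only mutated once an entry is actually printed.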