Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix cop task runtime information is wrong in the concurrent executor (#19849) #19947

Merged
merged 7 commits into from
Oct 19, 2020
25 changes: 25 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tipb/go-tipb"
)
Expand Down Expand Up @@ -156,6 +158,29 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
c.Assert(err, IsNil)
}

func (s *testSuite) TestSelectResultRuntimeStats(c *C) {
basic := &execdetails.BasicRuntimeStats{}
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
procKeys: []int64{100, 200},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
s2 := *s1
stmtStats := execdetails.NewRuntimeStatsColl()
stmtStats.RegisterStats(1, basic)
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}"
c.Assert(stats.String(), Equals, expect)
// Test for idempotence.
c.Assert(stats.String(), Equals, expect)
}

func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
Expand Down
69 changes: 52 additions & 17 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
return
}
if r.stats == nil {
stmtCtx := r.ctx.GetSessionVars().StmtCtx
id := r.rootPlanID
originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
r.stats = &selectResultRuntimeStats{
RuntimeStats: originRuntimeStats,
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
Expand Down Expand Up @@ -326,7 +323,6 @@ type CopRuntimeStats interface {
}

type selectResultRuntimeStats struct {
execdetails.RuntimeStats
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
Expand All @@ -351,14 +347,48 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim
}
}

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
CoprCacheHitNum: s.CoprCacheHitNum,
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
for k, v := range s.backoffSleep {
newRs.backoffSleep[k] += v
}
newRs.totalProcessTime += s.totalProcessTime
newRs.totalWaitTime += s.totalWaitTime
for k, v := range s.rpcStat.Stats {
newRs.rpcStat.Stats[k] = v
}
return &newRs
}

func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
other, ok := rs.(*selectResultRuntimeStats)
if !ok {
return
}
s.copRespTime = append(s.copRespTime, other.copRespTime...)
s.procKeys = append(s.procKeys, other.procKeys...)

for k, v := range other.backoffSleep {
s.backoffSleep[k] += v
}
s.totalProcessTime += other.totalProcessTime
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
}

func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
if s.RuntimeStats != nil {
buf.WriteString(s.RuntimeStats.String())
}
if len(s.copRespTime) > 0 {
size := len(s.copRespTime)
buf.WriteString(", ")
if size == 1 {
buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0]))
} else {
Expand Down Expand Up @@ -394,18 +424,18 @@ func (s *selectResultRuntimeStats) String() string {
}
}
}
}
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
}
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
buf.WriteString("}")
}
buf.WriteString("}")

rpcStatsStr := s.rpcStat.String()
if len(rpcStatsStr) > 0 {
Expand All @@ -427,3 +457,8 @@ func (s *selectResultRuntimeStats) String() string {
}
return buf.String()
}

// Tp implements the RuntimeStats interface.
func (s *selectResultRuntimeStats) Tp() int {
return execdetails.TpSelectResultRuntimeStats
}
6 changes: 2 additions & 4 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,11 +804,9 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
LockKeys: execDetail.LockKeysDetail,
Commit: execDetail.CommitDetail,
LockKeys: execDetail.LockKeysDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (e *HashAggExec) Close() error {
}
partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency)
finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
Expand Down
1 change: 0 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
Expand Down
4 changes: 1 addition & 3 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
}
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.startWorkers(ctx)
Expand Down
37 changes: 29 additions & 8 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
}
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.startWorkers(ctx)
Expand Down Expand Up @@ -738,7 +736,6 @@ func (e *IndexLookUpJoin) Close() error {
}

type indexLookUpJoinRuntimeStats struct {
*execdetails.BasicRuntimeStats
concurrency int
probe int64
innerWorker innerWorkerRuntimeStats
Expand All @@ -755,11 +752,8 @@ type innerWorkerRuntimeStats struct {

func (e *indexLookUpJoinRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 16))
if e.BasicRuntimeStats != nil {
buf.WriteString(e.BasicRuntimeStats.String())
}
if e.innerWorker.totalTime > 0 {
buf.WriteString(", inner:{total:")
buf.WriteString("inner:{total:")
buf.WriteString(time.Duration(e.innerWorker.totalTime).String())
buf.WriteString(", concurrency:")
if e.concurrency > 0 {
Expand Down Expand Up @@ -787,3 +781,30 @@ func (e *indexLookUpJoinRuntimeStats) String() string {
}
return buf.String()
}

func (e *indexLookUpJoinRuntimeStats) Clone() execdetails.RuntimeStats {
return &indexLookUpJoinRuntimeStats{
concurrency: e.concurrency,
probe: e.probe,
innerWorker: e.innerWorker,
}
}

func (e *indexLookUpJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) {
tmp, ok := rs.(*indexLookUpJoinRuntimeStats)
if !ok {
return
}
e.probe += tmp.probe
e.innerWorker.totalTime += tmp.innerWorker.totalTime
e.innerWorker.task += tmp.innerWorker.task
e.innerWorker.construct += tmp.innerWorker.construct
e.innerWorker.fetch += tmp.innerWorker.fetch
e.innerWorker.build += tmp.innerWorker.build
e.innerWorker.join += tmp.innerWorker.join
}

// Tp implements the RuntimeStats interface.
func (e *indexLookUpJoinRuntimeStats) Tp() int {
return execdetails.TpIndexLookUpJoinRuntimeStats
}
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (e *IndexLookUpMergeJoin) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
Expand Down
1 change: 0 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
Expand Down
50 changes: 40 additions & 10 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}
if e.runtimeStats != nil {
e.stats = &hashJoinRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
concurrent: cap(e.joiners),
concurrent: cap(e.joiners),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
Expand Down Expand Up @@ -932,11 +931,9 @@ type joinRuntimeStats struct {
hashStat hashStatistic
}

func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats {
func newJoinRuntimeStats() *joinRuntimeStats {
stats := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{
BasicRuntimeStats: basic,
},
RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{},
}
return stats
}
Expand Down Expand Up @@ -973,9 +970,12 @@ func (e *joinRuntimeStats) String() string {
return buf.String()
}

type hashJoinRuntimeStats struct {
*execdetails.BasicRuntimeStats
// Tp implements the RuntimeStats interface.
func (e *joinRuntimeStats) Tp() int {
return execdetails.TpJoinRuntimeStats
}

type hashJoinRuntimeStats struct {
fetchAndBuildHashTable time.Duration
hashStat hashStatistic
fetchAndProbe int64
Expand All @@ -996,11 +996,15 @@ func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) {
}
}

// Tp implements the RuntimeStats interface.
func (e *hashJoinRuntimeStats) Tp() int {
return execdetails.TpHashJoinRuntimeStats
}

func (e *hashJoinRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 128))
buf.WriteString(e.BasicRuntimeStats.String())
if e.fetchAndBuildHashTable > 0 {
buf.WriteString(", build_hash_table:{total:")
buf.WriteString("build_hash_table:{total:")
buf.WriteString(e.fetchAndBuildHashTable.String())
buf.WriteString(", fetch:")
buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String())
Expand All @@ -1027,3 +1031,29 @@ func (e *hashJoinRuntimeStats) String() string {
}
return buf.String()
}

func (e *hashJoinRuntimeStats) Clone() execdetails.RuntimeStats {
return &hashJoinRuntimeStats{
fetchAndBuildHashTable: e.fetchAndBuildHashTable,
hashStat: e.hashStat,
fetchAndProbe: e.fetchAndProbe,
probe: e.probe,
concurrent: e.concurrent,
maxFetchAndProbe: e.maxFetchAndProbe,
}
}

func (e *hashJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) {
tmp, ok := rs.(*hashJoinRuntimeStats)
if !ok {
return
}
e.fetchAndBuildHashTable += tmp.fetchAndBuildHashTable
e.hashStat.buildTableElapse += tmp.hashStat.buildTableElapse
e.hashStat.probeCollision += tmp.hashStat.probeCollision
e.fetchAndProbe += tmp.fetchAndProbe
e.probe += tmp.probe
if e.maxFetchAndProbe < tmp.maxFetchAndProbe {
e.maxFetchAndProbe = tmp.maxFetchAndProbe
}
}
Loading