From 34b70e166708868d0d3821bf2c31e06739e1eb23 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Mon, 19 Oct 2020 17:11:38 +0800 Subject: [PATCH] *: fix cop task runtime information is wrong in the concurrent executor (#19849) (#19947) --- distsql/distsql_test.go | 25 +++ distsql/select_result.go | 69 +++++-- executor/adapter.go | 6 +- executor/aggregate.go | 2 +- executor/batch_point_get.go | 1 - executor/index_lookup_hash_join.go | 4 +- executor/index_lookup_join.go | 37 +++- executor/index_lookup_merge_join.go | 2 +- executor/insert_common.go | 1 - executor/join.go | 50 ++++- executor/join_pkg_test.go | 38 ++++ executor/point_get.go | 43 +++-- executor/projection.go | 4 +- executor/shuffle.go | 2 +- executor/update.go | 1 - store/tikv/snapshot.go | 55 ++++++ util/execdetails/execdetails.go | 275 +++++++++++++++++++++++++-- util/execdetails/execdetails_test.go | 48 ++++- 18 files changed, 569 insertions(+), 94 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index e65d94224958f..5a5cfa8230db1 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -27,9 +27,11 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-tipb" ) @@ -156,6 +158,29 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) { c.Assert(err, IsNil) } +func (s *testSuite) TestSelectResultRuntimeStats(c *C) { + basic := &execdetails.BasicRuntimeStats{} + basic.Record(time.Second, 20) + s1 := &selectResultRuntimeStats{ + copRespTime: []time.Duration{time.Second, time.Millisecond}, + procKeys: []int64{100, 200}, + backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond}, + totalProcessTime: time.Second, + totalWaitTime: time.Second, + rpcStat: tikv.NewRegionRequestRuntimeStats(), + } + s2 := *s1 + stmtStats := execdetails.NewRuntimeStatsColl() + stmtStats.RegisterStats(1, basic) + stmtStats.RegisterStats(1, s1) + stmtStats.RegisterStats(1, &s2) + stats := stmtStats.GetRootStats(1) + expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}" + c.Assert(stats.String(), Equals, expect) + // Test for idempotence. + c.Assert(stats.String(), Equals, expect) +} + func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) { request, err := (&RequestBuilder{}).SetKeyRanges(nil). SetDAGRequest(&tipb.DAGRequest{}). diff --git a/distsql/select_result.go b/distsql/select_result.go index 91fac037a2ec8..61853c7e2687f 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -264,11 +264,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv return } if r.stats == nil { - stmtCtx := r.ctx.GetSessionVars().StmtCtx id := r.rootPlanID - originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id) r.stats = &selectResultRuntimeStats{ - RuntimeStats: originRuntimeStats, backoffSleep: make(map[string]time.Duration), rpcStat: tikv.NewRegionRequestRuntimeStats(), } @@ -326,7 +323,6 @@ type CopRuntimeStats interface { } type selectResultRuntimeStats struct { - execdetails.RuntimeStats copRespTime []time.Duration procKeys []int64 backoffSleep map[string]time.Duration @@ -351,14 +347,48 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim } } +func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats { + newRs := selectResultRuntimeStats{ + copRespTime: make([]time.Duration, 0, len(s.copRespTime)), + procKeys: make([]int64, 0, len(s.procKeys)), + backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)), + rpcStat: tikv.NewRegionRequestRuntimeStats(), + CoprCacheHitNum: s.CoprCacheHitNum, + } + newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...) + newRs.procKeys = append(newRs.procKeys, s.procKeys...) + for k, v := range s.backoffSleep { + newRs.backoffSleep[k] += v + } + newRs.totalProcessTime += s.totalProcessTime + newRs.totalWaitTime += s.totalWaitTime + for k, v := range s.rpcStat.Stats { + newRs.rpcStat.Stats[k] = v + } + return &newRs +} + +func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) { + other, ok := rs.(*selectResultRuntimeStats) + if !ok { + return + } + s.copRespTime = append(s.copRespTime, other.copRespTime...) + s.procKeys = append(s.procKeys, other.procKeys...) + + for k, v := range other.backoffSleep { + s.backoffSleep[k] += v + } + s.totalProcessTime += other.totalProcessTime + s.totalWaitTime += other.totalWaitTime + s.rpcStat.Merge(other.rpcStat) + s.CoprCacheHitNum += other.CoprCacheHitNum +} + func (s *selectResultRuntimeStats) String() string { buf := bytes.NewBuffer(nil) - if s.RuntimeStats != nil { - buf.WriteString(s.RuntimeStats.String()) - } if len(s.copRespTime) > 0 { size := len(s.copRespTime) - buf.WriteString(", ") if size == 1 { buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0])) } else { @@ -394,18 +424,18 @@ func (s *selectResultRuntimeStats) String() string { } } } - } - copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] - if copRPC != nil && copRPC.Count > 0 { - delete(s.rpcStat.Stats, tikvrpc.CmdCop) - buf.WriteString(", rpc_num: ") - buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) - buf.WriteString(", rpc_time: ") - buf.WriteString(time.Duration(copRPC.Consume).String()) + copRPC := s.rpcStat.Stats[tikvrpc.CmdCop] + if copRPC != nil && copRPC.Count > 0 { + delete(s.rpcStat.Stats, tikvrpc.CmdCop) + buf.WriteString(", rpc_num: ") + buf.WriteString(strconv.FormatInt(copRPC.Count, 10)) + buf.WriteString(", rpc_time: ") + buf.WriteString(time.Duration(copRPC.Consume).String()) + } buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v", strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64))) + buf.WriteString("}") } - buf.WriteString("}") rpcStatsStr := s.rpcStat.String() if len(rpcStatsStr) > 0 { @@ -427,3 +457,8 @@ func (s *selectResultRuntimeStats) String() string { } return buf.String() } + +// Tp implements the RuntimeStats interface. +func (s *selectResultRuntimeStats) Tp() int { + return execdetails.TpSelectResultRuntimeStats +} diff --git a/executor/adapter.go b/executor/adapter.go index 329198cc58aef..38a1cc1e3eba0 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -804,11 +804,9 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo execDetail := sessVars.StmtCtx.GetExecDetails() // Attach commit/lockKeys runtime stats to executor runtime stats. if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil { - stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID()) statsWithCommit := &execdetails.RuntimeStatsWithCommit{ - RuntimeStats: stats, - Commit: execDetail.CommitDetail, - LockKeys: execDetail.LockKeysDetail, + Commit: execDetail.CommitDetail, + LockKeys: execDetail.LockKeysDetail, } sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit) } diff --git a/executor/aggregate.go b/executor/aggregate.go index e1311636c0326..bcbb9495470d7 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -246,7 +246,7 @@ func (e *HashAggExec) Close() error { } partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency) finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) } diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 98df02e091349..934dfdb1be892 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -145,7 +145,6 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} e.stats = &runtimeStatsWithSnapshot{ - BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index f85bb60d34835..03a1c468ac6cd 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -148,9 +148,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error { e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) if e.runtimeStats != nil { - e.stats = &indexLookUpJoinRuntimeStats{ - BasicRuntimeStats: e.runtimeStats, - } + e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.startWorkers(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 30d19474d0f25..ba8d42d70d8a1 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -178,9 +178,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) e.innerPtrBytes = make([][]byte, 0, 8) if e.runtimeStats != nil { - e.stats = &indexLookUpJoinRuntimeStats{ - BasicRuntimeStats: e.runtimeStats, - } + e.stats = &indexLookUpJoinRuntimeStats{} e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } e.startWorkers(ctx) @@ -738,7 +736,6 @@ func (e *IndexLookUpJoin) Close() error { } type indexLookUpJoinRuntimeStats struct { - *execdetails.BasicRuntimeStats concurrency int probe int64 innerWorker innerWorkerRuntimeStats @@ -755,11 +752,8 @@ type innerWorkerRuntimeStats struct { func (e *indexLookUpJoinRuntimeStats) String() string { buf := bytes.NewBuffer(make([]byte, 0, 16)) - if e.BasicRuntimeStats != nil { - buf.WriteString(e.BasicRuntimeStats.String()) - } if e.innerWorker.totalTime > 0 { - buf.WriteString(", inner:{total:") + buf.WriteString("inner:{total:") buf.WriteString(time.Duration(e.innerWorker.totalTime).String()) buf.WriteString(", concurrency:") if e.concurrency > 0 { @@ -787,3 +781,30 @@ func (e *indexLookUpJoinRuntimeStats) String() string { } return buf.String() } + +func (e *indexLookUpJoinRuntimeStats) Clone() execdetails.RuntimeStats { + return &indexLookUpJoinRuntimeStats{ + concurrency: e.concurrency, + probe: e.probe, + innerWorker: e.innerWorker, + } +} + +func (e *indexLookUpJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) { + tmp, ok := rs.(*indexLookUpJoinRuntimeStats) + if !ok { + return + } + e.probe += tmp.probe + e.innerWorker.totalTime += tmp.innerWorker.totalTime + e.innerWorker.task += tmp.innerWorker.task + e.innerWorker.construct += tmp.innerWorker.construct + e.innerWorker.fetch += tmp.innerWorker.fetch + e.innerWorker.build += tmp.innerWorker.build + e.innerWorker.join += tmp.innerWorker.join +} + +// Tp implements the RuntimeStats interface. +func (e *indexLookUpJoinRuntimeStats) Tp() int { + return execdetails.TpIndexLookUpJoinRuntimeStats +} diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 07e925872d35e..2683b94d48dae 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -736,7 +736,7 @@ func (e *IndexLookUpMergeJoin) Close() error { e.memTracker = nil if e.runtimeStats != nil { concurrency := cap(e.resultCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) } diff --git a/executor/insert_common.go b/executor/insert_common.go index 7fb3d51597602..0df7ba25558a8 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -937,7 +937,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool { if e.stats == nil { snapshotStats := &tikv.SnapshotRuntimeStats{} e.stats = &runtimeStatsWithSnapshot{ - BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) diff --git a/executor/join.go b/executor/join.go index 9c34ad30c5988..03ea7fb5b9f46 100644 --- a/executor/join.go +++ b/executor/join.go @@ -175,8 +175,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error { } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ - BasicRuntimeStats: e.runtimeStats, - concurrent: cap(e.joiners), + concurrent: cap(e.joiners), } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -932,11 +931,9 @@ type joinRuntimeStats struct { hashStat hashStatistic } -func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats { +func newJoinRuntimeStats() *joinRuntimeStats { stats := &joinRuntimeStats{ - RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{ - BasicRuntimeStats: basic, - }, + RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{}, } return stats } @@ -973,9 +970,12 @@ func (e *joinRuntimeStats) String() string { return buf.String() } -type hashJoinRuntimeStats struct { - *execdetails.BasicRuntimeStats +// Tp implements the RuntimeStats interface. +func (e *joinRuntimeStats) Tp() int { + return execdetails.TpJoinRuntimeStats +} +type hashJoinRuntimeStats struct { fetchAndBuildHashTable time.Duration hashStat hashStatistic fetchAndProbe int64 @@ -996,11 +996,15 @@ func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) { } } +// Tp implements the RuntimeStats interface. +func (e *hashJoinRuntimeStats) Tp() int { + return execdetails.TpHashJoinRuntimeStats +} + func (e *hashJoinRuntimeStats) String() string { buf := bytes.NewBuffer(make([]byte, 0, 128)) - buf.WriteString(e.BasicRuntimeStats.String()) if e.fetchAndBuildHashTable > 0 { - buf.WriteString(", build_hash_table:{total:") + buf.WriteString("build_hash_table:{total:") buf.WriteString(e.fetchAndBuildHashTable.String()) buf.WriteString(", fetch:") buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String()) @@ -1027,3 +1031,29 @@ func (e *hashJoinRuntimeStats) String() string { } return buf.String() } + +func (e *hashJoinRuntimeStats) Clone() execdetails.RuntimeStats { + return &hashJoinRuntimeStats{ + fetchAndBuildHashTable: e.fetchAndBuildHashTable, + hashStat: e.hashStat, + fetchAndProbe: e.fetchAndProbe, + probe: e.probe, + concurrent: e.concurrent, + maxFetchAndProbe: e.maxFetchAndProbe, + } +} + +func (e *hashJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) { + tmp, ok := rs.(*hashJoinRuntimeStats) + if !ok { + return + } + e.fetchAndBuildHashTable += tmp.fetchAndBuildHashTable + e.hashStat.buildTableElapse += tmp.hashStat.buildTableElapse + e.hashStat.probeCollision += tmp.hashStat.probeCollision + e.fetchAndProbe += tmp.fetchAndProbe + e.probe += tmp.probe + if e.maxFetchAndProbe < tmp.maxFetchAndProbe { + e.maxFetchAndProbe = tmp.maxFetchAndProbe + } +} diff --git a/executor/join_pkg_test.go b/executor/join_pkg_test.go index fe139444fd2dc..2939ed0ab5763 100644 --- a/executor/join_pkg_test.go +++ b/executor/join_pkg_test.go @@ -15,6 +15,7 @@ package executor import ( "context" + "time" . "github.com/pingcap/check" "github.com/pingcap/failpoint" @@ -104,3 +105,40 @@ func (s *pkgTestSuite) TestJoinExec(c *C) { } } } + +func (s *pkgTestSuite) TestHashJoinRuntimeStats(c *C) { + stats := &hashJoinRuntimeStats{ + fetchAndBuildHashTable: 2 * time.Second, + hashStat: hashStatistic{ + probeCollision: 1, + buildTableElapse: time.Millisecond * 100, + }, + fetchAndProbe: int64(5 * time.Second), + probe: int64(4 * time.Second), + concurrent: 4, + maxFetchAndProbe: int64(2 * time.Second), + } + c.Assert(stats.String(), Equals, "build_hash_table:{total:2s, fetch:1.9s, build:100ms}, probe:{concurrency:4, total:5s, max:2s, probe:4s, fetch:1s, probe_collision:1}") + c.Assert(stats.String(), Equals, stats.Clone().String()) + stats.Merge(stats.Clone()) + c.Assert(stats.String(), Equals, "build_hash_table:{total:4s, fetch:3.8s, build:200ms}, probe:{concurrency:4, total:10s, max:2s, probe:8s, fetch:2s, probe_collision:2}") +} + +func (s *pkgTestSuite) TestIndexJoinRuntimeStats(c *C) { + stats := indexLookUpJoinRuntimeStats{ + concurrency: 5, + probe: int64(time.Second), + innerWorker: innerWorkerRuntimeStats{ + totalTime: int64(time.Second * 5), + task: 16, + construct: int64(100 * time.Millisecond), + fetch: int64(300 * time.Millisecond), + build: int64(250 * time.Millisecond), + join: int64(150 * time.Millisecond), + }, + } + c.Assert(stats.String(), Equals, "inner:{total:5s, concurrency:5, task:16, construct:100ms, fetch:300ms, build:250ms, join:150ms}, probe:1s") + c.Assert(stats.String(), Equals, stats.Clone().String()) + stats.Merge(stats.Clone()) + c.Assert(stats.String(), Equals, "inner:{total:10s, concurrency:5, task:32, construct:200ms, fetch:600ms, build:500ms, join:300ms}, probe:2s") +} diff --git a/executor/point_get.go b/executor/point_get.go index 44c5eb64924e6..46756260acf97 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -158,7 +158,6 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} e.stats = &runtimeStatsWithSnapshot{ - BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) @@ -421,23 +420,43 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo { } type runtimeStatsWithSnapshot struct { - *execdetails.BasicRuntimeStats *tikv.SnapshotRuntimeStats } func (e *runtimeStatsWithSnapshot) String() string { - var basic, rpcStatsStr string - if e.BasicRuntimeStats != nil { - basic = e.BasicRuntimeStats.String() - } if e.SnapshotRuntimeStats != nil { - rpcStatsStr = e.SnapshotRuntimeStats.String() + return e.SnapshotRuntimeStats.String() } - if rpcStatsStr == "" { - return basic + return "" +} + +// Clone implements the RuntimeStats interface. +func (e *runtimeStatsWithSnapshot) Clone() execdetails.RuntimeStats { + newRs := &runtimeStatsWithSnapshot{} + if e.SnapshotRuntimeStats != nil { + snapshotStats := e.SnapshotRuntimeStats.Clone() + newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats) } - if basic == "" { - return rpcStatsStr + return newRs +} + +// Merge implements the RuntimeStats interface. +func (e *runtimeStatsWithSnapshot) Merge(other execdetails.RuntimeStats) { + tmp, ok := other.(*runtimeStatsWithSnapshot) + if !ok { + return + } + if tmp.SnapshotRuntimeStats != nil { + if e.SnapshotRuntimeStats == nil { + snapshotStats := tmp.SnapshotRuntimeStats.Clone() + e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats) + return + } + e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats) } - return basic + ", " + rpcStatsStr +} + +// Tp implements the RuntimeStats interface. +func (e *runtimeStatsWithSnapshot) Tp() int { + return execdetails.TpRuntimeStatsWithSnapshot } diff --git a/executor/projection.go b/executor/projection.go index bfe30382c7d6d..c36d435ab231a 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -309,9 +309,7 @@ func (e *ProjectionExec) Close() error { } } if e.baseExecutor.runtimeStats != nil { - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{ - BasicRuntimeStats: e.runtimeStats, - } + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} if e.isUnparallelExec() { runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) } else { diff --git a/executor/shuffle.go b/executor/shuffle.go index c1f39dd7574da..7de58e7139880 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -145,7 +145,7 @@ func (e *ShuffleExec) Close() error { e.executed = false if e.runtimeStats != nil { - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency)) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) } diff --git a/executor/update.go b/executor/update.go index ca6eaf7cf6267..b912eea050251 100644 --- a/executor/update.go +++ b/executor/update.go @@ -299,7 +299,6 @@ func (e *UpdateExec) collectRuntimeStatsEnabled() bool { if e.stats == nil { snapshotStats := &tikv.SnapshotRuntimeStats{} e.stats = &runtimeStatsWithSnapshot{ - BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index a8a929a2ef5de..2896a16b19843 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -630,6 +631,60 @@ type SnapshotRuntimeStats struct { backoffTimes map[backoffType]int } +// Tp implements the RuntimeStats interface. +func (rs *SnapshotRuntimeStats) Tp() int { + return execdetails.TpSnapshotRuntimeStats +} + +// Clone implements the RuntimeStats interface. +func (rs *SnapshotRuntimeStats) Clone() execdetails.RuntimeStats { + newRs := SnapshotRuntimeStats{rpcStats: NewRegionRequestRuntimeStats()} + if rs.rpcStats.Stats != nil { + for k, v := range rs.rpcStats.Stats { + newRs.rpcStats.Stats[k] = v + } + } + if len(rs.backoffSleepMS) > 0 { + newRs.backoffSleepMS = make(map[backoffType]int) + newRs.backoffTimes = make(map[backoffType]int) + for k, v := range rs.backoffSleepMS { + newRs.backoffSleepMS[k] += v + } + for k, v := range rs.backoffTimes { + newRs.backoffTimes[k] += v + } + } + return &newRs +} + +// Merge implements the RuntimeStats interface. +func (rs *SnapshotRuntimeStats) Merge(other execdetails.RuntimeStats) { + tmp, ok := other.(*SnapshotRuntimeStats) + if !ok { + return + } + if tmp.rpcStats.Stats != nil { + if rs.rpcStats.Stats == nil { + rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats, len(tmp.rpcStats.Stats)) + } + rs.rpcStats.Merge(tmp.rpcStats) + } + if len(tmp.backoffSleepMS) > 0 { + if rs.backoffSleepMS == nil { + rs.backoffSleepMS = make(map[backoffType]int) + } + if rs.backoffTimes == nil { + rs.backoffTimes = make(map[backoffType]int) + } + for k, v := range tmp.backoffSleepMS { + rs.backoffSleepMS[k] += v + } + for k, v := range tmp.backoffTimes { + rs.backoffTimes[k] += v + } + } +} + // String implements fmt.Stringer interface. func (rs *SnapshotRuntimeStats) String() string { var buf bytes.Buffer diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index c8e5e245d17ad..3da02d56185a2 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -88,6 +88,41 @@ type CommitDetails struct { TxnRetry int } +// Merge merges commit details into itself. +func (cd *CommitDetails) Merge(other *CommitDetails) { + cd.GetCommitTsTime += other.GetCommitTsTime + cd.PrewriteTime += other.PrewriteTime + cd.WaitPrewriteBinlogTime += other.WaitPrewriteBinlogTime + cd.CommitTime += other.CommitTime + cd.LocalLatchTime += other.LocalLatchTime + cd.CommitBackoffTime += other.CommitBackoffTime + cd.ResolveLockTime += other.ResolveLockTime + cd.WriteKeys += other.WriteKeys + cd.WriteSize += other.WriteSize + cd.PrewriteRegionNum += other.PrewriteRegionNum + cd.TxnRetry += other.TxnRetry + cd.Mu.BackoffTypes = append(cd.Mu.BackoffTypes, other.Mu.BackoffTypes...) +} + +// Clone returns a deep copy of itself. +func (cd *CommitDetails) Clone() *CommitDetails { + commit := &CommitDetails{ + GetCommitTsTime: cd.GetCommitTsTime, + PrewriteTime: cd.PrewriteTime, + WaitPrewriteBinlogTime: cd.WaitPrewriteBinlogTime, + CommitTime: cd.CommitTime, + LocalLatchTime: cd.LocalLatchTime, + CommitBackoffTime: cd.CommitBackoffTime, + ResolveLockTime: cd.ResolveLockTime, + WriteKeys: cd.WriteKeys, + WriteSize: cd.WriteSize, + PrewriteRegionNum: cd.PrewriteRegionNum, + TxnRetry: cd.TxnRetry, + } + commit.Mu.BackoffTypes = append([]fmt.Stringer{}, cd.Mu.BackoffTypes...) + return commit +} + // LockKeysDetails contains pessimistic lock keys detail information. type LockKeysDetails struct { TotalTime time.Duration @@ -117,6 +152,22 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) { ld.RetryCount++ } +// Clone returns a deep copy of itself. +func (ld *LockKeysDetails) Clone() *LockKeysDetails { + lock := &LockKeysDetails{ + TotalTime: ld.TotalTime, + RegionNum: ld.RegionNum, + LockKeys: ld.LockKeys, + ResolveLockTime: ld.ResolveLockTime, + BackoffTime: ld.BackoffTime, + LockRPCTime: ld.LockRPCTime, + LockRPCCount: ld.LockRPCCount, + RetryCount: ld.RetryCount, + } + lock.Mu.BackoffTypes = append([]fmt.Stringer{}, ld.Mu.BackoffTypes...) + return lock +} + const ( // CopTimeStr represents the sum of cop-task time spend in TiDB distSQL. CopTimeStr = "Cop_time" @@ -360,10 +411,33 @@ func (crs *CopRuntimeStats) String() string { procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks) } +const ( + // TpBasicRuntimeStats is the tp for BasicRuntimeStats. + TpBasicRuntimeStats int = iota + // TpRuntimeStatsWithCommit is the tp for RuntimeStatsWithCommit. + TpRuntimeStatsWithCommit + // TpRuntimeStatsWithConcurrencyInfo is the tp for RuntimeStatsWithConcurrencyInfo. + TpRuntimeStatsWithConcurrencyInfo + // TpSnapshotRuntimeStats is the tp for SnapshotRuntimeStats. + TpSnapshotRuntimeStats + // TpHashJoinRuntimeStats is the tp for HashJoinRuntimeStats. + TpHashJoinRuntimeStats + // TpIndexLookUpJoinRuntimeStats is the tp for IndexLookUpJoinRuntimeStats. + TpIndexLookUpJoinRuntimeStats + // TpRuntimeStatsWithSnapshot is the tp for RuntimeStatsWithSnapshot. + TpRuntimeStatsWithSnapshot + // TpJoinRuntimeStats is the tp for JoinRuntimeStats. + TpJoinRuntimeStats + // TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats. + TpSelectResultRuntimeStats +) + // RuntimeStats is used to express the executor runtime information. type RuntimeStats interface { - GetActRows() int64 String() string + Merge(RuntimeStats) + Clone() RuntimeStats + Tp() int } // BasicRuntimeStats is the basic runtime stats. @@ -376,11 +450,87 @@ type BasicRuntimeStats struct { rows int64 } -// GetActRows implements the RuntimeStats interface. +// GetActRows return total rows of BasicRuntimeStats. func (e *BasicRuntimeStats) GetActRows() int64 { return e.rows } +// Clone implements the RuntimeStats interface. +func (e *BasicRuntimeStats) Clone() RuntimeStats { + return &BasicRuntimeStats{ + loop: e.loop, + consume: e.consume, + rows: e.rows, + } +} + +// Merge implements the RuntimeStats interface. +func (e *BasicRuntimeStats) Merge(rs RuntimeStats) { + tmp, ok := rs.(*BasicRuntimeStats) + if !ok { + return + } + e.loop += tmp.loop + e.consume += tmp.consume + e.rows += tmp.rows +} + +// Tp implements the RuntimeStats interface. +func (e *BasicRuntimeStats) Tp() int { + return TpBasicRuntimeStats +} + +// RootRuntimeStats is the executor runtime stats that combine with multiple runtime stats. +type RootRuntimeStats struct { + basics []*BasicRuntimeStats + groupRss [][]RuntimeStats +} + +// GetActRows return total rows of RootRuntimeStats. +func (e *RootRuntimeStats) GetActRows() int64 { + num := int64(0) + for _, basic := range e.basics { + num += basic.GetActRows() + } + return num +} + +// String implements the RuntimeStats interface. +func (e *RootRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 32)) + if len(e.basics) > 0 { + if len(e.basics) == 1 { + buf.WriteString(e.basics[0].String()) + } else { + basic := e.basics[0].Clone() + for i := 1; i < len(e.basics); i++ { + basic.Merge(e.basics[i]) + } + buf.WriteString(basic.String()) + } + } + if len(e.groupRss) > 0 { + if buf.Len() > 0 { + buf.WriteString(", ") + } + for i, rss := range e.groupRss { + if i > 0 { + buf.WriteString(", ") + } + if len(rss) == 1 { + buf.WriteString(rss[0].String()) + continue + } + rs := rss[0].Clone() + for i := 1; i < len(rss); i++ { + rs.Merge(rss[i]) + } + buf.WriteString(rs.String()) + } + } + return buf.String() +} + // Record records executor's execution. func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) { atomic.AddInt32(&e.loop, 1) @@ -401,30 +551,53 @@ func (e *BasicRuntimeStats) String() string { // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { mu sync.Mutex - rootStats map[int]RuntimeStats + rootStats map[int]*RootRuntimeStats copStats map[int]*CopRuntimeStats } // NewRuntimeStatsColl creates new executor collector. func NewRuntimeStatsColl() *RuntimeStatsColl { - return &RuntimeStatsColl{rootStats: make(map[int]RuntimeStats), + return &RuntimeStatsColl{rootStats: make(map[int]*RootRuntimeStats), copStats: make(map[int]*CopRuntimeStats)} } // RegisterStats register execStat for a executor. func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) { e.mu.Lock() - e.rootStats[planID] = info + stats, ok := e.rootStats[planID] + if !ok { + stats = &RootRuntimeStats{} + e.rootStats[planID] = stats + } + if basic, ok := info.(*BasicRuntimeStats); ok { + stats.basics = append(stats.basics, basic) + } else { + tp := info.Tp() + found := false + for i, rss := range stats.groupRss { + if len(rss) == 0 { + continue + } + if rss[0].Tp() == tp { + stats.groupRss[i] = append(stats.groupRss[i], info) + found = true + break + } + } + if !found { + stats.groupRss = append(stats.groupRss, []RuntimeStats{info}) + } + } e.mu.Unlock() } // GetRootStats gets execStat for a executor. -func (e *RuntimeStatsColl) GetRootStats(planID int) RuntimeStats { +func (e *RuntimeStatsColl) GetRootStats(planID int) *RootRuntimeStats { e.mu.Lock() defer e.mu.Unlock() runtimeStats, exists := e.rootStats[planID] if !exists { - runtimeStats = &BasicRuntimeStats{} + runtimeStats = &RootRuntimeStats{} e.rootStats[planID] = runtimeStats } return runtimeStats @@ -492,14 +665,17 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { // RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo. type RuntimeStatsWithConcurrencyInfo struct { - *BasicRuntimeStats - // protect concurrency sync.Mutex // executor concurrency information concurrency []*ConcurrencyInfo } +// Tp implements the RuntimeStats interface. +func (e *RuntimeStatsWithConcurrencyInfo) Tp() int { + return TpRuntimeStatsWithConcurrencyInfo +} + // SetConcurrencyInfo sets the concurrency informations. // We must clear the concurrencyInfo first when we call the SetConcurrencyInfo. // When the num <= 0, it means the exector operator is not executed parallel. @@ -512,37 +688,91 @@ func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*Concurren } } +// Clone implements the RuntimeStats interface. +func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats { + newRs := &RuntimeStatsWithConcurrencyInfo{ + concurrency: make([]*ConcurrencyInfo, 0, len(e.concurrency)), + } + newRs.concurrency = append(newRs.concurrency, e.concurrency...) + return newRs +} + +// String implements the RuntimeStats interface. func (e *RuntimeStatsWithConcurrencyInfo) String() string { var result string - if e.BasicRuntimeStats != nil { - result = fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop) - } if len(e.concurrency) > 0 { - for _, concurrency := range e.concurrency { + for i, concurrency := range e.concurrency { + if i > 0 { + result += ", " + } if concurrency.concurrencyNum > 0 { - result += fmt.Sprintf(", %s:%d", concurrency.concurrencyName, concurrency.concurrencyNum) + result += fmt.Sprintf("%s:%d", concurrency.concurrencyName, concurrency.concurrencyNum) } else { - result += fmt.Sprintf(", %s:OFF", concurrency.concurrencyName) + result += fmt.Sprintf("%s:OFF", concurrency.concurrencyName) } } } return result } +// Merge implements the RuntimeStats interface. +func (e *RuntimeStatsWithConcurrencyInfo) Merge(rs RuntimeStats) { + tmp, ok := rs.(*RuntimeStatsWithConcurrencyInfo) + if !ok { + return + } + e.concurrency = append(e.concurrency, tmp.concurrency...) +} + // RuntimeStatsWithCommit is the RuntimeStats with commit detail. type RuntimeStatsWithCommit struct { - RuntimeStats Commit *CommitDetails LockKeys *LockKeysDetails } +// Tp implements the RuntimeStats interface. +func (e *RuntimeStatsWithCommit) Tp() int { + return TpRuntimeStatsWithCommit +} + +// Merge implements the RuntimeStats interface. +func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) { + tmp, ok := rs.(*RuntimeStatsWithCommit) + if !ok { + return + } + if tmp.Commit != nil { + if e.Commit == nil { + e.Commit = &CommitDetails{} + } + e.Commit.Merge(tmp.Commit) + } + + if tmp.LockKeys != nil { + if e.LockKeys == nil { + e.LockKeys = &LockKeysDetails{} + } + e.LockKeys.Merge(tmp.LockKeys) + } +} + +// Clone implements the RuntimeStats interface. +func (e *RuntimeStatsWithCommit) Clone() RuntimeStats { + newRs := RuntimeStatsWithCommit{} + if e.Commit != nil { + newRs.Commit = e.Commit.Clone() + } + if e.LockKeys != nil { + newRs.LockKeys = e.LockKeys.Clone() + } + return &newRs +} + +// String implements the RuntimeStats interface. func (e *RuntimeStatsWithCommit) String() string { buf := bytes.NewBuffer(make([]byte, 0, 32)) - if e.RuntimeStats != nil { - buf.WriteString(e.RuntimeStats.String()) - } if e.Commit != nil { - buf.WriteString(", commit_txn: {") + buf.WriteString("commit_txn: {") if e.Commit.PrewriteTime > 0 { buf.WriteString("prewrite:") buf.WriteString(e.Commit.PrewriteTime.String()) @@ -596,7 +826,10 @@ func (e *RuntimeStatsWithCommit) String() string { buf.WriteString("}") } if e.LockKeys != nil { - buf.WriteString(", lock_keys: {") + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString("lock_keys: {") if e.LockKeys.TotalTime > 0 { buf.WriteString("time:") buf.WriteString(e.LockKeys.TotalTime.String()) diff --git a/util/execdetails/execdetails_test.go b/util/execdetails/execdetails_test.go index a16bcbc66db84..78e4f3387185f 100644 --- a/util/execdetails/execdetails_test.go +++ b/util/execdetails/execdetails_test.go @@ -121,10 +121,6 @@ func TestCopRuntimeStats(t *testing.T) { } func TestRuntimeStatsWithCommit(t *testing.T) { - basicStats := &BasicRuntimeStats{ - loop: 1, - consume: int64(time.Second), - } commitDetail := &CommitDetails{ GetCommitTsTime: time.Second, PrewriteTime: time.Second, @@ -151,10 +147,9 @@ func TestRuntimeStatsWithCommit(t *testing.T) { TxnRetry: 2, } stats := &RuntimeStatsWithCommit{ - RuntimeStats: basicStats, - Commit: commitDetail, + Commit: commitDetail, } - expect := "time:1s, loops:1, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, backoff: {time: 1s, type: [backoff1 backoff2]}, resolve_lock: 1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}" + expect := "commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, backoff: {time: 1s, type: [backoff1 backoff2]}, resolve_lock: 1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}" if stats.String() != expect { t.Fatalf("%v != %v", stats.String(), expect) } @@ -183,10 +178,43 @@ func TestRuntimeStatsWithCommit(t *testing.T) { RetryCount: 2, } stats = &RuntimeStatsWithCommit{ - RuntimeStats: basicStats, - LockKeys: lockDetail, + LockKeys: lockDetail, + } + expect = "lock_keys: {time:1s, region:2, keys:10, resolve_lock:2s, backoff: {time: 3s, type: [backoff4 backoff5]}, lock_rpc:5s, rpc_count:50, retry_count:2}" + if stats.String() != expect { + t.Fatalf("%v != %v", stats.String(), expect) + } +} + +func TestRootRuntimeStats(t *testing.T) { + basic1 := &BasicRuntimeStats{} + basic2 := &BasicRuntimeStats{} + basic1.Record(time.Second, 20) + basic2.Record(time.Second*2, 30) + pid := 1 + stmtStats := NewRuntimeStatsColl() + stmtStats.RegisterStats(pid, basic1) + stmtStats.RegisterStats(pid, basic2) + concurrency := &RuntimeStatsWithConcurrencyInfo{} + concurrency.SetConcurrencyInfo(NewConcurrencyInfo("worker", 15)) + stmtStats.RegisterStats(pid, concurrency) + commitDetail := &CommitDetails{ + GetCommitTsTime: time.Second, + PrewriteTime: time.Second, + CommitTime: time.Second, + WriteKeys: 3, + WriteSize: 66, + PrewriteRegionNum: 5, + TxnRetry: 2, } - expect = "time:1s, loops:1, lock_keys: {time:1s, region:2, keys:10, resolve_lock:2s, backoff: {time: 3s, type: [backoff4 backoff5]}, lock_rpc:5s, rpc_count:50, retry_count:2}" + stmtStats.RegisterStats(pid, &RuntimeStatsWithCommit{ + Commit: commitDetail, + }) + concurrency = &RuntimeStatsWithConcurrencyInfo{} + concurrency.SetConcurrencyInfo(NewConcurrencyInfo("concurrent", 0)) + stmtStats.RegisterStats(pid, concurrency) + stats := stmtStats.GetRootStats(1) + expect := "time:3s, loops:2, worker:15, concurrent:OFF, commit_txn: {prewrite:1s, get_commit_ts:1s, commit:1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2}" if stats.String() != expect { t.Fatalf("%v != %v", stats.String(), expect) }