
Commit 34b70e1
*: fix cop task runtime information is wrong in the concurrent executor (#19849) (#19947)
ti-srebot authored Oct 19, 2020
1 parent cae843b commit 34b70e1
Showing 18 changed files with 569 additions and 94 deletions.
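What the diffs below show: previously, each specialized runtime-stats type embedded the executor's BasicRuntimeStats (or a RuntimeStats field) and printed it as part of its own String() output, so when several concurrent workers registered stats under one plan ID, the cop task time information came out wrong. This commit removes those embedded fields and instead has every stats type implement Clone, Merge, and Tp, letting the collector keep an independent copy per worker and merge the copies at report time. A minimal sketch of the interface shape implied by the diffs — the method names are taken from the code below, but the authoritative definition lives in util/execdetails:

type RuntimeStats interface {
	String() string
	// Merge folds another instance of the same concrete type into this one.
	Merge(RuntimeStats)
	// Clone returns an independent copy for a concurrent worker.
	Clone() RuntimeStats
	// Tp tags the concrete type (e.g. TpSelectResultRuntimeStats) so the
	// collector can match instances before merging.
	Tp() int
}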
25 changes: 25 additions & 0 deletions distsql/distsql_test.go
@@ -27,9 +27,11 @@ import (
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/execdetails"
	"github.com/pingcap/tidb/util/memory"
	"github.com/pingcap/tipb/go-tipb"
)
@@ -156,6 +158,29 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
	c.Assert(err, IsNil)
}

func (s *testSuite) TestSelectResultRuntimeStats(c *C) {
	basic := &execdetails.BasicRuntimeStats{}
	basic.Record(time.Second, 20)
	s1 := &selectResultRuntimeStats{
		copRespTime:      []time.Duration{time.Second, time.Millisecond},
		procKeys:         []int64{100, 200},
		backoffSleep:     map[string]time.Duration{"RegionMiss": time.Millisecond},
		totalProcessTime: time.Second,
		totalWaitTime:    time.Second,
		rpcStat:          tikv.NewRegionRequestRuntimeStats(),
	}
	s2 := *s1
	stmtStats := execdetails.NewRuntimeStatsColl()
	stmtStats.RegisterStats(1, basic)
	stmtStats.RegisterStats(1, s1)
	stmtStats.RegisterStats(1, &s2)
	stats := stmtStats.GetRootStats(1)
	expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}"
	c.Assert(stats.String(), Equals, expect)
	// Test for idempotence.
	c.Assert(stats.String(), Equals, expect)
}
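The arithmetic behind the expected string, as a self-contained sketch (simplified stand-in types, not TiDB's): s2 is a copy of s1, so the two registered stats contribute two cop response times each, which is why the merged output reports num: 4, tot_proc: 2s, and a doubled RegionMiss backoff.

package main

import (
	"fmt"
	"time"
)

// copStats mimics the merge behavior of selectResultRuntimeStats.
type copStats struct {
	respTime  []time.Duration
	totalProc time.Duration
	backoff   map[string]time.Duration
}

func (s *copStats) merge(o *copStats) {
	s.respTime = append(s.respTime, o.respTime...)
	s.totalProc += o.totalProc
	for k, v := range o.backoff {
		s.backoff[k] += v
	}
}

func main() {
	s1 := &copStats{
		respTime:  []time.Duration{time.Second, time.Millisecond},
		totalProc: time.Second,
		backoff:   map[string]time.Duration{"RegionMiss": time.Millisecond},
	}
	s2 := *s1 // a copy, as in the test above
	merged := &copStats{backoff: map[string]time.Duration{}}
	merged.merge(s1)
	merged.merge(&s2)
	fmt.Println(len(merged.respTime), merged.totalProc, merged.backoff["RegionMiss"])
	// Prints: 4 2s 2ms — matching num: 4, tot_proc: 2s, RegionMiss: 2ms.
}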

func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
	request, err := (&RequestBuilder{}).SetKeyRanges(nil).
		SetDAGRequest(&tipb.DAGRequest{}).
69 changes: 52 additions & 17 deletions distsql/select_result.go
@@ -264,11 +264,8 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
		return
	}
	if r.stats == nil {
		stmtCtx := r.ctx.GetSessionVars().StmtCtx
		id := r.rootPlanID
		originRuntimeStats := stmtCtx.RuntimeStatsColl.GetRootStats(id)
		r.stats = &selectResultRuntimeStats{
			RuntimeStats: originRuntimeStats,
			backoffSleep: make(map[string]time.Duration),
			rpcStat:      tikv.NewRegionRequestRuntimeStats(),
		}
@@ -326,7 +323,6 @@ type CopRuntimeStats interface {
}

type selectResultRuntimeStats struct {
	execdetails.RuntimeStats
	copRespTime  []time.Duration
	procKeys     []int64
	backoffSleep map[string]time.Duration
@@ -351,14 +347,48 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim
	}
}

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
	newRs := selectResultRuntimeStats{
		copRespTime:     make([]time.Duration, 0, len(s.copRespTime)),
		procKeys:        make([]int64, 0, len(s.procKeys)),
		backoffSleep:    make(map[string]time.Duration, len(s.backoffSleep)),
		rpcStat:         tikv.NewRegionRequestRuntimeStats(),
		CoprCacheHitNum: s.CoprCacheHitNum,
	}
	newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
	newRs.procKeys = append(newRs.procKeys, s.procKeys...)
	for k, v := range s.backoffSleep {
		newRs.backoffSleep[k] += v
	}
	newRs.totalProcessTime += s.totalProcessTime
	newRs.totalWaitTime += s.totalWaitTime
	for k, v := range s.rpcStat.Stats {
		newRs.rpcStat.Stats[k] = v
	}
	return &newRs
}

func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
	other, ok := rs.(*selectResultRuntimeStats)
	if !ok {
		return
	}
	s.copRespTime = append(s.copRespTime, other.copRespTime...)
	s.procKeys = append(s.procKeys, other.procKeys...)

	for k, v := range other.backoffSleep {
		s.backoffSleep[k] += v
	}
	s.totalProcessTime += other.totalProcessTime
	s.totalWaitTime += other.totalWaitTime
	s.rpcStat.Merge(other.rpcStat)
	s.CoprCacheHitNum += other.CoprCacheHitNum
}
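Clone builds fresh slices and maps, so a per-worker copy never aliases the original's state, while Merge folds a worker's numbers back in additively. A hypothetical caller, for illustration only (not part of this commit):

// Each concurrent worker records into its own clone; the results are
// folded back into the root stats object at the end.
func collectConcurrently(root *selectResultRuntimeStats, workers int) {
	for i := 0; i < workers; i++ {
		w := root.Clone().(*selectResultRuntimeStats) // independent copy
		// ... worker i records copRespTime, backoffSleep, etc. into w ...
		root.Merge(w) // additive fold; no state shared between workers
	}
}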

func (s *selectResultRuntimeStats) String() string {
	buf := bytes.NewBuffer(nil)
	if s.RuntimeStats != nil {
		buf.WriteString(s.RuntimeStats.String())
	}
	if len(s.copRespTime) > 0 {
		size := len(s.copRespTime)
		buf.WriteString(", ")
		if size == 1 {
			buf.WriteString(fmt.Sprintf("cop_task: {num: 1, max:%v, proc_keys: %v", s.copRespTime[0], s.procKeys[0]))
		} else {
@@ -394,18 +424,18 @@ func (s *selectResultRuntimeStats) String() string {
}
}
}
}
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
}
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
buf.WriteString("}")
}
buf.WriteString("}")

	rpcStatsStr := s.rpcStat.String()
	if len(rpcStatsStr) > 0 {
@@ -427,3 +457,8 @@ func (s *selectResultRuntimeStats) String() string {
	}
	return buf.String()
}

// Tp implements the RuntimeStats interface.
func (s *selectResultRuntimeStats) Tp() int {
	return execdetails.TpSelectResultRuntimeStats
}
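Tp gives the collector a cheap type tag, so stats registered under the same plan ID can be matched by concrete type and merged without reflection. A hypothetical sketch of such a collector loop — the real logic lives in util/execdetails and is not part of this diff:

func mergeInto(existing []execdetails.RuntimeStats, s execdetails.RuntimeStats) []execdetails.RuntimeStats {
	for _, e := range existing {
		if e.Tp() == s.Tp() {
			e.Merge(s) // same concrete type: fold the numbers together
			return existing
		}
	}
	return append(existing, s.Clone()) // first stats of this type
}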
6 changes: 2 additions & 4 deletions executor/adapter.go
@@ -804,11 +804,9 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults boo
	execDetail := sessVars.StmtCtx.GetExecDetails()
	// Attach commit/lockKeys runtime stats to executor runtime stats.
	if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil {
		stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
		statsWithCommit := &execdetails.RuntimeStatsWithCommit{
			RuntimeStats: stats,
			Commit:       execDetail.CommitDetail,
			LockKeys:     execDetail.LockKeysDetail,
			Commit:   execDetail.CommitDetail,
			LockKeys: execDetail.LockKeysDetail,
		}
		sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
	}
2 changes: 1 addition & 1 deletion executor/aggregate.go
@@ -246,7 +246,7 @@ func (e *HashAggExec) Close() error {
		}
		partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency)
		finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
		runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
	}
1 change: 0 additions & 1 deletion executor/batch_point_get.go
@@ -145,7 +145,6 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
	if e.runtimeStats != nil {
		snapshotStats := &tikv.SnapshotRuntimeStats{}
		e.stats = &runtimeStatsWithSnapshot{
			BasicRuntimeStats:    e.runtimeStats,
			SnapshotRuntimeStats: snapshotStats,
		}
		snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
4 changes: 1 addition & 3 deletions executor/index_lookup_hash_join.go
@@ -148,9 +148,7 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
	e.innerPtrBytes = make([][]byte, 0, 8)
	if e.runtimeStats != nil {
		e.stats = &indexLookUpJoinRuntimeStats{
			BasicRuntimeStats: e.runtimeStats,
		}
		e.stats = &indexLookUpJoinRuntimeStats{}
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
	}
	e.startWorkers(ctx)
37 changes: 29 additions & 8 deletions executor/index_lookup_join.go
@@ -178,9 +178,7 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
	e.innerPtrBytes = make([][]byte, 0, 8)
	if e.runtimeStats != nil {
		e.stats = &indexLookUpJoinRuntimeStats{
			BasicRuntimeStats: e.runtimeStats,
		}
		e.stats = &indexLookUpJoinRuntimeStats{}
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
	}
	e.startWorkers(ctx)
@@ -738,7 +736,6 @@ func (e *IndexLookUpJoin) Close() error {
}

type indexLookUpJoinRuntimeStats struct {
	*execdetails.BasicRuntimeStats
	concurrency int
	probe       int64
	innerWorker innerWorkerRuntimeStats
@@ -755,11 +752,8 @@ type innerWorkerRuntimeStats struct {

func (e *indexLookUpJoinRuntimeStats) String() string {
	buf := bytes.NewBuffer(make([]byte, 0, 16))
	if e.BasicRuntimeStats != nil {
		buf.WriteString(e.BasicRuntimeStats.String())
	}
	if e.innerWorker.totalTime > 0 {
		buf.WriteString(", inner:{total:")
		buf.WriteString("inner:{total:")
		buf.WriteString(time.Duration(e.innerWorker.totalTime).String())
		buf.WriteString(", concurrency:")
		if e.concurrency > 0 {
@@ -787,3 +781,30 @@ func (e *indexLookUpJoinRuntimeStats) String() string {
	}
	return buf.String()
}

func (e *indexLookUpJoinRuntimeStats) Clone() execdetails.RuntimeStats {
	return &indexLookUpJoinRuntimeStats{
		concurrency: e.concurrency,
		probe:       e.probe,
		innerWorker: e.innerWorker,
	}
}

func (e *indexLookUpJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) {
	tmp, ok := rs.(*indexLookUpJoinRuntimeStats)
	if !ok {
		return
	}
	e.probe += tmp.probe
	e.innerWorker.totalTime += tmp.innerWorker.totalTime
	e.innerWorker.task += tmp.innerWorker.task
	e.innerWorker.construct += tmp.innerWorker.construct
	e.innerWorker.fetch += tmp.innerWorker.fetch
	e.innerWorker.build += tmp.innerWorker.build
	e.innerWorker.join += tmp.innerWorker.join
}

// Tp implements the RuntimeStats interface.
func (e *indexLookUpJoinRuntimeStats) Tp() int {
	return execdetails.TpIndexLookUpJoinRuntimeStats
}
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
@@ -736,7 +736,7 @@ func (e *IndexLookUpMergeJoin) Close() error {
	e.memTracker = nil
	if e.runtimeStats != nil {
		concurrency := cap(e.resultCh)
		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
		runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
	}
1 change: 0 additions & 1 deletion executor/insert_common.go
@@ -937,7 +937,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
	if e.stats == nil {
		snapshotStats := &tikv.SnapshotRuntimeStats{}
		e.stats = &runtimeStatsWithSnapshot{
			BasicRuntimeStats:    e.runtimeStats,
			SnapshotRuntimeStats: snapshotStats,
		}
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
50 changes: 40 additions & 10 deletions executor/join.go
@@ -175,8 +175,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
	}
	if e.runtimeStats != nil {
		e.stats = &hashJoinRuntimeStats{
			BasicRuntimeStats: e.runtimeStats,
			concurrent:        cap(e.joiners),
			concurrent: cap(e.joiners),
		}
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
	}
@@ -932,11 +931,9 @@ type joinRuntimeStats struct {
	hashStat hashStatistic
}

func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats {
func newJoinRuntimeStats() *joinRuntimeStats {
	stats := &joinRuntimeStats{
		RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{
			BasicRuntimeStats: basic,
		},
		RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{},
	}
	return stats
}
@@ -973,9 +970,12 @@ func (e *joinRuntimeStats) String() string {
	return buf.String()
}

type hashJoinRuntimeStats struct {
	*execdetails.BasicRuntimeStats
// Tp implements the RuntimeStats interface.
func (e *joinRuntimeStats) Tp() int {
	return execdetails.TpJoinRuntimeStats
}

type hashJoinRuntimeStats struct {
	fetchAndBuildHashTable time.Duration
	hashStat               hashStatistic
	fetchAndProbe          int64
@@ -996,11 +996,15 @@ func (e *hashJoinRuntimeStats) setMaxFetchAndProbeTime(t int64) {
	}
}

// Tp implements the RuntimeStats interface.
func (e *hashJoinRuntimeStats) Tp() int {
	return execdetails.TpHashJoinRuntimeStats
}

func (e *hashJoinRuntimeStats) String() string {
	buf := bytes.NewBuffer(make([]byte, 0, 128))
	buf.WriteString(e.BasicRuntimeStats.String())
	if e.fetchAndBuildHashTable > 0 {
		buf.WriteString(", build_hash_table:{total:")
		buf.WriteString("build_hash_table:{total:")
		buf.WriteString(e.fetchAndBuildHashTable.String())
		buf.WriteString(", fetch:")
		buf.WriteString((e.fetchAndBuildHashTable - e.hashStat.buildTableElapse).String())
@@ -1027,3 +1031,29 @@ func (e *hashJoinRuntimeStats) String() string {
	}
	return buf.String()
}

func (e *hashJoinRuntimeStats) Clone() execdetails.RuntimeStats {
	return &hashJoinRuntimeStats{
		fetchAndBuildHashTable: e.fetchAndBuildHashTable,
		hashStat:               e.hashStat,
		fetchAndProbe:          e.fetchAndProbe,
		probe:                  e.probe,
		concurrent:             e.concurrent,
		maxFetchAndProbe:       e.maxFetchAndProbe,
	}
}

func (e *hashJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) {
	tmp, ok := rs.(*hashJoinRuntimeStats)
	if !ok {
		return
	}
	e.fetchAndBuildHashTable += tmp.fetchAndBuildHashTable
	e.hashStat.buildTableElapse += tmp.hashStat.buildTableElapse
	e.hashStat.probeCollision += tmp.hashStat.probeCollision
	e.fetchAndProbe += tmp.fetchAndProbe
	e.probe += tmp.probe
	if e.maxFetchAndProbe < tmp.maxFetchAndProbe {
		e.maxFetchAndProbe = tmp.maxFetchAndProbe
	}
}
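Note the asymmetry in Merge: durations and counters are summed across workers, but maxFetchAndProbe takes the maximum, since the slowest worker bounds wall-clock cost rather than adding to it. With invented values (hypothetical snippet; assumes fmt is imported):

func exampleHashJoinMerge() {
	a := &hashJoinRuntimeStats{fetchAndProbe: 30, maxFetchAndProbe: 20}
	b := &hashJoinRuntimeStats{fetchAndProbe: 50, maxFetchAndProbe: 45}
	a.Merge(b)
	fmt.Println(a.fetchAndProbe, a.maxFetchAndProbe) // 80 45
}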
