diff --git a/executor/aggregate.go b/executor/aggregate.go index bcd1a7abe2cd3..96b6a10fd472b 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/types/json" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/set" @@ -243,8 +244,9 @@ func (e *HashAggExec) Close() error { partialConcurrency = cap(e.partialWorkers) finalConcurrency = cap(e.finalWorkers) } - e.runtimeStats.SetConcurrencyInfo("PartialConcurrency", partialConcurrency) - e.runtimeStats.SetConcurrencyInfo("FinalConcurrency", finalConcurrency) + partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency) + finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency) + e.runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo) } return e.baseExecutor.Close() } diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index bccd28e0f4511..9056faa5d06d8 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" @@ -295,7 +296,7 @@ func (e *IndexNestedLoopHashJoin) Close() error { } if e.runtimeStats != nil { concurrency := cap(e.joinChkResourceCh) - e.runtimeStats.SetConcurrencyInfo("Concurrency", concurrency) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index ec7650ccf5ffb..14890aa716dc9 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mvmap" @@ -681,7 +682,7 @@ func (e *IndexLookUpJoin) Close() error { e.memTracker = nil if e.runtimeStats != nil { concurrency := cap(e.resultCh) - e.runtimeStats.SetConcurrencyInfo("Concurrency", concurrency) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) } return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 62e32eccaa665..03d15e37fae9c 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/stringutil" @@ -686,7 +687,7 @@ func (e *IndexLookUpMergeJoin) Close() error { e.memTracker = nil if e.runtimeStats != nil { concurrency := cap(e.resultCh) - e.runtimeStats.SetConcurrencyInfo("Concurrency", concurrency) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) } return e.baseExecutor.Close() } diff --git a/executor/join.go b/executor/join.go index d8dcebff11c98..eeda4eec329fa 100644 --- a/executor/join.go +++ b/executor/join.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" ) @@ -138,8 +139,10 @@ func (e *HashJoinExec) Close() error { if e.runtimeStats != nil { concurrency := cap(e.joiners) - e.runtimeStats.SetConcurrencyInfo("Concurrency", concurrency) - e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String()) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + if e.rowContainer != nil { + e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String()) + } } err := e.baseExecutor.Close() return err diff --git a/executor/projection.go b/executor/projection.go index 7c855451e2b84..920d82f8d2e52 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" @@ -308,9 +309,9 @@ func (e *ProjectionExec) Close() error { } if e.runtimeStats != nil { if e.isUnparallelExec() { - e.runtimeStats.SetConcurrencyInfo("Concurrency", 0) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) } else { - e.runtimeStats.SetConcurrencyInfo("Concurrency", int(e.numWorkers)) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers))) } } return e.baseExecutor.Close() diff --git a/executor/shuffle.go b/executor/shuffle.go index ff55544a9c22d..39d4d6d7078af 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/spaolacci/murmur3" "go.uber.org/zap" @@ -144,7 +145,7 @@ func (e *ShuffleExec) Close() error { e.executed = false if e.runtimeStats != nil { - e.runtimeStats.SetConcurrencyInfo("ShuffleConcurrency", e.concurrency) + e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency)) } err := e.dataSource.Close() diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index f126b9821a858..c2a516e3dbcec 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -360,12 +360,17 @@ type RuntimeStatsColl struct { readerStats map[string]*ReaderRuntimeStats } -// concurrencyInfo is used to save the concurrency information of the executor operator -type concurrencyInfo struct { +// ConcurrencyInfo is used to save the concurrency information of the executor operator +type ConcurrencyInfo struct { concurrencyName string concurrencyNum int } +// NewConcurrencyInfo creates new executor's concurrencyInfo. +func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { + return &ConcurrencyInfo{name, num} +} + // RuntimeStats collects one executor's execution info. type RuntimeStats struct { // executor's Next() called times. @@ -378,7 +383,7 @@ type RuntimeStats struct { // protect concurrency mu sync.Mutex // executor concurrency information - concurrency []concurrencyInfo + concurrency []*ConcurrencyInfo // additional information for executors additionalInfo string @@ -466,12 +471,16 @@ func (e *RuntimeStats) SetRowNum(rowNum int64) { atomic.StoreInt64(&e.rows, rowNum) } -// SetConcurrencyInfo sets the concurrency information. +// SetConcurrencyInfo sets the concurrency informations. +// We must clear the concurrencyInfo first when we call the SetConcurrencyInfo. // When the num <= 0, it means the exector operator is not executed parallel. -func (e *RuntimeStats) SetConcurrencyInfo(name string, num int) { +func (e *RuntimeStats) SetConcurrencyInfo(infos ...*ConcurrencyInfo) { e.mu.Lock() defer e.mu.Unlock() - e.concurrency = append(e.concurrency, concurrencyInfo{concurrencyName: name, concurrencyNum: num}) + e.concurrency = e.concurrency[:0] + for _, info := range infos { + e.concurrency = append(e.concurrency, info) + } } // SetAdditionalInfo sets the additional information.