From e30396dbe2ae4d25b47f8a9b4a52f3d595dbfec6 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 31 Dec 2020 17:35:15 +0800 Subject: [PATCH 1/3] cherry pick #21982 to release-4.0 Signed-off-by: ti-srebot --- executor/distsql.go | 69 ++++++++++++++++++++++++++------------- executor/distsql_test.go | 29 +++++++++++++--- executor/executor_test.go | 4 +++ 3 files changed, 75 insertions(+), 27 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index c06aa4757d47c..53b6b24118d16 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -385,7 +385,6 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } - e.initRuntimeStats() err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -398,6 +397,7 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error { // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. + e.initRuntimeStats() e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -622,10 +622,15 @@ func (e *IndexLookUpExecutor) initRuntimeStats() { if e.runtimeStats != nil { if e.stats == nil { e.stats = &IndexLookUpRunTimeStats{ +<<<<<<< HEAD IndexScan: 0, TableRowScan: 0, TableTaskNum: 0, Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency, +======= + indexScanBasicStats: &execdetails.BasicRuntimeStats{}, + Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(), +>>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -693,16 +698,15 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) } idxID := w.idxLookup.getIndexPlanRootID() - var basicStats *execdetails.BasicRuntimeStats if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { - if idxID != w.idxLookup.id { - basicStats = &execdetails.BasicRuntimeStats{} - w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, basicStats) + if idxID != w.idxLookup.id && w.idxLookup.stats != nil { + w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats) } } for { startTime := time.Now() handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count) + finishFetch := time.Now() if err != nil { doneCh := make(chan error, 1) doneCh <- err @@ -716,9 +720,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return count, nil } task := w.buildTableTask(handles, retChunk) - if w.idxLookup.stats != nil { - atomic.AddInt64(&w.idxLookup.stats.IndexScan, int64(time.Since(startTime))) - } + finishBuild := time.Now() select { case <-ctx.Done(): return count, nil @@ -727,8 +729,10 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes case w.workCh <- task: w.resultCh <- task } - if basicStats != nil { - basicStats.Record(time.Since(startTime), chk.NumRows()) + if w.idxLookup.stats != nil { + atomic.AddInt64(&w.idxLookup.stats.FetchHandle, int64(finishFetch.Sub(startTime))) + atomic.AddInt64(&w.idxLookup.stats.TaskWait, int64(time.Since(finishBuild))) + atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime))) } } } @@ -751,10 +755,14 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, } } chk.SetRequiredRows(requiredRows, w.maxChunkSize) + startTime := time.Now() err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { return handles, nil, scannedKeys, err } + if w.idxLookup.stats != nil { + w.idxLookup.stats.indexScanBasicStats.Record(time.Since(startTime), chk.NumRows()) + } if chk.NumRows() == 0 { return handles, retChk, scannedKeys, nil } @@ -873,39 +881,52 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { // IndexLookUpRunTimeStats record the indexlookup runtime stat type IndexLookUpRunTimeStats struct { - IndexScan int64 - TableRowScan int64 - TableTaskNum int64 - Concurrency int + // indexScanBasicStats uses to record basic runtime stats for index scan. + indexScanBasicStats *execdetails.BasicRuntimeStats + FetchHandleTotal int64 + FetchHandle int64 + TaskWait int64 + TableRowScan int64 + TableTaskNum int64 + Concurrency int } func (e *IndexLookUpRunTimeStats) String() string { var buf bytes.Buffer - indexScan := atomic.LoadInt64(&e.IndexScan) + fetchHandle := atomic.LoadInt64(&e.FetchHandleTotal) + indexScan := atomic.LoadInt64(&e.FetchHandle) + taskWait := atomic.LoadInt64(&e.TaskWait) tableScan := atomic.LoadInt64(&e.TableRowScan) tableTaskNum := atomic.LoadInt64(&e.TableTaskNum) concurrency := e.Concurrency if indexScan != 0 { +<<<<<<< HEAD buf.WriteString(fmt.Sprintf("index_task:%s", time.Duration(indexScan))) +======= + buf.WriteString(fmt.Sprintf("index_task: {total_time: %s, fetch_handle: %s, build: %s, wait: %s}", + execdetails.FormatDuration(time.Duration(fetchHandle)), + execdetails.FormatDuration(time.Duration(indexScan)), + execdetails.FormatDuration(time.Duration(fetchHandle-indexScan-taskWait)), + execdetails.FormatDuration(time.Duration(taskWait)))) +>>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } if tableScan != 0 { if buf.Len() > 0 { buf.WriteByte(',') } +<<<<<<< HEAD buf.WriteString(fmt.Sprintf(" table_task:{num:%d, concurrency:%d, time:%s}", tableTaskNum, concurrency, time.Duration(tableScan))) +======= + buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency)) +>>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } return buf.String() } // Clone implements the RuntimeStats interface. func (e *IndexLookUpRunTimeStats) Clone() execdetails.RuntimeStats { - newRs := &IndexLookUpRunTimeStats{ - IndexScan: e.IndexScan, - TableRowScan: e.TableRowScan, - TableTaskNum: e.TableTaskNum, - Concurrency: e.Concurrency, - } - return newRs + newRs := *e + return &newRs } // Merge implements the RuntimeStats interface. @@ -914,7 +935,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) { if !ok { return } - e.IndexScan += tmp.IndexScan + e.FetchHandleTotal += tmp.FetchHandleTotal + e.FetchHandle += tmp.FetchHandle + e.TaskWait += tmp.TaskWait e.TableRowScan += tmp.TableRowScan e.TableTaskNum += tmp.TableTaskNum e.Concurrency += tmp.Concurrency diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 9775f9980c3c1..3c0ca0a36e99d 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -256,13 +256,34 @@ func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) { func (s *testSuite3) TestIndexLookUpStats(c *C) { stats := &executor.IndexLookUpRunTimeStats{ - IndexScan: int64(2 * time.Second), - TableRowScan: int64(2 * time.Second), - TableTaskNum: 2, - Concurrency: 1, + FetchHandleTotal: int64(5 * time.Second), + FetchHandle: int64(2 * time.Second), + TaskWait: int64(2 * time.Second), + TableRowScan: int64(2 * time.Second), + TableTaskNum: 2, + Concurrency: 1, } +<<<<<<< HEAD c.Assert(stats.String(), Equals, "index_task:2s, table_task:{num:2, concurrency:1, time:2s}") c.Assert(stats.String(), Equals, stats.Clone().String()) stats.Merge(stats.Clone()) c.Assert(stats.String(), Equals, "index_task:4s, table_task:{num:4, concurrency:2, time:4s}") +======= + c.Assert(stats.String(), Equals, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}") + c.Assert(stats.String(), Equals, stats.Clone().String()) + stats.Merge(stats.Clone()) + c.Assert(stats.String(), Equals, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 2}") +} + +func (s *testSuite3) TestIndexLookUpGetResultChunk(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists tbl") + tk.MustExec("create table tbl(a int, b int, c int, key idx_a(a))") + for i := 0; i < 101; i++ { + tk.MustExec(fmt.Sprintf("insert into tbl values(%d,%d,%d)", i, i, i)) + } + tk.MustQuery("select * from tbl use index(idx_a) where a > 99 order by a asc limit 1").Check(testkit.Rows("100 100 100")) + tk.MustQuery("select * from tbl use index(idx_a) where a > 10 order by a asc limit 4,1").Check(testkit.Rows("15 15 15")) +>>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } diff --git a/executor/executor_test.go b/executor/executor_test.go index 3c8b5d86217c1..7ec23f9891613 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6222,7 +6222,11 @@ func (s *testSerialSuite1) TestIndexlookupRuntimeStats(c *C) { rows := tk.MustQuery(sql).Rows() c.Assert(len(rows), Equals, 3) explain := fmt.Sprintf("%v", rows[0]) +<<<<<<< HEAD c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task:{num.*concurrency.*time.*}.*") +======= + c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task: {total_time.*num.*concurrency.*}.*") +>>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) indexExplain := fmt.Sprintf("%v", rows[1]) tableExplain := fmt.Sprintf("%v", rows[2]) c.Assert(indexExplain, Matches, ".*time:.*loops:.*cop_task:.*") From d1773850830b75b5c82ffadfee5d150f298d2c10 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 26 Jan 2021 16:54:37 +0800 Subject: [PATCH 2/3] Revert "cherry pick #21982 to release-4.0" This reverts commit e30396dbe2ae4d25b47f8a9b4a52f3d595dbfec6. --- executor/distsql.go | 69 +++++++++++++-------------------------- executor/distsql_test.go | 29 +++------------- executor/executor_test.go | 4 --- 3 files changed, 27 insertions(+), 75 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index 53b6b24118d16..c06aa4757d47c 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -385,6 +385,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } + e.initRuntimeStats() err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -397,7 +398,6 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error { // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. - e.initRuntimeStats() e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -622,15 +622,10 @@ func (e *IndexLookUpExecutor) initRuntimeStats() { if e.runtimeStats != nil { if e.stats == nil { e.stats = &IndexLookUpRunTimeStats{ -<<<<<<< HEAD IndexScan: 0, TableRowScan: 0, TableTaskNum: 0, Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency, -======= - indexScanBasicStats: &execdetails.BasicRuntimeStats{}, - Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(), ->>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -698,15 +693,16 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) } idxID := w.idxLookup.getIndexPlanRootID() + var basicStats *execdetails.BasicRuntimeStats if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { - if idxID != w.idxLookup.id && w.idxLookup.stats != nil { - w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats) + if idxID != w.idxLookup.id { + basicStats = &execdetails.BasicRuntimeStats{} + w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, basicStats) } } for { startTime := time.Now() handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count) - finishFetch := time.Now() if err != nil { doneCh := make(chan error, 1) doneCh <- err @@ -720,7 +716,9 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return count, nil } task := w.buildTableTask(handles, retChunk) - finishBuild := time.Now() + if w.idxLookup.stats != nil { + atomic.AddInt64(&w.idxLookup.stats.IndexScan, int64(time.Since(startTime))) + } select { case <-ctx.Done(): return count, nil @@ -729,10 +727,8 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes case w.workCh <- task: w.resultCh <- task } - if w.idxLookup.stats != nil { - atomic.AddInt64(&w.idxLookup.stats.FetchHandle, int64(finishFetch.Sub(startTime))) - atomic.AddInt64(&w.idxLookup.stats.TaskWait, int64(time.Since(finishBuild))) - atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime))) + if basicStats != nil { + basicStats.Record(time.Since(startTime), chk.NumRows()) } } } @@ -755,14 +751,10 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, } } chk.SetRequiredRows(requiredRows, w.maxChunkSize) - startTime := time.Now() err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { return handles, nil, scannedKeys, err } - if w.idxLookup.stats != nil { - w.idxLookup.stats.indexScanBasicStats.Record(time.Since(startTime), chk.NumRows()) - } if chk.NumRows() == 0 { return handles, retChk, scannedKeys, nil } @@ -881,52 +873,39 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { // IndexLookUpRunTimeStats record the indexlookup runtime stat type IndexLookUpRunTimeStats struct { - // indexScanBasicStats uses to record basic runtime stats for index scan. - indexScanBasicStats *execdetails.BasicRuntimeStats - FetchHandleTotal int64 - FetchHandle int64 - TaskWait int64 - TableRowScan int64 - TableTaskNum int64 - Concurrency int + IndexScan int64 + TableRowScan int64 + TableTaskNum int64 + Concurrency int } func (e *IndexLookUpRunTimeStats) String() string { var buf bytes.Buffer - fetchHandle := atomic.LoadInt64(&e.FetchHandleTotal) - indexScan := atomic.LoadInt64(&e.FetchHandle) - taskWait := atomic.LoadInt64(&e.TaskWait) + indexScan := atomic.LoadInt64(&e.IndexScan) tableScan := atomic.LoadInt64(&e.TableRowScan) tableTaskNum := atomic.LoadInt64(&e.TableTaskNum) concurrency := e.Concurrency if indexScan != 0 { -<<<<<<< HEAD buf.WriteString(fmt.Sprintf("index_task:%s", time.Duration(indexScan))) -======= - buf.WriteString(fmt.Sprintf("index_task: {total_time: %s, fetch_handle: %s, build: %s, wait: %s}", - execdetails.FormatDuration(time.Duration(fetchHandle)), - execdetails.FormatDuration(time.Duration(indexScan)), - execdetails.FormatDuration(time.Duration(fetchHandle-indexScan-taskWait)), - execdetails.FormatDuration(time.Duration(taskWait)))) ->>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } if tableScan != 0 { if buf.Len() > 0 { buf.WriteByte(',') } -<<<<<<< HEAD buf.WriteString(fmt.Sprintf(" table_task:{num:%d, concurrency:%d, time:%s}", tableTaskNum, concurrency, time.Duration(tableScan))) -======= - buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency)) ->>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } return buf.String() } // Clone implements the RuntimeStats interface. func (e *IndexLookUpRunTimeStats) Clone() execdetails.RuntimeStats { - newRs := *e - return &newRs + newRs := &IndexLookUpRunTimeStats{ + IndexScan: e.IndexScan, + TableRowScan: e.TableRowScan, + TableTaskNum: e.TableTaskNum, + Concurrency: e.Concurrency, + } + return newRs } // Merge implements the RuntimeStats interface. @@ -935,9 +914,7 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) { if !ok { return } - e.FetchHandleTotal += tmp.FetchHandleTotal - e.FetchHandle += tmp.FetchHandle - e.TaskWait += tmp.TaskWait + e.IndexScan += tmp.IndexScan e.TableRowScan += tmp.TableRowScan e.TableTaskNum += tmp.TableTaskNum e.Concurrency += tmp.Concurrency diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 3c0ca0a36e99d..9775f9980c3c1 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -256,34 +256,13 @@ func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) { func (s *testSuite3) TestIndexLookUpStats(c *C) { stats := &executor.IndexLookUpRunTimeStats{ - FetchHandleTotal: int64(5 * time.Second), - FetchHandle: int64(2 * time.Second), - TaskWait: int64(2 * time.Second), - TableRowScan: int64(2 * time.Second), - TableTaskNum: 2, - Concurrency: 1, + IndexScan: int64(2 * time.Second), + TableRowScan: int64(2 * time.Second), + TableTaskNum: 2, + Concurrency: 1, } -<<<<<<< HEAD c.Assert(stats.String(), Equals, "index_task:2s, table_task:{num:2, concurrency:1, time:2s}") c.Assert(stats.String(), Equals, stats.Clone().String()) stats.Merge(stats.Clone()) c.Assert(stats.String(), Equals, "index_task:4s, table_task:{num:4, concurrency:2, time:4s}") -======= - c.Assert(stats.String(), Equals, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}") - c.Assert(stats.String(), Equals, stats.Clone().String()) - stats.Merge(stats.Clone()) - c.Assert(stats.String(), Equals, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 2}") -} - -func (s *testSuite3) TestIndexLookUpGetResultChunk(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists tbl") - tk.MustExec("create table tbl(a int, b int, c int, key idx_a(a))") - for i := 0; i < 101; i++ { - tk.MustExec(fmt.Sprintf("insert into tbl values(%d,%d,%d)", i, i, i)) - } - tk.MustQuery("select * from tbl use index(idx_a) where a > 99 order by a asc limit 1").Check(testkit.Rows("100 100 100")) - tk.MustQuery("select * from tbl use index(idx_a) where a > 10 order by a asc limit 4,1").Check(testkit.Rows("15 15 15")) ->>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) } diff --git a/executor/executor_test.go b/executor/executor_test.go index 7ec23f9891613..3c8b5d86217c1 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6222,11 +6222,7 @@ func (s *testSerialSuite1) TestIndexlookupRuntimeStats(c *C) { rows := tk.MustQuery(sql).Rows() c.Assert(len(rows), Equals, 3) explain := fmt.Sprintf("%v", rows[0]) -<<<<<<< HEAD c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task:{num.*concurrency.*time.*}.*") -======= - c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task: {total_time.*num.*concurrency.*}.*") ->>>>>>> 56ef0ab2a... executor: improve the runtime stats of index lookup reader (#21982) indexExplain := fmt.Sprintf("%v", rows[1]) tableExplain := fmt.Sprintf("%v", rows[2]) c.Assert(indexExplain, Matches, ".*time:.*loops:.*cop_task:.*") From 269e2d9ad67881b4b0af1fc16299cf171565d772 Mon Sep 17 00:00:00 2001 From: crazycs Date: Thu, 31 Dec 2020 17:35:15 +0800 Subject: [PATCH 3/3] executor: improve the runtime stats of index lookup reader (#21982) Signed-off-by: crazycs520 --- executor/distsql.go | 66 ++++++++++++++++++++++----------------- executor/distsql_test.go | 14 +++++---- executor/executor_test.go | 2 +- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/executor/distsql.go b/executor/distsql.go index f06d3f0ae2609..af8ede2792127 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -385,7 +385,6 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { e.feedback.Invalidate() return err } - e.initRuntimeStats() err = e.open(ctx) if err != nil { e.feedback.Invalidate() @@ -398,6 +397,7 @@ func (e *IndexLookUpExecutor) open(ctx context.Context) error { // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. + e.initRuntimeStats() e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) @@ -622,10 +622,8 @@ func (e *IndexLookUpExecutor) initRuntimeStats() { if e.runtimeStats != nil { if e.stats == nil { e.stats = &IndexLookUpRunTimeStats{ - IndexScan: 0, - TableRowScan: 0, - TableTaskNum: 0, - Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency, + indexScanBasicStats: &execdetails.BasicRuntimeStats{}, + Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency, } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -693,16 +691,15 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes chk = chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.idxLookup.maxChunkSize) } idxID := w.idxLookup.getIndexPlanRootID() - var basicStats *execdetails.BasicRuntimeStats if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { - if idxID != w.idxLookup.id { - basicStats = &execdetails.BasicRuntimeStats{} - w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, basicStats) + if idxID != w.idxLookup.id && w.idxLookup.stats != nil { + w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats) } } for { startTime := time.Now() handles, retChunk, scannedKeys, err := w.extractTaskHandles(ctx, chk, result, count) + finishFetch := time.Now() if err != nil { doneCh := make(chan error, 1) doneCh <- err @@ -716,9 +713,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes return count, nil } task := w.buildTableTask(handles, retChunk) - if w.idxLookup.stats != nil { - atomic.AddInt64(&w.idxLookup.stats.IndexScan, int64(time.Since(startTime))) - } + finishBuild := time.Now() select { case <-ctx.Done(): return count, nil @@ -727,8 +722,10 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes case w.workCh <- task: w.resultCh <- task } - if basicStats != nil { - basicStats.Record(time.Since(startTime), chk.NumRows()) + if w.idxLookup.stats != nil { + atomic.AddInt64(&w.idxLookup.stats.FetchHandle, int64(finishFetch.Sub(startTime))) + atomic.AddInt64(&w.idxLookup.stats.TaskWait, int64(time.Since(finishBuild))) + atomic.AddInt64(&w.idxLookup.stats.FetchHandleTotal, int64(time.Since(startTime))) } } } @@ -751,10 +748,14 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, } } chk.SetRequiredRows(requiredRows, w.maxChunkSize) + startTime := time.Now() err = errors.Trace(idxResult.Next(ctx, chk)) if err != nil { return handles, nil, scannedKeys, err } + if w.idxLookup.stats != nil { + w.idxLookup.stats.indexScanBasicStats.Record(time.Since(startTime), chk.NumRows()) + } if chk.NumRows() == 0 { return handles, retChk, scannedKeys, nil } @@ -873,39 +874,44 @@ func (w *tableWorker) pickAndExecTask(ctx context.Context) { // IndexLookUpRunTimeStats record the indexlookup runtime stat type IndexLookUpRunTimeStats struct { - IndexScan int64 - TableRowScan int64 - TableTaskNum int64 - Concurrency int + // indexScanBasicStats uses to record basic runtime stats for index scan. + indexScanBasicStats *execdetails.BasicRuntimeStats + FetchHandleTotal int64 + FetchHandle int64 + TaskWait int64 + TableRowScan int64 + TableTaskNum int64 + Concurrency int } func (e *IndexLookUpRunTimeStats) String() string { var buf bytes.Buffer - indexScan := atomic.LoadInt64(&e.IndexScan) + fetchHandle := atomic.LoadInt64(&e.FetchHandleTotal) + indexScan := atomic.LoadInt64(&e.FetchHandle) + taskWait := atomic.LoadInt64(&e.TaskWait) tableScan := atomic.LoadInt64(&e.TableRowScan) tableTaskNum := atomic.LoadInt64(&e.TableTaskNum) concurrency := e.Concurrency if indexScan != 0 { - buf.WriteString(fmt.Sprintf("index_task: %s", execdetails.FormatDuration(time.Duration(indexScan)))) + buf.WriteString(fmt.Sprintf("index_task: {total_time: %s, fetch_handle: %s, build: %s, wait: %s}", + execdetails.FormatDuration(time.Duration(fetchHandle)), + execdetails.FormatDuration(time.Duration(indexScan)), + execdetails.FormatDuration(time.Duration(fetchHandle-indexScan-taskWait)), + execdetails.FormatDuration(time.Duration(taskWait)))) } if tableScan != 0 { if buf.Len() > 0 { buf.WriteByte(',') } - buf.WriteString(fmt.Sprintf(" table_task: {num: %d, concurrency: %d, time: %s}", tableTaskNum, concurrency, execdetails.FormatDuration(time.Duration(tableScan)))) + buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency)) } return buf.String() } // Clone implements the RuntimeStats interface. func (e *IndexLookUpRunTimeStats) Clone() execdetails.RuntimeStats { - newRs := &IndexLookUpRunTimeStats{ - IndexScan: e.IndexScan, - TableRowScan: e.TableRowScan, - TableTaskNum: e.TableTaskNum, - Concurrency: e.Concurrency, - } - return newRs + newRs := *e + return &newRs } // Merge implements the RuntimeStats interface. @@ -914,7 +920,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) { if !ok { return } - e.IndexScan += tmp.IndexScan + e.FetchHandleTotal += tmp.FetchHandleTotal + e.FetchHandle += tmp.FetchHandle + e.TaskWait += tmp.TaskWait e.TableRowScan += tmp.TableRowScan e.TableTaskNum += tmp.TableTaskNum e.Concurrency += tmp.Concurrency diff --git a/executor/distsql_test.go b/executor/distsql_test.go index 5c17826d8754e..55affe68329dc 100644 --- a/executor/distsql_test.go +++ b/executor/distsql_test.go @@ -256,13 +256,15 @@ func (s *testSuite3) TestPushLimitDownIndexLookUpReader(c *C) { func (s *testSuite3) TestIndexLookUpStats(c *C) { stats := &executor.IndexLookUpRunTimeStats{ - IndexScan: int64(2 * time.Second), - TableRowScan: int64(2 * time.Second), - TableTaskNum: 2, - Concurrency: 1, + FetchHandleTotal: int64(5 * time.Second), + FetchHandle: int64(2 * time.Second), + TaskWait: int64(2 * time.Second), + TableRowScan: int64(2 * time.Second), + TableTaskNum: 2, + Concurrency: 1, } - c.Assert(stats.String(), Equals, "index_task: 2s, table_task: {num: 2, concurrency: 1, time: 2s}") + c.Assert(stats.String(), Equals, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}") c.Assert(stats.String(), Equals, stats.Clone().String()) stats.Merge(stats.Clone()) - c.Assert(stats.String(), Equals, "index_task: 4s, table_task: {num: 4, concurrency: 2, time: 4s}") + c.Assert(stats.String(), Equals, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 2}") } diff --git a/executor/executor_test.go b/executor/executor_test.go index 1148c96f36699..ffd53553a923f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6222,7 +6222,7 @@ func (s *testSerialSuite1) TestIndexlookupRuntimeStats(c *C) { rows := tk.MustQuery(sql).Rows() c.Assert(len(rows), Equals, 3) explain := fmt.Sprintf("%v", rows[0]) - c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task: {num.*concurrency.*time.*}.*") + c.Assert(explain, Matches, ".*time:.*loops:.*index_task:.*table_task: {total_time.*num.*concurrency.*}.*") indexExplain := fmt.Sprintf("%v", rows[1]) tableExplain := fmt.Sprintf("%v", rows[2]) c.Assert(indexExplain, Matches, ".*time:.*loops:.*cop_task:.*")