Skip to content

Commit

Permalink
executor: add runtime information for DML statement in explain analyze (
Browse files Browse the repository at this point in the history
#19106) (#21066)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
ti-srebot authored Nov 16, 2020
1 parent c05d221 commit 1337d34
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 8 deletions.
13 changes: 8 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,22 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
}()

toCheck := e
isExplainAnalyze := false
if explain, ok := e.(*ExplainExec); ok {
if explain.analyzeExec != nil {
toCheck = explain.analyzeExec
if analyze := explain.getAnalyzeExecToExecutedNoDelay(); analyze != nil {
toCheck = analyze
isExplainAnalyze = true
}
}

// If the executor doesn't return any result to the client, we execute it without delay.
if toCheck.Schema().Len() == 0 {
handled = !isExplainAnalyze
if isPessimistic {
return true, nil, a.handlePessimisticDML(ctx, e)
return handled, nil, a.handlePessimisticDML(ctx, toCheck)
}
r, err := a.handleNoDelayExecutor(ctx, e)
return true, r, err
r, err := a.handleNoDelayExecutor(ctx, toCheck)
return handled, r, err
} else if proj, ok := toCheck.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
Expand Down
17 changes: 16 additions & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ExplainExec struct {

explain *core.Explain
analyzeExec Executor
executed bool
rows [][]string
cursor int
}
Expand Down Expand Up @@ -79,7 +80,8 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
closed = true
}
}()
if e.analyzeExec != nil {
if e.analyzeExec != nil && !e.executed {
e.executed = true
chk := newFirstChunk(e.analyzeExec)
var nextErr, closeErr error
for {
Expand Down Expand Up @@ -111,3 +113,16 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
}
return e.explain.Rows, nil
}

// getAnalyzeExecToExecutedNoDelay gets the analyze DML executor to execute in handleNoDelay function.
// For explain analyze insert/update/delete statement, the analyze executor should be executed in handleNoDelay
// function and then commit transaction if needed.
// Otherwise, in autocommit transaction, the table record change of analyze executor(insert/update/delete...)
// will not be committed.
func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor {
if e.analyzeExec != nil && !e.executed && e.analyzeExec.Schema().Len() == 0 {
e.executed = true
return e.analyzeExec
}
return nil
}
22 changes: 22 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,28 @@ func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
checkExplain("BatchGet")
}

func (s *testIntegrationSerialSuite) TestExplainAnalyzeDML(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(" create table t (a int, b int, unique index (a));")
tk.MustExec("insert into t values (1,1)")

res := tk.MustQuery("explain analyze select * from t where a=1;")
checkExplain := func(rpc string) {
resBuff := bytes.NewBufferString("")
for _, row := range res.Rows() {
fmt.Fprintf(resBuff, "%s\n", row)
}
explain := resBuff.String()
c.Assert(strings.Contains(explain, rpc+":{num_rpc:"), IsTrue, Commentf("%s", explain))
c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
checkExplain("Get")
res = tk.MustQuery("explain analyze insert ignore into t values (1,1),(2,2),(3,3),(4,4);")
checkExplain("BatchGet")
}

func (s *testIntegrationSuite) TestPartialBatchPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 1 addition & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
} else {
logutil.BgLogger().Error("get txn failed", zap.Error(err1))
}
err = finishStmt(ctx, se, err, s)
}

if rs != nil {
Expand All @@ -335,8 +336,6 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}, err
}

err = finishStmt(ctx, se, err, s)

if se.hasQuerySpecial() {
// The special query will be handled later in handleQuerySpecial,
// then should call the ExecStmt.FinishExecuteStmt to finish this statement.
Expand Down

0 comments on commit 1337d34

Please sign in to comment.