Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add runtime information for DML statement in explain analyze (#19106) #21066

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,22 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
}()

toCheck := e
isExplainAnalyze := false
if explain, ok := e.(*ExplainExec); ok {
if explain.analyzeExec != nil {
toCheck = explain.analyzeExec
if analyze := explain.getAnalyzeExecToExecutedNoDelay(); analyze != nil {
toCheck = analyze
isExplainAnalyze = true
}
}

// If the executor doesn't return any result to the client, we execute it without delay.
if toCheck.Schema().Len() == 0 {
handled = !isExplainAnalyze
if isPessimistic {
return true, nil, a.handlePessimisticDML(ctx, e)
return handled, nil, a.handlePessimisticDML(ctx, toCheck)
}
r, err := a.handleNoDelayExecutor(ctx, e)
return true, r, err
r, err := a.handleNoDelayExecutor(ctx, toCheck)
return handled, r, err
} else if proj, ok := toCheck.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
Expand Down
17 changes: 16 additions & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ExplainExec struct {

explain *core.Explain
analyzeExec Executor
executed bool
rows [][]string
cursor int
}
Expand Down Expand Up @@ -79,7 +80,8 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
closed = true
}
}()
if e.analyzeExec != nil {
if e.analyzeExec != nil && !e.executed {
e.executed = true
chk := newFirstChunk(e.analyzeExec)
var nextErr, closeErr error
for {
Expand Down Expand Up @@ -111,3 +113,16 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
}
return e.explain.Rows, nil
}

// getAnalyzeExecToExecutedNoDelay gets the analyze DML executor to execute in handleNoDelay function.
// For explain analyze insert/update/delete statement, the analyze executor should be executed in handleNoDelay
// function and then commit transaction if needed.
// Otherwise, in autocommit transaction, the table record change of analyze executor(insert/update/delete...)
// will not be committed.
func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor {
if e.analyzeExec != nil && !e.executed && e.analyzeExec.Schema().Len() == 0 {
e.executed = true
return e.analyzeExec
}
return nil
}
22 changes: 22 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,28 @@ func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
checkExplain("BatchGet")
}

func (s *testIntegrationSerialSuite) TestExplainAnalyzeDML(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(" create table t (a int, b int, unique index (a));")
tk.MustExec("insert into t values (1,1)")

res := tk.MustQuery("explain analyze select * from t where a=1;")
checkExplain := func(rpc string) {
resBuff := bytes.NewBufferString("")
for _, row := range res.Rows() {
fmt.Fprintf(resBuff, "%s\n", row)
}
explain := resBuff.String()
c.Assert(strings.Contains(explain, rpc+":{num_rpc:"), IsTrue, Commentf("%s", explain))
c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
checkExplain("Get")
res = tk.MustQuery("explain analyze insert ignore into t values (1,1),(2,2),(3,3),(4,4);")
checkExplain("BatchGet")
}

func (s *testIntegrationSuite) TestPartialBatchPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 1 addition & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
} else {
logutil.BgLogger().Error("get txn failed", zap.Error(err1))
}
err = finishStmt(ctx, se, err, s)
}

if rs != nil {
Expand All @@ -335,8 +336,6 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}, err
}

err = finishStmt(ctx, se, err, s)

if se.hasQuerySpecial() {
// The special query will be handled later in handleQuerySpecial,
// then should call the ExecStmt.FinishExecuteStmt to finish this statement.
Expand Down