Skip to content

Commit

Permalink
cherry pick pingcap#19106 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
jianyilyu authored and ti-srebot committed Nov 16, 2020
1 parent c05d221 commit 6206b0c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 6 deletions.
13 changes: 8 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,19 +384,22 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
}()

toCheck := e
isExplainAnalyze := false
if explain, ok := e.(*ExplainExec); ok {
if explain.analyzeExec != nil {
toCheck = explain.analyzeExec
if analyze := explain.getAnalyzeExecToExecutedNoDelay(); analyze != nil {
toCheck = analyze
isExplainAnalyze = true
}
}

// If the executor doesn't return any result to the client, we execute it without delay.
if toCheck.Schema().Len() == 0 {
handled = !isExplainAnalyze
if isPessimistic {
return true, nil, a.handlePessimisticDML(ctx, e)
return handled, nil, a.handlePessimisticDML(ctx, toCheck)
}
r, err := a.handleNoDelayExecutor(ctx, e)
return true, r, err
r, err := a.handleNoDelayExecutor(ctx, toCheck)
return handled, r, err
} else if proj, ok := toCheck.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
Expand Down
17 changes: 16 additions & 1 deletion executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ExplainExec struct {

explain *core.Explain
analyzeExec Executor
executed bool
rows [][]string
cursor int
}
Expand Down Expand Up @@ -79,7 +80,8 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
closed = true
}
}()
if e.analyzeExec != nil {
if e.analyzeExec != nil && !e.executed {
e.executed = true
chk := newFirstChunk(e.analyzeExec)
var nextErr, closeErr error
for {
Expand Down Expand Up @@ -111,3 +113,16 @@ func (e *ExplainExec) generateExplainInfo(ctx context.Context) (rows [][]string,
}
return e.explain.Rows, nil
}

// getAnalyzeExecToExecutedNoDelay gets the analyze DML executor to execute in handleNoDelay function.
// For explain analyze insert/update/delete statement, the analyze executor should be executed in handleNoDelay
// function and then commit transaction if needed.
// Otherwise, in autocommit transaction, the table record change of analyze executor(insert/update/delete...)
// will not be committed.
func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor {
if e.analyzeExec != nil && !e.executed && e.analyzeExec.Schema().Len() == 0 {
e.executed = true
return e.analyzeExec
}
return nil
}
50 changes: 50 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,56 @@ func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
checkExplain("BatchGet")
}

<<<<<<< HEAD
=======
func (s *testIntegrationSerialSuite) TestExplainAnalyzeDML(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(" create table t (a int, b int, unique index (a));")
tk.MustExec("insert into t values (1,1)")

res := tk.MustQuery("explain analyze select * from t where a=1;")
checkExplain := func(rpc string) {
resBuff := bytes.NewBufferString("")
for _, row := range res.Rows() {
fmt.Fprintf(resBuff, "%s\n", row)
}
explain := resBuff.String()
c.Assert(strings.Contains(explain, rpc+":{num_rpc:"), IsTrue, Commentf("%s", explain))
c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
checkExplain("Get")
res = tk.MustQuery("explain analyze insert ignore into t values (1,1),(2,2),(3,3),(4,4);")
checkExplain("BatchGet")
}

func (s *testIntegrationSuite) TestPartitionExplain(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table pt (id int, c int, key i_id(id), key i_c(c)) partition by range (c) (
partition p0 values less than (4),
partition p1 values less than (7),
partition p2 values less than (10))`)

tk.MustExec("set @@tidb_enable_index_merge = 1;")

var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + tt).Rows())
})
tk.MustQuery("explain " + tt).Check(testkit.Rows(output[i].Plan...))
}
}

>>>>>>> c704b9756... executor: add runtime information for DML statement in explain analyze (#19106)
func (s *testIntegrationSuite) TestPartialBatchPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
17 changes: 17 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,24 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec
if recordSets, err = s.executeStatement(ctx, stmt, recordSets, multiQuery); err != nil {
return nil, err
}
err = finishStmt(ctx, se, err, s)
}
<<<<<<< HEAD
=======
if rs != nil {
return &execStmtResult{
RecordSet: rs,
sql: s,
se: se,
}, err
}

// If it is not a select statement, we record its slow log here,
// then it could include the transaction commit time.
s.(*executor.ExecStmt).FinishExecuteStmt(origTxnCtx.StartTS, err == nil, false)
return nil, err
}
>>>>>>> c704b9756... executor: add runtime information for DML statement in explain analyze (#19106)

if s.sessionVars.ClientCapability&mysql.ClientMultiResults == 0 && len(recordSets) > 1 {
// return the first recordset if client doesn't support ClientMultiResults.
Expand Down

0 comments on commit 6206b0c

Please sign in to comment.