Skip to content

Commit

Permalink
planner: encode insert/delete/update executor plan information in sl…
Browse files Browse the repository at this point in the history
…ow log plan field (#19176)
  • Loading branch information
crazycs520 authored Aug 18, 2020
1 parent 04b7492 commit 67214e7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 29 deletions.
20 changes: 10 additions & 10 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,10 +933,10 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
tk.MustExec("/**/insert into t values(4, 'd')")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
max_prewrite_regions, avg_affected_rows, query_sample_text
from information_schema.statements_summary
where digest_text like 'insert into t%'`,
).Check(testkit.Rows("Insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a') "))
).Check(testkit.Rows("Insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a')"))

// Test point get.
tk.MustExec("drop table if exists p")
Expand Down Expand Up @@ -983,10 +983,10 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
// select ... order by
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
max_prewrite_regions, avg_affected_rows, query_sample_text
from information_schema.statements_summary
order by exec_count desc limit 1`,
).Check(testkit.Rows("Insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a') "))
).Check(testkit.Rows("Insert test test.t <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into t values(1, 'a')"))

// Test different plans with same digest.
c.Assert(failpoint.Enable(failpointName, "return(1000)"), IsNil)
Expand Down Expand Up @@ -1029,16 +1029,16 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) {
tk.MustExec("commit")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text, plan
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text
from information_schema.statements_summary
where digest_text like 'insert into t%'`,
).Check(testkit.Rows("Insert test test.t <nil> 1 0 0 0 0 0 0 0 0 0 1 insert into t values(1, 'a') "))
).Check(testkit.Rows("Insert test test.t <nil> 1 0 0 0 0 0 0 0 0 0 1 insert into t values(1, 'a') "))
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text, plan
max_prewrite_regions, avg_affected_rows, query_sample_text, prev_sample_text
from information_schema.statements_summary
where digest_text='commit'`,
).Check(testkit.Rows("Commit test <nil> <nil> 1 0 0 0 0 0 2 2 1 1 0 commit insert into t values(1, 'a') "))
).Check(testkit.Rows("Commit test <nil> <nil> 1 0 0 0 0 0 2 2 1 1 0 commit insert into t values(1, 'a')"))

tk.MustQuery("select * from t where a=2")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
Expand Down Expand Up @@ -1183,10 +1183,10 @@ func (s *testTableSuite) TestStmtSummaryHistoryTable(c *C) {
tk.MustExec("/**/insert into test_summary values(4, 'd')")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions,
max_prewrite_regions, avg_affected_rows, query_sample_text, plan
max_prewrite_regions, avg_affected_rows, query_sample_text
from information_schema.statements_summary_history
where digest_text like 'insert into test_summary%'`,
).Check(testkit.Rows("Insert test test.test_summary <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into test_summary values(1, 'a') "))
).Check(testkit.Rows("Insert test test.test_summary <nil> 4 0 0 0 0 0 2 2 1 1 1 insert into test_summary values(1, 'a')"))

tk.MustExec("set global tidb_stmt_summary_history_size = 0")
tk.MustQuery(`select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys,
Expand Down
38 changes: 26 additions & 12 deletions planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,26 @@ type planEncoder struct {
func EncodePlan(p Plan) string {
pn := encoderPool.Get().(*planEncoder)
defer encoderPool.Put(pn)
selectPlan := getSelectPlan(p)
if selectPlan == nil {
if p == nil || p.SCtx() == nil {
return ""
}
failpoint.Inject("mockPlanRowCount", func(val failpoint.Value) {
selectPlan.statsInfo().RowCount = float64(val.(int))
})
return pn.encodePlanTree(selectPlan)
selectPlan := getSelectPlan(p)
if selectPlan != nil {
failpoint.Inject("mockPlanRowCount", func(val failpoint.Value) {
selectPlan.statsInfo().RowCount = float64(val.(int))
})
}
return pn.encodePlanTree(p)
}

func (pn *planEncoder) encodePlanTree(p PhysicalPlan) string {
func (pn *planEncoder) encodePlanTree(p Plan) string {
pn.encodedPlans = make(map[int]bool)
pn.buf.Reset()
pn.encodePlan(p, true, 0)
return plancodec.Compress(pn.buf.Bytes())
}

func (pn *planEncoder) encodePlan(p PhysicalPlan, isRoot bool, depth int) {
func (pn *planEncoder) encodePlan(p Plan, isRoot bool, depth int) {
var storeType kv.StoreType = kv.UnSpecified
if !isRoot {
switch copPlan := p.(type) {
Expand All @@ -71,17 +73,29 @@ func (pn *planEncoder) encodePlan(p PhysicalPlan, isRoot bool, depth int) {
}
taskTypeInfo := plancodec.EncodeTaskType(isRoot, storeType)
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(p.SCtx(), p)
plancodec.EncodePlanNode(depth, p.ID(), p.TP(), p.statsInfo().RowCount, taskTypeInfo, p.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf)
rowCount := 0.0
if statsInfo := p.statsInfo(); statsInfo != nil {
rowCount = p.statsInfo().RowCount
}
plancodec.EncodePlanNode(depth, p.ID(), p.TP(), rowCount, taskTypeInfo, p.ExplainInfo(), actRows, analyzeInfo, memoryInfo, diskInfo, &pn.buf)
pn.encodedPlans[p.ID()] = true

depth++
for _, child := range p.Children() {

selectPlan := getSelectPlan(p)
if selectPlan == nil {
return
}
if !pn.encodedPlans[selectPlan.ID()] {
pn.encodePlan(selectPlan, isRoot, depth)
return
}
for _, child := range selectPlan.Children() {
if pn.encodedPlans[child.ID()] {
continue
}
pn.encodePlan(child.(PhysicalPlan), isRoot, depth)
}
switch copPlan := p.(type) {
switch copPlan := selectPlan.(type) {
case *PhysicalTableReader:
pn.encodePlan(copPlan.tablePlan, false, depth)
case *PhysicalIndexReader:
Expand Down
24 changes: 17 additions & 7 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,24 @@ func (s *testPlanNormalize) TestEncodeDecodePlan(c *C) {
tk.MustExec("set tidb_enable_collect_execution_info=1;")

tk.Se.GetSessionVars().PlanID = 0
getPlanTree := func() string {
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(core.Plan)
c.Assert(ok, IsTrue)
encodeStr := core.EncodePlan(p)
planTree, err := plancodec.DecodePlan(encodeStr)
c.Assert(err, IsNil)
return planTree
}
tk.MustExec("select max(a) from t1 where a>0;")
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(core.Plan)
c.Assert(ok, IsTrue)
encodeStr := core.EncodePlan(p)
planTree, err := plancodec.DecodePlan(encodeStr)
c.Assert(err, IsNil)
planTree := getPlanTree()
c.Assert(strings.Contains(planTree, "time"), IsTrue)
c.Assert(strings.Contains(planTree, "loops"), IsTrue)

tk.MustExec("insert into t1 values (1,1,1);")
planTree = getPlanTree()
c.Assert(strings.Contains(planTree, "Insert"), IsTrue)
c.Assert(strings.Contains(planTree, "time"), IsTrue)
c.Assert(strings.Contains(planTree, "loops"), IsTrue)
}
Expand Down

0 comments on commit 67214e7

Please sign in to comment.