Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fast path point select #6937

Merged
merged 21 commits into from
Jul 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plan.Plan
case *plan.PhysicalTableReader:
tableScan := v.TablePlans[0].(*plan.PhysicalTableScan)
return len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPoint(ctx.GetSessionVars().StmtCtx)
case *plan.PointGetPlan:
return true
default:
return false
}
Expand Down
10 changes: 6 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildExecute(v)
case *plan.Explain:
return b.buildExplain(v)
case *plan.PointGetPlan:
return b.buildPointGet(v)
case *plan.Insert:
return b.buildInsert(v)
case *plan.LoadData:
Expand Down Expand Up @@ -1030,7 +1032,7 @@ func (b *executorBuilder) buildProjection(v *plan.PhysicalProjection) Executor {
// If the calculation row count for this Projection operator is smaller
// than a Chunk size, we turn back to the un-parallel Projection
// implementation to reduce the goroutine overhead.
if v.StatsInfo().Count() < int64(b.ctx.GetSessionVars().MaxChunkSize) {
if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) {
e.numWorkers = 0
}
return e
Expand Down Expand Up @@ -1483,7 +1485,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plan.PhysicalTableReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, ts.StatsInfo().Count(), ts.Desc)
e.feedback = statistics.NewQueryFeedback(ts.Table.ID, ts.Hist, int64(ts.StatsCount()), ts.Desc)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why make this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

go lint reports error because it returns an unexported type statsInfo.

}
collect := e.feedback.CollectFeedback(len(ts.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1540,7 +1542,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plan.PhysicalIndexReader) (*
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
collect := e.feedback.CollectFeedback(len(is.Ranges))
e.dagPB.CollectRangeCounts = &collect
Expand Down Expand Up @@ -1609,7 +1611,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plan.PhysicalIndexLook
if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, is.StatsInfo().Count(), is.Desc)
e.feedback = statistics.NewQueryFeedback(is.Table.ID, is.Hist, int64(is.StatsCount()), is.Desc)
}
// do not collect the feedback for table request.
collectTable := false
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func isExpensiveQuery(p plan.Plan) bool {

func isPhysicalPlanExpensive(p plan.PhysicalPlan) bool {
expensiveRowThreshold := int64(config.GetGlobalConfig().Log.ExpensiveThreshold)
if p.StatsInfo().Count() > expensiveRowThreshold {
if int64(p.StatsCount()) > expensiveRowThreshold {
return true
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,7 @@ func (s *testSuite) TestAdapterStatement(c *C) {
c.Check(stmt.OriginText(), Equals, "create table test.t (a int)")
}

func (s *testSuite) TestPointGet(c *C) {
func (s *testSuite) TestIsPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use mysql")
ctx := tk.Se.(sessionctx.Context)
Expand Down
200 changes: 200 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"golang.org/x/net/context"
)

func (b *executorBuilder) buildPointGet(p *plan.PointGetPlan) Executor {
return &PointGetExecutor{
ctx: b.ctx,
schema: p.Schema(),
tblInfo: p.TblInfo,
idxInfo: p.IndexInfo,
idxVals: p.IndexValues,
handle: p.Handle,
startTS: b.getStartTS(),
}
}

// PointGetExecutor executes point select query.
type PointGetExecutor struct {
ctx sessionctx.Context
schema *expression.Schema
tps []*types.FieldType
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
}

// Open implements the Executor interface.
func (e *PointGetExecutor) Open(context.Context) error {
return nil
}

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
return nil
}

// Next implements the Executor interface.
func (e *PointGetExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
e.done = true
var err error
e.snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.startTS})
if err != nil {
return errors.Trace(err)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil {
return errors.Trace(err1)
}
handleVal, err1 := e.get(idxKey)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return errors.Trace(err1)
}
if len(handleVal) == 0 {
return nil
}
e.handle, err1 = tables.DecodeHandle(handleVal)
if err1 != nil {
return errors.Trace(err1)
}
}
key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return errors.Trace(err)
}
if len(val) == 0 {
if e.idxInfo != nil {
return kv.ErrNotExist.Gen("inconsistent extra index %s, handle %d not found in table",
e.idxInfo.Name.O, e.handle)
}
return nil
}
return e.decodeRowValToChunk(val, chk)
}

func (e *PointGetExecutor) encodeIndexKey() ([]byte, error) {
for i := range e.idxVals {
colInfo := e.tblInfo.Columns[e.idxInfo.Columns[i].Offset]
casted, err := table.CastValue(e.ctx, e.idxVals[i], colInfo)
if err != nil {
return nil, errors.Trace(err)
}
e.idxVals[i] = casted
}
encodedIdxVals, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, e.idxVals...)
if err != nil {
return nil, errors.Trace(err)
}
return tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals), nil
}

func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
txn := e.ctx.Txn()
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
return txn.Get(key)
}
return e.snapshot.Get(key)
}

func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error {
colIDs := make(map[int64]int, e.schema.Len())
for i, col := range e.schema.Columns {
colIDs[col.ID] = i
}
colVals, err := tablecodec.CutRowNew(rowVal, colIDs)
if err != nil {
return errors.Trace(err)
}
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for id, offset := range colIDs {
if e.tblInfo.PKIsHandle && mysql.HasPriKeyFlag(e.schema.Columns[offset].RetType.Flag) {
chk.AppendInt64(offset, e.handle)
continue
}
if id == model.ExtraHandleID {
chk.AppendInt64(offset, e.handle)
continue
}
colVal := colVals[offset]
if len(colVal) == 0 {
colInfo := getColInfoByID(e.tblInfo, id)
d, err1 := table.GetColOriginDefaultValue(e.ctx, colInfo)
if err1 != nil {
return errors.Trace(err1)
}
chk.AppendDatum(offset, &d)
continue
}
_, err = decoder.DecodeOne(colVals[offset], offset, e.schema.Columns[offset].RetType)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
for _, col := range tbl.Columns {
if col.ID == colID {
return col
}
}
return nil
}

// Schema implements the Executor interface.
func (e *PointGetExecutor) Schema() *expression.Schema {
return e.schema
}

func (e *PointGetExecutor) retTypes() []*types.FieldType {
if e.tps == nil {
e.tps = make([]*types.FieldType, e.schema.Len())
for i := range e.schema.Columns {
e.tps[i] = e.schema.Columns[i].RetType
}
}
return e.tps
}

func (e *PointGetExecutor) newChunk() *chunk.Chunk {
return chunk.NewChunkWithCapacity(e.retTypes(), 1)
}
38 changes: 38 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/testkit"
)

func (s *testSuite) TestPointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table point (id int primary key, c int, d varchar(10), unique c_d (c, d))")
tk.MustExec("insert point values (1, 1, 'a')")
tk.MustExec("insert point values (2, 2, 'b')")
tk.MustQuery("select * from point where id = 1 and c = 0").Check(testkit.Rows())
tk.MustQuery("select * from point where id < 0 and c = 1 and d = 'b'").Check(testkit.Rows())
result, err := tk.Exec("select id as ident from point where id = 1")
c.Assert(err, IsNil)
fields := result.Fields()
c.Assert(fields[0].ColumnAsName.O, Equals, "ident")

tk.MustExec("CREATE TABLE tab3(pk INTEGER PRIMARY KEY, col0 INTEGER, col1 FLOAT, col2 TEXT, col3 INTEGER, col4 FLOAT, col5 TEXT);")
tk.MustExec("CREATE UNIQUE INDEX idx_tab3_0 ON tab3 (col4);")
tk.MustExec("INSERT INTO tab3 VALUES(0,854,111.96,'mguub',711,966.36,'snwlo');")
tk.MustQuery("SELECT ALL * FROM tab3 WHERE col4 = 85;").Check(testkit.Rows())
}
7 changes: 6 additions & 1 deletion expression/simple_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func ParseSimpleExpr(ctx sessionctx.Context, exprStr string, tableInfo *model.Ta
return nil, errors.Trace(err)
}
expr := stmts[0].(*ast.SelectStmt).Fields.Fields[0].Expr
rewriter := &simpleRewriter{tbl: tableInfo, ctx: ctx}
return RewriteSimpleExpr(ctx, tableInfo, expr)
}

// RewriteSimpleExpr rewrites simple ast.ExprNode to expression.Expression.
func RewriteSimpleExpr(ctx sessionctx.Context, tbl *model.TableInfo, expr ast.ExprNode) (Expression, error) {
rewriter := &simpleRewriter{tbl: tbl, ctx: ctx}
expr.Accept(rewriter)
if rewriter.err != nil {
return nil, errors.Trace(rewriter.err)
Expand Down
2 changes: 1 addition & 1 deletion plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (e *Explain) explainPlanInRowFormat(p PhysicalPlan, taskType, indent string
// operator id, task type, operator info, and the estemated row count.
func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent string, isLastChild bool) {
operatorInfo := p.ExplainInfo()
count := string(strconv.AppendFloat([]byte{}, p.StatsInfo().count, 'f', 2, 64))
count := string(strconv.AppendFloat([]byte{}, p.statsInfo().count, 'f', 2, 64))
row := []string{e.prettyIdentifier(p.ExplainID(), indent, isLastChild), count, taskType, operatorInfo}
e.Rows = append(e.Rows, row)
}
Expand Down
4 changes: 4 additions & 0 deletions plan/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type logicalOptRule interface {
// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (Plan, error) {
fp := tryFastPlan(ctx, node)
if fp != nil {
return fp, nil
}
ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
builder := &planBuilder{
Expand Down
13 changes: 8 additions & 5 deletions plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type Plan interface {

context() sessionctx.Context

// StatsInfo will return the statsInfo for this plan.
StatsInfo() *statsInfo
// statsInfo will return the statsInfo for this plan.
statsInfo() *statsInfo
}

// taskType is the type of execution task.
Expand Down Expand Up @@ -91,7 +91,7 @@ func (p *requiredProp) enforceProperty(tsk task, ctx sessionctx.Context) task {
}
tsk = finishCopTask(ctx, tsk)
sortReqProp := &requiredProp{taskTp: rootTaskType, cols: p.cols, expectedCnt: math.MaxFloat64}
sort := PhysicalSort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(ctx, tsk.plan().StatsInfo(), sortReqProp)
sort := PhysicalSort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(ctx, tsk.plan().statsInfo(), sortReqProp)
for _, col := range p.cols {
sort.ByItems = append(sort.ByItems, &ByItems{col, p.desc})
}
Expand Down Expand Up @@ -229,6 +229,9 @@ type PhysicalPlan interface {
// getChildReqProps gets the required property by child index.
getChildReqProps(idx int) *requiredProp

// StatsCount returns the count of statsInfo for this plan.
StatsCount() float64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function can be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called by executor package to determine if the query is expensive.


// Get all the children.
Children() []PhysicalPlan

Expand Down Expand Up @@ -349,8 +352,8 @@ func (p *basePlan) ID() int {
return p.id
}

// StatsInfo implements the Plan interface.
func (p *basePlan) StatsInfo() *statsInfo {
// statsInfo implements the Plan interface.
func (p *basePlan) statsInfo() *statsInfo {
return p.stats
}

Expand Down
Loading