Skip to content

Commit

Permalink
Support sort on window agg
Browse files Browse the repository at this point in the history
  • Loading branch information
spongedu committed Dec 1, 2018
1 parent 3fd0062 commit d36b70b
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 9 deletions.
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,7 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor {
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
ByItems: v.ByItems,
schema: v.Schema(),
streamWindowSort: v.StreamWinSort,
}
metrics.ExecutorCounter.WithLabelValues("SortExec").Inc()
return &sortExec
Expand Down
93 changes: 91 additions & 2 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ type SortExec struct {
rowChunks *chunk.List
// rowPointer store the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

memTracker *memory.Tracker

streamWindowSort bool

sterminated bool
chk *chunk.Chunk
}

// Close implements the Executor Close interface.
Expand All @@ -63,6 +67,9 @@ func (e *SortExec) Close() error {
func (e *SortExec) Open(ctx context.Context) error {
e.fetched = false
e.Idx = 0
e.sterminated = false
e.chk = nil


// To avoid duplicated initialization for TopNExec.
if e.memTracker == nil {
Expand All @@ -74,6 +81,18 @@ func (e *SortExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if !e.streamWindowSort {
return e.next(ctx, chk)
} else {
if e.sterminated {
return nil
}
e.sreset()
return e.snext(ctx, chk)
}
}

func (e *SortExec) next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
Expand Down Expand Up @@ -110,6 +129,74 @@ func (e *SortExec) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

func (e *SortExec) snext(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if !e.fetched {
err := e.fetchWindow(ctx)
if err != nil {
return errors.Trace(err)
}
e.initPointers()
e.initCompareFuncs()
allColumnExpr := e.buildKeyColumns()
if allColumnExpr {
sort.Slice(e.rowPtrs, e.keyColumnsLess)
} else {
e.buildKeyExprsAndTypes()
err = e.buildKeyChunks()
if err != nil {
return errors.Trace(err)
}
sort.Slice(e.rowPtrs, e.keyChunksLess)
}
e.fetched = true
}
for chk.NumRows() < e.maxChunkSize {
if e.Idx >= len(e.rowPtrs) {
break
}
rowPtr := e.rowPtrs[e.Idx]
chk.AppendRow(e.rowChunks.GetRow(rowPtr))
e.Idx++
}
return nil
}

func (e *SortExec) sreset() {
e.Idx = 0
e.fetched = false
}

func (e *SortExec) fetchWindow(ctx context.Context) error {
fields := e.retTypes()
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel("rowChunks")
for {
if e.chk == nil {
e.chk = e.children[0].newFirstChunk()
}
err := e.children[0].Next(ctx, e.chk)
if err != nil {
return errors.Trace(err)
}
rowCount := e.chk.NumRows()
if rowCount == 0 {
e.sterminated = true
break
}
e.rowChunks.Add(e.chk)
if rowCount < e.maxChunkSize {
break
}
}
return nil
}

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := e.retTypes()
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
Expand All @@ -125,7 +212,6 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
if rowCount == 0 {
break
}
e.rowChunks.Add(chk)
}
return nil
}
Expand Down Expand Up @@ -222,6 +308,9 @@ func (e *SortExec) keyChunksLess(i, j int) bool {
return e.lessRow(keyRowI, keyRowJ)
}




// TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT.
// Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
type TopNExec struct {
Expand Down
1 change: 0 additions & 1 deletion expression/aggregation/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/types"
)

// TODO: Complete here
type AggWindowDesc struct {
WinColName string
Size uint64
Expand Down
2 changes: 1 addition & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
}

func (ls *LogicalSort) getPhysicalSort(prop *property.PhysicalProperty) *PhysicalSort {
ps := PhysicalSort{ByItems: ls.ByItems}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64})
ps := PhysicalSort{ByItems: ls.ByItems, StreamWinSort: ls.StreamWindowSort}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64})
return ps
}

Expand Down
11 changes: 6 additions & 5 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func (b *PlanBuilder) buildUnion(union *ast.UnionStmt) (LogicalPlan, error) {
oldLen := unionPlan.Schema().Len()

if union.OrderBy != nil {
unionPlan, err = b.buildSort(unionPlan, union.OrderBy.Items, nil)
unionPlan, err = b.buildSort(unionPlan, union.OrderBy.Items, nil, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -830,7 +830,7 @@ func (by *ByItems) Clone() *ByItems {
return &ByItems{Expr: by.Expr.Clone(), Desc: by.Desc}
}

func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) (*LogicalSort, error) {
func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int, streamWinSort bool) (*LogicalSort, error) {
b.curClause = orderByClause
sort := LogicalSort{}.Init(b.ctx)
exprs := make([]*ByItems, 0, len(byItems))
Expand All @@ -845,6 +845,7 @@ func (b *PlanBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper
}
sort.ByItems = exprs
sort.SetChildren(p)
sort.StreamWindowSort = streamWinSort
return sort, nil
}

Expand Down Expand Up @@ -1761,7 +1762,7 @@ func (b *PlanBuilder) buildSelect(sel *ast.SelectStmt) (p LogicalPlan, err error
}

if sel.OrderBy != nil {
p, err = b.buildSort(p, sel.OrderBy.Items, orderMap)
p, err = b.buildSort(p, sel.OrderBy.Items, orderMap, sel.StreamWindowSpec != nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2120,7 +2121,7 @@ func (b *PlanBuilder) buildUpdate(update *ast.UpdateStmt) (Plan, error) {
}
}
if sel.OrderBy != nil {
p, err = b.buildSort(p, sel.OrderBy.Items, nil)
p, err = b.buildSort(p, sel.OrderBy.Items, nil, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2280,7 +2281,7 @@ func (b *PlanBuilder) buildDelete(delete *ast.DeleteStmt) (Plan, error) {
}

if sel.OrderBy != nil {
p, err = b.buildSort(p, sel.OrderBy.Items, nil)
p, err = b.buildSort(p, sel.OrderBy.Items, nil, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ type LogicalSort struct {
baseLogicalPlan

ByItems []*ByItems
StreamWindowSort bool
}

func (ls *LogicalSort) extractCorrelatedCols() []*expression.CorrelatedColumn {
Expand Down
1 change: 1 addition & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ type PhysicalSort struct {
basePhysicalPlan

ByItems []*ByItems
StreamWinSort bool
}

// NominalSort asks sort properties for its child. It is a fake operator that will not
Expand Down

0 comments on commit d36b70b

Please sign in to comment.