Skip to content

Commit

Permalink
executor: Improve the performance of WindowExec by using sliding window (pingcap#14294)
Browse files Browse the repository at this point in the history

aggFuncSum bench
  • Loading branch information
mmyj committed Mar 5, 2020
1 parent 6834938 commit 974afc5
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 66 deletions.
150 changes: 87 additions & 63 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

type mockDataSourceParameters struct {
schema *expression.Schema
genDataFunc func(row int, typ *types.FieldType) interface{}
genDataFunc func(col, row int, typ *types.FieldType) interface{}
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
Expand Down Expand Up @@ -106,7 +106,7 @@ func (mds *mockDataSource) genColDatums(col int) (results []interface{}) {
results = make([]interface{}, 0, rows)
if mds.p.genDataFunc != nil {
for i := 0; i < rows; i++ {
results = append(results, mds.p.genDataFunc(i, typ))
results = append(results, mds.p.genDataFunc(col, i, typ))
}
} else if NDV == 0 {
for i := 0; i < rows; i++ {
Expand Down Expand Up @@ -185,9 +185,9 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
m := &mockDataSource{baseExec, opt, nil, nil, 0}
types := retTypes(m)
colData := make([][]interface{}, len(types))
for i := 0; i < len(types); i++ {
rTypes := retTypes(m)
colData := make([][]interface{}, len(rTypes))
for i := 0; i < len(rTypes); i++ {
colData[i] = m.genColDatums(i)
}

Expand All @@ -199,12 +199,14 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
for i := 0; i < m.p.rows; i++ {
idx := i / m.maxChunkSize
retTypes := retTypes(m)
for colIdx := 0; colIdx < len(types); colIdx++ {
for colIdx := 0; colIdx < len(rTypes); colIdx++ {
switch retTypes[colIdx].Tp {
case mysql.TypeLong, mysql.TypeLonglong:
m.genData[idx].AppendInt64(colIdx, colData[colIdx][i].(int64))
case mysql.TypeDouble:
case mysql.TypeDouble, mysql.TypeFloat:
m.genData[idx].AppendFloat64(colIdx, colData[colIdx][i].(float64))
case mysql.TypeDecimal:
m.genData[idx].AppendMyDecimal(colIdx, colData[colIdx][i].(*types.MyDecimal))
case mysql.TypeVarString:
m.genData[idx].AppendString(colIdx, colData[colIdx][i].(string))
default:
Expand Down Expand Up @@ -509,27 +511,25 @@ type windowTestCase struct {
dataSourceSorted bool
ctx sessionctx.Context
rawDataSmall string
}

func (a windowTestCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
columns []*expression.Column // the columns of mock schema
}

func (a windowTestCase) String() string {
return fmt.Sprintf("(func:%v, numFunc:%v, ndv:%v, rows:%v, sorted:%v, concurrency:%v)",
a.windowFunc, a.numFunc, a.ndv, a.rows, a.dataSourceSorted, a.concurrency)
return fmt.Sprintf("(func:%v, aggColType:%s, numFunc:%v, ndv:%v, rows:%v, sorted:%v, concurrency:%v)",
a.windowFunc, a.columns[0].RetType, a.numFunc, a.ndv, a.rows, a.dataSourceSorted, a.concurrency)
}

func defaultWindowTestCase() *windowTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16)}
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16),
[]*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
}}
}

func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
Expand All @@ -538,29 +538,50 @@ func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
b.Fatal(err)
}

cols := casTest.columns()
const ndvIndex = 1
datumSet := make(map[string]bool, casTest.ndv)
datums := make([]interface{}, 0, casTest.ndv)
genDataFunc := func(col, row int, typ *types.FieldType) interface{} {
var ret interface{}
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
ret = int64(row)
case mysql.TypeDouble, mysql.TypeFloat:
ret = float64(row)
case mysql.TypeDecimal:
ret = types.NewDecFromInt(int64(row))
case mysql.TypeVarString:
ret = casTest.rawDataSmall
default:
panic("not implement")
}
if col != ndvIndex {
return ret
}

if len(datums) < casTest.ndv {
str := fmt.Sprintf("%v", ret)
datumSet[str] = true
datums = append(datums, ret)
return ret
}
return datums[rand.Intn(casTest.ndv)]
}

cols := casTest.columns
dataSource := buildMockDataSource(mockDataSourceParameters{
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return casTest.rawDataSmall
default:
panic("not implement")
}
},
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: genDataFunc,
})

b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer() // prepare a new window-executor
childCols := casTest.columns()
childCols := casTest.columns
schema := expression.NewSchema(childCols...)
windowExec := buildWindowExecutor(casTest.ctx, casTest.windowFunc, casTest.numFunc, casTest.frame, dataSource, schema, childCols[1:2], casTest.concurrency, casTest.dataSourceSorted)
tmpCtx := context.Background()
Expand Down Expand Up @@ -675,42 +696,45 @@ func BenchmarkWindowFunctionsWithFrame(b *testing.B) {
}
}

func benchmarkWindowFunctionsWithSlidingWindow(b *testing.B, frameType ast.FrameType) {
func baseBenchmarkWindowFunctionsWithSlidingWindow(b *testing.B, frameType ast.FrameType) {
b.ReportAllocs()
windowFuncs := []string{
ast.AggFuncCount,
ast.AggFuncSum,
windowFuncs := []struct {
aggFunc string
aggColTypes []byte
}{
{ast.AggFuncSum, []byte{mysql.TypeDecimal, mysql.TypeFloat}},
{ast.AggFuncCount, []byte{mysql.TypeLong}},
}
rows := []int{1000, 100000}
ndvs := []int{10, 100}
frames := []*core.WindowFrame{
{Type: frameType, Start: &core.FrameBound{Type: ast.Preceding, Num: 10}, End: &core.FrameBound{Type: ast.Following, Num: 10}},
{Type: frameType, Start: &core.FrameBound{Type: ast.Preceding, Num: 100}, End: &core.FrameBound{Type: ast.Following, Num: 100}},
}
for _, row := range rows {
for _, ndv := range ndvs {
for _, frame := range frames {
for _, windowFunc := range windowFuncs {
cas := defaultWindowTestCase()
cas.rows = row
cas.ndv = ndv
cas.windowFunc = windowFunc
cas.frame = frame
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkWindowExecWithCase(b, cas)
})
for _, windowFunc := range windowFuncs {
for _, aggColType := range windowFunc.aggColTypes {
for _, row := range rows {
for _, ndv := range ndvs {
for _, frame := range frames {
cas := defaultWindowTestCase()
cas.rows = row
cas.ndv = ndv
cas.windowFunc = windowFunc.aggFunc
cas.frame = frame
cas.columns[0] = &expression.Column{Index: 0, RetType: types.NewFieldType(aggColType)}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkWindowExecWithCase(b, cas)
})
}
}
}
}
}
}

func BenchmarkWindowFunctionsWithSlidingWindowRows(b *testing.B) {
benchmarkWindowFunctionsWithSlidingWindow(b, ast.Rows)
}

func BenchmarkWindowFunctionsWithSlidingWindowRanges(b *testing.B) {
benchmarkWindowFunctionsWithSlidingWindow(b, ast.Ranges)
func BenchmarkWindowFunctionsWithSlidingWindow(b *testing.B) {
baseBenchmarkWindowFunctionsWithSlidingWindow(b, ast.Rows)
baseBenchmarkWindowFunctionsWithSlidingWindow(b, ast.Ranges)
}

type hashJoinTestCase struct {
Expand Down Expand Up @@ -823,7 +847,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
opt1 := mockDataSourceParameters{
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
Expand Down Expand Up @@ -1015,7 +1039,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
schema: expression.NewSchema(casTest.columns()...),
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
Expand Down Expand Up @@ -1139,7 +1163,7 @@ func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceP
schema: expression.NewSchema(tc.columns()...),
rows: rows,
ctx: tc.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
Expand Down Expand Up @@ -1431,7 +1455,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
schema: expression.NewSchema(tc.columns()...),
rows: numOuterRows,
ctx: tc.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
Expand All @@ -1449,7 +1473,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
schema: expression.NewSchema(tc.columns()...),
rows: numInnerRows,
ctx: tc.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
row = row / numInnerDup
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
Expand Down
2 changes: 1 addition & 1 deletion executor/join_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *pkgTestSuite) TestJoinExec(c *C) {
opt1 := mockDataSourceParameters{
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
Expand Down
4 changes: 2 additions & 2 deletions executor/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
schema: outerSchema,
rows: 6,
ctx: sctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
return int64(row + 1)
},
})
Expand All @@ -41,7 +41,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
schema: innerSchema,
rows: 6,
ctx: sctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
return int64(row + 1)
},
})
Expand Down

0 comments on commit 974afc5

Please sign in to comment.