From 974afc549568667ade4287e83d0043879931ce50 Mon Sep 17 00:00:00 2001
From: mmyj
Date: Thu, 5 Mar 2020 21:45:52 +0800
Subject: [PATCH] executor: Improve the performance of `WindowExec` by using sliding window #14294

aggFuncSum bench
---
 executor/benchmark_test.go | 150 +++++++++++++++++++++----------------
 executor/join_pkg_test.go  |   2 +-
 executor/pkg_test.go       |   4 +-
 3 files changed, 90 insertions(+), 66 deletions(-)

diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go
index 2a2cb506e7f1e..99b66982ef8c2 100644
--- a/executor/benchmark_test.go
+++ b/executor/benchmark_test.go
@@ -49,7 +49,7 @@ var (
 type mockDataSourceParameters struct {
 	schema      *expression.Schema
-	genDataFunc func(row int, typ *types.FieldType) interface{}
+	genDataFunc func(col, row int, typ *types.FieldType) interface{}
 	ndvs        []int  // number of distinct values on columns[i] and zero represents no limit
 	orders      []bool // columns[i] should be ordered if orders[i] is true
 	rows        int    // number of rows the DataSource should output
@@ -106,7 +106,7 @@ func (mds *mockDataSource) genColDatums(col int) (results []interface{}) {
 	results = make([]interface{}, 0, rows)
 	if mds.p.genDataFunc != nil {
 		for i := 0; i < rows; i++ {
-			results = append(results, mds.p.genDataFunc(i, typ))
+			results = append(results, mds.p.genDataFunc(col, i, typ))
 		}
 	} else if NDV == 0 {
 		for i := 0; i < rows; i++ {
@@ -185,9 +185,9 @@ func (mds *mockDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
 func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
 	baseExec := newBaseExecutor(opt.ctx, opt.schema, nil)
 	m := &mockDataSource{baseExec, opt, nil, nil, 0}
-	types := retTypes(m)
-	colData := make([][]interface{}, len(types))
-	for i := 0; i < len(types); i++ {
+	rTypes := retTypes(m)
+	colData := make([][]interface{}, len(rTypes))
+	for i := 0; i < len(rTypes); i++ {
 		colData[i] = m.genColDatums(i)
 	}
 
@@ -199,12 +199,14 @@ func buildMockDataSource(opt mockDataSourceParameters) *mockDataSource {
 	for i := 0; i < m.p.rows; i++ {
 		idx := i / m.maxChunkSize
 		retTypes := retTypes(m)
-		for colIdx := 0; colIdx < len(types); colIdx++ {
+		for colIdx := 0; colIdx < len(rTypes); colIdx++ {
 			switch retTypes[colIdx].Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				m.genData[idx].AppendInt64(colIdx, colData[colIdx][i].(int64))
-			case mysql.TypeDouble:
+			case mysql.TypeDouble, mysql.TypeFloat:
 				m.genData[idx].AppendFloat64(colIdx, colData[colIdx][i].(float64))
+			case mysql.TypeDecimal:
+				m.genData[idx].AppendMyDecimal(colIdx, colData[colIdx][i].(*types.MyDecimal))
 			case mysql.TypeVarString:
 				m.genData[idx].AppendString(colIdx, colData[colIdx][i].(string))
 			default:
@@ -509,27 +511,25 @@ type windowTestCase struct {
 	dataSourceSorted bool
 	ctx              sessionctx.Context
 	rawDataSmall     string
-}
-
-func (a windowTestCase) columns() []*expression.Column {
-	return []*expression.Column{
-		{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
-		{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
-		{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
-		{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
-	}
+	columns          []*expression.Column // the columns of mock schema
 }
 
 func (a windowTestCase) String() string {
-	return fmt.Sprintf("(func:%v, numFunc:%v, ndv:%v, rows:%v, sorted:%v, concurrency:%v)",
-		a.windowFunc, a.numFunc, a.ndv, a.rows, a.dataSourceSorted, a.concurrency)
+	return fmt.Sprintf("(func:%v, aggColType:%s, numFunc:%v, ndv:%v, rows:%v, sorted:%v, concurrency:%v)",
+		a.windowFunc, a.columns[0].RetType, a.numFunc, a.ndv, a.rows, a.dataSourceSorted, a.concurrency)
 }
 
 func defaultWindowTestCase() *windowTestCase {
 	ctx := mock.NewContext()
 	ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
 	ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
-	return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16)}
+	return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16),
+		[]*expression.Column{
+			{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
+			{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
+			{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
+			{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
+		}}
 }
 
 func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
@@ -538,29 +538,50 @@ func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
 		b.Fatal(err)
 	}
 
-	cols := casTest.columns()
+	const ndvIndex = 1
+	datumSet := make(map[string]bool, casTest.ndv)
+	datums := make([]interface{}, 0, casTest.ndv)
+	genDataFunc := func(col, row int, typ *types.FieldType) interface{} {
+		var ret interface{}
+		switch typ.Tp {
+		case mysql.TypeLong, mysql.TypeLonglong:
+			ret = int64(row)
+		case mysql.TypeDouble, mysql.TypeFloat:
+			ret = float64(row)
+		case mysql.TypeDecimal:
+			ret = types.NewDecFromInt(int64(row))
+		case mysql.TypeVarString:
+			ret = casTest.rawDataSmall
+		default:
+			panic("not implement")
+		}
+		if col != ndvIndex {
+			return ret
+		}
+
+		if len(datums) < casTest.ndv {
+			str := fmt.Sprintf("%v", ret)
+			datumSet[str] = true
+			datums = append(datums, ret)
+			return ret
+		}
+		return datums[rand.Intn(casTest.ndv)]
+	}
+
+	cols := casTest.columns
 	dataSource := buildMockDataSource(mockDataSourceParameters{
-		schema: expression.NewSchema(cols...),
-		ndvs:   []int{0, casTest.ndv, 0, 0},
-		orders: []bool{false, casTest.dataSourceSorted, false, false},
-		rows:   casTest.rows,
-		ctx:    casTest.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
-			switch typ.Tp {
-			case mysql.TypeLong, mysql.TypeLonglong:
-				return int64(row)
-			case mysql.TypeVarString:
-				return casTest.rawDataSmall
-			default:
-				panic("not implement")
-			}
-		},
+		schema:      expression.NewSchema(cols...),
+		ndvs:        []int{0, casTest.ndv, 0, 0},
+		orders:      []bool{false, casTest.dataSourceSorted, false, false},
+		rows:        casTest.rows,
+		ctx:         casTest.ctx,
+		genDataFunc: genDataFunc,
 	})
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		b.StopTimer() // prepare a new window-executor
-		childCols := casTest.columns()
+		childCols := casTest.columns
 		schema := expression.NewSchema(childCols...)
 		windowExec := buildWindowExecutor(casTest.ctx, casTest.windowFunc, casTest.numFunc, casTest.frame, dataSource, schema, childCols[1:2], casTest.concurrency, casTest.dataSourceSorted)
 		tmpCtx := context.Background()
@@ -675,11 +696,14 @@ func BenchmarkWindowFunctionsWithFrame(b *testing.B) {
 	}
 }
 
-func benchmarkWindowFunctionsWithSlidingWindow(b *testing.B, frameType ast.FrameType) {
+func baseBenchmarkWindowFunctionsWithSlidingWindow(b *testing.B, frameType ast.FrameType) {
 	b.ReportAllocs()
-	windowFuncs := []string{
-		ast.AggFuncCount,
-		ast.AggFuncSum,
+	windowFuncs := []struct {
+		aggFunc     string
+		aggColTypes []byte
+	}{
+		{ast.AggFuncSum, []byte{mysql.TypeDecimal, mysql.TypeFloat}},
+		{ast.AggFuncCount, []byte{mysql.TypeLong}},
 	}
 	rows := []int{1000, 100000}
 	ndvs := []int{10, 100}
@@ -687,30 +711,30 @@ func benchmarkWindowFunctionsWithSlidingWindow(b *testing.B, frameType ast.Frame
 		{Type: frameType, Start: &core.FrameBound{Type: ast.Preceding, Num: 10}, End: &core.FrameBound{Type: ast.Following, Num: 10}},
 		{Type: frameType, Start: &core.FrameBound{Type: ast.Preceding, Num: 100}, End: &core.FrameBound{Type: ast.Following, Num: 100}},
 	}
-	for _, row := range rows {
-		for _, ndv := range ndvs {
-			for _, frame := range frames {
-				for _, windowFunc := range windowFuncs {
-					cas := defaultWindowTestCase()
-					cas.rows = row
-					cas.ndv = ndv
-					cas.windowFunc = windowFunc
-					cas.frame = frame
-					b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
-						benchmarkWindowExecWithCase(b, cas)
-					})
+	for _, windowFunc := range windowFuncs {
+		for _, aggColType := range windowFunc.aggColTypes {
+			for _, row := range rows {
+				for _, ndv := range ndvs {
+					for _, frame := range frames {
+						cas := defaultWindowTestCase()
+						cas.rows = row
+						cas.ndv = ndv
+						cas.windowFunc = windowFunc.aggFunc
+						cas.frame = frame
+						cas.columns[0] = &expression.Column{Index: 0, RetType: types.NewFieldType(aggColType)}
+						b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
+							benchmarkWindowExecWithCase(b, cas)
+						})
+					}
 				}
 			}
 		}
 	}
 }
 
-func BenchmarkWindowFunctionsWithSlidingWindowRows(b *testing.B) {
-	benchmarkWindowFunctionsWithSlidingWindow(b, ast.Rows)
-}
-
-func BenchmarkWindowFunctionsWithSlidingWindowRanges(b *testing.B) {
-	benchmarkWindowFunctionsWithSlidingWindow(b, ast.Ranges)
+func BenchmarkWindowFunctionsWithSlidingWindow(b *testing.B) {
+	baseBenchmarkWindowFunctionsWithSlidingWindow(b, ast.Rows)
+	baseBenchmarkWindowFunctionsWithSlidingWindow(b, ast.Ranges)
 }
 
 type hashJoinTestCase struct {
@@ -823,7 +847,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
 	opt1 := mockDataSourceParameters{
 		rows: casTest.rows,
 		ctx:  casTest.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				return int64(row)
@@ -1015,7 +1039,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
 		schema: expression.NewSchema(casTest.columns()...),
 		rows:   casTest.rows,
 		ctx:    casTest.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				return int64(row)
@@ -1139,7 +1163,7 @@ func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceP
 		schema: expression.NewSchema(tc.columns()...),
 		rows:   rows,
 		ctx:    tc.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				return int64(row)
@@ -1431,7 +1455,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
 		schema: expression.NewSchema(tc.columns()...),
 		rows:   numOuterRows,
 		ctx:    tc.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				return int64(row)
@@ -1449,7 +1473,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
 		schema: expression.NewSchema(tc.columns()...),
 		rows:   numInnerRows,
 		ctx:    tc.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			row = row / numInnerDup
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
diff --git a/executor/join_pkg_test.go b/executor/join_pkg_test.go
index 8086f643884d9..b2dd917e3ddca 100644
--- a/executor/join_pkg_test.go
+++ b/executor/join_pkg_test.go
@@ -33,7 +33,7 @@ func (s *pkgTestSuite) TestJoinExec(c *C) {
 	opt1 := mockDataSourceParameters{
 		rows: casTest.rows,
 		ctx:  casTest.ctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			switch typ.Tp {
 			case mysql.TypeLong, mysql.TypeLonglong:
 				return int64(row)
diff --git a/executor/pkg_test.go b/executor/pkg_test.go
index a4e1986d728d8..3bbf5e5500401 100644
--- a/executor/pkg_test.go
+++ b/executor/pkg_test.go
@@ -30,7 +30,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
 		schema: outerSchema,
 		rows:   6,
 		ctx:    sctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			return int64(row + 1)
 		},
 	})
@@ -41,7 +41,7 @@ func (s *pkgTestSuite) TestNestedLoopApply(c *C) {
 		schema: innerSchema,
 		rows:   6,
 		ctx:    sctx,
-		genDataFunc: func(row int, typ *types.FieldType) interface{} {
+		genDataFunc: func(col, row int, typ *types.FieldType) interface{} {
 			return int64(row + 1)
 		},
 	})
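
Reviewer note (not part of the patch): the change threads the column index through mockDataSourceParameters.genDataFunc (now `func(col, row int, typ *types.FieldType) interface{}`), teaches the mock data source to emit float and decimal columns, and folds the Rows/Ranges sliding-window benchmarks into a single BenchmarkWindowFunctionsWithSlidingWindow that also varies the aggregated column's type. As a minimal sketch of how the new helpers compose, the hypothetical benchmark below pins one case (SUM over a decimal column with a frame of 10 preceding / 10 following rows); it would live in another _test.go file of the executor package, and the import paths are the ones executor/benchmark_test.go already uses.

package executor

import (
	"testing"

	"github.com/pingcap/parser/ast"
	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/types"
)

// BenchmarkSumOverDecimalSlidingWindow is a hypothetical, stand-alone variant of one
// case that BenchmarkWindowFunctionsWithSlidingWindow above already covers; it reuses
// defaultWindowTestCase and benchmarkWindowExecWithCase from benchmark_test.go.
func BenchmarkSumOverDecimalSlidingWindow(b *testing.B) {
	cas := defaultWindowTestCase()
	cas.rows = 1000
	cas.ndv = 10
	cas.windowFunc = ast.AggFuncSum
	// Aggregate over a decimal column instead of the default double column.
	cas.columns[0] = &expression.Column{Index: 0, RetType: types.NewFieldType(mysql.TypeDecimal)}
	// Sliding frame: ROWS BETWEEN 10 PRECEDING AND 10 FOLLOWING.
	cas.frame = &core.WindowFrame{
		Type:  ast.Rows,
		Start: &core.FrameBound{Type: ast.Preceding, Num: 10},
		End:   &core.FrameBound{Type: ast.Following, Num: 10},
	}
	benchmarkWindowExecWithCase(b, cas)
}

The full matrix added by this patch can be run with something like `go test -run XXX -bench BenchmarkWindowFunctionsWithSlidingWindow -benchmem ./executor/`.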