From 705cac1aa086827c1a4ef2b399b14dea2b484f26 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Tue, 24 Dec 2024 09:15:43 +0800 Subject: [PATCH] feat: push down alias decode (#3460) Signed-off-by: Song Gao --- internal/converter/json/converter.go | 73 +++++++++-- internal/pkg/def/rule.go | 1 + internal/topo/operator/windowfunc_operator.go | 2 +- .../topo/operator/windowfunc_operator_test.go | 4 +- internal/topo/planner/dataSourcePlan.go | 23 ++++ internal/topo/planner/optimizer.go | 7 +- internal/topo/planner/plan_explain_test.go | 36 +++++ internal/topo/planner/planner.go | 124 ++++++++++++++---- internal/topo/planner/planner_source.go | 4 + internal/topo/planner/push_proj.go | 108 +++++++++++---- internal/topo/planner/rules.go | 8 +- internal/topo/planner/windowFuncPlan.go | 2 +- 12 files changed, 323 insertions(+), 69 deletions(-) diff --git a/internal/converter/json/converter.go b/internal/converter/json/converter.go index 22da770d3e..5767b4eed3 100644 --- a/internal/converter/json/converter.go +++ b/internal/converter/json/converter.go @@ -35,7 +35,8 @@ type FastJsonConverter struct { } type FastJsonConverterConf struct { - UseInt64 bool `json:"useInt64ForWholeNumber"` + UseInt64 bool `json:"useInt64ForWholeNumber"` + ColAliasMapping map[string]string `json:"colAliasMapping"` } func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter { @@ -120,7 +121,7 @@ func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.Js if err != nil { return nil, err } - subMap, err := f.decodeObject(obj, schema) + subMap, err := f.decodeObject(obj, schema, false) if err != nil { return nil, err } @@ -132,7 +133,7 @@ func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.Js if err != nil { return nil, err } - m, err := f.decodeObject(obj, schema) + m, err := f.decodeObject(obj, schema, true) if err != nil { return nil, err } @@ -159,7 +160,7 @@ func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.Json if field != nil { props = field.Properties } - subMap, err := f.decodeObject(childObj, props) + subMap, err := f.decodeObject(childObj, props, false) if err != nil { return nil, err } @@ -214,7 +215,7 @@ func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.Json return vs, nil } -func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField) (map[string]interface{}, error) { +func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField, isOuter bool) (map[string]interface{}, error) { m := make(map[string]interface{}) var err error obj.Visit(func(k []byte, v *fastjson.Value) { @@ -242,13 +243,23 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string if schema != nil && schema[key] != nil { props = schema[key].Properties } - childMap, err2 := f.decodeObject(childObj, props) + childMap, err2 := f.decodeObject(childObj, props, false) if err2 != nil { err = err2 return } if childMap != nil { - m[key] = childMap + set := false + if isOuter && len(f.ColAliasMapping) > 0 { + alias, ok := f.ColAliasMapping[key] + if ok { + set = true + m[alias] = childMap + } + } + if !set { + m[key] = childMap + } } case fastjson.TypeArray: add, valid := f.checkSchema(key, "array", schema) @@ -274,7 +285,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string return } if subList != nil { - m[key] = subList + set := false + if isOuter && len(f.ColAliasMapping) > 0 { + alias, ok := f.ColAliasMapping[key] + if ok { + set = true + m[alias] = subList + } + } + if !set { + m[key] = subList + } } case fastjson.TypeString: if schema != nil { @@ -289,7 +310,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string return } if v != nil { - m[key] = v + set := false + if isOuter && len(f.ColAliasMapping) > 0 { + alias, ok := f.ColAliasMapping[key] + if ok { + set = true + m[alias] = v + } + } + if !set { + m[key] = v + } } case fastjson.TypeNumber: if schema != nil { @@ -304,7 +335,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string return } if v != nil { - m[key] = v + set := false + if isOuter && len(f.ColAliasMapping) > 0 { + alias, ok := f.ColAliasMapping[key] + if ok { + set = true + m[alias] = v + } + } + if !set { + m[key] = v + } } case fastjson.TypeTrue, fastjson.TypeFalse: if schema != nil { @@ -319,7 +360,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string return } if v != nil { - m[key] = v + set := false + if isOuter && len(f.ColAliasMapping) > 0 { + alias, ok := f.ColAliasMapping[key] + if ok { + set = true + m[alias] = v + } + } + if !set { + m[key] = v + } } } }) diff --git a/internal/pkg/def/rule.go b/internal/pkg/def/rule.go index 1ba4091705..a03e379d18 100644 --- a/internal/pkg/def/rule.go +++ b/internal/pkg/def/rule.go @@ -46,6 +46,7 @@ type RuleOption struct { type PlanOptimizeStrategy struct { EnableIncrementalWindow bool `json:"enableIncrementalWindow,omitempty" yaml:"enableIncrementalWindow,omitempty"` + EnableAliasPushdown bool `json:"enableAliasPushdown,omitempty" yaml:"enableAliasPushdown,omitempty"` } type RestartStrategy struct { diff --git a/internal/topo/operator/windowfunc_operator.go b/internal/topo/operator/windowfunc_operator.go index a8fca2abe8..1d92faf612 100644 --- a/internal/topo/operator/windowfunc_operator.go +++ b/internal/topo/operator/windowfunc_operator.go @@ -25,7 +25,7 @@ import ( ) type WindowFuncOperator struct { - WindowFuncField ast.Field + WindowFuncField *ast.Field } type windowFuncHandle interface { diff --git a/internal/topo/operator/windowfunc_operator_test.go b/internal/topo/operator/windowfunc_operator_test.go index bcdf9d4337..2ce4047bab 100644 --- a/internal/topo/operator/windowfunc_operator_test.go +++ b/internal/topo/operator/windowfunc_operator_test.go @@ -78,7 +78,7 @@ func TestWindowFuncApplyCollection(t *testing.T) { { data: data1, op: &WindowFuncOperator{ - WindowFuncField: ast.Field{ + WindowFuncField: &ast.Field{ Name: "row_number", Expr: &ast.Call{ Name: "row_number", @@ -128,7 +128,7 @@ func TestWindowFuncApplyCollection(t *testing.T) { { data: data2, op: &WindowFuncOperator{ - WindowFuncField: ast.Field{ + WindowFuncField: &ast.Field{ Name: "row_number", Expr: &ast.Call{ Name: "row_number", diff --git a/internal/topo/planner/dataSourcePlan.go b/internal/topo/planner/dataSourcePlan.go index cb46e05aca..2eae442d1d 100644 --- a/internal/topo/planner/dataSourcePlan.go +++ b/internal/topo/planner/dataSourcePlan.go @@ -41,6 +41,8 @@ type DataSourcePlan struct { iet bool timestampFormat string timestampField string + // col -> alias + colAliasMapping map[string]string // intermediate status isWildCard bool fields map[string]*ast.JsonStreamField @@ -117,6 +119,21 @@ func (p *DataSourcePlan) BuildExplainInfo() { } info += " ]" } + if len(p.colAliasMapping) > 0 { + info += ", ColAliasMapping:[ " + keys := make([]string, 0) + for col, alias := range p.colAliasMapping { + keys = append(keys, col+":"+alias) + } + sort.Strings(keys) + for i := 0; i < len(keys); i++ { + info += keys[i] + if i != len(keys)-1 { + info += ", " + } + } + info += " ]" + } p.baseLogicalPlan.ExplainInfo.Info = info } @@ -357,6 +374,12 @@ func markPruneJSONStreamField(cur interface{}, field *ast.JsonStreamField) { } func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamField, error) { + for col, alias := range p.colAliasMapping { + if name == alias { + name = col + break + } + } if !p.isSchemaless { r, ok := p.streamFields[name] if !ok { diff --git a/internal/topo/planner/optimizer.go b/internal/topo/planner/optimizer.go index 8e9ad39f72..7dc633f7e6 100644 --- a/internal/topo/planner/optimizer.go +++ b/internal/topo/planner/optimizer.go @@ -14,16 +14,19 @@ package planner +import "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + var optRuleList = []logicalOptRule{ &columnPruner{}, &predicatePushDown{}, &pushProjectionPlan{}, + &pushAliasDecode{}, } -func optimize(p LogicalPlan) (LogicalPlan, error) { +func optimize(p LogicalPlan, options *def.RuleOption) (LogicalPlan, error) { var err error for _, rule := range optRuleList { - p, err = rule.optimize(p) + p, err = rule.optimize(p, options) if err != nil { return nil, err } diff --git a/internal/topo/planner/plan_explain_test.go b/internal/topo/planner/plan_explain_test.go index 6bfed9a30c..b4114a3e70 100644 --- a/internal/topo/planner/plan_explain_test.go +++ b/internal/topo/planner/plan_explain_test.go @@ -207,3 +207,39 @@ func TestSupportedWindowType(t *testing.T) { require.Equal(t, tc.ok, supportedWindowType(tc.w)) } } + +func TestExplainPushAlias(t *testing.T) { + kv, err := store.GetKV("stream") + require.NoError(t, err) + require.NoError(t, prepareStream()) + + testcases := []struct { + sql string + explain string + }{ + { + sql: `select a as a1 from stream`, + explain: `{"op":"ProjectPlan_0","info":"Fields:[ stream.a1 ]"} + {"op":"DataSourcePlan_1","info":"StreamName: stream, StreamFields:[ a ], ColAliasMapping:[ a:a1 ]"}`, + }, + { + sql: `select a as a1, * from stream`, + explain: `{"op":"ProjectPlan_0","info":"Fields:[ $$alias.a1,aliasRef:stream.a, * ]"} + {"op":"DataSourcePlan_1","info":"StreamName: stream, StreamFields:[ a, b ]"}`, + }, + } + for _, tc := range testcases { + stmt, err := xsql.NewParser(strings.NewReader(tc.sql)).Parse() + require.NoError(t, err) + p, err := createLogicalPlan(stmt, &def.RuleOption{ + PlanOptimizeStrategy: &def.PlanOptimizeStrategy{ + EnableAliasPushdown: true, + }, + Qos: 0, + }, kv) + require.NoError(t, err) + explain, err := ExplainFromLogicalPlan(p, "") + require.NoError(t, err) + require.Equal(t, tc.explain, explain, tc.sql) + } +} diff --git a/internal/topo/planner/planner.go b/internal/topo/planner/planner.go index 5467e0509b..f8dc6ffaf0 100644 --- a/internal/topo/planner/planner.go +++ b/internal/topo/planner/planner.go @@ -368,7 +368,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. if err != nil { return nil, err } - windowFuncFields, incAggFields := rewriteStmt(stmt, opt) + rewriteRes := rewriteStmt(stmt, opt) for _, sInfo := range streamStmts { if sInfo.stmt.StreamType == ast.TypeTable && sInfo.stmt.Options.KIND == ast.StreamKindLookup { @@ -377,13 +377,15 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. } lookupTableChildren[string(sInfo.stmt.Name)] = sInfo.stmt.Options } else { + p = DataSourcePlan{ - name: sInfo.stmt.Name, - streamStmt: sInfo.stmt, - streamFields: sInfo.schema.ToJsonSchema(), - isSchemaless: sInfo.schema == nil, - iet: opt.IsEventTime, - allMeta: opt.SendMetaToSink, + name: sInfo.stmt.Name, + streamStmt: sInfo.stmt, + streamFields: sInfo.schema.ToJsonSchema(), + isSchemaless: sInfo.schema == nil, + iet: opt.IsEventTime, + allMeta: opt.SendMetaToSink, + colAliasMapping: rewriteRes.dsColAliasMapping[sInfo.stmt.Name], }.Init() if sInfo.stmt.StreamType == ast.TypeStream { children = append(children, p) @@ -429,12 +431,12 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. if len(children) == 0 { return nil, errors.New("cannot run window for TABLE sources") } - if len(incAggFields) > 0 { + if len(rewriteRes.incAggFields) > 0 { incWp := IncWindowPlan{ WType: w.WindowType, Length: int(w.Length.Val), Dimensions: dimensions.GetGroups(), - IncAggFuncs: incAggFields, + IncAggFuncs: rewriteRes.incAggFields, Condition: w.Filter, TriggerCondition: w.TriggerCondition, }.Init() @@ -539,7 +541,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. p.SetChildren(children) children = []LogicalPlan{p} } - if dimensions != nil && len(incAggFields) < 1 { + if dimensions != nil && len(rewriteRes.incAggFields) < 1 { ds = dimensions.GetGroups() if ds != nil && len(ds) > 0 { p = AggregatePlan{ @@ -552,13 +554,13 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. if stmt.Having != nil { p = HavingPlan{ condition: stmt.Having, - IsIncAgg: len(incAggFields) > 0, + IsIncAgg: len(rewriteRes.incAggFields) > 0, }.Init() p.SetChildren(children) children = []LogicalPlan{p} } - if len(windowFuncFields) > 0 { - for _, wf := range windowFuncFields { + if len(rewriteRes.windowFuncFields) > 0 { + for _, wf := range rewriteRes.windowFuncFields { p = WindowFuncPlan{ windowFuncField: wf, }.Init() @@ -624,7 +626,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. } p = ProjectPlan{ fields: fields, - isAggregate: xsql.WithAggFields(stmt) && len(incAggFields) < 1, + isAggregate: xsql.WithAggFields(stmt) && len(rewriteRes.incAggFields) < 1, sendMeta: opt.SendMetaToSink, sendNil: opt.SendNil, enableLimit: enableLimit, @@ -649,7 +651,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv. p.SetChildren(children) } - return optimize(p) + return optimize(p, opt) } // extractSRFMapping extracts the set-returning-function in the field @@ -678,8 +680,8 @@ func Transform(op node.UnOperation, name string, options *def.RuleOption) *node. return unaryOperator } -func extractWindowFuncFields(stmt *ast.SelectStatement) []ast.Field { - windowFuncFields := make([]ast.Field, 0) +func extractWindowFuncFields(stmt *ast.SelectStatement) []*ast.Field { + windowFuncFields := make([]*ast.Field, 0) windowFunctionCount := 0 ast.WalkFunc(stmt.Fields, func(n ast.Node) bool { switch wf := n.(type) { @@ -699,7 +701,7 @@ func extractWindowFuncFields(stmt *ast.SelectStatement) []ast.Field { newFieldRef := &ast.FieldRef{ Name: newName, } - windowFuncFields = append(windowFuncFields, newField) + windowFuncFields = append(windowFuncFields, &newField) rewriteIntoBypass(newFieldRef, wf) } } @@ -708,13 +710,21 @@ func extractWindowFuncFields(stmt *ast.SelectStatement) []ast.Field { return windowFuncFields } +type rewriteResult struct { + windowFuncFields []*ast.Field + incAggFields []*ast.Field + dsColAliasMapping map[ast.StreamName]map[string]string +} + // rewrite stmt will do following things: // 1. extract and rewrite the window function // 2. extract and rewrite the aggregation function -func rewriteStmt(stmt *ast.SelectStatement, opt *def.RuleOption) ([]ast.Field, []*ast.Field) { - windowFunctionPlanField := extractWindowFuncFields(stmt) - incAggWindowPlanField := rewriteIfIncAggStmt(stmt, opt) - return windowFunctionPlanField, incAggWindowPlanField +func rewriteStmt(stmt *ast.SelectStatement, opt *def.RuleOption) rewriteResult { + result := rewriteResult{} + result.windowFuncFields = extractWindowFuncFields(stmt) + result.incAggFields = rewriteIfIncAggStmt(stmt, opt) + result.dsColAliasMapping = rewriteIfPushdownAlias(stmt, opt) + return result } func rewriteIfIncAggStmt(stmt *ast.SelectStatement, opt *def.RuleOption) []*ast.Field { @@ -825,3 +835,73 @@ var supportedWType = map[ast.WindowType]struct{}{ ast.HOPPING_WINDOW: {}, ast.TUMBLING_WINDOW: {}, } + +func rewriteIfPushdownAlias(stmt *ast.SelectStatement, opt *def.RuleOption) map[ast.StreamName]map[string]string { + if opt.PlanOptimizeStrategy == nil { + return nil + } + if !opt.PlanOptimizeStrategy.EnableAliasPushdown { + return nil + } + if hasWildcard(stmt) { + return nil + } + dsColAliasMapping := make(map[ast.StreamName]map[string]string) + for index, field := range stmt.Fields { + afr, ok := field.Expr.(*ast.FieldRef) + if ok && afr.IsAlias() && afr.Expression != nil { + cfr, ok := afr.Expression.(*ast.FieldRef) + if ok && cfr.IsColumn() && cfr.Name == field.Name { + columnUsed := searchColumnUsedCount(stmt, field.Name) + if columnUsed == 1 { + newField := buildField(afr.Name, cfr.StreamName) + stmt.Fields[index] = newField + v, ok := dsColAliasMapping[cfr.StreamName] + if !ok { + v = make(map[string]string) + } + v[cfr.Name] = afr.Name + dsColAliasMapping[cfr.StreamName] = v + } + } + } + } + return dsColAliasMapping +} + +func hasWildcard(stmt *ast.SelectStatement) bool { + wildcard := false + ast.WalkFunc(stmt, func(n ast.Node) bool { + switch n.(type) { + case *ast.Wildcard: + wildcard = true + return false + } + return true + }) + return wildcard +} + +func searchColumnUsedCount(stmt *ast.SelectStatement, colName string) int { + count := 0 + ast.WalkFunc(stmt, func(n ast.Node) bool { + fr, ok := n.(*ast.FieldRef) + if ok && fr.IsColumn() { + if fr.Name == colName { + count++ + } + } + return true + }) + return count +} + +func buildField(colName string, streamName ast.StreamName) ast.Field { + return ast.Field{ + Name: colName, + Expr: &ast.FieldRef{ + Name: colName, + StreamName: streamName, + }, + } +} diff --git a/internal/topo/planner/planner_source.go b/internal/topo/planner/planner_source.go index ac7a94c292..06001f1e56 100644 --- a/internal/topo/planner/planner_source.go +++ b/internal/topo/planner/planner_source.go @@ -121,6 +121,10 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option index++ ops = append(ops, op) } + if len(t.colAliasMapping) > 0 { + props["colAliasMapping"] = t.colAliasMapping + } + // Need to check after source has provisioned, so do not put it before provision featureSet, err := checkFeatures(ss, sp, props) if err != nil { diff --git a/internal/topo/planner/push_proj.go b/internal/topo/planner/push_proj.go index 276663ee27..5a817da741 100644 --- a/internal/topo/planner/push_proj.go +++ b/internal/topo/planner/push_proj.go @@ -15,6 +15,7 @@ package planner import ( + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/pkg/ast" ) @@ -22,7 +23,7 @@ type pushProjectionPlan struct{} // pushProjectionPlan inject Projection Plan between the shared Datasource and its father only if the Plan have windowPlan // We use Projection to remove the unused column before windowPlan in order to reduce memory consuming -func (pp *pushProjectionPlan) optimize(plan LogicalPlan) (LogicalPlan, error) { +func (pp *pushProjectionPlan) optimize(plan LogicalPlan, _ *def.RuleOption) (LogicalPlan, error) { if pp.searchJoinPlan(plan) { return plan, nil } @@ -30,7 +31,7 @@ func (pp *pushProjectionPlan) optimize(plan LogicalPlan) (LogicalPlan, error) { ctx := &searchCtx{ find: make([]*sharedSource, 0), } - pp.searchSharedDataSource(ctx, plan, nil) + searchSharedDataSource(ctx, plan, nil) if len(ctx.find) > 0 { pp.pushProjection(ctx) } @@ -68,31 +69,6 @@ func (pp *pushProjectionPlan) searchJoinPlan(plan LogicalPlan) bool { return false } -type searchCtx struct { - find []*sharedSource -} - -type sharedSource struct { - ds *DataSourcePlan - father LogicalPlan -} - -func (pp *pushProjectionPlan) searchSharedDataSource(ctx *searchCtx, plan, father LogicalPlan) { - switch ds := plan.(type) { - case *DataSourcePlan: - if ds.streamStmt.Options.SHARED { - ctx.find = append(ctx.find, &sharedSource{ - ds: ds, - father: father, - }) - } - default: - } - for _, child := range plan.Children() { - pp.searchSharedDataSource(ctx, child, plan) - } -} - func (pp *pushProjectionPlan) pushProjection(ctx *searchCtx) { for _, search := range ctx.find { p := ProjectPlan{ @@ -126,3 +102,81 @@ func buildFields(ds *DataSourcePlan) []ast.Field { func (pp *pushProjectionPlan) name() string { return "push_projection" } + +type pushAliasDecode struct{} + +func (p *pushAliasDecode) optimize(plan LogicalPlan, option *def.RuleOption) (LogicalPlan, error) { + if option.PlanOptimizeStrategy == nil { + return plan, nil + } + if !option.PlanOptimizeStrategy.EnableAliasPushdown { + return plan, nil + } + ctx := &searchCtx{ + find: make([]*sharedSource, 0), + noSharedDatasource: make([]*DataSourcePlan, 0), + } + searchSharedDataSource(ctx, plan, nil) + if len(ctx.find) > 0 { + return plan, nil + } + searchNoSharedDatasource(ctx, plan) + if len(ctx.noSharedDatasource) < 1 { + return plan, nil + } + for _, ds := range ctx.noSharedDatasource { + if len(ds.streamFields) > 0 { + for col, alias := range ds.colAliasMapping { + v, ok := ds.streamFields[alias] + if ok { + ds.streamFields[col] = v + delete(ds.streamFields, alias) + } + } + } + } + return plan, nil +} + +func (p *pushAliasDecode) name() string { + return "push_alias" +} + +type searchCtx struct { + find []*sharedSource + noSharedDatasource []*DataSourcePlan +} + +type sharedSource struct { + ds *DataSourcePlan + father LogicalPlan +} + +func searchSharedDataSource(ctx *searchCtx, plan, father LogicalPlan) { + switch ds := plan.(type) { + case *DataSourcePlan: + if ds.streamStmt.Options.SHARED { + ctx.find = append(ctx.find, &sharedSource{ + ds: ds, + father: father, + }) + } + default: + } + for _, child := range plan.Children() { + searchSharedDataSource(ctx, child, plan) + } +} + +func searchNoSharedDatasource(ctx *searchCtx, plan LogicalPlan) { + switch ds := plan.(type) { + case *DataSourcePlan: + if !ds.streamStmt.Options.SHARED { + ctx.noSharedDatasource = append(ctx.noSharedDatasource, ds) + } + default: + } + for _, child := range plan.Children() { + searchNoSharedDatasource(ctx, child) + } +} diff --git a/internal/topo/planner/rules.go b/internal/topo/planner/rules.go index a5e9ce0cc5..e394e7dd82 100644 --- a/internal/topo/planner/rules.go +++ b/internal/topo/planner/rules.go @@ -14,14 +14,16 @@ package planner +import "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + type logicalOptRule interface { - optimize(LogicalPlan) (LogicalPlan, error) + optimize(LogicalPlan, *def.RuleOption) (LogicalPlan, error) name() string } type predicatePushDown struct{} -func (r *predicatePushDown) optimize(lp LogicalPlan) (LogicalPlan, error) { +func (r *predicatePushDown) optimize(lp LogicalPlan, _ *def.RuleOption) (LogicalPlan, error) { _, p := lp.PushDownPredicate(nil) return p, nil } @@ -32,7 +34,7 @@ func (r *predicatePushDown) name() string { type columnPruner struct{} -func (r *columnPruner) optimize(lp LogicalPlan) (LogicalPlan, error) { +func (r *columnPruner) optimize(lp LogicalPlan, _ *def.RuleOption) (LogicalPlan, error) { err := lp.PruneColumns(nil) return lp, err } diff --git a/internal/topo/planner/windowFuncPlan.go b/internal/topo/planner/windowFuncPlan.go index 9f2cfd084e..d896381b0c 100644 --- a/internal/topo/planner/windowFuncPlan.go +++ b/internal/topo/planner/windowFuncPlan.go @@ -20,7 +20,7 @@ import ( type WindowFuncPlan struct { baseLogicalPlan - windowFuncField ast.Field + windowFuncField *ast.Field } func (p WindowFuncPlan) Init() *WindowFuncPlan {