Skip to content

Commit

Permalink
Signed-off-by: Song Gao <disxiaofei@163.com>
Browse files Browse the repository at this point in the history
fix

Signed-off-by: Song Gao <disxiaofei@163.com>

fix

Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Dec 25, 2024
1 parent 705cac1 commit 17f5887
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 23 deletions.
12 changes: 11 additions & 1 deletion internal/pkg/def/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ type RuleOption struct {
type PlanOptimizeStrategy struct {
EnableIncrementalWindow bool `json:"enableIncrementalWindow,omitempty" yaml:"enableIncrementalWindow,omitempty"`
EnableAliasPushdown bool `json:"enableAliasPushdown,omitempty" yaml:"enableAliasPushdown,omitempty"`
EnableAliasRefCal bool `json:"enableAliasRefCal,omitempty" yaml:"enableAliasRefCal,omitempty"`
}

func (p *PlanOptimizeStrategy) IsAliasRefCalEnable() bool {
if p == nil {
return false
}
return p.EnableAliasRefCal
}

type RestartStrategy struct {
Expand Down Expand Up @@ -140,7 +148,9 @@ func GetDefaultRule(name, sql string) *Rule {
MaxDelay: 30000,
JitterFactor: 0.1,
},
PlanOptimizeStrategy: &PlanOptimizeStrategy{},
PlanOptimizeStrategy: &PlanOptimizeStrategy{
EnableAliasRefCal: true,
},
},
}
}
Expand Down
34 changes: 18 additions & 16 deletions internal/topo/planner/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"

"github.com/lf-edge/ekuiper/v2/internal/binder/function"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/schema"
"github.com/lf-edge/ekuiper/v2/internal/xsql"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
Expand All @@ -33,7 +34,7 @@ type streamInfo struct {

// Analyze the select statement by decorating the info from stream statement.
// Typically, set the correct stream name for fieldRefs
func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*ast.Call, []*ast.Call, error) {
func decorateStmt(s *ast.SelectStatement, store kv.KeyValue, opt *def.RuleOption) ([]*streamInfo, []*ast.Call, []*ast.Call, error) {
streamsFromStmt := xsql.GetStreams(s)
streamStmts := make([]*streamInfo, len(streamsFromStmt))
isSchemaless := false
Expand All @@ -54,7 +55,7 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
if checkAliasReferenceCycle(s) {
return nil, nil, nil, fmt.Errorf("select fields have cycled alias")
}
if !isSchemaless {
if !isSchemaless && opt.PlanOptimizeStrategy.IsAliasRefCalEnable() {
if err := aliasFieldTopoSort(s, streamStmts); err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -120,21 +121,22 @@ func decorateStmt(s *ast.SelectStatement, store kv.KeyValue) ([]*streamInfo, []*
AliasRef: ar,
}
walkErr = fieldsMap.save(f.AName, ast.AliasStream, ar)
for _, subF := range s.Fields {
if f.AName == subF.AName {
continue
}
ast.WalkFunc(&subF, func(node ast.Node) bool {
switch fr := node.(type) {
case *ast.FieldRef:
if fr.Name == f.AName && fr.StreamName == streamName {
fr.StreamName = ast.AliasStream
fr.AliasRef = ar
}
return false
if opt.PlanOptimizeStrategy.IsAliasRefCalEnable() {
for _, subF := range s.Fields {
if f.AName == subF.AName {
continue
}
return true
})
ast.WalkFunc(&subF, func(node ast.Node) bool {
switch fr := node.(type) {
case *ast.FieldRef:
if fr.Name == f.AName && fr.StreamName == streamName {
fr.StreamName = ast.AliasStream
fr.AliasRef = ar
}
}
return true
})
}
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions internal/topo/planner/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ var tests = []struct {
},
{ // 10
sql: `SELECT sum(temp) as temp1, count(temp) as temp FROM src1`,
r: newErrorStruct("invalid argument for func sum: aggregate argument is not allowed"),
r: newErrorStruct(""),
},
{ // 11
sql: `SELECT sum(temp) as temp1, count(temp) as ct FROM src1`,
Expand Down Expand Up @@ -198,7 +198,6 @@ func TestCheckTopoSort(t *testing.T) {
}

func Test_validation(t *testing.T) {
tests[10].r = newErrorStruct("invalid argument for func sum: aggregate argument is not allowed")
store, err := store.GetKV("stream")
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -311,9 +310,6 @@ func Test_validationSchemaless(t *testing.T) {
SendError: true,
}, store)
serr := tt.r.Serr()
if tt.sql == "SELECT sum(temp) as temp1, count(temp) as temp FROM src1" {
serr = ""
}
require.Equal(t, serr, testx.Errstring(err))
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/topo/planner/plan_explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestExplainPlan(t *testing.T) {
p, err := createLogicalPlan(stmt, &def.RuleOption{
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableIncrementalWindow: true,
EnableAliasRefCal: true,
},
Qos: 0,
}, kv)
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func createLogicalPlan(stmt *ast.SelectStatement, opt *def.RuleOption, store kv.
ds ast.Dimensions
)

streamStmts, analyticFuncs, analyticFieldFuncs, err := decorateStmt(stmt, store)
streamStmts, analyticFuncs, analyticFieldFuncs, err := decorateStmt(stmt, store, opt)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions internal/topo/planner/planner_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ func TestPlannerAlias(t *testing.T) {
Qos: 0,
CheckpointInterval: 0,
SendError: true,
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableAliasRefCal: true,
},
}, kv)
if !reflect.DeepEqual(tt.p, p) {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.sql, render.AsCode(tt.p), render.AsCode(p))
Expand Down
3 changes: 3 additions & 0 deletions internal/topo/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2570,6 +2570,9 @@ func Test_createLogicalPlan(t *testing.T) {
Qos: 0,
CheckpointInterval: 0,
SendError: true,
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableAliasRefCal: true,
},
}, kv)
if tt.err != "" {
assert.EqualError(t, err, tt.err)
Expand Down
9 changes: 9 additions & 0 deletions internal/topo/topotest/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ func TestSingleSQL(t *testing.T) {
SendError: true,
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableIncrementalWindow: true,
EnableAliasRefCal: true,
},
},
{
Expand All @@ -1257,6 +1258,7 @@ func TestSingleSQL(t *testing.T) {
CheckpointInterval: cast.DurationConf(5 * time.Second),
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableIncrementalWindow: true,
EnableAliasRefCal: true,
},
},
{
Expand All @@ -1266,6 +1268,7 @@ func TestSingleSQL(t *testing.T) {
CheckpointInterval: cast.DurationConf(5 * time.Second),
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableIncrementalWindow: true,
EnableAliasRefCal: true,
},
},
}
Expand Down Expand Up @@ -1861,12 +1864,18 @@ func TestAliasSQL(t *testing.T) {
SendError: true,
Qos: def.AtLeastOnce,
CheckpointInterval: cast.DurationConf(5 * time.Second),
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableAliasRefCal: true,
},
},
{
BufferLength: 100,
SendError: true,
Qos: def.ExactlyOnce,
CheckpointInterval: cast.DurationConf(5 * time.Second),
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableAliasRefCal: true,
},
},
}
for _, opt := range options {
Expand Down

0 comments on commit 17f5887

Please sign in to comment.