Skip to content

Commit

Permalink
feat: push down alias decode (#3460)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Dec 24, 2024
1 parent d6f6598 commit 705cac1
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 69 deletions.
73 changes: 62 additions & 11 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type FastJsonConverter struct {
}

type FastJsonConverterConf struct {
UseInt64 bool `json:"useInt64ForWholeNumber"`
UseInt64 bool `json:"useInt64ForWholeNumber"`
ColAliasMapping map[string]string `json:"colAliasMapping"`
}

func NewFastJsonConverter(schema map[string]*ast.JsonStreamField, props map[string]any) *FastJsonConverter {
Expand Down Expand Up @@ -120,7 +121,7 @@ func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.Js
if err != nil {
return nil, err
}
subMap, err := f.decodeObject(obj, schema)
subMap, err := f.decodeObject(obj, schema, false)
if err != nil {
return nil, err
}
Expand All @@ -132,7 +133,7 @@ func (f *FastJsonConverter) decodeWithSchema(b []byte, schema map[string]*ast.Js
if err != nil {
return nil, err
}
m, err := f.decodeObject(obj, schema)
m, err := f.decodeObject(obj, schema, true)
if err != nil {
return nil, err
}
Expand All @@ -159,7 +160,7 @@ func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.Json
if field != nil {
props = field.Properties
}
subMap, err := f.decodeObject(childObj, props)
subMap, err := f.decodeObject(childObj, props, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,7 +215,7 @@ func (f *FastJsonConverter) decodeArray(array []*fastjson.Value, field *ast.Json
return vs, nil
}

func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField) (map[string]interface{}, error) {
func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string]*ast.JsonStreamField, isOuter bool) (map[string]interface{}, error) {
m := make(map[string]interface{})
var err error
obj.Visit(func(k []byte, v *fastjson.Value) {
Expand Down Expand Up @@ -242,13 +243,23 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
if schema != nil && schema[key] != nil {
props = schema[key].Properties
}
childMap, err2 := f.decodeObject(childObj, props)
childMap, err2 := f.decodeObject(childObj, props, false)
if err2 != nil {
err = err2
return
}
if childMap != nil {
m[key] = childMap
set := false
if isOuter && len(f.ColAliasMapping) > 0 {
alias, ok := f.ColAliasMapping[key]
if ok {
set = true
m[alias] = childMap
}
}
if !set {
m[key] = childMap
}
}
case fastjson.TypeArray:
add, valid := f.checkSchema(key, "array", schema)
Expand All @@ -274,7 +285,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
return
}
if subList != nil {
m[key] = subList
set := false
if isOuter && len(f.ColAliasMapping) > 0 {
alias, ok := f.ColAliasMapping[key]
if ok {
set = true
m[alias] = subList
}
}
if !set {
m[key] = subList
}
}
case fastjson.TypeString:
if schema != nil {
Expand All @@ -289,7 +310,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
return
}
if v != nil {
m[key] = v
set := false
if isOuter && len(f.ColAliasMapping) > 0 {
alias, ok := f.ColAliasMapping[key]
if ok {
set = true
m[alias] = v
}
}
if !set {
m[key] = v
}
}
case fastjson.TypeNumber:
if schema != nil {
Expand All @@ -304,7 +335,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
return
}
if v != nil {
m[key] = v
set := false
if isOuter && len(f.ColAliasMapping) > 0 {
alias, ok := f.ColAliasMapping[key]
if ok {
set = true
m[alias] = v
}
}
if !set {
m[key] = v
}
}
case fastjson.TypeTrue, fastjson.TypeFalse:
if schema != nil {
Expand All @@ -319,7 +360,17 @@ func (f *FastJsonConverter) decodeObject(obj *fastjson.Object, schema map[string
return
}
if v != nil {
m[key] = v
set := false
if isOuter && len(f.ColAliasMapping) > 0 {
alias, ok := f.ColAliasMapping[key]
if ok {
set = true
m[alias] = v
}
}
if !set {
m[key] = v
}
}
}
})
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/def/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type RuleOption struct {

// PlanOptimizeStrategy holds boolean planner switches that can be set from
// JSON or YAML; each one is omitted from the serialized form when false.
type PlanOptimizeStrategy struct {
// NOTE(review): presumably turns on incremental window computation in the
// planner; the code that reads this flag is not visible here — confirm.
EnableIncrementalWindow bool `json:"enableIncrementalWindow,omitempty" yaml:"enableIncrementalWindow,omitempty"`
// NOTE(review): presumably lets the planner push column aliases down to the
// data source (see the pushAliasDecode optimizer rule) — confirm against the
// rule's implementation.
EnableAliasPushdown bool `json:"enableAliasPushdown,omitempty" yaml:"enableAliasPushdown,omitempty"`
}

type RestartStrategy struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/operator/windowfunc_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

type WindowFuncOperator struct {
WindowFuncField ast.Field
WindowFuncField *ast.Field
}

type windowFuncHandle interface {
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/operator/windowfunc_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestWindowFuncApplyCollection(t *testing.T) {
{
data: data1,
op: &WindowFuncOperator{
WindowFuncField: ast.Field{
WindowFuncField: &ast.Field{
Name: "row_number",
Expr: &ast.Call{
Name: "row_number",
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestWindowFuncApplyCollection(t *testing.T) {
{
data: data2,
op: &WindowFuncOperator{
WindowFuncField: ast.Field{
WindowFuncField: &ast.Field{
Name: "row_number",
Expr: &ast.Call{
Name: "row_number",
Expand Down
23 changes: 23 additions & 0 deletions internal/topo/planner/dataSourcePlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type DataSourcePlan struct {
iet bool
timestampFormat string
timestampField string
// col -> alias
colAliasMapping map[string]string
// intermediate status
isWildCard bool
fields map[string]*ast.JsonStreamField
Expand Down Expand Up @@ -117,6 +119,21 @@ func (p *DataSourcePlan) BuildExplainInfo() {
}
info += " ]"
}
if len(p.colAliasMapping) > 0 {
info += ", ColAliasMapping:[ "
keys := make([]string, 0)
for col, alias := range p.colAliasMapping {
keys = append(keys, col+":"+alias)
}
sort.Strings(keys)
for i := 0; i < len(keys); i++ {
info += keys[i]
if i != len(keys)-1 {
info += ", "
}
}
info += " ]"
}
p.baseLogicalPlan.ExplainInfo.Info = info
}

Expand Down Expand Up @@ -357,6 +374,12 @@ func markPruneJSONStreamField(cur interface{}, field *ast.JsonStreamField) {
}

func (p *DataSourcePlan) getField(name string, strict bool) (*ast.JsonStreamField, error) {
for col, alias := range p.colAliasMapping {
if name == alias {
name = col
break
}
}
if !p.isSchemaless {
r, ok := p.streamFields[name]
if !ok {
Expand Down
7 changes: 5 additions & 2 deletions internal/topo/planner/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@

package planner

import "github.com/lf-edge/ekuiper/v2/internal/pkg/def"

// optRuleList is the set of logical plan optimization rules. optimize walks
// the slice in order and feeds the plan returned by one rule into the next,
// so the position of each entry matters.
var optRuleList = []logicalOptRule{
&columnPruner{},
&predicatePushDown{},
&pushProjectionPlan{},
// NOTE(review): presumably only takes effect when
// PlanOptimizeStrategy.EnableAliasPushdown is set; the rule body is not
// visible here — confirm.
&pushAliasDecode{},
}

func optimize(p LogicalPlan) (LogicalPlan, error) {
func optimize(p LogicalPlan, options *def.RuleOption) (LogicalPlan, error) {
var err error
for _, rule := range optRuleList {
p, err = rule.optimize(p)
p, err = rule.optimize(p, options)
if err != nil {
return nil, err
}
Expand Down
36 changes: 36 additions & 0 deletions internal/topo/planner/plan_explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,39 @@ func TestSupportedWindowType(t *testing.T) {
require.Equal(t, tc.ok, supportedWindowType(tc.w))
}
}

// TestExplainPushAlias builds a logical plan for each SQL statement with
// EnableAliasPushdown set to true and compares the plan's explain text with
// the expected output.
func TestExplainPushAlias(t *testing.T) {
kv, err := store.GetKV("stream")
require.NoError(t, err)
require.NoError(t, prepareStream())

// Each expected explain value is a raw string spanning two source lines, so
// the line break between the two plan entries is part of the expected text.
testcases := []struct {
sql string
explain string
}{
{
// A single aliased column: the data source plan reports the mapping
// a:a1 and the projection refers to stream.a1.
sql: `select a as a1 from stream`,
explain: `{"op":"ProjectPlan_0","info":"Fields:[ stream.a1 ]"}
{"op":"DataSourcePlan_1","info":"StreamName: stream, StreamFields:[ a ], ColAliasMapping:[ a:a1 ]"}`,
},
{
// Alias combined with a wildcard: no ColAliasMapping is expected and the
// projection keeps the $$alias reference form.
sql: `select a as a1, * from stream`,
explain: `{"op":"ProjectPlan_0","info":"Fields:[ $$alias.a1,aliasRef:stream.a, * ]"}
{"op":"DataSourcePlan_1","info":"StreamName: stream, StreamFields:[ a, b ]"}`,
},
}
for _, tc := range testcases {
stmt, err := xsql.NewParser(strings.NewReader(tc.sql)).Parse()
require.NoError(t, err)
// A fresh RuleOption is built for every case.
p, err := createLogicalPlan(stmt, &def.RuleOption{
PlanOptimizeStrategy: &def.PlanOptimizeStrategy{
EnableAliasPushdown: true,
},
Qos: 0,
}, kv)
require.NoError(t, err)
explain, err := ExplainFromLogicalPlan(p, "")
require.NoError(t, err)
// The SQL text is passed as the failure message to identify the failing case.
require.Equal(t, tc.explain, explain, tc.sql)
}
}
Loading

0 comments on commit 705cac1

Please sign in to comment.