Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support inc agg function #3465

Merged
merged 3 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 226 additions & 2 deletions internal/binder/function/funcs_inc_agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@
"fmt"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
)

var supportedIncAggFunc = map[string]struct{}{
"count": {},
"avg": {},
"count": {},
"avg": {},
"max": {},
"min": {},
"sum": {},
"merge_agg": {},
"collect": {},
"last_value": {},
}

func IsSupportedIncAgg(name string) bool {
Expand Down Expand Up @@ -66,9 +73,223 @@
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_max"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0 := args[0]
result, err := incrementalMax(ctx, arg0)
if err != nil {
return err, false
}
return result, true
},
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_min"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0 := args[0]
result, err := incrementalMin(ctx, arg0)
if err != nil {
return err, false
}
return result, true
},
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_sum"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0, err := cast.ToFloat64(args[0], cast.CONVERT_ALL)
if err != nil {
return err, false
}

Check warning on line 108 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L107-L108

Added lines #L107 - L108 were not covered by tests
result, err := incrementalSum(ctx, arg0)
if err != nil {
return err, false
}
return result, true
},
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_merge_agg"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0, ok := args[0].(map[string]interface{})
if !ok {
return fmt.Errorf("argument is not a map[string]interface{}"), false
}

Check warning on line 124 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L123-L124

Added lines #L123 - L124 were not covered by tests
result, err := incrementalMerge(ctx, arg0)
if err != nil {
return err, false
}
return result, true
},
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_collect"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0 := args[0]
result, err := incrementalCollect(ctx, arg0)
if err != nil {
return err, false
}
return result, true
},
val: ValidateOneNumberArg,
check: returnNilIfHasAnyNil,
}
builtins["inc_last_value"] = builtinFunc{
fType: ast.FuncTypeScalar,
exec: func(ctx api.FunctionContext, args []interface{}) (interface{}, bool) {
arg0 := args[0]
arg1, ok := args[1].(bool)
if !ok {
return fmt.Errorf("second argument is not a bool"), false
}

Check warning on line 154 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L153-L154

Added lines #L153 - L154 were not covered by tests
result, err := incrementalLastValue(ctx, arg0, arg1)
if err != nil {
return err, false
}
return result, true
},
val: ValidateTwoNumberArg,
check: returnNilIfHasAnyNil,
}
}

func incrementalLastValue(ctx api.FunctionContext, arg interface{}, ignoreNil bool) (interface{}, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(nil, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_last_value", ctx.GetFuncId())
v, err := ctx.GetState(key)
if err != nil {
return nil, err
}

Check warning on line 174 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L173-L174

Added lines #L173 - L174 were not covered by tests
if arg == nil {
if !ignoreNil {
return nil, nil
} else {
return v, nil
}

Check warning on line 180 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L176-L180

Added lines #L176 - L180 were not covered by tests
} else {
ctx.PutState(key, arg)
return arg, nil
}
}

func incrementalCollect(ctx api.FunctionContext, arg interface{}) ([]interface{}, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(nil, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_collect", ctx.GetFuncId())
var listV []interface{}
v, err := ctx.GetState(key)
if err != nil {
return nil, err
}

Check warning on line 196 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L195-L196

Added lines #L195 - L196 were not covered by tests
if v == nil {
listV = make([]interface{}, 0)
} else {
llv, ok := v.([]interface{})
if ok {
listV = llv
}
}
listV = append(listV, arg)
ctx.PutState(key, listV)
return listV, nil
}

func incrementalMerge(ctx api.FunctionContext, arg map[string]interface{}) (map[string]interface{}, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(nil, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_merge_agg", ctx.GetFuncId())
var mv map[string]interface{}
v, err := ctx.GetState(key)
if err != nil {
return nil, err
}

Check warning on line 219 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L218-L219

Added lines #L218 - L219 were not covered by tests
if v == nil {
mv = make(map[string]interface{})
} else {
mmv, ok := v.(map[string]interface{})
if ok {
mv = mmv
}
}
for k, value := range arg {
mv[k] = value
}
ctx.PutState(key, mv)
return mv, nil
}

func incrementalMin(ctx api.FunctionContext, arg interface{}) (interface{}, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(nil, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_min", ctx.GetFuncId())
v, err := ctx.GetState(key)
if err != nil {
return nil, err
}

Check warning on line 243 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L242-L243

Added lines #L242 - L243 were not covered by tests
args := make([]interface{}, 0)
args = append(args, arg)
if v != nil {
args = append(args, v)
}
result, _ := min(args)
switch result.(type) {
case error:
return nil, err

Check warning on line 252 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L251-L252

Added lines #L251 - L252 were not covered by tests
case int64, float64, string:
ctx.PutState(key, result)
return result, nil
case nil:
return nil, nil

Check warning on line 257 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L256-L257

Added lines #L256 - L257 were not covered by tests
}
return nil, nil

Check warning on line 259 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L259

Added line #L259 was not covered by tests
}

func incrementalMax(ctx api.FunctionContext, arg interface{}) (interface{}, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(nil, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_max", ctx.GetFuncId())
v, err := ctx.GetState(key)
if err != nil {
return nil, err
}

Check warning on line 270 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L269-L270

Added lines #L269 - L270 were not covered by tests
args := make([]interface{}, 0)
args = append(args, arg)
if v != nil {
args = append(args, v)
}
result, _ := max(args)
switch result.(type) {
case error:
return nil, err

Check warning on line 279 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L278-L279

Added lines #L278 - L279 were not covered by tests
case int64, float64, string:
ctx.PutState(key, result)
return result, nil
case nil:
return nil, nil

Check warning on line 284 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L283-L284

Added lines #L283 - L284 were not covered by tests
}
return nil, nil

Check warning on line 286 in internal/binder/function/funcs_inc_agg.go

View check run for this annotation

Codecov / codecov/patch

internal/binder/function/funcs_inc_agg.go#L286

Added line #L286 was not covered by tests
}

func incrementalCount(ctx api.FunctionContext, arg interface{}) (int64, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(0, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_count", ctx.GetFuncId())
v, err := ctx.GetState(key)
if err != nil {
Expand All @@ -85,6 +306,9 @@
}

func incrementalSum(ctx api.FunctionContext, arg float64) (float64, error) {
failpoint.Inject("inc_err", func() {
failpoint.Return(0, fmt.Errorf("inc err"))
})
key := fmt.Sprintf("%v_inc_sum", ctx.GetFuncId())
v, err := ctx.GetState(key)
if err != nil {
Expand Down
109 changes: 104 additions & 5 deletions internal/binder/function/funcs_inc_agg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package function
import (
"testing"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/v2/internal/conf"
Expand Down Expand Up @@ -49,18 +50,116 @@ func TestIncAggFunction(t *testing.T) {
args2: []interface{}{3},
output2: float64(2),
},
{
funcName: "inc_max",
args1: []interface{}{1},
output1: int64(1),
args2: []interface{}{3},
output2: int64(3),
},
{
funcName: "inc_min",
args1: []interface{}{3},
output1: int64(3),
args2: []interface{}{1},
output2: int64(1),
},
{
funcName: "inc_sum",
args1: []interface{}{3},
output1: float64(3),
args2: []interface{}{1},
output2: float64(4),
},
{
funcName: "inc_merge_agg",
args1: []interface{}{map[string]interface{}{"a": 1}},
output1: map[string]interface{}{"a": 1},
args2: []interface{}{map[string]interface{}{"b": 2}},
output2: map[string]interface{}{"a": 1, "b": 2},
},
{
funcName: "inc_collect",
args1: []interface{}{1},
output1: []interface{}{1},
args2: []interface{}{2},
output2: []interface{}{1, 2},
},
{
funcName: "inc_last_value",
args1: []interface{}{1, true},
output1: 1,
args2: []interface{}{2, true},
output2: 2,
},
}
for index, tc := range testcases {
ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
tempStore, _ := state.CreateStore(tc.funcName, def.AtMostOnce)
fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), index)
f, ok := builtins[tc.funcName]
require.True(t, ok)
require.True(t, ok, tc.funcName)
got1, ok := f.exec(fctx, tc.args1)
require.True(t, ok)
require.Equal(t, tc.output1, got1)
require.True(t, ok, tc.funcName)
require.Equal(t, tc.output1, got1, tc.funcName)
got2, ok := f.exec(fctx, tc.args2)
require.True(t, ok)
require.Equal(t, tc.output2, got2)
require.True(t, ok, tc.funcName)
require.Equal(t, tc.output2, got2, tc.funcName)
}
}

func TestIncAggFunctionErr(t *testing.T) {
contextLogger := conf.Log.WithField("rule", "testExec")
registerIncAggFunc()
failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/binder/function/inc_err", `return(true)`)
defer failpoint.Disable("github.com/lf-edge/ekuiper/v2/internal/binder/function/inc_err")
testcases := []struct {
funcName string
args1 []interface{}
}{
{
funcName: "inc_count",
args1: []interface{}{1},
},
{
funcName: "inc_avg",
args1: []interface{}{1},
},
{
funcName: "inc_max",
args1: []interface{}{1},
},
{
funcName: "inc_min",
args1: []interface{}{3},
},
{
funcName: "inc_sum",
args1: []interface{}{3},
},
{
funcName: "inc_merge_agg",
args1: []interface{}{map[string]interface{}{"a": 1}},
},
{
funcName: "inc_collect",
args1: []interface{}{1},
},
{
funcName: "inc_last_value",
args1: []interface{}{1, true},
},
}
for index, tc := range testcases {
ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
tempStore, _ := state.CreateStore(tc.funcName, def.AtMostOnce)
fctx := kctx.NewDefaultFuncContext(ctx.WithMeta("mockRule0", "test", tempStore), index)
f, ok := builtins[tc.funcName]
require.True(t, ok, tc.funcName)
got, ok := f.exec(fctx, tc.args1)
require.False(t, ok, tc.funcName)
err, isErr := got.(error)
require.True(t, isErr)
require.Error(t, err)
}
}
Loading
Loading