Skip to content

Commit

Permalink
fix(state): untriggerd rule always cancel (lf-edge#3435)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Dec 10, 2024
1 parent 85af151 commit 444c9af
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 27 deletions.
240 changes: 240 additions & 0 deletions fvt/rulestate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fvt

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

type RuleStateTestSuite struct {
suite.Suite
}

func TestRuleTestSuite(t *testing.T) {
suite.Run(t, new(RuleStateTestSuite))
}

func (s *RuleStateTestSuite) TestUpdate() {
s.Run("init rule1", func() {
conf := map[string]any{
"interval": "10ms",
}
resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)

streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
resp, err = client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql := `{
"id": "rule1",
"name": "keep rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false
}
}`
resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql2 := `{
"id": "rule2",
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
resp, err = client.CreateRule(ruleSql2)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
})
s.Run("stop and update rule2 but not start", func() {
resp, err := client.StopRule("rule2")
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusOK, resp.StatusCode)

ruleSql2 := `{
"id": "rule2",
"triggered": false,
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
resp, err = client.UpdateRule("rule2", ruleSql2)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusOK, resp.StatusCode)
})
s.Run("check no buffer is not full exp", func() {
// Get metrics
metrics, err := client.GetRuleStatus("rule1")
s.Require().NoError(err)
s.Equal("running", metrics["status"])
s.T().Log(metrics)
exp, ok := metrics["source_simStream_0_exceptions_total"]
s.True(ok)
s.Require().True(exp.(float64) == 0)
sinkOut1, ok := metrics["source_simStream_0_records_in_total"]
s.True(ok)
// Get 2nd metrics
time.Sleep(50 * time.Millisecond)
metrics, err = client.GetRuleStatus("rule1")
s.Require().NoError(err)
s.Equal("running", metrics["status"])
s.T().Log(metrics)
exp, ok = metrics["source_simStream_0_exceptions_total"]
s.True(ok)
s.Require().True(exp.(float64) == 0, "has exception")
sinkOut2, ok := metrics["source_simStream_0_records_in_total"]
s.True(ok)
s.Require().True(sinkOut2.(float64)-sinkOut1.(float64) > 0)
})
s.Run("clean up", func() {
res, e := client.Delete("rules/rule2")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("rules/rule1")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("streams/simStream")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)
})
}

func (s *RuleStateTestSuite) TestCreateStoppedRule() {
s.Run("init rule1", func() {
conf := map[string]any{
"interval": "10ms",
}
resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)

streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
resp, err = client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql := `{
"id": "rule1",
"name": "keep rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false
}
}`
resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql2 := `{
"triggered": false,
"id": "rule2",
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
resp, err = client.CreateRule(ruleSql2)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
})
s.Run("check no buffer is not full exp", func() {
// Get metrics
metrics, err := client.GetRuleStatus("rule1")
s.Require().NoError(err)
s.Equal("running", metrics["status"])
s.T().Log(metrics)
exp, ok := metrics["source_simStream_0_exceptions_total"]
s.True(ok)
s.Require().True(exp.(float64) == 0)
sinkOut1, ok := metrics["source_simStream_0_records_in_total"]
s.True(ok)
// Get 2nd metrics
time.Sleep(50 * time.Millisecond)
metrics, err = client.GetRuleStatus("rule1")
s.Require().NoError(err)
s.Equal("running", metrics["status"])
s.T().Log(metrics)
exp, ok = metrics["source_simStream_0_exceptions_total"]
s.True(ok)
s.Require().True(exp.(float64) == 0, "has exception")
sinkOut2, ok := metrics["source_simStream_0_records_in_total"]
s.True(ok)
s.Require().True(sinkOut2.(float64)-sinkOut1.(float64) > 0)
})
s.Run("clean up", func() {
res, e := client.Delete("rules/rule2")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("rules/rule1")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("streams/simStream")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)
})
}
13 changes: 13 additions & 0 deletions fvt/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func (sdk *SDK) RestartRule(ruleId string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "restart").String(), ContentTypeJson, nil)
}

func (sdk *SDK) StopRule(ruleId string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "stop").String(), ContentTypeJson, nil)
}

func (sdk *SDK) UpdateRule(name, ruleJson string) (resp *http.Response, err error) {
req, err := http.NewRequest(http.MethodPut, sdk.baseUrl.JoinPath("rules", name).String(), bytes.NewBufferString(ruleJson))
if err != nil {
fmt.Println(err)
return
}
return sdk.httpClient.Do(req)
}

func (sdk *SDK) DeleteRule(name string) (resp *http.Response, err error) {
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("rules", name).String(), nil)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/server/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
// create state and save
rs := rule.NewState(r)
// Validate the topo
err = rs.Validate()
tp, err := rs.Validate()
if err != nil {
return r.Id, err
}
Expand All @@ -124,6 +124,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
}
// Start the rule asyncly
if r.Triggered {
rs.WithTopo(tp)
go func() {
panicOrError := infra.SafeRun(func() error {
// Start the rule which runs async
Expand All @@ -133,6 +134,8 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
}
}()
} else if tp != nil {
tp.Cancel()
}
return r.Id, nil
}
Expand Down Expand Up @@ -174,7 +177,7 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error {
oldRule := rs.Rule
rs.Rule = r
// validateRule only check plan is valid, topology shouldn't be changed before ruleState stop
newTopo, err := rs.ValidateRule()
newTopo, err := rs.Validate()
if err != nil {
rs.Rule = oldRule
return err
Expand All @@ -189,6 +192,8 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error {
if err2 != nil {
return err2
}
} else if newTopo != nil {
newTopo.Cancel()
}
return err1
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
index++
}

if t.streamStmt.Options.SHARED && len(ops) > 0 {
if t.streamStmt.Options.SHARED {
// Create subtopo in the end to avoid errors in the middle
srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name))
if !existed {
Expand Down
37 changes: 14 additions & 23 deletions internal/topo/rule/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,33 +113,24 @@ func (s *State) WithTopo(topo *topo.Topo) *State {
return s
}

// Validate is the second level validation
// It tries to plan and return any errors
// Only run when creating the Rule
func (s *State) Validate() error {
// Validate tries to plan and return the planned topo and any errors
// Need to cancel the topo if it is of no use because the input/output channels are set
// Otherwise, the shared source may send to these channels and hang
func (s *State) Validate() (*topo.Topo, error) {
s.Lock()
defer s.Unlock()
err := infra.SafeRun(func() error {
if tp, err := planner.Plan(s.Rule); err != nil {
return err
} else {
s.topology = tp
}
return nil
})
return err
}

func (s *State) ValidateRule() (*topo.Topo, error) {
s.Lock()
defer s.Unlock()
var topo *topo.Topo
var err error
infra.SafeRun(func() error {
topo, err = planner.Plan(s.Rule)
var (
tp *topo.Topo
err error
)
err = infra.SafeRun(func() error {
tp, err = planner.Plan(s.Rule)
return err
})
return topo, err
if err != nil {
return nil, err
}
return tp, err
}

func (s *State) transit(newState RunState, err error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/rule/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestAPIs(t *testing.T) {
assert.Equal(t, Stopped, st.currentState)
// Update rule
st.Rule = def.GetDefaultRule("testAPI", "select abc from demo where a > 3")
e = st.Validate()
_, e = st.Validate()
assert.NoError(t, e)
e = st.Start()
assert.NoError(t, e)
Expand Down

0 comments on commit 444c9af

Please sign in to comment.