Skip to content

Commit

Permalink
chore(test): remove batch trace fvt
Browse files Browse the repository at this point in the history
Restore later

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Dec 24, 2024
1 parent 45527a3 commit 53fc736
Showing 1 changed file with 0 additions and 199 deletions.
199 changes: 0 additions & 199 deletions fvt/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,205 +38,6 @@ func TestTraceTestSuite(t *testing.T) {
suite.Run(t, new(TraceTestSuite))
}

// Cover multiple sink, memory, batch, window
func (s *TraceTestSuite) TestComplexTrace() {
s.Run("init rule1", func() {
streamSql := `{"sql": "create stream pushStream() WITH (TYPE=\"httppush\", DATASOURCE=\"/test/sim\", FORMAT=\"json\", SHARED=\"true\")"}`
resp, err := client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql := `{
"id": "rule1",
"name": "http push to multiple sinks including memory",
"sql": "SELECT a + b as c FROM pushStream",
"actions": [
{
"log": {
"format": "delimited",
"sendSingle": false,
"batchSize": 2
},
"memory": {
"topic": "fvt/mem1",
"sendSingle": true
}
}
],
"options": {
"sendError": false
}
}`
resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
})
s.Run("init rule2", func() {
streamSql := `{"sql": "create stream memStream() WITH (TYPE=\"memory\", DATASOURCE=\"fvt/mem1\", FORMAT=\"json\")"}`
resp, err := client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

ruleSql := `{
"id": "rule2",
"name": "use window from memory source with cache",
"sql": "SELECT count(*) FROM memStream GROUP BY SlidingWindow(ms, 100)",
"actions": [
{
"log": {
"sendSingle": true,
"enableCache": true
}
}
],
"options": {
"sendError": false
}
}`
resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)
})
time.Sleep(ConstantInterval)
s.Run("enable trace", func() {
resp, err := client.Post("rules/rule1/trace/start", `{"strategy": "always"}`)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)

resp, err = client.Post("rules/rule2/trace/start", `{"strategy": "always"}`)
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
})
time.Sleep(ConstantInterval)
s.Run("send data by http", func() {
resp, err := http.Post("http://127.0.0.1:10081/test/sim", ContentTypeJson, bytes.NewBufferString("{\"a\": 12,\"b\": 21}"))
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
resp, err = http.Post("http://127.0.0.1:10081/test/sim", ContentTypeJson, bytes.NewBufferString("{\"a\": 22,\"b\": 41}"))
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
time.Sleep(500 * time.Millisecond)
resp, err = http.Post("http://127.0.0.1:10081/test/sim", ContentTypeJson, bytes.NewBufferString("{\"a\": 32,\"b\": 61}"))
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
})
s.Run("assert rule1 trace", func() {
var rule1Ids []string
// Assert rule1 traces
r := TryAssert(10, time.Second, func() bool {
resp, e := client.Get("trace/rule/rule1")
s.Require().NoError(e)
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
s.Require().NoError(err)
err = json.Unmarshal(body, &rule1Ids)
s.Require().NoError(err)
return len(rule1Ids) == 4
})
fmt.Println(len(rule1Ids))
s.Require().True(r)
// assert each trace, just check 1/2/3
for i := 1; i < 4; i++ {
tid := rule1Ids[i]
resp, e := client.Get(path.Join("trace", tid))
s.NoError(e)
s.Equal(http.StatusOK, resp.StatusCode)
act, resultMap, err := GetResponseResultTextAndMap(resp)
s.NoError(err)
all, err := os.ReadFile(filepath.Join("result", "trace", fmt.Sprintf("complex%d.json", i)))
s.NoError(err)
exps := make([]map[string]any, 0)
err = json.Unmarshal(all, &exps)
s.NoError(err)
find := false
for _, exp := range exps {
if s.compareTrace(exp, resultMap) {
find = true
break
}
}
if !find {
fmt.Println(fmt.Sprintf("complex%d.json", i))
fmt.Println(string(act))
s.Fail(fmt.Sprintf("trace 1 file %d compares fail", i))
}
}
})
s.Run("assert rule2 trace", func() {
var (
rule2Ids []string
checkMap = map[int]int{
1: 2,
2: 3,
4: 4,
5: 5,
}
)
// Assert rule2 traces
r := TryAssert(10, 100*time.Millisecond, func() bool {
resp, e := client.Get("trace/rule/rule2")
s.Require().NoError(e)
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
s.Require().NoError(err)
err = json.Unmarshal(body, &rule2Ids)
s.Require().NoError(err)
return len(rule2Ids) == 6
})
s.Require().True(r)
for i, tid := range rule2Ids {
eid, ok := checkMap[i]
if !ok {
continue
}
resp, e := client.Get(path.Join("trace", tid))
s.NoError(e)
s.Equal(http.StatusOK, resp.StatusCode)
act, resultMap, err := GetResponseResultTextAndMap(resp)
s.NoError(err)

all, err := os.ReadFile(filepath.Join("result", "trace", fmt.Sprintf("complex%d.json", eid)))
s.NoError(err)
exps := make([]map[string]any, 0)
err = json.Unmarshal(all, &exps)
s.NoError(err)
find := false
for _, exp := range exps {
if s.compareTrace(exp, resultMap) {
find = true
break
}
}
if !find {
fmt.Println(fmt.Sprintf("complex%d.json", eid))
fmt.Println(string(act))
s.Fail(fmt.Sprintf("trace 2 file %d compares fail", eid))
}
}
})
s.Run("clean", func() {
res, e := client.Delete("rules/rule2")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("rules/rule1")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("streams/memStream")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)

res, e = client.Delete("streams/pushStream")
s.NoError(e)
s.Equal(http.StatusOK, res.StatusCode)
})
}

// Cover ratelimit, lookup table
func (s *TraceTestSuite) TestLookup() {
s.Run("init mem table", func() {
Expand Down

0 comments on commit 53fc736

Please sign in to comment.