Skip to content

Commit

Permalink
Add retry when sending record to clickhouse in theia_clickhouse_test (#…
Browse files Browse the repository at this point in the history
…92)

Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu authored Aug 22, 2022
1 parent 162e514 commit d26e483
Showing 1 changed file with 41 additions and 34 deletions.
75 changes: 41 additions & 34 deletions test/e2e/theia_clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"

"antrea.io/theia/pkg/theia/commands"
)
Expand Down Expand Up @@ -96,18 +97,14 @@ const (
numFieldsInTableInfo = 7
dateBaseName = "default"
defaultPath = "/var/lib/clickhouse/"
flowsRelatedTableNum = 4
)

var tableColumnNumberMap = map[string]string{
".inner.flows_node_view": "16",
".inner.flows_node_view_local": "16",
".inner.flows_pod_view": "20",
".inner.flows_pod_view_local": "20",
".inner.flows_policy_view": "27",
".inner.flows_policy_view_local": "27",
"flows": "49",
"flows_local": "49",
"recommendations": "4",
"recommendations_local": "4",
}

Expand Down Expand Up @@ -186,13 +183,13 @@ func testTheiaGetClickHouseDiskInfo(t *testing.T, data *TestData) {
// 1 default flows 50000 13.09 MiB 49
// 1 default recommendations 10 2.34 KiB 4
func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.DB) {
// send 10000 records to clickhouse
commitNum := 10
// send 1000 records to clickhouse
commitNum := 1
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
sendTraffic(t, commitNum, connect)
sendTraffic(t, connect, commitNum)
}()
wg.Wait()
// retrieve metrics
Expand All @@ -210,11 +207,11 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.
assert.Containsf(stdout, "TotalBytes", "stdout: %s", stdout)
assert.Containsf(stdout, "TotalCols", "stdout: %s", stdout)
// check four tables are in db
assert.Containsf(stdout, ".inner.flows_node_view", "stdout: %s", stdout)
assert.Containsf(stdout, ".inner.flows_pod_view", "stdout: %s", stdout)
assert.Containsf(stdout, ".inner.flows_policy_view", "stdout: %s", stdout)
assert.Containsf(stdout, "flows", "stdout: %s", stdout)
assert.Containsf(stdout, "recommendations", "stdout: %s", stdout)
assert.Containsf(stdout, ".inner.flows_node_view_local", "stdout: %s", stdout)
assert.Containsf(stdout, ".inner.flows_pod_view_local", "stdout: %s", stdout)
assert.Containsf(stdout, ".inner.flows_policy_view_local", "stdout: %s", stdout)
assert.Containsf(stdout, "flows_local", "stdout: %s", stdout)
assert.Containsf(stdout, "recommendations_local", "stdout: %s", stdout)

flowNum := 0
for i := 1; i < length; i++ {
Expand All @@ -228,7 +225,7 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.
assert.Equal(numFieldsInTableInfo, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray)
assert.Equalf(dateBaseName, tableInfoArray[1], "tableInfoArray: %s", tableInfoArray)
assert.Equal(expectedColNum, tableInfoArray[6], "tableInfoArray: %s", tableInfoArray)
if tableName == "flows" || tableName == "flows_local" {
if tableName == "flows_local" {
num, error := strconv.Atoi(tableInfoArray[3])
assert.NoError(error)
flowNum += num
Expand All @@ -242,12 +239,12 @@ func testTheiaGetClickHouseTableInfo(t *testing.T, data *TestData, connect *sql.
// Shard RowsPerSecond BytesPerSecond
// 1 4763 1.48 MiB
func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql.DB) {
commitNum := 70
commitNum := 80
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
sendTraffic(t, commitNum, connect)
sendTraffic(t, connect, commitNum)
}()
// need to wait at least 1 min to get the insertion rate.
// insertion rate is the average ProfileEvent_InsertedRows in system.metric_log in current minute
Expand All @@ -270,9 +267,7 @@ func testTheiaGetClickHouseInsertRate(t *testing.T, data *TestData, connect *sql
assert.Equal(4, len(tableInfoArray), "tableInfoArray: %s", tableInfoArray)
actualInsertRate, error := strconv.Atoi(tableInfoArray[1])
assert.NoError(error)
tableNum := len(tableColumnNumberMap)
percent := (actualInsertRate/tableNum - recordPerCommit/insertInterval) * 100 / (recordPerCommit / insertInterval)
assert.LessOrEqualf(percent, threshold, "stdout: %s, expectedInsertRate: %s", stdout, recordPerCommit/insertInterval)
assert.InDeltaf(actualInsertRate/flowsRelatedTableNum, recordPerCommit/insertInterval, float64(actualInsertRate/flowsRelatedTableNum)*0.30, "Difference between actual insertionRate and expected insertionRate should below 30%%, stdout: %s", stdout)
}
wg.Wait()
}
Expand Down Expand Up @@ -350,23 +345,35 @@ func addFakeRecord(t *testing.T, stmt *sql.Stmt) {

func writeRecords(t *testing.T, connect *sql.DB, wg *sync.WaitGroup) {
defer wg.Done()
// Test ping DB
var err error
err = connect.Ping()
require.NoError(t, err)
// Test open Transaction
tx, err := connect.Begin()
require.NoError(t, err)
stmt, _ := tx.Prepare(insertQuery)
defer stmt.Close()
for j := 0; j < recordPerCommit; j++ {
addFakeRecord(t, stmt)
}
err = tx.Commit()
assert.NoError(t, err)
err := wait.PollImmediate(5*defaultInterval, defaultTimeout, func() (bool, error) {
// Test ping DB
err := connect.Ping()
if err != nil {
return false, nil
}
// Test open Transaction
tx, err := connect.Begin()
if err != nil {
return false, nil
}
stmt, _ := tx.Prepare(insertQuery)
defer stmt.Close()
for j := 0; j < recordPerCommit; j++ {
addFakeRecord(t, stmt)
}
if err != nil {
return false, nil
}
err = tx.Commit()
if err != nil {
return false, nil
}
return true, nil
})
assert.NoError(t, err, "Unable to commit successfully to ClickHouse")
}

func sendTraffic(t *testing.T, commitNum int, connect *sql.DB) {
func sendTraffic(t *testing.T, connect *sql.DB, commitNum int) {
var wg sync.WaitGroup
for i := 0; i < commitNum; i++ {
wg.Add(1)
Expand Down

0 comments on commit d26e483

Please sign in to comment.