From a81f404db92deea3cfb9854588c77a85e9566e4e Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sat, 31 Aug 2019 15:21:49 +0200 Subject: [PATCH 01/31] refactor code Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/client.go | 16 ++--- tools/mqtt-bench/results.go | 126 ++++++++++++++++++------------------ 2 files changed, 71 insertions(+), 71 deletions(-) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index f8b1fcbcd1..2d9d9985a0 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -81,9 +81,9 @@ func (c *Client) runPublisher(r chan *runResults) { case m := <-pubMsgs: cid := m.ID if m.Error { - runResults.Failures++ + runResults.failures++ } else { - runResults.Successes++ + runResults.successes++ runResults.ID = cid times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds } @@ -91,12 +91,12 @@ func (c *Client) runPublisher(r chan *runResults) { // Calculate results duration := time.Now().Sub(started) timeMatrix := mat.NewDense(1, len(times), times) - runResults.MsgTimeMin = mat.Min(timeMatrix) - runResults.MsgTimeMax = mat.Max(timeMatrix) - runResults.MsgTimeMean = stat.Mean(times, nil) - runResults.MsgTimeStd = stat.StdDev(times, nil) - runResults.RunTime = duration.Seconds() - runResults.MsgsPerSec = float64(runResults.Successes) / duration.Seconds() + runResults.msgTimeMin = mat.Min(timeMatrix) + runResults.msgTimeMax = mat.Max(timeMatrix) + runResults.msgTimeMean = stat.Mean(times, nil) + runResults.msgTimeStd = stat.StdDev(times, nil) + runResults.runTime = duration.Seconds() + runResults.msgsPerSec = float64(runResults.successes) / duration.Seconds() // Report results and exit r <- runResults diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 86a49e249d..3d8420e0d0 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -16,18 +16,18 @@ import ( type runResults struct { ID string `json:"id"` - Successes int64 `json:"successes"` - Failures int64 `json:"failures"` - RunTime float64 `json:"run_time"` - MsgTimeMin float64 `json:"msg_time_min"` - MsgTimeMax float64 `json:"msg_time_max"` - MsgTimeMean float64 `json:"msg_time_mean"` - MsgTimeStd float64 `json:"msg_time_std"` - MsgDelTimeMin float64 `json:"msg_del_time_min"` - MsgDelTimeMax float64 `json:"msg_del_time_max"` - MsgDelTimeMean float64 `json:"msg_del_time_mean"` - MsgDelTimeStd float64 `json:"msg_del_time_std"` - MsgsPerSec float64 `json:"msgs_per_sec"` + successes int64 `json:"successes"` + failures int64 `json:"failures"` + runTime float64 `json:"run_time"` + msgTimeMin float64 `json:"msg_time_min"` + msgTimeMax float64 `json:"msg_time_max"` + msgTimeMean float64 `json:"msg_time_mean"` + msgTimeStd float64 `json:"msg_time_std"` + msgDelTimeMin float64 `json:"msg_del_time_min"` + msgDelTimeMax float64 `json:"msg_del_time_max"` + msgDelTimeMean float64 `json:"msg_del_time_mean"` + msgDelTimeStd float64 `json:"msg_del_time_std"` + msgsPerSec float64 `json:"msgs_per_sec"` } type subTimes map[string][]float64 @@ -62,58 +62,58 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi runTimes := make([]float64, len(results)) bws := make([]float64, len(results)) - totals.TotalRunTime = totalTime.Seconds() + totals.totalRunTime = totalTime.Seconds() - totals.MsgTimeMin = results[0].MsgTimeMin + totals.msgTimeMin = results[0].msgTimeMin for i, res := range results { if len(*subTimes) > 0 { times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID]) - subTimeRunResults.MsgTimeMin = mat.Min(times) - subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) - subTimeRunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) + subTimeRunResults.msgTimeMin = mat.Min(times) + subTimeRunResults.msgTimeMax = mat.Max(times) + subTimeRunResults.msgTimeMean = stat.Mean((*subTimes)[res.ID], nil) + subTimeRunResults.msgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) } - res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin - res.MsgDelTimeMax = subTimeRunResults.MsgTimeMax - res.MsgDelTimeMean = subTimeRunResults.MsgTimeMean - res.MsgDelTimeStd = subTimeRunResults.MsgTimeStd + res.msgDelTimeMin = subTimeRunResults.msgTimeMin + res.msgDelTimeMax = subTimeRunResults.msgTimeMax + res.msgDelTimeMean = subTimeRunResults.msgTimeMean + res.msgDelTimeStd = subTimeRunResults.msgTimeStd - totals.Successes += res.Successes - totals.Failures += res.Failures - totals.TotalMsgsPerSec += res.MsgsPerSec + totals.successes += res.successes + totals.failures += res.failures + totals.totalMsgsPerSec += res.msgsPerSec - if res.MsgTimeMin < totals.MsgTimeMin { - totals.MsgTimeMin = res.MsgTimeMin + if res.msgTimeMin < totals.msgTimeMin { + totals.msgTimeMin = res.msgTimeMin } - if res.MsgTimeMax > totals.MsgTimeMax { - totals.MsgTimeMax = res.MsgTimeMax + if res.msgTimeMax > totals.msgTimeMax { + totals.msgTimeMax = res.msgTimeMax } - if subTimeRunResults.MsgTimeMin < totals.MsgDelTimeMin { - totals.MsgDelTimeMin = subTimeRunResults.MsgTimeMin + if subTimeRunResults.msgTimeMin < totals.msgDelTimeMin { + totals.msgDelTimeMin = subTimeRunResults.msgTimeMin } - if subTimeRunResults.MsgTimeMax > totals.MsgDelTimeMax { - totals.MsgDelTimeMax = subTimeRunResults.MsgTimeMax + if subTimeRunResults.msgTimeMax > totals.msgDelTimeMax { + totals.msgDelTimeMax = subTimeRunResults.msgTimeMax } - msgTimeMeansDelivered[i] = subTimeRunResults.MsgTimeMean - msgTimeMeans[i] = res.MsgTimeMean - msgsPerSecs[i] = res.MsgsPerSec - runTimes[i] = res.RunTime - bws[i] = res.MsgsPerSec + msgTimeMeansDelivered[i] = subTimeRunResults.msgTimeMean + msgTimeMeans[i] = res.msgTimeMean + msgsPerSecs[i] = res.msgsPerSec + runTimes[i] = res.runTime + bws[i] = res.msgsPerSec } - totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures) - totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil) - totals.AvgRunTime = stat.Mean(runTimes, nil) - totals.MsgDelTimeMeanAvg = stat.Mean(msgTimeMeansDelivered, nil) - totals.MsgDelTimeMeanStd = stat.StdDev(msgTimeMeansDelivered, nil) - totals.MsgTimeMeanAvg = stat.Mean(msgTimeMeans, nil) - totals.MsgTimeMeanStd = stat.StdDev(msgTimeMeans, nil) + totals.ratio = float64(totals.successes) / float64(totals.successes+totals.failures) + totals.avgMsgsPerSec = stat.Mean(msgsPerSecs, nil) + totals.avgRunTime = stat.Mean(runTimes, nil) + totals.msgDelTimeMeanAvg = stat.Mean(msgTimeMeansDelivered, nil) + totals.msgDelTimeMeanStd = stat.StdDev(msgTimeMeansDelivered, nil) + totals.msgTimeMeanAvg = stat.Mean(msgTimeMeans, nil) + totals.msgTimeMeanStd = stat.StdDev(msgTimeMeans, nil) return totals } @@ -137,27 +137,27 @@ func printResults(results []*runResults, totals *totalResults, format string, qu if !quiet { for _, res := range results { fmt.Printf("======= CLIENT %s =======\n", res.ID) - fmt.Printf("Ratio: %.3f (%d/%d)\n", float64(res.Successes)/float64(res.Successes+res.Failures), res.Successes, res.Successes+res.Failures) - fmt.Printf("Runtime (s): %.3f\n", res.RunTime) - fmt.Printf("Msg time min (us): %.3f\n", res.MsgTimeMin) - fmt.Printf("Msg time max (us): %.3f\n", res.MsgTimeMax) - fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) - fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) - - fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) + fmt.Printf("Ratio: %.3f (%d/%d)\n", float64(res.successes)/float64(res.successes+res.failures), res.successes, res.successes+res.failures) + fmt.Printf("Runtime (s): %.3f\n", res.runTime) + fmt.Printf("Msg time min (us): %.3f\n", res.msgTimeMin) + fmt.Printf("Msg time max (us): %.3f\n", res.msgTimeMax) + fmt.Printf("Msg time mean (us): %.3f\n", res.msgTimeMean) + fmt.Printf("Msg time std (us): %.3f\n", res.msgTimeStd) + + fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.msgsPerSec) } } fmt.Printf("========= TOTAL (%d) =========\n", len(results)) - fmt.Printf("Total Ratio: %.3f (%d/%d)\n", totals.Ratio, totals.Successes, totals.Successes+totals.Failures) - fmt.Printf("Total Runtime (sec): %.3f\n", totals.TotalRunTime) - fmt.Printf("Average Runtime (sec): %.3f\n", totals.AvgRunTime) - fmt.Printf("Msg time min (us): %.3f\n", totals.MsgTimeMin) - fmt.Printf("Msg time max (us): %.3f\n", totals.MsgTimeMax) - fmt.Printf("Msg time mean mean (us): %.3f\n", totals.MsgTimeMeanAvg) - fmt.Printf("Msg time mean std (us): %.3f\n", totals.MsgTimeMeanStd) - - fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec) - fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec) + fmt.Printf("Total Ratio: %.3f (%d/%d)\n", totals.ratio, totals.successes, totals.successes+totals.failures) + fmt.Printf("Total Runtime (sec): %.3f\n", totals.totalRunTime) + fmt.Printf("Average Runtime (sec): %.3f\n", totals.avgRunTime) + fmt.Printf("Msg time min (us): %.3f\n", totals.msgTimeMin) + fmt.Printf("Msg time max (us): %.3f\n", totals.msgTimeMax) + fmt.Printf("Msg time mean mean (us): %.3f\n", totals.msgTimeMeanAvg) + fmt.Printf("Msg time mean std (us): %.3f\n", totals.msgTimeMeanStd) + + fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.avgMsgsPerSec) + fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.totalMsgsPerSec) } return } From fb5cd1280695202ac61126515a253d1158c0da8e Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sat, 31 Aug 2019 16:33:27 +0200 Subject: [PATCH 02/31] connect each thing with each channel Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 69e064aae7..50e3ac64cf 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -112,12 +112,13 @@ func Benchmark(cfg Config) { resCh := make(chan *runResults) done := make(chan bool) + start := time.Now() n := len(mf.Channels) var cert tls.Certificate // Subscribers for i := 0; i < cfg.Test.Subs; i++ { - mfChann := mf.Channels[i%n] + mfConn := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -132,7 +133,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID), + MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -155,7 +156,7 @@ func Benchmark(cfg Config) { start := time.Now() // Publishers for i := 0; i < cfg.Test.Pubs; i++ { - mfChann := mf.Channels[i%n] + mfConn := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -170,7 +171,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID), + MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), From 68003d2dd914227ab2c808567fd5abe30348c66a Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 1 Sep 2019 08:21:01 +0200 Subject: [PATCH 03/31] reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/client.go | 10 +-- tools/mqtt-bench/results.go | 128 ++++++++++++++++++------------------ 2 files changed, 69 insertions(+), 69 deletions(-) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 2d9d9985a0..f73b1c0382 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -91,12 +91,12 @@ func (c *Client) runPublisher(r chan *runResults) { // Calculate results duration := time.Now().Sub(started) timeMatrix := mat.NewDense(1, len(times), times) - runResults.msgTimeMin = mat.Min(timeMatrix) - runResults.msgTimeMax = mat.Max(timeMatrix) - runResults.msgTimeMean = stat.Mean(times, nil) - runResults.msgTimeStd = stat.StdDev(times, nil) + runResults.MsgTimeMin = mat.Min(timeMatrix) + runResults.MsgTimeMax = mat.Max(timeMatrix) + runResults.MsgTimeMean = stat.Mean(times, nil) + runResults.MsgTimeStd = stat.StdDev(times, nil) runResults.runTime = duration.Seconds() - runResults.msgsPerSec = float64(runResults.successes) / duration.Seconds() + runResults.MsgsPerSec = float64(runResults.successes) / duration.Seconds() // Report results and exit r <- runResults diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 3d8420e0d0..343c10d68a 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -16,18 +16,18 @@ import ( type runResults struct { ID string `json:"id"` - successes int64 `json:"successes"` - failures int64 `json:"failures"` - runTime float64 `json:"run_time"` - msgTimeMin float64 `json:"msg_time_min"` - msgTimeMax float64 `json:"msg_time_max"` - msgTimeMean float64 `json:"msg_time_mean"` - msgTimeStd float64 `json:"msg_time_std"` - msgDelTimeMin float64 `json:"msg_del_time_min"` - msgDelTimeMax float64 `json:"msg_del_time_max"` - msgDelTimeMean float64 `json:"msg_del_time_mean"` - msgDelTimeStd float64 `json:"msg_del_time_std"` - msgsPerSec float64 `json:"msgs_per_sec"` + Successes int64 `json:"successes"` + Failures int64 `json:"failures"` + RunTime float64 `json:"run_time"` + MsgTimeMin float64 `json:"msg_time_min"` + MsgTimeMax float64 `json:"msg_time_max"` + MsgTimeMean float64 `json:"msg_time_mean"` + MsgTimeStd float64 `json:"msg_time_std"` + MsgDelTimeMin float64 `json:"msg_del_time_min"` + MsgDelTimeMax float64 `json:"msg_del_time_max"` + MsgDelTimeMean float64 `json:"msg_del_time_mean"` + MsgDelTimeStd float64 `json:"msg_del_time_std"` + MsgsPerSec float64 `json:"msgs_per_sec"` } type subTimes map[string][]float64 @@ -55,65 +55,65 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi return nil } totals := new(totalResults) - subTimeRunResults := runResults{} + subTimerunResults := runResults{} msgTimeMeans := make([]float64, len(results)) msgTimeMeansDelivered := make([]float64, len(results)) msgsPerSecs := make([]float64, len(results)) runTimes := make([]float64, len(results)) bws := make([]float64, len(results)) - totals.totalRunTime = totalTime.Seconds() + totals.TotalRunTime = totalTime.Seconds() - totals.msgTimeMin = results[0].msgTimeMin + totals.MsgTimeMin = results[0].MsgTimeMin for i, res := range results { if len(*subTimes) > 0 { times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID]) - subTimeRunResults.msgTimeMin = mat.Min(times) - subTimeRunResults.msgTimeMax = mat.Max(times) - subTimeRunResults.msgTimeMean = stat.Mean((*subTimes)[res.ID], nil) - subTimeRunResults.msgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) + subTimerunResults.MsgTimeMin = mat.Min(times) + subTimerunResults.MsgTimeMax = mat.Max(times) + subTimerunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) + subTimerunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) } - res.msgDelTimeMin = subTimeRunResults.msgTimeMin - res.msgDelTimeMax = subTimeRunResults.msgTimeMax - res.msgDelTimeMean = subTimeRunResults.msgTimeMean - res.msgDelTimeStd = subTimeRunResults.msgTimeStd + res.MsgDelTimeMin = subTimerunResults.MsgTimeMin + res.MsgDelTimeMax = subTimerunResults.MsgTimeMax + res.MsgDelTimeMean = subTimerunResults.MsgTimeMean + res.MsgDelTimeStd = subTimerunResults.MsgTimeStd - totals.successes += res.successes - totals.failures += res.failures - totals.totalMsgsPerSec += res.msgsPerSec + totals.Successes += res.Successes + totals.Failures += res.Failures + totals.TotalMsgsPerSec += res.MsgsPerSec - if res.msgTimeMin < totals.msgTimeMin { - totals.msgTimeMin = res.msgTimeMin + if res.MsgTimeMin < totals.MsgTimeMin { + totals.MsgTimeMin = res.MsgTimeMin } - if res.msgTimeMax > totals.msgTimeMax { - totals.msgTimeMax = res.msgTimeMax + if res.MsgTimeMax > totals.MsgTimeMax { + totals.MsgTimeMax = res.MsgTimeMax } - if subTimeRunResults.msgTimeMin < totals.msgDelTimeMin { - totals.msgDelTimeMin = subTimeRunResults.msgTimeMin + if subTimerunResults.MsgTimeMin < totals.MsgDelTimeMin { + totals.MsgDelTimeMin = subTimerunResults.MsgTimeMin } - if subTimeRunResults.msgTimeMax > totals.msgDelTimeMax { - totals.msgDelTimeMax = subTimeRunResults.msgTimeMax + if subTimerunResults.MsgTimeMax > totals.MsgDelTimeMax { + totals.MsgDelTimeMax = subTimerunResults.MsgTimeMax } - msgTimeMeansDelivered[i] = subTimeRunResults.msgTimeMean - msgTimeMeans[i] = res.msgTimeMean - msgsPerSecs[i] = res.msgsPerSec - runTimes[i] = res.runTime - bws[i] = res.msgsPerSec + msgTimeMeansDelivered[i] = subTimerunResults.MsgTimeMean + msgTimeMeans[i] = res.MsgTimeMean + msgsPerSecs[i] = res.MsgsPerSec + runTimes[i] = res.RunTime + bws[i] = res.MsgsPerSec } - totals.ratio = float64(totals.successes) / float64(totals.successes+totals.failures) - totals.avgMsgsPerSec = stat.Mean(msgsPerSecs, nil) - totals.avgRunTime = stat.Mean(runTimes, nil) - totals.msgDelTimeMeanAvg = stat.Mean(msgTimeMeansDelivered, nil) - totals.msgDelTimeMeanStd = stat.StdDev(msgTimeMeansDelivered, nil) - totals.msgTimeMeanAvg = stat.Mean(msgTimeMeans, nil) - totals.msgTimeMeanStd = stat.StdDev(msgTimeMeans, nil) + totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures) + totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil) + totals.AvgRunTime = stat.Mean(runTimes, nil) + totals.MsgDelTimeMeanAvg = stat.Mean(msgTimeMeansDelivered, nil) + totals.MsgDelTimeMeanStd = stat.StdDev(msgTimeMeansDelivered, nil) + totals.MsgTimeMeanAvg = stat.Mean(msgTimeMeans, nil) + totals.MsgTimeMeanStd = stat.StdDev(msgTimeMeans, nil) return totals } @@ -137,27 +137,27 @@ func printResults(results []*runResults, totals *totalResults, format string, qu if !quiet { for _, res := range results { fmt.Printf("======= CLIENT %s =======\n", res.ID) - fmt.Printf("Ratio: %.3f (%d/%d)\n", float64(res.successes)/float64(res.successes+res.failures), res.successes, res.successes+res.failures) - fmt.Printf("Runtime (s): %.3f\n", res.runTime) - fmt.Printf("Msg time min (us): %.3f\n", res.msgTimeMin) - fmt.Printf("Msg time max (us): %.3f\n", res.msgTimeMax) - fmt.Printf("Msg time mean (us): %.3f\n", res.msgTimeMean) - fmt.Printf("Msg time std (us): %.3f\n", res.msgTimeStd) - - fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.msgsPerSec) + fmt.Printf("Ratio: %.3f (%d/%d)\n", float64(res.Successes)/float64(res.Successes+res.Failures), res.Successes, res.Successes+res.Failures) + fmt.Printf("Runtime (s): %.3f\n", res.RunTime) + fmt.Printf("Msg time min (us): %.3f\n", res.MsgTimeMin) + fmt.Printf("Msg time max (us): %.3f\n", res.MsgTimeMax) + fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) + fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) + + fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) } } fmt.Printf("========= TOTAL (%d) =========\n", len(results)) - fmt.Printf("Total Ratio: %.3f (%d/%d)\n", totals.ratio, totals.successes, totals.successes+totals.failures) - fmt.Printf("Total Runtime (sec): %.3f\n", totals.totalRunTime) - fmt.Printf("Average Runtime (sec): %.3f\n", totals.avgRunTime) - fmt.Printf("Msg time min (us): %.3f\n", totals.msgTimeMin) - fmt.Printf("Msg time max (us): %.3f\n", totals.msgTimeMax) - fmt.Printf("Msg time mean mean (us): %.3f\n", totals.msgTimeMeanAvg) - fmt.Printf("Msg time mean std (us): %.3f\n", totals.msgTimeMeanStd) - - fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.avgMsgsPerSec) - fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.totalMsgsPerSec) + fmt.Printf("Total Ratio: %.3f (%d/%d)\n", totals.Ratio, totals.Successes, totals.Successes+totals.Failures) + fmt.Printf("Total Runtime (sec): %.3f\n", totals.TotalRunTime) + fmt.Printf("Average Runtime (sec): %.3f\n", totals.AvgRunTime) + fmt.Printf("Msg time min (us): %.3f\n", totals.MsgTimeMin) + fmt.Printf("Msg time max (us): %.3f\n", totals.MsgTimeMax) + fmt.Printf("Msg time mean mean (us): %.3f\n", totals.MsgTimeMeanAvg) + fmt.Printf("Msg time mean std (us): %.3f\n", totals.MsgTimeMeanStd) + + fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec) + fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec) } return } From f6e4d2390fb540789f9c000644560adb1895b3d5 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 1 Sep 2019 08:24:18 +0200 Subject: [PATCH 04/31] reverting - structure fields must be exported Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index f73b1c0382..f8b1fcbcd1 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -81,9 +81,9 @@ func (c *Client) runPublisher(r chan *runResults) { case m := <-pubMsgs: cid := m.ID if m.Error { - runResults.failures++ + runResults.Failures++ } else { - runResults.successes++ + runResults.Successes++ runResults.ID = cid times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds } @@ -95,8 +95,8 @@ func (c *Client) runPublisher(r chan *runResults) { runResults.MsgTimeMax = mat.Max(timeMatrix) runResults.MsgTimeMean = stat.Mean(times, nil) runResults.MsgTimeStd = stat.StdDev(times, nil) - runResults.runTime = duration.Seconds() - runResults.MsgsPerSec = float64(runResults.successes) / duration.Seconds() + runResults.RunTime = duration.Seconds() + runResults.MsgsPerSec = float64(runResults.Successes) / duration.Seconds() // Report results and exit r <- runResults From 1a1283576853ae9835e75dc47d8e93f2e02a8e10 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 1 Sep 2019 08:32:43 +0200 Subject: [PATCH 05/31] revert some names Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/results.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 343c10d68a..86a49e249d 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -55,7 +55,7 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi return nil } totals := new(totalResults) - subTimerunResults := runResults{} + subTimeRunResults := runResults{} msgTimeMeans := make([]float64, len(results)) msgTimeMeansDelivered := make([]float64, len(results)) msgsPerSecs := make([]float64, len(results)) @@ -69,16 +69,16 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi if len(*subTimes) > 0 { times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID]) - subTimerunResults.MsgTimeMin = mat.Min(times) - subTimerunResults.MsgTimeMax = mat.Max(times) - subTimerunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) - subTimerunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) + subTimeRunResults.MsgTimeMin = mat.Min(times) + subTimeRunResults.MsgTimeMax = mat.Max(times) + subTimeRunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) + subTimeRunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) } - res.MsgDelTimeMin = subTimerunResults.MsgTimeMin - res.MsgDelTimeMax = subTimerunResults.MsgTimeMax - res.MsgDelTimeMean = subTimerunResults.MsgTimeMean - res.MsgDelTimeStd = subTimerunResults.MsgTimeStd + res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin + res.MsgDelTimeMax = subTimeRunResults.MsgTimeMax + res.MsgDelTimeMean = subTimeRunResults.MsgTimeMean + res.MsgDelTimeStd = subTimeRunResults.MsgTimeStd totals.Successes += res.Successes totals.Failures += res.Failures @@ -92,15 +92,15 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi totals.MsgTimeMax = res.MsgTimeMax } - if subTimerunResults.MsgTimeMin < totals.MsgDelTimeMin { - totals.MsgDelTimeMin = subTimerunResults.MsgTimeMin + if subTimeRunResults.MsgTimeMin < totals.MsgDelTimeMin { + totals.MsgDelTimeMin = subTimeRunResults.MsgTimeMin } - if subTimerunResults.MsgTimeMax > totals.MsgDelTimeMax { - totals.MsgDelTimeMax = subTimerunResults.MsgTimeMax + if subTimeRunResults.MsgTimeMax > totals.MsgDelTimeMax { + totals.MsgDelTimeMax = subTimeRunResults.MsgTimeMax } - msgTimeMeansDelivered[i] = subTimerunResults.MsgTimeMean + msgTimeMeansDelivered[i] = subTimeRunResults.MsgTimeMean msgTimeMeans[i] = res.MsgTimeMean msgsPerSecs[i] = res.MsgsPerSec runTimes[i] = res.RunTime From 1c4a008de935de0ccd473aad9a84e34a9aec98f8 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 1 Sep 2019 08:34:52 +0200 Subject: [PATCH 06/31] move meausuring time start Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 50e3ac64cf..7206e26ffd 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -112,7 +112,6 @@ func Benchmark(cfg Config) { resCh := make(chan *runResults) done := make(chan bool) - start := time.Now() n := len(mf.Channels) var cert tls.Certificate From b276e848bc8f45333e618c9c70caca86c8cc3942 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Sun, 1 Sep 2019 16:02:48 +0200 Subject: [PATCH 07/31] add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 23 +++++++++++++++++++++-- tools/mqtt-bench/client.go | 18 ++++++++++++++++++ tools/mqtt-bench/results.go | 11 ++++++++--- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 7206e26ffd..ea143a22ff 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -192,8 +192,27 @@ func Benchmark(cfg Config) { results = make([]*runResults, cfg.Test.Pubs) } - for i := 0; i < cfg.Test.Pubs; i++ { - results[i] = <-resCh + k, j := 0 + for i := 0; i < cfg.Test.Pubs*cfg.Test.Count; i++ { + + select { + case result := <-resCh: + { + if k > cfg.Test.Pubs { + log.Printf("Something went wrong with messages") + } + results[k] = result + k++ + } + case <-done: + { + // every time subscriber receives a message it will signal done + if j >= cfg.Test.Pubs*cfg.Test.Subs*cfg.Test.Count { + break + } + j++ + } + } } totalTime := time.Now().Sub(start) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index f8b1fcbcd1..2c7b320ba1 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -141,10 +141,27 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bo log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error()) } c.connect(onConnected, connLost) + i := 0 token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { + + i++ mp := messagePayload{} err := json.Unmarshal(msg.Payload(), &mp) + arrival := time.Now() + //times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds + arrivalTimes, ok := (*subTimes)[mp.ID] + if !ok { + clientArrivalTimes := make([]float64, 50) + + arrivalTimes = &clientArrivalTimes + (*subTimes)[mp.ID] = arrivalTimes + + } + // remove - fmt.Printf("%d dif %f:\n", i, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) + *arrivalTimes = append(*arrivalTimes, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) + + *doneSub <- true if err != nil { log.Printf("Client %s failed to decode message\n", clientID) } @@ -167,6 +184,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan m.Sent = time.Now() m.ID = clientID m.Payload.Sent = m.Sent + m.Payload.ID = clientID pload, err := json.Marshal(m.Payload) if err != nil { diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 86a49e249d..c3d39a2e18 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -67,12 +67,12 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi totals.MsgTimeMin = results[0].MsgTimeMin for i, res := range results { if len(*subTimes) > 0 { - times := mat.NewDense(1, len((*subTimes)[res.ID]), (*subTimes)[res.ID]) + times := mat.NewDense(1, len(*((*subTimes)[res.ID])), *((*subTimes)[res.ID])) subTimeRunResults.MsgTimeMin = mat.Min(times) subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean((*subTimes)[res.ID], nil) - subTimeRunResults.MsgTimeStd = stat.StdDev((*subTimes)[res.ID], nil) + subTimeRunResults.MsgTimeMean = stat.Mean(*((*subTimes)[res.ID]), nil) + subTimeRunResults.MsgTimeStd = stat.StdDev(*((*subTimes)[res.ID]), nil) } res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin @@ -144,6 +144,11 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) + fmt.Printf("Msg time min (us): %.3f\n", res.MsgDelTimeMin) + fmt.Printf("Msg time max (us): %.3f\n", res.MsgDelTimeMax) + fmt.Printf("Msg time mean (us): %.3f\n", res.MsgDelTimeMean) + fmt.Printf("Msg time std (us): %.3f\n", res.MsgDelTimeStd) + fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) } } From b3ed709c62e8a8d687870e5b5936c46ea2025cb4 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Mon, 2 Sep 2019 14:50:30 +0200 Subject: [PATCH 08/31] add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 67 ++++++++++++++++++++++++++++++------- tools/mqtt-bench/client.go | 15 ++++++--- tools/mqtt-bench/results.go | 2 +- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index ea143a22ff..44a389fe26 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -14,6 +14,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/cisco/senml" ) // Keep struct names exported, otherwise Viper unmarshaling won't work @@ -22,10 +23,11 @@ type mqttBrokerConfig struct { } type mqttMessageConfig struct { - Size int `toml:"size" mapstructure:"size"` - Format string `toml:"format" mapstructure:"format"` - QoS int `toml:"qos" mapstructure:"qos"` - Retain bool `toml:"retain" mapstructure:"retain"` + Size int `toml:"size" mapstructure:"size"` + Payload string `toml:"payload" mapstructure:"payload"` + Format string `toml:"format" mapstructure:"format"` + QoS int `toml:"qos" mapstructure:"qos"` + Retain bool `toml:"retain" mapstructure:"retain"` } type mqttTLSConfig struct { @@ -102,8 +104,6 @@ func Benchmark(cfg Config) { caByte, _ = ioutil.ReadAll(caFile) } - payload := string(make([]byte, cfg.MQTT.Message.Size)) - mf := mainflux{} if _, err := toml.DecodeFile(cfg.Mf.ConnFile, &mf); err != nil { log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile) @@ -111,6 +111,7 @@ func Benchmark(cfg Config) { resCh := make(chan *runResults) done := make(chan bool) + doneSub := make(chan bool) n := len(mf.Channels) var cert tls.Certificate @@ -142,18 +143,25 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: payload, + Message: nil, } wg.Add(1) - go c.runSubscriber(&wg, &subTimes, &done) + go c.runSubscriber(&wg, &subTimes, &done, &doneSub) } wg.Wait() - start := time.Now() // Publishers + start := time.Now() + if len(cfg.MQTT.Message) == 0 { + payload := string(make([]byte, cfg.MQTT.Message.Size)) + } else { + + } + + msg := prepareSenML(cfg.Test.Count, cfg.MQTT.Message) for i := 0; i < cfg.Test.Pubs; i++ { mfConn := mf.Channels[i%n] mfThing := mf.Things[i%n] @@ -192,7 +200,8 @@ func Benchmark(cfg Config) { results = make([]*runResults, cfg.Test.Pubs) } - k, j := 0 + k := 0 + j := 0 for i := 0; i < cfg.Test.Pubs*cfg.Test.Count; i++ { select { @@ -206,8 +215,8 @@ func Benchmark(cfg Config) { } case <-done: { - // every time subscriber receives a message it will signal done - if j >= cfg.Test.Pubs*cfg.Test.Subs*cfg.Test.Count { + // every time subscriber receives MsgCount messages it will signal done + if j >= cfg.Test.Subs { break } j++ @@ -224,3 +233,37 @@ func Benchmark(cfg Config) { // Print sats printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet) } + +func prepareSenML(sz int, msg senml.SenMLRecord) senml.SenML { + + t := (float64)(time.Now().Nanosecond()) + timeStamp := senml.SenMLRecord{ + BaseName: "", + Name: "timeSent", + Value: &t, + } + + records := make([]senml.SenMLRecord, sz) + records[0] = timeStamp + + for i := 1; i < sz; i++ { + records[i] = msg + } + + s := senml.SenML{ + Records: records, + } + + return s +} + +func getPayload(cid string, time float64, f func() senml.SenML) ([]byte, error) { + s := f() + s.Records[0].Value = &time + s.Records[0].BaseName = cid + payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{}) + if err != nil { + return nil, err + } + return payload, nil +} diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 2c7b320ba1..99d3991357 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/cisco/senml" mqtt "github.com/eclipse/paho.mqtt.golang" mat "gonum.org/v1/gonum/mat" stat "gonum.org/v1/gonum/stat" @@ -27,7 +28,7 @@ type Client struct { BrokerUser string BrokerPass string MsgTopic string - Message string + Message func(cid string, time float64, f func() senml.SenML) ([]byte, error) MsgSize int MsgCount int MsgQoS byte @@ -106,11 +107,11 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) { +func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub *chan bool) { defer wg.Done() // Start subscriber - c.subscribe(wg, subTimes, done) + c.subscribe(wg, subTimes, done, doneSub) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -127,7 +128,7 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bool) { +func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub *chan bool) { clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID @@ -139,6 +140,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bo connLost := func(client mqtt.Client, reason error) { log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error()) + *doneSub <- true } c.connect(onConnected, connLost) i := 0 @@ -161,7 +163,10 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done *chan bo // remove - fmt.Printf("%d dif %f:\n", i, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) *arrivalTimes = append(*arrivalTimes, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) - *doneSub <- true + if i == c.MsgCount { + *doneSub <- true + } + if err != nil { log.Printf("Client %s failed to decode message\n", clientID) } diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index c3d39a2e18..cdd188920c 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -30,7 +30,7 @@ type runResults struct { MsgsPerSec float64 `json:"msgs_per_sec"` } -type subTimes map[string][]float64 +type subTimes map[string](*[]float64) type totalResults struct { Ratio float64 `json:"ratio"` From def3917238c4303888af234ad5e50ac253fcd7bc Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 3 Sep 2019 11:16:41 +0200 Subject: [PATCH 09/31] add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 59 ++++++++++++++++++++------ tools/mqtt-bench/client.go | 20 +++++---- tools/mqtt-bench/cmd/main.go | 3 +- tools/mqtt-bench/templates/config.toml | 18 +++++++- writers/postgres/init.go | 2 +- 5 files changed, 76 insertions(+), 26 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 44a389fe26..5c7445bb92 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -5,6 +5,7 @@ package bench import ( "crypto/tls" + "encoding/json" "fmt" "io/ioutil" "log" @@ -91,7 +92,7 @@ func Benchmark(cfg Config) { var wg sync.WaitGroup var err error - checkConnection(cfg.MQTT.Broker.URL, 1) + //checkConnection(cfg.MQTT.Broker.URL, 1) subTimes := make(subTimes) var caByte []byte if cfg.MQTT.TLS.MTLS { @@ -116,6 +117,10 @@ func Benchmark(cfg Config) { n := len(mf.Channels) var cert tls.Certificate + msg := prepareSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + getSenML := func() senml.SenML { + return msg + } // Subscribers for i := 0; i < cfg.Test.Subs; i++ { mfConn := mf.Channels[i%n] @@ -155,13 +160,7 @@ func Benchmark(cfg Config) { // Publishers start := time.Now() - if len(cfg.MQTT.Message) == 0 { - payload := string(make([]byte, cfg.MQTT.Message.Size)) - } else { - - } - msg := prepareSenML(cfg.Test.Count, cfg.MQTT.Message) for i := 0; i < cfg.Test.Pubs; i++ { mfConn := mf.Channels[i%n] mfThing := mf.Things[i%n] @@ -188,7 +187,8 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: payload, + Message: getPayload, + GetSenML: getSenML, } go c.runPublisher(resCh) @@ -234,20 +234,53 @@ func Benchmark(cfg Config) { printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet) } -func prepareSenML(sz int, msg senml.SenMLRecord) senml.SenML { +func prepareSenML(sz int, payload string) senml.SenML { t := (float64)(time.Now().Nanosecond()) timeStamp := senml.SenMLRecord{ - BaseName: "", + BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57", Name: "timeSent", Value: &t, } - records := make([]senml.SenMLRecord, sz) + tsByte, err := json.Marshal(timeStamp) + if err != nil { + log.Fatalf("Failed to create test message") + } + + pload := []byte(payload) + if len(payload) == 0 && sz > len(tsByte) { + pload = make([]byte, sz-len(tsByte)) + } + + sml := senml.SenMLRecord{} + err = json.Unmarshal(pload, &sml) + if err != nil { + log.Println("cannot unmarshal payload") + } + + msgByte, err := json.Marshal(sml) + if err != nil { + log.Fatalf("Failed to create test message") + } + + // how many records to make messae long sz bytes + n := (sz-len(tsByte))/len(msgByte) + 1 + if sz < len(tsByte) { + n = 1 + } + + records := make([]senml.SenMLRecord, n) records[0] = timeStamp + for i := 1; i < n; i++ { + // is this needed + // i think we need id to be saved with db writer to t + sml.Time = float64(time.Now().Nanosecond()) + records[i] = sml + } - for i := 1; i < sz; i++ { - records[i] = msg + for i := 1; i < n; i++ { + log.Printf("%f\n", records[i].Time) } s := senml.SenML{ diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 99d3991357..52335f543a 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -29,6 +29,7 @@ type Client struct { BrokerPass string MsgTopic string Message func(cid string, time float64, f func() senml.SenML) ([]byte, error) + GetSenML func() senml.SenML MsgSize int MsgCount int MsgQoS byte @@ -148,20 +149,24 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { i++ - mp := messagePayload{} + //mp := messagePayload{} + mp := senml.SenML{} err := json.Unmarshal(msg.Payload(), &mp) arrival := time.Now() - //times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds - arrivalTimes, ok := (*subTimes)[mp.ID] + //times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microsecondsme + id := mp.Records[0].BaseName + time := mp.Records[0].Time + arrivalTimes, ok := (*subTimes)[id] if !ok { clientArrivalTimes := make([]float64, 50) arrivalTimes = &clientArrivalTimes - (*subTimes)[mp.ID] = arrivalTimes + (*subTimes)[id] = arrivalTimes } // remove - fmt.Printf("%d dif %f:\n", i, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) - *arrivalTimes = append(*arrivalTimes, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) + + *arrivalTimes = append(*arrivalTimes, float64(arrival.Nanosecond())-time) if i == c.MsgCount { *doneSub <- true @@ -188,10 +193,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan case m := <-in: m.Sent = time.Now() m.ID = clientID - m.Payload.Sent = m.Sent - m.Payload.ID = clientID - - pload, err := json.Marshal(m.Payload) + pload, err := c.Message(m.ID, float64(m.Sent.Nanosecond()), c.GetSenML) if err != nil { log.Printf("Failed to marshal payload - %s", err.Error()) } diff --git a/tools/mqtt-bench/cmd/main.go b/tools/mqtt-bench/cmd/main.go index 22d50ed3ca..8db5e33036 100644 --- a/tools/mqtt-bench/cmd/main.go +++ b/tools/mqtt-bench/cmd/main.go @@ -6,7 +6,7 @@ package main import ( "log" - "github.com/mainflux/mainflux/tools/mqtt-bench" + bench "github.com/mainflux/mainflux/tools/mqtt-bench" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -45,6 +45,7 @@ Complete documentation is available at https://mainflux.readthedocs.io`, // MQTT Message rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.Size, "size", "z", 100, "Size of message payload bytes") + rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Payload, "payload", "l", "{\"n\":\"lon\",\"t\":-4,\"v\":1.3}", "Template message") rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Format, "format", "f", "text", "Output format: text|json") rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.QoS, "qos", "q", 0, "QoS for published messages, values 0 1 2") rootCmd.PersistentFlags().BoolVarP(&bconf.MQTT.Message.Retain, "retain", "r", false, "Retain mqtt messages") diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index 11895618a1..311b76460c 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -1,12 +1,17 @@ [mqtt] [mqtt.broker] - url = "tcp://localhost:1883" + url = "tcp://142.93.118.47:1883" [mqtt.message] size = 100 format = "text" qos = 2 +<<<<<<< HEAD retain = false +======= + retain = true + payload = "" +>>>>>>> add pub-to-sub delivery time measure [mqtt.tls] mtls = false @@ -14,12 +19,21 @@ ca = "ca.crt" [test] +<<<<<<< HEAD pubs = 100 subs = 30 +======= +pubs = 1 +subs = 1 +>>>>>>> add pub-to-sub delivery time measure count = 100 [log] quiet = false [mainflux] -connections_file = "../provision/mfconn.toml" \ No newline at end of file +<<<<<<< HEAD +connections_file = "../provision/mfconn.toml" +======= +connections_file = "connections.toml" +>>>>>>> add pub-to-sub delivery time measure diff --git a/writers/postgres/init.go b/writers/postgres/init.go index aa1c971525..b40a49bfb8 100644 --- a/writers/postgres/init.go +++ b/writers/postgres/init.go @@ -65,7 +65,7 @@ func migrateDB(db *sqlx.DB) error { bool_value BOOL, data_value TEXT, value_sum FLOAT, - time FlOAT, + time FLOAT, update_time FLOAT, link TEXT, PRIMARY KEY (id) From 90616a3bdaefc2ec281cd60e3a9c495c1c1ccc2f Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Tue, 3 Sep 2019 15:58:48 +0200 Subject: [PATCH 10/31] add pub-to-sub delivery time measure Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 25 +++++++------------ tools/mqtt-bench/client.go | 33 +++++++++++++++----------- tools/mqtt-bench/results.go | 12 +++++----- tools/mqtt-bench/templates/config.toml | 18 +++----------- 4 files changed, 37 insertions(+), 51 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 5c7445bb92..fa6fc59bda 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -93,7 +93,7 @@ func Benchmark(cfg Config) { var err error //checkConnection(cfg.MQTT.Broker.URL, 1) - subTimes := make(subTimes) + subsResults := make(subsResults) var caByte []byte if cfg.MQTT.TLS.MTLS { caFile, err := os.Open(cfg.MQTT.TLS.CA) @@ -153,7 +153,7 @@ func Benchmark(cfg Config) { wg.Add(1) - go c.runSubscriber(&wg, &subTimes, &done, &doneSub) + go c.runSubscriber(&wg, &subsResults, &done, &doneSub) } wg.Wait() @@ -210,11 +210,13 @@ func Benchmark(cfg Config) { if k > cfg.Test.Pubs { log.Printf("Something went wrong with messages") } + log.Printf("Receieved result\n") results[k] = result k++ } - case <-done: + case <-doneSub: { + log.Printf("Subscriber done") // every time subscriber receives MsgCount messages it will signal done if j >= cfg.Test.Subs { break @@ -225,7 +227,7 @@ func Benchmark(cfg Config) { } totalTime := time.Now().Sub(start) - totals := calculateTotalResults(results, totalTime, &subTimes) + totals := calculateTotalResults(results, totalTime, &subsResults) if totals == nil { return } @@ -239,22 +241,16 @@ func prepareSenML(sz int, payload string) senml.SenML { t := (float64)(time.Now().Nanosecond()) timeStamp := senml.SenMLRecord{ BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57", - Name: "timeSent", Value: &t, } tsByte, err := json.Marshal(timeStamp) - if err != nil { + if err != nil || len(payload) == 0 { log.Fatalf("Failed to create test message") } - pload := []byte(payload) - if len(payload) == 0 && sz > len(tsByte) { - pload = make([]byte, sz-len(tsByte)) - } - sml := senml.SenMLRecord{} - err = json.Unmarshal(pload, &sml) + err = json.Unmarshal([]byte(payload), &sml) if err != nil { log.Println("cannot unmarshal payload") } @@ -279,10 +275,6 @@ func prepareSenML(sz int, payload string) senml.SenML { records[i] = sml } - for i := 1; i < n; i++ { - log.Printf("%f\n", records[i].Time) - } - s := senml.SenML{ Records: records, } @@ -298,5 +290,6 @@ func getPayload(cid string, time float64, f func() senml.SenML) ([]byte, error) if err != nil { return nil, err } + log.Printf("pld%s\n", string(payload)) return payload, nil } diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 52335f543a..036fc8e7d4 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -7,7 +7,6 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "log" "net" @@ -108,11 +107,11 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub *chan bool) { +func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, done, doneSub *chan bool) { defer wg.Done() // Start subscriber - c.subscribe(wg, subTimes, done, doneSub) + c.subscribe(wg, subsResults, done, doneSub) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -129,7 +128,7 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub *chan bool) { +func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, done, doneSub *chan bool) { clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID @@ -150,27 +149,33 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subTimes *subTimes, done, doneSub i++ //mp := messagePayload{} - mp := senml.SenML{} - err := json.Unmarshal(msg.Payload(), &mp) - arrival := time.Now() + //mp := senml.SenML{} + arrival := time.Now().UnixNano() + log.Printf("pld: %s\n", string(msg.Payload())) + mp, err := senml.Decode(msg.Payload(), senml.JSON) + if err != nil { + log.Printf("Failed to decode message %s\n", err.Error()) + } + //times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microsecondsme id := mp.Records[0].BaseName - time := mp.Records[0].Time - arrivalTimes, ok := (*subTimes)[id] + timeSent := *mp.Records[0].Value + arrivalTimes, ok := (*subsResults)[id] if !ok { clientArrivalTimes := make([]float64, 50) - arrivalTimes = &clientArrivalTimes - (*subTimes)[id] = arrivalTimes + (*subsResults)[id] = arrivalTimes } // remove - fmt.Printf("%d dif %f:\n", i, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) - *arrivalTimes = append(*arrivalTimes, float64(arrival.Nanosecond())-time) + *arrivalTimes = append(*arrivalTimes, float64(arrival)-timeSent) - if i == c.MsgCount { + if i == c.MsgCount-1 { + log.Printf("Subscriber %s has finished receiving", c.ID) *doneSub <- true } + log.Printf("recieved msg %d in %f \n", i, float64(arrival)-timeSent) if err != nil { log.Printf("Client %s failed to decode message\n", clientID) @@ -193,7 +198,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan case m := <-in: m.Sent = time.Now() m.ID = clientID - pload, err := c.Message(m.ID, float64(m.Sent.Nanosecond()), c.GetSenML) + pload, err := c.Message(m.ID, float64(m.Sent.UnixNano()), c.GetSenML) if err != nil { log.Printf("Failed to marshal payload - %s", err.Error()) } diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index cdd188920c..1c1defe8fd 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -30,7 +30,7 @@ type runResults struct { MsgsPerSec float64 `json:"msgs_per_sec"` } -type subTimes map[string](*[]float64) +type subsResults map[string](*[]float64) type totalResults struct { Ratio float64 `json:"ratio"` @@ -50,7 +50,7 @@ type totalResults struct { AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"` } -func calculateTotalResults(results []*runResults, totalTime time.Duration, subTimes *subTimes) *totalResults { +func calculateTotalResults(results []*runResults, totalTime time.Duration, subsResults *subsResults) *totalResults { if results == nil || len(results) < 1 { return nil } @@ -66,13 +66,13 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subTi totals.MsgTimeMin = results[0].MsgTimeMin for i, res := range results { - if len(*subTimes) > 0 { - times := mat.NewDense(1, len(*((*subTimes)[res.ID])), *((*subTimes)[res.ID])) + if len(*subsResults) > 0 { + times := mat.NewDense(1, len(*((*subsResults)[res.ID])), *((*subsResults)[res.ID])) subTimeRunResults.MsgTimeMin = mat.Min(times) subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean(*((*subTimes)[res.ID]), nil) - subTimeRunResults.MsgTimeStd = stat.StdDev(*((*subTimes)[res.ID]), nil) + subTimeRunResults.MsgTimeMean = stat.Mean(*((*subsResults)[res.ID]), nil) + subTimeRunResults.MsgTimeStd = stat.StdDev(*((*subsResults)[res.ID]), nil) } res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index 311b76460c..083ac3f515 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -3,15 +3,11 @@ url = "tcp://142.93.118.47:1883" [mqtt.message] - size = 100 + size = 200 format = "text" qos = 2 -<<<<<<< HEAD retain = false -======= - retain = true - payload = "" ->>>>>>> add pub-to-sub delivery time measure + payload = "{\"n\":\"lon\",\"t\":-4,\"v\":1.3}" [mqtt.tls] mtls = false @@ -19,21 +15,13 @@ ca = "ca.crt" [test] -<<<<<<< HEAD pubs = 100 subs = 30 -======= -pubs = 1 -subs = 1 ->>>>>>> add pub-to-sub delivery time measure count = 100 +count =5 [log] quiet = false [mainflux] -<<<<<<< HEAD connections_file = "../provision/mfconn.toml" -======= -connections_file = "connections.toml" ->>>>>>> add pub-to-sub delivery time measure From 94f992a8610d3d97b27254b3094950e3baf2e411 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Wed, 4 Sep 2019 17:43:11 +0200 Subject: [PATCH 11/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 45 +++++++++++-------- tools/mqtt-bench/client.go | 90 +++++++++++++++++++++---------------- tools/mqtt-bench/results.go | 31 +++++++++---- 3 files changed, 101 insertions(+), 65 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index fa6fc59bda..978b16367c 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -111,13 +111,14 @@ func Benchmark(cfg Config) { } resCh := make(chan *runResults) - done := make(chan bool) + finishPub := make(chan bool) doneSub := make(chan bool) + startStamp := time.Now() n := len(mf.Channels) var cert tls.Certificate - msg := prepareSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + msg := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) getSenML := func() senml.SenML { return msg } @@ -138,7 +139,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID), + MsgTopic: getTopic(mfConn.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -153,10 +154,11 @@ func Benchmark(cfg Config) { wg.Add(1) - go c.runSubscriber(&wg, &subsResults, &done, &doneSub) + go c.runSubscriber(&wg, &subsResults, cfg.Test.Count*cfg.Test.Pubs, &finishPub, &doneSub) } wg.Wait() + finishPub <- true // Publishers start := time.Now() @@ -177,7 +179,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfConn.ChannelID), + MsgTopic: getTopic(mfConn.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -187,7 +189,7 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: getPayload, + Message: getSenMLPayload, GetSenML: getSenML, } @@ -202,32 +204,36 @@ func Benchmark(cfg Config) { k := 0 j := 0 - for i := 0; i < cfg.Test.Pubs*cfg.Test.Count; i++ { + for i := 0; i < cfg.Test.Pubs+cfg.Test.Subs; i++ { select { case result := <-resCh: { + fmt.Printf("done, results prepared\n") if k > cfg.Test.Pubs { log.Printf("Something went wrong with messages") } - log.Printf("Receieved result\n") results[k] = result k++ + if k == cfg.Test.Pubs { + fmt.Printf("Publishers finished") + finishPub <- true + } } case <-doneSub: { - log.Printf("Subscriber done") // every time subscriber receives MsgCount messages it will signal done if j >= cfg.Test.Subs { break } + fmt.Printf("done with subscribers\n") j++ } } } totalTime := time.Now().Sub(start) - totals := calculateTotalResults(results, totalTime, &subsResults) + totals := calculateTotalResults(results, totalTime, subsResults) if totals == nil { return } @@ -236,9 +242,9 @@ func Benchmark(cfg Config) { printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet) } -func prepareSenML(sz int, payload string) senml.SenML { +func buildSenML(sz int, payload string) senml.SenML { - t := (float64)(time.Now().Nanosecond()) + t := (float64)(time.Now().UnixNano()) timeStamp := senml.SenMLRecord{ BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57", Value: &t, @@ -252,7 +258,7 @@ func prepareSenML(sz int, payload string) senml.SenML { sml := senml.SenMLRecord{} err = json.Unmarshal([]byte(payload), &sml) if err != nil { - log.Println("cannot unmarshal payload") + log.Fatalf("Cannot unmarshal payload") } msgByte, err := json.Marshal(sml) @@ -260,7 +266,7 @@ func prepareSenML(sz int, payload string) senml.SenML { log.Fatalf("Failed to create test message") } - // how many records to make messae long sz bytes + // how many records to make message long sz bytes n := (sz-len(tsByte))/len(msgByte) + 1 if sz < len(tsByte) { n = 1 @@ -271,7 +277,7 @@ func prepareSenML(sz int, payload string) senml.SenML { for i := 1; i < n; i++ { // is this needed // i think we need id to be saved with db writer to t - sml.Time = float64(time.Now().Nanosecond()) + sml.Time = float64(time.Now().UnixNano()) records[i] = sml } @@ -282,14 +288,17 @@ func prepareSenML(sz int, payload string) senml.SenML { return s } -func getPayload(cid string, time float64, f func() senml.SenML) ([]byte, error) { - s := f() +func getSenMLPayload(cid string, time float64, getSenML func() senml.SenML) ([]byte, error) { + s := getSenML() s.Records[0].Value = &time s.Records[0].BaseName = cid payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{}) if err != nil { return nil, err } - log.Printf("pld%s\n", string(payload)) return payload, nil } + +func getTopic(ch string, start time.Time) string { + return fmt.Sprintf("channels/%s/messages/%d/test", ch, start.UnixNano()) +} diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 036fc8e7d4..f25f8c899f 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -9,6 +9,7 @@ import ( "crypto/x509" "fmt" "log" + "math" "net" "strings" "sync" @@ -66,28 +67,29 @@ func (c *Client) runPublisher(r chan *runResults) { doneGen := make(chan bool) donePub := make(chan bool) runResults := new(runResults) - - started := time.Now() + Inf := float64(math.Inf(+1)) + var diff float64 // Start generator go c.generate(newMsgs, doneGen) + started := time.Now() // Start publisher go c.publish(newMsgs, pubMsgs, doneGen, donePub) - times := []float64{} - for { select { case m := <-pubMsgs: cid := m.ID if m.Error { runResults.Failures++ + diff = Inf } else { runResults.Successes++ - runResults.ID = cid - times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microseconds + diff = float64(m.Delivered.Sub(m.Sent).Nanoseconds() / 1000) // in microseconds } + runResults.ID = cid + times = append(times, diff) case <-donePub: // Calculate results duration := time.Now().Sub(started) @@ -107,11 +109,8 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, done, doneSub *chan bool) { - defer wg.Done() - - // Start subscriber - c.subscribe(wg, subsResults, done, doneSub) +func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub, doneSub *chan bool) { + c.subscribe(wg, subsResults, tot, finishPub, doneSub) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -128,61 +127,69 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, done, doneSub *chan bool) { +func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub, doneSub *chan bool) { + doneRec := make(chan bool) clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID onConnected := func(client mqtt.Client) { + wg.Done() + fmt.Printf("subscribed to %s\n\n", c.MsgTopic) if !c.Quiet { log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL) } } - connLost := func(client mqtt.Client, reason error) { log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error()) - *doneSub <- true + doneRec <- true + } + if c.connect(onConnected, connLost) != nil { + wg.Done() + log.Printf("Client %v failed connecting to the broker\n", c.ID) + doneRec <- true } - c.connect(onConnected, connLost) - i := 0 + i := 0 token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { - i++ - //mp := messagePayload{} - //mp := senml.SenML{} - arrival := time.Now().UnixNano() - log.Printf("pld: %s\n", string(msg.Payload())) + arrival := float64(time.Now().UnixNano()) mp, err := senml.Decode(msg.Payload(), senml.JSON) if err != nil { - log.Printf("Failed to decode message %s\n", err.Error()) + //log.Printf("Failed to decode message %s\n", err.Error()) } - - //times = append(times, float64(m.Delivered.Sub(m.Sent).Nanoseconds()/1000)) // in microsecondsme id := mp.Records[0].BaseName timeSent := *mp.Records[0].Value arrivalTimes, ok := (*subsResults)[id] if !ok { - clientArrivalTimes := make([]float64, 50) - arrivalTimes = &clientArrivalTimes + t := []float64{} + arrivalTimes = &t (*subsResults)[id] = arrivalTimes - } - // remove - fmt.Printf("%d dif %f:\n", i, float64(arrival.Sub(mp.Sent).Nanoseconds()/1000)) - - *arrivalTimes = append(*arrivalTimes, float64(arrival)-timeSent) - - if i == c.MsgCount-1 { + a := *arrivalTimes + a = append(a, arrival-timeSent) + (*subsResults)[id] = &a + log.Printf("msg-%d - %s del:%f, snt:%f, dif:%f\n\n", i, string(msg.Payload()), arrival, timeSent, arrival-timeSent) + i++ + if i == tot { log.Printf("Subscriber %s has finished receiving", c.ID) - *doneSub <- true + doneRec <- true } - log.Printf("recieved msg %d in %f \n", i, float64(arrival)-timeSent) - if err != nil { - log.Printf("Client %s failed to decode message\n", clientID) - } }) token.Wait() + for { + select { + case <-doneRec: + *doneSub <- true + return + case <-*finishPub: + fmt.Printf("finished publishing, close sub %s\n", c.ID) + time.Sleep(2 * time.Second) + *doneSub <- true + return + } + } } func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) { @@ -202,6 +209,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan if err != nil { log.Printf("Failed to marshal payload - %s", err.Error()) } + fmt.Printf("pub:%s - %s- %s\n\n", c.ID, c.MsgTopic, string(pload)) token := client.Publish(m.Topic, m.QoS, c.Retain, pload) token.Wait() if token.Error() != nil { @@ -232,6 +240,7 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan if ctr < c.MsgCount { flushMessages := make([]message, c.MsgCount-ctr) for _, m := range flushMessages { + m.Error = true out <- &m } } @@ -239,7 +248,12 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan } if c.connect(onConnected, connLost) != nil { - out <- &message{} + log.Printf("Failed to connect %s\n", c.ID) + flushMessages := make([]message, c.MsgCount-ctr) + for _, m := range flushMessages { + m.Error = true + out <- &m + } donePub <- true } diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 1c1defe8fd..a93fddde4f 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -50,7 +50,7 @@ type totalResults struct { AvgMsgsPerSec float64 `json:"avg_msgs_per_sec"` } -func calculateTotalResults(results []*runResults, totalTime time.Duration, subsResults *subsResults) *totalResults { +func calculateTotalResults(results []*runResults, totalTime time.Duration, sr subsResults) *totalResults { if results == nil || len(results) < 1 { return nil } @@ -65,14 +65,27 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, subsR totals.TotalRunTime = totalTime.Seconds() totals.MsgTimeMin = results[0].MsgTimeMin + log.Printf("length:%d\n", len(sr)) + for k, v := range sr { + fmt.Printf("%s\n", k) + fmt.Printf("%v\n", v) + for t, k := range *(sr[k]) { + fmt.Printf("%d\n", t) + fmt.Printf("%f\n", k) + } + } for i, res := range results { - if len(*subsResults) > 0 { - times := mat.NewDense(1, len(*((*subsResults)[res.ID])), *((*subsResults)[res.ID])) + + if len(sr) > 0 { + log.Printf("get results %s\n", res.ID) + log.Printf("length:%d %s\n", len(*sr[res.ID]), res.ID) + + times := mat.NewDense(1, len(*sr[res.ID]), *sr[res.ID]) subTimeRunResults.MsgTimeMin = mat.Min(times) subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean(*((*subsResults)[res.ID]), nil) - subTimeRunResults.MsgTimeStd = stat.StdDev(*((*subsResults)[res.ID]), nil) + subTimeRunResults.MsgTimeMean = stat.Mean(*(sr[res.ID]), nil) + subTimeRunResults.MsgTimeStd = stat.StdDev(*(sr[res.ID]), nil) } res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin @@ -144,10 +157,10 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) - fmt.Printf("Msg time min (us): %.3f\n", res.MsgDelTimeMin) - fmt.Printf("Msg time max (us): %.3f\n", res.MsgDelTimeMax) - fmt.Printf("Msg time mean (us): %.3f\n", res.MsgDelTimeMean) - fmt.Printf("Msg time std (us): %.3f\n", res.MsgDelTimeStd) + fmt.Printf("Msg del time min (us): %.3f\n", res.MsgDelTimeMin) + fmt.Printf("Msg del time max (us): %.3f\n", res.MsgDelTimeMax) + fmt.Printf("Msg del time mean (us): %.3f\n", res.MsgDelTimeMean) + fmt.Printf("Msg del time std (us): %.3f\n", res.MsgDelTimeStd) fmt.Printf("Bandwidth (msg/sec): %.3f\n\n", res.MsgsPerSec) } From ac921095a731107236851cb891c019c9268183bd Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Wed, 4 Sep 2019 18:27:13 +0200 Subject: [PATCH 12/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/client.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index f25f8c899f..82a874b72e 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -131,6 +131,22 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int doneRec := make(chan bool) clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID + go func() { + fmt.Printf("go func\n") + for { + select { + case <-doneRec: + fmt.Printf("finished publishing, close sub %s\n", c.ID) + *doneSub <- true + return + case <-*finishPub: + fmt.Printf("finished publishing, close sub %s\n", c.ID) + time.Sleep(2 * time.Second) + *doneSub <- true + return + } + } + }() onConnected := func(client mqtt.Client) { wg.Done() @@ -149,6 +165,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int doneRec <- true } + doneRec <- true i := 0 token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { @@ -178,18 +195,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int }) token.Wait() - for { - select { - case <-doneRec: - *doneSub <- true - return - case <-*finishPub: - fmt.Printf("finished publishing, close sub %s\n", c.ID) - time.Sleep(2 * time.Second) - *doneSub <- true - return - } - } + } func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) { From d7bd2f19404df2f2d214c68957c03980399fa9da Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Wed, 4 Sep 2019 19:04:35 +0200 Subject: [PATCH 13/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 57 ++++++++++++++++++++++---------------- tools/mqtt-bench/client.go | 3 +- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 978b16367c..2906d6b973 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -158,7 +158,6 @@ func Benchmark(cfg Config) { } wg.Wait() - finishPub <- true // Publishers start := time.Now() @@ -203,34 +202,44 @@ func Benchmark(cfg Config) { } k := 0 - j := 0 - for i := 0; i < cfg.Test.Pubs+cfg.Test.Subs; i++ { - - select { - case result := <-resCh: - { - fmt.Printf("done, results prepared\n") - if k > cfg.Test.Pubs { - log.Printf("Something went wrong with messages") - } - results[k] = result - k++ - if k == cfg.Test.Pubs { - fmt.Printf("Publishers finished") - finishPub <- true + + go func() { + for i := 0; i < cfg.Test.Pubs; i++ { + select { + case result := <-resCh: + { + fmt.Printf("done, results prepared\n") + + results[k] = result + k++ + if k == cfg.Test.Pubs { + fmt.Printf("Publishers finished %d\n", i) + finishPub <- true + } } } - case <-doneSub: - { - // every time subscriber receives MsgCount messages it will signal done - if j >= cfg.Test.Subs { - break + } + }() + + finishSub := make(chan bool) + go func() { + for i := 0; i < cfg.Test.Subs; i++ { + select { + case <-doneSub: + { + fmt.Printf("done with subscribers %d\n", i) + // every time subscriber receives MsgCount messages it will signal done + if i == cfg.Test.Subs-1 { + finishSub <- true + break + } + } - fmt.Printf("done with subscribers\n") - j++ } } - } + }() + <-finishPub + <-finishSub totalTime := time.Now().Sub(start) totals := calculateTotalResults(results, totalTime, subsResults) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 82a874b72e..cb66f790ea 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -136,7 +136,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int for { select { case <-doneRec: - fmt.Printf("finished publishing, close sub %s\n", c.ID) + fmt.Printf("finished receiveing, close sub %s\n", c.ID) *doneSub <- true return case <-*finishPub: @@ -165,7 +165,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int doneRec <- true } - doneRec <- true i := 0 token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { From 16d9a5330b9e2e974b65391a02637658057566d7 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 08:56:35 +0200 Subject: [PATCH 14/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 2906d6b973..4f0ddb6d66 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -201,8 +201,6 @@ func Benchmark(cfg Config) { results = make([]*runResults, cfg.Test.Pubs) } - k := 0 - go func() { for i := 0; i < cfg.Test.Pubs; i++ { select { @@ -210,9 +208,8 @@ func Benchmark(cfg Config) { { fmt.Printf("done, results prepared\n") - results[k] = result - k++ - if k == cfg.Test.Pubs { + results[i] = result + if i == cfg.Test.Pubs-1 { fmt.Printf("Publishers finished %d\n", i) finishPub <- true } @@ -230,6 +227,7 @@ func Benchmark(cfg Config) { fmt.Printf("done with subscribers %d\n", i) // every time subscriber receives MsgCount messages it will signal done if i == cfg.Test.Subs-1 { + fmt.Printf("done with subscribers %d\n", i) finishSub <- true break } From 66a3f3492530d7bcf53a8e03c4ed00d648bc4bec Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 08:57:14 +0200 Subject: [PATCH 15/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/templates/config.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index 083ac3f515..b764fc5d3b 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -15,9 +15,14 @@ ca = "ca.crt" [test] +<<<<<<< HEAD pubs = 100 subs = 30 count = 100 +======= +pubs = 1 +subs = 1 +>>>>>>> improve sync between pub and sub count =5 [log] From e85defd96129c2729e476be60c7de27bfb6dee7a Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 11:58:31 +0200 Subject: [PATCH 16/31] improve sync between pub and sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 53 +++++++++++++++++++++++++++++++++---- tools/mqtt-bench/client.go | 3 +-- tools/mqtt-bench/results.go | 15 +++++++---- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 4f0ddb6d66..36df1e83c5 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -73,6 +73,12 @@ type mainflux struct { Channels []mfChannel `toml:"channels" mapstructure:"channels"` } +type testMsg struct { + ClientID string + Sent float64 + Payload []byte +} + // Config struct holds benchmark configuration type Config struct { MQTT mqttConfig `toml:"mqtt" mapstructure:"mqtt"` @@ -118,10 +124,18 @@ func Benchmark(cfg Config) { n := len(mf.Channels) var cert tls.Certificate - msg := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + var msg senml.SenML + getPload := getBytePayload + + if len(cfg.MQTT.Message.Payload) > 0 { + msg = buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + getPload = getSenMLPayload + + } getSenML := func() senml.SenML { return msg } + // Subscribers for i := 0; i < cfg.Test.Subs; i++ { mfConn := mf.Channels[i%n] @@ -188,7 +202,7 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: getSenMLPayload, + Message: getPload, GetSenML: getSenML, } @@ -236,7 +250,6 @@ func Benchmark(cfg Config) { } } }() - <-finishPub <-finishSub totalTime := time.Now().Sub(start) @@ -249,13 +262,17 @@ func Benchmark(cfg Config) { printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet) } -func buildSenML(sz int, payload string) senml.SenML { - +func getSenMLTimeStamp() senml.SenMLRecord { t := (float64)(time.Now().UnixNano()) timeStamp := senml.SenMLRecord{ BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57", Value: &t, } + return timeStamp +} + +func buildSenML(sz int, payload string) senml.SenML { + timeStamp := getSenMLTimeStamp() tsByte, err := json.Marshal(timeStamp) if err != nil || len(payload) == 0 { @@ -295,6 +312,32 @@ func buildSenML(sz int, payload string) senml.SenML { return s } +func getBytePayload(cid string, time float64, getSenML func() senml.SenML) ([]byte, error) { + + msg := testMsg{} + msg.ClientID = cid + msg.Sent = time + + tsByte, err := json.Marshal(msg) + if err != nil { + log.Fatalf("Failed to create test message") + } + + // Need to sort this out + m := 500 - len(tsByte) + if m < 0 { + return tsByte, nil + } + add := make([]byte, m) + msg.Payload = add + + b, err := json.Marshal(msg) + if err != nil { + return nil, err + } + return b, nil +} + func getSenMLPayload(cid string, time float64, getSenML func() senml.SenML) ([]byte, error) { s := getSenML() s.Records[0].Value = &time diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index cb66f790ea..77d482be29 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -132,7 +132,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID go func() { - fmt.Printf("go func\n") for { select { case <-doneRec: @@ -182,7 +181,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int (*subsResults)[id] = arrivalTimes } a := *arrivalTimes - a = append(a, arrival-timeSent) + a = append(a, (arrival - timeSent)) (*subsResults)[id] = &a log.Printf("msg-%d - %s del:%f, snt:%f, dif:%f\n\n", i, string(msg.Payload()), arrival, timeSent, arrival-timeSent) i++ diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index a93fddde4f..a3df1917fc 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -82,10 +82,10 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su times := mat.NewDense(1, len(*sr[res.ID]), *sr[res.ID]) - subTimeRunResults.MsgTimeMin = mat.Min(times) - subTimeRunResults.MsgTimeMax = mat.Max(times) - subTimeRunResults.MsgTimeMean = stat.Mean(*(sr[res.ID]), nil) - subTimeRunResults.MsgTimeStd = stat.StdDev(*(sr[res.ID]), nil) + subTimeRunResults.MsgTimeMin = mat.Min(times) / 1000 + subTimeRunResults.MsgTimeMax = mat.Max(times) / 1000 + subTimeRunResults.MsgTimeMean = stat.Mean(*(sr[res.ID]), nil) / 1000 + subTimeRunResults.MsgTimeStd = stat.StdDev(*(sr[res.ID]), nil) / 1000 } res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin @@ -155,7 +155,7 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time min (us): %.3f\n", res.MsgTimeMin) fmt.Printf("Msg time max (us): %.3f\n", res.MsgTimeMax) fmt.Printf("Msg time mean (us): %.3f\n", res.MsgTimeMean) - fmt.Printf("Msg time std (us): %.3f\n", res.MsgTimeStd) + fmt.Printf("Msg time std (us): %.3f\n\n", res.MsgTimeStd) fmt.Printf("Msg del time min (us): %.3f\n", res.MsgDelTimeMin) fmt.Printf("Msg del time max (us): %.3f\n", res.MsgDelTimeMax) @@ -174,6 +174,11 @@ func printResults(results []*runResults, totals *totalResults, format string, qu fmt.Printf("Msg time mean mean (us): %.3f\n", totals.MsgTimeMeanAvg) fmt.Printf("Msg time mean std (us): %.3f\n", totals.MsgTimeMeanStd) + fmt.Printf("Msg del time min (us): %.3f\n", totals.MsgDelTimeMin) + fmt.Printf("Msg del time max (us): %.3f\n", totals.MsgDelTimeMax) + fmt.Printf("Msg del time mean (us): %.3f\n", totals.MsgDelTimeMeanAvg) + fmt.Printf("Msg del time std (us): %.3f\n", totals.MsgDelTimeMeanStd) + fmt.Printf("Average Bandwidth (msg/sec): %.3f\n", totals.AvgMsgsPerSec) fmt.Printf("Total Bandwidth (msg/sec): %.3f\n", totals.TotalMsgsPerSec) } From 3644847bd242af18ab0ac7aeb3494eb1b981eed8 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 12:42:37 +0200 Subject: [PATCH 17/31] add random payload Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 16 ++++++++-------- tools/mqtt-bench/client.go | 26 +++++++++++++++++++------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 36df1e83c5..79be35e1d9 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -128,12 +128,12 @@ func Benchmark(cfg Config) { getPload := getBytePayload if len(cfg.MQTT.Message.Payload) > 0 { + fmt.Printf("size") msg = buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) getPload = getSenMLPayload - } - getSenML := func() senml.SenML { - return msg + getSenML := func() *senml.SenML { + return &msg } // Subscribers @@ -163,7 +163,7 @@ func Benchmark(cfg Config) { CA: caByte, ClientCert: cert, Retain: cfg.MQTT.Message.Retain, - Message: nil, + GetSenML: getSenML, } wg.Add(1) @@ -312,7 +312,7 @@ func buildSenML(sz int, payload string) senml.SenML { return s } -func getBytePayload(cid string, time float64, getSenML func() senml.SenML) ([]byte, error) { +func getBytePayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) { msg := testMsg{} msg.ClientID = cid @@ -323,7 +323,7 @@ func getBytePayload(cid string, time float64, getSenML func() senml.SenML) ([]by log.Fatalf("Failed to create test message") } - // Need to sort this out + // TODO - Need to sort this out m := 500 - len(tsByte) if m < 0 { return tsByte, nil @@ -338,8 +338,8 @@ func getBytePayload(cid string, time float64, getSenML func() senml.SenML) ([]by return b, nil } -func getSenMLPayload(cid string, time float64, getSenML func() senml.SenML) ([]byte, error) { - s := getSenML() +func getSenMLPayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) { + s := *getSenML() s.Records[0].Value = &time s.Records[0].BaseName = cid payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{}) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 77d482be29..9ac0470633 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -7,6 +7,7 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "log" "math" @@ -28,8 +29,8 @@ type Client struct { BrokerUser string BrokerPass string MsgTopic string - Message func(cid string, time float64, f func() senml.SenML) ([]byte, error) - GetSenML func() senml.SenML + Message func(cid string, time float64, f func() *senml.SenML) ([]byte, error) + GetSenML func() *senml.SenML MsgSize int MsgCount int MsgQoS byte @@ -168,12 +169,23 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { arrival := float64(time.Now().UnixNano()) - mp, err := senml.Decode(msg.Payload(), senml.JSON) - if err != nil { - //log.Printf("Failed to decode message %s\n", err.Error()) + var id string + var timeSent float64 + + if c.GetSenML() == nil { + mp, err := senml.Decode(msg.Payload(), senml.JSON) + if err != nil && !c.Quiet { + log.Printf("Failed to decode message %s\n", err.Error()) + } + id = mp.Records[0].BaseName + timeSent = *mp.Records[0].Value + } else { + tst := testMsg{} + json.Unmarshal(msg.Payload(), &tst) + id = tst.ClientID + timeSent = tst.Sent } - id := mp.Records[0].BaseName - timeSent := *mp.Records[0].Value + arrivalTimes, ok := (*subsResults)[id] if !ok { t := []float64{} From 4f7d10b45111d7c9b97e844ec70eb2ca6202f969 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 12:43:46 +0200 Subject: [PATCH 18/31] revert changes for config.toml Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/templates/config.toml | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index b764fc5d3b..1b058b4b20 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -1,13 +1,12 @@ [mqtt] [mqtt.broker] - url = "tcp://142.93.118.47:1883" + url = "tcp://localhost:1883" [mqtt.message] - size = 200 + size = 100 format = "text" qos = 2 - retain = false - payload = "{\"n\":\"lon\",\"t\":-4,\"v\":1.3}" + retain = true [mqtt.tls] mtls = false @@ -15,18 +14,12 @@ ca = "ca.crt" [test] -<<<<<<< HEAD -pubs = 100 -subs = 30 -count = 100 -======= -pubs = 1 +pubs = 3 subs = 1 ->>>>>>> improve sync between pub and sub -count =5 +count = 100 [log] quiet = false [mainflux] -connections_file = "../provision/mfconn.toml" +connections_file = "../provision/connections.toml" \ No newline at end of file From 92a1975475a60e2a82085ec5684d3bda925ec4dd Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 12:44:10 +0200 Subject: [PATCH 19/31] add random payload Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/cmd/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mqtt-bench/cmd/main.go b/tools/mqtt-bench/cmd/main.go index 8db5e33036..2c67646624 100644 --- a/tools/mqtt-bench/cmd/main.go +++ b/tools/mqtt-bench/cmd/main.go @@ -45,7 +45,7 @@ Complete documentation is available at https://mainflux.readthedocs.io`, // MQTT Message rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.Size, "size", "z", 100, "Size of message payload bytes") - rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Payload, "payload", "l", "{\"n\":\"lon\",\"t\":-4,\"v\":1.3}", "Template message") + rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Payload, "payload", "l", "", "Template message") rootCmd.PersistentFlags().StringVarP(&bconf.MQTT.Message.Format, "format", "f", "text", "Output format: text|json") rootCmd.PersistentFlags().IntVarP(&bconf.MQTT.Message.QoS, "qos", "q", 0, "QoS for published messages, values 0 1 2") rootCmd.PersistentFlags().BoolVarP(&bconf.MQTT.Message.Retain, "retain", "r", false, "Retain mqtt messages") From 63ad958ed934d21eda16f7a037471c622e42672b Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 12:55:22 +0200 Subject: [PATCH 20/31] remove printfs Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 8 +------- tools/mqtt-bench/client.go | 4 ---- tools/mqtt-bench/results.go | 12 ------------ 3 files changed, 1 insertion(+), 23 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 79be35e1d9..430aeabdad 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -98,7 +98,7 @@ func Benchmark(cfg Config) { var wg sync.WaitGroup var err error - //checkConnection(cfg.MQTT.Broker.URL, 1) + checkConnection(cfg.MQTT.Broker.URL, 1) subsResults := make(subsResults) var caByte []byte if cfg.MQTT.TLS.MTLS { @@ -128,7 +128,6 @@ func Benchmark(cfg Config) { getPload := getBytePayload if len(cfg.MQTT.Message.Payload) > 0 { - fmt.Printf("size") msg = buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) getPload = getSenMLPayload } @@ -220,11 +219,8 @@ func Benchmark(cfg Config) { select { case result := <-resCh: { - fmt.Printf("done, results prepared\n") - results[i] = result if i == cfg.Test.Pubs-1 { - fmt.Printf("Publishers finished %d\n", i) finishPub <- true } } @@ -238,10 +234,8 @@ func Benchmark(cfg Config) { select { case <-doneSub: { - fmt.Printf("done with subscribers %d\n", i) // every time subscriber receives MsgCount messages it will signal done if i == cfg.Test.Subs-1 { - fmt.Printf("done with subscribers %d\n", i) finishSub <- true break } diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 9ac0470633..0655090f00 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -136,11 +136,9 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int for { select { case <-doneRec: - fmt.Printf("finished receiveing, close sub %s\n", c.ID) *doneSub <- true return case <-*finishPub: - fmt.Printf("finished publishing, close sub %s\n", c.ID) time.Sleep(2 * time.Second) *doneSub <- true return @@ -150,7 +148,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int onConnected := func(client mqtt.Client) { wg.Done() - fmt.Printf("subscribed to %s\n\n", c.MsgTopic) if !c.Quiet { log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL) } @@ -225,7 +222,6 @@ func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan if err != nil { log.Printf("Failed to marshal payload - %s", err.Error()) } - fmt.Printf("pub:%s - %s- %s\n\n", c.ID, c.MsgTopic, string(pload)) token := client.Publish(m.Topic, m.QoS, c.Retain, pload) token.Wait() if token.Error() != nil { diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index a3df1917fc..8e38d9419f 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -65,21 +65,9 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su totals.TotalRunTime = totalTime.Seconds() totals.MsgTimeMin = results[0].MsgTimeMin - log.Printf("length:%d\n", len(sr)) - for k, v := range sr { - fmt.Printf("%s\n", k) - fmt.Printf("%v\n", v) - for t, k := range *(sr[k]) { - fmt.Printf("%d\n", t) - fmt.Printf("%f\n", k) - } - } for i, res := range results { if len(sr) > 0 { - log.Printf("get results %s\n", res.ID) - log.Printf("length:%d %s\n", len(*sr[res.ID]), res.ID) - times := mat.NewDense(1, len(*sr[res.ID]), *sr[res.ID]) subTimeRunResults.MsgTimeMin = mat.Min(times) / 1000 From 5a82bf4704cbe0896b8d2828b55a59cfd40df4e2 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 13:27:23 +0200 Subject: [PATCH 21/31] add logging Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 0655090f00..06053695ad 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -179,8 +179,10 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int } else { tst := testMsg{} json.Unmarshal(msg.Payload(), &tst) + id = tst.ClientID timeSent = tst.Sent + fmt.Printf("recieved %s %f\n", id, timeSent) } arrivalTimes, ok := (*subsResults)[id] @@ -192,7 +194,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int a := *arrivalTimes a = append(a, (arrival - timeSent)) (*subsResults)[id] = &a - log.Printf("msg-%d - %s del:%f, snt:%f, dif:%f\n\n", i, string(msg.Payload()), arrival, timeSent, arrival-timeSent) i++ if i == tot { log.Printf("Subscriber %s has finished receiving", c.ID) @@ -325,10 +326,11 @@ func checkConnection(broker string, timeoutSecs int) { host := strings.Trim(s[1], "/") port := s[2] + log.Println("Testing connection...") conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", host, port), time.Duration(timeoutSecs)*time.Second) conClose := func() { if conn != nil { - log.Println("Closing connection...") + log.Println("Closing testing connection...") conn.Close() } } From ef4cf2bab69b6ead5eb1e26f240982b9e2f7937a Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 13:56:08 +0200 Subject: [PATCH 22/31] add payload Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/templates/fanin.toml | 1 + tools/mqtt-bench/templates/fanout-mtls.toml | 1 + tools/mqtt-bench/templates/fanout.toml | 1 + 3 files changed, 3 insertions(+) diff --git a/tools/mqtt-bench/templates/fanin.toml b/tools/mqtt-bench/templates/fanin.toml index 96a307ea39..74d45cec2f 100644 --- a/tools/mqtt-bench/templates/fanin.toml +++ b/tools/mqtt-bench/templates/fanin.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false diff --git a/tools/mqtt-bench/templates/fanout-mtls.toml b/tools/mqtt-bench/templates/fanout-mtls.toml index be50ba7901..6725333b2e 100644 --- a/tools/mqtt-bench/templates/fanout-mtls.toml +++ b/tools/mqtt-bench/templates/fanout-mtls.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = true diff --git a/tools/mqtt-bench/templates/fanout.toml b/tools/mqtt-bench/templates/fanout.toml index 4f78879fe4..a56a2ac3fb 100644 --- a/tools/mqtt-bench/templates/fanout.toml +++ b/tools/mqtt-bench/templates/fanout.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false From 25b444a8674e61a8d268478e74b3b4f43748a1d4 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 13:56:12 +0200 Subject: [PATCH 23/31] add payload Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/templates/subscribe-mtls.toml | 1 + tools/mqtt-bench/templates/subscribe.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/tools/mqtt-bench/templates/subscribe-mtls.toml b/tools/mqtt-bench/templates/subscribe-mtls.toml index 8949577e49..400a07c92d 100644 --- a/tools/mqtt-bench/templates/subscribe-mtls.toml +++ b/tools/mqtt-bench/templates/subscribe-mtls.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = true diff --git a/tools/mqtt-bench/templates/subscribe.toml b/tools/mqtt-bench/templates/subscribe.toml index 84ccf11420..010c3c2e1a 100644 --- a/tools/mqtt-bench/templates/subscribe.toml +++ b/tools/mqtt-bench/templates/subscribe.toml @@ -7,6 +7,7 @@ format = "text" qos = 2 retain = true + payload = "" [mqtt.tls] mtls = false From 1a0194c44c92496ad62f41df1f51d2026298d209 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 13:56:29 +0200 Subject: [PATCH 24/31] rename variable Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 430aeabdad..c98edf389c 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -137,7 +137,7 @@ func Benchmark(cfg Config) { // Subscribers for i := 0; i < cfg.Test.Subs; i++ { - mfConn := mf.Channels[i%n] + mfChan := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -152,7 +152,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: getTopic(mfConn.ChannelID, startStamp), + MsgTopic: getTopic(mfChan.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), @@ -176,7 +176,7 @@ func Benchmark(cfg Config) { start := time.Now() for i := 0; i < cfg.Test.Pubs; i++ { - mfConn := mf.Channels[i%n] + mfChan := mf.Channels[i%n] mfThing := mf.Things[i%n] if cfg.MQTT.TLS.MTLS { @@ -191,7 +191,7 @@ func Benchmark(cfg Config) { BrokerURL: cfg.MQTT.Broker.URL, BrokerUser: mfThing.ThingID, BrokerPass: mfThing.ThingKey, - MsgTopic: getTopic(mfConn.ChannelID, startStamp), + MsgTopic: getTopic(mfChan.ChannelID, startStamp), MsgSize: cfg.MQTT.Message.Size, MsgCount: cfg.Test.Count, MsgQoS: byte(cfg.MQTT.Message.QoS), From 897302c94fe0005b9acb0a96073d121c83d005f6 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 14:06:04 +0200 Subject: [PATCH 25/31] add payload Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/templates/config.toml | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tools/mqtt-bench/templates/config.toml b/tools/mqtt-bench/templates/config.toml index 1b058b4b20..fba30879bb 100644 --- a/tools/mqtt-bench/templates/config.toml +++ b/tools/mqtt-bench/templates/config.toml @@ -6,7 +6,8 @@ size = 100 format = "text" qos = 2 - retain = true + retain = false + payload = "" [mqtt.tls] mtls = false @@ -14,12 +15,12 @@ ca = "ca.crt" [test] -pubs = 3 -subs = 1 +pubs = 100 +subs = 30 count = 100 [log] quiet = false [mainflux] -connections_file = "../provision/connections.toml" \ No newline at end of file +connections_file = "../provision/mfconn.toml" \ No newline at end of file From 2ce4606633c905073aa48ac009e7b95474ea5b81 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 15:33:57 +0200 Subject: [PATCH 26/31] small changes Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 9 +++--- tools/mqtt-bench/client.go | 66 ++++++++++++++++++-------------------- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index c98edf389c..71d14684cc 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -124,17 +124,18 @@ func Benchmark(cfg Config) { n := len(mf.Channels) var cert tls.Certificate - var msg senml.SenML + var msg *senml.SenML getPload := getBytePayload if len(cfg.MQTT.Message.Payload) > 0 { - msg = buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + m := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload) + msg = &m getPload = getSenMLPayload } + getSenML := func() *senml.SenML { - return &msg + return msg } - // Subscribers for i := 0; i < cfg.Test.Subs; i++ { mfChan := mf.Channels[i%n] diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 06053695ad..703cae16cd 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -7,7 +7,6 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "log" "math" @@ -132,14 +131,17 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int doneRec := make(chan bool) clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID + i := 0 go func() { for { select { case <-doneRec: + log.Printf("Subscriber %s has finished receiving %d", c.ID, i) *doneSub <- true return case <-*finishPub: - time.Sleep(2 * time.Second) + log.Printf("Subscriber %s has finished receiving %d", c.ID, i) + time.Sleep(4 * time.Second) *doneSub <- true return } @@ -162,41 +164,37 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int doneRec <- true } - i := 0 token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { - arrival := float64(time.Now().UnixNano()) - var id string - var timeSent float64 - - if c.GetSenML() == nil { - mp, err := senml.Decode(msg.Payload(), senml.JSON) - if err != nil && !c.Quiet { - log.Printf("Failed to decode message %s\n", err.Error()) - } - id = mp.Records[0].BaseName - timeSent = *mp.Records[0].Value - } else { - tst := testMsg{} - json.Unmarshal(msg.Payload(), &tst) - - id = tst.ClientID - timeSent = tst.Sent - fmt.Printf("recieved %s %f\n", id, timeSent) - } - - arrivalTimes, ok := (*subsResults)[id] - if !ok { - t := []float64{} - arrivalTimes = &t - (*subsResults)[id] = arrivalTimes - } - a := *arrivalTimes - a = append(a, (arrival - timeSent)) - (*subsResults)[id] = &a + // arrival := float64(time.Now().UnixNano()) + // var id string + // var timeSent float64 + + // if c.GetSenML() != nil { + // mp, err := senml.Decode(msg.Payload(), senml.JSON) + // if err != nil && !c.Quiet { + // log.Printf("Failed to decode message %s\n", err.Error()) + // } + // id = mp.Records[0].BaseName + // timeSent = *mp.Records[0].Value + // } else { + // tst := testMsg{} + // json.Unmarshal(msg.Payload(), &tst) + // id = tst.ClientID + // timeSent = tst.Sent + // } + + // arrivalTimes, ok := (*subsResults)[id] + // if !ok { + // t := []float64{} + // arrivalTimes = &t + // (*subsResults)[id] = arrivalTimes + // } + // a := *arrivalTimes + // a = append(a, (arrival - timeSent)) + // (*subsResults)[id] = &a i++ if i == tot { - log.Printf("Subscriber %s has finished receiving", c.ID) doneRec <- true } @@ -209,7 +207,7 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) { clientID := fmt.Sprintf("pub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID - ctr := 0 + ctr := 1 onConnected := func(client mqtt.Client) { if !c.Quiet { log.Printf("Client %v is connected to the broker %v\n", clientID, c.BrokerURL) From 26f410fef9bc5ffa0e9cd0c5932c821aa8920b1f Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 16:32:54 +0200 Subject: [PATCH 27/31] refactor sync Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 43 +++++++++--------------- tools/mqtt-bench/client.go | 67 +++++++++++++++++++------------------- 2 files changed, 49 insertions(+), 61 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 71d14684cc..542f38108b 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -118,7 +118,7 @@ func Benchmark(cfg Config) { resCh := make(chan *runResults) finishPub := make(chan bool) - doneSub := make(chan bool) + doneSub := make(chan string) startStamp := time.Now() n := len(mf.Channels) @@ -215,37 +215,24 @@ func Benchmark(cfg Config) { results = make([]*runResults, cfg.Test.Pubs) } - go func() { - for i := 0; i < cfg.Test.Pubs; i++ { - select { - case result := <-resCh: - { - results[i] = result - if i == cfg.Test.Pubs-1 { - finishPub <- true - } - } + for i := 0; i < cfg.Test.Pubs; i++ { + select { + case result := <-resCh: + { + results[i] = result } } - }() - - finishSub := make(chan bool) - go func() { - for i := 0; i < cfg.Test.Subs; i++ { - select { - case <-doneSub: - { - // every time subscriber receives MsgCount messages it will signal done - if i == cfg.Test.Subs-1 { - finishSub <- true - break - } - - } + } + finishPub <- true + + for i := 0; i < cfg.Test.Subs; i++ { + select { + case msg := <-doneSub: + { + fmt.Println(msg) } } - }() - <-finishSub + } totalTime := time.Now().Sub(start) totals := calculateTotalResults(results, totalTime, subsResults) diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 703cae16cd..bdcd128af9 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -7,6 +7,7 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "log" "math" @@ -92,6 +93,7 @@ func (c *Client) runPublisher(r chan *runResults) { times = append(times, diff) case <-donePub: // Calculate results + log.Printf("done for %s\n ratio %d %d\n", runResults.ID, runResults.Successes, runResults.Successes+runResults.Failures) duration := time.Now().Sub(started) timeMatrix := mat.NewDense(1, len(times), times) runResults.MsgTimeMin = mat.Min(timeMatrix) @@ -109,7 +111,7 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub, doneSub *chan bool) { +func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub *chan bool, doneSub *chan string) { c.subscribe(wg, subsResults, tot, finishPub, doneSub) } @@ -127,7 +129,7 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub, doneSub *chan bool) { +func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub *chan bool, doneSub *chan string) { doneRec := make(chan bool) clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID @@ -137,12 +139,11 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int select { case <-doneRec: log.Printf("Subscriber %s has finished receiving %d", c.ID, i) - *doneSub <- true + *doneSub <- fmt.Sprintf("Subscriber %s has finished receiving %d", c.ID, i) return case <-*finishPub: - log.Printf("Subscriber %s has finished receiving %d", c.ID, i) - time.Sleep(4 * time.Second) - *doneSub <- true + time.Sleep(2 * time.Second) + *doneSub <- fmt.Sprintf("Subscriber %s has finished receiving %d", c.ID, i) return } } @@ -166,33 +167,33 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { - // arrival := float64(time.Now().UnixNano()) - // var id string - // var timeSent float64 - - // if c.GetSenML() != nil { - // mp, err := senml.Decode(msg.Payload(), senml.JSON) - // if err != nil && !c.Quiet { - // log.Printf("Failed to decode message %s\n", err.Error()) - // } - // id = mp.Records[0].BaseName - // timeSent = *mp.Records[0].Value - // } else { - // tst := testMsg{} - // json.Unmarshal(msg.Payload(), &tst) - // id = tst.ClientID - // timeSent = tst.Sent - // } - - // arrivalTimes, ok := (*subsResults)[id] - // if !ok { - // t := []float64{} - // arrivalTimes = &t - // (*subsResults)[id] = arrivalTimes - // } - // a := *arrivalTimes - // a = append(a, (arrival - timeSent)) - // (*subsResults)[id] = &a + arrival := float64(time.Now().UnixNano()) + var id string + var timeSent float64 + + if c.GetSenML() != nil { + mp, err := senml.Decode(msg.Payload(), senml.JSON) + if err != nil && !c.Quiet { + log.Printf("Failed to decode message %s\n", err.Error()) + } + id = mp.Records[0].BaseName + timeSent = *mp.Records[0].Value + } else { + tst := testMsg{} + json.Unmarshal(msg.Payload(), &tst) + id = tst.ClientID + timeSent = tst.Sent + } + + arrivalTimes, ok := (*subsResults)[id] + if !ok { + t := []float64{} + arrivalTimes = &t + (*subsResults)[id] = arrivalTimes + } + a := *arrivalTimes + a = append(a, (arrival - timeSent)) + (*subsResults)[id] = &a i++ if i == tot { doneRec <- true From e69e91d51558ebb1a8543351333a53e6e72752fc Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 17:10:09 +0200 Subject: [PATCH 28/31] refactor sync Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 16 ++-------------- tools/mqtt-bench/client.go | 26 +++----------------------- 2 files changed, 5 insertions(+), 37 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 542f38108b..e160b0a8fe 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -117,8 +117,6 @@ func Benchmark(cfg Config) { } resCh := make(chan *runResults) - finishPub := make(chan bool) - doneSub := make(chan string) startStamp := time.Now() n := len(mf.Channels) @@ -168,7 +166,7 @@ func Benchmark(cfg Config) { wg.Add(1) - go c.runSubscriber(&wg, &subsResults, cfg.Test.Count*cfg.Test.Pubs, &finishPub, &doneSub) + go c.runSubscriber(&wg, &subsResults, cfg.Test.Count*cfg.Test.Pubs) } wg.Wait() @@ -223,18 +221,8 @@ func Benchmark(cfg Config) { } } } - finishPub <- true - - for i := 0; i < cfg.Test.Subs; i++ { - select { - case msg := <-doneSub: - { - fmt.Println(msg) - } - } - } - totalTime := time.Now().Sub(start) + time.Sleep(2 * time.Second) totals := calculateTotalResults(results, totalTime, subsResults) if totals == nil { return diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index bdcd128af9..c3fe18bec4 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -111,8 +111,8 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub *chan bool, doneSub *chan string) { - c.subscribe(wg, subsResults, tot, finishPub, doneSub) +func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int) { + c.subscribe(wg, subsResults, tot) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -129,25 +129,10 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int, finishPub *chan bool, doneSub *chan string) { - doneRec := make(chan bool) +func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int) { clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID i := 0 - go func() { - for { - select { - case <-doneRec: - log.Printf("Subscriber %s has finished receiving %d", c.ID, i) - *doneSub <- fmt.Sprintf("Subscriber %s has finished receiving %d", c.ID, i) - return - case <-*finishPub: - time.Sleep(2 * time.Second) - *doneSub <- fmt.Sprintf("Subscriber %s has finished receiving %d", c.ID, i) - return - } - } - }() onConnected := func(client mqtt.Client) { wg.Done() @@ -157,12 +142,10 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int } connLost := func(client mqtt.Client, reason error) { log.Printf("Client %v had lost connection to the broker: %s\n", c.ID, reason.Error()) - doneRec <- true } if c.connect(onConnected, connLost) != nil { wg.Done() log.Printf("Client %v failed connecting to the broker\n", c.ID) - doneRec <- true } token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { @@ -195,9 +178,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int a = append(a, (arrival - timeSent)) (*subsResults)[id] = &a i++ - if i == tot { - doneRec <- true - } }) From 10983535980ced20db56e95d6af767c99bad48fe Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Thu, 5 Sep 2019 17:53:33 +0200 Subject: [PATCH 29/31] refactor results Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 1 - tools/mqtt-bench/results.go | 32 +++++++++++++++----------------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index e160b0a8fe..92dd9a95c3 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -222,7 +222,6 @@ func Benchmark(cfg Config) { } } totalTime := time.Now().Sub(start) - time.Sleep(2 * time.Second) totals := calculateTotalResults(results, totalTime, subsResults) if totals == nil { return diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 8e38d9419f..8b2917b837 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -55,7 +55,6 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su return nil } totals := new(totalResults) - subTimeRunResults := runResults{} msgTimeMeans := make([]float64, len(results)) msgTimeMeansDelivered := make([]float64, len(results)) msgsPerSecs := make([]float64, len(results)) @@ -68,18 +67,17 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su for i, res := range results { if len(sr) > 0 { - times := mat.NewDense(1, len(*sr[res.ID]), *sr[res.ID]) - - subTimeRunResults.MsgTimeMin = mat.Min(times) / 1000 - subTimeRunResults.MsgTimeMax = mat.Max(times) / 1000 - subTimeRunResults.MsgTimeMean = stat.Mean(*(sr[res.ID]), nil) / 1000 - subTimeRunResults.MsgTimeStd = stat.StdDev(*(sr[res.ID]), nil) / 1000 - + a, ok := sr[res.ID] + if ok { + times := mat.NewDense(1, len(*a), *a) + res.MsgDelTimeMin = mat.Min(times) / 1000 + res.MsgDelTimeMax = mat.Max(times) / 1000 + res.MsgDelTimeMean = stat.Mean(*(sr[res.ID]), nil) / 1000 + res.MsgDelTimeStd = stat.StdDev(*(sr[res.ID]), nil) / 1000 + } else { + log.Println("No data for del ", res.ID) + } } - res.MsgDelTimeMin = subTimeRunResults.MsgTimeMin - res.MsgDelTimeMax = subTimeRunResults.MsgTimeMax - res.MsgDelTimeMean = subTimeRunResults.MsgTimeMean - res.MsgDelTimeStd = subTimeRunResults.MsgTimeStd totals.Successes += res.Successes totals.Failures += res.Failures @@ -93,15 +91,15 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su totals.MsgTimeMax = res.MsgTimeMax } - if subTimeRunResults.MsgTimeMin < totals.MsgDelTimeMin { - totals.MsgDelTimeMin = subTimeRunResults.MsgTimeMin + if res.MsgDelTimeMin < totals.MsgDelTimeMin { + totals.MsgDelTimeMin = res.MsgDelTimeMin } - if subTimeRunResults.MsgTimeMax > totals.MsgDelTimeMax { - totals.MsgDelTimeMax = subTimeRunResults.MsgTimeMax + if res.MsgDelTimeMax > totals.MsgDelTimeMax { + totals.MsgDelTimeMax = res.MsgDelTimeMax } - msgTimeMeansDelivered[i] = subTimeRunResults.MsgTimeMean + msgTimeMeansDelivered[i] = res.MsgDelTimeMean msgTimeMeans[i] = res.MsgTimeMean msgsPerSecs[i] = res.MsgsPerSec runTimes[i] = res.RunTime From b45120444be382429fcd7c28f6cf941b1836862f Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 6 Sep 2019 11:44:56 +0200 Subject: [PATCH 30/31] change sync and result collecting for sub Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 48 ++++++++++++++++++++++++++++++------- tools/mqtt-bench/client.go | 41 +++++++++++++++++-------------- tools/mqtt-bench/results.go | 21 +++++++--------- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 92dd9a95c3..76e0932ec9 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -99,7 +99,7 @@ func Benchmark(cfg Config) { var err error checkConnection(cfg.MQTT.Broker.URL, 1) - subsResults := make(subsResults) + var subsResults map[string](*[]float64) var caByte []byte if cfg.MQTT.TLS.MTLS { caFile, err := os.Open(cfg.MQTT.TLS.CA) @@ -107,7 +107,6 @@ func Benchmark(cfg Config) { if err != nil { fmt.Println(err) } - caByte, _ = ioutil.ReadAll(caFile) } @@ -117,6 +116,11 @@ func Benchmark(cfg Config) { } resCh := make(chan *runResults) + donePub := make(chan bool) + finishedPub := make(chan bool) + finishedSub := make(chan bool) + + resR := make(chan *map[string](*[]float64)) startStamp := time.Now() n := len(mf.Channels) @@ -166,7 +170,7 @@ func Benchmark(cfg Config) { wg.Add(1) - go c.runSubscriber(&wg, &subsResults, cfg.Test.Count*cfg.Test.Pubs) + go c.runSubscriber(&wg, cfg.Test.Count*cfg.Test.Pubs, &donePub, &resR) } wg.Wait() @@ -212,15 +216,41 @@ func Benchmark(cfg Config) { if cfg.Test.Pubs > 0 { results = make([]*runResults, cfg.Test.Pubs) } - - for i := 0; i < cfg.Test.Pubs; i++ { - select { - case result := <-resCh: - { - results[i] = result + // Wait for publishers to be don + go func() { + for i := 0; i < cfg.Test.Pubs; i++ { + select { + case result := <-resCh: + { + results[i] = result + } + } + } + finishedPub <- true + }() + + go func() { + for i := 0; i < cfg.Test.Subs; i++ { + select { + case r := <-resR: + { + for k, v := range *r { + subsResults[k] = v + } + } } } + finishedSub <- true + }() + + <-finishedPub + // Send signal to subscribers that all the publishers are done + for i := 0; i < cfg.Test.Subs; i++ { + donePub <- true } + + <-finishedSub + totalTime := time.Now().Sub(start) totals := calculateTotalResults(results, totalTime, subsResults) if totals == nil { diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index c3fe18bec4..8b728960a5 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -93,7 +93,6 @@ func (c *Client) runPublisher(r chan *runResults) { times = append(times, diff) case <-donePub: // Calculate results - log.Printf("done for %s\n ratio %d %d\n", runResults.ID, runResults.Successes, runResults.Successes+runResults.Failures) duration := time.Now().Sub(started) timeMatrix := mat.NewDense(1, len(times), times) runResults.MsgTimeMin = mat.Min(timeMatrix) @@ -111,8 +110,8 @@ func (c *Client) runPublisher(r chan *runResults) { } // Subscriber -func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int) { - c.subscribe(wg, subsResults, tot) +func (c *Client) runSubscriber(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) { + c.subscribe(wg, tot, donePub, res) } func (c *Client) generate(ch chan *message, done chan bool) { @@ -129,10 +128,25 @@ func (c *Client) generate(ch chan *message, done chan bool) { return } -func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int) { +func (c *Client) subscribe(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) { clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID) c.ID = clientID - i := 0 + subsResults := make(map[string](*[]float64), 1) + i := 1 + a := []float64{} + + go func() { + for { + select { + case <-*donePub: + fmt.Printf("finished publishing, close sub %s\n", c.ID) + time.Sleep(2 * time.Second) + subsResults[c.MsgTopic] = &a + *res <- &subsResults + return + } + } + }() onConnected := func(client mqtt.Client) { wg.Done() @@ -151,7 +165,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) { arrival := float64(time.Now().UnixNano()) - var id string var timeSent float64 if c.GetSenML() != nil { @@ -159,30 +172,22 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int if err != nil && !c.Quiet { log.Printf("Failed to decode message %s\n", err.Error()) } - id = mp.Records[0].BaseName timeSent = *mp.Records[0].Value } else { tst := testMsg{} json.Unmarshal(msg.Payload(), &tst) - id = tst.ClientID timeSent = tst.Sent } - arrivalTimes, ok := (*subsResults)[id] - if !ok { - t := []float64{} - arrivalTimes = &t - (*subsResults)[id] = arrivalTimes - } - a := *arrivalTimes a = append(a, (arrival - timeSent)) - (*subsResults)[id] = &a i++ + if i == tot { + subsResults[c.MsgTopic] = &a + *res <- &subsResults + } }) - token.Wait() - } func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) { diff --git a/tools/mqtt-bench/results.go b/tools/mqtt-bench/results.go index 8b2917b837..f9dfbe249a 100644 --- a/tools/mqtt-bench/results.go +++ b/tools/mqtt-bench/results.go @@ -66,19 +66,6 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su totals.MsgTimeMin = results[0].MsgTimeMin for i, res := range results { - if len(sr) > 0 { - a, ok := sr[res.ID] - if ok { - times := mat.NewDense(1, len(*a), *a) - res.MsgDelTimeMin = mat.Min(times) / 1000 - res.MsgDelTimeMax = mat.Max(times) / 1000 - res.MsgDelTimeMean = stat.Mean(*(sr[res.ID]), nil) / 1000 - res.MsgDelTimeStd = stat.StdDev(*(sr[res.ID]), nil) / 1000 - } else { - log.Println("No data for del ", res.ID) - } - } - totals.Successes += res.Successes totals.Failures += res.Failures totals.TotalMsgsPerSec += res.MsgsPerSec @@ -106,6 +93,14 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su bws[i] = res.MsgsPerSec } + for _, v := range sr { + times := mat.NewDense(1, len(*v), *v) + totals.MsgDelTimeMin = mat.Min(times) / 1000 + totals.MsgDelTimeMax = mat.Max(times) / 1000 + totals.MsgDelTimeMeanAvg = stat.Mean(*v, nil) / 1000 + totals.MsgDelTimeMeanStd = stat.StdDev(*v, nil) / 1000 + } + totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures) totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil) totals.AvgRunTime = stat.Mean(runTimes, nil) From 72e2f532711bbdfc5c527fc9b94a97612abee4e0 Mon Sep 17 00:00:00 2001 From: Mirko Teodorovic Date: Fri, 6 Sep 2019 11:56:27 +0200 Subject: [PATCH 31/31] fix comments Signed-off-by: Mirko Teodorovic --- tools/mqtt-bench/bench.go | 5 ++--- tools/mqtt-bench/client.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tools/mqtt-bench/bench.go b/tools/mqtt-bench/bench.go index 76e0932ec9..fdc411e524 100644 --- a/tools/mqtt-bench/bench.go +++ b/tools/mqtt-bench/bench.go @@ -289,7 +289,7 @@ func buildSenML(sz int, payload string) senml.SenML { log.Fatalf("Failed to create test message") } - // how many records to make message long sz bytes + // How many records to make message long sz bytes n := (sz-len(tsByte))/len(msgByte) + 1 if sz < len(tsByte) { n = 1 @@ -298,8 +298,7 @@ func buildSenML(sz int, payload string) senml.SenML { records := make([]senml.SenMLRecord, n) records[0] = timeStamp for i := 1; i < n; i++ { - // is this needed - // i think we need id to be saved with db writer to t + // Timestamp for each record when saving to db sml.Time = float64(time.Now().UnixNano()) records[i] = sml } diff --git a/tools/mqtt-bench/client.go b/tools/mqtt-bench/client.go index 8b728960a5..d118b4e0a7 100644 --- a/tools/mqtt-bench/client.go +++ b/tools/mqtt-bench/client.go @@ -139,7 +139,6 @@ func (c *Client) subscribe(wg *sync.WaitGroup, tot int, donePub *chan bool, res for { select { case <-*donePub: - fmt.Printf("finished publishing, close sub %s\n", c.ID) time.Sleep(2 * time.Second) subsResults[c.MsgTopic] = &a *res <- &subsResults