Skip to content

Commit

Permalink
change sync and result collecting for sub
Browse files Browse the repository at this point in the history
Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
  • Loading branch information
mteodor committed Sep 6, 2019
1 parent 5110bfd commit 1c38893
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 40 deletions.
48 changes: 39 additions & 9 deletions tools/mqtt-bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,14 @@ func Benchmark(cfg Config) {
var err error

checkConnection(cfg.MQTT.Broker.URL, 1)
subsResults := make(subsResults)
var subsResults map[string](*[]float64)
var caByte []byte
if cfg.MQTT.TLS.MTLS {
caFile, err := os.Open(cfg.MQTT.TLS.CA)
defer caFile.Close()
if err != nil {
fmt.Println(err)
}

caByte, _ = ioutil.ReadAll(caFile)
}

Expand All @@ -117,6 +116,11 @@ func Benchmark(cfg Config) {
}

resCh := make(chan *runResults)
donePub := make(chan bool)
finishedPub := make(chan bool)
finishedSub := make(chan bool)

resR := make(chan *map[string](*[]float64))
startStamp := time.Now()

n := len(mf.Channels)
Expand Down Expand Up @@ -166,7 +170,7 @@ func Benchmark(cfg Config) {

wg.Add(1)

go c.runSubscriber(&wg, &subsResults, cfg.Test.Count*cfg.Test.Pubs)
go c.runSubscriber(&wg, cfg.Test.Count*cfg.Test.Pubs, &donePub, &resR)
}

wg.Wait()
Expand Down Expand Up @@ -212,15 +216,41 @@ func Benchmark(cfg Config) {
if cfg.Test.Pubs > 0 {
results = make([]*runResults, cfg.Test.Pubs)
}

for i := 0; i < cfg.Test.Pubs; i++ {
select {
case result := <-resCh:
{
results[i] = result
// Wait for publishers to be don
go func() {
for i := 0; i < cfg.Test.Pubs; i++ {
select {
case result := <-resCh:
{
results[i] = result
}
}
}
finishedPub <- true
}()

go func() {
for i := 0; i < cfg.Test.Subs; i++ {
select {
case r := <-resR:
{
for k, v := range *r {
subsResults[k] = v
}
}
}
}
finishedSub <- true
}()

<-finishedPub
// Send signal to subscribers that all the publishers are done
for i := 0; i < cfg.Test.Subs; i++ {
donePub <- true
}

<-finishedSub

totalTime := time.Now().Sub(start)
totals := calculateTotalResults(results, totalTime, subsResults)
if totals == nil {
Expand Down
41 changes: 23 additions & 18 deletions tools/mqtt-bench/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (c *Client) runPublisher(r chan *runResults) {
times = append(times, diff)
case <-donePub:
// Calculate results
log.Printf("done for %s\n ratio %d %d\n", runResults.ID, runResults.Successes, runResults.Successes+runResults.Failures)
duration := time.Now().Sub(started)
timeMatrix := mat.NewDense(1, len(times), times)
runResults.MsgTimeMin = mat.Min(timeMatrix)
Expand All @@ -111,8 +110,8 @@ func (c *Client) runPublisher(r chan *runResults) {
}

// Subscriber
func (c *Client) runSubscriber(wg *sync.WaitGroup, subsResults *subsResults, tot int) {
c.subscribe(wg, subsResults, tot)
func (c *Client) runSubscriber(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) {
c.subscribe(wg, tot, donePub, res)
}

func (c *Client) generate(ch chan *message, done chan bool) {
Expand All @@ -129,10 +128,25 @@ func (c *Client) generate(ch chan *message, done chan bool) {
return
}

func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int) {
func (c *Client) subscribe(wg *sync.WaitGroup, tot int, donePub *chan bool, res *chan *map[string](*[]float64)) {
clientID := fmt.Sprintf("sub-%v-%v", time.Now().Format(time.RFC3339Nano), c.ID)
c.ID = clientID
i := 0
subsResults := make(map[string](*[]float64), 1)
i := 1
a := []float64{}

go func() {
for {
select {
case <-*donePub:
fmt.Printf("finished publishing, close sub %s\n", c.ID)
time.Sleep(2 * time.Second)
subsResults[c.MsgTopic] = &a
*res <- &subsResults
return
}
}
}()

onConnected := func(client mqtt.Client) {
wg.Done()
Expand All @@ -151,38 +165,29 @@ func (c *Client) subscribe(wg *sync.WaitGroup, subsResults *subsResults, tot int
token := (*c.mqttClient).Subscribe(c.MsgTopic, c.MsgQoS, func(cl mqtt.Client, msg mqtt.Message) {

arrival := float64(time.Now().UnixNano())
var id string
var timeSent float64

if c.GetSenML() != nil {
mp, err := senml.Decode(msg.Payload(), senml.JSON)
if err != nil && !c.Quiet {
log.Printf("Failed to decode message %s\n", err.Error())
}
id = mp.Records[0].BaseName
timeSent = *mp.Records[0].Value
} else {
tst := testMsg{}
json.Unmarshal(msg.Payload(), &tst)
id = tst.ClientID
timeSent = tst.Sent
}

arrivalTimes, ok := (*subsResults)[id]
if !ok {
t := []float64{}
arrivalTimes = &t
(*subsResults)[id] = arrivalTimes
}
a := *arrivalTimes
a = append(a, (arrival - timeSent))
(*subsResults)[id] = &a
i++
if i == tot {
subsResults[c.MsgTopic] = &a
*res <- &subsResults
}

})

token.Wait()

}

func (c *Client) publish(in, out chan *message, doneGen chan bool, donePub chan bool) {
Expand Down
21 changes: 8 additions & 13 deletions tools/mqtt-bench/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,6 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su
totals.MsgTimeMin = results[0].MsgTimeMin
for i, res := range results {

if len(sr) > 0 {
a, ok := sr[res.ID]
if ok {
times := mat.NewDense(1, len(*a), *a)
res.MsgDelTimeMin = mat.Min(times) / 1000
res.MsgDelTimeMax = mat.Max(times) / 1000
res.MsgDelTimeMean = stat.Mean(*(sr[res.ID]), nil) / 1000
res.MsgDelTimeStd = stat.StdDev(*(sr[res.ID]), nil) / 1000
} else {
log.Println("No data for del ", res.ID)
}
}

totals.Successes += res.Successes
totals.Failures += res.Failures
totals.TotalMsgsPerSec += res.MsgsPerSec
Expand Down Expand Up @@ -106,6 +93,14 @@ func calculateTotalResults(results []*runResults, totalTime time.Duration, sr su
bws[i] = res.MsgsPerSec
}

for _, v := range sr {
times := mat.NewDense(1, len(*v), *v)
totals.MsgDelTimeMin = mat.Min(times) / 1000
totals.MsgDelTimeMax = mat.Max(times) / 1000
totals.MsgDelTimeMeanAvg = stat.Mean(*v, nil) / 1000
totals.MsgDelTimeMeanStd = stat.StdDev(*v, nil) / 1000
}

totals.Ratio = float64(totals.Successes) / float64(totals.Successes+totals.Failures)
totals.AvgMsgsPerSec = stat.Mean(msgsPerSecs, nil)
totals.AvgRunTime = stat.Mean(runTimes, nil)
Expand Down

0 comments on commit 1c38893

Please sign in to comment.