Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Add measuring time from pub to sub #839

Merged
merged 31 commits into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a81f404
refactor code
mteodor Aug 31, 2019
fb5cd12
connect each thing with each channel
mteodor Aug 31, 2019
68003d2
reverting - structure fields must be exported
mteodor Sep 1, 2019
f6e4d23
reverting - structure fields must be exported
mteodor Sep 1, 2019
1a12835
revert some names
mteodor Sep 1, 2019
1c4a008
move meausuring time start
mteodor Sep 1, 2019
b276e84
add pub-to-sub delivery time measure
mteodor Sep 1, 2019
b3ed709
add pub-to-sub delivery time measure
mteodor Sep 2, 2019
def3917
add pub-to-sub delivery time measure
mteodor Sep 3, 2019
90616a3
add pub-to-sub delivery time measure
mteodor Sep 3, 2019
94f992a
improve sync between pub and sub
mteodor Sep 4, 2019
ac92109
improve sync between pub and sub
mteodor Sep 4, 2019
d7bd2f1
improve sync between pub and sub
mteodor Sep 4, 2019
16d9a53
improve sync between pub and sub
mteodor Sep 5, 2019
66a3f34
improve sync between pub and sub
mteodor Sep 5, 2019
e85defd
improve sync between pub and sub
mteodor Sep 5, 2019
3644847
add random payload
mteodor Sep 5, 2019
4f7d10b
revert changes for config.toml
mteodor Sep 5, 2019
92a1975
add random payload
mteodor Sep 5, 2019
63ad958
remove printfs
mteodor Sep 5, 2019
5a82bf4
add logging
mteodor Sep 5, 2019
ef4cf2b
add payload
mteodor Sep 5, 2019
25b444a
add payload
mteodor Sep 5, 2019
1a0194c
rename variable
mteodor Sep 5, 2019
897302c
add payload
mteodor Sep 5, 2019
2ce4606
small changes
mteodor Sep 5, 2019
26f410f
refactor sync
mteodor Sep 5, 2019
e69e91d
refactor sync
mteodor Sep 5, 2019
1098353
refactor results
mteodor Sep 5, 2019
b451204
change sync and result collecting for sub
mteodor Sep 6, 2019
72e2f53
fix comments
mteodor Sep 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 165 additions & 20 deletions tools/mqtt-bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package bench

import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
Expand All @@ -14,6 +15,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/cisco/senml"
)

// Keep struct names exported, otherwise Viper unmarshaling won't work
Expand All @@ -22,10 +24,11 @@ type mqttBrokerConfig struct {
}

type mqttMessageConfig struct {
Size int `toml:"size" mapstructure:"size"`
Format string `toml:"format" mapstructure:"format"`
QoS int `toml:"qos" mapstructure:"qos"`
Retain bool `toml:"retain" mapstructure:"retain"`
Size int `toml:"size" mapstructure:"size"`
Payload string `toml:"payload" mapstructure:"payload"`
Format string `toml:"format" mapstructure:"format"`
QoS int `toml:"qos" mapstructure:"qos"`
Retain bool `toml:"retain" mapstructure:"retain"`
}

type mqttTLSConfig struct {
Expand Down Expand Up @@ -70,6 +73,12 @@ type mainflux struct {
Channels []mfChannel `toml:"channels" mapstructure:"channels"`
}

type testMsg struct {
ClientID string
Sent float64
Payload []byte
}

// Config struct holds benchmark configuration
type Config struct {
MQTT mqttConfig `toml:"mqtt" mapstructure:"mqtt"`
Expand All @@ -90,34 +99,48 @@ func Benchmark(cfg Config) {
var err error

checkConnection(cfg.MQTT.Broker.URL, 1)
subTimes := make(subTimes)
var subsResults map[string](*[]float64)
var caByte []byte
if cfg.MQTT.TLS.MTLS {
caFile, err := os.Open(cfg.MQTT.TLS.CA)
defer caFile.Close()
if err != nil {
fmt.Println(err)
}

caByte, _ = ioutil.ReadAll(caFile)
}

payload := string(make([]byte, cfg.MQTT.Message.Size))

mf := mainflux{}
if _, err := toml.DecodeFile(cfg.Mf.ConnFile, &mf); err != nil {
log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile)
}

resCh := make(chan *runResults)
done := make(chan bool)
donePub := make(chan bool)
finishedPub := make(chan bool)
finishedSub := make(chan bool)

resR := make(chan *map[string](*[]float64))
startStamp := time.Now()

n := len(mf.Channels)
var cert tls.Certificate

var msg *senml.SenML
getPload := getBytePayload

if len(cfg.MQTT.Message.Payload) > 0 {
m := buildSenML(cfg.MQTT.Message.Size, cfg.MQTT.Message.Payload)
msg = &m
getPload = getSenMLPayload
}

getSenML := func() *senml.SenML {
return msg
}
// Subscribers
for i := 0; i < cfg.Test.Subs; i++ {
mfChann := mf.Channels[i%n]
mfChan := mf.Channels[i%n]
mfThing := mf.Things[i%n]

if cfg.MQTT.TLS.MTLS {
Expand All @@ -132,7 +155,7 @@ func Benchmark(cfg Config) {
BrokerURL: cfg.MQTT.Broker.URL,
BrokerUser: mfThing.ThingID,
BrokerPass: mfThing.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
MsgTopic: getTopic(mfChan.ChannelID, startStamp),
MsgSize: cfg.MQTT.Message.Size,
MsgCount: cfg.Test.Count,
MsgQoS: byte(cfg.MQTT.Message.QoS),
Expand All @@ -142,20 +165,21 @@ func Benchmark(cfg Config) {
CA: caByte,
ClientCert: cert,
Retain: cfg.MQTT.Message.Retain,
Message: payload,
GetSenML: getSenML,
}

wg.Add(1)

go c.runSubscriber(&wg, &subTimes, &done)
go c.runSubscriber(&wg, cfg.Test.Count*cfg.Test.Pubs, &donePub, &resR)
}

wg.Wait()

start := time.Now()
// Publishers
start := time.Now()

for i := 0; i < cfg.Test.Pubs; i++ {
mfChann := mf.Channels[i%n]
mfChan := mf.Channels[i%n]
mfThing := mf.Things[i%n]

if cfg.MQTT.TLS.MTLS {
Expand All @@ -170,7 +194,7 @@ func Benchmark(cfg Config) {
BrokerURL: cfg.MQTT.Broker.URL,
BrokerUser: mfThing.ThingID,
BrokerPass: mfThing.ThingKey,
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
MsgTopic: getTopic(mfChan.ChannelID, startStamp),
MsgSize: cfg.MQTT.Message.Size,
MsgCount: cfg.Test.Count,
MsgQoS: byte(cfg.MQTT.Message.QoS),
Expand All @@ -180,7 +204,8 @@ func Benchmark(cfg Config) {
CA: caByte,
ClientCert: cert,
Retain: cfg.MQTT.Message.Retain,
Message: payload,
Message: getPload,
GetSenML: getSenML,
}

go c.runPublisher(resCh)
Expand All @@ -191,17 +216,137 @@ func Benchmark(cfg Config) {
if cfg.Test.Pubs > 0 {
results = make([]*runResults, cfg.Test.Pubs)
}
// Wait for publishers to be don
go func() {
for i := 0; i < cfg.Test.Pubs; i++ {
select {
case result := <-resCh:
{
results[i] = result
}
}
}
finishedPub <- true
}()

go func() {
for i := 0; i < cfg.Test.Subs; i++ {
select {
case r := <-resR:
{
for k, v := range *r {
subsResults[k] = v
}
}
}
}
finishedSub <- true
}()

for i := 0; i < cfg.Test.Pubs; i++ {
results[i] = <-resCh
<-finishedPub
// Send signal to subscribers that all the publishers are done
for i := 0; i < cfg.Test.Subs; i++ {
donePub <- true
}

<-finishedSub

totalTime := time.Now().Sub(start)
totals := calculateTotalResults(results, totalTime, &subTimes)
totals := calculateTotalResults(results, totalTime, subsResults)
if totals == nil {
return
}

// Print sats
printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet)
}

func getSenMLTimeStamp() senml.SenMLRecord {
t := (float64)(time.Now().UnixNano())
timeStamp := senml.SenMLRecord{
BaseName: "pub-2019-08-31T12:38:25.139715762+02:00-57",
Value: &t,
}
return timeStamp
}

func buildSenML(sz int, payload string) senml.SenML {
timeStamp := getSenMLTimeStamp()

tsByte, err := json.Marshal(timeStamp)
if err != nil || len(payload) == 0 {
log.Fatalf("Failed to create test message")
}

sml := senml.SenMLRecord{}
err = json.Unmarshal([]byte(payload), &sml)
if err != nil {
log.Fatalf("Cannot unmarshal payload")
}

msgByte, err := json.Marshal(sml)
if err != nil {
log.Fatalf("Failed to create test message")
}

// How many records to make message long sz bytes
n := (sz-len(tsByte))/len(msgByte) + 1
if sz < len(tsByte) {
n = 1
}

records := make([]senml.SenMLRecord, n)
records[0] = timeStamp
for i := 1; i < n; i++ {
// Timestamp for each record when saving to db
sml.Time = float64(time.Now().UnixNano())
records[i] = sml
}

s := senml.SenML{
Records: records,
}

return s
}

func getBytePayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) {

msg := testMsg{}
msg.ClientID = cid
msg.Sent = time

tsByte, err := json.Marshal(msg)
if err != nil {
log.Fatalf("Failed to create test message")
}

// TODO - Need to sort this out
m := 500 - len(tsByte)
if m < 0 {
return tsByte, nil
}
add := make([]byte, m)
msg.Payload = add

b, err := json.Marshal(msg)
if err != nil {
return nil, err
}
return b, nil
}

func getSenMLPayload(cid string, time float64, getSenML func() *senml.SenML) ([]byte, error) {
s := *getSenML()
s.Records[0].Value = &time
s.Records[0].BaseName = cid
payload, err := senml.Encode(s, senml.JSON, senml.OutputOptions{})
if err != nil {
return nil, err
}
return payload, nil
}

func getTopic(ch string, start time.Time) string {
return fmt.Sprintf("channels/%s/messages/%d/test", ch, start.UnixNano())
}
Loading