Skip to content

Commit

Permalink
Add message forwarded check to handler
Browse files Browse the repository at this point in the history
Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
  • Loading branch information
darkodraskovic committed Jul 13, 2020
1 parent e0eb378 commit fb7d6dc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
32 changes: 24 additions & 8 deletions pkg/messaging/mqtt/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (

const (
topic = "topic"
data = "payload"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
)

var (
msgChan = make(chan messaging.Message)
data = []byte("payload")
)

func TestPubsub(t *testing.T) {
Expand All @@ -39,22 +39,22 @@ func TestPubsub(t *testing.T) {
payload: nil,
},
{
desc: "publish message with string payload",
payload: []byte(data),
desc: "publish message with non-nil payload",
payload: data,
},
{
desc: "publish message with channel",
payload: []byte(data),
payload: data,
channel: channel,
},
{
desc: "publish message with subtopic",
payload: []byte(data),
payload: data,
subtopic: subtopic,
},
{
desc: "publish message with channel and subtopic",
payload: []byte(data),
payload: data,
channel: channel,
subtopic: subtopic,
},
Expand All @@ -69,21 +69,37 @@ func TestPubsub(t *testing.T) {
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

receivedMsg := <-msgChan
// _ = receivedMsg
assert.Equal(t, expectedMsg, receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, expectedMsg, receivedMsg))
}
}

var createds []int64
var created int64

func contains(s []int64, e int64) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

func handler(msg messaging.Message) error {
msgChan <- msg
if !contains(createds, msg.Created) {
createds = append(createds, created)
msgChan <- msg
}
return nil
}

func message(channel, subtopic string, payload []byte) messaging.Message {
created++
return messaging.Message{
Channel: channel,
Subtopic: subtopic,
Payload: payload,
Created: created,
}
}
func payload(m messaging.Message) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/messaging/mqtt/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
const (
protocol = "mqtt"
id = "mqtt-publisher"
timeout = time.Second * 3
timeout = time.Second * 5
qos = 1
)

Expand Down
10 changes: 5 additions & 5 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
const (
topic = "topic"
chansPrefix = "channels"
data = "payload"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
)

var (
msgChan = make(chan messaging.Message)
data = []byte("payload")
)

func TestPubsub(t *testing.T) {
Expand All @@ -42,21 +42,21 @@ func TestPubsub(t *testing.T) {
},
{
desc: "publish message with string payload",
payload: []byte(data),
payload: data,
},
{
desc: "publish message with channel",
payload: []byte(data),
payload: data,
channel: channel,
},
{
desc: "publish message with subtopic",
payload: []byte(data),
payload: data,
subtopic: subtopic,
},
{
desc: "publish message with channel and subtopic",
payload: []byte(data),
payload: data,
channel: channel,
subtopic: subtopic,
},
Expand Down

0 comments on commit fb7d6dc

Please sign in to comment.