Skip to content

Commit

Permalink
- fix subtopic name in, when starting with dot, http/ws/mqtt
Browse files Browse the repository at this point in the history
- add some test on readers

Signed-off-by: ale <ale@metaverso.org>
  • Loading branch information
beres committed Mar 8, 2019
1 parent 60e2a79 commit 672194d
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 13 deletions.
6 changes: 5 additions & 1 deletion http/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func LoggingMiddleware(svc mainflux.MessagePublisher, logger log.Logger) mainflu

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish took %s to complete", time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel += "." + msg.Subtopic
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
4 changes: 4 additions & 0 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {

chanID := channelParts[1]
subtopic := strings.Replace(channelParts[2], "/", ".", -1)
if subtopic != "" {
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]
}

publisher, err := authorize(r, chanID)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion http/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
broker "github.com/nats-io/go-nats"
)

const prefix = "channel"

var _ mainflux.MessagePublisher = (*natsPublisher)(nil)

type natsPublisher struct {
Expand All @@ -33,6 +35,9 @@ func (pub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("channel.%s%s", msg.Channel, msg.Subtopic)
subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
if msg.Subtopic != "" {
subject += "." + msg.Subtopic
}
return pub.nc.Publish(subject, data)
}
3 changes: 2 additions & 1 deletion mqtt/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ nats.subscribe('channel.>', {'queue':'mqtts'}, function (msg) {
var m = message.RawMessage.decode(Buffer.from(msg)),
packet, subtopic;
if (m && m.protocol !== 'mqtt') {
subtopic = m.subtopic !== '' ? m.subtopic.replace('.', '/') : ''
subtopic = m.subtopic !== '' ? '/' + m.subtopic.replace('.', '/') : ''

packet = {
cmd: 'publish',
qos: 2,
Expand Down
19 changes: 18 additions & 1 deletion readers/cassandra/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
const (
keyspace = "mainflux"
chanID = "1"
subtopic = "subtopic"
msgsNum = 42
valueFields = 6
)
Expand All @@ -48,6 +49,7 @@ func TestReadAll(t *testing.T) {
count := i % valueFields
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &mainflux.Message_FloatValue{FloatValue: 5}
case 1:
msg.Value = &mainflux.Message_BoolValue{BoolValue: false}
Expand Down Expand Up @@ -76,6 +78,7 @@ func TestReadAll(t *testing.T) {
chanID string
offset uint64
limit uint64
andQuery map[string]string
messages []mainflux.Message
}{
"read message page for existing channel": {
Expand All @@ -96,10 +99,24 @@ func TestReadAll(t *testing.T) {
limit: 5,
messages: messages[40:42],
},
"read message with non-existent subtopic": {
chanID: chanID,
offset: 0,
limit: msgsNum,
andQuery: map[string]string{"subtopic": "not-present"},
messages: []mainflux.Message{},
},
fmt.Sprintf("read message with subtopic: %s", subtopic): {
chanID: chanID,
offset: 10,
limit: msgsNum,
andQuery: map[string]string{"subtopic": subtopic},
messages: messages[0:10],
},
}

for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.andQuery)
if tc.offset > 0 {
assert.Equal(t, len(tc.messages), len(result), fmt.Sprintf("%s: expected %d messages, got %d", desc, len(tc.messages), len(result)))
continue
Expand Down
12 changes: 11 additions & 1 deletion readers/influxdb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
const (
testDB = "test"
chanID = "1"
subtopic = "topic"
msgsNum = 101
valueFields = 6
)
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestReadAll(t *testing.T) {
count := i % valueFields
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &mainflux.Message_FloatValue{FloatValue: 5}
case 1:
msg.Value = &mainflux.Message_BoolValue{BoolValue: false}
Expand All @@ -84,6 +86,7 @@ func TestReadAll(t *testing.T) {
chanID string
offset uint64
limit uint64
andQuery map[string]string
messages []mainflux.Message
}{
"read message page for existing channel": {
Expand All @@ -110,10 +113,17 @@ func TestReadAll(t *testing.T) {
limit: 10,
messages: messages[95:101],
},
fmt.Sprintf("read message with subtopic: %s", subtopic): {
chanID: chanID,
offset: 0,
limit: 10,
andQuery: map[string]string{"subtopic": subtopic},
messages: messages[0:10],
},
}

for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.andQuery)
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.messages, result))
}
}
19 changes: 18 additions & 1 deletion readers/mongodb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
testDB = "test"
collection = "mainflux"
chanID = "1"
subtopic = "subtopic"
msgsNum = 42
valueFields = 6
)
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestReadAll(t *testing.T) {
count := i % valueFields
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &mainflux.Message_FloatValue{FloatValue: 5}
case 1:
msg.Value = &mainflux.Message_BoolValue{BoolValue: false}
Expand All @@ -83,6 +85,7 @@ func TestReadAll(t *testing.T) {
chanID string
offset uint64
limit uint64
andQuery map[string]string
messages []mainflux.Message
}{
"read message page for existing channel": {
Expand All @@ -103,10 +106,24 @@ func TestReadAll(t *testing.T) {
limit: 10,
messages: messages[40:42],
},
"read message with non-existent subtopic": {
chanID: chanID,
offset: 0,
limit: msgsNum,
andQuery: map[string]string{"subtopic": "not-present"},
messages: []mainflux.Message{},
},
fmt.Sprintf("read message with subtopic: %s", subtopic): {
chanID: chanID,
offset: 0,
limit: 10,
andQuery: map[string]string{"subtopic": subtopic},
messages: messages[0:10],
},
}

for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.andQuery)
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result))
}
}
13 changes: 10 additions & 3 deletions ws/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func LoggingMiddleware(svc ws.Service, logger log.Logger) ws.Service {

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish to channel %s took %s to complete", msg.Channel, time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel += "." + msg.Subtopic
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -45,8 +49,11 @@ func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {

func (lm *loggingMiddleware) Subscribe(chanID, subtopic string, channel *ws.Channel) (err error) {
defer func(begin time.Time) {

message := fmt.Sprintf("Method subscribe to channel %s%s took %s to complete", chanID, subtopic, time.Since(begin))
destChannel := chanID
if subtopic != "" {
destChannel += "." + subtopic
}
message := fmt.Sprintf("Method subscribe to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
4 changes: 4 additions & 0 deletions ws/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func authorize(r *http.Request) (subscription, error) {

chanID := channelParts[1]
subtopic := strings.Replace(channelParts[2], "/", ".", -1)
if subtopic != "" {
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
12 changes: 8 additions & 4 deletions ws/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ func New(nc *broker.Conn) ws.Service {
return &natsPubSub{nc, cb}
}

func (pubsub *natsPubSub) getFullChannelName(chanID, subtopic string) string {
return fmt.Sprintf("%s.%s%s", prefix, chanID, subtopic)
func (pubsub *natsPubSub) getDestChannel(chanID, subtopic string) string {
destChannel := fmt.Sprintf("%s.%s", prefix, chanID)
if subtopic != "" {
destChannel += "." + subtopic
}
return destChannel
}

func (pubsub *natsPubSub) Publish(msg mainflux.RawMessage) error {
Expand All @@ -57,13 +61,13 @@ func (pubsub *natsPubSub) Publish(msg mainflux.RawMessage) error {

// TODO actually if someone subscribe to some channel with jolly chars, publish
// does not work, return an error message or silently fail?
return pubsub.nc.Publish(pubsub.getFullChannelName(msg.Channel, msg.Subtopic), data)
return pubsub.nc.Publish(pubsub.getDestChannel(msg.Channel, msg.Subtopic), data)
}

func (pubsub *natsPubSub) Subscribe(chanID, subtopic string, channel *ws.Channel) error {
var sub *broker.Subscription

sub, err := pubsub.nc.Subscribe(pubsub.getFullChannelName(chanID, subtopic), func(msg *broker.Msg) {
sub, err := pubsub.nc.Subscribe(pubsub.getDestChannel(chanID, subtopic), func(msg *broker.Msg) {
if msg == nil {
return
}
Expand Down

0 comments on commit 672194d

Please sign in to comment.