Skip to content

Commit

Permalink
MF-596 - Add subtopic to RawMessage (absmach#642)
Browse files Browse the repository at this point in the history
* Commit for mainflux-596
Modified and tested:
- cli
- http
- mqtt
- normalizer
- all readers
- sdk messages
- all writers
- ws
Missing:
- coap
- lora

Signed-off-by: ale <ale@metaverso.org>

* - fix subtopic name in, when starting with dot, http/ws/mqtt
- add some test on readers

Signed-off-by: ale <ale@metaverso.org>

* - fix string concatenation
- update http/transport regexp to match subtopic names with only \w-
- update ws/transport regexp like http ones with also the wildcard * and >

Signed-off-by: ale <ale@metaverso.org>

* added subtopic support to coap adapter

Signed-off-by: ale <ale@metaverso.org>

* - update replace functions with replaceall when needed
- renamed getDestChannel to fmtSubject
- update api/transport and ws/transport route to be more readable
- fix mqtt syntax
- renamed func andQuery to query as suggested by @anovakovic01
- have a nice we :)

Signed-off-by: ale <ale@metaverso.org>

* - fix error declaration on ws/nat/publisher
- fix regexp added missing allowed chars - and _ on coap/api/transport
- fix subtopic clean suffix / if present on coap/api/transport
- improve regexp on http and ws /api/transport, now does not accept url that do not strictly match
- add some ws subtopic tests

Signed-off-by: ale <ale@metaverso.org>

* - enabled wildcard chars on coap/api/transport
- allow use special chars on http and ws api/transport

Signed-off-by: ale <ale@metaverso.org>

* - use strings.Replace() insted ReplaceAll()

Signed-off-by: ale <ale@metaverso.org>

* - allow every chars on subtopics
- fix replace error on mqtt

Signed-off-by: ale <ale@metaverso.org>

* fix cassandra test

Signed-off-by: ale <ale@metaverso.org>

* fix ws test with invalid subtopic

Signed-off-by: ale <ale@metaverso.org>

* fix invalid GOCACHE in go1.12, replaced by -count 1, see https://golang.org/doc/go1.10#test

Signed-off-by: ale <ale@metaverso.org>

* - improve regexp on http/ws api/transport
- minor changes

Signed-off-by: ale <ale@metaverso.org>

* - add generic function parseSubtopic on ws/http adapters

Signed-off-by: ale <ale@metaverso.org>

* - add generic function fmtSubtopic on coap adapter

Signed-off-by: ale <ale@metaverso.org>
  • Loading branch information
beres authored and davide83 committed May 13, 2019
1 parent 60c9366 commit 3e79f4f
Show file tree
Hide file tree
Showing 41 changed files with 616 additions and 227 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ install:
cp ${BUILD_DIR}/* $(GOBIN)

test:
GOCACHE=off go test -v -race -tags test $(shell go list ./... | grep -v 'vendor\|cmd')
go test -v -race -count 1 -tags test $(shell go list ./... | grep -v 'vendor\|cmd')

proto:
protoc --gofast_out=plugins=grpc:. *.proto
Expand Down
4 changes: 2 additions & 2 deletions cli/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const contentTypeSenml = "application/senml+json"
var cmdMessages = []cobra.Command{
cobra.Command{
Use: "send",
Short: "send <channel_id> <JSON_string> <thing_key>",
Short: "send <channel_id>[.<subtopic>...] <JSON_string> <thing_key>",
Long: `Sends message on the channel`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 3 {
Expand All @@ -32,7 +32,7 @@ var cmdMessages = []cobra.Command{
},
cobra.Command{
Use: "read",
Short: "read <channel_id> <thing_key>",
Short: "read <channel_id>[.<subtopic>...] <thing_key>",
Long: `Reads all channel messages`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
Expand Down
8 changes: 4 additions & 4 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ var (
type Broker interface {
mainflux.MessagePublisher

// Subscribes to channel with specified id and adds subscription to
// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(string, string, *Observer) error
Subscribe(string, string, string, *Observer) error
}

// Service specifies coap service API.
Expand Down Expand Up @@ -135,8 +135,8 @@ func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
return nil
}

func (svc *adapterService) Subscribe(chanID, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
func (svc *adapterService) Subscribe(chanID, subtopic, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, subtopic, obsID, o); err != nil {
return ErrFailedSubscription
}

Expand Down
16 changes: 12 additions & 4 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func LoggingMiddleware(svc coap.Service, logger log.Logger) coap.Service {

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish to channel %s took %s to complete", msg.Channel, time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -43,17 +47,21 @@ func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
return lm.svc.Publish(msg)
}

func (lm *loggingMiddleware) Subscribe(chanID, obsID string, o *coap.Observer) (err error) {
func (lm *loggingMiddleware) Subscribe(chanID, subtopic, obsID string, o *coap.Observer) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method subscribe to channel %s for client %s took %s to complete", chanID, obsID, time.Since(begin))
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method subscribe to channel %s for client %s took %s to complete", destChannel, obsID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Subscribe(chanID, obsID, o)
return lm.svc.Subscribe(chanID, subtopic, obsID, o)
}

func (lm *loggingMiddleware) Unsubscribe(obsID string) {
Expand Down
4 changes: 2 additions & 2 deletions coap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (mm *metricsMiddleware) Publish(msg mainflux.RawMessage) error {
return mm.svc.Publish(msg)
}

func (mm *metricsMiddleware) Subscribe(chanID, clientID string, o *coap.Observer) error {
func (mm *metricsMiddleware) Subscribe(chanID, subtopic, clientID string, o *coap.Observer) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Subscribe(chanID, clientID, o)
return mm.svc.Subscribe(chanID, subtopic, clientID, o)
}

func (mm *metricsMiddleware) Unsubscribe(clientID string) {
Expand Down
18 changes: 17 additions & 1 deletion coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"time"

"github.com/go-zoo/bone"
Expand Down Expand Up @@ -68,7 +69,9 @@ func MakeCOAPHandler(svc coap.Service, tc mainflux.ThingsServiceClient, l log.Lo
pingPeriod = pp
r := mux.NewRouter()
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(observe(svc, responses)))
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(observe(svc, responses)))
r.NotFoundHandler = gocoap.FuncHandler(notFoundHandler)

return r
Expand Down Expand Up @@ -109,6 +112,16 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, er
return id.GetValue(), nil
}

func fmtSubtopic(subtopic string) string {
if subtopic != "" {
if strings.HasSuffix(subtopic, "/") {
subtopic = subtopic[:len(subtopic)-1]
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
}
return subtopic
}

func receive(svc coap.Service) handler {
return func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message {
// By default message is NonConfirmable, so
Expand Down Expand Up @@ -138,6 +151,7 @@ func receive(svc coap.Service) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand All @@ -147,6 +161,7 @@ func receive(svc coap.Service) handler {

rawMsg := mainflux.RawMessage{
Channel: chanID,
Subtopic: subtopic,
Publisher: publisher,
Protocol: protocol,
Payload: msg.Payload,
Expand Down Expand Up @@ -176,6 +191,7 @@ func observe(svc coap.Service, responses chan<- string) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand All @@ -198,7 +214,7 @@ func observe(svc coap.Service, responses chan<- string) handler {
if value, ok := msg.Option(gocoap.Observe).(uint32); ok && value == 0 {
res.AddOption(gocoap.Observe, 1)
o := coap.NewObserver()
if err := svc.Subscribe(chanID, obsID, o); err != nil {
if err := svc.Subscribe(chanID, subtopic, obsID, o); err != nil {
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
res.Code = gocoap.InternalServerError
return res
Expand Down
15 changes: 12 additions & 3 deletions coap/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,27 @@ func New(nc *broker.Conn) coap.Broker {
return &natsPublisher{nc}
}

func (pubsub *natsPublisher) fmtSubject(chanID, subtopic string) string {
subject := fmt.Sprintf("%s.%s", prefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
return subject
}

func (pubsub *natsPublisher) Publish(msg mainflux.RawMessage) error {
data, err := proto.Marshal(&msg)
if err != nil {
return err
}

subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
subject := pubsub.fmtSubject(msg.Channel, msg.Subtopic)
return pubsub.nc.Publish(subject, data)
}

func (pubsub *natsPublisher) Subscribe(chanID, obsID string, observer *coap.Observer) error {
sub, err := pubsub.nc.Subscribe(fmt.Sprintf("%s.%s", prefix, chanID), func(msg *broker.Msg) {
func (pubsub *natsPublisher) Subscribe(chanID, subtopic, obsID string, observer *coap.Observer) error {
subject := pubsub.fmtSubject(chanID, subtopic)
sub, err := pubsub.nc.Subscribe(subject, func(msg *broker.Msg) {
if msg == nil {
return
}
Expand Down
6 changes: 5 additions & 1 deletion http/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func LoggingMiddleware(svc mainflux.MessagePublisher, logger log.Logger) mainflu

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish took %s to complete", time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
73 changes: 51 additions & 22 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"time"

kithttp "github.com/go-kit/kit/transport/http"
Expand All @@ -27,73 +30,99 @@ import (
const protocol = "http"

var (
errMalformedData = errors.New("malformed request data")
auth mainflux.ThingsServiceClient
errMalformedData = errors.New("malformed request data")
errMalformedSubtopic = errors.New("malformed subtopic")
)

var (
auth mainflux.ThingsServiceClient
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages((/[^/?]+)*)?(\?.*)?$`)
)

// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(svc mainflux.MessagePublisher, tc mainflux.ThingsServiceClient) http.Handler {
auth = tc

r := bone.New()
r.Post("/channels/:id/messages", handshake(svc))
r.Post("/channels/:id/messages/*", handshake(svc))

r.GetFunc("/version", mainflux.Version("http"))
r.Handle("/metrics", promhttp.Handler())

return r
}

func handshake(svc mainflux.MessagePublisher) *kithttp.Server {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(encodeError),
}

r := bone.New()

r.Post("/channels/:id/messages", kithttp.NewServer(
return kithttp.NewServer(
sendMessageEndpoint(svc),
decodeRequest,
encodeResponse,
opts...,
))
)
}

r.GetFunc("/version", mainflux.Version("http"))
r.Handle("/metrics", promhttp.Handler())
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}

return r
var err error
subtopic, err = url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]
return subtopic, nil
}

func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
publisher, err := authorize(r)
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
return nil, errMalformedData
}

chanID := bone.GetValue(r, "id")
subtopic, err := parseSubtopic(channelParts[2])
if err != nil {
return nil, err
}

payload, err := decodePayload(r.Body)
publisher, err := authorize(r, chanID)
if err != nil {
return nil, err
}

channel := bone.GetValue(r, "id")
if channel == "" {
return nil, errMalformedData
payload, err := decodePayload(r.Body)
if err != nil {
return nil, err
}

msg := mainflux.RawMessage{
Publisher: publisher,
Protocol: protocol,
ContentType: r.Header.Get("Content-Type"),
Channel: channel,
Channel: chanID,
Subtopic: subtopic,
Payload: payload,
}

return msg, nil
}

func authorize(r *http.Request) (string, error) {
func authorize(r *http.Request, chanID string) (string, error) {
apiKey := r.Header.Get("Authorization")

if apiKey == "" {
return "", things.ErrUnauthorizedAccess
}

// extract ID from /channels/:id/messages
chanID := bone.GetValue(r, "id")
if chanID == "" {
return "", errMalformedData
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down
7 changes: 6 additions & 1 deletion http/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
broker "github.com/nats-io/go-nats"
)

const prefix = "channel"

var _ mainflux.MessagePublisher = (*natsPublisher)(nil)

type natsPublisher struct {
Expand All @@ -33,6 +35,9 @@ func (pub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("channel.%s", msg.Channel)
subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}
return pub.nc.Publish(subject, data)
}
Loading

0 comments on commit 3e79f4f

Please sign in to comment.