Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-596 - Add subtopic to RawMessage #642

Merged
merged 18 commits into from
Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ install:
cp ${BUILD_DIR}/* $(GOBIN)

test:
GOCACHE=off go test -v -race -tags test $(shell go list ./... | grep -v 'vendor\|cmd')
go test -v -race -count 1 -tags test $(shell go list ./... | grep -v 'vendor\|cmd')

proto:
protoc --gofast_out=plugins=grpc:. *.proto
Expand Down
4 changes: 2 additions & 2 deletions cli/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const contentTypeSenml = "application/senml+json"
var cmdMessages = []cobra.Command{
cobra.Command{
Use: "send",
Short: "send <channel_id> <JSON_string> <thing_key>",
Short: "send <channel_id>[.<subtopic>...] <JSON_string> <thing_key>",
Long: `Sends message on the channel`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 3 {
Expand All @@ -32,7 +32,7 @@ var cmdMessages = []cobra.Command{
},
cobra.Command{
Use: "read",
Short: "read <channel_id> <thing_key>",
Short: "read <channel_id>[.<subtopic>...] <thing_key>",
Long: `Reads all channel messages`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
Expand Down
8 changes: 4 additions & 4 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ var (
type Broker interface {
mainflux.MessagePublisher

// Subscribes to channel with specified id and adds subscription to
// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(string, string, *Observer) error
Subscribe(string, string, string, *Observer) error
}

// Service specifies coap service API.
Expand Down Expand Up @@ -135,8 +135,8 @@ func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
return nil
}

func (svc *adapterService) Subscribe(chanID, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
func (svc *adapterService) Subscribe(chanID, subtopic, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, subtopic, obsID, o); err != nil {
return ErrFailedSubscription
}

Expand Down
16 changes: 12 additions & 4 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func LoggingMiddleware(svc coap.Service, logger log.Logger) coap.Service {

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish to channel %s took %s to complete", msg.Channel, time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -43,17 +47,21 @@ func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
return lm.svc.Publish(msg)
}

func (lm *loggingMiddleware) Subscribe(chanID, obsID string, o *coap.Observer) (err error) {
func (lm *loggingMiddleware) Subscribe(chanID, subtopic, obsID string, o *coap.Observer) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method subscribe to channel %s for client %s took %s to complete", chanID, obsID, time.Since(begin))
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method subscribe to channel %s for client %s took %s to complete", destChannel, obsID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Subscribe(chanID, obsID, o)
return lm.svc.Subscribe(chanID, subtopic, obsID, o)
}

func (lm *loggingMiddleware) Unsubscribe(obsID string) {
Expand Down
4 changes: 2 additions & 2 deletions coap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (mm *metricsMiddleware) Publish(msg mainflux.RawMessage) error {
return mm.svc.Publish(msg)
}

func (mm *metricsMiddleware) Subscribe(chanID, clientID string, o *coap.Observer) error {
func (mm *metricsMiddleware) Subscribe(chanID, subtopic, clientID string, o *coap.Observer) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Subscribe(chanID, clientID, o)
return mm.svc.Subscribe(chanID, subtopic, clientID, o)
}

func (mm *metricsMiddleware) Unsubscribe(clientID string) {
Expand Down
18 changes: 17 additions & 1 deletion coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"time"

"github.com/go-zoo/bone"
Expand Down Expand Up @@ -68,7 +69,9 @@ func MakeCOAPHandler(svc coap.Service, tc mainflux.ThingsServiceClient, l log.Lo
pingPeriod = pp
r := mux.NewRouter()
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(observe(svc, responses)))
r.Handle("/channels/{id}/messages/{subtopic:.+}", gocoap.FuncHandler(observe(svc, responses)))
r.NotFoundHandler = gocoap.FuncHandler(notFoundHandler)

return r
Expand Down Expand Up @@ -109,6 +112,16 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, er
return id.GetValue(), nil
}

func fmtSubtopic(subtopic string) string {
if subtopic != "" {
if strings.HasSuffix(subtopic, "/") {
subtopic = subtopic[:len(subtopic)-1]
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
}
return subtopic
}

func receive(svc coap.Service) handler {
return func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message {
// By default message is NonConfirmable, so
Expand Down Expand Up @@ -138,6 +151,7 @@ func receive(svc coap.Service) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand All @@ -147,6 +161,7 @@ func receive(svc coap.Service) handler {

rawMsg := mainflux.RawMessage{
Channel: chanID,
Subtopic: subtopic,
Publisher: publisher,
Protocol: protocol,
Payload: msg.Payload,
Expand Down Expand Up @@ -176,6 +191,7 @@ func observe(svc coap.Service, responses chan<- string) handler {
res.Code = gocoap.NotFound
return res
}
subtopic := fmtSubtopic(mux.Var(msg, "subtopic"))

anovakovic01 marked this conversation as resolved.
Show resolved Hide resolved
publisher, err := authorize(msg, res, chanID)
if err != nil {
Expand All @@ -198,7 +214,7 @@ func observe(svc coap.Service, responses chan<- string) handler {
if value, ok := msg.Option(gocoap.Observe).(uint32); ok && value == 0 {
res.AddOption(gocoap.Observe, 1)
o := coap.NewObserver()
if err := svc.Subscribe(chanID, obsID, o); err != nil {
if err := svc.Subscribe(chanID, subtopic, obsID, o); err != nil {
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
res.Code = gocoap.InternalServerError
return res
Expand Down
15 changes: 12 additions & 3 deletions coap/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,27 @@ func New(nc *broker.Conn) coap.Broker {
return &natsPublisher{nc}
}

func (pubsub *natsPublisher) fmtSubject(chanID, subtopic string) string {
subject := fmt.Sprintf("%s.%s", prefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
return subject
}

func (pubsub *natsPublisher) Publish(msg mainflux.RawMessage) error {
data, err := proto.Marshal(&msg)
if err != nil {
return err
}

subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
subject := pubsub.fmtSubject(msg.Channel, msg.Subtopic)
return pubsub.nc.Publish(subject, data)
}

func (pubsub *natsPublisher) Subscribe(chanID, obsID string, observer *coap.Observer) error {
sub, err := pubsub.nc.Subscribe(fmt.Sprintf("%s.%s", prefix, chanID), func(msg *broker.Msg) {
func (pubsub *natsPublisher) Subscribe(chanID, subtopic, obsID string, observer *coap.Observer) error {
subject := pubsub.fmtSubject(chanID, subtopic)
sub, err := pubsub.nc.Subscribe(subject, func(msg *broker.Msg) {
if msg == nil {
return
}
Expand Down
6 changes: 5 additions & 1 deletion http/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func LoggingMiddleware(svc mainflux.MessagePublisher, logger log.Logger) mainflu

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish took %s to complete", time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
73 changes: 51 additions & 22 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"time"

kithttp "github.com/go-kit/kit/transport/http"
Expand All @@ -27,73 +30,99 @@ import (
const protocol = "http"

var (
errMalformedData = errors.New("malformed request data")
auth mainflux.ThingsServiceClient
errMalformedData = errors.New("malformed request data")
errMalformedSubtopic = errors.New("malformed subtopic")
)

var (
auth mainflux.ThingsServiceClient
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages((/[^/?]+)*)?(\?.*)?$`)
)

// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(svc mainflux.MessagePublisher, tc mainflux.ThingsServiceClient) http.Handler {
auth = tc

r := bone.New()
r.Post("/channels/:id/messages", handshake(svc))
r.Post("/channels/:id/messages/*", handshake(svc))

r.GetFunc("/version", mainflux.Version("http"))
r.Handle("/metrics", promhttp.Handler())

return r
}

func handshake(svc mainflux.MessagePublisher) *kithttp.Server {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(encodeError),
}

r := bone.New()

r.Post("/channels/:id/messages", kithttp.NewServer(
return kithttp.NewServer(
sendMessageEndpoint(svc),
decodeRequest,
encodeResponse,
opts...,
))
)
}

r.GetFunc("/version", mainflux.Version("http"))
r.Handle("/metrics", promhttp.Handler())
func parseSubtopic(subtopic string) (string, error) {
if subtopic == "" {
return subtopic, nil
}

return r
var err error
subtopic, err = url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]
return subtopic, nil
}

func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
publisher, err := authorize(r)
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
return nil, errMalformedData
}

chanID := bone.GetValue(r, "id")
subtopic, err := parseSubtopic(channelParts[2])
if err != nil {
return nil, err
}

payload, err := decodePayload(r.Body)
publisher, err := authorize(r, chanID)
if err != nil {
return nil, err
}

channel := bone.GetValue(r, "id")
if channel == "" {
return nil, errMalformedData
payload, err := decodePayload(r.Body)
if err != nil {
return nil, err
}

msg := mainflux.RawMessage{
Publisher: publisher,
Protocol: protocol,
ContentType: r.Header.Get("Content-Type"),
Channel: channel,
Channel: chanID,
Subtopic: subtopic,
Payload: payload,
}

return msg, nil
}

func authorize(r *http.Request) (string, error) {
func authorize(r *http.Request, chanID string) (string, error) {
apiKey := r.Header.Get("Authorization")

if apiKey == "" {
return "", things.ErrUnauthorizedAccess
}

// extract ID from /channels/:id/messages
chanID := bone.GetValue(r, "id")
if chanID == "" {
return "", errMalformedData
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down
7 changes: 6 additions & 1 deletion http/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
broker "github.com/nats-io/go-nats"
)

const prefix = "channel"

var _ mainflux.MessagePublisher = (*natsPublisher)(nil)

type natsPublisher struct {
Expand All @@ -33,6 +35,9 @@ func (pub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("channel.%s", msg.Channel)
subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
if msg.Subtopic != "" {
drasko marked this conversation as resolved.
Show resolved Hide resolved
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}
return pub.nc.Publish(subject, data)
}
Loading