Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-596 - Add subtopic to RawMessage #642

Merged
merged 18 commits into from
Mar 15, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const contentTypeSenml = "application/senml+json"
var cmdMessages = []cobra.Command{
cobra.Command{
Use: "send",
Short: "send <channel_id> <JSON_string> <thing_key>",
Short: "send <channel_id>[.<subtopic>...] <JSON_string> <thing_key>",
Long: `Sends message on the channel`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 3 {
Expand All @@ -32,7 +32,7 @@ var cmdMessages = []cobra.Command{
},
cobra.Command{
Use: "read",
Short: "read <channel_id> <thing_key>",
Short: "read <channel_id>[.<subtopic>...] <thing_key>",
Long: `Reads all channel messages`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
Expand Down
6 changes: 5 additions & 1 deletion http/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ func LoggingMiddleware(svc mainflux.MessagePublisher, logger log.Logger) mainflu

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish took %s to complete", time.Since(begin))
destChannel := msg.Channel
if msg.Subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, msg.Subtopic)
}
message := fmt.Sprintf("Method publish to channel %s took %s to complete", destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
39 changes: 22 additions & 17 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"io"
"io/ioutil"
"net/http"
"regexp"
"strings"
"time"

kithttp "github.com/go-kit/kit/transport/http"
Expand All @@ -27,8 +29,9 @@ import (
const protocol = "http"

var (
errMalformedData = errors.New("malformed request data")
auth mainflux.ThingsServiceClient
errMalformedData = errors.New("malformed request data")
auth mainflux.ThingsServiceClient
channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages((/[\w\-]+)*)*\??.*$`)
)

// MakeHandler returns a HTTP handler for API endpoints.
Expand All @@ -41,7 +44,7 @@ func MakeHandler(svc mainflux.MessagePublisher, tc mainflux.ThingsServiceClient)

r := bone.New()

r.Post("/channels/:id/messages", kithttp.NewServer(
r.Post("/channels/*", kithttp.NewServer(
drasko marked this conversation as resolved.
Show resolved Hide resolved
sendMessageEndpoint(svc),
decodeRequest,
encodeResponse,
Expand All @@ -55,7 +58,19 @@ func MakeHandler(svc mainflux.MessagePublisher, tc mainflux.ThingsServiceClient)
}

func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
publisher, err := authorize(r)
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
return nil, errMalformedData
}

chanID := channelParts[1]
beres marked this conversation as resolved.
Show resolved Hide resolved
subtopic := strings.Replace(channelParts[2], "/", ".", -1)
drasko marked this conversation as resolved.
Show resolved Hide resolved
if subtopic != "" {
// channelParts[2] contains the subtopic parts starting with char /
subtopic = subtopic[1:]
}

publisher, err := authorize(r, chanID)
if err != nil {
return nil, err
}
Expand All @@ -65,35 +80,25 @@ func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
return nil, err
}

channel := bone.GetValue(r, "id")
if channel == "" {
return nil, errMalformedData
}

msg := mainflux.RawMessage{
Publisher: publisher,
Protocol: protocol,
ContentType: r.Header.Get("Content-Type"),
Channel: channel,
Channel: chanID,
Subtopic: subtopic,
Payload: payload,
}

return msg, nil
}

func authorize(r *http.Request) (string, error) {
func authorize(r *http.Request, chanID string) (string, error) {
apiKey := r.Header.Get("Authorization")

if apiKey == "" {
return "", things.ErrUnauthorizedAccess
}

// extract ID from /channels/:id/messages
chanID := bone.GetValue(r, "id")
if chanID == "" {
return "", errMalformedData
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down
7 changes: 6 additions & 1 deletion http/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
broker "github.com/nats-io/go-nats"
)

const prefix = "channel"

var _ mainflux.MessagePublisher = (*natsPublisher)(nil)

type natsPublisher struct {
Expand All @@ -33,6 +35,9 @@ func (pub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("channel.%s", msg.Channel)
subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
if msg.Subtopic != "" {
drasko marked this conversation as resolved.
Show resolved Hide resolved
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}
return pub.nc.Publish(subject, data)
}
Loading