Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-475 - Replace increment ID with UUID #490

Merged
merged 12 commits into from
Dec 5, 2018
4 changes: 2 additions & 2 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Broker interface {

// Subscribes to channel with specified id and adds subscription to
// service map of subscriptions under given ID.
Subscribe(uint64, string, *Observer) error
Subscribe(string, string, *Observer) error
}

// Service specifies coap service API.
Expand Down Expand Up @@ -135,7 +135,7 @@ func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
return nil
}

func (svc *adapterService) Subscribe(chanID uint64, obsID string, o *Observer) error {
func (svc *adapterService) Subscribe(chanID, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
return ErrFailedSubscription
}
Expand Down
6 changes: 3 additions & 3 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func LoggingMiddleware(svc coap.Service, logger log.Logger) coap.Service {

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish to channel %d took %s to complete", msg.Channel, time.Since(begin))
message := fmt.Sprintf("Method publish to channel %s took %s to complete", msg.Channel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -43,9 +43,9 @@ func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
return lm.svc.Publish(msg)
}

func (lm *loggingMiddleware) Subscribe(chanID uint64, obsID string, o *coap.Observer) (err error) {
func (lm *loggingMiddleware) Subscribe(chanID, obsID string, o *coap.Observer) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method subscribe to channel %d for client %s took %s to complete", chanID, obsID, time.Since(begin))
message := fmt.Sprintf("Method subscribe to channel %s for client %s took %s to complete", chanID, obsID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
2 changes: 1 addition & 1 deletion coap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (mm *metricsMiddleware) Publish(msg mainflux.RawMessage) error {
return mm.svc.Publish(msg)
}

func (mm *metricsMiddleware) Subscribe(chanID uint64, clientID string, o *coap.Observer) error {
func (mm *metricsMiddleware) Subscribe(chanID, clientID string, o *coap.Observer) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
Expand Down
24 changes: 10 additions & 14 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"net"
"net/http"
"strconv"
"time"

"github.com/go-zoo/bone"
Expand Down Expand Up @@ -75,7 +74,7 @@ func MakeCOAPHandler(svc coap.Service, tc mainflux.ThingsServiceClient, l log.Lo
return r
}

func authorize(msg *gocoap.Message, res *gocoap.Message, cid uint64) (uint64, error) {
func authorize(msg *gocoap.Message, res *gocoap.Message, cid string) (string, error) {
// Device Key is passed as Uri-Query parameter, which option ID is 15 (0xf).
key, err := authKey(msg.Option(gocoap.URIQuery))
if err != nil {
Expand All @@ -86,7 +85,7 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid uint64) (uint64, er
res.Code = gocoap.BadRequest
}

return 0, err
return "", err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
Expand All @@ -103,7 +102,7 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid uint64) (uint64, er
default:
res.Code = gocoap.ServiceUnavailable
}
return 0, err
return "", err
}
res.Code = gocoap.InternalServerError
}
Expand Down Expand Up @@ -134,22 +133,20 @@ func receive(svc coap.Service) handler {
}
}

channelID := mux.Var(msg, "id")

cid, err := strconv.ParseUint(channelID, 10, 64)
if err != nil {
chanID := mux.Var(msg, "id")
if chanID == "" {
res.Code = gocoap.NotFound
return res
}

publisher, err := authorize(msg, res, cid)
publisher, err := authorize(msg, res, chanID)
if err != nil {
res.Code = gocoap.Forbidden
return res
}

rawMsg := mainflux.RawMessage{
Channel: cid,
Channel: chanID,
Publisher: publisher,
Protocol: protocol,
Payload: msg.Payload,
Expand All @@ -174,9 +171,8 @@ func observe(svc coap.Service, responses chan<- string) handler {
}
res.SetOption(gocoap.ContentFormat, gocoap.AppJSON)

id := mux.Var(msg, "id")
chanID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
chanID := mux.Var(msg, "id")
if chanID == "" {
res.Code = gocoap.NotFound
return res
}
Expand All @@ -188,7 +184,7 @@ func observe(svc coap.Service, responses chan<- string) handler {
return res
}

obsID := fmt.Sprintf("%x-%d-%d", msg.Token, publisher, chanID)
obsID := fmt.Sprintf("%x-%s-%s", msg.Token, publisher, chanID)

if msg.Type == gocoap.Acknowledgement {
responses <- obsID
Expand Down
6 changes: 3 additions & 3 deletions coap/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func (pubsub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("%s.%d", prefix, msg.Channel)
subject := fmt.Sprintf("%s.%s", prefix, msg.Channel)
return pubsub.nc.Publish(subject, data)
}

func (pubsub *natsPublisher) Subscribe(chanID uint64, obsID string, observer *coap.Observer) error {
sub, err := pubsub.nc.Subscribe(fmt.Sprintf("%s.%d", prefix, chanID), func(msg *broker.Msg) {
func (pubsub *natsPublisher) Subscribe(chanID, obsID string, observer *coap.Observer) error {
sub, err := pubsub.nc.Subscribe(fmt.Sprintf("%s.%s", prefix, chanID), func(msg *broker.Msg) {
if msg == nil {
return
}
Expand Down
11 changes: 4 additions & 7 deletions http/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -58,13 +57,11 @@ func (tr testRequest) make() (*http.Response, error) {

func TestPublish(t *testing.T) {
chanID := "1"
invalidID := "wrong"
contentType := "application/senml+json"
token := "auth_token"
invalidToken := "invalid_token"
msg := `[{"n":"current","t":-1,"v":1.6}]`
id, _ := strconv.ParseUint(chanID, 10, 64)
thingsClient := mocks.NewThingsClient(map[string]uint64{token: id})
thingsClient := mocks.NewThingsClient(map[string]string{token: chanID})
pub := newService()
ts := newHTTPServer(pub, thingsClient)
defer ts.Close()
Expand Down Expand Up @@ -104,12 +101,12 @@ func TestPublish(t *testing.T) {
auth: token,
status: http.StatusAccepted,
},
"publish message to wrong channel": {
chanID: invalidID,
"publish message to invalid channel": {
chanID: "",
msg: msg,
contentType: contentType,
auth: token,
status: http.StatusNotFound,
status: http.StatusBadRequest,
},
"publish message unable to authorize": {
chanID: chanID,
Expand Down
22 changes: 10 additions & 12 deletions http/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const protocol = "http"

var (
errMalformedData = errors.New("malformed SenML data")
errMalformedData = errors.New("malformed request data")
auth mainflux.ThingsServiceClient
)

Expand Down Expand Up @@ -65,9 +65,9 @@ func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
return nil, err
}

channel, err := things.FromString(bone.GetValue(r, "id"))
if err != nil {
return nil, err
channel := bone.GetValue(r, "id")
if channel == "" {
return nil, errMalformedData
}

msg := mainflux.RawMessage{
Expand All @@ -81,25 +81,25 @@ func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
return msg, nil
}

func authorize(r *http.Request) (uint64, error) {
func authorize(r *http.Request) (string, error) {
apiKey := r.Header.Get("Authorization")

if apiKey == "" {
return 0, things.ErrUnauthorizedAccess
return "", things.ErrUnauthorizedAccess
}

// extract ID from /channels/:id/messages
chanID, err := things.FromString(bone.GetValue(r, "id"))
if err != nil {
return 0, things.ErrNotFound
chanID := bone.GetValue(r, "id")
if chanID == "" {
return "", errMalformedData
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: apiKey, ChanID: chanID})
if err != nil {
return 0, err
return "", err
}

return id.GetValue(), nil
Expand All @@ -124,8 +124,6 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) {
switch err {
case errMalformedData:
w.WriteHeader(http.StatusBadRequest)
case things.ErrNotFound:
w.WriteHeader(http.StatusNotFound)
case things.ErrUnauthorizedAccess:
w.WriteHeader(http.StatusForbidden)
default:
Expand Down
4 changes: 2 additions & 2 deletions http/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ var _ mainflux.ThingsServiceClient = (*thingsClient)(nil)
const ServiceErrToken = "unavailable"

type thingsClient struct {
things map[string]uint64
things map[string]string
}

// NewThingsClient returns mock implementation of things service client.
func NewThingsClient(data map[string]uint64) mainflux.ThingsServiceClient {
func NewThingsClient(data map[string]string) mainflux.ThingsServiceClient {
return &thingsClient{data}
}

Expand Down
2 changes: 1 addition & 1 deletion http/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ func (pub *natsPublisher) Publish(msg mainflux.RawMessage) error {
return err
}

subject := fmt.Sprintf("channel.%d", msg.Channel)
subject := fmt.Sprintf("channel.%s", msg.Channel)
return pub.nc.Publish(subject, data)
}
Loading