Skip to content

Commit

Permalink
Fix channel ID formatting due to type change
Browse files Browse the repository at this point in the history
Uncomment error handling for authorization.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin committed Oct 2, 2018
1 parent 8ab0749 commit 29607aa
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 24 deletions.
4 changes: 2 additions & 2 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func main() {
errs := make(chan error, 2)

go func() {
p := fmt.Sprintf(":%d", cfg.Port)
logger.Info(fmt.Sprintf("CoAP adapter service started, exposed port %d", cfg.Port))
p := fmt.Sprintf(":%s", cfg.Port)
logger.Info(fmt.Sprintf("CoAP adapter service started, exposed port %s", cfg.Port))
errs <- api.ListenAndServe(svc, cc, p)
}()

Expand Down
3 changes: 1 addition & 2 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (
// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")

// extracted to avoid recomputation
maxTimeout = int(float64(AckTimeout) * ((math.Pow(2, float64(MaxRetransmit))) - 1) * responseBackoffMultiplier)
)

Expand Down Expand Up @@ -111,7 +110,7 @@ func (svc *adapterService) Publish(msg mainflux.RawMessage) error {

func (svc *adapterService) Subscribe(chanID uint64, clientID string, ch nats.Channel) error {
// Remove entry if already exists.
svc.Unsubscribe(clientID)
svc.remove(clientID)
if err := svc.pubsub.Subscribe(chanID, ch); err != nil {
return ErrFailedSubscription
}
Expand Down
4 changes: 2 additions & 2 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func LoggingMiddleware(svc coap.Service, logger log.Logger) coap.Service {

func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method publish to channel %s took %s to complete", msg.Channel, time.Since(begin))
message := fmt.Sprintf("Method publish to channel %d took %s to complete", msg.Channel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand All @@ -39,7 +39,7 @@ func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {

func (lm *loggingMiddleware) Subscribe(chanID uint64, clientID string, channel nats.Channel) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method subscribe to channel %s took %s to complete", chanID, time.Since(begin))
message := fmt.Sprintf("Method subscribe to channel %d took %s to complete", chanID, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
Expand Down
32 changes: 17 additions & 15 deletions coap/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
gocoap "github.com/dustin/go-coap"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/coap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -49,19 +51,19 @@ func authorize(msg *gocoap.Message, res *gocoap.Message, cid uint64) (uint64, er

id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: key, ChanID: cid})

// if err != nil {
// e, ok := status.FromError(err)
// if ok {
// switch e.Code() {
// case codes.PermissionDenied:
// res.Code = gocoap.Forbidden
// default:
// res.Code = gocoap.ServiceUnavailable
// }
// return 0, err
// }
// res.Code = gocoap.InternalServerError
// }
if err != nil {
e, ok := status.FromError(err)
if ok {
switch e.Code() {
case codes.PermissionDenied:
res.Code = gocoap.Forbidden
default:
res.Code = gocoap.ServiceUnavailable
}
return 0, err
}
res.Code = gocoap.InternalServerError
}
return id.GetValue(), nil
}

Expand Down Expand Up @@ -94,7 +96,7 @@ func serve(svc coap.Service, conn *net.UDPConn, data []byte, addr *net.UDPAddr,
if err != nil {
break
}
id := fmt.Sprintf("%s-%x", publisher, msg.Token)
id := fmt.Sprintf("%d-%x", publisher, msg.Token)
svc.RemoveTimeout(id)
svc.Unsubscribe(id)
case gocoap.Acknowledgement:
Expand All @@ -108,7 +110,7 @@ func serve(svc coap.Service, conn *net.UDPConn, data []byte, addr *net.UDPAddr,
if err != nil {
break
}
id := fmt.Sprintf("%s-%x", publisher, msg.Token)
id := fmt.Sprintf("%d-%x", publisher, msg.Token)
svc.RemoveTimeout(id)
default:
res = rh.ServeCOAP(conn, addr, &msg)
Expand Down
4 changes: 2 additions & 2 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func observe(svc coap.Service) handler {
}

if value, ok := msg.Option(gocoap.Observe).(uint32); ok && value == 1 {
id := fmt.Sprintf("%s-%x", publisher, msg.Token)
id := fmt.Sprintf("%d-%x", publisher, msg.Token)
svc.Unsubscribe(id)
}

Expand All @@ -150,7 +150,7 @@ func observe(svc coap.Service) handler {
Timer: make(chan bool),
Notify: make(chan bool),
}
id := fmt.Sprintf("%s-%x", publisher, msg.Token)
id := fmt.Sprintf("%d-%x", publisher, msg.Token)
if err := svc.Subscribe(cid, id, ch); err != nil {
res.Code = gocoap.InternalServerError
return res
Expand Down
3 changes: 2 additions & 1 deletion coap/nats/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ func (pubsub *natsPublisher) Publish(msg mainflux.RawMessage) error {
if err != nil {
return err
}
return pubsub.nc.Publish(fmt.Sprintf("%s.%s", prefix, msg.Channel), data)
return pubsub.nc.Publish(fmt.Sprintf("%s.%d", prefix, msg.Channel), data)
}

func (pubsub *natsPublisher) Subscribe(chanID uint64, channel Channel) error {
sub, err := pubsub.nc.Subscribe(fmt.Sprintf("%s.%d", prefix, chanID), func(msg *broker.Msg) {
if msg == nil {
return
}

var rawMsg mainflux.RawMessage
if err := proto.Unmarshal(msg.Data, &rawMsg); err != nil {
return
Expand Down

0 comments on commit 29607aa

Please sign in to comment.