Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Fix CoAP adapter #1572

Merged
merged 9 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
opentracing "github.com/opentracing/opentracing-go"
gocoap "github.com/plgd-dev/go-coap/v2"
stdprometheus "github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -77,14 +77,13 @@ func main() {

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

pubsub, err := nats.NewPubSub(cfg.natsURL, "coap", logger)
nc, err := broker.Connect(cfg.natsURL)
if err != nil {
log.Fatalf(err.Error())
}
defer nc.Close()

defer pubsub.Close()

svc := coap.New(tc, pubsub)
svc := coap.New(tc, nc)

svc = api.LoggingMiddleware(svc, logger)

Expand Down
33 changes: 19 additions & 14 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
"fmt"
"sync"

"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging/nats"
broker "github.com/nats-io/nats.go"

"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/pkg/messaging"
Expand Down Expand Up @@ -41,16 +42,16 @@ var _ Service = (*adapterService)(nil)
// Observers is a map of maps,
type adapterService struct {
auth mainflux.ThingsServiceClient
pubsub nats.PubSub
conn *broker.Conn
observers map[string]observers
obsLock sync.Mutex
}

// New instantiates the CoAP adapter implementation.
func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service {
func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service {
as := &adapterService{
auth: auth,
pubsub: pubsub,
conn: nc,
observers: make(map[string]observers),
obsLock: sync.Mutex{},
}
Expand All @@ -69,7 +70,17 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin
}
msg.Publisher = thid.GetValue()

return svc.pubsub.Publish(msg.Channel, msg)
data, err := proto.Marshal(&msg)
if err != nil {
return err
}

subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel)
if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}

return svc.conn.Publish(subject, data)
}

func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
Expand All @@ -86,12 +97,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}

go func() {
<-c.Done()
svc.remove(subject, c.Token())
}()

obs, err := NewObserver(subject, c, svc.pubsub)
obs, err := NewObserver(subject, c, svc.conn)
if err != nil {
c.Cancel()
return err
Expand Down Expand Up @@ -128,7 +134,7 @@ func (svc *adapterService) put(topic, token string, o Observer) error {
}
// If observer exists, cancel subscription and replace it.
if sub, ok := obs[token]; ok {
if err := sub.Cancel(topic); err != nil {
if err := sub.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err)
}
}
Expand All @@ -139,13 +145,12 @@ func (svc *adapterService) put(topic, token string, o Observer) error {
func (svc *adapterService) remove(topic, token string) error {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

obs, ok := svc.observers[topic]
if !ok {
return nil
}
if current, ok := obs[token]; ok {
if err := current.Cancel(topic); err != nil {
if err := current.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err)
}
}
Expand Down
15 changes: 12 additions & 3 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,20 @@ func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopi
return lm.svc.Subscribe(ctx, key, chanID, subtopic, c)
}

func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s and subtopic %s took %s to complete without errors.", token, chanID, subtopic, time.Since(begin))
lm.logger.Info(fmt.Sprintf(message))
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s took %s to complete", token, destChannel, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Unsubscribe(ctx, key, chanID, subtopic, token)

}
88 changes: 49 additions & 39 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,22 @@ import (
)

const (
protocol = "coap"
authQuery = "auth"
protocol = "coap"
authQuery = "auth"
startObserve = 0 // observe option value that indicates start of observation
)

var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)

var errMalformedSubtopic = errors.New("malformed subtopic")
const (
numGroups = 3 // entire expression + channel group + subtopic group
channelGroup = 2 // channel group is second in channel regexp
)

var (
errMalformedSubtopic = errors.New("malformed subtopic")
errBadOptions = errors.New("bad options")
)

var (
logger log.Logger
Expand Down Expand Up @@ -70,73 +79,79 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
Context: m.Context,
Options: make(message.Options, 0, 16),
}
defer sendResp(w, &resp)
if m.Options == nil {
logger.Warn("Nil options")
resp.Code = codes.BadOption
return
}

msg, err := decodeMessage(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
resp.Code = codes.BadRequest
sendResp(w, &resp)
return
}
key, err := parseKey(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error parsing auth: %s", err))
resp.Code = codes.Unauthorized
sendResp(w, &resp)
return
}
switch m.Code {
case codes.GET:
var obs uint32
obs, err = m.Options.Observe()
if err != nil {
resp.Code = codes.BadOption
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return
}
if obs == 0 {
c := coap.NewClient(w.Client(), m.Token, logger)
err = service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c)
break
}
service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String())
err = handleGet(m, w.Client(), msg, key)
case codes.POST:
err = service.Publish(context.Background(), key, msg)
default:
resp.Code = codes.NotFound
return
err = errors.ErrNotFound
}
if err != nil {
switch {
case errors.Contains(err, errors.ErrAuthorization):
case err == errBadOptions:
resp.Code = codes.BadOption
case err == errors.ErrNotFound:
resp.Code = codes.NotFound
case errors.Contains(err, errors.ErrAuthorization),
errors.Contains(err, errors.ErrAuthentication):
resp.Code = codes.Unauthorized
return
case errors.Contains(err, coap.ErrUnsubscribe):
default:
resp.Code = codes.InternalServerError
}
sendResp(w, &resp)
}
}

func handleGet(m *mux.Message, c mux.Client, msg messaging.Message, key string) error {
var obs uint32
obs, err := m.Options.Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
return service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c)
}
return service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String())
mteodor marked this conversation as resolved.
Show resolved Hide resolved
}

func decodeMessage(msg *mux.Message) (messaging.Message, error) {
if msg.Options == nil {
return messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path()
if err != nil {
return messaging.Message{}, err
}
channelParts := channelPartRegExp.FindStringSubmatch(path)
if len(channelParts) < 2 {
if len(channelParts) < numGroups {
return messaging.Message{}, errMalformedSubtopic
}

st, err := parseSubtopic(channelParts[2])
st, err := parseSubtopic(channelParts[channelGroup])
if err != nil {
return messaging.Message{}, err
}
ret := messaging.Message{
Protocol: protocol,
Channel: parseID(path),
Channel: channelParts[1],
dborovcanin marked this conversation as resolved.
Show resolved Hide resolved
Subtopic: st,
Payload: []byte{},
Created: time.Now().UnixNano(),
Expand All @@ -152,15 +167,10 @@ func decodeMessage(msg *mux.Message) (messaging.Message, error) {
return ret, nil
}

func parseID(path string) string {
vars := strings.Split(path, "/")
if len(vars) > 1 {
return vars[1]
}
return ""
}

func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery)
if err != nil {
return "", err
Expand Down
50 changes: 35 additions & 15 deletions coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package coap

import (
"bytes"
"context"
"fmt"
"sync/atomic"

"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
Expand All @@ -30,25 +32,36 @@ type observers map[string]Observer
var ErrOption = errors.New("unable to set option")

type client struct {
client mux.Client
token message.Token
logger logger.Logger
client mux.Client
token message.Token
observe uint32
logger logger.Logger
}

// NewClient instantiates a new Observer.
func NewClient(mc mux.Client, token message.Token, l logger.Logger) Client {
func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client {
return &client{
client: mc,
token: token,
logger: l,
client: c,
token: tkn,
logger: l,
observe: 0,
}
}

func (c *client) Done() <-chan struct{} {
return c.client.Context().Done()
return c.client.Done()
}

func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close()
}

Expand All @@ -63,22 +76,29 @@ func (c *client) SendMessage(msg messaging.Message) error {
Context: c.client.Context(),
Body: bytes.NewReader(msg.Payload),
}

atomic.AddUint32(&c.observe, 1)
var opts message.Options
var buff []byte

opts, n, err := opts.SetContentFormat(buff, message.TextPlain)
if err == message.ErrTooSmall {
buff = append(buff, make([]byte, n)...)
opts, n, err = opts.SetContentFormat(buff, message.TextPlain)
_, _, err = opts.SetContentFormat(buff, message.TextPlain)
}
if err != nil {
c.logger.Error(fmt.Sprintf("Can't set content format: %s.", err))
return errors.Wrap(ErrOption, err)
}
m.Options = opts
if err := c.client.WriteMessage(&m); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
return err
opts = append(opts, message.Option{ID: message.Observe, Value: []byte{byte(c.observe)}})
opts, n, err = opts.SetObserve(buff, uint32(c.observe))
if err == message.ErrTooSmall {
buff = append(buff, make([]byte, n)...)
opts, _, err = opts.SetObserve(buff, uint32(c.observe))
}
return nil
if err != nil {
return fmt.Errorf("cannot set options to response: %w", err)
}

m.Options = opts
return c.client.WriteMessage(&m)
}
Loading