diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 7c3b8566f8..cac5bc3cd8 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -20,8 +20,8 @@ import ( "github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap/api" logger "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/messaging/nats" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" + broker "github.com/nats-io/nats.go" opentracing "github.com/opentracing/opentracing-go" gocoap "github.com/plgd-dev/go-coap/v2" stdprometheus "github.com/prometheus/client_golang/prometheus" @@ -77,14 +77,13 @@ func main() { tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) - pubsub, err := nats.NewPubSub(cfg.natsURL, "coap", logger) + nc, err := broker.Connect(cfg.natsURL) if err != nil { log.Fatalf(err.Error()) } + defer nc.Close() - defer pubsub.Close() - - svc := coap.New(tc, pubsub) + svc := coap.New(tc, nc) svc = api.LoggingMiddleware(svc, logger) diff --git a/coap/adapter.go b/coap/adapter.go index 6a5246bce4..7dfeafd5ea 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -11,8 +11,9 @@ import ( "fmt" "sync" + "github.com/gogo/protobuf/proto" "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging/nats" + broker "github.com/nats-io/nats.go" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/pkg/messaging" @@ -41,16 +42,16 @@ var _ Service = (*adapterService)(nil) // Observers is a map of maps, type adapterService struct { auth mainflux.ThingsServiceClient - pubsub nats.PubSub + conn *broker.Conn observers map[string]observers obsLock sync.Mutex } // New instantiates the CoAP adapter implementation. -func New(auth mainflux.ThingsServiceClient, pubsub nats.PubSub) Service { +func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service { as := &adapterService{ auth: auth, - pubsub: pubsub, + conn: nc, observers: make(map[string]observers), obsLock: sync.Mutex{}, } @@ -69,7 +70,17 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin } msg.Publisher = thid.GetValue() - return svc.pubsub.Publish(msg.Channel, msg) + data, err := proto.Marshal(&msg) + if err != nil { + return err + } + + subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel) + if msg.Subtopic != "" { + subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) + } + + return svc.conn.Publish(subject, data) } func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error { @@ -86,12 +97,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic subject = fmt.Sprintf("%s.%s", subject, subtopic) } - go func() { - <-c.Done() - svc.remove(subject, c.Token()) - }() - - obs, err := NewObserver(subject, c, svc.pubsub) + obs, err := NewObserver(subject, c, svc.conn) if err != nil { c.Cancel() return err @@ -128,7 +134,7 @@ func (svc *adapterService) put(topic, token string, o Observer) error { } // If observer exists, cancel subscription and replace it. if sub, ok := obs[token]; ok { - if err := sub.Cancel(topic); err != nil { + if err := sub.Cancel(); err != nil { return errors.Wrap(ErrUnsubscribe, err) } } @@ -139,13 +145,12 @@ func (svc *adapterService) put(topic, token string, o Observer) error { func (svc *adapterService) remove(topic, token string) error { svc.obsLock.Lock() defer svc.obsLock.Unlock() - obs, ok := svc.observers[topic] if !ok { return nil } if current, ok := obs[token]; ok { - if err := current.Cancel(topic); err != nil { + if err := current.Cancel(); err != nil { return errors.Wrap(ErrUnsubscribe, err) } } diff --git a/coap/api/logging.go b/coap/api/logging.go index 6bad55c305..f7a44b0cb9 100644 --- a/coap/api/logging.go +++ b/coap/api/logging.go @@ -61,11 +61,20 @@ func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopi return lm.svc.Subscribe(ctx, key, chanID, subtopic, c) } -func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { +func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s and subtopic %s took %s to complete without errors.", token, chanID, subtopic, time.Since(begin)) - lm.logger.Info(fmt.Sprintf(message)) + destChannel := chanID + if subtopic != "" { + destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic) + } + message := fmt.Sprintf("Method unsubscribe for the client %s from the channel %s took %s to complete", token, destChannel, time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) return lm.svc.Unsubscribe(ctx, key, chanID, subtopic, token) + } diff --git a/coap/api/transport.go b/coap/api/transport.go index e16f5e55ff..9ead3e5ca2 100644 --- a/coap/api/transport.go +++ b/coap/api/transport.go @@ -27,13 +27,22 @@ import ( ) const ( - protocol = "coap" - authQuery = "auth" + protocol = "coap" + authQuery = "auth" + startObserve = 0 // observe option value that indicates start of observation ) -var channelPartRegExp = regexp.MustCompile(`^channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) +var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) -var errMalformedSubtopic = errors.New("malformed subtopic") +const ( + numGroups = 3 // entire expression + channel group + subtopic group + channelGroup = 2 // channel group is second in channel regexp +) + +var ( + errMalformedSubtopic = errors.New("malformed subtopic") + errBadOptions = errors.New("bad options") +) var ( logger log.Logger @@ -70,73 +79,79 @@ func handler(w mux.ResponseWriter, m *mux.Message) { Context: m.Context, Options: make(message.Options, 0, 16), } - defer sendResp(w, &resp) - if m.Options == nil { - logger.Warn("Nil options") - resp.Code = codes.BadOption - return - } + msg, err := decodeMessage(m) if err != nil { logger.Warn(fmt.Sprintf("Error decoding message: %s", err)) resp.Code = codes.BadRequest + sendResp(w, &resp) return } key, err := parseKey(m) if err != nil { logger.Warn(fmt.Sprintf("Error parsing auth: %s", err)) resp.Code = codes.Unauthorized + sendResp(w, &resp) return } switch m.Code { case codes.GET: - var obs uint32 - obs, err = m.Options.Observe() - if err != nil { - resp.Code = codes.BadOption - logger.Warn(fmt.Sprintf("Error reading observe option: %s", err)) - return - } - if obs == 0 { - c := coap.NewClient(w.Client(), m.Token, logger) - err = service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c) - break - } - service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String()) + err = handleGet(m, w.Client(), msg, key) case codes.POST: err = service.Publish(context.Background(), key, msg) default: - resp.Code = codes.NotFound - return + err = errors.ErrNotFound } if err != nil { switch { - case errors.Contains(err, errors.ErrAuthorization): + case err == errBadOptions: + resp.Code = codes.BadOption + case err == errors.ErrNotFound: + resp.Code = codes.NotFound + case errors.Contains(err, errors.ErrAuthorization), + errors.Contains(err, errors.ErrAuthentication): resp.Code = codes.Unauthorized - return - case errors.Contains(err, coap.ErrUnsubscribe): + default: resp.Code = codes.InternalServerError } + sendResp(w, &resp) + } +} + +func handleGet(m *mux.Message, c mux.Client, msg messaging.Message, key string) error { + var obs uint32 + obs, err := m.Options.Observe() + if err != nil { + logger.Warn(fmt.Sprintf("Error reading observe option: %s", err)) + return errBadOptions } + if obs == startObserve { + c := coap.NewClient(c, m.Token, logger) + return service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c) + } + return service.Unsubscribe(context.Background(), key, msg.Channel, msg.Subtopic, m.Token.String()) } func decodeMessage(msg *mux.Message) (messaging.Message, error) { + if msg.Options == nil { + return messaging.Message{}, errBadOptions + } path, err := msg.Options.Path() if err != nil { return messaging.Message{}, err } channelParts := channelPartRegExp.FindStringSubmatch(path) - if len(channelParts) < 2 { + if len(channelParts) < numGroups { return messaging.Message{}, errMalformedSubtopic } - st, err := parseSubtopic(channelParts[2]) + st, err := parseSubtopic(channelParts[channelGroup]) if err != nil { return messaging.Message{}, err } ret := messaging.Message{ Protocol: protocol, - Channel: parseID(path), + Channel: channelParts[1], Subtopic: st, Payload: []byte{}, Created: time.Now().UnixNano(), @@ -152,15 +167,10 @@ func decodeMessage(msg *mux.Message) (messaging.Message, error) { return ret, nil } -func parseID(path string) string { - vars := strings.Split(path, "/") - if len(vars) > 1 { - return vars[1] - } - return "" -} - func parseKey(msg *mux.Message) (string, error) { + if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET { + return "", nil + } authKey, err := msg.Options.GetString(message.URIQuery) if err != nil { return "", err diff --git a/coap/client.go b/coap/client.go index a2d72d8056..1a4eeb5f83 100644 --- a/coap/client.go +++ b/coap/client.go @@ -5,7 +5,9 @@ package coap import ( "bytes" + "context" "fmt" + "sync/atomic" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" @@ -30,25 +32,36 @@ type observers map[string]Observer var ErrOption = errors.New("unable to set option") type client struct { - client mux.Client - token message.Token - logger logger.Logger + client mux.Client + token message.Token + observe uint32 + logger logger.Logger } // NewClient instantiates a new Observer. -func NewClient(mc mux.Client, token message.Token, l logger.Logger) Client { +func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client { return &client{ - client: mc, - token: token, - logger: l, + client: c, + token: tkn, + logger: l, + observe: 0, } } func (c *client) Done() <-chan struct{} { - return c.client.Context().Done() + return c.client.Done() } func (c *client) Cancel() error { + m := message.Message{ + Code: codes.Content, + Token: c.token, + Context: context.Background(), + Options: make(message.Options, 0, 16), + } + if err := c.client.WriteMessage(&m); err != nil { + c.logger.Error(fmt.Sprintf("Error sending message: %s.", err)) + } return c.client.Close() } @@ -63,22 +76,29 @@ func (c *client) SendMessage(msg messaging.Message) error { Context: c.client.Context(), Body: bytes.NewReader(msg.Payload), } + + atomic.AddUint32(&c.observe, 1) var opts message.Options var buff []byte - opts, n, err := opts.SetContentFormat(buff, message.TextPlain) if err == message.ErrTooSmall { buff = append(buff, make([]byte, n)...) - opts, n, err = opts.SetContentFormat(buff, message.TextPlain) + _, _, err = opts.SetContentFormat(buff, message.TextPlain) } if err != nil { c.logger.Error(fmt.Sprintf("Can't set content format: %s.", err)) return errors.Wrap(ErrOption, err) } - m.Options = opts - if err := c.client.WriteMessage(&m); err != nil { - c.logger.Error(fmt.Sprintf("Error sending message: %s.", err)) - return err + opts = append(opts, message.Option{ID: message.Observe, Value: []byte{byte(c.observe)}}) + opts, n, err = opts.SetObserve(buff, uint32(c.observe)) + if err == message.ErrTooSmall { + buff = append(buff, make([]byte, n)...) + opts, _, err = opts.SetObserve(buff, uint32(c.observe)) } - return nil + if err != nil { + return fmt.Errorf("cannot set options to response: %w", err) + } + + m.Options = opts + return c.client.WriteMessage(&m) } diff --git a/coap/observer.go b/coap/observer.go index 9550c6f108..4dc2424cfa 100644 --- a/coap/observer.go +++ b/coap/observer.go @@ -4,35 +4,43 @@ package coap import ( - "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/gogo/protobuf/proto" + "github.com/mainflux/mainflux/pkg/messaging" broker "github.com/nats-io/nats.go" ) // Observer represents an internal observer used to handle CoAP observe messages. type Observer interface { - Cancel(topic string) error + Cancel() error } // NewObserver returns a new Observer instance. -func NewObserver(subject string, c Client, pubsub nats.PubSub) (Observer, error) { - err := pubsub.Subscribe(subject, c.SendMessage) +func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) { + sub, err := conn.Subscribe(subject, func(m *broker.Msg) { + var msg messaging.Message + if err := proto.Unmarshal(m.Data, &msg); err != nil { + return + } + // There is no error handling, but the client takes care to log the error. + c.SendMessage(msg) + }) if err != nil { return nil, err } ret := &observer{ client: c, - pubsub: pubsub, + sub: sub, } return ret, nil } type observer struct { client Client - pubsub nats.PubSub + sub *broker.Subscription } -func (o *observer) Cancel(topic string) error { - if err := o.pubsub.Unsubscribe(topic); err != nil && err != broker.ErrConnectionClosed { +func (o *observer) Cancel() error { + if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed { return err } return o.client.Cancel() diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 0d86fd2b02..22efb5763e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -105,6 +105,8 @@ services: - ./nats/:/etc/nats networks: - mainflux-base-net + ports: + - 4222:4222 auth-db: image: postgres:13.3-alpine