Skip to content

Commit

Permalink
MF-374 - Bring back CoAP adapter (#413)
Browse files Browse the repository at this point in the history
* Bring old CoAP code back

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix channel ID formatting due to type change

Uncomment error handling for authorization.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update CoAP adapter docs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add copyright headers

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Remove redundant type declaration

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Add CoAP adapter to the list of services

Add CoAp adapter in Makefile services list and fix corresponding documentation.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Refactor CoAP code

Merge multipe `const` block int single and declare consts before vars.
Un-export notFound handler since there is no need to export it.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Update http version endpoint

This separates CoAP and HTTP APIs.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Refactor CoAP POST method handling

This PR is a part of CoAP adapter refactoring that will simplify adapter implementation.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Refactor CoAP adapter

Change CoAP message handling to simplify adapter implementation.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Add backoff timeout for server ping to client

Update CoAP adapter to provide subset of necessary features from
protocol specification.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Fix leaking locked goroutine

In case of the stopped ticker, its channel is NOT closed, so pinging might be left stuck waiting for the stopped ticker to send a notification.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Format code

Use more meaningful name for Handlers map.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Use and stop ticker from the same goroutine

Stop handler Ticker from ping goroutine rather than the cancel goroutine.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Check if subscription already exists in put method

Fix potential leak of handlers providing check inside of put method.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Use MessageID as Observe option

Since MessageID satisfies observe option behaviour, use Message ID
instead of local timestamp. Remove Thicker from handler and use it on
transport layer.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Use name Observer insted of Handler

Name `Observer` is used in protocol specification, so this naming makes
code more self-documenting.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Add CoAP adapter to docker-compose.yml

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Add copyright headers

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Remove unused constants

Fix service name in startup log message.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Add metrics endpoint

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Refactor code

Config fields from main.go should not be exported; minor style changes.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>

* Update authorization URI-Query option

Use `authorization` value in URI-Query option instead of `key`. This
mimics Authorization header in some other protocols (e.g. HTTP). Please
note that this value can be replaced with simple `auth` to save space,
due to constrained URI-Query option size.

Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin authored and anovakovic01 committed Oct 31, 2018
1 parent 9cacf57 commit d6755e4
Show file tree
Hide file tree
Showing 37 changed files with 2,643 additions and 2,548 deletions.
230 changes: 207 additions & 23 deletions Gopkg.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@
branch = "master"
name = "github.com/gocql/gocql"

[[constraint]]
branch = "master"
name = "github.com/dereulenspiegel/coap-mux"

[[constraint]]
branch = "master"
name = "github.com/dustin/go-coap"

[prune]
go-tests = true
unused-packages = true
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
BUILD_DIR = build
SERVICES = users things http normalizer ws influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
SERVICES = users things http normalizer ws coap influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
DOCKERS = $(addprefix docker_,$(SERVICES))
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
CGO_ENABLED ?= 0
Expand Down
129 changes: 129 additions & 0 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"

gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
"github.com/mainflux/mainflux/coap/nats"
logger "github.com/mainflux/mainflux/logger"
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

broker "github.com/nats-io/go-nats"
)

const (
defPort = "5683"
defNatsURL = broker.DefaultURL
defThingsURL = "localhost:8181"
defLogLevel = "error"

envPort = "MF_COAP_ADAPTER_PORT"
envNatsURL = "MF_NATS_URL"
envThingsURL = "MF_THINGS_URL"
envLogLevel = "MF_COAP_ADAPTER_LOG_LEVEL"
)

type config struct {
port string
natsURL string
thingsURL string
logLevel string
}

func main() {
cfg := loadConfig()

logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}

nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()

conn, err := grpc.Dial(cfg.thingsURL, grpc.WithInsecure())
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to users service: %s", err))
os.Exit(1)
}
defer conn.Close()

cc := thingsapi.NewClient(conn)
respChan := make(chan string, 10000)
pubsub := nats.New(nc)
svc := coap.New(pubsub, respChan)
svc = api.LoggingMiddleware(svc, logger)

svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "coap_adapter",
Subsystem: "api",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "coap_adapter",
Subsystem: "api",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)

errs := make(chan error, 2)

go startHTTPServer(cfg.port, logger, errs)
go startCOAPServer(cfg.port, svc, cc, respChan, logger, errs)

go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()

err = <-errs
logger.Error(fmt.Sprintf("CoAP adapter terminated: %s", err))
}

func loadConfig() config {
return config{
thingsURL: mainflux.Env(envThingsURL, defThingsURL),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
port: mainflux.Env(envPort, defPort),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
}
}

func startHTTPServer(port string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("CoAP service started, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHTTPHandler())
}

func startCOAPServer(port string, svc coap.Service, auth mainflux.ThingsServiceClient, respChan chan<- string, l logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
l.Info(fmt.Sprintf("CoAP adapter service started, exposed port %s", port))
errs <- gocoap.ListenAndServe("udp", p, api.MakeCOAPHandler(svc, auth, l, respChan))
}
61 changes: 61 additions & 0 deletions coap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Mainflux CoAP Adapter

Mainflux CoAP adapter provides an [CoAP](http://coap.technology/) API for sending messages through the
platform.

## Configuration

The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.

| Variable | Description | Default |
|---------------------------|------------------------|-----------------------|
| MF_COAP_ADAPTER_PORT | Service listening port | 5683 |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_THINGS_URL | Things service URL | localhost:8181 |
| MF_COAP_ADAPTER_LOG_LEVEL | Service log level | error |

## Deployment

The service is distributed as Docker container. The following snippet provides
a compose file template that can be used to deploy the service container locally:

```yaml
version: "2"
services:
adapter:
image: mainflux/coap:[version]
container_name: [instance name]
ports:
- [host machine port]:[configured port]
environment:
MF_COAP_ADAPTER_PORT: [Service HTTP port]
MF_NATS_URL: [NATS instance URL]
MF_THINGS_URL: [Things service URL]
MF_COAP_ADAPTER_LOG_LEVEL: [Service log level]
```
Running this service outside of container requires working instance of the NATS service.
To start the service outside of the container, execute the following shell script:
```bash
# download the latest version of the service
go get github.com/mainflux/mainflux

cd $GOPATH/src/github.com/mainflux/mainflux

# compile the http
make coap

# copy binary to bin
make install

# set the environment variables and run the service
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_COAP_ADAPTER_PORT=[Service HTTP port] MF_COAP_ADAPTER_LOG_LEVEL=[Service log level] $GOBIN/mainflux-coap
```

## Usage

Since CoAP protocol does not support `Authorization` header (option), in order to send CoAP messages,
client valid `authorization` value must be present in `Uri-Query` option.
150 changes: 150 additions & 0 deletions coap/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap

import (
"errors"
"sync"
"time"

"github.com/mainflux/mainflux"
broker "github.com/nats-io/go-nats"
)

const (
chanID = "id"
keyHeader = "key"

// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
)

var (
errBadOption = errors.New("bad option")
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")

// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")

// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
)

// Broker represents NATS broker instance.
type Broker interface {
mainflux.MessagePublisher

// Subscribes to channel with specified id and adds subscription to
// service map of subscriptions under given ID.
Subscribe(uint64, string, *Observer) error
}

// Service specifies coap service API.
type Service interface {
Broker
// Unsubscribe method is used to stop observing resource.
Unsubscribe(string)
}

var _ Service = (*adapterService)(nil)

type adapterService struct {
pubsub Broker
obs map[string]*Observer
obsLock sync.Mutex
}

// New instantiates the CoAP adapter implementation.
func New(pubsub Broker, responses <-chan string) Service {
as := &adapterService{
pubsub: pubsub,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
}

go as.listenResponses(responses)
return as
}

func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
return val, ok
}

func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
}

svc.obs[obsID] = o
}

func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
}
}

// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses

val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
}
}

func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
if err := svc.pubsub.Publish(msg); err != nil {
switch err {
case broker.ErrConnectionClosed, broker.ErrInvalidConnection:
return ErrFailedConnection
default:
return ErrFailedMessagePublish
}
}

return nil
}

func (svc *adapterService) Subscribe(chanID uint64, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
return ErrFailedSubscription
}

// Put method removes Observer if already exists.
svc.put(obsID, o)
return nil
}

func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
}
10 changes: 10 additions & 0 deletions coap/api/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

// Package api contains API-related concerns: endpoint definitions, middlewares
// and all resource representations.
package api
Loading

0 comments on commit d6755e4

Please sign in to comment.