Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-374 - Bring back CoAP adapter #413

Merged
merged 23 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 207 additions & 23 deletions Gopkg.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@
branch = "master"
name = "github.com/gocql/gocql"

[[constraint]]
branch = "master"
name = "github.com/dereulenspiegel/coap-mux"

[[constraint]]
branch = "master"
name = "github.com/dustin/go-coap"

[prune]
go-tests = true
unused-packages = true
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
BUILD_DIR = build
SERVICES = users things http normalizer ws influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
SERVICES = users things http normalizer ws coap influxdb-writer influxdb-reader mongodb-writer mongodb-reader cassandra-writer cassandra-reader cli
DOCKERS = $(addprefix docker_,$(SERVICES))
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
CGO_ENABLED ?= 0
Expand Down
129 changes: 129 additions & 0 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

package main

import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"

gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
"github.com/mainflux/mainflux/coap/nats"
logger "github.com/mainflux/mainflux/logger"
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

broker "github.com/nats-io/go-nats"
)

const (
defPort = "5683"
defNatsURL = broker.DefaultURL
defThingsURL = "localhost:8181"
defLogLevel = "error"

envPort = "MF_COAP_ADAPTER_PORT"
envNatsURL = "MF_NATS_URL"
envThingsURL = "MF_THINGS_URL"
envLogLevel = "MF_COAP_ADAPTER_LOG_LEVEL"
)

type config struct {
port string
natsURL string
thingsURL string
logLevel string
}

func main() {
cfg := loadConfig()

logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}

nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer nc.Close()

conn, err := grpc.Dial(cfg.thingsURL, grpc.WithInsecure())
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to users service: %s", err))
os.Exit(1)
}
defer conn.Close()

cc := thingsapi.NewClient(conn)
respChan := make(chan string, 10000)
pubsub := nats.New(nc)
svc := coap.New(pubsub, respChan)
svc = api.LoggingMiddleware(svc, logger)

svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "coap_adapter",
Subsystem: "api",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "coap_adapter",
Subsystem: "api",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)

errs := make(chan error, 2)

go startHTTPServer(cfg.port, logger, errs)
go startCOAPServer(cfg.port, svc, cc, respChan, logger, errs)

go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()

err = <-errs
logger.Error(fmt.Sprintf("CoAP adapter terminated: %s", err))
}

func loadConfig() config {
return config{
thingsURL: mainflux.Env(envThingsURL, defThingsURL),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
port: mainflux.Env(envPort, defPort),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
}
}

func startHTTPServer(port string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("CoAP service started, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHTTPHandler())
}

func startCOAPServer(port string, svc coap.Service, auth mainflux.ThingsServiceClient, respChan chan<- string, l logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
l.Info(fmt.Sprintf("CoAP adapter service started, exposed port %s", port))
errs <- gocoap.ListenAndServe("udp", p, api.MakeCOAPHandler(svc, auth, l, respChan))
}
61 changes: 61 additions & 0 deletions coap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Mainflux CoAP Adapter

Mainflux CoAP adapter provides an [CoAP](http://coap.technology/) API for sending messages through the
platform.

## Configuration

The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.

| Variable | Description | Default |
|---------------------------|------------------------|-----------------------|
| MF_COAP_ADAPTER_PORT | Service listening port | 5683 |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_THINGS_URL | Things service URL | localhost:8181 |
| MF_COAP_ADAPTER_LOG_LEVEL | Service log level | error |

## Deployment

The service is distributed as Docker container. The following snippet provides
a compose file template that can be used to deploy the service container locally:

```yaml
version: "2"
services:
adapter:
image: mainflux/coap:[version]
container_name: [instance name]
ports:
- [host machine port]:[configured port]
environment:
MF_COAP_ADAPTER_PORT: [Service HTTP port]
MF_NATS_URL: [NATS instance URL]
MF_THINGS_URL: [Things service URL]
MF_COAP_ADAPTER_LOG_LEVEL: [Service log level]
```

Running this service outside of container requires working instance of the NATS service.
To start the service outside of the container, execute the following shell script:

```bash
# download the latest version of the service
go get github.com/mainflux/mainflux

cd $GOPATH/src/github.com/mainflux/mainflux

# compile the http
make coap

# copy binary to bin
make install

# set the environment variables and run the service
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_COAP_ADAPTER_PORT=[Service HTTP port] MF_COAP_ADAPTER_LOG_LEVEL=[Service log level] $GOBIN/mainflux-coap
```

## Usage

Since CoAP protocol does not support `Authorization` header (option), in order to send CoAP messages,
client valid `authorization` value must be present in `Uri-Query` option.
150 changes: 150 additions & 0 deletions coap/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap

import (
"errors"
"sync"
"time"

"github.com/mainflux/mainflux"
broker "github.com/nats-io/go-nats"
)

const (
chanID = "id"
keyHeader = "key"

// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
)

var (
errBadOption = errors.New("bad option")
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")

// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")

// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
)

// Broker represents NATS broker instance.
type Broker interface {
mainflux.MessagePublisher

// Subscribes to channel with specified id and adds subscription to
// service map of subscriptions under given ID.
Subscribe(uint64, string, *Observer) error
}

// Service specifies coap service API.
type Service interface {
Broker
// Unsubscribe method is used to stop observing resource.
Unsubscribe(string)
}

var _ Service = (*adapterService)(nil)

type adapterService struct {
pubsub Broker
obs map[string]*Observer
obsLock sync.Mutex
}

// New instantiates the CoAP adapter implementation.
func New(pubsub Broker, responses <-chan string) Service {
as := &adapterService{
pubsub: pubsub,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
}

go as.listenResponses(responses)
return as
}

func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
return val, ok
}

func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
}

svc.obs[obsID] = o
}

func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
}
}

// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses

val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
}
}

func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
if err := svc.pubsub.Publish(msg); err != nil {
switch err {
case broker.ErrConnectionClosed, broker.ErrInvalidConnection:
return ErrFailedConnection
default:
return ErrFailedMessagePublish
}
}

return nil
}

func (svc *adapterService) Subscribe(chanID uint64, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
return ErrFailedSubscription
}

// Put method removes Observer if already exists.
svc.put(obsID, o)
return nil
}

func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
}
10 changes: 10 additions & 0 deletions coap/api/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//

// Package api contains API-related concerns: endpoint definitions, middlewares
// and all resource representations.
package api
Loading