Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Implement errors package in senml transformer, readers and writers #1108

Merged
merged 5 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/postgres-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
svcName = "postgres-writer"
svcName = "postgres-reader"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

sep = ","

defLogLevel = "error"
Expand Down Expand Up @@ -112,7 +112,7 @@ func main() {
}()

err = <-errs
logger.Error(fmt.Sprintf("Postgres writer service terminated: %s", err))
logger.Error(fmt.Sprintf("Postgres reader service terminated: %s", err))
}

func loadConfig() config {
Expand Down Expand Up @@ -214,13 +214,13 @@ func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository {
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "postgres",
blokovi marked this conversation as resolved.
Show resolved Hide resolved
Subsystem: "message_writer",
Subsystem: "message_reader",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "postgres",
Subsystem: "message_writer",
Subsystem: "message_reader",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
Expand Down
2 changes: 1 addition & 1 deletion cmd/postgres-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
defLogLevel = "error"
defNatsURL = "nats://localhost:4222"
defPort = "8180"
defDBHost = "postgres"
defDBHost = "localhost"
defDBPort = "5432"
defDBUser = "mainflux"
defDBPass = "mainflux"
Expand Down
4 changes: 4 additions & 0 deletions readers/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ func (res pageRes) Code() int {
func (res pageRes) Empty() bool {
return false
}

type errorRes struct {
Err string `json:"error"`
}
17 changes: 12 additions & 5 deletions readers/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ package api
import (
"context"
"encoding/json"
"errors"
"net/http"
"strconv"
"time"

kithttp "github.com/go-kit/kit/transport/http"
"github.com/go-zoo/bone"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/readers"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -111,15 +111,22 @@ func encodeResponse(_ context.Context, w http.ResponseWriter, response interface
}

func encodeError(_ context.Context, err error, w http.ResponseWriter) {
switch err {
case nil:
case errInvalidRequest:
switch {
case errors.Contains(err, nil):
case errors.Contains(err, errInvalidRequest):
w.WriteHeader(http.StatusBadRequest)
case errUnauthorizedAccess:
case errors.Contains(err, errUnauthorizedAccess):
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
errorVal, ok := err.(errors.Error)
if ok {
w.Header().Set("Content-Type", contentType)
if err := json.NewEncoder(w).Encode(errorRes{Err: errorVal.Msg()}); err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
}

func authorize(r *http.Request, chanID string) error {
Expand Down
7 changes: 5 additions & 2 deletions readers/cassandra/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"fmt"

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/transformers/senml"
)

var errReadMessages = errors.New("faled to read messages from cassandra database")

var _ readers.MessageRepository = (*cassandraRepository)(nil)

type cassandraRepository struct {
Expand Down Expand Up @@ -59,13 +62,13 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
&msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue,
&msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime)
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
page.Messages = append(page.Messages, msg)
}

if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

return page, nil
Expand Down
9 changes: 6 additions & 3 deletions readers/influxdb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/readers"

influxdata "github.com/influxdata/influxdb/client/v2"
Expand All @@ -16,6 +17,8 @@ import (

const countCol = "count"

var errReadMessages = errors.New("faled to read messages from influxdb database")

var _ readers.MessageRepository = (*influxRepository)(nil)

type influxRepository struct {
Expand Down Expand Up @@ -43,10 +46,10 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query

resp, err := repo.client.Query(q)
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
if resp.Error() != nil {
return readers.MessagesPage{}, resp.Error()
return readers.MessagesPage{}, errors.Wrap(errReadMessages, resp.Error())
}

if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 {
Expand All @@ -60,7 +63,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query

total, err := repo.count(condition)
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

return readers.MessagesPage{
Expand Down
9 changes: 6 additions & 3 deletions readers/mongodb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mongodb
import (
"context"

"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/transformers/senml"
"go.mongodb.org/mongo-driver/bson"
Expand All @@ -15,6 +16,8 @@ import (

const collection = "mainflux"

var errReadMessages = errors.New("faled to read messages from mongodb database")

var _ readers.MessageRepository = (*mongoRepository)(nil)

type mongoRepository struct {
Expand Down Expand Up @@ -54,15 +57,15 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
filter := fmtCondition(chanID, query)
cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset)))
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
defer cursor.Close(context.Background())

messages := []senml.Message{}
for cursor.Next(context.Background()) {
var m message
if err := cursor.Decode(&m); err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

msg := senml.Message{
Expand Down Expand Up @@ -93,7 +96,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m

total, err := col.CountDocuments(context.Background(), filter)
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
if total < 0 {
return readers.MessagesPage{}, nil
Expand Down
10 changes: 5 additions & 5 deletions readers/postgres/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
package postgres

import (
"errors"
"fmt"

"github.com/jmoiron/sqlx" // required for DB access
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/transformers/senml"
)

const errInvalid = "invalid_text_representation"

var errInvalidMessage = errors.New("invalid message representation")
var errReadMessages = errors.New("faled to read messages from postgres database")

var _ readers.MessageRepository = (*postgresRepository)(nil)

Expand Down Expand Up @@ -46,7 +46,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query

rows, err := tr.db.NamedQuery(q, params)
if err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}
defer rows.Close()

Expand All @@ -58,7 +58,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query
for rows.Next() {
dbm := dbMessage{Channel: chanID}
if err := rows.StructScan(&dbm); err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

msg := toMessage(dbm)
Expand All @@ -74,7 +74,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query
}

if err := tr.db.QueryRow(q, qParams...).Scan(&page.Total); err != nil {
return readers.MessagesPage{}, err
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err)
}

return page, nil
Expand Down
10 changes: 8 additions & 2 deletions transformers/senml/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ package senml

import (
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/transformers"
"github.com/mainflux/senml"
)

var (
errDecode = errors.New("failed to decode senml")
errNormalize = errors.New("faled to normalize senml")
)

var formats = map[string]senml.Format{
JSON: senml.JSON,
CBOR: senml.CBOR,
Expand All @@ -29,12 +35,12 @@ func (n transformer) Transform(msg broker.Message) (interface{}, error) {

raw, err := senml.Decode(msg.Payload, format)
if err != nil {
return nil, err
return nil, errors.Wrap(errDecode, err)
}

normalized, err := senml.Normalize(raw)
if err != nil {
return nil, err
return nil, errors.Wrap(errNormalize, err)
}

msgs := make([]Message, len(normalized.Records))
Expand Down
3 changes: 2 additions & 1 deletion transformers/senml/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/transformers/senml"
mfsenml "github.com/mainflux/senml"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -102,6 +103,6 @@ func TestTransform(t *testing.T) {
for _, tc := range cases {
msgs, err := tr.Transform(tc.msg)
assert.Equal(t, tc.msgs, msgs, fmt.Sprintf("%s expected %v, got %v", tc.desc, tc.msgs, msgs))
assert.Equal(t, tc.err, err, fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
}
}
5 changes: 4 additions & 1 deletion writers/cassandra/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package cassandra

import (
"github.com/gocql/gocql"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
)

var errSaveMessage = errors.New("faled to save message to cassandra database")

var _ writers.MessageRepository = (*cassandraRepository)(nil)

type cassandraRepository struct {
Expand All @@ -32,7 +35,7 @@ func (cr *cassandraRepository) Save(messages ...senml.Message) error {
msg.Protocol, msg.Name, msg.Unit, msg.Value, msg.StringValue,
msg.BoolValue, msg.DataValue, msg.Sum, msg.Time, msg.UpdateTime).Exec()
if err != nil {
return err
return errors.Wrap(errSaveMessage, err)
}
}

Expand Down
13 changes: 9 additions & 4 deletions writers/influxdb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"time"

"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"

Expand All @@ -16,6 +17,8 @@ import (

const pointName = "messages"

var errSaveMessage = errors.New("faled to save message to influxdb database")

var _ writers.MessageRepository = (*influxRepo)(nil)

type influxRepo struct {
Expand All @@ -39,7 +42,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository {
func (repo *influxRepo) Save(messages ...senml.Message) error {
pts, err := influxdata.NewBatchPoints(repo.cfg)
if err != nil {
return err
return errors.Wrap(errSaveMessage, err)
}

for _, msg := range messages {
Expand All @@ -50,12 +53,14 @@ func (repo *influxRepo) Save(messages ...senml.Message) error {

pt, err := influxdata.NewPoint(pointName, tgs, flds, t)
if err != nil {
return err
return errors.Wrap(errSaveMessage, err)
}
pts.AddPoint(pt)
}

return repo.client.Write(pts)
if err := repo.client.Write(pts); err != nil {
return errors.Wrap(errSaveMessage, err)
}
return nil
}

func (repo *influxRepo) tagsOf(msg *senml.Message) tags {
Expand Down
8 changes: 7 additions & 1 deletion writers/mongodb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ import (

"go.mongodb.org/mongo-driver/mongo"

"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/transformers/senml"
"github.com/mainflux/mainflux/writers"
)

const collectionName string = "mainflux"

var errSaveMessage = errors.New("faled to save message to mongodb database")

var _ writers.MessageRepository = (*mongoRepo)(nil)

type mongoRepo struct {
Expand Down Expand Up @@ -73,5 +76,8 @@ func (repo *mongoRepo) Save(messages ...senml.Message) error {
}

_, err := coll.InsertMany(context.Background(), msgs)
return err
if err != nil {
return errors.Wrap(errSaveMessage, err)
}
return nil
}
Loading