From b3c745dde695cd1f95620bb74508e7d2e86dc52c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Mon, 13 Apr 2020 01:56:50 +0200 Subject: [PATCH 1/5] Implement errors package in senml transformer, readers and writers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- cmd/postgres-reader/main.go | 8 ++++---- readers/api/responses.go | 4 ++++ readers/api/transport.go | 17 ++++++++++++----- readers/cassandra/messages.go | 7 +++++-- readers/influxdb/messages.go | 9 ++++++--- readers/mongodb/messages.go | 9 ++++++--- readers/postgres/messages.go | 13 ++++++++----- transformers/senml/transformer.go | 10 ++++++++-- transformers/senml/transformer_test.go | 3 ++- writers/cassandra/messages.go | 5 ++++- writers/influxdb/messages.go | 13 +++++++++---- writers/mongodb/messages.go | 8 +++++++- writers/postgres/messages.go | 26 +++++++++++++++----------- 13 files changed, 90 insertions(+), 42 deletions(-) diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index 4e42cbfc10..7938511ab1 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -31,7 +31,7 @@ import ( ) const ( - svcName = "postgres-writer" + svcName = "postgres-reader" sep = "," defLogLevel = "error" @@ -112,7 +112,7 @@ func main() { }() err = <-errs - logger.Error(fmt.Sprintf("Postgres writer service terminated: %s", err)) + logger.Error(fmt.Sprintf("Postgres reader service terminated: %s", err)) } func loadConfig() config { @@ -214,13 +214,13 @@ func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository { svc, kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: "postgres", - Subsystem: "message_writer", + Subsystem: "message_reader", Name: "request_count", Help: "Number of requests received.", }, []string{"method"}), kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ Namespace: "postgres", - Subsystem: "message_writer", + Subsystem: "message_reader", Name: "request_latency_microseconds", Help: "Total duration of requests in microseconds.", }, []string{"method"}), diff --git a/readers/api/responses.go b/readers/api/responses.go index 7214fb8630..8ec5d86506 100644 --- a/readers/api/responses.go +++ b/readers/api/responses.go @@ -30,3 +30,7 @@ func (res pageRes) Code() int { func (res pageRes) Empty() bool { return false } + +type errorRes struct { + Err string `json:"error"` +} diff --git a/readers/api/transport.go b/readers/api/transport.go index 01d97b0993..bad445dcba 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -6,7 +6,6 @@ package api import ( "context" "encoding/json" - "errors" "net/http" "strconv" "time" @@ -14,6 +13,7 @@ import ( kithttp "github.com/go-kit/kit/transport/http" "github.com/go-zoo/bone" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc/codes" @@ -111,15 +111,22 @@ func encodeResponse(_ context.Context, w http.ResponseWriter, response interface } func encodeError(_ context.Context, err error, w http.ResponseWriter) { - switch err { - case nil: - case errInvalidRequest: + switch { + case errors.Contains(err, nil): + case errors.Contains(err, errInvalidRequest): w.WriteHeader(http.StatusBadRequest) - case errUnauthorizedAccess: + case errors.Contains(err, errUnauthorizedAccess): w.WriteHeader(http.StatusForbidden) default: w.WriteHeader(http.StatusInternalServerError) } + errorVal, ok := err.(errors.Error) + if ok { + w.Header().Set("Content-Type", contentType) + if err := json.NewEncoder(w).Encode(errorRes{Err: errorVal.Msg()}); err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + } } func authorize(r *http.Request, chanID string) error { diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go index 584d25cd29..0dd6222c83 100644 --- a/readers/cassandra/messages.go +++ b/readers/cassandra/messages.go @@ -7,10 +7,13 @@ import ( "fmt" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/transformers/senml" ) +var errReadMessages = errors.New("faled to read messages from cassandra database") + var _ readers.MessageRepository = (*cassandraRepository)(nil) type cassandraRepository struct { @@ -59,13 +62,13 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query &msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue, &msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } page.Messages = append(page.Messages, msg) } if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } return page, nil diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index 43146cf8b6..59e866850f 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" influxdata "github.com/influxdata/influxdb/client/v2" @@ -16,6 +17,8 @@ import ( const countCol = "count" +var errReadMessages = errors.New("faled to read messages from influxdb database") + var _ readers.MessageRepository = (*influxRepository)(nil) type influxRepository struct { @@ -43,10 +46,10 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query resp, err := repo.client.Query(q) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } if resp.Error() != nil { - return readers.MessagesPage{}, resp.Error() + return readers.MessagesPage{}, errors.Wrap(errReadMessages, resp.Error()) } if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 { @@ -60,7 +63,7 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query total, err := repo.count(condition) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } return readers.MessagesPage{ diff --git a/readers/mongodb/messages.go b/readers/mongodb/messages.go index 227313f7ab..846e724939 100644 --- a/readers/mongodb/messages.go +++ b/readers/mongodb/messages.go @@ -6,6 +6,7 @@ package mongodb import ( "context" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/transformers/senml" "go.mongodb.org/mongo-driver/bson" @@ -15,6 +16,8 @@ import ( const collection = "mainflux" +var errReadMessages = errors.New("faled to read messages from mongodb database") + var _ readers.MessageRepository = (*mongoRepository)(nil) type mongoRepository struct { @@ -54,7 +57,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m filter := fmtCondition(chanID, query) cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset))) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } defer cursor.Close(context.Background()) @@ -62,7 +65,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m for cursor.Next(context.Background()) { var m message if err := cursor.Decode(&m); err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } msg := senml.Message{ @@ -93,7 +96,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m total, err := col.CountDocuments(context.Background(), filter) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } if total < 0 { return readers.MessagesPage{}, nil diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go index cfaf9b883c..cce994ffce 100644 --- a/readers/postgres/messages.go +++ b/readers/postgres/messages.go @@ -4,17 +4,20 @@ package postgres import ( - "errors" "fmt" "github.com/jmoiron/sqlx" // required for DB access + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/transformers/senml" ) const errInvalid = "invalid_text_representation" -var errInvalidMessage = errors.New("invalid message representation") +var ( + errInvalidMessage = errors.New("invalid message representation") + errReadMessages = errors.New("faled to read messages from postgres database") +) var _ readers.MessageRepository = (*postgresRepository)(nil) @@ -46,7 +49,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query rows, err := tr.db.NamedQuery(q, params) if err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } defer rows.Close() @@ -58,7 +61,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query for rows.Next() { dbm := dbMessage{Channel: chanID} if err := rows.StructScan(&dbm); err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } msg := toMessage(dbm) @@ -74,7 +77,7 @@ func (tr postgresRepository) ReadAll(chanID string, offset, limit uint64, query } if err := tr.db.QueryRow(q, qParams...).Scan(&page.Total); err != nil { - return readers.MessagesPage{}, err + return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } return page, nil diff --git a/transformers/senml/transformer.go b/transformers/senml/transformer.go index a1eef8772f..7bb1602fac 100644 --- a/transformers/senml/transformer.go +++ b/transformers/senml/transformer.go @@ -5,10 +5,16 @@ package senml import ( "github.com/mainflux/mainflux/broker" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers" "github.com/mainflux/senml" ) +var ( + errDecode = errors.New("failed to decode SenML") + errNormalize = errors.New("faled to normalize SenML") +) + var formats = map[string]senml.Format{ JSON: senml.JSON, CBOR: senml.CBOR, @@ -29,12 +35,12 @@ func (n transformer) Transform(msg broker.Message) (interface{}, error) { raw, err := senml.Decode(msg.Payload, format) if err != nil { - return nil, err + return nil, errors.Wrap(errDecode, err) } normalized, err := senml.Normalize(raw) if err != nil { - return nil, err + return nil, errors.Wrap(errNormalize, err) } msgs := make([]Message, len(normalized.Records)) diff --git a/transformers/senml/transformer_test.go b/transformers/senml/transformer_test.go index bf6916eca5..fdfdc3c2c4 100644 --- a/transformers/senml/transformer_test.go +++ b/transformers/senml/transformer_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/mainflux/mainflux/broker" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers/senml" mfsenml "github.com/mainflux/senml" "github.com/stretchr/testify/assert" @@ -102,6 +103,6 @@ func TestTransform(t *testing.T) { for _, tc := range cases { msgs, err := tr.Transform(tc.msg) assert.Equal(t, tc.msgs, msgs, fmt.Sprintf("%s expected %v, got %v", tc.desc, tc.msgs, msgs)) - assert.Equal(t, tc.err, err, fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err)) + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err)) } } diff --git a/writers/cassandra/messages.go b/writers/cassandra/messages.go index 1227208977..259e814a71 100644 --- a/writers/cassandra/messages.go +++ b/writers/cassandra/messages.go @@ -5,10 +5,13 @@ package cassandra import ( "github.com/gocql/gocql" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers/senml" "github.com/mainflux/mainflux/writers" ) +var errSaveMessage = errors.New("faled to save message to cassandra database") + var _ writers.MessageRepository = (*cassandraRepository)(nil) type cassandraRepository struct { @@ -32,7 +35,7 @@ func (cr *cassandraRepository) Save(messages ...senml.Message) error { msg.Protocol, msg.Name, msg.Unit, msg.Value, msg.StringValue, msg.BoolValue, msg.DataValue, msg.Sum, msg.Time, msg.UpdateTime).Exec() if err != nil { - return err + return errors.Wrap(errSaveMessage, err) } } diff --git a/writers/influxdb/messages.go b/writers/influxdb/messages.go index 4c014b4a2d..b971a9959f 100644 --- a/writers/influxdb/messages.go +++ b/writers/influxdb/messages.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers/senml" "github.com/mainflux/mainflux/writers" @@ -16,6 +17,8 @@ import ( const pointName = "messages" +var errSaveMessage = errors.New("faled to save message to influxdb database") + var _ writers.MessageRepository = (*influxRepo)(nil) type influxRepo struct { @@ -39,7 +42,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository { func (repo *influxRepo) Save(messages ...senml.Message) error { pts, err := influxdata.NewBatchPoints(repo.cfg) if err != nil { - return err + return errors.Wrap(errSaveMessage, err) } for _, msg := range messages { @@ -50,12 +53,14 @@ func (repo *influxRepo) Save(messages ...senml.Message) error { pt, err := influxdata.NewPoint(pointName, tgs, flds, t) if err != nil { - return err + return errors.Wrap(errSaveMessage, err) } pts.AddPoint(pt) } - - return repo.client.Write(pts) + if err := repo.client.Write(pts); err != nil { + return errors.Wrap(errSaveMessage, err) + } + return nil } func (repo *influxRepo) tagsOf(msg *senml.Message) tags { diff --git a/writers/mongodb/messages.go b/writers/mongodb/messages.go index 6c071fc6f2..f596603e2c 100644 --- a/writers/mongodb/messages.go +++ b/writers/mongodb/messages.go @@ -8,12 +8,15 @@ import ( "go.mongodb.org/mongo-driver/mongo" + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers/senml" "github.com/mainflux/mainflux/writers" ) const collectionName string = "mainflux" +var errSaveMessage = errors.New("faled to save message to mongodb database") + var _ writers.MessageRepository = (*mongoRepo)(nil) type mongoRepo struct { @@ -73,5 +76,8 @@ func (repo *mongoRepo) Save(messages ...senml.Message) error { } _, err := coll.InsertMany(context.Background(), msgs) - return err + if err != nil { + return errors.Wrap(errSaveMessage, err) + } + return nil } diff --git a/writers/postgres/messages.go b/writers/postgres/messages.go index 29a8d355bd..f504856cba 100644 --- a/writers/postgres/messages.go +++ b/writers/postgres/messages.go @@ -5,21 +5,23 @@ package postgres import ( "context" - "errors" "github.com/gofrs/uuid" - "github.com/jmoiron/sqlx" "github.com/lib/pq" // required for DB access + "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/transformers/senml" "github.com/mainflux/mainflux/writers" ) const errInvalid = "invalid_text_representation" -// ErrInvalidMessage indicates that service received message that -// doesn't fit required format. -var ErrInvalidMessage = errors.New("invalid message representation") +var ( + // ErrInvalidMessage indicates that service received message that + // doesn't fit required format. + ErrInvalidMessage = errors.New("invalid message representation") + errSaveMessage = errors.New("faled to save message to postgress database") +) var _ writers.MessageRepository = (*postgresRepo)(nil) @@ -42,13 +44,13 @@ func (pr postgresRepo) Save(messages ...senml.Message) error { tx, err := pr.db.BeginTxx(context.Background(), nil) if err != nil { - return err + return errors.Wrap(errSaveMessage, err) } for _, msg := range messages { dbth, err := toDBMessage(msg) if err != nil { - return err + return errors.Wrap(errSaveMessage, err) } if _, err := tx.NamedExec(q, dbth); err != nil { @@ -56,15 +58,17 @@ func (pr postgresRepo) Save(messages ...senml.Message) error { if ok { switch pqErr.Code.Name() { case errInvalid: - return ErrInvalidMessage + return errors.Wrap(errSaveMessage, ErrInvalidMessage) } } - return err + return errors.Wrap(errSaveMessage, err) } } - - return tx.Commit() + if err := tx.Commit(); err != nil { + errors.Wrap(errSaveMessage, err) + } + return nil } type dbMessage struct { From 00be125e54e055f99de31703d02fb94df6ef039c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Mon, 13 Apr 2020 02:13:51 +0200 Subject: [PATCH 2/5] Remove unused const Return wrapped error in postgres writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- readers/postgres/messages.go | 5 +---- writers/postgres/messages.go | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/readers/postgres/messages.go b/readers/postgres/messages.go index cce994ffce..3d355da7ad 100644 --- a/readers/postgres/messages.go +++ b/readers/postgres/messages.go @@ -14,10 +14,7 @@ import ( const errInvalid = "invalid_text_representation" -var ( - errInvalidMessage = errors.New("invalid message representation") - errReadMessages = errors.New("faled to read messages from postgres database") -) +var errReadMessages = errors.New("faled to read messages from postgres database") var _ readers.MessageRepository = (*postgresRepository)(nil) diff --git a/writers/postgres/messages.go b/writers/postgres/messages.go index f504856cba..89a5afe4b1 100644 --- a/writers/postgres/messages.go +++ b/writers/postgres/messages.go @@ -66,7 +66,7 @@ func (pr postgresRepo) Save(messages ...senml.Message) error { } } if err := tx.Commit(); err != nil { - errors.Wrap(errSaveMessage, err) + return errors.Wrap(errSaveMessage, err) } return nil } From 3c5a7dd1a64835fa59337d46bee7e52d9123db99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Mon, 13 Apr 2020 10:40:49 +0200 Subject: [PATCH 3/5] fix default db host in postgres writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- cmd/postgres-writer/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 42e13361c1..895c5ee678 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -30,7 +30,7 @@ const ( defLogLevel = "error" defNatsURL = "nats://localhost:4222" defPort = "8180" - defDBHost = "postgres" + defDBHost = "localhost" defDBPort = "5432" defDBUser = "mainflux" defDBPass = "mainflux" From 20760d943f5e170d9aababa38941533a0c221cbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Mon, 13 Apr 2020 10:43:41 +0200 Subject: [PATCH 4/5] fix capital letters in errors messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- transformers/senml/transformer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transformers/senml/transformer.go b/transformers/senml/transformer.go index 7bb1602fac..a24a472c92 100644 --- a/transformers/senml/transformer.go +++ b/transformers/senml/transformer.go @@ -11,8 +11,8 @@ import ( ) var ( - errDecode = errors.New("failed to decode SenML") - errNormalize = errors.New("faled to normalize SenML") + errDecode = errors.New("failed to decode senml") + errNormalize = errors.New("faled to normalize senml") ) var formats = map[string]senml.Format{ From c3e334395105d8225aad20892a208142c7ced8dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Mon, 13 Apr 2020 12:30:27 +0200 Subject: [PATCH 5/5] use svcName instead of postgres for Promethius initialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- cmd/postgres-reader/main.go | 4 ++-- cmd/postgres-writer/main.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index 7938511ab1..6cabbac392 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -213,13 +213,13 @@ func newService(db *sqlx.DB, logger logger.Logger) readers.MessageRepository { svc = api.MetricsMiddleware( svc, kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: "postgres", + Namespace: svcName, Subsystem: "message_reader", Name: "request_count", Help: "Number of requests received.", }, []string{"method"}), kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ - Namespace: "postgres", + Namespace: svcName, Subsystem: "message_reader", Name: "request_latency_microseconds", Help: "Total duration of requests in microseconds.", diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 895c5ee678..c770072a9b 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -139,13 +139,13 @@ func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository { svc = api.MetricsMiddleware( svc, kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: "postgres", + Namespace: svcName, Subsystem: "message_writer", Name: "request_count", Help: "Number of requests received.", }, []string{"method"}), kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ - Namespace: "postgres", + Namespace: svcName, Subsystem: "message_writer", Name: "request_latency_microseconds", Help: "Total duration of requests in microseconds.",