Skip to content

Commit

Permalink
MF-950 - Runtime error in normalizer - CBOR SenML (#974)
Browse files Browse the repository at this point in the history
* Add Transformer tests

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix readers and writers

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Fix README typo

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Tidy vendor

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove link field from docs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin authored and anovakovic01 committed Nov 29, 2019
1 parent 5edd797 commit 54168fa
Show file tree
Hide file tree
Showing 100 changed files with 5,348 additions and 54,391 deletions.
8 changes: 4 additions & 4 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func handleMessage(conn *net.UDPConn, addr *net.UDPAddr, o *coap.Observer, msg *

coapCT := senMLJSON
switch msg.ContentType {
case senml.SenMLJSON:
case senml.JSON:
coapCT = senMLJSON
case senml.SenMLCBOR:
case senml.CBOR:
coapCT = senMLCBOR
}
notifyMsg.SetOption(gocoap.ContentFormat, coapCT)
Expand Down Expand Up @@ -396,9 +396,9 @@ func contentType(msg *gocoap.Message) (string, error) {
ct := ""
switch ctid {
case senMLJSON:
ct = senml.SenMLJSON
ct = senml.JSON
case senMLCBOR:
ct = senml.SenMLCBOR
ct = senml.CBOR
}

return ct, nil
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/Microsoft/go-winio v0.4.7 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/cenkalti/backoff v2.0.0+incompatible // indirect
github.com/cisco/senml v0.0.0-20181031221301-910a55054e16
github.com/containerd/continuity v0.0.0-20180416230128-c6cef3483023 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/docker/docker v1.13.1
Expand All @@ -32,6 +31,7 @@ require (
github.com/influxdata/influxdb v1.6.4
github.com/jmoiron/sqlx v1.2.1-0.20190319043955-cdf62fdf55f6
github.com/lib/pq v1.0.0
github.com/mainflux/senml v1.0.0
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.3 // indirect
github.com/nats-io/go-nats v1.6.0
Expand All @@ -49,7 +49,7 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.5.0
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.4.0
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fxamacker/cbor v1.3.2 h1:jMCvPyzpTVWoe1jRDUFPupVoV+DzDvnc1VP+9VU4ql8=
github.com/fxamacker/cbor v1.3.2/go.mod h1:Uy2lR31/2WfmW0yiA4i3t+we5kF3B/wzKsttcux+i/g=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kit/kit v0.7.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
Expand Down Expand Up @@ -128,6 +130,12 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mainflux/senml v0.0.0-20191128185934-061491bfd7d6 h1:l5qGnZOGbiVnuclQYib6QiiQ2wA0k5vRdVdiJrTUsjw=
github.com/mainflux/senml v0.0.0-20191128185934-061491bfd7d6/go.mod h1:g9i8pj4WMs29KkUpXivbe/PP0qJd1kt3b1CF77S8A3s=
github.com/mainflux/senml v0.0.0-20191129155045-8e0c95e00589 h1:xoR2XyJGqBlReRXzABImJobL6l1CkgdUfJxQANvAq6A=
github.com/mainflux/senml v0.0.0-20191129155045-8e0c95e00589/go.mod h1:g9i8pj4WMs29KkUpXivbe/PP0qJd1kt3b1CF77S8A3s=
github.com/mainflux/senml v1.0.0 h1:oLS5aBhvdHjgQ8kfq3jX7yD+DaquhvpyvIWNsPil3X0=
github.com/mainflux/senml v1.0.0/go.mod h1:g9i8pj4WMs29KkUpXivbe/PP0qJd1kt3b1CF77S8A3s=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
Expand Down Expand Up @@ -206,9 +214,12 @@ github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/spf13/viper v1.5.0 h1:GpsTwfsQ27oS/Aha/6d1oD7tpKIqWnOA6tgOX9HHkt4=
github.com/spf13/viper v1.5.0/go.mod h1:AkYRkVJF8TkSG/xet6PzXX+l39KhhXa2pdqVSxnTcn4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand All @@ -219,6 +230,8 @@ github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6
github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/x448/senml v0.2.0 h1:W6P7p4bA8ZvsEkl6+f7kjoc9n5vPHBYL5V6kGJkyDy4=
github.com/x448/senml v0.2.0/go.mod h1:dAsCrOGmYEKO/L/DUpZpinKEgI1PCxAUjt2eAwXCDx8=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.1-0.20180714160509-73f8eece6fdc h1:vIp1tjhVogU0yBy7w96P027ewvNPeH6gzuNcoc+NReU=
Expand Down
4 changes: 2 additions & 2 deletions readers/cassandra/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
var msg senml.Message
err := scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol,
&msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue,
&msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime, &msg.Link)
&msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime)
if err != nil {
return readers.MessagesPage{}, err
}
Expand All @@ -75,7 +75,7 @@ func buildSelectQuery(chanID string, offset, limit uint64, names []string) strin
var condCQL string
cql := `SELECT channel, subtopic, publisher, protocol, name, unit,
value, string_value, bool_value, data_value, sum, time,
update_time, link FROM messages WHERE channel = ? %s LIMIT ?
update_time FROM messages WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`

for _, name := range names {
Expand Down
1 change: 0 additions & 1 deletion readers/influxdb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ var (
Unit: "U",
Time: 123456,
UpdateTime: 1234,
Link: "link",
}
)

Expand Down
2 changes: 0 additions & 2 deletions readers/mongodb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type message struct {
Sum *float64 `bson:"sum,omitempty"`
Time float64 `bson:"time,omitempty"`
UpdateTime float64 `bson:"updateTime,omitempty"`
Link string `bson:"link,omitempty"`
}

// New returns new MongoDB reader.
Expand Down Expand Up @@ -75,7 +74,6 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m
Unit: m.Unit,
Time: m.Time,
UpdateTime: m.UpdateTime,
Link: m.Link,
Sum: m.Sum,
}

Expand Down
1 change: 0 additions & 1 deletion readers/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func migrateDB(db *sqlx.DB) error {
sum FLOAT,
time FlOAT,
update_time FLOAT,
link TEXT,
PRIMARY KEY (id)
)`,
},
Expand Down
2 changes: 0 additions & 2 deletions readers/postgres/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type dbMessage struct {
Sum *float64 `db:"sum"`
Time float64 `db:"time"`
UpdateTime float64 `db:"update_time"`
Link string `db:"link"`
}

func toMessage(dbm dbMessage) senml.Message {
Expand All @@ -123,7 +122,6 @@ func toMessage(dbm dbMessage) senml.Message {
Unit: dbm.Unit,
Time: dbm.Time,
UpdateTime: dbm.UpdateTime,
Link: dbm.Link,
Sum: dbm.Sum,
}

Expand Down
2 changes: 0 additions & 2 deletions readers/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ definitions:
updateTime:
type: number
description: Time of updating measurement.
link:
type: string

parameters:
Authorization:
Expand Down
4 changes: 2 additions & 2 deletions transformers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

Transformers services consume events published by adapters and transform them to any other message format.
They be imported as a standalone package and used for message transformation on the consumer side.
Mainflux (SenML transformer)[transformers] is an example of Transformer service for SenML messages.
Mainflux (writers) [writers] are using a standalone SenML transformer to preprocess messages before storing them.
Mainflux [SenML transformer](transformer) is an example of Transformer service for SenML messages.
Mainflux [writers](writers) are using a standalone SenML transformer to preprocess messages before storing them.

[transformers]: https://github.com/mainflux/mainflux/tree/master/transformers/senml
[writers]: https://github.com/mainflux/mainflux/tree/master/writers
9 changes: 4 additions & 5 deletions transformers/senml/message.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package senml

const (
// SenMLJSON represents SenML in JSON format content type.
SenMLJSON = "application/senml+json"
// JSON represents SenML in JSON format content type.
JSON = "application/senml+json"

// SenMLCBOR represents SenML in CBOR format content type.
SenMLCBOR = "application/senml+cbor"
// CBOR represents SenML in CBOR format content type.
CBOR = "application/senml+cbor"
)

// Message represents a resolved (normalized) SenML record.
Expand All @@ -14,7 +14,6 @@ type Message struct {
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Protocol string `json:"protocol,omitempty"`
Link string `json:"link,omitempty"`
Name string `json:"name,omitempty"`
Unit string `json:"unit,omitempty"`
Time float64 `json:"time,omitempty"`
Expand Down
51 changes: 22 additions & 29 deletions transformers/senml/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
package senml

import (
"github.com/cisco/senml"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/transformers"
"github.com/mainflux/senml"
)

var formats = map[string]senml.Format{
SenMLJSON: senml.JSON,
SenMLCBOR: senml.CBOR,
JSON: senml.JSON,
CBOR: senml.CBOR,
}

type transformer struct{}
Expand All @@ -32,35 +32,28 @@ func (n transformer) Transform(msg mainflux.Message) (interface{}, error) {
return nil, err
}

normalized := senml.Normalize(raw)
normalized, err := senml.Normalize(raw)
if err != nil {
return nil, err
}

msgs := make([]Message, len(normalized.Records))
for k, v := range normalized.Records {
m := Message{
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Publisher: msg.Publisher,
Protocol: msg.Protocol,
Name: v.Name,
Unit: v.Unit,
Time: v.Time,
UpdateTime: v.UpdateTime,
Link: v.Link,
Sum: v.Sum,
for i, v := range normalized.Records {
msgs[i] = Message{
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Publisher: msg.Publisher,
Protocol: msg.Protocol,
Name: v.Name,
Unit: v.Unit,
Time: v.Time,
UpdateTime: v.UpdateTime,
Value: v.Value,
BoolValue: v.BoolValue,
DataValue: v.DataValue,
StringValue: v.StringValue,
Sum: v.Sum,
}

switch {
case v.Value != nil:
m.Value = v.Value
case v.BoolValue != nil:
m.BoolValue = v.BoolValue
case v.DataValue != "":
m.DataValue = &v.DataValue
case v.StringValue != "":
m.StringValue = &v.StringValue
}

msgs[k] = m
}

return msgs, nil
Expand Down
107 changes: 107 additions & 0 deletions transformers/senml/transformer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package senml_test

import (
"encoding/hex"
"fmt"
"testing"

"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/transformers/senml"
mfsenml "github.com/mainflux/senml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestTransform(t *testing.T) {
// Following hex-encoded bytes correspond to the content of:
// [{-2: "base-name", -3: 100.0, -4: "base-unit", -1: 10, -5: 10.0, -6: 100.0, 0: "name", 1: "unit", 6: 300.0, 7: 150.0, 2: 42.0, 5: 10.0}]
// For more details for mapping SenML labels to integers, please take a look here: https://tools.ietf.org/html/rfc8428#page-19.
cborBytes, err := hex.DecodeString("81ac2169626173652d6e616d6522fb40590000000000002369626173652d756e6974200a24fb402400000000000025fb405900000000000000646e616d650164756e697406fb4072c0000000000007fb4062c0000000000002fb404500000000000005fb4024000000000000")
require.Nil(t, err, "Decoding CBOR expected to succeed")

jsonBytes, err := hex.DecodeString("5b7b22626e223a22626173652d6e616d65222c226274223a3130302c226275223a22626173652d756e6974222c2262766572223a31302c226276223a31302c226273223a3130302c226e223a226e616d65222c2275223a22756e6974222c2274223a3330302c227574223a3135302c2276223a34322c2273223a31307d5d")
require.Nil(t, err, "Decoding CBOR expected to succeed")

tooManyBytes, err := hex.DecodeString("82AD2169626173652D6E616D6522F956402369626173652D756E6974200A24F9490025F9564000646E616D650164756E697406F95CB0036331323307F958B002F9514005F94900AA2169626173652D6E616D6522F956402369626173652D756E6974200A24F9490025F9564000646E616D6506F95CB007F958B005F94900")
require.Nil(t, err, "Decoding CBOR expected to succeed")

tr := senml.New()
msg := mainflux.Message{
Channel: "channel",
Subtopic: "subtopic",
Publisher: "publisher",
Protocol: "protocol",
Payload: jsonBytes,
}

// 82AD2169626173652D6E616D6522F956402369626173652D756E6974200A24F9490025F9564000646E616D650164756E697406F95CB0036331323307F958B002F9514005F94900AA2169626173652D6E616D6522F956402369626173652D756E6974200A24F9490025F9564000646E616D6506F95CB007F958B005F94900

jsonPld := msg
jsonPld.ContentType = senml.JSON
jsonPld.Payload = jsonBytes

cborPld := msg
cborPld.ContentType = senml.CBOR
cborPld.Payload = cborBytes

tooManyMsg := msg
tooManyMsg.ContentType = senml.CBOR
tooManyMsg.Payload = tooManyBytes

val := 52.0
sum := 110.0
msgs := []senml.Message{senml.Message{
Channel: "channel",
Subtopic: "subtopic",
Publisher: "publisher",
Protocol: "protocol",
Name: "base-namename",
Unit: "unit",
Time: 400,
UpdateTime: 150,
Value: &val,
Sum: &sum,
},
}

cases := []struct {
desc string
msg mainflux.Message
msgs interface{}
err error
}{
{
desc: "test normalize CBOR",
msg: cborPld,
msgs: msgs,
err: nil,
},
{
desc: "test normalize JSON",
msg: jsonPld,
msgs: msgs,
err: nil,
},
{
desc: "test normalize defaults to JSON",
msg: msg,
msgs: msgs,
err: nil,
},
{
desc: "test invalid payload",
msg: tooManyMsg,
msgs: nil,
err: mfsenml.ErrTooManyValues,
},
}

for _, tc := range cases {
msgs, err := tr.Transform(tc.msg)
assert.Equal(t, tc.msgs, msgs, fmt.Sprintf("%s expected %v, got %v", tc.desc, tc.msgs, msgs))
assert.Equal(t, tc.err, err, fmt.Sprintf("%s expected %s, got %s", tc.desc, tc.err, err))
}
}
Loading

0 comments on commit 54168fa

Please sign in to comment.