-
Notifications
You must be signed in to change notification settings - Fork 674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MF-1264 - Add support for JSON readers #1295
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1295 +/- ##
==========================================
- Coverage 60.97% 59.81% -1.16%
==========================================
Files 110 113 +3
Lines 8581 8797 +216
==========================================
+ Hits 5232 5262 +30
- Misses 2904 3078 +174
- Partials 445 457 +12
Continue to review full report at Codecov.
|
b3f9e77
to
f330638
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
readers/mongodb/messages.go
Outdated
if err := cursor.Decode(&m); err != nil { | ||
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) | ||
messages := []interface{}{} | ||
// var m interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the comment please
readers/mongodb/messages.go
Outdated
var m message | ||
if err := cursor.Decode(&m); err != nil { | ||
return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) | ||
messages := []interface{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use var messages []interface{}
here?
readers/influxdb/messages_test.go
Outdated
|
||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", desc, tc.page.Total, result.Total)) | ||
} | ||
} | ||
|
||
func fromSenml(in []senml.Message) []interface{} { | ||
ret := []interface{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var ret []interface{}
?
@@ -54,7 +54,7 @@ services: | |||
networks: | |||
- docker_mainflux-base-net | |||
volumes: | |||
- ./subjects.toml:/config/subjects.toml | |||
- ./config.toml:/config.toml |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this path ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, because the path is changed from /config
to the root (/
).
readers/cassandra/messages.go
Outdated
@@ -114,3 +137,33 @@ func buildCountQuery(chanID string, names []string) string { | |||
|
|||
return fmt.Sprintf(cql, condCQL) | |||
} | |||
|
|||
func parseFlat(flat interface{}) interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this function is duplicated for each reader. Should we maybe export it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'll probably put it in the transformers/json
as a utility function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe transformers/pkg
as shared package? Its oft to have multiple pkg
and internal
(not just in root).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put it in pkg/tranformers/json
. It's a utility function used by JSON transformer and readers, so I guess it's OK to leave it there.
if len(subs) == 0 { | ||
return nil, errors.Wrap(ErrTransform, errUnknownFormat) | ||
} | ||
format := subs[len(subs)-1] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if len == 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will take the first element of the subtopics slice.
pkg/transformers/json/transformer.go
Outdated
|
||
func flatten(prefix string, m, m1 map[string]interface{}) (map[string]interface{}, error) { | ||
for k, v := range m1 { | ||
if k == "publisher" || k == "protocol" || k == "channel" || k == "subtopic" || strings.Contains(k, sep) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we make appropriate constants for this?
pkg/transformers/transformer.go
Outdated
@@ -3,7 +3,9 @@ | |||
|
|||
package transformers | |||
|
|||
import "github.com/mainflux/mainflux/pkg/messaging" | |||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
single import
readers/postgres/messages.go
Outdated
WHERE %s ORDER BY time DESC | ||
LIMIT :limit OFFSET :offset;`, fmtCondition(chanID, query)) | ||
LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, query)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idedntation doesnt seem right here
readers/postgres/messages.go
Outdated
switch format { | ||
case defTable: | ||
for rows.Next() { | ||
msg := dbMessage{Message: senml.Message{Channel: chanID}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure this works? Channel
doesnt get overwritten?
writers/cassandra/init.go
Outdated
@@ -3,9 +3,12 @@ | |||
|
|||
package cassandra | |||
|
|||
import "github.com/gocql/gocql" | |||
import ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
single import
name, unit, value, string_value, bool_value, data_value, sum, | ||
time, update_time) | ||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` | ||
name, unit, value, string_value, bool_value, data_value, sum, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
identation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're using spaces, it's probably just an issue with the preview.
readers/influxdb/messages.go
Outdated
q := influxdata.Query{ | ||
Command: cmd, | ||
Database: repo.database, | ||
} | ||
|
||
ret := []senml.Message{} | ||
ret := []interface{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create custom type on domain level, like in transformers
and use it like []MsgPaylod
? With probably better naming :)
const ( | ||
countCol = "count_protocol" | ||
format = "format" | ||
defMeasurement = "messages" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a bit unclear (at least from the naming itself) what defMeasurement
represents, maybe leave a short comment here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's basically the measurement / table / collection for SenML messages. It's called messages
for the sake of backward compatibility.
readers/mongodb/messages_test.go
Outdated
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", desc, err)) | ||
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) | ||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) | ||
} | ||
} | ||
|
||
func fromSenml(in []senml.Message) []interface{} { | ||
ret := []interface{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe var ret []interface{}
here as well? I am not bothered with this one neither, but if I remember well we aligned to a previous definition before. Please compare and align to the code in other Mainflux services.
"github.com/mainflux/mainflux/pkg/transformers/senml" | ||
"github.com/mainflux/mainflux/readers" | ||
) | ||
|
||
const errInvalid = "invalid_text_representation" | ||
|
||
const ( | ||
format = "format" | ||
defTable = "messages" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, maybe small comment for a code reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although here is really looks like you are using def
for default
, and maybe comment is not necessary.
readers/postgres/messages_test.go
Outdated
@@ -168,3 +168,11 @@ func TestMessageReadAll(t *testing.T) { | |||
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) | |||
} | |||
} | |||
|
|||
func fromSenml(in []senml.Message) []interface{} { | |||
ret := []interface{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here - variable definition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
f289d27
to
beb45fc
Compare
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
pkg/transformers/json/message.go
Outdated
|
||
// Messages represents a list of JSON messages. | ||
type Messages struct { | ||
Messages []Message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit eye-catchy Messages.Messages, maybe Messages.List or Messages.Data or something like that. Detail.
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
Signed-off-by: dusanb94 dusan.borovcanin@mainflux.com
What does this do?
This pull request updates the Mainflux Readers interface to match recently added support for storing arbitrary JSON data.
Which issue(s) does this PR fix/relate to?
Resolves #1264.
List any changes that modify/break current functionality
There are no breaking changes.
Have you included tests for your changes?
No. Existing tests are fixed, but new tests for reading data in a format other than SenML need to be added.
Did you document any new/modified functionality?
No. The existing documentation is sufficient at the moment.