Skip to content

Commit

Permalink
MF-1061 - Implement InfluxDB filters value, v, vb, vs, vd, from, to (#…
Browse files Browse the repository at this point in the history
…1312)

* MF-1061 - Implement InfluxDB filters value, v, vb, vs, vd, from, to

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use time filters as float64 instead of int64

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Remove unnecessary cast

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use a const for limit in tests

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix typo

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Revert float64 cast when dividing

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Remove value filter in favour to v

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use v, vb, vs, vd

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use v, vb, vs, vd

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rm unecessary cast

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
  • Loading branch information
manuio committed Dec 25, 2020
1 parent cb9985d commit b2ccbae
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 39 deletions.
46 changes: 38 additions & 8 deletions readers/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ const (
)

var (
v float64 = 5
stringV = "value"
boolV = true
dataV = "base64"
sum float64 = 42
v float64 = 5
vs = "value"
vb = true
vd = "dataValue"
sum float64 = 42
)

func newService() readers.MessageRepository {
Expand All @@ -49,11 +49,11 @@ func newService() readers.MessageRepository {
case 0:
msg.Value = &v
case 1:
msg.BoolValue = &boolV
msg.BoolValue = &vb
case 2:
msg.StringValue = &stringV
msg.StringValue = &vs
case 3:
msg.DataValue = &dataV
msg.DataValue = &vd
case 4:
msg.Sum = &sum
}
Expand Down Expand Up @@ -166,6 +166,36 @@ func TestReadAll(t *testing.T) {
token: token,
status: http.StatusOK,
},
"read page with value": {
url: fmt.Sprintf("%s/channels/%s/messages?v=%f", ts.URL, chanID, v),
token: token,
status: http.StatusOK,
},
"read page with boolean value": {
url: fmt.Sprintf("%s/channels/%s/messages?vb=%t", ts.URL, chanID, vb),
token: token,
status: http.StatusOK,
},
"read page with string value": {
url: fmt.Sprintf("%s/channels/%s/messages?vs=%s", ts.URL, chanID, vd),
token: token,
status: http.StatusOK,
},
"read page with data value": {
url: fmt.Sprintf("%s/channels/%s/messages?vd=%s", ts.URL, chanID, vd),
token: token,
status: http.StatusOK,
},
"read page with from": {
url: fmt.Sprintf("%s/channels/%s/messages?from=1608651539.673909", ts.URL, chanID),
token: token,
status: http.StatusOK,
},
"read page with to": {
url: fmt.Sprintf("%s/channels/%s/messages?to=1508651539.673909", ts.URL, chanID),
token: token,
status: http.StatusOK,
},
}

for desc, tc := range cases {
Expand Down
2 changes: 1 addition & 1 deletion readers/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
errInvalidRequest = errors.New("received invalid request")
errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
auth mainflux.ThingsServiceClient
queryFields = []string{"subtopic", "publisher", "protocol", "name", "value", "v", "vs", "vb", "vd"}
queryFields = []string{"subtopic", "publisher", "protocol", "name", "v", "vs", "vb", "vd", "from", "to"}
)

// MakeHandler returns a HTTP handler for API endpoints.
Expand Down
24 changes: 24 additions & 0 deletions readers/influxdb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,30 @@ func fmtCondition(chanID string, query map[string]string) string {
"protocol":
condition = fmt.Sprintf(`%s AND "%s"='%s'`, condition, name,
strings.Replace(value, "\"", "\\\"", -1))
case "v":
condition = fmt.Sprintf(`%s AND value = %s`, condition, value)
case "vb":
condition = fmt.Sprintf(`%s AND boolValue = %s`, condition, value)
case "vs":
condition = fmt.Sprintf(`%s AND "stringValue"='%s'`, condition,
strings.Replace(value, "\"", "\\\"", -1))
case "vd":
condition = fmt.Sprintf(`%s AND "dataValue"='%s'`, condition,
strings.Replace(value, "\"", "\\\"", -1))
case "from":
fVal, err := strconv.ParseFloat(value, 64)
if err != nil {
continue
}
iVal := int64(fVal * 1e9)
condition = fmt.Sprintf(`%s AND time >= %d`, condition, iVal)
case "to":
fVal, err := strconv.ParseFloat(value, 64)
if err != nil {
continue
}
iVal := int64(fVal * 1e9)
condition = fmt.Sprintf(`%s AND time < %d`, condition, iVal)
}
}
return condition
Expand Down
132 changes: 102 additions & 30 deletions readers/influxdb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@ import (
)

const (
testDB = "test"
chanID = "1"
subtopic = "topic"
msgsNum = 101
testDB = "test"
chanID = "1"
subtopic = "topic"
msgsNum = 100
fromToNum = 4
msgsValNum = 20
limit = 10
)

var (
v float64 = 5
stringV = "value"
boolV = true
dataV = "base64"
sum float64 = 42
v float64 = 5
vs = "value"
vb = true
vd = "dataValue"
sum float64 = 42
)

var (
Expand Down Expand Up @@ -58,31 +61,37 @@ func TestReadAll(t *testing.T) {
writer := writer.New(client, testDB)

messages := []senml.Message{}
subtopicMsgs := []senml.Message{}
valSubtopicMsgs := []senml.Message{}
boolMsgs := []senml.Message{}
stringMsgs := []senml.Message{}
dataMsgs := []senml.Message{}

now := time.Now().UnixNano()
for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum.
count := i % valueFields
msg := m
msg.Time = float64(now)/1e9 - float64(i)

count := i % valueFields
switch count {
case 0:
msg.Subtopic = subtopic
msg.Value = &v
valSubtopicMsgs = append(valSubtopicMsgs, msg)
case 1:
msg.BoolValue = &boolV
msg.BoolValue = &vb
boolMsgs = append(boolMsgs, msg)
case 2:
msg.StringValue = &stringV
msg.StringValue = &vs
stringMsgs = append(stringMsgs, msg)
case 3:
msg.DataValue = &dataV
msg.DataValue = &vd
dataMsgs = append(dataMsgs, msg)
case 4:
msg.Sum = &sum
}

msg.Time = float64(now)/float64(1e9) - float64(i)
messages = append(messages, msg)
if count == 0 {
subtopicMsgs = append(subtopicMsgs, msg)
}
}

err := writer.Save(messages...)
Expand All @@ -101,34 +110,34 @@ func TestReadAll(t *testing.T) {
"read message page for existing channel": {
chanID: chanID,
offset: 0,
limit: 10,
limit: limit,
page: readers.MessagesPage{
Total: msgsNum,
Offset: 0,
Limit: 10,
Messages: messages[0:10],
Limit: limit,
Messages: messages[0:limit],
},
},
"read message page for non-existent channel": {
chanID: "2",
offset: 0,
limit: 10,
limit: limit,
page: readers.MessagesPage{
Total: 0,
Offset: 0,
Limit: 10,
Limit: limit,
Messages: []senml.Message{},
},
},
"read message last page": {
chanID: chanID,
offset: 95,
limit: 10,
limit: limit,
page: readers.MessagesPage{
Total: msgsNum,
Offset: 95,
Limit: 10,
Messages: messages[95:101],
Limit: limit,
Messages: messages[95:msgsNum],
},
},
"read message with non-existent subtopic": {
Expand All @@ -146,13 +155,76 @@ func TestReadAll(t *testing.T) {
"read message with subtopic": {
chanID: chanID,
offset: 0,
limit: 10,
limit: limit,
query: map[string]string{"subtopic": subtopic},
page: readers.MessagesPage{
Total: uint64(len(subtopicMsgs)),
Total: uint64(len(valSubtopicMsgs)),
Offset: 0,
Limit: limit,
Messages: valSubtopicMsgs[0:limit],
},
},
"read message with value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"v": fmt.Sprintf("%f", v)},
page: readers.MessagesPage{
Total: msgsValNum,
Offset: 0,
Limit: limit,
Messages: valSubtopicMsgs[0:limit],
},
},
"read message with boolean value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vb": fmt.Sprintf("%t", vb)},
page: readers.MessagesPage{
Total: msgsValNum,
Offset: 0,
Limit: limit,
Messages: boolMsgs[0:limit],
},
},
"read message with string value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vs": vs},
page: readers.MessagesPage{
Total: msgsValNum,
Offset: 0,
Limit: limit,
Messages: stringMsgs[0:limit],
},
},
"read message with data value": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{"vd": vd},
page: readers.MessagesPage{
Total: msgsValNum,
Offset: 0,
Limit: limit,
Messages: dataMsgs[0:limit],
},
},
"read message with from/to": {
chanID: chanID,
offset: 0,
limit: limit,
query: map[string]string{
"from": fmt.Sprintf("%f", messages[fromToNum].Time),
"to": fmt.Sprintf("%f", messages[0].Time),
},
page: readers.MessagesPage{
Total: fromToNum,
Offset: 0,
Limit: 10,
Messages: subtopicMsgs[0:10],
Limit: limit,
Messages: messages[1:5],
},
},
}
Expand Down
48 changes: 48 additions & 0 deletions readers/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ paths:
- $ref: "#/components/parameters/Offset"
- $ref: "#/components/parameters/Publisher"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/Value"
- $ref: "#/components/parameters/BoolValue"
- $ref: "#/components/parameters/StringValue"
- $ref: "#/components/parameters/DataValue"
- $ref: "#/components/parameters/From"
- $ref: "#/components/parameters/To"
responses:
'200':
$ref: "#/components/responses/MessagesPageRes"
Expand Down Expand Up @@ -140,6 +146,48 @@ components:
schema:
type: string
required: false
Value:
name: v
description: SenML message value.
in: query
schema:
type: string
required: false
BoolValue:
name: vb
description: SenML message bool value.
in: query
schema:
type: boolean
required: false
StringValue:
name: vs
description: SenML message string value.
in: query
schema:
type: string
required: false
DataValue:
name: vd
description: SenML message data value.
in: query
schema:
type: string
required: false
From:
name: from
description: SenML message time in nanoseconds (integer part represents seconds).
in: query
schema:
type: float
required: false
To:
name: to
description: SenML message time in nanoseconds (integer part represents seconds).
in: query
schema:
type: float
required: false

responses:
MessagesPageRes:
Expand Down

0 comments on commit b2ccbae

Please sign in to comment.