Skip to content

Commit

Permalink
Merge branch 'master' into code-quality
Browse files Browse the repository at this point in the history
  • Loading branch information
drasko authored Jul 28, 2023
2 parents f53120d + 33eb8d8 commit 5e1120a
Show file tree
Hide file tree
Showing 14 changed files with 624 additions and 90 deletions.
14 changes: 6 additions & 8 deletions readers/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
return nil, err
}

vb, err := apiutil.ReadBoolQuery(r, boolValueKey, false)
if err != nil && err != errors.ErrNotFoundParam {
return nil, err
}

from, err := apiutil.ReadFloatQuery(r, fromKey, 0)
if err != nil {
return nil, err
Expand All @@ -150,19 +155,12 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
Comparator: comparator,
StringValue: vs,
DataValue: vd,
BoolValue: vb,
From: from,
To: to,
},
}

vb, err := apiutil.ReadBoolQuery(r, boolValueKey, false)
if err != nil && err != errors.ErrNotFoundParam {
return nil, err
}
if err == nil {
req.pageMeta.BoolValue = vb
}

return req, nil
}

Expand Down
4 changes: 4 additions & 0 deletions readers/cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ docker-compose -f docker/addons/casandra-reader/docker-compose.yml up -d
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
```
Note: Cassandra Reader doesn't support searching substrings from string_value, due to inefficient searching as the current data model is not suitable for this type of queries.
```
[doc]: https://docs.mainflux.io
4 changes: 2 additions & 2 deletions readers/cassandra/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ const (
var (
addr = "localhost"
v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "base64"
vd = "dataValue"
sum float64 = 42

idProvider = uuid.New()
Expand Down
27 changes: 18 additions & 9 deletions readers/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.

| Variable | Description | Default |
|------------------------------|-----------------------------------------------------|----------------|
| MF_INFLUX_READER_LOG_LEVEL | Service log level | info |
| MF_INFLUX_READER_PORT | Service HTTP port | 9005 |
| MF_INFLUXDB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| Variable | Description | Default |
|------------------------------|-----------------------------------------------------|-------------------|
| MF_INFLUX_READER_LOG_LEVEL | Service log level | info |
| MF_INFLUX_READER_PORT | Service HTTP port | 9005 |
| MF_INFLUXDB_HOST | InfluxDB host | localhost |
| MF_INFLUXDB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUXDB_ADMIN_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUXDB_ADMIN_PASSWORD | Default password of InfluxDB user | mainflux |
| MF_INFLUXDB_DB | InfluxDB database name | mainflux |
| MF_INFLUXDB_HOST | InfluxDB host name | mainflux-influxdb |
| MF_INFLUXDB_PROTOCOL | InfluxDB protocol | http |
| MF_INFLUXDB_TIMEOUT | InfluxDB client connection readiness timeout | 1s |
Expand Down Expand Up @@ -102,4 +102,13 @@ docker-compose -f docker/addons/influxdb-reader/docker-compose.yml up --env-file
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
Comparator Usage Guide:
| Comparator | Usage | Example |
|----------------------|-----------------------------------------------------------------------------|------------------------------------|
| eq | Return values that are equal to the query | eq["active"] -> "active" |
| ge | Return values that are substrings of the query | ge["tiv"] -> "active" and "tiv" |
| gt | Return values that are substrings of the query and not equal to the query | gt["tiv"] -> "active" |
| le | Return values that are superstrings of the query | le["active"] -> "tiv" |
| lt | Return values that are superstrings of the query and not equal to the query | lt["active"] -> "active" and "tiv" |
Official docs can be found [here](https://docs.mainflux.io).
33 changes: 28 additions & 5 deletions readers/influxdb/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (

queryAPI := repo.client.QueryAPI(repo.cfg.Org)
condition, timeRange := fmtCondition(chanID, rpm)

query := fmt.Sprintf(`
import "influxdata/influxdb/v1" from(bucket: "%s")
import "influxdata/influxdb/v1"
import "strings"
from(bucket: "%s")
%s
|> v1.fieldsAsCols()
|> group()
Expand Down Expand Up @@ -108,7 +111,9 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (

func (repo *influxRepository) count(measurement, condition string, timeRange string) (uint64, error) {
cmd := fmt.Sprintf(`
import "influxdata/influxdb/v1" from(bucket: "%s")
import "influxdata/influxdb/v1"
import "strings"
from(bucket: "%s")
%s
|> v1.fieldsAsCols()
|> filter(fn: (r) => r._measurement == "%s")
Expand All @@ -122,8 +127,8 @@ func (repo *influxRepository) count(measurement, condition string, timeRange str
measurement,
condition)
queryAPI := repo.client.QueryAPI(repo.cfg.Org)

resp, err := queryAPI.Query(context.Background(), cmd)

if err != nil {
return 0, err
}
Expand Down Expand Up @@ -197,11 +202,29 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) (string, string) {
sb.WriteString(`|> filter(fn: (r) => exists r.boolValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.boolValue == %v)`, value))
case "vs":
comparator := readers.ParseValueComparator(query)
sb.WriteString(`|> filter(fn: (r) => exists r.stringValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue == "%s")`, value))
switch comparator {
case "=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue == "%s")`, value))
case "<":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: "%s", substr: r.stringValue) == true)`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue !="%s")`, value))
case "<=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: "%s", substr: r.stringValue) == true)`, value))
case ">":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: r.stringValue, substr: "%s") == true)`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.stringValue != "%s")`, value))
case ">=":
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => strings.containsStr(v: r.stringValue, substr: "%s") == true)`, value))
}
case "vd":
comparator := readers.ParseValueComparator(query)
if comparator == "=" {
comparator = "=="
}
sb.WriteString(`|> filter(fn: (r) => exists r.dataValue)`)
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.dataValue == "%s")`, value))
sb.WriteString(fmt.Sprintf(`|> filter(fn: (r) => r.dataValue%s"%s")`, comparator, value))
}
}

Expand Down
160 changes: 156 additions & 4 deletions readers/influxdb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (

var (
v float64 = 5
vs string = "a"
vs string = "stringValue"
vb bool = true
vd string = "dataValue"
sum float64 = 42
Expand Down Expand Up @@ -349,6 +349,76 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.EqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "a stringValues b",
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: "alu",
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with string value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
StringValue: vs,
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(stringMsgs)),
Messages: fromSenml(stringMsgs[0:limit]),
},
},
{
desc: "read message with data value",
chanID: chanID,
Expand All @@ -362,6 +432,88 @@ func TestReadSenml(t *testing.T) {
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and lower-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd + string(rune(1)),
Comparator: readers.LowerThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with data value and greater-than-or-equal comparator",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: limit,
DataValue: vd[:len(vd)-1] + string(rune(1)),
Comparator: readers.GreaterThanEqualKey,
},
page: readers.MessagesPage{
Total: uint64(len(dataMsgs)),
Messages: fromSenml(dataMsgs[0:limit]),
},
},
{
desc: "read message with from",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[0:21])),
From: messages[20].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[0:21])),
Messages: fromSenml(messages[0:21]),
},
},
{
desc: "read message with to",
chanID: chanID,
pageMeta: readers.PageMetadata{
Offset: 0,
Limit: uint64(len(messages[21:])),
To: messages[20].Time,
},
page: readers.MessagesPage{
Total: uint64(len(messages[21:])),
Messages: fromSenml(messages[21:]),
},
},
{
desc: "failing test case : read message with from",
chanID: chanID,
Expand Down Expand Up @@ -406,9 +558,9 @@ func TestReadSenml(t *testing.T) {

for _, tc := range cases {
result, err := reader.ReadAll(tc.chanID, tc.pageMeta)
assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", tc.desc))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.page.Total, result.Total))
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v, got: %v\n", tc.desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d\n", tc.desc, tc.page.Total, result.Total))
}
}

Expand Down
4 changes: 4 additions & 0 deletions readers/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ docker-compose -f docker/addons/mongodb-reader/docker-compose.yml up -d
Service exposes [HTTP API](https://api.mainflux.io/?urls.primaryName=readers-openapi.yml) for fetching messages.
```
Note: MongoDB Reader doesn't support searching substrings from string_value, due to inefficient searching as the current data model is not suitable for this type of queries.
```
[doc]: https://docs.mainflux.io
2 changes: 1 addition & 1 deletion readers/mongodb/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
addr string

v float64 = 5
vs = "value"
vs = "stringValue"
vb = true
vd = "dataValue"
sum float64 = 42
Expand Down
11 changes: 11 additions & 0 deletions readers/postgres/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,14 @@ $GOBIN/mainflux-postgres-reader
## Usage

Starting service will start consuming normalized messages in SenML format.

Comparator Usage Guide:
| Comparator | Usage | Example |
|----------------------|-----------------------------------------------------------------------------|------------------------------------|
| eq | Return values that are equal to the query | eq["active"] -> "active" |
| ge | Return values that are substrings of the query | ge["tiv"] -> "active" and "tiv" |
| gt | Return values that are substrings of the query and not equal to the query | gt["tiv"] -> "active" |
| le | Return values that are superstrings of the query | le["active"] -> "tiv" |
| lt | Return values that are superstrings of the query and not equal to the query | lt["active"] -> "active" and "tiv" |

Official docs can be found [here](https://docs.mainflux.io).
Loading

0 comments on commit 5e1120a

Please sign in to comment.