Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-525 - Add pagination response to the readers #729

Merged
merged 2 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions readers/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint {
return nil, err
}

messages := svc.ReadAll(req.chanID, req.offset, req.limit, req.query)
page := svc.ReadAll(req.chanID, req.offset, req.limit, req.query)

return listMessagesRes{Messages: messages}, nil
return pageRes{
Total: page.Total,
Offset: page.Offset,
Limit: page.Limit,
Messages: page.Messages,
}, nil
}
}
3 changes: 1 addition & 2 deletions readers/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"fmt"
"time"

"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
)
Expand All @@ -33,7 +32,7 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read
}
}

func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message {
func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
defer func(begin time.Time) {
lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin)))
}(time.Now())
Expand Down
3 changes: 1 addition & 2 deletions readers/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
)

Expand All @@ -35,7 +34,7 @@ func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, l
}
}

func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message {
func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
defer func(begin time.Time) {
mm.counter.With("method", "read_all").Add(1)
mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds())
Expand Down
13 changes: 8 additions & 5 deletions readers/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ import (
"github.com/mainflux/mainflux"
)

var _ mainflux.Response = (*listMessagesRes)(nil)
var _ mainflux.Response = (*pageRes)(nil)

type listMessagesRes struct {
type pageRes struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Messages []mainflux.Message `json:"messages"`
}

func (res listMessagesRes) Headers() map[string]string {
func (res pageRes) Headers() map[string]string {
return map[string]string{}
}

func (res listMessagesRes) Code() int {
func (res pageRes) Code() int {
return http.StatusOK
}

func (res listMessagesRes) Empty() bool {
func (res pageRes) Empty() bool {
return false
}
4 changes: 3 additions & 1 deletion readers/cassandra/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

package cassandra

import "github.com/gocql/gocql"
import (
"github.com/gocql/gocql"
)

// Connect establishes connection to the Cassandra cluster.
func Connect(hosts []string, keyspace string) (*gocql.Session, error) {
Expand Down
71 changes: 52 additions & 19 deletions readers/cassandra/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,19 @@ func New(session *gocql.Session) readers.MessageRepository {
return cassandraRepository{session: session}
}

func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message {
cql, values := buildQuery(chanID, offset, limit, query)
func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage {
names := []string{}
vals := []interface{}{chanID}
for name, val := range query {
names = append(names, name)
vals = append(vals, val)
}
vals = append(vals, offset+limit)

selectCQL := buildSelectQuery(chanID, offset, limit, names)
countCQL := buildCountQuery(chanID, names)

iter := cr.session.Query(cql, values...).Iter()
iter := cr.session.Query(selectCQL, vals...).Iter()
scanner := iter.Scanner()

// skip first OFFSET rows
Expand All @@ -43,12 +52,19 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
var strVal, dataVal *string
var boolVal *bool

page := []mainflux.Message{}
page := readers.MessagesPage{
Offset: offset,
Limit: limit,
Messages: []mainflux.Message{},
}
for scanner.Next() {
var msg mainflux.Message
scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol,
err := scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol,
&msg.Name, &msg.Unit, &floatVal, &strVal, &boolVal,
&dataVal, &valueSum, &msg.Time, &msg.UpdateTime, &msg.Link)
if err != nil {
return readers.MessagesPage{}
}

switch {
case floatVal != nil:
Expand All @@ -65,40 +81,57 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query
msg.ValueSum = &mainflux.SumValue{Value: *valueSum}
}

page = append(page, msg)
page.Messages = append(page.Messages, msg)
}

if err := iter.Close(); err != nil {
return []mainflux.Message{}
return readers.MessagesPage{}
}

if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil {
return readers.MessagesPage{}
}

return page
}

func buildQuery(chanID string, offset, limit uint64, query map[string]string) (string, []interface{}) {
var condSql string
var values []interface{}

func buildSelectQuery(chanID string, offset, limit uint64, names []string) string {
var condCQL string
cql := `SELECT channel, subtopic, publisher, protocol, name, unit,
value, string_value, bool_value, data_value, value_sum, time,
update_time, link FROM messages WHERE channel = ? %s LIMIT ?
value, string_value, bool_value, data_value, value_sum, time,
update_time, link FROM messages WHERE channel = ? %s LIMIT ?
ALLOW FILTERING`

values = append(values, chanID)
for _, name := range names {
switch name {
case
"channel",
"subtopic",
"publisher",
"name",
"protocol":
condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name)
}
}

return fmt.Sprintf(cql, condCQL)
}

func buildCountQuery(chanID string, names []string) string {
var condCQL string
cql := `SELECT COUNT(*) FROM messages WHERE channel = ? %s ALLOW FILTERING`

for name, value := range query {
for _, name := range names {
switch name {
case
"channel",
"subtopic",
"publisher",
"name",
"protocol":
condSql = fmt.Sprintf(`%s AND %s = ?`, condSql, name)
values = append(values, value)
condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name)
}
}

values = append(values, offset+limit)
return fmt.Sprintf(cql, condSql), values
return fmt.Sprintf(cql, condCQL)
}
103 changes: 65 additions & 38 deletions readers/cassandra/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"time"

"github.com/mainflux/mainflux"
readers "github.com/mainflux/mainflux/readers/cassandra"
writers "github.com/mainflux/mainflux/writers/cassandra"
"github.com/mainflux/mainflux/readers"
creaders "github.com/mainflux/mainflux/readers/cassandra"
cwriters "github.com/mainflux/mainflux/writers/cassandra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -37,12 +38,13 @@ var (
)

func TestReadAll(t *testing.T) {
session, err := readers.Connect([]string{addr}, keyspace)
session, err := creaders.Connect([]string{addr}, keyspace)
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
defer session.Close()
writer := writers.New(session)
writer := cwriters.New(session)

messages := []mainflux.Message{}
subtopicMsgs := []mainflux.Message{}
now := time.Now().Unix()
for i := 0; i < msgsNum; i++ {
// Mix possible values as well as value sum.
Expand All @@ -63,65 +65,90 @@ func TestReadAll(t *testing.T) {
case 5:
msg.ValueSum = &mainflux.SumValue{Value: 45}
}
msg.Time = float64(now + int64(i))
msg.Time = float64(now - int64(i))

err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))
messages = append(messages, msg)
if count == 0 {
subtopicMsgs = append(subtopicMsgs, msg)
}
}

reader := readers.New(session)
reader := creaders.New(session)

// Since messages are not saved in natural order,
// cases that return subset of messages are only
// checking data result set size, but not content.
cases := map[string]struct {
chanID string
offset uint64
limit uint64
query map[string]string
messages []mainflux.Message
chanID string
offset uint64
limit uint64
query map[string]string
page readers.MessagesPage
}{
"read message page for existing channel": {
chanID: chanID,
offset: 0,
limit: msgsNum,
messages: messages,
chanID: chanID,
offset: 0,
limit: msgsNum,
page: readers.MessagesPage{
Total: msgsNum,
Offset: 0,
Limit: msgsNum,
Messages: messages,
},
},
"read message page for non-existent channel": {
chanID: "2",
offset: 0,
limit: msgsNum,
messages: []mainflux.Message{},
chanID: "2",
offset: 0,
limit: msgsNum,
page: readers.MessagesPage{
Total: 0,
Offset: 0,
Limit: msgsNum,
Messages: []mainflux.Message{},
},
},
"read message last page": {
chanID: chanID,
offset: 40,
limit: 5,
messages: messages[40:42],
chanID: chanID,
offset: 40,
limit: 5,
page: readers.MessagesPage{
Total: msgsNum,
Offset: 40,
Limit: 5,
Messages: messages[40:42],
},
},
"read message with non-existent subtopic": {
chanID: chanID,
offset: 0,
limit: msgsNum,
query: map[string]string{"subtopic": "not-present"},
messages: []mainflux.Message{},
chanID: chanID,
offset: 0,
limit: msgsNum,
query: map[string]string{"subtopic": "not-present"},
page: readers.MessagesPage{
Total: 0,
Offset: 0,
Limit: msgsNum,
Messages: []mainflux.Message{},
},
},
"read message with subtopic": {
chanID: chanID,
offset: 5,
limit: msgsNum,
query: map[string]string{"subtopic": subtopic},
messages: messages[0:2],
chanID: chanID,
offset: 5,
limit: msgsNum,
query: map[string]string{"subtopic": subtopic},
page: readers.MessagesPage{
Total: uint64(len(subtopicMsgs)),
Offset: 5,
Limit: msgsNum,
Messages: subtopicMsgs[5:],
},
},
}

for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query)
if tc.offset > 0 {
assert.Equal(t, len(tc.messages), len(result), fmt.Sprintf("%s: expected %d messages, got %d", desc, len(tc.messages), len(result)))
continue
}
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result))
assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages))
assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total))
}
}
Loading