diff --git a/readers/api/endpoint.go b/readers/api/endpoint.go index 244166d201..2158fb8a36 100644 --- a/readers/api/endpoint.go +++ b/readers/api/endpoint.go @@ -22,8 +22,13 @@ func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint { return nil, err } - messages := svc.ReadAll(req.chanID, req.offset, req.limit, req.query) + page := svc.ReadAll(req.chanID, req.offset, req.limit, req.query) - return listMessagesRes{Messages: messages}, nil + return pageRes{ + Total: page.Total, + Offset: page.Offset, + Limit: page.Limit, + Messages: page.Messages, + }, nil } } diff --git a/readers/api/logging.go b/readers/api/logging.go index 6a4b2762f0..4631b70249 100644 --- a/readers/api/logging.go +++ b/readers/api/logging.go @@ -13,7 +13,6 @@ import ( "fmt" "time" - "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/readers" ) @@ -33,7 +32,7 @@ func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) read } } -func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { +func (lm *loggingMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { defer func(begin time.Time) { lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took %s to complete without errors.`, offset, limit, time.Since(begin))) }(time.Now()) diff --git a/readers/api/metrics.go b/readers/api/metrics.go index 5e90b3b221..c7df232a7f 100644 --- a/readers/api/metrics.go +++ b/readers/api/metrics.go @@ -13,7 +13,6 @@ import ( "time" "github.com/go-kit/kit/metrics" - "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/readers" ) @@ -35,7 +34,7 @@ func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, l } } -func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { +func (mm *metricsMiddleware) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { defer func(begin time.Time) { mm.counter.With("method", "read_all").Add(1) mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds()) diff --git a/readers/api/responses.go b/readers/api/responses.go index 1252f9bd30..d36b6bb7f4 100644 --- a/readers/api/responses.go +++ b/readers/api/responses.go @@ -13,20 +13,23 @@ import ( "github.com/mainflux/mainflux" ) -var _ mainflux.Response = (*listMessagesRes)(nil) +var _ mainflux.Response = (*pageRes)(nil) -type listMessagesRes struct { +type pageRes struct { + Total uint64 `json:"total"` + Offset uint64 `json:"offset"` + Limit uint64 `json:"limit"` Messages []mainflux.Message `json:"messages"` } -func (res listMessagesRes) Headers() map[string]string { +func (res pageRes) Headers() map[string]string { return map[string]string{} } -func (res listMessagesRes) Code() int { +func (res pageRes) Code() int { return http.StatusOK } -func (res listMessagesRes) Empty() bool { +func (res pageRes) Empty() bool { return false } diff --git a/readers/cassandra/init.go b/readers/cassandra/init.go index 65c4548bdf..6014c6f570 100644 --- a/readers/cassandra/init.go +++ b/readers/cassandra/init.go @@ -7,7 +7,9 @@ package cassandra -import "github.com/gocql/gocql" +import ( + "github.com/gocql/gocql" +) // Connect establishes connection to the Cassandra cluster. func Connect(hosts []string, keyspace string) (*gocql.Session, error) { diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go index 6df06bc3e6..9664a4be31 100644 --- a/readers/cassandra/messages.go +++ b/readers/cassandra/messages.go @@ -26,10 +26,19 @@ func New(session *gocql.Session) readers.MessageRepository { return cassandraRepository{session: session} } -func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { - cql, values := buildQuery(chanID, offset, limit, query) +func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { + names := []string{} + vals := []interface{}{chanID} + for name, val := range query { + names = append(names, name) + vals = append(vals, val) + } + vals = append(vals, offset+limit) + + selectCQL := buildSelectQuery(chanID, offset, limit, names) + countCQL := buildCountQuery(chanID, names) - iter := cr.session.Query(cql, values...).Iter() + iter := cr.session.Query(selectCQL, vals...).Iter() scanner := iter.Scanner() // skip first OFFSET rows @@ -43,12 +52,19 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query var strVal, dataVal *string var boolVal *bool - page := []mainflux.Message{} + page := readers.MessagesPage{ + Offset: offset, + Limit: limit, + Messages: []mainflux.Message{}, + } for scanner.Next() { var msg mainflux.Message - scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol, + err := scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol, &msg.Name, &msg.Unit, &floatVal, &strVal, &boolVal, &dataVal, &valueSum, &msg.Time, &msg.UpdateTime, &msg.Link) + if err != nil { + return readers.MessagesPage{} + } switch { case floatVal != nil: @@ -65,28 +81,47 @@ func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query msg.ValueSum = &mainflux.SumValue{Value: *valueSum} } - page = append(page, msg) + page.Messages = append(page.Messages, msg) } if err := iter.Close(); err != nil { - return []mainflux.Message{} + return readers.MessagesPage{} + } + + if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil { + return readers.MessagesPage{} } return page } -func buildQuery(chanID string, offset, limit uint64, query map[string]string) (string, []interface{}) { - var condSql string - var values []interface{} - +func buildSelectQuery(chanID string, offset, limit uint64, names []string) string { + var condCQL string cql := `SELECT channel, subtopic, publisher, protocol, name, unit, - value, string_value, bool_value, data_value, value_sum, time, - update_time, link FROM messages WHERE channel = ? %s LIMIT ? + value, string_value, bool_value, data_value, value_sum, time, + update_time, link FROM messages WHERE channel = ? %s LIMIT ? ALLOW FILTERING` - values = append(values, chanID) + for _, name := range names { + switch name { + case + "channel", + "subtopic", + "publisher", + "name", + "protocol": + condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) + } + } + + return fmt.Sprintf(cql, condCQL) +} + +func buildCountQuery(chanID string, names []string) string { + var condCQL string + cql := `SELECT COUNT(*) FROM messages WHERE channel = ? %s ALLOW FILTERING` - for name, value := range query { + for _, name := range names { switch name { case "channel", @@ -94,11 +129,9 @@ func buildQuery(chanID string, offset, limit uint64, query map[string]string) (s "publisher", "name", "protocol": - condSql = fmt.Sprintf(`%s AND %s = ?`, condSql, name) - values = append(values, value) + condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) } } - values = append(values, offset+limit) - return fmt.Sprintf(cql, condSql), values + return fmt.Sprintf(cql, condCQL) } diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index bc7c9af715..5b221ce470 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -13,8 +13,9 @@ import ( "time" "github.com/mainflux/mainflux" - readers "github.com/mainflux/mainflux/readers/cassandra" - writers "github.com/mainflux/mainflux/writers/cassandra" + "github.com/mainflux/mainflux/readers" + creaders "github.com/mainflux/mainflux/readers/cassandra" + cwriters "github.com/mainflux/mainflux/writers/cassandra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -37,12 +38,13 @@ var ( ) func TestReadAll(t *testing.T) { - session, err := readers.Connect([]string{addr}, keyspace) + session, err := creaders.Connect([]string{addr}, keyspace) require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err)) defer session.Close() - writer := writers.New(session) + writer := cwriters.New(session) messages := []mainflux.Message{} + subtopicMsgs := []mainflux.Message{} now := time.Now().Unix() for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. @@ -63,65 +65,90 @@ func TestReadAll(t *testing.T) { case 5: msg.ValueSum = &mainflux.SumValue{Value: 45} } - msg.Time = float64(now + int64(i)) + msg.Time = float64(now - int64(i)) err := writer.Save(msg) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) messages = append(messages, msg) + if count == 0 { + subtopicMsgs = append(subtopicMsgs, msg) + } } - reader := readers.New(session) + reader := creaders.New(session) // Since messages are not saved in natural order, // cases that return subset of messages are only // checking data result set size, but not content. cases := map[string]struct { - chanID string - offset uint64 - limit uint64 - query map[string]string - messages []mainflux.Message + chanID string + offset uint64 + limit uint64 + query map[string]string + page readers.MessagesPage }{ "read message page for existing channel": { - chanID: chanID, - offset: 0, - limit: msgsNum, - messages: messages, + chanID: chanID, + offset: 0, + limit: msgsNum, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: msgsNum, + Messages: messages, + }, }, "read message page for non-existent channel": { - chanID: "2", - offset: 0, - limit: msgsNum, - messages: []mainflux.Message{}, + chanID: "2", + offset: 0, + limit: msgsNum, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, }, "read message last page": { - chanID: chanID, - offset: 40, - limit: 5, - messages: messages[40:42], + chanID: chanID, + offset: 40, + limit: 5, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 40, + Limit: 5, + Messages: messages[40:42], + }, }, "read message with non-existent subtopic": { - chanID: chanID, - offset: 0, - limit: msgsNum, - query: map[string]string{"subtopic": "not-present"}, - messages: []mainflux.Message{}, + chanID: chanID, + offset: 0, + limit: msgsNum, + query: map[string]string{"subtopic": "not-present"}, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, }, "read message with subtopic": { - chanID: chanID, - offset: 5, - limit: msgsNum, - query: map[string]string{"subtopic": subtopic}, - messages: messages[0:2], + chanID: chanID, + offset: 5, + limit: msgsNum, + query: map[string]string{"subtopic": subtopic}, + page: readers.MessagesPage{ + Total: uint64(len(subtopicMsgs)), + Offset: 5, + Limit: msgsNum, + Messages: subtopicMsgs[5:], + }, }, } for desc, tc := range cases { result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) - if tc.offset > 0 { - assert.Equal(t, len(tc.messages), len(result), fmt.Sprintf("%s: expected %d messages, got %d", desc, len(tc.messages), len(result))) - continue - } - assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result)) + assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) + assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) } } diff --git a/readers/influxdb/messages.go b/readers/influxdb/messages.go index 3e3a287666..0e8b87401a 100644 --- a/readers/influxdb/messages.go +++ b/readers/influxdb/messages.go @@ -14,7 +14,10 @@ import ( "github.com/mainflux/mainflux" ) -const maxLimit = 100 +const ( + maxLimit = 100 + countCol = "count" +) var _ readers.MessageRepository = (*influxRepository)(nil) @@ -28,13 +31,13 @@ func New(client influxdata.Client, database string) (readers.MessageRepository, return &influxRepository{database, client}, nil } -func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { +func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { if limit > maxLimit { limit = maxLimit } condition := fmtCondition(chanID, query) - cmd := fmt.Sprintf(`SELECT * from messages WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, condition, limit, offset) + cmd := fmt.Sprintf(`SELECT * FROM messages WHERE %s ORDER BY time DESC LIMIT %d OFFSET %d`, condition, limit, offset) q := influxdata.Query{ Command: cmd, Database: repo.database, @@ -44,11 +47,11 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query resp, err := repo.client.Query(q) if err != nil || resp.Error() != nil { - return ret + return readers.MessagesPage{} } if len(resp.Results) < 1 || len(resp.Results[0].Series) < 1 { - return ret + return readers.MessagesPage{} } result := resp.Results[0].Series[0] @@ -56,7 +59,59 @@ func (repo *influxRepository) ReadAll(chanID string, offset, limit uint64, query ret = append(ret, parseMessage(result.Columns, v)) } - return ret + total, err := repo.count(condition) + if err != nil { + return readers.MessagesPage{} + } + + return readers.MessagesPage{ + Total: total, + Offset: offset, + Limit: limit, + Messages: ret, + } +} + +func (repo *influxRepository) count(condition string) (uint64, error) { + cmd := fmt.Sprintf(`SELECT COUNT(protocol) FROM messages WHERE %s`, condition) + q := influxdata.Query{ + Command: cmd, + Database: repo.database, + } + + resp, err := repo.client.Query(q) + if err != nil { + return 0, err + } + if resp.Error() != nil { + return 0, resp.Error() + } + + if len(resp.Results) < 1 || + len(resp.Results[0].Series) < 1 || + len(resp.Results[0].Series[0].Values) < 1 { + return 0, nil + } + + countIndex := 0 + for i, col := range resp.Results[0].Series[0].Columns { + if col == countCol { + countIndex = i + break + } + } + + result := resp.Results[0].Series[0].Values[0] + if len(result) < countIndex+1 { + return 0, nil + } + + count, ok := result[countIndex].(json.Number) + if !ok { + return 0, nil + } + + return strconv.ParseUint(count.String(), 10, 64) } func fmtCondition(chanID string, query map[string]string) string { diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 061b58ca4e..f2e76a9270 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -8,6 +8,7 @@ import ( influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" reader "github.com/mainflux/mainflux/readers/influxdb" writer "github.com/mainflux/mainflux/writers/influxdb" @@ -53,10 +54,12 @@ func TestReadAll(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err)) messages := []mainflux.Message{} + subtopicMsgs := []mainflux.Message{} now := time.Now().Unix() for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields + msg.Subtopic = "" switch count { case 0: msg.Subtopic = subtopic @@ -77,60 +80,94 @@ func TestReadAll(t *testing.T) { err := writer.Save(msg) require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) messages = append(messages, msg) + if count == 0 { + subtopicMsgs = append(subtopicMsgs, msg) + } } reader, err := reader.New(client, testDB) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB reader expected to succeed: %s.\n", err)) cases := map[string]struct { - chanID string - offset uint64 - limit uint64 - query map[string]string - messages []mainflux.Message + chanID string + offset uint64 + limit uint64 + query map[string]string + page readers.MessagesPage }{ "read message page for existing channel": { - chanID: chanID, - offset: 0, - limit: 10, - messages: messages[0:10], + chanID: chanID, + offset: 0, + limit: 10, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: 10, + Messages: messages[0:10], + }, }, "read message page for too large limit": { - chanID: chanID, - offset: 0, - limit: 101, - messages: messages[0:100], + chanID: chanID, + offset: 0, + limit: 101, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: 101, + Messages: messages[0:100], + }, }, "read message page for non-existent channel": { - chanID: "2", - offset: 0, - limit: 10, - messages: []mainflux.Message{}, + chanID: "2", + offset: 0, + limit: 10, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: 10, + Messages: []mainflux.Message{}, + }, }, "read message last page": { - chanID: chanID, - offset: 95, - limit: 10, - messages: messages[95:101], + chanID: chanID, + offset: 95, + limit: 10, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 95, + Limit: 10, + Messages: messages[95:101], + }, }, "read message with non-existent subtopic": { - chanID: chanID, - offset: 0, - limit: msgsNum, - query: map[string]string{"subtopic": "not-present"}, - messages: []mainflux.Message{}, + chanID: chanID, + offset: 0, + limit: msgsNum, + query: map[string]string{"subtopic": "not-present"}, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, }, "read message with subtopic": { - chanID: chanID, - offset: 0, - limit: 10, - query: map[string]string{"subtopic": subtopic}, - messages: messages[0:10], + chanID: chanID, + offset: 0, + limit: 10, + query: map[string]string{"subtopic": subtopic}, + page: readers.MessagesPage{ + Total: uint64(len(subtopicMsgs)), + Offset: 0, + Limit: 10, + Messages: subtopicMsgs[0:10], + }, }, } for desc, tc := range cases { result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) - assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.messages, result)) + assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected: %v \n-------------\n got: %v", desc, tc.page.Messages, result.Messages)) + assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %d got %d", desc, tc.page.Total, result.Total)) } } diff --git a/readers/messages.go b/readers/messages.go index 1b7b192110..cf6f3a1b20 100644 --- a/readers/messages.go +++ b/readers/messages.go @@ -20,5 +20,14 @@ var ErrNotFound = errors.New("entity not found") type MessageRepository interface { // ReadAll skips given number of messages for given channel and returns next // limited number of messages. - ReadAll(string, uint64, uint64, map[string]string) []mainflux.Message + ReadAll(string, uint64, uint64, map[string]string) MessagesPage +} + +// MessagesPage contains page related metadata as well as list of messages that +// belong to this page. +type MessagesPage struct { + Total uint64 + Offset uint64 + Limit uint64 + Messages []mainflux.Message } diff --git a/readers/mocks/messages.go b/readers/mocks/messages.go index 74787aff65..5fb8524ce0 100644 --- a/readers/mocks/messages.go +++ b/readers/mocks/messages.go @@ -29,7 +29,7 @@ func NewMessageRepository(messages map[string][]mainflux.Message) readers.Messag } } -func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { +func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { repo.mutex.Lock() defer repo.mutex.Unlock() @@ -37,16 +37,21 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, offset, limit uint64, numOfMessages := uint64(len(repo.messages[chanID])) if offset < 0 || offset >= numOfMessages { - return []mainflux.Message{} + return readers.MessagesPage{} } if limit < 1 { - return []mainflux.Message{} + return readers.MessagesPage{} } if offset+limit > numOfMessages { end = numOfMessages } - return repo.messages[chanID][offset:end] + return readers.MessagesPage{ + Total: numOfMessages, + Limit: limit, + Offset: offset, + Messages: repo.messages[chanID][offset:end], + } } diff --git a/readers/mongodb/messages.go b/readers/mongodb/messages.go index 96455f7fe9..3c4e080091 100644 --- a/readers/mongodb/messages.go +++ b/readers/mongodb/messages.go @@ -48,7 +48,7 @@ func New(db *mongo.Database) readers.MessageRepository { return mongoRepository{db: db} } -func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) []mainflux.Message { +func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) readers.MessagesPage { col := repo.db.Collection(collection) sortMap := map[string]interface{}{ "time": -1, @@ -57,7 +57,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m filter := fmtCondition(chanID, query) cursor, err := col.Find(context.Background(), filter, options.Find().SetSort(sortMap).SetLimit(int64(limit)).SetSkip(int64(offset))) if err != nil { - return []mainflux.Message{} + return readers.MessagesPage{} } defer cursor.Close(context.Background()) @@ -65,7 +65,7 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m for cursor.Next(context.Background()) { var m message if err := cursor.Decode(&m); err != nil { - return []mainflux.Message{} + return readers.MessagesPage{} } msg := mainflux.Message{ @@ -98,7 +98,20 @@ func (repo mongoRepository) ReadAll(chanID string, offset, limit uint64, query m messages = append(messages, msg) } - return messages + total, err := col.CountDocuments(context.Background(), filter) + if err != nil { + return readers.MessagesPage{} + } + if total < 0 { + return readers.MessagesPage{} + } + + return readers.MessagesPage{ + Total: uint64(total), + Offset: offset, + Limit: limit, + Messages: messages, + } } func fmtCondition(chanID string, query map[string]string) *bson.D { diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 7ce4dc75dc..32fdf8d5bc 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -14,8 +14,9 @@ import ( "testing" "time" - readers "github.com/mainflux/mainflux/readers/mongodb" - writers "github.com/mainflux/mainflux/writers/mongodb" + "github.com/mainflux/mainflux/readers" + mreaders "github.com/mainflux/mainflux/readers/mongodb" + mwriters "github.com/mainflux/mainflux/writers/mongodb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,13 +52,15 @@ func TestReadAll(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) db := client.Database(testDB) - writer := writers.New(db) + writer := mwriters.New(db) messages := []mainflux.Message{} + subtopicMsgs := []mainflux.Message{} now := time.Now().Unix() for i := 0; i < msgsNum; i++ { // Mix possible values as well as value sum. count := i % valueFields + msg.Subtopic = "" switch count { case 0: msg.Subtopic = subtopic @@ -78,53 +81,82 @@ func TestReadAll(t *testing.T) { err := writer.Save(msg) require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) messages = append(messages, msg) + if count == 0 { + subtopicMsgs = append(subtopicMsgs, msg) + } } - reader := readers.New(db) + reader := mreaders.New(db) cases := map[string]struct { - chanID string - offset uint64 - limit uint64 - query map[string]string - messages []mainflux.Message + chanID string + offset uint64 + limit uint64 + query map[string]string + page readers.MessagesPage }{ "read message page for existing channel": { - chanID: chanID, - offset: 0, - limit: 10, - messages: messages[0:10], + chanID: chanID, + offset: 0, + limit: 10, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 0, + Limit: 10, + Messages: messages[0:10], + }, }, "read message page for non-existent channel": { - chanID: "2", - offset: 0, - limit: 10, - messages: []mainflux.Message{}, + chanID: "2", + offset: 0, + limit: 10, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: 10, + Messages: []mainflux.Message{}, + }, }, "read message last page": { - chanID: chanID, - offset: 40, - limit: 10, - messages: messages[40:42], + chanID: chanID, + offset: 40, + limit: 10, + page: readers.MessagesPage{ + Total: msgsNum, + Offset: 40, + Limit: 10, + Messages: messages[40:42], + }, }, "read message with non-existent subtopic": { - chanID: chanID, - offset: 0, - limit: msgsNum, - query: map[string]string{"subtopic": "not-present"}, - messages: []mainflux.Message{}, + chanID: chanID, + offset: 0, + limit: msgsNum, + query: map[string]string{"subtopic": "not-present"}, + page: readers.MessagesPage{ + Total: 0, + Offset: 0, + Limit: msgsNum, + Messages: []mainflux.Message{}, + }, }, "read message with subtopic": { - chanID: chanID, - offset: 0, - limit: 10, - query: map[string]string{"subtopic": subtopic}, - messages: messages[0:10], + chanID: chanID, + offset: 0, + limit: 10, + query: map[string]string{"subtopic": subtopic}, + page: readers.MessagesPage{ + Total: uint64(len(subtopicMsgs)), + Offset: 0, + Limit: 10, + Messages: subtopicMsgs, + }, }, } for desc, tc := range cases { result := reader.ReadAll(tc.chanID, tc.offset, tc.limit, tc.query) - assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result)) + assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Messages, result.Messages)) + assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", desc, tc.page.Total, result.Total)) } } diff --git a/readers/swagger.yml b/readers/swagger.yml index 9bbab1e305..383338a005 100644 --- a/readers/swagger.yml +++ b/readers/swagger.yml @@ -27,7 +27,7 @@ paths: 200: description: Data retrieved. schema: - $ref: "#/definitions/MessageList" + $ref: "#/definitions/MessagesPage" 400: description: Failed due to malformed query parameters. 403: @@ -40,9 +40,18 @@ responses: description: Unexpected server-side error occured. definitions: - MessageList: + MessagePage: type: object properties: + total: + type: number + description: Total number of items that are present on the system. + offset: + type: number + description: Number of items that were skipped during retrieval. + limit: + type: number + description: Size of the subset that was retrieved. messages: type: array minItems: 0