Skip to content

Commit

Permalink
NOISSUE - Fix Postgres logs in Things service (absmach#734)
Browse files Browse the repository at this point in the history
* NOISSUE - Fix Postgres logs in Things service

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix uuid package

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
  • Loading branch information
manuio authored and dborovcanin committed May 13, 2019
1 parent 066e2b4 commit e11ec79
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 106 deletions.
4 changes: 2 additions & 2 deletions things/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ type ChannelRepository interface {
RetrieveByID(string, string) (Channel, error)

// RetrieveAll retrieves the subset of channels owned by the specified user.
RetrieveAll(string, uint64, uint64) ChannelsPage
RetrieveAll(string, uint64, uint64) (ChannelsPage, error)

// RetrieveByThing retrieves the subset of channels owned by the specified
// user and have specified thing connected to them.
RetrieveByThing(string, string, uint64, uint64) ChannelsPage
RetrieveByThing(string, string, uint64, uint64) (ChannelsPage, error)

// Remove removes the channel having the provided identifier, that is owned
// by the specified user.
Expand Down
12 changes: 6 additions & 6 deletions things/mocks/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ func (crm *channelRepositoryMock) RetrieveByID(owner, id string) (things.Channel
return things.Channel{}, things.ErrNotFound
}

func (crm *channelRepositoryMock) RetrieveAll(owner string, offset, limit uint64) things.ChannelsPage {
func (crm *channelRepositoryMock) RetrieveAll(owner string, offset, limit uint64) (things.ChannelsPage, error) {
channels := make([]things.Channel, 0)

if offset < 0 || limit <= 0 {
return things.ChannelsPage{}
return things.ChannelsPage{}, nil
}

first := uint64(offset) + 1
Expand Down Expand Up @@ -112,14 +112,14 @@ func (crm *channelRepositoryMock) RetrieveAll(owner string, offset, limit uint64
},
}

return page
return page, nil
}

func (crm *channelRepositoryMock) RetrieveByThing(owner, thingID string, offset, limit uint64) things.ChannelsPage {
func (crm *channelRepositoryMock) RetrieveByThing(owner, thingID string, offset, limit uint64) (things.ChannelsPage, error) {
channels := make([]things.Channel, 0)

if offset < 0 || limit <= 0 {
return things.ChannelsPage{}
return things.ChannelsPage{}, nil
}

first := uint64(offset) + 1
Expand All @@ -145,7 +145,7 @@ func (crm *channelRepositoryMock) RetrieveByThing(owner, thingID string, offset,
},
}

return page
return page, nil
}

func (crm *channelRepositoryMock) Remove(owner, id string) error {
Expand Down
14 changes: 7 additions & 7 deletions things/mocks/things.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ func (trm *thingRepositoryMock) RetrieveByID(owner, id string) (things.Thing, er
return things.Thing{}, things.ErrNotFound
}

func (trm *thingRepositoryMock) RetrieveAll(owner string, offset, limit uint64) things.ThingsPage {
func (trm *thingRepositoryMock) RetrieveAll(owner string, offset, limit uint64) (things.ThingsPage, error) {
trm.mu.Lock()
defer trm.mu.Unlock()

items := make([]things.Thing, 0)

if offset < 0 || limit <= 0 {
return things.ThingsPage{}
return things.ThingsPage{}, nil
}

first := uint64(offset) + 1
Expand Down Expand Up @@ -149,25 +149,25 @@ func (trm *thingRepositoryMock) RetrieveAll(owner string, offset, limit uint64)
},
}

return page
return page, nil
}

func (trm *thingRepositoryMock) RetrieveByChannel(owner, chanID string, offset, limit uint64) things.ThingsPage {
func (trm *thingRepositoryMock) RetrieveByChannel(owner, chanID string, offset, limit uint64) (things.ThingsPage, error) {
trm.mu.Lock()
defer trm.mu.Unlock()

items := make([]things.Thing, 0)

if offset < 0 || limit <= 0 {
return things.ThingsPage{}
return things.ThingsPage{}, nil
}

first := uint64(offset) + 1
last := first + uint64(limit)

ths, ok := trm.tconns[chanID]
if !ok {
return things.ThingsPage{}
return things.ThingsPage{}, nil
}

for _, v := range ths {
Expand All @@ -190,7 +190,7 @@ func (trm *thingRepositoryMock) RetrieveByChannel(owner, chanID string, offset,
},
}

return page
return page, nil
}

func (trm *thingRepositoryMock) Remove(owner, id string) error {
Expand Down
53 changes: 23 additions & 30 deletions things/postgres/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ package postgres
import (
"database/sql"
"encoding/json"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/things"
uuid "github.com/satori/go.uuid"
)

var _ things.ChannelRepository = (*channelRepository)(nil)
Expand All @@ -27,16 +26,14 @@ const (
)

type channelRepository struct {
db *sqlx.DB
log logger.Logger
db *sqlx.DB
}

// NewChannelRepository instantiates a PostgreSQL implementation of channel
// repository.
func NewChannelRepository(db *sqlx.DB, log logger.Logger) things.ChannelRepository {
func NewChannelRepository(db *sqlx.DB) things.ChannelRepository {
return &channelRepository{
db: db,
log: log,
db: db,
}
}

Expand Down Expand Up @@ -108,7 +105,7 @@ func (cr channelRepository) RetrieveByID(owner, id string) (things.Channel, erro
return toChannel(dbch)
}

func (cr channelRepository) RetrieveAll(owner string, offset, limit uint64) things.ChannelsPage {
func (cr channelRepository) RetrieveAll(owner string, offset, limit uint64) (things.ChannelsPage, error) {
q := `SELECT id, name, metadata FROM channels WHERE owner = :owner ORDER BY id LIMIT :limit OFFSET :offset;`

params := map[string]interface{}{
Expand All @@ -118,22 +115,19 @@ func (cr channelRepository) RetrieveAll(owner string, offset, limit uint64) thin
}
rows, err := cr.db.NamedQuery(q, params)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to retrieve channels due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}
defer rows.Close()

items := []things.Channel{}
for rows.Next() {
dbch := dbChannel{Owner: owner}
if err := rows.StructScan(&dbch); err != nil {
cr.log.Error(fmt.Sprintf("Failed to read retrieved channel due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}
ch, err := toChannel(dbch)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to read retrieved channel due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}

items = append(items, ch)
Expand All @@ -143,8 +137,7 @@ func (cr channelRepository) RetrieveAll(owner string, offset, limit uint64) thin

var total uint64
if err := cr.db.Get(&total, q, owner); err != nil {
cr.log.Error(fmt.Sprintf("Failed to count channels due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}

page := things.ChannelsPage{
Expand All @@ -156,10 +149,15 @@ func (cr channelRepository) RetrieveAll(owner string, offset, limit uint64) thin
},
}

return page
return page, nil
}

func (cr channelRepository) RetrieveByThing(owner, thing string, offset, limit uint64) things.ChannelsPage {
func (cr channelRepository) RetrieveByThing(owner, thing string, offset, limit uint64) (things.ChannelsPage, error) {
// Verify if UUID format is valid to avoid internal Postgres error
if _, err := uuid.FromString(thing); err != nil {
return things.ChannelsPage{}, things.ErrNotFound
}

q := `SELECT id, name, metadata
FROM channels ch
INNER JOIN connections co
Expand All @@ -178,23 +176,20 @@ func (cr channelRepository) RetrieveByThing(owner, thing string, offset, limit u

rows, err := cr.db.NamedQuery(q, params)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to retrieve channels due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}
defer rows.Close()

items := []things.Channel{}
for rows.Next() {
dbch := dbChannel{Owner: owner}
if err := rows.StructScan(&dbch); err != nil {
cr.log.Error(fmt.Sprintf("Failed to read retrieved channel due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}

ch, err := toChannel(dbch)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to read retrieved channel due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}

items = append(items, ch)
Expand All @@ -208,8 +203,7 @@ func (cr channelRepository) RetrieveByThing(owner, thing string, offset, limit u

var total uint64
if err := cr.db.Get(&total, q, owner, thing); err != nil {
cr.log.Error(fmt.Sprintf("Failed to count channels due to %s", err))
return things.ChannelsPage{}
return things.ChannelsPage{}, err
}

return things.ChannelsPage{
Expand All @@ -219,7 +213,7 @@ func (cr channelRepository) RetrieveByThing(owner, thing string, offset, limit u
Offset: offset,
Limit: limit,
},
}
}, nil
}

func (cr channelRepository) Remove(owner, id string) error {
Expand All @@ -233,7 +227,7 @@ func (cr channelRepository) Remove(owner, id string) error {
}

func (cr channelRepository) Connect(owner, chanID, thingID string) error {
q := `INSERT INTO connections (channel_id, channel_owner, thing_id, thing_owner)
q := `INSERT INTO connections (channel_id, channel_owner, thing_id, thing_owner)
VALUES (:channel, :owner, :thing, :owner);`

conn := dbConnection{
Expand Down Expand Up @@ -293,14 +287,13 @@ func (cr channelRepository) HasThing(chanID, key string) (string, error) {

q := `SELECT id FROM things WHERE key = $1`
if err := cr.db.QueryRow(q, key).Scan(&thingID); err != nil {
cr.log.Error(fmt.Sprintf("Failed to obtain thing's ID due to %s", err))
return "", err

}

q = `SELECT EXISTS (SELECT 1 FROM connections WHERE channel_id = $1 AND thing_id = $2);`
exists := false
if err := cr.db.QueryRow(q, chanID, thingID).Scan(&exists); err != nil {
cr.log.Error(fmt.Sprintf("Failed to check thing existence due to %s", err))
return "", err
}

Expand Down
Loading

0 comments on commit e11ec79

Please sign in to comment.