Skip to content

Commit

Permalink
improve context management
Browse files Browse the repository at this point in the history
Signed-off-by: SammyOina <sammyoina@gmail.com>
  • Loading branch information
SammyOina committed Jul 13, 2023
1 parent e7e4a61 commit 3c22665
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 27 deletions.
8 changes: 4 additions & 4 deletions consumers/notifiers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str
return ns.subs.Remove(ctx, id)
}

func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{}) error {
func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error {
msg, ok := message.(*messaging.Message)
if !ok {
return ErrMessage
Expand All @@ -114,7 +114,7 @@ func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(context.Background(), pm)
page, err := ns.subs.RetrieveAll(ctx, pm)
if err != nil {
return err
}
Expand All @@ -133,7 +133,7 @@ func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{
return nil
}

func (ns *notifierService) ConsumeAsync(_ context.Context, message interface{}) {
func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) {
msg, ok := message.(*messaging.Message)
if !ok {
ns.errCh <- ErrMessage
Expand All @@ -148,7 +148,7 @@ func (ns *notifierService) ConsumeAsync(_ context.Context, message interface{})
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(context.Background(), pm)
page, err := ns.subs.RetrieveAll(ctx, pm)
if err != nil {
ns.errCh <- err
return
Expand Down
4 changes: 2 additions & 2 deletions consumers/writers/influxdb/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (repo *influxRepo) Errors() <-chan error {
return nil
}

func (repo *influxRepo) ConsumeBlocking(_ context.Context, message interface{}) error {
func (repo *influxRepo) ConsumeBlocking(ctx context.Context, message interface{}) error {
var err error
var pts []*write.Point
switch m := message.(type) {
Expand All @@ -115,7 +115,7 @@ func (repo *influxRepo) ConsumeBlocking(_ context.Context, message interface{})
return err
}

return repo.writeAPIBlocking.WritePoint(context.Background(), pts...)
return repo.writeAPIBlocking.WritePoint(ctx, pts...)
}

func (repo *influxRepo) senmlPoints(messages interface{}) ([]*write.Point, error) {
Expand Down
14 changes: 7 additions & 7 deletions consumers/writers/mongodb/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ func New(db *mongo.Database) consumers.BlockingConsumer {
return &mongoRepo{db}
}

func (repo *mongoRepo) ConsumeBlocking(_ context.Context, message interface{}) error {
func (repo *mongoRepo) ConsumeBlocking(ctx context.Context, message interface{}) error {
switch m := message.(type) {
case json.Messages:
return repo.saveJSON(m)
return repo.saveJSON(ctx, m)
default:
return repo.saveSenml(m)
return repo.saveSenml(ctx, m)
}
}

func (repo *mongoRepo) saveSenml(messages interface{}) error {
func (repo *mongoRepo) saveSenml(ctx context.Context, messages interface{}) error {
msgs, ok := messages.([]senml.Message)
if !ok {
return errSaveMessage
Expand All @@ -49,23 +49,23 @@ func (repo *mongoRepo) saveSenml(messages interface{}) error {
dbMsgs = append(dbMsgs, msg)
}

_, err := coll.InsertMany(context.Background(), dbMsgs)
_, err := coll.InsertMany(ctx, dbMsgs)
if err != nil {
return errors.Wrap(errSaveMessage, err)
}

return nil
}

func (repo *mongoRepo) saveJSON(msgs json.Messages) error {
func (repo *mongoRepo) saveJSON(ctx context.Context, msgs json.Messages) error {
m := []interface{}{}
for _, msg := range msgs.Data {
m = append(m, msg)
}

coll := repo.db.Collection(msgs.Format)

_, err := coll.InsertMany(context.Background(), m)
_, err := coll.InsertMany(ctx, m)
if err != nil {
return errors.Wrap(errSaveMessage, err)
}
Expand Down
8 changes: 4 additions & 4 deletions consumers/writers/postgres/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer {
return &postgresRepo{db: db}
}

func (pr postgresRepo) ConsumeBlocking(_ context.Context, message interface{}) (err error) {
func (pr postgresRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) {
switch m := message.(type) {
case mfjson.Messages:
return pr.saveJSON(m)
default:
return pr.saveSenml(m)
return pr.saveSenml(ctx, m)
}
}

func (pr postgresRepo) saveSenml(messages interface{}) (err error) {
func (pr postgresRepo) saveSenml(ctx context.Context, messages interface{}) (err error) {
msgs, ok := messages.([]senml.Message)
if !ok {
return errSaveMessage
Expand All @@ -57,7 +57,7 @@ func (pr postgresRepo) saveSenml(messages interface{}) (err error) {
:value, :string_value, :bool_value, :data_value, :sum,
:time, :update_time);`

tx, err := pr.db.BeginTxx(context.Background(), nil)
tx, err := pr.db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrap(errSaveMessage, err)
}
Expand Down
20 changes: 10 additions & 10 deletions consumers/writers/timescale/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer {
return &timescaleRepo{db: db}
}

func (tr *timescaleRepo) ConsumeBlocking(_ context.Context, message interface{}) (err error) {
func (tr *timescaleRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) {
switch m := message.(type) {
case mfjson.Messages:
return tr.saveJSON(m)
return tr.saveJSON(ctx, m)
default:
return tr.saveSenml(m)
return tr.saveSenml(ctx, m)
}
}

func (tr timescaleRepo) saveSenml(messages interface{}) (err error) {
func (tr timescaleRepo) saveSenml(ctx context.Context, messages interface{}) (err error) {
msgs, ok := messages.([]senml.Message)
if !ok {
return errSaveMessage
Expand All @@ -56,7 +56,7 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) {
:value, :string_value, :bool_value, :data_value, :sum,
:time, :update_time);`

tx, err := tr.db.BeginTxx(context.Background(), nil)
tx, err := tr.db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrap(errSaveMessage, err)
}
Expand Down Expand Up @@ -90,21 +90,21 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) {
return err
}

func (tr timescaleRepo) saveJSON(msgs mfjson.Messages) error {
if err := tr.insertJSON(msgs); err != nil {
func (tr timescaleRepo) saveJSON(ctx context.Context, msgs mfjson.Messages) error {
if err := tr.insertJSON(ctx, msgs); err != nil {
if err == errNoTable {
if err := tr.createTable(msgs.Format); err != nil {
return err
}
return tr.insertJSON(msgs)
return tr.insertJSON(ctx, msgs)
}
return err
}
return nil
}

func (tr timescaleRepo) insertJSON(msgs mfjson.Messages) error {
tx, err := tr.db.BeginTxx(context.Background(), nil)
func (tr timescaleRepo) insertJSON(ctx context.Context, msgs mfjson.Messages) error {
tx, err := tr.db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrap(errSaveMessage, err)
}
Expand Down

0 comments on commit 3c22665

Please sign in to comment.