diff --git a/consumers/notifiers/service.go b/consumers/notifiers/service.go index b98e80e791d..ce2511d3e9f 100644 --- a/consumers/notifiers/service.go +++ b/consumers/notifiers/service.go @@ -100,7 +100,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str return ns.subs.Remove(ctx, id) } -func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{}) error { +func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error { msg, ok := message.(*messaging.Message) if !ok { return ErrMessage @@ -114,7 +114,7 @@ func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{ Offset: 0, Limit: -1, } - page, err := ns.subs.RetrieveAll(context.Background(), pm) + page, err := ns.subs.RetrieveAll(ctx, pm) if err != nil { return err } @@ -133,7 +133,7 @@ func (ns *notifierService) ConsumeBlocking(_ context.Context, message interface{ return nil } -func (ns *notifierService) ConsumeAsync(_ context.Context, message interface{}) { +func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) { msg, ok := message.(*messaging.Message) if !ok { ns.errCh <- ErrMessage @@ -148,7 +148,7 @@ func (ns *notifierService) ConsumeAsync(_ context.Context, message interface{}) Offset: 0, Limit: -1, } - page, err := ns.subs.RetrieveAll(context.Background(), pm) + page, err := ns.subs.RetrieveAll(ctx, pm) if err != nil { ns.errCh <- err return diff --git a/consumers/writers/influxdb/consumer.go b/consumers/writers/influxdb/consumer.go index 2e37978397c..f4ec040bfdc 100644 --- a/consumers/writers/influxdb/consumer.go +++ b/consumers/writers/influxdb/consumer.go @@ -102,7 +102,7 @@ func (repo *influxRepo) Errors() <-chan error { return nil } -func (repo *influxRepo) ConsumeBlocking(_ context.Context, message interface{}) error { +func (repo *influxRepo) ConsumeBlocking(ctx context.Context, message interface{}) error { var err error var pts []*write.Point switch m := message.(type) { @@ -115,7 +115,7 @@ func (repo *influxRepo) ConsumeBlocking(_ context.Context, message interface{}) return err } - return repo.writeAPIBlocking.WritePoint(context.Background(), pts...) + return repo.writeAPIBlocking.WritePoint(ctx, pts...) } func (repo *influxRepo) senmlPoints(messages interface{}) ([]*write.Point, error) { diff --git a/consumers/writers/mongodb/consumer.go b/consumers/writers/mongodb/consumer.go index 51a1db75920..f9e5a0c1cba 100644 --- a/consumers/writers/mongodb/consumer.go +++ b/consumers/writers/mongodb/consumer.go @@ -29,16 +29,16 @@ func New(db *mongo.Database) consumers.BlockingConsumer { return &mongoRepo{db} } -func (repo *mongoRepo) ConsumeBlocking(_ context.Context, message interface{}) error { +func (repo *mongoRepo) ConsumeBlocking(ctx context.Context, message interface{}) error { switch m := message.(type) { case json.Messages: - return repo.saveJSON(m) + return repo.saveJSON(ctx, m) default: - return repo.saveSenml(m) + return repo.saveSenml(ctx, m) } } -func (repo *mongoRepo) saveSenml(messages interface{}) error { +func (repo *mongoRepo) saveSenml(ctx context.Context, messages interface{}) error { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -49,7 +49,7 @@ func (repo *mongoRepo) saveSenml(messages interface{}) error { dbMsgs = append(dbMsgs, msg) } - _, err := coll.InsertMany(context.Background(), dbMsgs) + _, err := coll.InsertMany(ctx, dbMsgs) if err != nil { return errors.Wrap(errSaveMessage, err) } @@ -57,7 +57,7 @@ func (repo *mongoRepo) saveSenml(messages interface{}) error { return nil } -func (repo *mongoRepo) saveJSON(msgs json.Messages) error { +func (repo *mongoRepo) saveJSON(ctx context.Context, msgs json.Messages) error { m := []interface{}{} for _, msg := range msgs.Data { m = append(m, msg) @@ -65,7 +65,7 @@ func (repo *mongoRepo) saveJSON(msgs json.Messages) error { coll := repo.db.Collection(msgs.Format) - _, err := coll.InsertMany(context.Background(), m) + _, err := coll.InsertMany(ctx, m) if err != nil { return errors.Wrap(errSaveMessage, err) } diff --git a/consumers/writers/postgres/consumer.go b/consumers/writers/postgres/consumer.go index 98d7877a9a3..92219534111 100644 --- a/consumers/writers/postgres/consumer.go +++ b/consumers/writers/postgres/consumer.go @@ -36,16 +36,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer { return &postgresRepo{db: db} } -func (pr postgresRepo) ConsumeBlocking(_ context.Context, message interface{}) (err error) { +func (pr postgresRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return pr.saveJSON(m) default: - return pr.saveSenml(m) + return pr.saveSenml(ctx, m) } } -func (pr postgresRepo) saveSenml(messages interface{}) (err error) { +func (pr postgresRepo) saveSenml(ctx context.Context, messages interface{}) (err error) { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -57,7 +57,7 @@ func (pr postgresRepo) saveSenml(messages interface{}) (err error) { :value, :string_value, :bool_value, :data_value, :sum, :time, :update_time);` - tx, err := pr.db.BeginTxx(context.Background(), nil) + tx, err := pr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) } diff --git a/consumers/writers/timescale/consumer.go b/consumers/writers/timescale/consumer.go index 24bbbd37635..8444dc8ecdc 100644 --- a/consumers/writers/timescale/consumer.go +++ b/consumers/writers/timescale/consumer.go @@ -35,16 +35,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer { return ×caleRepo{db: db} } -func (tr *timescaleRepo) ConsumeBlocking(_ context.Context, message interface{}) (err error) { +func (tr *timescaleRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: - return tr.saveJSON(m) + return tr.saveJSON(ctx, m) default: - return tr.saveSenml(m) + return tr.saveSenml(ctx, m) } } -func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { +func (tr timescaleRepo) saveSenml(ctx context.Context, messages interface{}) (err error) { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -56,7 +56,7 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { :value, :string_value, :bool_value, :data_value, :sum, :time, :update_time);` - tx, err := tr.db.BeginTxx(context.Background(), nil) + tx, err := tr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) } @@ -90,21 +90,21 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { return err } -func (tr timescaleRepo) saveJSON(msgs mfjson.Messages) error { - if err := tr.insertJSON(msgs); err != nil { +func (tr timescaleRepo) saveJSON(ctx context.Context, msgs mfjson.Messages) error { + if err := tr.insertJSON(ctx, msgs); err != nil { if err == errNoTable { if err := tr.createTable(msgs.Format); err != nil { return err } - return tr.insertJSON(msgs) + return tr.insertJSON(ctx, msgs) } return err } return nil } -func (tr timescaleRepo) insertJSON(msgs mfjson.Messages) error { - tx, err := tr.db.BeginTxx(context.Background(), nil) +func (tr timescaleRepo) insertJSON(ctx context.Context, msgs mfjson.Messages) error { + tx, err := tr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) }