diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index d82601fda3..c8c7c4d5a8 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -15,6 +15,7 @@ import ( "github.com/gocql/gocql" "github.com/mainflux/mainflux/consumers" + consumerTracing "github.com/mainflux/mainflux/consumers/tracing" "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/internal" @@ -87,8 +88,14 @@ func main() { }() tracer := tp.Tracer(svcName) + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } + // Create new cassandra-writer repo repo := newService(csdSession, logger) + repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig) // Create new pub sub broker pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) @@ -104,12 +111,6 @@ func main() { } // Create new http server - httpServerConfig := server.Config{Port: defSvcHttpPort} - - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger) if cfg.SendTelemetry { diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 16e6872d53..9202e69527 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -13,6 +13,7 @@ import ( chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/consumers" + consumerTracing "github.com/mainflux/mainflux/consumers/tracing" "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/influxdb" influxDBClient "github.com/mainflux/mainflux/internal/clients/influxdb" @@ -101,7 +102,13 @@ func main() { } defer client.Close() + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } + repo := influxdb.NewAsync(client, repocfg) + repo = consumerTracing.NewAsync(tracer, repo, httpServerConfig) // Start consuming and logging errors. go func(log mflog.Logger) { @@ -116,10 +123,6 @@ func main() { logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err)) } - httpServerConfig := server.Config{Port: defSvcHttpPort} - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger) if cfg.SendTelemetry { diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index ed7cca0de6..abb0826638 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -13,6 +13,7 @@ import ( chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/consumers" + consumerTracing "github.com/mainflux/mainflux/consumers/tracing" "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/internal" @@ -91,16 +92,18 @@ func main() { logger.Fatal(fmt.Sprintf("failed to setup mongo database : %s", err)) } + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } + repo := newService(db, logger) + repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig) if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to start MongoDB writer: %s", err)) } - httpServerConfig := server.Config{Port: defSvcHttpPort} - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger) if cfg.SendTelemetry { diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 98451a8c75..c5824ca943 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -14,6 +14,7 @@ import ( chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/consumers" + consumerTracing "github.com/mainflux/mainflux/consumers/tracing" "github.com/mainflux/mainflux/consumers/writers/api" writerPg "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/internal" @@ -93,16 +94,18 @@ func main() { } defer db.Close() + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } + repo := newService(db, logger) + repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig) if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err)) } - httpServerConfig := server.Config{Port: defSvcHttpPort} - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger) if cfg.SendTelemetry { diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index 3cd781f840..3f5a7ed7b6 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -15,6 +15,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux/consumers" + consumerTracing "github.com/mainflux/mainflux/consumers/tracing" "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/timescale" "github.com/mainflux/mainflux/internal" @@ -87,7 +88,13 @@ func main() { }() tracer := tp.Tracer(svcName) + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } + repo := newService(db, logger) + repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig) pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { @@ -100,10 +107,6 @@ func main() { logger.Fatal(fmt.Sprintf("failed to create Timescale writer: %s", err)) } - httpServerConfig := server.Config{Port: defSvcHttpPort} - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger) if cfg.SendTelemetry { diff --git a/consumers/consumer.go b/consumers/consumer.go index 78ca875032..864d9b21a4 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -3,12 +3,14 @@ package consumers +import "context" + // AsyncConsumer specifies a non-blocking message-consuming API, // which can be used for writing data to the DB, publishing messages // to broker, sending notifications, or any other asynchronous job. type AsyncConsumer interface { // ConsumeAsync method is used to asynchronously consume received messages. - ConsumeAsync(messages interface{}) + ConsumeAsync(ctx context.Context, messages interface{}) // Errors method returns a channel for reading errors which occur during async writes. // Must be called before performing any writes for errors to be collected. @@ -24,5 +26,5 @@ type AsyncConsumer interface { type BlockingConsumer interface { // ConsumeBlocking method is used to consume received messages synchronously. // A non-nil error is returned to indicate operation failure. - ConsumeBlocking(messages interface{}) error + ConsumeBlocking(ctx context.Context, messages interface{}) error } diff --git a/consumers/messages.go b/consumers/messages.go index ae29d39170..f8aef5d947 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -44,11 +44,11 @@ func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer in for _, subject := range cfg.SubscriberCfg.Subjects { switch c := consumer.(type) { case AsyncConsumer: - if err := sub.Subscribe(ctx, id, subject, handleAsync(transformer, c)); err != nil { + if err := sub.Subscribe(ctx, id, subject, handleAsync(ctx, transformer, c)); err != nil { return err } case BlockingConsumer: - if err := sub.Subscribe(ctx, id, subject, handleSync(transformer, c)); err != nil { + if err := sub.Subscribe(ctx, id, subject, handleSync(ctx, transformer, c)); err != nil { return err } default: @@ -59,7 +59,7 @@ func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer in return nil } -func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc { +func handleSync(ctx context.Context, t transformers.Transformer, sc BlockingConsumer) handleFunc { return func(msg *messaging.Message) error { m := interface{}(msg) var err error @@ -69,11 +69,11 @@ func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc { return err } } - return sc.ConsumeBlocking(m) + return sc.ConsumeBlocking(ctx, m) } } -func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc { +func handleAsync(ctx context.Context, t transformers.Transformer, ac AsyncConsumer) handleFunc { return func(msg *messaging.Message) error { m := interface{}(msg) var err error @@ -84,7 +84,7 @@ func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc { } } - ac.ConsumeAsync(m) + ac.ConsumeAsync(ctx, m) return nil } } diff --git a/consumers/notifiers/api/logging.go b/consumers/notifiers/api/logging.go index c1603bc6fc..e1acdd921b 100644 --- a/consumers/notifiers/api/logging.go +++ b/consumers/notifiers/api/logging.go @@ -88,7 +88,7 @@ func (lm *loggingMiddleware) RemoveSubscription(ctx context.Context, token, id s // ConsumeBlocking logs the consume_blocking request. It logs the message and the time it took to complete the request. // If the request fails, it logs the error. -func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) { +func (lm *loggingMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) (err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) if err != nil { @@ -98,5 +98,5 @@ func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.ConsumeBlocking(msg) + return lm.svc.ConsumeBlocking(ctx, msg) } diff --git a/consumers/notifiers/api/metrics.go b/consumers/notifiers/api/metrics.go index 0475416546..f187d92510 100644 --- a/consumers/notifiers/api/metrics.go +++ b/consumers/notifiers/api/metrics.go @@ -71,11 +71,11 @@ func (ms *metricsMiddleware) RemoveSubscription(ctx context.Context, token, id s } // ConsumeBlocking instruments ConsumeBlocking method with metrics. -func (ms *metricsMiddleware) ConsumeBlocking(msg interface{}) error { +func (ms *metricsMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) error { defer func(begin time.Time) { ms.counter.With("method", "consume").Add(1) ms.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) }(time.Now()) - return ms.svc.ConsumeBlocking(msg) + return ms.svc.ConsumeBlocking(ctx, msg) } diff --git a/consumers/notifiers/service.go b/consumers/notifiers/service.go index baaad1261d..ce2511d3e9 100644 --- a/consumers/notifiers/service.go +++ b/consumers/notifiers/service.go @@ -100,7 +100,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str return ns.subs.Remove(ctx, id) } -func (ns *notifierService) ConsumeBlocking(message interface{}) error { +func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error { msg, ok := message.(*messaging.Message) if !ok { return ErrMessage @@ -114,7 +114,7 @@ func (ns *notifierService) ConsumeBlocking(message interface{}) error { Offset: 0, Limit: -1, } - page, err := ns.subs.RetrieveAll(context.Background(), pm) + page, err := ns.subs.RetrieveAll(ctx, pm) if err != nil { return err } @@ -133,7 +133,7 @@ func (ns *notifierService) ConsumeBlocking(message interface{}) error { return nil } -func (ns *notifierService) ConsumeAsync(message interface{}) { +func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) { msg, ok := message.(*messaging.Message) if !ok { ns.errCh <- ErrMessage @@ -148,7 +148,7 @@ func (ns *notifierService) ConsumeAsync(message interface{}) { Offset: 0, Limit: -1, } - page, err := ns.subs.RetrieveAll(context.Background(), pm) + page, err := ns.subs.RetrieveAll(ctx, pm) if err != nil { ns.errCh <- err return diff --git a/consumers/notifiers/service_test.go b/consumers/notifiers/service_test.go index 726ae951da..dad2de526c 100644 --- a/consumers/notifiers/service_test.go +++ b/consumers/notifiers/service_test.go @@ -322,7 +322,7 @@ func TestConsume(t *testing.T) { } for _, tc := range cases { - err := svc.ConsumeBlocking(tc.msg) + err := svc.ConsumeBlocking(context.TODO(), tc.msg) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } diff --git a/consumers/tracing/consumers.go b/consumers/tracing/consumers.go new file mode 100644 index 0000000000..bc3adfb71b --- /dev/null +++ b/consumers/tracing/consumers.go @@ -0,0 +1,127 @@ +package tracing + +import ( + "context" + "fmt" + + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/internal/server" + mfjson "github.com/mainflux/mainflux/pkg/transformers/json" + "github.com/mainflux/mainflux/pkg/transformers/senml" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const ( + consumeBlockingOP = "retrieve_blocking" // This is not specified in the open telemetry spec. + consumeAsyncOP = "retrieve_async" // This is not specified in the open telemetry spec. +) + +var defaultAttributes = []attribute.KeyValue{ + attribute.String("messaging.system", "nats"), + attribute.Bool("messaging.destination.anonymous", false), + attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"), + attribute.Bool("messaging.destination.temporary", true), + attribute.String("network.protocol.name", "nats"), + attribute.String("network.protocol.version", "2.2.4"), + attribute.String("network.transport", "tcp"), + attribute.String("network.type", "ipv4"), +} + +var _ consumers.AsyncConsumer = (*tracingMiddlewareAsync)(nil) +var _ consumers.BlockingConsumer = (*tracingMiddlewareBlock)(nil) + +type tracingMiddlewareAsync struct { + consumer consumers.AsyncConsumer + tracer trace.Tracer + host server.Config +} +type tracingMiddlewareBlock struct { + consumer consumers.BlockingConsumer + tracer trace.Tracer + host server.Config +} + +// NewAsync creates a new traced consumers.AsyncConsumer service. +func NewAsync(tracer trace.Tracer, consumerAsync consumers.AsyncConsumer, host server.Config) consumers.AsyncConsumer { + return &tracingMiddlewareAsync{ + consumer: consumerAsync, + tracer: tracer, + host: host, + } +} + +// NewBlocking creates a new traced consumers.BlockingConsumer service. +func NewBlocking(tracer trace.Tracer, consumerBlock consumers.BlockingConsumer, host server.Config) consumers.BlockingConsumer { + return &tracingMiddlewareBlock{ + consumer: consumerBlock, + tracer: tracer, + host: host, + } +} + +// ConsumeBlocking traces consume operations for message/s consumed. +func (tm *tracingMiddlewareBlock) ConsumeBlocking(ctx context.Context, messages interface{}) error { + var span trace.Span + switch m := messages.(type) { + case mfjson.Messages: + if len(m.Data) > 0 { + firstMsg := m.Data[0] + ctx, span = createSpan(ctx, consumeBlockingOP, firstMsg.Publisher, firstMsg.Channel, firstMsg.Subtopic, len(m.Data), tm.host, trace.SpanKindConsumer, tm.tracer) + defer span.End() + } + case []senml.Message: + if len(m) > 0 { + firstMsg := m[0] + ctx, span = createSpan(ctx, consumeBlockingOP, firstMsg.Publisher, firstMsg.Channel, firstMsg.Subtopic, len(m), tm.host, trace.SpanKindConsumer, tm.tracer) + defer span.End() + } + } + return tm.consumer.ConsumeBlocking(ctx, messages) +} + +// ConsumeAsync traces consume operations for message/s consumed. +func (tm *tracingMiddlewareAsync) ConsumeAsync(ctx context.Context, messages interface{}) { + var span trace.Span + switch m := messages.(type) { + case mfjson.Messages: + if len(m.Data) > 0 { + firstMsg := m.Data[0] + ctx, span = createSpan(ctx, consumeAsyncOP, firstMsg.Publisher, firstMsg.Channel, firstMsg.Subtopic, len(m.Data), tm.host, trace.SpanKindConsumer, tm.tracer) + defer span.End() + } + case []senml.Message: + if len(m) > 0 { + firstMsg := m[0] + ctx, span = createSpan(ctx, consumeAsyncOP, firstMsg.Publisher, firstMsg.Channel, firstMsg.Subtopic, len(m), tm.host, trace.SpanKindConsumer, tm.tracer) + defer span.End() + } + } + tm.consumer.ConsumeAsync(ctx, messages) +} + +// Errors traces async consume errors. +func (tm *tracingMiddlewareAsync) Errors() <-chan error { + return tm.consumer.Errors() +} + +func createSpan(ctx context.Context, operation, clientID, topic, subTopic string, noMessages int, cfg server.Config, spanKind trace.SpanKind, tracer trace.Tracer) (context.Context, trace.Span) { + var subject = fmt.Sprintf("channels.%s.messages", topic) + if subTopic != "" { + subject = fmt.Sprintf("%s.%s", subject, subTopic) + } + spanName := fmt.Sprintf("%s %s", subject, operation) + + kvOpts := []attribute.KeyValue{ + attribute.String("messaging.operation", operation), + attribute.String("messaging.client_id", clientID), + attribute.String("messaging.destination.name", subject), + attribute.String("server.address", cfg.Host), + attribute.String("server.socket.port", cfg.Port), + attribute.Int("messaging.batch.message_count", noMessages), + } + + kvOpts = append(kvOpts, defaultAttributes...) + + return tracer.Start(ctx, spanName, trace.WithAttributes(kvOpts...), trace.WithSpanKind(spanKind)) +} diff --git a/consumers/writers/api/logging.go b/consumers/writers/api/logging.go index 479a192690..c53179901b 100644 --- a/consumers/writers/api/logging.go +++ b/consumers/writers/api/logging.go @@ -6,6 +6,7 @@ package api import ( + "context" "fmt" "time" @@ -30,7 +31,7 @@ func LoggingMiddleware(consumer consumers.BlockingConsumer, logger mflog.Logger) // ConsumeBlocking logs the consume request. It logs the time it took to complete the request. // If the request fails, it logs the error. -func (lm *loggingMiddleware) ConsumeBlocking(msgs interface{}) (err error) { +func (lm *loggingMiddleware) ConsumeBlocking(ctx context.Context, msgs interface{}) (err error) { defer func(begin time.Time) { message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) if err != nil { @@ -40,5 +41,5 @@ func (lm *loggingMiddleware) ConsumeBlocking(msgs interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.consumer.ConsumeBlocking(msgs) + return lm.consumer.ConsumeBlocking(ctx, msgs) } diff --git a/consumers/writers/api/metrics.go b/consumers/writers/api/metrics.go index 7d9cfe0e9a..0610117bad 100644 --- a/consumers/writers/api/metrics.go +++ b/consumers/writers/api/metrics.go @@ -6,6 +6,7 @@ package api import ( + "context" "time" "github.com/go-kit/kit/metrics" @@ -31,10 +32,10 @@ func MetricsMiddleware(consumer consumers.BlockingConsumer, counter metrics.Coun } // ConsumeBlocking instruments ConsumeBlocking method with metrics. -func (mm *metricsMiddleware) ConsumeBlocking(msgs interface{}) error { +func (mm *metricsMiddleware) ConsumeBlocking(ctx context.Context, msgs interface{}) error { defer func(begin time.Time) { mm.counter.With("method", "consume").Add(1) mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.consumer.ConsumeBlocking(msgs) + return mm.consumer.ConsumeBlocking(ctx, msgs) } diff --git a/consumers/writers/cassandra/consumer.go b/consumers/writers/cassandra/consumer.go index 37263a7f19..864494a7b9 100644 --- a/consumers/writers/cassandra/consumer.go +++ b/consumers/writers/cassandra/consumer.go @@ -4,6 +4,7 @@ package cassandra import ( + "context" "encoding/json" "fmt" @@ -29,7 +30,7 @@ func New(session *gocql.Session) consumers.BlockingConsumer { return &cassandraRepository{session} } -func (cr *cassandraRepository) ConsumeBlocking(message interface{}) error { +func (cr *cassandraRepository) ConsumeBlocking(_ context.Context, message interface{}) error { switch m := message.(type) { case mfjson.Messages: return cr.saveJSON(m) diff --git a/consumers/writers/cassandra/consumer_test.go b/consumers/writers/cassandra/consumer_test.go index 7561fdf6be..9109ce7238 100644 --- a/consumers/writers/cassandra/consumer_test.go +++ b/consumers/writers/cassandra/consumer_test.go @@ -4,6 +4,7 @@ package cassandra_test import ( + "context" "fmt" "testing" "time" @@ -72,7 +73,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } @@ -116,6 +117,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/influxdb/consumer.go b/consumers/writers/influxdb/consumer.go index c787d72257..f4ec040bfd 100644 --- a/consumers/writers/influxdb/consumer.go +++ b/consumers/writers/influxdb/consumer.go @@ -58,7 +58,7 @@ func NewAsync(client influxdb2.Client, config RepoConfig) consumers.AsyncConsume } } -func (repo *influxRepo) ConsumeAsync(message interface{}) { +func (repo *influxRepo) ConsumeAsync(_ context.Context, message interface{}) { var err error var pts []*write.Point switch m := message.(type) { @@ -102,7 +102,7 @@ func (repo *influxRepo) Errors() <-chan error { return nil } -func (repo *influxRepo) ConsumeBlocking(message interface{}) error { +func (repo *influxRepo) ConsumeBlocking(ctx context.Context, message interface{}) error { var err error var pts []*write.Point switch m := message.(type) { @@ -115,7 +115,7 @@ func (repo *influxRepo) ConsumeBlocking(message interface{}) error { return err } - return repo.writeAPIBlocking.WritePoint(context.Background(), pts...) + return repo.writeAPIBlocking.WritePoint(ctx, pts...) } func (repo *influxRepo) senmlPoints(messages interface{}) ([]*write.Point, error) { diff --git a/consumers/writers/influxdb/consumer_test.go b/consumers/writers/influxdb/consumer_test.go index 3b317e6973..ec6eb155d2 100644 --- a/consumers/writers/influxdb/consumer_test.go +++ b/consumers/writers/influxdb/consumer_test.go @@ -177,7 +177,7 @@ func TestAsyncSaveSenml(t *testing.T) { } errs := asyncRepo.Errors() - asyncRepo.ConsumeAsync(msgs) + asyncRepo.ConsumeAsync(context.TODO(), msgs) err = <-errs assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) @@ -246,7 +246,7 @@ func TestBlockingSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = syncRepo.ConsumeBlocking(msgs) + err = syncRepo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := queryDB(rowCountSenml) @@ -350,7 +350,7 @@ func TestAsyncSaveJSON(t *testing.T) { err := resetBucket() assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) - asyncRepo.ConsumeAsync(msgs) + asyncRepo.ConsumeAsync(context.TODO(), msgs) timer := time.NewTimer(1 * time.Millisecond) select { case err = <-asyncRepo.Errors(): @@ -463,7 +463,7 @@ func TestBlockingSaveJSON(t *testing.T) { err := resetBucket() assert.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) - switch err = syncRepo.ConsumeBlocking(tc.msgs); err { + switch err = syncRepo.ConsumeBlocking(context.TODO(), tc.msgs); err { case nil: count, err := queryDB(rowCountJson) assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) diff --git a/consumers/writers/mongodb/consumer.go b/consumers/writers/mongodb/consumer.go index 3ac291d62a..f9e5a0c1cb 100644 --- a/consumers/writers/mongodb/consumer.go +++ b/consumers/writers/mongodb/consumer.go @@ -29,16 +29,16 @@ func New(db *mongo.Database) consumers.BlockingConsumer { return &mongoRepo{db} } -func (repo *mongoRepo) ConsumeBlocking(message interface{}) error { +func (repo *mongoRepo) ConsumeBlocking(ctx context.Context, message interface{}) error { switch m := message.(type) { case json.Messages: - return repo.saveJSON(m) + return repo.saveJSON(ctx, m) default: - return repo.saveSenml(m) + return repo.saveSenml(ctx, m) } } -func (repo *mongoRepo) saveSenml(messages interface{}) error { +func (repo *mongoRepo) saveSenml(ctx context.Context, messages interface{}) error { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -49,7 +49,7 @@ func (repo *mongoRepo) saveSenml(messages interface{}) error { dbMsgs = append(dbMsgs, msg) } - _, err := coll.InsertMany(context.Background(), dbMsgs) + _, err := coll.InsertMany(ctx, dbMsgs) if err != nil { return errors.Wrap(errSaveMessage, err) } @@ -57,7 +57,7 @@ func (repo *mongoRepo) saveSenml(messages interface{}) error { return nil } -func (repo *mongoRepo) saveJSON(msgs json.Messages) error { +func (repo *mongoRepo) saveJSON(ctx context.Context, msgs json.Messages) error { m := []interface{}{} for _, msg := range msgs.Data { m = append(m, msg) @@ -65,7 +65,7 @@ func (repo *mongoRepo) saveJSON(msgs json.Messages) error { coll := repo.db.Collection(msgs.Format) - _, err := coll.InsertMany(context.Background(), m) + _, err := coll.InsertMany(ctx, m) if err != nil { return errors.Wrap(errSaveMessage, err) } diff --git a/consumers/writers/mongodb/consumer_test.go b/consumers/writers/mongodb/consumer_test.go index a8cb7e7453..2aa427e3bd 100644 --- a/consumers/writers/mongodb/consumer_test.go +++ b/consumers/writers/mongodb/consumer_test.go @@ -83,7 +83,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) @@ -131,6 +131,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/postgres/consumer.go b/consumers/writers/postgres/consumer.go index 1512cb939d..9221953411 100644 --- a/consumers/writers/postgres/consumer.go +++ b/consumers/writers/postgres/consumer.go @@ -36,16 +36,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer { return &postgresRepo{db: db} } -func (pr postgresRepo) ConsumeBlocking(message interface{}) (err error) { +func (pr postgresRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return pr.saveJSON(m) default: - return pr.saveSenml(m) + return pr.saveSenml(ctx, m) } } -func (pr postgresRepo) saveSenml(messages interface{}) (err error) { +func (pr postgresRepo) saveSenml(ctx context.Context, messages interface{}) (err error) { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -57,7 +57,7 @@ func (pr postgresRepo) saveSenml(messages interface{}) (err error) { :value, :string_value, :bool_value, :data_value, :sum, :time, :update_time);` - tx, err := pr.db.BeginTxx(context.Background(), nil) + tx, err := pr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) } diff --git a/consumers/writers/postgres/consumer_test.go b/consumers/writers/postgres/consumer_test.go index 16eed9bf08..5350522cec 100644 --- a/consumers/writers/postgres/consumer_test.go +++ b/consumers/writers/postgres/consumer_test.go @@ -4,6 +4,7 @@ package postgres_test import ( + "context" "fmt" "testing" "time" @@ -67,7 +68,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } @@ -107,6 +108,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/consumers/writers/timescale/consumer.go b/consumers/writers/timescale/consumer.go index 6e652ce778..8444dc8ecd 100644 --- a/consumers/writers/timescale/consumer.go +++ b/consumers/writers/timescale/consumer.go @@ -35,16 +35,16 @@ func New(db *sqlx.DB) consumers.BlockingConsumer { return ×caleRepo{db: db} } -func (tr *timescaleRepo) ConsumeBlocking(message interface{}) (err error) { +func (tr *timescaleRepo) ConsumeBlocking(ctx context.Context, message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: - return tr.saveJSON(m) + return tr.saveJSON(ctx, m) default: - return tr.saveSenml(m) + return tr.saveSenml(ctx, m) } } -func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { +func (tr timescaleRepo) saveSenml(ctx context.Context, messages interface{}) (err error) { msgs, ok := messages.([]senml.Message) if !ok { return errSaveMessage @@ -56,7 +56,7 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { :value, :string_value, :bool_value, :data_value, :sum, :time, :update_time);` - tx, err := tr.db.BeginTxx(context.Background(), nil) + tx, err := tr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) } @@ -90,21 +90,21 @@ func (tr timescaleRepo) saveSenml(messages interface{}) (err error) { return err } -func (tr timescaleRepo) saveJSON(msgs mfjson.Messages) error { - if err := tr.insertJSON(msgs); err != nil { +func (tr timescaleRepo) saveJSON(ctx context.Context, msgs mfjson.Messages) error { + if err := tr.insertJSON(ctx, msgs); err != nil { if err == errNoTable { if err := tr.createTable(msgs.Format); err != nil { return err } - return tr.insertJSON(msgs) + return tr.insertJSON(ctx, msgs) } return err } return nil } -func (tr timescaleRepo) insertJSON(msgs mfjson.Messages) error { - tx, err := tr.db.BeginTxx(context.Background(), nil) +func (tr timescaleRepo) insertJSON(ctx context.Context, msgs mfjson.Messages) error { + tx, err := tr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errSaveMessage, err) } diff --git a/consumers/writers/timescale/consumer_test.go b/consumers/writers/timescale/consumer_test.go index 6659879433..11c6cde4a1 100644 --- a/consumers/writers/timescale/consumer_test.go +++ b/consumers/writers/timescale/consumer_test.go @@ -4,6 +4,7 @@ package timescale_test import ( + "context" "fmt" "testing" "time" @@ -67,7 +68,7 @@ func TestSaveSenml(t *testing.T) { msgs = append(msgs, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } @@ -107,6 +108,6 @@ func TestSaveJSON(t *testing.T) { msgs.Data = append(msgs.Data, msg) } - err = repo.ConsumeBlocking(msgs) + err = repo.ConsumeBlocking(context.TODO(), msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index b553d85c94..ad2dba2627 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -4,6 +4,7 @@ package cassandra_test import ( + "context" "fmt" "testing" "time" @@ -115,7 +116,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.ConsumeBlocking(messages) + err = writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) reader := creader.New(session) @@ -452,7 +453,7 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, m) } - err = writer.ConsumeBlocking(messages1) + err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -486,7 +487,7 @@ func TestReadJSON(t *testing.T) { msgs2 = append(msgs2, m) } - err = writer.ConsumeBlocking(messages2) + err = writer.ConsumeBlocking(context.TODO(), messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index d88f5902fc..127550734e 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -4,6 +4,7 @@ package influxdb_test import ( + "context" "fmt" "testing" "time" @@ -114,7 +115,7 @@ func TestReadSenml(t *testing.T) { } errs := asyncWriter.Errors() - asyncWriter.ConsumeAsync(messages) + asyncWriter.ConsumeAsync(context.TODO(), messages) err = <-errs assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) @@ -439,7 +440,7 @@ func TestReadJSON(t *testing.T) { } errs := asyncWriter.Errors() - asyncWriter.ConsumeAsync(messages1) + asyncWriter.ConsumeAsync(context.TODO(), messages1) err = <-errs require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) @@ -470,7 +471,7 @@ func TestReadJSON(t *testing.T) { } // Test async - asyncWriter.ConsumeAsync(messages2) + asyncWriter.ConsumeAsync(context.TODO(), messages2) err = <-errs assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 67dbf57edb..c35909523d 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -109,7 +109,7 @@ func TestReadSenml(t *testing.T) { } messages = append(messages, msg) } - err = writer.ConsumeBlocking(messages) + err = writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) reader := mreader.New(db) @@ -417,7 +417,7 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, m) } - err = writer.ConsumeBlocking(messages1) + err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -451,7 +451,7 @@ func TestReadJSON(t *testing.T) { msgs2 = append(msgs2, m) } - err = writer.ConsumeBlocking(messages2) + err = writer.ConsumeBlocking(context.TODO(), messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 440f3f486a..ad498abeb4 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -4,6 +4,7 @@ package postgres_test import ( + "context" "fmt" "testing" "time" @@ -99,7 +100,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.ConsumeBlocking(messages) + err = writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := preader.New(db) @@ -409,7 +410,7 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, m) } - err = writer.ConsumeBlocking(messages1) + err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -440,7 +441,7 @@ func TestReadJSON(t *testing.T) { msgs2 = append(msgs2, m) } - err = writer.ConsumeBlocking(messages2) + err = writer.ConsumeBlocking(context.TODO(), messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{} diff --git a/readers/timescale/messages_test.go b/readers/timescale/messages_test.go index 1a92131d80..82310cac36 100644 --- a/readers/timescale/messages_test.go +++ b/readers/timescale/messages_test.go @@ -4,6 +4,7 @@ package timescale_test import ( + "context" "fmt" "testing" "time" @@ -98,7 +99,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.ConsumeBlocking(messages) + err = writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := treader.New(db) @@ -411,7 +412,7 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, mapped) } - err = writer.ConsumeBlocking(messages1) + err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) id2, err := idProvider.ID() @@ -443,7 +444,7 @@ func TestReadJSON(t *testing.T) { msgs2 = append(msgs2, mapped) } - err = writer.ConsumeBlocking(messages2) + err = writer.ConsumeBlocking(context.TODO(), messages2) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) httpMsgs := []map[string]interface{}{}