Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Trace consume operations #1778

Merged
merged 15 commits into from
Jul 22, 2023
13 changes: 7 additions & 6 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/consumers"
consumerTracing "github.com/mainflux/mainflux/consumers/tracing"
"github.com/mainflux/mainflux/consumers/writers/api"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/internal"
Expand Down Expand Up @@ -87,8 +88,14 @@ func main() {
}()
tracer := tp.Tracer(svcName)

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

// Create new cassandra-writer repo
repo := newService(csdSession, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)

// Create new pub sub broker
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
Expand All @@ -104,12 +111,6 @@ func main() {
}

// Create new http server
httpServerConfig := server.Config{Port: defSvcHttpPort}

if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
Expand Down
11 changes: 7 additions & 4 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
chclient "github.com/mainflux/callhome/pkg/client"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
consumerTracing "github.com/mainflux/mainflux/consumers/tracing"
"github.com/mainflux/mainflux/consumers/writers/api"
"github.com/mainflux/mainflux/consumers/writers/influxdb"
influxDBClient "github.com/mainflux/mainflux/internal/clients/influxdb"
Expand Down Expand Up @@ -101,7 +102,13 @@ func main() {
}
defer client.Close()

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

repo := influxdb.NewAsync(client, repocfg)
repo = consumerTracing.NewAsync(tracer, repo, httpServerConfig)

// Start consuming and logging errors.
go func(log mflog.Logger) {
Expand All @@ -116,10 +123,6 @@ func main() {
logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err))
}

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
Expand Down
11 changes: 7 additions & 4 deletions cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
chclient "github.com/mainflux/callhome/pkg/client"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
consumerTracing "github.com/mainflux/mainflux/consumers/tracing"
"github.com/mainflux/mainflux/consumers/writers/api"
"github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/internal"
Expand Down Expand Up @@ -91,16 +92,18 @@ func main() {
logger.Fatal(fmt.Sprintf("failed to setup mongo database : %s", err))
}

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

repo := newService(db, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)

if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to start MongoDB writer: %s", err))
}

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
Expand Down
11 changes: 7 additions & 4 deletions cmd/postgres-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chclient "github.com/mainflux/callhome/pkg/client"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
consumerTracing "github.com/mainflux/mainflux/consumers/tracing"
"github.com/mainflux/mainflux/consumers/writers/api"
writerPg "github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/internal"
Expand Down Expand Up @@ -93,16 +94,18 @@ func main() {
}
defer db.Close()

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

repo := newService(db, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)

if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err))
}

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
Expand Down
11 changes: 7 additions & 4 deletions cmd/timescale-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux/consumers"
consumerTracing "github.com/mainflux/mainflux/consumers/tracing"
"github.com/mainflux/mainflux/consumers/writers/api"
"github.com/mainflux/mainflux/consumers/writers/timescale"
"github.com/mainflux/mainflux/internal"
Expand Down Expand Up @@ -87,7 +88,13 @@ func main() {
}()
tracer := tp.Tracer(svcName)

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

repo := newService(db, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)

pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
Expand All @@ -100,10 +107,6 @@ func main() {
logger.Fatal(fmt.Sprintf("failed to create Timescale writer: %s", err))
}

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
Expand Down
6 changes: 4 additions & 2 deletions consumers/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

package consumers

import "context"

// AsyncConsumer specifies a non-blocking message-consuming API,
// which can be used for writing data to the DB, publishing messages
// to broker, sending notifications, or any other asynchronous job.
type AsyncConsumer interface {
// ConsumeAsync method is used to asynchronously consume received messages.
ConsumeAsync(messages interface{})
ConsumeAsync(ctx context.Context, messages interface{})

// Errors method returns a channel for reading errors which occur during async writes.
// Must be called before performing any writes for errors to be collected.
Expand All @@ -24,5 +26,5 @@ type AsyncConsumer interface {
type BlockingConsumer interface {
// ConsumeBlocking method is used to consume received messages synchronously.
// A non-nil error is returned to indicate operation failure.
ConsumeBlocking(messages interface{}) error
ConsumeBlocking(ctx context.Context, messages interface{}) error
}
12 changes: 6 additions & 6 deletions consumers/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer in
for _, subject := range cfg.SubscriberCfg.Subjects {
switch c := consumer.(type) {
case AsyncConsumer:
if err := sub.Subscribe(ctx, id, subject, handleAsync(transformer, c)); err != nil {
if err := sub.Subscribe(ctx, id, subject, handleAsync(ctx, transformer, c)); err != nil {
return err
}
case BlockingConsumer:
if err := sub.Subscribe(ctx, id, subject, handleSync(transformer, c)); err != nil {
if err := sub.Subscribe(ctx, id, subject, handleSync(ctx, transformer, c)); err != nil {
return err
}
default:
Expand All @@ -59,7 +59,7 @@ func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer in
return nil
}

func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc {
func handleSync(ctx context.Context, t transformers.Transformer, sc BlockingConsumer) handleFunc {
return func(msg *messaging.Message) error {
m := interface{}(msg)
var err error
Expand All @@ -69,11 +69,11 @@ func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc {
return err
}
}
return sc.ConsumeBlocking(m)
return sc.ConsumeBlocking(ctx, m)
}
}

func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc {
func handleAsync(ctx context.Context, t transformers.Transformer, ac AsyncConsumer) handleFunc {
return func(msg *messaging.Message) error {
m := interface{}(msg)
var err error
Expand All @@ -84,7 +84,7 @@ func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc {
}
}

ac.ConsumeAsync(m)
ac.ConsumeAsync(ctx, m)
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions consumers/notifiers/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (lm *loggingMiddleware) RemoveSubscription(ctx context.Context, token, id s

// ConsumeBlocking logs the consume_blocking request. It logs the message and the time it took to complete the request.
// If the request fails, it logs the error.
func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) {
func (lm *loggingMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin))
if err != nil {
Expand All @@ -98,5 +98,5 @@ func (lm *loggingMiddleware) ConsumeBlocking(msg interface{}) (err error) {
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.ConsumeBlocking(msg)
return lm.svc.ConsumeBlocking(ctx, msg)
}
4 changes: 2 additions & 2 deletions consumers/notifiers/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ func (ms *metricsMiddleware) RemoveSubscription(ctx context.Context, token, id s
}

// ConsumeBlocking instruments ConsumeBlocking method with metrics.
func (ms *metricsMiddleware) ConsumeBlocking(msg interface{}) error {
func (ms *metricsMiddleware) ConsumeBlocking(ctx context.Context, msg interface{}) error {
defer func(begin time.Time) {
ms.counter.With("method", "consume").Add(1)
ms.latency.With("method", "consume").Observe(time.Since(begin).Seconds())
}(time.Now())

return ms.svc.ConsumeBlocking(msg)
return ms.svc.ConsumeBlocking(ctx, msg)
}
8 changes: 4 additions & 4 deletions consumers/notifiers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (ns *notifierService) RemoveSubscription(ctx context.Context, token, id str
return ns.subs.Remove(ctx, id)
}

func (ns *notifierService) ConsumeBlocking(message interface{}) error {
func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interface{}) error {
msg, ok := message.(*messaging.Message)
if !ok {
return ErrMessage
Expand All @@ -114,7 +114,7 @@ func (ns *notifierService) ConsumeBlocking(message interface{}) error {
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(context.Background(), pm)
page, err := ns.subs.RetrieveAll(ctx, pm)
if err != nil {
return err
}
Expand All @@ -133,7 +133,7 @@ func (ns *notifierService) ConsumeBlocking(message interface{}) error {
return nil
}

func (ns *notifierService) ConsumeAsync(message interface{}) {
func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}) {
msg, ok := message.(*messaging.Message)
if !ok {
ns.errCh <- ErrMessage
Expand All @@ -148,7 +148,7 @@ func (ns *notifierService) ConsumeAsync(message interface{}) {
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(context.Background(), pm)
page, err := ns.subs.RetrieveAll(ctx, pm)
if err != nil {
ns.errCh <- err
return
Expand Down
2 changes: 1 addition & 1 deletion consumers/notifiers/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestConsume(t *testing.T) {
}

for _, tc := range cases {
err := svc.ConsumeBlocking(tc.msg)
err := svc.ConsumeBlocking(context.TODO(), tc.msg)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
}
}
Loading