diff --git a/certs/postgres/certs.go b/certs/postgres/certs.go index be7b1588b9b..5aa84e629d8 100644 --- a/certs/postgres/certs.go +++ b/certs/postgres/certs.go @@ -13,6 +13,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux/certs" + "github.com/mainflux/mainflux/internal/postgres" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" ) @@ -27,19 +28,19 @@ type Cert struct { } type certsRepository struct { - db *sqlx.DB + db postgres.Database log logger.Logger } // NewRepository instantiates a PostgreSQL implementation of certs // repository. -func NewRepository(db *sqlx.DB, log logger.Logger) certs.Repository { +func NewRepository(db postgres.Database, log logger.Logger) certs.Repository { return &certsRepository{db: db, log: log} } func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offset, limit uint64) (certs.Page, error) { q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 ORDER BY expire LIMIT $2 OFFSET $3;` - rows, err := cr.db.Query(q, ownerID, limit, offset) + rows, err := cr.db.QueryContext(ctx, q, ownerID, limit, offset) if err != nil { cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err)) return certs.Page{}, err @@ -59,7 +60,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1` var total uint64 - if err := cr.db.QueryRow(q, ownerID).Scan(&total); err != nil { + if err := cr.db.QueryRowxContext(ctx, q, ownerID).Scan(&total); err != nil { cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err)) return certs.Page{}, err } @@ -75,7 +76,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse func (cr certsRepository) Save(ctx context.Context, cert certs.Cert) (string, error) { q := `INSERT INTO certs (thing_id, owner_id, serial, expire) VALUES (:thing_id, :owner_id, :serial, :expire)` - tx, err := cr.db.Beginx() + tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return "", errors.Wrap(errors.ErrCreateEntity, err) } @@ -116,7 +117,7 @@ func (cr certsRepository) Remove(ctx context.Context, ownerID, serial string) er func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID string, offset, limit uint64) (certs.Page, error) { q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 AND thing_id = $2 ORDER BY expire LIMIT $3 OFFSET $4;` - rows, err := cr.db.Query(q, ownerID, thingID, limit, offset) + rows, err := cr.db.QueryContext(ctx, q, ownerID, thingID, limit, offset) if err != nil { cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err)) return certs.Page{}, err @@ -136,7 +137,7 @@ func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1 AND thing_id = $2` var total uint64 - if err := cr.db.QueryRow(q, ownerID, thingID).Scan(&total); err != nil { + if err := cr.db.QueryRowxContext(ctx, q, ownerID, thingID).Scan(&total); err != nil { cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err)) return certs.Page{}, err } diff --git a/certs/tracing/doc.go b/certs/tracing/doc.go new file mode 100644 index 00000000000..6b640542a3b --- /dev/null +++ b/certs/tracing/doc.go @@ -0,0 +1,12 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package tracing provides tracing instrumentation for Mainflux Users Groups service. +// +// This package provides tracing middleware for Mainflux Users Groups service. +// It can be used to trace incoming requests and add tracing capabilities to +// Mainflux Users Groups service. +// +// For more details about tracing instrumentation for Mainflux messaging refer +// to the documentation at https://docs.mainflux.io/tracing/. +package tracing diff --git a/certs/tracing/tracing.go b/certs/tracing/tracing.go new file mode 100644 index 00000000000..baac1b7f167 --- /dev/null +++ b/certs/tracing/tracing.go @@ -0,0 +1,79 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/certs" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var _ certs.Service = (*tracingMiddleware)(nil) + +type tracingMiddleware struct { + tracer trace.Tracer + svc certs.Service +} + +// New returns a new certs service with tracing capabilities. +func New(svc certs.Service, tracer trace.Tracer) certs.Service { + return &tracingMiddleware{tracer, svc} +} + +// IssueCert traces the "IssueCert" operation of the wrapped certs.Service. +func (tm *tracingMiddleware) IssueCert(ctx context.Context, token, thingID, ttl string) (certs.Cert, error) { + ctx, span := tm.tracer.Start(ctx, "svc_create_group", trace.WithAttributes( + attribute.String("thing_id", thingID), + attribute.String("ttl", ttl), + )) + defer span.End() + + return tm.svc.IssueCert(ctx, token, thingID, ttl) +} + +// ListCerts traces the "ListCerts" operation of the wrapped certs.Service. +func (tm *tracingMiddleware) ListCerts(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) { + ctx, span := tm.tracer.Start(ctx, "svc_list_certs", trace.WithAttributes( + attribute.String("thing_id", thingID), + attribute.Int64("offset", int64(offset)), + attribute.Int64("limit", int64(limit)), + )) + defer span.End() + + return tm.svc.ListCerts(ctx, token, thingID, offset, limit) +} + +// ListSerials traces the "ListSerials" operation of the wrapped certs.Service. +func (tm *tracingMiddleware) ListSerials(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) { + ctx, span := tm.tracer.Start(ctx, "svc_list_serials", trace.WithAttributes( + attribute.String("thing_id", thingID), + attribute.Int64("offset", int64(offset)), + attribute.Int64("limit", int64(limit)), + )) + defer span.End() + + return tm.svc.ListSerials(ctx, token, thingID, offset, limit) +} + +// ViewCert traces the "ViewCert" operation of the wrapped certs.Service. +func (tm *tracingMiddleware) ViewCert(ctx context.Context, token, serialID string) (certs.Cert, error) { + ctx, span := tm.tracer.Start(ctx, "svc_view_cert", trace.WithAttributes( + attribute.String("serial_id", serialID), + )) + defer span.End() + + return tm.svc.ViewCert(ctx, token, serialID) +} + +// RevokeCert traces the "RevokeCert" operation of the wrapped certs.Service. +func (tm *tracingMiddleware) RevokeCert(ctx context.Context, token, serialID string) (certs.Revoke, error) { + ctx, span := tm.tracer.Start(ctx, "svc_revoke_cert", trace.WithAttributes( + attribute.String("serial_id", serialID), + )) + defer span.End() + + return tm.svc.RevokeCert(ctx, token, serialID) +} diff --git a/cmd/certs/main.go b/cmd/certs/main.go index 4466d991f87..0036db1ff58 100644 --- a/cmd/certs/main.go +++ b/cmd/certs/main.go @@ -10,27 +10,28 @@ import ( "log" "os" + "github.com/jmoiron/sqlx" chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" - "github.com/mainflux/mainflux/certs" "github.com/mainflux/mainflux/certs/api" vault "github.com/mainflux/mainflux/certs/pki" certsPg "github.com/mainflux/mainflux/certs/postgres" + "github.com/mainflux/mainflux/certs/tracing" "github.com/mainflux/mainflux/internal" + authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth" jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" + pgClient "github.com/mainflux/mainflux/internal/clients/postgres" "github.com/mainflux/mainflux/internal/env" + "github.com/mainflux/mainflux/internal/postgres" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/users/policies" - "golang.org/x/sync/errgroup" - - "github.com/jmoiron/sqlx" - authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth" - pgClient "github.com/mainflux/mainflux/internal/clients/postgres" mfsdk "github.com/mainflux/mainflux/pkg/sdk/go" "github.com/mainflux/mainflux/pkg/uuid" + "github.com/mainflux/mainflux/users/policies" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" ) const ( @@ -45,9 +46,9 @@ type config struct { LogLevel string `env:"MF_CERTS_LOG_LEVEL" envDefault:"info"` CertsURL string `env:"MF_SDK_CERTS_URL" envDefault:"http://localhost"` ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://things:9000"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` InstanceID string `env:"MF_CERTS_INSTANCE_ID" envDefault:""` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` // Sign and issue certificates without 3rd party PKI SignCAPath string `env:"MF_CERTS_SIGN_CA_PATH" envDefault:"ca.crt"` @@ -108,12 +109,6 @@ func main() { defer authHandler.Close() logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) - svc := newService(auth, db, logger, cfg, pkiClient) - - httpServerConfig := server.Config{Port: defSvcHttpPort} - if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { - logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) - } tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) @@ -123,6 +118,14 @@ func main() { logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err)) } }() + tracer := tp.Tracer(svcName) + + svc := newService(auth, db, tracer, logger, cfg, pkiClient) + + httpServerConfig := server.Config{Port: defSvcHttpPort} + if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { + logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) + } hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger) if cfg.SendTelemetry { @@ -143,8 +146,9 @@ func main() { } } -func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service { - certsRepo := certsPg.NewRepository(db, logger) +func newService(auth policies.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service { + database := postgres.NewDatabase(db, tracer) + certsRepo := certsPg.NewRepository(database, logger) config := mfsdk.Config{ CertsURL: cfg.CertsURL, ThingsURL: cfg.ThingsURL, @@ -154,5 +158,7 @@ func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logge svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") svc = api.MetricsMiddleware(svc, counter, latency) + svc = tracing.New(svc, tracer) + return svc } diff --git a/docker/addons/certs/docker-compose.yml b/docker/addons/certs/docker-compose.yml index e457e03364c..e6829813a9b 100644 --- a/docker/addons/certs/docker-compose.yml +++ b/docker/addons/certs/docker-compose.yml @@ -69,6 +69,7 @@ services: MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} MF_AUTH_GRPC_TIMEOUT: ${MF_USERS_GRPC_TIMEOUT} MF_CERTS_VAULT_HOST: ${MF_CERTS_VAULT_HOST} + MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} MF_CERTS_INSTANCE_ID: ${MF_CERTS_INSTANCE_ID} volumes: