Skip to content

Commit

Permalink
Initial Commit: Add Tracing To Certs Service
Browse files Browse the repository at this point in the history
Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
  • Loading branch information
rodneyosodo committed Jul 5, 2023
1 parent 889b35e commit 5470a7c
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 18 deletions.
15 changes: 8 additions & 7 deletions certs/postgres/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux/certs"
"github.com/mainflux/mainflux/internal/postgres"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
)
Expand All @@ -27,19 +28,19 @@ type Cert struct {
}

type certsRepository struct {
db *sqlx.DB
db postgres.Database
log logger.Logger
}

// NewRepository instantiates a PostgreSQL implementation of certs
// repository.
func NewRepository(db *sqlx.DB, log logger.Logger) certs.Repository {
func NewRepository(db postgres.Database, log logger.Logger) certs.Repository {
return &certsRepository{db: db, log: log}
}

func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offset, limit uint64) (certs.Page, error) {
q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 ORDER BY expire LIMIT $2 OFFSET $3;`
rows, err := cr.db.Query(q, ownerID, limit, offset)
rows, err := cr.db.QueryContext(ctx, q, ownerID, limit, offset)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err))
return certs.Page{}, err
Expand All @@ -59,7 +60,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse

q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1`
var total uint64
if err := cr.db.QueryRow(q, ownerID).Scan(&total); err != nil {
if err := cr.db.QueryRowxContext(ctx, q, ownerID).Scan(&total); err != nil {
cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err))
return certs.Page{}, err
}
Expand All @@ -75,7 +76,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse
func (cr certsRepository) Save(ctx context.Context, cert certs.Cert) (string, error) {
q := `INSERT INTO certs (thing_id, owner_id, serial, expire) VALUES (:thing_id, :owner_id, :serial, :expire)`

tx, err := cr.db.Beginx()
tx, err := cr.db.BeginTxx(ctx, nil)
if err != nil {
return "", errors.Wrap(errors.ErrCreateEntity, err)
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func (cr certsRepository) Remove(ctx context.Context, ownerID, serial string) er

func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID string, offset, limit uint64) (certs.Page, error) {
q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 AND thing_id = $2 ORDER BY expire LIMIT $3 OFFSET $4;`
rows, err := cr.db.Query(q, ownerID, thingID, limit, offset)
rows, err := cr.db.QueryContext(ctx, q, ownerID, thingID, limit, offset)
if err != nil {
cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err))
return certs.Page{}, err
Expand All @@ -136,7 +137,7 @@ func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID

q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1 AND thing_id = $2`
var total uint64
if err := cr.db.QueryRow(q, ownerID, thingID).Scan(&total); err != nil {
if err := cr.db.QueryRowxContext(ctx, q, ownerID, thingID).Scan(&total); err != nil {
cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err))
return certs.Page{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions certs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (cs *certsService) IssueCert(ctx context.Context, token, thingID string, tt
Expire: time.Unix(0, int64(cert.Expire)*int64(time.Second)),
}

_, err = cs.certsRepo.Save(context.Background(), c)
_, err = cs.certsRepo.Save(ctx, c)
return c, err
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func (cs *certsService) RevokeCert(ctx context.Context, token, thingID string) (
return revoke, errors.Wrap(ErrFailedCertRevocation, err)
}
revoke.RevocationTime = revTime
if err = cs.certsRepo.Remove(context.Background(), u.GetId(), c.Serial); err != nil {
if err = cs.certsRepo.Remove(ctx, u.GetId(), c.Serial); err != nil {
return revoke, errors.Wrap(ErrFailedToRemoveCertFromDB, err)
}
}
Expand Down
12 changes: 12 additions & 0 deletions certs/tracing/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// Package tracing provides tracing instrumentation for Mainflux Users Groups service.
//
// This package provides tracing middleware for Mainflux Users Groups service.
// It can be used to trace incoming requests and add tracing capabilities to
// Mainflux Users Groups service.
//
// For more details about tracing instrumentation for Mainflux messaging refer
// to the documentation at https://docs.mainflux.io/tracing/.
package tracing
79 changes: 79 additions & 0 deletions certs/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package tracing

import (
"context"

"github.com/mainflux/mainflux/certs"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

var _ certs.Service = (*tracingMiddleware)(nil)

type tracingMiddleware struct {
tracer trace.Tracer
svc certs.Service
}

// New returns a new certs service with tracing capabilities.
func New(svc certs.Service, tracer trace.Tracer) certs.Service {
return &tracingMiddleware{tracer, svc}
}

// IssueCert traces the "IssueCert" operation of the wrapped certs.Service.
func (tm *tracingMiddleware) IssueCert(ctx context.Context, token, thingID, ttl string) (certs.Cert, error) {
ctx, span := tm.tracer.Start(ctx, "svc_create_group", trace.WithAttributes(
attribute.String("thing_id", thingID),
attribute.String("ttl", ttl),
))
defer span.End()

return tm.svc.IssueCert(ctx, token, thingID, ttl)
}

// ListCerts traces the "ListCerts" operation of the wrapped certs.Service.
func (tm *tracingMiddleware) ListCerts(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) {
ctx, span := tm.tracer.Start(ctx, "svc_list_certs", trace.WithAttributes(
attribute.String("thing_id", thingID),
attribute.Int64("offset", int64(offset)),
attribute.Int64("limit", int64(limit)),
))
defer span.End()

return tm.svc.ListCerts(ctx, token, thingID, offset, limit)
}

// ListSerials traces the "ListSerials" operation of the wrapped certs.Service.
func (tm *tracingMiddleware) ListSerials(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) {
ctx, span := tm.tracer.Start(ctx, "svc_list_serials", trace.WithAttributes(
attribute.String("thing_id", thingID),
attribute.Int64("offset", int64(offset)),
attribute.Int64("limit", int64(limit)),
))
defer span.End()

return tm.svc.ListSerials(ctx, token, thingID, offset, limit)
}

// ViewCert traces the "ViewCert" operation of the wrapped certs.Service.
func (tm *tracingMiddleware) ViewCert(ctx context.Context, token, serialID string) (certs.Cert, error) {
ctx, span := tm.tracer.Start(ctx, "svc_view_cert", trace.WithAttributes(
attribute.String("serial_id", serialID),
))
defer span.End()

return tm.svc.ViewCert(ctx, token, serialID)
}

// RevokeCert traces the "RevokeCert" operation of the wrapped certs.Service.
func (tm *tracingMiddleware) RevokeCert(ctx context.Context, token, serialID string) (certs.Revoke, error) {
ctx, span := tm.tracer.Start(ctx, "svc_revoke_cert", trace.WithAttributes(
attribute.String("serial_id", serialID),
))
defer span.End()

return tm.svc.RevokeCert(ctx, token, serialID)
}
35 changes: 26 additions & 9 deletions cmd/certs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,27 @@ import (
"log"
"os"

"github.com/jmoiron/sqlx"
chclient "github.com/mainflux/callhome/pkg/client"
"github.com/mainflux/mainflux"

"github.com/mainflux/mainflux/certs"
"github.com/mainflux/mainflux/certs/api"
vault "github.com/mainflux/mainflux/certs/pki"
certsPg "github.com/mainflux/mainflux/certs/postgres"
"github.com/mainflux/mainflux/certs/tracing"
"github.com/mainflux/mainflux/internal"
authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth"
jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger"
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
"github.com/mainflux/mainflux/internal/env"
"github.com/mainflux/mainflux/internal/postgres"
"github.com/mainflux/mainflux/internal/server"
httpserver "github.com/mainflux/mainflux/internal/server/http"
mflog "github.com/mainflux/mainflux/logger"
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
"github.com/mainflux/mainflux/users/policies"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/jmoiron/sqlx"
authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth"
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
)

const (
Expand All @@ -43,6 +45,7 @@ type config struct {
LogLevel string `env:"MF_CERTS_LOG_LEVEL" envDefault:"info"`
CertsURL string `env:"MF_SDK_CERTS_URL" envDefault:"http://localhost"`
ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://things:9000"`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`

// Sign and issue certificates without 3rd party PKI
Expand Down Expand Up @@ -96,7 +99,18 @@ func main() {
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())

svc := newService(auth, db, logger, cfg, pkiClient)
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)

svc := newService(auth, db, tracer, logger, cfg, pkiClient)

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
Expand All @@ -122,8 +136,9 @@ func main() {
}
}

func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service {
certsRepo := certsPg.NewRepository(db, logger)
func newService(auth policies.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service {
database := postgres.NewDatabase(db, tracer)
certsRepo := certsPg.NewRepository(database, logger)
config := mfsdk.Config{
CertsURL: cfg.CertsURL,
ThingsURL: cfg.ThingsURL,
Expand All @@ -133,5 +148,7 @@ func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logge
svc = api.LoggingMiddleware(svc, logger)
counter, latency := internal.MakeMetrics(svcName, "api")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = tracing.New(svc, tracer)

return svc
}
1 change: 1 addition & 0 deletions docker/addons/certs/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ services:
MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL}
MF_AUTH_GRPC_TIMEOUT: ${MF_USERS_GRPC_TIMEOUT}
MF_CERTS_VAULT_HOST: ${MF_CERTS_VAULT_HOST}
MF_JAEGER_URL: ${MF_JAEGER_URL}
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
volumes:
- ../../ssl/certs/ca.key:/etc/ssl/certs/ca.key
Expand Down

0 comments on commit 5470a7c

Please sign in to comment.