diff --git a/bootstrap/configs.go b/bootstrap/configs.go index 961bac77d9..10d3936f45 100644 --- a/bootstrap/configs.go +++ b/bootstrap/configs.go @@ -4,6 +4,7 @@ package bootstrap import ( + "context" "time" "github.com/mainflux/mainflux/pkg/clients" @@ -62,54 +63,54 @@ type ConfigsPage struct { type ConfigRepository interface { // Save persists the Config. Successful operation is indicated by non-nil // error response. - Save(cfg Config, chsConnIDs []string) (string, error) + Save(ctx context.Context, cfg Config, chsConnIDs []string) (string, error) // RetrieveByID retrieves the Config having the provided identifier, that is owned // by the specified user. - RetrieveByID(owner, id string) (Config, error) + RetrieveByID(ctx context.Context, owner, id string) (Config, error) // RetrieveAll retrieves a subset of Configs that are owned // by the specific user, with given filter parameters. - RetrieveAll(owner string, filter Filter, offset, limit uint64) ConfigsPage + RetrieveAll(ctx context.Context, owner string, filter Filter, offset, limit uint64) ConfigsPage // RetrieveByExternalID returns Config for given external ID. - RetrieveByExternalID(externalID string) (Config, error) + RetrieveByExternalID(ctx context.Context, externalID string) (Config, error) // Update updates an existing Config. A non-nil error is returned // to indicate operation failure. - Update(cfg Config) error + Update(ctx context.Context, cfg Config) error // UpdateCerts updates an existing Config certificate and owner. // A non-nil error is returned to indicate operation failure. - UpdateCert(owner, thingID, clientCert, clientKey, caCert string) error + UpdateCert(ctx context.Context, owner, thingID, clientCert, clientKey, caCert string) error // UpdateConnections updates a list of Channels the Config is connected to // adding new Channels if needed. - UpdateConnections(owner, id string, channels []Channel, connections []string) error + UpdateConnections(ctx context.Context, owner, id string, channels []Channel, connections []string) error // Remove removes the Config having the provided identifier, that is owned // by the specified user. - Remove(owner, id string) error + Remove(ctx context.Context, owner, id string) error // ChangeState changes of the Config, that is owned by the specific user. - ChangeState(owner, id string, state State) error + ChangeState(ctx context.Context, owner, id string, state State) error // ListExisting retrieves those channels from the given list that exist in DB. - ListExisting(owner string, ids []string) ([]Channel, error) + ListExisting(ctx context.Context, owner string, ids []string) ([]Channel, error) // Methods RemoveThing, UpdateChannel, and RemoveChannel are related to // event sourcing. That's why these methods surpass ownership check. // RemoveThing removes Config of the Thing with the given ID. - RemoveThing(id string) error + RemoveThing(ctx context.Context, id string) error // UpdateChannel updates channel with the given ID. - UpdateChannel(c Channel) error + UpdateChannel(ctx context.Context, c Channel) error // RemoveChannel removes channel with the given ID. - RemoveChannel(id string) error + RemoveChannel(ctx context.Context, id string) error // DisconnectHandler changes state of the Config when the corresponding Thing is // disconnected from the Channel. - DisconnectThing(channelID, thingID string) error + DisconnectThing(ctx context.Context, channelID, thingID string) error } diff --git a/bootstrap/mocks/configs.go b/bootstrap/mocks/configs.go index 4dfd9b682a..8314b68373 100644 --- a/bootstrap/mocks/configs.go +++ b/bootstrap/mocks/configs.go @@ -4,6 +4,7 @@ package mocks import ( + "context" "sort" "strconv" "strings" @@ -32,7 +33,7 @@ func NewConfigsRepository() bootstrap.ConfigRepository { } } -func (crm *configRepositoryMock) Save(config bootstrap.Config, connections []string) (string, error) { +func (crm *configRepositoryMock) Save(_ context.Context, config bootstrap.Config, connections []string) (string, error) { crm.mu.Lock() defer crm.mu.Unlock() @@ -61,7 +62,7 @@ func (crm *configRepositoryMock) Save(config bootstrap.Config, connections []str return config.MFThing, nil } -func (crm *configRepositoryMock) RetrieveByID(token, id string) (bootstrap.Config, error) { +func (crm *configRepositoryMock) RetrieveByID(_ context.Context, token, id string) (bootstrap.Config, error) { crm.mu.Lock() defer crm.mu.Unlock() @@ -77,7 +78,7 @@ func (crm *configRepositoryMock) RetrieveByID(token, id string) (bootstrap.Confi } -func (crm *configRepositoryMock) RetrieveAll(token string, filter bootstrap.Filter, offset, limit uint64) bootstrap.ConfigsPage { +func (crm *configRepositoryMock) RetrieveAll(_ context.Context, token string, filter bootstrap.Filter, offset, limit uint64) bootstrap.ConfigsPage { crm.mu.Lock() defer crm.mu.Unlock() @@ -121,7 +122,7 @@ func (crm *configRepositoryMock) RetrieveAll(token string, filter bootstrap.Filt } } -func (crm *configRepositoryMock) RetrieveByExternalID(externalID string) (bootstrap.Config, error) { +func (crm *configRepositoryMock) RetrieveByExternalID(_ context.Context, externalID string) (bootstrap.Config, error) { crm.mu.Lock() defer crm.mu.Unlock() @@ -134,7 +135,7 @@ func (crm *configRepositoryMock) RetrieveByExternalID(externalID string) (bootst return bootstrap.Config{}, errors.ErrNotFound } -func (crm *configRepositoryMock) Update(config bootstrap.Config) error { +func (crm *configRepositoryMock) Update(_ context.Context, config bootstrap.Config) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -150,7 +151,7 @@ func (crm *configRepositoryMock) Update(config bootstrap.Config) error { return nil } -func (crm *configRepositoryMock) UpdateCert(owner, thingID, clientCert, clientKey, caCert string) error { +func (crm *configRepositoryMock) UpdateCert(_ context.Context, owner, thingID, clientCert, clientKey, caCert string) error { crm.mu.Lock() defer crm.mu.Unlock() var forUpdate bootstrap.Config @@ -171,7 +172,7 @@ func (crm *configRepositoryMock) UpdateCert(owner, thingID, clientCert, clientKe return nil } -func (crm *configRepositoryMock) UpdateConnections(token, id string, channels []bootstrap.Channel, connections []string) error { +func (crm *configRepositoryMock) UpdateConnections(_ context.Context, token, id string, channels []bootstrap.Channel, connections []string) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -197,7 +198,7 @@ func (crm *configRepositoryMock) UpdateConnections(token, id string, channels [] return nil } -func (crm *configRepositoryMock) Remove(token, id string) error { +func (crm *configRepositoryMock) Remove(_ context.Context, token, id string) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -211,7 +212,7 @@ func (crm *configRepositoryMock) Remove(token, id string) error { return nil } -func (crm *configRepositoryMock) ChangeState(token, id string, state bootstrap.State) error { +func (crm *configRepositoryMock) ChangeState(_ context.Context, token, id string, state bootstrap.State) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -228,7 +229,7 @@ func (crm *configRepositoryMock) ChangeState(token, id string, state bootstrap.S return nil } -func (crm *configRepositoryMock) ListExisting(token string, connections []string) ([]bootstrap.Channel, error) { +func (crm *configRepositoryMock) ListExisting(_ context.Context, token string, connections []string) ([]bootstrap.Channel, error) { crm.mu.Lock() defer crm.mu.Unlock() @@ -246,7 +247,7 @@ func (crm *configRepositoryMock) ListExisting(token string, connections []string return ret, nil } -func (crm *configRepositoryMock) RemoveThing(id string) error { +func (crm *configRepositoryMock) RemoveThing(_ context.Context, id string) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -254,7 +255,7 @@ func (crm *configRepositoryMock) RemoveThing(id string) error { return nil } -func (crm *configRepositoryMock) UpdateChannel(ch bootstrap.Channel) error { +func (crm *configRepositoryMock) UpdateChannel(_ context.Context, ch bootstrap.Channel) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -269,7 +270,7 @@ func (crm *configRepositoryMock) UpdateChannel(ch bootstrap.Channel) error { return nil } -func (crm *configRepositoryMock) RemoveChannel(id string) error { +func (crm *configRepositoryMock) RemoveChannel(_ context.Context, id string) error { crm.mu.Lock() defer crm.mu.Unlock() @@ -277,7 +278,7 @@ func (crm *configRepositoryMock) RemoveChannel(id string) error { return nil } -func (crm *configRepositoryMock) DisconnectThing(channelID, thingID string) error { +func (crm *configRepositoryMock) DisconnectThing(_ context.Context, channelID, thingID string) error { crm.mu.Lock() defer crm.mu.Unlock() diff --git a/bootstrap/postgres/configs.go b/bootstrap/postgres/configs.go index fa3418e526..ef3f44964c 100644 --- a/bootstrap/postgres/configs.go +++ b/bootstrap/postgres/configs.go @@ -4,6 +4,7 @@ package postgres import ( + "context" "database/sql" "encoding/json" "fmt" @@ -15,6 +16,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux/bootstrap" + "github.com/mainflux/mainflux/internal/postgres" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/clients" "github.com/mainflux/mainflux/pkg/errors" @@ -34,21 +36,21 @@ const cleanupQuery = `DELETE FROM channels ch WHERE NOT EXISTS ( var _ bootstrap.ConfigRepository = (*configRepository)(nil) type configRepository struct { - db *sqlx.DB + db postgres.Database log mflog.Logger } // NewConfigRepository instantiates a PostgreSQL implementation of config // repository. -func NewConfigRepository(db *sqlx.DB, log mflog.Logger) bootstrap.ConfigRepository { +func NewConfigRepository(db postgres.Database, log mflog.Logger) bootstrap.ConfigRepository { return &configRepository{db: db, log: log} } -func (cr configRepository) Save(cfg bootstrap.Config, chsConnIDs []string) (string, error) { +func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (string, error) { q := `INSERT INTO configs (mainflux_thing, owner, name, client_cert, client_key, ca_cert, mainflux_key, external_id, external_key, content, state) VALUES (:mainflux_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :mainflux_key, :external_id, :external_key, :content, :state)` - tx, err := cr.db.Beginx() + tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return "", errors.Wrap(errors.ErrCreateEntity, err) } @@ -65,12 +67,12 @@ func (cr configRepository) Save(cfg bootstrap.Config, chsConnIDs []string) (stri return "", errors.Wrap(errors.ErrCreateEntity, e) } - if err := insertChannels(cfg.Owner, cfg.MFChannels, tx); err != nil { + if err := insertChannels(ctx, cfg.Owner, cfg.MFChannels, tx); err != nil { cr.rollback("Failed to insert Channels", tx) return "", errors.Wrap(errSaveChannels, err) } - if err := insertConnections(cfg, chsConnIDs, tx); err != nil { + if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil { cr.rollback("Failed to insert connections", tx) return "", errors.Wrap(errSaveConnections, err) } @@ -83,7 +85,7 @@ func (cr configRepository) Save(cfg bootstrap.Config, chsConnIDs []string) (stri return cfg.MFThing, nil } -func (cr configRepository) RetrieveByID(owner, id string) (bootstrap.Config, error) { +func (cr configRepository) RetrieveByID(ctx context.Context, owner, id string) (bootstrap.Config, error) { q := `SELECT mainflux_thing, mainflux_key, external_id, external_key, name, content, state FROM configs WHERE mainflux_thing = $1 AND owner = $2` @@ -93,7 +95,7 @@ func (cr configRepository) RetrieveByID(owner, id string) (bootstrap.Config, err Owner: owner, } - if err := cr.db.QueryRowx(q, id, owner).StructScan(&dbcfg); err != nil { + if err := cr.db.QueryRowxContext(ctx, q, id, owner).StructScan(&dbcfg); err != nil { empty := bootstrap.Config{} if err == sql.ErrNoRows { return empty, errors.Wrap(errors.ErrNotFound, err) @@ -107,7 +109,7 @@ func (cr configRepository) RetrieveByID(owner, id string) (bootstrap.Config, err ON ch.mainflux_channel = conn.channel_id AND ch.owner = conn.config_owner WHERE conn.config_id = :mainflux_thing AND conn.config_owner = :owner` - rows, err := cr.db.NamedQuery(q, dbcfg) + rows, err := cr.db.NamedQueryContext(ctx, q, dbcfg) if err != nil { cr.log.Error(fmt.Sprintf("Failed to retrieve connected due to %s", err)) return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err) @@ -136,7 +138,7 @@ func (cr configRepository) RetrieveByID(owner, id string) (bootstrap.Config, err return cfg, nil } -func (cr configRepository) RetrieveAll(owner string, filter bootstrap.Filter, offset, limit uint64) bootstrap.ConfigsPage { +func (cr configRepository) RetrieveAll(ctx context.Context, owner string, filter bootstrap.Filter, offset, limit uint64) bootstrap.ConfigsPage { search, params := cr.retrieveAll(owner, filter) n := len(params) @@ -144,7 +146,7 @@ func (cr configRepository) RetrieveAll(owner string, filter bootstrap.Filter, of FROM configs %s ORDER BY mainflux_thing LIMIT $%d OFFSET $%d` q = fmt.Sprintf(q, search, n+1, n+2) - rows, err := cr.db.Query(q, append(params, limit, offset)...) + rows, err := cr.db.QueryContext(ctx, q, append(params, limit, offset)...) if err != nil { cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err)) return bootstrap.ConfigsPage{} @@ -169,7 +171,7 @@ func (cr configRepository) RetrieveAll(owner string, filter bootstrap.Filter, of q = fmt.Sprintf(`SELECT COUNT(*) FROM configs %s`, search) var total uint64 - if err := cr.db.QueryRow(q, params...).Scan(&total); err != nil { + if err := cr.db.QueryRowxContext(ctx, q, params...).Scan(&total); err != nil { cr.log.Error(fmt.Sprintf("Failed to count configs due to %s", err)) return bootstrap.ConfigsPage{} } @@ -182,7 +184,7 @@ func (cr configRepository) RetrieveAll(owner string, filter bootstrap.Filter, of } } -func (cr configRepository) RetrieveByExternalID(externalID string) (bootstrap.Config, error) { +func (cr configRepository) RetrieveByExternalID(ctx context.Context, externalID string) (bootstrap.Config, error) { q := `SELECT mainflux_thing, mainflux_key, external_key, owner, name, client_cert, client_key, ca_cert, content, state FROM configs WHERE external_id = $1` @@ -190,7 +192,7 @@ func (cr configRepository) RetrieveByExternalID(externalID string) (bootstrap.Co ExternalID: externalID, } - if err := cr.db.QueryRowx(q, externalID).StructScan(&dbcfg); err != nil { + if err := cr.db.QueryRowxContext(ctx, q, externalID).StructScan(&dbcfg); err != nil { empty := bootstrap.Config{} if err == sql.ErrNoRows { return empty, errors.Wrap(errors.ErrNotFound, err) @@ -203,7 +205,7 @@ func (cr configRepository) RetrieveByExternalID(externalID string) (bootstrap.Co ON ch.mainflux_channel = conn.channel_id AND ch.owner = conn.config_owner WHERE conn.config_id = :mainflux_thing AND conn.config_owner = :owner` - rows, err := cr.db.NamedQuery(q, dbcfg) + rows, err := cr.db.NamedQueryContext(ctx, q, dbcfg) if err != nil { cr.log.Error(fmt.Sprintf("Failed to retrieve connected due to %s", err)) return bootstrap.Config{}, errors.Wrap(errors.ErrViewEntity, err) @@ -233,13 +235,13 @@ func (cr configRepository) RetrieveByExternalID(externalID string) (bootstrap.Co return cfg, nil } -func (cr configRepository) Update(cfg bootstrap.Config) error { +func (cr configRepository) Update(ctx context.Context, cfg bootstrap.Config) error { q := `UPDATE configs SET name = $1, content = $2 WHERE mainflux_thing = $3 AND owner = $4` content := nullString(cfg.Content) name := nullString(cfg.Name) - res, err := cr.db.Exec(q, name, content, cfg.MFThing, cfg.Owner) + res, err := cr.db.ExecContext(ctx, q, name, content, cfg.MFThing, cfg.Owner) if err != nil { return errors.Wrap(errors.ErrUpdateEntity, err) } @@ -256,10 +258,10 @@ func (cr configRepository) Update(cfg bootstrap.Config) error { return nil } -func (cr configRepository) UpdateCert(owner, thingID, clientCert, clientKey, caCert string) error { +func (cr configRepository) UpdateCert(ctx context.Context, owner, thingID, clientCert, clientKey, caCert string) error { q := `UPDATE configs SET client_cert = $1, client_key = $2, ca_cert = $3 WHERE mainflux_thing = $4 AND owner = $5` - res, err := cr.db.Exec(q, clientCert, clientKey, caCert, thingID, owner) + res, err := cr.db.ExecContext(ctx, q, clientCert, clientKey, caCert, thingID, owner) if err != nil { return errors.Wrap(errors.ErrUpdateEntity, err) } @@ -276,18 +278,18 @@ func (cr configRepository) UpdateCert(owner, thingID, clientCert, clientKey, caC return nil } -func (cr configRepository) UpdateConnections(owner, id string, channels []bootstrap.Channel, connections []string) error { - tx, err := cr.db.Beginx() +func (cr configRepository) UpdateConnections(ctx context.Context, owner, id string, channels []bootstrap.Channel, connections []string) error { + tx, err := cr.db.BeginTxx(ctx, nil) if err != nil { return errors.Wrap(errors.ErrUpdateEntity, err) } - if err := insertChannels(owner, channels, tx); err != nil { + if err := insertChannels(ctx, owner, channels, tx); err != nil { cr.rollback("Failed to insert Channels during the update", tx) return errors.Wrap(errors.ErrUpdateEntity, err) } - if err := updateConnections(owner, id, connections, tx); err != nil { + if err := updateConnections(ctx, owner, id, connections, tx); err != nil { if e, ok := err.(*pgconn.PgError); ok { if e.Code == pgerrcode.ForeignKeyViolation { return errors.ErrNotFound @@ -305,23 +307,23 @@ func (cr configRepository) UpdateConnections(owner, id string, channels []bootst return nil } -func (cr configRepository) Remove(owner, id string) error { +func (cr configRepository) Remove(ctx context.Context, owner, id string) error { q := `DELETE FROM configs WHERE mainflux_thing = $1 AND owner = $2` - if _, err := cr.db.Exec(q, id, owner); err != nil { + if _, err := cr.db.ExecContext(ctx, q, id, owner); err != nil { return errors.Wrap(errors.ErrRemoveEntity, err) } - if _, err := cr.db.Exec(cleanupQuery); err != nil { + if _, err := cr.db.ExecContext(ctx, cleanupQuery); err != nil { cr.log.Warn("Failed to clean dangling channels after removal") } return nil } -func (cr configRepository) ChangeState(owner, id string, state bootstrap.State) error { +func (cr configRepository) ChangeState(ctx context.Context, owner, id string, state bootstrap.State) error { q := `UPDATE configs SET state = $1 WHERE mainflux_thing = $2 AND owner = $3;` - res, err := cr.db.Exec(q, state, id, owner) + res, err := cr.db.ExecContext(ctx, q, state, id, owner) if err != nil { return errors.Wrap(errors.ErrUpdateEntity, err) } @@ -338,7 +340,7 @@ func (cr configRepository) ChangeState(owner, id string, state bootstrap.State) return nil } -func (cr configRepository) ListExisting(owner string, ids []string) ([]bootstrap.Channel, error) { +func (cr configRepository) ListExisting(ctx context.Context, owner string, ids []string) ([]bootstrap.Channel, error) { var channels []bootstrap.Channel if len(ids) == 0 { return channels, nil @@ -350,7 +352,7 @@ func (cr configRepository) ListExisting(owner string, ids []string) ([]bootstrap } q := "SELECT mainflux_channel, name, metadata FROM channels WHERE owner = $1 AND mainflux_channel = ANY ($2)" - rows, err := cr.db.Queryx(q, owner, chans) + rows, err := cr.db.QueryxContext(ctx, q, owner, chans) if err != nil { return []bootstrap.Channel{}, errors.Wrap(errors.ErrViewEntity, err) } @@ -374,11 +376,11 @@ func (cr configRepository) ListExisting(owner string, ids []string) ([]bootstrap return channels, nil } -func (cr configRepository) RemoveThing(id string) error { +func (cr configRepository) RemoveThing(ctx context.Context, id string) error { q := `DELETE FROM configs WHERE mainflux_thing = $1` - _, err := cr.db.Exec(q, id) + _, err := cr.db.ExecContext(ctx, q, id) - if _, err := cr.db.Exec(cleanupQuery); err != nil { + if _, err := cr.db.ExecContext(ctx, cleanupQuery); err != nil { cr.log.Warn("Failed to clean dangling channels after removal") } if err != nil { @@ -387,7 +389,7 @@ func (cr configRepository) RemoveThing(id string) error { return nil } -func (cr configRepository) UpdateChannel(c bootstrap.Channel) error { +func (cr configRepository) UpdateChannel(ctx context.Context, c bootstrap.Channel) error { dbch, err := toDBChannel("", c) if err != nil { return errors.Wrap(errors.ErrUpdateEntity, err) @@ -395,24 +397,24 @@ func (cr configRepository) UpdateChannel(c bootstrap.Channel) error { q := `UPDATE channels SET name = :name, metadata = :metadata, updated_at = :updated_at, updated_by = :updated_by WHERE mainflux_channel = :mainflux_channel` - if _, err = cr.db.NamedExec(q, dbch); err != nil { + if _, err = cr.db.NamedExecContext(ctx, q, dbch); err != nil { return errors.Wrap(errUpdateChannels, err) } return nil } -func (cr configRepository) RemoveChannel(id string) error { +func (cr configRepository) RemoveChannel(ctx context.Context, id string) error { q := `DELETE FROM channels WHERE mainflux_channel = $1` - if _, err := cr.db.Exec(q, id); err != nil { + if _, err := cr.db.ExecContext(ctx, q, id); err != nil { return errors.Wrap(errRemoveChannels, err) } return nil } -func (cr configRepository) DisconnectThing(channelID, thingID string) error { +func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error { q := `UPDATE configs SET state = $1 WHERE EXISTS ( SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)` - if _, err := cr.db.Exec(q, bootstrap.Inactive, thingID, channelID); err != nil { + if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil @@ -447,7 +449,7 @@ func (cr configRepository) rollback(content string, tx *sqlx.Tx) { } } -func insertChannels(owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { +func insertChannels(ctx context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error { if len(channels) == 0 { return nil } @@ -473,7 +475,7 @@ func insertChannels(owner string, channels []bootstrap.Channel, tx *sqlx.Tx) err return nil } -func insertConnections(cfg bootstrap.Config, connections []string, tx *sqlx.Tx) error { +func insertConnections(ctx context.Context, cfg bootstrap.Config, connections []string, tx *sqlx.Tx) error { if len(connections) == 0 { return nil } @@ -495,7 +497,7 @@ func insertConnections(cfg bootstrap.Config, connections []string, tx *sqlx.Tx) return err } -func updateConnections(owner, id string, connections []string, tx *sqlx.Tx) error { +func updateConnections(ctx context.Context, owner, id string, connections []string, tx *sqlx.Tx) error { if len(connections) == 0 { return nil } diff --git a/bootstrap/postgres/configs_test.go b/bootstrap/postgres/configs_test.go index 0e802a814f..d3b185e9b7 100644 --- a/bootstrap/postgres/configs_test.go +++ b/bootstrap/postgres/configs_test.go @@ -4,6 +4,7 @@ package postgres_test import ( + "context" "fmt" "strconv" "testing" @@ -38,7 +39,7 @@ var ( func TestSave(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") diff := "different" @@ -90,7 +91,7 @@ func TestSave(t *testing.T) { }, } for _, tc := range cases { - id, err := repo.Save(tc.config, tc.connections) + id, err := repo.Save(context.Background(), tc.config, tc.connections) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) if err == nil { assert.Equal(t, id, tc.config.MFThing, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.config.MFThing, id)) @@ -100,7 +101,7 @@ func TestSave(t *testing.T) { func TestRetrieveByID(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -111,7 +112,7 @@ func TestRetrieveByID(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - id, err := repo.Save(c, channels) + id, err := repo.Save(context.Background(), c, channels) require.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) nonexistentConfID, err := uuid.NewV4() @@ -149,14 +150,14 @@ func TestRetrieveByID(t *testing.T) { }, } for _, tc := range cases { - _, err := repo.RetrieveByID(tc.owner, tc.id) + _, err := repo.RetrieveByID(context.Background(), tc.owner, tc.id) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestRetrieveAll(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") for i := 0; i < numConfigs; i++ { @@ -178,7 +179,7 @@ func TestRetrieveAll(t *testing.T) { c.MFChannels = nil } - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) require.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) } @@ -229,7 +230,7 @@ func TestRetrieveAll(t *testing.T) { }, } for _, tc := range cases { - ret := repo.RetrieveAll(tc.owner, tc.filter, tc.offset, tc.limit) + ret := repo.RetrieveAll(context.Background(), tc.owner, tc.filter, tc.offset, tc.limit) size := len(ret.Configs) assert.Equal(t, tc.size, size, fmt.Sprintf("%s: expected %d got %d\n", tc.desc, tc.size, size)) } @@ -237,7 +238,7 @@ func TestRetrieveAll(t *testing.T) { func TestRetrieveByExternalID(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -248,7 +249,7 @@ func TestRetrieveByExternalID(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) cases := []struct { @@ -268,14 +269,14 @@ func TestRetrieveByExternalID(t *testing.T) { }, } for _, tc := range cases { - _, err := repo.RetrieveByExternalID(tc.externalID) + _, err := repo.RetrieveByExternalID(context.Background(), tc.externalID) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestUpdate(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -286,7 +287,7 @@ func TestUpdate(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) c.Content = "new content" @@ -313,14 +314,14 @@ func TestUpdate(t *testing.T) { }, } for _, tc := range cases { - err := repo.Update(tc.config) + err := repo.Update(context.Background(), tc.config) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestUpdateCert(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -331,7 +332,7 @@ func TestUpdateCert(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) c.Content = "new content" @@ -369,14 +370,14 @@ func TestUpdateCert(t *testing.T) { }, } for _, tc := range cases { - err := repo.UpdateCert(tc.owner, tc.thingID, tc.cert, tc.certKey, tc.ca) + err := repo.UpdateCert(context.Background(), tc.owner, tc.thingID, tc.cert, tc.certKey, tc.ca) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestUpdateConnections(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -387,7 +388,7 @@ func TestUpdateConnections(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) // Use UUID to prevent conflicts. uid, err = uuid.NewV4() @@ -397,7 +398,7 @@ func TestUpdateConnections(t *testing.T) { c.ExternalID = uid.String() c.ExternalKey = uid.String() c.MFChannels = []bootstrap.Channel{} - c2, err := repo.Save(c, []string{channels[0]}) + c2, err := repo.Save(context.Background(), c, []string{channels[0]}) assert.Nil(t, err, fmt.Sprintf("Saving a config expected to succeed: %s.\n", err)) cases := []struct { @@ -442,14 +443,14 @@ func TestUpdateConnections(t *testing.T) { }, } for _, tc := range cases { - err := repo.UpdateConnections(tc.owner, tc.id, tc.channels, tc.connections) + err := repo.UpdateConnections(context.Background(), tc.owner, tc.id, tc.channels, tc.connections) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestRemove(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -460,23 +461,23 @@ func TestRemove(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - id, err := repo.Save(c, channels) + id, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) // Removal works the same for both existing and non-existing // (removed) config for i := 0; i < 2; i++ { - err := repo.Remove(c.Owner, id) + err := repo.Remove(context.Background(), c.Owner, id) assert.Nil(t, err, fmt.Sprintf("%d: failed to remove config due to: %s", i, err)) - _, err = repo.RetrieveByID(c.Owner, id) + _, err = repo.RetrieveByID(context.Background(), c.Owner, id) assert.True(t, errors.Contains(err, errors.ErrNotFound), fmt.Sprintf("%d: expected %s got %s", i, errors.ErrNotFound, err)) } } func TestChangeState(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -487,7 +488,7 @@ func TestChangeState(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - saved, err := repo.Save(c, channels) + saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) cases := []struct { @@ -525,14 +526,14 @@ func TestChangeState(t *testing.T) { }, } for _, tc := range cases { - err := repo.ChangeState(tc.owner, tc.id, tc.state) + err := repo.ChangeState(context.Background(), tc.owner, tc.id, tc.state) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) } } func TestListExisting(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -543,7 +544,7 @@ func TestListExisting(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) var chs []bootstrap.Channel @@ -575,7 +576,7 @@ func TestListExisting(t *testing.T) { }, } for _, tc := range cases { - existing, err := repo.ListExisting(tc.owner, tc.connections) + existing, err := repo.ListExisting(context.Background(), tc.owner, tc.connections) assert.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", tc.desc, err)) assert.ElementsMatch(t, tc.existing, existing, fmt.Sprintf("%s: Got non-matching elements.", tc.desc)) } @@ -583,7 +584,7 @@ func TestListExisting(t *testing.T) { func TestRemoveThing(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -594,17 +595,17 @@ func TestRemoveThing(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - saved, err := repo.Save(c, channels) + saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) for i := 0; i < 2; i++ { - err := repo.RemoveThing(saved) + err := repo.RemoveThing(context.Background(), saved) assert.Nil(t, err, fmt.Sprintf("an unexpected error occurred: %s\n", err)) } } func TestUpdateChannel(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -615,7 +616,7 @@ func TestUpdateChannel(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) id := c.MFChannels[0].ID @@ -624,10 +625,10 @@ func TestUpdateChannel(t *testing.T) { Name: "update name", Metadata: map[string]interface{}{"update": "metadata update"}, } - err = repo.UpdateChannel(update) + err = repo.UpdateChannel(context.Background(), update) assert.Nil(t, err, fmt.Sprintf("updating config expected to succeed: %s.\n", err)) - cfg, err := repo.RetrieveByID(c.Owner, c.MFThing) + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.MFThing) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) var retreved bootstrap.Channel for _, c := range cfg.MFChannels { @@ -642,7 +643,7 @@ func TestUpdateChannel(t *testing.T) { func TestRemoveChannel(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -652,20 +653,20 @@ func TestRemoveChannel(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - _, err = repo.Save(c, channels) + _, err = repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - err = repo.RemoveChannel(c.MFChannels[0].ID) + err = repo.RemoveChannel(context.Background(), c.MFChannels[0].ID) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - cfg, err := repo.RetrieveByID(c.Owner, c.MFThing) + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.MFThing) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) assert.NotContains(t, cfg.MFChannels, c.MFChannels[0], fmt.Sprintf("expected to remove channel %s from %s", c.MFChannels[0], cfg.MFChannels)) } func TestDisconnectThing(t *testing.T) { repo := postgres.NewConfigRepository(db, testLog) - err := deleteChannels(repo) + err := deleteChannels(context.Background(), repo) require.Nil(t, err, "Channels cleanup expected to succeed.") c := config @@ -676,20 +677,20 @@ func TestDisconnectThing(t *testing.T) { c.MFThing = uid.String() c.ExternalID = uid.String() c.ExternalKey = uid.String() - saved, err := repo.Save(c, channels) + saved, err := repo.Save(context.Background(), c, channels) assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err)) - err = repo.DisconnectThing(c.MFChannels[0].ID, saved) + err = repo.DisconnectThing(context.Background(), c.MFChannels[0].ID, saved) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) - cfg, err := repo.RetrieveByID(c.Owner, c.MFThing) + cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.MFThing) assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err)) assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected ti be inactive when a connection is removed from %s", cfg)) } -func deleteChannels(repo bootstrap.ConfigRepository) error { +func deleteChannels(ctx context.Context, repo bootstrap.ConfigRepository) error { for _, ch := range channels { - if err := repo.RemoveChannel(ch); err != nil { + if err := repo.RemoveChannel(ctx, ch); err != nil { return err } } diff --git a/bootstrap/service.go b/bootstrap/service.go index ff8e08820b..1e2553545d 100644 --- a/bootstrap/service.go +++ b/bootstrap/service.go @@ -128,7 +128,7 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C toConnect := bs.toIDList(cfg.MFChannels) // Check if channels exist. This is the way to prevent fetching channels that already exist. - existing, err := bs.configs.ListExisting(owner, toConnect) + existing, err := bs.configs.ListExisting(ctx, owner, toConnect) if err != nil { return Config{}, errors.Wrap(errCheckChannels, err) } @@ -150,7 +150,7 @@ func (bs bootstrapService) Add(ctx context.Context, token string, cfg Config) (C cfg.State = Inactive cfg.MFKey = mfThing.Credentials.Secret - saved, err := bs.configs.Save(cfg, toConnect) + saved, err := bs.configs.Save(ctx, cfg, toConnect) if err != nil { if id == "" { if _, errT := bs.sdk.DisableThing(cfg.MFThing, token); errT != nil { @@ -172,7 +172,7 @@ func (bs bootstrapService) View(ctx context.Context, token, id string) (Config, return Config{}, err } - return bs.configs.RetrieveByID(owner, id) + return bs.configs.RetrieveByID(ctx, owner, id) } func (bs bootstrapService) Update(ctx context.Context, token string, cfg Config) error { @@ -183,7 +183,7 @@ func (bs bootstrapService) Update(ctx context.Context, token string, cfg Config) cfg.Owner = owner - return bs.configs.Update(cfg) + return bs.configs.Update(ctx, cfg) } func (bs bootstrapService) UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) error { @@ -191,7 +191,7 @@ func (bs bootstrapService) UpdateCert(ctx context.Context, token, thingID, clien if err != nil { return err } - if err := bs.configs.UpdateCert(owner, thingID, clientCert, clientKey, caCert); err != nil { + if err := bs.configs.UpdateCert(ctx, owner, thingID, clientCert, clientKey, caCert); err != nil { return errors.Wrap(errUpdateCert, err) } return nil @@ -203,7 +203,7 @@ func (bs bootstrapService) UpdateConnections(ctx context.Context, token, id stri return err } - cfg, err := bs.configs.RetrieveByID(owner, id) + cfg, err := bs.configs.RetrieveByID(ctx, owner, id) if err != nil { return errors.Wrap(errUpdateConnections, err) } @@ -211,7 +211,7 @@ func (bs bootstrapService) UpdateConnections(ctx context.Context, token, id stri add, remove := bs.updateList(cfg, connections) // Check if channels exist. This is the way to prevent fetching channels that already exist. - existing, err := bs.configs.ListExisting(owner, connections) + existing, err := bs.configs.ListExisting(ctx, owner, connections) if err != nil { return errors.Wrap(errUpdateConnections, err) } @@ -248,7 +248,7 @@ func (bs bootstrapService) UpdateConnections(ctx context.Context, token, id stri } } - return bs.configs.UpdateConnections(owner, id, channels, connections) + return bs.configs.UpdateConnections(ctx, owner, id, channels, connections) } func (bs bootstrapService) List(ctx context.Context, token string, filter Filter, offset, limit uint64) (ConfigsPage, error) { @@ -257,7 +257,7 @@ func (bs bootstrapService) List(ctx context.Context, token string, filter Filter return ConfigsPage{}, err } - return bs.configs.RetrieveAll(owner, filter, offset, limit), nil + return bs.configs.RetrieveAll(ctx, owner, filter, offset, limit), nil } func (bs bootstrapService) Remove(ctx context.Context, token, id string) error { @@ -265,14 +265,14 @@ func (bs bootstrapService) Remove(ctx context.Context, token, id string) error { if err != nil { return err } - if err := bs.configs.Remove(owner, id); err != nil { + if err := bs.configs.Remove(ctx, owner, id); err != nil { return errors.Wrap(errRemoveBootstrap, err) } return nil } func (bs bootstrapService) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (Config, error) { - cfg, err := bs.configs.RetrieveByExternalID(externalID) + cfg, err := bs.configs.RetrieveByExternalID(ctx, externalID) if err != nil { return cfg, errors.Wrap(ErrBootstrap, err) } @@ -296,7 +296,7 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st return err } - cfg, err := bs.configs.RetrieveByID(owner, id) + cfg, err := bs.configs.RetrieveByID(ctx, owner, id) if err != nil { return errors.Wrap(errChangeState, err) } @@ -326,35 +326,35 @@ func (bs bootstrapService) ChangeState(ctx context.Context, token, id string, st } } } - if err := bs.configs.ChangeState(owner, id, state); err != nil { + if err := bs.configs.ChangeState(ctx, owner, id, state); err != nil { return errors.Wrap(errChangeState, err) } return nil } func (bs bootstrapService) UpdateChannelHandler(ctx context.Context, channel Channel) error { - if err := bs.configs.UpdateChannel(channel); err != nil { + if err := bs.configs.UpdateChannel(ctx, channel); err != nil { return errors.Wrap(errUpdateChannel, err) } return nil } func (bs bootstrapService) RemoveConfigHandler(ctx context.Context, id string) error { - if err := bs.configs.RemoveThing(id); err != nil { + if err := bs.configs.RemoveThing(ctx, id); err != nil { return errors.Wrap(errRemoveConfig, err) } return nil } func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string) error { - if err := bs.configs.RemoveChannel(id); err != nil { + if err := bs.configs.RemoveChannel(ctx, id); err != nil { return errors.Wrap(errRemoveChannel, err) } return nil } func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { - if err := bs.configs.DisconnectThing(channelID, thingID); err != nil { + if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil { return errors.Wrap(errDisconnectThing, err) } return nil diff --git a/bootstrap/tracing/doc.go b/bootstrap/tracing/doc.go new file mode 100644 index 0000000000..ed65cf215f --- /dev/null +++ b/bootstrap/tracing/doc.go @@ -0,0 +1,12 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package tracing provides tracing instrumentation for Mainflux Users service. +// +// This package provides tracing middleware for Mainflux Users service. +// It can be used to trace incoming requests and add tracing capabilities to +// Mainflux Users service. +// +// For more details about tracing instrumentation for Mainflux messaging refer +// to the documentation at https://docs.mainflux.io/tracing/. +package tracing diff --git a/bootstrap/tracing/tracing.go b/bootstrap/tracing/tracing.go new file mode 100644 index 0000000000..f444c9d2d8 --- /dev/null +++ b/bootstrap/tracing/tracing.go @@ -0,0 +1,170 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/bootstrap" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var _ bootstrap.Service = (*tracingMiddleware)(nil) + +type tracingMiddleware struct { + tracer trace.Tracer + svc bootstrap.Service +} + +// New returns a new bootstrap service with tracing capabilities. +func New(svc bootstrap.Service, tracer trace.Tracer) bootstrap.Service { + return &tracingMiddleware{tracer, svc} +} + +// Add traces the "Add" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) Add(ctx context.Context, token string, cfg bootstrap.Config) (bootstrap.Config, error) { + ctx, span := tm.tracer.Start(ctx, "svc_register_client", trace.WithAttributes( + attribute.String("mainflux_thing", cfg.MFThing), + attribute.String("owner", cfg.Owner), + attribute.String("name", cfg.Name), + attribute.String("external_id", cfg.ExternalID), + attribute.String("content", cfg.Content), + attribute.String("state", cfg.State.String()), + )) + defer span.End() + + return tm.svc.Add(ctx, token, cfg) +} + +// View traces the "View" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) View(ctx context.Context, token, id string) (bootstrap.Config, error) { + ctx, span := tm.tracer.Start(ctx, "svc_view_client", trace.WithAttributes( + attribute.String("id", id), + )) + defer span.End() + + return tm.svc.View(ctx, token, id) +} + +// Update traces the "Update" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) Update(ctx context.Context, token string, cfg bootstrap.Config) error { + ctx, span := tm.tracer.Start(ctx, "svc_update_client", trace.WithAttributes( + attribute.String("name", cfg.Name), + attribute.String("content", cfg.Content), + attribute.String("mainflux_thing", cfg.MFThing), + attribute.String("owner", cfg.Owner), + )) + defer span.End() + + return tm.svc.Update(ctx, token, cfg) +} + +// UpdateCert traces the "UpdateCert" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) UpdateCert(ctx context.Context, token, thingID, clientCert, clientKey, caCert string) error { + ctx, span := tm.tracer.Start(ctx, "svc_update_cert", trace.WithAttributes( + attribute.String("thing_id", thingID), + )) + defer span.End() + + return tm.svc.UpdateCert(ctx, token, thingID, clientCert, clientKey, caCert) +} + +// UpdateConnections traces the "UpdateConnections" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) UpdateConnections(ctx context.Context, token, id string, connections []string) error { + ctx, span := tm.tracer.Start(ctx, "svc_update_connections", trace.WithAttributes( + attribute.String("id", id), + attribute.StringSlice("connections", connections), + )) + defer span.End() + + return tm.svc.UpdateConnections(ctx, token, id, connections) +} + +// List traces the "List" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) List(ctx context.Context, token string, filter bootstrap.Filter, offset, limit uint64) (bootstrap.ConfigsPage, error) { + ctx, span := tm.tracer.Start(ctx, "svc_list_clients", trace.WithAttributes( + attribute.Int64("offset", int64(offset)), + attribute.Int64("limit", int64(limit)), + )) + defer span.End() + + return tm.svc.List(ctx, token, filter, offset, limit) +} + +// Remove traces the "Remove" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) Remove(ctx context.Context, token, id string) error { + ctx, span := tm.tracer.Start(ctx, "svc_remove_client", trace.WithAttributes( + attribute.String("id", id), + )) + defer span.End() + + return tm.svc.Remove(ctx, token, id) +} + +// Bootstrap traces the "Bootstrap" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (bootstrap.Config, error) { + ctx, span := tm.tracer.Start(ctx, "svc_bootstrap_client", trace.WithAttributes( + attribute.String("external_key", externalKey), + attribute.String("external_id", externalID), + attribute.Bool("secure", secure), + )) + defer span.End() + + return tm.svc.Bootstrap(ctx, externalKey, externalID, secure) +} + +// ChangeState traces the "ChangeState" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) ChangeState(ctx context.Context, token, id string, state bootstrap.State) error { + ctx, span := tm.tracer.Start(ctx, "svc_change_state", trace.WithAttributes( + attribute.String("id", id), + attribute.String("state", state.String()), + )) + defer span.End() + + return tm.svc.ChangeState(ctx, token, id, state) +} + +// UpdateChannelHandler traces the "UpdateChannelHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) UpdateChannelHandler(ctx context.Context, channel bootstrap.Channel) error { + ctx, span := tm.tracer.Start(ctx, "svc_update_channel_handler", trace.WithAttributes( + attribute.String("id", channel.ID), + attribute.String("name", channel.Name), + attribute.String("description", channel.Description), + )) + defer span.End() + + return tm.svc.UpdateChannelHandler(ctx, channel) +} + +// RemoveConfigHandler traces the "RemoveConfigHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) RemoveConfigHandler(ctx context.Context, id string) error { + ctx, span := tm.tracer.Start(ctx, "svc_remove_config_handler", trace.WithAttributes( + attribute.String("id", id), + )) + defer span.End() + + return tm.svc.RemoveConfigHandler(ctx, id) +} + +// RemoveChannelHandler traces the "RemoveChannelHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string) error { + ctx, span := tm.tracer.Start(ctx, "svc_remove_channel_handler", trace.WithAttributes( + attribute.String("id", id), + )) + defer span.End() + + return tm.svc.RemoveChannelHandler(ctx, id) +} + +// DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service. +func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error { + ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes( + attribute.String("channel_id", channelID), + attribute.String("thing_id", thingID), + )) + defer span.End() + + return tm.svc.DisconnectThingHandler(ctx, channelID, thingID) +} diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 454353e61d..384262ac51 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -12,15 +12,19 @@ import ( chclient "github.com/mainflux/callhome/pkg/client" "github.com/mainflux/mainflux" + "go.opentelemetry.io/otel/trace" bootstrapPg "github.com/mainflux/mainflux/bootstrap/postgres" rediscons "github.com/mainflux/mainflux/bootstrap/redis/consumer" redisprod "github.com/mainflux/mainflux/bootstrap/redis/producer" + "github.com/mainflux/mainflux/bootstrap/tracing" "github.com/mainflux/mainflux/internal" authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" pgClient "github.com/mainflux/mainflux/internal/clients/postgres" redisClient "github.com/mainflux/mainflux/internal/clients/redis" "github.com/mainflux/mainflux/internal/env" + "github.com/mainflux/mainflux/internal/postgres" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" @@ -83,15 +87,26 @@ func main() { defer esClient.Close() // Create new auth grpc client api - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } defer authHandler.Close() logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer func() { + if err := tp.Shutdown(context.Background()); err != nil { + logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err)) + } + }() + tracer := tp.Tracer(svcName) + // Create new service - svc := newService(auth, db, logger, esClient, cfg) + svc := newService(auth, db, tracer, logger, esClient, cfg) // Create an new HTTP server httpServerConfig := server.Config{Port: defSvcHttpPort} @@ -127,8 +142,9 @@ func main() { } } -func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logger, esClient *redis.Client, cfg config) bootstrap.Service { - repoConfig := bootstrapPg.NewConfigRepository(db, logger) +func newService(auth policies.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mflog.Logger, esClient *redis.Client, cfg config) bootstrap.Service { + database := postgres.NewDatabase(db, tracer) + repoConfig := bootstrapPg.NewConfigRepository(database, logger) config := mfsdk.Config{ ThingsURL: cfg.ThingsURL, @@ -141,6 +157,7 @@ func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logge svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") svc = api.MetricsMiddleware(svc, counter, latency) + svc = tracing.New(svc, tracer) return svc } diff --git a/cmd/cassandra-reader/main.go b/cmd/cassandra-reader/main.go index 166bc0d5cc..dc9de4a453 100644 --- a/cmd/cassandra-reader/main.go +++ b/cmd/cassandra-reader/main.go @@ -36,7 +36,6 @@ const ( type config struct { LogLevel string `env:"MF_CASSANDRA_READER_LOG_LEVEL" envDefault:"info"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` } @@ -57,7 +56,7 @@ func main() { } // Create new thing grpc client - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } @@ -65,7 +64,7 @@ func main() { logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) // Create new auth grpc client - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/certs/main.go b/cmd/certs/main.go index b3afd8202f..9cc83184be 100644 --- a/cmd/certs/main.go +++ b/cmd/certs/main.go @@ -43,7 +43,6 @@ type config struct { LogLevel string `env:"MF_CERTS_LOG_LEVEL" envDefault:"info"` CertsURL string `env:"MF_SDK_CERTS_URL" envDefault:"http://localhost"` ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://things:9000"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` // Sign and issue certificates without 3rd party PKI @@ -90,7 +89,7 @@ func main() { } defer db.Close() - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 99cb8baf9c..1bf86463a7 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -58,7 +58,7 @@ func main() { log.Fatalf("failed to init logger: %s", err) } - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/http/main.go b/cmd/http/main.go index 949162e317..f3b1fcaa94 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -59,7 +59,7 @@ func main() { log.Fatalf("failed to init logger: %s", err) } - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/influxdb-reader/main.go b/cmd/influxdb-reader/main.go index 7d23e1b70a..ea4322b0e8 100644 --- a/cmd/influxdb-reader/main.go +++ b/cmd/influxdb-reader/main.go @@ -38,7 +38,6 @@ const ( type config struct { LogLevel string `env:"MF_INFLUX_READER_LOG_LEVEL" envDefault:"info"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` } @@ -56,14 +55,14 @@ func main() { log.Fatalf("failed to init logger: %s", err) } - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } defer tcHandler.Close() logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/mongodb-reader/main.go b/cmd/mongodb-reader/main.go index bdf03b4a7c..d91c962222 100644 --- a/cmd/mongodb-reader/main.go +++ b/cmd/mongodb-reader/main.go @@ -37,7 +37,6 @@ const ( type config struct { LogLevel string `env:"MF_MONGO_READER_LOG_LEVEL" envDefault:"info"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` } @@ -62,14 +61,14 @@ func main() { repo := newService(db, logger) - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } defer tcHandler.Close() logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index dd742b6efd..f7ee56e6df 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -135,7 +135,7 @@ func main() { } defer ac.Close() - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index 2b9165a89c..3e8c0e0e8a 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -37,7 +37,6 @@ const ( type config struct { LogLevel string `env:"MF_POSTGRES_READER_LOG_LEVEL" envDefault:"info"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` } @@ -55,14 +54,14 @@ func main() { log.Fatalf("failed to init logger: %s", err) } - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } defer tcHandler.Close() logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/smpp-notifier/main.go b/cmd/smpp-notifier/main.go index 2befb3e07c..dbcab70ddf 100644 --- a/cmd/smpp-notifier/main.go +++ b/cmd/smpp-notifier/main.go @@ -97,7 +97,7 @@ func main() { pubSub = pstracing.NewPubSub(tracer, pubSub) defer pubSub.Close() - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/smtp-notifier/main.go b/cmd/smtp-notifier/main.go index 36bffb8d1f..d912989bb2 100644 --- a/cmd/smtp-notifier/main.go +++ b/cmd/smtp-notifier/main.go @@ -80,7 +80,7 @@ func main() { logger.Fatal(fmt.Sprintf("failed to load email configuration : %s", err)) } - tp, err := jaegerClient.NewProvider("smtp-notifier_db", cfg.JaegerURL) + tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } @@ -98,7 +98,7 @@ func main() { pubSub = pstracing.NewPubSub(tracer, pubSub) defer pubSub.Close() - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/things/main.go b/cmd/things/main.go index 383c8adea7..4c15d122b6 100644 --- a/cmd/things/main.go +++ b/cmd/things/main.go @@ -128,7 +128,7 @@ func main() { auth = localusers.NewAuthService(cfg.StandaloneID, cfg.StandaloneToken) logger.Info("Using standalone auth service") default: - authServiceClient, authHandler, err := authClient.Setup(envPrefix, cfg.JaegerURL, svcName) + authServiceClient, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/timescale-reader/main.go b/cmd/timescale-reader/main.go index d464ec0c09..419e4e5d05 100644 --- a/cmd/timescale-reader/main.go +++ b/cmd/timescale-reader/main.go @@ -37,7 +37,6 @@ const ( type config struct { LogLevel string `env:"MF_TIMESCALE_READER_LOG_LEVEL" envDefault:"info"` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` } @@ -67,14 +66,14 @@ func main() { repo := newService(db, logger) - auth, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + auth, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } defer authHandler.Close() logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/twins/main.go b/cmd/twins/main.go index 79275230dd..6117a1a326 100644 --- a/cmd/twins/main.go +++ b/cmd/twins/main.go @@ -83,7 +83,7 @@ func main() { logger.Fatal(fmt.Sprintf("failed to setup postgres database : %s", err)) } - tp, err := jaegerClient.NewProvider("twins_db", cfg.JaegerURL) + tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } @@ -99,7 +99,7 @@ func main() { case true: auth = localusers.NewAuthService(cfg.StandaloneID, cfg.StandaloneToken) default: - authServiceClient, authHandler, err := authClient.Setup(envPrefix, svcName, cfg.JaegerURL) + authServiceClient, authHandler, err := authClient.Setup(envPrefix, svcName) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/ws/main.go b/cmd/ws/main.go index 8e7e5e9685..cb92ee66c0 100644 --- a/cmd/ws/main.go +++ b/cmd/ws/main.go @@ -59,7 +59,7 @@ func main() { log.Fatalf("failed to init logger: %s", err) } - tc, tcHandler, err := thingsClient.Setup(envPrefix, cfg.JaegerURL) + tc, tcHandler, err := thingsClient.Setup(envPrefix) if err != nil { logger.Fatal(err.Error()) } diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index e6ec665a5c..0ff88e454e 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -49,9 +49,9 @@ services: MF_THINGS_URL: ${MF_THINGS_URL} MF_THINGS_ES_URL: es-redis:${MF_REDIS_TCP_PORT} MF_BOOTSTRAP_ES_URL: es-redis:${MF_REDIS_TCP_PORT} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} MF_AUTH_GRPC_TIMMEOUT: ${MF_USERS_GRPC_TIMEOUT} + MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} networks: - docker_mainflux-base-net diff --git a/docker/addons/cassandra-reader/docker-compose.yml b/docker/addons/cassandra-reader/docker-compose.yml index 191fa8916c..121cfede1a 100644 --- a/docker/addons/cassandra-reader/docker-compose.yml +++ b/docker/addons/cassandra-reader/docker-compose.yml @@ -24,7 +24,6 @@ services: MF_CASSANDRA_READER_DB_KEYSPACE: ${MF_CASSANDRA_READER_DB_KEYSPACE} MF_CASSANDRA_READER_SERVER_CERT: ${MF_CASSANDRA_READER_SERVER_CERT} MF_CASSANDRA_READER_SERVER_KEY: ${MF_CASSANDRA_READER_SERVER_KEY} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} diff --git a/docker/addons/certs/docker-compose.yml b/docker/addons/certs/docker-compose.yml index bb6fb056bd..37c04f73fe 100644 --- a/docker/addons/certs/docker-compose.yml +++ b/docker/addons/certs/docker-compose.yml @@ -66,7 +66,6 @@ services: MF_VAULT_CA_ROLE_NAME: ${MF_VAULT_CA_ROLE_NAME} MF_VAULT_PKI_PATH: ${MF_VAULT_PKI_PATH} MF_THINGS_URL: ${MF_THINGS_URL} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} MF_AUTH_GRPC_TIMEOUT: ${MF_USERS_GRPC_TIMEOUT} MF_CERTS_VAULT_HOST: ${MF_CERTS_VAULT_HOST} diff --git a/docker/addons/influxdb-reader/docker-compose.yml b/docker/addons/influxdb-reader/docker-compose.yml index b6b31269ba..8088d02e4a 100644 --- a/docker/addons/influxdb-reader/docker-compose.yml +++ b/docker/addons/influxdb-reader/docker-compose.yml @@ -32,7 +32,6 @@ services: MF_INFLUXDB_ADMIN_URL: ${MF_INFLUXDB_ADMIN_URL} MF_INFLUX_READER_SERVER_CERT: ${MF_INFLUX_READER_SERVER_CERT} MF_INFLUX_READER_SERVER_KEY: ${MF_INFLUX_READER_SERVER_KEY} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} diff --git a/docker/addons/mongodb-reader/docker-compose.yml b/docker/addons/mongodb-reader/docker-compose.yml index c39d6c55f1..3de39eb8a8 100644 --- a/docker/addons/mongodb-reader/docker-compose.yml +++ b/docker/addons/mongodb-reader/docker-compose.yml @@ -26,7 +26,6 @@ services: MF_MONGO_READER_DB_PORT: ${MF_MONGO_READER_DB_PORT} MF_MONGO_READER_SERVER_CERT: ${MF_MONGO_READER_SERVER_CERT} MF_MONGO_READER_SERVER_KEY: ${MF_MONGO_READER_SERVER_KEY} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} diff --git a/docker/addons/postgres-reader/docker-compose.yml b/docker/addons/postgres-reader/docker-compose.yml index 9dc7bae233..19b682fe68 100644 --- a/docker/addons/postgres-reader/docker-compose.yml +++ b/docker/addons/postgres-reader/docker-compose.yml @@ -32,7 +32,6 @@ services: MF_POSTGRES_READER_DB_SSL_CERT: ${MF_POSTGRES_READER_DB_SSL_CERT} MF_POSTGRES_READER_DB_SSL_KEY: ${MF_POSTGRES_READER_DB_SSL_KEY} MF_POSTGRES_READER_DB_SSL_ROOT_CERT: ${MF_POSTGRES_READER_DB_SSL_ROOT_CERT} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} diff --git a/docker/addons/timescale-reader/docker-compose.yml b/docker/addons/timescale-reader/docker-compose.yml index b90c5ebb7c..e65fdf3196 100644 --- a/docker/addons/timescale-reader/docker-compose.yml +++ b/docker/addons/timescale-reader/docker-compose.yml @@ -32,7 +32,6 @@ services: MF_TIMESCALE_READER_DB_SSL_CERT: ${MF_TIMESCALE_READER_DB_SSL_CERT} MF_TIMESCALE_READER_DB_SSL_KEY: ${MF_TIMESCALE_READER_DB_SSL_KEY} MF_TIMESCALE_READER_DB_SSL_ROOT_CERT: ${MF_TIMESCALE_READER_DB_SSL_ROOT_CERT} - MF_JAEGER_URL: ${MF_JAEGER_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL} diff --git a/internal/clients/grpc/auth/client.go b/internal/clients/grpc/auth/client.go index 243139fee7..c9bd8f7454 100644 --- a/internal/clients/grpc/auth/client.go +++ b/internal/clients/grpc/auth/client.go @@ -16,12 +16,12 @@ const envAuthGrpcPrefix = "MF_AUTH_GRPC_" var errGrpcConfig = errors.New("failed to load grpc configuration") // Setup loads Auth gRPC configuration from environment variable and creates new Auth gRPC API. -func Setup(envPrefix, jaegerURL, svcName string) (policies.AuthServiceClient, grpcClient.ClientHandler, error) { +func Setup(envPrefix, svcName string) (policies.AuthServiceClient, grpcClient.ClientHandler, error) { config := grpcClient.Config{} if err := env.Parse(&config, env.Options{Prefix: envAuthGrpcPrefix, AltPrefix: envPrefix}); err != nil { return nil, nil, errors.Wrap(errGrpcConfig, err) } - c, ch, err := grpcClient.Setup(config, svcName, jaegerURL) + c, ch, err := grpcClient.Setup(config, svcName) if err != nil { return nil, nil, err } diff --git a/internal/clients/grpc/connect.go b/internal/clients/grpc/connect.go index 014466d780..1d1ffd462e 100644 --- a/internal/clients/grpc/connect.go +++ b/internal/clients/grpc/connect.go @@ -4,14 +4,10 @@ package grpc import ( - "context" - "fmt" "time" - jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" "github.com/mainflux/mainflux/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - tracesdk "go.opentelemetry.io/otel/sdk/trace" gogrpc "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -19,9 +15,7 @@ import ( var ( errGrpcConnect = errors.New("failed to connect to grpc server") - errJaeger = errors.New("failed to initialize jaeger ") errGrpcClose = errors.New("failed to close grpc connection") - errJaegerClose = errors.New("failed to shut down jaeger tracer provider") ) type Config struct { @@ -39,7 +33,6 @@ type ClientHandler interface { type Client struct { *gogrpc.ClientConn - *tracesdk.TracerProvider secure bool } @@ -75,7 +68,7 @@ func Connect(cfg Config) (*gogrpc.ClientConn, bool, error) { } // Setup load gRPC configuration from environment variable, creates new gRPC client and connect to gRPC server. -func Setup(config Config, svcName, jaegerURL string) (*Client, ClientHandler, error) { +func Setup(config Config, svcName string) (*Client, ClientHandler, error) { secure := false // connect to auth grpc server @@ -84,13 +77,7 @@ func Setup(config Config, svcName, jaegerURL string) (*Client, ClientHandler, er return nil, nil, errors.Wrap(errGrpcConnect, err) } - // initialize auth tracer for auth grpc client - tp, err := jaegerClient.NewProvider(fmt.Sprintf("auth.%s", svcName), jaegerURL) - if err != nil { - grpcClient.Close() - return nil, nil, errors.Wrap(errJaeger, err) - } - c := &Client{grpcClient, tp, secure} + c := &Client{grpcClient, secure} return c, NewClientHandler(c), nil } @@ -102,9 +89,6 @@ func (c *Client) Close() error { if err != nil { retErr = errors.Wrap(errGrpcClose, err) } - if err := c.TracerProvider.Shutdown(context.Background()); err != nil { - retErr = errors.Wrap(retErr, errors.Wrap(errJaegerClose, err)) - } return retErr } diff --git a/internal/clients/grpc/things/client.go b/internal/clients/grpc/things/client.go index 61bc2aa6e8..795901f2ea 100644 --- a/internal/clients/grpc/things/client.go +++ b/internal/clients/grpc/things/client.go @@ -16,13 +16,13 @@ const envThingsAuthGrpcPrefix = "MF_THINGS_AUTH_GRPC_" var errGrpcConfig = errors.New("failed to load grpc configuration") // Setup loads Things gRPC configuration from environment variable and creates new Things gRPC API. -func Setup(envPrefix, jaegerURL string) (policies.ThingsServiceClient, grpcClient.ClientHandler, error) { +func Setup(envPrefix string) (policies.ThingsServiceClient, grpcClient.ClientHandler, error) { config := grpcClient.Config{} if err := env.Parse(&config, env.Options{Prefix: envThingsAuthGrpcPrefix, AltPrefix: envPrefix}); err != nil { return nil, nil, errors.Wrap(errGrpcConfig, err) } - c, ch, err := grpcClient.Setup(config, "things", jaegerURL) + c, ch, err := grpcClient.Setup(config, "things") if err != nil { return nil, nil, err } diff --git a/internal/postgres/tracing.go b/internal/postgres/tracing.go index f53541de8d..8b8eda24f0 100644 --- a/internal/postgres/tracing.go +++ b/internal/postgres/tracing.go @@ -34,6 +34,9 @@ type Database interface { // QueryxContext queries the database and returns an *sqlx.Rows QueryxContext(context.Context, string, ...interface{}) (*sqlx.Rows, error) + // QueryContext executes a query that returns rows, typically a SELECT. + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + // ExecContext executes a query without returning any rows ExecContext(context.Context, string, ...interface{}) (sql.Result, error) @@ -79,8 +82,14 @@ func (d database) QueryxContext(ctx context.Context, query string, args ...inter return d.db.QueryxContext(ctx, query, args...) } +func (d database) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + ctx, span := d.addSpanTags(ctx, "QueryContext", query) + defer span.End() + return d.db.QueryContext(ctx, query, args...) +} + func (d database) BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) { - ctx, span := d.addSpanTags(ctx, "sql_beginTxx", "") + ctx, span := d.addSpanTags(ctx, "BeginTxx", "") defer span.End() return d.db.BeginTxx(ctx, opts) }