Skip to content

Commit

Permalink
NOISSUE - Add instance ID (absmach#1776)
Browse files Browse the repository at this point in the history
* update or to sync with clients branch

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Add empty lines

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* update inline constant

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* rebase pr to sync with master branch

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* update pr to sync with updated master absmach#1849

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

---------

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
  • Loading branch information
ianmuchyri authored and WashingtonKK committed Jul 12, 2023
1 parent 122c27a commit e6d38db
Show file tree
Hide file tree
Showing 75 changed files with 421 additions and 117 deletions.
1 change: 0 additions & 1 deletion api/openapi/users.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1967,4 +1967,3 @@ components:
security:
- bearerAuth: []
- refreshAuth: []

5 changes: 3 additions & 2 deletions bootstrap/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
addExternalKey = "external-key"
addName = "name"
addContent = "config"
instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
)

var (
Expand Down Expand Up @@ -201,15 +202,15 @@ func newThingsService(auth upolicies.AuthServiceClient) (clients.Service, groups
func newThingsServer(csvc clients.Service, gsvc groups.Service, psvc tpolicies.Service) *httptest.Server {
logger := mflog.NewMock()
mux := bone.New()
capi.MakeHandler(csvc, mux, logger)
capi.MakeHandler(csvc, mux, logger, instanceID)
gapi.MakeHandler(gsvc, mux, logger)
papi.MakeHandler(csvc, psvc, mux, logger)
return httptest.NewServer(mux)
}

func newBootstrapServer(svc bootstrap.Service) *httptest.Server {
logger := mflog.NewMock()
mux := bsapi.MakeHandler(svc, bootstrap.NewConfigReader(encKey), logger)
mux := bsapi.MakeHandler(svc, bootstrap.NewConfigReader(encKey), logger, instanceID)
return httptest.NewServer(mux)
}

Expand Down
4 changes: 2 additions & 2 deletions bootstrap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
)

// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(svc bootstrap.Service, reader bootstrap.ConfigReader, logger mflog.Logger) http.Handler {
func MakeHandler(svc bootstrap.Service, reader bootstrap.ConfigReader, logger mflog.Logger, instanceID string) http.Handler {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func MakeHandler(svc bootstrap.Service, reader bootstrap.ConfigReader, logger mf
encodeResponse,
opts...))

r.GetFunc("/health", mainflux.Health("bootstrap"))
r.GetFunc("/health", mainflux.Health("bootstrap", instanceID))
r.Handle("/metrics", promhttp.Handler())

return r
Expand Down
3 changes: 2 additions & 1 deletion bootstrap/redis/producer/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
thingStateChange = thingPrefix + "state_change"
thingBootstrap = thingPrefix + "bootstrap"
thingUpdateConnections = thingPrefix + "update_connections"
instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
)

var (
Expand Down Expand Up @@ -101,7 +102,7 @@ func newThingsService(auth upolicies.AuthServiceClient) (clients.Service, groups
func newThingsServer(csvc clients.Service, gsvc groups.Service, psvc tpolicies.Service) *httptest.Server {
logger := logger.NewMock()
mux := bone.New()
capi.MakeHandler(csvc, mux, logger)
capi.MakeHandler(csvc, mux, logger, instanceID)
gapi.MakeHandler(gsvc, mux, logger)
papi.MakeHandler(csvc, psvc, mux, logger)
return httptest.NewServer(mux)
Expand Down
3 changes: 2 additions & 1 deletion bootstrap/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
email = "test@example.com"
unknown = "unknown"
channelsNum = 3
instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
)

var (
Expand Down Expand Up @@ -92,7 +93,7 @@ func newThingsService(auth upolicies.AuthServiceClient) (clients.Service, groups
func newThingsServer(csvc clients.Service, gsvc groups.Service, psvc tpolicies.Service) *httptest.Server {
logger := mflog.NewMock()
mux := bone.New()
capi.MakeHandler(csvc, mux, logger)
capi.MakeHandler(csvc, mux, logger, instanceID)
gapi.MakeHandler(gsvc, mux, logger)
papi.MakeHandler(csvc, psvc, mux, logger)
return httptest.NewServer(mux)
Expand Down
4 changes: 2 additions & 2 deletions certs/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
)

// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(svc certs.Service, logger logger.Logger) http.Handler {
func MakeHandler(svc certs.Service, logger logger.Logger, instanceID string) http.Handler {
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func MakeHandler(svc certs.Service, logger logger.Logger) http.Handler {
))

r.Handle("/metrics", promhttp.Handler())
r.GetFunc("/health", mainflux.Health("certs"))
r.GetFunc("/health", mainflux.Health("certs", instanceID))

return r
}
Expand Down
3 changes: 2 additions & 1 deletion certs/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
caPath = "../docker/ssl/certs/ca.crt"
caKeyPath = "../docker/ssl/certs/ca.key"
cfgSignHoursValid = "24h"
instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
)

func newService(tokens map[string]string) (certs.Service, error) {
Expand Down Expand Up @@ -367,6 +368,6 @@ func TestViewCert(t *testing.T) {
func newThingsServer(svc clients.Service) *httptest.Server {
logger := logger.NewMock()
mux := bone.New()
httpapi.MakeHandler(svc, mux, logger)
httpapi.MakeHandler(svc, mux, logger, instanceID)
return httptest.NewServer(mux)
}
14 changes: 12 additions & 2 deletions cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
httpserver "github.com/mainflux/mainflux/internal/server/http"
mflog "github.com/mainflux/mainflux/logger"
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/users/policies"
"golang.org/x/sync/errgroup"

Expand All @@ -54,6 +55,7 @@ type config struct {
ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://localhost:9000"`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_BOOTSTRAP_INSTANCE_ID" envDefault:""`
}

func main() {
Expand All @@ -70,6 +72,14 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

// Create new postgres client
dbConfig := pgClient.Config{Name: defDB}

Expand All @@ -94,7 +104,7 @@ func main() {
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())

tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL)
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
}
Expand All @@ -113,7 +123,7 @@ func main() {
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger, instanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
Expand Down
12 changes: 11 additions & 1 deletion cmd/cassandra-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/mainflux/mainflux/internal/server"
httpserver "github.com/mainflux/mainflux/internal/server/http"
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/cassandra"
Expand All @@ -37,6 +38,7 @@ const (
type config struct {
LogLevel string `env:"MF_CASSANDRA_READER_LOG_LEVEL" envDefault:"info"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_CASSANDRA_READER_INSTANCE_ID" envDefault:""`
}

func main() {
Expand All @@ -55,6 +57,14 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

// Create new thing grpc client
tc, tcHandler, err := thingsClient.Setup(envPrefix)
if err != nil {
Expand Down Expand Up @@ -88,7 +98,7 @@ func main() {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
Expand Down
14 changes: 12 additions & 2 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
"github.com/mainflux/mainflux/pkg/messaging/tracing"
"github.com/mainflux/mainflux/pkg/uuid"
"golang.org/x/sync/errgroup"
)

Expand All @@ -42,6 +43,7 @@ type config struct {
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_CASSANDRA_WRITER_INSTANCE_ID" envDefault:""`
}

func main() {
Expand All @@ -59,14 +61,22 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

// Create new to cassandra client
csdSession, err := cassandraClient.SetupDB(envPrefix, cassandra.Table)
if err != nil {
logger.Fatal(err.Error())
}
defer csdSession.Close()

tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL)
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
Expand Down Expand Up @@ -100,7 +110,7 @@ func main() {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
Expand Down
12 changes: 11 additions & 1 deletion cmd/certs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth"
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
"github.com/mainflux/mainflux/pkg/uuid"
)

const (
Expand All @@ -44,6 +45,7 @@ type config struct {
CertsURL string `env:"MF_SDK_CERTS_URL" envDefault:"http://localhost"`
ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://things:9000"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_CERTS_INSTANCE_ID" envDefault:""`

// Sign and issue certificates without 3rd party PKI
SignCAPath string `env:"MF_CERTS_SIGN_CA_PATH" envDefault:"ca.crt"`
Expand Down Expand Up @@ -73,6 +75,14 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

if cfg.PkiHost == "" {
logger.Fatal("No host specified for PKI engine")
}
Expand Down Expand Up @@ -102,7 +112,7 @@ func main() {
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
Expand Down
14 changes: 12 additions & 2 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
mflog "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing"
"github.com/mainflux/mainflux/pkg/uuid"
"golang.org/x/sync/errgroup"
)

Expand All @@ -42,6 +43,7 @@ type config struct {
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
}

func main() {
Expand All @@ -58,14 +60,22 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

tc, tcHandler, err := thingsClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
}
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL)
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
Expand Down Expand Up @@ -96,7 +106,7 @@ func main() {
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(instanceID), logger)

coapServerConfig := server.Config{Port: defSvcCoapPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixCoap, AltPrefix: envPrefix}); err != nil {
Expand Down
14 changes: 12 additions & 2 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/brokers"
pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/things/policies"
"go.opentelemetry.io/otel/trace"

Expand All @@ -43,6 +44,7 @@ type config struct {
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
}

func main() {
Expand All @@ -59,14 +61,22 @@ func main() {
log.Fatalf("failed to init logger: %s", err)
}

instanceID := cfg.InstanceID
if instanceID == "" {
instanceID, err = uuid.New().ID()
if err != nil {
log.Fatalf("Failed to generate instanceID: %s", err)
}
}

tc, tcHandler, err := thingsClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
}
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL)
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
Expand All @@ -90,7 +100,7 @@ func main() {
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc), logger)
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, instanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
Expand Down
Loading

0 comments on commit e6d38db

Please sign in to comment.