Skip to content

Commit

Permalink
Remove configurable wildcard env var and mqtt notif leftovers
Browse files Browse the repository at this point in the history
Signed-off-by: Darko Draskovic <darko.draskovic@gmail.com>
  • Loading branch information
darkodraskovic committed Jul 7, 2020
1 parent dd83309 commit 060a294
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 109 deletions.
154 changes: 75 additions & 79 deletions cmd/twins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,66 +42,63 @@ import (
const (
queue = "twins"

defLogLevel = "error"
defHTTPPort = "8180"
defJaegerURL = ""
defServerCert = ""
defServerKey = ""
defDB = "mainflux-twins"
defDBHost = "localhost"
defDBPort = "27017"
defCacheURL = "localhost:6379"
defCachePass = ""
defCacheDB = "0"
defSingleUserEmail = ""
defSingleUserToken = ""
defClientTLS = "false"
defCACerts = ""
defChannelID = ""
defSubtopicWildcard = "#"
defNatsURL = "nats://localhost:4222"
defAuthnURL = "localhost:8181"
defAuthnTimeout = "1s"

envLogLevel = "MF_TWINS_LOG_LEVEL"
envHTTPPort = "MF_TWINS_HTTP_PORT"
envJaegerURL = "MF_JAEGER_URL"
envServerCert = "MF_TWINS_SERVER_CERT"
envServerKey = "MF_TWINS_SERVER_KEY"
envDB = "MF_TWINS_DB"
envDBHost = "MF_TWINS_DB_HOST"
envDBPort = "MF_TWINS_DB_PORT"
envCacheURL = "MF_TWINS_CACHE_URL"
envCachePass = "MF_TWINS_CACHE_PASS"
envCacheDB = "MF_TWINS_CACHE_DB"
envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL"
envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN"
envClientTLS = "MF_TWINS_CLIENT_TLS"
envCACerts = "MF_TWINS_CA_CERTS"
envChannelID = "MF_TWINS_CHANNEL_ID"
envSubtopicWildcard = "MF_TWINS_SUBTOPIC_WILDCARD"
envNatsURL = "MF_NATS_URL"
envAuthnURL = "MF_AUTHN_GRPC_URL"
envAuthnTimeout = "MF_AUTHN_GRPC_TIMEOUT"
defLogLevel = "error"
defHTTPPort = "8180"
defJaegerURL = ""
defServerCert = ""
defServerKey = ""
defDB = "mainflux-twins"
defDBHost = "localhost"
defDBPort = "27017"
defCacheURL = "localhost:6379"
defCachePass = ""
defCacheDB = "0"
defSingleUserEmail = ""
defSingleUserToken = ""
defClientTLS = "false"
defCACerts = ""
defChannelID = ""
defNatsURL = "nats://localhost:4222"
defAuthnURL = "localhost:8181"
defAuthnTimeout = "1s"

envLogLevel = "MF_TWINS_LOG_LEVEL"
envHTTPPort = "MF_TWINS_HTTP_PORT"
envJaegerURL = "MF_JAEGER_URL"
envServerCert = "MF_TWINS_SERVER_CERT"
envServerKey = "MF_TWINS_SERVER_KEY"
envDB = "MF_TWINS_DB"
envDBHost = "MF_TWINS_DB_HOST"
envDBPort = "MF_TWINS_DB_PORT"
envCacheURL = "MF_TWINS_CACHE_URL"
envCachePass = "MF_TWINS_CACHE_PASS"
envCacheDB = "MF_TWINS_CACHE_DB"
envSingleUserEmail = "MF_TWINS_SINGLE_USER_EMAIL"
envSingleUserToken = "MF_TWINS_SINGLE_USER_TOKEN"
envClientTLS = "MF_TWINS_CLIENT_TLS"
envCACerts = "MF_TWINS_CA_CERTS"
envChannelID = "MF_TWINS_CHANNEL_ID"
envNatsURL = "MF_NATS_URL"
envAuthnURL = "MF_AUTHN_GRPC_URL"
envAuthnTimeout = "MF_AUTHN_GRPC_TIMEOUT"
)

type config struct {
logLevel string
httpPort string
jaegerURL string
serverCert string
serverKey string
dbCfg twmongodb.Config
cacheURL string
cachePass string
cacheDB string
singleUserEmail string
singleUserToken string
clientTLS bool
caCerts string
channelID string
subtopicWildcard string
natsURL string
logLevel string
httpPort string
jaegerURL string
serverCert string
serverKey string
dbCfg twmongodb.Config
cacheURL string
cachePass string
cacheDB string
singleUserEmail string
singleUserToken string
clientTLS bool
caCerts string
channelID string
natsURL string

authnURL string
authnTimeout time.Duration
Expand Down Expand Up @@ -138,7 +135,7 @@ func main() {
}
defer pubSub.Close()

svc := newService(pubSub, cfg.channelID, cfg.subtopicWildcard, auth, dbTracer, db, cacheTracer, cacheClient, logger)
svc := newService(pubSub, cfg.channelID, auth, dbTracer, db, cacheTracer, cacheClient, logger)

tracer, closer := initJaeger("twins", cfg.jaegerURL, logger)
defer closer.Close()
Expand Down Expand Up @@ -173,24 +170,23 @@ func loadConfig() config {
}

return config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
dbCfg: dbCfg,
cacheURL: mainflux.Env(envCacheURL, defCacheURL),
cachePass: mainflux.Env(envCachePass, defCachePass),
cacheDB: mainflux.Env(envCacheDB, defCacheDB),
singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail),
singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
channelID: mainflux.Env(envChannelID, defChannelID),
subtopicWildcard: mainflux.Env(envSubtopicWildcard, defSubtopicWildcard),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
authnURL: mainflux.Env(envAuthnURL, defAuthnURL),
authnTimeout: authnTimeout,
logLevel: mainflux.Env(envLogLevel, defLogLevel),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
dbCfg: dbCfg,
cacheURL: mainflux.Env(envCacheURL, defCacheURL),
cachePass: mainflux.Env(envCachePass, defCachePass),
cacheDB: mainflux.Env(envCacheDB, defCacheDB),
singleUserEmail: mainflux.Env(envSingleUserEmail, defSingleUserEmail),
singleUserToken: mainflux.Env(envSingleUserToken, defSingleUserToken),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
channelID: mainflux.Env(envChannelID, defChannelID),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
authnURL: mainflux.Env(envAuthnURL, defAuthnURL),
authnTimeout: authnTimeout,
}
}

Expand Down Expand Up @@ -266,8 +262,8 @@ func connectToRedis(cacheURL, cachePass, cacheDB string, logger logger.Logger) *
})
}

func newService(ps messaging.PubSub, chanID string, subtopicWildcard string, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db, subtopicWildcard)
func newService(ps messaging.PubSub, chanID string, users mainflux.AuthNServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger logger.Logger) twins.Service {
twinRepo := twmongodb.NewTwinRepository(db)
twinRepo = tracing.TwinRepositoryMiddleware(dbTracer, twinRepo)

stateRepo := twmongodb.NewStateRepository(db)
Expand All @@ -277,7 +273,7 @@ func newService(ps messaging.PubSub, chanID string, subtopicWildcard string, use
twinCache := rediscache.NewTwinCache(cacheClient)
twinCache = tracing.TwinCacheMiddleware(cacheTracer, twinCache)

svc := twins.New(ps, users, twinRepo, twinCache, stateRepo, up, chanID, subtopicWildcard, logger)
svc := twins.New(ps, users, twinRepo, twinCache, stateRepo, up, chanID, logger)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down
3 changes: 0 additions & 3 deletions twins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ default values.
| MF_TWINS_CLIENT_TLS | Flag that indicates if TLS should be turned on | false |
| MF_TWINS_CA_CERTS | Path to trusted CAs in PEM format | |
| MF_TWINS_CHANNEL_ID | NATS notifications channel id | |
| MF_TWINS_SUBTOPIC_WILDCARD | Subtopic wildcard for attribute's definition | # |
| MF_NATS_URL | Mainflux NATS broker URL | nats://localhost:4222 |
| MF_AUTHN_GRPC_URL | AuthN service gRPC URL | localhost:8181 |
| MF_AUTHN_GRPC_TIMEOUT | AuthN service gRPC request timeout in seconds | 1s |
Expand Down Expand Up @@ -64,7 +63,6 @@ services:
MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on]
MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format]
MF_TWINS_CHANNEL_ID: [NATS notifications channel id]
MF_TWINS_SUBTOPIC_WILDCARD: [Subtopic wildcard for attribute's definition]
MF_NATS_URL: [Mainflux NATS broker URL]
MF_AUTHN_GRPC_URL: [AuthN service gRPC URL]
MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds]
Expand Down Expand Up @@ -101,7 +99,6 @@ MF_TWINS_SINGLE_USER_TOKEN: [User token for single user mode] \
MF_TWINS_CLIENT_TLS: [Flag that indicates if TLS should be turned on] \
MF_TWINS_CA_CERTS: [Path to trusted CAs in PEM format] \
MF_TWINS_CHANNEL_ID: [NATS notifications channel id] \
MF_TWINS_SUBTOPIC_WILDCARD: [Subtopic wildcard for attribute's definition] \
MF_NATS_URL: [Mainflux NATS broker URL] \
MF_AUTHN_GRPC_URL: [AuthN service gRPC URL] \
MF_AUTHN_GRPC_TIMEOUT: [AuthN service gRPC request timeout in seconds] \
Expand Down
2 changes: 1 addition & 1 deletion twins/mocks/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewService(tokens map[string]string) twins.Service {
uuidProvider := uuid.NewMock()
subs := map[string]string{"chanID": "chanID"}
broker := NewBroker(subs)
return twins.New(broker, auth, twinsRepo, twinCache, statesRepo, uuidProvider, "chanID", "#", nil)
return twins.New(broker, auth, twinsRepo, twinCache, statesRepo, uuidProvider, "chanID", nil)
}

// CreateDefinition creates twin definition
Expand Down
7 changes: 3 additions & 4 deletions twins/mongodb/twins.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ type twinRepository struct {
var _ twins.TwinRepository = (*twinRepository)(nil)

// NewTwinRepository instantiates a MongoDB implementation of twin repository.
func NewTwinRepository(db *mongo.Database, sw string) twins.TwinRepository {
func NewTwinRepository(db *mongo.Database) twins.TwinRepository {
return &twinRepository{
db: db,
subtopicWildcard: sw,
db: db,
}
}

Expand Down Expand Up @@ -97,7 +96,7 @@ func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subt
"definition.channel": channel,
"$or": []interface{}{
bson.M{"definition.subtopic": subtopic},
bson.M{"definition.subtopic": "#"},
bson.M{"definition.subtopic": twins.SubtopicWildcard},
},
},
}
Expand Down
43 changes: 21 additions & 22 deletions twins/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ const (
noop = iota
update
save
millisec = 1e6
nanosec = 1e9
millisec = 1e6
nanosec = 1e9
SubtopicWildcard = ">"
)

var crudOp = map[string]string{
Expand All @@ -88,31 +89,29 @@ var crudOp = map[string]string{
}

type twinsService struct {
publisher messaging.Publisher
auth mainflux.AuthNServiceClient
twins TwinRepository
states StateRepository
uuidProvider mainflux.UUIDProvider
channelID string
subtopicWildcard string
twinCache TwinCache
logger logger.Logger
publisher messaging.Publisher
auth mainflux.AuthNServiceClient
twins TwinRepository
states StateRepository
uuidProvider mainflux.UUIDProvider
channelID string
twinCache TwinCache
logger logger.Logger
}

var _ Service = (*twinsService)(nil)

// New instantiates the twins service implementation.
func New(publisher messaging.Publisher, auth mainflux.AuthNServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp mainflux.UUIDProvider, chann string, sw string, logger logger.Logger) Service {
func New(publisher messaging.Publisher, auth mainflux.AuthNServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp mainflux.UUIDProvider, chann string, logger logger.Logger) Service {
return &twinsService{
publisher: publisher,
auth: auth,
twins: twins,
twinCache: tcache,
states: sr,
uuidProvider: idp,
subtopicWildcard: sw,
channelID: chann,
logger: logger,
publisher: publisher,
auth: auth,
twins: twins,
twinCache: tcache,
states: sr,
uuidProvider: idp,
channelID: chann,
logger: logger,
}
}

Expand Down Expand Up @@ -364,7 +363,7 @@ func (ts *twinsService) prepareState(st *State, tw *Twin, rec senml.Record, msg
if !attr.PersistState {
continue
}
if attr.Channel == msg.Channel && (attr.Subtopic == ts.subtopicWildcard || attr.Subtopic == msg.Subtopic) {
if attr.Channel == msg.Channel && (attr.Subtopic == SubtopicWildcard || attr.Subtopic == msg.Subtopic) {
action = update
delta := math.Abs(float64(st.Created.UnixNano()) - recNano)
if recNano == 0 || delta > float64(def.Delta) {
Expand Down

0 comments on commit 060a294

Please sign in to comment.