Skip to content

Commit

Permalink
NOISSUE - Adding subtopics filtering in writer services (#1072)
Browse files Browse the repository at this point in the history
* Add feature of filtering by subtopics in writer

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Fix mistake

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Refactoring writer sevices

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Rename variables related to filter (channels & subtopics)

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Set default value of filtering when configuration file doesn't exist

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add a blank line at the end of the file

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Refactor loading filters configuration (moving into writer package, merge both loading methods & returning error)

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Remove useless log

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Change type of variables (channels & subtopics) and simplify loading method

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add logging error when loading filters

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Simplify return configuration in loading method

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Merge both filter files into one file

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Move loading subjects into writer package

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Add subscribe to selected subjects

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Edit README of writer services

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Keep only subscribe loop

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Use full NATS subjects

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

* Edit comment in subjects files

Signed-off-by: Jonathan Dreyer <jonathan.dreyer@cleanenergie.ch>

Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
  • Loading branch information
jonathandreyer and drasko authored Mar 30, 2020
1 parent d2153a8 commit 46aadcf
Show file tree
Hide file tree
Showing 21 changed files with 255 additions and 354 deletions.
90 changes: 30 additions & 60 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package main

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
Expand All @@ -14,7 +13,6 @@ import (
"strings"
"syscall"

"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
Expand All @@ -31,33 +29,33 @@ const (
svcName = "cassandra-writer"
sep = ","

defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUsername = ""
defDBPassword = ""
defDBPort = "9042"
defChanCfgPath = "/config/channels.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUsername = "MF_CASSANDRA_WRITER_DB_USERNAME"
envDBPassword = "MF_CASSANDRA_WRITER_DB_PASSWORD"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envChanCfgPath = "MF_CASSANDRA_WRITER_CHANNELS_CONFIG"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defDBUsername = ""
defDBPassword = ""
defDBPort = "9042"
defSubjectsCfgPath = "/config/subjects.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
envPort = "MF_CASSANDRA_WRITER_PORT"
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
envDBUsername = "MF_CASSANDRA_WRITER_DB_USERNAME"
envDBPassword = "MF_CASSANDRA_WRITER_DB_PASSWORD"
envDBPort = "MF_CASSANDRA_WRITER_DB_PORT"
envSubjectsCfgPath = "MF_CASSANDRA_WRITER_SUBJECTS_CONFIG"
)

type config struct {
natsURL string
logLevel string
port string
dbCfg cassandra.DBConfig
channels map[string]bool
natsURL string
logLevel string
port string
dbCfg cassandra.DBConfig
subjectsCfgPath string
}

func main() {
Expand All @@ -76,7 +74,7 @@ func main() {

repo := newService(session, logger)
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err := writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down Expand Up @@ -108,43 +106,15 @@ func loadConfig() config {
Port: dbPort,
}

chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
return config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbCfg: dbCfg,
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbCfg: dbCfg,
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}
}

type channels struct {
List []string `toml:"filter"`
}

type chanConfig struct {
Channels channels `toml:"channels"`
}

func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}

var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}

chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}

return chans
}

func connectToNATS(url string, logger logger.Logger) *nats.Conn {
nc, err := nats.Connect(url)
if err != nil {
Expand Down
106 changes: 38 additions & 68 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ package main

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/BurntSushi/toml"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
Expand All @@ -28,37 +26,37 @@ import (
const (
svcName = "influxdb-writer"

defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defChanCfgPath = "/config/channels.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envChanCfgPath = "MF_INFLUX_WRITER_CHANNELS_CONFIG"
defNatsURL = nats.DefaultURL
defLogLevel = "error"
defPort = "8180"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defSubjectsCfgPath = "/config/subjects.toml"

envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envSubjectsCfgPath = "MF_INFLUX_WRITER_SUBJECTS_CONFIG"
)

type config struct {
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
channels map[string]bool
natsURL string
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
subjectsCfgPath string
}

func main() {
Expand Down Expand Up @@ -89,7 +87,7 @@ func main() {
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New()
if err := writers.Start(nc, repo, st, svcName, cfg.channels, logger); err != nil {
if err := writers.Start(nc, repo, st, svcName, cfg.subjectsCfgPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand All @@ -108,17 +106,16 @@ func main() {
}

func loadConfigs() (config, influxdata.HTTPConfig) {
chanCfgPath := mainflux.Env(envChanCfgPath, defChanCfgPath)
cfg := config{
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
channels: loadChansConfig(chanCfgPath),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDBName, defDBName),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
subjectsCfgPath: mainflux.Env(envSubjectsCfgPath, defSubjectsCfgPath),
}

clientCfg := influxdata.HTTPConfig{
Expand All @@ -130,33 +127,6 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
return cfg, clientCfg
}

type channels struct {
List []string `toml:"filter"`
}

type chanConfig struct {
Channels channels `toml:"channels"`
}

func loadChansConfig(chanConfigPath string) map[string]bool {
data, err := ioutil.ReadFile(chanConfigPath)
if err != nil {
log.Fatal(err)
}

var chanCfg chanConfig
if err := toml.Unmarshal(data, &chanCfg); err != nil {
log.Fatal(err)
}

chans := map[string]bool{}
for _, ch := range chanCfg.Channels.List {
chans[ch] = true
}

return chans
}

func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
counter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "influxdb",
Expand Down
Loading

0 comments on commit 46aadcf

Please sign in to comment.