Skip to content

Commit

Permalink
NOISSUE - Add SMPP notifier (absmach#1464)
Browse files Browse the repository at this point in the history
* Add SMPP notifier

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* fix readme
remove env file

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* resolve conversations

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* Remove debug log

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* Rename transmiter and transformer fields in struct

Signed-off-by: Ivan Milosevic <iva@blokovi.com>

* fix typo

Signed-off-by: Ivan Milosevic <iva@blokovi.com>
  • Loading branch information
blokovi authored and buraksekili committed Oct 18, 2021
1 parent a54bf65 commit 597f950
Show file tree
Hide file tree
Showing 61 changed files with 15,501 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ MF_DOCKER_IMAGE_NAME_PREFIX ?= mainflux
BUILD_DIR = build
SERVICES = users things http coap lora influxdb-writer influxdb-reader mongodb-writer \
mongodb-reader cassandra-writer cassandra-reader postgres-writer postgres-reader cli \
bootstrap opcua auth twins mqtt provision certs smtp-notifier
bootstrap opcua auth twins mqtt provision certs smtp-notifier smpp-notifier
DOCKERS = $(addprefix docker_,$(SERVICES))
DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES))
CGO_ENABLED ?= 0
Expand Down
335 changes: 335 additions & 0 deletions cmd/smpp-notifier/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package main

import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/notifiers"
"github.com/mainflux/mainflux/consumers/notifiers/api"
"github.com/mainflux/mainflux/consumers/notifiers/postgres"

mfsmpp "github.com/mainflux/mainflux/consumers/notifiers/smpp"
"github.com/mainflux/mainflux/consumers/notifiers/tracing"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/ulid"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
defLogLevel = "error"
defDBHost = "localhost"
defDBPort = "5432"
defDBUser = "mainflux"
defDBPass = "mainflux"
defDB = "subscriptions"
defConfigPath = "/config.toml"
defDBSSLMode = "disable"
defDBSSLCert = ""
defDBSSLKey = ""
defDBSSLRootCert = ""
defHTTPPort = "8907"
defServerCert = ""
defServerKey = ""
defFrom = ""
defJaegerURL = ""
defNatsURL = "nats://localhost:4222"

defSmppAddress = ""
defSmppUsername = ""
defSmppPassword = ""
defSmppSystemType = ""
defSmppSrcAddrTON = "0"
defSmppDstAddrTON = "0"
defSmppSrcAddrNPI = "0"
defSmppDstAddrNPI = "0"

defAuthTLS = "false"
defAuthCACerts = ""
defAuthURL = "localhost:8181"
defAuthTimeout = "1s"

envLogLevel = "MF_SMPP_NOTIFIER_LOG_LEVEL"
envDBHost = "MF_SMPP_NOTIFIER_DB_HOST"
envDBPort = "MF_SMPP_NOTIFIER_DB_PORT"
envDBUser = "MF_SMPP_NOTIFIER_DB_USER"
envDBPass = "MF_SMPP_NOTIFIER_DB_PASS"
envDB = "MF_SMPP_NOTIFIER_DB"
envConfigPath = "MF_SMPP_NOTIFIER_WRITER_CONFIG_PATH"
envDBSSLMode = "MF_SMPP_NOTIFIER_DB_SSL_MODE"
envDBSSLCert = "MF_SMPP_NOTIFIER_DB_SSL_CERT"
envDBSSLKey = "MF_SMPP_NOTIFIER_DB_SSL_KEY"
envDBSSLRootCert = "MF_SMPP_NOTIFIER_DB_SSL_ROOT_CERT"
envHTTPPort = "MF_SMPP_NOTIFIER_HTTP_PORT"
envServerCert = "MF_SMPP_NOTIFIER_SERVER_CERT"
envServerKey = "MF_SMPP_NOTIFIER_SERVER_KEY"
envFrom = "MF_SMPP_NOTIFIER_SOURCE_ADDR"
envJaegerURL = "MF_JAEGER_URL"
envNatsURL = "MF_NATS_URL"

envSmppAddress = "MF_SMPP_ADDRESS"
envSmppUsername = "MF_SMPP_USERNAME"
envSmppPassword = "MF_SMPP_PASSWORD"
envSmppSystemType = "MF_SMPP_SYSTEM_TYPE"
envSmppSrcAddrTON = "MF_SMPP_SRC_ADDR_TON"
envSmppDstAddrTON = "MF_SMPP_DST_ADDR_TON"
envSmppSrcAddrNPI = "MF_SMPP_SRC_ADDR_NPI"
envSmppDstAddrNPI = "MF_SMPP_DST_ADDR_NPI"

envAuthTLS = "MF_AUTH_CLIENT_TLS"
envAuthCACerts = "MF_AUTH_CA_CERTS"
envAuthURL = "MF_AUTH_GRPC_URL"
envAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)

type config struct {
natsURL string
configPath string
logLevel string
dbConfig postgres.Config
smppConf mfsmpp.Config
from string
httpPort string
serverCert string
serverKey string
jaegerURL string
authTLS bool
authCACerts string
authURL string
authTimeout time.Duration
}

func main() {
cfg := loadConfig()

logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}

db := connectToDB(cfg.dbConfig, logger)
defer db.Close()

pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
}
defer pubSub.Close()

authTracer, closer := initJaeger("auth", cfg.jaegerURL, logger)
defer closer.Close()

auth, close := connectToAuth(cfg, authTracer, logger)
if close != nil {
defer close()
}

tracer, closer := initJaeger("smpp-notifier", cfg.jaegerURL, logger)
defer closer.Close()

dbTracer, dbCloser := initJaeger("smpp-notifier_db", cfg.jaegerURL, logger)
defer dbCloser.Close()

svc := newService(db, dbTracer, auth, cfg, logger)
errs := make(chan error, 2)

if err = consumers.Start(pubSub, svc, nil, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

go startHTTPServer(tracer, svc, cfg.httpPort, cfg.serverCert, cfg.serverKey, logger, errs)

go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()

err = <-errs
logger.Error(fmt.Sprintf("Users service terminated: %s", err))
}

func loadConfig() config {
authTimeout, err := time.ParseDuration(mainflux.Env(envAuthTimeout, defAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envAuthTimeout, err.Error())
}

tls, err := strconv.ParseBool(mainflux.Env(envAuthTLS, defAuthTLS))
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envAuthTLS)
}

dbConfig := postgres.Config{
Host: mainflux.Env(envDBHost, defDBHost),
Port: mainflux.Env(envDBPort, defDBPort),
User: mainflux.Env(envDBUser, defDBUser),
Pass: mainflux.Env(envDBPass, defDBPass),
Name: mainflux.Env(envDB, defDB),
SSLMode: mainflux.Env(envDBSSLMode, defDBSSLMode),
SSLCert: mainflux.Env(envDBSSLCert, defDBSSLCert),
SSLKey: mainflux.Env(envDBSSLKey, defDBSSLKey),
SSLRootCert: mainflux.Env(envDBSSLRootCert, defDBSSLRootCert),
}

saton, err := strconv.ParseUint(mainflux.Env(envSmppSrcAddrTON, defSmppSrcAddrTON), 10, 8)
if err != nil {
log.Fatalf("Invalid value passed for %s", envSmppSrcAddrTON)
}
daton, err := strconv.ParseUint(mainflux.Env(envSmppDstAddrTON, defSmppDstAddrTON), 10, 8)
if err != nil {
log.Fatalf("Invalid value passed for %s", envSmppDstAddrTON)
}
sanpi, err := strconv.ParseUint(mainflux.Env(envSmppSrcAddrNPI, defSmppSrcAddrNPI), 10, 8)
if err != nil {
log.Fatalf("Invalid value passed for %s", envSmppSrcAddrNPI)
}
danpi, err := strconv.ParseUint(mainflux.Env(envSmppDstAddrNPI, defSmppDstAddrNPI), 10, 8)
if err != nil {
log.Fatalf("Invalid value passed for %s", envSmppDstAddrNPI)
}

smppConf := mfsmpp.Config{
Address: mainflux.Env(envSmppAddress, defSmppAddress),
Username: mainflux.Env(envSmppUsername, defSmppUsername),
Password: mainflux.Env(envSmppPassword, defSmppPassword),
SystemType: mainflux.Env(envSmppSystemType, defSmppSystemType),
SourceAddrTON: uint8(saton),
DestAddrTON: uint8(daton),
SourceAddrNPI: uint8(sanpi),
DestAddrNPI: uint8(danpi),
}

return config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
natsURL: mainflux.Env(envNatsURL, defNatsURL),
configPath: mainflux.Env(envConfigPath, defConfigPath),
dbConfig: dbConfig,
smppConf: smppConf,
from: mainflux.Env(envFrom, defFrom),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
authTLS: tls,
authCACerts: mainflux.Env(envAuthCACerts, defAuthCACerts),
authURL: mainflux.Env(envAuthURL, defAuthURL),
authTimeout: authTimeout,
}

}

func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}

tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
os.Exit(1)
}

return tracer, closer
}

func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
db, err := postgres.Connect(dbConfig)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to postgres: %s", err))
os.Exit(1)
}
return db
}

func connectToAuth(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthServiceClient, func() error) {
var opts []grpc.DialOption
if cfg.authTLS {
if cfg.authCACerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.authCACerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}

conn, err := grpc.Dial(cfg.authURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}

return authapi.NewClient(tracer, conn, cfg.authTimeout), conn.Close
}

func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, logger logger.Logger) notifiers.Service {
database := postgres.NewDatabase(db)
repo := tracing.New(postgres.New(database), tracer)
idp := ulid.New()
notifier := mfsmpp.New(c.smppConf)
svc := notifiers.New(auth, repo, idp, notifier, c.from)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "notifier",
Subsystem: "smpp",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "notifier",
Subsystem: "smpp",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
return svc
}

func startHTTPServer(tracer opentracing.Tracer, svc notifiers.Service, port string, certFile string, keyFile string, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
if certFile != "" || keyFile != "" {
logger.Info(fmt.Sprintf("SMPP notifier service started using https, cert %s key %s, exposed port %s", certFile, keyFile, port))
errs <- http.ListenAndServeTLS(p, certFile, keyFile, api.MakeHandler(svc, tracer))
} else {
logger.Info(fmt.Sprintf("SMPP notifier service started using http, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHandler(svc, tracer))
}
}
10 changes: 7 additions & 3 deletions cmd/smtp-notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const (
defDBSSLCert = ""
defDBSSLKey = ""
defDBSSLRootCert = ""
defHTTPPort = "8180"
defHTTPPort = "8906"
defServerCert = ""
defServerKey = ""
defFrom = ""
defJaegerURL = ""
defNatsURL = "nats://localhost:4222"

Expand Down Expand Up @@ -82,6 +83,7 @@ const (
envHTTPPort = "MF_SMTP_NOTIFIER_PORT"
envServerCert = "MF_SMTP_NOTIFIER_SERVER_CERT"
envServerKey = "MF_SMTP_NOTIFIER_SERVER_KEY"
envFrom = "MF_SMTP_NOTIFIER_FROM_ADDR"
envJaegerURL = "MF_JAEGER_URL"
envNatsURL = "MF_NATS_URL"

Expand All @@ -92,7 +94,7 @@ const (
envEmailSecret = "MF_EMAIL_SECRET"
envEmailFromAddress = "MF_EMAIL_FROM_ADDRESS"
envEmailFromName = "MF_EMAIL_FROM_NAME"
envEmailTemplate = "MF_EMAIL_TEMPLATE"
envEmailTemplate = "MF_SMTP_NOTIFIER_TEMPLATE"

envAuthTLS = "MF_AUTH_CLIENT_TLS"
envAuthCACerts = "MF_AUTH_CA_CERTS"
Expand All @@ -106,6 +108,7 @@ type config struct {
logLevel string
dbConfig postgres.Config
emailConf email.Config
from string
httpPort string
serverCert string
serverKey string
Expand Down Expand Up @@ -207,6 +210,7 @@ func loadConfig() config {
configPath: mainflux.Env(envConfigPath, defConfigPath),
dbConfig: dbConfig,
emailConf: emailConf,
from: mainflux.Env(envFrom, defFrom),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
Expand Down Expand Up @@ -289,7 +293,7 @@ func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServic
}

notifier := smtp.New(agent)
svc := notifiers.New(auth, repo, idp, notifier)
svc := notifiers.New(auth, repo, idp, notifier, c.from)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
Expand Down
Loading

0 comments on commit 597f950

Please sign in to comment.