Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MF-1257 - Access messages from readers endpoint with user access token #1470

Merged
merged 40 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e778278
remove owner id
mteodor Dec 15, 2020
51fc5e1
add user auth for db reader
mteodor Jun 14, 2021
0809e6f
add user auth for db reader
mteodor Jun 14, 2021
5c8303f
enable mongodb reader for user token reading
mteodor Oct 6, 2021
a1b83c0
use uuid check for auth switch between thing key and user tok
mteodor Oct 6, 2021
d43dbb8
enable user token reading
mteodor Oct 6, 2021
066ece8
revert to correct version
mteodor Oct 6, 2021
1f55612
fix endpoint test, add additional tests
mteodor Oct 7, 2021
8cadbb5
remove logs,dead code
mteodor Oct 8, 2021
7400a4f
fix logging messages
mteodor Oct 8, 2021
1280d10
remove auth interface, add authorization header type
mteodor Oct 19, 2021
2e677bb
update api doc
mteodor Oct 19, 2021
de14e8c
remove unused package
mteodor Oct 19, 2021
612c57b
some refactor of cases for authorization switch
mteodor Oct 22, 2021
f401adc
correct description in openapi
mteodor Oct 26, 2021
8b21797
fix endpoint test to match auth service change
mteodor Nov 5, 2021
f5be079
some rename
mteodor Nov 23, 2021
dff4263
initialize auth url
mteodor Dec 13, 2021
c7976c2
add env variables for auth service
mteodor Dec 21, 2021
9dafb25
fix spelling
mteodor Dec 29, 2021
31ed39e
Things prefix and no prefix for Thing authorization, Bearer for user
mteodor Dec 30, 2021
5e13492
update readme file
mteodor Jan 14, 2022
a2fd9e8
fix default things grpc port
mteodor Jan 14, 2022
3f45c5e
enable user reading for timescaledb
mteodor Jan 27, 2022
39c94b7
remove not used error
mteodor Jan 28, 2022
427d276
improve errors
mteodor Jan 28, 2022
cf7b6ab
refactor authorize
mteodor Feb 1, 2022
fabdb8d
add chanID check
mteodor Feb 1, 2022
181d0ce
inline some error checking
mteodor Feb 2, 2022
aa5f3c1
fixing errors
mteodor Feb 3, 2022
c18f628
fixing errors
mteodor Feb 3, 2022
fbadbb2
improve test case description
mteodor Feb 5, 2022
88edad2
remove test code
mteodor Feb 5, 2022
967e9c2
dont inline
mteodor Feb 5, 2022
4ef247a
refactor a bit encodeError
mteodor Feb 8, 2022
d0b5c8b
remove unused error
mteodor Feb 8, 2022
ab541c4
remove unused error
mteodor Feb 8, 2022
50dad70
fix things auth grpc url
mteodor Feb 8, 2022
6a6b930
rename variables for header prefix
mteodor Feb 9, 2022
8665339
Merge branch 'master' into user-reading
dborovcanin Feb 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/openapi/readers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ components:
parameters:
Authorization:
name: Authorization
description: Thing access token.
description: |
Thing or User access token:
* For thing access use "Authorization: Thing <thing_key>"
* For user access use "Authorization: Bearer <user_token>"
in: header
schema:
type: string
Expand Down
59 changes: 54 additions & 5 deletions cmd/cassandra-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
Expand Down Expand Up @@ -46,8 +47,10 @@ const (
defServerCert = ""
defServerKey = ""
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"
defUsersAuthURL = "localhost:8181"
defUsersAuthTimeout = "1s"

envLogLevel = "MF_CASSANDRA_READER_LOG_LEVEL"
envPort = "MF_CASSANDRA_READER_PORT"
Expand All @@ -63,6 +66,8 @@ const (
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
envUsersAuthURL = "MF_AUTH_GRPC_URL"
envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)

type config struct {
Expand All @@ -75,7 +80,9 @@ type config struct {
serverKey string
jaegerURL string
thingsAuthURL string
usersAuthURL string
dborovcanin marked this conversation as resolved.
Show resolved Hide resolved
thingsAuthTimeout time.Duration
usersAuthTimeout time.Duration
}

func main() {
Expand All @@ -96,11 +103,19 @@ func main() {
defer thingsCloser.Close()

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()

authConn := connectToAuth(cfg, logger)
defer authConn.Close()

auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout)

repo := newService(session, logger)

errs := make(chan error, 2)

go startHTTPServer(repo, tc, cfg, errs, logger)
go startHTTPServer(repo, tc, auth, cfg, errs, logger)

go func() {
c := make(chan os.Signal)
Expand All @@ -112,6 +127,32 @@ func main() {
logger.Error(fmt.Sprintf("Cassandra reader service terminated: %s", err))
}

func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("Connecting to auth via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}

conn, err := grpc.Dial(cfg.usersAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}
logger.Info(fmt.Sprintf("Established gRPC connection to things via gRPC: %s", cfg.usersAuthURL))
return conn
}

func loadConfig() config {
dbPort, err := strconv.Atoi(mainflux.Env(envDBPort, defDBPort))
if err != nil {
Expand All @@ -136,6 +177,11 @@ func loadConfig() config {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}

usersAuthTimeout, err := time.ParseDuration(mainflux.Env(envUsersAuthTimeout, defUsersAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}

return config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
Expand All @@ -146,6 +192,8 @@ func loadConfig() config {
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL),
usersAuthURL: mainflux.Env(envUsersAuthURL, defUsersAuthURL),
usersAuthTimeout: usersAuthTimeout,
thingsAuthTimeout: authTimeout,
}
}
Expand Down Expand Up @@ -181,6 +229,7 @@ func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
logger.Info(fmt.Sprintf("Established gRPC connection to things via gRPC: %s", cfg.thingsAuthURL))
return conn
}

Expand Down Expand Up @@ -230,14 +279,14 @@ func newService(session *gocql.Session, logger logger.Logger) readers.MessageRep
return repo
}

func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, cfg config, errs chan error, logger logger.Logger) {
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, ac mainflux.AuthServiceClient, cfg config, errs chan error, logger logger.Logger) {
p := fmt.Sprintf(":%s", cfg.port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("Cassandra reader service started using https on port %s with cert %s key %s",
cfg.port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, "cassandra-reader"))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, ac, "cassandra-reader"))
return
}
logger.Info(fmt.Sprintf("Cassandra reader service started, exposed port %s", cfg.port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader"))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, ac, "cassandra-reader"))
}
2 changes: 1 addition & 1 deletion cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
defClientTLS = "false"
defCACerts = ""
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"

envPort = "MF_COAP_ADAPTER_PORT"
Expand Down
2 changes: 1 addition & 1 deletion cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
defPort = "8180"
defNatsURL = "nats://localhost:4222"
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"

envLogLevel = "MF_HTTP_ADAPTER_LOG_LEVEL"
Expand Down
60 changes: 55 additions & 5 deletions cmd/influxdb-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
Expand All @@ -40,8 +41,10 @@ const (
defServerCert = ""
defServerKey = ""
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"
defUsersAuthURL = "localhost:8181"
defUsersAuthTimeout = "1s"

envLogLevel = "MF_INFLUX_READER_LOG_LEVEL"
envPort = "MF_INFLUX_READER_PORT"
Expand All @@ -57,6 +60,8 @@ const (
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
envAuthURL = "MF_AUTH_GRPC_URL"
envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)

type config struct {
Expand All @@ -73,7 +78,9 @@ type config struct {
serverKey string
jaegerURL string
thingsAuthURL string
usersAuthURL string
thingsAuthTimeout time.Duration
usersAuthTimeout time.Duration
}

func main() {
Expand All @@ -90,6 +97,14 @@ func main() {

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()

authConn := connectToAuth(cfg, logger)
defer authConn.Close()

auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout)

client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB client: %s", err))
Expand All @@ -106,12 +121,38 @@ func main() {
errs <- fmt.Errorf("%s", <-c)
}()

go startHTTPServer(repo, tc, cfg, logger, errs)
go startHTTPServer(repo, tc, auth, cfg, logger, errs)

err = <-errs
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}

func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("Connecting to auth via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}

conn, err := grpc.Dial(cfg.usersAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}
logger.Info("Established gRPC connection to auth via gRPC")
return conn
}

func loadConfigs() (config, influxdata.HTTPConfig) {
tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS))
if err != nil {
Expand All @@ -123,6 +164,11 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}

userAuthTimeout, err := time.ParseDuration(mainflux.Env(envUsersAuthTimeout, defUsersAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}

cfg := config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
Expand All @@ -138,6 +184,8 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL),
thingsAuthTimeout: authTimeout,
usersAuthURL: mainflux.Env(envAuthURL, defUsersAuthURL),
usersAuthTimeout: userAuthTimeout,
}

clientCfg := influxdata.HTTPConfig{
Expand All @@ -151,6 +199,7 @@ func loadConfigs() (config, influxdata.HTTPConfig) {

func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("connecting to things via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
Expand All @@ -170,6 +219,7 @@ func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
logger.Info(fmt.Sprintf("Established gRPC connection to things via gRPC: %s", cfg.thingsAuthURL))
return conn
}

Expand Down Expand Up @@ -219,14 +269,14 @@ func newService(client influxdata.Client, dbName string, logger logger.Logger) r
return repo
}

func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, cfg config, logger logger.Logger, errs chan error) {
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, ac mainflux.AuthServiceClient, cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", cfg.port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("InfluxDB reader service started using https on port %s with cert %s key %s",
cfg.port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, "influxdb-reader"))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, ac, "influxdb-reader"))
return
}
logger.Info(fmt.Sprintf("InfluxDB reader service started, exposed port %s", cfg.port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "influxdb-reader"))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, ac, "influxdb-reader"))
}
Loading