Skip to content

Commit

Permalink
Add OAuth2 support to HTTP output plugin (influxdata#4536)
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrant6 authored and rgitzel committed Oct 17, 2018
1 parent fdc9ece commit 800fafd
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 17 deletions.
25 changes: 24 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,7 @@
[[constraint]]
name = "github.com/Azure/go-autorest"
version = "10.12.0"

[[constraint]]
branch = "master"
name = "golang.org/x/oauth2"
1 change: 1 addition & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ following works:
- github.com/zensqlmonitor/go-mssqldb [BSD](https://github.com/zensqlmonitor/go-mssqldb/blob/master/LICENSE.txt)
- golang.org/x/crypto [BSD](https://github.com/golang/crypto/blob/master/LICENSE)
- golang.org/x/net [BSD](https://go.googlesource.com/net/+/master/LICENSE)
- golang.org/x/oauth2 [BSD](https://go.googlesource.com/oauth2/+/master/LICENSE)
- golang.org/x/text [BSD](https://go.googlesource.com/text/+/master/LICENSE)
- golang.org/x/sys [BSD](https://go.googlesource.com/sys/+/master/LICENSE)
- google.golang.org/grpc [APACHE](https://github.com/google/grpc-go/blob/master/LICENSE)
Expand Down
8 changes: 7 additions & 1 deletion plugins/outputs/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ data formats. For data_formats that support batching, metrics are sent in batch
# username = "username"
# password = "pa$$word"

## OAuth2 Client Credentials Grant
# client_id = "clientid"
# client_secret = "secret"
# token_url = "https://indentityprovider/oauth2/v1/token"
# scopes = ["urn:opc:idm:__myscopes__"]

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
Expand All @@ -33,7 +39,7 @@ data formats. For data_formats that support batching, metrics are sent in batch
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"

## Additional HTTP headers
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
Expand Down
66 changes: 51 additions & 15 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -13,6 +14,8 @@ import (
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)

var sampleConfig = `
Expand All @@ -29,6 +32,12 @@ var sampleConfig = `
# username = "username"
# password = "pa$$word"
## OAuth2 Client Credentials Grant
# client_id = "clientid"
# client_secret = "secret"
# token_url = "https://indentityprovider/oauth2/v1/token"
# scopes = ["urn:opc:idm:__myscopes__"]
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
Expand All @@ -41,7 +50,7 @@ var sampleConfig = `
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
## Additional HTTP headers
# [outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
Expand All @@ -55,12 +64,16 @@ const (
)

type HTTP struct {
URL string `toml:"url"`
Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
URL string `toml:"url"`
Timeout internal.Duration `toml:"timeout"`
Method string `toml:"method"`
Username string `toml:"username"`
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
ClientID string `toml:"client_id"`
ClientSecret string `toml:"client_secret"`
TokenURL string `toml:"token_url"`
Scopes []string `toml:"scopes"`
tls.ClientConfig

client *http.Client
Expand All @@ -71,6 +84,34 @@ func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}

func (h *HTTP) createClient(ctx context.Context) (*http.Client, error) {
tlsCfg, err := h.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}

if h.ClientID != "" && h.ClientSecret != "" && h.TokenURL != "" {
oauthConfig := clientcredentials.Config{
ClientID: h.ClientID,
ClientSecret: h.ClientSecret,
TokenURL: h.TokenURL,
Scopes: h.Scopes,
}
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
client = oauthConfig.Client(ctx)
}

return client, nil
}

func (h *HTTP) Connect() error {
if h.Method == "" {
h.Method = http.MethodPost
Expand All @@ -84,18 +125,13 @@ func (h *HTTP) Connect() error {
h.Timeout.Duration = defaultClientTimeout
}

tlsCfg, err := h.ClientConfig.TLSConfig()
ctx := context.Background()
client, err := h.createClient(ctx)
if err != nil {
return err
}

h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
Proxy: http.ProxyFromEnvironment,
},
Timeout: h.Timeout.Duration,
}
h.client = client

return nil
}
Expand Down
73 changes: 73 additions & 0 deletions plugins/outputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,76 @@ func TestBasicAuth(t *testing.T) {
})
}
}

type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)

func TestOAuthClientCredentialsGrant(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()

var token = "2YotnFZFEjr1zCsicMWpAA"

u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)

tests := []struct {
name string
plugin *HTTP
tokenHandler TestHandlerFunc
handler TestHandlerFunc
}{
{
name: "no credentials",
plugin: &HTTP{
URL: u.String(),
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Len(t, r.Header["Authorization"], 0)
w.WriteHeader(http.StatusOK)
},
},
{
name: "success",
plugin: &HTTP{
URL: u.String() + "/write",
ClientID: "howdy",
ClientSecret: "secret",
TokenURL: u.String() + "/token",
Scopes: []string{"urn:opc:idm:__myscopes__"},
},
tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
values := url.Values{}
values.Add("access_token", token)
values.Add("token_type", "bearer")
values.Add("expires_in", "3600")
w.Write([]byte(values.Encode()))
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"])
w.WriteHeader(http.StatusOK)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/write":
tt.handler(t, w, r)
case "/token":
tt.tokenHandler(t, w, r)
}
})

serializer := influx.NewSerializer()
tt.plugin.SetSerializer(serializer)
err = tt.plugin.Connect()
require.NoError(t, err)

err = tt.plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err)
})
}
}

0 comments on commit 800fafd

Please sign in to comment.