From 95bd195042d6bbb90ae4c9b01fa9cd731bb0ca10 Mon Sep 17 00:00:00 2001 From: Greg Date: Wed, 12 Sep 2018 15:48:59 -0600 Subject: [PATCH] Add influx v2 output plugin (#4645) --- plugins/outputs/all/all.go | 1 + plugins/outputs/influxdb_v2/README.md | 51 +++ plugins/outputs/influxdb_v2/http.go | 292 ++++++++++++++++++ .../outputs/influxdb_v2/http_internal_test.go | 59 ++++ plugins/outputs/influxdb_v2/http_test.go | 49 +++ plugins/outputs/influxdb_v2/influxdb.go | 201 ++++++++++++ plugins/outputs/influxdb_v2/influxdb_test.go | 103 ++++++ 7 files changed, 756 insertions(+) create mode 100644 plugins/outputs/influxdb_v2/README.md create mode 100644 plugins/outputs/influxdb_v2/http.go create mode 100644 plugins/outputs/influxdb_v2/http_internal_test.go create mode 100644 plugins/outputs/influxdb_v2/http_test.go create mode 100644 plugins/outputs/influxdb_v2/influxdb.go create mode 100644 plugins/outputs/influxdb_v2/influxdb_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 4d49c0c6e1f27..24748c53ee7ee 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -15,6 +15,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/graylog" _ "github.com/influxdata/telegraf/plugins/outputs/http" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" + _ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md new file mode 100644 index 0000000000000..795f4467c415b --- /dev/null +++ b/plugins/outputs/influxdb_v2/README.md @@ -0,0 +1,51 @@ +# InfluxDB Output Plugin + +This InfluxDB output plugin writes metrics to the [InfluxDB 2.0](https://github.com/influxdata/platform) HTTP service. + +### Configuration: + +```toml +# Configuration for sending metrics to InfluxDB 2.0 +[[outputs.influxdb_v2]] + ## The URLs of the InfluxDB cluster nodes. + ## + ## Multiple URLs can be specified for a single cluster, only ONE of the + ## urls will be written to each interval. + urls = ["http://127.0.0.1:9999"] + + ## Token for authentication. + token = "" + + ## Organization is the name of the organization you wish to write to. + organization = "" + + ## Bucket to the name fo the bucketwrite into; must exist. + bucket = "" + + ## Timeout for HTTP messages. + # timeout = "5s" + + ## Additional HTTP headers + # http_headers = {"X-Special-Header" = "Special-Value"} + + ## HTTP Proxy override, if unset values the standard proxy environment + ## variables are consulted to determine which proxy, if any, should be used. + # http_proxy = "http://corporate.proxy:3128" + + ## HTTP User-Agent + # user_agent = "telegraf" + + ## Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "gzip" + + ## Enable or disable uint support for writing uints influxdb 2.0. + # influx_uint_support = false + + ## Optional TLS Config for use on HTTP connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +``` diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go new file mode 100644 index 0000000000000..1e7061a270a39 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http.go @@ -0,0 +1,292 @@ +package influxdb_v2 + +import ( + "compress/gzip" + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "net/url" + "path" + "strconv" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +type APIErrorType int + +type APIError struct { + StatusCode int + Title string + Description string + Type APIErrorType +} + +func (e APIError) Error() string { + if e.Description != "" { + return fmt.Sprintf("%s: %s", e.Title, e.Description) + } + return e.Title +} + +const ( + defaultRequestTimeout = time.Second * 5 + defaultMaxWait = 10 // seconds + defaultDatabase = "telegraf" + defaultUserAgent = "telegraf" +) + +type HTTPConfig struct { + URL *url.URL + Token string + Organization string + Bucket string + Timeout time.Duration + Headers map[string]string + Proxy *url.URL + UserAgent string + ContentEncoding string + TLSConfig *tls.Config + + Serializer *influx.Serializer +} + +type httpClient struct { + WriteURL string + ContentEncoding string + Timeout time.Duration + Headers map[string]string + + client *http.Client + serializer *influx.Serializer + url *url.URL + retryTime time.Time +} + +func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { + if config.URL == nil { + return nil, ErrMissingURL + } + + timeout := config.Timeout + if timeout == 0 { + timeout = defaultRequestTimeout + } + + userAgent := config.UserAgent + if userAgent == "" { + userAgent = defaultUserAgent + } + + var headers = make(map[string]string, len(config.Headers)+2) + headers["User-Agent"] = userAgent + headers["Authorization"] = "Token " + config.Token + for k, v := range config.Headers { + headers[k] = v + } + + var proxy func(*http.Request) (*url.URL, error) + if config.Proxy != nil { + proxy = http.ProxyURL(config.Proxy) + } else { + proxy = http.ProxyFromEnvironment + } + + serializer := config.Serializer + if serializer == nil { + serializer = influx.NewSerializer() + } + + writeURL, err := makeWriteURL( + *config.URL, + config.Organization, + config.Bucket) + if err != nil { + return nil, err + } + + var transport *http.Transport + switch config.URL.Scheme { + case "http", "https": + transport = &http.Transport{ + Proxy: proxy, + TLSClientConfig: config.TLSConfig, + } + case "unix": + transport = &http.Transport{ + Dial: func(_, _ string) (net.Conn, error) { + return net.DialTimeout( + config.URL.Scheme, + config.URL.Path, + timeout, + ) + }, + } + default: + return nil, fmt.Errorf("unsupported scheme %q", config.URL.Scheme) + } + + client := &httpClient{ + serializer: serializer, + client: &http.Client{ + Timeout: timeout, + Transport: transport, + }, + url: config.URL, + WriteURL: writeURL, + ContentEncoding: config.ContentEncoding, + Timeout: timeout, + Headers: headers, + } + return client, nil +} + +// URL returns the origin URL that this client connects too. +func (c *httpClient) URL() string { + return c.url.String() +} + +type genericRespError struct { + Code string + Message string + Line *int32 + MaxLength *int32 +} + +func (g genericRespError) Error() string { + errString := fmt.Sprintf("%s: %s", g.Code, g.Message) + if g.Line != nil { + return fmt.Sprintf("%s - line[%d]", errString, g.Line) + } else if g.MaxLength != nil { + return fmt.Sprintf("%s - maxlen[%d]", errString, g.MaxLength) + } + return errString +} + +func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error { + if c.retryTime.After(time.Now()) { + return errors.New("Retry time has not elapsed") + } + reader := influx.NewReader(metrics, c.serializer) + req, err := c.makeWriteRequest(reader) + if err != nil { + return err + } + + resp, err := c.client.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + return nil + } + + writeResp := &genericRespError{} + err = json.NewDecoder(resp.Body).Decode(writeResp) + desc := writeResp.Error() + if err != nil { + desc = err.Error() + } + + switch resp.StatusCode { + case http.StatusBadRequest, http.StatusUnauthorized, + http.StatusForbidden, http.StatusRequestEntityTooLarge: + log.Printf("E! [outputs.influxdb_v2] Failed to write metric: %s\n", desc) + return nil + case http.StatusTooManyRequests, http.StatusServiceUnavailable: + retryAfter := resp.Header.Get("Retry-After") + retry, err := strconv.Atoi(retryAfter) + if err != nil { + retry = 0 + } + if retry > defaultMaxWait { + retry = defaultMaxWait + } + c.retryTime = time.Now().Add(time.Duration(retry) * time.Second) + return fmt.Errorf("Waiting %ds for server before sending metric again", retry) + } + + // This is only until platform spec is fully implemented. As of the + // time of writing, there is no error body returned. + if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" { + desc = fmt.Sprintf("%s; %s", desc, xErr) + } + + return &APIError{ + StatusCode: resp.StatusCode, + Title: resp.Status, + Description: desc, + } +} + +func (c *httpClient) makeWriteRequest(body io.Reader) (*http.Request, error) { + var err error + if c.ContentEncoding == "gzip" { + body, err = compressWithGzip(body) + if err != nil { + return nil, err + } + } + + req, err := http.NewRequest("POST", c.WriteURL, body) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "text/plain; charset=utf-8") + c.addHeaders(req) + + if c.ContentEncoding == "gzip" { + req.Header.Set("Content-Encoding", "gzip") + } + + return req, nil +} + +func (c *httpClient) addHeaders(req *http.Request) { + for header, value := range c.Headers { + req.Header.Set(header, value) + } +} + +func compressWithGzip(data io.Reader) (io.Reader, error) { + pipeReader, pipeWriter := io.Pipe() + gzipWriter := gzip.NewWriter(pipeWriter) + var err error + + go func() { + _, err = io.Copy(gzipWriter, data) + gzipWriter.Close() + pipeWriter.Close() + }() + + return pipeReader, err +} + +func makeWriteURL(loc url.URL, org, bucket string) (string, error) { + params := url.Values{} + params.Set("bucket", bucket) + params.Set("org", org) + + switch loc.Scheme { + case "unix": + loc.Scheme = "http" + loc.Host = "127.0.0.1" + loc.Path = "v2/write" + case "http", "https": + loc.Path = path.Join(loc.Path, "v2/write") + default: + return "", fmt.Errorf("unsupported scheme: %q", loc.Scheme) + } + loc.RawQuery = params.Encode() + return loc.String(), nil +} diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go new file mode 100644 index 0000000000000..5df51fc85f1e7 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http_internal_test.go @@ -0,0 +1,59 @@ +package influxdb_v2 + +import ( + "io" + "net/url" + "testing" + + "github.com/stretchr/testify/require" +) + +func genURL(u string) *url.URL { + URL, _ := url.Parse(u) + return URL +} + +func TestMakeWriteURL(t *testing.T) { + tests := []struct { + err bool + url *url.URL + act string + }{ + { + url: genURL("http://localhost:9999"), + act: "http://localhost:9999/v2/write?bucket=telegraf&org=influx", + }, + { + url: genURL("unix://var/run/influxd.sock"), + act: "http://127.0.0.1/v2/write?bucket=telegraf&org=influx", + }, + { + err: true, + url: genURL("udp://localhost:9999"), + }, + } + + for i := range tests { + rURL, err := makeWriteURL(*tests[i].url, "influx", "telegraf") + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + if err == nil { + require.Equal(t, tests[i].act, rURL) + } + } +} + +func TestMakeWriteRequest(t *testing.T) { + reader, _ := io.Pipe() + cli := httpClient{ + WriteURL: "http://localhost:9999/v2/write?bucket=telegraf&org=influx", + ContentEncoding: "gzip", + Headers: map[string]string{"x": "y"}, + } + _, err := cli.makeWriteRequest(reader) + require.NoError(t, err) +} diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go new file mode 100644 index 0000000000000..33ff9e24b90e3 --- /dev/null +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -0,0 +1,49 @@ +package influxdb_v2_test + +import ( + "net/url" + "testing" + + influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/stretchr/testify/require" +) + +func genURL(u string) *url.URL { + URL, _ := url.Parse(u) + return URL +} +func TestNewHTTPClient(t *testing.T) { + tests := []struct { + err bool + cfg *influxdb.HTTPConfig + }{ + { + err: true, + cfg: &influxdb.HTTPConfig{}, + }, + { + err: true, + cfg: &influxdb.HTTPConfig{ + URL: genURL("udp://localhost:9999"), + }, + }, + { + cfg: &influxdb.HTTPConfig{ + URL: genURL("unix://var/run/influxd.sock"), + }, + }, + } + + for i := range tests { + client, err := influxdb.NewHTTPClient(tests[i].cfg) + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + if err == nil { + client.URL() + } + } +} diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go new file mode 100644 index 0000000000000..886907c035ebd --- /dev/null +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -0,0 +1,201 @@ +package influxdb_v2 + +import ( + "context" + "errors" + "fmt" + "log" + "math/rand" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers/influx" +) + +var ( + defaultURL = "http://localhost:9999" + + ErrMissingURL = errors.New("missing URL") +) + +var sampleConfig = ` + ## The URLs of the InfluxDB cluster nodes. + ## + ## Multiple URLs can be specified for a single cluster, only ONE of the + ## urls will be written to each interval. + urls = ["http://127.0.0.1:9999"] + + ## Token for authentication. + token = "" + + ## Organization is the name of the organization you wish to write to; must exist. + organization = "" + + ## Bucket to the name fo the bucketwrite into; must exist. + bucket = "" + + ## Timeout for HTTP messages. + # timeout = "5s" + + ## Additional HTTP headers + # http_headers = {"X-Special-Header" = "Special-Value"} + + ## HTTP Proxy override, if unset values the standard proxy environment + ## variables are consulted to determine which proxy, if any, should be used. + # http_proxy = "http://corporate.proxy:3128" + + ## HTTP User-Agent + # user_agent = "telegraf" + + ## Content-Encoding for write request body, can be set to "gzip" to + ## compress body or "identity" to apply no encoding. + # content_encoding = "gzip" + + ## Enable or disable uint support for writing uints influxdb 2.0. + # influx_uint_support = false + + ## Optional TLS Config for use on HTTP connections. + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false +` + +type Client interface { + Write(context.Context, []telegraf.Metric) error + + URL() string // for logging +} + +type InfluxDB struct { + URLs []string `toml:"urls"` + Token string `toml:"token"` + Organization string `toml:"organization"` + Bucket string `toml:"bucket"` + Timeout internal.Duration `toml:"timeout"` + HTTPHeaders map[string]string `toml:"http_headers"` + HTTPProxy string `toml:"http_proxy"` + UserAgent string `toml:"user_agent"` + ContentEncoding string `toml:"content_encoding"` + UintSupport bool `toml:"influx_uint_support"` + tls.ClientConfig + + clients []Client + serializer *influx.Serializer +} + +func (i *InfluxDB) Connect() error { + ctx := context.Background() + + if len(i.URLs) == 0 { + i.URLs = append(i.URLs, defaultURL) + } + + i.serializer = influx.NewSerializer() + if i.UintSupport { + i.serializer.SetFieldTypeSupport(influx.UintSupport) + } + + for _, u := range i.URLs { + parts, err := url.Parse(u) + if err != nil { + return fmt.Errorf("error parsing url [%q]: %v", u, err) + } + + var proxy *url.URL + if len(i.HTTPProxy) > 0 { + proxy, err = url.Parse(i.HTTPProxy) + if err != nil { + return fmt.Errorf("error parsing proxy_url [%s]: %v", i.HTTPProxy, err) + } + } + + switch parts.Scheme { + case "http", "https", "unix": + c, err := i.getHTTPClient(ctx, parts, proxy) + if err != nil { + return err + } + + i.clients = append(i.clients, c) + default: + return fmt.Errorf("unsupported scheme [%q]: %q", u, parts.Scheme) + } + } + + return nil +} + +func (i *InfluxDB) Close() error { + return nil +} + +func (i *InfluxDB) Description() string { + return "Configuration for sending metrics to InfluxDB" +} + +func (i *InfluxDB) SampleConfig() string { + return sampleConfig +} + +// Write sends metrics to one of the configured servers, logging each +// unsuccessful. If all servers fail, return an error. +func (i *InfluxDB) Write(metrics []telegraf.Metric) error { + ctx := context.Background() + + var err error + p := rand.Perm(len(i.clients)) + for _, n := range p { + client := i.clients[n] + err = client.Write(ctx, metrics) + if err == nil { + return nil + } + + log.Printf("E! [outputs.influxdb] when writing to [%s]: %v", client.URL(), err) + } + + return errors.New("could not write any address") +} + +func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.URL) (Client, error) { + tlsConfig, err := i.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + config := &HTTPConfig{ + URL: url, + Token: i.Token, + Organization: i.Organization, + Bucket: i.Bucket, + Timeout: i.Timeout.Duration, + Headers: i.HTTPHeaders, + Proxy: proxy, + UserAgent: i.UserAgent, + ContentEncoding: i.ContentEncoding, + TLSConfig: tlsConfig, + Serializer: i.serializer, + } + + c, err := NewHTTPClient(config) + if err != nil { + return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err) + } + + return c, nil +} + +func init() { + outputs.Add("influxdb_v2", func() telegraf.Output { + return &InfluxDB{ + Timeout: internal.Duration{Duration: time.Second * 5}, + ContentEncoding: "gzip", + } + }) +} diff --git a/plugins/outputs/influxdb_v2/influxdb_test.go b/plugins/outputs/influxdb_v2/influxdb_test.go new file mode 100644 index 0000000000000..3702b4309d774 --- /dev/null +++ b/plugins/outputs/influxdb_v2/influxdb_test.go @@ -0,0 +1,103 @@ +package influxdb_v2_test + +import ( + "testing" + + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/outputs" + influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" + "github.com/stretchr/testify/require" +) + +func TestDefaultURL(t *testing.T) { + output := influxdb.InfluxDB{} + err := output.Connect() + require.NoError(t, err) + if len(output.URLs) < 1 { + t.Fatal("Default URL failed to get set") + } + require.Equal(t, "http://localhost:9999", output.URLs[0]) +} +func TestConnect(t *testing.T) { + tests := []struct { + err bool + out influxdb.InfluxDB + }{ + { + out: influxdb.InfluxDB{ + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"!@#$qwert"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"http://localhost:1234"}, + HTTPProxy: "!@#$%^&*()_+", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"!@#$%^&*()_+"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{":::@#$qwert"}, + HTTPProxy: "http://localhost:9999", + HTTPHeaders: map[string]string{ + "x": "y", + }, + }, + }, + { + err: true, + out: influxdb.InfluxDB{ + URLs: []string{"https://localhost:8080"}, + ClientConfig: tls.ClientConfig{ + TLSCA: "thing", + }, + }, + }, + } + + for i := range tests { + err := tests[i].out.Connect() + if !tests[i].err { + require.NoError(t, err) + } else { + require.Error(t, err) + t.Log(err) + } + } +} + +func TestUnused(t *testing.T) { + thing := influxdb.InfluxDB{} + thing.Close() + thing.Description() + thing.SampleConfig() + outputs.Outputs["influxdb_v2"]() +}