Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add influx2.0 output plugin #4645

Merged
merged 11 commits into from
Sep 12, 2018
Merged
54 changes: 50 additions & 4 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"path"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -43,6 +45,7 @@ type HTTPConfig struct {
Token string
Organization string
Bucket string
Precision string
Timeout time.Duration
Headers map[string]string
Proxy *url.URL
Expand Down Expand Up @@ -101,7 +104,8 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
writeURL, err := makeWriteURL(
*config.URL,
config.Organization,
config.Bucket)
config.Bucket,
config.Precision)
if err != nil {
return nil, err
}
Expand All @@ -119,7 +123,7 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
return net.DialTimeout(
config.URL.Scheme,
config.URL.Path,
defaultRequestTimeout,
timeout,
)
},
}
Expand Down Expand Up @@ -147,6 +151,19 @@ func (c *httpClient) URL() string {
return c.url.String()
}

// genericRespError is the target that the JSON body of an unsuccessful write
// response is decoded into. Not every field is used for every status code:
// the caller picks the relevant ones when building the error description.
type genericRespError struct {
	// Code and Message form the "<code>: <message>" text produced by String.
	Code, Message string
	// Op and Err are appended to the description for 400, 401 and 403
	// responses (Op is also used for 413).
	Op, Err string
	// Line is reported for 400 responses; MaxLength is reported for 413
	// responses.
	Line, MaxLength int32
}

// String renders the error as "<Code>: <Message>". Both parts are emitted
// verbatim, so an error with empty fields yields ": ".
func (g genericRespError) String() string {
	const separator = ": "
	return g.Code + separator + g.Message
}

func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error {
var err error
glinton marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -166,7 +183,33 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
return nil
}

desc := resp.Header.Get("X-Influx-Error")
writeResp := &genericRespError{}
json.NewDecoder(resp.Body).Decode(writeResp)
glinton marked this conversation as resolved.
Show resolved Hide resolved
var desc string

switch resp.StatusCode {
glinton marked this conversation as resolved.
Show resolved Hide resolved
case http.StatusBadRequest: // 400
// LineProtocolError
desc = fmt.Sprintf("%s - %s;%d;%s", writeResp, writeResp.Op, writeResp.Line, writeResp.Err)
glinton marked this conversation as resolved.
Show resolved Hide resolved
glinton marked this conversation as resolved.
Show resolved Hide resolved
case http.StatusUnauthorized, http.StatusForbidden: // 401, 403
// Error
desc = fmt.Sprintf("%s - %s;%s", writeResp, writeResp.Op, writeResp.Err)
glinton marked this conversation as resolved.
Show resolved Hide resolved
case http.StatusRequestEntityTooLarge: // 413
// LineProtocolLengthError
desc = fmt.Sprintf("%s - %s;%d", writeResp, writeResp.Op, writeResp.MaxLength)
glinton marked this conversation as resolved.
Show resolved Hide resolved
case http.StatusTooManyRequests, http.StatusServiceUnavailable: // 429, 503
retryAfter := resp.Header.Get("Retry-After")
retry, err := strconv.Atoi(retryAfter)
if err != nil {
return fmt.Errorf("Bad value for 'Retry-After': %s", err.Error())
}
time.Sleep(time.Second * time.Duration(retry))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want a max sleep, just in case the server tells you to sleep for a long time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10s seems reasonable to me as a max sleep. The big issue with the influxdb output is that, when under load, the next filled batch will almost immediately trigger the next write even after a write timeout.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, but don't sleep and then write, because of #2919; instead, hang on to the time and return. On the next call to Write, check the time, and if that amount of time has not yet passed, return an error.

Let's define a special error type specifically for this, something like CircuitOpenErr? We can put it in the internal package for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the metric that failed to write get cached, timestamp included? If the next call to Write comes sooner than the initial retry time, would that second metric also get cached and be written on the next attempt after the retry time? If the next write also returns a retry time, is it acceptable to drop that new metric, even though it didn't get a chance to retry?
I put in a max-retries limit based on the connection/httpClient, but it seemed wrong; would it be useful to put that back in?

c.Write(ctx, metrics)
}

if xErr := resp.Header.Get("X-Influx-Error"); xErr != "" {
desc = fmt.Sprintf("%s - %s", desc, xErr)
glinton marked this conversation as resolved.
Show resolved Hide resolved
}

return &APIError{
StatusCode: resp.StatusCode,
Expand Down Expand Up @@ -219,10 +262,13 @@ func compressWithGzip(data io.Reader) (io.Reader, error) {
return pipeReader, err
}

func makeWriteURL(loc url.URL, org, bucket string) (string, error) {
func makeWriteURL(loc url.URL, org, bucket, precision string) (string, error) {
params := url.Values{}
params.Set("bucket", bucket)
params.Set("org", org)
if precision != "" {
params.Set("precision", precision)
}

switch loc.Scheme {
case "unix":
Expand Down
7 changes: 6 additions & 1 deletion plugins/outputs/influxdb_v2/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ var sampleConfig = `
## Token for authentication.
token = ""

## Organization is the name of the organization you wish to write to.
## Organization is the name of the organization you wish to write to; must exist.
organization = ""

## Bucket is the name of the bucket to write into; must exist.
bucket = ""

## Precision for the unix timestamps within the body line-protocol.
# precision = "ns"
glinton marked this conversation as resolved.
Show resolved Hide resolved

## Timeout for HTTP messages.
# timeout = "5s"

Expand Down Expand Up @@ -74,6 +77,7 @@ type InfluxDB struct {
Token string `toml:"token"`
Organization string `toml:"organization"`
Bucket string `toml:"bucket"`
Precision string `toml:"precision"`
Timeout internal.Duration `toml:"timeout"`
HTTPHeaders map[string]string `toml:"http_headers"`
HTTPProxy string `toml:"http_proxy"`
Expand Down Expand Up @@ -168,6 +172,7 @@ func (i *InfluxDB) getHTTPClient(ctx context.Context, url *url.URL, proxy *url.U
Token: i.Token,
Organization: i.Organization,
Bucket: i.Bucket,
Precision: i.Precision,
Timeout: i.Timeout.Duration,
Headers: i.HTTPHeaders,
Proxy: proxy,
Expand Down