Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influx uint runtime #3948

Merged
merged 5 commits into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ deps:
gdm restore

telegraf:
go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" $(BUILDFLAGS) ./cmd/telegraf/telegraf.go
go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" ./cmd/telegraf/telegraf.go

go-install:
go install -ldflags "-w -s $(LDFLAGS)" ./cmd/telegraf
Expand All @@ -62,9 +62,6 @@ fmtcheck:
fi
@echo '[INFO] done.'

uint64:
BUILDFLAGS="-tags uint64" $(MAKE) all

lint:
golint ./...

Expand Down
28 changes: 22 additions & 6 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

Telegraf is able to serialize metrics into the following output data formats:

1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#influx)
1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#json)
1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite)
1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point),
are a combination of four basic parts:

1. Measurement Name
Expand Down Expand Up @@ -49,8 +49,10 @@ I'll go over below.

# Influx:

There are no additional configuration options for InfluxDB line-protocol. The
metrics are serialized directly into InfluxDB line-protocol.
The `influx` format outputs data as
[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
This is the recommended format to use unless another format is required for
interoperability.

### Influx Configuration:

Expand All @@ -64,6 +66,20 @@ metrics are serialized directly into InfluxDB line-protocol.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## Maximum line length in bytes. Useful only for debugging.
# influx_max_line_bytes = 0

## When true, fields will be output in ascending lexical order. Enabling
## this option will result in decreased performance and is only recommended
## when you need predictable ordering while debugging.
# influx_sort_fields = false

## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: `42u`. You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
```

# Graphite:
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
}
}

if node, ok := tbl.Fields["influx_uint_support"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if b, ok := kv.Value.(*ast.Boolean); ok {
var err error
c.InfluxUintSupport, err = b.Boolean()
if err != nil {
return nil, err
}
}
}
}

if node, ok := tbl.Fields["json_timestamp_units"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand All @@ -1409,6 +1421,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error

delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support")
delete(tbl.Fields, "data_format")
delete(tbl.Fields, "prefix")
delete(tbl.Fields, "template")
Expand Down
31 changes: 4 additions & 27 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,6 @@ import (
"github.com/influxdata/telegraf"
)

const MaxInt = int(^uint(0) >> 1)

// enableUint64Support will enable uint64 support if set to true.
var enableUint64Support = false

// EnableUintSupport manually enables uint support for convertValue.
// This function will be removed in the future and only exists for unit tests during the
// transition.
func EnableUintSupport() {
enableUint64Support = true
}

type metric struct {
name string
tags []*telegraf.Tag
Expand Down Expand Up @@ -269,19 +257,8 @@ func convertField(v interface{}) interface{} {
case int:
return int64(v)
case uint:
if v <= uint(MaxInt) {
return int64(v)
} else {
return int64(MaxInt)
}
return uint64(v)
case uint64:
if enableUint64Support == false {
if v <= uint64(MaxInt) {
return int64(v)
} else {
return int64(MaxInt)
}
}
return uint64(v)
case []byte:
return string(v)
Expand All @@ -292,11 +269,11 @@ func convertField(v interface{}) interface{} {
case int8:
return int64(v)
case uint32:
return int64(v)
return uint64(v)
case uint16:
return int64(v)
return uint64(v)
case uint8:
return int64(v)
return uint64(v)
case float32:
return float64(v)
default:
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"

## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
```
3 changes: 2 additions & 1 deletion plugins/outputs/influxdb/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ type HTTPConfig struct {
RetentionPolicy string
Consistency string

Serializer *influx.Serializer
InfluxUintSupport bool `toml:"influx_uint_support"`
Serializer *influx.Serializer
}

type httpClient struct {
Expand Down
10 changes: 10 additions & 0 deletions plugins/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type InfluxDB struct {
HTTPHeaders map[string]string `toml:"http_headers"`
ContentEncoding string `toml:"content_encoding"`
SkipDatabaseCreation bool `toml:"skip_database_creation"`
InfluxUintSupport bool `toml:"influx_uint_support"`

// Path to CA file
SSLCA string `toml:"ssl_ca"`
Expand Down Expand Up @@ -119,6 +120,12 @@ var sampleConfig = `
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"

## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
`

func (i *InfluxDB) Connect() error {
Expand All @@ -135,6 +142,9 @@ func (i *InfluxDB) Connect() error {
}

i.serializer = influx.NewSerializer()
if i.InfluxUintSupport {
i.serializer.SetFieldTypeSupport(influx.UintSupport)
}

for _, u := range urls {
u, err := url.Parse(u)
Expand Down
10 changes: 5 additions & 5 deletions plugins/parsers/influx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable int value: %q", value)
log.Errorf("E! Received unparseable int value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -58,7 +58,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64)
if err != nil {
log.Errorf("E! Received unparseable uint value: %q", value)
log.Errorf("E! Received unparseable uint value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -68,7 +68,7 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseFloatBytes(value, 64)
if err != nil {
log.Errorf("E! Received unparseable float value: %q", value)
log.Errorf("E! Received unparseable float value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -84,7 +84,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
fk := unescape(key)
fv, err := parseBoolBytes(value)
if err != nil {
log.Errorf("E! Received unparseable boolean value: %q", value)
log.Errorf("E! Received unparseable boolean value: %q: %v", value, err)
return
}
h.builder.AddField(fk, fv)
Expand All @@ -93,7 +93,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
func (h *MetricHandler) SetTimestamp(tm []byte) {
v, err := parseIntBytes(tm, 10, 64)
if err != nil {
log.Errorf("E! Received unparseable timestamp: %q", tm)
log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err)
return
}
ns := v * int64(h.precision)
Expand Down
52 changes: 38 additions & 14 deletions plugins/parsers/influx/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ func Metric(v telegraf.Metric, err error) telegraf.Metric {
return v
}

const (
Uint64Overflow uint64 = 9223372036854775808
Uint64Max uint64 = 18446744073709551615
Uint64Test uint64 = 42
)

var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}
Expand Down Expand Up @@ -263,15 +257,30 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint",
input: []byte("cpu value=42u"),
name: "field int overflow dropped",
input: []byte("cpu value=9223372036854775808i"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(42, 0),
),
),
},
err: nil,
},
{
name: "field int max value",
input: []byte("cpu value=9223372036854775807i"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Test,
"value": 9223372036854775807,
},
time.Unix(42, 0),
),
Expand All @@ -280,15 +289,15 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint int overflow",
input: []byte("cpu value=9223372036854775808u"),
name: "field uint",
input: []byte("cpu value=42u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Overflow,
"value": uint64(42),
},
time.Unix(42, 0),
),
Expand All @@ -297,15 +306,30 @@ var ptests = []struct {
err: nil,
},
{
name: "field uint maximum",
name: "field uint overflow dropped",
input: []byte("cpu value=18446744073709551616u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(42, 0),
),
),
},
err: nil,
},
{
name: "field uint max value",
input: []byte("cpu value=18446744073709551615u"),
metrics: []telegraf.Metric{
Metric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": Uint64Max,
"value": uint64(18446744073709551615),
},
time.Unix(42, 0),
),
Expand Down
Loading