diff --git a/CHANGELOG.md b/CHANGELOG.md index c945eccd3df6a..42847fd2b28a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats. - [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself. - [#2127](https://github.com/influxdata/telegraf/pull/2127): Update Go version to 1.7.4. +- [#2126](https://github.com/influxdata/telegraf/pull/2126): Support a metric.Split function. ### Bugfixes diff --git a/metric.go b/metric.go index a29a63340b66b..cb230512f9f5c 100644 --- a/metric.go +++ b/metric.go @@ -22,6 +22,10 @@ type Metric interface { Serialize() []byte String() string // convenience function for string(Serialize()) Copy() Metric + // Split will attempt to return multiple metrics with the same timestamp + // whose string representations are no longer than maxSize. + // Metrics with a single field may exceed the requested size. + Split(maxSize int) []Metric // Tag functions HasTag(key string) bool diff --git a/metric/metric.go b/metric/metric.go index 4514d8ecc19f4..8a18c0f2ce2f8 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -178,6 +178,57 @@ func (m *metric) Serialize() []byte { return tmp } +func (m *metric) Split(maxSize int) []telegraf.Metric { + if m.Len() < maxSize { + return []telegraf.Metric{m} + } + var out []telegraf.Metric + + // constant number of bytes for each metric (in addition to field bytes) + constant := len(m.name) + len(m.tags) + len(m.t) + 3 + // currently selected fields + fields := make([]byte, 0, maxSize) + + i := 0 + for { + if i >= len(m.fields) { + // hit the end of the field byte slice + if len(fields) > 0 { + out = append(out, copyWith(m.name, m.tags, fields, m.t)) + } + break + } + + // find the end of the next field + j := indexUnescapedByte(m.fields[i:], ',') + if j == -1 { + j = len(m.fields) + } else { + j += i + } + + // if true, then we need to create a metric _not_ including the currently + // selected field + if len(m.fields[i:j])+len(fields)+constant > maxSize { + // if false, then we'll create a metric including the currently + // selected field anyways. This means that the given maxSize is too + // small for a single field to fit. + if len(fields) > 0 { + out = append(out, copyWith(m.name, m.tags, fields, m.t)) + } + + fields = make([]byte, 0, maxSize) + } + if len(fields) > 0 { + fields = append(fields, ',') + } + fields = append(fields, m.fields[i:j]...) + + i = j + 1 + } + return out +} + func (m *metric) Fields() map[string]interface{} { fieldMap := map[string]interface{}{} i := 0 @@ -380,17 +431,21 @@ func (m *metric) RemoveField(key string) error { } func (m *metric) Copy() telegraf.Metric { - mOut := metric{ - name: make([]byte, len(m.name)), - tags: make([]byte, len(m.tags)), - fields: make([]byte, len(m.fields)), - t: make([]byte, len(m.t)), + return copyWith(m.name, m.tags, m.fields, m.t) +} + +func copyWith(name, tags, fields, t []byte) telegraf.Metric { + out := metric{ + name: make([]byte, len(name)), + tags: make([]byte, len(tags)), + fields: make([]byte, len(fields)), + t: make([]byte, len(t)), } - copy(mOut.name, m.name) - copy(mOut.tags, m.tags) - copy(mOut.fields, m.fields) - copy(mOut.t, m.t) - return &mOut + copy(out.name, name) + copy(out.tags, tags) + copy(out.fields, fields) + copy(out.t, t) + return &out } func (m *metric) HashID() uint64 { diff --git a/metric/metric_benchmark_test.go b/metric/metric_benchmark_test.go index 86906483e2d13..9383fb0dc8bf6 100644 --- a/metric/metric_benchmark_test.go +++ b/metric/metric_benchmark_test.go @@ -50,6 +50,21 @@ func BenchmarkAddTag(b *testing.B) { s = string(mt.String()) } +func BenchmarkSplit(b *testing.B) { + var mt telegraf.Metric + mt = &metric{ + name: []byte("cpu"), + tags: []byte(",host=localhost"), + fields: []byte("a=101,b=10i,c=10101,d=101010,e=42"), + t: []byte("1480614053000000000"), + } + var metrics []telegraf.Metric + for n := 0; n < b.N; n++ { + metrics = mt.Split(60) + } + s = string(metrics[0].String()) +} + func BenchmarkTags(b *testing.B) { for n := 0; n < b.N; n++ { var mt, _ = New("test_metric", diff --git a/metric/metric_test.go b/metric/metric_test.go index b373cabd9869b..f209dc3e4484d 100644 --- a/metric/metric_test.go +++ b/metric/metric_test.go @@ -3,6 +3,7 @@ package metric import ( "fmt" "math" + "regexp" "testing" "time" @@ -434,6 +435,149 @@ func TestNewCounterMetric(t *testing.T) { assert.Equal(t, now.UnixNano(), m.UnixNano()) } +// test splitting metric into various max lengths +func TestSplitMetric(t *testing.T) { + now := time.Unix(0, 1480940990034083306) + tags := map[string]string{ + "host": "localhost", + } + fields := map[string]interface{}{ + "float": float64(100001), + "int": int64(100001), + "bool": true, + "false": false, + "string": "test", + } + m, err := New("cpu", tags, fields, now) + assert.NoError(t, err) + + split80 := m.Split(80) + assert.Len(t, split80, 2) + + split70 := m.Split(70) + assert.Len(t, split70, 3) + + split60 := m.Split(60) + assert.Len(t, split60, 4) +} + +// test splitting metric into various max lengths +// use a simple regex check to verify that the split metrics are valid +func TestSplitMetric_RegexVerify(t *testing.T) { + now := time.Unix(0, 1480940990034083306) + tags := map[string]string{ + "host": "localhost", + } + fields := map[string]interface{}{ + "foo": float64(98934259085), + "bar": float64(19385292), + "number": float64(19385292), + "another": float64(19385292), + "n": float64(19385292), + } + m, err := New("cpu", tags, fields, now) + assert.NoError(t, err) + + // verification regex + re := regexp.MustCompile(`cpu,host=localhost \w+=\d+(,\w+=\d+)* 1480940990034083306`) + + split90 := m.Split(90) + assert.Len(t, split90, 2) + for _, splitM := range split90 { + assert.True(t, re.Match(splitM.Serialize()), splitM.String()) + } + + split70 := m.Split(70) + assert.Len(t, split70, 3) + for _, splitM := range split70 { + assert.True(t, re.Match(splitM.Serialize()), splitM.String()) + } + + split20 := m.Split(20) + assert.Len(t, split20, 5) + for _, splitM := range split20 { + assert.True(t, re.Match(splitM.Serialize()), splitM.String()) + } +} + +// test splitting metric even when given length is shorter than +// shortest possible length +// Split should split metric as short as possible, ie, 1 field per metric +func TestSplitMetric_TooShort(t *testing.T) { + now := time.Unix(0, 1480940990034083306) + tags := map[string]string{ + "host": "localhost", + } + fields := map[string]interface{}{ + "float": float64(100001), + "int": int64(100001), + "bool": true, + "false": false, + "string": "test", + } + m, err := New("cpu", tags, fields, now) + assert.NoError(t, err) + + split := m.Split(10) + assert.Len(t, split, 5) + strings := make([]string, 5) + for i, splitM := range split { + strings[i] = splitM.String() + } + + assert.Contains(t, strings, "cpu,host=localhost float=100001 1480940990034083306\n") + assert.Contains(t, strings, "cpu,host=localhost int=100001i 1480940990034083306\n") + assert.Contains(t, strings, "cpu,host=localhost bool=true 1480940990034083306\n") + assert.Contains(t, strings, "cpu,host=localhost false=false 1480940990034083306\n") + assert.Contains(t, strings, "cpu,host=localhost string=\"test\" 1480940990034083306\n") +} + +func TestSplitMetric_NoOp(t *testing.T) { + now := time.Unix(0, 1480940990034083306) + tags := map[string]string{ + "host": "localhost", + } + fields := map[string]interface{}{ + "float": float64(100001), + "int": int64(100001), + "bool": true, + "false": false, + "string": "test", + } + m, err := New("cpu", tags, fields, now) + assert.NoError(t, err) + + split := m.Split(1000) + assert.Len(t, split, 1) + assert.Equal(t, m, split[0]) +} + +func TestSplitMetric_OneField(t *testing.T) { + now := time.Unix(0, 1480940990034083306) + tags := map[string]string{ + "host": "localhost", + } + fields := map[string]interface{}{ + "float": float64(100001), + } + m, err := New("cpu", tags, fields, now) + assert.NoError(t, err) + + assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", m.String()) + + split := m.Split(1000) + assert.Len(t, split, 1) + assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) + + split = m.Split(1) + assert.Len(t, split, 1) + assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) + + split = m.Split(40) + assert.Len(t, split, 1) + assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String()) +} + func TestNewMetricAggregate(t *testing.T) { now := time.Now()