From 2c863e97037243280912b4ca1a67aa95965649d7 Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 24 Jul 2018 14:37:22 -0700 Subject: [PATCH 1/5] new logfmt parser -semi functional --- plugins/parsers/logfmt/parser.go | 87 +++++++++++++++++ plugins/parsers/logfmt/parser_test.go | 130 ++++++++++++++++++++++++++ plugins/parsers/registry.go | 8 ++ 3 files changed, 225 insertions(+) create mode 100644 plugins/parsers/logfmt/parser.go create mode 100644 plugins/parsers/logfmt/parser_test.go diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go new file mode 100644 index 0000000000000..2d54655466afd --- /dev/null +++ b/plugins/parsers/logfmt/parser.go @@ -0,0 +1,87 @@ +// Package logfmt converts logfmt data into metrics. +package logfmt + +import ( + "bytes" + "fmt" + "log" + "strings" + "time" + + glogfmt "github.com/go-logfmt/logfmt" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +// Parser decodes logfmt formatted messages into metrics. +type Parser struct { + MetricName string + DefaultTags map[string]string + Now func() time.Time +} + +// NewParser creates a parser. +func NewParser(metricName string, defaultTags map[string]string) *Parser { + return &Parser{ + MetricName: metricName, + DefaultTags: defaultTags, + Now: time.Now, + } +} + +// Parse converts a slice of bytes in logfmt format to metrics. +func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { + reader := bytes.NewReader(b) + decoder := glogfmt.NewDecoder(reader) + metrics := make([]telegraf.Metric, 0) + for decoder.ScanRecord() { + tags := make(map[string]string) + fields := make(map[string]interface{}) + //add default tags + for k, v := range l.DefaultTags { + tags[k] = v + } + + for decoder.ScanKeyval() { + log.Printf("k: %v, v: %v", string(decoder.Key()), string(decoder.Value())) + if string(decoder.Value()) == "" { + return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) + } + fields[string(decoder.Key())] = string(decoder.Value()) + } + m, err := metric.New(l.MetricName, tags, fields, l.Now()) + if err != nil { + return nil, err + } + metrics = append(metrics, m) + } + return metrics, nil +} + +// ParseLine converts a single line of text in logfmt to metrics. +func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { + reader := strings.NewReader(s) + decoder := glogfmt.NewDecoder(reader) + + decoder.ScanRecord() + tags := make(map[string]string) + fields := make(map[string]interface{}) + //add default tags + for k, v := range l.DefaultTags { + tags[k] = v + } + + for decoder.ScanKeyval() { + fields[string(decoder.Key())] = string(decoder.Value()) + } + m, err := metric.New(l.MetricName, tags, fields, l.Now()) + if err != nil { + return nil, err + } + return m, nil +} + +// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. +func (l *Parser) SetDefaultTags(tags map[string]string) { + l.DefaultTags = tags +} diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go new file mode 100644 index 0000000000000..9876c11b52af0 --- /dev/null +++ b/plugins/parsers/logfmt/parser_test.go @@ -0,0 +1,130 @@ +package logfmt + +import ( + "reflect" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { + t.Helper() + v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) + if err != nil { + t.Fatal(err) + } + return v +} + +func TestParse(t *testing.T) { + tests := []struct { + name string + measurement string + now func() time.Time + bytes []byte + want []telegraf.Metric + wantErr bool + }{ + { + name: "no bytes returns no metrics", + now: func() time.Time { return time.Unix(0, 0) }, + want: []telegraf.Metric{}, + }, + { + name: "logfmt parser returns all the fields", + bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + want: []telegraf.Metric{ + MustMetric(t, &testutil.Metric{ + Measurement: "testlog", + Tags: map[string]string{}, + Fields: map[string]interface{}{ + "lvl": "info", + "msg": "http request", + "method": "POST", + "ts": "2018-07-24T19:43:40.275Z", + }, + Time: time.Unix(0, 0), + }), + }, + }, + { + name: "poorly formatted logfmt returns error", + now: func() time.Time { return time.Unix(0, 0) }, + bytes: []byte(`i am garbage data.`), + want: []telegraf.Metric{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.Parse(tt.bytes) + if (err != nil) != tt.wantErr { + t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Logfmt.Parse = %v, want %v", got, tt.want) + } + }) + } +} + +func TestParseLine(t *testing.T) { + tests := []struct { + name string + s string + measurement string + now func() time.Time + want telegraf.Metric + wantErr bool + }{ + { + name: "test something", + now: func() time.Time { return time.Unix(0, 0) }, + want: MustMetric(t, &testutil.Metric{ + Time: time.Unix(0, 0), + }), + }, + { + name: "log parser fmt returns all fields", + now: func() time.Time { return time.Unix(0, 0) }, + measurement: "testlog", + s: `ts=2018-07-24T19:43:35.207268Z lvl=error msg="Write failed" log_id=09R4e4Rl000`, + want: MustMetric(t, &testutil.Metric{ + Measurement: "testlog", + Fields: map[string]interface{}{ + "ts": "2018-07-24T19:43:35.207268Z", + "lvl": "error", + "msg": "Write failed", + "log_id": "09R4e4Rl000", + }, + Time: time.Unix(0, 0), + }), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + l := Parser{ + MetricName: tt.measurement, + Now: tt.now, + } + got, err := l.ParseLine(tt.s) + if (err != nil) != tt.wantErr { + t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Logfmt.Parse = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 24e73d4b63ca6..43795ca193aa6 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" + "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/value" ) @@ -139,6 +140,8 @@ func NewParser(config *Config) (Parser, error) { config.GrokCustomPatterns, config.GrokCustomPatternFiles, config.GrokTimeZone) + case "logfmt": + parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) default: err = fmt.Errorf("Invalid data format: %s", config.DataFormat) } @@ -238,3 +241,8 @@ func NewDropwizardParser( } return parser, err } + +// NewLogFmtParser returns a logfmt parser with the default options. +func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) { + return logfmt.NewParser(metricName, defaultTags), nil +} From a31a750ceb1ae8906091159133872953580a2070 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 11:00:22 -0700 Subject: [PATCH 2/5] modify test cases to be functional --- plugins/parsers/logfmt/parser_test.go | 36 +++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index 9876c11b52af0..e5c52de73a832 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -1,7 +1,6 @@ package logfmt import ( - "reflect" "testing" "time" @@ -25,21 +24,21 @@ func TestParse(t *testing.T) { measurement string now func() time.Time bytes []byte - want []telegraf.Metric + want []testutil.Metric wantErr bool }{ { name: "no bytes returns no metrics", now: func() time.Time { return time.Unix(0, 0) }, - want: []telegraf.Metric{}, + want: []testutil.Metric{}, }, { name: "logfmt parser returns all the fields", bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - want: []telegraf.Metric{ - MustMetric(t, &testutil.Metric{ + want: []testutil.Metric{ + testutil.Metric{ Measurement: "testlog", Tags: map[string]string{}, Fields: map[string]interface{}{ @@ -49,14 +48,14 @@ func TestParse(t *testing.T) { "ts": "2018-07-24T19:43:40.275Z", }, Time: time.Unix(0, 0), - }), + }, }, }, { name: "poorly formatted logfmt returns error", now: func() time.Time { return time.Unix(0, 0) }, bytes: []byte(`i am garbage data.`), - want: []telegraf.Metric{}, + want: []testutil.Metric{}, wantErr: true, }, } @@ -71,8 +70,8 @@ func TestParse(t *testing.T) { t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Logfmt.Parse = %v, want %v", got, tt.want) + for i, m := range got { + testutil.MustEqual(t, m, tt.want[i]) } }) } @@ -84,22 +83,24 @@ func TestParseLine(t *testing.T) { s string measurement string now func() time.Time - want telegraf.Metric + want testutil.Metric wantErr bool }{ { name: "test something", now: func() time.Time { return time.Unix(0, 0) }, - want: MustMetric(t, &testutil.Metric{ - Time: time.Unix(0, 0), - }), + want: testutil.Metric{ + Tags: map[string]string{}, + Fields: map[string]interface{}{}, + Time: time.Unix(0, 0), + }, }, { name: "log parser fmt returns all fields", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", s: `ts=2018-07-24T19:43:35.207268Z lvl=error msg="Write failed" log_id=09R4e4Rl000`, - want: MustMetric(t, &testutil.Metric{ + want: testutil.Metric{ Measurement: "testlog", Fields: map[string]interface{}{ "ts": "2018-07-24T19:43:35.207268Z", @@ -107,8 +108,9 @@ func TestParseLine(t *testing.T) { "msg": "Write failed", "log_id": "09R4e4Rl000", }, + Tags: map[string]string{}, Time: time.Unix(0, 0), - }), + }, }, } for _, tt := range tests { @@ -122,9 +124,7 @@ func TestParseLine(t *testing.T) { t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("Logfmt.Parse = %v, want %v", got, tt.want) - } + testutil.MustEqual(t, got, tt.want) }) } } From 6d3e632dfc8e2c050c8ab07c0d62001493c7c24d Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 11:46:40 -0700 Subject: [PATCH 3/5] update README for logfmt parser --- docs/DATA_FORMATS_INPUT.md | 8 +++++++- plugins/parsers/logfmt/parser.go | 2 -- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 24335a4531ad0..9cbdf7c447379 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics: 1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd) 1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard) 1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok) +1. [Logfmt](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#logfmt) Telegraf metrics, like InfluxDB [points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/), @@ -761,4 +762,9 @@ HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} ## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones ## 3. UTC -- or blank/unspecified, will return timestamp in UTC grok_timezone = "Canada/Eastern" -``` \ No newline at end of file +``` + +# Logfmt +For extracting key-value pairs from log text in the form `=`. +At the moment, the plugin will produce one metric per line and all keys +are added as fields. Values are left as strings (for now). \ No newline at end of file diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 2d54655466afd..30f6ec41cfbeb 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -4,7 +4,6 @@ package logfmt import ( "bytes" "fmt" - "log" "strings" "time" @@ -43,7 +42,6 @@ func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { } for decoder.ScanKeyval() { - log.Printf("k: %v, v: %v", string(decoder.Key()), string(decoder.Value())) if string(decoder.Value()) == "" { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) } From 7934d4cee6119480a93157f8993c229f2bd62672 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 13:55:51 -0700 Subject: [PATCH 4/5] first draft of parser processor --- plugins/parsers/logfmt/parser.go | 29 ++++++- plugins/parsers/logfmt/parser_test.go | 4 +- plugins/processors/fieldparser/fieldparser.go | 87 +++++++++++++++++++ .../fieldparser/fieldparser_test.go | 1 + 4 files changed, 117 insertions(+), 4 deletions(-) create mode 100644 plugins/processors/fieldparser/fieldparser.go create mode 100644 plugins/processors/fieldparser/fieldparser_test.go diff --git a/plugins/parsers/logfmt/parser.go b/plugins/parsers/logfmt/parser.go index 30f6ec41cfbeb..5d19e4b3982d6 100644 --- a/plugins/parsers/logfmt/parser.go +++ b/plugins/parsers/logfmt/parser.go @@ -4,6 +4,7 @@ package logfmt import ( "bytes" "fmt" + "strconv" "strings" "time" @@ -45,7 +46,18 @@ func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) { if string(decoder.Value()) == "" { return metrics, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) } - fields[string(decoder.Key())] = string(decoder.Value()) + + //attempt type conversions + value := string(decoder.Value()) + if iValue, err := strconv.Atoi(value); err == nil { + fields[string(decoder.Key())] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + fields[string(decoder.Key())] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[string(decoder.Key())] = bValue + } else { + fields[string(decoder.Key())] = value + } } m, err := metric.New(l.MetricName, tags, fields, l.Now()) if err != nil { @@ -70,7 +82,20 @@ func (l *Parser) ParseLine(s string) (telegraf.Metric, error) { } for decoder.ScanKeyval() { - fields[string(decoder.Key())] = string(decoder.Value()) + if string(decoder.Value()) == "" { + return nil, fmt.Errorf("value could not be found for key: %v", string(decoder.Key())) + } + //attempt type conversions + value := string(decoder.Value()) + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + fields[string(decoder.Key())] = iValue + } else if fValue, err := strconv.ParseFloat(value, 64); err == nil { + fields[string(decoder.Key())] = fValue + } else if bValue, err := strconv.ParseBool(value); err == nil { + fields[string(decoder.Key())] = bValue + } else { + fields[string(decoder.Key())] = value + } } m, err := metric.New(l.MetricName, tags, fields, l.Now()) if err != nil { diff --git a/plugins/parsers/logfmt/parser_test.go b/plugins/parsers/logfmt/parser_test.go index e5c52de73a832..4bbb9784a4e10 100644 --- a/plugins/parsers/logfmt/parser_test.go +++ b/plugins/parsers/logfmt/parser_test.go @@ -99,12 +99,12 @@ func TestParseLine(t *testing.T) { name: "log parser fmt returns all fields", now: func() time.Time { return time.Unix(0, 0) }, measurement: "testlog", - s: `ts=2018-07-24T19:43:35.207268Z lvl=error msg="Write failed" log_id=09R4e4Rl000`, + s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, want: testutil.Metric{ Measurement: "testlog", Fields: map[string]interface{}{ "ts": "2018-07-24T19:43:35.207268Z", - "lvl": "error", + "lvl": int64(5), "msg": "Write failed", "log_id": "09R4e4Rl000", }, diff --git a/plugins/processors/fieldparser/fieldparser.go b/plugins/processors/fieldparser/fieldparser.go new file mode 100644 index 0000000000000..6152156190f51 --- /dev/null +++ b/plugins/processors/fieldparser/fieldparser.go @@ -0,0 +1,87 @@ +package fieldparser + +import ( + "log" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/processors" +) + +type FieldParser struct { + config parsers.Config + parseFields []string `toml:"parse_fields"` + parseTags []string `toml:"parse_tags"` + Parser parsers.Parser +} + +// holds a default sample config +var SampleConfig = ` +## specify the name of the tag[s] whose value will be parsed +parse_tags = [] + +## specify the name of the field[s] whose value will be parsed +parse_fields = [] + +[processors.fieldparser.config] + data_format = "logfmt" + ## additional configurations for parser go here +` + +// returns the default config +func (p *FieldParser) SampleConfig() string { + return SampleConfig +} + +// returns a brief description of the processor +func (p *FieldParser) Description() string { + return "Parse a value in a specified field/tag(s) and add the result in a new metric" +} + +func (p *FieldParser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { + newMetrics := make([]telegraf.Metric, 0) + + //load input metrics into newMetrics + newMetrics = append(newMetrics, metrics...) + if p.Parser == nil { + var err error + p.Parser, err = parsers.NewParser(&p.config) + if err != nil { + log.Printf("E! [processors.fieldparser] could not create parser: %v", err) + return newMetrics + } + } + + for _, metric := range metrics { + for _, key := range p.parseFields { + value := metric.Fields()[key] + nMetrics, err := p.parseField(value.(string)) + if err != nil { + log.Printf("E! [processors.fieldparser] could not parse field %v: %v", key, err) + return newMetrics + } + newMetrics = append(newMetrics, nMetrics...) + } + for _, key := range p.parseTags { + value := metric.Tags()[key] + nMetrics, err := p.parseField(value) + if err != nil { + log.Printf("E! [processors.fieldparser] could not parse field %v: %v", key, err) + return newMetrics + } + newMetrics = append(newMetrics, nMetrics...) + } + } + return newMetrics + +} + +func (p *FieldParser) parseField(value string) ([]telegraf.Metric, error) { + return p.Parser.Parse([]byte(value)) +} + +func init() { + processors.Add("fieldparser", func() telegraf.Processor { + return &FieldParser{} + }) +} diff --git a/plugins/processors/fieldparser/fieldparser_test.go b/plugins/processors/fieldparser/fieldparser_test.go new file mode 100644 index 0000000000000..3e548eb30a129 --- /dev/null +++ b/plugins/processors/fieldparser/fieldparser_test.go @@ -0,0 +1 @@ +package fieldparser From 958f4d2903cb5877a6bfb7f85dd54c1d61626406 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 25 Jul 2018 15:58:32 -0700 Subject: [PATCH 5/5] add functionality and some unit tests for fieldparser processor --- plugins/processors/fieldparser/fieldparser.go | 20 ++- .../fieldparser/fieldparser_test.go | 120 ++++++++++++++++++ 2 files changed, 129 insertions(+), 11 deletions(-) diff --git a/plugins/processors/fieldparser/fieldparser.go b/plugins/processors/fieldparser/fieldparser.go index 6152156190f51..95c71d67a77ea 100644 --- a/plugins/processors/fieldparser/fieldparser.go +++ b/plugins/processors/fieldparser/fieldparser.go @@ -1,6 +1,7 @@ package fieldparser import ( + "fmt" "log" "github.com/influxdata/telegraf" @@ -39,40 +40,37 @@ func (p *FieldParser) Description() string { } func (p *FieldParser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { - newMetrics := make([]telegraf.Metric, 0) - - //load input metrics into newMetrics - newMetrics = append(newMetrics, metrics...) if p.Parser == nil { var err error p.Parser, err = parsers.NewParser(&p.config) if err != nil { log.Printf("E! [processors.fieldparser] could not create parser: %v", err) - return newMetrics + return metrics } } for _, metric := range metrics { for _, key := range p.parseFields { value := metric.Fields()[key] - nMetrics, err := p.parseField(value.(string)) + strVal := fmt.Sprintf("%v", value) + nMetrics, err := p.parseField(strVal) if err != nil { log.Printf("E! [processors.fieldparser] could not parse field %v: %v", key, err) - return newMetrics + return metrics } - newMetrics = append(newMetrics, nMetrics...) + metrics = append(metrics, nMetrics...) } for _, key := range p.parseTags { value := metric.Tags()[key] nMetrics, err := p.parseField(value) if err != nil { log.Printf("E! [processors.fieldparser] could not parse field %v: %v", key, err) - return newMetrics + return metrics } - newMetrics = append(newMetrics, nMetrics...) + metrics = append(metrics, nMetrics...) } } - return newMetrics + return metrics } diff --git a/plugins/processors/fieldparser/fieldparser_test.go b/plugins/processors/fieldparser/fieldparser_test.go index 3e548eb30a129..cb707685e993e 100644 --- a/plugins/processors/fieldparser/fieldparser_test.go +++ b/plugins/processors/fieldparser/fieldparser_test.go @@ -1 +1,121 @@ package fieldparser + +import ( + "reflect" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/stretchr/testify/require" +) + +//compares metrics without comparing time +func compareMetrics(t *testing.T, metrics1 []telegraf.Metric, metrics2 []telegraf.Metric) { + for i, m1 := range metrics1 { + m2 := metrics2[i] + require.True(t, reflect.DeepEqual(m1.Tags(), m2.Tags())) + require.True(t, reflect.DeepEqual(m1.Fields(), m2.Fields())) + //require.True(t, m1.Name() == m2.Name()) + } +} + +func Metric(v telegraf.Metric, err error) telegraf.Metric { + if err != nil { + panic(err) + } + return v +} + +func TestApply(t *testing.T) { + tests := []struct { + name string + parseFields []string + config parsers.Config + input telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "successfully parsed", + config: parsers.Config{ + DataFormat: "logfmt", + }, + input: Metric( + metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "test_name": `ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`, + }, + time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "test_name": `ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`, + }, + time.Unix(0, 0))), + Metric(metric.New( + "success", + map[string]string{}, + map[string]interface{}{ + "ts": "2018-07-24T19:43:40.275Z", + "lvl": "info", + "msg": "http request", + "method": "post", + }, + time.Unix(0, 0))), + }, + }, + } + + for _, tt := range tests { + parser := FieldParser{ + config: tt.config, + parseFields: tt.parseFields, + } + + output := parser.Apply(tt.input) + + compareMetrics(t, output, tt.expected) + } +} + +func TestBadApply(t *testing.T) { + tests := []struct { + name string + parseFields []string + config parsers.Config + input telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "field not found", + parseFields: []string{"bad_field"}, + config: parsers.Config{ + DataFormat: "logfmt", + }, + input: Metric(metric.New("bad", map[string]string{}, map[string]interface{}{ + "some_field": 5, + }, time.Unix(0, 0))), + expected: []telegraf.Metric{ + Metric(metric.New("bad", map[string]string{}, map[string]interface{}{ + "some_field": 5, + }, time.Unix(0, 0))), + }, + }, + } + + for _, tt := range tests { + parser := FieldParser{ + config: tt.config, + parseFields: tt.parseFields, + } + + output := parser.Apply(tt.input) + + compareMetrics(t, output, tt.expected) + } +}