-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Logfmt parser #4539
Merged
Merged
Logfmt parser #4539
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
53dd0d9
Add logfmt parser
danielnelson 4ccfea3
add applyDefaultTags function so that tags will be set at the end of …
89f783e
Reformat the ParseLine function so that there is less duplicate code …
11e9587
Rename test
1edf8cc
ParseLine should fail if there are no metrics to Parse
8ec40fb
WIP: support parsing of multiple lines
4eeff2f
Add counter to check for empty metrics
0589d50
Test with and without trailing end
a049014
WIP: reformat code to fix No Metric In line and no bytes returns no m…
4bfecde
Fix error so that logfmt parses every line test returns two Metrics
44f9e06
Fix syntax in Test with and without trailing end
995e068
Add additional documentation for logfmt parser
594402d
Remove logs and unnecessary comments
b061a3e
Fix parseline's multi parse line test
40a6c4f
Use strconv.ParseInt for conversion
cd24af8
Reformat
98b9b17
Refactor
8d31561
update README for logfmt parser
e75dec4
Add requested changes
ff03ce2
better error messages
maxunt 990df29
Merge branch 'master' into logfmt-parser
maxunt a3f3331
Fix conflict markers
danielnelson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package logfmt | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/go-logfmt/logfmt" | ||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
) | ||
|
||
var ( | ||
ErrNoMetric = fmt.Errorf("no metric in line") | ||
) | ||
|
||
// Parser decodes logfmt formatted messages into metrics. | ||
type Parser struct { | ||
MetricName string | ||
DefaultTags map[string]string | ||
Now func() time.Time | ||
} | ||
|
||
// NewParser creates a parser. | ||
func NewParser(metricName string, defaultTags map[string]string) *Parser { | ||
return &Parser{ | ||
MetricName: metricName, | ||
DefaultTags: defaultTags, | ||
Now: time.Now, | ||
} | ||
} | ||
|
||
// Parse converts a slice of bytes in logfmt format to metrics. | ||
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) { | ||
reader := bytes.NewReader(b) | ||
decoder := logfmt.NewDecoder(reader) | ||
metrics := make([]telegraf.Metric, 0) | ||
for { | ||
ok := decoder.ScanRecord() | ||
if !ok { | ||
err := decoder.Err() | ||
if err != nil { | ||
return nil, err | ||
} | ||
break | ||
} | ||
fields := make(map[string]interface{}) | ||
for decoder.ScanKeyval() { | ||
if string(decoder.Value()) == "" { | ||
continue | ||
} | ||
|
||
//type conversions | ||
value := string(decoder.Value()) | ||
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { | ||
fields[string(decoder.Key())] = iValue | ||
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil { | ||
fields[string(decoder.Key())] = fValue | ||
} else if bValue, err := strconv.ParseBool(value); err == nil { | ||
fields[string(decoder.Key())] = bValue | ||
} else { | ||
fields[string(decoder.Key())] = value | ||
} | ||
} | ||
if len(fields) == 0 { | ||
continue | ||
} | ||
|
||
m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
metrics = append(metrics, m) | ||
} | ||
p.applyDefaultTags(metrics) | ||
return metrics, nil | ||
} | ||
|
||
// ParseLine converts a single line of text in logfmt format to metrics. | ||
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) { | ||
metrics, err := p.Parse([]byte(s)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if len(metrics) < 1 { | ||
return nil, ErrNoMetric | ||
} | ||
return metrics[0], nil | ||
} | ||
|
||
// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine. | ||
func (p *Parser) SetDefaultTags(tags map[string]string) { | ||
p.DefaultTags = tags | ||
} | ||
|
||
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) { | ||
if len(p.DefaultTags) == 0 { | ||
return | ||
} | ||
|
||
for _, m := range metrics { | ||
for k, v := range p.DefaultTags { | ||
if !m.HasTag(k) { | ||
m.AddTag(k, v) | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
package logfmt | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/metric" | ||
"github.com/influxdata/telegraf/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric { | ||
t.Helper() | ||
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
return v | ||
} | ||
|
||
func TestParse(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
measurement string | ||
now func() time.Time | ||
bytes []byte | ||
want []testutil.Metric | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "no bytes returns no metrics", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
want: []testutil.Metric{}, | ||
}, | ||
{ | ||
name: "test without trailing end", | ||
bytes: []byte("foo=\"bar\""), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"foo": "bar", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "test with trailing end", | ||
bytes: []byte("foo=\"bar\"\n"), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"foo": "bar", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "logfmt parser returns all the fields", | ||
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"lvl": "info", | ||
"msg": "http request", | ||
"method": "POST", | ||
"ts": "2018-07-24T19:43:40.275Z", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "logfmt parser parses every line", | ||
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"), | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
want: []testutil.Metric{ | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"lvl": "info", | ||
"msg": "http request", | ||
"method": "POST", | ||
"ts": "2018-07-24T19:43:40.275Z", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
testutil.Metric{ | ||
Measurement: "testlog", | ||
Tags: map[string]string{}, | ||
Fields: map[string]interface{}{ | ||
"parent_id": "088876RL000", | ||
"duration": 7.45, | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
}, | ||
{ | ||
name: "keys without = or values are ignored", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
bytes: []byte(`i am no data.`), | ||
want: []testutil.Metric{}, | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "keys without values are ignored", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
bytes: []byte(`foo="" bar=`), | ||
want: []testutil.Metric{}, | ||
wantErr: false, | ||
}, | ||
{ | ||
name: "unterminated quote produces error", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
bytes: []byte(`bar=baz foo="bar`), | ||
want: []testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "malformed key", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
bytes: []byte(`"foo=" bar=baz`), | ||
want: []testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
l := Parser{ | ||
MetricName: tt.measurement, | ||
Now: tt.now, | ||
} | ||
got, err := l.Parse(tt.bytes) | ||
if (err != nil) != tt.wantErr { | ||
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) | ||
return | ||
} | ||
require.Equal(t, len(tt.want), len(got)) | ||
for i, m := range got { | ||
testutil.MustEqual(t, m, tt.want[i]) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestParseLine(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
s string | ||
measurement string | ||
now func() time.Time | ||
want testutil.Metric | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "No Metric In line", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
want: testutil.Metric{}, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "Log parser fmt returns all fields", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`, | ||
want: testutil.Metric{ | ||
Measurement: "testlog", | ||
Fields: map[string]interface{}{ | ||
"ts": "2018-07-24T19:43:35.207268Z", | ||
"lvl": int64(5), | ||
"msg": "Write failed", | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Tags: map[string]string{}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
{ | ||
name: "ParseLine only returns metrics from first string", | ||
now: func() time.Time { return time.Unix(0, 0) }, | ||
measurement: "testlog", | ||
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000", | ||
want: testutil.Metric{ | ||
Measurement: "testlog", | ||
Fields: map[string]interface{}{ | ||
"ts": "2018-07-24T19:43:35.207268Z", | ||
"lvl": int64(5), | ||
"msg": "Write failed", | ||
"log_id": "09R4e4Rl000", | ||
}, | ||
Tags: map[string]string{}, | ||
Time: time.Unix(0, 0), | ||
}, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
l := Parser{ | ||
MetricName: tt.measurement, | ||
Now: tt.now, | ||
} | ||
got, err := l.ParseLine(tt.s) | ||
if (err != nil) != tt.wantErr { | ||
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr) | ||
} | ||
if got != nil { | ||
testutil.MustEqual(t, got, tt.want) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if this is not set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By default, doesn't telegraf use the plugin name of the input/processor that uses it?