Skip to content

Commit

Permalink
Add logfmt parser (influxdata#4539)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayrdrie authored and otherpirate committed Mar 15, 2019
1 parent 8393f38 commit 35f7862
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 0 deletions.
17 changes: 17 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Logfmt](#logfmt)
1. [Wavefront](#wavefront)

Telegraf metrics, like InfluxDB
Expand Down Expand Up @@ -882,6 +883,22 @@ the file output will only print once per `flush_interval`.
- If successful, add the next token, update the pattern and retest.
- Continue one token at a time until the entire line is successfully parsed.

# Logfmt
This parser implements the logfmt format by extracting and converting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the plugin will produce one metric per line and all keys
are added as fields.
A typical log
```
method=GET host=influxdata.org ts=2018-07-24T19:43:40.275Z
connect=4ms service=8ms status=200 bytes=1653
```
will be converted into
```
logfmt method="GET",host="influxdata.org",ts="2018-07-24T19:43:40.275Z",connect="4ms",service="8ms",status=200i,bytes=1653i
```
Additional information about the logfmt format can be found [here](https://brandur.org/logfmt).

# Wavefront:

Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
Expand Down
111 changes: 111 additions & 0 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package logfmt

import (
"bytes"
"fmt"
"strconv"
"time"

"github.com/go-logfmt/logfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

var (
ErrNoMetric = fmt.Errorf("no metric in line")
)

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
MetricName string
DefaultTags map[string]string
Now func() time.Time
}

// NewParser creates a parser.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
return &Parser{
MetricName: metricName,
DefaultTags: defaultTags,
Now: time.Now,
}
}

// Parse converts a slice of bytes in logfmt format to metrics.
func (p *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
reader := bytes.NewReader(b)
decoder := logfmt.NewDecoder(reader)
metrics := make([]telegraf.Metric, 0)
for {
ok := decoder.ScanRecord()
if !ok {
err := decoder.Err()
if err != nil {
return nil, err
}
break
}
fields := make(map[string]interface{})
for decoder.ScanKeyval() {
if string(decoder.Value()) == "" {
continue
}

//type conversions
value := string(decoder.Value())
if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
fields[string(decoder.Key())] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
fields[string(decoder.Key())] = fValue
} else if bValue, err := strconv.ParseBool(value); err == nil {
fields[string(decoder.Key())] = bValue
} else {
fields[string(decoder.Key())] = value
}
}
if len(fields) == 0 {
continue
}

m, err := metric.New(p.MetricName, map[string]string{}, fields, p.Now())
if err != nil {
return nil, err
}

metrics = append(metrics, m)
}
p.applyDefaultTags(metrics)
return metrics, nil
}

// ParseLine converts a single line of text in logfmt format to metrics.
func (p *Parser) ParseLine(s string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(s))
if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, ErrNoMetric
}
return metrics[0], nil
}

// SetDefaultTags adds tags to the metrics outputs of Parse and ParseLine.
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}

func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
if len(p.DefaultTags) == 0 {
return
}

for _, m := range metrics {
for k, v := range p.DefaultTags {
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
}
231 changes: 231 additions & 0 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package logfmt

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
t.Helper()
v, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
if err != nil {
t.Fatal(err)
}
return v
}

func TestParse(t *testing.T) {
tests := []struct {
name string
measurement string
now func() time.Time
bytes []byte
want []testutil.Metric
wantErr bool
}{
{
name: "no bytes returns no metrics",
now: func() time.Time { return time.Unix(0, 0) },
want: []testutil.Metric{},
},
{
name: "test without trailing end",
bytes: []byte("foo=\"bar\""),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "test with trailing end",
bytes: []byte("foo=\"bar\"\n"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"foo": "bar",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parser returns all the fields",
bytes: []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "logfmt parser parses every line",
bytes: []byte("ts=2018-07-24T19:43:40.275Z lvl=info msg=\"http request\" method=POST\nparent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000"),
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
want: []testutil.Metric{
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"lvl": "info",
"msg": "http request",
"method": "POST",
"ts": "2018-07-24T19:43:40.275Z",
},
Time: time.Unix(0, 0),
},
testutil.Metric{
Measurement: "testlog",
Tags: map[string]string{},
Fields: map[string]interface{}{
"parent_id": "088876RL000",
"duration": 7.45,
"log_id": "09R4e4Rl000",
},
Time: time.Unix(0, 0),
},
},
},
{
name: "keys without = or values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`i am no data.`),
want: []testutil.Metric{},
wantErr: false,
},
{
name: "keys without values are ignored",
now: func() time.Time { return time.Unix(0, 0) },
bytes: []byte(`foo="" bar=`),
want: []testutil.Metric{},
wantErr: false,
},
{
name: "unterminated quote produces error",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`bar=baz foo="bar`),
want: []testutil.Metric{},
wantErr: true,
},
{
name: "malformed key",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
bytes: []byte(`"foo=" bar=baz`),
want: []testutil.Metric{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.Parse(tt.bytes)
if (err != nil) != tt.wantErr {
t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
return
}
require.Equal(t, len(tt.want), len(got))
for i, m := range got {
testutil.MustEqual(t, m, tt.want[i])
}
})
}
}

func TestParseLine(t *testing.T) {
tests := []struct {
name string
s string
measurement string
now func() time.Time
want testutil.Metric
wantErr bool
}{
{
name: "No Metric In line",
now: func() time.Time { return time.Unix(0, 0) },
want: testutil.Metric{},
wantErr: true,
},
{
name: "Log parser fmt returns all fields",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
{
name: "ParseLine only returns metrics from first string",
now: func() time.Time { return time.Unix(0, 0) },
measurement: "testlog",
s: "ts=2018-07-24T19:43:35.207268Z lvl=5 msg=\"Write failed\" log_id=09R4e4Rl000\nmethod=POST parent_id=088876RL000 duration=7.45 log_id=09R4e4Rl000",
want: testutil.Metric{
Measurement: "testlog",
Fields: map[string]interface{}{
"ts": "2018-07-24T19:43:35.207268Z",
"lvl": int64(5),
"msg": "Write failed",
"log_id": "09R4e4Rl000",
},
Tags: map[string]string{},
Time: time.Unix(0, 0),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := Parser{
MetricName: tt.measurement,
Now: tt.now,
}
got, err := l.ParseLine(tt.s)
if (err != nil) != tt.wantErr {
t.Fatalf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
}
if got != nil {
testutil.MustEqual(t, got, tt.want)
}
})
}
}
Loading

0 comments on commit 35f7862

Please sign in to comment.