Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parser/logfmt - unfinished #4467

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#collectd)
1. [Dropwizard](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#dropwizard)
1. [Grok](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#grok)
1. [Logfmt](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#logfmt)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -761,4 +762,9 @@ HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
grok_timezone = "Canada/Eastern"
```
```

# Logfmt
For extracting key-value pairs from log text in the form `<key>=<value>`.
At the moment, the parser produces one metric per line and all keys
are added as fields. Values that parse as integers, floats, or booleans are
converted to that type; all other values are left as strings.
110 changes: 110 additions & 0 deletions plugins/parsers/logfmt/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Package logfmt converts logfmt data into metrics.
package logfmt

import (
"bytes"
"fmt"
"strconv"
"strings"
"time"

glogfmt "github.com/go-logfmt/logfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

// Parser decodes logfmt formatted messages into metrics.
type Parser struct {
// MetricName is the name given to every metric this parser creates.
MetricName string
// DefaultTags are copied into the tag set of every metric this parser creates.
DefaultTags map[string]string
// Now supplies the timestamp for each created metric; NewParser sets it to
// time.Now. It must be non-nil before Parse or ParseLine is called.
Now func() time.Time
}

// NewParser returns a logfmt Parser that names its metrics metricName, adds
// defaultTags to each of them and timestamps them with the current wall-clock
// time.
func NewParser(metricName string, defaultTags map[string]string) *Parser {
	p := new(Parser)
	p.MetricName = metricName
	p.DefaultTags = defaultTags
	p.Now = time.Now
	return p
}

// Parse converts a slice of bytes in logfmt format to metrics.
//
// Every logfmt record (line) in b yields one metric named l.MetricName,
// timestamped with l.Now() and tagged with a copy of l.DefaultTags. Each
// key/value pair of the record becomes a field; a value is stored as int64,
// float64 or bool when it parses as one, and as a string otherwise.
//
// An error is returned when a key has no value or when the decoder reports
// malformed input. In both of those cases the metrics built from the records
// preceding the failure are returned alongside the error. If a metric cannot
// be constructed, nil and the construction error are returned.
func (l *Parser) Parse(b []byte) ([]telegraf.Metric, error) {
	decoder := glogfmt.NewDecoder(bytes.NewReader(b))
	metrics := make([]telegraf.Metric, 0)
	for decoder.ScanRecord() {
		// Copy the default tags so metrics never share a map with the parser.
		tags := make(map[string]string, len(l.DefaultTags))
		for k, v := range l.DefaultTags {
			tags[k] = v
		}

		fields := make(map[string]interface{})
		for decoder.ScanKeyval() {
			key := string(decoder.Key())
			value := string(decoder.Value())
			if value == "" {
				return metrics, fmt.Errorf("value could not be found for key: %v", key)
			}

			// Attempt type conversions, most specific first. ParseInt with a
			// 64-bit size keeps the result type and range identical to
			// ParseLine on every platform.
			if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
				fields[key] = iValue
			} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
				fields[key] = fValue
			} else if bValue, err := strconv.ParseBool(value); err == nil {
				fields[key] = bValue
			} else {
				fields[key] = value
			}
		}
		// ScanKeyval also stops on a syntax error; do not emit a metric built
		// from a partially decoded record.
		if err := decoder.Err(); err != nil {
			return metrics, err
		}

		m, err := metric.New(l.MetricName, tags, fields, l.Now())
		if err != nil {
			return nil, err
		}
		metrics = append(metrics, m)
	}
	// ScanRecord returns false on both end of input and read errors.
	if err := decoder.Err(); err != nil {
		return metrics, err
	}
	return metrics, nil
}

// ParseLine converts a single line of text in logfmt to a metric.
//
// Only the first record of s is decoded. The metric is named l.MetricName,
// timestamped with l.Now() and tagged with a copy of l.DefaultTags. Each
// key/value pair becomes a field; a value is stored as int64, float64 or bool
// when it parses as one, and as a string otherwise. An empty line produces a
// metric without fields.
//
// An error (and a nil metric) is returned when a key has no value, when the
// decoder reports malformed input, or when the metric cannot be constructed.
func (l *Parser) ParseLine(s string) (telegraf.Metric, error) {
	decoder := glogfmt.NewDecoder(strings.NewReader(s))

	// The result is deliberately not checked here: it is false for an empty
	// line, which is still turned into a metric. Real decoding failures are
	// picked up through decoder.Err() below.
	decoder.ScanRecord()

	// Copy the default tags so the metric never shares a map with the parser.
	tags := make(map[string]string, len(l.DefaultTags))
	for k, v := range l.DefaultTags {
		tags[k] = v
	}

	fields := make(map[string]interface{})
	for decoder.ScanKeyval() {
		key := string(decoder.Key())
		value := string(decoder.Value())
		if value == "" {
			return nil, fmt.Errorf("value could not be found for key: %v", key)
		}

		// Attempt type conversions, most specific first.
		if iValue, err := strconv.ParseInt(value, 10, 64); err == nil {
			fields[key] = iValue
		} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
			fields[key] = fValue
		} else if bValue, err := strconv.ParseBool(value); err == nil {
			fields[key] = bValue
		} else {
			fields[key] = value
		}
	}
	// ScanRecord and ScanKeyval both stop silently on errors; surface them
	// instead of returning a metric built from a partially decoded line.
	if err := decoder.Err(); err != nil {
		return nil, err
	}

	m, err := metric.New(l.MetricName, tags, fields, l.Now())
	if err != nil {
		return nil, err
	}
	return m, nil
}

// SetDefaultTags replaces the parser's default tags with tags. These tags are
// added to every metric produced by Parse and ParseLine. The map is stored
// as-is, not copied.
func (l *Parser) SetDefaultTags(tags map[string]string) {
l.DefaultTags = tags
}
130 changes: 130 additions & 0 deletions plugins/parsers/logfmt/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package logfmt

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)

// MustMetric builds a telegraf.Metric from the measurement, tags, fields and
// time of m, failing the test immediately if the metric cannot be created.
func MustMetric(t *testing.T, m *testutil.Metric) telegraf.Metric {
	t.Helper()
	built, err := metric.New(m.Measurement, m.Tags, m.Fields, m.Time)
	if err == nil {
		return built
	}
	t.Fatal(err)
	return nil // not reached: t.Fatal stops the test goroutine
}

// TestParse checks that Parser.Parse turns logfmt input into the expected
// metrics and reports an error for input whose keys have no values.
func TestParse(t *testing.T) {
	tests := []struct {
		name        string
		measurement string
		now         func() time.Time
		bytes       []byte
		want        []testutil.Metric
		wantErr     bool
	}{
		{
			name: "no bytes returns no metrics",
			now:  func() time.Time { return time.Unix(0, 0) },
			want: []testutil.Metric{},
		},
		{
			name:        "logfmt parser returns all the fields",
			bytes:       []byte(`ts=2018-07-24T19:43:40.275Z lvl=info msg="http request" method=POST`),
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			want: []testutil.Metric{
				{
					Measurement: "testlog",
					Tags:        map[string]string{},
					Fields: map[string]interface{}{
						"lvl":    "info",
						"msg":    "http request",
						"method": "POST",
						"ts":     "2018-07-24T19:43:40.275Z",
					},
					Time: time.Unix(0, 0),
				},
			},
		},
		{
			name:    "poorly formatted logfmt returns error",
			now:     func() time.Time { return time.Unix(0, 0) },
			bytes:   []byte(`i am garbage data.`),
			want:    []testutil.Metric{},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l := Parser{
				MetricName: tt.measurement,
				Now:        tt.now,
			}
			got, err := l.Parse(tt.bytes)
			if (err != nil) != tt.wantErr {
				t.Errorf("Logfmt.Parse error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// Without this check a parser that returns too few metrics would
			// pass silently and one that returns too many would panic below.
			if len(got) != len(tt.want) {
				t.Fatalf("Logfmt.Parse returned %d metrics, want %d", len(got), len(tt.want))
			}
			for i, m := range got {
				testutil.MustEqual(t, m, tt.want[i])
			}
		})
	}
}

// TestParseLine checks that Parser.ParseLine turns a single logfmt line into
// the expected metric, including integer conversion of numeric values.
func TestParseLine(t *testing.T) {
	tests := []struct {
		name        string
		s           string
		measurement string
		now         func() time.Time
		want        testutil.Metric
		wantErr     bool
	}{
		{
			name: "empty line returns metric without fields",
			now:  func() time.Time { return time.Unix(0, 0) },
			want: testutil.Metric{
				Tags:   map[string]string{},
				Fields: map[string]interface{}{},
				Time:   time.Unix(0, 0),
			},
		},
		{
			name:        "log parser fmt returns all fields",
			now:         func() time.Time { return time.Unix(0, 0) },
			measurement: "testlog",
			s:           `ts=2018-07-24T19:43:35.207268Z lvl=5 msg="Write failed" log_id=09R4e4Rl000`,
			want: testutil.Metric{
				Measurement: "testlog",
				Fields: map[string]interface{}{
					"ts":     "2018-07-24T19:43:35.207268Z",
					"lvl":    int64(5),
					"msg":    "Write failed",
					"log_id": "09R4e4Rl000",
				},
				Tags: map[string]string{},
				Time: time.Unix(0, 0),
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l := Parser{
				MetricName: tt.measurement,
				Now:        tt.now,
			}
			got, err := l.ParseLine(tt.s)
			if (err != nil) != tt.wantErr {
				t.Errorf("Logfmt.ParseLine error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// On an expected error there is no metric to compare.
			if tt.wantErr {
				return
			}
			testutil.MustEqual(t, got, tt.want)
		})
	}
}
8 changes: 8 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
)
Expand Down Expand Up @@ -139,6 +140,8 @@ func NewParser(config *Config) (Parser, error) {
config.GrokCustomPatterns,
config.GrokCustomPatternFiles,
config.GrokTimeZone)
case "logfmt":
parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags)
default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
Expand Down Expand Up @@ -238,3 +241,8 @@ func NewDropwizardParser(
}
return parser, err
}

// NewLogFmtParser builds a logfmt parser that names its metrics metricName and
// adds defaultTags to each of them. The returned error is always nil.
func NewLogFmtParser(metricName string, defaultTags map[string]string) (Parser, error) {
	p := logfmt.NewParser(metricName, defaultTags)
	return p, nil
}
85 changes: 85 additions & 0 deletions plugins/processors/fieldparser/fieldparser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package fieldparser

import (
"fmt"
"log"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)

// FieldParser is a processor that runs a parser over the values of selected
// fields and tags of each metric and emits the parsed results as additional
// metrics.
//
// NOTE(review): config, parseFields and parseTags are unexported. Decoders
// that rely on reflection cannot set unexported fields, so confirm that the
// toml tags below actually take effect when the configuration is loaded.
type FieldParser struct {
// config is handed to parsers.NewParser to build Parser on first use.
config parsers.Config
// parseFields lists the field keys whose values are parsed.
parseFields []string `toml:"parse_fields"`
// parseTags lists the tag keys whose values are parsed.
parseTags []string `toml:"parse_tags"`
// Parser does the parsing; Apply creates it from config when it is nil.
Parser parsers.Parser
}

// SampleConfig holds the default sample configuration for the fieldparser
// processor; it is the text returned by (*FieldParser).SampleConfig.
var SampleConfig = `
## specify the name of the tag[s] whose value will be parsed
parse_tags = []

## specify the name of the field[s] whose value will be parsed
parse_fields = []

[processors.fieldparser.config]
data_format = "logfmt"
## additional configurations for parser go here
`

// SampleConfig returns the default sample configuration of the processor,
// i.e. the package-level SampleConfig string.
func (p *FieldParser) SampleConfig() string {
return SampleConfig
}

// Description returns a brief, one-sentence description of the processor.
func (p *FieldParser) Description() string {
return "Parse a value in a specified field/tag(s) and add the result in a new metric"
}

// Apply parses the configured fields and tags of every incoming metric and
// returns the incoming metrics followed by the metrics produced by the parser.
//
// The parser is created from p.config on first use; if that fails the error is
// logged and the input is returned unchanged. A field or tag that is absent
// from a metric is skipped. A value that fails to parse is logged and skipped,
// so one bad value does not prevent the remaining fields, tags and metrics
// from being processed.
func (p *FieldParser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
	if p.Parser == nil {
		parser, err := parsers.NewParser(&p.config)
		if err != nil {
			log.Printf("E! [processors.fieldparser] could not create parser: %v", err)
			return metrics
		}
		// Only keep the parser once it is known to be usable.
		p.Parser = parser
	}

	// Collect into a fresh slice so the caller's backing array is not
	// written to while it is being iterated.
	results := make([]telegraf.Metric, 0, len(metrics))
	results = append(results, metrics...)

	for _, m := range metrics {
		for _, key := range p.parseFields {
			value, ok := m.Fields()[key]
			if !ok {
				// Formatting a missing field would yield "<nil>", which is
				// not meaningful input for the parser.
				continue
			}
			parsed, err := p.parseField(fmt.Sprintf("%v", value))
			if err != nil {
				log.Printf("E! [processors.fieldparser] could not parse field %v: %v", key, err)
				continue
			}
			results = append(results, parsed...)
		}
		for _, key := range p.parseTags {
			value, ok := m.Tags()[key]
			if !ok {
				continue
			}
			parsed, err := p.parseField(value)
			if err != nil {
				log.Printf("E! [processors.fieldparser] could not parse tag %v: %v", key, err)
				continue
			}
			results = append(results, parsed...)
		}
	}
	return results
}

// parseField runs the configured parser over value and returns the metrics it
// produces. p.Parser must be non-nil; Apply initializes it before calling.
func (p *FieldParser) parseField(value string) ([]telegraf.Metric, error) {
return p.Parser.Parse([]byte(value))
}

func init() {
processors.Add("fieldparser", func() telegraf.Processor {
return &FieldParser{}
})
}
Loading