Skip to content

Commit

Permalink
Add Wavefront parser (#4402)
Browse files Browse the repository at this point in the history
  • Loading branch information
puckpuck authored and glinton committed Aug 13, 2018
1 parent b9ff1d0 commit 6454319
Show file tree
Hide file tree
Showing 8 changed files with 809 additions and 14 deletions.
27 changes: 27 additions & 0 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Telegraf is able to parse the following input data formats into metrics:
1. [Collectd](#collectd)
1. [Dropwizard](#dropwizard)
1. [Grok](#grok)
1. [Wavefront](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#wavefront)

Telegraf metrics, like InfluxDB
[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
Expand Down Expand Up @@ -881,3 +882,29 @@ the file output will only print once per `flush_interval`.
- Continue one token at a time until the entire line is successfully parsed.


```
# Wavefront:
Wavefront Data Format is metrics are parsed directly into Telegraf metrics.
For more information about the Wavefront Data Format see
[here](https://docs.wavefront.com/wavefront_data_format.html).
There are no additional configuration options for Wavefront Data Format line-protocol.
#### Wavefront Configuration:
```toml
[[inputs.exec]]
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "wavefront"
```
34 changes: 20 additions & 14 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,26 +189,32 @@ func buildTags(mTags map[string]string, w *Wavefront) (string, map[string]string
}

var source string
sourceTagFound := false

for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)

if s, ok := mTags["source"]; ok {
source = s
delete(mTags, "source")
} else {
sourceTagFound := false
for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}
if sourceTagFound {
break

if !sourceTagFound {
source = mTags["host"]
}
}

if !sourceTagFound {
source = mTags["host"]
}
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
Expand Down
7 changes: 7 additions & 0 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/json"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
)

// ParserInput is an interface for input plugins that are able to parse
Expand Down Expand Up @@ -131,6 +132,8 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags,
config.Separator,
config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok":
parser, err = newGrokParser(
config.MetricName,
Expand Down Expand Up @@ -238,3 +241,7 @@ func NewDropwizardParser(
}
return parser, err
}

func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}
238 changes: 238 additions & 0 deletions plugins/parsers/wavefront/element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package wavefront

import (
"errors"
"fmt"
"strconv"
"time"
)

var (
ErrEOF = errors.New("EOF")
ErrInvalidTimestamp = errors.New("Invalid timestamp")
)

// Interface for parsing line elements.
type ElementParser interface {
parse(p *PointParser, pt *Point) error
}

type NameParser struct{}
type ValueParser struct{}
type TimestampParser struct {
optional bool
}
type WhiteSpaceParser struct {
nextOptional bool
}
type TagParser struct{}
type LoopedParser struct {
wrappedParser ElementParser
wsPaser *WhiteSpaceParser
}
type LiteralParser struct {
literal string
}

func (ep *NameParser) parse(p *PointParser, pt *Point) error {
//Valid characters are: a-z, A-Z, 0-9, hyphen ("-"), underscore ("_"), dot (".").
// Forward slash ("/") and comma (",") are allowed if metricName is enclosed in double quotes.
name, err := parseLiteral(p)
if err != nil {
return err
}
pt.Name = name
return nil
}

func (ep *ValueParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
return fmt.Errorf("found %q, expected number", lit)
}

p.writeBuf.Reset()
if tok == MINUS_SIGN {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}

for tok != EOF && (tok == LETTER || tok == NUMBER || tok == DOT) {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

pt.Value = p.writeBuf.String()
_, err := strconv.ParseFloat(pt.Value, 64)
if err != nil {
return fmt.Errorf("invalid metric value %s", pt.Value)
}
return nil
}

func (ep *TimestampParser) parse(p *PointParser, pt *Point) error {
tok, lit := p.scan()
if tok == EOF {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return fmt.Errorf("found %q, expected number", lit)
}

if tok != NUMBER {
if ep.optional {
p.unscanTokens(2)
return setTimestamp(pt, 0, 1)
}
return ErrInvalidTimestamp
}

p.writeBuf.Reset()
for tok != EOF && tok == NUMBER {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
p.unscan()

tsStr := p.writeBuf.String()
ts, err := strconv.ParseInt(tsStr, 10, 64)
if err != nil {
return err
}
return setTimestamp(pt, ts, len(tsStr))
}

func setTimestamp(pt *Point, ts int64, numDigits int) error {

if numDigits == 19 {
// nanoseconds
ts = ts / 1e9
} else if numDigits == 16 {
// microseconds
ts = ts / 1e6
} else if numDigits == 13 {
// milliseconds
ts = ts / 1e3
} else if numDigits != 10 {
// must be in seconds, return error if not 0
if ts == 0 {
ts = getCurrentTime()
} else {
return ErrInvalidTimestamp
}
}
pt.Timestamp = ts
return nil
}

func (ep *LoopedParser) parse(p *PointParser, pt *Point) error {
for {
err := ep.wrappedParser.parse(p, pt)
if err != nil {
return err
}
err = ep.wsPaser.parse(p, pt)
if err == ErrEOF {
break
}
}
return nil
}

func (ep *TagParser) parse(p *PointParser, pt *Point) error {
k, err := parseLiteral(p)
if err != nil {
if k == "" {
return nil
}
return err
}

next, lit := p.scan()
if next != EQUALS {
return fmt.Errorf("found %q, expected equals", lit)
}

v, err := parseLiteral(p)
if err != nil {
return err
}
if len(pt.Tags) == 0 {
pt.Tags = make(map[string]string)
}
pt.Tags[k] = v
return nil
}

func (ep *WhiteSpaceParser) parse(p *PointParser, pt *Point) error {
tok := WS
for tok != EOF && tok == WS {
tok, _ = p.scan()
}

if tok == EOF {
if !ep.nextOptional {
return ErrEOF
}
return nil
}
p.unscan()
return nil
}

func (ep *LiteralParser) parse(p *PointParser, pt *Point) error {
l, err := parseLiteral(p)
if err != nil {
return err
}

if l != ep.literal {
return fmt.Errorf("found %s, expected %s", l, ep.literal)
}
return nil
}

func parseQuotedLiteral(p *PointParser) (string, error) {
p.writeBuf.Reset()

escaped := false
tok, lit := p.scan()
for tok != EOF && (tok != QUOTES || (tok == QUOTES && escaped)) {
// let everything through
escaped = tok == BACKSLASH
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == EOF {
return "", fmt.Errorf("found %q, expected quotes", lit)
}
return p.writeBuf.String(), nil
}

func parseLiteral(p *PointParser) (string, error) {
tok, lit := p.scan()
if tok == EOF {
return "", fmt.Errorf("found %q, expected literal", lit)
}

if tok == QUOTES {
return parseQuotedLiteral(p)
}

p.writeBuf.Reset()
for tok != EOF && tok > literal_beg && tok < literal_end {
p.writeBuf.WriteString(lit)
tok, lit = p.scan()
}
if tok == QUOTES {
return "", errors.New("found quote inside unquoted literal")
}
p.unscan()
return p.writeBuf.String(), nil
}

func getCurrentTime() int64 {
return time.Now().UnixNano() / 1e9
}
Loading

0 comments on commit 6454319

Please sign in to comment.