Skip to content

Commit

Permalink
Add Graphite line protocol parsing to exec plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
henrypfhu authored and geodimm committed Mar 10, 2016
1 parent 419bab4 commit d9a96a6
Show file tree
Hide file tree
Showing 12 changed files with 950 additions and 59 deletions.
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
2 changes: 1 addition & 1 deletion Godeps_windows
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Currently implemented sources:
* disque
* docker
* elasticsearch
* exec (generic JSON-emitting executable plugin)
* exec (generic executable plugin, support JSON, influx and graphite)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb
Expand Down
31 changes: 31 additions & 0 deletions internal/encoding/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package encoding

import (
"fmt"

"github.com/influxdata/telegraf"
)

type Parser interface {
InitConfig(configs map[string]interface{}) error
Parse(buf []byte) ([]telegraf.Metric, error)
ParseLine(line string) (telegraf.Metric, error)
}

type Creator func() Parser

var Parsers = map[string]Creator{}

func Add(name string, creator Creator) {
Parsers[name] = creator
}

func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) {
creator := Parsers[dataFormat]
if creator == nil {
return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat)
}
parser = creator()
err = parser.InitConfig(configs)
return parser, err
}
135 changes: 135 additions & 0 deletions internal/encoding/graphite/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package graphite

import (
"fmt"
"strings"
)

const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)

// Config represents the configuration for Graphite endpoints.
type Config struct {
Separator string
Templates []string
}

// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}

return nil
}

func (c *Config) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}

for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}

if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}

template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}

if len(parts) == 3 {
tags = parts[2]
}

// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}

// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}

if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}

if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}

func (c *Config) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}

if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}

return nil
}

func (c *Config) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}

if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}

func (c *Config) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}

if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}

return nil
}
14 changes: 14 additions & 0 deletions internal/encoding/graphite/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package graphite

import "fmt"

// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}

func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}
Loading

0 comments on commit d9a96a6

Please sign in to comment.