Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exec input with Graphite protocol support #637

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion Godeps
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ golang.org/x/text 6d3c22c4525a4da167968fa2479be5524d2e8bd0
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
2 changes: 1 addition & 1 deletion Godeps_windows
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ golang.org/x/text 6fc2e00a0d64b1f7fc1212dae5b0c939cf6d9ac4
gopkg.in/dancannon/gorethink.v1 6f088135ff288deb9d5546f4c71919207f891a70
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 03c9f3ee4c14c8e51ee521a6a7d0425658dd6f64
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
gopkg.in/yaml.v2 f7716cbe52baa25d2e9b0d0da546fcf909fc16b4
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Currently implemented sources:
* disque
* docker
* elasticsearch
* exec (generic JSON-emitting executable plugin)
* exec (generic executable plugin, support JSON, influx and graphite)
* haproxy
* httpjson (generic JSON-emitting http service plugin)
* influxdb
Expand Down
31 changes: 31 additions & 0 deletions internal/encoding/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package encoding

import (
"fmt"

"github.com/influxdata/telegraf"
)

type Parser interface {
InitConfig(configs map[string]interface{}) error
Parse(buf []byte) ([]telegraf.Metric, error)
ParseLine(line string) (telegraf.Metric, error)
}

type Creator func() Parser

var Parsers = map[string]Creator{}

func Add(name string, creator Creator) {
Parsers[name] = creator
}

func NewParser(dataFormat string, configs map[string]interface{}) (parser Parser, err error) {
creator := Parsers[dataFormat]
if creator == nil {
return nil, fmt.Errorf("Unsupported data format: %s. ", dataFormat)
}
parser = creator()
err = parser.InitConfig(configs)
return parser, err
}
135 changes: 135 additions & 0 deletions internal/encoding/graphite/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package graphite

import (
"fmt"
"strings"
)

const (
// DefaultSeparator is the default join character to use when joining multiple
// measurment parts in a template.
DefaultSeparator = "."
)

// Config represents the configuration for Graphite endpoints.
type Config struct {
Separator string
Templates []string
}

// Validate validates the config's templates and tags.
func (c *Config) Validate() error {
if err := c.validateTemplates(); err != nil {
return err
}

return nil
}

func (c *Config) validateTemplates() error {
// map to keep track of filters we see
filters := map[string]struct{}{}

for i, t := range c.Templates {
parts := strings.Fields(t)
// Ensure template string is non-empty
if len(parts) == 0 {
return fmt.Errorf("missing template at position: %d", i)
}
if len(parts) == 1 && parts[0] == "" {
return fmt.Errorf("missing template at position: %d", i)
}

if len(parts) > 3 {
return fmt.Errorf("invalid template format: '%s'", t)
}

template := t
filter := ""
tags := ""
if len(parts) >= 2 {
// We could have <filter> <template> or <template> <tags>. Equals is only allowed in
// tags section.
if strings.Contains(parts[1], "=") {
template = parts[0]
tags = parts[1]
} else {
filter = parts[0]
template = parts[1]
}
}

if len(parts) == 3 {
tags = parts[2]
}

// Validate the template has one and only one measurement
if err := c.validateTemplate(template); err != nil {
return err
}

// Prevent duplicate filters in the config
if _, ok := filters[filter]; ok {
return fmt.Errorf("duplicate filter '%s' found at position: %d", filter, i)
}
filters[filter] = struct{}{}

if filter != "" {
// Validate filter expression is valid
if err := c.validateFilter(filter); err != nil {
return err
}
}

if tags != "" {
// Validate tags
for _, tagStr := range strings.Split(tags, ",") {
if err := c.validateTag(tagStr); err != nil {
return err
}
}
}
}
return nil
}

func (c *Config) validateTemplate(template string) error {
hasMeasurement := false
for _, p := range strings.Split(template, ".") {
if p == "measurement" || p == "measurement*" {
hasMeasurement = true
}
}

if !hasMeasurement {
return fmt.Errorf("no measurement in template `%s`", template)
}

return nil
}

func (c *Config) validateFilter(filter string) error {
for _, p := range strings.Split(filter, ".") {
if p == "" {
return fmt.Errorf("filter contains blank section: %s", filter)
}

if strings.Contains(p, "*") && p != "*" {
return fmt.Errorf("invalid filter wildcard section: %s", filter)
}
}
return nil
}

func (c *Config) validateTag(keyValue string) error {
parts := strings.Split(keyValue, "=")
if len(parts) != 2 {
return fmt.Errorf("invalid template tags: '%s'", keyValue)
}

if parts[0] == "" || parts[1] == "" {
return fmt.Errorf("invalid template tags: %s'", keyValue)
}

return nil
}
14 changes: 14 additions & 0 deletions internal/encoding/graphite/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package graphite

import "fmt"

// An UnsupposedValueError is returned when a parsed value is not
// supposed.
type UnsupposedValueError struct {
Field string
Value float64
}

func (err *UnsupposedValueError) Error() string {
return fmt.Sprintf(`field "%s" value: "%v" is unsupported`, err.Field, err.Value)
}
Loading