Skip to content

Commit

Permalink
Allow exec plugin to parse line-protocol
Browse files Browse the repository at this point in the history
closes #613
  • Loading branch information
sparrc committed Jan 29, 2016
1 parent 338341a commit c925299
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 16 deletions.
47 changes: 31 additions & 16 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os/exec"
"time"

"github.com/gonuts/go-shellquote"

Expand All @@ -14,18 +15,20 @@ import (
)

const sampleConfig = `
# NOTE This plugin only reads numerical measurements, strings and booleans
# will be ignored.
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
`

type Exec struct {
Command string
Command string
DataFormat string

runner Runner
}
Expand Down Expand Up @@ -71,20 +74,32 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
return err
}

var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
switch e.DataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
}

acc.AddFields("exec", f.Fields, nil)
return nil
}

Expand Down
73 changes: 73 additions & 0 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ const malformedJson = `
"status": "green",
`

const lineProtocol = `cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

const lineProtocol2 = `cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

type runnerMock struct {
out []byte
err error
Expand Down Expand Up @@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
require.Error(t, err)
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
}

func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}

func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol2), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}

for _, cpu := range cpuTags {
tags["cpu"] = cpu
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
}

func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
DataFormat: "FooBar",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}

0 comments on commit c925299

Please sign in to comment.