Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exec plugin #92

Merged
merged 2 commits into from
Aug 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/disque"
_ "github.com/influxdb/telegraf/plugins/elasticsearch"
_ "github.com/influxdb/telegraf/plugins/exec"
_ "github.com/influxdb/telegraf/plugins/haproxy"
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/lustre2"
Expand Down
125 changes: 125 additions & 0 deletions plugins/exec/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package exec

import (
"bytes"
"encoding/json"
"fmt"
"github.com/gonuts/go-shellquote"
"github.com/influxdb/telegraf/plugins"
"os/exec"
"sync"
)

const sampleConfig = `
# specify commands via an array of tables
[[exec.commands]]
# the command to run
command = "/usr/bin/mycollector --foo=bar"

# name of the command (used as a prefix for measurements)
name = "mycollector"
`

type Command struct {
Command string
Name string
}

type Exec struct {
Commands []*Command
runner Runner
}

type Runner interface {
Run(string, ...string) ([]byte, error)
}

type CommandRunner struct {
}

func NewExec() *Exec {
return &Exec{runner: CommandRunner{}}
}

func (c CommandRunner) Run(command string, args ...string) ([]byte, error) {
cmd := exec.Command(command, args...)
var out bytes.Buffer
cmd.Stdout = &out

if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("exec: %s for command '%s'", err, command)
}

return out.Bytes(), nil
}

func (e *Exec) SampleConfig() string {
return sampleConfig
}

func (e *Exec) Description() string {
return "Read flattened metrics from one or more commands that output JSON to stdout"
}

func (e *Exec) Gather(acc plugins.Accumulator) error {
var wg sync.WaitGroup

var outerr error

for _, c := range e.Commands {
wg.Add(1)
go func(c *Command, acc plugins.Accumulator) {
defer wg.Done()
outerr = e.gatherCommand(c, acc)
}(c, acc)
}

wg.Wait()

return outerr
}

func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error {
words, err := shellquote.Split(c.Command)
if err != nil || len(words) == 0 {
return fmt.Errorf("exec: unable to parse command, %s", err)
}

out, err := e.runner.Run(words[0], words[1:]...)
if err != nil {
return err
}

var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err)
}

return processResponse(acc, c.Name, map[string]string{}, jsonOut)
}

func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error {
switch t := v.(type) {
case map[string]interface{}:
for k, v := range t {
if err := processResponse(acc, prefix+"_"+k, tags, v); err != nil {
return err
}
}
case float64:
acc.Add(prefix, v, tags)
case bool, string, []interface{}:
// ignored types
return nil
default:
return fmt.Errorf("exec: got unexpected type %T with value %v (%s)", t, v, prefix)
}
return nil
}

func init() {
plugins.Add("exec", func() plugins.Plugin {
return NewExec()
})
}
88 changes: 88 additions & 0 deletions plugins/exec/exec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package exec

import (
"fmt"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

const validJson = `
{
"status": "green",
"num_processes": 82,
"cpu": {
"status": "red",
"used": 8234,
"free": 32
},
"percent": 0.81,
"users": [0, 1, 2, 3]
}`

const malformedJson = `
{
"status": "green",
`

type runnerMock struct {
out []byte
err error
}

func newRunnerMock(out []byte, err error) Runner {
return &runnerMock{out: out, err: err}
}

func (r runnerMock) Run(command string, args ...string) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
return r.out, nil
}

func TestExec(t *testing.T) {
runner := newRunnerMock([]byte(validJson), nil)
command := Command{Command: "testcommand arg1", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

checkFloat := []struct {
name string
value float64
}{
{"mycollector_num_processes", 82},
{"mycollector_cpu_used", 8234},
{"mycollector_cpu_free", 32},
{"mycollector_percent", 0.81},
}

for _, c := range checkFloat {
assert.True(t, acc.CheckValue(c.name, c.value))
}

assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored")
}

func TestExecMalformed(t *testing.T) {
runner := newRunnerMock([]byte(malformedJson), nil)
command := Command{Command: "badcommand arg1", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}

func TestCommandError(t *testing.T) {
runner := newRunnerMock(nil, fmt.Errorf("exit status code 1"))
command := Command{Command: "badcommand", Name: "mycollector"}
e := &Exec{runner: runner, Commands: []*Command{&command}}
var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}