forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request influxdata#92 from Asana/exec
Add exec plugin
- Loading branch information
Showing
3 changed files
with
214 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
package exec | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"github.com/gonuts/go-shellquote" | ||
"github.com/influxdb/telegraf/plugins" | ||
"os/exec" | ||
"sync" | ||
) | ||
|
||
const sampleConfig = ` | ||
# specify commands via an array of tables | ||
[[exec.commands]] | ||
# the command to run | ||
command = "/usr/bin/mycollector --foo=bar" | ||
# name of the command (used as a prefix for measurements) | ||
name = "mycollector" | ||
` | ||
|
||
type Command struct { | ||
Command string | ||
Name string | ||
} | ||
|
||
type Exec struct { | ||
Commands []*Command | ||
runner Runner | ||
} | ||
|
||
type Runner interface { | ||
Run(string, ...string) ([]byte, error) | ||
} | ||
|
||
type CommandRunner struct { | ||
} | ||
|
||
func NewExec() *Exec { | ||
return &Exec{runner: CommandRunner{}} | ||
} | ||
|
||
func (c CommandRunner) Run(command string, args ...string) ([]byte, error) { | ||
cmd := exec.Command(command, args...) | ||
var out bytes.Buffer | ||
cmd.Stdout = &out | ||
|
||
if err := cmd.Run(); err != nil { | ||
return nil, fmt.Errorf("exec: %s for command '%s'", err, command) | ||
} | ||
|
||
return out.Bytes(), nil | ||
} | ||
|
||
func (e *Exec) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (e *Exec) Description() string { | ||
return "Read flattened metrics from one or more commands that output JSON to stdout" | ||
} | ||
|
||
func (e *Exec) Gather(acc plugins.Accumulator) error { | ||
var wg sync.WaitGroup | ||
|
||
var outerr error | ||
|
||
for _, c := range e.Commands { | ||
wg.Add(1) | ||
go func(c *Command, acc plugins.Accumulator) { | ||
defer wg.Done() | ||
outerr = e.gatherCommand(c, acc) | ||
}(c, acc) | ||
} | ||
|
||
wg.Wait() | ||
|
||
return outerr | ||
} | ||
|
||
func (e *Exec) gatherCommand(c *Command, acc plugins.Accumulator) error { | ||
words, err := shellquote.Split(c.Command) | ||
if err != nil || len(words) == 0 { | ||
return fmt.Errorf("exec: unable to parse command, %s", err) | ||
} | ||
|
||
out, err := e.runner.Run(words[0], words[1:]...) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var jsonOut interface{} | ||
err = json.Unmarshal(out, &jsonOut) | ||
if err != nil { | ||
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s", c.Command, err) | ||
} | ||
|
||
return processResponse(acc, c.Name, map[string]string{}, jsonOut) | ||
} | ||
|
||
func processResponse(acc plugins.Accumulator, prefix string, tags map[string]string, v interface{}) error { | ||
switch t := v.(type) { | ||
case map[string]interface{}: | ||
for k, v := range t { | ||
if err := processResponse(acc, prefix+"_"+k, tags, v); err != nil { | ||
return err | ||
} | ||
} | ||
case float64: | ||
acc.Add(prefix, v, tags) | ||
case bool, string, []interface{}: | ||
// ignored types | ||
return nil | ||
default: | ||
return fmt.Errorf("exec: got unexpected type %T with value %v (%s)", t, v, prefix) | ||
} | ||
return nil | ||
} | ||
|
||
func init() { | ||
plugins.Add("exec", func() plugins.Plugin { | ||
return NewExec() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package exec | ||
|
||
import ( | ||
"fmt" | ||
"github.com/influxdb/telegraf/testutil" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"testing" | ||
) | ||
|
||
const validJson = ` | ||
{ | ||
"status": "green", | ||
"num_processes": 82, | ||
"cpu": { | ||
"status": "red", | ||
"used": 8234, | ||
"free": 32 | ||
}, | ||
"percent": 0.81, | ||
"users": [0, 1, 2, 3] | ||
}` | ||
|
||
const malformedJson = ` | ||
{ | ||
"status": "green", | ||
` | ||
|
||
type runnerMock struct { | ||
out []byte | ||
err error | ||
} | ||
|
||
func newRunnerMock(out []byte, err error) Runner { | ||
return &runnerMock{out: out, err: err} | ||
} | ||
|
||
func (r runnerMock) Run(command string, args ...string) ([]byte, error) { | ||
if r.err != nil { | ||
return nil, r.err | ||
} | ||
return r.out, nil | ||
} | ||
|
||
func TestExec(t *testing.T) { | ||
runner := newRunnerMock([]byte(validJson), nil) | ||
command := Command{Command: "testcommand arg1", Name: "mycollector"} | ||
e := &Exec{runner: runner, Commands: []*Command{&command}} | ||
|
||
var acc testutil.Accumulator | ||
err := e.Gather(&acc) | ||
require.NoError(t, err) | ||
|
||
checkFloat := []struct { | ||
name string | ||
value float64 | ||
}{ | ||
{"mycollector_num_processes", 82}, | ||
{"mycollector_cpu_used", 8234}, | ||
{"mycollector_cpu_free", 32}, | ||
{"mycollector_percent", 0.81}, | ||
} | ||
|
||
for _, c := range checkFloat { | ||
assert.True(t, acc.CheckValue(c.name, c.value)) | ||
} | ||
|
||
assert.Equal(t, len(acc.Points), 4, "non-numeric measurements should be ignored") | ||
} | ||
|
||
func TestExecMalformed(t *testing.T) { | ||
runner := newRunnerMock([]byte(malformedJson), nil) | ||
command := Command{Command: "badcommand arg1", Name: "mycollector"} | ||
e := &Exec{runner: runner, Commands: []*Command{&command}} | ||
|
||
var acc testutil.Accumulator | ||
err := e.Gather(&acc) | ||
require.Error(t, err) | ||
} | ||
|
||
func TestCommandError(t *testing.T) { | ||
runner := newRunnerMock(nil, fmt.Errorf("exit status code 1")) | ||
command := Command{Command: "badcommand", Name: "mycollector"} | ||
e := &Exec{runner: runner, Commands: []*Command{&command}} | ||
var acc testutil.Accumulator | ||
err := e.Gather(&acc) | ||
require.Error(t, err) | ||
} |